Durrell Gemuh

PECOS Data Extraction Pipeline - DevOps Documentation

Overview

The PECOS Data Extraction Pipeline is an enterprise-grade ETL workflow that extracts, transforms, and loads healthcare provider data from CMS PECOS datasets. The pipeline processes four datasets (Clinicians, Practices, Canonical Providers, Detail Tables) in parallel using PySpark and AWS Glue, orchestrated by AWS Step Functions.

Repository: https://github.com/durrello/PECOS-Data-Extraction-Pipeline

Architecture

AWS Architecture (Primary)

                    ┌─────────────────────────────────────┐
                    │         AWS Step Functions           │
                    │      State Machine Orchestrator      │
                    └───────────────┬─────────────────────┘
                                    │
                          [ValidateInput]  ← Pass State
                                    │
                    ┌───────────────▼─────────────────────┐
                    │         ExtractParallel              │ ← Parallel State
                    │  ┌──────┐ ┌──────┐ ┌──────┐ ┌────┐ │
                    │  │Clin. │ │Prac. │ │Canon.│ │Det.│ │ ← Task States
                    │  │ ETL  │ │ ETL  │ │ ETL  │ │ETL │ │
                    │  └──────┘ └──────┘ └──────┘ └────┘ │
                    │   ↺ 3 retries per branch            │
                    └───────────────┬─────────────────────┘
                                    │
                          [AllSucceeded?]  ← Choice State
                         /                         \
               [NotifySuccess]              [NotifyFailure]
                     │                              │
             [PipelineSucceed]            [PipelineFail]
              ← Succeed State              ← Fail State
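
For orientation, here is a trimmed-down sketch of how this workflow can be expressed in Amazon States Language. This is not the repo's pecos_state_machine.asl.json: only one of the four parallel branches is shown, the Catch/IsPresent failure-routing pattern is one common way to implement the AllSucceeded choice, and the ${REGION}/${ACCOUNT_ID} placeholders follow the substitution convention used in the manual deployment steps below.

{
  "Comment": "Simplified sketch; see state_machine/pecos_state_machine.asl.json for the real definition",
  "StartAt": "ValidateInput",
  "States": {
    "ValidateInput": { "Type": "Pass", "Next": "ExtractParallel" },
    "ExtractParallel": {
      "Type": "Parallel",
      "ResultPath": "$.results",
      "Catch": [{ "ErrorEquals": ["States.ALL"], "ResultPath": "$.error", "Next": "AllSucceeded" }],
      "Branches": [{
        "StartAt": "ExtractCliniciansTask",
        "States": {
          "ExtractCliniciansTask": {
            "Type": "Task",
            "Resource": "arn:aws:states:::glue:startJobRun.sync",
            "Parameters": { "JobName": "PECOS-Clinicians-ETL" },
            "Retry": [{ "ErrorEquals": ["States.ALL"], "MaxAttempts": 3, "BackoffRate": 2.0 }],
            "End": true
          }
        }
      }],
      "Next": "AllSucceeded"
    },
    "AllSucceeded": {
      "Type": "Choice",
      "Choices": [{ "Variable": "$.error", "IsPresent": true, "Next": "NotifyFailure" }],
      "Default": "NotifySuccess"
    },
    "NotifySuccess": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sns:publish",
      "Parameters": {
        "TopicArn": "arn:aws:sns:${REGION}:${ACCOUNT_ID}:pecos-pipeline-notifications",
        "Message": "PECOS pipeline succeeded"
      },
      "Next": "PipelineSucceed"
    },
    "NotifyFailure": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sns:publish",
      "Parameters": {
        "TopicArn": "arn:aws:sns:${REGION}:${ACCOUNT_ID}:pecos-pipeline-notifications",
        "Message": "PECOS pipeline failed"
      },
      "Next": "PipelineFail"
    },
    "PipelineSucceed": { "Type": "Succeed" },
    "PipelineFail": { "Type": "Fail" }
  }
}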

Alternative Architectures

Local Python (Development)

  • Uses ThreadPoolExecutor for parallel execution
  • Local file system for data storage
  • Console logging for notifications
  • No AWS dependencies required

Docker (EMR Simulation)

  • Containerized EMR-like environment
  • Volume mounts for data persistence
  • Isolated Python 3.11 + PySpark 3.5.1 environment
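
The repo's docker/Dockerfile is authoritative; as a rough sketch of what such an image involves (the base image, Java package, and pyspark pin here are assumptions):

# Hypothetical sketch of docker/Dockerfile
FROM python:3.11-slim

# Spark needs a JVM; default-jre resolves to Java 17 on current Debian images
RUN apt-get update \
 && apt-get install -y --no-install-recommends default-jre \
 && rm -rf /var/lib/apt/lists/*

WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt pyspark==3.5.1

COPY . .
CMD ["python", "state_machine/pipeline_orchestrator.py"]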

AWS Services Used

Core Services

  • AWS Step Functions: Orchestrates the ETL pipeline workflow
  • AWS Glue: Serverless PySpark ETL jobs (4 parallel jobs)
  • Amazon S3: Data lake for input, output, and logs
  • AWS IAM: Fine-grained permissions for services
  • Amazon SNS: Email notifications for pipeline status

Supporting Services

  • Amazon CloudWatch: Metrics, dashboards, and alarms
  • Amazon CloudWatch Logs: Detailed execution logs
  • AWS X-Ray: Distributed tracing (optional)

Prerequisites

AWS Prerequisites

  • AWS CLI v2 configured with admin permissions
  • S3 bucket for data storage (auto-created by bootstrap)
  • IAM permissions to create roles, policies, and services

Local Development Prerequisites

  • Python 3.11 (required for PySpark compatibility)
  • Java 17+ (for Spark runtime)
  • Docker (optional, for containerized runs)

Deployment Options

Option 1: AWS One-Command Bootstrap (Recommended)

The bootstrap script automates the entire AWS deployment:

# Basic deployment
./bootstrap.sh

# With custom environment and email
./bootstrap.sh production your-email@example.com

What the bootstrap script does:

  1. Creates IAM roles with minimal required permissions
  2. Verifies/creates S3 bucket
  3. Uploads code, config, and data to S3
  4. Creates/updates 4 AWS Glue ETL jobs
  5. Sets up SNS topic and email notifications
  6. Deploys Step Functions state machine
  7. Starts initial pipeline execution

Output:

  • Execution ARN for monitoring
  • Console URLs for Step Functions and Glue
  • SNS topic ARN for notifications

Option 2: Manual AWS Deployment

For custom deployments or CI/CD integration:

1. IAM Roles Setup

# Step Functions execution role
aws iam create-role --role-name PECOSStepFunctionsRole \
  --assume-role-policy-document '{
    "Version": "2012-10-17",
    "Statement": [{
      "Effect": "Allow",
      "Principal": {"Service": "states.amazonaws.com"},
      "Action": "sts:AssumeRole"
    }]
  }'

# Glue execution role
aws iam create-role --role-name PECOSGlueRole \
  --assume-role-policy-document '{
    "Version": "2012-10-17",
    "Statement": [{
      "Effect": "Allow",
      "Principal": {"Service": "glue.amazonaws.com"},
      "Action": "sts:AssumeRole"
    }]
  }'
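
These trust policies only let the services assume the roles; they grant no permissions by themselves. The bootstrap script attaches permission policies automatically; for manual deployments, something along these lines is needed (the exact policies in the repo may be scoped differently):

# Glue role: AWS-managed service policy, plus (not shown) S3 access
# scoped to the pipeline bucket
aws iam attach-role-policy \
  --role-name PECOSGlueRole \
  --policy-arn arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole

# Step Functions role: inline policy covering Glue job control and SNS
# publish (tighten Resource to specific job and topic ARNs in production)
aws iam put-role-policy \
  --role-name PECOSStepFunctionsRole \
  --policy-name PECOSStepFunctionsPolicy \
  --policy-document '{
    "Version": "2012-10-17",
    "Statement": [{
      "Effect": "Allow",
      "Action": [
        "glue:StartJobRun",
        "glue:GetJobRun",
        "glue:GetJobRuns",
        "glue:BatchStopJobRun",
        "sns:Publish"
      ],
      "Resource": "*"
    }]
  }'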

2. S3 Bucket Setup

BUCKET="durrell-backend-bucket-terraform"
aws s3 mb s3://$BUCKET

3. Upload Artifacts

# Upload Spark jobs
aws s3 sync spark_jobs/ s3://$BUCKET/spark_jobs/ \
  --exclude "*.pyc" --exclude "__pycache__/*"

# Upload config
aws s3 cp config/pipeline_config.yaml s3://$BUCKET/config/

# Upload input data
aws s3 sync data/input/ s3://$BUCKET/data/input/

4. Create Glue Jobs

# Example for Clinicians ETL job
aws glue create-job \
  --name "PECOS-Clinicians-ETL" \
  --role "arn:aws:iam::ACCOUNT_ID:role/PECOSGlueRole" \
  --command "{
    \"Name\": \"glueetl\",
    \"ScriptLocation\": \"s3://$BUCKET/spark_jobs/glue_clinicians.py\",
    \"PythonVersion\": \"3\"
  }" \
  --default-arguments "{
    \"--config\": \"s3://$BUCKET/config/pipeline_config.yaml\",
    \"--additional-python-modules\": \"pyyaml\",
    \"--enable-continuous-cloudwatch-log\": \"true\",
    \"--enable-metrics\": \"true\"
  }" \
  --glue-version "4.0" \
  --number-of-workers 2 \
  --worker-type "G.1X"

5. SNS Notifications Setup

# Create topic
TOPIC_ARN=$(aws sns create-topic --name pecos-pipeline-notifications --query TopicArn --output text)

# Subscribe email
aws sns subscribe --topic-arn $TOPIC_ARN --protocol email --notification-endpoint your-email@example.com
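
The subscription stays pending until the recipient clicks the link in the confirmation email. Once confirmed, a quick test publish verifies delivery end to end:

aws sns publish --topic-arn $TOPIC_ARN \
  --subject "PECOS test" \
  --message "PECOS pipeline notification test"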

6. Deploy State Machine

# Substitute variables in ASL
sed -e "s/\${REGION}/us-east-1/g" \
    -e "s/\${ACCOUNT_ID}/YOUR_ACCOUNT_ID/g" \
    -e "s/\${BUCKET}/$BUCKET/g" \
    state_machine/pecos_state_machine.asl.json > temp_asl.json

# Create state machine
aws stepfunctions create-state-machine \
  --name "PECOS-Extraction-Pipeline" \
  --definition file://temp_asl.json \
  --role-arn "arn:aws:iam::ACCOUNT_ID:role/PECOSStepFunctionsRole"

Option 3: Local Python Development

For development and testing without AWS:

Environment Setup

# Install Python 3.11
brew install python@3.11  # macOS
python3.11 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt

Run Pipeline Locally

# Full pipeline
python state_machine/pipeline_orchestrator.py \
  --input state_machine/sample_input.json \
  --config config/pipeline_config.yaml

# Individual ETL jobs
python spark_jobs/etl_clinicians.py --execution-id DEV-001

Option 4: Docker Development

EMR-like containerized environment:

# Build and run
docker build -f docker/Dockerfile -t pecos-pipeline .
docker-compose up

# Or run specific command
docker run --rm -v $(pwd)/data:/app/data pecos-pipeline \
  python state_machine/pipeline_orchestrator.py

Configuration Management

Pipeline Configuration (pipeline_config.yaml)

The configuration is environment-aware and supports both local and AWS deployments:

pipeline:
  name: "PECOS-Extraction-Pipeline"
  environment: "local"  # local | dev | staging | prod
  execution_id_prefix: "PECOS"

# S3 paths (AWS) vs local paths
paths:
  base_input_dir: "s3://bucket/data/input"   # AWS
  base_output_dir: "s3://bucket/data/output" # AWS

local_paths:
  base_input_dir: "data/input"    # Local
  base_output_dir: "data/output"  # Local

# Dataset-specific configs
datasets:
  clinicians:
    input_file: "clinicians.csv"
    partition_by: ["state"]
    primary_key: "npi"
    required_columns: [...]

Environment Variables

Override configuration at runtime:

export PECOS_BASE_INPUT=data/input
export PECOS_BASE_OUTPUT=data/output
export PECOS_LOG_DIR=logs
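
A minimal sketch of how an orchestrator can layer these overrides on top of the YAML file (the variable names match the exports above; the repo's actual lookup logic may differ):

import os
import yaml

# Load the base config, then let PECOS_* environment variables win
with open("config/pipeline_config.yaml") as f:
    config = yaml.safe_load(f)

input_dir = os.environ.get("PECOS_BASE_INPUT", config["local_paths"]["base_input_dir"])
output_dir = os.environ.get("PECOS_BASE_OUTPUT", config["local_paths"]["base_output_dir"])
log_dir = os.environ.get("PECOS_LOG_DIR", "logs")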

Monitoring and Observability

AWS Monitoring

Step Functions Console

  • Real-time execution visualization
  • State transition timing
  • Error details and stack traces
  • Execution history and re-runs

Glue Console

  • Job run status and duration
  • Resource utilization (DPUs)
  • Error logs and troubleshooting

CloudWatch Logs

  • Structured logs from all components
  • Log groups:
    • /aws/states/PECOS-Extraction-Pipeline
    • /aws-glue/jobs/logs-v2/

CloudWatch Metrics

  • Step Functions: Execution metrics
  • Glue: Job performance metrics
  • S3: Storage and access metrics

Local Monitoring

Console Logging

  • Structured JSON logs with timestamps
  • Execution summaries in logs/{execution_id}_summary.json
  • Dataset-specific logs in logs/{execution_id}_{dataset}.log

File System Monitoring

  • Output Parquet files under data/output/
  • Partitioned by state (and additionally by status for Detail Tables)
  • Snappy-compressed for efficiency

Pipeline Execution

Starting Executions

AWS Step Functions

# Prepare input
EXECUTION_INPUT='{
  "trigger_source": "manual",
  "run_date": "'$(date +%Y-%m-%d)'",
  "environment": "production",
  "config_s3_uri": "s3://bucket/config/pipeline_config.yaml",
  "input_s3_prefix": "s3://bucket/data/input/",
  "output_s3_prefix": "s3://bucket/data/output/"
}'

# Start execution
EXECUTION_ARN=$(aws stepfunctions start-execution \
  --state-machine-arn $STATE_MACHINE_ARN \
  --input "$EXECUTION_INPUT" \
  --query executionArn --output text)

Local Execution

python state_machine/pipeline_orchestrator.py \
  --input state_machine/sample_input.json \
  --config config/pipeline_config.yaml

Execution States

State                          Type      Description                  Retry Behavior
-----                          ----      -----------                  --------------
ValidateInput                  Pass      Enrich input with metadata   N/A
ExtractParallel                Parallel  Run 4 ETL jobs concurrently  N/A
ExtractCliniciansTask          Task      Clinicians ETL               3 retries, 2x backoff
ExtractPracticesTask           Task      Practices ETL                3 retries, 2x backoff
ExtractCanonicalProvidersTask  Task      Canonical Providers ETL      3 retries, 2x backoff
ExtractDetailTablesTask        Task      Detail Tables ETL            3 retries, 2x backoff
AllSucceeded                   Choice    Route based on results       N/A
NotifySuccess                  Task      Send success notification    N/A
NotifyFailure                  Task      Send failure notification    N/A
PipelineSucceed                Succeed   Terminal success             N/A
PipelineFail                   Fail      Terminal failure             N/A

Data Processing

ETL Transformations

Clinicians Dataset

  • Filter active providers (status = A)
  • Zero-pad NPI to 10 digits
  • Derive credential_group (Physician/Mid-Level/Nursing/Other)
  • Compute years_enrolled from enrollment date
  • Data quality flag: dq_has_specialty
  • Deduplicate on (npi, state)
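
To make these rules concrete, here is a hedged PySpark sketch of the Clinicians transformations. The input column names (status, credential, specialty, enrollment_date) and the credential-to-group mapping are assumptions; spark_jobs/etl_clinicians.py in the repo is the source of truth.

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("clinicians-etl-sketch").getOrCreate()
df = spark.read.option("header", True).csv("data/input/clinicians.csv")

clinicians = (
    df.filter(F.col("status") == "A")                    # active providers only
      .withColumn("npi", F.lpad(F.col("npi"), 10, "0"))  # zero-pad NPI to 10 digits
      .withColumn(
          "credential_group",
          F.when(F.col("credential").isin("MD", "DO"), "Physician")
           .when(F.col("credential").isin("NP", "PA"), "Mid-Level")
           .when(F.col("credential").isin("RN", "CNS"), "Nursing")
           .otherwise("Other"))
      .withColumn(
          "years_enrolled",
          F.floor(F.datediff(F.current_date(), F.to_date("enrollment_date")) / 365.25))
      .withColumn("dq_has_specialty", F.col("specialty").isNotNull())  # DQ flag
      .dropDuplicates(["npi", "state"])                  # dedupe on (npi, state)
)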

Practices Dataset

  • Filter active practices
  • Derive practice_size_category (Solo/Small/Medium/Large/Enterprise)
  • Map state → US Census region
  • Flag multi-specialty groups
  • Deduplicate on practice_id

Canonical Providers Dataset

  • Build golden record using window functions
  • Compute participation_score (0-3: Medicare + Medicaid + Telehealth)
  • Assign participation_tier (Full/Partial/Limited/None)
  • Build display_name for individuals and organizations
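
The golden-record step is the interesting one here. A hypothetical window-function sketch, where `providers` is an already-loaded DataFrame and the column names and most-recent-wins rule are illustrative:

from pyspark.sql import Window, functions as F

# Keep the most recently updated row per NPI, then score participation
w = Window.partitionBy("npi").orderBy(F.col("last_updated").desc())

canonical = (
    providers.withColumn("rn", F.row_number().over(w))
             .filter(F.col("rn") == 1)
             .drop("rn")
             .withColumn(
                 "participation_score",  # 0-3: Medicare + Medicaid + Telehealth
                 F.col("accepts_medicare").cast("int")
                 + F.col("accepts_medicaid").cast("int")
                 + F.col("offers_telehealth").cast("int"))
             .withColumn(
                 "participation_tier",
                 F.when(F.col("participation_score") == 3, "Full")
                  .when(F.col("participation_score") == 2, "Partial")
                  .when(F.col("participation_score") == 1, "Limited")
                  .otherwise("None"))
)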

Detail Tables Dataset

  • Parse and classify enrollment events
  • Compute enrollment_duration_days
  • Decode reason_code to human-readable descriptions
  • Classify termination_type
  • Compute window-based group_reassignment_rate
  • Flag is_recent_enrollment (< 180 days) and is_stale_record (> 5 years)

Output Format

All datasets output Snappy-compressed Parquet with metadata columns:

  • _pipeline_execution_id: Unique run identifier
  • _pipeline_ingested_at: UTC ingestion timestamp
  • _pipeline_source_file: Source file path

Partitioning strategy:

  • Clinicians/Practices/Canonical: state=XX/
  • Detail Tables: state=XX/status=Y/
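
An illustrative write step combining the metadata columns and partitioning, continuing from the Clinicians sketch above (the execution ID literal is a placeholder):

from pyspark.sql import functions as F

# Stamp lineage metadata, then write Snappy Parquet partitioned by state
enriched = (
    clinicians.withColumn("_pipeline_execution_id", F.lit("PECOS-20240101-001"))
              .withColumn("_pipeline_ingested_at", F.current_timestamp())
              .withColumn("_pipeline_source_file", F.input_file_name())
)

(enriched.write
         .mode("overwrite")                    # idempotent re-runs
         .partitionBy("state")
         .option("compression", "snappy")
         .parquet("data/output/clinicians"))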

Notifications

AWS SNS Notifications

  • Email notifications for success/failure
  • Structured JSON payload with execution details
  • Configurable recipients

Local Notifications

  • Console output with execution summary
  • JSON-formatted logs
  • File-based execution summaries

Security

IAM Permissions

Principle of Least Privilege

  • Step Functions role: Only Glue start/run, SNS publish, CloudWatch logs
  • Glue role: Scoped S3 access (only pipeline bucket), Glue service role

S3 Bucket Policies

  • Separate permissions for input, output, and logs
  • No wildcard permissions
  • Encryption at rest (SSE-S3)

Network Security

  • All services communicate within AWS network
  • No public endpoints exposed
  • VPC deployment possible for enhanced security

Cost Optimization

AWS Glue

  • G.1X worker type (cost-effective for CPU-bound workloads)
  • 2 workers per job (parallel processing within job)
  • Auto-scaling disabled (predictable costs)
  • Timeout: 60 minutes

Step Functions

  • Standard workflow (cost-effective for ETL orchestration)
  • No Express workflows needed

S3 Storage

  • Intelligent tiering for output data
  • Lifecycle policies for logs
  • Snappy compression reduces storage costs

Troubleshooting

Common Issues

SNS Email Not Received

  • Confirm subscription in email
  • Check SNS topic permissions
  • Verify email address format

Glue Job Failures

  • Check CloudWatch logs: /aws-glue/jobs/logs-v2/
  • Verify S3 permissions
  • Check PySpark version compatibility

Step Functions Timeouts

  • Increase Glue job timeout
  • Optimize Spark configurations
  • Check for data skew

Local Development Issues

  • Ensure Python 3.11 (PySpark requirement)
  • Check Java version (17+)
  • Verify virtual environment activation

Debugging Commands

# Check execution status
aws stepfunctions describe-execution --execution-arn $EXECUTION_ARN

# View Glue job logs
aws logs tail /aws-glue/jobs/logs-v2 --follow

# List output files
aws s3 ls s3://$BUCKET/data/output/ --recursive

# Check S3 permissions
aws s3api get-bucket-policy --bucket $BUCKET

Cleanup

AWS Resource Cleanup

# Automated teardown
./teardown.sh

Deletes:

  • Step Functions state machine
  • 4 AWS Glue ETL jobs
  • SNS topic and subscriptions
  • S3 bucket and all contents
  • IAM roles and policies

Local Cleanup

# Remove outputs and logs
rm -rf data/output/* logs/*

# Remove virtual environment
rm -rf .venv

Performance Tuning

Spark Configurations

spark:
  configs:
    spark.sql.shuffle.partitions: "4"
    spark.default.parallelism: "4"
    spark.sql.adaptive.enabled: "true"
    spark.sql.parquet.compression.codec: "snappy"
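
Applied to a locally built session, these settings look like the following (the repo presumably loads them from pipeline_config.yaml rather than hard-coding them):

from pyspark.sql import SparkSession

# Mirror the YAML settings above when constructing the session
spark = (
    SparkSession.builder.appName("PECOS-Extraction-Pipeline")
    .config("spark.sql.shuffle.partitions", "4")
    .config("spark.default.parallelism", "4")
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.parquet.compression.codec", "snappy")
    .getOrCreate()
)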

Glue Job Sizing

  • G.1X workers: 1 DPU, 16GB RAM, 4 vCPU
  • Scale workers based on data volume
  • Monitor DPU utilization in Glue console

Parallel Processing

  • 4 ETL jobs run concurrently
  • No inter-job dependencies
  • Optimal for I/O bound workloads

Backup and Recovery

Data Backup

  • S3 versioning enabled on bucket
  • Cross-region replication for critical data
  • Regular snapshots of configuration

Pipeline Recovery

  • Step Functions supports execution re-runs
  • Idempotent ETL jobs (overwrite mode)
  • Partial failure handling (continue with successful jobs)

Compliance and Governance

Data Governance

  • Structured logging with execution IDs
  • Data lineage tracking
  • Audit trails in CloudWatch

Healthcare Compliance

  • PHI data handling considerations
  • Encryption at rest and in transit
  • Access logging and monitoring

Future Enhancements

Potential Improvements

  • Event-driven triggers (S3 event notifications)
  • Data quality frameworks (Great Expectations)
  • Advanced monitoring (custom CloudWatch dashboards)
  • Multi-region deployment
  • Blue/green deployment strategy

Scaling Considerations

  • EMR migration for large datasets
  • Kubernetes deployment option
  • Serverless architecture evolution

This documentation provides comprehensive DevOps guidance for deploying and managing the PECOS Data Extraction Pipeline across AWS, local, and containerized environments. The pipeline demonstrates enterprise-grade ETL patterns with robust error handling, monitoring, and cost optimization.
