Overview
The PECOS Data Extraction Pipeline is an enterprise-grade ETL workflow that extracts, transforms, and loads healthcare provider data from CMS PECOS datasets. The pipeline processes four datasets (Clinicians, Practices, Canonical Providers, Detail Tables) in parallel using PySpark and AWS Glue, orchestrated by AWS Step Functions.
Repository: https://github.com/durrello/PECOS-Data-Extraction-Pipeline
Architecture
AWS Architecture (Primary)
┌─────────────────────────────────────┐
│ AWS Step Functions │
│ State Machine Orchestrator │
└───────────────┬─────────────────────┘
│
[ValidateInput] ← Pass State
│
┌───────────────▼─────────────────────┐
│ ExtractParallel │ ← Parallel State
│ ┌──────┐ ┌──────┐ ┌──────┐ ┌────┐ │
│ │Clin. │ │Prac. │ │Canon.│ │Det.│ │ ← Task States
│ │ ETL │ │ ETL │ │ ETL │ │ETL │ │
│ └──────┘ └──────┘ └──────┘ └────┘ │
│ ↺ 3 retries per branch │
└───────────────┬─────────────────────┘
│
[AllSucceeded?] ← Choice State
/ \
[NotifySuccess] [NotifyFailure]
│ │
[PipelineSucceed] [PipelineFail]
← Succeed State ← Fail State
Alternative Architectures
Local Python (Development)
- Uses ThreadPoolExecutor for parallel execution
- Local file system for data storage
- Console logging for notifications
- No AWS dependencies required
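The fan-out pattern used by the local orchestrator can be summarized with a small sketch. The run_dataset helper, its signature, and the result shape below are illustrative assumptions, not the repository's actual API:

from concurrent.futures import ThreadPoolExecutor, as_completed

def run_dataset(dataset: str, execution_id: str) -> dict:
    # Placeholder for invoking the dataset-specific job (e.g. spark_jobs/etl_clinicians.py).
    print(f"[{execution_id}] running {dataset} ETL")
    return {"dataset": dataset, "status": "SUCCEEDED"}

def run_pipeline(execution_id: str = "DEV-001") -> list[dict]:
    datasets = ["clinicians", "practices", "canonical_providers", "detail_tables"]
    results = []
    # One worker per dataset mirrors the Step Functions Parallel state locally.
    with ThreadPoolExecutor(max_workers=len(datasets)) as pool:
        futures = {pool.submit(run_dataset, d, execution_id): d for d in datasets}
        for future in as_completed(futures):
            results.append(future.result())
    return results

if __name__ == "__main__":
    print(run_pipeline())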
Docker (EMR Simulation)
- Containerized EMR-like environment
- Volume mounts for data persistence
- Isolated Python 3.11 + PySpark 3.5.1 environment
AWS Services Used
Core Services
- AWS Step Functions: Orchestrates the ETL pipeline workflow
- AWS Glue: Serverless PySpark ETL jobs (4 parallel jobs)
- Amazon S3: Data lake for input, output, and logs
- AWS IAM: Fine-grained permissions for services
- Amazon SNS: Email notifications for pipeline status
Supporting Services
- Amazon CloudWatch: Metrics and monitoring
- Amazon CloudWatch Logs: Detailed execution logs
- AWS X-Ray: Distributed tracing (optional)
Prerequisites
AWS Prerequisites
- AWS CLI v2 configured with admin permissions
- S3 bucket for data storage (auto-created by bootstrap)
- IAM permissions to create roles, policies, and services
Local Development Prerequisites
- Python 3.11 (required for PySpark compatibility)
- Java 17+ (for Spark runtime)
- Docker (optional, for containerized runs)
Deployment Options
Option 1: AWS One-Command Bootstrap (Recommended)
The bootstrap script automates the entire AWS deployment:
# Basic deployment
./bootstrap.sh
# With custom environment and email
./bootstrap.sh production your-email@example.com
What the bootstrap script does:
- Creates IAM roles with minimal required permissions
- Verifies/creates S3 bucket
- Uploads code, config, and data to S3
- Creates/updates 4 AWS Glue ETL jobs
- Sets up SNS topic and email notifications
- Deploys Step Functions state machine
- Starts initial pipeline execution
Output:
- Execution ARN for monitoring
- Console URLs for Step Functions and Glue
- SNS topic ARN for notifications
Option 2: Manual AWS Deployment
For custom deployments or CI/CD integration:
1. IAM Roles Setup
# Step Functions execution role
aws iam create-role --role-name PECOSStepFunctionsRole \
--assume-role-policy-document '{
"Version": "2012-10-17",
"Statement": [{
"Effect": "Allow",
"Principal": {"Service": "states.amazonaws.com"},
"Action": "sts:AssumeRole"
}]
}'
# Glue execution role
aws iam create-role --role-name PECOSGlueRole \
--assume-role-policy-document '{
"Version": "2012-10-17",
"Statement": [{
"Effect": "Allow",
"Principal": {"Service": "glue.amazonaws.com"},
"Action": "sts:AssumeRole"
}]
}'
2. S3 Bucket Setup
BUCKET="durrell-backend-bucket-terraform"
aws s3 mb s3://$BUCKET
3. Upload Artifacts
# Upload Spark jobs
aws s3 sync spark_jobs/ s3://$BUCKET/spark_jobs/ \
--exclude "*.pyc" --exclude "__pycache__/*"
# Upload config
aws s3 cp config/pipeline_config.yaml s3://$BUCKET/config/
# Upload input data
aws s3 sync data/input/ s3://$BUCKET/data/input/
4. Create Glue Jobs
# Example for Clinicians ETL job
aws glue create-job \
--name "PECOS-Clinicians-ETL" \
--role "arn:aws:iam::ACCOUNT_ID:role/PECOSGlueRole" \
--command "{
\"Name\": \"glueetl\",
\"ScriptLocation\": \"s3://$BUCKET/spark_jobs/glue_clinicians.py\",
\"PythonVersion\": \"3\"
}" \
--default-arguments "{
\"--config\": \"s3://$BUCKET/config/pipeline_config.yaml\",
\"--additional-python-modules\": \"pyyaml\",
\"--enable-continuous-cloudwatch-log\": \"true\",
\"--enable-metrics\": \"true\"
}" \
--glue-version "4.0" \
--number-of-workers 2 \
--worker-type "G.1X"
5. SNS Notifications Setup
# Create topic
TOPIC_ARN=$(aws sns create-topic --name pecos-pipeline-notifications --query TopicArn --output text)
# Subscribe email
aws sns subscribe --topic-arn $TOPIC_ARN --protocol email --notification-endpoint your-email@example.com
6. Deploy State Machine
# Substitute variables in ASL
sed -e "s/\${REGION}/us-east-1/g" \
-e "s/\${ACCOUNT_ID}/YOUR_ACCOUNT_ID/g" \
-e "s/\${BUCKET}/$BUCKET/g" \
state_machine/pecos_state_machine.asl.json > temp_asl.json
# Create state machine
aws stepfunctions create-state-machine \
--name "PECOS-Extraction-Pipeline" \
--definition file://temp_asl.json \
--role-arn "arn:aws:iam::ACCOUNT_ID:role/PECOSStepFunctionsRole"
Option 3: Local Python Development
For development and testing without AWS:
Environment Setup
# Install Python 3.11
brew install python@3.11 # macOS
python3.11 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
Run Pipeline Locally
# Full pipeline
python state_machine/pipeline_orchestrator.py \
--input state_machine/sample_input.json \
--config config/pipeline_config.yaml
# Individual ETL jobs
python spark_jobs/etl_clinicians.py --execution-id DEV-001
Option 4: Docker Development
EMR-like containerized environment:
# Build and run
docker build -f docker/Dockerfile -t pecos-pipeline .
docker-compose up
# Or run specific command
docker run --rm -v $(pwd)/data:/app/data pecos-pipeline \
python state_machine/pipeline_orchestrator.py
Configuration Management
Pipeline Configuration (pipeline_config.yaml)
The configuration is environment-aware and supports both local and AWS deployments:
pipeline:
name: "PECOS-Extraction-Pipeline"
environment: "local" # local | dev | staging | prod
execution_id_prefix: "PECOS"
# S3 paths (AWS) vs local paths
paths:
base_input_dir: "s3://bucket/data/input" # AWS
base_output_dir: "s3://bucket/data/output" # AWS
local_paths:
base_input_dir: "data/input" # Local
base_output_dir: "data/output" # Local
# Dataset-specific configs
datasets:
clinicians:
input_file: "clinicians.csv"
partition_by: ["state"]
primary_key: "npi"
required_columns: [...]
Environment Variables
Override configuration at runtime:
export PECOS_BASE_INPUT=data/input
export PECOS_BASE_OUTPUT=data/output
export PECOS_LOG_DIR=logs
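As a minimal sketch, these overrides can be applied when loading the YAML config; the function name and exact key layout are assumptions based on the example config above:

import os
import yaml

def load_config(path: str = "config/pipeline_config.yaml") -> dict:
    """Load the pipeline config and let PECOS_* environment variables take precedence."""
    with open(path) as fh:
        config = yaml.safe_load(fh)
    local = config.setdefault("local_paths", {})
    local["base_input_dir"] = os.getenv("PECOS_BASE_INPUT", local.get("base_input_dir", "data/input"))
    local["base_output_dir"] = os.getenv("PECOS_BASE_OUTPUT", local.get("base_output_dir", "data/output"))
    config["log_dir"] = os.getenv("PECOS_LOG_DIR", config.get("log_dir", "logs"))
    return config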
Monitoring and Observability
AWS Monitoring
Step Functions Console
- Real-time execution visualization
- State transition timing
- Error details and stack traces
- Execution history and re-runs
Glue Console
- Job run status and duration
- Resource utilization (DPUs)
- Error logs and troubleshooting
CloudWatch Logs
- Structured logs from all components
- Log groups:
  - /aws/states/PECOS-Extraction-Pipeline
  - /aws-glue/jobs/logs-v2/
CloudWatch Metrics
- Step Functions: Execution metrics
- Glue: Job performance metrics
- S3: Storage and access metrics
Local Monitoring
Console Logging
- Structured JSON logs with timestamps
- Execution summaries in logs/{execution_id}_summary.json
- Dataset-specific logs in logs/{execution_id}_{dataset}.log
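A per-execution summary file could be produced with a sketch like the following; the summary fields shown are assumptions based on the description above:

import json
from datetime import datetime, timezone
from pathlib import Path

def write_summary(execution_id: str, results: list[dict], log_dir: str = "logs") -> Path:
    # Aggregate per-dataset results into a single JSON summary for the run.
    summary = {
        "execution_id": execution_id,
        "finished_at": datetime.now(timezone.utc).isoformat(),
        "datasets": results,
        "status": "SUCCEEDED" if all(r.get("status") == "SUCCEEDED" for r in results) else "FAILED",
    }
    path = Path(log_dir) / f"{execution_id}_summary.json"
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(summary, indent=2))
    return path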
File System Monitoring
- Output Parquet files in data/output/
- Partitioned by state/status
- Snappy-compressed for efficiency
Pipeline Execution
Starting Executions
AWS Step Functions
# Prepare input
EXECUTION_INPUT='{
"trigger_source": "manual",
"run_date": "'$(date +%Y-%m-%d)'",
"environment": "production",
"config_s3_uri": "s3://bucket/config/pipeline_config.yaml",
"input_s3_prefix": "s3://bucket/data/input/",
"output_s3_prefix": "s3://bucket/data/output/"
}'
# Start execution
EXECUTION_ARN=$(aws stepfunctions start-execution \
--state-machine-arn $STATE_MACHINE_ARN \
--input "$EXECUTION_INPUT" \
--query executionArn --output text)
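For CI integration, the same start-and-monitor flow can be driven from Python with boto3; the state machine ARN and input payload below are placeholders:

import json
import time
import boto3

sfn = boto3.client("stepfunctions")

# Placeholder ARN and input; substitute the values produced by the bootstrap script.
execution = sfn.start_execution(
    stateMachineArn="arn:aws:states:us-east-1:ACCOUNT_ID:stateMachine:PECOS-Extraction-Pipeline",
    input=json.dumps({"trigger_source": "ci", "environment": "production"}),
)

# Poll until the execution leaves the RUNNING state.
status = "RUNNING"
while status == "RUNNING":
    time.sleep(30)
    status = sfn.describe_execution(executionArn=execution["executionArn"])["status"]

print(f"Execution finished with status: {status}")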
Local Execution
python state_machine/pipeline_orchestrator.py \
--input state_machine/sample_input.json \
--config config/pipeline_config.yaml
Execution States
| State | Type | Description | Retry Behavior |
|---|---|---|---|
| ValidateInput | Pass | Enrich input with metadata | N/A |
| ExtractParallel | Parallel | Run 4 ETL jobs concurrently | N/A |
| ExtractCliniciansTask | Task | Clinicians ETL | 3 retries, 2x backoff |
| ExtractPracticesTask | Task | Practices ETL | 3 retries, 2x backoff |
| ExtractCanonicalProvidersTask | Task | Canonical Providers ETL | 3 retries, 2x backoff |
| ExtractDetailTablesTask | Task | Detail Tables ETL | 3 retries, 2x backoff |
| AllSucceeded | Choice | Route based on results | N/A |
| NotifySuccess | Task | Send success notification | N/A |
| NotifyFailure | Task | Send failure notification | N/A |
| PipelineSucceed | Succeed | Terminal success | N/A |
| PipelineFail | Fail | Terminal failure | N/A |
Data Processing
ETL Transformations
Clinicians Dataset
- Filter active providers (status = A)
- Zero-pad NPI to 10 digits
- Derive credential_group (Physician/Mid-Level/Nursing/Other)
- Compute years_enrolled from enrollment date
- Data quality flag: dq_has_specialty
- Deduplicate on (npi, state)
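A hedged PySpark sketch of these transformations follows; the source column names (status, credential, enrollment_date, primary_specialty) and the exact mapping rules are assumptions for illustration, not the repository's code:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("clinicians-etl-sketch").getOrCreate()
df = spark.read.csv("data/input/clinicians.csv", header=True)

clinicians = (
    df.filter(F.col("status") == "A")                        # keep active providers only
      .withColumn("npi", F.lpad(F.col("npi"), 10, "0"))      # zero-pad NPI to 10 digits
      .withColumn(
          "credential_group",
          F.when(F.col("credential").isin("MD", "DO"), "Physician")
           .when(F.col("credential").isin("NP", "PA"), "Mid-Level")
           .when(F.col("credential").isin("RN", "CNS"), "Nursing")
           .otherwise("Other"),
      )
      .withColumn(
          "years_enrolled",
          F.floor(F.datediff(F.current_date(), F.to_date("enrollment_date")) / 365.25),
      )
      .withColumn("dq_has_specialty", F.col("primary_specialty").isNotNull())
      .dropDuplicates(["npi", "state"])                       # dedupe on (npi, state)
)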
Practices Dataset
- Filter active practices
- Derive practice_size_category (Solo/Small/Medium/Large/Enterprise)
- Map state → US Census region
- Flag multi-specialty groups
- Deduplicate on practice_id
Canonical Providers Dataset
- Build golden record using window functions
- Compute participation_score (0-3: Medicare + Medicaid + Telehealth)
- Assign participation_tier (Full/Partial/Limited/None)
- Build display_name for individuals and organizations
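A sketch of the golden-record and scoring logic using a window function; the upstream source and the accepts_medicare / accepts_medicaid / offers_telehealth column names are assumptions:

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("canonical-providers-sketch").getOrCreate()
df = spark.read.parquet("data/output/clinicians")  # assumed upstream source

# Keep the most recently updated row per NPI as the golden record.
latest = Window.partitionBy("npi").orderBy(F.col("last_updated").desc())
golden = (
    df.withColumn("_rank", F.row_number().over(latest))
      .filter(F.col("_rank") == 1)
      .drop("_rank")
)

canonical = (
    golden
    .withColumn(
        "participation_score",  # 0-3: Medicare + Medicaid + Telehealth
        F.col("accepts_medicare").cast("int")
        + F.col("accepts_medicaid").cast("int")
        + F.col("offers_telehealth").cast("int"),
    )
    .withColumn(
        "participation_tier",
        F.when(F.col("participation_score") == 3, "Full")
         .when(F.col("participation_score") == 2, "Partial")
         .when(F.col("participation_score") == 1, "Limited")
         .otherwise("None"),
    )
    .withColumn(
        "display_name",
        F.when(F.col("entity_type") == "organization", F.col("organization_name"))
         .otherwise(F.concat_ws(" ", "first_name", "last_name")),
    )
)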
Detail Tables Dataset
- Parse and classify enrollment events
- Compute enrollment_duration_days
- Decode reason_code to human-readable descriptions
- Classify termination_type
- Compute window-based group_reassignment_rate
- Flag is_recent_enrollment (< 180 days) and is_stale_record (> 5 years)
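An illustrative sketch of a few of these derivations; the input file name, reason-code decode table, and column names are invented for the example:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("detail-tables-sketch").getOrCreate()
df = spark.read.csv("data/input/detail_tables.csv", header=True)  # assumed input file

detail = (
    df.withColumn("end_date", F.coalesce(F.to_date("termination_date"), F.current_date()))
      .withColumn("enrollment_duration_days", F.datediff("end_date", F.to_date("enrollment_date")))
      .withColumn(
          "reason_description",  # assumed two-entry decode, for illustration only
          F.when(F.col("reason_code") == "00", "Voluntary withdrawal")
           .when(F.col("reason_code") == "01", "Involuntary termination")
           .otherwise("Unknown"),
      )
      .withColumn(
          "is_recent_enrollment",
          F.datediff(F.current_date(), F.to_date("enrollment_date")) < 180,
      )
      .withColumn(
          "is_stale_record",
          F.datediff(F.current_date(), F.to_date("last_updated")) > 5 * 365,
      )
)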
Output Format
All datasets output Snappy-compressed Parquet with metadata columns:
- _pipeline_execution_id: Unique run identifier
- _pipeline_ingested_at: UTC ingestion timestamp
- _pipeline_source_file: Source file path
Partitioning strategy:
- Clinicians/Practices/Canonical: state=XX/
- Detail Tables: state=XX/status=Y/
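A minimal write-path sketch matching this output contract; df stands in for any transformed dataset and the execution ID and output path are placeholders:

from pyspark.sql import functions as F

enriched = (
    df.withColumn("_pipeline_execution_id", F.lit("PECOS-20240101-001"))
      .withColumn("_pipeline_ingested_at", F.current_timestamp())
      .withColumn("_pipeline_source_file", F.input_file_name())
)

(
    enriched.write
    .mode("overwrite")                 # idempotent re-runs
    .option("compression", "snappy")
    .partitionBy("state")              # Detail Tables would use partitionBy("state", "status")
    .parquet("data/output/clinicians")
)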
Notifications
AWS SNS Notifications
- Email notifications for success/failure
- Structured JSON payload with execution details
- Configurable recipients
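A sketch of the notification publish with boto3; the topic ARN handling and payload shape are assumptions consistent with the description above:

import json
import boto3

def notify(status: str, execution_id: str, details: dict, topic_arn: str) -> None:
    # Publish a structured JSON payload so recipients and downstream tooling can parse it.
    boto3.client("sns").publish(
        TopicArn=topic_arn,
        Subject=f"PECOS pipeline {status}: {execution_id}",
        Message=json.dumps({"execution_id": execution_id, "status": status, **details}, indent=2),
    )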
Local Notifications
- Console output with execution summary
- JSON-formatted logs
- File-based execution summaries
Security
IAM Permissions
Principle of Least Privilege
- Step Functions role: Only Glue start/run, SNS publish, CloudWatch logs
- Glue role: Scoped S3 access (only pipeline bucket), Glue service role
S3 Bucket Policies
- Separate permissions for input, output, and logs
- No wildcard permissions
- Encryption at rest (SSE-S3)
Network Security
- All services communicate within AWS network
- No public endpoints exposed
- VPC deployment possible for enhanced security
Cost Optimization
AWS Glue
- G.1X worker type (cost-effective for CPU-bound workloads)
- 2 workers per job (parallel processing within job)
- Auto-scaling disabled (predictable costs)
- Timeout: 60 minutes
Step Functions
- Standard workflow (cost-effective for ETL orchestration)
- No Express workflows needed
S3 Storage
- Intelligent tiering for output data
- Lifecycle policies for logs
- Snappy compression reduces storage costs
Troubleshooting
Common Issues
SNS Email Not Received
- Confirm subscription in email
- Check SNS topic permissions
- Verify email address format
Glue Job Failures
- Check CloudWatch logs: /aws-glue/jobs/logs-v2/
- Verify S3 permissions
- Check PySpark version compatibility
Step Functions Timeouts
- Increase Glue job timeout
- Optimize Spark configurations
- Check for data skew
Local Development Issues
- Ensure Python 3.11 (PySpark requirement)
- Check Java version (17+)
- Verify virtual environment activation
Debugging Commands
# Check execution status
aws stepfunctions describe-execution --execution-arn $EXECUTION_ARN
# View Glue job logs
aws logs tail /aws-glue/jobs/logs-v2/ --follow
# List output files
aws s3 ls s3://bucket/data/output/ --recursive
# Check S3 permissions
aws s3api get-bucket-policy --bucket bucket
Cleanup
AWS Resource Cleanup
# Automated teardown
./teardown.sh
Deletes:
- Step Functions state machine
- 4 AWS Glue ETL jobs
- SNS topic and subscriptions
- S3 bucket and all contents
- IAM roles and policies
Local Cleanup
# Remove outputs and logs
rm -rf data/output/* logs/*
# Remove virtual environment
rm -rf .venv
Performance Tuning
Spark Configurations
spark:
configs:
spark.sql.shuffle.partitions: "4"
spark.default.parallelism: "4"
spark.sql.adaptive.enabled: "true"
spark.sql.parquet.compression.codec: "snappy"
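Applied to a SparkSession, those settings would look roughly like this (reading them from pipeline_config.yaml is assumed, not shown):

from pyspark.sql import SparkSession

spark_configs = {
    "spark.sql.shuffle.partitions": "4",
    "spark.default.parallelism": "4",
    "spark.sql.adaptive.enabled": "true",
    "spark.sql.parquet.compression.codec": "snappy",
}

builder = SparkSession.builder.appName("PECOS-Extraction-Pipeline")
for key, value in spark_configs.items():
    builder = builder.config(key, value)
spark = builder.getOrCreate()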
Glue Job Sizing
- G.1X workers: 1 DPU, 16GB RAM, 4 vCPU
- Scale workers based on data volume
- Monitor DPU utilization in Glue console
Parallel Processing
- 4 ETL jobs run concurrently
- No inter-job dependencies
- Optimal for I/O bound workloads
Backup and Recovery
Data Backup
- S3 versioning enabled on bucket
- Cross-region replication for critical data
- Regular snapshots of configuration
Pipeline Recovery
- Step Functions supports execution re-runs
- Idempotent ETL jobs (overwrite mode)
- Partial failure handling (continue with successful jobs)
Compliance and Governance
Data Governance
- Structured logging with execution IDs
- Data lineage tracking
- Audit trails in CloudWatch
Healthcare Compliance
- PHI data handling considerations
- Encryption at rest and in transit
- Access logging and monitoring
Future Enhancements
Potential Improvements
- Event-driven triggers (S3 event notifications)
- Data quality frameworks (Great Expectations)
- Advanced monitoring (custom CloudWatch dashboards)
- Multi-region deployment
- Blue/green deployment strategy
Scaling Considerations
- EMR migration for large datasets
- Kubernetes deployment option
- Serverless architecture evolution
This documentation provides comprehensive DevOps guidance for deploying and managing the PECOS Data Extraction Pipeline across AWS, local, and containerized environments. The pipeline demonstrates enterprise-grade ETL patterns with robust error handling, monitoring, and cost optimization.