Building a Real-Time Healthcare Data Pipeline - Phase 2: Stream Processing with Apache Spark
This is Part 2 of my VitalWatch series. In Phase 1, I built a producer that streams ICU patient data from MIMIC-IV to AWS SQS. Now I'm building the processing engine that transforms this raw stream into analytics-ready data.
The Evolution: From Message Queue to Data Lake
Where We Left Off
Phase 1 created a data faucet - ICU patient data flowing continuously into an SQS queue. But raw messages sitting in a queue aren't useful for healthcare analytics. We need to process, structure, and store this data efficiently.
The Big Picture: What Phase 2 Accomplishes
Phase 1: ICU Data → SQS Queue
Phase 2: SQS Queue → Spark → Analytics → Parquet Files → S3
Phase 3: S3 → Dashboard → Alerts [Coming Next]
Phase 2 is like adding a smart water treatment plant that:
- Continuously reads from the SQS queue as messages arrive
- Processes the raw JSON data into structured formats
- Transforms data (timestamps, data validation, schema enforcement)
- Stores results in an organized, queryable format
Understanding the Technology Stack
Why Apache Spark for Stream Processing?
Apache Spark is like a super-powered Excel that can process millions of rows in real-time. Unlike traditional databases that struggle with large-scale analytics, Spark excels at:
- Distributed processing: Spreads work across multiple CPU cores (or machines)
- In-memory computing: Keeps data in RAM for faster processing
- Fault tolerance: Automatically recovers from failures using lineage information
- Schema-on-read: Applies structure to data at query time, not storage time
Why Spark works better for analytics than traditional databases:
Traditional (OLTP) databases maintain ACID properties (Atomicity, Consistency, Isolation, Durability), which requires expensive coordination between nodes in distributed systems. Spark skips that transactional bookkeeping entirely: it is built for read-heavy analytics over immutable data, trading transactional guarantees for massive scan and aggregation throughput.
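For reference, every PySpark snippet below assumes a local SparkSession along these lines (a minimal sketch — the app name is illustrative, not from the repo):

```python
# Minimal local SparkSession; later snippets refer to this as `spark`.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("vitalwatch-phase2")  # hypothetical app name
    .master("local[*]")            # use all local CPU cores
    .getOrCreate()
)
```

Running with `local[*]` keeps the whole pipeline on one machine, which matches the local-first development approach described later in this post.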
The Storage Revolution: Parquet Format
Moving from CSV to Parquet is like upgrading from a filing cabinet to a modern database:
CSV (Row-oriented):

```
patient_id,age,diagnosis,los_hrs
1,25,Heart Attack,5.2
2,67,Pneumonia,8.7
```

Parquet (Column-oriented):

```
patient_id: [1, 2, 3, ...]
age: [25, 67, 45, ...]
diagnosis: [Heart Attack, Pneumonia, ...]
los_hrs: [5.2, 8.7, 3.1, ...]
```

The analytics advantage: when you query "What's the average length of stay?", Parquet reads only the `los_hrs` column, making queries roughly 10x faster and using far less storage thanks to column-specific compression.
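You can see the columnar advantage for yourself with a quick check (a sketch using pyarrow; it assumes the local `lake/icu` path used later in this post):

```python
# Read only the los_hrs column from the whole lake; Parquet never
# touches the bytes of the other columns.
import pyarrow.parquet as pq

table = pq.read_table("lake/icu", columns=["los_hrs"])
print(table.column("los_hrs").to_pandas().mean())
```

Dropping the `columns=` argument forces a full-width read — which is effectively what every CSV query pays.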
Architecture Deep Dive
Data Schema Definition
```python
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

schema = StructType([
    StructField("subject_id", IntegerType()),  # Patient identifier
    StructField("stay_id", IntegerType()),     # ICU stay identifier
    StructField("intime", StringType()),       # Admission time (converted later)
    StructField("outtime", StringType()),      # Discharge time (converted later)
    StructField("los_hrs", DoubleType()),      # Length of stay in hours
])
```
Why explicit schemas matter:
- Performance: Spark doesn't need to infer data types by scanning data
- Data quality: Type validation catches malformed data at ingestion time
- Consistency: Ensures all batches have the same structure across time
- Memory efficiency: Proper types use optimal memory allocation
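A small illustration of the data-quality point (a sketch reusing `spark` and `schema` from above): with an explicit schema and Spark's default PERMISSIVE mode, a record whose fields don't match the declared types parses to nulls instead of crashing the job, so bad records can be quarantined at ingestion.

```python
# Hypothetical malformed record alongside a good one (not real pipeline data).
good = '{"subject_id": 10000032, "stay_id": 30000123, "los_hrs": 190.82}'
bad = '{"subject_id": "not-an-int", "stay_id": 30000124, "los_hrs": "n/a"}'

df = spark.read.json(spark.sparkContext.parallelize([good, bad]), schema=schema)
df.filter(df.subject_id.isNull()).show()  # surfaces the malformed record
```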
The Stream Processing Pipeline
```python
def next_batch():
    messages = []
    while not messages:
        resp = sqs.receive_message(
            QueueUrl=QUEUE_URL,
            MaxNumberOfMessages=min(BATCH_SIZE, 10),  # SQS limit
            WaitTimeSeconds=10,
        )
        messages = resp.get("Messages", [])

    # Clean up processed messages to prevent reprocessing
    entries = [{"Id": m["MessageId"], "ReceiptHandle": m["ReceiptHandle"]} for m in messages]
    if entries:
        sqs.delete_message_batch(QueueUrl=QUEUE_URL, Entries=entries)

    return [m["Body"] for m in messages]
```
Key Design Decisions Explained
1. Batch Processing with SQS Limits
```python
MaxNumberOfMessages=min(BATCH_SIZE, 10)
```
The constraint: AWS SQS has a hard limit of 10 messages per `ReceiveMessage` request. Even if I set `BATCH_SIZE=50`, each batch actually processes at most 10 messages per `next_batch()` call.
Current limitation: to truly process 50 messages per batch, the code would need to accumulate messages across multiple SQS calls inside `next_batch()` — a sketch of that follows below.
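A hypothetical variant (not in the current repo) that accumulates across calls could look like this, reusing the `sqs`, `QUEUE_URL`, and `BATCH_SIZE` names from the snippet above:

```python
import time

def next_batch_accumulated(max_wait_s: int = 30) -> list[str]:
    """Collect up to BATCH_SIZE messages across several receive calls."""
    messages = []
    deadline = time.time() + max_wait_s
    while len(messages) < BATCH_SIZE and time.time() < deadline:
        resp = sqs.receive_message(
            QueueUrl=QUEUE_URL,
            MaxNumberOfMessages=min(BATCH_SIZE - len(messages), 10),  # SQS caps at 10
            WaitTimeSeconds=5,
        )
        batch = resp.get("Messages", [])
        if not batch:
            if messages:
                break      # queue drained; process what we already have
            continue       # still waiting for the first message
        messages.extend(batch)

    # DeleteMessageBatch is also capped at 10 entries per call, so delete in chunks
    for i in range(0, len(messages), 10):
        chunk = messages[i:i + 10]
        entries = [{"Id": m["MessageId"], "ReceiptHandle": m["ReceiptHandle"]} for m in chunk]
        sqs.delete_message_batch(QueueUrl=QUEUE_URL, Entries=entries)

    return [m["Body"] for m in messages]
```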
Why this matters:
- Keeps processing efficient without overwhelming the system
- Respects AWS service limits
- Spark performs better with larger batches, but we're currently limited by this design
2. Long Polling for Efficiency
```python
WaitTimeSeconds=10
```
Instead of constantly hammering the SQS queue (short polling), the system uses long polling:
- Request sent → SQS waits up to 10 seconds for messages to arrive
- Messages arrive → Return immediately
- No messages → Return empty after 10 seconds, try again
In practice, SQS long polling returns as soon as at least one message is available (up to `MaxNumberOfMessages`), or when `WaitTimeSeconds` expires — whichever comes first. It does not hold the connection open trying to fill the whole batch.
This reduces costs and improves responsiveness by minimizing empty API calls.
Potential improvement: During quiet periods (like overnight), the system still polls every 10 seconds. A production system might implement exponential backoff to reduce polling frequency when no messages are available.
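A sketch of that backoff idea (hypothetical, not the current code; `poll_once` stands in for a non-blocking variant of `next_batch` that can return an empty list, and `process` for the Spark transform-and-write step):

```python
import time

idle_wait, max_idle_wait = 1, 64  # seconds

while True:
    rows = poll_once()           # hypothetical: a single receive_message call
    if rows:
        idle_wait = 1            # reset the backoff on activity
        process(rows)            # hypothetical: parse, transform, write Parquet
    else:
        time.sleep(idle_wait)
        idle_wait = min(idle_wait * 2, max_idle_wait)  # 1s, 2s, 4s, ... 64s
```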
3. Message Cleanup and Race Condition Prevention
```python
entries = [{"Id": m["MessageId"], "ReceiptHandle": m["ReceiptHandle"]} for m in messages]
sqs.delete_message_batch(QueueUrl=QUEUE_URL, Entries=entries)
```
SQS requires both MessageId and ReceiptHandle for deletion. The ReceiptHandle acts like a "secret token" that prevents other consumers or processes from accidentally deleting messages they didn't process.
Message structure:
```
{
  "MessageId": "12345-abcde-67890",
  "ReceiptHandle": "xyz789...",  // Secret deletion token (changes each time the message is received)
  "Body": "{\"subject_id\": 10000032, \"stay_id\": 30000123, \"intime\": \"2180-07-23 15:09:00\"}",
  "Attributes": {"SentTimestamp": "1625097600000"}
}
```
Data Processing and Transformation
From JSON Strings to Structured Data
```python
from pyspark.sql.functions import to_timestamp

# Convert JSON strings to a Spark DataFrame
df = (
    spark.read.json(spark.sparkContext.parallelize(rows), schema=schema)
    .withColumn("intime", to_timestamp("intime"))
    .withColumn("outtime", to_timestamp("outtime"))
)
```
The Transformation Pipeline
- `spark.sparkContext.parallelize(rows)`: converts the Python list to a Spark RDD (Resilient Distributed Dataset) for distributed processing. RDDs are Spark's fundamental data structure — unlike regular Python lists, they can be spread across multiple CPU cores or machines.
- `spark.read.json(..., schema=schema)`: parses the JSON strings into structured columns with predefined types, converting the RDD into a Spark DataFrame (a higher-level abstraction) using the schema we defined.
- `to_timestamp()`: converts string timestamps like "2180-07-23 15:09:00" into proper datetime values for time-based analytics and partitioning.
Partitioning Strategy: The Key to Fast Analytics
The Double Partitioning Approach
```python
(df.repartition("intime")    # in-memory organization
   .write
   .partitionBy("intime")    # disk organization
   .parquet(str(out)))
```
This might look redundant, but each serves a different purpose:
`.repartition("intime")` — Spark Processing Optimization
Purpose: routes all rows with the same `intime` value into the same in-memory Spark partition before writing to disk.
Example:

```
Before repartition (rows with the same intime scattered across partitions):
Spark Partition 1: [2180-07-23 15:09:00, 2180-07-24 08:30:00]
Spark Partition 2: [2180-07-23 15:09:00, 2180-07-24 14:45:00]

After repartition("intime") (each distinct intime lands in exactly one partition):
Spark Partition 1: [2180-07-23 15:09:00, 2180-07-23 15:09:00]
Spark Partition 2: [2180-07-24 08:30:00, 2180-07-24 14:45:00]
```

This prevents creating many small files — without repartitioning, several Spark tasks may each write their own small file into the same time partition directory.
`.partitionBy("intime")` — Query Performance Optimization
Purpose: creates a time-based folder structure on disk for fast filtering (called "partition pruning").
Resulting structure:

```
lake/icu/
├── intime=2180-07-23 15:09:00/
│   └── part-00000.parquet
├── intime=2180-07-23 16:15:00/
│   └── part-00001.parquet
└── intime=2180-07-24 08:30:00/
    └── part-00002.parquet
```
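One caveat worth noting: because `intime` is a full timestamp, this layout creates a separate directory for every distinct admission time. A common refinement (not in the current code) is to partition on a derived date column so each folder holds a full day, for example:

```python
from pyspark.sql.functions import to_date

# Day-level partitions: one folder per admission date instead of per timestamp.
df_daily = df.withColumn("intime_date", to_date("intime"))
(df_daily.repartition("intime_date")
    .write
    .mode("append")
    .partitionBy("intime_date")
    .parquet(str(out)))
```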
Why Time-Based Partitioning?
Healthcare analytics are inherently time-focused:
- "ICU capacity trends over the past month"
- "Average length of stay this quarter vs last quarter"
- "Weekend vs weekday admission patterns"
Partition pruning in action:

```sql
-- This query only reads the July 23rd folders, ignoring everything else
SELECT AVG(los_hrs)
FROM icu_data
WHERE intime >= '2180-07-23' AND intime < '2180-07-24'
```

Performance impact: instead of scanning millions of records, the query reads only thousands — the query engine skips entire partitions whose folder values can't match the filter.
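The same query from PySpark (a sketch against the local lake; it assumes Spark infers the `intime` partition column when reading the directory back):

```python
# Spark prunes partition directories using the filter on the partition column.
df = spark.read.parquet("lake/icu")
july23 = df.filter("intime >= '2180-07-23' AND intime < '2180-07-24'")
july23.selectExpr("avg(los_hrs) AS avg_los").show()
```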
Stream Processing Loop: Continuous Operation
The Forever Loop Design
```python
try:
    while True:                  # continuous processing
        rows = next_batch()      # get messages (blocks until available)

        # Process and transform data
        df = (
            spark.read.json(spark.sparkContext.parallelize(rows), schema=schema)
            .withColumn("intime", to_timestamp("intime"))
            .withColumn("outtime", to_timestamp("outtime"))
        )

        # Only save if we have data
        if df.count():
            (df.repartition("intime")
               .write
               .mode("append")
               .partitionBy("intime")
               .parquet(str(out)))

        time.sleep(2)            # gentle pause between batches
except KeyboardInterrupt:
    print("Stopped.")
    spark.stop()                 # clean shutdown
```
Why Continuous Processing?
Real-time healthcare monitoring requires:
- Immediate processing as patients are admitted/discharged
- Always-on availability for 24/7 hospital operations
- Graceful handling of quiet periods (no new patients)
Current design considerations:
- The 2-second pause prevents overwhelming AWS with requests during processing
- The 10-second SQS wait provides natural batching during quiet periods
- For production, you'd want exponential backoff during extended quiet periods
Error Handling and Graceful Shutdown
Catching the keyboard interrupt gives the pipeline a clean exit path:
- Spark resources are properly released (memory cleanup)
- Clean connection termination to AWS services
- An orderly "Stopped." message instead of a stack trace
One caveat: an interrupt can still arrive mid-batch, so "the current batch always completes" isn't strictly guaranteed by a bare except — and `spark.stop()` should ideally run even on unexpected errors, as in the sketch below.
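A slightly more defensive shape (a sketch, not the repo's current code; `run_stream_loop` is a hypothetical wrapper around the while-True loop shown earlier):

```python
try:
    run_stream_loop()    # hypothetical wrapper around the processing loop above
except KeyboardInterrupt:
    print("Stopped.")
finally:
    spark.stop()         # always release Spark resources, even on errors
```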
Development Strategy: Local First, Cloud Later
Configuration Flexibility
```python
OUTPUT_DIR = os.environ.get("VW_LAKE_DIR", "./lake/icu")  # Default: local
```
Development workflow:
```bash
# Start with local testing (Spark runs locally on your machine)
python stream_sqs_to_parquet.py

# Scale to S3 when ready (same code, different output location)
export VW_LAKE_DIR="s3://your-bucket/data/"
python stream_sqs_to_parquet.py
```

(One practical note: outside of EMR, open-source Spark typically needs the `s3a://` URI scheme and the `hadoop-aws` package on its classpath to write to S3.)
Benefits of local-first approach:
- Faster iteration (no network delays for file I/O)
- Cost control (no AWS storage charges during development)
- Easy debugging (Parquet files can be inspected directly with tools like `parquet-tools`)
- Same code works for both local and cloud storage (abstraction layer)
Real-World Impact: From Raw Messages to Healthcare Insights
Before: Raw SQS Messages
{"subject_id": 10000032, "stay_id": 30000123, "intime": "2180-07-23 15:09:00", "outtime": "2180-07-31 13:58:00", "los_hrs": 190.82}
After: Analytics-Ready Data Lake
```
lake/icu/ (or s3://vitalwatch-lake/)
├── _SUCCESS (Spark's success marker, written once at the output root)
├── intime=2180-07-23 15:09:00/
│   └── part-00000.parquet (compressed, columnar, schema-enforced)
├── intime=2180-07-23 16:15:00/
│   └── part-00001.parquet
└── ...
```
Future Analytics Possibilities
```sql
-- Hospital capacity planning
SELECT DATE(intime), COUNT(*) AS daily_admissions
FROM icu_data
WHERE intime >= '2180-01-01'  -- MIMIC-IV dates are de-identified into the 2100s
GROUP BY DATE(intime);

-- Length of stay analysis
SELECT AVG(los_hrs) AS avg_los
FROM icu_data
WHERE intime >= current_date() - INTERVAL 30 DAYS;

-- Peak hours identification
SELECT HOUR(intime), COUNT(*) AS admissions
FROM icu_data
GROUP BY HOUR(intime)
ORDER BY admissions DESC;
```
Key Learnings and Insights
1. Stream Processing vs Batch Processing
Traditional ETL processes data in large chunks once per day. Stream processing handles data as it arrives, enabling:
- Real-time dashboards for hospital administrators
- Immediate alerts for unusual patterns
- Up-to-date analytics for clinical decision-making
Trade-off: Stream processing adds complexity but enables real-time insights.
2. The Power of Columnar Storage
Switching from row-based (CSV) to column-based (Parquet) storage isn't just a technical detail:
- Query performance improves by 10-100x for analytics workloads
- Storage costs decrease by 50-80% due to column-specific compression
- Data processing scales to billions of records efficiently
3. Partition Strategy Matters
Time-based partitioning aligns with how healthcare data is actually queried:
- Most questions involve time ranges ("last month", "this quarter")
- Regulatory reporting is time-period based
- Operational metrics track daily/weekly/monthly trends
4. Graceful Degradation and Production Considerations
Current system handles basic scenarios but production systems need:
- Exponential backoff during quiet periods
- Dead letter queues for message processing failures
- Monitoring and alerting for system health
- Batch size optimization for throughput
Current Limitations and Future Improvements
What Could Be Enhanced:
- True batch accumulation: Currently limited to 10 messages per batch due to SQS API limits
- Quiet period optimization: System polls every 10 seconds even when no data is flowing
- Error handling: Basic error handling - production would need retry logic, dead letter queues
- Monitoring: No metrics on processing latency, throughput, or system health
For Production Deployment:
- Spark cluster instead of local Spark instance
- Auto-scaling based on queue depth
- Multiple consumer instances for higher throughput
- Data quality monitoring and alerting
What's Next: Phase 3
With Phase 2 complete, we now have:
- ✅ Real-time data ingestion (Phase 1)
- ✅ Stream processing pipeline (Phase 2)
- ✅ Analytics-ready data lake with columnar storage and time-based partitioning
Phase 3 will add:
- DynamoDB for real-time alerts storage
- FastAPI web service for dashboard APIs
- WebSocket connections for live updates to browser clients
- Kubernetes deployment for production scaling and reliability
Resources and Code
The complete Phase 2 implementation is available in my VitalWatch repository.
Key files:
- `stream_sqs_to_parquet.py` - Main stream processing script
- `pyproject.toml` - Project dependencies and configuration
- `cleanup.sh` - Resource cleanup for cost control
Technical Specifications
Dependencies:
```bash
pip install pyarrow==16.1 awscli boto3
```

We use pyarrow because PySpark leans on Apache Arrow for fast columnar data interchange (for example, converting between Spark and pandas DataFrames), and it's handy for inspecting the Parquet output locally. The script also needs PySpark itself (e.g., `pip install pyspark==3.5.1`) unless Spark is installed separately.
AWS Services Used:
- SQS: Message queuing and stream buffering (FIFO queues for ordered processing)
- S3: Data lake storage for Parquet files (optional, can start local)
- IAM: Access control and permissions
Local Requirements:
- Apache Spark 3.5: Stream processing engine (runs locally for development)
- Python 3.8+: Runtime environment
This is Part 2 of the VitalWatch series. Next up: "Phase 3: Building Real-time Healthcare Dashboards with FastAPI and WebSockets"
Series Navigation:
- Part 1: Building the Data Ingestion Pipeline
- Part 2: Stream Processing with Apache Spark ← You are here
- Part 3: Real-time Dashboards and Alerts ← Coming soon