Building a Real-Time Healthcare Data Pipeline with Apache Spark: From SQS to Parquet (Part 2)

Building a Real-Time Healthcare Data Pipeline - Phase 2: Stream Processing with Apache Spark

This is Part 2 of my VitalWatch series. In Phase 1, I built a producer that streams ICU patient data from MIMIC-IV to AWS SQS. Now I'm building the processing engine that transforms this raw stream into analytics-ready data.

The Evolution: From Message Queue to Data Lake

Where We Left Off

Phase 1 created a data faucet - ICU patient data flowing continuously into an SQS queue. But raw messages sitting in a queue aren't useful for healthcare analytics. We need to process, structure, and store this data efficiently.

The Big Picture: What Phase 2 Accomplishes

Phase 1: ICU Data → SQS Queue
Phase 2: SQS Queue → Spark → Analytics → Parquet Files → S3
Phase 3: S3 → Dashboard → Alerts [Coming Next]

Phase 2 is like adding a smart water treatment plant that:

  • Continuously reads from the SQS queue as messages arrive
  • Processes the raw JSON data into structured formats
  • Transforms data (timestamps, data validation, schema enforcement)
  • Stores results in an organized, queryable format

Understanding the Technology Stack

Why Apache Spark for Stream Processing?

Apache Spark is like a super-powered Excel that can process millions of rows in real-time. Unlike traditional databases that struggle with large-scale analytics, Spark excels at:

  • Distributed processing: Spreads work across multiple CPU cores (or machines)
  • In-memory computing: Keeps data in RAM for faster processing
  • Fault tolerance: Automatically recovers from failures using lineage information
  • Schema-on-read: Applies structure to data at query time, not storage time

Why Spark works better for analytics than traditional databases:
Traditional databases maintain ACID properties (Atomicity, Consistency, Isolation, Durability), which requires complex coordination between nodes in distributed systems. Spark trades off some consistency guarantees for massive performance gains in analytics workloads where eventual consistency is acceptable.

The Storage Revolution: Parquet Format

Moving from CSV to Parquet is like upgrading from a filing cabinet to a modern database:

CSV (Row-oriented):

patient_id,age,diagnosis,length_of_stay
1,25,Heart Attack,5.2
2,67,Pneumonia,8.7

Parquet (Column-oriented):

patient_id:     [1, 2, 3, ...]
age:            [25, 67, 45, ...]
diagnosis:      [Heart Attack, Pneumonia, ...]
length_of_stay: [5.2, 8.7, 3.1, ...]

The analytics advantage: When you query "What's the average length of stay?", Parquet only reads the length_of_stay column, making queries roughly 10x faster and using much less storage thanks to column-specific compression algorithms.
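You can see the columnar win in miniature with pyarrow. A quick sketch - the file path is illustrative, and it assumes a Parquet file containing the los_hrs column that this pipeline writes later:

import pyarrow.parquet as pq

# Read just one column; the other columns are never touched on disk
table = pq.read_table("lake/icu/part-00000.parquet", columns=["los_hrs"])
print(table.column("los_hrs").to_pandas().mean())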

Architecture Deep Dive

Data Schema Definition

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

schema = StructType([
    StructField("subject_id", IntegerType()),    # Patient identifier
    StructField("stay_id",    IntegerType()),    # ICU stay identifier
    StructField("intime",     StringType()),     # Admission time (converted later)
    StructField("outtime",    StringType()),     # Discharge time (converted later)
    StructField("los_hrs",    DoubleType()),     # Length of stay in hours
])

Why explicit schemas matter:

  • Performance: Spark doesn't need to infer data types by scanning data
  • Data quality: Type validation catches malformed data at ingestion time (demonstrated after this list)
  • Consistency: Ensures all batches have the same structure across time
  • Memory efficiency: Proper types use optimal memory allocation
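To make the data quality point concrete, here's a minimal sketch with hypothetical records, assuming an active SparkSession named spark and the schema defined above:

good = '{"subject_id": 10000032, "stay_id": 30000123, "intime": "2180-07-23 15:09:00", "outtime": "2180-07-31 13:58:00", "los_hrs": 190.82}'
bad  = '{"subject_id": "not-a-number", "stay_id": 30000124, "intime": "2180-07-23 16:15:00", "outtime": null, "los_hrs": "oops"}'

df = spark.read.json(spark.sparkContext.parallelize([good, bad]), schema=schema)
df.show()  # the malformed record surfaces as NULLs under Spark's default PERMISSIVE mode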

The Stream Processing Pipeline

import boto3

sqs = boto3.client("sqs")  # assumes QUEUE_URL and BATCH_SIZE are defined in the config section

def next_batch():
    messages = []
    while not messages:  # block until at least one message arrives
        resp = sqs.receive_message(
            QueueUrl=QUEUE_URL,
            MaxNumberOfMessages=min(BATCH_SIZE, 10),  # SQS limit
            WaitTimeSeconds=10,
        )
        messages = resp.get("Messages", [])

    # Delete the fetched messages so they aren't redelivered. Note they are
    # deleted before the batch is written, i.e. at-most-once delivery.
    entries = [{"Id": m["MessageId"], "ReceiptHandle": m["ReceiptHandle"]} for m in messages]
    if entries:
        sqs.delete_message_batch(QueueUrl=QUEUE_URL, Entries=entries)

    return [m["Body"] for m in messages]

Key Design Decisions Explained

1. Batch Processing with SQS Limits

MaxNumberOfMessages=min(BATCH_SIZE, 10)

The constraint: AWS SQS has a hard limit of 10 messages per request. Even if I set BATCH_SIZE=50, each batch actually processes a maximum of 10 messages per next_batch() call.

Current limitation: To truly process 50 messages per batch, the code would need to accumulate multiple SQS calls within the next_batch() function.
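Here's what that accumulation could look like - a hedged sketch, not the current implementation. Note it also chunks deletions, because delete_message_batch shares the same 10-entry cap, and it returns early if the queue goes quiet mid-accumulation:

def next_batch_accumulating():
    messages = []
    while len(messages) < BATCH_SIZE:
        resp = sqs.receive_message(
            QueueUrl=QUEUE_URL,
            MaxNumberOfMessages=min(BATCH_SIZE - len(messages), 10),  # 10 is the per-call cap
            WaitTimeSeconds=10,
        )
        batch = resp.get("Messages", [])
        if not batch:
            break  # queue went quiet; process whatever we have
        messages.extend(batch)

    entries = [{"Id": m["MessageId"], "ReceiptHandle": m["ReceiptHandle"]} for m in messages]
    for i in range(0, len(entries), 10):  # delete_message_batch is also capped at 10 entries
        sqs.delete_message_batch(QueueUrl=QUEUE_URL, Entries=entries[i:i + 10])

    return [m["Body"] for m in messages]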

Why this matters:

  • Keeps processing efficient without overwhelming the system
  • Respects AWS service limits
  • Spark performs better with larger batches, but we're currently limited by this design

2. Long Polling for Efficiency

WaitTimeSeconds=10

Instead of constantly hammering the SQS queue (short polling), the system uses long polling:

  • Request sent → SQS waits up to 10 seconds for messages to arrive
  • Messages arrive → Return immediately
  • No messages → Return empty after 10 seconds, try again

SQS long polling returns as soon as at least one message is available (up to MaxNumberOfMessages); it only waits the full WaitTimeSeconds when the queue stays empty.

This reduces costs and improves responsiveness by minimizing empty API calls.
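Long polling can also be made the queue's default, so every consumer benefits without passing WaitTimeSeconds on each call. A one-line sketch with boto3:

sqs.set_queue_attributes(
    QueueUrl=QUEUE_URL,
    Attributes={"ReceiveMessageWaitTimeSeconds": "10"},  # default wait for every receive_message call
)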

Potential improvement: During quiet periods (like overnight), the system still polls every 10 seconds. A production system might implement exponential backoff to reduce polling frequency when no messages are available.
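A minimal backoff sketch, assuming hypothetical next_batch_nonblocking() and process() helpers that wrap the receive and parse-and-write logic shown in this post:

import time

delay, max_delay = 2, 60  # seconds
while True:
    rows = next_batch_nonblocking()  # hypothetical variant that returns [] on a quiet queue
    if rows:
        process(rows)  # hypothetical: the parse-and-write step shown below
        delay = 2      # activity resets the backoff
    else:
        time.sleep(delay)
        delay = min(delay * 2, max_delay)  # double the pause, up to a ceiling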

3. Message Cleanup and Race Condition Prevention

entries = [{"Id": m["MessageId"], "ReceiptHandle": m["ReceiptHandle"]} for m in messages]
sqs.delete_message_batch(QueueUrl=QUEUE_URL, Entries=entries)

Deletion works on the ReceiptHandle, not the MessageId: each batch entry's Id is just a per-request label that SQS uses to report success or failure for that entry (reusing the MessageId is simply convenient). The ReceiptHandle acts like a "secret token" scoped to this particular receive - it prevents other consumers or processes from accidentally deleting messages they didn't receive.

Message structure:

{
  "MessageId": "12345-abcde-67890",
  "ReceiptHandle": "xyz789...",  // Secret deletion token (changes each time message is received)
  "Body": '{"subject_id": 10000032, "stay_id": 30000123, "intime": "2180-07-23 15:09:00"}',
  "Attributes": {"SentTimestamp": "1625097600000"}
}

Data Processing and Transformation

From JSON Strings to Structured Data

from pyspark.sql.functions import to_timestamp

# Convert JSON strings to a Spark DataFrame
df = (
    spark.read.json(spark.sparkContext.parallelize(rows), schema=schema)
         .withColumn("intime",  to_timestamp("intime"))
         .withColumn("outtime", to_timestamp("outtime"))
)

The Transformation Pipeline

  1. spark.sparkContext.parallelize(rows): Converts Python list to Spark RDD (Resilient Distributed Dataset) for distributed processing. RDDs are Spark's fundamental data structure - they can be parallelized across multiple CPU cores or machines, unlike regular Python lists.

  2. spark.read.json(..., schema=schema): Parses JSON strings into structured columns with predefined types. It converts the RDD into a Spark DataFrame (higher-level abstraction) using the schema we defined.

  3. to_timestamp(): Converts string timestamps like "2180-07-23 15:09:00" to proper datetime objects for time-based analytics and partitioning.

Partitioning Strategy: The Key to Fast Analytics

The Double Partitioning Approach

df.repartition("intime")          # In-memory organization
  .write
  .partitionBy("intime")          # Disk organization
  .parquet(str(out))
Enter fullscreen mode Exit fullscreen mode

This might look redundant, but each serves a different purpose:

.repartition("intime") - Spark Processing Optimization

Purpose: Groups similar timestamps together in Spark's memory partitions before writing to disk.

Example:

Before repartition (data scattered randomly):
Spark Partition 1: [2180-07-23 15:09:00, 2180-07-24 08:30:00]
Spark Partition 2: [2180-07-23 16:15:00, 2180-07-24 14:45:00]

After repartition("intime") (data grouped by time):
Spark Partition 1: [2180-07-23 15:09:00, 2180-07-23 16:15:00]  # Same day grouped
Spark Partition 2: [2180-07-24 08:30:00, 2180-07-24 14:45:00]  # Same day grouped

This prevents creating many small files: during the write, each Spark task writes its own file into every disk partition it holds rows for, so without repartitioning you can end up with multiple tiny files per time partition.

.partitionBy("intime") - Query Performance Optimization

Purpose: Creates time-based folder structure on disk for fast filtering (called "partition pruning").

Resulting structure:

lake/icu/
├── intime=2180-07-23 15:09:00/
│   └── part-00000.parquet
├── intime=2180-07-23 16:15:00/
│   └── part-00001.parquet
└── intime=2180-07-24 08:30:00/
    └── part-00002.parquet

Why Time-Based Partitioning?

Healthcare analytics are inherently time-focused:

  • "ICU capacity trends over the past month"
  • "Average length of stay this quarter vs last quarter"
  • "Weekend vs weekday admission patterns"

Partition pruning in action:

-- This query only reads July 23rd folders, ignoring everything else
SELECT AVG(los_hrs) 
FROM icu_data 
WHERE intime >= '2180-07-23' AND intime < '2180-07-24'

Performance impact: Instead of scanning millions of records, the query reads only thousands, because the engine skips every partition whose folder value cannot match the filter.
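The same pruning works from PySpark. A small sketch (the path matches the default local output): because intime is a partition column, Spark resolves the filter against folder names before opening any files.

df = spark.read.parquet("lake/icu")
jul23 = df.where("intime >= '2180-07-23' AND intime < '2180-07-24'")
jul23.agg({"los_hrs": "avg"}).show()  # only July 23rd folders are ever read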

Stream Processing Loop: Continuous Operation

The Forever Loop Design

import time

try:
    while True:  # Continuous processing
        rows = next_batch()  # Get messages (blocks until available)

        # Process and transform data
        df = (
            spark.read.json(spark.sparkContext.parallelize(rows), schema=schema)
                 .withColumn("intime", to_timestamp("intime"))
                 .withColumn("outtime", to_timestamp("outtime"))
        )

        # Only save if we have data
        if df.count():
            (df.repartition("intime")
               .write
               .mode("append")
               .partitionBy("intime")
               .parquet(str(out)))

        time.sleep(2)  # Gentle pause between batches

except KeyboardInterrupt:
    print("Stopped.")
    spark.stop()  # Clean shutdown

Why Continuous Processing?

Real-time healthcare monitoring requires:

  • Immediate processing as patients are admitted/discharged
  • Always-on availability for 24/7 hospital operations
  • Graceful handling of quiet periods (no new patients)

Current design considerations:

  • The 2-second pause prevents overwhelming AWS with requests during processing
  • The 10-second SQS wait provides natural batching during quiet periods
  • For production, you'd want exponential backoff during extended quiet periods

Error Handling and Graceful Shutdown

Keyboard interrupt handling gives the script a clean exit path:

  1. The in-flight Spark write either commits fully or is left uncommitted (no half-written partition is published)
  2. Spark resources are properly released via spark.stop() (memory cleanup)
  3. Connections to AWS services terminate cleanly

One caveat: because messages are deleted from SQS before the Parquet write, an interrupt that lands mid-batch can still drop that batch. A production version would delete messages only after a successful write.
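Beyond Ctrl-C, a production process should also honor SIGTERM, which is what Kubernetes and most process managers send on shutdown. A hedged sketch of the idea:

import signal

shutting_down = False

def request_shutdown(signum, frame):
    global shutting_down
    shutting_down = True  # the loop checks this flag between batches

signal.signal(signal.SIGTERM, request_shutdown)

while not shutting_down:
    rows = next_batch()
    ...  # parse and write as in the loop above

spark.stop()  # release Spark resources once the in-flight batch finishes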

Development Strategy: Local First, Cloud Later

Configuration Flexibility

import os

OUTPUT_DIR = os.environ.get("VW_LAKE_DIR", "./lake/icu")  # Default: local

Development workflow:

# Start with local testing (Spark runs locally on your machine)
python stream_sqs_to_parquet.py

# Scale to S3 when ready (same code, different output location)
export VW_LAKE_DIR="s3://your-bucket/data/"
python stream_sqs_to_parquet.py

Benefits of local-first approach:

  • Faster iteration (no network delays for file I/O)
  • Cost control (no AWS storage charges during development)
  • Easy debugging (can inspect Parquet files directly with tools like parquet-tools, or with pandas as sketched after this list)
  • Same code works for both local and cloud storage (abstraction layer)
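For example, a quick way to sanity-check the local lake without starting Spark, assuming the default ./lake/icu output and pandas installed alongside pyarrow:

import pandas as pd

df = pd.read_parquet("lake/icu")  # pyarrow reads the whole partitioned folder
print(df.dtypes)   # confirms the schema survived the round trip
print(df.head())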

Real-World Impact: From Raw Messages to Healthcare Insights

Before: Raw SQS Messages

{"subject_id": 10000032, "stay_id": 30000123, "intime": "2180-07-23 15:09:00", "outtime": "2180-07-31 13:58:00", "los_hrs": 190.82}

After: Analytics-Ready Data Lake

lake/icu/  (or s3://vitalwatch-lake/)
├── _SUCCESS (Spark's success marker, written once at the output root)
├── intime=2180-07-23 15:09:00/
│   └── part-00000.parquet (compressed, columnar, schema-enforced)
└── intime=2180-07-23 16:15:00/
    └── part-00001.parquet

Future Analytics Possibilities

-- Hospital capacity planning
SELECT DATE(intime), COUNT(*) as daily_admissions
FROM icu_data 
WHERE intime >= '2180-01-01'  -- MIMIC-IV dates are deliberately shifted into the future
GROUP BY DATE(intime);

-- Length of stay analysis
SELECT AVG(los_hrs) as avg_los
FROM icu_data 
WHERE intime >= current_date() - interval 30 days;

-- Peak hours identification  
SELECT HOUR(intime), COUNT(*) as admissions
FROM icu_data 
GROUP BY HOUR(intime)
ORDER BY admissions DESC;

Key Learnings and Insights

1. Stream Processing vs Batch Processing

Traditional ETL processes data in large chunks once per day. Stream processing handles data as it arrives, enabling:

  • Real-time dashboards for hospital administrators
  • Immediate alerts for unusual patterns
  • Up-to-date analytics for clinical decision-making

Trade-off: Stream processing adds complexity but enables real-time insights.

2. The Power of Columnar Storage

Switching from row-based (CSV) to column-based (Parquet) storage isn't just a technical detail:

  • Query performance improves by 10-100x for analytics workloads
  • Storage costs decrease by 50-80% due to column-specific compression
  • Data processing scales to billions of records efficiently

3. Partition Strategy Matters

Time-based partitioning aligns with how healthcare data is actually queried:

  • Most questions involve time ranges ("last month", "this quarter")
  • Regulatory reporting is time-period based
  • Operational metrics track daily/weekly/monthly trends

4. Graceful Degradation and Production Considerations

Current system handles basic scenarios but production systems need:

  • Exponential backoff during quiet periods
  • Dead letter queues for message processing failures (a boto3 sketch follows this list)
  • Monitoring and alerting for system health
  • Batch size optimization for throughput
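For instance, attaching a dead letter queue is a one-time queue configuration. A hedged boto3 sketch, with a placeholder queue ARN:

import json

sqs.set_queue_attributes(
    QueueUrl=QUEUE_URL,
    Attributes={
        "RedrivePolicy": json.dumps({
            "deadLetterTargetArn": "arn:aws:sqs:us-east-1:123456789012:vitalwatch-dlq",  # placeholder ARN
            "maxReceiveCount": "5",  # after 5 unsuccessful receives, SQS moves the message here
        })
    },
)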

Current Limitations and Future Improvements

What Could Be Enhanced:

  1. True batch accumulation: Currently limited to 10 messages per batch due to SQS API limits
  2. Quiet period optimization: System polls every 10 seconds even when no data is flowing
  3. Error handling: Basic error handling - production would need retry logic, dead letter queues
  4. Monitoring: No metrics on processing latency, throughput, or system health

For Production Deployment:

  • Spark cluster instead of local Spark instance
  • Auto-scaling based on queue depth
  • Multiple consumer instances for higher throughput
  • Data quality monitoring and alerting

What's Next: Phase 3

With Phase 2 complete, we now have:

  • Real-time data ingestion (Phase 1)
  • Stream processing pipeline (Phase 2)
  • Analytics-ready data lake with columnar storage and time-based partitioning

Phase 3 will add:

  • DynamoDB for real-time alerts storage
  • FastAPI web service for dashboard APIs
  • WebSocket connections for live updates to browser clients
  • Kubernetes deployment for production scaling and reliability

Resources and Code

The complete Phase 2 implementation is available in my VitalWatch repository.

Key files:

  • stream_sqs_to_parquet.py - Main stream processing script
  • pyproject.toml - Project dependencies and configuration
  • cleanup.sh - Resource cleanup for cost control

Technical Specifications

Dependencies:

pip install pyarrow==16.1 awscli boto3

We install pyarrow for fast Parquet read/write on the Python side of the stack - Spark handles Parquet natively on the JVM, but pyarrow speeds up Spark-to-pandas conversion and lets us inspect output files directly.

AWS Services Used:

  • SQS: Message queuing and stream buffering (FIFO queues for ordered processing)
  • S3: Data lake storage for Parquet files (optional, can start local)
  • IAM: Access control and permissions

Local Requirements:

  • Apache Spark 3.5: Stream processing engine (runs locally for development)
  • Python 3.8+: Runtime environment

This is Part 2 of the VitalWatch series. Next up: "Phase 3: Building Real-time Healthcare Dashboards with FastAPI and WebSockets"
