DEV Community

Thesius Code

Real-Time Data Streaming with Apache Kafka and Spark

Most teams bolt on streaming as an afterthought — and it shows. Consumer lag spirals, late events silently vanish, and "exactly-once" turns out to mean "at-least-twice with fingers crossed." The difference between a production streaming pipeline and a demo isn't the tech stack; it's the patterns you apply from the start.

This guide walks through building a production-grade real-time data pipeline from Kafka ingestion through Spark Structured Streaming to a Delta Lake sink, with practical code for every component.

Architecture

┌──────────┐     ┌─────────┐     ┌───────────────────┐     ┌──────────┐
│  Event   │────>│  Kafka  │────>│ Spark Structured  │────>│  Delta   │
│ Sources  │     │ Cluster │     │    Streaming      │     │  Lake    │
└──────────┘     └─────────┘     └───────────────────┘     └──────────┘
  (APIs,           (Buffer,         (Transform,              (Bronze,
   Apps,            decouple)        aggregate,               Silver,
   IoT)                              enrich)                  Gold)

Why This Stack?

  • Kafka handles ingestion, buffering, and replay. It decouples producers from consumers and provides durable message storage.
  • Spark Structured Streaming handles transformations at scale, with built-in state management and end-to-end exactly-once guarantees when paired with a replayable source (Kafka) and an idempotent sink (Delta Lake).
  • Delta Lake provides ACID transactions on the sink side, enabling reliable batch + streaming queries on the same tables.

Part 1: Kafka Producer

A well-designed producer is the foundation of your pipeline. Serialize properly, handle errors, and use partitioning to guarantee ordering where it matters.

import json
import time
import uuid
from datetime import datetime, timezone
from confluent_kafka import Producer


class EventProducer:
    """Production Kafka producer with error handling and metrics."""

    def __init__(self, bootstrap_servers: str, topic: str):
        self.topic = topic
        self.delivered = 0
        self.failed = 0

        self.producer = Producer({
            "bootstrap.servers": bootstrap_servers,
            "client.id": f"producer-{topic}",
            "acks": "all",                    # Wait for all in-sync replicas
            "retries": 5,                     # Retry transient errors
            "retry.backoff.ms": 500,
            "enable.idempotence": True,       # No duplicates from producer retries
            "compression.type": "snappy",     # Compress for throughput
            "linger.ms": 20,                  # Batch messages for 20ms
            "batch.size": 65536,              # 64KB batch size
            "max.in.flight.requests.per.connection": 5,  # Max allowed with idempotence
        })

    def produce(
        self,
        key: str,
        value: dict,
        headers: dict | None = None,
    ):
        """Produce a message to Kafka with automatic metadata enrichment."""
        # Copy so the caller's dict is not mutated, then add event metadata.
        # The random suffix keeps _event_id unique even when two events for
        # the same key are produced within the same millisecond.
        value = {
            **value,
            "_event_time": datetime.now(timezone.utc).isoformat(),
            "_event_id": f"{key}_{int(time.time() * 1000)}_{uuid.uuid4().hex[:8]}",
        }

        kafka_headers = []
        if headers:
            kafka_headers = [
                (k, v.encode("utf-8")) for k, v in headers.items()
            ]

        self.producer.produce(
            topic=self.topic,
            key=key,
            value=json.dumps(value),
            headers=kafka_headers,
            callback=self._delivery_callback,
        )
        # Serve delivery callbacks for messages that have already completed
        self.producer.poll(0)

    def _delivery_callback(self, err, msg):
        if err:
            self.failed += 1
            print(f"Delivery failed: {err}")
        else:
            self.delivered += 1

    def flush(self, timeout: float = 30):
        """Wait for all messages to be delivered."""
        remaining = self.producer.flush(timeout)
        if remaining > 0:
            print(f"Warning: {remaining} messages not delivered")

    def get_stats(self) -> dict:
        """Return delivery success metrics."""
        return {
            "delivered": self.delivered,
            "failed": self.failed,
            "success_rate": (
                self.delivered / (self.delivered + self.failed)
                if (self.delivered + self.failed) > 0
                else 0
            ),
        }


# Usage
producer = EventProducer(
    bootstrap_servers="kafka-broker-1:9092,kafka-broker-2:9092",
    topic="user-events",
)

# Produce events
events = [
    {"user_id": "u123", "action": "page_view", "page": "/pricing"},
    {"user_id": "u456", "action": "purchase", "amount": 99.99},
    {"user_id": "u123", "action": "click", "element": "cta_button"},
]

for event in events:
    producer.produce(
        key=event["user_id"],    # Partition by user_id for ordering
        value=event,
        headers={"source": "web-app", "version": "1.0"},
    )

producer.flush()
print(f"Stats: {producer.get_stats()}")
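Why does keying by user_id give per-user ordering? Kafka clients hash the message key to choose a partition, and ordering is guaranteed only within a partition. The sketch below illustrates the idea with a CRC32-modulo hash. The function name partition_for_key and the six-partition topic are assumptions for illustration; the partitioner your client actually uses may hash differently.

```python
import zlib


def partition_for_key(key: str, num_partitions: int) -> int:
    """Map a key to a partition by hashing it (illustrative, not Kafka's code)."""
    if num_partitions <= 0:
        raise ValueError("num_partitions must be positive")
    # CRC32 is deterministic, so the same key always maps to the same partition.
    return zlib.crc32(key.encode("utf-8")) % num_partitions


# Both u123 events land on the same partition, so their order is preserved.
assignments = {
    user_id: partition_for_key(user_id, num_partitions=6)
    for user_id in ["u123", "u456", "u123"]
}
print(assignments)
```

One consequence of modulo hashing: changing the partition count remaps keys, so per-key ordering holds only while the partition count stays fixed.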

Part 2: Kafka Consumer

import json
from confluent_kafka import Consumer, KafkaError, TopicPartition


class EventConsumer:
    """Production Kafka consumer with manual offset management."""

    def __init__(
        self,
        bootstrap_servers: str,
        topic: str,
        group_id: str,
        auto_commit: bool = False,
    ):
        self.topic = topic
        self.consumer = Consumer({
            "bootstrap.servers": bootstrap_servers,
            "group.id": group_id,
            "auto.offset.reset": "earliest",
            "enable.auto.commit": auto_commit,
            "max.poll.interval.ms": 300000,     # 5 min max processing
            "session.timeout.ms": 30000,
            "fetch.min.bytes": 1024,             # Wait for 1KB
            "fetch.wait.max.ms": 500,
        })
        self.consumer.subscribe([topic])

    def consume_batch(
        self, batch_size: int = 100, timeout: float = 1.0
    ) -> list[dict]:
        """Consume a batch of messages with deserialization."""
        messages = []
        while len(messages) < batch_size:
            msg = self.consumer.poll(timeout)
            if msg is None:
                break
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    continue
                print(f"Consumer error: {msg.error()}")
                continue

            messages.append({
                "key": msg.key().decode("utf-8") if msg.key() else None,
                "value": json.loads(msg.value().decode("utf-8")),
                "topic": msg.topic(),
                "partition": msg.partition(),
                "offset": msg.offset(),
                "timestamp": msg.timestamp(),
                "headers": {
                    h[0]: h[1].decode("utf-8")
                    for h in (msg.headers() or [])
                },
            })

        return messages

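    def commit(self):
        """Synchronously commit offsets after successful processing."""
        # commit() is asynchronous by default; block here so a failed commit
        # raises instead of going unnoticed.
        self.consumer.commit(asynchronous=False)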

    def close(self):
        """Close consumer and release partition assignments."""
        self.consumer.close()


# Usage
consumer = EventConsumer(
    bootstrap_servers="kafka-broker-1:9092",
    topic="user-events",
    group_id="analytics-pipeline",
)

try:
    while True:
        batch = consumer.consume_batch(batch_size=100, timeout=2.0)
        if not batch:
            continue

        # Process batch (process_event is a placeholder for your own handler)
        for msg in batch:
            process_event(msg["value"])

        # Commit only after successful processing
        consumer.commit()
        print(f"Processed {len(batch)} events")
except KeyboardInterrupt:
    pass
finally:
    consumer.close()
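One gap in the consumer above: a single malformed payload makes json.loads raise and stops the loop. A common pattern is to set unparseable messages aside (for example, to forward them to a dead letter topic) and keep processing the rest. Here is a minimal sketch of that split. The function name split_valid_and_poison is hypothetical, and forwarding to an actual dead letter topic is left out.

```python
import json


def split_valid_and_poison(
    raw_values: list[bytes],
) -> tuple[list[dict], list[bytes]]:
    """Decode JSON payloads, setting aside any that cannot be parsed."""
    valid: list[dict] = []
    poison: list[bytes] = []
    for raw in raw_values:
        try:
            event = json.loads(raw.decode("utf-8"))
        except (UnicodeDecodeError, json.JSONDecodeError):
            poison.append(raw)
            continue
        # Events are expected to be JSON objects; anything else is poison too.
        if isinstance(event, dict):
            valid.append(event)
        else:
            poison.append(raw)
    return valid, poison


valid, poison = split_valid_and_poison([
    b'{"user_id": "u123", "action": "click"}',
    b'{"user_id": ',   # truncated JSON
    b"\xff\xfe",       # not valid UTF-8
])
print(f"{len(valid)} valid, {len(poison)} poison")
```

Keeping the raw bytes of poison messages makes it possible to inspect or replay them later without losing information.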

Part 3: Spark Structured Streaming

This is where the real processing happens. Spark Structured Streaming reads from Kafka, transforms the data, and writes to Delta Lake in a Medallion Architecture.

Basic Stream Processing

from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    from_json, col, window, count, sum as spark_sum,
    avg, current_timestamp, expr, to_timestamp
)
from pyspark.sql.types import (
    StructType, StructField, StringType, DoubleType, TimestampType
)


spark = SparkSession.builder \
    .appName("UserEventsPipeline") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()


# Define the event schema
event_schema = StructType([
    StructField("user_id", StringType(), True),
    StructField("action", StringType(), True),
    StructField("page", StringType(), True),
    StructField("element", StringType(), True),
    StructField("amount", DoubleType(), True),
    StructField("_event_time", StringType(), True),
    StructField("_event_id", StringType(), True),
])


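def create_kafka_stream(spark: SparkSession, topic: str):
    """Read from Kafka topic as a streaming DataFrame."""
    # Notes on the options below:
    # - startingOffsets applies only the first time the query starts; after
    #   that, the checkpoint decides where to resume. "latest" skips messages
    #   already in the topic, so use "earliest" if you need to backfill.
    # - SASL_SSL also needs kafka.sasl.mechanism and kafka.sasl.jaas.config,
    #   set to match your cluster.
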
    raw_stream = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka-broker-1:9092") \
        .option("subscribe", topic) \
        .option("startingOffsets", "latest") \
        .option("maxOffsetsPerTrigger", 100000) \
        .option("kafka.security.protocol", "SASL_SSL") \
        .load()

    # Parse JSON values and flatten the nested structure
    parsed_stream = raw_stream \
        .select(
            col("key").cast("string").alias("kafka_key"),
            from_json(
                col("value").cast("string"), event_schema
            ).alias("data"),
            col("topic"),
            col("partition"),
            col("offset"),
            col("timestamp").alias("kafka_timestamp"),
        ) \
        .select(
            "kafka_key",
            "data.*",
            "topic",
            "partition",
            "offset",
            "kafka_timestamp",
        ) \
        .withColumn(
            "event_time",
            to_timestamp(col("_event_time"))
        )

    return parsed_stream

Bronze Layer: Raw Event Sink

def write_bronze_stream(parsed_stream, checkpoint_path: str):
    """Write raw events to Bronze Delta table with ingestion timestamp."""

    bronze_stream = parsed_stream \
        .withColumn("_bronze_ingested_at", current_timestamp()) \
        .writeStream \
        .format("delta") \
        .outputMode("append") \
        .option("checkpointLocation", f"{checkpoint_path}/bronze") \
        .option("mergeSchema", "true") \
        .trigger(processingTime="30 seconds") \
        .toTable("streaming.bronze.user_events")

    return bronze_stream

Silver Layer: Cleaned and Enriched

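def create_silver_stream(bronze_table: str):
    """Read from Bronze and apply cleaning, dedup, and enrichment."""

    # The watermark bounds deduplication state: without it, Spark keeps every
    # _event_id it has ever seen. event_time must be part of the dedup key
    # for old state to be evicted.
    silver_stream = spark.readStream \
        .format("delta") \
        .table(bronze_table) \
        .filter(col("user_id").isNotNull()) \
        .filter(col("action").isNotNull()) \
        .withWatermark("event_time", "1 hour") \
        .dropDuplicates(["_event_id", "event_time"]) \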
        .withColumn("event_date", col("event_time").cast("date")) \
        .withColumn(
            "action_category",
            expr("""
                CASE
                    WHEN action IN ('page_view', 'scroll') THEN 'engagement'
                    WHEN action IN ('click', 'hover') THEN 'interaction'
                    WHEN action IN ('purchase', 'add_to_cart') THEN 'conversion'
                    ELSE 'other'
                END
            """)
        )

    return silver_stream


def write_silver_stream(silver_stream, checkpoint_path: str):
    """Write cleaned Silver stream to Delta table."""

    query = silver_stream \
        .withColumn("_silver_processed_at", current_timestamp()) \
        .writeStream \
        .format("delta") \
        .outputMode("append") \
        .option("checkpointLocation", f"{checkpoint_path}/silver") \
        .trigger(processingTime="30 seconds") \
        .toTable("streaming.silver.user_events")

    return query

Gold Layer: Real-Time Aggregations

def create_realtime_dashboard_stream(silver_table: str):
    """Create windowed aggregations for real-time dashboards."""

    agg_stream = spark.readStream \
        .format("delta") \
        .table(silver_table) \
        .withWatermark("event_time", "5 minutes") \
        .groupBy(
            window(col("event_time"), "1 minute"),
            col("action_category"),
        ) \
        .agg(
            count("*").alias("event_count"),
            spark_sum(
                expr("CASE WHEN amount IS NOT NULL THEN amount ELSE 0 END")
            ).alias("total_revenue"),
            avg("amount").alias("avg_order_value"),
        ) \
        .select(
            col("window.start").alias("window_start"),
            col("window.end").alias("window_end"),
            "action_category",
            "event_count",
            "total_revenue",
            "avg_order_value",
        )

    return agg_stream


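def write_dashboard_stream(agg_stream, checkpoint_path: str):
    """Write aggregations to Gold table for dashboard consumption."""

    # "complete" mode rewrites the whole result table on every trigger and
    # keeps all window state, so the watermark set upstream does not bound
    # state here. Use "append" if emitting only finalized windows is acceptable.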
    query = agg_stream \
        .writeStream \
        .format("delta") \
        .outputMode("complete") \
        .option("checkpointLocation", f"{checkpoint_path}/gold_dashboard") \
        .trigger(processingTime="1 minute") \
        .toTable("streaming.gold.realtime_dashboard")

    return query
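To build intuition for what window(col("event_time"), "1 minute") does, here is a plain-Python sketch of tumbling-window assignment. It assumes windows aligned to the Unix epoch, which is Spark's default when no start offset is given. The function name tumbling_window is made up for this example.

```python
from datetime import datetime, timedelta, timezone


def tumbling_window(
    event_time: datetime, size: timedelta
) -> tuple[datetime, datetime]:
    """Return the [start, end) tumbling window that contains event_time."""
    epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
    elapsed = event_time - epoch
    # Floor the elapsed time to a whole multiple of the window size.
    start = epoch + (elapsed // size) * size
    return start, start + size


event = datetime(2024, 1, 1, 12, 0, 42, tzinfo=timezone.utc)
start, end = tumbling_window(event, timedelta(minutes=1))
print(start.isoformat(), "->", end.isoformat())
```

The start is inclusive and the end is exclusive, so an event stamped exactly on a minute boundary belongs to the window that begins at that boundary.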

Part 4: Exactly-Once Semantics

The hardest problem in streaming is processing each event exactly once. Here's how each layer of the stack contributes:

Kafka → Spark → Delta Lake (Exactly-Once Path)

1. Kafka: Idempotent producer (enable.idempotence=True)
   - Deduplicates at the broker level using sequence numbers

2. Spark Structured Streaming: Checkpointing
   - Tracks Kafka offsets in a checkpoint directory
   - On failure, replays from the last committed offset
   - Uses write-ahead log for state management

3. Delta Lake: ACID transactions
   - Each micro-batch is an atomic commit
   - If a batch partially fails, it's fully rolled back
   - Combined with Spark checkpointing = exactly-once writes
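The interplay between replay and atomic commits is easier to see in a toy model. The sketch below is not how Delta Lake is implemented; it is a simplified stand-in (the IdempotentSink class and its methods are hypothetical) showing why replaying a micro-batch after a failure does not duplicate rows when the sink remembers which batch it committed last.

```python
class IdempotentSink:
    """Toy sink that commits each micro-batch at most once, keyed by batch id."""

    def __init__(self) -> None:
        self.rows: list[dict] = []
        self.last_committed_batch = -1

    def write_batch(self, batch_id: int, rows: list[dict]) -> bool:
        """Append rows atomically; return False if the batch was already committed."""
        if batch_id <= self.last_committed_batch:
            return False  # Replay after a failure: skip, nothing is duplicated.
        # Stage first, then publish the rows and the batch id together.
        staged = self.rows + list(rows)
        self.rows, self.last_committed_batch = staged, batch_id
        return True


sink = IdempotentSink()
first = sink.write_batch(0, [{"user_id": "u123", "action": "click"}])
# The engine crashes before recording progress and replays batch 0 on restart.
replayed = sink.write_batch(0, [{"user_id": "u123", "action": "click"}])
print(first, replayed, len(sink.rows))
```

The source side supplies the other half of the guarantee: because offsets live in the checkpoint, a replayed batch covers exactly the same offset range as the original attempt.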

Handling Late Data

# Watermark tells Spark how late data can arrive
stream_with_watermark = parsed_stream \
    .withWatermark("event_time", "10 minutes") \
    .groupBy(
        window(col("event_time"), "5 minutes"),
        col("action"),
    ) \
    .count()

# Events older than (max event time seen - 10 minutes) may be dropped
# Events newer than that threshold still update their window's count
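Here is a simplified simulation of the watermark rule. It updates the watermark after every event, whereas Spark advances it once per micro-batch and compares it against window boundaries, so treat this as an approximation of the behavior rather than a replica. The function name apply_watermark is an assumption for this example.

```python
from datetime import datetime, timedelta, timezone


def apply_watermark(
    event_times: list[datetime], delay: timedelta
) -> tuple[list[datetime], list[datetime]]:
    """Split events into accepted and dropped using a running watermark."""
    accepted: list[datetime] = []
    dropped: list[datetime] = []
    max_seen = None
    for ts in event_times:
        # Watermark = latest event time seen so far minus the allowed delay.
        watermark = max_seen - delay if max_seen is not None else None
        if watermark is not None and ts < watermark:
            dropped.append(ts)
            continue
        accepted.append(ts)
        max_seen = ts if max_seen is None else max(max_seen, ts)
    return accepted, dropped


base = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
arrivals = [
    base,                          # 12:00
    base + timedelta(minutes=20),  # 12:20 moves the watermark to 12:10
    base + timedelta(minutes=15),  # 12:15 is late but within the threshold
    base + timedelta(minutes=5),   # 12:05 is behind the watermark
]
accepted, dropped = apply_watermark(arrivals, timedelta(minutes=10))
print(len(accepted), "accepted,", len(dropped), "dropped")
```

Note that lateness is measured against the newest event time seen, not the wall clock, so a quiet stream never advances the watermark.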

Deduplication

# Deduplicate within the watermark; event_time is part of the key so Spark can evict old state
deduplicated = parsed_stream \
    .withWatermark("event_time", "1 hour") \
    .dropDuplicates(["_event_id", "event_time"])
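The reason to pair deduplication with a watermark is state size: without one, the set of seen ids grows forever. This sketch shows the eviction idea in plain Python. The WatermarkDeduplicator class is hypothetical and updates the watermark per event, which is a simplification of how a micro-batch engine behaves.

```python
from datetime import datetime, timedelta, timezone


class WatermarkDeduplicator:
    """Drop repeated event ids while evicting ids older than the watermark."""

    def __init__(self, delay: timedelta) -> None:
        self.delay = delay
        self.seen: dict[str, datetime] = {}  # event id -> event time
        self.max_event_time: datetime | None = None

    def accept(self, event_id: str, event_time: datetime) -> bool:
        """Return True the first time an id is seen within the watermark horizon."""
        if self.max_event_time is None or event_time > self.max_event_time:
            self.max_event_time = event_time
        watermark = self.max_event_time - self.delay
        # Evict ids that are too old to be matched by an accepted event.
        self.seen = {k: t for k, t in self.seen.items() if t >= watermark}
        if event_time < watermark or event_id in self.seen:
            return False
        self.seen[event_id] = event_time
        return True


t0 = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
dedup = WatermarkDeduplicator(delay=timedelta(hours=1))
results = [
    dedup.accept("a", t0),                       # first sighting
    dedup.accept("a", t0),                       # duplicate
    dedup.accept("b", t0 + timedelta(hours=2)),  # advances the watermark
]
print(results, list(dedup.seen))
```

After the third call, the entry for "a" has been evicted, which is what keeps memory bounded over a long-running stream.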

Part 5: Monitoring Your Pipeline

A streaming pipeline without monitoring is a ticking time bomb.

Spark Streaming Metrics

import time


def monitor_stream(query):
    """Extract monitoring metrics from a streaming query."""
    while query.isActive:
        progress = query.lastProgress
        if progress:
            metrics = {
                "batch_id": progress["batchId"],
                "input_rows": progress["numInputRows"],
                # Total wall-clock time spent on the micro-batch
                "processing_time_ms": progress["durationMs"].get(
                    "triggerExecution"
                ),
                "input_rows_per_sec": progress.get("inputRowsPerSecond", 0),
                "processed_rows_per_sec": progress.get(
                    "processedRowsPerSecond", 0
                ),
            }

            # Alert when processing falls behind ingestion rate
            if metrics["input_rows_per_sec"] > 0:
                ratio = (
                    metrics["processed_rows_per_sec"]
                    / metrics["input_rows_per_sec"]
                )
                if ratio < 0.8:
                    print(
                        f"WARNING: Processing falling behind. "
                        f"Ratio: {ratio:.2f}"
                    )

            print(f"Batch {metrics['batch_id']}: {metrics}")

        time.sleep(30)

Kafka Consumer Lag Monitoring

# Check consumer group lag
kafka-consumer-groups.sh \
  --bootstrap-server kafka-broker-1:9092 \
  --describe \
  --group analytics-pipeline

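# Key metric: LAG column
# If LAG grows continuously, consumers can't keep up
# Note: this covers the Part 2 consumer group. Spark Structured Streaming
# tracks offsets in its checkpoint rather than committing them to Kafka,
# so use the query progress metrics above for the Spark side.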
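The LAG column is simply the log end offset minus the committed offset for each partition. If you collect those two numbers yourself, the arithmetic looks like the sketch below. The function name compute_lag and the sample offsets are made up, and treating a partition with no committed offset as starting from 0 is a simplifying assumption.

```python
def compute_lag(
    end_offsets: dict[int, int], committed_offsets: dict[int, int]
) -> dict[int, int]:
    """Per-partition lag = log end offset minus committed offset."""
    lag: dict[int, int] = {}
    for partition, end in end_offsets.items():
        # Assumption: a partition with no committed offset is read from 0.
        committed = committed_offsets.get(partition, 0)
        lag[partition] = max(end - committed, 0)
    return lag


lag = compute_lag(
    end_offsets={0: 1500, 1: 900, 2: 400},
    committed_offsets={0: 1200, 1: 900},
)
total_lag = sum(lag.values())
print(lag, total_lag)
```

Tracking the total over time matters more than any single reading: a steady number is fine, a rising one means consumers are falling behind.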

Key Metrics to Track

| Metric | Alert Threshold | Action |
| --- | --- | --- |
| Consumer lag | > 100K messages | Scale consumers |
| Processing time | > trigger interval | Optimize transformations |
| Input rate vs. output rate | Ratio < 0.8 | Scale cluster |
| Checkpoint duration | > 60s | Reduce state size |
| Failed batches | Any | Investigate errors |
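These thresholds are easy to encode as a single check that runs on every metrics snapshot. The sketch below is one way to do it. The metric key names (consumer_lag, checkpoint_duration_ms, and so on) are assumptions, not fields any particular tool emits, so map them to whatever your monitoring system provides.

```python
def evaluate_alerts(metrics: dict, trigger_interval_ms: int) -> list[str]:
    """Return the recommended actions for every threshold that is breached."""
    alerts: list[str] = []
    if metrics.get("consumer_lag", 0) > 100_000:
        alerts.append("Scale consumers")
    if metrics.get("processing_time_ms", 0) > trigger_interval_ms:
        alerts.append("Optimize transformations")
    input_rate = metrics.get("input_rows_per_sec", 0)
    # Skip the ratio check when nothing is arriving to avoid dividing by zero.
    if input_rate > 0:
        ratio = metrics.get("processed_rows_per_sec", 0) / input_rate
        if ratio < 0.8:
            alerts.append("Scale cluster")
    if metrics.get("checkpoint_duration_ms", 0) > 60_000:
        alerts.append("Reduce state size")
    if metrics.get("failed_batches", 0) > 0:
        alerts.append("Investigate errors")
    return alerts


snapshot = {
    "consumer_lag": 250_000,
    "processing_time_ms": 45_000,
    "input_rows_per_sec": 1000.0,
    "processed_rows_per_sec": 700.0,
    "checkpoint_duration_ms": 5_000,
    "failed_batches": 0,
}
alerts = evaluate_alerts(snapshot, trigger_interval_ms=30_000)
print(alerts)
```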

Part 6: Production Checklist

Before going live with a streaming pipeline:

Kafka:

  • [ ] Replication factor >= 3
  • [ ] Topic partitions aligned with parallelism needs
  • [ ] Retention period configured (7+ days for replay)
  • [ ] Consumer group lag alerting enabled
  • [ ] Dead letter topic for poison messages

Spark Streaming:

  • [ ] Checkpoint location on reliable storage (DBFS, S3, ADLS)
  • [ ] Watermark configured for late data handling
  • [ ] Trigger interval matches latency requirements
  • [ ] Memory and shuffle partitions tuned
  • [ ] Graceful shutdown handling

Delta Lake:

  • [ ] OPTIMIZE scheduled for small file compaction
  • [ ] VACUUM configured to clean old files
  • [ ] Table statistics up to date
  • [ ] Schema evolution strategy defined

Summary

Real-time streaming with Kafka + Spark + Delta Lake follows the same Medallion Architecture as batch, but with different write patterns:

| Component | Role | Key Config |
| --- | --- | --- |
| Kafka Producer | Event ingestion | Idempotent, snappy, batched |
| Kafka Topic | Buffer + replay | Partitioned, replicated |
| Spark Streaming | Transform + aggregate | Checkpointed, watermarked |
| Delta Lake | ACID sink | Append (Bronze, Silver), Complete (Gold) |
| Monitoring | Observability | Lag, latency, throughput |

Start with a single Bronze stream sink, verify it's reliable, then layer on Silver and Gold processing incrementally.


For production-ready streaming templates and data pipeline patterns, explore DataStack Pro.
