
Thesius Code

Posted on • Originally published at datanest-stores.pages.dev

Streaming Pipeline Kit: Streaming Patterns & Best Practices


A guide to building reliable, scalable streaming pipelines with Spark Structured Streaming
on Databricks.

By Datanest Digital | Streaming Pipeline Kit v1.0.0


Table of Contents

  1. Exactly-Once Processing
  2. Watermarks & Late Data
  3. Trigger Strategies
  4. State Management
  5. Checkpointing
  6. Schema Evolution
  7. Error Handling & Dead Letter Queues
  8. Performance Tuning
  9. Monitoring & Alerting
  10. Common Pitfalls

Exactly-Once Processing

Spark Structured Streaming provides exactly-once guarantees through the combination of:

  1. Replayable sources — Kafka offsets are tracked in the checkpoint, so failed micro-batches can be re-read
  2. Idempotent sinks — Delta Lake MERGE provides natural deduplication
  3. Checkpointing — WAL (write-ahead log) ensures replay on failure

Key Rules

  • Always configure checkpoint locations that are durable (cloud storage, not local disk)
  • Never change the checkpoint location for a running query — this resets all state
  • Use MERGE for upserts — it's idempotent by design and handles retries gracefully
  • Combine with event-level dedup using dropDuplicatesWithinWatermark
```python
# Idempotent pipeline: watermark + dedup, then MERGE per micro-batch
# (dropDuplicatesWithinWatermark requires a watermark; MERGE against
# a streaming DataFrame must run inside foreachBatch)
deduped = stream_df.withWatermark("event_time", "10 minutes") \
    .dropDuplicatesWithinWatermark(["event_id"])

def upsert_batch(batch_df, batch_id):
    delta_table.alias("t").merge(
        batch_df.alias("s"), "t.event_id = s.event_id"
    ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

deduped.writeStream.foreachBatch(upsert_batch).start()
```

Watermarks & Late Data

Watermarks define how long the engine waits for late-arriving data before finalizing
window aggregations and dropping state.

Choosing a Watermark Delay

| Scenario | Suggested Delay | Rationale |
| --- | --- | --- |
| Real-time dashboards | 1–5 minutes | Low latency is critical |
| Hourly ETL | 30–60 minutes | Tolerate moderate delays |
| Cross-timezone events | 2–6 hours | Account for timezone edge cases |
| IoT / mobile devices | 1–24 hours | Devices may be offline |

How Watermarks Work

```text
Event time:     10:00  10:01  10:02  10:03  10:04  10:05
Watermark (2m): 09:58  09:59  10:00  10:01  10:02  10:03
                                              ▲
                                              Events before 10:02 can be
                                              dropped after this point
```

Best Practices

  • Set watermarks before any groupBy/window operations
  • Use withWatermark() on the event time column, not processing time
  • Monitor the watermark value via StreamingQueryListener
  • For dedup without windows, dropDuplicatesWithinWatermark is more efficient than dropDuplicates (bounded state)
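
As a minimal sketch of these rules, assuming an `events` stream with an `event_time` column and an `event_type` field (both names are illustrative):

```python
from pyspark.sql import functions as F

# Watermark goes on the event-time column, before the windowed groupBy
windowed_counts = (
    events
    .withWatermark("event_time", "30 minutes")
    .groupBy(F.window("event_time", "10 minutes"), "event_type")
    .count()
)
```

With the 30-minute watermark, Spark can finalize each 10-minute window and evict its state once the watermark passes the window's end.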

Trigger Strategies

| Trigger | Use Case | Trade-off |
| --- | --- | --- |
| processingTime="10s" | Near-real-time | Higher cluster cost |
| processingTime="5m" | Standard streaming | Balanced cost/latency |
| availableNow=True | Catch-up / backfill | Processes all available data, then stops |
| once=True | Scheduled batch-as-stream | Single micro-batch, good for testing |

Recommendations

  • Start with 30-second triggers for most use cases
  • Use availableNow=True for backfill operations (replaces trigger(once=True))
  • Avoid sub-second triggers unless you truly need them — they increase checkpoint overhead
  • Match trigger frequency to your SLA, not to "as fast as possible"
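
In code, the two common modes look like this (assuming an `events` stream; the trigger is set on the writer before `.start()`):

```python
# Steady-state: one micro-batch every 30 seconds
writer = events.writeStream.trigger(processingTime="30 seconds")

# Backfill / catch-up: drain all currently available offsets, then stop
backfill_writer = events.writeStream.trigger(availableNow=True)
```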

State Management

Stateful operations (dedup, windows, joins) accumulate state in memory/disk.
Unmanaged state growth is the #1 cause of streaming pipeline failures.

Controlling State Size

  1. Always set watermarks — they let Spark evict expired state
  2. Use dropDuplicatesWithinWatermark instead of dropDuplicates — bounded state
  3. Avoid unbounded collect_list in aggregations
  4. Monitor state size via StreamingQueryListener.stateOperators

State Store Configuration

```python
# Use the RocksDB state store for large state (Databricks class name)
spark.conf.set(
    "spark.sql.streaming.stateStore.providerClass",
    "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
)

# Run state-store maintenance (snapshots, cleanup) every 10 minutes
spark.conf.set("spark.sql.streaming.stateStore.maintenanceInterval", "10min")
```

State Size Alerts

```python
# Alert if any stateful operator exceeds the threshold.
# In PySpark, query.lastProgress is a plain dict, not an object.
progress = query.lastProgress or {}
for state_op in progress.get("stateOperators", []):
    used = state_op["memoryUsedBytes"]
    if used > 1_000_000_000:  # 1 GB
        send_alert(f"State size exceeded 1 GB: {used} bytes")
```

Checkpointing

Checkpoints store query progress, offsets, and state snapshots.

Rules

  • Location: Always use cloud storage (S3, ADLS, GCS) — never local disk
  • Naming: Use a unique, versioned path per query (e.g., /checkpoints/user-events-v2)
  • Retention: Checkpoint files are auto-cleaned; don't manually delete them
  • Migration: Changing the query (e.g., adding columns) may require a new checkpoint
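
Put together, the rules above look like this (target table name is illustrative; the checkpoint path follows the versioned naming from the rules):

```python
# Durable cloud-storage checkpoint, versioned per query
query = (events.writeStream
    .format("delta")
    .option("checkpointLocation", "/mnt/checkpoints/user-events-v2")
    .toTable("analytics.silver.user_events"))
```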

When to Reset Checkpoints

  • Adding/removing stateful operations (window, dedup, join)
  • Changing the watermark expression
  • Upgrading across a Spark version with breaking checkpoint-format changes
  • Changing the source topic or partition count

Checkpoint Reset Process

```python
# 1. Stop the running query
query.stop()

# 2. Remove the old checkpoint
dbutils.fs.rm("/mnt/checkpoints/user-events-v1", recurse=True)

# 3. Restart with starting_offsets="earliest" (to reprocess)
reader = KafkaStreamReader(
    KafkaConfig(starting_offsets="earliest", ...)
)
```

Schema Evolution

Delta Lake supports schema evolution for streaming sinks.

Enabling Auto-Merge

```python
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")
```

What's Supported

| Change | Auto-Merge | Manual Migration |
| --- | --- | --- |
| Add new column | Yes | Not needed |
| Widen type (int → long) | Yes | Not needed |
| Rename column | No | Requires migration |
| Remove column | No | Requires migration |
| Change type (string → int) | No | Requires migration |

Best Practice

Use Schema Registry to validate schemas before they hit the pipeline. The
SchemaRegistryClient in this kit can check compatibility before registration.


Error Handling & Dead Letter Queues

Not all records can be processed successfully. A dead letter queue (DLQ)
captures failed records for later analysis and reprocessing.

DLQ Pattern

```python
# Attached to the stream via: stream.writeStream.foreachBatch(process_batch)
def process_batch(batch_df, batch_id):
    try:
        # Split the batch: land valid records, quarantine invalid ones
        valid, invalid = validate(batch_df)
        write_to_delta(valid)
        route_to_dlq(invalid, "validation_failed")
    except Exception as e:
        # Route the entire batch to the DLQ on unhandled errors
        route_to_dlq(batch_df, str(e))
```

DLQ Table Schema

```sql
CREATE TABLE analytics.bronze.dead_letter_queue (
    record_json STRING,
    error_message STRING,
    source_topic STRING,
    failed_at TIMESTAMP
)
USING DELTA
PARTITIONED BY (source_topic)
```

Reprocessing DLQ Records

  1. Query the DLQ for records matching a specific error
  2. Fix the upstream issue
  3. Replay the records through the pipeline
  4. Delete reprocessed records from the DLQ
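
A sketch of steps 1, 3, and 4, using the DLQ schema above (the `replay` helper is hypothetical and stands in for your re-ingestion path):

```python
# 1. Pull failed records for one error class
failed = (spark.table("analytics.bronze.dead_letter_queue")
    .where("error_message = 'validation_failed'"))

# 3. After fixing the upstream issue, replay them through the pipeline
replay(failed)  # hypothetical re-ingestion helper

# 4. Clear the reprocessed records
spark.sql("""
    DELETE FROM analytics.bronze.dead_letter_queue
    WHERE error_message = 'validation_failed'
""")
```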

Performance Tuning

Kafka Consumer

  • Set maxOffsetsPerTrigger to control batch size (default: 100K)
  • Use minOffsetsPerTrigger (Spark 3.4+) to avoid tiny batches
  • Increase Kafka partitions for higher parallelism
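
As a sketch using the standard Spark Kafka source options (broker address and topic are illustrative):

```python
raw = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")
    .option("subscribe", "user-events")
    .option("maxOffsetsPerTrigger", "100000")  # cap records per micro-batch
    .option("minOffsetsPerTrigger", "10000")   # avoid tiny batches (Spark 3.4+)
    .load())
```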

Spark Configuration

```python
# Shuffle partitions — match to cluster cores
spark.conf.set("spark.sql.shuffle.partitions", "200")

# Adaptive query execution
spark.conf.set("spark.sql.adaptive.enabled", "true")

# Broadcast threshold for small dimension tables
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "50MB")
```

Delta Lake Write Optimization

  • Partition by date for time-series data (not by high-cardinality columns)
  • Enable auto-optimize: delta.autoOptimize.optimizeWrite = true
  • Z-order on query columns for fast reads
  • Use MERGE judiciously — it's more expensive than APPEND
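
For example (table name and Z-order column are illustrative):

```python
# Enable optimized writes on the target table, then Z-order compact it
spark.sql("""
    ALTER TABLE analytics.silver.user_events
    SET TBLPROPERTIES ('delta.autoOptimize.optimizeWrite' = 'true')
""")
spark.sql("OPTIMIZE analytics.silver.user_events ZORDER BY (event_type)")
```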

Monitoring & Alerting

What to Monitor

| Metric | Threshold | Action |
| --- | --- | --- |
| Processing lag | > 5 minutes | Investigate backpressure |
| Batch duration | > 2x trigger interval | Scale up or optimize |
| State size | > 1 GB per operator | Review watermark settings |
| Input rows/sec | Drops to 0 | Check Kafka connectivity |
| Error rate | > 5% | Review DLQ for patterns |

Using the StreamMonitor

```python
from src.stream_monitor import StreamMonitor

monitor = StreamMonitor(
    alert_on_lag_seconds=300,
    dead_letter_table="analytics.bronze.dead_letter_queue",
    alert_callback=lambda alert_type, data: send_to_slack(data),
)
monitor.attach_to_all_queries()
```

Common Pitfalls

1. Forgetting Watermarks

Without watermarks, stateful operations accumulate state forever. The pipeline
will eventually OOM.

2. Changing Checkpoints Mid-Stream

Moving or deleting checkpoint directories causes data loss or duplicate
processing. Always version your checkpoint paths.

3. Using Processing Time for Event Time Logic

Processing time depends on when Spark reads the record, not when the event
happened. Always use the event's own timestamp.

4. Over-Partitioning Delta Tables

Partitioning by high-cardinality columns (user_id, event_id) creates millions
of tiny files. Partition by date or coarse categories only.

5. Ignoring Schema Registry

Schema changes that break deserialization will crash your pipeline. Always
validate compatibility before publishing new schemas.

6. Not Testing with availableNow

Use trigger(availableNow=True) in integration tests to process all available
data and verify end-to-end correctness without running a long-lived stream.
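
A test-shaped sketch (target table, checkpoint path, and `expected_count` are illustrative):

```python
# Drain all available input in one bounded run, then assert on the result
query = (pipeline_df.writeStream
    .format("delta")
    .trigger(availableNow=True)
    .option("checkpointLocation", "/tmp/checkpoints/test-user-events")
    .toTable("test.user_events"))
query.awaitTermination()

assert spark.table("test.user_events").count() == expected_count
```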




Part of the Data Pipeline Collection by Datanest Digital


This is 1 of 11 resources in the Data Pipeline Pro toolkit. Get the complete [Streaming Pipeline Kit] with all files, templates, and documentation for $49.

Get the Full Kit →

Or grab the entire Data Pipeline Pro bundle (11 products) for $169 — save 30%.

Get the Complete Bundle →

