Streaming Patterns & Best Practices
A guide to building reliable, scalable streaming pipelines with Spark Structured Streaming
on Databricks.
By Datanest Digital | Streaming Pipeline Kit v1.0.0
Table of Contents
- Exactly-Once Processing
- Watermarks & Late Data
- Trigger Strategies
- State Management
- Checkpointing
- Schema Evolution
- Error Handling & Dead Letter Queues
- Performance Tuning
- Monitoring & Alerting
- Common Pitfalls
Exactly-Once Processing
Spark Structured Streaming provides exactly-once guarantees through the combination of:
- Idempotent sources — Kafka offsets are tracked in the checkpoint
- Idempotent sinks — Delta Lake MERGE provides natural deduplication
- Checkpointing — WAL (write-ahead log) ensures replay on failure
Key Rules
- Always configure checkpoint locations that are durable (cloud storage, not local disk)
- Never change the checkpoint location for a running query — this resets all state
- Use MERGE for upserts — it's idempotent by design and handles retries gracefully
- Combine with event-level dedup using `dropDuplicatesWithinWatermark`
# Idempotent pipeline: dedup + merge
# (assumes a watermark is set on stream_df; the merge itself must run
# inside a foreachBatch handler, since MERGE targets a batch DataFrame)
processed = stream_df.dropDuplicatesWithinWatermark(["event_id"])
delta_table.alias("t").merge(
    processed.alias("s"),
    "t.event_id = s.event_id"
).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
Watermarks & Late Data
Watermarks define how long the engine waits for late-arriving data before finalizing
window aggregations and dropping state.
Choosing a Watermark Delay
| Scenario | Suggested Delay | Rationale |
|---|---|---|
| Real-time dashboards | 1-5 minutes | Low latency is critical |
| Hourly ETL | 30-60 minutes | Tolerate moderate delays |
| Cross-timezone events | 2-6 hours | Account for timezone edge cases |
| IoT / mobile devices | 1-24 hours | Devices may be offline |
How Watermarks Work
Event time:      10:00  10:01  10:02  10:03  10:04  10:05
Watermark (2m):  09:58  09:59  10:00  10:01  10:02  10:03
                                             ▲
                               Events before 10:02 can be
                               dropped after this point
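The diagram can be modeled in plain Python. This is a simplified sketch (the real engine tracks event-time maxima across all partitions and advances the watermark between micro-batches), and `advance_watermark` / `may_be_dropped` are illustrative names, not Spark APIs:

```python
from datetime import datetime, timedelta

def advance_watermark(current_watermark, batch_event_times, delay):
    """Watermark = max event time seen so far, minus the delay.
    It only ever moves forward."""
    candidate = max(batch_event_times) - delay
    return max(current_watermark, candidate)

def may_be_dropped(event_time, watermark):
    """Events older than the current watermark may be evicted from state."""
    return event_time < watermark

delay = timedelta(minutes=2)
wm = datetime.min
# A batch arrives whose newest event is 10:04: watermark advances to 10:02
wm = advance_watermark(wm, [datetime(2024, 1, 1, 10, 4)], delay)
assert wm == datetime(2024, 1, 1, 10, 2)
assert may_be_dropped(datetime(2024, 1, 1, 10, 1), wm)
```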
Best Practices
- Set watermarks before any groupBy/window operations
- Use `withWatermark()` on the event time column, not processing time
- Monitor the watermark value via `StreamingQueryListener`
- For dedup without windows, `dropDuplicatesWithinWatermark` is more efficient than `dropDuplicates` (bounded state)
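Putting the first two rules together, a windowed aggregation might look like this (a sketch: `events_df`, `event_time`, and `event_type` are placeholder names):

```python
from pyspark.sql import functions as F

# The watermark goes on the event-time column, *before* the groupBy
windowed = (
    events_df
    .withWatermark("event_time", "10 minutes")
    .groupBy(
        F.window("event_time", "5 minutes"),  # tumbling 5-minute windows
        "event_type",
    )
    .agg(F.count("*").alias("event_count"))
)
```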
Trigger Strategies
| Trigger | Use Case | Trade-off |
|---|---|---|
| `processingTime="10s"` | Near-real-time | Higher cluster cost |
| `processingTime="5m"` | Standard streaming | Balanced cost/latency |
| `availableNow=True` | Catch-up / backfill | Processes all available, then stops |
| `once=True` | Scheduled batch-as-stream | Single micro-batch, good for testing |
Recommendations
- Start with 30-second triggers for most use cases
- Use `availableNow=True` for backfill operations (replaces `trigger(once=True)`)
- Avoid sub-second triggers unless you truly need them — they increase checkpoint overhead
- Match trigger frequency to your SLA, not to "as fast as possible"
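These recommendations map directly onto `writeStream`. A sketch of the two most common setups (paths and table names are placeholders, and each query would run on its own):

```python
# Near-real-time: fire a micro-batch every 30 seconds
(events_df.writeStream
    .trigger(processingTime="30 seconds")
    .option("checkpointLocation", "/checkpoints/user-events-v1")
    .toTable("analytics.silver.user_events"))

# Backfill: drain everything currently available, then stop
(events_df.writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", "/checkpoints/user-events-backfill-v1")
    .toTable("analytics.silver.user_events"))
```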
State Management
Stateful operations (dedup, windows, joins) accumulate state in memory/disk.
Unmanaged state growth is the #1 cause of streaming pipeline failures.
Controlling State Size
- Always set watermarks — they let Spark evict expired state
- Use `dropDuplicatesWithinWatermark` instead of `dropDuplicates` — bounded state
- Avoid unbounded `collect_list` in aggregations
- Monitor state size via the `stateOperators` field of `StreamingQueryProgress` (exposed through `StreamingQueryListener`)
State Store Configuration
# Use RocksDB state store for large state (Databricks)
spark.conf.set(
    "spark.sql.streaming.stateStore.providerClass",
    "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
)
# Configure state cleanup
spark.conf.set("spark.sql.streaming.stateStore.maintenanceInterval", "10min")
State Size Alerts
# Alert if state exceeds threshold
# (in PySpark, lastProgress is a dict parsed from the progress JSON)
for state_op in query.lastProgress["stateOperators"]:
    if state_op["memoryUsedBytes"] > 1_000_000_000:  # 1 GB
        send_alert(f"State size exceeded 1GB: {state_op['memoryUsedBytes']}")
Checkpointing
Checkpoints store query progress, offsets, and state snapshots.
Rules
- Location: Always use cloud storage (S3, ADLS, GCS) — never local disk
- Naming: Use a unique, versioned path per query (e.g., `/checkpoints/user-events-v2`)
- Retention: Checkpoint files are auto-cleaned; don't manually delete them
- Migration: Changing the query (e.g., adding columns) may require a new checkpoint
When to Reset Checkpoints
- Adding/removing stateful operations (window, dedup, join)
- Changing the watermark expression
- Upgrading from a breaking Spark version
- Changing the source topic or partition count
Checkpoint Reset Process
# 1. Stop the running query
query.stop()
# 2. Remove the old checkpoint
dbutils.fs.rm("/mnt/checkpoints/user-events-v1", recurse=True)
# 3. Restart with starting_offsets="earliest" (to reprocess)
reader = KafkaStreamReader(
    KafkaConfig(starting_offsets="earliest", ...)
)
Schema Evolution
Delta Lake supports schema evolution for streaming sinks.
Enabling Auto-Merge
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")
What's Supported
| Change | Auto-Merge | Manual Merge |
|---|---|---|
| Add new column | Yes | Not needed |
| Widen type (int -> long) | Yes | Not needed |
| Rename column | No | Requires migration |
| Remove column | No | Requires migration |
| Change type (string -> int) | No | Requires migration |
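Besides the session-level config above, Delta also accepts `mergeSchema` per write, which scopes evolution to a single sink. A sketch with placeholder names:

```python
(events_df.writeStream
    .format("delta")
    .option("checkpointLocation", "/checkpoints/user-events-v2")
    .option("mergeSchema", "true")  # allow new columns on this sink only
    .toTable("analytics.silver.user_events"))
```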
Best Practice
Use Schema Registry to validate schemas before they hit the pipeline. The
SchemaRegistryClient in this kit can check compatibility before registration.
Error Handling & Dead Letter Queues
Not all records can be processed successfully. A dead letter queue (DLQ)
captures failed records for later analysis and reprocessing.
DLQ Pattern
def process_batch(batch_df, batch_id):
    try:
        # Process normally
        valid, invalid = validate(batch_df)
        write_to_delta(valid)
        route_to_dlq(invalid, "validation_failed")
    except Exception as e:
        # Route entire batch to DLQ on unhandled errors
        route_to_dlq(batch_df, str(e))
DLQ Table Schema
CREATE TABLE analytics.bronze.dead_letter_queue (
    record_json STRING,
    error_message STRING,
    source_topic STRING,
    failed_at TIMESTAMP
)
USING DELTA
PARTITIONED BY (source_topic)
Reprocessing DLQ Records
- Query the DLQ for records matching a specific error
- Fix the upstream issue
- Replay the records through the pipeline
- Delete reprocessed records from the DLQ
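Steps 1, 3, and 4 might be sketched as follows (step 2 is a manual fix upstream). This assumes `process_batch` is the foreachBatch handler from the DLQ pattern above, and `event_schema` is a placeholder for your event's schema:

```python
from pyspark.sql import functions as F

# 1. Query the DLQ for one error pattern
failed = (spark.table("analytics.bronze.dead_letter_queue")
          .filter(F.col("error_message") == "validation_failed"))

# 3. Rebuild events from the stored JSON and replay them
#    through the same handler the live pipeline uses
replayed = (failed
            .select(F.from_json("record_json", event_schema).alias("e"))
            .select("e.*"))
process_batch(replayed, batch_id=-1)

# 4. Remove the records that were just reprocessed
spark.sql("DELETE FROM analytics.bronze.dead_letter_queue "
          "WHERE error_message = 'validation_failed'")
```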
Performance Tuning
Kafka Consumer
- Set `maxOffsetsPerTrigger` to cap batch size (unset by default, so each micro-batch reads all available offsets)
- Use `minOffsetsPerTrigger` (Spark 3.4+) to avoid tiny batches
- Increase Kafka partitions for higher parallelism
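Both rate limits are plain options on the Kafka source. A sketch (broker address and topic are placeholders):

```python
stream_df = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker-1:9092")
    .option("subscribe", "user-events")
    .option("maxOffsetsPerTrigger", "100000")  # cap each micro-batch
    .option("minOffsetsPerTrigger", "10000")   # skip triggers with too little data
    .load())
```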
Spark Configuration
# Shuffle partitions — match to cluster cores
spark.conf.set("spark.sql.shuffle.partitions", "200")
# Adaptive query execution
spark.conf.set("spark.sql.adaptive.enabled", "true")
# Broadcast threshold for small dimension tables
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "50MB")
Delta Lake Write Optimization
- Partition by date for time-series data (not by high-cardinality columns)
- Enable auto-optimize: `delta.autoOptimize.optimizeWrite = true`
- Z-order on query columns for fast reads
- Use MERGE judiciously — it's more expensive than APPEND
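A sketch of applying these: the optimize-write setting as a table property, plus a periodic maintenance job for Z-ordering (table and column names are placeholders):

```python
# Set write-time optimization as a table property
spark.sql("""
    ALTER TABLE analytics.silver.user_events
    SET TBLPROPERTIES ('delta.autoOptimize.optimizeWrite' = 'true')
""")

# Periodic maintenance job: compact files and Z-order
# by the columns queries most often filter on
spark.sql("OPTIMIZE analytics.silver.user_events ZORDER BY (user_id)")
```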
Monitoring & Alerting
What to Monitor
| Metric | Threshold | Action |
|---|---|---|
| Processing lag | > 5 minutes | Investigate backpressure |
| Batch duration | > 2x trigger interval | Scale up or optimize |
| State size | > 1 GB per operator | Review watermark settings |
| Input rows/sec | Drops to 0 | Check Kafka connectivity |
| Error rate | > 5% | Review DLQ for patterns |
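Several of these thresholds can be checked against the `lastProgress` dict that PySpark exposes after each batch. A minimal plain-Python sketch (field names follow the StreamingQueryProgress JSON; `check_progress` is an illustrative helper, not a Spark API):

```python
def check_progress(progress, trigger_interval_ms=30_000,
                   max_state_bytes=1_000_000_000):
    """Return a list of alert strings for one StreamingQueryProgress dict."""
    alerts = []
    # Input rows/sec dropped to 0: possible Kafka connectivity issue
    if progress.get("inputRowsPerSecond", 0) == 0:
        alerts.append("input dropped to 0 rows/sec")
    # Batch duration exceeding 2x the trigger interval: falling behind
    if progress.get("batchDuration", 0) > 2 * trigger_interval_ms:
        alerts.append(f"batch took {progress['batchDuration']} ms")
    # Per-operator state size over threshold: review watermark settings
    for op in progress.get("stateOperators", []):
        if op.get("memoryUsedBytes", 0) > max_state_bytes:
            alerts.append(f"state size {op['memoryUsedBytes']} bytes")
    return alerts
```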
Using the StreamMonitor
from src.stream_monitor import StreamMonitor
monitor = StreamMonitor(
    alert_on_lag_seconds=300,
    dead_letter_table="analytics.bronze.dead_letter_queue",
    alert_callback=lambda alert_type, data: send_to_slack(data),
)
monitor.attach_to_all_queries()
Common Pitfalls
1. Forgetting Watermarks
Without watermarks, stateful operations accumulate state forever. The pipeline
will eventually OOM.
2. Changing Checkpoints Mid-Stream
Moving or deleting checkpoint directories causes data loss or duplicate
processing. Always version your checkpoint paths.
3. Using Processing Time for Event Time Logic
Processing time depends on when Spark reads the record, not when the event
happened. Always use the event's own timestamp.
4. Over-Partitioning Delta Tables
Partitioning by high-cardinality columns (user_id, event_id) creates millions
of tiny files. Partition by date or coarse categories only.
5. Ignoring Schema Registry
Schema changes that break deserialization will crash your pipeline. Always
validate compatibility before publishing new schemas.
6. Not Testing with availableNow
Use trigger(availableNow=True) in integration tests to process all available
data and verify end-to-end correctness without running a long-lived stream.
Further Reading
- Structured Streaming Programming Guide
- Delta Lake Streaming Guide
- Databricks Streaming Best Practices
Part of the Data Pipeline Collection by Datanest Digital