Gowtham Potureddi

Spark Structured Streaming: Triggers, State, Watermarks & Exactly-Once Sinks

Spark Structured Streaming is the API every senior data engineer is expected to wield by 2026, but the four levers that decide whether a query is correct, cheap, and restart-safe are buried in dense docs: triggers, state, watermarks, and the sink contract. Get those four wrong and your pipeline drops late events, hoards state until OOM, or double-writes on restart and quietly emits the same row twice into a downstream warehouse.

This guide is the long-form cheat sheet for those four levers. It walks through Structured Streaming triggers (Once, AvailableNow, ProcessingTime, Continuous), stateful streaming (state stores, keyed state, flatMapGroupsWithState), watermark semantics for windowed aggregations, the three output modes (append / update / complete) crossed with the Delta / Kafka / File / foreachBatch sink matrix, and the production pattern for exactly-once ingestion: foreachBatch + MERGE INTO with a private checkpoint directory. Each section pairs a teaching block with a Solution-Tail interview answer: code, a step-by-step trace, an output table, then a concept-by-concept breakdown of why it works.

[Figure: blog header, "Spark Structured Streaming: Triggers · State · Watermarks · Exactly-Once", showing a micro-batch conveyor of DataFrame chunks flowing into a Delta sink.]

When you want hands-on reps immediately after reading, drill the streaming practice library →, rehearse on Meta streaming problems →, and stack the aggregation muscles with aggregation problems →.




1. Why Structured Streaming replaced DStreams — DataFrame semantics on a streaming source

Structured Streaming is "DataFrames on an unbounded table" — the same SELECT, GROUP BY, and JOIN you write for batch, only continuously re-executed against new input

The one-sentence invariant: a Structured Streaming query is a logical DataFrame plan against an unbounded input table, and the engine is responsible for incrementally computing the result as new rows arrive. You never write loops, threads, or offset commits. Once you internalise "incremental computation on an unbounded table," the rest of the API (triggers, watermarks, output modes, sinks) is just a set of configuration knobs over a single mental model.

Why DStreams lost.

  • DStreams = a sequence of RDDs, one per micro-batch. The legacy spark-streaming API exposed a DStream[T] (available in Scala, Java, and Python) with low-level transformations: updateStateByKey, mapWithState, transform. No Catalyst. No Tungsten. No SQL. Every state operation was hand-rolled.
  • No DataFrame semantics. DStreams could not be JOINed with a static DataFrame without manually converting per batch. Window operations were processing-time only — event-time required user-managed timers.
  • No exactly-once except via custom sinks. Output was at-least-once by default; users had to implement their own idempotency.
  • Deprecated since Spark 3.4. Structured Streaming is the only streaming API that gets new features.

Structured Streaming model in three lines.

  • Input table. A streaming source (Kafka, files, Delta CDF, rate, socket) presents as an unbounded table. spark.readStream.format("kafka")... produces a DataFrame, not a DStream.
  • Query. You write SQL or the DataFrame API: df.groupBy(window("event_time", "5 minutes"), "user_id").count(). The query is logical; the engine plans an incremental physical execution.
  • Output sink + trigger. query.writeStream.format("delta").option("checkpointLocation", "...").trigger(...).start() decides where the result goes and how often the engine fires a new batch.

The 2026 reality.

  • Catalyst + Tungsten apply to streaming. Filter pushdown, predicate combination, whole-stage codegen — all free.
  • Delta Lake turns streaming sinks into ACID tables with transaction log and time-travel. Exactly-once is automatic at the sink.
  • RocksDB state store (Spark 3.2+, default on Databricks) lets stateful queries scale to hundreds of millions of keys without OOM.
  • Spark Connect (Spark 3.4+) splits the client from the driver: your client process can build the query and ship it to a remote cluster without hosting a JVM driver itself. Check your Spark version for Structured Streaming support over Connect.

What interviewers listen for.

  • Do you say "incremental computation on an unbounded table" when asked what Structured Streaming is? — senior signal.
  • Do you reach for withWatermark the moment event-time enters the discussion? — required answer.
  • Do you mention foreachBatch + MERGE INTO as the production CDC sink? — senior signal.
  • Do you recognise that Trigger.Once is deprecated for Trigger.AvailableNow? — Spark-3.3-era candidate.

Worked example — same code, batch vs streaming

Detailed explanation. The point of Structured Streaming is that you write the same DataFrame code for both bounded (batch) and unbounded (streaming) sources. The only difference is the entry point: spark.read vs spark.readStream, and df.write vs df.writeStream. Catalyst plans the rest.

Question. Given an orders table stored as parquet files, write a query that computes daily revenue per region. Show the batch version and the streaming version of the same query and explain why almost no code changes.

Input.

order_id region order_date amount
1 EU 2026-06-10 50
2 US 2026-06-10 80
3 EU 2026-06-11 30
4 US 2026-06-11 100

Code.

# BATCH version
batch_df = (
    spark.read.parquet("s3://bucket/orders/")
        .groupBy("region", "order_date")
        .sum("amount")
)
batch_df.write.mode("overwrite").parquet("s3://bucket/daily_revenue/")

# STREAMING version — same SELECT, different read/write entry points
stream_df = (
    spark.readStream.format("parquet")
        .schema(orders_schema)
        .load("s3://bucket/orders/")
        .groupBy("region", "order_date")
        .sum("amount")
)
query = (
    stream_df.writeStream
        .format("delta")
        .outputMode("complete")          # full table re-emitted each batch
        .option("checkpointLocation", "s3://bucket/checkpoints/daily_revenue/")
        .trigger(availableNow=True)      # process every new file, then stop
        .toTable("daily_revenue")
)
query.awaitTermination()

Step-by-step explanation.

  1. The batch query reads every file under s3://bucket/orders/ once, groups by (region, order_date), sums amount, and overwrites the output. One shot.
  2. The streaming query uses readStream — the same parquet format, but the file source now watches the directory for new files. Spark tracks which file names it has already consumed in the checkpoint's sources/ directory.
  3. The .groupBy(...).sum(...) lines are byte-for-byte identical to the batch version. Catalyst analyses the same logical plan; the streaming engine then executes it incrementally, micro-batch by micro-batch.
  4. The writeStream configuration is the streaming-specific surface: an outputMode (here complete so the full result is re-emitted each batch), a checkpointLocation (mandatory for restart-safety), and a trigger (here availableNow so the query exits once every new file has been processed).
  5. The two queries can be deployed side-by-side — the batch one for back-fills, the streaming one for steady-state. The same code path means the same correctness contract.

Output (after both queries — same result).

region order_date sum(amount)
EU 2026-06-10 50
US 2026-06-10 80
EU 2026-06-11 30
US 2026-06-11 100

Rule of thumb. Write your business logic against the DataFrame API once. Decide later whether the entry point is read (batch) or readStream (streaming) based on the SLA — the rest of the query is identical, and you get incremental computation for free.

Worked example — DStream vs Structured Streaming side-by-side

Detailed explanation. Migration teams often have the legacy spark-streaming DStream API in production and need to translate it to Structured Streaming. The two shapes are radically different: DStreams expose RDDs per micro-batch and force you to manage state and offsets manually; Structured Streaming hides both behind a declarative DataFrame plan.

Question. Translate a legacy DStream "count words per 10-second window" pipeline into the equivalent Structured Streaming query. Highlight what disappears in the new code.

Input. A socket stream emitting lines of text on localhost:9999.

Code.

# LEGACY DStream: low-level RDD-per-batch API
from pyspark.streaming import StreamingContext
sc = spark.sparkContext                      # reuse the existing SparkContext
ssc = StreamingContext(sc, batchDuration=1)
lines = ssc.socketTextStream("localhost", 9999)
counts = (lines.flatMap(lambda l: l.split(" "))
                .map(lambda w: (w, 1))
                # invFunc is a required positional argument; None = recompute each window
                .reduceByKeyAndWindow(lambda a, b: a + b, None, windowDuration=10, slideDuration=10))
counts.pprint()
ssc.start()
ssc.awaitTermination()

# STRUCTURED STREAMING — declarative DataFrame API
from pyspark.sql.functions import explode, split, window, current_timestamp

lines_df = (spark.readStream
                  .format("socket")
                  .option("host", "localhost").option("port", 9999)
                  .load()
                  .withColumn("ts", current_timestamp()))

word_counts = (lines_df
    .select(explode(split("value", " ")).alias("word"), "ts")
    .groupBy(window("ts", "10 seconds"), "word")
    .count())

(word_counts.writeStream
    .outputMode("update")
    .format("console")
    .option("checkpointLocation", "/tmp/checkpoints/wc/")
    .start()
    .awaitTermination())

Step-by-step explanation.

  1. The DStream version creates a StreamingContext with an explicit batchDuration — every batch is a separate RDD. State is implicit in reduceByKeyAndWindow and survives between batches via the checkpoint.
  2. The Structured Streaming version uses spark.readStream, which returns a DataFrame. No StreamingContext. No manual RDD operations.
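  3. groupBy(window("ts", "10 seconds"), "word") declares a 10-second tumbling window over the ts column. Because ts is set with current_timestamp() here, the window is effectively processing-time, which matches reduceByKeyAndWindow; swap in a timestamp parsed from the payload to get true event-time windows.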
  4. State management, restart safety, and offsets are all handled by checkpointLocation. You write the what; Spark handles the how.
  5. The outputMode("update") says "emit only the rows whose count changed in this batch" — there is no DStream analog; DStreams just print every micro-batch.

Output.

Aspect | DStream | Structured Streaming
API surface | RDD per batch | DataFrame on unbounded table
State management | manual via updateStateByKey | implicit via grouping operators
Event-time / watermark | not supported (processing-time only) | first-class (withWatermark)
Exactly-once sink | manual idempotency | Delta and file sinks out of the box (the built-in Kafka sink is at-least-once)
Catalyst optimisation | none | full

Rule of thumb. Every new pipeline goes to Structured Streaming. Migrate DStreams when the cost of the migration is less than the cost of the next outage caused by a hand-rolled state class. Spark 3.4 deprecated the legacy API — the writing is on the wall.
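To make the window bucketing concrete, here is a minimal pure-Python sketch (not Spark code) of how a 10-second tumbling window assigns each timestamp to a bucket and counts words per bucket. The names window_start and count_words are hypothetical, timestamps are plain epoch seconds, and Python's str.split() stands in for Spark's split().

```python
from collections import Counter

def window_start(ts_seconds: int, width: int = 10) -> int:
    """Start of the tumbling window that contains ts_seconds."""
    return (ts_seconds // width) * width

def count_words(lines, width: int = 10):
    """lines: iterable of (ts_seconds, text). Returns a Counter keyed by (window_start, word)."""
    counts = Counter()
    for ts, text in lines:
        for word in text.split():
            counts[(window_start(ts, width), word)] += 1
    return counts

# Two lines fall into window [0, 10), one into window [10, 20)
sample = [(1, "a b a"), (9, "b"), (12, "a")]
result = count_words(sample)
```

Both APIs perform this grouping; the difference is that Structured Streaming derives it from the window() expression, while the DStream version encodes it in windowDuration and slideDuration.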

Worked example — the unbounded-table mental model on paper

Detailed explanation. Interviewers love this question because it surfaces whether the candidate understands the difference between "process every micro-batch" and "compute against an unbounded table." The unbounded-table model says: at any point in time, the result table is as if you had run the SQL query against every row received so far.

Question. A Kafka topic emits one row per second. After 60 seconds, the streaming query SELECT category, COUNT(*) FROM events GROUP BY category has emitted what, exactly, into the sink? Walk through how Structured Streaming reasons about the result.

Input (over 60 seconds).

second event_category
1–20 A
21–40 B
41–60 A

Code.

events = (spark.readStream.format("kafka").option(...).load()
                .selectExpr("CAST(value AS STRING) AS event_category"))

counts = events.groupBy("event_category").count()

(counts.writeStream
       .outputMode("complete")
       .format("memory")
       .queryName("category_counts")
       .trigger(processingTime="10 seconds")
       .start())

spark.sql("SELECT * FROM category_counts ORDER BY event_category").show()

Step-by-step explanation.

  1. The unbounded table at second 10 contains rows 1–10, all of category A. The result table is [(A, 10)].
  2. The unbounded table at second 25 contains 20 A rows plus 5 B rows. The result is [(A, 20), (B, 5)].
  3. At second 60 the result is [(A, 40), (B, 20)] — 20 A from seconds 1–20 plus 20 from 41–60 equals 40, and 20 B from seconds 21–40.
  4. outputMode("complete") means every batch re-emits the entire result table. After each 10-second trigger, the sink receives the up-to-date counts.
  5. The user never wrote a loop over batches. The engine inferred from the SQL that the result is a function of every row seen so far, and incremental computation kept state to make this efficient.

Output (final state at second 60).

event_category count
A 40
B 20

Rule of thumb. When asked "what does the engine compute?", the answer is always "the result of running the SQL query against every row received so far." The trigger only decides how often the engine commits an intermediate snapshot — not what the snapshot contains.
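The trace above can be replayed with a toy model. The sketch below is plain Python, not Spark: the hypothetical RunningCounts class keeps keyed state the way an incremental GROUP BY would, and returns both the complete view (the whole result table) and the update view (only the keys touched by the batch) for six 10-second micro-batches.

```python
from collections import Counter

class RunningCounts:
    """Toy stand-in for an incremental GROUP BY ... COUNT(*) with keyed state."""

    def __init__(self):
        self.state = Counter()

    def process_batch(self, categories):
        """Fold one micro-batch into state; return (complete_view, update_view)."""
        batch = Counter(categories)
        self.state.update(batch)
        complete = dict(self.state)                    # full result table
        update = {k: self.state[k] for k in batch}     # only keys touched by this batch
        return complete, update

# One event per second: A for seconds 1-20, B for 21-40, A for 41-60
events = ["A"] * 20 + ["B"] * 20 + ["A"] * 20
agg = RunningCounts()
snapshots = []
for i in range(0, 60, 10):                             # six 10-second micro-batches
    snapshots.append(agg.process_batch(events[i:i + 10]))

final_complete, final_update = snapshots[-1]
```

Each batch only reads its own 10 rows plus the stored counts, yet the complete view always equals what a full re-scan of every row so far would produce.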

Spark interview question on the Structured Streaming model

A senior interviewer often opens with: "Walk me through what spark.readStream.format('kafka').load().groupBy('user_id').count().writeStream.outputMode('update').start() actually does between Kafka and the sink. Be specific about state, watermarks, exactly-once, and what happens on restart."

Solution Using the Structured Streaming model end-to-end

# A minimal but production-shaped streaming query
events = (
    spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", "broker:9092")
        .option("subscribe", "events")
        .option("startingOffsets", "latest")
        .option("maxOffsetsPerTrigger", 100_000)        # backpressure
        .load()
        .selectExpr("CAST(value AS STRING) AS payload")
        .selectExpr("get_json_object(payload, '$.user_id') AS user_id",
                    "CAST(get_json_object(payload, '$.event_time') AS TIMESTAMP) AS event_time")
        .withWatermark("event_time", "10 minutes")       # bounds state only for event-time windows, dedup, joins
)

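per_user = events.groupBy("user_id").count()

def upsert_counts(batch_df, batch_id):
    # Delta's streaming sink accepts append/complete only, so update-mode
    # rows are upserted per micro-batch. Assumes per_user_counts already exists.
    batch_df.createOrReplaceTempView("updates")
    batch_df.sparkSession.sql("""
        MERGE INTO per_user_counts t USING updates s ON t.user_id = s.user_id
        WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT *
    """)

query = (
    per_user.writeStream
        .outputMode("update")                            # only changed rows
        .foreachBatch(upsert_counts)
        .option("checkpointLocation", "s3://bucket/ck/per_user_counts/")
        .trigger(processingTime="30 seconds")
        .start()
)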

Step-by-step trace.

Phase | What happens
1. Read | A Kafka micro-batch of up to maxOffsetsPerTrigger rows is pulled. Offsets are written to checkpoint/offsets/N.
2. Plan | Catalyst plans the JSON parsing + groupBy + count as an incremental aggregate over the keyed state store.
3. State update | For each user_id, the running count is loaded from state, incremented, and written back to state/0/0/... files.
4. Emit | Only the rows whose count changed in this batch are emitted (update mode).
5. Commit | The sink writes atomically (Delta transaction log). On success, checkpoint/commits/N is written.
6. Restart | On crash, Spark replays from the last committed batch: offset N is re-read, state is restored from state/, and output is re-emitted idempotently into Delta.

Output: the trace above shows that the same exact stream of (user_id, count) rows would appear in the sink even if the executor crashed between phases 4 and 5. The combination of replayable source (Kafka offsets) + durable state (RocksDB or HDFS) + idempotent sink (Delta) is what makes the pipeline exactly-once.

Property | Guarantee
Source | Kafka offsets replayable from checkpoint
State | RocksDB / HDFS-backed; restored on restart
Sink | Delta atomic commit; same (user_id, count) row written at most once
End-to-end | Exactly-once for this query

Why this works — concept by concept:

  • Unbounded table abstraction: events is a DataFrame over an infinite Kafka topic, and groupBy("user_id").count() is a single declarative aggregation against that table. The engine handles incremental execution; the user never writes a batch loop.
  • Checkpoint location — the checkpointLocation is the durable memory of the query: offsets read, batches committed, state files. Without it, a restart has no way to resume — and a checkpoint shared between two different queries is a guaranteed corruption.
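  • Watermark: withWatermark("event_time", "10 minutes") tells the engine that events more than 10 minutes behind the latest observed event_time may be dropped, which lets it evict state for event-time windows, deduplication, and stream-stream joins. A plain groupBy("user_id") has no event-time key, so in this query the state still grows with the number of distinct users; group by a window on event_time to get eviction.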
  • Update output mode — emits only the keys whose count changed since the last batch, which is exactly what a downstream upsert sink expects. append would not work here because aggregations re-emit existing keys.
  • Replayable source + idempotent sink: Kafka offsets are deterministic; Delta writes are atomic per micro-batch. On crash, the engine replays the last uncommitted batch, and the sink must make that replay a no-op or an overwrite of identical data so each batch lands exactly once.
  • Cost: O(rows_in_batch) for compute, O(unique_user_ids) for state. RocksDB keeps state on local disk, so it scales with SSD capacity; the default HDFS-backed provider holds all state in executor JVM memory and struggles once key counts climb past roughly 10M.
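The replay argument can be sketched without Spark. The toy IdempotentSink below is a hypothetical class, not a Delta API: it stores the highest committed batch id next to the data, so a batch that is replayed after a crash is recognised and skipped. Delta's streaming sink relies on a similar idea by recording the batch id in its transaction log; treat the details here as an illustration only.

```python
class IdempotentSink:
    """Toy sink that remembers the highest committed batch id, so replays are no-ops."""

    def __init__(self):
        self.rows = {}
        self.last_committed = -1

    def write(self, batch_id, updates):
        if batch_id <= self.last_committed:
            return False                  # replayed batch: already committed, skip it
        self.rows.update(updates)         # upsert by key
        self.last_committed = batch_id    # commit marker stored together with the data
        return True

sink = IdempotentSink()
sink.write(0, {"u1": 1, "u2": 1})
sink.write(1, {"u1": 2})
# Crash after the sink commit but before the checkpoint commit: batch 1 is replayed
replayed = sink.write(1, {"u1": 2})
```

With foreachBatch the same responsibility moves to your own code: a MERGE keyed on user_id is safe to replay because writing the same counts twice leaves the table unchanged.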



2. Trigger modes — Once, AvailableNow, ProcessingTime, Continuous

Structured Streaming triggers decide when a batch fires: pick by SLA, not by intuition

The mental model in one line: a trigger is the firing rule for the next micro-batch — once, when new input is available, on a fixed interval, or continuously — and changing it never changes the correctness of the query, only the latency, throughput, and cost profile. Once you say "trigger is a knob, not a feature," you can defend any choice in a code review.

[Figure: the four Spark Structured Streaming trigger modes (Trigger.Once (legacy), Trigger.AvailableNow, Trigger.ProcessingTime, Trigger.Continuous) plotted on a latency-vs-throughput axis, with a recommendation beneath each.]

The four triggers in one table.

Trigger | Semantic | Latency | Use case
Trigger.Once() (deprecated) | run one batch of all new data, then stop | minutes (one shot) | replaced by Trigger.AvailableNow since Spark 3.3
Trigger.AvailableNow() | run every batch needed to drain all new input, then stop | minutes (resumable) | nightly back-fills, scheduled-job ingest, dev workflows
Trigger.ProcessingTime("30 seconds") | fire a new batch on a fixed wall-clock cadence | seconds | steady-state production streaming
Trigger.Continuous("1 second") | long-running tasks with no micro-batch boundaries | milliseconds | Kafka → Kafka, very limited sink support, experimental
(no trigger specified) | Trigger.ProcessingTime(0): fire as soon as the previous batch finishes | seconds | the default; back-to-back batches

Trigger.Once is deprecated.

  • Spark 3.3 introduced Trigger.AvailableNow which behaves like Trigger.Once but processes multiple batches within a single run if needed (e.g. a Kafka source with too many offsets to fit in one batch).
  • Trigger.Once was prone to OOM on large back-fills because it tried to ingest everything in a single micro-batch.
  • Use Trigger.AvailableNow in every new pipeline; the only valid Trigger.Once call is in a frozen older codebase you cannot upgrade.

ProcessingTime in detail.

  • Fixed interval. Trigger.ProcessingTime("30 seconds") means: fire the next batch 30 seconds after the previous batch started.
  • If a batch finishes early, the engine waits for the rest of the interval. If it takes longer than the interval, the next batch starts as soon as the current one finishes rather than waiting for the next interval boundary; missed intervals are not queued up.
  • Default is processingTime=0, i.e. "fire batches back-to-back" — useful for low-latency pipelines but watch CPU costs.
  • Picking the interval. Match it to the SLA: 30 seconds for "near-real-time" dashboards, 5 minutes for "every coffee break" reports, 1 hour for "the dashboard refreshes on the hour."

Continuous mode (experimental).

  • Trigger.Continuous("1 second") runs long-lived tasks per partition instead of micro-batches.
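  • Latency: as low as ~1 ms between input and output. The "1 second" argument is the checkpoint interval, not the latency.
  • Operator and sink support is extremely limited: map-like operations only (no aggregations, no joins, no foreachBatch), with Kafka as the only production-grade sink (console and memory exist for debugging).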
  • In practice, almost no production pipeline uses Continuous. Treat it as a marketing feature; pick ProcessingTime in 99% of cases.

Backpressure controls (per source).

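  • Kafka: maxOffsetsPerTrigger (default unlimited). Caps the total number of offsets read per micro-batch, split proportionally across topic partitions; it prevents the first batch after a long downtime from OOM-ing the cluster.
  • File source: maxFilesPerTrigger. Bounds how many new files are read per batch. The built-in file source has no limit by default; the Delta and Auto Loader sources default to 1000.
  • Bytes-based: maxBytesPerTrigger. A soft cap on bytes per batch instead of a file count, which is smarter for skewed file sizes. Availability depends on the source and version (the Delta and Auto Loader sources support it), so check the docs for yours.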
  • Always set one of these for production. Without backpressure, restart-from-backlog is a guaranteed cluster outage.

Common interview probes on triggers.

  • "Why is Trigger.Once deprecated?" — because Trigger.AvailableNow can drain a large backlog over multiple batches instead of trying to do it in one.
  • "What's the difference between Trigger.ProcessingTime('0 seconds') and no trigger?" — they are the same; both mean "back-to-back batches."
  • "Can you change a trigger between runs?" — yes; the trigger is a driver-side setting and does not affect checkpoint compatibility.
  • "What latency does Trigger.Continuous achieve, and what's the catch?" — sub-second to ms; the catch is that only map-only Kafka → Kafka is supported.

Worked example — same query under each trigger

Detailed explanation. Pick a single query and run it under each trigger to feel the difference. The query itself never changes; only the firing rule does. The output is identical in correctness but radically different in latency, throughput, and cost.

Question. Given a Kafka source events and a Delta sink events_curated, write the same passthrough query under four trigger configurations and explain when to pick which.

Input. Kafka topic events receiving 1 row per second on average; 1 hour of backlog after a deploy.

Code.

base = (spark.readStream.format("kafka")
              .option("kafka.bootstrap.servers", "broker:9092")
              .option("subscribe", "events")
              .option("maxOffsetsPerTrigger", 50_000)
              .load()
              .selectExpr("CAST(key AS STRING) AS key",
                          "CAST(value AS STRING) AS payload",
                          "timestamp AS kafka_ts"))

def writer(name, trigger):
    # Each variant gets its own checkpoint directory, keyed by `name`.
    return (base.writeStream
                .format("delta")
                .outputMode("append")
                .option("checkpointLocation", f"/ck/events_curated/{name}/")
                .trigger(**trigger)
                .toTable("events_curated"))

# Run ONE variant per deployment; 2) and 3) block forever in awaitTermination().

# 1) AvailableNow: drains the 1h backlog, then stops
writer("available_now", {"availableNow": True}).awaitTermination()

# 2) ProcessingTime 30s: steady-state production
writer("processing_30s", {"processingTime": "30 seconds"}).awaitTermination()

# 3) Back-to-back: same as the default; PySpark's trigger() needs exactly one argument
writer("back_to_back", {"processingTime": "0 seconds"}).awaitTermination()

# 4) Continuous 1s: NOT valid here. Continuous needs a map-only query with a
# supported sink such as Kafka; the Delta sink would fail at start.
# writer("continuous_1s", {"continuous": "1 second"}).awaitTermination()

Step-by-step explanation.

  1. AvailableNow sees a 1-hour backlog. It drains in N micro-batches (each capped by maxOffsetsPerTrigger=50k), each batch processed sequentially. After the last offset commits, the query exits.
  2. ProcessingTime("30 seconds") runs continuously. Every 30 seconds it pulls up to 50k offsets, processes them, writes to Delta, then sleeps until the next interval. If a batch overruns, the next batch starts immediately.
  3. Back-to-back is what you get with no trigger argument. The engine starts a new batch the moment the previous one commits. For low-volume topics, this can mean the engine spins idle scanning for new data — useful for the lowest possible latency but wasteful for low-traffic topics.
  4. Continuous("1 second") is impossible for this query because the Delta sink does not support continuous mode. The code is commented out — only Kafka → Kafka map-only pipelines can use Continuous.

Output.

Trigger | Time to drain 1h backlog | Steady-state latency | Cost profile
AvailableNow | ~10 minutes (then exits) | n/a (query stops) | one-shot, low
ProcessingTime("30s") | ~10 minutes | 30s p50 | continuous, medium
back-to-back | ~10 minutes | sub-second | continuous, high
Continuous("1s") | n/a (fails at start) | n/a | n/a

Rule of thumb. Default to Trigger.ProcessingTime("30 seconds") for production. Use Trigger.AvailableNow for nightly/scheduled ingest. Use back-to-back only when the SLA requires sub-second latency and the cluster is sized for it. Forget Continuous exists.

Worked example — Trigger.Once to Trigger.AvailableNow migration

Detailed explanation. Teams who built file-source ingestion pipelines with Trigger.Once learned the hard way that a single batch cannot drain six months of accumulated files. The fix is Trigger.AvailableNow, which processes batches sequentially until the source is drained. Migration is a one-line code change.

Question. Migrate a Trigger.Once file-ingest pipeline to Trigger.AvailableNow. Show why the migration matters even when the day-to-day behaviour looks identical.

Input. A directory of Parquet files, ingested nightly via Airflow. After a 10-day backlog (Airflow paused), the directory has 100 GB of new files.

Code.

df = (spark.readStream.format("parquet")
             .schema(events_schema)
             .option("maxFilesPerTrigger", 100)
             .load("s3://lake/raw/events/"))

# LEGACY — Trigger.Once tries to put everything in one batch, OOMs
(df.writeStream
   .format("delta")
   .option("checkpointLocation", "/ck/raw_to_curated/")
   .trigger(once=True)
   .toTable("curated_events").awaitTermination())

# MODERN — Trigger.AvailableNow drains across multiple batches
(df.writeStream
   .format("delta")
   .option("checkpointLocation", "/ck/raw_to_curated/")
   .trigger(availableNow=True)
   .toTable("curated_events").awaitTermination())

Step-by-step explanation.

  1. Trigger.Once tries to ingest every new file in a single batch, regardless of maxFilesPerTrigger. The 100 GB backlog OOMs the executors.
  2. Trigger.AvailableNow respects maxFilesPerTrigger=100. It processes 100 files per batch, commits, and starts the next batch — repeating until the source is drained.
  3. The Airflow DAG that called the job sees the same external behaviour: the job runs, drains all new data, then exits. Inside, the engine has run N batches instead of 1.
  4. The checkpoint format is identical between the two triggers, so the migration is safe to do in place without resetting state.

Output.

Trigger | 100 GB backlog outcome | Recommended?
Trigger.Once | OOM after ~30 minutes | NO (deprecated)
Trigger.AvailableNow | drained in N×100-file batches; ~25 minutes total | YES

Rule of thumb. Every trigger(once=True) call in your codebase is technical debt. Migrate to trigger(availableNow=True): same outer behaviour, no OOM risk, no checkpoint reset required.
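Because AvailableNow fixes the set of pending files when the run starts, the number of micro-batches is simple arithmetic. The sketch below is plain Python; batches_to_drain is a hypothetical helper, and the 2,450-file backlog is an invented figure for illustration (the example above only states the backlog in GB).

```python
import math

def batches_to_drain(backlog_files: int, max_files_per_trigger: int) -> int:
    """Micro-batches needed when each batch reads at most max_files_per_trigger files."""
    if max_files_per_trigger <= 0:
        raise ValueError("max_files_per_trigger must be positive")
    return math.ceil(backlog_files / max_files_per_trigger)

# Hypothetical backlog: 2,450 new files waiting after the Airflow pause
n_batches = batches_to_drain(2_450, 100)
```

Each of those batches commits to the checkpoint on its own, so a failure part-way through resumes from the last committed batch rather than restarting the whole backlog.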

Worked example — ProcessingTime interval tuning

Detailed explanation. Most teams pick a ProcessingTime interval by intuition ("30 seconds feels right") instead of by reasoning. The right interval is a function of (a) the downstream SLA, (b) the per-batch fixed overhead, and (c) the input rate.

Question. A Delta sink consumer reads the curated table once every 5 minutes for a BI dashboard. The Spark cluster spends 8 seconds of overhead per micro-batch (driver scheduling, checkpoint write, Delta commit). What ProcessingTime interval should you pick?

Input. SLA: data visible to the dashboard within 5 minutes of arrival. Per-batch overhead: 8s. Input rate: 1k rows/sec.

Code.

# Option A — very frequent (sub-SLA, wasteful)
.trigger(processingTime="10 seconds")

# Option B — close to SLA (efficient)
.trigger(processingTime="1 minute")

# Option C — back-to-back (lowest latency, highest cost)
.trigger(processingTime="0 seconds")  # equivalent to no trigger

Step-by-step explanation.

  1. Option A (10 seconds). Every 10 seconds, the engine pays the 8-second fixed cost — 80% of the cluster's compute is overhead. Output latency = 10s, but the SLA is 5 minutes, so 10s is over-engineered.
  2. Option B (1 minute). Every 60 seconds, the engine pays 8s of overhead — 13% efficiency loss. Output latency = 60s, well within the 5-minute SLA. This is the right choice.
  3. Option C (back-to-back). Latency drops to ~10 seconds (batch duration), but the cluster is always busy — 80% of work is the same 8s overhead. Costly and unnecessary.
  4. The optimisation rule: the ProcessingTime interval should be the largest value that still meets the SLA. Every doubling of the interval halves the overhead fraction and keeps your cluster cheap.

Output.

Option | Interval | Latency into the curated table | Overhead fraction
A | 10s | ~10s | 80%
B | 1min | ~1min | 13%
C | 0s | ~10s | 80%

The dashboard's own 5-minute refresh adds the same wait on top of every option, so it does not change the ranking.

Rule of thumb. Pick the largest ProcessingTime interval that still meets the SLA. Cluster cost is roughly inversely proportional to the interval — and SLAs above 1 minute almost always tolerate a 1-minute trigger.
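The overhead percentages in this example come from one division. Here is a small sketch in plain Python, where overhead_fraction is a hypothetical helper and the 8-second fixed cost is the figure from the question:

```python
def overhead_fraction(fixed_overhead_s: float, interval_s: float) -> float:
    """Share of each trigger interval spent on fixed per-batch overhead."""
    if interval_s <= 0:
        raise ValueError("interval_s must be positive")
    return fixed_overhead_s / interval_s

frac_10s = overhead_fraction(8, 10)     # Option A
frac_60s = overhead_fraction(8, 60)     # Option B
frac_120s = overhead_fraction(8, 120)   # doubling the interval again
```

Doubling the interval from 60 to 120 seconds halves the fraction again, which is the reasoning behind picking the largest interval the SLA allows.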

Spark interview question on trigger selection

A senior interviewer often frames this as: "We have a Kafka topic with 100k rows/sec. The downstream dashboard refreshes every 2 minutes. The SRE team is screaming about cluster cost. Pick a trigger and defend it."

Solution Using Trigger.ProcessingTime matched to the SLA + maxOffsetsPerTrigger for backpressure

events = (spark.readStream.format("kafka")
              .option("kafka.bootstrap.servers", "broker:9092")
              .option("subscribe", "events")
              .option("maxOffsetsPerTrigger", 6_000_000)   # 60s × 100k rows
              .option("startingOffsets", "latest")
              .load())

query = (events.writeStream
              .format("delta")
              .outputMode("append")
              .option("checkpointLocation", "/ck/events_to_delta/")
              .trigger(processingTime="1 minute")
              .toTable("events_curated"))

Step-by-step trace.

Step | What happens
1 | Every 60 seconds, the engine fires a batch.
2 | The Kafka source caps the batch at 6M offsets, exactly one minute's worth at the steady-state rate.
3 | Delta sink commits the batch atomically; the dashboard sees the new rows on its next 2-minute refresh, within SLA.
4 | After a 10-minute outage, the query resumes from the checkpointed offsets (startingOffsets applies only to the first run) and works through the ~60M-row backlog in 6M-offset batches, so no single batch overwhelms the cluster. Because 6M rows keep arriving every minute, the backlog only shrinks if the cap is raised above 6M or batches finish in well under 60s.
5 | Steady-state utilisation: ~52s of work per 60s, ~8s of overhead, a 13% overhead fraction. Cluster size matches the steady rate.

Output:

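Metric | Value
Trigger | processingTime="1 minute"
Backpressure | maxOffsetsPerTrigger=6_000_000
Steady-state latency | 60s (one trigger interval)
Catch-up time after 10-min outage | at least 10 capped batches for the 60M-row backlog; longer in practice because new data keeps arriving, and unbounded if the cap has no headroom over the steady-state rate
Overhead fraction | ~13%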

Why this works — concept by concept:

  • Match trigger to SLA, not intuition — the dashboard refreshes every 2 minutes, so a 1-minute trigger satisfies the SLA with margin. Picking 10 seconds would over-pay 6×.
  • Backpressure prevents the first-batch-after-outage OOM: maxOffsetsPerTrigger=6M is calibrated to "one minute's worth at steady-state rate." After an outage, the catch-up takes N batches instead of one impossibly large one.
  • ProcessingTime is wall-clock, not data-clock — the engine fires a batch every 60 seconds regardless of input volume. If a batch overruns, the next batch fires immediately when the current finishes.
  • AvailableNow is wrong here — this is steady-state ingest, not a one-shot back-fill. AvailableNow would exit after draining once; ProcessingTime stays up.
  • Continuous is wrong here — Delta sink does not support continuous mode. Even if it did, sub-second latency would over-shoot a 2-minute SLA at high cost.
  • Cost — O(rows_per_minute) of compute, fixed driver overhead of ~8s/batch. Cluster size is sized for steady-state rate × interval, not for peak burst.
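The catch-up arithmetic can be checked with a few lines of plain Python. This is a minimal sketch, not part of the original solution: `batches_to_drain` is a hypothetical helper, the 9M cap is an illustrative value, and it assumes the cluster can actually finish a capped batch within one trigger interval.

```python
import math

def batches_to_drain(backlog, cap_per_batch, arrivals_per_batch):
    """Return how many capped batches clear a backlog while new data keeps
    arriving, or None if the cap leaves no headroom over the arrival rate."""
    headroom = cap_per_batch - arrivals_per_batch
    if headroom <= 0:
        return None  # the backlog never shrinks
    return math.ceil(backlog / headroom)

backlog = 10 * 60 * 100_000      # 10-minute outage at 100k rows/sec
arrivals = 60 * 100_000          # rows arriving per 1-minute trigger

print(batches_to_drain(backlog, 6_000_000, arrivals))   # None
print(batches_to_drain(backlog, 9_000_000, arrivals))   # 20
```

With the cap equal to the arrival rate the replay is safe but the lag never closes; a cap with 50% headroom would drain the same backlog in about 20 one-minute batches.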



3. State + watermarks + event-time

spark watermark is the engine's threshold for lateness: data older than (max event_time − threshold) may be dropped, which bounds state without sacrificing correctness for on-time data

The mental model in one line: a watermark is a moving boundary on the event-time axis that says "any event with event_time older than this is too late to influence the result, and its state can be safely evicted". Without a watermark, every stateful operator (windowed aggregation, dedup, stream-stream join) accumulates state forever and eventually OOMs the executors.

Visual diagram of state stores + watermarks — an event-time axis with a watermark buoy at 12:10, a late-events lane showing two events arriving after the watermark (one accepted, one dropped), and a state retention window covering the open windows on the right; a small inset shows the RocksDB vs HDFS state-store choice; on a light PipeCode card.

Event-time vs processing-time.

  • Event-time is the timestamp recorded inside the event payload — when the click actually happened on the mobile device.
  • Processing-time is the wall-clock time when Spark received the event — could be seconds or hours after event-time due to network buffering, retries, or device offline mode.
  • Watermarks are computed on event-time, not processing-time. Late events are events whose event-time is older than the watermark, regardless of how recently they arrived in Kafka.
  • Windowed aggregations are event-time by default. window("event_time", "5 minutes") groups by event-time bucket, not processing-time.

Watermark mechanics.

  • Declaration. df.withWatermark("event_time", "10 minutes") says: "the engine may evict any state whose event-time is older than max(event_time observed) - 10 minutes."
  • Update cadence. The watermark moves forward each batch based on the maximum event-time seen so far. It never moves backward.
  • Late event handling. An event with event_time < watermark is dropped from stateful operators (the row may still appear in stateless select / filter operators, but does not update windowed aggregates).
  • Tuning. Too tight (e.g. 1 minute) drops legitimate late data from mobile clients with poor connectivity. Too loose (e.g. 1 day) keeps state alive for 24 hours, multiplying memory cost.

State stores.

  • HDFS-backed (default). State is kept in-memory in executor heap, snapshotted to HDFS / S3 every checkpoint. Cheap up to ~1M keys; OOM risk beyond.
  • RocksDB-backed (Spark 3.2+; default on Databricks Runtime 11+). State is kept on local SSD, paged in/out via RocksDB. Scales to 100M+ keys. Enable via spark.sql.streaming.stateStore.providerClass = org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider.
  • State changelog: with changelog checkpointing enabled, the RocksDB provider writes a changelog of state mutations to the checkpoint dir. On restart, the changelog is replayed on top of the latest snapshot to reconstruct local state.
  • State retention is bounded by the watermark + window size. A 5-minute window with a 10-minute watermark retains state for 15 minutes of event-time; after that, the window is closed and its state evicted.
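The retention rule above translates into a rough upper bound on state size. The sketch below is an estimate under stated assumptions, not a measurement: `open_windows` and `state_bytes` are hypothetical helpers, every key is assumed active in every window, and the 100-byte entry size is an assumed average.

```python
import math

def open_windows(window_min, watermark_min):
    """Upper bound on tumbling windows held in state per key."""
    return math.ceil((window_min + watermark_min) / window_min)

def state_bytes(keys, window_min, watermark_min, bytes_per_entry=100):
    # bytes_per_entry is an assumed average, not a measured value
    return keys * open_windows(window_min, watermark_min) * bytes_per_entry

print(open_windows(5, 10))              # 3
print(state_bytes(1_000_000, 5, 10))    # 300000000
```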

Stateful operators.

  • Windowed aggregations: groupBy(window("event_time", "5 minutes")).count(). State per (window, key).
  • Dedup with watermark: dropDuplicatesWithinWatermark(["event_id"]) (Spark 3.5+) keeps a state set of event IDs within the watermark.
  • Stream-stream join: df1.join(df2, "key", "inner") needs a watermark on both sides plus a time constraint in the join condition to bound state; outer joins require both.
  • Arbitrary stateful: flatMapGroupsWithState (Scala/Java) or applyInPandasWithState (PySpark, Spark 3.4+) for custom session windows, fraud rules, etc.

The arbitrary stateful streaming API.

  • Used when groupBy + window does not fit. Examples: session windows with custom gap timeouts; fraud detection rules with multi-step state machines.
  • API shape. df.groupByKey(...).flatMapGroupsWithState(outputMode, timeoutConf)((key, events, state) => ...).
  • Timeout configuration. ProcessingTimeTimeout (deadline by wall clock) or EventTimeTimeout (deadline by watermark).
  • Spark 3.4+ adds the Python-friendly applyInPandasWithState — same model with pandas DataFrames.

Common interview probes.

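  • "What does withWatermark actually guarantee?" That events no more than the threshold behind max(event_time) are never dropped as late, and that state older than max(event_time) − threshold may be evicted. Events later than that are not guaranteed to be processed; stateful operators normally drop them.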
  • "What happens if I forget withWatermark on a windowed aggregation in append mode?" — Spark raises an error at start — append mode requires a watermark.
  • "RocksDB vs HDFS state — when do I switch?" — when state holds > 1M keys, or when shuffle-spill of state files dominates checkpoint time.
  • "Why does my windowed count not emit anything in append mode?" — because append mode only emits a window after the watermark has passed it. A 5-minute window + 10-minute watermark = 15 minutes of waiting before the first row appears.

Worked example — windowed aggregation with watermark

Detailed explanation. The textbook stateful query: count events per 5-minute event-time window per user. Without watermark, state grows forever; with watermark, state for closed windows is evicted, and append mode emits the final count for each window once the watermark passes.

Question. Given a stream of click events with event_time, count clicks per user per 5-minute tumbling window. Use a 10-minute watermark and append output mode.

Input (event-time order; processing-time may differ).

| user_id | event_time |
| --- | --- |
| u1 | 12:01:30 |
| u2 | 12:02:10 |
| u1 | 12:04:50 |
| u1 | 12:06:00 |
| u2 | 12:09:00 |
| u1 | 11:58:00 (late, arrives at 12:14 processing-time) |

Code.

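from pyspark.sql.functions import window

# Assumes the Kafka value is a JSON payload carrying user_id and event_time
clicks = (spark.readStream.format("kafka").option(...).load()
              .selectExpr("CAST(value AS STRING) AS payload")
              .selectExpr("get_json_object(payload, '$.user_id') AS user_id",
                          "CAST(get_json_object(payload, '$.event_time') AS TIMESTAMP) AS event_time"))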

windowed = (clicks
    .withWatermark("event_time", "10 minutes")
    .groupBy(window("event_time", "5 minutes"), "user_id")
    .count())

(windowed.writeStream
    .outputMode("append")
    .format("delta")
    .option("checkpointLocation", "/ck/windowed_clicks/")
    .trigger(processingTime="30 seconds")
    .toTable("windowed_clicks")).awaitTermination()

Step-by-step explanation.

  1. The first batch processes events 1, 2, 3. State holds one window, [12:00, 12:05), with two keys: (u1, 2) and (u2, 1); the third event raises u1's count from 1 to 2. Max event-time is 12:04:50, watermark is 12:04:50 − 10m = 11:54:50.
  2. The second batch adds events 4 (12:06) and 5 (12:09). State now has windows [12:00, 12:05) and [12:05, 12:10). Max event-time is 12:09; watermark advances to 11:59.
  3. The watermark has not yet passed [12:00, 12:05): that window closes only when the watermark reaches 12:05, which requires a max event-time of 12:05 + 10m = 12:15. So there is no append output yet.
  4. The late event (event-time 11:58, processing-time 12:14) arrives. Watermark at that batch was 11:59 — the event is older than the watermark, so it is dropped from the windowed count.
  5. A later batch's max event-time reaches 12:15 → watermark = 12:05. Window [12:00, 12:05) is now closed; its final count (u1: 2, u2: 1) is appended to the sink and evicted from state.

Output (after watermark passes 12:05).

| window_start | window_end | user_id | count |
| --- | --- | --- | --- |
| 12:00 | 12:05 | u1 | 2 |
| 12:00 | 12:05 | u2 | 1 |

(The window [12:05, 12:10) is still open; its rows will appear once the watermark passes 12:10, i.e. once max event-time reaches 12:20.)
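The trace can be replayed without a cluster. The following is a toy model in plain Python, not Spark code. It follows the simplified rule used in this walkthrough (a row is ignored when its event-time is below the watermark); Spark's own lateness check for windowed aggregates is applied per window, so treat this as a model of the trace rather than of the engine. The `u3` event at 12:15 is hypothetical, added only to push the watermark to 12:05.

```python
from collections import Counter
from datetime import datetime, timedelta

WINDOW = timedelta(minutes=5)
DELAY = timedelta(minutes=10)
EPOCH = datetime(1970, 1, 1)

def t(hms):
    """Parse HH:MM:SS on an arbitrary fixed date."""
    return datetime.strptime("2026-01-01 " + hms, "%Y-%m-%d %H:%M:%S")

def window_start(ts):
    return ts - (ts - EPOCH) % WINDOW

def run(batches):
    state = Counter()            # (window_start, user_id) -> count
    watermark, max_seen = None, None
    emitted, dropped = [], []
    for batch in batches:
        for user, ts in batch:
            if watermark is not None and ts < watermark:
                dropped.append((user, ts))      # too late: ignored
                continue
            state[(window_start(ts), user)] += 1
            max_seen = ts if max_seen is None else max(max_seen, ts)
        if max_seen is None:
            continue
        candidate = max_seen - DELAY
        watermark = candidate if watermark is None else max(watermark, candidate)
        # Append mode: emit and evict windows whose end the watermark has reached
        for key in sorted(k for k in state if k[0] + WINDOW <= watermark):
            emitted.append((key[0].strftime("%H:%M"), key[1], state.pop(key)))
    return emitted, dropped

batches = [
    [("u1", t("12:01:30")), ("u2", t("12:02:10")), ("u1", t("12:04:50"))],
    [("u1", t("12:06:00")), ("u2", t("12:09:00"))],
    [("u1", t("11:58:00"))],      # late arrival
    [("u3", t("12:15:00"))],      # hypothetical event, moves the watermark to 12:05
]
emitted, dropped = run(batches)
print(emitted)   # [('12:00', 'u1', 2), ('12:00', 'u2', 1)]
```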

Rule of thumb. Append output mode delays emission until the watermark passes the window end. If your dashboard needs near-real-time intermediate counts, use outputMode("update") instead — but be ready to absorb every intermediate snapshot per window.

Worked example — stream dedup with watermark

Detailed explanation. A Kafka source has at-least-once delivery; the same event_id can arrive twice. Use dropDuplicatesWithinWatermark(["event_id"]) to keep a bounded-state dedup set. Without the watermark, the dedup set is unbounded.

Question. Stream events from Kafka with possibly duplicated event_id. Keep each event_id at most once within a 30-minute watermark.

Input. Kafka topic with events; some events repeat within minutes due to producer retries.

Code.

events = (spark.readStream.format("kafka").option(...).load()
              .selectExpr("CAST(value AS STRING) AS payload")
              .selectExpr("get_json_object(payload, '$.event_id') AS event_id",
                          "CAST(get_json_object(payload, '$.event_time') AS TIMESTAMP) AS event_time",
                          "*"))

dedup = (events
    .withWatermark("event_time", "30 minutes")
    .dropDuplicatesWithinWatermark(["event_id"]))

(dedup.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/ck/dedup_events/")
    .toTable("events_dedup")).awaitTermination()

Step-by-step explanation.

  1. The first batch sees events e1, e2, e1 (e1 duplicated). The state set contains {e1, e2}. The output is two rows; the duplicate e1 is suppressed.
  2. Each event added to the state set is tagged with its event-time. When the watermark advances past event_time + 30 minutes, the event_id is evicted from the state set.
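  3. A duplicate e1 that arrives after e1 has been evicted from state is not dedup'd. Spark only guarantees dedup for duplicates whose event-times are within 30 minutes of each other; eviction itself happens once the watermark (max event-time − 30 minutes) passes e1's expiry, so some later duplicates are still caught. This is the bounded-state tradeoff; the watermark width is your guaranteed "dedup horizon."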
  4. Without withWatermark, the regular dropDuplicates(["event_id"]) would keep every event_id forever — unbounded state and a guaranteed OOM.

Output (after first batch).

| event_id | event_time |
| --- | --- |
| e1 | 12:00 |
| e2 | 12:01 |

Rule of thumb. dropDuplicatesWithinWatermark (Spark 3.5+) is the simplest bounded-state dedup pattern for unbounded streams. The watermark width must be greater than the maximum expected duplicate delay (network retry budget + sink commit lag). For Kafka with at-least-once producers, 30 minutes is a sensible starting point.
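The bounded-state idea can be illustrated with a toy dedup set in plain Python. This is a sketch of the mechanism only, not Spark's implementation: `BoundedDedup` is a hypothetical class, the watermark is passed in by the caller, and the final duplicate is assumed to carry a later timestamp (as happens when the timestamp is assigned at send time).

```python
from datetime import datetime, timedelta

HORIZON = timedelta(minutes=30)

class BoundedDedup:
    """Remember each event_id until the watermark passes
    event_time + HORIZON, then forget it."""

    def __init__(self):
        self.expiry = {}          # event_id -> time after which it is evicted

    def offer(self, event_id, event_time, watermark):
        # Evict ids whose expiry the watermark has passed
        self.expiry = {k: v for k, v in self.expiry.items() if v > watermark}
        if event_id in self.expiry:
            return False          # duplicate inside the horizon: suppressed
        self.expiry[event_id] = event_time + HORIZON
        return True

d = BoundedDedup()
noon = datetime(2026, 1, 1, 12, 0)
early_wm = noon - timedelta(minutes=30)

print(d.offer("e1", noon, early_wm))                          # True
print(d.offer("e2", noon + timedelta(minutes=1), early_wm))   # True
print(d.offer("e1", noon, early_wm))                          # False
# Watermark has moved past e1's expiry (12:30): e1 is no longer remembered
print(d.offer("e1", noon + timedelta(minutes=40), noon + timedelta(minutes=31)))  # True
```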

Worked example — RocksDB state store for million-key aggregations

Detailed explanation. A windowed aggregation keyed by user_id with 50M distinct users overwhelms the HDFS state store: every checkpoint snapshots the entire state map to HDFS, taking minutes. Switch to RocksDB: state lives on local SSD, only the changelog is checkpointed.

Question. Configure a streaming query for a 50M-user windowed aggregation. Show the spark config to switch to RocksDB.

Input. Kafka events with user_id (50M distinct), event_time, 5-minute tumbling window, 1-hour watermark.

Code.

# Driver config — set before the query is created
spark.conf.set(
    "spark.sql.streaming.stateStore.providerClass",
    "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider",
)
# Optional (Spark 3.5+): turn on changelog checkpointing (state mutations only)
spark.conf.set("spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")

from pyspark.sql.functions import window

# Cast the binary Kafka value to a string before parsing it as JSON
events = (spark.readStream.format("kafka").option(...).load()
              .selectExpr("CAST(value AS STRING) AS payload")
              .selectExpr("get_json_object(payload, '$.user_id') AS user_id",
                          "CAST(get_json_object(payload, '$.event_time') AS TIMESTAMP) AS event_time"))

agg = (events
    .withWatermark("event_time", "1 hour")
    .groupBy(window("event_time", "5 minutes"), "user_id")
    .count())

(agg.writeStream
    .outputMode("append")
    .format("delta")
    .option("checkpointLocation", "/ck/user_5min_counts/")
    .trigger(processingTime="1 minute")
    .toTable("user_5min_counts")).awaitTermination()

Step-by-step explanation.

  1. With the default HDFS state store, the full state map lives on the executor JVM heap, and each full snapshot copies it to durable storage. 50M keys × 100 bytes = 5 GB per snapshot; serialisation takes minutes, and GC pressure grows with the map.
  2. RocksDB stores state on local SSD as a key-value LSM tree. Only the recent changelog is checkpointed — typically a few MB per batch.
  3. Restart-from-crash replays the changelog from the latest checkpoint to reconstruct local RocksDB state.
  4. The changelogCheckpointing.enabled flag was introduced in Spark 3.5 and, per the Spark configuration docs, is off by default; it does not exist in 3.2/3.3. Always set it explicitly to avoid surprise.

Output (steady-state checkpoint sizes).

| Backend | Per-batch checkpoint size | Memory per executor |
| --- | --- | --- |
| HDFS state store | 5 GB (full snapshot) | 5 GB heap |
| RocksDB + changelog | ~50 MB (changes only) | ~500 MB heap + SSD |

Rule of thumb. Switch to RocksDB the moment state size exceeds 1M keys or the checkpoint write time exceeds 10% of the batch duration. Databricks Runtime 11+ uses RocksDB by default; on vanilla Spark, you have to set the conf yourself.

Worked example — arbitrary stateful streaming with flatMapGroupsWithState

Detailed explanation. A session-window aggregation is a 30-minute gap that closes a session: every event extends the session by 30 minutes, and once a 30-minute gap elapses without a new event, the session is emitted. This does not fit groupBy(window(...)) — you need custom state.

Question. Implement session windows with a 30-minute gap timeout using flatMapGroupsWithState. Each session emits one row per (user_id, session_start, session_end, event_count).

Input. Kafka events with user_id, event_time.

Code.

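# PySpark sketch with applyInPandasWithState (Spark 3.4+); flatMapGroupsWithState
# itself is Scala/Java only. Assumes `events` has user_id STRING and
# event_time TIMESTAMP columns and that spark.sql.session.timeZone is UTC.
import pandas as pd
from pyspark.sql.streaming.state import GroupStateTimeout

GAP_MS = 30 * 60 * 1000  # 30-minute session gap

def session_fn(key, pdf_iter, state):
    (user_id,) = key
    if state.hasTimedOut:
        # Watermark passed last event + 30 minutes: emit the session and clear state
        start_ms, end_ms, count = state.get
        state.remove()
        yield pd.DataFrame({
            "user_id": [user_id],
            "session_start": [pd.to_datetime(start_ms, unit="ms")],
            "session_end": [pd.to_datetime(end_ms, unit="ms")],
            "event_count": [count]})
        return
    start_ms, end_ms, count = state.get if state.exists else (None, None, 0)
    for pdf in pdf_iter:
        ms = [int(ts.timestamp() * 1000) for ts in pdf["event_time"]]
        if not ms:
            continue
        start_ms = min(ms) if start_ms is None else min(start_ms, min(ms))
        end_ms = max(ms) if end_ms is None else max(end_ms, max(ms))
        count += len(ms)
    state.update((start_ms, end_ms, count))
    state.setTimeoutTimestamp(end_ms + GAP_MS)
    # do not yield while session is still active

sessions = (events
    .withWatermark("event_time", "10 minutes")   # assumed delay; EventTimeTimeout needs a watermark
    .groupBy("user_id")
    .applyInPandasWithState(
        session_fn,
        "user_id STRING, session_start TIMESTAMP, session_end TIMESTAMP, event_count LONG",
        "start_ms LONG, end_ms LONG, count LONG",
        "append",
        GroupStateTimeout.EventTimeTimeout))
sessions.writeStream...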

Step-by-step explanation.

  1. Each micro-batch passes the (user, events_in_batch, state) tuple to session_fn. The function updates the running session state with min/max event-time and a counter.
  2. After updating, the function sets a timeout equal to last_event_time + 30 minutes. If the next batch's watermark passes that timeout without any new events for this user, the function is called again with state.hasTimedOut == True.
  3. On timeout, the function emits the final session row and clears the state. Until timeout, the session is "open" and emits nothing in append mode.
  4. The watermark is essential — it is what advances the event-time clock for timeouts. Without it, sessions never close.

Output.

| user_id | session_start | session_end | event_count |
| --- | --- | --- | --- |
| u1 | 12:00 | 12:15 | 5 |
| u2 | 12:05 | 12:08 | 2 |

Rule of thumb. Use flatMapGroupsWithState only when groupBy + window does not fit — session windows, complex fraud rules, multi-step state machines. The API is powerful but verbose; prefer the SQL-style operators whenever they suffice.
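The session-gap logic itself is independent of Spark and can be prototyped on a plain list before wiring it into a stateful operator. This is a batch-style sketch under stated assumptions: `sessionize` is a hypothetical helper, it sees all events at once (a stream would instead wait for the timeout), and a gap strictly greater than 30 minutes closes a session.

```python
from datetime import datetime, timedelta

GAP = timedelta(minutes=30)

def sessionize(events):
    """events: iterable of (user_id, event_time). Returns one tuple per
    session: (user_id, session_start, session_end, event_count)."""
    sessions = []
    open_by_user = {}                       # user_id -> [start, end, count]
    for user, ts in sorted(events, key=lambda e: e[1]):
        cur = open_by_user.get(user)
        if cur is not None and ts - cur[1] > GAP:
            sessions.append((user, *cur))   # gap exceeded: close the session
            cur = None
        if cur is None:
            open_by_user[user] = [ts, ts, 1]
        else:
            cur[1] = ts
            cur[2] += 1
    # Flush whatever is still open (a stream would wait for the timeout)
    sessions.extend((u, *c) for u, c in open_by_user.items())
    return sorted(sessions)

def at(h, m):
    return datetime(2026, 1, 1, h, m)

result = sessionize([("u1", at(12, 0)), ("u1", at(12, 10)),
                     ("u1", at(12, 15)), ("u1", at(13, 0))])
for row in result:
    print(row[0], row[1].strftime("%H:%M"), row[2].strftime("%H:%M"), row[3])
```

The first three events fall into one session ending at 12:15; the 13:00 event arrives 45 minutes later and opens a second session.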

Spark interview question on watermarks and state

A senior interviewer often opens with: "Your stream is doing a 5-minute tumbling-window count per user, with outputMode('append'). Production reports that some windows never emit. What's likely going wrong?"

Solution Using a watermark calibrated to the true late-event distribution

# Common culprits: the watermark stops advancing on a quiet topic (windows never close),
# or the watermark is too tight (legitimate late events are dropped)
# Calibration formula: watermark width >= 99th percentile of (processing_time - event_time)
from pyspark.sql.functions import window

events = (spark.readStream.format("kafka").option(...).load()
              .selectExpr("CAST(value AS STRING) AS payload")
              .selectExpr("get_json_object(payload, '$.user_id') AS user_id",
                          "CAST(get_json_object(payload, '$.event_time') AS TIMESTAMP) AS event_time"))

windowed = (events
    .withWatermark("event_time", "20 minutes")             # widened from 10m
    .groupBy(window("event_time", "5 minutes"), "user_id")
    .count())

(windowed.writeStream
    .outputMode("append")
    .format("delta")
    .option("checkpointLocation", "/ck/user_5min_counts/")
    .trigger(processingTime="30 seconds")
    .toTable("user_5min_counts")).awaitTermination()

Step-by-step trace.

| Symptom | Root cause | Fix |
| --- | --- | --- |
| Window never emits | watermark not advancing: no new events arrive to push max event-time forward | ensure events keep flowing (heartbeats on quiet topics); calibrate watermark width |
| Window emits very late | watermark width too large for SLA | tighten watermark to the late-event 99th pct |
| Old-event drops | watermark too tight: legitimate late events dropped | widen watermark, monitor numRowsDroppedByWatermark |
| Append mode shows nothing for 15 minutes | append delays emission until watermark passes window end | use outputMode("update") if interim counts are needed |

Output:

| Calibration | Watermark width | Emission delay and side effects |
| --- | --- | --- |
| Too tight | 1 minute | window closes 6 minutes after start; legitimate late events dropped |
| Calibrated to 99th pct | 20 minutes | window closes 25 minutes after start; <1% drops |
| Too loose | 6 hours | state retained for ~6 hours of event-time (more than 70 five-minute windows per key); OOM risk on 50M keys |

Why this works — concept by concept:

  • Watermark width is a tradeoff: too narrow drops legitimate late events; too wide inflates state and delays append output. The calibration is the 99th percentile of (processing_time − event_time) from a representative sample.
  • Append mode delays emission — a 5-minute window with a 20-minute watermark emits its final count 25 minutes after window start. If the SLA is shorter, switch to update mode (every batch re-emits the current count) and accept the cost of seeing the same key many times.
  • numRowsDroppedByWatermark — the streaming-query metric you watch in production. Spike = your watermark is too tight; zero with growing state = watermark too loose.
  • Watermark advances per batch — based on the max event-time seen so far. A quiet topic that receives no new events for an hour does not advance the watermark, and windows do not close. Heartbeat events are a common workaround.
  • State is bounded by watermark + window size — a 5-minute window + 20-minute watermark retains state for 25 minutes of event-time. RocksDB scales it; HDFS does not past ~1M keys.
  • Cost — O(windows × keys) for state, dominated by the watermark width. Doubling the watermark roughly doubles the state size at steady-state input rate.
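The calibration rule can be turned into a small helper that runs over a sample of observed lags. This is a minimal sketch: `percentile` and `suggest_watermark_minutes` are hypothetical helpers, the nearest-rank percentile is one of several common definitions, and the sample values are made up for illustration.

```python
import math

def percentile(values, pct):
    """Nearest-rank percentile (pct is an integer 0..100) of a non-empty list."""
    ordered = sorted(values)
    rank = max(1, (pct * len(ordered) + 99) // 100)   # integer ceil(pct * n / 100)
    return ordered[rank - 1]

def suggest_watermark_minutes(lags_seconds, pct=99):
    """Round the chosen lag percentile up to whole minutes."""
    return math.ceil(percentile(lags_seconds, pct) / 60)

# Hypothetical sample of (processing_time - event_time) lags, in seconds
sample = [5] * 90 + [120] * 8 + [1100, 4000]
print(suggest_watermark_minutes(sample))   # 19
```

Here the 99th-percentile lag is 1100 seconds, so a watermark of about 19 to 20 minutes covers all but the slowest 1% of events.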



4. Output modes (append / update / complete) + exactly-once sinks

spark exactly once is a property of the engine + checkpoint + idempotent sink — not of a single magic flag

The mental model in one line: the output mode decides what rows are emitted per batch (new only / changed only / the whole result table), and exactly-once is delivered by the combination of a durable checkpoint, a replayable source, and an idempotent sink — not by the output mode itself. Once you internalise that, the matrix of (output mode × sink) becomes a memorisable table instead of a maze.

Visual matrix of Structured Streaming output modes (append, update, complete) crossed with sink types (Delta, Kafka, File, foreachBatch) — each cell shows a green check, amber chip, or red strike to indicate compatibility; a small chip on the right summarises exactly-once requirements; on a light PipeCode card.

The three output modes.

  • append (default). Emit only rows that are new in this batch and will never change again. Most natural for raw event passthrough; for event-time windowed aggregations it emits each window exactly once, after the watermark passes it, and therefore requires a watermark.
  • update. Emit only rows that were updated in this batch (including new rows). Used for upsert sinks where the downstream key-value store maps each key to its latest value. Works for aggregations.
  • complete. Emit the entire result table every batch. Only works for aggregations with a finite key space; OOM risk on million-key aggregations. Used for small dashboards or final-state snapshots.

The sink × mode compatibility matrix.

| Sink | append | update | complete |
| --- | --- | --- | --- |
| Delta | yes (exactly-once via TX log) | yes via foreachBatch + MERGE | only for small aggs |
| Kafka | yes (at-least-once; dedup downstream) | yes (emits updated rows) | yes, but re-emits the full table each batch |
| File (parquet/json/csv) | yes (new files only) | no (files immutable) | no |
| foreachBatch | yes | yes | yes |
| console | yes | yes | yes |
| memory | yes | yes | yes |
| rate | n/a (source only) | n/a | n/a |

The exactly-once contract has three legs.

  • 1. Replayable source. Kafka offsets, file names, Delta CDF version — all deterministic across replays. On crash, the source can be re-read from a recorded position.
  • 2. Durable checkpoint. option("checkpointLocation", "s3://bucket/ck/") stores the source position, the state, and the commit log. Without it, restart cannot resume — and a checkpoint shared between two queries is corruption.
  • 3. Idempotent sink. The sink must produce the same external state regardless of how many times Spark writes the same batch. Delta's transaction log does this natively; the Kafka sink is at-least-once, so duplicates must be removed downstream; for arbitrary sinks, foreachBatch passes a batch id (epochId) so the user can dedup.

Delta as the canonical exactly-once sink.

  • Atomic batch commit. Each micro-batch becomes one Delta transaction. The sink either fully succeeds (commit visible) or fully fails (commit invisible).
  • Idempotency by batch id. Delta's _delta_log/ records the streaming batch id. On replay, Delta refuses to commit a batch id it has already seen.
  • No outputMode("update") directly. Delta's streaming sink supports append and complete natively; for update you use foreachBatch + MERGE INTO (covered in section 5).

Kafka as a sink: at-least-once.

  • No transactional writes. Spark's Kafka integration guide documents the Kafka sink as at-least-once: a retried task or a replayed batch can write the same record twice, and the sink does not use Kafka transactions.
  • Consumer-side dedup. Carry a unique key (for example event_id) in every record so that downstream consumers can drop duplicates when they read the topic.
  • acks=all is still the right durability setting; the trade is throughput vs durability.

File sink (parquet, json, csv, orc).

  • Append-only. Every batch writes a new set of files into a sub-directory. The driver tracks which files belong to which batch.
  • Atomic per file. A file either appears fully or not at all (temporary file → atomic rename).
  • No update / complete. Files are immutable; the engine refuses these output modes.

Common interview probes on output modes.

  • "When do I use complete?" — only when the result table is small (e.g. top-10 most-clicked URLs) and you can afford to re-emit it every batch.
  • "Why does append require a watermark for stateful queries?" — because a row is "appendable" only when it will never change again. Without a watermark, the engine cannot decide that.
  • "Can I switch output modes between runs?" — no, output mode is checkpoint-compatible only in specific cases; safer to use a new checkpoint dir.
  • "How do I write update to Delta?" Via foreachBatch + MERGE INTO; Delta's native streaming sink supports append and complete, not update.

Worked example — same aggregation under each output mode

Detailed explanation. Take a single windowed aggregation and run it under each output mode. Watch how many rows the sink receives per batch.

Question. Given the events from section 3 (clicks per user per 5-minute window), write the same query in append, update, and complete modes. Show the per-batch sink output.

Input (event-time order).

| user_id | event_time |
| --- | --- |
| u1 | 12:01 |
| u2 | 12:02 |
| u1 | 12:04 |

Code.

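from pyspark.sql.functions import window

# Assumes the Kafka value is a JSON payload carrying user_id and event_time
clicks = (spark.readStream.format("kafka").option(...).load()
              .selectExpr("CAST(value AS STRING) AS payload")
              .selectExpr("get_json_object(payload, '$.user_id') AS user_id",
                          "CAST(get_json_object(payload, '$.event_time') AS TIMESTAMP) AS event_time"))

windowed = (clicks
    .withWatermark("event_time", "10 minutes")
    .groupBy(window("event_time", "5 minutes"), "user_id")
    .count())

# Start all three queries before blocking: calling awaitTermination() on the
# first query would keep the other two from ever starting.

# Mode 1: append
q_append = (windowed.writeStream.outputMode("append").format("delta")
    .option("checkpointLocation", "/ck/wc_append/").toTable("wc_append"))

# Mode 2: update
q_update = (windowed.writeStream.outputMode("update").format("memory")
    .queryName("wc_update").start())

# Mode 3: complete
q_complete = (windowed.writeStream.outputMode("complete").format("memory")
    .queryName("wc_complete").start())

spark.streams.awaitAnyTermination()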

Step-by-step explanation.

  1. Append mode emits a window only after the watermark passes the window end. For the 12:00-12:05 window with a 10-minute watermark, that's at watermark = 12:05, which requires max event-time >= 12:15. Until then, the sink receives nothing.
  2. Update mode emits a row every batch for each key whose count changed. With one event per batch, batch 1 sees u1=1; batch 2 sees u2=1; batch 3 sees u1=2.
  3. Complete mode emits the entire result table every batch. Batch 1: [(12:00-12:05, u1, 1)]. Batch 2: [(12:00-12:05, u1, 1), (12:00-12:05, u2, 1)]. Batch 3: full result with u1=2 and u2=1.
  4. The end-state is the same for all three modes once the update rows are upserted by key (the latest count wins). The path each took is what differs.

Output (rows emitted across the 3 batches).

| Mode | Batch 1 | Batch 2 | Batch 3 | Steady-state pattern |
| --- | --- | --- | --- | --- |
| append | (none) | (none) | (none until watermark passes) | one row per window, ever |
| update | (u1, 1) | (u2, 1) | (u1, 2) | one row per changed key per batch |
| complete | full table | full table | full table | full table re-emitted every batch |

Rule of thumb. Default to append for windowed aggregations with watermark — it is the only mode that gives one final row per window. Use update when downstream is an upsert sink with key-value semantics. Use complete only for small result tables.
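The per-batch differences between the modes can be reproduced with a toy model in plain Python. This is a sketch of the emission pattern only, not of Spark's engine: `emit_per_batch` is a hypothetical helper, it tracks a single open window, and it assumes the watermark never passes that window during the three batches.

```python
from collections import Counter

def emit_per_batch(batches, mode):
    """batches is a list of lists of user_ids falling into one open window;
    returns the rows a sink would receive for each batch."""
    counts = Counter()
    out = []
    for batch in batches:
        changed = []
        for user in batch:
            counts[user] += 1
            if user not in changed:
                changed.append(user)
        if mode == "update":
            out.append([(u, counts[u]) for u in changed])
        elif mode == "complete":
            out.append(sorted(counts.items()))
        elif mode == "append":
            out.append([])      # window still open: nothing is final yet
        else:
            raise ValueError(mode)
    return out

batches = [["u1"], ["u2"], ["u1"]]
print(emit_per_batch(batches, "append"))    # [[], [], []]
print(emit_per_batch(batches, "update"))    # [[('u1', 1)], [('u2', 1)], [('u1', 2)]]
print(emit_per_batch(batches, "complete"))
```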

Worked example — append to Delta with exactly-once

Detailed explanation. The most common production pattern: Kafka → Delta append. Spark + Delta deliver exactly-once for free because Delta records the streaming batch id and refuses duplicate writes.

Question. Read events from Kafka, parse JSON, write to a Delta table with exactly-once semantics. Show the minimal correct code.

Input. Kafka topic events with JSON payloads.

Code.

raw = (spark.readStream.format("kafka")
              .option("kafka.bootstrap.servers", "broker:9092")
              .option("subscribe", "events")
              .option("startingOffsets", "earliest")
              .option("maxOffsetsPerTrigger", 100_000)
              .load())

parsed = (raw.selectExpr("CAST(value AS STRING) AS payload",
                         "timestamp AS kafka_ts")
              .selectExpr(
                  "get_json_object(payload, '$.event_id')   AS event_id",
                  "get_json_object(payload, '$.user_id')    AS user_id",
                  "CAST(get_json_object(payload, '$.event_time') AS TIMESTAMP) AS event_time",
                  "kafka_ts"))

(parsed.writeStream
       .format("delta")
       .outputMode("append")
       .option("checkpointLocation", "s3://bucket/ck/events_to_delta/")
       .option("path", "s3://bucket/curated/events/")
       .trigger(processingTime="30 seconds")
       .start()
       .awaitTermination())

Step-by-step explanation.

  1. Kafka source records the starting and ending offsets of each batch in checkpoint/offsets/N.
  2. The parsed DataFrame is computed in-memory; Delta writes the batch as a single atomic transaction.
  3. Delta's _delta_log/N.json records the batch id and the file list. If the batch id has already been seen (replay scenario), Delta skips the write — guaranteed idempotency.
  4. checkpoint/commits/N is written after Delta's commit succeeds. On restart, Spark sees offsets/N exists but commits/N does not — it replays batch N. Delta refuses the duplicate; the engine considers batch N committed and moves on.

Output guarantees.

| Failure scenario | Behaviour |
| --- | --- |
| Driver crash before Delta commit | Batch N replayed; Delta accepts the only commit |
| Driver crash after Delta commit, before commits/N | Batch N replayed; Delta refuses (already committed); engine writes commits/N and moves on |
| Source replay (recompute) | Delta refuses duplicate batch id; idempotent |
| Sink network blip mid-write | Data files not referenced by a committed _delta_log entry stay invisible to readers |

Rule of thumb. Kafka → Delta with checkpointLocation is the safest exactly-once pattern in the Spark ecosystem. Use it as the default for any new streaming ingest into Delta; only reach for foreachBatch when you need upserts or complex sink logic.
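The offsets/commits handshake can be modelled in a few lines of plain Python. This is a toy illustration of the protocol described above, not Spark or Delta code: `IdempotentSink`, `run_batch`, and `batches_to_replay` are hypothetical names, and the logs are in-memory sets rather than checkpoint files.

```python
class IdempotentSink:
    """Remembers the highest batch id it has committed, like a sink
    that stores the streaming batch id alongside the data."""

    def __init__(self):
        self.rows, self.last_batch = [], -1

    def write(self, batch_id, rows):
        if batch_id <= self.last_batch:
            return False                  # replay: already committed, skip
        self.rows.extend(rows)
        self.last_batch = batch_id
        return True

def run_batch(batch_id, rows, sink, offsets_log, commits_log, crash_before_commit_log=False):
    offsets_log.add(batch_id)             # 1. record the planned offsets
    sink.write(batch_id, rows)            # 2. sink commit
    if crash_before_commit_log:
        return                            # driver dies here
    commits_log.add(batch_id)             # 3. mark the batch as done

def batches_to_replay(offsets_log, commits_log):
    return sorted(offsets_log - commits_log)

sink, offsets, commits = IdempotentSink(), set(), set()
run_batch(0, ["a", "b"], sink, offsets, commits)
run_batch(1, ["c"], sink, offsets, commits, crash_before_commit_log=True)

for b in batches_to_replay(offsets, commits):     # restart
    run_batch(b, ["c"], sink, offsets, commits)

print(sink.rows)   # ['a', 'b', 'c']
```

Batch 1 is replayed after the simulated crash, the sink recognises the batch id, and the row is not written twice.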

Worked example — append to Kafka with at-least-once writes

Detailed explanation. Spark's Kafka sink is at-least-once: the Structured Streaming + Kafka integration guide states that some records may be duplicated, and the sink does not use Kafka transactions. Setting acks=all and enable.idempotence=true removes duplicates caused by producer-level retries, but a replayed micro-batch or a retried task still re-sends its records. Exactly-once results for the topic's readers therefore need a unique key that consumers can dedup on.

Question. Stream events from one Kafka topic to another so that downstream readers can recover exactly-once results. Configure the producer for durable writes.

Input. Source topic events_raw; target topic events_curated.

Code.

raw = (spark.readStream.format("kafka")
              .option("kafka.bootstrap.servers", "broker:9092")
              .option("subscribe", "events_raw")
              .option("maxOffsetsPerTrigger", 50_000)
              .load())

# Optional transformation (map-only). Keep only key and value: the Kafka sink
# interprets `topic` and `partition` columns as routing instructions, so source
# metadata columns should not be passed through.
curated = raw.selectExpr("key", "value")

(curated.writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "broker:9092")
        .option("topic", "events_curated")
        .option("kafka.acks", "all")
        .option("kafka.enable.idempotence", "true")
        .option("checkpointLocation", "/ck/events_raw_to_curated/")
        .trigger(processingTime="30 seconds")
        .start()
        .awaitTermination())

Step-by-step explanation.

  1. Each task in the writeStream sends its partition's records through a Kafka producer; acks=all makes the broker acknowledge a record only after all in-sync replicas have it.
  2. The batch counts as done once every task's sends are acknowledged; Spark then writes checkpoint/commits/N.
  3. On driver crash mid-batch, the restart replays batch N from the same source offsets. Records that reached the broker before the crash are written again, so events_curated can contain duplicates.
  4. Downstream consumers dedup on a unique key (here the event_id inside the payload, an assumption carried over from the earlier examples) to recover exactly-once results.

Output.

| Property | Guarantee |
| --- | --- |
| Source | replayable Kafka offsets |
| Sink | at-least-once Kafka producer (acks=all, idempotent retries) |
| Consumer requirement | dedup on a unique key such as event_id |
| End-to-end | exactly-once results only after consumer-side dedup |

Rule of thumb. Kafka → Kafka exactly-once is a producer/consumer contract: both sides must cooperate. If you cannot mandate dedup on every consumer of the target topic, you have at-least-once, even with the best producer config.
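On the consumer side, a dedup pass keyed on a unique id is a common safety net when duplicates can reach the topic. The sketch below is a plain-Python illustration, not a Kafka client: `dedup_by_key` is a hypothetical helper, records are assumed to be JSON strings carrying an `event_id` field, and the in-memory set is unbounded, so a real consumer would need an expiry policy.

```python
import json

def dedup_by_key(records, key_field="event_id"):
    """Yield each JSON record once, keyed on key_field."""
    seen = set()
    for raw in records:
        event = json.loads(raw)
        if event[key_field] in seen:
            continue                # duplicate delivery: skip
        seen.add(event[key_field])
        yield event

incoming = [
    '{"event_id": "e1", "user_id": "u1"}',
    '{"event_id": "e2", "user_id": "u2"}',
    '{"event_id": "e1", "user_id": "u1"}',   # replayed record
]
unique = list(dedup_by_key(incoming))
print([e["event_id"] for e in unique])   # ['e1', 'e2']
```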

Worked example — file sink (parquet) for cheap data-lake landing

Detailed explanation. Sometimes you just want raw events landed as parquet files for downstream Hive / Glue / Athena consumption. The file sink is append-only and produces one or more files per batch, tracked atomically.

Question. Stream events into hourly partitions of a parquet table. Show how the file sink names files and how to query them downstream.

Input. Kafka events with event_time.

Code.

raw = (spark.readStream.format("kafka").option(...).load()
              .selectExpr("CAST(value AS STRING) AS payload", "timestamp AS kafka_ts")
              .selectExpr(
                  "get_json_object(payload, '$.user_id') AS user_id",
                  "CAST(get_json_object(payload, '$.event_time') AS TIMESTAMP) AS event_time",
                  "date_format(get_json_object(payload, '$.event_time'), 'yyyy-MM-dd-HH') AS event_hour"))

(raw.writeStream
    .format("parquet")
    .outputMode("append")
    .partitionBy("event_hour")
    .option("path", "s3://lake/raw/events/")
    .option("checkpointLocation", "/ck/events_raw_files/")
    .trigger(processingTime="5 minutes")
    .start()
    .awaitTermination())

Step-by-step explanation.

  1. Each micro-batch writes one or more parquet files into s3://lake/raw/events/event_hour=YYYY-MM-DD-HH/, named by Spark's task id.
  2. The driver records the committed file names in the sink's metadata log at s3://lake/raw/events/_spark_metadata/N, inside the output directory rather than the checkpoint. On replay, Spark skips batches that are already recorded there.
  3. Downstream consumers (Athena, Hive, Spark batch) read the directory partition by partition. Only files listed in _spark_metadata count as committed, and a Spark reader pointed at the output path honours that log. Most other engines ignore it and read every file in the directory, so they can also pick up files left behind by a failed or retried batch; deduplicate downstream if that matters.
  4. The hourly partition column is computed from event_time, so late events land in the correct hour partition regardless of processing-time.
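The partition rule in step 4 can be checked without a cluster. The sketch below is plain Python, not Spark: event_hour_partition is a hypothetical helper that mirrors the 'yyyy-MM-dd-HH' pattern used in the query, and the base path is the one from the example.

```python
from datetime import datetime

def event_hour_partition(event_time: datetime,
                         base: str = "s3://lake/raw/events/") -> str:
    """Return the partition directory an event lands in, keyed by its event time."""
    return f"{base}event_hour={event_time.strftime('%Y-%m-%d-%H')}/"

# An event stamped 13:59 lands in the 13:00 partition,
# no matter how late the micro-batch that carries it actually runs.
late_event = datetime(2026, 6, 12, 13, 59, 30)
print(event_hour_partition(late_event))
```

Because the path depends only on event_time, a batch that runs at 14:05 can still add files to the 13:00 partition, so downstream jobs should not treat an hour as closed the moment the clock passes it.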

Output (directory structure).

Path Contents
s3://lake/raw/events/event_hour=2026-06-12-13/ parquet files for 13:00 hour
s3://lake/raw/events/event_hour=2026-06-12-14/ parquet files for 14:00 hour
s3://lake/raw/events/_spark_metadata/N per-batch file list (Spark-specific)

Rule of thumb. Use the file sink for cheap landing of raw events into a data lake when downstream consumers do not need Delta features (ACID, time travel, MERGE). Switch to Delta the moment you need updates, deletes, or schema evolution.

Spark interview question on output modes + exactly-once

A senior interviewer often frames this as: "Pick the right combination of output mode + sink + checkpoint to give exactly-once into a Delta users_latest_state table where each row is the latest state per user_id. The source is a Kafka CDC topic with at-least-once semantics."

Solution Using foreachBatch + MERGE INTO + Delta target

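from delta.tables import DeltaTable
from pyspark.sql.functions import col, row_number
from pyspark.sql.window import Window

def upsert_users(batch_df, epoch_id):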
    # Deduplicate within the batch by latest event_time per user
    latest_per_user = (batch_df
        .withColumn("rn", row_number().over(
            Window.partitionBy("user_id").orderBy(col("event_time").desc())))
        .filter("rn = 1").drop("rn"))

    (DeltaTable.forName(spark, "users_latest_state").alias("t")
        .merge(
            latest_per_user.alias("s"),
            "t.user_id = s.user_id"
        )
        .whenMatchedUpdateAll(condition="s.event_time > t.event_time")
        .whenNotMatchedInsertAll()
        .execute())

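raw = (spark.readStream.format("kafka").option(...).load()
              # Kafka delivers value as binary; cast it before extracting JSON fields
              .selectExpr("CAST(value AS STRING) AS payload")
              .selectExpr("get_json_object(payload, '$.user_id') AS user_id",
                          "get_json_object(payload, '$.state')   AS state",
                          "CAST(get_json_object(payload, '$.event_time') AS TIMESTAMP) AS event_time"))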

(raw.writeStream
     .foreachBatch(upsert_users)
     .option("checkpointLocation", "/ck/users_latest_state/")
     .trigger(processingTime="1 minute")
     .start()
     .awaitTermination())

Step-by-step trace.

Step What happens
1 Kafka source emits a batch with possibly multiple rows per user_id (out-of-order CDC).
2 foreachBatch receives the batch as a static DataFrame plus the epoch_id.
3 The function dedups within the batch (keeping the latest event_time per user).
4 MERGE INTO users_latest_state upserts each row: update existing rows whose event_time is older; insert new user_ids.
5 Delta commit is atomic. On crash + replay, the same MERGE is idempotent — re-applying it produces the same target state.

Output:

Property Guarantee
Source replayable Kafka offsets
Sink Delta MERGE — atomic + idempotent by key
Within-batch ordering dedup keeps latest event_time per user
End-to-end exactly-once into target Delta table

Why this works — concept by concept:

  • foreachBatch as the escape hatch — gives you a static DataFrame to operate on, which means you can call the Delta MERGE INTO API (not available in the native streaming sink). Receives epoch_id for idempotency checks.
  • MERGE INTO is the canonical upsert — declarative, atomic, idempotent. The whenMatchedUpdateAll(condition=...) guard prevents older events from overwriting newer state when CDC rows arrive out of order.
  • Within-batch dedup: required when the source can emit multiple updates per key per batch. Without it, Delta fails the MERGE when several source rows match the same target row, and new keys could be inserted more than once.
  • Checkpoint guarantees replay-safety: on driver crash, Spark replays the batch. The guarded MERGE is idempotent, so the second application produces the same target state.
  • No update output mode on Delta directly: the native Delta streaming sink supports append and complete modes, not update. For upserts, foreachBatch + MERGE is the standard pattern.
  • Cost: the dedup window shuffles O(batch_rows); MERGE then joins the batch against the target files that may contain matching keys, so its cost depends on how many target files must be scanned and rewritten. Z-ordering or partitioning the target on user_id lets Delta skip files and speeds up MERGE matching considerably.
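To see why the dedup plus the event_time guard makes a replay harmless, here is a minimal pure-Python sketch of the same logic. It is a stand-in for the Delta MERGE, not a use of the Delta API: the target table is modelled as a dict, and latest_per_key and guarded_upsert are hypothetical helper names.

```python
from typing import Dict, List

Row = Dict[str, object]

def latest_per_key(batch: List[Row]) -> List[Row]:
    """Keep only the row with the greatest event_time for each user_id."""
    best: Dict[object, Row] = {}
    for row in batch:
        key = row["user_id"]
        if key not in best or row["event_time"] > best[key]["event_time"]:
            best[key] = row
    return list(best.values())

def guarded_upsert(target: Dict[object, Row], batch: List[Row]) -> None:
    """Insert new keys; update existing keys only when the incoming row is newer."""
    for row in latest_per_key(batch):
        current = target.get(row["user_id"])
        if current is None or row["event_time"] > current["event_time"]:
            target[row["user_id"]] = dict(row)

target: Dict[object, Row] = {}
batch = [
    {"user_id": "u1", "state": "active",  "event_time": 20},
    {"user_id": "u1", "state": "trial",   "event_time": 10},  # older, arrives in the same batch
    {"user_id": "u2", "state": "churned", "event_time": 15},
]
guarded_upsert(target, batch)
after_first = {k: dict(v) for k, v in target.items()}
guarded_upsert(target, batch)   # simulated replay after a crash
print(target == after_first)
```

The replay changes nothing because every replayed row fails the strict "newer than target" test. The same guard is what makes a stale update that shows up in a later batch a no-op.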

PySpark
Topic — ETL
ETL problems with streaming sinks

Practice →


5. Production patterns — foreachBatch + MERGE INTO, checkpoint hygiene, schema evolution, idempotency

foreachBatch is the production escape hatch — every CDC ingest, every custom sink, every "I need MERGE" goes through it

The mental model in one line: foreachBatch(fn) exposes the streaming batch as a static DataFrame, lets you call any DataFrame API (including MERGE INTO, JDBC writes, multi-sink fan-out), and gives you the epochId for idempotency — making it the universal escape hatch when the native streaming sinks do not fit. Once you adopt foreachBatch as the default upsert pattern, the rest of production (checkpoint hygiene, schema evolution, backpressure) clicks into place.

Production topology diagram — Kafka source on the left, structured streaming query in the middle wrapping a foreachBatch(epochId, batchDF) call that issues a MERGE INTO target Delta table, a checkpoint directory shown beneath with offsets / commits / state / sources folders; on a light PipeCode card.

The foreachBatch contract.

  • Signature. def fn(batch_df: DataFrame, epoch_id: int) -> None. Spark calls fn once per micro-batch with the batch's rows as a static DataFrame.
  • epoch_id is monotonically increasing. Use it for idempotency: skip the function if you have already processed this epoch.
  • The batch DataFrame is static. You can call .cache(), .write (to multiple sinks), .collect() (carefully), DeltaTable.merge(...), etc.
  • Errors propagate. A thrown exception fails the batch; the engine retries from the last committed offset.

The canonical CDC into Delta pattern.

  • Source. Kafka CDC topic with at-least-once delivery; rows can arrive out-of-order per key.
  • Within-batch dedup. Window over (user_id, event_time DESC) → keep row 1 per user.
  • MERGE INTO. Upsert the dedup'd batch into the target Delta table; guard updates with s.event_time > t.event_time.
  • Checkpoint. Owned by this query — never shared, never reused.

Checkpoint hygiene rules.

  • One checkpoint per query, ever. Two queries reading the same source must each have their own checkpoint dir. Sharing leads to corruption.
  • Never delete files manually. The offsets/, commits/, state/, sources/ directories are linked; partial deletion corrupts the query.
  • Treat the checkpoint as the query's identity. If you need to re-process from scratch, write to a new checkpoint dir — leave the old one alone.
  • Back it up like a database. S3 / ADLS / GCS lifecycle rules should exclude checkpoint directories; rotating or deleting them silently breaks restart.
  • Monitor checkpoint size. State growth past a few GB per batch is a flag — either watermark is too loose or the keyspace is unbounded.

Schema evolution.

  • Source schema drift. If a new field appears in the Kafka JSON payload, your query keeps working — get_json_object returns NULL for missing fields. To pick up the new field, restart the query with an updated .selectExpr.
  • Delta sink schema evolution. option("mergeSchema", "true") on the Delta sink lets new columns appear in the target. Default is off — strict schema.
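  • MERGE INTO schema evolution. Set spark.databricks.delta.schema.autoMerge.enabled = true (the conf keeps its databricks prefix in open-source Delta Lake as well) or call withSchemaEvolution() on the MergeBuilder where your Delta Lake release provides it (check the API docs for your version) to let MERGE create new columns in the target.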
  • Breaking changes. Column type changes, column renames, partition column changes — these require a backfill and a new checkpoint.

Idempotency strategies (per sink type).

  • Delta sink. Idempotent natively. Stream batch id recorded in _delta_log; duplicate batches refused.
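  • Kafka sink. Open-source Spark documents its Kafka sink as at-least-once, so a retried batch can produce duplicates. Make consumers idempotent (dedupe on a key), and treat exactly-once as something to verify for your platform rather than assume.
  • JDBC sink (no native streaming sink). Use foreachBatch with a batch JDBC write: either a MERGE into the target table or a (epoch_id, key) upsert with ON CONFLICT DO UPDATE.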
  • Custom REST sink. Pass epoch_id as an idempotency key the API can dedupe on.
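For sinks without native idempotency, the epoch_id check can be sketched as a small ledger. This is an illustration under stated assumptions: EpochLedger is a hypothetical in-memory class; a real pipeline would keep the ledger in durable storage and record the epoch in the same transaction as the data write.

```python
from typing import Callable, List, Set

class EpochLedger:
    """In-memory stand-in for a ledger table keyed by epoch_id (hypothetical helper)."""

    def __init__(self) -> None:
        self._done: Set[int] = set()

    def run_once(self, epoch_id: int, write: Callable[[], None]) -> bool:
        """Run write() unless this epoch was already recorded. Returns True if it ran."""
        if epoch_id in self._done:
            return False
        write()
        self._done.add(epoch_id)
        return True

sink: List[str] = []
ledger = EpochLedger()
first = ledger.run_once(7, lambda: sink.append("batch-7"))
replay = ledger.run_once(7, lambda: sink.append("batch-7"))  # same epoch after a restart
print(first, replay, sink)
```

If the write and the ledger update cannot share a transaction, a crash between the two still replays the write, which is why an idempotency key on the receiving side is the stronger guarantee.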

Backpressure must be configured for every source.

  • Kafka. maxOffsetsPerTrigger caps the offsets read per batch; the open-source Kafka source has no byte-based cap.
  • File source. maxFilesPerTrigger (no cap by default).
  • Delta source / CDF. maxFilesPerTrigger (default 1000) + maxBytesPerTrigger.
  • Rate source. rowsPerSecond + numPartitions (for testing only).
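A quick way to size a cap is to work out how many micro-batches a backlog turns into. The helper below is a back-of-the-envelope sketch with a hypothetical name; it assumes no new data arrives while the backlog drains, and the figures are illustrative.

```python
import math

def batches_to_drain(backlog_offsets: int, max_offsets_per_trigger: int) -> int:
    """Number of micro-batches needed to work through a backlog at a fixed cap."""
    if max_offsets_per_trigger <= 0:
        raise ValueError("max_offsets_per_trigger must be positive")
    return math.ceil(backlog_offsets / max_offsets_per_trigger)

# A 12M-offset backlog with a 100k cap becomes 120 bounded batches
# instead of one batch that tries to hold everything at once.
print(batches_to_drain(12_000_000, 100_000))
```

Multiply the result by the typical batch duration to estimate catch-up time. If that exceeds what the SLA tolerates, raise the cap or add executors rather than removing the cap.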

Monitoring with StreamingQueryListener.

  • Subscribe to events. onQueryStarted, onQueryProgress, onQueryTerminated. Useful for emitting metrics to Datadog / Prometheus.
  • Key metrics on the progress object (event.progress). numInputRows, inputRowsPerSecond, processedRowsPerSecond, stateOperators[].numRowsTotal, stateOperators[].numRowsDroppedByWatermark, durationMs["triggerExecution"].
  • Alert thresholds. Input rate >> processed rate → falling behind. State row count growing without bound → watermark too loose.
  • Databricks UI gives most of this for free; on vanilla Spark, you wire it yourself.

Common interview probes on production patterns.

  • "Why is foreachBatch better than foreach for upserts?" — because foreachBatch gives you a DataFrame (MERGE-capable) while foreach gives you one row at a time (no batch-level optimisations).
  • "What's inside the checkpoint directory?" — offsets/, commits/, state/, sources/, metadata. Each is essential; deleting any one corrupts the query.
  • "How do you evolve schema without a backfill?" — mergeSchema=true on Delta sink + Delta MERGE with withSchemaEvolution().
  • "How do you guarantee idempotency for a custom REST sink?" — pass epoch_id as the API's idempotency key.

Worked example — production CDC into Delta with foreachBatch + MERGE

Detailed explanation. The textbook production pattern: Debezium emits CDC rows into Kafka; Spark consumes, dedups within batch, MERGEs into the target Delta table. Restart-safe, schema-evolving, exactly-once.

Question. Write a complete production-quality streaming query that ingests a Debezium CDC topic into a target Delta table with upsert semantics. Include watermark, backpressure, foreachBatch with MERGE, and explicit checkpointing.

Input. Kafka topic debezium.users emitting JSON records with op (c / u / d), after.user_id, after.email, source.ts_ms.

Code.

from delta.tables import DeltaTable
from pyspark.sql.functions import col, row_number, get_json_object, from_unixtime
from pyspark.sql.window import Window

# 1) Source — Kafka with backpressure
src = (spark.readStream.format("kafka")
              .option("kafka.bootstrap.servers", "broker:9092")
              .option("subscribe", "debezium.users")
              .option("startingOffsets", "earliest")
              .option("maxOffsetsPerTrigger", 100_000)
              .load())

# 2) Parse CDC payload
parsed = (src.selectExpr("CAST(value AS STRING) AS payload",
                         "timestamp AS kafka_ts")
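              .selectExpr(
                  "get_json_object(payload, '$.op')                                  AS op",
                  # Debezium delete events carry the row in before (after is null), so fall back to it
                  "coalesce(get_json_object(payload, '$.after.user_id'), "
                  "get_json_object(payload, '$.before.user_id'))                     AS user_id",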
                  "get_json_object(payload, '$.after.email')                         AS email",
                  "CAST(get_json_object(payload, '$.source.ts_ms') AS LONG) / 1000   AS event_ts_epoch")
              # Cast the fractional epoch seconds directly so millisecond ordering survives
              .withColumn("event_time", col("event_ts_epoch").cast("timestamp"))
              .withWatermark("event_time", "30 minutes"))

# 3) foreachBatch — dedup within batch + MERGE INTO Delta
def upsert_users(batch_df, epoch_id):
    if batch_df.rdd.isEmpty():
        return
    w = Window.partitionBy("user_id").orderBy(col("event_time").desc())
    latest = (batch_df.withColumn("rn", row_number().over(w))
                      .filter("rn = 1").drop("rn"))

    target = DeltaTable.forName(spark, "users_latest_state")
    (target.alias("t")
           .merge(latest.alias("s"),
                  "t.user_id = s.user_id")
           .whenMatchedDelete(condition="s.op = 'd'")
           .whenMatchedUpdate(
                condition="s.event_time > t.event_time AND s.op IN ('c','u')",
                set={"email": "s.email", "event_time": "s.event_time"})
           .whenNotMatchedInsert(
                condition="s.op IN ('c','u')",
                values={"user_id": "s.user_id",
                        "email":   "s.email",
                        "event_time": "s.event_time"})
           .execute())

# 4) writeStream — checkpoint + trigger
(parsed.writeStream
       .foreachBatch(upsert_users)
       .option("checkpointLocation", "s3://bucket/ck/users_latest_state/")
       .trigger(processingTime="1 minute")
       .start()
       .awaitTermination())

Step-by-step explanation.

  1. The Kafka source pulls up to 100k offsets per batch. The event_time is parsed from Debezium's source.ts_ms (millisecond epoch) and a 30-minute watermark is declared.
  2. Each batch arrives as a static DataFrame in upsert_users. The function deduplicates the batch by user_id, keeping the row with the latest event_time (handles out-of-order CDC within a single batch).
  3. MERGE INTO users_latest_state is the canonical CDC upsert: delete rows on op='d', update rows on op='c'/'u' (guarded by s.event_time > t.event_time to prevent old updates overwriting newer state), insert new user_ids.
  4. The checkpoint location is owned by this query alone. On restart, Kafka offsets are replayed and MERGE is idempotent — the same target state results.
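The parsing step can be prototyped in plain Python before it goes into selectExpr. This sketch assumes a flattened Debezium envelope (no schema/payload wrapper) and assumes that delete events carry the old row in before with after set to null; parse_debezium is a hypothetical helper, not part of any library.

```python
import json
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def parse_debezium(payload: str) -> dict:
    """Extract op, key, email and event time from a flattened Debezium envelope."""
    doc = json.loads(payload)
    after = doc.get("after") or {}
    before = doc.get("before") or {}
    return {
        "op": doc.get("op"),
        # For deletes, after is null, so the key has to come from before
        "user_id": after.get("user_id", before.get("user_id")),
        "email": after.get("email"),
        # source.ts_ms is a millisecond epoch; timedelta keeps the milliseconds exact
        "event_time": EPOCH + timedelta(milliseconds=doc["source"]["ts_ms"]),
    }

delete_event = json.dumps({
    "op": "d",
    "before": {"user_id": "42", "email": "x@y"},
    "after": None,
    "source": {"ts_ms": 1765000000123},
})
print(parse_debezium(delete_event))
```

Without a key on the delete row, a whenMatchedDelete clause never matches, so it is worth testing the parser against a real delete message from your connector before trusting the MERGE.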

Output (steady-state behaviour).

Concern Behaviour
Out-of-order CDC within-batch dedup picks latest event_time per user
Replay after crash Kafka offsets replay, MERGE idempotent — target state unchanged
New user_id inserted by whenNotMatchedInsert
Hard delete row removed by whenMatchedDelete(condition="s.op = 'd'")
Schema drift new fields surface as NULL; restart with new selectExpr to pick up

Rule of thumb. Every production CDC ingest follows this shape: Kafka source + watermark + foreachBatch + MERGE INTO + private checkpoint. Memorise the four pillars; the rest is business logic inside the function.

Worked example — checkpoint directory anatomy

Detailed explanation. Inspecting the checkpoint directory is the fastest way to debug a streaming query that won't start. Each sub-directory has a specific purpose, and understanding the layout helps you distinguish "checkpoint corruption" from "expected behaviour."

Question. A streaming query's checkpoint dir is s3://bucket/ck/curated/. Inspect the layout and explain what each sub-directory means.

Input. The checkpoint after 50 successful batches:

s3://bucket/ck/curated/
├── metadata
├── offsets/
│   ├── 48
│   ├── 49
│   └── 50
├── commits/
│   ├── 48
│   ├── 49
│   └── 50
├── state/
│   └── 0/
│       └── 0/
│           ├── 50.delta
│           ├── 50.snapshot
│           └── _metadata/
└── sources/
    └── 0/
        └── 0

Step-by-step explanation.

  1. metadata: one-time query metadata holding the query's unique id, written at first start.
  2. offsets/N: the source offsets that batch N covers, written before the batch executes.
  3. commits/N: written after the batch's sink commit succeeds. The pair (offsets/N, commits/N) together means "batch N is durable."
  4. state/0/0/: keyed state for partition 0 of operator 0 (stateful operators are numbered). .delta files are per-batch updates; .snapshot is a periodic full snapshot for fast restart.
  5. sources/0/: source-specific metadata for the first source. For Kafka it holds the initial offsets chosen at first start; for file sources it is the log of files already seen. The _spark_metadata directory is something else: it is the file sink's commit log and lives in the sink's output path, not in the checkpoint.

Restart logic.

| State on disk | Engine behaviour |
| --- | --- |
| offsets/50 exists, commits/50 exists | batch 50 fully durable; start batch 51 |
| offsets/50 exists, commits/50 missing | batch 50 was in-flight at crash; replay batch 50 |
| offsets/50 missing | nothing in-flight; start batch 50 fresh from last committed offset |
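The restart table reduces to a small decision function. The sketch below models only the rule described here, with the batch ids found under offsets/ and commits/ passed in as sets; next_batch_to_run is a hypothetical name, and the real engine does more (for example, restoring state versions).

```python
from typing import Set, Tuple

def next_batch_to_run(offsets: Set[int], commits: Set[int]) -> Tuple[int, str]:
    """Decide which batch id to run after a restart, and whether it is a replay."""
    if not offsets:
        return 0, "fresh"              # brand-new checkpoint
    latest = max(offsets)
    if latest in commits:
        return latest + 1, "fresh"     # latest planned batch is durable
    return latest, "replay"            # planned but never committed

print(next_batch_to_run({48, 49, 50}, {48, 49, 50}))
print(next_batch_to_run({48, 49, 50}, {48, 49}))
print(next_batch_to_run({48, 49}, {48, 49}))
```

The replay case is why the sink has to be idempotent: the engine re-runs batch 50 over exactly the same offset range, but it cannot know how much of the previous attempt reached the sink.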

Output.

| Sub-dir | Purpose | Safe to delete? |
| --- | --- | --- |
| metadata | query id | NO, never |
| offsets/N | source position pre-batch | NO, only as a pair with commits/N |
| commits/N | sink commit confirmation | NO |
| state/X/Y/ | keyed state for stateful operators | NO, deleting forces state reset |
| sources/N/ | source-specific tracking (for example Kafka initial offsets) | NO |

Rule of thumb. Never manually delete files inside a checkpoint directory. To "reset" a query, point it at a new checkpoint dir and start fresh; leave the old one for archaeology.

Worked example — schema evolution with mergeSchema

Detailed explanation. Upstream adds a new column to the Kafka payload. Without schema evolution, the new column is silently dropped at the parser layer; with mergeSchema=true, the Delta target picks up the new column on the next batch.

Question. A streaming query writes Kafka JSON to Delta. Upstream adds a new field loyalty_tier. Show the config that lets the target Delta table evolve.

Input. Original Kafka payload: {"user_id": 1, "email": "x@y"}. New payload (mid-day): {"user_id": 1, "email": "x@y", "loyalty_tier": "gold"}.

Code.

# Update the parser to project the new field
parsed = (src.selectExpr("CAST(value AS STRING) AS payload")
              .selectExpr(
                  "get_json_object(payload, '$.user_id')        AS user_id",
                  "get_json_object(payload, '$.email')          AS email",
                  "get_json_object(payload, '$.loyalty_tier')   AS loyalty_tier"))   # new

(parsed.writeStream
       .format("delta")
       .outputMode("append")
       .option("mergeSchema", "true")                                                 # KEY
       .option("checkpointLocation", "/ck/users_curated/")
       .toTable("users_curated")
       .awaitTermination())

Step-by-step explanation.

  1. Without mergeSchema=true, the first batch with loyalty_tier fails with AnalysisException: A schema mismatch detected when writing to the Delta table.
  2. With mergeSchema=true, Delta widens the target schema by adding the loyalty_tier column on the next commit. Existing rows have NULL loyalty_tier.
  3. The streaming query must be restarted with the new .selectExpr that includes the new field. The checkpoint is forward-compatible — the new column is appended, not breaking.
  4. For MERGE INTO with schema evolution: set spark.databricks.delta.schema.autoMerge.enabled = true (the conf keeps its databricks prefix in open-source Delta Lake as well), or call target.merge(...).withSchemaEvolution() where your Delta Lake release provides it.
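The effect of schema widening on existing rows can be modelled in a few lines. This is a conceptual sketch in plain Python, not Delta's implementation; merge_schema and read_with_schema are hypothetical helpers.

```python
from typing import Dict, List, Optional

def merge_schema(target_cols: List[str], incoming_cols: List[str]) -> List[str]:
    """Append columns the target has not seen yet, keeping the existing column order."""
    return list(target_cols) + [c for c in incoming_cols if c not in target_cols]

def read_with_schema(rows: List[Dict[str, Optional[str]]],
                     schema: List[str]) -> List[Dict[str, Optional[str]]]:
    """Project stored rows onto the current schema; missing columns read as None."""
    return [{c: row.get(c) for c in schema} for row in rows]

old_rows = [{"user_id": "1", "email": "x@y"}]
new_rows = [{"user_id": "1", "email": "x@y", "loyalty_tier": "gold"}]

schema = merge_schema(["user_id", "email"], list(new_rows[0].keys()))
print(schema)
print(read_with_schema(old_rows + new_rows, schema))
```

Widening is additive, which is why it is safe to enable by default. Renames and type changes do not fit this model and still need a backfill.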

Output.

Step Target schema
Before (user_id, email)
After first batch with loyalty_tier (user_id, email, loyalty_tier) — old rows NULL
Subsequent batches new column populated for new rows

Rule of thumb. Set mergeSchema=true on the Delta sink for every CDC pipeline; the cost is negligible and the cost of not having it is a 3am page when upstream adds a field. For MERGE INTO, also enable withSchemaEvolution() or the Databricks autoMerge conf.

Worked example — multi-sink fan-out via foreachBatch

Detailed explanation. Sometimes a single batch needs to land in two places — e.g. raw events into a Delta table for analytics and a Kafka topic for downstream microservices. foreachBatch is the natural place: cache the batch DataFrame once, write to both sinks.

Question. Write the same Kafka source batch to (a) a Delta table for analytics and (b) a Kafka topic for downstream consumers. Avoid double-computing the source DataFrame.

Input. Kafka topic events_raw; targets: Delta table events_curated_delta and Kafka topic events_curated_kafka.

Code.

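def fan_out(batch_df, epoch_id):
    # Assumes the stream was parsed upstream so batch_df exposes a user_id column.
    batch_df.cache()                       # lazy: filled by the first write, reused by the second
    try:
        # Sink 1: Delta. txnAppId + txnVersion let Delta skip this append if the batch is replayed.
        (batch_df.write.format("delta").mode("append")
                 .option("txnAppId", "fan_out")       # any stable id unique to this query
                 .option("txnVersion", epoch_id)
                 .saveAsTable("events_curated_delta"))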
        # Sink 2 — Kafka
        (batch_df.selectExpr("CAST(user_id AS STRING) AS key", "to_json(struct(*)) AS value")
                .write.format("kafka")
                .option("kafka.bootstrap.servers", "broker:9092")
                .option("topic", "events_curated_kafka")
                .option("kafka.acks", "all")
                .save())
    finally:
        batch_df.unpersist()

(spark.readStream.format("kafka").option(...).load()
       .writeStream
       .foreachBatch(fan_out)
       .option("checkpointLocation", "/ck/fan_out/")
       .trigger(processingTime="30 seconds")
       .start()
       .awaitTermination())

Step-by-step explanation.

  1. batch_df.cache() marks the batch for caching; the first write materialises it and the second write reuses it. Without it, the Kafka source would be re-read for each sink, duplicating reads and possibly producing different results if the plan is non-deterministic.
  2. The Delta append + Kafka write each consume the cached DataFrame. Both should succeed or fail together — if one fails, the whole batch fails and Spark retries from the same offset.
  3. try/finally ensures the cache is unpersisted regardless of failure path.
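  4. The checkpoint tracks the source offsets only; the two-sink fan-out is logically one batch. On crash + replay the whole function runs again, so each write needs its own idempotency: a plain batch append to Delta is repeated unless it carries the txnAppId and txnVersion options (with txnVersion set to epoch_id), and the batch Kafka write is at-least-once, so consumers of the topic should deduplicate.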

Output.

Sink Behaviour
Delta events_curated_delta atomic append per batch
Kafka events_curated_kafka per-batch produce; at-least-once, so consumers should deduplicate after a replay
Cache one read per batch, regardless of sink count

Rule of thumb. Multi-sink fan-out is the only good reason to call .cache() inside foreachBatch. Always wrap in try/finally so the cache is released — orphaned cached batches eat executor memory fast.

Worked example — monitoring with StreamingQueryListener

Detailed explanation. Production streaming queries need real-time monitoring: input rate, processed rate, watermark advancement, state size. StreamingQueryListener is the hook to wire these into Prometheus / Datadog.

Question. Implement a StreamingQueryListener that logs the input rate, processing rate, and numRowsDroppedByWatermark for each batch.

Input. A running streaming query.

Code.

from pyspark.sql.streaming import StreamingQueryListener

class MetricsListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        print(f"[query started] id={event.id} runId={event.runId}")

    def onQueryProgress(self, event):
        p = event.progress
        print(f"[progress] batchId={p.batchId} "
              f"input={p.inputRowsPerSecond:.1f}/s "
              f"processed={p.processedRowsPerSecond:.1f}/s "
              f"trigger={p.durationMs.get('triggerExecution', 0)}ms "
              f"drops={p.stateOperators[0].numRowsDroppedByWatermark if p.stateOperators else 0}")

    def onQueryTerminated(self, event):
        print(f"[query terminated] id={event.id}")

spark.streams.addListener(MetricsListener())

Step-by-step explanation.

  1. The listener subscribes to three callbacks. onQueryProgress fires after every micro-batch — that's where the metrics live.
  2. inputRowsPerSecond is the rate at which rows arrived from the source during the batch window. processedRowsPerSecond is the rate at which the batch processed them.
  3. When input > processed for sustained batches, the query is falling behind — alert.
  4. numRowsDroppedByWatermark is the count of late events dropped from stateful operators in this batch. Spike = your watermark is too tight.
  5. In production, replace print with a Datadog / Prometheus push or a structured log line picked up by your observability pipeline.

Output (per-batch log line).

| Field | Meaning | Alert threshold |
| --- | --- | --- |
| inputRowsPerSecond | source arrival rate | n/a |
| processedRowsPerSecond | engine processing rate | alert if << input for 3+ batches |
| triggerExecution (ms) | total time for the batch | alert if > trigger interval consistently |
| numRowsDroppedByWatermark | late events dropped | alert if > 0.1% of input |
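The thresholds above can be turned into a small rule evaluator that runs on whatever the listener collects. This is a sketch under assumptions: evaluate_progress is a hypothetical helper, each progress record is a plain dict with the field names shown, "consistently" is read as the last three batches, and "<<" is simplified to "lower than".

```python
from typing import Dict, List

def evaluate_progress(history: List[Dict[str, float]],
                      trigger_interval_ms: float,
                      window: int = 3,
                      max_drop_ratio: float = 0.001) -> List[str]:
    """Return the names of the alerts that fire for the most recent batches."""
    alerts: List[str] = []
    if not history:
        return alerts
    recent = history[-window:]
    if len(recent) == window:
        if all(p["processedRowsPerSecond"] < p["inputRowsPerSecond"] for p in recent):
            alerts.append("falling_behind")
        if all(p["triggerExecutionMs"] > trigger_interval_ms for p in recent):
            alerts.append("slow_batches")
    last = history[-1]
    if last["numInputRows"] > 0:
        if last["numRowsDroppedByWatermark"] / last["numInputRows"] > max_drop_ratio:
            alerts.append("late_data_drops")
    return alerts

healthy = {"inputRowsPerSecond": 900.0, "processedRowsPerSecond": 1500.0,
           "triggerExecutionMs": 20_000, "numInputRows": 54_000,
           "numRowsDroppedByWatermark": 0}
lagging = {"inputRowsPerSecond": 2000.0, "processedRowsPerSecond": 1200.0,
           "triggerExecutionMs": 75_000, "numInputRows": 120_000,
           "numRowsDroppedByWatermark": 600}

print(evaluate_progress([healthy] * 3, trigger_interval_ms=60_000))
print(evaluate_progress([lagging] * 3, trigger_interval_ms=60_000))
```

Keeping the rules in a pure function makes them unit-testable; the listener then only has to map each progress object to a dict and forward any alerts.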

Rule of thumb. Wire a StreamingQueryListener into every production streaming query. The default Spark UI shows you the same data but is operator-driven; metrics in Datadog / Prometheus are SRE-driven. Both matter.

Spark interview question on production hardening

A senior interviewer often opens with: "Walk me through everything you'd add to a hello-world readStream → writeStream query to make it production-grade. Be specific."

Solution Using the production hardening checklist

# Production-grade Kafka → Delta CDC pipeline
from delta.tables import DeltaTable
from pyspark.sql.functions import col, row_number, get_json_object, from_unixtime, lit
from pyspark.sql.window import Window
from pyspark.sql.streaming import StreamingQueryListener

# 1) Backpressure + watermark + parse
src = (spark.readStream.format("kafka")
              .option("kafka.bootstrap.servers", "broker:9092")
              .option("subscribe", "debezium.users")
              .option("startingOffsets", "earliest")
              .option("maxOffsetsPerTrigger", 100_000)
              .option("failOnDataLoss", "false")
              .load())

parsed = (src.selectExpr("CAST(value AS STRING) AS payload", "timestamp AS kafka_ts")
              .selectExpr(
                  "get_json_object(payload, '$.after.user_id') AS user_id",
                  "get_json_object(payload, '$.after.email')   AS email",
                  "CAST(get_json_object(payload, '$.source.ts_ms') AS LONG)/1000 AS event_ts_epoch")
              # Cast the fractional epoch seconds directly so millisecond ordering survives
              .withColumn("event_time", col("event_ts_epoch").cast("timestamp"))
              .withWatermark("event_time", "30 minutes"))

# 2) foreachBatch upsert with within-batch dedup
def upsert_users(batch_df, epoch_id):
    if batch_df.rdd.isEmpty():
        return
    w = Window.partitionBy("user_id").orderBy(col("event_time").desc())
    latest = (batch_df.withColumn("rn", row_number().over(w))
                      .filter("rn = 1").drop("rn"))

    (DeltaTable.forName(spark, "users_latest_state").alias("t")
        .merge(latest.alias("s"), "t.user_id = s.user_id")
        .whenMatchedUpdate(condition="s.event_time > t.event_time",
                           set={"email": "s.email", "event_time": "s.event_time"})
        .whenNotMatchedInsertAll()
        .execute())

# 3) Listener for SRE metrics
class MetricsListener(StreamingQueryListener):
    def onQueryStarted(self, event): pass
    def onQueryProgress(self, event):
        # push to Datadog / Prometheus
        pass
    def onQueryTerminated(self, event): pass
spark.streams.addListener(MetricsListener())

# 4) writeStream with private checkpoint + processing-time trigger
(parsed.writeStream
       .foreachBatch(upsert_users)
       .option("checkpointLocation", "s3://bucket/ck/users_latest_state/")
       .trigger(processingTime="1 minute")
       .start()
       .awaitTermination())

Step-by-step trace.

Hardening What it gives you
maxOffsetsPerTrigger=100k bounded batch size — no first-batch OOM after outage
failOnDataLoss=false tolerates Kafka retention deleting offsets older than startingOffsets
withWatermark("event_time", "30 minutes") bounded state for any future stateful operator
within-batch dedup handles multiple CDC updates per key per batch
MERGE INTO ... whenMatchedUpdate(condition=event_time > t.event_time) out-of-order safety
private checkpointLocation restart-safe, owned by this query
processingTime="1 minute" matches SLA; controls cost
StreamingQueryListener SRE-grade metrics in Datadog/Prometheus

Output:

Hardening pillar Behaviour
Source Kafka offsets, capped, replayable
State bounded by watermark
Sink Delta MERGE — exactly-once, idempotent
Restart safe via private checkpoint
Monitoring per-batch metrics surfaced to SRE tools

Why this works — concept by concept:

  • Backpressure as production insurance: maxOffsetsPerTrigger caps the worst-case batch size. The first batch after a 4-hour outage is 100k offsets, not 12M, so the cluster recovers gracefully instead of OOMing.
  • failOnDataLoss=false: lets the query skip offsets that Kafka has already aged out instead of failing. With true, an old checkpoint is fatal once Kafka retention removes those offsets. The trade-off is that the skipped records are lost without failing the query, so treat this setting as a deliberate choice.
  • withWatermark on a non-stateful query — costs nothing now but lets you add stateful operators later (dedup, windowed agg) without changing the source.
  • Idempotent MERGE INTO — the s.event_time > t.event_time guard makes the upsert order-independent; replays produce the same target state.
  • Private checkpoint — owned by this query, never shared with another query, never deleted manually. The most common production outage is "two queries pointed at the same checkpoint."
  • StreamingQueryListener — the SRE-grade hook. Wire it to your observability stack so input-rate vs processed-rate divergence pages someone within minutes, not hours.
  • Cost — O(batch_rows) for parse + dedup + MERGE matching. The cost is dominated by MERGE if the target table is not partitioned/Z-ordered on the merge key — fix that upstream of the streaming query.

PySpark
Topic — streaming · meta
Meta streaming problems (PySpark)

Practice →


Cheat sheet — Structured Streaming recipes

  • Read from Kafka, write to Delta (minimal exactly-once). spark.readStream.format("kafka").option("kafka.bootstrap.servers", ...).option("subscribe", "events").option("maxOffsetsPerTrigger", 100_000).load() ... .writeStream.format("delta").outputMode("append").option("checkpointLocation", "/ck/...").trigger(processingTime="30 seconds").toTable("target").
  • 5-minute tumbling window + 10-minute watermark. df.withWatermark("event_time", "10 minutes").groupBy(window("event_time", "5 minutes"), "user_id").count().
  • Trigger.AvailableNow for scheduled ingest. .trigger(availableNow=True) — drains every new file/offset across as many batches as needed, then exits. Replaces deprecated Trigger.Once.
  • foreachBatch + MERGE INTO for CDC upserts. Dedup within batch (row_number() over (key, event_time DESC)), then target.merge(source, "t.key = s.key").whenMatchedUpdate(condition="s.event_time > t.event_time").whenNotMatchedInsertAll().execute().
  • Dedup with watermark. df.withWatermark("event_time", "30 minutes").dropDuplicatesWithinWatermark(["event_id"]) keeps state bounded (dropDuplicatesWithinWatermark needs Spark 3.5+).
  • Stream-stream join with two watermarks. df1.withWatermark("ts1", "10 min").join(df2.withWatermark("ts2", "10 min"), expr("k1 = k2 AND ts2 BETWEEN ts1 AND ts1 + interval 1 hour")) — both sides need a watermark.
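  • Recover from checkpoint corruption. Do not edit the checkpoint directory. Point the query at a new dir and re-process from the source. Re-processing is harmless when the sink is an idempotent MERGE keyed on the business key; a plain append sink will receive the replayed rows again, so deduplicate or rebuild the target in that case.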
  • StreamingQueryListener for SRE. Subclass pyspark.sql.streaming.StreamingQueryListener, implement onQueryProgress, and push the QueryProgressEvent metrics to Datadog / Prometheus.
  • Switch to RocksDB state store. spark.conf.set("spark.sql.streaming.stateStore.providerClass", "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider") — required past 1M state keys.
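  • Backpressure for Kafka. option("maxOffsetsPerTrigger", N) caps the records read per micro-batch. (maxBytesPerTrigger is an option of the Delta and Databricks Auto Loader sources, not of the Kafka source.) Without a cap, the first batch after an outage tries to read the whole backlog and can OOM.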
  • Backpressure for file source. option("maxFilesPerTrigger", N). The built-in file source has no limit by default; the Delta source and Databricks Auto Loader default to 1000. Pair it with Trigger.AvailableNow for big back-fills.
  • Delta sink schema evolution. option("mergeSchema", "true") on the writeStream. For MERGE INTO, also set spark.databricks.delta.schema.autoMerge.enabled = true (the conf keeps this name on open-source Delta as well as Databricks) or call .withSchemaEvolution() on the merge builder (Delta 3.2+).
  • Idempotency for non-Delta sinks. Use epoch_id (the micro-batch id that foreachBatch passes to your function) as the API idempotency key, or maintain a ledger table of processed epoch_id values and skip already-processed epochs in foreachBatch.
  • Avoid foreach for upserts. Always prefer foreachBatch — row-by-row foreach cannot use MERGE INTO and is 10-100× slower.
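
To make the "dedup within batch, then MERGE with an event_time guard" recipe concrete, here is a minimal plain-Python model of its semantics. It is not Spark or Delta code: the in-memory dict stands in for the target table, and the Row fields (key, event_time, value) are hypothetical column names chosen for illustration.

```python
from typing import Dict, Iterable, List, TypedDict


class Row(TypedDict):
    key: str
    event_time: int  # epoch seconds (hypothetical column)
    value: str


def dedup_latest(batch: Iterable[Row]) -> List[Row]:
    """Keep the newest row per key, like row_number() = 1 ordered by event_time DESC."""
    latest: Dict[str, Row] = {}
    for row in batch:
        seen = latest.get(row["key"])
        if seen is None or row["event_time"] > seen["event_time"]:
            latest[row["key"]] = row
    return list(latest.values())


def merge_upsert(target: Dict[str, Row], batch: Iterable[Row]) -> None:
    """Model of: matched AND s.event_time > t.event_time -> update; not matched -> insert."""
    for src in dedup_latest(batch):
        tgt = target.get(src["key"])
        if tgt is None or src["event_time"] > tgt["event_time"]:
            target[src["key"]] = src


target: Dict[str, Row] = {}
batch: List[Row] = [
    {"key": "a", "event_time": 10, "value": "v1"},
    {"key": "a", "event_time": 20, "value": "v2"},
    {"key": "b", "event_time": 15, "value": "w1"},
]
merge_upsert(target, batch)
snapshot = dict(target)
merge_upsert(target, batch)  # replaying the same batch changes nothing
```

The strict greater-than guard is what makes a replayed or out-of-order batch harmless in this model: a row only overwrites the target when it is strictly newer.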

Frequently asked questions

What is the difference between Trigger.Once and Trigger.AvailableNow?

Trigger.Once (deprecated since Spark 3.4; AvailableNow itself arrived in Spark 3.3) tries to process every new offset / file in a single micro-batch and ignores rate limits, which can OOM the cluster on large backlogs. Trigger.AvailableNow processes new data across multiple batches, respecting maxOffsetsPerTrigger and maxFilesPerTrigger, and exits when the source is drained. Migration is a one-line change (trigger(once=True) → trigger(availableNow=True)), checkpoint-compatible, and removes the OOM risk. Every new pipeline should use AvailableNow; legacy code using Once is technical debt.
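
The difference is easiest to see as arithmetic on batch sizes. The sketch below is a plain-Python model, not Spark code: plan_batches is a hypothetical helper, and passing None for the cap stands in for Trigger.Once ignoring the rate limit.

```python
from typing import List, Optional


def plan_batches(backlog: int, max_per_trigger: Optional[int]) -> List[int]:
    """Return the micro-batch sizes needed to drain `backlog` records.

    max_per_trigger=None models Trigger.Once (one batch, no rate limit);
    an integer models Trigger.AvailableNow with maxOffsetsPerTrigger set.
    """
    if backlog <= 0:
        return []
    if max_per_trigger is None:
        return [backlog]
    if max_per_trigger <= 0:
        raise ValueError("max_per_trigger must be positive")
    full, rest = divmod(backlog, max_per_trigger)
    return [max_per_trigger] * full + ([rest] if rest else [])


once = plan_batches(1_050_000, None)
available_now = plan_batches(1_050_000, 100_000)
```

Under these assumed numbers, the Once plan is a single 1,050,000-record batch, while the AvailableNow plan is eleven batches that never exceed 100,000 records each.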

Why does my watermark not drop late data immediately?

Because the watermark only advances between micro-batches, based on the maximum event time seen so far. At the end of each batch the engine computes max(event_time observed so far) - threshold, and that value takes effect from the next batch. A late event that arrives in batch N is dropped only if its event time is older than the watermark in force for batch N. If your stream has been quiet (no new events), the watermark hasn't moved, and the late event may still be accepted. Conversely, a batch carrying a much later event time advances the watermark, and the batches that follow may suddenly drop a flood of "late" events. The metric to watch is numRowsDroppedByWatermark, reported under stateOperators in the progress events delivered to StreamingQueryListener: sudden spikes signal a watermark step.
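
A small plain-Python simulation shows both effects, the quiet stream that still accepts a late event and the watermark step that drops one. It is a simplified model rather than Spark's implementation: event times are integer seconds, the comparison is a plain "older than the watermark" check on the event itself (Spark applies it per stateful operator), and run_batches is a hypothetical name.

```python
from typing import List, Tuple

BatchTrace = Tuple[int, List[int], List[int]]  # (watermark in force, kept, dropped)


def run_batches(batches: List[List[int]], delay: int) -> List[BatchTrace]:
    """Apply a per-batch watermark: computed after batch N, enforced from batch N+1."""
    watermark = 0  # nothing seen yet
    max_seen = 0
    trace: List[BatchTrace] = []
    for batch in batches:
        kept = [t for t in batch if t >= watermark]
        dropped = [t for t in batch if t < watermark]
        trace.append((watermark, kept, dropped))
        # Advance only after the batch finishes; the watermark never moves backwards.
        max_seen = max([max_seen] + batch)
        watermark = max(watermark, max_seen - delay)
    return trace


trace = run_batches([[1000], [450], [5000], [1200, 4500]], delay=600)
```

In this trace the event at 450 is 550 seconds behind the maximum yet still accepted, because the watermark sits at 400. After the event at 5000 arrives, the watermark jumps to 4400 and the event at 1200 in the following batch is dropped.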

Why is foreachBatch usually the right way to write a custom sink?

Because foreachBatch(fn) exposes the streaming batch as a static DataFrame, which means you can call the full DataFrame API: MERGE INTO, JDBC writes, multi-sink fan-out, batch-level caching. foreach(fn) is row-by-row and cannot leverage batch operations; it's also 10-100× slower for typical workloads. foreachBatch additionally receives the epoch_id (the micro-batch id) for idempotency: pass it as the idempotency key to a REST-style sink, or maintain a ledger table of processed epoch_id values to skip already-processed epochs. Use foreach only when the sink truly needs row-by-row state (rare).

What output mode should I use for a tumbling-window aggregation?

It depends on the SLA. outputMode("append") emits one final row per window once the watermark passes the window end — clean, idempotent, but delayed by the watermark width. outputMode("update") emits a row every batch for each window whose count changed — interim counts are visible immediately, but the same window may appear many times in the sink. outputMode("complete") emits the entire result table every batch — only viable for small key spaces (< 100k windows) because every batch re-emits everything. The 90% answer is append with a watermark calibrated to the 99th percentile late-event delay.
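
To compare what each mode actually emits, here is a simplified plain-Python model of a tumbling-window count. It is a sketch under stated assumptions, not Spark's engine: timestamps are integer seconds, a window closes when its end is at or below the watermark, emission happens in the same batch that moves the watermark (Spark applies the new watermark from the next batch), and the simulate function is hypothetical.

```python
from typing import Dict, List, Set, Tuple

Emitted = List[Tuple[int, int]]  # (window_start, count)


def simulate(batches: List[List[int]], window: int, delay: int, mode: str) -> List[Emitted]:
    """Return the rows written to the sink after each micro-batch."""
    counts: Dict[int, int] = {}
    finalized: Set[int] = set()
    watermark = 0
    max_seen = 0
    per_batch: List[Emitted] = []
    for batch in batches:
        changed: Set[int] = set()
        for ts in batch:
            start = ts - ts % window
            # Complete mode keeps all state, so it never drops late rows.
            if mode != "complete" and start + window <= watermark:
                continue  # window already closed: row is too late
            counts[start] = counts.get(start, 0) + 1
            changed.add(start)
        max_seen = max([max_seen] + batch)
        watermark = max(watermark, max_seen - delay)
        if mode == "complete":
            out = sorted(counts.items())
        elif mode == "update":
            out = sorted((w, counts[w]) for w in changed)
        elif mode == "append":
            closed = sorted(w for w in counts
                            if w + window <= watermark and w not in finalized)
            finalized.update(closed)
            out = [(w, counts[w]) for w in closed]
        else:
            raise ValueError(f"unknown output mode: {mode}")
        per_batch.append(out)
    return per_batch


batches = [[10, 20, 310], [30, 320, 950], [1300]]
append_out = simulate(batches, window=300, delay=600, mode="append")
update_out = simulate(batches, window=300, delay=600, mode="update")
complete_out = simulate(batches, window=300, delay=600, mode="complete")
```

With 5-minute windows and a 10-minute delay, append stays silent in the first batch and emits each window exactly once after it closes, update re-emits window 0 as its count grows from 2 to 3, and complete rewrites every window on every batch.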

How does Structured Streaming guarantee exactly-once into a Delta sink?

Three pillars working together. (1) Replayable source: Kafka offsets, file names, and Delta CDF versions are all deterministic, so the source can be re-read from a recorded position. (2) Durable checkpoint: option("checkpointLocation", ...) records the source position, state, and commit log, so a crashed batch can be replayed. (3) Idempotent Delta sink: Delta's _delta_log/ records the streaming query id and batch id and refuses duplicate writes. The combination is exactly-once: on crash, the engine replays the last unconfirmed batch and Delta either commits it (first success) or refuses it (already committed). Without any one of the three pillars, the guarantee collapses.
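
The crash-and-replay path can be sketched in plain Python. This is a toy model of the three pillars, not Spark or Delta internals: the list is the replayable source, Checkpoint holds a write-ahead offset log plus a commit log, and IdempotentSink refuses a (query_id, batch_id) pair it has already recorded. All class and function names are hypothetical.

```python
from typing import Dict, List, Set, Tuple


class Checkpoint:
    """Durable record of planned offset ranges and of committed batch ids."""

    def __init__(self) -> None:
        self.offsets: Dict[int, Tuple[int, int]] = {}  # batch_id -> [start, end)
        self.commits: Set[int] = set()


class IdempotentSink:
    """Stores rows and remembers which (query_id, batch_id) it already accepted."""

    def __init__(self) -> None:
        self.rows: List[str] = []
        self.txns: Set[Tuple[str, int]] = set()

    def write(self, query_id: str, batch_id: int, rows: List[str]) -> bool:
        if (query_id, batch_id) in self.txns:
            return False  # duplicate batch: refuse
        self.rows.extend(rows)
        self.txns.add((query_id, batch_id))
        return True


def run_batch(source: List[str], ckpt: Checkpoint, sink: IdempotentSink,
              query_id: str, batch_size: int, crash_before_commit: bool = False) -> bool:
    """Run one micro-batch; returns True if the sink accepted new rows."""
    pending = [b for b in ckpt.offsets if b not in ckpt.commits]
    if pending:
        batch_id = min(pending)  # replay the planned-but-unconfirmed batch
    else:
        batch_id = len(ckpt.offsets)
        start = ckpt.offsets[batch_id - 1][1] if batch_id else 0
        end = min(start + batch_size, len(source))
        ckpt.offsets[batch_id] = (start, end)  # write-ahead: plan before processing
    start, end = ckpt.offsets[batch_id]
    wrote = sink.write(query_id, batch_id, source[start:end])
    if crash_before_commit:
        raise RuntimeError("simulated crash after sink write, before commit log")
    ckpt.commits.add(batch_id)
    return wrote


source = ["e0", "e1", "e2", "e3", "e4", "e5"]
ckpt, sink = Checkpoint(), IdempotentSink()
run_batch(source, ckpt, sink, "q1", 3)
try:
    run_batch(source, ckpt, sink, "q1", 3, crash_before_commit=True)
except RuntimeError:
    pass  # the driver died; the sink already holds batch 1
replayed = run_batch(source, ckpt, sink, "q1", 3)  # restart replays batch 1
```

The replay re-reads the same offsets and the sink refuses the duplicate, so each event lands once. In the same model, starting over with a fresh Checkpoint and a different query_id would write e0 to e5 a second time, which is why the checkpoint and the sink's transaction record have to travel together.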

How do I evolve the schema of a streaming source without breaking the checkpoint?

For an additive change (new column added), set option("mergeSchema", "true") on the Delta sink and update your .selectExpr to project the new field. The Delta table widens its schema on the next commit; existing rows get NULL for the new column; the checkpoint is forward-compatible. For MERGE INTO, additionally call withSchemaEvolution() on the merge builder (Delta 3.2+) or set spark.databricks.delta.schema.autoMerge.enabled = true. For breaking changes (type change, column rename, partition column change), you cannot keep the existing checkpoint: write to a new checkpoint dir, backfill, and switch consumers. The rule of thumb: design source contracts to be additive-only, and breaking changes become a once-in-a-quarter, planned event.
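
The additive-versus-breaking distinction can be modelled in a few lines of plain Python. This is a deliberately simplified stand-in for schema merging, not Delta's actual rules (which also cover nested fields, nullability, and type widening); schemas are dicts of column name to type name, and merge_schema, widen_row, and BreakingSchemaChange are hypothetical names.

```python
from typing import Any, Dict, Optional

Schema = Dict[str, str]  # column name -> type name


class BreakingSchemaChange(Exception):
    """Raised when an incoming column conflicts with the table's existing type."""


def merge_schema(table: Schema, incoming: Schema) -> Schema:
    """Accept new columns; reject a type change on an existing column."""
    merged = dict(table)
    for col, typ in incoming.items():
        if col not in merged:
            merged[col] = typ  # additive change: widen the table
        elif merged[col] != typ:
            raise BreakingSchemaChange(f"{col}: {merged[col]} -> {typ}")
    return merged


def widen_row(row: Dict[str, Any], schema: Schema) -> Dict[str, Optional[Any]]:
    """Project an old row onto the widened schema; missing columns become None."""
    return {col: row.get(col) for col in schema}


table_schema: Schema = {"id": "long", "amount": "double"}
evolved = merge_schema(table_schema, {"id": "long", "amount": "double", "currency": "string"})
old_row = widen_row({"id": 1, "amount": 2.5}, evolved)
```

Note that a column rename passes through this model as "one new column, one column that stops receiving values", which is one reason renames are better handled as a planned migration than left to automatic merging.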

Practice on PipeCode

Pipecode.ai is LeetCode for Data Engineering: every Structured Streaming pattern above ships with hands-on practice rooms where you write the withWatermark, the foreachBatch upsert, and the MERGE INTO against real graded inputs. PipeCode pairs every reading with 450+ DE-focused problems and a real-time scoring engine, so you never have to wonder whether your trigger + watermark + sink combination actually delivers exactly-once on Spark 3.5 with RocksDB state.

Practice Structured Streaming now →
Spark internals course →
