Data pipeline design is the single highest-leverage system-design competency a mid-to-staff data engineer is hired on. It covers batch architectures (Airflow DAG + dbt build + warehouse), streaming architectures (Kafka + Flink Kappa with log replay), idempotency patterns (MERGE INTO, dedup keys, deterministic hash partitions), backfill strategies (full-table, partition-aware, log replay), observability + SLOs (structured JSON logs, metrics, OpenTelemetry traces, freshness SLOs), and the production failure modes every senior loop drills against (schema drift, source unavailable, OOM, runaway scan, late data, partition misalignment, retry storm, downstream backpressure). Together those concerns form the pipeline-design interview map that every senior data engineering interview round circles back to.
This guide is the 7-section deep-dive counterpart to a shorter design-guide article. Each section walks a single concept through sub-topics, then gives a worked example in Question → Input → Code → Step-by-step → Output order, then closes with a "Solution Using …" block that follows the four-part Solution Tail (code → step-by-step trace → output → why this works). The seven sections are:

1. Why pipeline design separates juniors from seniors.
2. Batch architectures.
3. Streaming architectures.
4. Idempotency patterns.
5. Backfill strategies.
6. Observability + SLOs.
7. A failure-mode + production playbook.

That is the exact shape data engineering interview loops reward when the whiteboard prompt is "design me a pipeline that …".
When you want hands-on reps alongside the read, browse ETL Python drills, drill data-processing patterns, sharpen streaming Python drills, rehearse real-time analytics drills, reinforce pipeline-design drills, or widen coverage on the full Python practice library.
On this page
- Why data pipeline design separates juniors from seniors
- Batch architectures deep-dive — Airflow DAG + dbt build + warehouse
- Streaming architectures deep-dive — Kafka + Flink Kappa with replay
- Idempotency patterns — MERGE INTO, dedup keys, deterministic hash
- Backfill strategies — full-table, partition-aware, log replay
- Observability + SLOs — logs, metrics, traces, alerting
- Failure modes + production playbook
- Choosing the right pipeline pattern (cheat sheet)
- Frequently asked questions
- Practice on PipeCode
1. Why data pipeline design separates juniors from seniors
The senior-loop signal — name the design loop, not the tool stack
The one-sentence invariant: data pipeline design is the discipline of moving data from source to consumer such that every stage is idempotent, every window is backfillable, every failure is observable, and the architecture (batch vs streaming) is chosen by the consumer's SLA — not by the team's tool preference. Junior answers reach for tool names ("I'd use Airflow, dbt, Snowflake"); senior answers reach for the design loop — source → ingest → transform → serve, with idempotency, backfill, and observability orthogonal to all four stages.
The four pillars of senior pipeline design.
- Architecture: batch vs streaming, Lambda vs Kappa. The consumer SLA drives the decision, not hype.
- Idempotency: every transform must be safe to re-run. The three implementation patterns are `MERGE INTO`, idempotency keys and deterministic hash partitions.
- Backfills: a known window is re-processed with the same code.
  - Partition-aware Airflow is the default.
  - Full-table reload is the fallback.
  - Log replay is the streaming equivalent.
- Observability: structured JSON logs with correlation IDs, metrics (row counts, latency, freshness), OpenTelemetry traces per task, and SLOs with PagerDuty + a written runbook.
What interviewers actually listen for.
- Do you start from the consumer SLA when choosing batch vs streaming? (basic but tested)
- Do you mention `MERGE INTO` or an `event_id` idempotency key the first time the reviewer asks "what if Airflow retries this task?" (fluency signal)
- Can you describe a partition-aware backfill in Airflow with `--start-date` and `--end-date`? (senior signal)
- Do you call out observability + SLOs as a first-class design concern, not a post-hoc addition? (interview-canonical answer)
- Do you cite at least one failure mode (schema drift, late data, retry storm) before the reviewer asks? (staff-level signal)
The 7-section map this guide walks.
- §2 Batch architectures: Airflow DAG + dbt build + warehouse; sensors, SLA monitor, idempotent partition overwrites.
- §3 Streaming architectures: Kafka topic + partition model, Flink job + windowing + watermark + late data, Kappa replay.
- §4 Idempotency patterns: `MERGE INTO` upsert, `event_id` dedup, deterministic SHA256 hash partition.
- §5 Backfill strategies: full-table reload, partition-aware `--start-date` / `--end-date`, log replay from a Kafka offset.
- §6 Observability + SLOs: a 4-layer stack (logs → metrics → traces → alerting/SLOs) with a freshness-SLO worked example.
- §7 Failure modes: schema drift, source unavailable, OOM, runaway scan, late data, partition misalignment, retry storm, downstream backpressure.
- Cheat sheet + FAQ + CTA: choose-the-pattern table, 5 senior-loop FAQs, practice routes.
The non-negotiables that show up in every senior answer.
- Idempotent sinks: `MERGE INTO` on a natural key, partition overwrite, or upsert-with-version. Never a blind `INSERT INTO target SELECT …` without a `WHERE` window.
- Backfill-first design: every task is parameterised by `{{ ds }}` (the Airflow logical date), so a single re-run with `--start-date / --end-date` corrects history.
- Observability scaffolding: structured logs with a `dag_run_id` correlation ID, row-count and freshness metrics, and a freshness SLO with a PagerDuty target.
- Schema tolerance:
  - Schema Registry + tolerant readers.
  - `MERGE` clauses that drop unknown columns.
  - Alerts on schema drift.
- A documented runbook: every alert has a paired runbook entry naming the diagnostic queries and the safe remediation steps.
Worked example — answering "design a 500M-events/day pipeline" in three minutes
Detailed explanation. Most pipeline-design rounds open with a single fat prompt: "design a pipeline that lands 500M events/day from Kafka into a warehouse, surfaces revenue_by_region to Power BI by 8 AM, survives retries, and supports backfilling any past day after a bug fix." The senior answer is a 4-line architecture sketch that names every pillar: source → ingest → transform → serve. Idempotency, backfill and observability are named alongside those stages, not added at the end.
Question. Sketch the canonical four-pillar answer for the 500M-events/day prompt. Name the idempotency primitive, the backfill command, and the SLO.
Input (the prompt's constraints).
| constraint | value |
|---|---|
| source | Kafka topic orders, at-least-once, event_id per record |
| volume | ~500M events/day (~5,800 events/sec) |
| consumer | Power BI dashboard refreshing daily by 08:00 local |
| backfill | must re-process any past day after a bug fix |
| failure tolerance | every task must be safe to retry |
Code (the four-line architecture answer).
```text
Source    : Kafka 'orders' (at-least-once, event_id)
Ingest    : Spark Structured Streaming -> bronze Delta /raw/orders/dt=YYYY-MM-DD/
              - dedupe on event_id (idempotency key)
              - partition by ingest_date
Transform : Airflow DAG (06:00 daily, {{ ds }} = YYYY-MM-DD)
              - read /raw/orders/dt={{ ds }}/
              - MERGE INTO silver.orders_clean ON (order_id)
              - aggregate -> gold.revenue_by_region partitioned by region,date
Serve     : Power BI Direct Lake reads gold.revenue_by_region
Backfill  : airflow dags backfill orders_daily --start-date 2026-05-01 --end-date 2026-05-07
Observe   : structured JSON logs + freshness SLO (<= 1h after 06:00) + PagerDuty
```
Step-by-step explanation.
- Kafka delivers at-least-once with `event_id`, the idempotency key the ingest layer dedupes on.
- Spark Structured Streaming writes to a bronze Delta path partitioned by `ingest_date`. The checkpointed Delta sink commits each micro-batch exactly once, so a restarted ingest job does not double-write.
- The Airflow run with `{{ ds }} = 2026-05-26` fires at 06:00 on 2026-05-27, because Airflow starts a run once its data interval has closed. It reads only `/raw/orders/dt=2026-05-26/`.
- `MERGE INTO silver.orders_clean ON (order_id)`: re-running the task updates the same target rows, so there are no duplicates.
- The gold aggregation is an `INSERT OVERWRITE` per `(region, date)` partition, so it is safe to re-run.
- Backfill uses `airflow dags backfill --start-date / --end-date`. Every task is parameterised by `{{ ds }}`, so the same code re-runs.
- Observability: every task emits a JSON log with `dag_run_id` + `task_id` + row count, and a freshness-SLO breach pages the on-call.
Sample output (the senior signal the panel listens for).
| pillar | choice | why |
|---|---|---|
| Architecture | Batch (daily 06:00) + streaming ingest only | SLA is 08:00; batch is cheaper |
| Idempotency | event_id dedup at bronze; MERGE at silver | Retries + backfills both safe |
| Backfill | Airflow --start-date / --end-date | Same code, same {{ ds }} |
| Observability | JSON logs + freshness SLO + PagerDuty | SLO is the design constraint |
Rule of thumb: every senior pipeline answer is a 4-line sketch (source → ingest → transform → serve) with idempotency, backfill and observability called out as constraints up front, not after the architecture is drawn. Lead with the SLA, name the idempotency primitive and name the backfill command, and the architecture answer practically writes itself.
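The observability line in the answer above asks for one JSON log per task carrying `dag_run_id`, `task_id` and a row count. That needs nothing beyond the standard library, as this minimal sketch shows. Some names are assumptions:

- the formatter class;
- the `ctx` field used to pass correlation data;
- the `merge_silver` task id.

The `dag_run_id` value follows Airflow's `scheduled__<logical date>` convention for scheduled runs.

```python
import json
import logging
import sys


class JsonFormatter(logging.Formatter):
    """Render every log record as a single JSON object on one line."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "ts": self.formatTime(record, "%Y-%m-%dT%H:%M:%S"),
            "level": record.levelname,
            "msg": record.getMessage(),
        }
        # Correlation fields arrive via extra={"ctx": {...}} (an assumed convention)
        payload.update(getattr(record, "ctx", {}))
        return json.dumps(payload)


def make_task_logger(stream=sys.stdout) -> logging.Logger:
    logger = logging.getLogger("orders_daily")
    logger.handlers.clear()           # avoid stacking handlers if called twice
    handler = logging.StreamHandler(stream)
    handler.setFormatter(JsonFormatter())
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    logger.propagate = False          # keep the root logger from re-emitting plain text
    return logger


log = make_task_logger()
log.info(
    "merge complete",
    extra={"ctx": {
        "dag_run_id": "scheduled__2026-05-26T06:00:00+00:00",
        "task_id": "merge_silver",    # hypothetical task id
        "row_count": 12418503,
    }},
)
```

Because every line is one JSON object, a log pipeline can filter all lines of a failing run by `dag_run_id` without regex parsing.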
Solution Using the canonical four-pillar pipeline-design template
Code (the reusable senior-loop template).
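```python
from dataclasses import dataclass

@dataclass
class ConsumerSLA:
    # Hypothetical container for the consumer SLA read off the prompt
    description: str          # e.g. "08:00 daily"
    max_latency_seconds: int  # how stale the consumer can tolerate the data
    threshold: str            # freshness SLO threshold, e.g. "1h"

    def is_sub_minute(self) -> bool:
        return self.max_latency_seconds < 60

def parse_consumer_sla(prompt: str) -> ConsumerSLA:
    # Hypothetical, deliberately naive stand-in for reading the SLA off the prompt
    if "daily" in prompt or "by 8 AM" in prompt:
        return ConsumerSLA("08:00 daily", max_latency_seconds=3600, threshold="1h")
    return ConsumerSLA("real-time", max_latency_seconds=1, threshold="1m")

def design_pipeline(prompt: str):
    sla = parse_consumer_sla(prompt)                        # Step 1: consumer SLA first
    arch = "streaming" if sla.is_sub_minute() else "batch"  # Step 2: SLA picks the architecture
    # Step 3: name the idempotency primitive for every sink
    ingest_sink = "partition overwrite + event_id dedup"
    transform_sink = "MERGE INTO <table> ON (<natural_key>)"
    serve_sink = "INSERT OVERWRITE PARTITION (<date>)"
    # Step 4: name the backfill command for the chosen architecture
    if arch == "batch":
        backfill = "airflow dags backfill --start-date X --end-date Y"
    else:
        backfill = "reset consumer offset; replay log from offset N"
    # Step 5: declare observability + SLO
    observability = {
        "logs": "structured JSON + dag_run_id correlation",
        "metrics": "row counts, latency, freshness lag",
        "traces": "OpenTelemetry spans per task",
        "alerting": f"freshness SLO <= {sla.threshold} + PagerDuty + runbook",
    }
    return arch, ingest_sink, transform_sink, serve_sink, backfill, observability
```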
Step-by-step trace.
| step | output |
|---|---|
| `parse_consumer_sla` | "08:00 daily" → batch SLA, threshold 1h |
| arch decision | "batch" (the SLA is measured in hours, not sub-minute) |
| `ingest_sink` | partition overwrite + event_id dedup |
| `transform_sink` | MERGE INTO silver.orders_clean ON (order_id) |
| `serve_sink` | INSERT OVERWRITE PARTITION (region, date) |
| `backfill` | airflow dags backfill --start-date 2026-05-01 --end-date 2026-05-07 |
| `observability` | JSON logs + freshness SLO ≤ 1h + PagerDuty |
Output:
| field | value |
|---|---|
| architecture | batch |
| ingest sink | partition overwrite + event_id dedup |
| transform sink | MERGE INTO silver.orders_clean ON (order_id) |
| serve sink | INSERT OVERWRITE PARTITION (region, date) |
| backfill | Airflow --start-date / --end-date |
| SLO | freshness ≤ 1 hour, paged via PagerDuty |
Why this works — concept by concept:
- SLA-first architecture: choosing `batch` vs `streaming` from the consumer SLA, not from team preference, is the first senior-vs-junior split.
- Idempotent sinks at every stage: partition overwrite + `MERGE INTO` + `INSERT OVERWRITE` make every retry and every backfill safe.
- Backfill is a flag, not a special pipeline: the same DAG run with `--start-date / --end-date` replays history, so there is no parallel "backfill DAG" to maintain.
- Observability is a design constraint: the SLO is declared upfront and paired with structured logs + a freshness metric + PagerDuty + a runbook.
- Cost: all of these are bounded and reasoned about before any code is written.
  - The design conversation is O(1) in reviewer time.
  - The running pipeline is O(rows × stages).
  - A backfill is O(window × stages).
2. Batch architectures deep-dive — Airflow DAG + dbt build + warehouse
Batch pipeline architecture: the Airflow DAG anatomy every senior knows
Batch pipeline architecture is the workhorse of modern data engineering. The canonical shape is an Airflow DAG of 5–10 tasks:

1. A sensor waits for the source.
2. A load task lands raw data in the lakehouse.
3. A dbt build transforms it.
4. A data-quality task validates the output.
5. A publish task surfaces it to the consumer.

The whole DAG is parameterised by `{{ ds }}` (the logical date), so any past day can be re-run with the same code.
Airflow DAG anatomy — the five canonical tasks.
- `sensor` task: `S3KeySensor`, `GCSObjectExistenceSensor`, `ExternalTaskSensor`. Blocks until the upstream source is ready.
- `load_raw` task: copies the source into the bronze layer (`/raw/<table>/dt={{ ds }}/`). Idempotent because each `{{ ds }}` writes its own partition.
- `dbt run` task: `BashOperator` or `DbtRunOperator`. Executes `dbt run --select <model> --vars '{date: {{ ds }}}'` to populate silver / gold models.
- `dbt test` task: `dbt test --select <model>` enforces uniqueness, not-null, referential and custom data-quality assertions.
- `publish` task: surfaces the curated table to the consumer (cache warm-up, BI refresh, downstream `TriggerDagRunOperator`).
The {{ ds }} (logical date) — Airflow's idempotency primitive.
- `{{ ds }}`: Airflow templates this to the logical date (`YYYY-MM-DD`). Every task reads / writes only that day's partition.
- Re-run safety: `airflow tasks run <dag> <task> <execution_date>` re-executes a single task with the same `{{ ds }}`. It is idempotent if your code respects the partition.
- Backfill: `airflow dags backfill <dag> --start-date X --end-date Y` walks a date range, scheduling one DAG run per day with the right `{{ ds }}`.
- Anti-pattern: never use `datetime.today()` inside a task, because it breaks idempotency for retries and backfills. Always template `{{ ds }}` or `{{ data_interval_start }}`.
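To make the `{{ ds }}` rule concrete, here is a plain-Python sketch (no Airflow import) of two things:

- a task that derives its partition from the logical date;
- the list of dates a daily backfill walks.

The helper names are hypothetical. Treating the end date as inclusive is an assumption that matches a one-run-per-day reading of `--start-date` / `--end-date`.

```python
from datetime import date, timedelta


def partition_path(table: str, ds: str) -> str:
    """Build the bronze partition path from the templated logical date (ds).

    Deriving it from datetime.today() instead would make a retry or a
    backfill of 2026-05-01 silently write into today's partition.
    """
    return f"/raw/{table}/dt={ds}/"


def backfill_logical_dates(start_date: str, end_date: str) -> list[str]:
    """Logical dates a daily backfill walks: one DAG run per day, both ends included."""
    start, end = date.fromisoformat(start_date), date.fromisoformat(end_date)
    return [(start + timedelta(days=i)).isoformat() for i in range((end - start).days + 1)]


for ds in backfill_logical_dates("2026-05-01", "2026-05-03"):
    print(ds, partition_path("orders", ds))
```

Because the path is a pure function of `ds`, a retry or a backfill of the same day always lands on the same partition.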
dbt build — the modern transform layer.
- `dbt run` compiles SQL models and writes results to the warehouse (`silver`, `gold` schemas).
- `dbt test` runs YAML-declared tests (`unique`, `not_null`, `relationships`, `accepted_values`) and custom SQL tests.
- `dbt build` runs models, tests, seeds and snapshots in one dependency-aware DAG. A failing model or test causes its downstream nodes to be skipped.
- `dbt source freshness` checks that the upstream source loaded within an SLA. Run it as its own step before the transforms.
- Incremental models: `materialized='incremental'` with `unique_key=` lets dbt MERGE new and changed rows. This is the canonical idempotent transform shape.
Sensors, triggers, and the SLA monitor
Beyond the DAG itself, the production batch stack has three sidecar concerns: sensors (when does the DAG start?), triggers (what fans out downstream when it completes?), and the SLA monitor (did it finish on time?).
Sensors — block until the source is ready.
- `S3KeySensor` / `GCSObjectExistenceSensor`: poll an object-store path until the expected file exists.
- `ExternalTaskSensor`: wait for a task in another DAG (cross-DAG dependency).
- `HttpSensor`: poll an API endpoint until it returns the expected status / payload.
- Deferrable operators (Airflow ≥ 2.2): hand the wait off the worker to the triggerer process, freeing the slot. They replaced the older, since-removed smart sensors.
- Sensor anti-pattern: `mode='poke'` with `poke_interval=10` across hundreds of DAGs holds a worker slot for every waiting sensor and can starve the pool. Prefer `mode='reschedule'` or a deferrable sensor.
Triggers + downstream fanout.
- `TriggerDagRunOperator`: fan out from one DAG to another after completion (e.g. `revenue_daily` triggers `revenue_marketing_export` and `revenue_finance_export`).
- `Dataset` triggers (Airflow ≥ 2.4): declare "this DAG produces dataset X; that DAG consumes dataset X", and the scheduler wires the dependency.
- dbt model-level lineage: packages such as `dbt-airflow` derive Airflow tasks from the dbt manifest, so dependencies stay in lockstep.
SLA monitoring — the freshness contract.
- Airflow `sla=`: a declarative per-task SLA. A breach emits an SLA-miss email / callback.
- Custom SLA monitor: a sidecar DAG queries `dag_run` history and pages on missed runs. This is more reliable than Airflow's built-in SLA feature, which has known race conditions.
- dbt `source freshness`: checks that the upstream file landed on time and pairs with the orchestrator SLA.
- PagerDuty + runbook: every SLA miss has a paired runbook entry with diagnostic queries and safe remediation.
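The core check of a custom SLA monitor fits in a few lines. This is a minimal sketch that assumes the run history has already been fetched into memory. The `DagRun` dataclass and its fields are a hypothetical simplification, not Airflow's ORM model. The DAG ids other than `orders_daily` are made up, and paging is left out.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List, Optional


@dataclass
class DagRun:
    # Hypothetical, simplified view of one row of dag_run history
    dag_id: str
    scheduled_at: datetime        # when the run was due to start, e.g. 06:00
    end_date: Optional[datetime]  # None while the run is still in flight
    state: str                    # "success", "running", "failed", ...


def missed_sla(runs: List[DagRun], deadline: timedelta, now: datetime) -> List[DagRun]:
    """Runs that did not reach 'success' within `deadline` of their scheduled time."""
    breaches = []
    for run in runs:
        due = run.scheduled_at + deadline
        finished_late = run.state == "success" and run.end_date is not None and run.end_date > due
        overdue = run.state != "success" and now > due
        if finished_late or overdue:
            breaches.append(run)
    return breaches


six_am = datetime(2026, 5, 26, 6, 0)
runs = [
    DagRun("orders_daily", six_am, six_am + timedelta(minutes=35), "success"),
    DagRun("orders_export", six_am, six_am + timedelta(minutes=90), "success"),
    DagRun("orders_finance", six_am, None, "running"),
]
late = missed_sla(runs, deadline=timedelta(hours=1), now=six_am + timedelta(hours=2))
print([r.dag_id for r in late])  # ['orders_export', 'orders_finance']
```

The "still running past the deadline" branch is the part a per-task SLA can miss: a run that never finishes is flagged as soon as the deadline passes.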
Idempotent batch patterns — partition overwrite, MERGE, upsert
Idempotency in batch boils down to three sink shapes: partition overwrite (atomic, simple), MERGE INTO (handles upserts), and INSERT … ON CONFLICT / upsert (PostgreSQL-style). Each fits a different stage of the pipeline.
Partition overwrite — the bronze and gold default.
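- Shape: `INSERT OVERWRITE TABLE t PARTITION (dt='{{ ds }}') SELECT … WHERE dt = '{{ ds }}'`.
- Why idempotent: re-running the task replaces the same partition, leaving no duplicates and no leftover data.
- Use case: daily / hourly partitions of immutable raw data, and daily / hourly aggregates in the serve layer.
- Engine support:
  - Spark: `INSERT OVERWRITE` with a static `PARTITION` spec, or `spark.sql.sources.partitionOverwriteMode=dynamic`.
  - Hive: `INSERT OVERWRITE PARTITION`.
  - BigQuery: `WRITE_TRUNCATE` on a partition decorator.
  - Snowflake: it has no user-managed partitions, and its `INSERT OVERWRITE` truncates the whole table. The per-day equivalent there is `DELETE … WHERE dt = …` + `INSERT` in one transaction.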
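On engines without a partition-level `INSERT OVERWRITE`, the same idempotent shape is a `DELETE` of the partition plus an `INSERT`, inside one transaction. The sketch below uses the standard library's `sqlite3` purely as a stand-in warehouse. The table name and revenue values are illustrative.

```python
import sqlite3


def overwrite_partition(conn: sqlite3.Connection, dt: str, rows: list) -> None:
    """Idempotent partition overwrite: DELETE the dt partition, then INSERT, in one transaction."""
    with conn:  # commits if the block succeeds, rolls back (including the DELETE) if it raises
        conn.execute("DELETE FROM revenue_by_region WHERE dt = ?", (dt,))
        conn.executemany(
            "INSERT INTO revenue_by_region (dt, region, revenue) VALUES (?, ?, ?)",
            [(dt, region, revenue) for region, revenue in rows],
        )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE revenue_by_region (dt TEXT, region TEXT, revenue REAL)")
daily = [("US", 1200.0), ("EU", 800.0)]
overwrite_partition(conn, "2026-05-26", daily)
overwrite_partition(conn, "2026-05-26", daily)   # retry: replaces, does not append
print(conn.execute("SELECT COUNT(*) FROM revenue_by_region").fetchone()[0])  # 2
```

The transaction matters as much as the DELETE. If the INSERT fails halfway, the rollback restores the old partition instead of leaving it empty.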
MERGE INTO — the silver-layer upsert.
- Shape: `MERGE INTO target USING staging ON target.key = staging.key WHEN MATCHED THEN UPDATE … WHEN NOT MATCHED THEN INSERT …`.
- Why idempotent: the merge key uniquely identifies the row, so re-runs UPDATE existing rows in place.
- Use case: slowly-changing dimensions, mutable fact tables, late-arriving corrections.
- Engine support: Snowflake, BigQuery, Databricks Delta, Postgres 15+, Redshift, Synapse.
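The MERGE semantics above can be emulated in plain Python to show why retries are safe: the second run matches every key and updates in place instead of inserting. This is a toy model over an in-memory dict (`merge_into` is a hypothetical helper), not any engine's implementation. The duplicate-key check mirrors the error warehouses typically raise when one target row matches several source rows.

```python
def merge_into(target: dict, staging: list, key: str) -> dict:
    """Toy MERGE INTO target USING staging ON target.<key> = staging.<key>.

    target maps key -> row dict. WHEN MATCHED THEN UPDATE merges the new
    column values into the existing row; WHEN NOT MATCHED THEN INSERT adds it.
    """
    keys = [row[key] for row in staging]
    if len(keys) != len(set(keys)):
        # Repeated keys make the MERGE ambiguous; dedupe staging first
        raise ValueError("staging has duplicate merge keys; dedupe it first")
    counts = {"updated": 0, "inserted": 0}
    for row in staging:
        if row[key] in target:
            target[row[key]] = {**target[row[key]], **row}  # WHEN MATCHED THEN UPDATE
            counts["updated"] += 1
        else:
            target[row[key]] = dict(row)                     # WHEN NOT MATCHED THEN INSERT
            counts["inserted"] += 1
    return counts


silver = {}
batch = [{"order_id": 1, "amount": 10.0}, {"order_id": 2, "amount": 5.0}]
print(merge_into(silver, batch, "order_id"))  # {'updated': 0, 'inserted': 2}
print(merge_into(silver, batch, "order_id"))  # {'updated': 2, 'inserted': 0}  (retry)
print(len(silver))                            # 2
```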
INSERT … ON CONFLICT — the OLTP upsert.
- Shape (Postgres): `INSERT INTO target (id, x, y) VALUES (…) ON CONFLICT (id) DO UPDATE SET x = EXCLUDED.x, y = EXCLUDED.y`.
- Why idempotent: the `ON CONFLICT` clause runs the UPDATE when the unique key already exists.
- Use case: operational tables, application state, small dimension upserts.
- Watch out: `ON CONFLICT` requires a unique / primary-key constraint on the conflict columns.
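SQLite (3.24+) implements the same `ON CONFLICT … DO UPDATE` upsert syntax as Postgres, so the standard library's `sqlite3` is enough for a runnable sketch of retry and correction behaviour. The `customers` table and its rows are illustrative.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# The conflict target (id) must be covered by a PRIMARY KEY or UNIQUE constraint
conn.execute("CREATE TABLE customers (id INTEGER PRIMARY KEY, name TEXT, tier TEXT)")

UPSERT = """
INSERT INTO customers (id, name, tier) VALUES (?, ?, ?)
ON CONFLICT (id) DO UPDATE SET name = excluded.name, tier = excluded.tier
"""


def upsert(rows):
    with conn:  # one transaction per batch
        conn.executemany(UPSERT, rows)


upsert([(1, "Ada", "gold"), (2, "Lin", "silver")])
upsert([(1, "Ada", "gold"), (2, "Lin", "silver")])  # retry: no duplicate rows
upsert([(2, "Lin", "gold")])                        # correction: updated in place
print(conn.execute("SELECT id, tier FROM customers ORDER BY id").fetchall())
# [(1, 'gold'), (2, 'gold')]
```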
Worked example — a daily revenue DAG with sensor + dbt build + SLA
Detailed explanation. A representative production batch pipeline. The DAG waits for the daily Kafka-dump file to land, copies it into bronze, runs the dbt transform graph (silver orders_clean + gold revenue_by_region), runs dbt test, then triggers the BI cache refresh. The whole DAG has an sla=timedelta(hours=2) and a freshness SLO of ≤ 1h after the 06:00 schedule.
Question. Write an Airflow DAG that ingests daily orders from S3, runs the dbt build graph, validates with dbt test, and triggers a downstream cache-refresh DAG — with a 2-hour SLA per task and a daily 06:00 schedule.
Input (DAG inputs and SLAs).
| item | value |
|---|---|
| source | s3://lake/raw/orders/dt={{ ds }}/orders.parquet |
| schedule | 0 6 * * * (daily 06:00 UTC) |
| dbt models | silver.orders_clean, gold.revenue_by_region |
| per-task SLA | 2 hours |
| pipeline SLO | freshness ≤ 1h after 06:00 |
| paging | PagerDuty de-on-call rotation |
Code.
```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.operators.bash import BashOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

default_args = {
    "owner": "data-eng",
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
    "sla": timedelta(hours=2),
}

with DAG(
    dag_id="orders_daily",
    schedule="0 6 * * *",
    start_date=datetime(2026, 1, 1),
    catchup=False,
    default_args=default_args,
    tags=["batch", "revenue"],
) as dag:
    wait = S3KeySensor(
        task_id="wait_for_orders_file",
        bucket_key="raw/orders/dt={{ ds }}/orders.parquet",
        bucket_name="lake",
        mode="reschedule",
        poke_interval=60,
        timeout=60 * 60,
    )
    load_raw = BashOperator(
        task_id="load_raw",
        bash_command=(
            "spark-submit jobs/load_raw.py "
            "--src s3://lake/raw/orders/dt={{ ds }}/ "
            "--dst lakehouse.bronze.orders --date {{ ds }}"
        ),
    )
    dbt_build = BashOperator(
        task_id="dbt_build",
        bash_command=(
            "cd /repo/dbt && "
            "dbt build --select +gold.revenue_by_region "
            "--vars '{date: {{ ds }}}'"
        ),
    )
    refresh_bi = TriggerDagRunOperator(
        task_id="refresh_bi_cache",
        trigger_dag_id="bi_cache_refresh",
        conf={"date": "{{ ds }}"},
    )
    wait >> load_raw >> dbt_build >> refresh_bi
```
Step-by-step explanation.
- `S3KeySensor(mode='reschedule')` blocks the DAG until the source file lands. The worker slot is freed between pokes, so other DAGs can run.
- The `load_raw` Spark job copies the source into `lakehouse.bronze.orders`, partitioned by `{{ ds }}`. The partition overwrite is idempotent.
- `dbt build --select +gold.revenue_by_region` builds the model and all its upstream parents, plus their tests:
  - `silver.orders_clean` (incremental MERGE), then
  - `gold.revenue_by_region` (INSERT OVERWRITE PARTITION).
- `refresh_bi_cache` fans out to the BI DAG with `conf={"date": "{{ ds }}"}`, so the downstream DAG uses the same logical date.
- `sla=timedelta(hours=2)` in `default_args` applies to every task.
  - Airflow measures it from the start of the DAG run, not from when each task starts.
  - A breach is recorded as an SLA miss (email by default). Set `sla_miss_callback` on the DAG to page on-call.
Sample output (the DAG run timeline).
```text
06:00:00 wait_for_orders_file RUNNING (rescheduled until file lands)
06:08:14 wait_for_orders_file SUCCESS (object present)
06:08:15 load_raw             RUNNING
06:12:42 load_raw             SUCCESS (rows=12,418,503)
06:12:43 dbt_build            RUNNING
06:34:07 dbt_build            SUCCESS (12 models built, 27 tests passed)
06:34:08 refresh_bi_cache     RUNNING
06:34:42 refresh_bi_cache     SUCCESS
06:34:42 dag_run              SUCCESS (duration=34m42s; SLO <= 1h MET)
```
Rule of thumb: a production batch DAG is a sensor + a load + a dbt build + a downstream trigger, parameterised by {{ ds }}, with declared per-task SLAs and an SLO ≤ the consumer's freshness requirement. Anything more elaborate is usually a smell.
Solution Using a partition-overwrite + dbt-incremental MERGE silver pattern
Code (silver model as an idempotent incremental MERGE).
```sql
-- models/silver/orders_clean.sql
-- (the partition_by dict syntax shown is the BigQuery adapter's)
{{ config(
    materialized='incremental',
    unique_key='order_id',
    incremental_strategy='merge',
    partition_by={'field': 'order_date', 'data_type': 'date'},
    on_schema_change='append_new_columns'
) }}

SELECT
    order_id,
    customer_id,
    region,
    amount,
    status,
    CAST(order_ts AS DATE) AS order_date,
    CURRENT_TIMESTAMP() AS _loaded_at
FROM {{ source('lakehouse', 'bronze_orders') }}
{% if is_incremental() %}
-- Incremental runs read only the logical date passed in from Airflow.
-- Filter on the expression, not the order_date alias (many engines reject
-- SELECT aliases in WHERE). No "NOT IN target" filter: the MERGE on
-- unique_key already makes re-runs safe, and a re-run after a bug fix
-- must be able to UPDATE rows that already exist.
WHERE CAST(order_ts AS DATE) = DATE('{{ var("date") }}')
{% endif %}
```
Step-by-step trace.
| input | output |
|---|---|
| `{{ ds }}` = 2026-05-26 | dbt receives `var('date')` = '2026-05-26' |
| `is_incremental()` branch | source filtered to `CAST(order_ts AS DATE) = '2026-05-26'`: one day's rows, not the whole table |
| materialization | `MERGE INTO silver.orders_clean ON order_id` |
| re-run on same `{{ ds }}` | merge updates the same rows in place; no duplicates |
| backfill --start-date 2026-05-01 --end-date 2026-05-07 | seven DAG runs, each MERGEs its own `{{ ds }}` rows |
Output:
| run | rows merged | duplicate rows in target |
|---|---|---|
| first run (2026-05-26) | 12,418,503 | 0 |
| retry of same run | 0 inserts, 12,418,503 matched | 0 |
| backfill 2026-05-01 | 11,902,118 | 0 |
Why this works — concept by concept:
- `MERGE INTO` on `unique_key`: the merge UPDATEs existing `order_id`s and INSERTs new ones. It is idempotent under retries, and a backfill after a bug fix actually rewrites the corrected rows.
- Date filter: `WHERE CAST(order_ts AS DATE) = '{{ ds }}'` limits the source read to one day, keeping cost flat as the table grows. It also lets the engine prune partitions when bronze is partitioned on that date.
- `is_incremental()` guard: the same SQL covers both shapes. The first run builds the table from full history; later runs MERGE only the logical date's rows.
- `on_schema_change='append_new_columns'`: when a new column is added to the model's SELECT, dbt appends it to the existing target instead of requiring a manual ALTER.
- Cost: the source side of the MERGE is O(day_rows) rather than O(table_rows). Whether the target side is pruned too depends on the adapter; dbt's `incremental_predicates` config can restrict it. This makes the dbt incremental shape the cheapest idempotent silver pattern.
3. Streaming architectures deep-dive — Kafka + Flink Kappa with replay
Streaming pipeline architecture: the Kafka topic + partition model
Streaming pipeline architecture shifts the design centre of gravity from a daily DAG to a continuously running job (Flink, Spark Structured Streaming or Kafka Streams). The job reads from a Kafka topic and writes to one or more sinks. For many modern teams, the Kappa shape (one log + one streaming job) has displaced the Lambda shape (separate batch + speed layers), because it keeps a single code path for both live processing and replay.
Kafka topic + partition fundamentals.
- Topic: a named, append-only, partitioned log of records.
- Partition: a single ordered sub-log. Ordering is guaranteed within a partition, not across the topic.
- Partition count: the parallelism ceiling for any consumer group. Pick partitions ≥ peak parallelism (e.g. 6, 12, 24, 48).
- Partition key: the producer-supplied key that decides which partition a record lands in. The default partitioner computes `hash(key) % partitions`; the Java client uses murmur2 as the hash.
- Offset: the monotonically increasing position of a record within a partition. A consumer's position is `(topic, partition, offset)`.
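Deterministic key-to-partition mapping is what gives per-key ordering. This is a minimal sketch that assumes SHA-256 as the hash. Kafka's Java client actually uses murmur2, so these partition numbers will not match a real producer's. The property is the same, though: same key, same partition, in every process.

```python
import hashlib


def partition_for(key: str, num_partitions: int) -> int:
    """Deterministic key -> partition mapping (SHA-256 stand-in for murmur2).

    Python's built-in hash() would not work here: str hashing is salted per
    process, so two producers would disagree about where a key lives.
    """
    digest = hashlib.sha256(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_partitions


# Every event for order-42 lands in the same partition, so they stay ordered
print(partition_for("order-42", 12), partition_for("order-42", 12))
print(partition_for("order-42", 24))  # a different partition count can remap the key
```

The modulo also shows why adding partitions to a keyed topic remaps existing keys. Ordering for those keys breaks across the change.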
Producer semantics.
- `acks=0`: fire and forget. Lowest latency, no durability guarantee.
- `acks=1`: leader ack. Durable as long as the leader doesn't fail before replication.
- `acks=all`: full ISR ack. Durable even on leader failure; the production default, and the client default since Kafka 3.0.
- Idempotent producer: `enable.idempotence=true` prevents duplicates on producer retries (per partition, within one producer session).
- Transactional producer: `transactional.id=…` gives exactly-once across multiple partitions / topics in a single transaction.
Consumer semantics.
- At-least-once (the default): `commit` after processing, so a crash before commit replays the record.
- At-most-once: `commit` before processing, so a crash loses the record (rare in DE).
- Exactly-once (system-level): at-least-once delivery + an idempotent sink (dedup on `event_id`, MERGE, transactional write). This is the canonical recipe.
- Consumer group: a set of consumers sharing partitions. It rebalances on join / leave, and the partition is the unit of assignment.
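The at-least-once + idempotent-sink recipe can be simulated without a broker. In this sketch, the consumer loop and both sinks are hypothetical stand-ins. The consumer crashes after writing a record but before committing its offset, so the restart re-delivers that record. The plain-append sink ends up with a duplicate; the sink keyed on `event_id` does not.

```python
def run_consumer(log, committed, write, crash_at=None):
    """At-least-once loop: write each record, then commit its offset.

    If crash_at is set, the consumer dies right after writing that offset but
    before committing it. Returns the last committed offset.
    """
    for offset in range(committed, len(log)):
        write(log[offset])
        if offset == crash_at:
            return committed    # crash: the write happened, the commit did not
        committed = offset + 1  # commit after processing
    return committed


log = [("e1", 10), ("e2", 20), ("e3", 30)]  # (event_id, amount)
naive_sink = []       # plain INSERT: every delivery becomes a row
idempotent_sink = {}  # keyed on event_id: a redelivery overwrites the same row


def naive_write(record):
    naive_sink.append(record)


def idempotent_write(record):
    event_id, amount = record
    idempotent_sink[event_id] = amount


for write in (naive_write, idempotent_write):
    resume_from = run_consumer(log, 0, write, crash_at=1)  # crash after writing e2
    run_consumer(log, resume_from, write)                  # restart re-delivers e2

print(len(naive_sink), len(idempotent_sink))  # 4 3
```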
Flink job, watermarks, and late-data handling
Flink (and Spark Structured Streaming with very similar semantics) is the engine that reads Kafka, applies windowed aggregates with a watermark policy, and emits results to a sink. Every windowed streaming job has the same five components.
The five Flink job components.
- Source: a `KafkaSource` (the older `FlinkKafkaConsumer` is deprecated) reading a topic + consumer group.
- Event-time extractor: `assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(30)))` declares how late events can be.
- Operator: `keyBy(region).window(TumblingEventTimeWindows.of(Time.minutes(5))).reduce(...)`.
- Trigger: decides when to emit results. The default fires when the watermark passes the window end; custom triggers fire on early / late events.
- Sink: Kafka, JDBC, Delta, Iceberg, a KV store. Exactly-once results need either an idempotent sink or a transactional (two-phase-commit) one.
Watermark — the event-time progress signal.
- Definition: "the system assumes no more events with `event_time < watermark` will arrive".
- Bounded-out-of-orderness: `WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(30))` gives watermark = `max_event_time_seen - 30s`.
- Watermark gap: too small drops late events; too large delays output.
- Per-partition watermarks: each Kafka partition emits its own watermark, and the operator's effective watermark is the min across partitions.
- Idle partitions: `withIdleness(Duration.ofMinutes(1))` lets the watermark advance even when one partition is silent.
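The per-partition watermark rule can be written out directly. This is a minimal sketch that assumes in-memory maps of each partition's maximum event time and last activity. Flink's built-in bounded-out-of-orderness strategy additionally subtracts 1 ms, which this sketch omits. The function name is hypothetical.

```python
from datetime import datetime, timedelta


def effective_watermark(max_event_time, last_active, now, out_of_orderness, idle_timeout):
    """Operator watermark from per-partition bounded-out-of-orderness watermarks.

    max_event_time: partition -> largest event time seen on that partition
    last_active:    partition -> when that partition last delivered a record
    Partitions silent for longer than idle_timeout are ignored (the withIdleness
    idea); the rest contribute max_event_time - out_of_orderness, and the
    operator takes the minimum.
    """
    active = [p for p in max_event_time if now - last_active[p] <= idle_timeout]
    if not active:
        return None  # every input idle: no event-time progress signal
    return min(max_event_time[p] for p in active) - out_of_orderness


t = datetime(2026, 5, 26, 8, 5)
max_seen = {0: t, 1: t - timedelta(minutes=2), 2: t - timedelta(hours=1)}
last_active = {0: t, 1: t, 2: t - timedelta(minutes=5)}  # partition 2 has gone quiet
wm = effective_watermark(max_seen, last_active, now=t,
                         out_of_orderness=timedelta(seconds=30),
                         idle_timeout=timedelta(minutes=1))
print(wm)  # 2026-05-26 08:02:30
```

Without the idleness rule, the silent partition 2 would pin the watermark at 07:04:30, and no window after that point would ever fire.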
Window types + late-data policy.
- Tumbling window: fixed, non-overlapping (e.g. every 5 minutes).
- Sliding window: fixed-size, overlapping (e.g. a 5-minute window sliding every 1 minute).
- Session window: gap-defined (e.g. close after 30s of silence per key).
- Late events: `allowedLateness(Duration.ofMinutes(10))` keeps window state alive 10 minutes past the watermark, so late events still update the result.
- Side output: an `OutputTag<LateEvent>` passed to `sideOutputLateData(...)` routes events that arrive after the allowed lateness to a side stream for a separate consumer.
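Tumbling-window assignment and the late-data decision reduce to a little datetime arithmetic. This simplified sketch compares against the window end; Flink compares against the window's last millisecond. The helper names are hypothetical. It shows which events update a result normally, which update it late, and which would go to the side output.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def tumbling_window_start(ts: datetime, size: timedelta) -> datetime:
    """Align an event time down to the start of its fixed, non-overlapping window."""
    return ts - (ts - EPOCH) % size


def classify(ts: datetime, watermark: datetime, size: timedelta, allowed_lateness: timedelta) -> str:
    """What happens to one event, given the current watermark (simplified)."""
    window_end = tumbling_window_start(ts, size) + size
    if window_end > watermark:
        return "on-time"  # window not fired yet
    if window_end + allowed_lateness > watermark:
        return "late"     # window fired, state still kept: the result is updated
    return "dropped"      # beyond allowed lateness: side-output candidate


size, lateness = timedelta(minutes=5), timedelta(minutes=10)
watermark = datetime(2026, 5, 26, 8, 12)
for ts in (datetime(2026, 5, 26, 8, 11),
           datetime(2026, 5, 26, 8, 4, 59),
           datetime(2026, 5, 26, 7, 58)):
    print(ts.time(), tumbling_window_start(ts, size).time(), classify(ts, watermark, size, lateness))
# 08:11:00 08:10:00 on-time
# 08:04:59 08:00:00 late
# 07:58:00 07:55:00 dropped
```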
Exactly-once via dedup + log-replay backfill
The senior signal in any streaming round is naming exactly-once as a system property, not a magic feature, and explaining log-replay backfill as the streaming equivalent of Airflow's --start-date / --end-date.
Exactly-once semantics — the canonical recipe.
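- At-least-once delivery from Kafka (the default).
- An idempotency key in every event (`event_id` or `(partition_key, sequence_number)`).
- Dedup at the sink, using any of:
  - `INSERT … ON CONFLICT DO NOTHING`;
  - `MERGE INTO` on `event_id`;
  - in Structured Streaming, `dropDuplicates(["event_id", "event_time"])` after `withWatermark`, or `dropDuplicatesWithinWatermark(["event_id"])` on Spark 3.5+, so dedup state stays bounded.
- A transactional sink, using any of:
  - Kafka transactions;
  - Delta Lake's checkpointed streaming sink (or `txnAppId` / `txnVersion` for idempotent `foreachBatch` writes);
  - two-phase commit for cross-system exactly-once.
- The interview-canonical answer: exactly-once is (at-least-once delivery) + (idempotent sink). Reach for that phrase before "exactly-once is a broker setting".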
Log-replay backfill — the Kappa equivalent of --start-date.
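- Reset offsets: `kafka-consumer-groups --bootstrap-server kafka:9092 --reset-offsets --to-datetime 2026-05-01T00:00:00.000 --topic events --group my-job --execute` rewinds a consumer group. The group must have no active members.
  - This works for engines that resume from committed group offsets, such as plain consumers and Kafka Streams.
  - Spark Structured Streaming and Flink keep offsets in their own checkpoints. Rewind them by restarting without the old checkpoint and with an explicit starting position.
- Fresh checkpoint: `startingOffsets='earliest'` in Spark Structured Streaming with a new `checkpointLocation` reprocesses the full retained log.
- Replayability depends on retention.
  - Kafka's default 7-day retention rolls off old data, so production replay-backfill setups use long retention (30+ days).
  - Compacted topics keep only the latest record per key. They support state rebuilds, not full event replay.
- Sink behaviour: idempotent sinks make replay safe; non-idempotent sinks duplicate every replayed record.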
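Replay safety depends on two checks: the start offset must still be retained, and the sink must be idempotent. Here is a toy sketch of both. It assumes a single partition held as `(offset, event_id, amount)` tuples and a dict sink keyed on `event_id`; all names are hypothetical.

```python
def replay(log, from_offset, sink):
    """Re-read one partition from `from_offset` into a sink keyed on event_id.

    log holds (offset, event_id, amount) tuples, oldest first; anything older
    than log[0] has already been deleted by retention.
    """
    earliest = log[0][0]
    if from_offset < earliest:
        raise ValueError(f"offset {from_offset} is no longer retained; earliest is {earliest}")
    for offset, event_id, amount in log:
        if offset >= from_offset:
            sink[event_id] = amount  # idempotent: replaying an event rewrites the same key
    return sink


log = [(100, "e100", 5), (101, "e101", 7), (102, "e102", 9)]  # offsets 0-99 expired
sink = replay(log, 100, {})
assert replay(log, 101, dict(sink)) == sink  # overlapping replay changes nothing
try:
    replay(log, 50, {})
except ValueError as err:
    print(err)  # offset 50 is no longer retained; earliest is 100
```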
Worked example — 5-minute event counts with watermark + late-data + log replay
Detailed explanation. A typical senior streaming prompt: "given a Kafka events topic with event_time per record, emit 5-minute tumbling-window counts per region, tolerate 10-minute late data, and support log-replay backfill". The Spark Structured Streaming code below shows the full shape.
Question. Write a Spark Structured Streaming job that reads events from Kafka, applies a 30-second watermark + 10-minute allowed lateness, emits 5-minute tumbling counts per region to a Delta sink, and is replayable from offset earliest.
Input (sample Kafka events).
| event_id | region | event_time |
|---|---|---|
e001 |
US | 2026-05-26T08:00:01 |
e002 |
EU | 2026-05-26T08:00:03 |
e003 |
US | 2026-05-26T08:04:59 |
e004 |
US | 2026-05-26T08:05:02 |
e003 |
US | 2026-05-26T08:00:00 (duplicate, late) |
Code.
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
from_json, col, window, count, expr,
)
from pyspark.sql.types import StructType, StringType, TimestampType
spark = SparkSession.builder.appName("events_5m_counts").getOrCreate()
schema = (
StructType()
.add("event_id", StringType())
.add("region", StringType())
.add("event_time", TimestampType())
)
events = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "kafka:9092")
.option("subscribe", "events")
.option("startingOffsets", "earliest") # replay-safe
.load()
.selectExpr("CAST(value AS STRING) AS json")
.select(from_json("json", schema).alias("e"))
.select("e.*")
.dropDuplicates(["event_id"]) # exactly-once at sink
)
counts_5m = (
events
.withWatermark("event_time", "30 seconds")
.groupBy(window("event_time", "5 minutes"), "region")
.agg(count("*").alias("n"))
)
query = (
counts_5m.writeStream
.outputMode("update")
.format("delta")
.option("checkpointLocation", "/chk/events_5m_v1")
.toTable("gold.events_5m_counts")
)
query.awaitTermination()
Step-by-step explanation.
-
readStream … format("kafka")subscribes to theeventstopic withstartingOffsets='earliest'so a fresh checkpoint replays the full log. -
from_json+ schema decodes the Kafka value into typed columns. -
dropDuplicates(["event_id"])dedupes by idempotency key — exactly-once at the sink. -
withWatermark("event_time", "30 seconds")declares "events arriving > 30s after their event_time are late". -
groupBy(window(…, "5 minutes"), "region").agg(count("*"))aggregates per 5-min tumbling window per region. -
outputMode("update")emits updates as windows accumulate, including late updates within the watermark gap. -
Delta sink +
checkpointLocationpersists progress; idempotent writes (Delta atomic commits) make retries safe.
Sample output (Delta gold.events_5m_counts).
| window_start | region | n |
|---|---|---|
| 2026-05-26 08:00 | US | 2 |
| 2026-05-26 08:00 | EU | 1 |
| 2026-05-26 08:05 | US | 1 |
Rule of thumb: every windowed streaming aggregate is (1) withWatermark for event-time progress, declared before the dedup, (2) dropDuplicates on the idempotency key, (3) groupBy(window(...), key).agg(...) for the aggregate, (4) idempotent sink (Delta, MERGE, INSERT ON CONFLICT). Skip any of the four and "exactly-once" becomes a lie.
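To see the watermark, dedup and tumbling-window ingredients interact without a cluster, here is a pure-Python simulation over a hypothetical five-event input. It is a simplification, not Spark's engine: the watermark here advances after every record, whereas Spark advances it between micro-batches, and the sink step is left out.

```python
from collections import Counter
from datetime import datetime, timezone

WINDOW_S = 300           # 5-minute tumbling windows
WATERMARK_DELAY_S = 30   # mirrors withWatermark("event_time", "30 seconds")

def window_counts(events):
    """events: (event_id, region, ISO event_time in UTC) tuples in arrival order.
    Returns {(window_start "HH:MM", region): count}."""
    seen_ids = set()
    max_event_ts = None
    counts = Counter()
    for event_id, region, iso_ts in events:
        ts = datetime.fromisoformat(iso_ts).replace(tzinfo=timezone.utc).timestamp()
        # Watermark: drop rows older than (max event_time seen - delay).
        # Simplification: advanced per row here, per micro-batch in Spark.
        if max_event_ts is not None and ts < max_event_ts - WATERMARK_DELAY_S:
            continue
        # Dedup on the idempotency key.
        if event_id in seen_ids:
            continue
        seen_ids.add(event_id)
        max_event_ts = ts if max_event_ts is None else max(max_event_ts, ts)
        # Tumbling window: align event_time down to the 5-minute boundary.
        start = datetime.fromtimestamp(ts - ts % WINDOW_S, tz=timezone.utc)
        counts[(start.strftime("%H:%M"), region)] += 1
    return dict(counts)

# Hypothetical arrival order: e003 is redelivered late with an older event_time.
events = [
    ("e001", "US", "2026-05-26T08:00:00"),
    ("e002", "EU", "2026-05-26T08:00:03"),
    ("e003", "US", "2026-05-26T08:04:59"),
    ("e004", "US", "2026-05-26T08:05:02"),
    ("e003", "US", "2026-05-26T08:00:00"),
]
result = window_counts(events)
```

The late e003 copy falls behind the watermark (08:05:02 minus 30 s) and is dropped; an on-time duplicate would instead be caught by the event_id check.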
Solution Using Kappa log-replay backfill via an offset rewind
Code (replay the 2026-05-26 day from Kafka after a bug fix; assumes topic retention covers the day, e.g. 30 days).
# 1. Stop the streaming job.
# 2. Drop the affected day in the sink so the replay can rewrite it.
spark-sql -e "DELETE FROM gold.events_5m_counts \
WHERE window_start >= '2026-05-26 00:00:00' \
AND window_start < '2026-05-27 00:00:00'"
# 3. Restart the fixed job from 2026-05-26 00:00 UTC. Spark's Kafka source keeps its
#    offsets in the checkpoint, not in a Kafka consumer group, so point the job at a
#    NEW checkpointLocation (e.g. /chk/events_5m_v2) and set the source option
#    startingOffsetsByTimestamp to that instant (epoch milliseconds per partition).
spark-submit events_5m_counts.py
Step-by-step trace.
| step | effect |
|---|---|
| step 1 — stop job | no micro-batches run while the sink is repaired |
| step 2 — DELETE FROM gold.events_5m_counts WHERE … | the day's wrong rows are removed; the replay recreates them |
| step 3 — restart with a new checkpoint + startingOffsetsByTimestamp | each partition starts at the first offset whose record timestamp is at or after 2026-05-26 00:00 |
| outcome | the same logic reprocesses the 2026-05-26 events with the fix, then catches up to the live head; windows after 2026-05-26 are re-emitted, so the sink must absorb them idempotently |
Output:
| metric | before backfill | after backfill |
|---|---|---|
| 2026-05-26 rows in gold.events_5m_counts | wrong counts (bug) | corrected counts |
| duplicate rows in gold.events_5m_counts | 0 (deduped by event_id) | 0 |
| source offset (partition 0) | 12,402,118 | rewound → re-advances to 12,402,118 |
Why this works — concept by concept:
- Log retention as a backfill primitive — Kappa stores history in Kafka; a replay backfill is "rewind the read position" rather than "run a separate batch job".
- Idempotent sink + dedup key — dropDuplicates(["event_id"]) plus an idempotent Delta write mean the replay converges on the same final state.
- Surgical partition delete — clearing only the 2026-05-26 rows leaves the rest of the table untouched while the day reprocesses.
- Same logic, fresh checkpoint — the transformation code is unchanged; the new checkpoint's starting offsets drive the replay. A kafka-consumer-groups.sh --reset-offsets --to-datetime rewind only moves consumers that resume from committed group offsets (plain consumers, Kafka Streams); a Spark query restarted on its old checkpoint ignores it.
- Cost — log-replay backfill cost is O(events from the replay start to the live head), usually far smaller than a full-table reload in a Lambda architecture.
4. Idempotency patterns — MERGE INTO, dedup keys, deterministic hash
idempotent pipeline — the universal contract
An idempotent pipeline is one where running the same code over the same input N times produces the same final state. Without idempotency, every Airflow retry, every Kafka at-least-once redelivery, every backfill silently corrupts the warehouse. The senior signal in a pipeline-design round is naming idempotency as a design constraint before the reviewer prompts for it.
Why idempotency matters — the three retry surfaces.
- Orchestrator retry — Airflow / Dagster / Prefect retries failed tasks; without idempotency, retries double-count.
- Broker redelivery — Kafka, Kinesis, Pub/Sub default to at-least-once; consumers see every record one-or-more times.
- Backfill replay — the same window is reprocessed deliberately; without idempotency, every backfill duplicates the affected rows.
The three implementation patterns this section covers.
- MERGE INTO — the warehouse-native upsert on a natural key (covered in §4.2).
- Dedup key (event_id) — produce + dedupe on a unique key per event (covered in §4.3).
- Deterministic hash partition — SHA256(natural_key) % partitions routes the same row to the same partition every time (covered in §4.4).
Pattern 1 — MERGE INTO on a natural key
The default warehouse-native idempotency primitive. The major warehouses and lakehouse engines (Snowflake, BigQuery, Databricks Delta, Postgres 15+, Redshift) all support MERGE, with minor dialect variation.
Shape and semantics.
- Syntax — MERGE INTO target USING source ON target.key = source.key WHEN MATCHED THEN UPDATE SET … WHEN NOT MATCHED THEN INSERT (…) VALUES (…).
- Natural key — a stable business key (order_id, (customer_id, order_date)) that uniquely identifies a target row.
- Atomicity — most engines run MERGE as a single transaction; partial success doesn't half-merge.
- Variants — WHEN MATCHED AND target.updated_at < source.updated_at THEN UPDATE lets you skip stale updates.
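A runnable stand-in for MERGE with a stale-update guard, using SQLite's INSERT … ON CONFLICT … DO UPDATE … WHERE (SQLite 3.24+) instead of a warehouse MERGE; the orders_clean table and its columns are illustrative. It shows the two properties above: replaying a batch leaves the same final state, and an older correction is skipped.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE orders_clean (
        order_id   TEXT PRIMARY KEY,   -- natural key
        amount     REAL,
        updated_at TEXT                -- ISO-8601, so string order = time order
    )
""")

UPSERT = """
    INSERT INTO orders_clean (order_id, amount, updated_at)
    VALUES (?, ?, ?)
    ON CONFLICT (order_id) DO UPDATE SET
        amount = excluded.amount,
        updated_at = excluded.updated_at
    WHERE excluded.updated_at >= orders_clean.updated_at   -- skip stale updates
"""

def merge(rows):
    with conn:  # one transaction per batch: every row applies or none does
        conn.executemany(UPSERT, rows)

batch = [("O-1042", 120.00, "2026-05-26T08:00:01"),
         ("O-9999", 220.00, "2026-05-26T08:01:00")]
merge(batch)
merge(batch)                                       # retry: same final state
merge([("O-1042", 99.00, "2026-05-26T07:59:30")])  # stale correction: ignored
rows = conn.execute("SELECT * FROM orders_clean ORDER BY order_id").fetchall()
```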
When MERGE INTO is the right choice.
- Silver-layer normalisation — bronze rows are merged into a clean silver fact / dimension.
- Slowly-changing dimensions (SCD Type 1 / Type 2) — MERGE updates current rows or expires old ones.
- Late-arriving corrections — the same order_id arrives with a corrected amount; MERGE updates the row in place.
- dbt incremental models — materialized='incremental' + incremental_strategy='merge' with a unique_key generates the MERGE for you.
Gotchas.
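- Non-deterministic source — if source has duplicate keys, MERGE either fails or picks a row arbitrarily, depending on the engine; deduplicate the source first.
- Cost — MERGE on a huge target without partition pruning scans the whole table; partition the target on a time column and put a predicate on that column in the ON clause so the engine can prune.
- Concurrency — concurrent MERGEs on the same target can conflict (Delta, for example, aborts one with a concurrent-modification error) or block, depending on the engine; serialise them upstream.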
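Deduplicating the source amounts to keeping one row per natural key, usually the newest, which is what ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY _loaded_at DESC) = 1 does in SQL. A minimal Python sketch of that step, with hypothetical column names:

```python
from operator import itemgetter

def latest_per_key(rows, key="order_id", order_by="_loaded_at"):
    """Keep one row per natural key: the one with the greatest order_by value.
    Ties keep the first row seen; output is sorted by key for determinism."""
    latest = {}
    for row in rows:
        k = row[key]
        if k not in latest or row[order_by] > latest[k][order_by]:
            latest[k] = row
    return sorted(latest.values(), key=itemgetter(key))

src = [
    {"order_id": "O-1", "amount": 10.0, "_loaded_at": "2026-05-26T08:00:00"},
    {"order_id": "O-1", "amount": 12.0, "_loaded_at": "2026-05-26T08:05:00"},
    {"order_id": "O-2", "amount": 5.0, "_loaded_at": "2026-05-26T08:01:00"},
]
deduped = latest_per_key(src)  # safe to feed into a MERGE: one row per order_id
```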
Pattern 2 — Dedup key (event_id) for at-least-once streams
The streaming-native idempotency primitive. Every event produced into Kafka / Kinesis / Pub/Sub carries a unique event_id; the consumer dedupes on event_id before applying state changes.
Producer side.
- Generate at source — UUID v4 (uuid.uuid4()), or (producer_id, sequence_number) for deterministic generation.
- Persist before publish — write to a local outbox table, then publish to Kafka; the outbox pattern guarantees the same event_id survives producer crashes.
- Idempotent producer — enable.idempotence=true in Kafka prevents duplicates from the producer's own internal retries within a producer session.
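The outbox pattern can be sketched with SQLite standing in for the producer's database and a plain callback standing in for the Kafka send (both assumptions for illustration). The business row and the outbox row commit in one transaction, and a crash between publishing and marking the row published re-sends the same event_id, which the consumer can then dedupe.

```python
import json
import sqlite3
import uuid

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE orders (order_id TEXT PRIMARY KEY, amount REAL);
    CREATE TABLE outbox (
        event_id  TEXT PRIMARY KEY,
        payload   TEXT NOT NULL,
        published INTEGER NOT NULL DEFAULT 0
    );
""")

def place_order(order_id, amount):
    """Write the business row and its event in ONE transaction."""
    event_id = str(uuid.uuid4())  # minted once and persisted before any publish
    with conn:  # commits both inserts together, or rolls both back
        conn.execute("INSERT INTO orders VALUES (?, ?)", (order_id, amount))
        conn.execute(
            "INSERT INTO outbox (event_id, payload) VALUES (?, ?)",
            (event_id, json.dumps({"event_id": event_id, "order_id": order_id, "amount": amount})),
        )
    return event_id

def relay(publish):
    """Publish pending rows, then mark them. A crash in between means the row is
    re-sent later with the SAME event_id."""
    pending = conn.execute(
        "SELECT event_id, payload FROM outbox WHERE published = 0 ORDER BY rowid"
    ).fetchall()
    for event_id, payload in pending:
        publish(event_id, payload)  # stand-in for a Kafka send
        with conn:
            conn.execute("UPDATE outbox SET published = 1 WHERE event_id = ?", (event_id,))

sent = []

def crash_after_send(event_id, payload):
    sent.append(event_id)
    raise ConnectionError("relay died before marking the row published")

eid = place_order("O-1042", 120.0)
try:
    relay(crash_after_send)
except ConnectionError:
    pass
relay(lambda event_id, payload: sent.append(event_id))  # re-sends the same event_id
relay(lambda event_id, payload: sent.append(event_id))  # nothing left to send
```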
Consumer side.
- In-memory seen_ids set — bounded by a TTL or a sliding window; works for short windows.
- dropDuplicates(["event_id"]) in Structured Streaming — uses Spark's state store with a watermark to bound memory.
- INSERT … ON CONFLICT (event_id) DO NOTHING — atomically dedupe at the sink (Postgres; on Snowflake the equivalent is MERGE … WHEN NOT MATCHED THEN INSERT).
- External dedup store — Redis SETNX or a DynamoDB conditional put; pays a network hop but supports cross-job dedup.
Watermark + dedup window — bounding memory.
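- Why — keeping every event_id ever seen blows up memory; bound the dedup window to, e.g., 7 days.
- Spark — withWatermark("event_time", "7 days") followed by dropDuplicates(["event_id", "event_time"]) evicts state past the watermark; Spark only evicts dedup state when the event-time column is part of the dedup key. On Spark 3.5+, dropDuplicatesWithinWatermark(["event_id"]) dedupes on event_id alone with bounded state.
- Trade-off — events arriving > 7 days late may slip through as "new" (or, in Spark, be dropped as late); the watermark gap is the trust window.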
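A bounded seen_ids set, sketched in plain Python: ids are evicted once their event time falls behind a watermark of "max event time seen minus the TTL". The class name and the eviction-by-scan are illustrative choices, not a library API; the usage reproduces the trade-off above, where a duplicate older than the window is accepted as new.

```python
class TTLDeduper:
    """Drop event_ids already seen within `ttl_s` seconds of event time."""

    def __init__(self, ttl_s):
        self.ttl_s = ttl_s
        self.seen = {}                     # event_id -> event_time (epoch seconds)
        self.max_event_time = float("-inf")

    def is_new(self, event_id, event_time):
        self.max_event_time = max(self.max_event_time, event_time)
        watermark = self.max_event_time - self.ttl_s
        # Evict ids behind the watermark (a linear scan, kept simple for clarity).
        for old_id in [i for i, t in self.seen.items() if t < watermark]:
            del self.seen[old_id]
        if event_id in self.seen:
            return False
        self.seen[event_id] = event_time
        return True

DAY = 86_400
dedup = TTLDeduper(ttl_s=7 * DAY)
first = dedup.is_new("e1", 0)            # new
retry = dedup.is_new("e1", 0)            # duplicate inside the window
later = dedup.is_new("e2", 8 * DAY)      # advances the watermark, evicting e1
slipped = dedup.is_new("e1", 0)          # > 7 days late: accepted as "new"
```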
Pattern 3 — Deterministic hash partition
The stateless-transform idempotency primitive. When a transform routes records to partitions (Kafka producer key, Spark repartition, shard selection), use a deterministic hash so the same input always lands in the same partition on retry.
Shape and semantics.
- Hash function — SHA256(natural_key) % partitions or MurmurHash3(natural_key) % partitions; avoid Python's built-in hash(natural_key), which is randomised per process for strings and therefore not deterministic across processes.
- Why deterministic — retries route the same row to the same partition; downstream dedup is local and fast.
- Why hash, not modulo on the key directly — keys are not uniformly distributed; hashing spreads load.
Use cases.
- Kafka producer key — producer.send(topic, key=order_id.encode(), value=...); with the default partitioner, all events for the same order land in the same partition (ordering guarantee per key).
- Sharded sink — shard = SHA256(customer_id) % num_shards routes all of a customer's events to the same shard.
- Bucketed tables — Iceberg's bucket(N, customer_id) partition transform and Spark/Hive CLUSTERED BY (customer_id) INTO N BUCKETS are deterministic-hash partitions by another name (Delta's CLUSTER BY, liquid clustering, is not hash-based).
Gotchas.
- Hot keys — a single high-volume key (region='US') over-allocates to one partition; use compound keys (region:customer_id) or a deterministic salt such as region || (hash(customer_id) % 10). A random salt would send a retried row to a different partition and break idempotent routing.
- Re-partitioning — changing the partition count breaks the hash mapping; plan capacity ahead.
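A minimal sketch of deterministic bucketing with SHA-256 (the same formula as the worked example below), plus a deterministic salt for hot keys; the function names are hypothetical. Unlike Python's built-in hash(), the result is identical in every process, so a retried record always routes to the same bucket.

```python
import hashlib

def hash_bucket(key: str, n: int) -> int:
    """Stable across processes and machines: SHA256(key) % n."""
    return int(hashlib.sha256(key.encode("utf-8")).hexdigest(), 16) % n

def salted_key(hot_key: str, secondary_key: str, salts: int = 10) -> str:
    """Spread a hot key over `salts` sub-keys using a DETERMINISTIC salt derived
    from a secondary key, so a retry of the same record keeps its route."""
    return f"{hot_key}:{hash_bucket(secondary_key, salts)}"

bucket = hash_bucket("O-1042", 64)
route = hash_bucket(salted_key("US", "C-99"), 64)  # hot key "US" split by customer
```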
Worked example — three idempotency patterns applied to the same orders pipeline
Detailed explanation. Real pipelines stack all three idempotency patterns: the producer emits event_id (pattern 2), the streaming ingest dedupes on event_id and routes to partitions with SHA256(order_id) (pattern 3), and the silver-layer transform runs MERGE INTO on order_id (pattern 1). The combined effect is a pipeline where every retry, redelivery, and backfill is safe.
Question. Show the three idempotency primitives — event_id dedup at ingest, deterministic hash partitioning at routing, MERGE INTO at silver — applied to a single orders pipeline.
Input (a single order produced twice due to producer retry).
| event_id | order_id | customer_id | amount | event_time |
|---|---|---|---|---|
| e-7a3f... | O-1042 | C-99 | 120.00 | 2026-05-26T08:00:01 |
| e-7a3f... | O-1042 | C-99 | 120.00 | 2026-05-26T08:00:01 (retry) |
Code.
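import hashlib
from delta.tables import DeltaTable
from pyspark.sql import SparkSession, Window, functions as F
from pyspark.sql.types import DoubleType, StringType, StructType, TimestampType
spark = SparkSession.builder.getOrCreate()
# Event schema, matching the input table above.
schema = (
    StructType()
    .add("event_id", StringType())
    .add("order_id", StringType())
    .add("customer_id", StringType())
    .add("amount", DoubleType())
    .add("event_time", TimestampType())
)
# Pattern 2: dedupe on event_id at ingest
raw = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "orders")
    .load()
    .selectExpr("CAST(value AS STRING) AS json")
    .select(F.from_json("json", schema).alias("e")).select("e.*")
    .withWatermark("event_time", "1 hour")
    .dropDuplicates(["event_id"])
)
# Pattern 3: deterministic-hash partition for the bronze sink
def hash_bucket(key, n=64):
    return int(hashlib.sha256(key.encode()).hexdigest(), 16) % n
# Declare the return type: F.udf defaults to StringType.
hash_udf = F.udf(lambda oid: hash_bucket(oid, 64), "int")
bronze = raw.withColumn("bucket", hash_udf(F.col("order_id")))
(bronze.writeStream
    .format("delta")
    .partitionBy("bucket")
    .option("checkpointLocation", "/chk/orders_bronze")
    .toTable("lakehouse.bronze.orders"))  # toTable() starts the streaming query
# Pattern 1: MERGE INTO silver on natural key order_id.
# foreachBatch-style signature; it can also be called from a batch DAG task.
def merge_to_silver(batch_df, batch_id):
    # MERGE needs at most one source row per key: keep the latest event per order_id.
    latest = (batch_df
        .withColumn("rn", F.row_number().over(
            Window.partitionBy("order_id").orderBy(F.col("event_time").desc())))
        .filter("rn = 1")
        .drop("rn"))
    silver = DeltaTable.forName(spark, "lakehouse.silver.orders_clean")
    (silver.alias("t")
        .merge(latest.alias("s"), "t.order_id = s.order_id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())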
Step-by-step explanation.
- dropDuplicates(["event_id"]) with a 1-hour watermark eliminates the producer-retry duplicate at ingest.
- hash_udf(order_id) routes both copies of any single order (had they survived dedup) to the same bronze partition, deterministically.
- partitionBy("bucket") keeps the bronze data physically clustered for cheap downstream reads.
- merge_to_silver uses Delta's MERGE on order_id; re-running it for any past window is safe, because the same order_id UPDATEs in place.
- Stacked patterns — Pattern 2 + Pattern 3 + Pattern 1 together give effectively-once results end to end: retries, redeliveries and replays all converge on the same final state.
Sample output (the deduplicated path).
| stage | rows in | rows out | duplicates |
|---|---|---|---|
| Kafka source | 2 (one duplicate) | — | — |
| dropDuplicates(event_id) | 2 | 1 | 1 dropped |
| partitionBy(bucket) | 1 | 1 in bucket 47 | — |
| MERGE INTO silver | 1 | 1 row updated | 0 net inserts on retry |
Rule of thumb: a production pipeline stacks all three idempotency patterns — dedup at ingest, deterministic-hash at routing, MERGE at silver. Each pattern protects a different retry surface; together they form the exactly-once recipe.
Solution Using a Delta-MERGE silver upsert with MERGE WHEN MATCHED AND guard
Code (Delta MERGE that respects _loaded_at so stale corrections don't overwrite fresh data).
MERGE INTO lakehouse.silver.orders_clean AS t
USING (
  SELECT order_id, customer_id, region, amount, status, order_ts, _loaded_at
  FROM (
    SELECT
      order_id,
      customer_id,
      region,
      amount,
      status,
      order_ts,
      _loaded_at,
      -- Delta MERGE fails if several source rows match one target row:
      -- keep only the newest bronze row per order_id.
      ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY _loaded_at DESC) AS rn
    FROM lakehouse.bronze.orders
    WHERE _loaded_at > (SELECT COALESCE(MAX(_merged_at), TIMESTAMP '1970-01-01') FROM lakehouse.silver.orders_clean)
  ) AS ranked
  WHERE rn = 1
) AS s
ON t.order_id = s.order_id
WHEN MATCHED
  AND s._loaded_at >= t._loaded_at
THEN UPDATE SET
  t.customer_id = s.customer_id,
  t.region = s.region,
  t.amount = s.amount,
  t.status = s.status,
  t.order_ts = s.order_ts,
  t._loaded_at = s._loaded_at,
  t._merged_at = CURRENT_TIMESTAMP()
WHEN NOT MATCHED THEN INSERT (
  order_id, customer_id, region, amount, status, order_ts, _loaded_at, _merged_at
) VALUES (
  s.order_id, s.customer_id, s.region, s.amount, s.status, s.order_ts, s._loaded_at, CURRENT_TIMESTAMP()
);
Step-by-step trace.
| input row | matched? | guard | action |
|---|---|---|---|
| (O-1042, _loaded_at=08:00:01) first time | no | n/a | INSERT |
| (O-1042, _loaded_at=08:00:01) retry | yes | 08:00:01 >= 08:00:01 true | UPDATE (same values, idempotent) |
| (O-1042, _loaded_at=07:59:30) stale | yes | 07:59:30 >= 08:00:01 false | skip — keep fresh row |
| (O-9999, _loaded_at=08:01:00) new | no | n/a | INSERT |
Output:
| order_id | amount | _loaded_at | _merged_at |
|---|---|---|---|
| O-1042 | 120.00 | 2026-05-26 08:00:01 | 2026-05-26 08:00:04 |
| O-9999 | 220.00 | 2026-05-26 08:01:00 | 2026-05-26 08:01:02 |
Why this works — concept by concept:
- Natural-key ON clause — t.order_id = s.order_id makes each source row target at most one silver row.
- _loaded_at guard — WHEN MATCHED AND s._loaded_at >= t._loaded_at blocks stale corrections from overwriting fresh data, which is critical when backfills race with current loads.
- _merged_at bookmark — (SELECT MAX(_merged_at) FROM target) makes the source subquery incremental; only new bronze rows enter the merge.
- Atomicity — Delta MERGE is a single ACID commit; partial failures don't half-merge.
- Cost — MERGE cost is O(bronze_new_rows + matched_silver_rows) when the target can be pruned (file skipping on order_id, or a partition predicate in the ON clause); without pruning, MERGE scans the whole target.
5. Backfill strategies — full-table, partition-aware, log replay
backfill data pipeline — three strategies, one design constraint
Backfilling a data pipeline is the most under-rehearsed senior-loop topic. Interviewers routinely ask "how would you reprocess last Tuesday after a bug fix?", and the senior answer is one of three patterns, picked by the architecture and the failure mode.
The three backfill strategies this section covers.
- Full-table reload — drop and rebuild the target; correct but expensive.
- Partition-aware backfill — re-run only the affected partitions; the default for batch DAGs.
- Log replay — rewind the consumer offset and replay the source log; the default for streaming.
The design constraint underpinning all three.
- Same code path — backfill code must be identical to forward-fill code; any branch is a future bug.
- Idempotent sinks — covered in §4; without them, backfill duplicates rows.
- Bounded blast radius — only the affected partitions / offsets are rewritten; everything else stays untouched.
- Observability — every backfill emits a logged audit event with who / when / window / reason.
Strategy 1 — Full-table reload
The fallback. Drop the target, reread the source, rebuild from scratch. Right when the schema changed, when the bug affects all of history, or when partitioning isn't available.
Shape.
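- Truncate-and-reload — TRUNCATE TABLE target; INSERT INTO target SELECT … FROM source; inside a single transaction, where TRUNCATE is transactional (Postgres); some engines, e.g. MySQL, commit TRUNCATE implicitly.
- Atomic swap — write to target_new, then ALTER TABLE target RENAME TO target_old; ALTER TABLE target_new RENAME TO target; inside one transaction where DDL is transactional (Postgres), or use Snowflake's ALTER TABLE target SWAP WITH target_new, so consumers never read a missing or partial table.
- Snowflake / BigQuery — CREATE OR REPLACE TABLE target AS SELECT …; atomic, though it still recomputes the whole table.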
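The atomic swap can be demonstrated with SQLite, whose DDL is transactional like Postgres's; whether your warehouse behaves the same is an assumption to check. The replacement table is built off to the side while readers keep seeing the old one, and both renames commit together or not at all.

```python
import sqlite3

# isolation_level=None: autocommit, so transactions are opened explicitly below.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE target (id INTEGER, v TEXT)")
conn.execute("INSERT INTO target VALUES (1, 'old')")

def rebuild_and_swap(rows):
    # 1. Build the replacement off to the side; readers still see `target`.
    conn.execute("DROP TABLE IF EXISTS target_new")
    conn.execute("CREATE TABLE target_new (id INTEGER, v TEXT)")
    conn.executemany("INSERT INTO target_new VALUES (?, ?)", rows)
    # 2. Swap both names in ONE transaction: no reader ever sees a missing table.
    conn.execute("BEGIN")
    try:
        conn.execute("ALTER TABLE target RENAME TO target_old")
        conn.execute("ALTER TABLE target_new RENAME TO target")
        conn.execute("COMMIT")
    except Exception:
        conn.execute("ROLLBACK")
        raise
    # 3. Drop the previous version once the swap is committed.
    conn.execute("DROP TABLE target_old")

rebuild_and_swap([(1, "new"), (2, "new")])
rows = conn.execute("SELECT * FROM target ORDER BY id").fetchall()
```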
When to use.
- Schema change — adding a column that needs to be backfilled across all of history.
- Logic bug across all history — the whole table is wrong; partitioned backfill would touch every partition anyway.
- Small tables — under a few GB; rebuild is faster than figuring out the partition list.
Trade-offs.
- Cost — scans the entire source; bandwidth- and compute-expensive.
- Downtime — without atomic swap, consumers see an empty / partial table.
- Lineage — every downstream consumer must invalidate caches.
Strategy 2 — Partition-aware backfill (Airflow --start-date / --end-date)
The default for batch DAGs. Re-run only the affected partitions; Airflow's backfill command walks the date range and schedules one DAG run per logical date.
Shape.
- airflow dags backfill <dag> --start-date 2026-05-01 --end-date 2026-05-07 (Airflow 2.x CLI) — for a daily DAG, schedules 7 DAG runs (one per day), each with the right {{ ds }}.
- Idempotent partition overwrite — each task writes only its own {{ ds }} partition; replays overwrite identically.
- Concurrency — max_active_runs on the DAG caps parallelism; balance throughput vs warehouse load.
- Reset states — airflow tasks clear <dag> --start-date X --end-date Y resets task-instance state so the scheduler re-runs those tasks from scratch.
Pre-requisites.
- Partition-by-date in every layer — bronze, silver, gold all keyed by {{ ds }}; not just one layer.
- No datetime.today() in code — every reference to "today" must come from {{ ds }} / {{ data_interval_start }}.
- Idempotent sinks — covered in §4; partition overwrite, MERGE, INSERT OVERWRITE PARTITION.
- Resource isolation — backfills can hammer the warehouse; route them to a dedicated warehouse / pool.
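The contract behind partition-aware backfill, sketched as a pure-Python simulation: each task run receives its logical date, reads only that slice, and overwrites only that partition. Replaying the range is then a no-op on the final state, and partitions outside the range are untouched. The source rows and function names are hypothetical.

```python
from datetime import date, timedelta

# Hypothetical source rows: (order_date, region, amount).
SOURCE = [
    ("2026-05-04", "US", 100.0),
    ("2026-05-04", "LATAM", 40.0),
    ("2026-05-05", "US", 120.0),
    ("2026-05-06", "LATAM", 60.0),
]

def logical_dates(start: str, end: str):
    """Inclusive daily range of logical dates, one per DAG run."""
    day, stop = date.fromisoformat(start), date.fromisoformat(end)
    while day <= stop:
        yield day.isoformat()
        day += timedelta(days=1)

def run_partition(ds: str, warehouse: dict) -> None:
    """One task run: the logical date is passed in (never date.today()),
    only that day's rows are read, and only that partition is overwritten."""
    revenue = {}
    for order_date, region, amount in SOURCE:
        if order_date == ds:
            revenue[region] = revenue.get(region, 0.0) + amount
    warehouse[ds] = revenue  # overwrite the whole partition; never append

def backfill(start: str, end: str, warehouse: dict) -> None:
    for ds in logical_dates(start, end):
        run_partition(ds, warehouse)

warehouse = {"2026-05-03": {"US": 75.0}}        # partition outside the window
backfill("2026-05-04", "2026-05-06", warehouse)
snapshot = {ds: dict(part) for ds, part in warehouse.items()}
backfill("2026-05-04", "2026-05-06", warehouse)  # replay the same window
```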
Use cases.
- Bug fix for a known date range — "the region mapping was wrong from 2026-05-01 to 2026-05-07; rerun those days".
- Late-arriving source data — vendor re-sends 2026-05-03's file at 2026-05-05; backfill 2026-05-03.
- New downstream dimension — a new dim_region table needs the past 30 days re-joined; backfill 30 days.
Strategy 3 — Log replay (Kafka offset reset)
The streaming-native backfill. The log itself is the source of truth; rewind the consumer offset and the same streaming job replays history.
Shape.
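- Reset offsets — kafka-consumer-groups --bootstrap-server kafka:9092 --group my-job --topic events --reset-offsets --to-datetime 2026-05-01T00:00:00.000 --execute, run while the group has no active members.
- Drop affected sink rows — DELETE FROM target WHERE window_start >= 'X' AND window_start < 'Y'.
- Restart job — same job, same code; a consumer that resumes from committed group offsets picks up the rewound position. Engines that keep offsets in their own checkpoint (Spark Structured Streaming, or Flink restoring from a savepoint) ignore the group reset, so restart them from a fresh checkpoint positioned at the replay start instead.
- Compacted topics — cleanup.policy=compact keeps only the latest value per key, so it suits replays that rebuild keyed state (changelog topics), not event-by-event replays such as counts.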
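Both --to-datetime and Spark's startingOffsetsByTimestamp resolve a timestamp, per partition, to the earliest offset whose record timestamp is at or after it (the lookup a Kafka consumer's offsetsForTimes performs). A pure-Python sketch of that lookup for a single partition, assuming record timestamps are non-decreasing; the function names are hypothetical.

```python
from bisect import bisect_left
from datetime import datetime, timezone

def to_epoch_ms(iso_utc: str) -> int:
    """ISO timestamp interpreted as UTC -> epoch milliseconds."""
    return int(datetime.fromisoformat(iso_utc).replace(tzinfo=timezone.utc).timestamp() * 1000)

def offset_for_time(log_timestamps_ms, target_ms):
    """log_timestamps_ms[i] is the timestamp of the record at offset i.
    Returns the earliest offset with timestamp >= target_ms, or None if none is."""
    i = bisect_left(log_timestamps_ms, target_ms)
    return i if i < len(log_timestamps_ms) else None

day = to_epoch_ms("2026-05-26T00:00:00")
log = [day - 2_000, day - 1_000, day, day + 500, day + 60_000]  # hypothetical partition
replay_from = offset_for_time(log, day)  # every offset from here on is re-read
```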
Pre-requisites.
- Retention covers the replay window — Kafka's default 7 days is rarely enough; production replay setups use 30+ days or compacted topics.
- Idempotent sink — dedup on event_id, MERGE on natural key, or partition overwrite at the sink.
- Checkpoint compatibility — same job version and code; major version upgrades may require a fresh checkpoint.
- Capacity headroom — replay competes with live traffic; scale parallelism temporarily or route to a separate consumer group.
Trade-offs.
- Replay vs live race — during replay, live events still arrive; the dedup window must cover both streams.
- Out-of-order watermarks — replayed events have old event_time; the watermark policy must tolerate the gap.
- Cost — a full-log replay can be expensive, and every offset after the --to-datetime start point is re-read; choose that start point precisely.
Worked example — three-day Airflow partition-aware backfill for a bug fix
Detailed explanation. The most common production backfill: a logic bug was deployed at 2026-05-04 09:00 and discovered at 2026-05-07 11:00. The fix is merged; now reprocess 2026-05-04, 2026-05-05, and 2026-05-06 with the corrected code. Partition-aware Airflow backfill is the right tool.
Question. Backfill the orders_daily DAG for 2026-05-04 → 2026-05-06 inclusive after a bug fix. Show the Airflow command, the expected DAG-run schedule, and the post-backfill row-count audit.
Input (the situation).
| field | value |
|---|---|
| DAG | orders_daily |
| affected dates | 2026-05-04, 2026-05-05, 2026-05-06 |
| bug | region mapping returned null for LATAM |
| target table | gold.revenue_by_region partitioned by (region, date) |
| consumer | Power BI; backfill must complete before 06:00 next day |
Code.
# 1. Clear the affected runs so Airflow re-creates them with the new code.
airflow tasks clear orders_daily \
--start-date 2026-05-04 --end-date 2026-05-06 \
--yes
# 2. Run the backfill (Airflow schedules 3 DAG runs, one per {{ ds }}).
airflow dags backfill orders_daily \
--start-date 2026-05-04 --end-date 2026-05-06 \
--reset-dagruns \
--rerun-failed-tasks \
--yes
# 3. Post-backfill audit — row count and freshness check.
spark-sql -e "
SELECT order_date,
SUM(revenue) AS total_revenue,
COUNT(*) AS rows,
MAX(_merged_at) AS last_merged
FROM gold.revenue_by_region
WHERE order_date BETWEEN '2026-05-04' AND '2026-05-06'
GROUP BY order_date
ORDER BY order_date;
"
Step-by-step explanation.
- airflow tasks clear resets the existing task instances for the affected dates (it does not delete them), so Airflow re-runs them with the new code; --reset-dagruns on the backfill additionally discards existing DAG runs in the range and starts fresh ones.
- airflow dags backfill --start-date / --end-date schedules 3 DAG runs, one per {{ ds }}. Each run executes the full DAG with the right logical date.
- max_active_runs=2 (declared on the DAG) caps parallelism so the warehouse isn't overwhelmed.
- Each task is idempotent — MERGE INTO silver, INSERT OVERWRITE PARTITION (region, date) in gold — so replays write the same final state.
- The post-backfill audit confirms row counts and shows the fresh _merged_at timestamps; if any partition is missing, the audit query exposes it.
Sample output (post-backfill audit).
| order_date | total_revenue | rows | last_merged |
|---|---|---|---|
| 2026-05-04 | 1,287,402.55 | 8,432 | 2026-05-07 13:14:08 |
| 2026-05-05 | 1,401,118.20 | 8,891 | 2026-05-07 13:21:42 |
| 2026-05-06 | 1,356,907.71 | 8,704 | 2026-05-07 13:29:17 |
Rule of thumb: partition-aware backfill is "same DAG, same {{ ds }}, idempotent sinks, bounded date range". Anything more elaborate — separate "backfill DAG", custom Spark scripts, manual SQL — is a smell.
Solution Using parameterised partition overwrite + dbt incremental is_incremental() guard
Code (the gold model that handles forward-fill and backfill identically).
-- models/gold/revenue_by_region.sql
-- partition_by uses dbt-bigquery syntax; insert_overwrite replaces whole partitions
{{ config(
    materialized='incremental',
    incremental_strategy='insert_overwrite',
    partition_by={'field': 'order_date', 'data_type': 'date'},
    unique_key=['region', 'order_date']
) }}
SELECT
    region,
    order_date,
    SUM(amount) AS revenue,
    COUNT(DISTINCT order_id) AS orders,
    CURRENT_TIMESTAMP() AS _merged_at
FROM {{ ref('silver_orders_clean') }}
{% if is_incremental() %}
-- forward-fill and backfill runs alike rebuild exactly one partition
WHERE order_date = DATE('{{ var("date") }}')
{% endif %}
GROUP BY region, order_date
Step-by-step trace.
| var("date") | partition affected | action |
|---|---|---|
| 2026-05-04 (backfill) | order_date='2026-05-04' | INSERT OVERWRITE PARTITION (order_date='2026-05-04') |
| 2026-05-05 (backfill) | order_date='2026-05-05' | INSERT OVERWRITE PARTITION (order_date='2026-05-05') |
| 2026-05-06 (backfill) | order_date='2026-05-06' | INSERT OVERWRITE PARTITION (order_date='2026-05-06') |
| 2026-05-07 (forward-fill) | order_date='2026-05-07' | INSERT OVERWRITE PARTITION (order_date='2026-05-07') |
| 2026-05-08 (forward-fill) | order_date='2026-05-08' | INSERT OVERWRITE PARTITION (order_date='2026-05-08') |
Output:
| partition | rows | total_revenue |
|---|---|---|
| 2026-05-04 | 8,432 | 1,287,402.55 |
| 2026-05-05 | 8,891 | 1,401,118.20 |
| 2026-05-06 | 8,704 | 1,356,907.71 |
| 2026-05-07 | 8,801 | 1,387,019.04 |
| 2026-05-08 | 8,612 | 1,378,442.91 |
Why this works — concept by concept:
- One model, one code path — forward-fill and backfill use the exact same SQL; only var("date") differs.
- INSERT OVERWRITE PARTITION — replays for the same var("date") are idempotent at the partition level; no duplicates.
- unique_key=['region', 'order_date'] — documents the grain, but dbt does not enforce it (and insert_overwrite does not use it); a unique-combination test on (region, order_date), e.g. dbt_utils.unique_combination_of_columns, makes a double-run surface as a test failure.
- Airflow {{ ds }} → dbt var("date") — the same logical date flows through every layer; no datetime.today() lurking anywhere.
- Cost — backfill cost is O(rows_in_window); orders of magnitude cheaper than a full-table reload, and bounded by the explicit date range.
6. Observability + SLOs — logs, metrics, traces, alerting
pipeline observability — the four-layer stack
pipeline observability is the senior signal that closes the design loop. Junior answers say "we have logs"; senior answers describe the four-layer stack — structured logs → metrics → traces → alerting + SLOs — and how each layer catches a different class of failure.
The four layers and what each catches.
- Layer 1 — Structured JSON logs — who did what with which inputs; catches incorrect logic, missing rows, validation failures.
- Layer 2 — Metrics — row counts, byte counts, latency, freshness; catches volumetric drift and SLA breaches.
- Layer 3 — Traces — per-task spans tied by a correlation ID; catches slow stages and cross-DAG latency.
- Layer 4 — Alerting + SLOs — PagerDuty + freshness / completeness SLOs with error budgets; catches user-facing failures before the user sees them.
Layer 1 — Structured JSON logging
The foundation. Every task emits one structured JSON log per significant event; the log line carries a correlation ID so all logs from one DAG run can be queried as a unit.
Required fields per log line.
- timestamp — ISO 8601 with timezone.
- level — INFO, WARN, ERROR, CRITICAL.
- dag_id + task_id + dag_run_id — the correlation ID set; filtering on dag_run_id = X assembles the full timeline of one run.
- event — short slug ("task_started", "row_count_written", "merge_complete").
- metrics — nested object with rows, bytes, duration_s, etc.
- error (when applicable) — exception type + message + stacktrace.
Example log line.
{
"timestamp": "2026-05-26T06:34:08.521Z",
"level": "INFO",
"dag_id": "orders_daily",
"task_id": "dbt_build",
"dag_run_id": "manual__2026-05-26T06:00:00+00:00",
"event": "task_complete",
"metrics": {"rows_written": 12418503, "duration_s": 1284.2, "models_built": 12, "tests_passed": 27}
}
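A minimal sketch of a stdlib logging formatter that emits these fields as one JSON object per line. Passing the correlation IDs and metrics through logging's extra= mechanism is this sketch's convention, not something Airflow does for you; the logger name is hypothetical.

```python
import io
import json
import logging
from datetime import datetime, timezone

class JsonFormatter(logging.Formatter):
    """Render each record as one JSON object with the required fields."""

    def format(self, record: logging.LogRecord) -> str:
        line = {
            "timestamp": datetime.fromtimestamp(record.created, tz=timezone.utc)
            .isoformat(timespec="milliseconds")
            .replace("+00:00", "Z"),
            "level": record.levelname,
            # Correlation IDs arrive as attributes set via extra=.
            "dag_id": getattr(record, "dag_id", None),
            "task_id": getattr(record, "task_id", None),
            "dag_run_id": getattr(record, "dag_run_id", None),
            "event": record.getMessage(),
        }
        metrics = getattr(record, "metrics", None)
        if metrics is not None:
            line["metrics"] = metrics
        if record.exc_info:
            line["error"] = self.formatException(record.exc_info)  # type, message, stacktrace
        return json.dumps(line)

stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(JsonFormatter())
log = logging.getLogger("orders_daily.dbt_build")
log.addHandler(handler)
log.setLevel(logging.INFO)
log.propagate = False

log.info(
    "task_complete",
    extra={
        "dag_id": "orders_daily",
        "task_id": "dbt_build",
        "dag_run_id": "manual__2026-05-26T06:00:00+00:00",
        "metrics": {"rows_written": 12418503, "duration_s": 1284.2},
    },
)
line = json.loads(stream.getvalue())
```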
Anti-patterns.
- Unstructured print — free-form strings with no queryable fields; never in production.
- PII in logs — customer_email, card_number; redact before emit or use a separate restricted sink.
- One log per row — fan-out kills the log sink; aggregate to per-batch / per-task.
Layer 2 — Metrics (row counts, latency, freshness)
Numerical time series, scraped or pushed into Prometheus / Datadog / CloudWatch. Every pipeline should emit the four below.
The four canonical pipeline metrics.
- pipeline_rows_written_total{dag, task} — counter; alerts on drop > 10% week-over-week.
- pipeline_task_duration_seconds{dag, task} — histogram; alerts on p95 breaching SLA.
- pipeline_freshness_lag_seconds{table} — gauge of now() - max(updated_at); alerts on lag > SLO.
- pipeline_task_status{dag, task, status} — counter of success / failure / retry; alerts on failure rate > error budget.
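The alert conditions on these metrics reduce to small checks. A sketch in plain Python with hypothetical function names; in production they would be PromQL or Datadog monitor expressions over the emitted series rather than application code.

```python
def freshness_lag_seconds(now_s: float, max_updated_at_s: float) -> float:
    """pipeline_freshness_lag_seconds: now() - max(updated_at), in seconds."""
    return now_s - max_updated_at_s

def rows_drop_alert(rows_this_week: int, rows_last_week: int, max_drop: float = 0.10) -> bool:
    """Fire when rows written fell by more than max_drop week-over-week."""
    if rows_last_week == 0:
        return False  # no baseline yet
    return (rows_last_week - rows_this_week) / rows_last_week > max_drop

def failure_rate_alert(failures: int, runs: int, error_budget: float) -> bool:
    """Fire when the task failure rate exceeds the error budget (e.g. 0.005)."""
    return runs > 0 and failures / runs > error_budget

lag = freshness_lag_seconds(now_s=7_200.0, max_updated_at_s=3_600.0)
volume_alert = rows_drop_alert(rows_this_week=85, rows_last_week=100)  # a 15% drop
```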
Implementation tips.
- Push gateway (Prometheus) or StatsD (Datadog) for batch jobs that don't run a long-lived HTTP server.
- dbt source freshness — checks source freshness natively and writes the results to sources.json; export them to the metrics backend and pair with the orchestrator.
- Great Expectations / Soda — emit row-count + uniqueness + null-rate metrics from data-quality tests.
- Tag every metric with env, team, pipeline for slicing dashboards by ownership.
Layer 3 — Tracing (OpenTelemetry spans)
Distributed tracing makes cross-stage / cross-DAG latency visible. A common convention is one span per task, with a parent span per DAG run.
Tracing anatomy.
- Trace — a single end-to-end execution (one DAG run, one streaming micro-batch).
- Span — a unit of work within a trace (one task, one query, one Spark stage).
- Span attributes — dag_id, task_id, rows_read, rows_written, engine (Spark / Snowflake / BigQuery).
- Span events — point-in-time annotations ("checkpoint_committed", "watermark_advanced").
- Span links — cross-trace references (e.g. downstream DAG run links upstream DAG run).
Stack components.
- OpenTelemetry SDK — language-native; instrumentation for Airflow, dbt and Spark varies in maturity, from built-in to community-maintained.
- Collector — receives spans (OTLP), exports to backends.
- Backend — Honeycomb, Tempo, Jaeger, Datadog APM.
- Sampling — head-based (sample N% of traces) or tail-based (keep all error traces, sample success traces).
Layer 4 — Alerting + SLOs (freshness, completeness, error budget)
The user-facing contract. An SLO is "the table is fresh within 1 hour of the schedule, 99.5% of days"; the error budget is the 0.5% you're allowed to burn before pausing change.
SLO anatomy.
- Service — the pipeline / table the SLO covers (gold.revenue_by_region).
- SLI (indicator) — the measurable signal (freshness_lag_seconds, completeness_ratio, error_rate).
- SLO (objective) — the target (freshness < 3600s, completeness > 99.5%).
- Error budget — the allowed shortfall over a window (1 - 0.995 = 0.5% of days).
- Burn rate alert — "the error budget is being consumed faster than the window allows"; pages on-call early.
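The error-budget arithmetic, sketched in Python with hypothetical function names. Burn rate is the observed error ratio divided by the allowed one (1 - SLO). The numbers also expose a property of day-granularity SLOs: at 99.5% of days, a 30-day window allows 0.15 bad days, so a single missed day in a month already exceeds the budget; the SLO tolerates one bad day per 200.

```python
def error_budget(slo: float, window_units: float) -> float:
    """Allowed bad units in the window, e.g. bad days out of 30: (1 - SLO) * window."""
    return (1 - slo) * window_units

def burn_rate(bad: int, total: int, slo: float) -> float:
    """Observed error ratio / allowed error ratio (1 - SLO).
    1.0 spends the budget exactly over the SLO window; 14.0 spends it 14x faster."""
    return (bad / total) / (1 - slo)

budget_30d = error_budget(0.995, 30)   # bad days allowed per 30-day window
days_per_breach = 1 / (1 - 0.995)      # window length that allows one bad day
fast_burn = burn_rate(7, 100, 0.995)   # 7% of evaluations failing in the last hour
```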
Alerting routing.
- PagerDuty — primary on-call rotation; pages on SLO breach + burn-rate alerts.
- Slack — non-paging notifications (warnings, FYI failures).
- Email digest — daily summary of yesterday's SLO status.
- Runbook link — every alert carries a runbook_url field pointing to diagnostic queries + remediation steps.
Worked example — design an SLO + alert for a 1-hour-freshness pipeline
Detailed explanation. The canonical staff-level prompt: "the gold.revenue_by_region table must be fresh within 1 hour of the 06:00 schedule, 99.5% of days, with PagerDuty paging if the SLO is at risk. Design the SLO, the SLI, the alert, and the runbook."
Question. Design a full SLO + alert + runbook for gold.revenue_by_region with freshness ≤ 1h after 06:00, completeness ≥ 99.5%, paged via PagerDuty.
Input (the SLO requirements).
| field | value |
|---|---|
| service | gold.revenue_by_region |
| SLI 1 (freshness) | a successful merge lands between today's 06:00 schedule and 07:00 (schedule + 1h) |
| SLO 1 | freshness target met on 99.5% of days |
| SLI 2 (completeness) | count(distinct region) >= expected_region_count |
| SLO 2 | completeness target met on 99.5% of days |
| paging | PagerDuty de-on-call rotation |
| burn rate alert | error-budget burn > 14× normal in 1 hour |
Code (the Prometheus / Alertmanager rules + runbook reference).
# prometheus/rules/revenue_by_region_slo.yaml
groups:
  - name: revenue_by_region_slo
    interval: 60s
    rules:
      # SLI 1 — freshness gauge (seconds since last merge)
      - record: pipeline_freshness_lag_seconds:revenue_by_region
        expr: time() - max(pipeline_last_merged_seconds{table="gold.revenue_by_region"})
      # SLO 1 — page if freshness > 1h after the 06:00 schedule.
      # Note: "seconds since last merge > 3600" suits tables that merge at least hourly;
      # for a once-daily run it also trips every afternoon, so in practice gate it on
      # the schedule (page only when today's run has not merged by 07:00).
      - alert: RevenueByRegionFreshnessSLO
        expr: pipeline_freshness_lag_seconds:revenue_by_region > 3600
        for: 5m
        labels:
          severity: page
          team: data-eng
        annotations:
          summary: "gold.revenue_by_region freshness SLO breach"
          description: "Lag is {{ $value | humanizeDuration }} (>1h)."
          runbook_url: "https://runbooks.example.com/data-eng/revenue-by-region-freshness"
          slo: "freshness <= 1h, target 99.5%"
      # SLI 2 — completeness (regions present today)
      - record: pipeline_completeness_ratio:revenue_by_region
        expr: |
          count(count by (region) (
            pipeline_revenue_by_region_today{table="gold.revenue_by_region"}
          ))
          /
          count(count by (region) (
            pipeline_expected_regions{table="gold.revenue_by_region"}
          ))
      # SLO 2 — page if completeness < 99.5%
      - alert: RevenueByRegionCompletenessSLO
        expr: pipeline_completeness_ratio:revenue_by_region < 0.995
        for: 10m
        labels:
          severity: page
          team: data-eng
        annotations:
          summary: "gold.revenue_by_region completeness SLO breach"
          description: "Only {{ $value | humanizePercentage }} of regions present."
          runbook_url: "https://runbooks.example.com/data-eng/revenue-by-region-completeness"
      # Burn-rate alert — error budget burning 14x normal in the last hour.
      # Burn rate = error ratio / (1 - SLO), so violations are divided by total SLI
      # evaluations; pipeline_slo_evaluations_total is a hypothetical companion counter.
      - alert: RevenueByRegionErrorBudgetBurn
        expr: |
          (
            increase(pipeline_slo_violations_total{table="gold.revenue_by_region"}[1h])
            /
            increase(pipeline_slo_evaluations_total{table="gold.revenue_by_region"}[1h])
          ) / (1 - 0.995) > 14
        for: 2m
        labels:
          severity: page
          team: data-eng
        annotations:
          summary: "revenue_by_region error budget burning fast"
          runbook_url: "https://runbooks.example.com/data-eng/revenue-by-region-burn"
Step-by-step explanation.
- SLI 1 (freshness) — gauge of `now() - last_merge_time`; trips when it exceeds 3600s.
- SLO 1 — alert fires after the lag exceeds the threshold for 5 consecutive minutes (debounces flaps).
- SLI 2 (completeness) — ratio of `regions_seen / regions_expected`; trips below 99.5%.
- SLO 2 — alert fires after 10 consecutive minutes below the threshold (gives the DAG time to retry).
- Burn-rate alert — fires when the error budget is being burned 14× faster than the SLO allows. At that pace a 30-day budget would be gone in about 2 days (30 / 14 ≈ 2.1), so on-call is paged while most of the budget is still intact.
- Runbook links — every alert carries a `runbook_url` annotation; PagerDuty surfaces it as a clickable link to the diagnostic queries + remediation steps.
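The burn-rate alert is a ratio of two ratios, so the arithmetic is easy to check offline. The sketch below is a minimal model. It assumes the SLI is evaluated once per minute (60 evaluations per hour) and that both violating and total evaluations are counted. The function names are illustrative, not part of any Prometheus API.

```python
def burn_rate(violations: float, evaluations: float, slo_target: float) -> float:
    """Observed error ratio divided by the error ratio the SLO allows."""
    if evaluations <= 0:
        raise ValueError("need at least one SLI evaluation in the window")
    observed_error_ratio = violations / evaluations
    allowed_error_ratio = 1.0 - slo_target  # 0.005 for a 99.5% target
    return observed_error_ratio / allowed_error_ratio


def days_to_exhaust(burn: float, window_days: float = 30.0) -> float:
    """At a constant burn rate, the whole budget for the window lasts window_days / burn."""
    return window_days / burn


# 5 violating evaluations out of 60 in the last hour: about 16.7x, above the 14x threshold.
print(round(burn_rate(5, 60, 0.995), 1))
# 4 out of 60: about 13.3x, below the threshold.
print(round(burn_rate(4, 60, 0.995), 1))
# At exactly 14x, a 30-day budget lasts a little over two days.
print(round(days_to_exhaust(14.0), 2))
```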
Sample output (PagerDuty incident on a freshness breach).
[PD] RevenueByRegionFreshnessSLO
severity=page team=data-eng
summary: gold.revenue_by_region freshness SLO breach
description: Lag is 1h 12m 4s (>1h).
slo: freshness <= 1h, target 99.5%
runbook: https://runbooks.example.com/data-eng/revenue-by-region-freshness
firing_for: 5m12s
Rule of thumb: every SLO needs six parts, namely an SLI (measurable), an objective (target), an error budget, a burn-rate alert, a paging rule, and a runbook link. Skip any of those six and the alert becomes noise rather than signal.
Solution Using a freshness-SLI gauge + burn-rate-driven PagerDuty rule
Code (the freshness-emit task that produces the SLI).
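from datetime import datetime, timezone
from prometheus_client import CollectorRegistry, Gauge, push_to_gateway
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()  # assumes an active Spark environment
def emit_freshness_metric(table_name: str, last_merged_at: datetime) -> None:
    """Push the Unix-seconds timestamp of the last successful merge to the Pushgateway."""
    # Spark returns TIMESTAMP values as naive datetimes; treat them as UTC (assumes a
    # UTC session time zone) so .timestamp() is not read in the host's local zone.
    if last_merged_at.tzinfo is None:
        last_merged_at = last_merged_at.replace(tzinfo=timezone.utc)
    registry = CollectorRegistry()
    g = Gauge(
        "pipeline_last_merged_seconds",
        "Unix-seconds of last successful merge per table",
        ["table"],
        registry=registry,
    )
    g.labels(table=table_name).set(last_merged_at.timestamp())
    push_to_gateway(
        "pushgateway:9091",
        job=f"freshness:{table_name}",
        registry=registry,
    )
# Called at the end of refresh_bi_cache for gold.revenue_by_region
last = spark.sql(
    "SELECT MAX(_merged_at) AS t FROM gold.revenue_by_region"
).first()["t"]
if last is None:
    raise RuntimeError("gold.revenue_by_region has no merged rows yet")
emit_freshness_metric("gold.revenue_by_region", last)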
Step-by-step trace.
| event | metric | alert |
|---|---|---|
| 06:34:42 — DAG completes; `_merged_at` = 06:34:42 | `pipeline_last_merged_seconds` = 1748234082 | none |
| 07:34:42 — lag = 1h exactly | `pipeline_freshness_lag_seconds` = 3600 | `for: 5m` not yet tripped |
| 07:39:42 — lag = 1h 5m | `pipeline_freshness_lag_seconds` = 3900 | PagerDuty page fires |
| burn rate evaluated | > 14× normal | second page (burn-rate alert) |
| on-call runs runbook diagnostics | freshness metric resets after fix | alert auto-resolves |
Output:
| time | freshness lag | SLO state | paged? |
|---|---|---|---|
| 06:34:42 | 0s | met | no |
| 07:00:00 | 25m 18s | met | no |
| 07:34:42 | 1h 0m | met (threshold) | no |
| 07:39:42 | 1h 5m | breach | yes |
| 08:12:14 | 0s (fix deployed) | recovered | auto-resolved |
Why this works — concept by concept:
- SLI is a gauge, not a count — gauges expose "current state" instead of "events since"; freshness is naturally a gauge.
- `for: 5m` debouncing — prevents flapping when the metric momentarily exceeds the threshold during normal DAG completion.
- Burn-rate alert as early warning — fires while most of the error budget is still intact, well before the budget for the window is exhausted.
- Runbook URL on every alert — the page is useless without a paired runbook; the URL is part of the SLO contract.
- Cost — Prometheus evaluates each rule once per `interval` (60s here), so evaluation cost grows with the number of rules and series, not with table size. The freshness emit is O(1) per DAG run, so the SLO machinery adds near-zero runtime overhead.
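The following simplified, plain-Python model shows how an alerting rule moves between inactive, pending and firing, and why `for: 5m` absorbs short flaps. It is an approximation for intuition only. It ignores evaluation-timing details and Alertmanager routing, and it is not Prometheus's implementation.

```python
def alert_states(samples: list[tuple[int, float]], threshold: float, for_seconds: int) -> list[str]:
    """Replay (eval_time_seconds, value) ticks through a simplified `for:` state machine."""
    states = []
    active_since = None  # evaluation time when the condition first became true
    for t, value in samples:
        if value > threshold:
            if active_since is None:
                active_since = t
            # Fires only once the condition has held continuously for for_seconds.
            states.append("firing" if t - active_since >= for_seconds else "pending")
        else:
            active_since = None  # any false evaluation resets the timer
            states.append("inactive")
    return states


# Freshness lag sampled every 60 s, growing by 60 s per tick (3540 s up to 3960 s).
ticks = [(i * 60, 3540 + i * 60) for i in range(8)]
print(alert_states(ticks, threshold=3600, for_seconds=300))
```

A single tick above the threshold followed by a drop resets the timer, which is exactly the flap the debounce is meant to swallow.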
7. Failure modes + production playbook
Data pipeline failure modes — the eight failures every senior loop tests
Data pipeline failure modes are the staff-level closing topic. Every senior pipeline-design round eventually asks "what could go wrong?"; the candidate who can name eight common failure modes and a paired runbook for each is the candidate who gets hired. The eight failures below cover the bulk of the production incidents a pipeline team typically sees.
The eight failure modes.
- F1 — Schema drift — source adds / removes / retypes a column; downstream parse breaks.
- F2 — Source unavailable — upstream API / file drop fails; DAG sensor times out.
- F3 — Out-of-memory (OOM) — Spark / Flink job exceeds executor memory and dies mid-stage.
- F4 — Runaway scan — a query without partition pruning scans the whole table; cost explodes.
- F5 — Late data — streaming events arrive after the watermark; window aggregates miss them.
- F6 — Partition misalignment — source partition (`event_date`) and sink partition (`load_date`) drift; rows land in the wrong day.
- F7 — Retry storm — a failing task's retries thundering-herd a downstream service, and the overload cascades into an outage.
- F8 — Downstream backpressure — sink can't keep up with source; queues fill, latency explodes.
F1 — Schema drift; F2 — Source unavailable
The two most common ingest-layer failures.
F1 — Schema drift.
- Symptom — parse error in bronze load; missing column in silver model; nulls where data was expected.
- Detection — Schema Registry compatibility check fails, or the dbt `not_null` test fails on a new column.
- Prevention — Schema Registry with `BACKWARD` or `FULL` compatibility; tolerant readers (`spark.read.option("mergeSchema", "true")`); `on_schema_change='append_new_columns'` in dbt incremental models.
- Remediation — promote the schema change through dev → staging → prod; backfill the affected window if the new column should have history.
- Runbook — "diagnose: `dbt source freshness`; if a schema change is detected, run `dbt run --full-refresh --select <model>` after updating the model; backfill if needed via `airflow dags backfill --start-date X --end-date Y`".
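The tolerant-reader idea from the Prevention bullet can be shown without Spark or dbt. Below is a minimal plain-Python sketch. It assumes records arrive as dicts and that the expected schema is a mapping from column name to Python type; both are assumptions for illustration. Additive drift passes through, while a retyped column fails loudly.

```python
from typing import Any


def tolerant_read(record: dict[str, Any], expected: dict[str, type]) -> tuple[dict[str, Any], list[str]]:
    """Project a raw record onto the expected columns without failing on additive drift.

    - missing expected columns become None (older files without the new column)
    - unknown extra columns are returned separately so they can be surfaced for review
    - a type change on an expected column is treated as breaking and raises
    """
    row: dict[str, Any] = {}
    for column, column_type in expected.items():
        value = record.get(column)
        if value is not None and not isinstance(value, column_type):
            raise TypeError(
                f"column {column!r}: expected {column_type.__name__}, got {type(value).__name__}"
            )
        row[column] = value
    extras = sorted(set(record) - set(expected))
    return row, extras


expected = {"order_id": str, "amount": float}
row, extras = tolerant_read({"order_id": "o-1", "amount": 9.5, "currency": "EUR"}, expected)
print(row, extras)
```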
F2 — Source unavailable.
- Symptom — `S3KeySensor` times out; `HttpSensor` returns 5xx; vendor SFTP refuses connections.
- Detection — the sensor task stays in `up_for_reschedule` past its timeout → task failure.
- Prevention — sensors with a reasonable `timeout=`; deferrable sensors to avoid worker exhaustion; alerting on consecutive sensor failures (not single-run failures).
- Remediation — page vendor on-call; manually trigger the DAG once the source recovers; backfill missed windows.
- Runbook — "diagnose: check vendor status page + recent sensor history; if vendor is down, pause the DAG with `airflow dags pause <dag_id>`; on recovery, unpause it and run `airflow dags backfill` with `--start-date / --end-date` for the missed windows".
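Alerting on consecutive failures rather than single-run failures comes down to counting the trailing failure streak. The sketch below is minimal and hypothetical. It assumes the sensor outcomes have already been fetched (for example from Airflow task-instance history, which this sketch does not query), and the threshold of 3 is an assumed default.

```python
def should_page(sensor_outcomes: list[bool], consecutive_threshold: int = 3) -> bool:
    """sensor_outcomes is ordered oldest -> newest; True means the sensor succeeded.

    Page only when the most recent `consecutive_threshold` runs all failed,
    so a single flaky run does not wake anyone up.
    """
    streak = 0
    for succeeded in reversed(sensor_outcomes):
        if succeeded:
            break
        streak += 1
    return streak >= consecutive_threshold


print(should_page([True, False, False]))         # two failures: no page yet
print(should_page([True, False, False, False]))  # three in a row: page
```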
F3 — OOM; F4 — Runaway scan
The two most common compute-layer failures.
F3 — Out-of-memory (OOM).
- Symptom — Spark executor killed with `OutOfMemoryError`; Flink job restarts in a loop.
- Detection — Spark UI shows failed stages with `Container killed by YARN for exceeding memory limits`; Flink's `Status.JVM.Memory.Heap.Used` metric sits near the heap maximum.
- Prevention — right-size executor memory (`spark.executor.memory`); raise `spark.sql.shuffle.partitions` (default 200) for large shuffles so each task holds less data; broadcast small dims with `broadcast(small_df)`.
- Remediation — increase executor memory; use `df.repartition(N)` to rebalance partitions; convert wide transformations to narrow ones when possible.
- Runbook — "diagnose: Spark UI → failed stages → stage detail → executor memory; if a single partition is huge, repartition by a higher-cardinality key; if a broadcast join is too big, drop the broadcast hint".
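A common way to size the shuffle is to aim for a fixed amount of data per task. The sketch below uses a 128 MiB target per partition. That figure is a widely used rule of thumb, not a Spark constant, so treat it as an assumption. The floor of 200 mirrors Spark's default for `spark.sql.shuffle.partitions`.

```python
import math


def shuffle_partitions(shuffle_bytes: int,
                       target_partition_bytes: int = 128 * 1024**2,
                       minimum: int = 200) -> int:
    """Pick a shuffle partition count so each task handles roughly target_partition_bytes."""
    if shuffle_bytes < 0:
        raise ValueError("shuffle_bytes must be non-negative")
    return max(minimum, math.ceil(shuffle_bytes / target_partition_bytes))


# 500 GiB of shuffle data -> 4000 partitions of ~128 MiB each.
print(shuffle_partitions(500 * 1024**3))
```

The result would typically be applied with `spark.conf.set("spark.sql.shuffle.partitions", str(n))` before the wide stage. With adaptive query execution enabled in Spark 3.x, Spark can also coalesce small shuffle partitions at runtime, so erring high is usually cheaper than erring low.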
F4 — Runaway scan.
- Symptom — a query that normally runs in 2 minutes takes 2 hours; the warehouse bill spikes.
- Detection — Snowflake query history shows `bytes_scanned > 100GB` for a query that should scan one partition; BigQuery's `INFORMATION_SCHEMA.JOBS` shows `total_bytes_processed` far above the expected partition size.
- Prevention — every query has a `WHERE` on the partition column; a CI test (dbt test) asserts partition pruning; query-budget guardrails in CI.
- Remediation — cap session runtime (in Snowflake, `ALTER SESSION SET STATEMENT_TIMEOUT_IN_SECONDS = 60`); cancel the runaway query; add the missing `WHERE` clause; rerun.
- Runbook — "diagnose: warehouse query history; if `bytes_scanned > expected`, find the query; check the `WHERE` clause; rerun with a partition filter".
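A query-budget guardrail can be as simple as comparing scanned bytes with what the query should have touched. The sketch below assumes the rows were already pulled from warehouse query history (Snowflake's `QUERY_HISTORY` exposes `BYTES_SCANNED`). `QueryStats`, `expected_bytes` and the 10x multiplier are hypothetical choices for illustration.

```python
from dataclasses import dataclass


@dataclass
class QueryStats:
    query_id: str
    bytes_scanned: int
    expected_bytes: int  # e.g. the size of the one partition the query should touch


def runaway_queries(stats: list[QueryStats], budget_multiplier: float = 10.0) -> list[str]:
    """Return ids of queries that scanned more than budget_multiplier x their expected bytes."""
    return [
        s.query_id
        for s in stats
        if s.bytes_scanned > budget_multiplier * s.expected_bytes
    ]


history = [
    QueryStats("q1", bytes_scanned=2 * 1024**3, expected_bytes=2 * 1024**3),
    QueryStats("q2", bytes_scanned=150 * 1024**3, expected_bytes=2 * 1024**3),  # missing partition filter
]
print(runaway_queries(history))
```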
F5 — Late data; F6 — Partition misalignment
The two most common time-correctness failures.
F5 — Late data.
- Symptom — yesterday's 5-minute counts are wrong; events arrive hours after their `event_time`.
- Detection — `pipeline_late_event_count_total` metric > threshold; downstream users report "yesterday's number changed".
- Prevention — `withWatermark("event_time", "1 hour")` or higher in Spark; `allowedLateness(Time.hours(1))` on Flink windows; a side-output (`sideOutputLateData` in Flink) for events past the watermark.
- Remediation — widen the watermark; reprocess affected windows via log replay (§5.3); document the trust window (e.g. "numbers are stable after 4 hours").
- Runbook — "diagnose: late-event metric + watermark lag; if widespread, reprocess the affected window via offset reset + idempotent sink".
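The watermark rule itself is small enough to model in plain Python: an event is late if its `event_time` is older than the maximum event time seen so far minus the allowed delay. This sketch is a simplification. Real engines advance the watermark periodically or per trigger rather than per event, and Flink's `allowedLateness` adds a further grace period, which is ignored here.

```python
from datetime import datetime, timedelta


def route_events(events: list[tuple[str, datetime]], allowed_delay: timedelta) -> tuple[list[str], list[str]]:
    """Split events (in arrival order) into on-time ids and late (side-output) ids."""
    max_event_time = None
    on_time: list[str] = []
    late: list[str] = []
    for event_id, event_time in events:
        watermark = None if max_event_time is None else max_event_time - allowed_delay
        if watermark is not None and event_time < watermark:
            late.append(event_id)  # would go to the side-output / late table
        else:
            on_time.append(event_id)
        if max_event_time is None or event_time > max_event_time:
            max_event_time = event_time
    return on_time, late


base = datetime(2026, 5, 26, 12, 0)
events = [
    ("a", base),
    ("b", base + timedelta(minutes=90)),  # pushes the watermark to 12:30
    ("c", base + timedelta(minutes=10)),  # 12:10 < 12:30: late
    ("d", base + timedelta(minutes=45)),  # 12:45 >= 12:30: still on time
]
print(route_events(events, allowed_delay=timedelta(hours=1)))
```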
F6 — Partition misalignment.
- Symptom — events that occur on day N but arrive after midnight land in the day N+1 partition; queries by `event_date` miss rows.
- Detection — a `dbt test` on partition counts shows a shortfall; the analytics team reports a row-count discrepancy.
- Prevention — partition by `event_date` (derived from `event_time`), not `load_date`; document the difference explicitly; handle midnight rollover in streaming jobs.
- Remediation — correct the partition logic, then backfill the misaligned dates via `--start-date / --end-date`.
- Runbook — "diagnose: `SELECT event_date, load_date, count(*) FROM bronze GROUP BY 1, 2`; if mismatched, fix the partitioning logic + backfill".
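Deriving `event_date` from `event_time` rather than from the load clock is a one-liner. The time-zone step is where it usually goes wrong, though. The sketch assumes partitions are defined in UTC, which is an assumption. It also rejects naive datetimes, because those would be read in the host's local zone.

```python
from datetime import datetime, timedelta, timezone


def partition_keys(event_time: datetime, loaded_at: datetime) -> tuple[str, str]:
    """Return (event_date, load_date) as UTC ISO dates."""
    if event_time.tzinfo is None or loaded_at.tzinfo is None:
        raise ValueError("event_time and loaded_at must be timezone-aware")
    event_date = event_time.astimezone(timezone.utc).date().isoformat()
    load_date = loaded_at.astimezone(timezone.utc).date().isoformat()
    return event_date, load_date


# An event from 23:58 UTC that is loaded five minutes later, after midnight.
late_arrival = partition_keys(
    datetime(2026, 5, 25, 23, 58, tzinfo=timezone.utc),
    datetime(2026, 5, 26, 0, 3, tzinfo=timezone.utc),
)
# A producer stamping local time (UTC+2): taking .date() of the local value would
# give 2026-05-26, but converting to UTC first gives 2026-05-25.
local_stamp = partition_keys(
    datetime(2026, 5, 26, 1, 30, tzinfo=timezone(timedelta(hours=2))),
    datetime(2026, 5, 26, 1, 35, tzinfo=timezone(timedelta(hours=2))),
)
print(late_arrival, local_stamp)
```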
F7 — Retry storm; F8 — Downstream backpressure
The two most common cascading failures.
F7 — Retry storm.
- Symptom — a failing task retries N times every 5 minutes, hammering a downstream API; downstream rate-limits everyone.
- Detection — downstream service reports 429 / 503 spike; metrics show retry count > normal.
- Prevention — exponential backoff (`base_delay * (2 ** attempt)`) + jitter (`+ random.uniform(0, 1)`); capped retries (`max_retries=5`); circuit-breaker pattern.
- Remediation — pause the offending DAG; reduce `retries` on the failing task; coordinate with downstream owners.
- Runbook — "diagnose: downstream 429 / 503 rate vs our retry rate; if the cause is us, pause the DAG + reduce retries + add jitter".
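The Prevention formula can be turned into a delay schedule directly. The sketch below follows `base_delay * (2 ** attempt)` plus `random.uniform(0, 1)` jitter and `max_retries=5` from the bullet. The `max_delay` cap and the default values are assumptions added for illustration. In Airflow itself, operators expose `retry_exponential_backoff=True` and `max_retry_delay` for the same purpose.

```python
import random
from typing import Optional


def backoff_delays(max_retries: int = 5,
                   base_delay: float = 2.0,
                   max_delay: float = 300.0,
                   rng: Optional[random.Random] = None) -> list[float]:
    """Seconds to wait before each retry: capped exponential backoff plus up to 1 s of jitter."""
    rng = rng or random.Random()
    delays = []
    for attempt in range(max_retries):
        exponential = min(max_delay, base_delay * (2 ** attempt))
        delays.append(exponential + rng.uniform(0, 1))  # jitter spreads retries apart
    return delays


print([round(d, 2) for d in backoff_delays(rng=random.Random(42))])
```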
F8 — Downstream backpressure.
- Symptom — Kafka consumer lag grows; Flink checkpoint times out; sink writes hang.
- Detection — `kafka_consumer_lag_total` gauge climbing monotonically; the Flink JobManager shows checkpoint alignment time rising.
- Prevention — right-size sink throughput; partition the sink for parallelism; circuit-break when consumer lag exceeds threshold.
- Remediation — temporarily scale up consumers / sinks; throttle producers; route overflow to a dead-letter queue (DLQ) topic for later replay.
- Runbook — "diagnose: consumer lag + sink write latency; if sustained, scale consumers; if write latency, scale sink; if neither, throttle producers".
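"Climbing monotonically" is a stronger signal than "above a threshold", because it means the sink is falling further behind on every scrape. The sketch below is minimal. It assumes lag samples are taken at a fixed scrape interval, and the 5-sample window is an arbitrary assumption.

```python
def lag_is_climbing(lag_samples: list[int], window: int = 5) -> bool:
    """True when the last `window` consumer-lag samples are strictly increasing."""
    if len(lag_samples) < window:
        return False
    tail = lag_samples[-window:]
    return all(later > earlier for earlier, later in zip(tail, tail[1:]))


print(lag_is_climbing([100, 90, 120, 150, 180, 210, 260]))  # sustained growth
print(lag_is_climbing([100, 200, 150, 300, 400]))           # noisy, not monotonic
```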
Worked example — full runbook for an F1 schema-drift incident
Detailed explanation. A representative on-call scenario. The vendor adds a currency column to the daily orders.parquet file. The bronze load succeeds (Parquet schema-merge is tolerant), but the dbt silver.orders_clean model fails on a not_null test for the new column. On-call wakes up at 06:42.
Question. Walk through the on-call runbook for an F1 schema-drift incident — diagnose, decide, remediate, document.
Input (the page).
[PD] dbt_build task failed in orders_daily
dag_id=orders_daily task_id=dbt_build dag_run_id=manual__2026-05-26
error: "FAIL not_null_silver_orders_clean_currency" — 12,418,503 nulls
runbook: https://runbooks.example.com/data-eng/schema-drift
Code (the on-call runbook steps).
# 1. Diagnose — find what changed.
spark-sql -e "
SELECT * FROM lakehouse.bronze.orders
WHERE dt = '2026-05-26' LIMIT 5;
"
# -> output shows a new 'currency' column that wasn't there yesterday.
# 2. Confirm with Schema Registry.
curl -s https://schema-registry.example.com/subjects/orders-value/versions/latest
# -> v3 = adds 'currency' (string)
# 3. Decide — is this a backward-compatible change? Yes (new optional column).
# Update the silver model + relax the not_null test to allow nulls for now.
# 4. Patch silver_orders_clean.sql + schema.yml.
git checkout -b fix/orders-currency-column
# - add `currency` to the SELECT list in silver/orders_clean.sql
#   - relax `not_null` -> dbt's built-in `accepted_values` test (nulls pass) until backfill complete
# - PR + review + merge
# 5. Re-run today's DAG with the fix.
airflow tasks clear orders_daily \
--task-regex 'dbt_(build|test)' \
--start-date 2026-05-26 --end-date 2026-05-26 \
--yes
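# Clearing re-queues dbt_build / dbt_test inside the existing 2026-05-26 run, so no
# separate `airflow dags trigger` is needed (it would start a second, duplicate run).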
# 6. Document — append to the runbook + post in #data-eng.
echo "2026-05-26 06:55 — vendor added currency column; silver_orders_clean patched; SLO MET at 07:12" \
>> runbooks/data-eng/schema-drift-incidents.md
Step-by-step explanation.
- Diagnose — query bronze; spot the new column.
- Confirm — Schema Registry shows v3 with the new field.
- Decide — backward-compatible? Yes (additive). No backfill needed yet.
- Patch — update model + test; ship through the normal PR flow (no `--force-merge`).
- Re-run — clear and re-run only today's tasks; don't backfill all of history.
- Document — append the incident to the runbook log for future on-call learnings.
Sample output (the post-incident timeline).
06:42:01  PD page — dbt_build task failed in orders_daily
06:42:14 on-call ack
06:48:30 diagnosis complete (new currency column)
06:54:17 PR merged
07:01:42 DAG re-run triggered
07:12:08 DAG complete; freshness SLO met
07:14:00 runbook updated
07:30:00 retro logged: "request vendor to email schema changes 48h ahead"
Rule of thumb: every production incident becomes a runbook entry, and every runbook entry follows the same five steps (diagnose, confirm, decide, patch, document). Each page should leave behind a fix or a runbook update that makes the next page less likely.
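Recording the timeline in a structured form makes the retro measurable. The sketch below computes time-to-acknowledge and time-to-resolve from the sample post-incident timeline above. It treats "DAG complete; freshness SLO met" (07:12:08) as the resolution point and assumes all times fall on the same day. The dictionary keys are hypothetical.

```python
from datetime import datetime

FMT = "%H:%M:%S"
timeline = {"paged": "06:42:01", "acked": "06:42:14", "resolved": "07:12:08"}


def seconds_between(start: str, end: str) -> int:
    """Whole seconds between two same-day HH:MM:SS stamps."""
    return int((datetime.strptime(end, FMT) - datetime.strptime(start, FMT)).total_seconds())


time_to_ack = seconds_between(timeline["paged"], timeline["acked"])
time_to_resolve = seconds_between(timeline["paged"], timeline["resolved"])
print(f"time to ack: {time_to_ack}s, time to resolve: {time_to_resolve // 60}m {time_to_resolve % 60}s")
```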
Solution Using a versioned silver model + Schema Registry compatibility check
Code (CI gate that catches schema drift before it pages anyone).
# ci/check_schema_compat.py
import sys
# Assumes the confluent-kafka client library; test_compatibility() checks the candidate
# against the subject's configured compatibility level (e.g. BACKWARD or FULL).
from confluent_kafka.schema_registry import Schema, SchemaRegistryClient
client = SchemaRegistryClient({"url": "https://schema-registry.example.com"})
SUBJECT = "orders-value"
def check_compat(new_schema_path: str) -> int:
    with open(new_schema_path) as f:
        new_schema = Schema(f.read(), schema_type="AVRO")
    if not client.test_compatibility(SUBJECT, new_schema):
        print(f"FAIL: {SUBJECT} schema is NOT compatible with the latest registered version.")
        return 1
    latest = client.get_latest_version(SUBJECT)
    print(f"OK: {SUBJECT} schema is compatible with v{latest.version}.")
    return 0
if __name__ == "__main__":
    sys.exit(check_compat(sys.argv[1]))
Step-by-step trace.
| step | result |
|---|---|
| PR opened with schema change | CI runs `check_schema_compat.py` |
| CI checks compatibility against latest registered version | `compat=True` if additive |
| If `compat=False` | PR blocked; producer updates required first |
| If `compat=True` | PR merges; schema registered as new version |
| Producer ships the new field | consumers tolerate via schema-merge |
| Silver model patched in same PR | downstream tests pass |
Output:
| change | CI result | outcome |
|---|---|---|
| Add `currency` (optional string) | FULL compat OK | merges; no on-call page |
| Drop `region` (required field) | FULL compat FAIL (BACKWARD alone would accept it) | PR blocked |
| Change `amount`: `double` → `string` | FULL compat FAIL | PR blocked |
| Add nested `address: struct<...>` (optional) | FULL compat OK | merges |
Why this works — concept by concept:
- Schema Registry as the source of truth — producer-consumer contract is enforced at PR time, not at runtime.
- FULL compatibility — the new schema can read old data (BACKWARD) and old consumers can read new data (FORWARD), with an added optional field read as null.
- CI as the failure-prevention layer — Layer 0 of observability; the incident never happens because the PR is blocked.
- Paired with tolerant readers — silver models use `on_schema_change='append_new_columns'` so they auto-absorb additive changes.
- Cost — the registry check is one API call per PR; the alternative (an on-call page) costs hours of toil, so the check pays for itself the first time it blocks a breaking change.
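The compatibility outcomes in the Output table can be approximated in a few lines. Below is a minimal sketch of Avro-style schema resolution for flat records. It assumes a simplified schema model that maps field name to (type name, has default). The registry's real check also handles unions, type promotions, aliases and nesting, which this sketch does not. Note that dropping a required field is BACKWARD-compatible but fails FORWARD, and therefore FULL.

```python
Field = tuple[str, bool]  # (avro_type_name, has_default)


def reader_can_read(writer: dict[str, Field], reader: dict[str, Field]) -> bool:
    """Can data written with `writer` be decoded with the `reader` schema?"""
    for name, (field_type, has_default) in reader.items():
        if name not in writer:
            if not has_default:  # reader needs a value the writer never wrote
                return False
        elif writer[name][0] != field_type:  # same field, different type
            return False
    return True  # writer-only fields are simply skipped by the reader


def compatibility(old: dict[str, Field], new: dict[str, Field]) -> dict[str, bool]:
    backward = reader_can_read(writer=old, reader=new)  # new consumers, old data
    forward = reader_can_read(writer=new, reader=old)   # old consumers, new data
    return {"BACKWARD": backward, "FORWARD": forward, "FULL": backward and forward}


v2 = {"order_id": ("string", False), "region": ("string", False), "amount": ("double", False)}
add_currency = {**v2, "currency": ("string", True)}  # new field with a default value
drop_region = {k: f for k, f in v2.items() if k != "region"}
retype_amount = {**v2, "amount": ("string", False)}
for label, candidate in [("add currency", add_currency), ("drop region", drop_region),
                         ("retype amount", retype_amount)]:
    print(label, compatibility(v2, candidate))
```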
Choosing the right pipeline pattern (cheat sheet)
A one-screen cheat sheet for data pipeline design — pick the pattern that matches your prompt.
| Reviewer asks … | Pattern | Notes |
|---|---|---|
| "Batch or streaming?" | Pick by consumer SLA, not by team preference | Hour+ → batch; sub-minute → streaming |
| "Lambda or Kappa?" | Default to Kappa for new pipelines | Lambda only if you need a regulated batch-of-record |
| "How do you make this idempotent?" | `MERGE INTO` on natural key | Most warehouse-native answer |
| "What if Kafka redelivers an event?" | Dedup on `event_id` | `dropDuplicates` + watermark to bound state |
| "How do you partition the sink for retries?" | Deterministic hash | `SHA256(natural_key) % N` |
| "How do you backfill yesterday after a bug?" | `airflow dags backfill --start-date X --end-date X` | Same code, same `{{ ds }}` |
| "How do you backfill in a streaming job?" | Reset consumer offsets + replay log | Requires retention covering the window |
| "How do you reprocess the entire history?" | Full-table reload via `CREATE OR REPLACE` | Last resort; small tables only |
| "What's your observability stack?" | 4 layers — logs / metrics / traces / SLOs + alerting | Name the layer for each failure class |
| "What's an SLO?" | SLI + objective + error budget + burn-rate alert | Plus a runbook URL |
| "What if the schema changes?" | Schema Registry + tolerant readers + dbt `on_schema_change` | CI catches incompatible changes |
| "What if the source is down?" | Sensor timeout + alerting + manual trigger on recovery | Don't auto-retry forever |
| "What if a Spark job OOMs?" | Right-size memory, broadcast small dims, repartition by high-cardinality key | Inspect Spark UI first |
| "What if a query scans too much?" | Partition pruning + CI assertion on `bytes_scanned` | Query-budget guardrails |
| "What if events arrive late?" | `withWatermark` + `allowedLateness` + side-output | Trust window is the watermark |
| "What if partitions misalign?" | Partition by `event_date`, not `load_date` | Backfill if discovered after the fact |
| "What if retries storm a downstream?" | Exponential backoff + jitter + capped retries | Pause DAG if cause is upstream |
| "What if the sink can't keep up?" | Scale consumers, partition the sink, DLQ on overflow | Backpressure is a capacity problem |
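The deterministic-hash row is worth seeing in code, because the obvious shortcut is wrong. Python's built-in `hash()` is randomized per process for strings (see `PYTHONHASHSEED`), so a retry in a new process could route the same key to a different partition. The minimal sketch below follows the `SHA256(natural_key) % N` recipe from the table. The key format is a hypothetical example.

```python
import hashlib


def partition_for(natural_key: str, num_partitions: int) -> int:
    """Deterministic partition: SHA256(natural_key) % N, stable across processes and retries."""
    digest = hashlib.sha256(natural_key.encode("utf-8")).digest()
    return int.from_bytes(digest, "big") % num_partitions


print(partition_for("order-42", 16), partition_for("order-42", 16))  # same value twice
```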
Frequently asked questions
How do you choose between batch and streaming in a data pipeline design interview?
The senior answer in one sentence: batch is the default — pick streaming only when the consumer SLA is sub-minute, the source is genuinely an event log, and the team has the operational budget for stateful stream jobs; otherwise, batch + tight scheduling is cheaper, simpler, and easier to reason about. Start from the consumer SLA, not from team preference or the cool tool of the week. Hour+ SLA, file-drop source, heavy joins to slowly-changing dimensions, or cost-sensitive workloads all point at batch. Sub-minute SLA, event-driven source (Kafka / Pub/Sub / Kinesis), continuous feature stores, and right-sized state all point at streaming. Modern teams that need both have largely collapsed to Kappa (one streaming log + one streaming job, replayable from offset) to avoid maintaining the two codebases that Lambda forces. Interviewers love when you name the trade-off explicitly: "I'll pick Kappa because the SLA is sub-minute and the source is Kafka; the cost is operational complexity, which I'll mitigate with managed Flink / Spark Structured Streaming."
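The one-sentence rule can be written down as a decision function, which makes the "batch is the default" bias explicit. This is a sketch of the heuristic above, not a formal method. The 60-second cut-off stands in for "sub-minute", and the two boolean inputs are judgement calls the team has to make.

```python
def choose_architecture(consumer_sla_seconds: float,
                        source_is_event_log: bool,
                        can_run_stateful_streaming: bool) -> str:
    """Batch by default; streaming only when all three streaming conditions hold."""
    if consumer_sla_seconds < 60 and source_is_event_log and can_run_stateful_streaming:
        return "streaming (Kappa: one replayable log, one streaming job)"
    return "batch (scheduled DAG)"


print(choose_architecture(5, True, True))      # sub-minute SLA, Kafka source, ops budget
print(choose_architecture(3600, True, True))   # hourly SLA: batch even with a Kafka source
```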
What's the difference between idempotency and exactly-once semantics?
Idempotency is a property of a transform: running the same code over the same input N times produces the same final state. Exactly-once is a delivery / processing guarantee: each event affects the sink exactly once. In modern pipelines, exactly-once is delivered as a system-level property — at-least-once delivery from the broker (Kafka, Pub/Sub) plus idempotent sinks (MERGE INTO, INSERT … ON CONFLICT, deterministic event_id dedup) — rather than as a magic checkbox on the broker. The interview-canonical recipe: event_id per event + dedup at the sink (dropDuplicates(["event_id"]), MERGE WHEN MATCHED, INSERT ON CONFLICT DO NOTHING) + idempotent storage (Delta atomic commits, transactional Kafka writes, partition-overwrite gold tables). If you reach for "exactly-once is a broker setting" you'll lose the round; if you reach for the recipe, you'll pass the bar.
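The recipe (at-least-once delivery plus an idempotent sink keyed on `event_id`) can be demonstrated end to end with the standard library. The sketch below uses SQLite as a stand-in for the warehouse. `INSERT ... ON CONFLICT DO NOTHING` needs SQLite 3.24 or newer. The table and column names are hypothetical.

```python
import sqlite3


def load(conn: sqlite3.Connection, events: list[tuple[str, float]]) -> None:
    """At-least-once delivery + idempotent sink: duplicate event_ids are ignored."""
    conn.executemany(
        "INSERT INTO events (event_id, amount) VALUES (?, ?) "
        "ON CONFLICT(event_id) DO NOTHING",
        events,
    )
    conn.commit()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (event_id TEXT PRIMARY KEY, amount REAL)")
batch = [("e1", 10.0), ("e2", 5.0), ("e1", 10.0)]  # the broker redelivered e1
load(conn, batch)
load(conn, batch)  # the whole batch is retried after a consumer crash
count, total = conn.execute("SELECT COUNT(*), SUM(amount) FROM events").fetchone()
print(count, total)
```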
How do you backfill a streaming pipeline like Kafka + Flink?
Three steps. Step 1 — stop the streaming job so the consumer group has no active members. Step 2 — reset offsets with kafka-consumer-groups --bootstrap-server <broker> --reset-offsets --to-datetime 2026-05-01T00:00:00.000 --topic events --group my-job --execute (or --to-earliest, --to-offset N). Step 3 — delete the affected sink rows with DELETE FROM target WHERE window_start >= 'X' AND window_start < 'Y' (or drop the partition), then restart the streaming job without restoring its old state. A Flink job restored from a checkpoint or savepoint resumes from the offsets stored in that state, so start it fresh with the Kafka source reading committed group offsets (OffsetsInitializer.committedOffsets()). Spark Structured Streaming tracks offsets in its checkpoint rather than in the consumer group, so for Spark use a fresh checkpoint location with startingOffsetsByTimestamp set to the replay start instead of relying on the group reset. The replay reprocesses the rewound offsets through the same code; idempotent sinks (Delta MERGE, INSERT ON CONFLICT, partition overwrite) make the rewrite safe. Pre-requisites: Kafka retention covers the replay window (default 7 days is rarely enough — use 30+ days or compacted topics for serious backfill capability); the streaming job tolerates the old event_time watermark gap; the sink dedupe / overwrite guard is in place. The senior signal in the room is naming log replay as the streaming equivalent of Airflow's --start-date / --end-date — both are "same code, bounded window, idempotent sinks".
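The reset-delete-replay sequence can be modelled in memory to show why it is safe to repeat. In this toy sketch a list stands in for one Kafka partition (offset = list index), and a dict keyed by window start stands in for the sink. All names are hypothetical. It assumes timestamps are non-decreasing within the partition (for example, log-append time) and that the replay start is aligned to a window boundary.

```python
from bisect import bisect_left
from collections import defaultdict
from typing import Callable

LogEntry = tuple[int, float]  # (timestamp_seconds, value); offset = list index


def offset_for_time(log: list[LogEntry], ts: int) -> int:
    """First offset whose timestamp is >= ts, i.e. what a --to-datetime reset resolves to."""
    return bisect_left([t for t, _ in log], ts)


def replay(log: list[LogEntry], sink: dict[int, float], from_ts: int, window: int,
           transform: Callable[[float], float]) -> dict[int, float]:
    if from_ts % window:
        raise ValueError("from_ts must be aligned to a window boundary")
    # Step 3a: delete the affected sink windows (the overwrite guard).
    for start in [w for w in sink if w >= from_ts]:
        del sink[start]
    # Step 3b: reprocess the rewound offsets with the (fixed) transform.
    rebuilt: dict[int, float] = defaultdict(float)
    for t, value in log[offset_for_time(log, from_ts):]:
        rebuilt[t - t % window] += transform(value)
    sink.update(rebuilt)
    return sink


log = [(0, 1.0), (30, 2.0), (60, 4.0), (90, 8.0), (120, 16.0)]
sink = {0: 3.0, 60: 999.0, 120: 999.0}  # windows from t=60 were written by buggy code
replay(log, sink, from_ts=60, window=60, transform=lambda v: v)
print(dict(sorted(sink.items())))
```

Running `replay` a second time with the same arguments leaves the sink unchanged, which is the idempotency property the answer relies on.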
What's a sensible freshness SLO for a daily batch pipeline?
For a daily batch pipeline running at 06:00 with a consumer dashboard refreshing at 09:00, a sensible SLO is freshness ≤ 1 hour after the scheduled run, on 99.5% of days, with PagerDuty paging on breach and a 14× burn-rate early warning. The SLI is a gauge of now() - max(_merged_at); the objective is < 3600s; the error budget is 0.5% of days over a 30-day rolling window. Pair the freshness SLO with a completeness SLO (count(distinct region) >= expected_region_count, target 99.5%) so a partial run also pages. Every SLO has six required parts: an SLI (measurable), an SLO (target), an error budget (allowed shortfall), a burn-rate alert (early warning), a paging rule (PagerDuty), and a runbook URL (diagnostic + remediation steps). Skip any of those six and the alert becomes noise rather than signal — and on-call eventually stops responding.
What are the most common production data pipeline failure modes?
The eight failures every senior loop tests are: F1 — schema drift (vendor adds / removes a column; tolerant readers + Schema Registry catch this); F2 — source unavailable (sensor timeout; deferrable sensors + alerting); F3 — out-of-memory (Spark / Flink OOM; right-size memory + broadcast small dims + repartition); F4 — runaway scan (query without partition pruning; CI assertion + query-budget guardrails); F5 — late data (events past watermark; withWatermark + allowedLateness + side-output); F6 — partition misalignment (event_date vs load_date drift; partition by event date, not load date); F7 — retry storm (failing task hammers downstream; exponential backoff + jitter + capped retries); and F8 — downstream backpressure (sink can't keep up; scale consumers / sink, throttle producers, DLQ on overflow). Every failure has a paired runbook — diagnose, confirm, decide, patch, document. The candidate who can name all eight plus their runbooks is the candidate who gets hired as senior or staff.
How do you make a dbt incremental model idempotent and backfill-friendly?
Three rules. Rule 1 — materialized='incremental' + incremental_strategy='merge' + unique_key=['natural_key'] — dbt generates a MERGE on the natural key so retries and backfills don't duplicate. Rule 2 — partition the target by the time dimension (partition_by={'field': 'order_date', 'data_type': 'date'} in BigQuery adapter syntax) so each {{ var("date") }} run touches only one partition; cost is O(rows_in_window), not O(table_rows). Rule 3 — gate the model's WHERE on a templated date variable (WHERE order_date = DATE('{{ var("date") }}')) so forward-fill and backfill use the same SQL; only the variable changes. Combined with Airflow's airflow dags backfill --start-date X --end-date Y (which iterates {{ ds }} over the range, with the dbt task forwarding it as --vars '{"date": "{{ ds }}"}'), the same code path covers both forward-fill and backfill — no separate "backfill DAG", no parallel logic, no drift. Add on_schema_change='append_new_columns' for schema-drift tolerance and you have a fully idempotent + backfill-friendly silver / gold model.
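To see "same command, only the variable changes" concretely, the sketch below builds the per-day dbt invocations a backfill over a date range amounts to. It is an illustration, not how Airflow schedules runs: in a real DAG the scheduler renders `{{ ds }}` per run. The model name `orders_clean` is hypothetical, borrowed from the worked example's `silver/orders_clean.sql`. `--vars` accepts a YAML string, and JSON is valid YAML.

```python
import json
from datetime import date, timedelta


def backfill_commands(model: str, start: date, end: date) -> list[list[str]]:
    """One dbt invocation per day in [start, end]; only the `date` var changes."""
    commands = []
    day = start
    while day <= end:
        commands.append([
            "dbt", "build", "--select", model,
            "--vars", json.dumps({"date": day.isoformat()}),
        ])
        day += timedelta(days=1)
    return commands


for cmd in backfill_commands("orders_clean", date(2026, 5, 24), date(2026, 5, 26)):
    print(" ".join(cmd))
```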
Practice on PipeCode
PipeCode ships 450+ data-engineering interview problems — including pipeline-design rehearsal sets keyed to ETL, data-processing, streaming, real-time analytics, design, defensive coding, exception handling, and the production-safety patterns every senior loop tests. Whether you're drilling data pipeline design end-to-end or sharpening the four-pillar architecture · idempotency · backfills · observability map, the practice library mirrors the seven-section mental model this guide teaches.
Kick off via Explore practice →; drill the Python practice lane →; fan out into the ETL drills →; sharpen streaming Python drills →; rehearse real-time analytics drills →; reinforce pipeline-design drills →; widen coverage on the full data-processing library →.




