DEV Community

Gowtham Potureddi


Data Pipeline Design: Batch vs Streaming, Idempotency, Backfills

Data pipeline design is the single highest-leverage system-design competency a mid-to-staff data engineer is hired on. It spans batch architectures (Airflow DAG + dbt build + warehouse), streaming architectures (Kafka + Flink Kappa with log replay), idempotency patterns (MERGE INTO, dedup keys, deterministic hash partitions), backfill strategies (full-table, partition-aware, log replay), observability + SLOs (structured JSON logs, metrics, OpenTelemetry traces, freshness SLOs), and the production failure modes every senior loop drills against (schema drift, source unavailable, OOM, runaway scan, late data, partition misalignment, retry storm, downstream backpressure). Together with the opening question of why pipeline design separates juniors from seniors, those six concerns form the seven-part map that every senior data engineering interview circles back to.

This guide is the 7-section deep-dive counterpart to a shorter design-guide article. Each section walks a single concept through titled sub-topics, then a worked example in Question → Input → Code → Step-by-step → Output order, then a "Solution Using …" block with a four-part tail (code → step-by-step trace → output → why this works). The seven sections cover why pipeline design separates juniors from seniors, batch architectures, streaming architectures, idempotency patterns, backfill strategies, observability + SLOs, and a failure-mode + production playbook: the exact shape data engineering interview loops reward when the whiteboard prompt is "design me a pipeline that …".


When you want hands-on reps alongside the reading, browse the ETL Python drills, data-processing pattern drills, streaming Python drills, real-time analytics drills, and pipeline-design drills, or widen coverage with the full Python practice library.




1. Why data pipeline design separates juniors from seniors

The senior-loop signal — name the design loop, not the tool stack

The one-sentence invariant: data pipeline design is the discipline of moving data from source to consumer such that every stage is idempotent, every window is backfillable, every failure is observable, and the architecture (batch vs streaming) is chosen by the consumer's SLA — not by the team's tool preference. Junior answers reach for tool names ("I'd use Airflow, dbt, Snowflake"); senior answers reach for the design loop — source → ingest → transform → serve, with idempotency, backfill, and observability orthogonal to all four stages.

The four pillars of senior pipeline design.

  • Architecture: batch vs streaming, Lambda vs Kappa; the decision is driven by consumer SLA, not by hype.
  • Idempotency: every transform must be safe to re-run; MERGE INTO, idempotency keys, and deterministic hash partitions are the three implementation patterns.
  • Backfills: a known window is re-processed with the same code; partition-aware Airflow is the default, full-table reload is the fallback, log replay is the streaming equivalent.
  • Observability: structured JSON logs with correlation IDs, metrics (row counts, latency, freshness), OpenTelemetry traces per task, SLOs with PagerDuty + a written runbook.

What interviewers actually listen for.

  • Do you start from the consumer SLA when choosing batch vs streaming? — basic-but-tested.
  • Do you mention MERGE INTO or an event_id idempotency key the first time the reviewer says "what if Airflow retries this task?" — fluency signal.
  • Can you describe a partition-aware backfill in Airflow with --start-date and --end-date? — senior signal.
  • Do you call out observability + SLOs as a first-class design concern, not a post-hoc addition? — interview-canonical answer.
  • Do you cite at least one failure mode (schema drift, late data, retry storm) before the reviewer asks? — staff-level signal.

The 7-section map this guide walks.

  • §2 — Batch architectures — Airflow DAG + dbt build + warehouse; sensors, SLA monitor, idempotent partition overwrites.
  • §3 — Streaming architectures — Kafka topic + partition model, Flink job + windowing + watermark + late-data, Kappa replay.
  • §4 — Idempotency patterns: MERGE INTO upsert, event_id dedup, deterministic SHA256 hash partition.
  • §5 — Backfill strategies — full-table reload, partition-aware --start-date / --end-date, log replay from a Kafka offset.
  • §6 — Observability + SLOs — 4-layer stack (logs → metrics → traces → alerting/SLOs) with a freshness-SLO worked example.
  • §7 — Failure modes — schema drift, source unavailable, OOM, runaway scan, late data, partition misalignment, retry storm, downstream backpressure.
  • Cheat sheet + FAQ + CTA — choose-the-pattern table, 5 senior-loop FAQs, practice routes.

The non-negotiables that show up in every senior answer.

  • Idempotent sinks: MERGE INTO on a natural key, partition overwrite, or upsert-with-version; never a blind INSERT INTO target SELECT …, which appends a second copy of the window on every retry.
  • Backfill-first design: every task is parameterised by {{ ds }} (Airflow logical date) so a single re-run with --start-date / --end-date corrects history.
  • Observability scaffolding: structured logs with a dag_run_id correlation ID, row-count and freshness metrics, and a freshness SLO with a PagerDuty target.
  • Schema tolerance: Schema Registry + tolerant readers; MERGE clauses that drop unknown columns; alerts on schema drift.
  • A documented runbook: every alert has a paired runbook entry naming the diagnostic queries and the safe remediation steps.
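To make the idempotent-sink rule concrete, here is a minimal in-memory sketch. It assumes a toy list-backed table standing in for a blind INSERT INTO target SELECT … and a dict keyed by order_id standing in for a MERGE-style sink; blind_append and keyed_upsert are hypothetical helpers, not a library API.

```python
from typing import Dict, List, Tuple

Row = Tuple[str, str, float]  # (order_id, dt, amount)


def blind_append(table: List[Row], batch: List[Row]) -> None:
    """Anti-pattern: like INSERT INTO target SELECT ..., every run appends the batch again."""
    table.extend(batch)


def keyed_upsert(table: Dict[str, Row], batch: List[Row]) -> None:
    """MERGE-style sink: the natural key decides the slot, so re-runs overwrite instead of adding."""
    for row in batch:
        table[row[0]] = row


batch = [("o1", "2026-05-26", 10.0), ("o2", "2026-05-26", 5.0)]
appended: List[Row] = []
merged: Dict[str, Row] = {}

for _attempt in range(2):  # the original run plus one retry of the same task
    blind_append(appended, batch)
    keyed_upsert(merged, batch)

print(len(appended), len(merged))  # 4 2
```

The retry doubles the appended table while the keyed table converges to the same two rows, which is the whole argument for naming the idempotency primitive before drawing the architecture.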

Worked example — answering "design a 500M-events/day pipeline" in three minutes

Detailed explanation. Most pipeline-design rounds open with a single fat prompt: "design a pipeline that lands 500M events/day from Kafka into a warehouse, surfaces revenue_by_region to Power BI by 8 AM, survives retries, and supports backfilling any past day after a bug fix." The senior answer is a 4-line architecture sketch that names every pillar: source → ingest → transform → serve, with idempotency, backfill, and observability called out alongside every stage.

Question. Sketch the canonical four-pillar answer for the 500M-events/day prompt. Name the idempotency primitive, the backfill command, and the SLO.

Input (the prompt's constraints).

Constraint          Value
------------------  ------------------------------------------------------
source              Kafka topic orders, at-least-once, event_id per record
volume              ~500M events/day (~5,800 events/sec)
consumer            Power BI dashboard refreshing daily by 08:00 local
backfill            must re-process any past day after a bug fix
failure tolerance   every task must be safe to retry

Code (the four-line architecture answer).

Source     : Kafka 'orders'  (at-least-once, event_id)
Ingest     : Spark Structured Streaming -> bronze Delta /raw/orders/dt=YYYY-MM-DD/
             - dedupe on event_id  (idempotency key)
             - partition by ingest_date
Transform  : Airflow DAG (06:00 daily, {{ ds }} = YYYY-MM-DD)
             - read /raw/orders/dt={{ ds }}/
             - MERGE INTO silver.orders_clean ON (order_id)
             - aggregate -> gold.revenue_by_region partitioned by region,date
Serve      : Power BI Direct Lake reads gold.revenue_by_region
Backfill   : airflow dags backfill orders_daily --start-date 2026-05-01 --end-date 2026-05-07
Observe    : structured JSON logs + freshness SLO (<= 1h after 06:00) + PagerDuty

Step-by-step explanation.

  1. Kafka delivers at-least-once with event_id — the idempotency key the ingest layer dedupes on.
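  2. Spark Structured Streaming writes to a bronze Delta path partitioned by ingest_date; the streaming checkpoint plus Delta's transactional commits mean a micro-batch re-run after a restart is not written twice.
  3. The Airflow DAG fires daily at 06:00; the run that fires on 2026-05-27 covers the 2026-05-26 data interval, so {{ ds }} = 2026-05-26 and it reads only /raw/orders/dt=2026-05-26/.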
  4. MERGE INTO silver.orders_clean ON (order_id) — re-running the task overwrites the same target rows; no duplicates.
  5. Gold aggregation is INSERT OVERWRITE per (region, date) partition — safe to re-run.
  6. Backfill uses airflow dags backfill --start-date / --end-date; every task is parameterised by {{ ds }} so the same code re-runs.
  7. Observability — every task emits a JSON log with dag_run_id + task_id + row count; freshness SLO breach pages the on-call.

Sample output (the senior signal the panel listens for).

Pillar           Choice                                       Why
---------------- -------------------------------------------- ------------------------------
Architecture     Batch (daily 06:00) + streaming ingest only  SLA is 08:00; batch is cheaper
Idempotency      event_id dedup at bronze; MERGE at silver    Retries + backfills both safe
Backfill         Airflow --start-date / --end-date            Same code, same {{ ds }}
Observability    JSON logs + freshness SLO + PagerDuty        SLO is the design constraint

Rule of thumb: every senior pipeline answer is a 4-line sketch (source → ingest → transform → serve) with idempotency, backfill, and observability called out as constraints up front, not added after the architecture is drawn. Lead with the SLA, name the idempotency primitive, name the backfill command, and the architecture answer practically writes itself.

Solution Using the canonical four-pillar pipeline-design template

Code (the reusable senior-loop template).

def design_pipeline(prompt):
    # Step 1: read the consumer SLA from the prompt.
    # parse_consumer_sla is a placeholder: it is assumed to return an object
    # exposing is_sub_minute() and threshold (e.g. "08:00 daily" -> threshold 1h).
    sla = parse_consumer_sla(prompt)

    # Step 2: pick architecture from SLA
    arch = "streaming" if sla.is_sub_minute() else "batch"

    # Step 3: name the idempotency primitive for every sink
    ingest_sink    = "partition overwrite + event_id dedup"
    transform_sink = "MERGE INTO <table> ON (<natural_key>)"
    serve_sink     = "INSERT OVERWRITE PARTITION (<date>)"

    # Step 4: name the backfill command for the chosen architecture
    if arch == "batch":
        backfill = "airflow dags backfill --start-date X --end-date Y"
    else:
        backfill = "reset consumer offsets; replay the log from offset N"

    # Step 5: declare observability + SLO
    observability = {
        "logs":     "structured JSON + dag_run_id correlation",
        "metrics":  "row counts, latency, freshness lag",
        "traces":   "OpenTelemetry spans per task",
        "alerting": f"freshness SLO <= {sla.threshold} + PagerDuty + runbook",
    }
    return arch, ingest_sink, transform_sink, serve_sink, backfill, observability

Step-by-step trace.

Step                Output
------------------  ---------------------------------------------------------------------
parse_consumer_sla  "08:00 daily" → batch SLA, threshold 1h
arch decision       "batch" (the SLA is daily with a 1h freshness window, not sub-minute)
ingest_sink         partition overwrite + event_id dedup
transform_sink      MERGE INTO silver.orders_clean ON (order_id)
serve_sink          INSERT OVERWRITE PARTITION (region, date)
backfill            airflow dags backfill --start-date 2026-05-01 --end-date 2026-05-07
observability       JSON logs + freshness SLO ≤ 1h + PagerDuty

Output:

Field           Value
--------------  --------------------------------------------
architecture    batch
ingest sink     partition overwrite + event_id dedup
transform sink  MERGE INTO silver.orders_clean ON (order_id)
serve sink      INSERT OVERWRITE PARTITION (region, date)
backfill        Airflow --start-date / --end-date
SLO             freshness ≤ 1 hour, paged via PagerDuty

Why this works — concept by concept:

  • SLA-first architecture — choosing batch vs streaming from the consumer SLA, not from team preference, is the first senior-vs-junior split.
  • Idempotent sinks at every stage — partition overwrite + MERGE INTO + INSERT OVERWRITE makes every retry and every backfill safe.
  • Backfill is a flag, not a special pipeline — the same DAG with --start-date / --end-date replays history; no parallel "backfill DAG" to maintain.
  • Observability is a design constraint — the SLO is declared upfront, paired with structured logs + freshness metric + PagerDuty + runbook.
  • Cost — design conversation is O(1) in reviewer time; running pipeline is O(rows × stages); backfill is O(window × stages) — all bounded and reasoned about before any code is written.



2. Batch architectures deep-dive — Airflow DAG + dbt build + warehouse

[Figure: batch pipeline architecture. Sources feed an Airflow DAG of five tasks (sensor → load → transform → quality → publish); the transform task calls a dbt build step and results land in a Snowflake/BigQuery warehouse, with the orchestrator metadata DB and SLA monitor alongside the DAG.]

Batch pipeline architecture — the Airflow DAG anatomy every senior knows

Batch pipeline architecture is the workhorse of modern data engineering. The canonical shape is an Airflow DAG of 5–10 tasks: a sensor waits for the source, a load task lands raw data in the lakehouse, a dbt build transforms it, a data-quality task validates the output, and a publish task surfaces it to the consumer. The whole DAG is parameterised by {{ ds }} (the logical date) so any past day can be re-run with the same code.

Airflow DAG anatomy — the five canonical tasks.

  • sensor task: S3KeySensor, GCSObjectExistenceSensor, ExternalTaskSensor; blocks until the upstream source is ready.
  • load_raw task: copies the source into the bronze layer (/raw/<table>/dt={{ ds }}/); idempotent because each {{ ds }} writes its own partition.
  • dbt run task: BashOperator or DbtRunOperator; executes dbt run --select <model> --vars '{date: {{ ds }}}' to populate silver / gold models.
  • dbt test task: dbt test --select <model> to enforce uniqueness, not-null, referential, and custom data-quality assertions.
  • publish task: surfaces the curated table to the consumer (cache warm-up, BI refresh, downstream TriggerDagRunOperator).

The {{ ds }} (logical date) — Airflow's idempotency primitive.

  • {{ ds }}: Airflow templates this to the logical date (YYYY-MM-DD); every task reads / writes only that day's partition.
  • Re-run safety: airflow tasks run <dag> <task> <execution_date> re-executes a single task with the same {{ ds }}; idempotent if your code respects the partition.
  • Backfill: airflow dags backfill <dag> --start-date X --end-date Y walks a date range, scheduling one DAG run per schedule interval (one per day for a daily DAG), each with its own {{ ds }}.
  • Anti-pattern: never use datetime.today() inside a task; that breaks idempotency for retries and backfills. Always template {{ ds }} or {{ data_interval_start }}.
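A minimal sketch of what a daily backfill hands each run, assuming an inclusive end date (matching the seven-runs-for-May-1-to-7 backfill used in this article). logical_dates and partition_path are hypothetical helpers; the point is that the task derives its partition from ds, never from datetime.today().

```python
from datetime import date, timedelta
from typing import List


def logical_dates(start: date, end: date) -> List[str]:
    """One ds string per daily run between start and end, both inclusive."""
    return [
        (start + timedelta(days=i)).isoformat()
        for i in range((end - start).days + 1)
    ]


def partition_path(ds: str) -> str:
    """Hypothetical task body: the partition comes from ds, so a retry or a
    backfill of the same ds always touches the same path."""
    return f"/raw/orders/dt={ds}/"


runs = logical_dates(date(2026, 5, 1), date(2026, 5, 7))
paths = [partition_path(ds) for ds in runs]
print(len(runs), paths[0], paths[-1])
# 7 /raw/orders/dt=2026-05-01/ /raw/orders/dt=2026-05-07/
```

Swapping ds for datetime.today() inside partition_path would make all seven backfill runs write the same, current-day partition, which is exactly the failure the anti-pattern bullet warns about.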

dbt build — the modern transform layer.

  • dbt run compiles SQL models and writes results to the warehouse (silver, gold schemas).
  • dbt test runs YAML-declared tests (unique, not_null, relationships, accepted_values) and custom SQL tests.
  • dbt build runs models, tests, seeds, and snapshots in one dependency-aware DAG; when a test fails, the nodes downstream of it are skipped rather than built on bad data.
  • dbt source freshness checks that the upstream source loaded within an SLA; run it before the transforms.
  • Incremental models: materialized='incremental' with unique_key= lets dbt MERGE new and changed rows; the canonical idempotent transform shape.

Sensors, triggers, and the SLA monitor

Beyond the DAG itself, the production batch stack has three sidecar concerns: sensors (when does the DAG start?), triggers (what fans out downstream when it completes?), and the SLA monitor (did it finish on time?).

Sensors — block until the source is ready.

  • S3KeySensor / GCSObjectExistenceSensor: poll an object-store path until the expected file exists.
  • ExternalTaskSensor: wait for a task in another DAG (cross-DAG dependency).
  • HttpSensor: poll an API endpoint until it returns the expected status / payload.
  • Deferrable operators: Airflow ≥ 2.2 can hand the wait off from the worker to the triggerer process, freeing the slot (they superseded the older, now-removed smart sensors).
  • Sensor anti-pattern: mode='poke' holds a worker slot for the entire wait, so hundreds of poking sensors can starve the pool of slots for real work; prefer mode='reschedule' or a deferrable sensor.

Triggers + downstream fanout.

  • TriggerDagRunOperator: fan out from one DAG to another after completion (e.g. revenue_daily triggers revenue_marketing_export and revenue_finance_export).
  • Dataset triggers (Airflow ≥ 2.4): declarative "this DAG produces dataset X; that DAG consumes dataset X"; the scheduler wires the dependency.
  • dbt model-level lineage: integration packages (for example Astronomer Cosmos) derive Airflow tasks from the dbt project so dependencies stay in lockstep.

SLA monitoring — the freshness contract.

  • Airflow sla=: declarative per-task SLA; a breach records an SLA miss and sends the SLA-miss email / sla_miss_callback.
  • Custom SLA monitor: a sidecar DAG queries dag_run history and pages on missed or late runs. It is more reliable than Airflow's built-in SLA, which only evaluates scheduled runs and was removed in Airflow 3.0.
  • dbt source freshness: checks that the upstream source table was loaded on time (via its loaded_at_field); pairs with the orchestrator SLA.
  • PagerDuty + runbook: every SLA miss has a paired runbook entry with diagnostic queries + safe remediation.
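The custom SLA monitor's core check fits in a few lines. This sketch assumes the run end times have already been pulled from the orchestrator's dag_run history; freshness_breaches is a hypothetical helper, and the 06:00 schedule and 1-hour SLO mirror the running example. A run with no end time counts as a breach only once its deadline has passed.

```python
from datetime import date, datetime, time, timedelta
from typing import List, Optional, Tuple


def freshness_breaches(
    runs: List[Tuple[date, Optional[datetime]]],
    now: datetime,
    scheduled_at: time = time(6, 0),
    slo: timedelta = timedelta(hours=1),
) -> List[date]:
    """Return the days whose run was not finished within `slo` of its scheduled start.

    Each entry pairs the day the run was scheduled to fire with its end time,
    or None if it is still running (or never started).
    """
    breaches = []
    for day, finished_at in runs:
        deadline = datetime.combine(day, scheduled_at) + slo
        if finished_at is None:
            if now > deadline:  # still not done and the deadline has passed
                breaches.append(day)
        elif finished_at > deadline:  # done, but too late for the consumer
            breaches.append(day)
    return breaches


history = [
    (date(2026, 5, 25), datetime(2026, 5, 25, 6, 34, 42)),  # on time
    (date(2026, 5, 26), datetime(2026, 5, 26, 7, 20)),      # 20 minutes late
    (date(2026, 5, 27), None),                             # still running
]
print(freshness_breaches(history, now=datetime(2026, 5, 27, 7, 5)))
# [datetime.date(2026, 5, 26), datetime.date(2026, 5, 27)]
```

In a sidecar DAG, the returned days would be what gets paged, each alert pointing at its runbook entry.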

Idempotent batch patterns — partition overwrite, MERGE, upsert

Idempotency in batch boils down to three sink shapes: partition overwrite (atomic, simple), MERGE INTO (handles upserts), and INSERT … ON CONFLICT / upsert (PostgreSQL-style). Each fits a different stage of the pipeline.

Partition overwrite — the bronze and gold default.

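  • Shape: INSERT OVERWRITE TABLE t PARTITION (dt='{{ ds }}') SELECT … WHERE dt = '{{ ds }}'.
  • Why idempotent: re-running the task replaces the same partition; no duplicates, no leftover data.
  • Use case: daily / hourly partitions of immutable raw data, and daily / hourly aggregates in the serve layer.
  • Engine support: Spark (INSERT OVERWRITE; set spark.sql.sources.partitionOverwriteMode=dynamic when the partition values come from the data, otherwise the whole table is overwritten), Hive (INSERT OVERWRITE PARTITION), BigQuery (WRITE_TRUNCATE on a partition decorator). Snowflake has no partition-level overwrite (INSERT OVERWRITE truncates the whole table), so the usual equivalent is DELETE + INSERT for the window inside one transaction.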
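For engines without a native partition overwrite, the DELETE + INSERT-in-one-transaction equivalent can be sketched with Python's built-in sqlite3 module, used here only as a stand-in warehouse. The revenue_by_region table and the overwrite_partition helper are hypothetical. Because the delete and the insert commit together, a retry either fully replaces the partition or leaves the previous version intact.

```python
import sqlite3
from typing import List, Tuple


def overwrite_partition(conn: sqlite3.Connection, dt: str, rows: List[Tuple[str, float]]) -> None:
    """Replace partition `dt` atomically: delete the window, then insert the fresh rows."""
    with conn:  # one transaction: commit on success, rollback on any exception
        conn.execute("DELETE FROM revenue_by_region WHERE dt = ?", (dt,))
        conn.executemany(
            "INSERT INTO revenue_by_region (dt, region, revenue) VALUES (?, ?, ?)",
            [(dt, region, revenue) for region, revenue in rows],
        )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE revenue_by_region (dt TEXT, region TEXT, revenue REAL)")
conn.execute("INSERT INTO revenue_by_region VALUES ('2026-05-25', 'US', 99.0)")  # another day
conn.commit()

daily = [("US", 120.0), ("EU", 80.0)]
overwrite_partition(conn, "2026-05-26", daily)
overwrite_partition(conn, "2026-05-26", daily)  # retry: same end state, no duplicates

print(conn.execute("SELECT dt, COUNT(*) FROM revenue_by_region GROUP BY dt ORDER BY dt").fetchall())
# [('2026-05-25', 1), ('2026-05-26', 2)]
```

The other day's partition is untouched, which is the property that makes the pattern safe for backfills as well as retries.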

MERGE INTO — the silver-layer upsert.

  • Shape: MERGE INTO target USING staging ON target.key = staging.key WHEN MATCHED THEN UPDATE … WHEN NOT MATCHED THEN INSERT ….
  • Why idempotent: the merge key uniquely identifies the row; re-runs UPDATE existing rows in place. The staging side must also be unique per key, or most engines reject the MERGE.
  • Use case: slowly-changing dimensions, mutable fact tables, late-arriving corrections.
  • Engine support: Snowflake, BigQuery, Databricks Delta, Postgres 15+, Redshift, Synapse.

INSERT … ON CONFLICT — the OLTP upsert.

  • Shape (Postgres): INSERT INTO target (id, x, y) VALUES (…) ON CONFLICT (id) DO UPDATE SET x = EXCLUDED.x, y = EXCLUDED.y.
  • Why idempotent: the ON CONFLICT clause runs the UPDATE when the unique key already exists.
  • Use case: operational tables, application state, small dimension upserts.
  • Watch out: ON CONFLICT requires a unique / primary-key constraint on the conflict columns.
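SQLite (3.24 and later) accepts the same ON CONFLICT … DO UPDATE syntax, so the upsert's retry safety can be sketched with Python's built-in sqlite3 module. The customers table is hypothetical, and the SQL would carry over to Postgres largely unchanged apart from the driver's placeholder style.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# The conflict target needs a unique or primary-key constraint, exactly as in Postgres.
conn.execute("CREATE TABLE customers (id INTEGER PRIMARY KEY, email TEXT, tier TEXT)")

UPSERT = """
    INSERT INTO customers (id, email, tier) VALUES (?, ?, ?)
    ON CONFLICT (id) DO UPDATE SET email = excluded.email, tier = excluded.tier
"""

batch = [(1, "a@example.com", "gold"), (2, "b@example.com", "silver")]
with conn:
    conn.executemany(UPSERT, batch)
with conn:
    conn.executemany(UPSERT, batch)  # retry of the same batch: no duplicates, no error
with conn:
    conn.execute(UPSERT, (2, "b@example.com", "gold"))  # late correction updates in place

print(conn.execute("SELECT id, tier FROM customers ORDER BY id").fetchall())
# [(1, 'gold'), (2, 'gold')]
```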

Worked example — a daily revenue DAG with sensor + dbt build + SLA

Detailed explanation. A representative production batch pipeline. The DAG waits for the daily Kafka-dump file to land, copies it into bronze, runs the dbt build graph (silver orders_clean + gold revenue_by_region, together with their tests), then triggers the BI cache refresh. Every task carries sla=timedelta(hours=2), and the pipeline has a freshness SLO of ≤ 1h after the 06:00 schedule.

Question. Write an Airflow DAG that ingests daily orders from S3, runs the dbt build graph, validates with dbt test, and triggers a downstream cache-refresh DAG — with a 2-hour SLA per task and a daily 06:00 schedule.

Input (DAG inputs and SLAs).

Item            Value
--------------  -------------------------------------------------
source          s3://lake/raw/orders/dt={{ ds }}/orders.parquet
schedule        0 6 * * * (daily 06:00 UTC)
dbt models      silver.orders_clean, gold.revenue_by_region
per-task SLA    2 hours
pipeline SLO    freshness ≤ 1h after 06:00
paging          PagerDuty de-on-call rotation

Code.

from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.operators.bash import BashOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

default_args = {
    "owner": "data-eng",
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
    "sla": timedelta(hours=2),
}

with DAG(
    dag_id="orders_daily",
    schedule="0 6 * * *",
    start_date=datetime(2026, 1, 1),
    catchup=False,
    default_args=default_args,
    tags=["batch", "revenue"],
) as dag:

    wait = S3KeySensor(
        task_id="wait_for_orders_file",
        bucket_key="raw/orders/dt={{ ds }}/orders.parquet",
        bucket_name="lake",
        mode="reschedule",
        poke_interval=60,
        timeout=60 * 60,
    )

    load_raw = BashOperator(
        task_id="load_raw",
        bash_command=(
            "spark-submit jobs/load_raw.py "
            "--src s3://lake/raw/orders/dt={{ ds }}/ "
            "--dst lakehouse.bronze.orders --date {{ ds }}"
        ),
    )

    dbt_build = BashOperator(
        task_id="dbt_build",
        bash_command=(
            "cd /repo/dbt && "
            "dbt build --select +gold.revenue_by_region "
            "--vars '{date: {{ ds }}}'"
        ),
    )

    refresh_bi = TriggerDagRunOperator(
        task_id="refresh_bi_cache",
        trigger_dag_id="bi_cache_refresh",
        conf={"date": "{{ ds }}"},
    )

    wait >> load_raw >> dbt_build >> refresh_bi

Step-by-step explanation.

  1. S3KeySensor (mode='reschedule') blocks the DAG until the source file lands; the slot is freed between pokes so other DAGs run.
  2. load_raw Spark job copies the source into lakehouse.bronze.orders partitioned by {{ ds }} — partition overwrite is idempotent.
  3. dbt build runs +gold.revenue_by_region which expands to silver.orders_clean (incremental MERGE) → gold.revenue_by_region (INSERT OVERWRITE PARTITION) plus their tests.
  4. refresh_bi_cache trigger fans out to the BI DAG with conf={"date": "{{ ds }}"} so the downstream uses the same logical date.
  5. sla=timedelta(hours=2) is declared per task via default_args; a breach records an SLA miss, which pages on-call once an sla_miss_callback (not shown) is wired to PagerDuty.

Sample output (the DAG run timeline).

06:00:00  wait_for_orders_file  RUNNING   (rescheduled until file lands)
06:08:14  wait_for_orders_file  SUCCESS   (object present)
06:08:15  load_raw              RUNNING
06:12:42  load_raw              SUCCESS   (rows=12,418,503)
06:12:43  dbt_build             RUNNING
06:34:07  dbt_build             SUCCESS   (12 models built, 27 tests passed)
06:34:08  refresh_bi_cache      RUNNING
06:34:42  refresh_bi_cache      SUCCESS
06:34:42  dag_run               SUCCESS   (duration=34m42s; SLO <= 1h MET)

Rule of thumb: a production batch DAG is a sensor + a load + a dbt build + a downstream trigger, parameterised by {{ ds }}, with declared per-task SLAs and an SLO ≤ the consumer's freshness requirement. Anything more elaborate is usually a smell.

Solution Using a partition-overwrite + dbt-incremental MERGE silver pattern

Code (silver model as an idempotent incremental MERGE).

-- models/silver/orders_clean.sql
{{ config(
    materialized='incremental',
    unique_key='order_id',
    incremental_strategy='merge',
    partition_by={'field': 'order_date', 'data_type': 'date'},
    on_schema_change='append_new_columns'
) }}

SELECT
    order_id,
    customer_id,
    region,
    amount,
    status,
    CAST(order_ts AS DATE) AS order_date,
    CURRENT_TIMESTAMP() AS _loaded_at
FROM {{ source('lakehouse', 'bronze_orders') }}
{% if is_incremental() %}
  -- incremental runs read only the day passed in as var('date');
  -- most engines (BigQuery included) don't let WHERE see the SELECT alias
  -- order_date, so the expression is repeated here
  WHERE CAST(order_ts AS DATE) = DATE('{{ var("date") }}')
{% endif %}
-- no NOT IN filter against the target table: the merge on unique_key must see
-- rows that already exist so retries and corrections UPDATE them in place

Step-by-step trace.

Input                                                   Output
------------------------------------------------------  ---------------------------------------------------
{{ ds }} = 2026-05-26                                   dbt receives var('date') = '2026-05-26'
is_incremental() branch                                 read limited to CAST(order_ts AS DATE) = '2026-05-26'
materialization                                         MERGE INTO silver.orders_clean ON order_id
re-run on same {{ ds }}                                 merge updates the same rows in place; no duplicates
backfill --start-date 2026-05-01 --end-date 2026-05-07  seven DAG runs, each MERGEs its own {{ ds }} day

Output:

Run                      Rows merged                      Duplicate rows in target
-----------------------  -------------------------------  ------------------------
first run (2026-05-26)   12,418,503                       0
retry of same run        0 inserts, 12,418,503 matched    0
backfill 2026-05-01      11,902,118                       0

Why this works — concept by concept:

  • MERGE INTO on unique_key: the merge clause UPDATEs existing order_ids and INSERTs new ones; idempotent under retries and backfills.
  • Partition pruning: filtering the source to the {{ ds }} day keeps the read to one day's data when the source is partitioned on that date, so cost stays flat regardless of table size.
  • is_incremental() guard: the first run does a full INSERT; subsequent runs MERGE only the matching day; the same SQL covers both shapes.
  • on_schema_change='append_new_columns': tolerates schema drift; new source columns are appended to the target without manual ALTERs.
  • Cost: the source read is O(partition_rows), not O(table_rows). The MERGE's target side can still scan the whole table unless it also gets a partition predicate (dbt's incremental_predicates config provides one on adapters that support it); with that in place, the dbt incremental shape is the cheapest idempotent silver pattern.



3. Streaming architectures deep-dive — Kafka + Flink Kappa with replay

[Figure: Kappa-style streaming pipeline. Producers publish to a 6-partition Kafka topic; a Flink job applies windowed aggregation with watermark and late-data handling; results go to a stateful KV store and a BigQuery sink; a replay arrow marks log-replay backfill.]

Streaming pipeline architecture — the Kafka topic + partition model

Streaming pipeline architecture shifts the design centre of gravity from a daily DAG to a continuously running Flink / Spark Structured Streaming / Kafka Streams job that reads from a Kafka topic and writes to one or more sinks. For many modern teams the Kappa shape (one log + one streaming job) has displaced the Lambda shape (separate batch + speed layers), mainly because it keeps a single code path for both live processing and replay.

Kafka topic + partition fundamentals.

  • Topic — a named, append-only, partitioned log of records.
  • Partition — a single ordered sub-log; ordering is guaranteed within a partition, not across the topic.
  • Partition count — the parallelism ceiling for any consumer group; pick partitions ≥ peak parallelism (e.g. 6, 12, 24, 48).
  • Partition key — the producer-supplied key that decides which partition a record lands in; hash(key) % partitions is the default partitioner.
  • Offset — the monotonically increasing position of a record within a partition; the consumer's position is (topic, partition, offset).
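The key → partition → offset model can be sketched in plain Python. The sketch assumes zlib.crc32 as a stand-in hash: Kafka's Java client uses murmur2, and Python's built-in hash() is randomized per process for strings, so neither is reproduced here. partition_for is a hypothetical helper. What it shows is that every record with the same key lands in the same partition, so per-key order survives.

```python
import zlib
from collections import defaultdict
from typing import Dict, List, Tuple

NUM_PARTITIONS = 6


def partition_for(key: str, num_partitions: int = NUM_PARTITIONS) -> int:
    """hash(key) % partitions, with CRC32 standing in for Kafka's murmur2."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


# partition -> append-only list of (key, value); a record's list index is its offset
log: Dict[int, List[Tuple[str, int]]] = defaultdict(list)
for key, value in [("US", 1), ("EU", 2), ("US", 3), ("APAC", 4), ("US", 5)]:
    log[partition_for(key)].append((key, value))

us_values = [v for k, v in log[partition_for("US")] if k == "US"]
print(us_values)  # [1, 3, 5]: per-key order is preserved within the partition
```

Records for different keys may share a partition, but nothing orders EU relative to US across partitions, which is why ordering guarantees are only ever per partition.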

Producer semantics.

  • acks=0: fire and forget; lowest latency, no durability guarantee.
  • acks=1: leader ack; durable as long as the leader doesn't fail before replication.
  • acks=all: full ISR ack; durable even on leader failure; the production default (and the client default since Kafka 3.0).
  • Idempotent producer: enable.idempotence=true; prevents duplicates on producer retries (per partition, within a single producer session).
  • Transactional producer: transactional.id=…; exactly-once across multiple partitions / topics in a single transaction.

Consumer semantics.

  • At-least-once: the default; commit after processing, so a crash before commit replays the record.
  • At-most-once: commit before processing, so a crash loses the record (rare in DE).
  • Exactly-once (system-level): at-least-once delivery + idempotent sink (dedup on event_id, MERGE, transactional write); the canonical recipe.
  • Consumer group: a set of consumers sharing partitions; rebalances on join / leave; partition is the unit of assignment.
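A small simulation of the exactly-once recipe, assuming a consumer that commits its offset only after a batch succeeds and a crash between writing and committing. consume and the two toy sinks are hypothetical. The redelivered records inflate the blind-append sink but leave the event_id-keyed sink correct.

```python
from typing import Callable, Dict, List, Optional, Tuple

Record = Tuple[str, int]  # (event_id, amount)


def consume(
    log: List[Record],
    committed: int,
    write: Callable[[Record], None],
    crash_after: Optional[int] = None,
) -> int:
    """At-least-once consumer: write each record, commit the offset at the end.

    If crash_after is set, the consumer dies after that many writes and before
    committing, so the next run starts again from the old committed offset.
    Returns the committed offset.
    """
    for n, offset in enumerate(range(committed, len(log)), start=1):
        write(log[offset])
        if crash_after is not None and n == crash_after:
            return committed  # crash before commit: offset does not move
    return len(log)  # commit after processing


log = [("e1", 10), ("e2", 20), ("e3", 30)]
dedup_sink: Dict[str, int] = {}   # idempotent: keyed by event_id
append_sink: List[Record] = []    # non-idempotent: blind append


def write_both(record: Record) -> None:
    dedup_sink[record[0]] = record[1]
    append_sink.append(record)


committed = consume(log, 0, write_both, crash_after=2)  # writes e1, e2, then crashes
committed = consume(log, committed, write_both)         # restart redelivers e1, e2, e3

print(committed, sum(dedup_sink.values()), sum(a for _, a in append_sink))  # 3 60 90
```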

Flink job, watermarks, and late-data handling

Flink (and Spark Structured Streaming with very similar semantics) is the engine that reads Kafka, applies windowed aggregates with a watermark policy, and emits results to a sink. Every windowed streaming job has the same five components.

The five Flink job components.

  • Source: KafkaSource reading a topic + consumer group (the legacy FlinkKafkaConsumer is deprecated).
  • Event-time extractor: assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(30))) declares how late events can be.
  • Operator: keyBy(region).window(TumblingEventTimeWindows.of(Time.minutes(5))).reduce(...).
  • Trigger: when to emit results; the default fires when the watermark passes the window end; custom triggers fire on early / late events.
  • Sink: Kafka, JDBC, Delta, Iceberg, KV store; idempotent (or transactional) sinks are the exactly-once requirement.

Watermark — the event-time progress signal.

  • Definition: "the system assumes no more events with event_time < watermark will arrive".
  • Bounded-out-of-orderness: WatermarkStrategy.forBoundedOutOfOrderness(30s) → watermark = max_event_time_seen - 30s.
  • Watermark gap: too small drops late events; too large delays output.
  • Per-partition watermarks: each Kafka partition emits its own watermark; the operator's effective watermark is the min across partitions.
  • Idle partitions: withIdleness(Duration.ofMinutes(1)) lets the watermark advance even when one partition is silent.
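The watermark arithmetic, including the min-across-partitions rule and idleness, can be sketched in plain Python. partition_watermark and operator_watermark are hypothetical helpers, and the sketch ignores the extra 1 ms that Flink's bounded-out-of-orderness generator also subtracts.

```python
from datetime import datetime, timedelta
from typing import AbstractSet, Dict, List


def partition_watermark(event_times: List[datetime], delay: timedelta) -> datetime:
    """Bounded out-of-orderness: max event time seen so far minus the allowed delay."""
    return max(event_times) - delay


def operator_watermark(
    per_partition: Dict[int, datetime], idle: AbstractSet[int] = frozenset()
) -> datetime:
    """Effective watermark = min over partitions, skipping partitions marked idle."""
    return min(wm for p, wm in per_partition.items() if p not in idle)


def at(hms: str) -> datetime:
    return datetime.fromisoformat(f"2026-05-26T{hms}")


delay = timedelta(seconds=30)
watermarks = {
    0: partition_watermark([at("08:00:10"), at("08:01:00")], delay),  # 08:00:30
    1: partition_watermark([at("08:00:40")], delay),                  # 08:00:10
    2: partition_watermark([at("07:50:30")], delay),                  # 07:50:00, then silent
}
print(operator_watermark(watermarks).time())            # 07:50:00: held back by partition 2
print(operator_watermark(watermarks, idle={2}).time())  # 08:00:10: idle partition excluded
```

The silent partition pins the whole operator ten minutes behind until it is marked idle, which is the stall withIdleness exists to break.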

Window types + late-data policy.

  • Tumbling window: fixed, non-overlapping (e.g. every 5 minutes).
  • Sliding window: fixed-size, overlapping (e.g. 5-minute window sliding every 1 minute).
  • Session window: gap-defined (e.g. close after 30s of silence per key).
  • Late events: allowedLateness(Duration.ofMinutes(10)) keeps window state alive 10 minutes past the watermark for late merges.
  • Side output: an OutputTag<LateEvent> passed to sideOutputLateData routes events that arrive after the allowed lateness to a side stream for a separate consumer.
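Window assignment for the two fixed-size shapes reduces to a little timestamp arithmetic. This sketch assumes epoch-aligned windows with no offset, and tumbling_window / sliding_windows are hypothetical helpers. Session windows are left out because they depend on per-key gaps rather than fixed boundaries.

```python
from datetime import datetime, timedelta
from typing import List

EPOCH = datetime(1970, 1, 1)


def tumbling_window(ts: datetime, size: timedelta) -> datetime:
    """Start of the one tumbling window [start, start + size) that contains ts."""
    return ts - (ts - EPOCH) % size


def sliding_windows(ts: datetime, size: timedelta, slide: timedelta) -> List[datetime]:
    """Starts of every sliding window [start, start + size) that contains ts."""
    starts = []
    start = tumbling_window(ts, slide)  # latest window start at or before ts
    while start > ts - size:
        starts.append(start)
        start -= slide
    return sorted(starts)


ts = datetime(2026, 5, 26, 8, 4, 59)
print(tumbling_window(ts, timedelta(minutes=5)).time())  # 08:00:00
print(len(sliding_windows(ts, timedelta(minutes=5), timedelta(minutes=1))))  # 5
```

A tumbling window puts each event in exactly one bucket, while a 5-minute window sliding every minute puts the same event in five buckets, which is why sliding aggregates cost proportionally more state.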

Exactly-once via dedup + log-replay backfill

The senior signal in any streaming round is naming exactly-once as a system property, not a magic feature, and explaining log-replay backfill as the streaming equivalent of Airflow's --start-date / --end-date.

Exactly-once semantics — the canonical recipe.

  • At-least-once delivery from Kafka (the default).
  • Idempotency key in every event (event_id or (partition_key, sequence_number)).
  • Dedup at the sink: INSERT … ON CONFLICT DO NOTHING, MERGE INTO on event_id, or dropDuplicates(["event_id"]) in Structured Streaming.
  • Transactional sink: Kafka transactions, Delta Lake's transactional commits (txnAppId / txnVersion make foreachBatch writes idempotent), or two-phase commit for cross-system exactly-once.
  • The interview-canonical answer: exactly-once is (at-least-once delivery) + (idempotent sink); reach for that phrase before "exactly-once is a broker setting".

Log-replay backfill — the Kappa equivalent of --start-date.

  • Reset offsets: kafka-consumer-groups --bootstrap-server kafka:9092 --group my-job --topic events --reset-offsets --to-datetime 2026-05-01T00:00:00.000 --execute (the consumer group must have no active members during the reset).
  • startingOffsets='earliest' in Spark Structured Streaming with a new checkpointLocation reprocesses the full retained log.
  • Replayability: depends on retention. Kafka's default 7-day retention rolls off old data, so replay-backfill setups need long retention (30+ days) or tiered storage; compacted topics keep only the latest record per key, which suits state rebuilds but not event-count replays.
  • Sink behaviour: idempotent sinks make replay safe; non-idempotent sinks duplicate every replayed record.
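What --to-datetime does for each partition can be sketched as a binary search: find the earliest offset whose record timestamp is at or after the target. The sketch assumes non-decreasing timestamps within the partition and one record per day for brevity; offset_for_time is a hypothetical helper, not Kafka's API. The second call shows the retention trap: asking for a date that has already rolled off silently starts the replay at the oldest retained record.

```python
from bisect import bisect_left
from datetime import datetime
from typing import List, Optional


def offset_for_time(
    timestamps: List[datetime], target: datetime, log_start_offset: int = 0
) -> Optional[int]:
    """Earliest offset whose timestamp is >= target, or None if every record is older.

    timestamps[i] belongs to offset log_start_offset + i; anything before
    log_start_offset has already been deleted by retention.
    """
    i = bisect_left(timestamps, target)
    return log_start_offset + i if i < len(timestamps) else None


# Partition whose oldest retained record (offset 1000) is from May 20.
retained = [datetime(2026, 5, day) for day in range(20, 27)]
print(offset_for_time(retained, datetime(2026, 5, 22), 1000))  # 1002
print(offset_for_time(retained, datetime(2026, 5, 1), 1000))   # 1000: May 1-19 already expired
print(offset_for_time(retained, datetime(2026, 5, 30), 1000))  # None
```

A replay-backfill runbook should therefore compare the requested start time with the oldest retained timestamp before trusting the reset.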

Worked example — 5-minute event counts with watermark + late-data + log replay

Detailed explanation. A typical senior streaming prompt: "given a Kafka events topic with event_time per record, emit 5-minute tumbling-window counts per region, tolerate 10-minute late data, and support log-replay backfill". The Spark Structured Streaming code below shows the full shape.

Question. Write a Spark Structured Streaming job that reads events from Kafka, tolerates 10 minutes of late data through an event-time watermark, dedupes on event_id, emits 5-minute tumbling counts per region to a Delta sink, and is replayable from the earliest offset.

Input (sample Kafka events).

event_id region event_time
e001 US 2026-05-26T08:00:01
e002 EU 2026-05-26T08:00:03
e003 US 2026-05-26T08:04:59
e004 US 2026-05-26T08:05:02
e003 US 2026-05-26T08:00:00 (duplicate, late)

Code.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, from_json, window
from pyspark.sql.types import StringType, StructType, TimestampType

spark = SparkSession.builder.appName("events_5m_counts").getOrCreate()

schema = (
    StructType()
    .add("event_id", StringType())
    .add("region", StringType())
    .add("event_time", TimestampType())
)

events = (
    spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "kafka:9092")
        .option("subscribe", "events")
        .option("startingOffsets", "earliest")   # honoured only on a fresh checkpoint
        .load()
        .selectExpr("CAST(value AS STRING) AS json")
        .select(from_json("json", schema).alias("e"))
        .select("e.*")
        # Spark has no separate "allowed lateness": the watermark delay is the
        # lateness tolerance. Events older than max(event_time) - 10 min are dropped.
        .withWatermark("event_time", "10 minutes")
        # Dedup on the idempotency key; state is evicted once it falls behind
        # the watermark (dropDuplicatesWithinWatermark needs Spark 3.5+).
        .dropDuplicatesWithinWatermark(["event_id"])
)

counts_5m = (
    events
        .groupBy(window("event_time", "5 minutes"), "region")
        .agg(count("*").alias("n"))
        .select(col("window.start").alias("window_start"), "region", "n")
)

query = (
    counts_5m.writeStream
        # Delta's streaming sink supports append and complete, not update. Append
        # emits each window once, after the watermark passes the window's end.
        .outputMode("append")
        .format("delta")
        .option("checkpointLocation", "/chk/events_5m_v1")
        .toTable("gold.events_5m_counts")
)
query.awaitTermination()

Step-by-step explanation.

  1. readStream … format("kafka") subscribes to the events topic with startingOffsets='earliest'; the option only applies when the query starts on a fresh checkpoint, which then replays the full retained log.
  2. from_json + schema decodes the Kafka value into typed columns.
  3. withWatermark("event_time", "10 minutes") sets the watermark to the largest event_time seen so far minus 10 minutes; events older than that are late and dropped. This delay is the 10-minute lateness tolerance.
  4. dropDuplicatesWithinWatermark(["event_id"]) dedupes on the idempotency key and evicts dedup state once it falls behind the watermark; the second e003 is dropped here.
  5. groupBy(window(…, "5 minutes"), "region").agg(count("*")) aggregates per 5-minute tumbling window per region, and window.start becomes window_start.
  6. outputMode("append") emits each window's final count once later events push the watermark past the window's end; Delta's streaming sink does not accept update mode.
  7. The Delta sink and checkpointLocation persist progress; Delta records which micro-batch each commit belongs to, so a retried batch is not written twice.

Sample output (Delta gold.events_5m_counts).

window_start region n
2026-05-26 08:00 US 2
2026-05-26 08:00 EU 1
2026-05-26 08:05 US 1
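To sanity-check the sample output, here is a simplified pure-Python model of the same watermark, dedup, and tumbling-window logic. It is not how Spark executes the query: Spark advances the watermark between micro-batches and evicts dedup state, while this sketch advances the watermark after every accepted event and keeps seen IDs forever. On this input it reproduces the counts in the table above.

```python
from datetime import datetime, timedelta

# Sample input from the worked example, in arrival order.
EVENTS = [
    ("e001", "US", "2026-05-26T08:00:01"),
    ("e002", "EU", "2026-05-26T08:00:03"),
    ("e003", "US", "2026-05-26T08:04:59"),
    ("e004", "US", "2026-05-26T08:05:02"),
    ("e003", "US", "2026-05-26T08:00:00"),  # duplicate, late
]

def five_minute_counts(events, delay=timedelta(minutes=10)):
    """Simplified model: watermark = max(event_time) - delay, checked per event."""
    max_event_time = None
    seen_ids = set()        # unbounded here; Spark evicts behind the watermark
    counts = {}
    for event_id, region, ts in events:
        t = datetime.fromisoformat(ts)
        if max_event_time is not None and t < max_event_time - delay:
            continue        # older than the watermark: dropped as too late
        if event_id in seen_ids:
            continue        # duplicate of an event already counted
        seen_ids.add(event_id)
        max_event_time = t if max_event_time is None else max(max_event_time, t)
        # 5-minute tumbling window: floor the timestamp to a multiple of 5 minutes.
        start = t.replace(minute=t.minute - t.minute % 5, second=0, microsecond=0)
        key = (start.strftime("%Y-%m-%d %H:%M"), region)
        counts[key] = counts.get(key, 0) + 1
    return counts

for (window_start, region), n in sorted(five_minute_counts(EVENTS).items()):
    print(window_start, region, n)
```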

Rule of thumb: every windowed streaming aggregate is (1) withWatermark for event-time progress, (2) dedup on the idempotency key within that watermark, (3) groupBy(window(...), key).agg(...) for the aggregate, (4) an idempotent sink (Delta, MERGE, INSERT ON CONFLICT). Skip the dedup or the idempotent sink and "exactly-once" becomes a lie; skip the watermark and state grows without bound.

Solution Using Kappa log-replay backfill via consumer-offset reset

Code (replay from the start of 2026-05-26 after a bug fix).

# 1. Stop the streaming job; the consumer group 'events_5m' must have no active members.

# 2. Rewind to the start of 2026-05-26 (assumes retention of at least 30 days).
#    Consumers that commit offsets to Kafka (Flink, Kafka Streams, plain clients)
#    rewind their consumer group:
kafka-consumer-groups.sh \
  --bootstrap-server kafka:9092 \
  --group events_5m \
  --topic events \
  --reset-offsets \
  --to-datetime 2026-05-26T00:00:00.000 \
  --execute
#    Spark Structured Streaming keeps offsets in its checkpoint and ignores group
#    offsets. For the Spark job, switch to a NEW checkpointLocation (for example
#    /chk/events_5m_v2) and replace startingOffsets='earliest' with
#    .option("startingTimestamp", "1779753600000"), i.e. 2026-05-26T00:00:00Z in
#    epoch milliseconds (Spark 3.2+).

# 3. Delete every sink row from the rewind point forward. The replay re-emits all
#    windows from 2026-05-26 to the head of the log, and an append sink is only
#    idempotent within one checkpoint, so rows left in place would be duplicated.
spark-sql -e "DELETE FROM gold.events_5m_counts \
              WHERE window_start >= '2026-05-26 00:00:00'"

# 4. Restart the job with the fixed code (for Spark, on the new checkpoint).
spark-submit events_5m_counts.py

Step-by-step trace.

step effect
step 1 — stop job consumer group has no active members; the Spark query is stopped
step 2 — --reset-offsets --to-datetime (Spark: new checkpoint + startingTimestamp) every partition's read position rewinds to 2026-05-26 00:00
step 3 — DELETE FROM gold.events_5m_counts WHERE … rows from 2026-05-26 onward removed; the replay recreates them
step 4 — restart job reading resumes at the rewound position; event_id dedup plus the cleared sink range keep the rewrite free of duplicates
outcome the same code reprocesses events from 2026-05-26 onward with the fixed logic

Output:

metric before backfill after backfill
2026-05-26 rows in gold.events_5m_counts wrong counts (bug) corrected counts
events_5m_counts duplicates 0 (deduped by event_id) 0
consumer-group offset (partition 0) 12,402,118 rewound → re-advances to 12,402,118

Why this works — concept by concept:

  • Log retention as a backfill primitive — Kappa stores history in Kafka; replay-backfill is "rewind the read position" rather than "run a separate batch job".
  • Idempotent sink + dedup key — event_id dedup plus Delta's per-batch commits mean the replay lands in the same final state as a clean first run.
  • Bounded delete — clearing only rows from 2026-05-26 onward leaves earlier history untouched while the replay rewrites everything it re-emits.
  • Where the offsets live — consumers that commit to Kafka resume from the rewound group offsets; Spark Structured Streaming reads its position from checkpointLocation, so its replay needs a fresh checkpoint plus startingTimestamp.
  • Cost — log-replay backfill cost is O(events from the rewind point to the head of the log), usually far smaller than a full-table reload in a Lambda architecture.



4. Idempotency patterns — MERGE INTO, dedup keys, deterministic hash

[Figure: three idempotency patterns side by side: MERGE INTO upsert on a natural key into a Delta table; unique event_ids with a consumer-side seen_ids set discarding duplicates; a stateless transform routing by a SHA256 partition key so retries are safe.]

idempotent pipeline — the universal contract

An idempotent pipeline is one where running the same code over the same input N times produces the same final state. Without idempotency, every Airflow retry, every Kafka at-least-once redelivery, every backfill silently corrupts the warehouse. The senior signal in a pipeline-design round is naming idempotency as a design constraint before the reviewer prompts for it.
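The definition can be turned into a check. This plain-Python sketch (the dict-of-partitions table and both load functions are hypothetical) applies the same batch once and three times, the way a retry or backfill would, and compares the final states.

```python
from copy import deepcopy

def run_n_times(load, table, batch, n):
    """Apply the same load step n times, as retries and backfills would."""
    for _ in range(n):
        table = load(deepcopy(table), batch)
    return table

def append_load(table, batch):
    """Not idempotent: every run adds the batch's rows again."""
    for ds, rows in batch.items():
        table.setdefault(ds, []).extend(rows)
    return table

def overwrite_load(table, batch):
    """Idempotent: every run replaces the partitions the batch owns."""
    for ds, rows in batch.items():
        table[ds] = list(rows)
    return table

batch = {"2026-05-26": [("O-1042", 120.0)]}
for load in (append_load, overwrite_load):
    once = run_n_times(load, {}, batch, 1)
    thrice = run_n_times(load, {}, batch, 3)
    print(load.__name__, once == thrice)
# append_load False
# overwrite_load True
```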

Why idempotency matters — the three retry surfaces.

  • Orchestrator retry — Airflow / Dagster / Prefect retries failed tasks; without idempotency, retries double-count.
  • Broker redelivery — Kafka, Kinesis, Pub/Sub default to at-least-once; consumers see every record one-or-more times.
  • Backfill replay — the same window is reprocessed deliberately; without idempotency, every backfill duplicates the affected rows.

The three implementation patterns this section covers.

  • MERGE INTO — the warehouse-native upsert on a natural key (Pattern 1 below).
  • Dedup key (event_id) — produce and dedupe on a unique key per event (Pattern 2 below).
  • Deterministic hash partition — SHA256(natural_key) % partitions routes the same row to the same partition every time (Pattern 3 below).

Pattern 1 — MERGE INTO on a natural key

The default warehouse-native idempotency primitive. The major warehouses and lakehouse engines (Snowflake, BigQuery, Databricks Delta, Postgres 15+, Redshift) support MERGE with minor dialect variation.

Shape and semantics.

  • Syntax — MERGE INTO target USING source ON target.key = source.key WHEN MATCHED THEN UPDATE SET … WHEN NOT MATCHED THEN INSERT (…) VALUES (…).
  • Natural key — a stable business key (order_id, (customer_id, order_date)) that uniquely identifies a target row.
  • Atomicity — most engines run MERGE as a single transaction, so a failure does not leave a half-applied merge.
  • Variants — WHEN MATCHED AND target.updated_at < source.updated_at THEN UPDATE lets you skip stale updates.
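SQLite has no MERGE statement, but its INSERT … ON CONFLICT DO UPDATE upsert (SQLite 3.24+) has the same shape, so Python's built-in sqlite3 can stand in for a warehouse in a minimal sketch. The orders_clean table and its columns are illustrative assumptions. The sketch shows the natural key, a per-batch transaction, and the stale-update guard from the Variants bullet.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE orders_clean (
        order_id   TEXT PRIMARY KEY,   -- natural key
        amount     REAL,
        updated_at TEXT                -- ISO-8601, so string order = time order
    )
""")

UPSERT = """
    INSERT INTO orders_clean (order_id, amount, updated_at) VALUES (?, ?, ?)
    ON CONFLICT (order_id) DO UPDATE SET
        amount     = excluded.amount,
        updated_at = excluded.updated_at
    WHERE excluded.updated_at >= orders_clean.updated_at   -- skip stale updates
"""

def merge(batch):
    with conn:  # one transaction: the whole batch lands or none of it does
        conn.executemany(UPSERT, batch)

merge([("O-1042", 120.00, "2026-05-26T08:00:01")])
merge([("O-1042", 120.00, "2026-05-26T08:00:01")])   # retry: same final state
merge([("O-1042", 99.00,  "2026-05-26T07:59:30")])   # stale correction: ignored
merge([("O-1042", 125.00, "2026-05-26T08:03:00")])   # fresh correction: applied

print(conn.execute("SELECT * FROM orders_clean").fetchall())
# [('O-1042', 125.0, '2026-05-26T08:03:00')]
```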

When MERGE INTO is the right choice.

  • Silver-layer normalisation — bronze rows are merged into a clean silver fact / dimension.
  • Slowly-changing dimensions (SCD Type 1 / Type 2) — MERGE updates current rows or expires old ones.
  • Late-arriving corrections — the same order_id arrives with a corrected amount; MERGE updates the row in place.
  • dbt incremental models — materialized='incremental' + incremental_strategy='merge' generates the MERGE for you.

Gotchas.

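  • Duplicate source keys — if the source has several rows for one key, MERGE fails or picks one arbitrarily, depending on the engine; deduplicate the source first.
  • Cost — MERGE on a huge target without partition pruning scans the whole table; partition the target on a time column and add that column to the ON clause so the engine can prune.
  • Concurrency — concurrent MERGEs on the same target can conflict (Delta aborts one with a concurrent-modification error) or block on locks, depending on the engine; serialise writers upstream.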
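The first gotcha has a simple fix. A minimal plain-Python sketch, assuming each source row is a dict carrying order_id and a comparable _loaded_at version (both names borrowed from the examples in this section): keep only the newest row per natural key before the merge runs.

```python
def latest_per_key(rows, key="order_id", version="_loaded_at"):
    """Keep exactly one row per natural key: the one with the highest version.

    A MERGE source with duplicate keys either errors or picks a row arbitrarily,
    so this runs before the merge. Ties keep the first row seen.
    """
    best = {}
    for row in rows:
        k = row[key]
        if k not in best or row[version] > best[k][version]:
            best[k] = row
    return list(best.values())

source = [
    {"order_id": "O-1042", "amount": 120.0, "_loaded_at": "2026-05-26T08:00:01"},
    {"order_id": "O-1042", "amount": 125.0, "_loaded_at": "2026-05-26T08:03:00"},
    {"order_id": "O-9999", "amount": 220.0, "_loaded_at": "2026-05-26T08:01:00"},
]
print(latest_per_key(source))
```

In SQL the same step is usually a ROW_NUMBER() OVER (PARTITION BY key ORDER BY version DESC) filter inside the USING clause.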

Pattern 2 — Dedup key (event_id) for at-least-once streams

The streaming-native idempotency primitive. Every event produced into Kafka / Kinesis / Pub/Sub carries a unique event_id; the consumer dedupes on event_id before applying state changes.

Producer side.

  • Generate at source — UUID v4 (uuid.uuid4()), or (producer_id, sequence_number) for deterministic generation.
  • Persist before publish — write to a local outbox table, then publish to Kafka; outbox-pattern guarantees the same event_id survives producer crashes.
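  • Idempotent producer — enable.idempotence=true in Kafka stops the producer's own internal retries from writing duplicates; it does not dedupe an application that re-sends after a crash, which is what the persisted event_id covers.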
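For the (producer_id, sequence_number) option, a name-based UUID keeps the familiar UUID format while staying deterministic. A minimal sketch using the standard library's uuid.uuid5; the namespace UUID and the producer name are hypothetical placeholders.

```python
import uuid

# Hypothetical fixed namespace for this producer fleet; any constant UUID works.
EVENT_NS = uuid.UUID("6f1c9d1e-3b7a-4c2e-9f0a-2d5e8b7c4a10")

def deterministic_event_id(producer_id: str, sequence_number: int) -> str:
    """Same (producer_id, sequence_number) gives the same event_id on every retry."""
    return str(uuid.uuid5(EVENT_NS, f"{producer_id}:{sequence_number}"))

first = deterministic_event_id("checkout-svc-1", 41_207)
retry = deterministic_event_id("checkout-svc-1", 41_207)
other = deterministic_event_id("checkout-svc-1", 41_208)
print(first == retry, first == other)   # True False

# uuid4() is random: minting it at publish time (instead of persisting it in an
# outbox row first) would give a re-sent event a brand-new id.
```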

Consumer side.

  • In-memory seen_ids set — bounded by a TTL or a sliding window; works for short windows.
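  • Streaming dedup in Structured Streaming — withWatermark(...) followed by dropDuplicates(["event_id", "event_time"]), or dropDuplicatesWithinWatermark(["event_id"]) on Spark 3.5+; the watermark bounds the state store (plain dropDuplicates(["event_id"]) keeps every id forever).
  • INSERT … ON CONFLICT (event_id) DO NOTHING — atomically dedupe at the sink (Postgres, SQLite; on Snowflake, MERGE … WHEN NOT MATCHED THEN INSERT).
  • External dedup store — Redis SET … NX or a DynamoDB conditional put (attribute_not_exists); pays a network hop but supports cross-job dedup.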

Watermark + dedup window — bounding memory.

  • Why — keeping every event_id ever seen blows up memory; bound the dedup window to, e.g., 7 days.
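  • Spark — withWatermark("event_time", "7 days") followed by dropDuplicates(["event_id", "event_time"]) (or dropDuplicatesWithinWatermark(["event_id"]) on Spark 3.5+) evicts state once it falls behind the watermark.
  • Trade-off — the watermark gap is the trust window: Spark drops events that arrive more than 7 days late, while a TTL-only seen_ids set would let them through as "new".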
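A minimal sketch of a bounded seen_ids set in plain Python. It is a simplified model of watermark-bounded dedup, not Spark's state-store implementation. The watermark trails the largest event_time seen by a fixed window; IDs behind it are evicted, and events behind it are dropped as late.

```python
from datetime import datetime, timedelta

class WatermarkDeduper:
    """seen_ids bounded by an event-time watermark (simplified model)."""

    def __init__(self, window: timedelta):
        self.window = window
        self.seen = {}              # event_id -> event_time
        self.max_event_time = None

    def accept(self, event_id: str, event_time: datetime) -> bool:
        if self.max_event_time is None or event_time > self.max_event_time:
            self.max_event_time = event_time
        watermark = self.max_event_time - self.window
        # Evict ids whose event_time fell behind the watermark (bounds memory).
        self.seen = {k: t for k, t in self.seen.items() if t >= watermark}
        if event_time < watermark:
            return False            # too late: outside the trust window, dropped
        if event_id in self.seen:
            return False            # duplicate inside the trust window
        self.seen[event_id] = event_time
        return True

d = WatermarkDeduper(timedelta(days=7))
t0 = datetime(2026, 5, 1)
print(d.accept("e1", t0))                        # True  (first sighting)
print(d.accept("e1", t0))                        # False (duplicate)
print(d.accept("e2", t0 + timedelta(days=10)))   # True  (watermark moves to May 4)
print(d.accept("e1", t0))                        # False (older than the watermark)
```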

Pattern 3 — Deterministic hash partition

The stateless-transform idempotency primitive. When a transform routes records to partitions (Kafka producer key, Spark repartition, shard selection), use a deterministic hash so the same input always lands in the same partition on retry.

Shape and semantics.

  • Hash function — SHA256(natural_key) % partitions or MurmurHash3(natural_key) % partitions; avoid Python's built-in hash(natural_key), which is salted per process for str and bytes and so differs across processes.
  • Why deterministic — retries route the same row to the same partition; downstream dedup is local and fast.
  • Why hash, not modulo on the key directly — keys are not uniformly distributed; hashing spreads load.
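The determinism point is easy to demonstrate. This sketch starts two fresh Python interpreters with different PYTHONHASHSEED values (the seeds and the key O-1042 are arbitrary choices) and compares the built-in hash() with a SHA-256 bucket.

```python
import hashlib
import os
import subprocess
import sys

KEY = "O-1042"

def sha256_bucket(key: str, n: int = 64) -> int:
    """Deterministic bucket: depends only on the key bytes and n."""
    return int.from_bytes(hashlib.sha256(key.encode("utf-8")).digest(), "big") % n

def builtin_hash_in_fresh_process(seed: str) -> int:
    """hash(KEY) as computed by a separate interpreter with the given hash seed."""
    env = {**os.environ, "PYTHONHASHSEED": seed}
    out = subprocess.run(
        [sys.executable, "-c", f"print(hash({KEY!r}))"],
        env=env, capture_output=True, text=True, check=True,
    )
    return int(out.stdout.strip())

h1 = builtin_hash_in_fresh_process("1")
h2 = builtin_hash_in_fresh_process("2")
print("built-in hash() agrees across processes:", h1 == h2)
print("SHA-256 bucket:", sha256_bucket(KEY))   # same value in every process
```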

Use cases.

  • Kafka producer key — producer.send(topic, key=order_id.encode(), value=...); all events for the same order land in the same partition (ordering guarantee per key).
  • Sharded sink — shard = SHA256(customer_id) % num_shards routes all of a customer's events to the same shard.
  • Bucketed tables — Iceberg's bucket(N, customer_id) partition transform or Spark / Hive CLUSTERED BY (customer_id) INTO N BUCKETS is a deterministic-hash partition by another name.

Gotchas.

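  • Hot keys — a single high-volume key (region='US') overloads one partition; consider compound keys (region:customer_id) or a deterministic salt such as region || (SHA256(customer_id) % 10). A random salt would send a retried row to a different partition.
  • Re-partitioning — changing the partition count remaps most keys under hash % n (and, in Kafka, breaks per-key ordering across the change); plan capacity ahead.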
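Both gotchas can be measured with a few lines of plain Python. In this sketch the customer and order IDs are synthetic, and the 10-way salt and the 64 to 65 partition change are arbitrary choices. The salt is derived from customer_id, so a retried row still lands in the same place.

```python
import hashlib

def bucket(key: str, n: int) -> int:
    """Deterministic SHA-256 bucket in [0, n)."""
    return int.from_bytes(hashlib.sha256(key.encode("utf-8")).digest(), "big") % n

def salted_key(region: str, customer_id: str, salts: int = 10) -> str:
    """Spread a hot key over `salts` sub-keys, deterministically per customer."""
    return f"{region}:{bucket(customer_id, salts)}"

# Hot key: without salting, every US event would hash to a single partition.
customers = [f"C-{i}" for i in range(1000)]
us_partitions = {bucket(salted_key("US", c), 64) for c in customers}
print(len(us_partitions), "partitions now share the US traffic")

# Re-partitioning: growing from 64 to 65 partitions remaps most keys.
keys = [f"O-{i}" for i in range(10_000)]
moved = sum(bucket(k, 64) != bucket(k, 65) for k in keys) / len(keys)
print(f"{moved:.1%} of keys changed partition")
```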

Worked example — three idempotency patterns applied to the same orders pipeline

Detailed explanation. Real pipelines stack all three idempotency patterns: the producer emits event_id (pattern 2), the streaming ingest dedupes on event_id and routes to partitions with SHA256(order_id) (pattern 3), and the silver-layer transform MERGE INTOs on order_id (pattern 1). The combined effect is a pipeline where every retry, redelivery, and backfill is safe.

Question. Show the three idempotency primitives — event_id dedup at ingest, deterministic hash partitioning at routing, MERGE INTO at silver — applied to a single orders pipeline.

Input (a single order produced twice due to producer retry).

event_id order_id customer_id amount event_time
e-7a3f... O-1042 C-99 120.00 2026-05-26T08:00:01
e-7a3f... O-1042 C-99 120.00 2026-05-26T08:00:01 (retry)

Code.

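import hashlib

from delta.tables import DeltaTable
from pyspark.sql import SparkSession, Window, functions as F
from pyspark.sql.types import (
    DoubleType, IntegerType, StringType, StructType, TimestampType,
)

spark = SparkSession.builder.getOrCreate()

schema = (
    StructType()
    .add("event_id", StringType())
    .add("order_id", StringType())
    .add("customer_id", StringType())
    .add("amount", DoubleType())
    .add("event_time", TimestampType())
)

# Pattern 2: dedupe on event_id at ingest. A producer retry re-sends an identical
# record, so adding event_time to the key is free and lets the watermark evict state.
raw = (
    spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "kafka:9092")
        .option("subscribe", "orders")
        .load()
        .selectExpr("CAST(value AS STRING) AS json")
        .select(F.from_json("json", schema).alias("e")).select("e.*")
        .withWatermark("event_time", "1 hour")
        .dropDuplicates(["event_id", "event_time"])
)

# Pattern 3: deterministic-hash partition for the bronze sink.
# SHA-256 yields the same bucket in every process, unlike Python's salted hash().
def hash_bucket(key, n=64):
    if key is None:
        return None
    return int(hashlib.sha256(key.encode("utf-8")).hexdigest(), 16) % n

hash_udf = F.udf(lambda oid: hash_bucket(oid, 64), IntegerType())  # default would be StringType
bronze = raw.withColumn("bucket", hash_udf(F.col("order_id")))

(bronze.writeStream
        .format("delta")
        .partitionBy("bucket")
        .option("checkpointLocation", "/chk/orders_bronze")
        .toTable("lakehouse.bronze.orders"))

# Pattern 1: MERGE INTO silver on natural key order_id. The (batch_df, batch_id)
# signature fits foreachBatch; a batch DAG can call it directly as well.
def merge_to_silver(batch_df, batch_id):
    # MERGE needs at most one source row per key: keep the latest event per order.
    latest_first = Window.partitionBy("order_id").orderBy(F.col("event_time").desc())
    deduped = (batch_df
        .withColumn("_rn", F.row_number().over(latest_first))
        .filter(F.col("_rn") == 1)
        .drop("_rn"))
    silver = DeltaTable.forName(spark, "lakehouse.silver.orders_clean")
    (silver.alias("t")
        .merge(deduped.alias("s"), "t.order_id = s.order_id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())

Step-by-step explanation.

  1. withWatermark("event_time", "1 hour") followed by dropDuplicates(["event_id", "event_time"]) eliminates the producer-retry duplicate at ingest; because a retry re-sends an identical record, keeping event_time in the key lets Spark evict dedup state once it falls behind the watermark.
  2. hash_udf(order_id) routes both copies of any single order (had they survived dedup) to the same bronze partition; SHA-256 is deterministic across processes, and the UDF is declared with IntegerType so bucket is an integer column.
  3. partitionBy("bucket") keeps the bronze data physically clustered for cheap downstream reads.
  4. merge_to_silver keeps the latest event per order_id (Delta's MERGE rejects several source rows matching one target row), then runs the MERGE on order_id; re-running it for any past window is safe because the same order_id UPDATEs in place.
  5. Stacked patterns — Pattern 2 + Pattern 3 + Pattern 1 together give end-to-end exactly-once effects as a system property.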

Sample output (the deduplicated path).

stage rows in rows out duplicates
Kafka source 2 (one duplicate)
dropDuplicates(event_id) 2 1 1 dropped
partitionBy(bucket) 1 1 in bucket 47
MERGE INTO silver 1 1 row updated 0 net inserts on retry

Rule of thumb: a production pipeline stacks all three idempotency patterns — dedup at ingest, deterministic-hash at routing, MERGE at silver. Each pattern protects a different retry surface; together they form the exactly-once recipe.

Solution Using a Delta-MERGE silver upsert with MERGE WHEN MATCHED AND guard

Code (Delta MERGE that respects _loaded_at so stale corrections don't overwrite fresh data).

MERGE INTO lakehouse.silver.orders_clean AS t
USING (
    -- One source row per order_id (the latest load): MERGE errors out or picks
    -- arbitrarily when several source rows match the same target row.
    SELECT order_id, customer_id, region, amount, status, order_ts, _loaded_at
    FROM (
        SELECT
            b.*,
            ROW_NUMBER() OVER (PARTITION BY b.order_id ORDER BY b._loaded_at DESC) AS _rn
        FROM lakehouse.bronze.orders AS b
        WHERE b._loaded_at > (
            SELECT COALESCE(MAX(_merged_at), TIMESTAMP '1970-01-01 00:00:00')
            FROM lakehouse.silver.orders_clean
        )
    ) AS ranked
    WHERE _rn = 1
) AS s
ON  t.order_id = s.order_id
WHEN MATCHED
   AND s._loaded_at >= t._loaded_at          -- skip stale corrections
THEN UPDATE SET
    t.customer_id  = s.customer_id,
    t.region       = s.region,
    t.amount       = s.amount,
    t.status       = s.status,
    t.order_ts     = s.order_ts,
    t._loaded_at   = s._loaded_at,
    t._merged_at   = CURRENT_TIMESTAMP()
WHEN NOT MATCHED THEN INSERT (
    order_id, customer_id, region, amount, status, order_ts, _loaded_at, _merged_at
) VALUES (
    s.order_id, s.customer_id, s.region, s.amount, s.status, s.order_ts, s._loaded_at, CURRENT_TIMESTAMP()
);

Step-by-step trace.

input row matched? guard action
(O-1042, _loaded_at=08:00:01) first time no n/a INSERT
(O-1042, _loaded_at=08:00:01) retry yes 08:00:01 >= 08:00:01 true UPDATE (same values, idempotent)
(O-1042, _loaded_at=07:59:30) stale yes 07:59:30 >= 08:00:01 false skip — keep fresh row
(O-9999, _loaded_at=08:01:00) new no n/a INSERT

Output:

order_id amount _loaded_at _merged_at
O-1042 120.00 2026-05-26 08:00:01 2026-05-26 08:00:04
O-9999 220.00 2026-05-26 08:01:00 2026-05-26 08:01:02

Why this works — concept by concept:

  • Natural-key ON clause — t.order_id = s.order_id matches each source row to at most one target row, provided the source carries one row per order_id.
  • _loaded_at guard — WHEN MATCHED AND s._loaded_at >= t._loaded_at blocks stale corrections from overwriting fresh data; this matters when backfills race with current loads.
  • _merged_at bookmark — (SELECT MAX(_merged_at) FROM target) makes the source subquery incremental; only bronze rows loaded since the last merge enter it.
  • Atomicity — Delta MERGE is a single ACID commit; a failure does not leave a half-applied merge.
  • Cost — MERGE cost is O(bronze_new_rows + matched_silver_rows) with partition pruning; bounded and predictable.



5. Backfill strategies — full-table, partition-aware, log replay

backfill data pipeline — three strategies, one design constraint

backfill data pipeline is the most under-rehearsed senior-loop topic. Every interviewer asks "how would you reprocess last Tuesday after a bug fix?" — and the senior answer is one of three patterns, picked by the architecture and the failure mode.

The three backfill strategies this section covers.

  • Full-table reload — drop and rebuild the target; correct but expensive.
  • Partition-aware backfill — re-run only the affected partitions; the default for batch DAGs.
  • Log replay — rewind the consumer offset and replay the source log; the default for streaming.

The design constraint underpinning all three.

  • Same code path — backfill code must be identical to forward-fill code; any branch is a future bug.
  • Idempotent sinks — covered in §4; without them, backfill duplicates rows.
  • Bounded blast radius — only the affected partitions / offsets are rewritten; everything else stays untouched.
  • Observability — every backfill emits a logged audit event with who / when / window / reason.

Strategy 1 — Full-table reload

The fallback. Drop the target, reread the source, rebuild from scratch. It is the right choice when the schema changed, when the bug affects all of history, or when the table is not partitioned.

Shape.

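  • Truncate-and-reload — TRUNCATE TABLE target; INSERT INTO target SELECT … FROM source; inside a single transaction, on engines where TRUNCATE is transactional (Postgres is; MySQL commits implicitly on TRUNCATE).
  • Atomic swap — write to target_new, then rename target to target_old and target_new to target inside one transaction, or use an engine-native swap such as Snowflake's ALTER TABLE target_new SWAP WITH target; consumers never see a missing or half-built table.
  • Snowflake / BigQuery — CREATE OR REPLACE TABLE target AS SELECT …; atomic from the reader's point of view.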
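A minimal sketch of the atomic-swap variant, using Python's built-in sqlite3 because SQLite's DDL is transactional. The table names follow the bullet above; the rows are made up. The connection is opened with isolation_level=None so the explicit BEGIN/COMMIT pair is the only transaction boundary.

```python
import sqlite3

conn = sqlite3.connect(":memory:", isolation_level=None)   # no implicit BEGINs
conn.execute("CREATE TABLE target (order_id TEXT, amount REAL)")
conn.execute("INSERT INTO target VALUES ('O-1', 10.0)")      # stale contents

# 1. Rebuild into a side table; readers keep querying `target` meanwhile.
conn.execute("CREATE TABLE target_new (order_id TEXT, amount REAL)")
conn.executemany("INSERT INTO target_new VALUES (?, ?)", [("O-1", 12.0), ("O-2", 30.0)])

# 2. Swap names inside one transaction: other connections see either the old
#    table or the new one, never a missing `target`.
conn.execute("BEGIN")
conn.execute("ALTER TABLE target RENAME TO target_old")
conn.execute("ALTER TABLE target_new RENAME TO target")
conn.execute("COMMIT")

# 3. Drop the old copy once nothing depends on it.
conn.execute("DROP TABLE target_old")
print(conn.execute("SELECT * FROM target ORDER BY order_id").fetchall())
# [('O-1', 12.0), ('O-2', 30.0)]
```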

When to use.

  • Schema change — adding a column that needs to be backfilled across all of history.
  • Logic bug across all history — the whole table is wrong; partitioned backfill would touch every partition anyway.
  • Small tables — under a few GB; rebuild is faster than figuring out the partition list.

Trade-offs.

  • Cost — scans the entire source; bandwidth- and compute-expensive.
  • Downtime — without atomic swap, consumers see an empty / partial table.
  • Lineage — every downstream consumer must invalidate caches.

Strategy 2 — Partition-aware backfill (Airflow --start-date / --end-date)

The default for batch DAGs. Re-run only the affected partitions; Airflow's backfill command walks the date range and schedules one DAG run per logical date.

Shape.

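  • airflow dags backfill <dag> --start-date 2026-05-01 --end-date 2026-05-07 — schedules 7 DAG runs (one per day, both ends inclusive), each with the right {{ ds }}. This is the Airflow 2.x CLI; Airflow 3 moves backfills to airflow backfill create.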
  • Idempotent partition overwrite — each task writes only its own {{ ds }} partition; replays overwrite identically.
  • Concurrency — max_active_runs controls parallelism; balance throughput against warehouse load.
  • Reset states — airflow tasks clear <dag> --start-date X --end-date Y resets task-instance state so the scheduler re-runs those dates from scratch.

Pre-requisites.

  • Partition-by-date in every layer — bronze, silver, gold all keyed by {{ ds }}; not just one layer.
  • No datetime.today() in code — every reference to "today" must come from {{ ds }} / {{ data_interval_start }}.
  • Idempotent sinks — covered in §4; partition overwrite, MERGE, INSERT OVERWRITE PARTITION.
  • Resource isolation — backfills can hammer the warehouse; route to a dedicated warehouse / pool.
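A minimal plain-Python sketch of the "no datetime.today()" rule. The helper names and the UTC daily schedule are assumptions; in Airflow the ds string would arrive through the templated {{ ds }}. Every boundary derives from the logical date, so a backfill selects the same rows whenever it runs.

```python
from datetime import date, datetime, time, timedelta, timezone

def daily_interval(ds: str) -> tuple[datetime, datetime]:
    """[start, end) in UTC for the daily partition with logical date ds ('YYYY-MM-DD').

    Everything derives from ds, never from date.today(), so a backfill of
    2026-05-04 that runs on 2026-05-07 still selects 2026-05-04's rows.
    """
    start = datetime.combine(date.fromisoformat(ds), time.min, tzinfo=timezone.utc)
    return start, start + timedelta(days=1)

def rows_for_run(rows, ds):
    """Select the rows one run owns; replays for the same ds select the same rows."""
    start, end = daily_interval(ds)
    return [r for r in rows if start <= r["order_ts"] < end]

rows = [
    {"order_id": "O-1", "order_ts": datetime(2026, 5, 4, 23, 59, tzinfo=timezone.utc)},
    {"order_id": "O-2", "order_ts": datetime(2026, 5, 5, 0, 0, tzinfo=timezone.utc)},
]
print([r["order_id"] for r in rows_for_run(rows, "2026-05-04")])   # ['O-1']
```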

Use cases.

  • Bug fix for a known date range — "the region mapping was wrong from 2026-05-01 to 2026-05-07; rerun those days".
  • Late-arriving source data — vendor re-sends 2026-05-03's file at 2026-05-05; backfill 2026-05-03.
  • New downstream dimension — a new dim_region table needs the past 30 days re-joined; backfill 30 days.

Strategy 3 — Log replay (Kafka offset reset)

The streaming-native backfill. The log itself is the source of truth; rewind the consumer offset and the same streaming job replays history.

Shape.

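  • Reset offsets — kafka-consumer-groups --bootstrap-server kafka:9092 --reset-offsets --to-datetime 2026-05-01T00:00:00.000 --topic events --group my-job --execute.
  • Drop affected sink rows — with an append sink, DELETE FROM target WHERE window_start >= 'X', because the replay re-emits every window from the rewind point to the head of the log; a MERGE-keyed sink can instead overwrite those rows in place.
  • Restart job — same job, same code. Consumers that commit offsets to Kafka resume from the rewound group offsets; Spark Structured Streaming keeps offsets in its checkpoint and ignores the group, so it restarts on a fresh checkpointLocation with startingTimestamp (or startingOffsetsByTimestamp).
  • Compacted topics — cleanup.policy=compact retains only the latest value per key, which suits rebuilding keyed state over long horizons but cannot replay event-level history such as counts.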
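--to-datetime resolves to one offset per partition: the earliest record whose timestamp is at or after the requested time (Kafka answers this from its time index; the consumer API exposes the same lookup as offsetsForTimes). A toy plain-Python model of that lookup, assuming a hypothetical partition with non-decreasing timestamps (as with LogAppendTime):

```python
from bisect import bisect_left
from datetime import datetime, timezone

def offset_for_time(timestamps_ms, target_ms, log_start_offset=0):
    """Earliest offset whose timestamp is >= target_ms.

    timestamps_ms[i] is the timestamp of offset log_start_offset + i and is
    assumed non-decreasing. If the target is past the last record, this sketch
    returns the log-end offset.
    """
    return log_start_offset + bisect_left(timestamps_ms, target_ms)

target_ms = int(datetime(2026, 5, 26, tzinfo=timezone.utc).timestamp() * 1000)
print(target_ms)  # 1779753600000

# Hypothetical partition whose oldest retained record is offset 12_400_000.
partition = [target_ms - 2_000, target_ms - 1_000, target_ms, target_ms + 500]
print(offset_for_time(partition, target_ms, log_start_offset=12_400_000))  # 12400002
```

The replay then reads from that offset to the head of the log; nothing in the reset bounds where it stops.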

Pre-requisites.

  • Retention covers the replay window — Kafka's default 7 days is rarely enough; production replay setups use 30+ days or compacted topics.
  • Idempotent sink — dedup on event_id, MERGE on natural key, or partition overwrite at the sink.
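  • Checkpoint handling — engines that keep offsets in their own checkpoint (Spark Structured Streaming) need a fresh checkpoint for a replay; changes to stateful logic can also make an old checkpoint incompatible.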
  • Capacity headroom — replay competes with live traffic; scale parallelism temporarily or route to a separate consumer group.

Trade-offs.

  • Replay vs live race — during replay, live events still arrive; the dedup window must cover both streams.
  • Out-of-order watermarks — replayed events have old event_time; watermark policy must tolerate the gap.
  • Cost — a replay runs from the rewind point to the head of the log, so choose the --to-datetime start point precisely; it sets where reading begins, not where it ends.

Worked example — three-day Airflow partition-aware backfill for a bug fix

Detailed explanation. The most common production backfill: a logic bug was deployed at 2026-05-04 09:00 and discovered at 2026-05-07 11:00. The fix is merged; now reprocess 2026-05-04, 2026-05-05, and 2026-05-06 with the corrected code. Partition-aware Airflow backfill is the right tool.

Question. Backfill the orders_daily DAG for 2026-05-04 → 2026-05-06 inclusive after a bug fix. Show the Airflow command, the expected DAG-run schedule, and the post-backfill row-count audit.

Input (the situation).

field value
DAG orders_daily
affected dates 2026-05-04, 2026-05-05, 2026-05-06
bug region mapping returned null for LATAM
target table gold.revenue_by_region partitioned by (region, date)
consumer Power BI; backfill must complete before 06:00 next day

Code.

# 1. Clear the affected runs so Airflow re-creates them with the new code.
airflow tasks clear orders_daily \
  --start-date 2026-05-04 --end-date 2026-05-06 \
  --yes

# 2. Run the backfill (Airflow schedules 3 DAG runs, one per {{ ds }}).
airflow dags backfill orders_daily \
  --start-date 2026-05-04 --end-date 2026-05-06 \
  --reset-dagruns \
  --rerun-failed-tasks

# 3. Post-backfill audit — row count and freshness check.
spark-sql -e "
SELECT order_date,
       SUM(revenue) AS total_revenue,
       COUNT(*)    AS rows,
       MAX(_merged_at) AS last_merged
FROM gold.revenue_by_region
WHERE order_date BETWEEN '2026-05-04' AND '2026-05-06'
GROUP BY order_date
ORDER BY order_date;
"

Step-by-step explanation.

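  1. airflow tasks clear resets the state of the task instances for the affected dates so they re-run with the currently deployed (fixed) code; --reset-dagruns on the backfill discards any earlier backfill runs for those dates and starts fresh ones.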
  2. airflow dags backfill --start-date / --end-date schedules 3 DAG runs, one per {{ ds }}. Each run executes the full DAG with the right logical date.
  3. max_active_runs=2 (declared on the DAG) caps parallelism so the warehouse isn't overwhelmed.
  4. Each task is idempotent — MERGE INTO silver, INSERT OVERWRITE PARTITION (region, date) in gold — so replays write the same final state.
  5. Post-backfill audit confirms row counts and shows the fresh _merged_at timestamps; if any partition is missing, the audit query exposes it.

Sample output (post-backfill audit).

order_date total_revenue rows last_merged
2026-05-04 1,287,402.55 8,432 2026-05-07 13:14:08
2026-05-05 1,401,118.20 8,891 2026-05-07 13:21:42
2026-05-06 1,356,907.71 8,704 2026-05-07 13:29:17

Rule of thumb: partition-aware backfill is "same DAG, same {{ ds }}, idempotent sinks, bounded date range". Anything more elaborate — separate "backfill DAG", custom Spark scripts, manual SQL — is a smell.

Solution Using parameterised partition overwrite + dbt incremental is_incremental() guard

Code (the gold model that handles forward-fill and backfill identically).

-- models/gold/revenue_by_region.sql
{{ config(
    materialized='incremental',
    incremental_strategy='insert_overwrite',
    partition_by={'field': 'order_date', 'data_type': 'date'},
    unique_key=['region', 'order_date']
) }}

SELECT
    region,
    order_date,
    SUM(amount)              AS revenue,
    COUNT(DISTINCT order_id) AS orders,
    CURRENT_TIMESTAMP()      AS _merged_at
FROM {{ ref('silver_orders_clean') }}
{% if is_incremental() %}
-- Incremental runs (forward-fill and backfill alike) rebuild exactly one partition;
-- the first build or a --full-refresh scans all of history.
WHERE order_date = DATE('{{ var("date") }}')
{% endif %}
GROUP BY region, order_date

Step-by-step trace.

var("date") partition affected action
2026-05-04 (backfill) order_date='2026-05-04' INSERT OVERWRITE PARTITION (order_date='2026-05-04')
2026-05-05 (backfill) order_date='2026-05-05' INSERT OVERWRITE PARTITION (order_date='2026-05-05')
2026-05-06 (backfill) order_date='2026-05-06' INSERT OVERWRITE PARTITION (order_date='2026-05-06')
2026-05-07 (forward-fill) order_date='2026-05-07' INSERT OVERWRITE PARTITION (order_date='2026-05-07')
2026-05-08 (forward-fill) order_date='2026-05-08' INSERT OVERWRITE PARTITION (order_date='2026-05-08')

Output:

partition rows total_revenue
2026-05-04 8,432 1,287,402.55
2026-05-05 8,891 1,401,118.20
2026-05-06 8,704 1,356,907.71
2026-05-07 8,801 1,387,019.04
2026-05-08 8,612 1,378,442.91

Why this works — concept by concept:

  • One model, one code path — forward-fill and backfill use the exact same SQL; only var("date") differs.
  • INSERT OVERWRITE PARTITION — replays for the same var("date") are idempotent at the partition level; no duplicates.
  • unique_key=['region', 'order_date'] — documents the model's grain (the insert_overwrite strategy itself does not use it); pair it with a uniqueness test on (region, order_date) so a double-write surfaces as a test failure.
  • Airflow {{ ds }} → dbt var("date") — the same logical date flows through every layer; no datetime.today() lurking anywhere.
  • Cost — backfill cost is O(rows_in_window); orders of magnitude cheaper than a full-table reload, and bounded by the explicit date range.
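Outside Airflow, the same per-day fan-out can be scripted. A minimal sketch, assuming the model is selected by name and reads var("date") as above. The hypothetical helper only builds dbt command lines (using dbt's --select and --vars flags) and leaves running them to the caller.

```python
import json
import shlex
from datetime import date, timedelta

def backfill_commands(start: date, end: date, model: str = "revenue_by_region") -> list[str]:
    """One dbt invocation per logical date, start and end inclusive."""
    commands = []
    day = start
    while day <= end:
        vars_arg = shlex.quote(json.dumps({"date": day.isoformat()}))
        commands.append(f"dbt run --select {model} --vars {vars_arg}")
        day += timedelta(days=1)
    return commands

for cmd in backfill_commands(date(2026, 5, 4), date(2026, 5, 6)):
    print(cmd)
```

Because each command differs only in the date, forward-fill is the same call with a single day.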



6. Observability + SLOs — logs, metrics, traces, alerting

[Figure: pipeline observability stack, bottom to top: logging (structured JSON logs, correlation IDs), metrics (row counts, latency, freshness), tracing (OpenTelemetry spans per task), alerting + SLOs (PagerDuty, freshness SLO, error budget).]

pipeline observability — the four-layer stack

pipeline observability is the senior signal that closes the design loop. Junior answers say "we have logs"; senior answers describe the four-layer stack — structured logs → metrics → traces → alerting + SLOs — and how each layer catches a different class of failure.

The four layers and what each catches.

  • Layer 1 — Structured JSON logs — who did what with which inputs; catches incorrect logic, missing rows, validation failures.
  • Layer 2 — Metrics — row counts, byte counts, latency, freshness; catches volumetric drift and SLA breaches.
  • Layer 3 — Traces — per-task spans tied by a correlation ID; catches slow stages and cross-DAG latency.
  • Layer 4 — Alerting + SLOs — PagerDuty + freshness / completeness SLOs with error budgets; catches user-facing failures before the user sees them.

Layer 1 — Structured JSON logging

The foundation. Every task emits one structured JSON log per significant event; the log line carries a correlation ID so all logs from one DAG run can be queried as a unit.

Required fields per log line.

  • timestamp — ISO 8601 with timezone.
  • level — INFO, WARN, ERROR, CRITICAL.
  • dag_id + task_id + dag_run_id — the correlation ID set; lets you WHERE dag_run_id = X to assemble the full timeline.
  • event — short slug ("task_started", "row_count_written", "merge_complete").
  • metrics — nested object with rows, bytes, duration_s, etc.
  • error (when applicable) — exception type + message + stacktrace.

Example log line.

{
  "timestamp": "2026-05-26T06:34:08.521Z",
  "level": "INFO",
  "dag_id": "orders_daily",
  "task_id": "dbt_build",
  "dag_run_id": "manual__2026-05-26T06:00:00+00:00",
  "event": "task_complete",
  "metrics": {"rows_written": 12418503, "duration_s": 1284.2, "models_built": 12, "tests_passed": 27}
}

Anti-patterns.

  • Unstructured print — free-form strings with no fields; you can grep them but not query or aggregate them; never in production.
  • PII in logs — customer_email, card_number; redact before emit or use a separate restricted sink.
  • One log per row — fan-out kills the log sink; aggregate to per-batch / per-task.
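A minimal standard-library sketch of a formatter that produces lines shaped like the example above and applies the redaction rule. The logger name, the context fields passed through extra=, and the REDACTED_KEYS set are illustrative assumptions; Python's own level names (WARNING rather than WARN) are used as-is.

```python
import io
import json
import logging
from datetime import datetime, timezone

REDACTED_KEYS = {"customer_email", "card_number"}   # hypothetical PII field names
CONTEXT_FIELDS = ("dag_id", "task_id", "dag_run_id", "metrics")

class JsonFormatter(logging.Formatter):
    """One JSON object per log line, with correlation IDs and nested metrics."""

    def format(self, record):
        payload = {
            "timestamp": datetime.fromtimestamp(record.created, tz=timezone.utc)
                .isoformat(timespec="milliseconds").replace("+00:00", "Z"),
            "level": record.levelname,
            "event": record.getMessage(),
        }
        # Fields passed through logging's `extra=` argument become record attributes.
        for field in CONTEXT_FIELDS:
            if hasattr(record, field):
                payload[field] = getattr(record, field)
        if isinstance(payload.get("metrics"), dict):
            payload["metrics"] = {
                k: "[REDACTED]" if k in REDACTED_KEYS else v
                for k, v in payload["metrics"].items()
            }
        if record.exc_info:
            payload["error"] = self.formatException(record.exc_info)
        return json.dumps(payload)

stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(JsonFormatter())
log = logging.getLogger("orders_daily")
log.addHandler(handler)
log.setLevel(logging.INFO)
log.propagate = False

log.info("task_complete", extra={
    "dag_id": "orders_daily",
    "task_id": "dbt_build",
    "dag_run_id": "manual__2026-05-26T06:00:00+00:00",
    "metrics": {"rows_written": 12418503, "customer_email": "a@example.com"},
})
print(stream.getvalue())
```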

Layer 2 — Metrics (row counts, latency, freshness)

Numerical time series collected by Prometheus / Datadog / CloudWatch. These are the four metrics every pipeline should emit.

The four canonical pipeline metrics.

  • pipeline_rows_written_total{dag, task} — counter; alerts on drop > 10% week-over-week.
  • pipeline_task_duration_seconds{dag, task} — histogram; alerts on p95 breaching SLA.
  • pipeline_freshness_lag_seconds{table} — gauge of now() - max(updated_at); alerts on lag > SLO.
  • pipeline_task_status{dag, task, status} — counter of success / failure / retry; alerts on failure rate > error budget.
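The alert rules in that list reduce to simple comparisons. A plain-Python sketch of two of them, assuming a 1-hour freshness SLO and the 10% week-over-week threshold; in production these would be PromQL or Datadog monitor expressions rather than application code.

```python
from datetime import datetime, timezone

FRESHNESS_SLO_SECONDS = 3600        # assumed SLO: data at most 1 hour old

def freshness_lag_seconds(max_updated_at: datetime, now: datetime) -> float:
    """pipeline_freshness_lag_seconds = now() - max(updated_at)."""
    return (now - max_updated_at).total_seconds()

def freshness_alert(max_updated_at: datetime, now: datetime) -> bool:
    return freshness_lag_seconds(max_updated_at, now) > FRESHNESS_SLO_SECONDS

def row_drop_alert(rows_this_week: int, rows_last_week: int, threshold: float = 0.10) -> bool:
    """Fire when pipeline_rows_written_total fell by more than threshold week over week."""
    if rows_last_week <= 0:
        return False        # no baseline to compare against
    return (rows_last_week - rows_this_week) / rows_last_week > threshold

now = datetime(2026, 5, 26, 7, 30, tzinfo=timezone.utc)
print(freshness_alert(datetime(2026, 5, 26, 6, 0, tzinfo=timezone.utc), now))  # True (90 min)
print(row_drop_alert(880_000, 1_000_000))   # True (12% drop)
print(row_drop_alert(950_000, 1_000_000))   # False (5% drop)
```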

Implementation tips.

  • Push gateway (Prometheus) or StatsD (Datadog) for batch jobs that don't run a long-lived HTTP server.
  • dbt source freshness — writes freshness results to target/sources.json; export them as metrics and pair them with the orchestrator's run status.
  • Great Expectations / Soda — emit row-count + uniqueness + null-rate metrics from data-quality tests.
  • Tag every metric with env, team, pipeline for slicing dashboards by ownership.

Layer 3 — Tracing (OpenTelemetry spans)

Distributed tracing makes cross-stage / cross-DAG latency visible. A common convention is one span per task, under a parent span per DAG run.

Tracing anatomy.

  • Trace — a single end-to-end execution (one DAG run, one streaming micro-batch).
  • Span — a unit of work within a trace (one task, one query, one Spark stage).
  • Span attributes — dag_id, task_id, rows_read, rows_written, engine (Spark / Snowflake / BigQuery).
  • Span events — point-in-time annotations ("checkpoint_committed", "watermark_advanced").
  • Span links — cross-trace references (e.g. downstream DAG run links upstream DAG run).

Stack components.

  • OpenTelemetry SDK — language-native; built-in or community instrumentation for Airflow, dbt, and Spark is still maturing, so plan on some manual spans.
  • Collector — receives spans (OTLP), exports to backends.
  • Backend — Honeycomb, Tempo, Jaeger, Datadog APM.
  • Sampling — head-based (sample N% of traces) or tail-based (keep all error traces, sample success traces).
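The tail-based option needs a keep/drop decision per trace. A minimal plain-Python sketch of one such policy, in the spirit of ratio-based samplers. The 10% success rate is an assumption, and real deployments configure this in the collector rather than hand-coding it. Errors are always kept, and successes are sampled by hashing the trace ID, so every span of a trace (and every collector replica) reaches the same verdict.

```python
import hashlib

def keep_trace(trace_id: str, has_error: bool, success_rate: float = 0.10) -> bool:
    """Keep every error trace; keep roughly success_rate of successful ones."""
    if has_error:
        return True
    # Map the trace ID to a stable number in [0, 2**64) and compare to the rate.
    bucket = int.from_bytes(hashlib.sha256(trace_id.encode("utf-8")).digest()[:8], "big")
    return bucket < success_rate * 2**64

ids = [f"trace-{i}" for i in range(10_000)]
kept = sum(keep_trace(t, has_error=False) for t in ids)
print(f"kept {kept / len(ids):.1%} of successful traces")
```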

Layer 4 — Alerting + SLOs (freshness, completeness, error budget)

The user-facing contract. An SLO is "the table is fresh within 1 hour of the schedule, 99.5% of days"; the error budget is the 0.5% you're allowed to burn before pausing change.

SLO anatomy.

  • Service — the pipeline / table the SLO covers (gold.revenue_by_region).
  • SLI (indicator) — the measurable signal (freshness_lag_seconds, completeness_ratio, error_rate).
  • SLO (objective) — the target (freshness < 3600s, completeness > 99.5%).
  • Error budget — the allowed shortfall over a window (1 - 0.995 = 0.5% of days).
  • Burn rate alert — "the error budget is being consumed faster than the window allows"; pages on-call early.
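The error-budget and burn-rate definitions are plain arithmetic. The sketch below assumes a per-day SLO, where each day is either met or missed, and uses this section's 99.5% target; the function names are illustrative. The numbers show something worth knowing: over a 30-day window, a 99.5% daily target leaves only 0.15 days of budget, so the first missed day uses it all up.

```python
def error_budget_days(slo: float, window_days: int) -> float:
    """Allowed missed days in the window: (1 - SLO) x window length."""
    return (1 - slo) * window_days


def burn_rate(bad: int, total: int, slo: float) -> float:
    """Observed error ratio divided by the ratio the SLO allows; 1.0 means exactly on budget."""
    return (bad / total) / (1 - slo)


print(round(error_budget_days(0.995, 30), 3))            # 0.15
print(round(error_budget_days(0.995, 365), 3))           # 1.825
print(round(burn_rate(bad=7, total=100, slo=0.995), 1))  # 14.0
```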

Alerting routing.

  • PagerDuty — primary on-call rotation; pages on SLO breach + burn-rate alerts.
  • Slack — non-paging notifications (warnings, FYI failures).
  • Email digest — daily summary of yesterday's SLO status.
  • Runbook link — every alert carries a runbook_url field pointing to diagnostic queries + remediation steps.

Worked example — design an SLO + alert for a 1-hour-freshness pipeline

Detailed explanation. The canonical staff-level prompt: "the gold.revenue_by_region table must be fresh within 1 hour of the 06:00 schedule, 99.5% of days, with PagerDuty paging if the SLO is at risk. Design the SLO, the SLI, the alert, and the runbook."

Question. Design a full SLO + alert + runbook for gold.revenue_by_region with freshness ≤ 1h after 06:00, completeness ≥ 99.5%, paged via PagerDuty.

Input (the SLO requirements).

field value
service gold.revenue_by_region
SLI 1 (freshness) today's merge lands within 1h of the schedule: max(_merged_at) >= today's 06:00 when checked at 07:00
SLO 1 freshness target met on 99.5% of days
SLI 2 (completeness) count(distinct region) >= expected_region_count
SLO 2 completeness target met on 99.5% of days
paging PagerDuty de-on-call rotation
burn rate alert error-budget burn > 14× normal in 1 hour

Code (the Prometheus / Alertmanager rules + runbook reference).

# prometheus/rules/revenue_by_region_slo.yaml
groups:
  - name: revenue_by_region_slo
    interval: 60s
    rules:

      # SLI 1 — freshness gauge (seconds since last merge)
      - record: pipeline_freshness_lag_seconds:revenue_by_region
        expr: time() - max(pipeline_last_merged_seconds{table="gold.revenue_by_region"})

      # SLO 1: page if more than 1h has passed since the last successful merge
      - alert: RevenueByRegionFreshnessSLO
        expr: pipeline_freshness_lag_seconds:revenue_by_region > 3600
        for: 5m
        labels:
          severity: page
          team: data-eng
        annotations:
          summary: "gold.revenue_by_region freshness SLO breach"
          description: "Lag is {{ $value | humanizeDuration }} (>1h)."
          runbook_url: "https://runbooks.example.com/data-eng/revenue-by-region-freshness"
          slo: "freshness <= 1h, target 99.5%"

      # SLI 2 — completeness (regions present today)
      - record: pipeline_completeness_ratio:revenue_by_region
        expr: |
          count(count by (region) (
            pipeline_revenue_by_region_today{table="gold.revenue_by_region"}
          ))
          /
          count(count by (region) (
            pipeline_expected_regions{table="gold.revenue_by_region"}
          ))

      # SLO 2 — page if completeness < 99.5%
      - alert: RevenueByRegionCompletenessSLO
        expr: pipeline_completeness_ratio:revenue_by_region < 0.995
        for: 10m
        labels:
          severity: page
          team: data-eng
        annotations:
          summary: "gold.revenue_by_region completeness SLO breach"
          description: "Only {{ $value | humanizePercentage }} of regions present."
          runbook_url: "https://runbooks.example.com/data-eng/revenue-by-region-completeness"

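      # Burn-rate alert: error ratio over the last hour vs. the 0.5% the SLO allows; pages above 14x.
      # pipeline_slo_checks_total is an assumed companion counter of every SLI evaluation,
      # so violations / checks is a ratio rather than a raw count.
      - alert: RevenueByRegionErrorBudgetBurn
        expr: |
          (
            increase(pipeline_slo_violations_total{table="gold.revenue_by_region"}[1h])
            /
            increase(pipeline_slo_checks_total{table="gold.revenue_by_region"}[1h])
          ) / (1 - 0.995) > 14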
        for: 2m
        labels:
          severity: page
          team: data-eng
        annotations:
          summary: "revenue_by_region error budget burning fast"
          runbook_url: "https://runbooks.example.com/data-eng/revenue-by-region-burn"

Step-by-step explanation.

  1. SLI 1 (freshness) — gauge of now() - last_merge_time; trips on > 3600s.
  2. SLO 1 — alert fires after the lag exceeds the threshold for 5 consecutive minutes (debounces flaps).
  3. SLI 2 (completeness) — ratio of regions_seen / regions_expected; trips below 99.5%.
  4. SLO 2 — alert fires after 10 consecutive minutes below the threshold (gives the DAG time to retry).
  5. Burn-rate alert — fires when the error budget is being burned 14× faster than the SLO window allows; it warns on-call while budget remains, before the SLO for the whole window is missed.
  6. Runbook links — every alert carries a runbook_url annotation; PagerDuty surfaces it as a clickable link to the diagnostic queries + remediation steps.
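The lag rule above pages whenever more than 3600s have passed since the last merge. That works for tables refreshed at least hourly. A once-a-day table behaves differently: time since the last merge keeps growing after a successful 06:34 run, so the rule fires an hour later even though nothing is wrong. One way to encode "fresh within 1 hour of the 06:00 schedule" directly is to compare the last merge against today's deadline. The sketch below assumes UTC timestamps and a fixed 06:00 UTC schedule, and freshness_slo_breached is a hypothetical helper. It reports the current state only; counting a late-but-eventual merge as a missed day needs a separate daily record.

```python
from datetime import datetime, time, timedelta, timezone


def freshness_slo_breached(last_merged_at: datetime, now: datetime,
                           schedule: time = time(6, 0),
                           grace: timedelta = timedelta(hours=1)) -> bool:
    """True once today's deadline (schedule + grace) has passed with no merge since the schedule."""
    scheduled_at = datetime.combine(now.date(), schedule, tzinfo=timezone.utc)
    if now < scheduled_at + grace:
        return False  # still inside today's window; nothing to judge yet
    return last_merged_at < scheduled_at  # no merge happened after today's scheduled run


utc = timezone.utc
print(freshness_slo_breached(datetime(2026, 5, 26, 6, 34, 42, tzinfo=utc),
                             datetime(2026, 5, 26, 7, 39, 42, tzinfo=utc)))  # False: merged on time
print(freshness_slo_breached(datetime(2026, 5, 25, 6, 31, tzinfo=utc),
                             datetime(2026, 5, 26, 7, 5, tzinfo=utc)))       # True: today's run never merged
```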

Sample output (PagerDuty incident on a freshness breach).

[PD] RevenueByRegionFreshnessSLO
severity=page  team=data-eng
summary: gold.revenue_by_region freshness SLO breach
description: Lag is 1h 12m 4s (>1h).
slo: freshness <= 1h, target 99.5%
runbook: https://runbooks.example.com/data-eng/revenue-by-region-freshness
firing_for: 5m12s

Rule of thumb: every SLO has an SLI (measurable), an SLO (target), an error budget, a burn-rate alert, a paging rule, and a runbook link. Skip any of those six and the alert becomes noise rather than signal.

Solution Using a freshness-SLI gauge + burn-rate-driven PagerDuty rule

Code (the freshness-emit task that produces the SLI).

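from datetime import datetime

from prometheus_client import CollectorRegistry, Gauge, push_to_gateway

def emit_freshness_metric(table_name: str, last_merged_at: datetime) -> None:
    """Push the last-merge timestamp for one table to the Prometheus Pushgateway."""
    registry = CollectorRegistry()  # fresh registry so only this gauge is pushed
    g = Gauge(
        "pipeline_last_merged_seconds",
        "Unix-seconds of last successful merge per table",
        ["table"],
        registry=registry,
    )
    g.labels(table=table_name).set(last_merged_at.timestamp())
    # push_to_gateway replaces everything previously pushed under this job name
    push_to_gateway(
        "pushgateway:9091",
        job=f"freshness:{table_name}",
        registry=registry,
    )

# Called at the end of refresh_bi_cache for gold.revenue_by_region
# (assumes an active SparkSession named `spark` inside that task).
last = spark.sql(
    "SELECT MAX(_merged_at) AS t FROM gold.revenue_by_region"
).first()["t"]
if last is not None:  # MAX() is NULL on an empty table; skip instead of pushing a bogus value
    emit_freshness_metric("gold.revenue_by_region", last)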

Step-by-step trace.

event metric alert
06:34:42 — DAG completes; _merged_at = 06:34:42 pipeline_last_merged_seconds = 1748234082 none
07:34:42 — lag = 1h exactly pipeline_freshness_lag_seconds = 3600 for: 5m not yet tripped
07:39:42 — lag = 1h 5m pipeline_freshness_lag_seconds = 3900 PagerDuty page fires
burn-rate rule evaluated violation ratio ÷ (1 - 0.995) separate page only if the ratio exceeds 14×
on-call runs runbook diagnostics freshness metric resets after fix alert auto-resolves

Output:

time freshness lag SLO state paged?
06:34:42 0s met no
07:00:00 25m 18s met no
07:34:42 1h 0m met (threshold) no
07:39:42 1h 5m breach yes
08:12:14 0s (fix deployed) recovered auto-resolved

Why this works — concept by concept:

  • SLI is a gauge, not a count — gauges expose "current state" instead of "events since"; freshness is naturally a gauge.
  • for: 5m debouncing — prevents flapping when the metric momentarily exceeds the threshold during normal DAG completion.
  • Burn-rate alert as early warning — fires while error budget remains, before the SLO for the whole window is missed.
  • Runbook URL on every alert — the page is useless without a paired runbook; the URL is part of the SLO contract.
  • Cost — Prometheus evaluates each rule once per interval, so evaluation work grows with the number of rules; the freshness emit is O(1) per DAG run; the SLO machinery adds near-zero runtime overhead.



7. Failure modes + production playbook

data pipeline failure modes — the eight failures every senior loop tests

data pipeline failure modes is the staff-level closing topic. Every senior pipeline-design round eventually asks "what could go wrong?"; the candidate who can name eight common failure modes and a paired runbook for each is the candidate who gets hired. The eight failures below cover most real production incidents.

The eight failure modes.

  • F1 — Schema drift — source adds / removes / retypes a column; downstream parse breaks.
  • F2 — Source unavailable — upstream API / file drop fails; DAG sensor times out.
  • F3 — Out-of-memory (OOM) — Spark / Flink job exceeds executor memory and dies mid-stage.
  • F4 — Runaway scan — a query without partition pruning scans the whole table; cost explodes.
  • F5 — Late data — streaming events arrive after the watermark; window aggregates miss them.
  • F6 — Partition misalignment — source partition (event_date) and sink partition (load_date) drift; rows land in wrong day.
  • F7 — Retry storm — a failing task's retries hit a downstream service like a thundering herd; the overload cascades into an outage.
  • F8 — Downstream backpressure — sink can't keep up with source; queues fill, latency explodes.

F1 — Schema drift; F2 — Source unavailable

The two most common ingest-layer failures.

F1 — Schema drift.

  • Symptom — parse error in bronze load; missing column in silver model; nulls where data was expected.
  • Detection — Schema Registry compatibility check fails, or dbt not_null test fails on a new column.
  • Prevention — Schema Registry with BACKWARD or FULL compatibility; tolerant readers (spark.read.option("mergeSchema", "true")); on_schema_change='append_new_columns' in dbt incremental models.
  • Remediation — promote schema change through dev → staging → prod; backfill the affected window if the new column should have history.
  • Runbook — "diagnose: dbt source freshness; if schema change detected, run dbt run --full-refresh --select <model> after updating the model; backfill if needed via airflow dags backfill --start-date X --end-date Y".
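Detection can start with a plain column diff between yesterday's and today's bronze schema. The sketch below assumes schemas are available as {column: type} dicts, for example read from a Parquet footer or Spark's df.schema. It labels additions as additive and removals or retypes as breaking, matching the prevention rules above. The function name and categories are illustrative.

```python
def classify_schema_change(old: dict[str, str], new: dict[str, str]) -> dict[str, list[str]]:
    """Split a schema diff into additive changes (tolerant readers absorb them) and breaking ones."""
    added = sorted(set(new) - set(old))
    removed = sorted(set(old) - set(new))
    retyped = sorted(c for c in set(old) & set(new) if old[c] != new[c])
    return {"additive": added, "breaking": removed + retyped}


yesterday = {"order_id": "bigint", "amount": "double", "region": "string"}
today = {"order_id": "bigint", "amount": "double", "region": "string", "currency": "string"}
print(classify_schema_change(yesterday, today))
# {'additive': ['currency'], 'breaking': []}
```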

F2 — Source unavailable.

  • Symptom — S3KeySensor times out; HttpSensor returns 5xx; vendor SFTP refuses connections.
  • Detection — sensor task up_for_reschedule exceeds timeout → task failure.
  • Prevention — sensors with an explicit timeout; deferrable sensors to avoid tying up worker slots; alerting on consecutive sensor failures (not single-run failures).
  • Remediation — page vendor on-call; manually trigger the DAG once the source recovers; backfill missed windows.
  • Runbook — "diagnose: check vendor status page + recent sensor history; if vendor is down, pause the DAG with airflow dags pause; on recovery, unpause it and run airflow dags backfill --start-date / --end-date for the missed windows".
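The "alert on consecutive failures, not single runs" rule is a small check over sensor history. The sketch below assumes each run's sensor outcome is recorded as True (the source arrived) or False (the sensor timed out), newest last. The threshold of 3 is illustrative and should_alert is a hypothetical name.

```python
def should_alert(sensor_history: list[bool], threshold: int = 3) -> bool:
    """Alert only when the most recent `threshold` sensor runs all timed out."""
    if len(sensor_history) < threshold:
        return False
    return not any(sensor_history[-threshold:])


print(should_alert([True, True, False]))          # False: a single miss
print(should_alert([True, False, False, False]))  # True: three misses in a row
```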

F3 — OOM; F4 — Runaway scan

The two most common compute-layer failures.

F3 — Out-of-memory (OOM).

  • Symptom — Spark executor killed with OutOfMemoryError; Flink job restarts in a loop.
  • Detection — Spark UI shows failed stages with Container killed by YARN for exceeding memory limits; Flink TaskManager metrics show Status.JVM.Memory.Heap.Used approaching the heap maximum.
  • Prevention — right-size executor memory (spark.executor.memory); raise spark.sql.shuffle.partitions above the default 200 so each shuffle task holds less data; broadcast small dims with broadcast(small_df).
  • Remediation — increase executor memory; switch to df.repartition(N) to balance partitions; convert wide transformations to narrow when possible.
  • Runbook — "diagnose: Spark UI → failed stages → stage detail → executor memory; if a single partition is huge, repartition by a higher-cardinality key; if a broadcast join is too big, drop the broadcast hint".
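The runbook's "single partition is huge" check can be automated from per-partition row counts, for example collected with df.groupBy(spark_partition_id()).count(). The sketch below works on those counts. The 5× median factor is an illustrative threshold, not a Spark default.

```python
import statistics


def skewed_partitions(row_counts: dict[int, int], factor: float = 5.0) -> list[int]:
    """Return partition ids whose row count exceeds `factor` times the median partition."""
    median = statistics.median(row_counts.values())
    return sorted(pid for pid, n in row_counts.items() if n > factor * median)


counts = {0: 1_000, 1: 1_200, 2: 950, 3: 48_000, 4: 1_100}
print(skewed_partitions(counts))  # [3]
```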

F4 — Runaway scan.

  • Symptom — query that normally runs in 2 minutes takes 2 hours; warehouse bill spikes.
  • Detection — Snowflake query history shows bytes_scanned > 100GB for a query that should scan one partition; BigQuery's INFORMATION_SCHEMA.JOBS shows total_bytes_billed far above the expected partition size.
  • Prevention — every query has a WHERE on the partition column; CI test (dbt test) that asserts partition pruning; query budget guardrails in CI.
  • Remediation — cap runtime with ALTER SESSION SET STATEMENT_TIMEOUT_IN_SECONDS = 60 (Snowflake) or a maximum_bytes_billed limit (BigQuery); cancel the runaway query; add the missing WHERE clause; rerun.
  • Runbook — "diagnose: warehouse query history; if bytes_scanned > expected, find the query; check WHERE clause; rerun with partition filter".

F5 — Late data; F6 — Partition misalignment

The two most common time-correctness failures.

F5 — Late data.

  • Symptom — yesterday's 5-minute counts are wrong; events arrive hours after their event_time.
  • Detection — pipeline_late_event_count_total metric > threshold; downstream user reports "yesterday's number changed".
  • Prevention — withWatermark("event_time", "1 hour") or wider in Spark; an allowedLateness of 1 hour on Flink windows; a side output for events past the watermark.
  • Remediation — widen the watermark; reprocess affected windows via log replay (§5.3); document the trust window (e.g. "numbers are stable after 4 hours").
  • Runbook — "diagnose: late-event metric + watermark lag; if widespread, reprocess the affected window via offset reset + idempotent sink".
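A toy 5-minute tumbling-window counter shows how the watermark rule works. The sketch below is single-threaded and assumes events arrive as (event_id, event_time) pairs, with the watermark trailing the maximum event time seen by a fixed allowed lateness. It mirrors the semantics of withWatermark and side outputs, but it is not Spark or Flink code. Both engines advance watermarks per batch or per operator, with their own rules.

```python
from collections import Counter
from datetime import datetime, timedelta

WINDOW = timedelta(minutes=5)


def window_start(ts: datetime) -> datetime:
    """Floor a timestamp to the start of its 5-minute tumbling window."""
    return ts - timedelta(minutes=ts.minute % 5, seconds=ts.second, microseconds=ts.microsecond)


def aggregate(events: list[tuple[str, datetime]], allowed_lateness: timedelta):
    """Count events per window; events whose window already closed go to a side output."""
    counts: Counter = Counter()
    side_output: list[str] = []
    max_event_time = None
    for event_id, ts in events:
        max_event_time = ts if max_event_time is None else max(max_event_time, ts)
        watermark = max_event_time - allowed_lateness  # trails the newest event time seen
        if window_start(ts) + WINDOW <= watermark:
            side_output.append(event_id)  # window closed: keep for reprocessing instead of dropping
        else:
            counts[window_start(ts)] += 1
    return counts, side_output


base = datetime(2026, 5, 26, 10, 0)
events = [
    ("e1", base + timedelta(minutes=1)),
    ("e2", base + timedelta(minutes=90)),  # pushes the watermark to 10:30
    ("e3", base + timedelta(minutes=2)),   # its 10:00-10:05 window is already closed
    ("e4", base + timedelta(minutes=40)),  # the 10:40-10:45 window is still open
]
counts, late = aggregate(events, allowed_lateness=timedelta(hours=1))
print(late)  # ['e3']
```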

F6 — Partition misalignment.

  • Symptom — events that occur on day N but load after midnight land in the day N+1 partition; queries by event_date miss rows.
  • Detection — dbt test for partition counts shows shortfall; analytics team reports row count discrepancy.
  • Prevention — partition by event_date (extracted from event_time), not load_date; document the difference explicitly; midnight-rollover handling in streaming jobs.
  • Remediation — correct the partition logic first, then backfill the misaligned dates via --start-date / --end-date.
  • Runbook — "diagnose: SELECT event_date, load_date, count(*) FROM bronze GROUP BY 1, 2; if mismatched, fix partitioning logic + backfill".

F7 — Retry storm; F8 — Downstream backpressure

The two most common cascading failures.

F7 — Retry storm.

  • Symptom — a failing task retries N times every 5 minutes, hammering a downstream API; downstream rate-limits everyone.
  • Detection — downstream service reports 429 / 503 spike; metrics show retry count > normal.
  • Prevention — exponential backoff (base_delay * (2 ** attempt)) + jitter (+ random.uniform(0, 1)); cap retries (max_retries=5); circuit-breaker pattern.
  • Remediation — pause the offending DAG; reduce retries on the failing task; coordinate with downstream owners.
  • Runbook — "diagnose: downstream 429 / 503 rate vs our retry rate; if cause is us, pause DAG + reduce retries + add jitter".
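The backoff formula in the prevention bullet translates directly into code. The sketch below combines capped exponential backoff with jitter and a simplified consecutive-failure circuit breaker. A production breaker would also have a half-open state with a reset timer. call_downstream stands in for whatever request the task makes, the delays are illustrative, and sleep is injectable so the example runs instantly.

```python
import random
import time
from typing import Callable


class CircuitOpen(Exception):
    """Raised instead of calling downstream while the breaker is open."""


class CircuitBreaker:
    def __init__(self, failure_threshold: int = 3):
        self.failure_threshold = failure_threshold
        self.consecutive_failures = 0

    @property
    def open(self) -> bool:
        return self.consecutive_failures >= self.failure_threshold

    def record(self, ok: bool) -> None:
        self.consecutive_failures = 0 if ok else self.consecutive_failures + 1


def call_with_backoff(call_downstream: Callable[[], str], breaker: CircuitBreaker,
                      max_retries: int = 5, base_delay: float = 1.0,
                      sleep: Callable[[float], None] = time.sleep) -> str:
    for attempt in range(max_retries + 1):
        if breaker.open:
            raise CircuitOpen("downstream marked unhealthy; stop retrying")
        try:
            result = call_downstream()
            breaker.record(ok=True)
            return result
        except ConnectionError:
            breaker.record(ok=False)
            if attempt == max_retries:
                raise
            # base_delay * 2**attempt plus up to 1s of jitter so clients don't retry in lockstep
            sleep(base_delay * (2 ** attempt) + random.uniform(0, 1))
    raise RuntimeError("unreachable: the loop always returns or raises")


attempts = {"n": 0}


def flaky() -> str:
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise ConnectionError("429 Too Many Requests")
    return "ok"


delays: list[float] = []
print(call_with_backoff(flaky, CircuitBreaker(failure_threshold=5), sleep=delays.append))  # ok
```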

F8 — Downstream backpressure.

  • Symptom — Kafka consumer lag grows; Flink checkpoint times out; sink writes hang.
  • Detection — kafka_consumer_lag_total gauge climbing monotonically; Flink job manager shows checkpoint_alignment_time rising.
  • Prevention — right-size sink throughput; partition the sink for parallelism; circuit-break when consumer lag exceeds threshold.
  • Remediation — temporarily scale up consumers / sinks; throttle producers; route overflow to a dead-letter (DLQ) topic for later replay.
  • Runbook — "diagnose: consumer lag + sink write latency; if sustained, scale consumers; if write latency, scale sink; if neither, throttle producers".

Worked example — full runbook for an F1 schema-drift incident

Detailed explanation. A representative on-call scenario. The vendor adds a currency column to the daily orders.parquet file. The bronze load succeeds (Parquet schema-merge is tolerant), but the dbt silver.orders_clean model fails on a not_null test for the new column. On-call wakes up at 06:42.

Question. Walk through the on-call runbook for an F1 schema-drift incident — diagnose, decide, remediate, document.

Input (the page).

[PD] dbt_build task failed in orders_daily
dag_id=orders_daily  task_id=dbt_build  dag_run_id=manual__2026-05-26
error: "FAIL not_null_silver_orders_clean_currency" — 12,418,503 nulls
runbook: https://runbooks.example.com/data-eng/schema-drift

Code (the on-call runbook steps).

# 1. Diagnose — find what changed.
spark-sql -e "
SELECT * FROM lakehouse.bronze.orders
WHERE dt = '2026-05-26' LIMIT 5;
"
# -> output shows a new 'currency' column that wasn't there yesterday.

# 2. Confirm with Schema Registry.
curl -s https://schema-registry.example.com/subjects/orders-value/versions/latest
# -> v3 = adds 'currency' (string)

# 3. Decide — is this a backward-compatible change? Yes (new optional column).
#    Update the silver model + relax the not_null test to allow nulls for now.

# 4. Patch silver_orders_clean.sql + schema.yml.
git checkout -b fix/orders-currency-column
# - add `currency` to the SELECT list in silver/orders_clean.sql
# - downgrade the `not_null` test on `currency` to `severity: warn` until the backfill completes
# - PR + review + merge

# 5. Re-run today's dbt tasks with the fix. Clearing re-queues those task instances
#    in the existing DAG run, so no separate `airflow dags trigger` is needed.
airflow tasks clear orders_daily \
  --task-regex 'dbt_(build|test)' \
  --start-date 2026-05-26 --end-date 2026-05-26 \
  --yes

# 6. Document — append to the runbook + post in #data-eng.
echo "2026-05-26 06:55 — vendor added currency column; silver_orders_clean patched; SLO MET at 07:12" \
  >> runbooks/data-eng/schema-drift-incidents.md

Step-by-step explanation.

  1. Diagnose — query bronze; spot the new column.
  2. Confirm — Schema Registry shows v3 with the new field.
  3. Decide — backward-compatible? Yes (additive). No backfill needed yet.
  4. Patch — update model + test; ship through the normal PR flow (no force-merging past review).
  5. Re-run — clear only today's dbt tasks so the scheduler re-runs them; don't backfill all of history.
  6. Document — append the incident to the runbook log for future on-call learnings.

Sample output (the post-incident timeline).

06:42:01  PD page: dbt_build failed in orders_daily
06:42:14  on-call ack
06:48:30  diagnosis complete (new currency column)
06:54:17  PR merged
07:01:42  DAG re-run triggered
07:12:08  DAG complete; freshness SLO met
07:14:00  runbook updated
07:30:00  retro logged: "request vendor to email schema changes 48h ahead"

Rule of thumb: every production incident becomes a runbook entry, and every runbook entry follows the same five steps — diagnose, confirm, decide, patch, document. Every page should leave behind a change that reduces future paging.

Solution Using a versioned silver model + Schema Registry compatibility check

Code (CI gate that catches schema drift before it pages anyone).

# ci/check_schema_compat.py
# Assumes the confluent-kafka client and an Avro schema file passed as argv[1].
import sys

from confluent_kafka.schema_registry import Schema, SchemaRegistryClient

client = SchemaRegistryClient({"url": "https://schema-registry.example.com"})
SUBJECT = "orders-value"

def check_compat(new_schema_path: str) -> int:
    with open(new_schema_path) as f:
        new_schema = Schema(f.read(), schema_type="AVRO")
    # Checked against the latest registered version under the subject's configured
    # compatibility level (e.g. BACKWARD or FULL).
    if not client.test_compatibility(SUBJECT, new_schema):
        print(f"FAIL: {SUBJECT} schema is NOT compatible with the latest registered version.")
        return 1
    latest = client.get_latest_version(SUBJECT)
    print(f"OK: {SUBJECT} schema is compatible with v{latest.version}.")
    return 0

if __name__ == "__main__":
    sys.exit(check_compat(sys.argv[1]))

Step-by-step trace.

step result
PR opened with schema change CI runs check_schema_compat.py
CI checks compatibility against latest registered version compat=True if additive
If compat=False PR blocked; producer updates required first
If compat=True PR merges; schema registered as new version
Producer ships the new field consumers tolerate via schema-merge
Silver model patched in same PR downstream tests pass

Output:

change CI result (FULL compatibility) outcome
Add currency (optional string with a default) compat OK merges; no on-call page
Drop region (required field, no default) compat FAIL PR blocked
Change amount: double → string compat FAIL PR blocked
Add nested address: struct<...> (optional, with a default) compat OK merges

Why this works — concept by concept:

  • Schema Registry as the source of truth — producer-consumer contract is enforced at PR time, not at runtime.
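  • FULL compatibility — the new schema can read old data (BACKWARD) and consumers still on the old schema can read new data (FORWARD); adding an optional field with a default passes, while dropping a required field or changing a type fails.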
  • CI as the failure-prevention layer — Layer 0 of observability; the incident never happens because the PR is blocked.
  • Paired with tolerant readers — silver models use on_schema_change='append_new_columns' so they auto-absorb additive changes.
  • Cost — a registry check takes seconds per PR; the alternative is an on-call page and hours of toil per incident.



Choosing the right pipeline pattern (cheat sheet)

A one-screen cheat sheet for data pipeline design — pick the pattern that matches your prompt.

Reviewer asks … Pattern Notes
"Batch or streaming?" Pick by consumer SLA, not by team preference Hour+ → batch; sub-minute → streaming
"Lambda or Kappa?" Default to Kappa for new pipelines Lambda only if you need a regulated batch-of-record
"How do you make this idempotent?" MERGE INTO on natural key Most warehouse-native answer
"What if Kafka redelivers an event?" Dedup on event_id dropDuplicates + watermark to bound state
"How do you partition the sink for retries?" Deterministic hash SHA256(natural_key) % N
"How do you backfill yesterday after a bug?" airflow dags backfill --start-date X --end-date X Same code, same {{ ds }}
"How do you backfill in a streaming job?" Reset consumer offsets + replay log Requires retention covering the window
"How do you reprocess the entire history?" Full-table reload via CREATE OR REPLACE Last resort; small tables only
"What's your observability stack?" 4 layers — logs / metrics / traces / SLOs + alerting Name the layer for each failure class
"What's an SLO?" SLI + objective + error budget + burn-rate alert Plus a runbook URL
"What if the schema changes?" Schema Registry + tolerant readers + dbt on_schema_change CI catches incompatible changes
"What if the source is down?" Sensor timeout + alerting + manual trigger on recovery Don't auto-retry forever
"What if a Spark job OOMs?" Right-size memory, broadcast small dims, repartition by high-cardinality key Inspect Spark UI first
"What if a query scans too much?" Partition pruning + CI assertion on bytes_scanned Query-budget guardrails
"What if events arrive late?" withWatermark + allowedLateness + side-output Trust window is the watermark
"What if partitions misalign?" Partition by event_date, not load_date Backfill if discovered after the fact
"What if retries storm a downstream?" Exponential backoff + jitter + capped retries Pause DAG if cause is upstream
"What if the sink can't keep up?" Scale consumers, partition the sink, DLQ on overflow Backpressure is a capacity problem

Frequently asked questions

How do you choose between batch and streaming in a data pipeline design interview?

The senior answer in one sentence: batch is the default — pick streaming only when the consumer SLA is sub-minute, the source is genuinely an event log, and the team has the operational budget for stateful stream jobs; otherwise, batch + tight scheduling is cheaper, simpler, and easier to reason about. Start from the consumer SLA, not from team preference or the cool tool of the week. Hour+ SLA, file-drop source, heavy joins to slowly-changing dimensions, or cost-sensitive workloads all point at batch. Sub-minute SLA, event-driven source (Kafka / Pub/Sub / Kinesis), continuous feature stores, and right-sized state all point at streaming. Modern teams that need both have largely collapsed to Kappa (one streaming log + one streaming job, replayable from offset) to avoid maintaining the two codebases that Lambda forces. Interviewers love when you name the trade-off explicitly: "I'll pick Kappa because the SLA is sub-minute and the source is Kafka; the cost is operational complexity, which I'll mitigate with managed Flink / Spark Structured Streaming."

What's the difference between idempotency and exactly-once semantics?

Idempotency is a property of a transform: running the same code over the same input N times produces the same final state. Exactly-once is a delivery / processing guarantee: each event affects the sink exactly once. In modern pipelines, exactly-once is delivered as a system-level property — at-least-once delivery from the broker (Kafka, Pub/Sub) plus idempotent sinks (MERGE INTO, INSERT … ON CONFLICT, deterministic event_id dedup) — rather than as a magic checkbox on the broker. The interview-canonical recipe: event_id per event + dedup at the sink (dropDuplicates(["event_id"]), MERGE WHEN MATCHED, INSERT ON CONFLICT DO NOTHING) + idempotent storage (Delta atomic commits, transactional Kafka writes, partition-overwrite gold tables). If you reach for "exactly-once is a broker setting" you'll lose the round; if you reach for the recipe, you'll pass the bar.
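The recipe of at-least-once delivery plus an idempotent sink can be demonstrated with SQLite standing in for the warehouse. The sketch below assumes each event carries a unique event_id. INSERT ... ON CONFLICT DO NOTHING requires SQLite 3.24 or newer, and the broker's redelivery is simulated by sending the same batch twice. The table and helper names are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders_sink (event_id TEXT PRIMARY KEY, amount REAL)")


def write_batch(batch: list[tuple[str, float]]) -> None:
    """Idempotent sink: a redelivered event_id is ignored rather than double-counted."""
    with conn:  # one transaction per batch
        conn.executemany(
            "INSERT INTO orders_sink (event_id, amount) VALUES (?, ?) "
            "ON CONFLICT(event_id) DO NOTHING",
            batch,
        )


batch = [("e-1", 10.0), ("e-2", 25.5)]
write_batch(batch)
write_batch(batch)  # at-least-once delivery: the same batch arrives again
print(conn.execute("SELECT COUNT(*), SUM(amount) FROM orders_sink").fetchone())  # (2, 35.5)
```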

How do you backfill a streaming pipeline like Kafka + Flink?

Three steps. Step 1 — stop the streaming job so the consumer group has no active members (offsets can only be reset on an inactive group). Step 2 — reset offsets with kafka-consumer-groups --bootstrap-server broker:9092 --group my-job --topic events --reset-offsets --to-datetime 2026-05-01T00:00:00.000 --execute (or --to-earliest, --to-offset N). Step 3 — delete the affected sink rows with DELETE FROM target WHERE window_start >= 'X' AND window_start < 'Y' (or drop the partition), then restart the streaming job from a fresh checkpoint: Flink and Spark Structured Streaming store Kafka offsets in their own checkpoints, so restoring the old checkpoint would silently skip the reset. Start the fresh run at the rewound position (Flink KafkaSource: OffsetsInitializer.committedOffsets(); Spark, which ignores committed group offsets, needs startingOffsetsByTimestamp instead). The replay reprocesses the rewound offsets through the same code; idempotent sinks (Delta MERGE, INSERT ON CONFLICT, partition overwrite) make the rewrite safe. Pre-requisites: Kafka retention covers the replay window (default 7 days is rarely enough — use 30+ days, or compacted topics where keeping only the latest record per key is enough); the streaming job tolerates the old event_time watermark gap; the sink dedupe / overwrite guard is in place. The senior signal in the room is naming log replay as the streaming equivalent of Airflow's --start-date / --end-date — both are "same code, bounded window, idempotent sinks".
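A small in-memory model of a log, a rewind, and a sink shows why delete-then-replay is safe to repeat. The sketch below assumes a time-ordered log of (offset, event_time, key, value) records whose offsets equal list positions, a sink of daily sums keyed by (date, key), and a replay start on a day boundary. offset_for_datetime and backfill are hypothetical helpers that imitate --to-datetime and the delete-plus-replay step; nothing here talks to Kafka.

```python
from datetime import datetime

# Time-ordered log of (offset, event_time, key, value); retention is assumed to cover the window.
LOG = [
    (0, datetime(2026, 5, 1, 9), "eu", 5),
    (1, datetime(2026, 5, 1, 10), "us", 7),
    (2, datetime(2026, 5, 2, 9), "eu", 3),
]


def offset_for_datetime(log, ts: datetime) -> int:
    """Step 2 analogue: first offset whose timestamp is >= ts (end of log if none)."""
    return next((off for off, t, _, _ in log if t >= ts), len(log))


def backfill(log, sink: dict, start: datetime) -> None:
    """Rewind to `start` (a day boundary), delete the affected sink rows, then replay."""
    start_offset = offset_for_datetime(log, start)
    # Step 3a: delete sink rows for the affected window so the replay starts clean.
    for k in [k for k in sink if k[0] >= start.date()]:
        del sink[k]
    # Step 3b: replay through the same daily-sum logic the live job uses.
    for _, t, key, value in log[start_offset:]:
        sink[(t.date(), key)] = sink.get((t.date(), key), 0) + value


sink: dict = {}
backfill(LOG, sink, datetime(2026, 5, 1))        # initial load of all history
sink[(datetime(2026, 5, 2).date(), "eu")] = 999  # simulate a bug that corrupted May 2
backfill(LOG, sink, datetime(2026, 5, 2))        # replay only May 2
backfill(LOG, sink, datetime(2026, 5, 2))        # running it again changes nothing
print(sorted(sink.items()))
```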

What's a sensible freshness SLO for a daily batch pipeline?

For a daily batch pipeline running at 06:00 with a consumer dashboard refreshing at 09:00, a sensible SLO is freshness ≤ 1 hour after the scheduled run, on 99.5% of days, with PagerDuty paging on breach and a 14× burn-rate early warning. The SLI checks that today's merge landed on time: once 07:00 has passed, max(_merged_at) must be later than today's 06:00 run. (A plain now() - max(_merged_at) < 3600s gauge suits tables refreshed at least hourly; on a once-a-day table it climbs past 3600s an hour after every successful run.) The error budget is 0.5% of days; over a 30-day rolling window that is 0.15 days, so a single late day exhausts it. Pair the freshness SLO with a completeness SLO (count(distinct region) >= expected_region_count, target 99.5%) so a partial run also pages. Every SLO has six required parts: an SLI (measurable), an SLO (target), an error budget (allowed shortfall), a burn-rate alert (early warning), a paging rule (PagerDuty), and a runbook URL (diagnostic + remediation steps). Skip any of those six and the alert becomes noise rather than signal — and on-call eventually stops responding.

What are the most common production data pipeline failure modes?

The eight failures every senior loop tests are: F1 — schema drift (vendor adds / removes a column; tolerant readers + Schema Registry catch this); F2 — source unavailable (sensor timeout; deferrable sensors + alerting); F3 — out-of-memory (Spark / Flink OOM; right-size memory + broadcast small dims + repartition); F4 — runaway scan (query without partition pruning; CI assertion + query-budget guardrails); F5 — late data (events past watermark; withWatermark + allowedLateness + side-output); F6 — partition misalignment (event_date vs load_date drift; partition by event date, not load date); F7 — retry storm (failing task hammers downstream; exponential backoff + jitter + capped retries); and F8 — downstream backpressure (sink can't keep up; scale consumers / sink, throttle producers, DLQ on overflow). Every failure has a paired runbook — diagnose, confirm, decide, patch, document. The candidate who can name all eight plus their runbooks is the candidate who gets hired as senior or staff.

How do you make a dbt incremental model idempotent and backfill-friendly?

Three rules. Rule 1 — materialized='incremental' + incremental_strategy='merge' + unique_key=['natural_key'] — dbt generates a MERGE on the natural key so retries and backfills don't duplicate. Rule 2 — partition the target by the time dimension (partition_by={'field': 'order_date', 'data_type': 'date'}) so each {{ var("date") }} run touches only one partition; cost is O(rows_in_window), not O(table_rows). Rule 3 — gate the model's WHERE on a templated date variable (WHERE order_date = DATE('{{ var("date") }}')) so forward-fill and backfill use the same SQL; only the variable changes. Combined with Airflow's airflow dags backfill --start-date X --end-date Y (which iterates {{ ds }} over the range; the dbt task forwards it with --vars '{"date": "{{ ds }}"}'), the same code path covers both forward-fill and backfill — no separate "backfill DAG", no parallel logic, no drift. Add on_schema_change='append_new_columns' for schema-drift tolerance and you have a fully idempotent + backfill-friendly silver / gold model.
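The "same code path" point is easiest to see by generating the per-day commands a backfill would run. The sketch below assumes a dbt model selected as silver_orders (a hypothetical name) and a task that passes the logical date through dbt's --vars flag. It only builds command strings and does not call Airflow or dbt. A normal daily run is simply the one-day case, dbt_commands(today, today).

```python
import json
from datetime import date, timedelta


def dbt_commands(start: date, end: date, model: str = "silver_orders") -> list[str]:
    """One dbt run per logical date, inclusive of both ends, like a daily backfill over {{ ds }}."""
    cmds = []
    day = start
    while day <= end:
        dbt_vars = json.dumps({"date": day.isoformat()})
        cmds.append(f"dbt run --select {model} --vars '{dbt_vars}'")
        day += timedelta(days=1)
    return cmds


for cmd in dbt_commands(date(2026, 5, 24), date(2026, 5, 26)):
    print(cmd)
# dbt run --select silver_orders --vars '{"date": "2026-05-24"}'  (then 05-25 and 05-26)
```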


Practice on PipeCode

PipeCode ships 450+ data-engineering interview problems — including pipeline-design rehearsal sets keyed to ETL, data-processing, streaming, real-time analytics, design, defensive coding, exception handling, and the production-safety patterns every senior loop tests. Whether you're drilling data pipeline design end-to-end or sharpening the four-pillar architecture · idempotency · backfills · observability map, the practice library mirrors the seven-section mental model this guide teaches.

Kick off via Explore practice →; drill the Python practice lane →; fan out into the ETL drills →; sharpen streaming Python drills →; rehearse real-time analytics drills →; reinforce pipeline-design drills →; widen coverage on the full data-processing library →.
