Apache Airflow interview questions dominate the orchestration round of senior data engineering loops because Airflow is the workflow scheduler underneath most modern data platforms. Interviewers don't stop at "what is a DAG?". They probe whether you understand DAG dependencies as the only contract between tasks, operators and sensors as the building blocks of every pipeline, XComs and the TaskFlow API as the parameter-passing mechanism that decouples your tasks, and the scheduler and executor as the operational story that decides production reliability.
This guide walks through the seven Airflow primitives that show up most often in data engineering interviews at FAANG and data-platform-heavy shops (Airbnb, Lyft, Stripe, Databricks, Snowflake). Each section pairs a teaching block with a worked interview answer: code, a step-by-step trace, an output table, and a concept-by-concept breakdown of why it works. By the end you'll be able to design an idempotent ETL DAG, explain Celery vs Kubernetes executor trade-offs, defend `retries=3` with exponential backoff, and walk through dynamic task mapping and dataset-driven scheduling, which is exactly what Airflow interview rounds reward when the Celery executor and datasets come up.
When you want hands-on reps immediately after reading, browse the streaming practice library, drill real-time analytics problems, and rehearse on Python streaming problems.
On this page
- Why Airflow shows up in every senior data engineering interview
- DAGs, tasks, and dependencies — the only contract between tasks
- Operators, sensors, and hooks — the building blocks
- XComs and the TaskFlow API — parameter passing done right
- The scheduler, the executor, and parallelism
- Retries, SLAs, backfills, and idempotency
- Modern Airflow — dynamic task mapping, datasets, deferrable operators
- Choosing the right Airflow primitive (cheat sheet)
- Frequently asked questions
- Practice on PipeCode
1. Why Airflow shows up in every senior data engineering interview
Airflow is the cron-on-steroids that every modern data team runs production on
The one-sentence invariant: Airflow is a Python framework for authoring, scheduling, and monitoring directed acyclic graphs (DAGs) of tasks, where each task is a unit of work and each edge is a dependency. Once you internalise that (DAG as scheduling unit, task as work unit, dependency as the only contract), every Airflow interview prompt reduces to "what's the right operator, the right executor, and the right idempotency story for this work?"
Why interviewers love Airflow questions.
- It surfaces orchestration instincts. Does the candidate think in DAGs, or in shell scripts?
- It tests Python fluency. DAGs are Python files; misuse of top-level imports, mutable defaults, or non-deterministic builds shows up immediately.
- It probes operational realism. Retries, SLAs, alerts, backfills, idempotency — none of these matter on day one and all of them matter by year one.
- It hits scaling reality. LocalExecutor vs CeleryExecutor vs KubernetesExecutor — the right choice depends on workload shape, not vibes.
The three primitives every Airflow interview opens with.
- DAG. Directed Acyclic Graph — a Python file declaring a set of tasks plus their dependencies, plus a schedule.
- Task. A single unit of work; a parameterised instance of an Operator.
- Operator. A reusable Python class that defines what a task does: `PythonOperator`, `BashOperator`, `KubernetesPodOperator`, `EmailOperator`, and hundreds more.
The five questions that open most Airflow rounds.
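- "What's a DAG and why does it have to be acyclic?": a cycle leaves no valid execution order (some task would end up waiting on itself), so Airflow refuses to load a cyclic DAG.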
- "What's the difference between an Operator and a Task?" — class vs instance.
- "What's a Sensor and when would you use one?" — wait-for-condition, not run-once.
- "How do you pass data between tasks?" — XComs / TaskFlow / shared storage.
- "What executor would you run in production and why?" — Celery for steady state, K8s for elastic / isolated workloads.
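To make the acyclicity answer concrete, here is a minimal sketch using Python's standard-library `graphlib` (not Airflow itself). It models a DAG as a mapping from each task to its upstream tasks and asks for a valid execution order; the task names are illustrative. A cycle makes `static_order()` raise `CycleError`, which is the same reason Airflow refuses to load a cyclic DAG.

```python
# Sketch only: a DAG modelled as {task: set(upstream tasks)}, not Airflow code.
from graphlib import CycleError, TopologicalSorter


def execution_order(upstreams: dict[str, set[str]]) -> list[str]:
    """Return one valid run order, or raise CycleError if the graph has a cycle."""
    return list(TopologicalSorter(upstreams).static_order())


etl = {
    "extract": set(),
    "transform": {"extract"},
    "validate": {"transform"},
    "load": {"transform", "validate"},
}
print(execution_order(etl))  # extract comes first, load comes last

cyclic = {"a": {"b"}, "b": {"a"}}  # a waits on b, b waits on a
try:
    execution_order(cyclic)
except CycleError as exc:
    print("rejected:", exc.args[0])
```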
What interviewers listen for.
- Do you say idempotent tasks the moment retries are mentioned? Senior signal.
- Do you reach for `KubernetesPodOperator` when isolation or custom dependencies come up? Current-default signal.
- Do you mention `catchup=False` when designing a non-backfillable DAG? Practical-experience signal.
- Do you bring up dynamic task mapping when the answer requires fan-out over a runtime-known list? Modern-Airflow signal.
Worked example — design a daily warehouse ETL DAG
Detailed explanation. A realistic DE interview question is "design a daily Airflow DAG that extracts orders from PostgreSQL, transforms them in Spark, validates the row counts, loads into Snowflake, and alerts on failure." The answer should be a five-task DAG with explicit dependencies, sensible retries, and catchup=False.
Question. Sketch the DAG with five tasks (extract, transform, validate, load, alert), include schedule, default args, dependencies, and the operator choice for each task.
Code.
```python
from datetime import datetime, timedelta

from airflow.decorators import dag, task
from airflow.operators.email import EmailOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.utils.trigger_rule import TriggerRule

default_args = {
    "owner": "data-platform",
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "retry_exponential_backoff": True,
    "max_retry_delay": timedelta(minutes=30),
    "execution_timeout": timedelta(hours=2),
}


@dag(
    dag_id="orders_daily_etl",
    start_date=datetime(2026, 1, 1),
    schedule="0 2 * * *",  # 02:00 UTC daily
    catchup=False,
    default_args=default_args,
    tags=["orders", "warehouse"],
)
def orders_daily_etl():
    @task
    def extract(ds=None) -> str:
        # `ds` is injected by Airflow as the run's logical date (YYYY-MM-DD)
        sql = f"COPY (SELECT * FROM orders WHERE created_at::date = '{ds}') TO STDOUT WITH CSV"
        # NOTE: /tmp is local to one worker; with several Celery workers, write to
        # shared storage (S3/GCS) so downstream tasks can read the file
        path = f"/tmp/orders_{ds}.csv"
        PostgresHook(postgres_conn_id="orders_db").copy_expert(sql, path)
        return path  # pushed as an XCom

    @task
    def transform(input_path: str) -> str:
        out_path = input_path.replace("/orders_", "/orders_transformed_")
        # ... spark-submit or polars transformation ...
        return out_path

    @task
    def validate(transformed_path: str) -> int:
        # fail before the load if the day produced no rows
        with open(transformed_path) as f:
            row_count = sum(1 for _ in f)
        if row_count == 0:
            raise ValueError(f"{transformed_path} is empty; refusing to load")
        return row_count

    @task
    def load(transformed_path: str, row_count: int) -> None:
        # write to Snowflake with MERGE on order_id (idempotent)
        ...

    alert = EmailOperator(
        task_id="alert_on_failure",
        to=["data-oncall@example.com"],
        subject="orders_daily_etl FAILED",
        html_content="See Airflow UI for details.",
        trigger_rule=TriggerRule.ONE_FAILED,
    )

    raw = extract()
    transformed = transform(raw)
    rows = validate(transformed)
    loaded = load(transformed, rows)
    # wire the task outputs (XComArgs), not the decorated functions, into the alert
    [raw, transformed, rows, loaded] >> alert


orders_daily_etl()
```
Step-by-step explanation.
- `@dag` declares the DAG with `schedule="0 2 * * *"` (cron) and `catchup=False` (when the DAG is first enabled or unpaused, only the latest interval is scheduled instead of every missed one since `start_date`).
- `default_args` sets `retries=3` with exponential backoff; every task inherits these unless it overrides them.
- `extract` uses `PostgresHook` to dump the day's rows; it returns the file path, which Airflow stores as an XCom and passes to `transform`.
- `transform → validate → load` is an explicit data-dependency chain expressed through TaskFlow function calls.
- `alert` uses `TriggerRule.ONE_FAILED`, so it fires as soon as any upstream task fails and is skipped when everything succeeds.
- The idempotent `load` (MERGE on `order_id`) is what makes a re-run of the same logical date safe; `catchup=False` only keeps the scheduler from launching a burst of historical runs.
Output (Airflow UI grid).
| Task | Status | Duration | Retries used |
|---|---|---|---|
| extract | success | 18s | 0 |
| transform | success | 2m 04s | 0 |
| validate | success | 6s | 0 |
| load | success | 47s | 0 |
| alert_on_failure | skipped | — | — |
Rule of thumb. Start every Airflow design with schedule + catchup + retries + idempotency. Get those four right and the rest of the DAG is just plumbing.
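Idempotency is the easiest of those four to show rather than tell. The sketch below uses SQLite's upsert (`INSERT ... ON CONFLICT ... DO UPDATE`) as a local stand-in for the warehouse MERGE on `order_id`; the `orders` table and its columns are hypothetical, and a real load would target Snowflake instead. Loading the same batch twice, as a retry would, leaves the table unchanged.

```python
# Sketch: SQLite upsert standing in for a warehouse MERGE (hypothetical table).
import sqlite3


def load_orders(conn: sqlite3.Connection, rows: list[tuple[int, float]]) -> None:
    """Upsert (order_id, amount) rows keyed on order_id; safe to re-run."""
    conn.executemany(
        """
        INSERT INTO orders (order_id, amount) VALUES (?, ?)
        ON CONFLICT(order_id) DO UPDATE SET amount = excluded.amount
        """,
        rows,
    )
    conn.commit()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (order_id INTEGER PRIMARY KEY, amount REAL)")

batch = [(1, 10.0), (2, 25.5), (3, 7.25)]
load_orders(conn, batch)
load_orders(conn, batch)  # a retry or manual re-run of the same day

print(conn.execute("SELECT COUNT(*) FROM orders").fetchone()[0])  # 3, not 6
```

A plain `INSERT` in the same position would either duplicate rows (no key) or fail the retry (primary-key violation); the upsert makes the second run a no-op.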
Airflow interview question on DAG sizing
A senior interviewer often shapes this round as: "We have 200 source tables — should it be one DAG or 200?" The answer tests whether the candidate can reason about scheduler load, alert noise, dependency surface, and rerun granularity.
Solution Using one DAG with dynamic task mapping
```python
from datetime import datetime

from airflow.decorators import dag, task


@dag(dag_id="warehouse_load", schedule="0 3 * * *", catchup=False,
     start_date=datetime(2026, 1, 1))
def warehouse_load():
    @task
    def list_tables() -> list[str]:
        # ...200 entries...
        return ["orders", "customers", "products", "shipments"]

    @task
    def extract_and_load(table: str) -> None:
        # idempotent per-table load (MERGE on natural key)
        ...

    # one mapped extract_and_load instance per table returned at run time
    extract_and_load.expand(table=list_tables())


warehouse_load()
```
Step-by-step trace.
| Step | Action | Effect |
|---|---|---|
| 1 | `list_tables()` runs as an ordinary task at run time and returns a list of 200 table names | one task instance produces the runtime list (pushed as an XCom) |
| 2 | `.expand(table=...)` creates 200 mapped task instances from that list | Airflow records 200 task instances in the metadata DB |
| 3 | Scheduler queues them; at most `max_active_tasks` (deployment default `max_active_tasks_per_dag`) run at a time | parallelism bounded by config, not by the number of tables |
| 4 | Each `extract_and_load(table=X)` is independent and idempotent | retrying one doesn't affect the other 199 |
| 5 | Failures are surfaced per table in the UI grid view | alert noise scoped to the failed tables, not the whole DAG |
Output:
| Metric | One DAG per table | One DAG with `.expand` |
|---|---|---|
| Scheduler load | 200 × DAG parses | 1 × DAG parse + 200 task instances |
| Alert noise | 200 separate alerts on a global outage | 1 DAG-level alert + per-task UI breakdown |
| Retry granularity | per table (good) | per table (same: `.expand` retries are per mapped instance) |
| Adding a new table | new DAG file | update the `list_tables` return value |
Why this works — concept by concept:
- Dynamic task mapping: `expand` turns a runtime-known list into N mapped task instances; each is a first-class task with its own retry, log, and UI cell. It replaces the old "generate DAGs from config" anti-pattern.
- Idempotency at the task level: because each mapped task uses MERGE on a natural key, retries are safe; this is what makes the one-DAG strategy viable.
- Bounded parallelism: the DAG's `max_active_tasks` (formerly `concurrency`; the deployment-wide default is `max_active_tasks_per_dag`) caps how many tasks run concurrently, so 200 expansions don't melt the warehouse.
- Single source of truth: one DAG file, one `default_args` block, one schedule, so config drift across 200 DAG files can't happen.
- Cost: scheduler parse time is O(1) DAG files; metadata DB rows are O(N) mapped instances, which is large but linear and indexable.
2. DAGs, tasks, and dependencies — the only contract between tasks
Airflow DAG dependencies are explicit, declarative, and the only thing the scheduler honours
The mental model in one line: a DAG is a Python function that returns a graph; the scheduler reads the graph, not the function. Every dependency must be expressed as an edge — there is no implicit "task B always runs after task A because it's defined below" in Airflow. The >> operator, the set_upstream / set_downstream methods, or TaskFlow's data-dependency-by-function-call are the three ways to declare an edge.
Three ways to set dependencies.
- `a >> b`: `b` is downstream of `a`. The most common idiom.
- `a.set_downstream(b)` / `b.set_upstream(a)`: same effect, as explicit method calls.
- TaskFlow: `b_result = b(a_result)`; the function call expresses the data dependency and the framework infers the edge.
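The `>>` idiom is plain Python operator overloading. The toy model below is not Airflow's implementation (the class is a simplified stand-in), but it shows the mechanism: `a >> b` calls `a.__rshift__(b)`, and `[a, b] >> c` falls back to `c.__rrshift__([a, b])` because lists don't define `>>`.

```python
# Toy model of dependency operators; not Airflow's BaseOperator.
class Task:
    def __init__(self, task_id: str, edges: set[tuple[str, str]]):
        self.task_id = task_id
        self._edges = edges  # shared edge set, standing in for the DAG

    def set_downstream(self, other: "Task") -> None:
        self._edges.add((self.task_id, other.task_id))

    def __rshift__(self, other):
        # a >> b  or  a >> [b, c]
        for t in other if isinstance(other, list) else [other]:
            self.set_downstream(t)
        return other  # returning the right side is what lets a >> b >> c chain

    def __rrshift__(self, other):
        # [a, b] >> self: Python calls this because list has no __rshift__
        for t in other:
            t.set_downstream(self)
        return self


edges: set[tuple[str, str]] = set()
extract, transform, load, alert = (Task(n, edges) for n in ["extract", "transform", "load", "alert"])

extract >> transform >> load
[extract, transform, load] >> alert
print(sorted(edges))
```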
Trigger rules.
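- `all_success` (default): run when every upstream succeeds.
- `all_failed`: run only if every upstream failed.
- `all_done`: run once every upstream has finished, whatever the outcome.
- `one_failed`: run as soon as any upstream fails (good for alert tasks).
- `none_failed`: run if no upstream failed (every upstream succeeded or was skipped).
- `none_skipped`: run when no upstream was skipped (useful when a deliberate skip, e.g. from a `ShortCircuitOperator`, should propagate).
- `always`: run unconditionally (good for cleanup tasks).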
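Here is a simplified model of how the rules above turn upstream states into a run/don't-run decision, written as plain Python rather than Airflow's scheduler code. It assumes every upstream task has already reached a terminal state; in real Airflow, `one_failed` can fire before the other upstreams finish and `always` doesn't wait at all.

```python
# Simplified model of trigger-rule evaluation over terminal upstream states.
FAILED_STATES = {"failed", "upstream_failed"}

RULES = {
    "all_success": lambda s: all(x == "success" for x in s),
    "all_failed": lambda s: all(x in FAILED_STATES for x in s),
    "all_done": lambda s: True,  # every upstream finished, any outcome
    "one_failed": lambda s: any(x in FAILED_STATES for x in s),
    "none_failed": lambda s: not any(x in FAILED_STATES for x in s),
    "none_skipped": lambda s: "skipped" not in s,
    "always": lambda s: True,
}


def should_run(rule: str, upstream_states: list[str]) -> bool:
    """Return True if a task with this trigger rule would run."""
    return RULES[rule](upstream_states)


for states in (["success", "failed"], ["skipped", "success"]):
    print(states, "->", {rule: should_run(rule, states) for rule in RULES})
```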
The three DAG-level knobs every interview probes.
- `schedule`: cron expression, `timedelta`, list of datasets, or `None` (manual triggers only).
- `catchup`: should the DAG fill in historical runs from `start_date`? The default is `True` (legacy behaviour, controlled by the `catchup_by_default` setting); prefer `False` for new DAGs unless you specifically want backfills.
- `max_active_runs`: how many runs of this DAG can be in flight at once? The default is 16; set it to 1 when each run depends on the previous one.
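To see what `catchup` controls, the sketch below (a simplified model, not Airflow's timetable code) lists the logical dates the scheduler would create for a `@daily` DAG with `start_date` 1 January 2026 that is enabled on 5 January at 03:00. It assumes a run is created only after its daily interval has ended and that the DAG has never run before.

```python
# Simplified model of catchup for a @daily schedule (not Airflow's timetable code).
from datetime import datetime, timedelta


def runs_to_create(start_date: datetime, now: datetime, catchup: bool) -> list[datetime]:
    """Logical dates (interval starts) the scheduler would create."""
    interval = timedelta(days=1)
    completed = []
    start = start_date
    while start + interval <= now:  # only intervals that have fully ended
        completed.append(start)
        start += interval
    if catchup:
        return completed
    return completed[-1:]  # only the most recent completed interval


start = datetime(2026, 1, 1)
now = datetime(2026, 1, 5, 3, 0)
print(runs_to_create(start, now, catchup=True))   # Jan 1 through Jan 4
print(runs_to_create(start, now, catchup=False))  # Jan 4 only
```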
Worked example — convert a task-chain DAG to TaskFlow
Detailed explanation. Legacy DAGs use PythonOperator(python_callable=fn) and >> to wire tasks. TaskFlow is the modern equivalent — @task decorators turn ordinary Python functions into tasks, and function call syntax expresses both dependency and XCom in one place.
Question. Convert a 3-task ETL chain from classic PythonOperator style to TaskFlow.
Input (classic).
```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def fetch():
    return "/tmp/raw.csv"


def transform(input_path):
    return input_path.replace("raw", "clean")


def load(clean_path):
    print(f"loading {clean_path}")


with DAG("etl_classic", schedule="@daily", catchup=False,
         start_date=datetime(2026, 1, 1)) as dag:
    t1 = PythonOperator(task_id="fetch", python_callable=fetch)
    # op_args is a templated field, so the Jinja pulls the upstream XCom at run time
    t2 = PythonOperator(task_id="transform", python_callable=transform,
                        op_args=["{{ ti.xcom_pull(task_ids='fetch') }}"])
    t3 = PythonOperator(task_id="load", python_callable=load,
                        op_args=["{{ ti.xcom_pull(task_ids='transform') }}"])
    t1 >> t2 >> t3
```
Code (TaskFlow).
```python
from datetime import datetime

from airflow.decorators import dag, task


@dag(dag_id="etl_taskflow", schedule="@daily", catchup=False,
     start_date=datetime(2026, 1, 1))
def etl_taskflow():
    @task
    def fetch() -> str:
        return "/tmp/raw.csv"

    @task
    def transform(input_path: str) -> str:
        return input_path.replace("raw", "clean")

    @task
    def load(clean_path: str) -> None:
        print(f"loading {clean_path}")

    # each call returns an XComArg; nesting the calls declares fetch -> transform -> load
    load(transform(fetch()))


etl_taskflow()
```
Step-by-step explanation.
- Each `@task` registers the function as an Airflow task in the DAG context. The task id defaults to the function name.
- Calling `fetch()` returns an `XComArg` (a lazy reference to the result), not a string. Airflow records the task.
- `transform(fetch())` creates the edge `fetch → transform`. The `XComArg` from `fetch` is resolved automatically when `transform` runs.
- `load(transform(fetch()))` creates the second edge.
- No Jinja `{{ ti.xcom_pull(...) }}` boilerplate, no `op_args`, no explicit `>>`.
Output (UI graph view).
| Task | Status | Notes |
|---|---|---|
| fetch | success | XCom: `/tmp/raw.csv` |
| transform | success | XCom: `/tmp/clean.csv` |
| load | success | prints `loading /tmp/clean.csv` |
Rule of thumb. Use TaskFlow for new Python-heavy DAGs. Fall back to classic operators only when you need a specific operator that doesn't have a TaskFlow equivalent (KubernetesPodOperator, BashOperator in some cases).
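What `@task` does under the hood can be shown with a toy model. This is not Airflow's implementation, and `MiniDag` and `Ref` are hypothetical names: calling a decorated function registers a node and returns a lazy reference, edges come from which references you pass in, and a runner later resolves references in topological order, much as Airflow resolves an `XComArg` into the upstream task's XCom.

```python
# Toy model of TaskFlow-style edge inference; not Airflow code.
from graphlib import TopologicalSorter


class Ref:
    """Lazy handle to a task's future return value (plays the XComArg role)."""

    def __init__(self, task_id: str):
        self.task_id = task_id


class MiniDag:
    def __init__(self):
        self.calls = {}     # task_id -> (fn, args)
        self.upstream = {}  # task_id -> set of upstream task_ids

    def task(self, fn):
        def wrapper(*args):
            task_id = fn.__name__
            self.calls[task_id] = (fn, args)
            # any Ref passed in becomes an upstream edge
            self.upstream[task_id] = {a.task_id for a in args if isinstance(a, Ref)}
            return Ref(task_id)
        return wrapper

    def run(self) -> dict:
        results = {}  # stands in for the XCom table
        for task_id in TopologicalSorter(self.upstream).static_order():
            fn, args = self.calls[task_id]
            resolved = [results[a.task_id] if isinstance(a, Ref) else a for a in args]
            results[task_id] = fn(*resolved)
        return results


dag = MiniDag()


@dag.task
def fetch():
    return "/tmp/raw.csv"


@dag.task
def transform(input_path):
    return input_path.replace("raw", "clean")


@dag.task
def load(clean_path):
    return f"loading {clean_path}"


load(transform(fetch()))
print(dag.upstream)        # {'fetch': set(), 'transform': {'fetch'}, 'load': {'transform'}}
print(dag.run()["load"])   # loading /tmp/clean.csv
```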
Airflow interview question on trigger rules
A common probe: "I want a cleanup task that runs whether the pipeline succeeded or failed, but skip if the upstream was skipped. Which trigger rule?"
Solution Using TriggerRule.NONE_SKIPPED
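```python
from airflow.operators.empty import EmptyOperator
from airflow.utils.trigger_rule import TriggerRule

# in a real DAG, swap EmptyOperator for the operator that does the cleanup
cleanup = EmptyOperator(
    task_id="cleanup",
    trigger_rule=TriggerRule.NONE_SKIPPED,
)
```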
Step-by-step trace.
| Upstream A | Upstream B | Cleanup runs? | Reason |
|---|---|---|---|
| success | success | yes | no skips |
| success | failed | yes | no skips |
| failed | failed | yes | no skips |
| skipped | success | no | one upstream skipped |
| success | skipped | no | one upstream skipped |
Output:
| Trigger rule | When does cleanup run? |
|---|---|
| `all_success` | only when both succeed |
| `all_done` | regardless of upstream outcome (including skips) |
| `none_failed` | unless any upstream failed |
| `none_skipped` | unless any upstream was skipped: the right pick for "always clean up, except when the pipeline was deliberately short-circuited" |
Why this works — concept by concept:
- Trigger rule as the edge-evaluator — the scheduler evaluates the trigger rule against the terminal states of the upstream tasks, not their return values.
- `NONE_SKIPPED`: the special case "I want unconditional cleanup, but a deliberate skip from an upstream `ShortCircuitOperator` should propagate." It saves you from running cleanup when the pipeline was deliberately stopped early. (By default a `ShortCircuitOperator` skips all of its downstream tasks regardless of their trigger rules; `ignore_downstream_trigger_rules=False` limits the forced skip to its direct downstream tasks.)
- `ALL_DONE`: a close cousin; runs after any terminal state, including skipped. Use it when you truly want "always run."
- `ONE_FAILED`: the alert-task workhorse; it fires on the first failure so you get notified fast.
- Cost: trigger rules are evaluated by the scheduler as upstream states change, so the choice is essentially free at runtime.
3. Operators, sensors, and hooks — the building blocks
Airflow operators and sensors are the difference between "run this" and "wait for this"
Operators define what a task does. Sensors are operators that wait for an external condition before running anything else. Hooks are the connection-pool / SDK wrappers underneath both. The senior interview hinges on knowing which to reach for, and when.
The five operator families every Airflow interview covers.
- `PythonOperator` / `@task`: run a Python callable. The default workhorse.
- `BashOperator`: run a shell command. Useful for one-line scripts and quick wrappers.
- `KubernetesPodOperator`: run an arbitrary container in a Kubernetes pod. The standard answer for isolation, custom dependencies, or non-Python workloads.
- Provider operators: `SnowflakeOperator`, `BigQueryInsertJobOperator`, `S3ListOperator`, `DbtCloudRunJobOperator`, and hundreds more. Read the docs for the provider you're using.
- `EmptyOperator` (formerly `DummyOperator`): does nothing; useful as a join point or named milestone.
Sensors — wait until X is true.
- `FileSensor`, `S3KeySensor`, `ExternalTaskSensor`, `SqlSensor`, `TimeDeltaSensor`, `HttpSensor`.
- Sensors hold a worker slot by default: `mode="poke"` (check every `poke_interval` seconds) keeps a worker busy for the whole wait.
- `mode="reschedule"` releases the slot between pokes; the scheduler re-queues the sensor task for each check. Much better for long waits.
- Deferrable sensors: `deferrable=True` hands the wait to the async Triggerer process and frees the worker slot entirely. The modern default for cheap waits.
Hooks — the connection layer.
- `PostgresHook`, `SnowflakeHook`, `S3Hook`, `BaseHook.get_connection(...)`.
- Hooks fetch credentials from Airflow Connections (UI, environment variables, or a secrets backend). Never hard-code credentials in DAGs.
- Hooks return native clients, so you call SDK methods directly.
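Hooks read the same Connection objects however they're supplied. One supported route is an environment variable named `AIRFLOW_CONN_<CONN_ID>` holding a URI; the sketch below parses such a URI with the standard library to show which fields a hook ends up with. The connection id, host, and credentials are made up, and in practice Airflow's own `Connection` class does this parsing for you.

```python
# Sketch: what a URI-style Airflow connection carries (made-up values).
import os
from urllib.parse import unquote, urlsplit

# In a real deployment this is set outside the DAG file (deployment env or secrets backend).
os.environ["AIRFLOW_CONN_ORDERS_DB"] = "postgres://etl_user:s3cr%40t@orders-db.internal:5432/orders"


def describe_connection(conn_id: str) -> dict:
    """Split an AIRFLOW_CONN_<CONN_ID> URI into connection fields."""
    uri = os.environ[f"AIRFLOW_CONN_{conn_id.upper()}"]
    parts = urlsplit(uri)
    return {
        "conn_type": parts.scheme,
        "host": parts.hostname,
        "port": parts.port,
        "login": unquote(parts.username or ""),
        "password": unquote(parts.password or ""),  # %40 decodes to @
        "schema": parts.path.lstrip("/"),
    }


conn = describe_connection("orders_db")
print(conn["conn_type"], conn["host"], conn["port"], conn["schema"])
```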
Worked example — replace a time.sleep poll loop with a deferrable sensor
Detailed explanation. A junior pipeline might call `time.sleep(60)` in a loop to wait for a partition to arrive, which holds a worker slot for the entire wait. A deferrable `S3KeySensor` hands the wait to the Triggerer and releases the slot: same correctness, and the worker is free to run other tasks in the meantime.
Question. Rewrite a "wait for s3://lake/clicks/dt=2026-05-27/_SUCCESS" check from a polling Python task to a deferrable sensor.
Input (anti-pattern).
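```python
from airflow.decorators import task


@task
def wait_for_partition_bad():
    import time

    import boto3

    s3 = boto3.client("s3")
    key = "clicks/dt=2026-05-27/_SUCCESS"
    # ⚠️ holds a worker slot the entire time
    while True:
        try:
            s3.head_object(Bucket="lake", Key=key)
            return
        except Exception:
            # ⚠️ also swallows real errors (bad credentials, wrong bucket) and keeps waiting
            time.sleep(60)
```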
Code (deferrable sensor).
```python
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

wait_for_partition = S3KeySensor(
    task_id="wait_for_partition",
    bucket_key="clicks/dt={{ ds }}/_SUCCESS",
    bucket_name="lake",
    poke_interval=60,
    timeout=60 * 60 * 2,  # 2 hours max
    deferrable=True,      # hand the wait to the Triggerer
)
```
Step-by-step explanation.
- `S3KeySensor` is a provider-supplied sensor that wraps the underlying S3 `head_object` check.
- `bucket_key="clicks/dt={{ ds }}/_SUCCESS"` uses Jinja templating; `{{ ds }}` resolves to the run's logical date as `YYYY-MM-DD`.
- `deferrable=True` registers the wait with the Triggerer process. The task moves to the `deferred` state and releases its worker slot.
- The Triggerer checks S3 every `poke_interval` seconds using async I/O; a single Triggerer can handle thousands of concurrent waits.
- When the key appears, the Triggerer fires an event that re-queues the task to run its completion step, which succeeds immediately.
Output.
| Metric | Polling task | Deferrable sensor |
|---|---|---|
| Worker slot held | yes, for the entire wait | no, released on defer |
| Wait responsiveness | 60 s (`time.sleep`) | 60 s (`poke_interval`), i.e. the same |
| Concurrent waits per worker | 1 | bounded by the Triggerer (thousands) |
| Code complexity | manual try/except loop | one operator call |
Rule of thumb. Any wait longer than ~5 minutes should be a deferrable sensor (or mode="reschedule" if a deferrable variant doesn't exist).
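The rule of thumb is easiest to defend with arithmetic. This back-of-the-envelope model (assumed costs, not measurements) counts worker-slot seconds for a two-hour wait with a 60-second check interval, assuming each check takes about one second of worker time and a deferred task needs a worker only to start and to resume.

```python
# Back-of-the-envelope model of worker-slot time while a sensor waits.
import math


def worker_slot_seconds(mode: str, wait_s: float, poke_interval_s: float,
                        poke_cost_s: float = 1.0) -> float:
    # checks at t = 0, interval, 2*interval, ... up to the one that succeeds
    pokes = math.ceil(wait_s / poke_interval_s) + 1
    if mode == "poke":
        return wait_s + poke_cost_s       # slot held for the entire wait
    if mode == "reschedule":
        return pokes * poke_cost_s        # slot held only while each check runs
    if mode == "deferrable":
        return 2 * poke_cost_s            # start + resume; the Triggerer does the waiting
    raise ValueError(f"unknown mode: {mode}")


for mode in ("poke", "reschedule", "deferrable"):
    print(mode, worker_slot_seconds(mode, wait_s=2 * 3600, poke_interval_s=60))
# poke 7201.0 / reschedule 121.0 / deferrable 2.0
```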
Airflow interview question on choosing the right operator
A common probe: "Where would KubernetesPodOperator win over PythonOperator?"
Solution Using KubernetesPodOperator for isolation and custom deps
```python
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes.client import models as k8s

ml_train = KubernetesPodOperator(
    task_id="ml_train",
    image="myorg/ml-train:2026-05-27",
    name="ml-train",
    namespace="data-eng",
    cmds=["python", "-m", "ml_train"],
    arguments=["--date={{ ds }}"],
    container_resources=k8s.V1ResourceRequirements(
        requests={"memory": "8Gi", "cpu": "4"},
        limits={"memory": "16Gi", "cpu": "8"},
    ),
    get_logs=True,
    # delete the pod on success, keep it on failure for debugging
    on_finish_action="delete_succeeded_pod",
)
```
Step-by-step trace.
| Step | Action | Effect |
|---|---|---|
| 1 | Scheduler schedules `ml_train` | task transitions to `queued` |
| 2 | The worker running the operator (under any executor) asks the Kubernetes API to create the pod | pod created in the `data-eng` namespace |
| 3 | Pod pulls `myorg/ml-train:2026-05-27` | the image is the only dependency surface |
| 4 | Pod runs `python -m ml_train --date=2026-05-27` | logs streamed back to the Airflow UI |
| 5 | Pod exits 0 | Airflow marks the task success; the pod is deleted |
Output:
| Concern | `PythonOperator` | `KubernetesPodOperator` |
|---|---|---|
| Dependencies | shared with the Airflow worker (conflicts!) | sealed inside the image |
| Resource isolation | shares worker memory / CPU | own pod, own limits |
| Python version | Airflow's | any |
| Restart blast radius | can take out the worker | takes out one pod |
| Cost | tiny overhead | ~5–10 s pod startup latency |
Why this works — concept by concept:
- Image as the dependency boundary: every dependency, version, and system library is captured in the container image, so runs are reproducible by construction.
- Per-task resource limits: the `container_resources` requests and limits flow straight into the pod spec, so Airflow doesn't have to share memory with your model-training code.
- Logs streamed via `get_logs=True`: pod stdout reaches the Airflow UI as if the task ran on the worker.
- `on_finish_action="delete_succeeded_pod"`: the pod is cleaned up on success and kept on failure for forensics; use `"delete_pod"` to always delete or `"keep_pod"` to never delete. (Older provider versions used the boolean `is_delete_operator_pod`, which deletes the pod whatever the outcome.)
- Cost: pod startup latency is roughly 5–10 seconds, which is meaningful overhead for tasks that run under ~30 seconds. For anything longer, the isolation pays off.
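The ~5–10 s startup figure translates directly into an overhead share. This sketch assumes a fixed 10-second pod startup (an illustrative number; real startup depends on image size, node availability, and scheduling) and computes what fraction of wall-clock time goes to startup for a few task runtimes.

```python
# Rough arithmetic: share of wall-clock time spent starting the pod.
def startup_overhead(runtime_s: float, pod_startup_s: float = 10.0) -> float:
    """Fraction of total time spent on pod startup rather than the task itself."""
    return pod_startup_s / (pod_startup_s + runtime_s)


for runtime in (20, 30, 300, 3600):
    print(f"{runtime:>5}s task -> {startup_overhead(runtime):.1%} spent on pod startup")
```

For a 30-second task a quarter of the wall-clock time is startup; for an hour-long training run it's noise, which is why the isolation argument wins for long jobs.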
4. XComs and the TaskFlow API — parameter passing done right
XComs and the TaskFlow API are the small-payload bus between tasks, so keep payloads small
The mental model: XCom (cross-communication) is a key-value store backed by the Airflow metadata DB; tasks xcom_push and xcom_pull to share scalars and small objects. TaskFlow wraps this in a Pythonic interface — return a value from a @task function, accept it as an argument in another, and the framework handles the push/pull.
XCom limits and gotchas.
- The default backend is the metadata DB. Don't put a 50 MB DataFrame in there: values are JSON-serialized and stored in a DB column, so practical size limits depend on your database.
- Custom XCom backends write the value to S3, GCS, or another store and keep only a pointer in the DB. This is the standard approach for big-payload pipelines.
- XComs are keyed per task instance (and map index). A retry clears the previous try's XComs, so downstream tasks only ever see the latest try's values.
- `xcom_pull(task_ids=..., key="return_value")` is the explicit form when you're outside TaskFlow.
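The keying and retry behaviour can be modelled with a toy in-memory store (not Airflow's XCom table or API). Values are keyed by run, task, map index (`-1` for unmapped tasks, as in Airflow), and key; the run id below is made up. Clearing a task's entries before each try is what guarantees downstream tasks never see a stale value from a failed attempt.

```python
# Toy in-memory model of XCom keying and clear-on-retry; not Airflow's API.
XComKey = tuple[str, str, int, str]  # (run_id, task_id, map_index, key)


class XComStore:
    def __init__(self):
        self._rows: dict[XComKey, object] = {}

    def clear(self, run_id: str, task_id: str, map_index: int = -1) -> None:
        """Drop every XCom a task instance pushed (done before each new try)."""
        self._rows = {k: v for k, v in self._rows.items()
                      if k[:3] != (run_id, task_id, map_index)}

    def push(self, run_id, task_id, value, key="return_value", map_index=-1):
        self._rows[(run_id, task_id, map_index, key)] = value

    def pull(self, run_id, task_id, key="return_value", map_index=-1):
        return self._rows.get((run_id, task_id, map_index, key))


store = XComStore()
run = "scheduled__2026-05-27"  # made-up run id

# try 1: pushes a partial result, then fails
store.clear(run, "extract")
store.push(run, "extract", "/tmp/partial.csv")
store.push(run, "extract", 123, key="rows_seen")

# try 2 (the retry): starts by clearing, then succeeds
store.clear(run, "extract")
store.push(run, "extract", "/tmp/orders.csv")

print(store.pull(run, "extract"))                   # /tmp/orders.csv
print(store.pull(run, "extract", key="rows_seen"))  # None: cleared along with try 1
```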
TaskFlow benefits.
- No Jinja boilerplate: passing a task's output as a function argument performs the XCom pull implicitly.
- Type hints carry through, so an IDE or mypy can reason about the data flow.
- Multiple outputs: `@task(multiple_outputs=True)` lets a task return a dict whose keys are pushed as separately named XComs.
Worked example — fan-out + fan-in with XComs
Detailed explanation. A common interview pattern: one task produces a list of partition keys; downstream tasks fan out across the list; a final task fans the results back in.
Question. Build a DAG that lists partitions, processes each partition in parallel, and writes a single summary row at the end.
Code.
```python
from datetime import datetime

from airflow.decorators import dag, task


@dag(dag_id="partition_fanout", schedule="@daily", catchup=False,
     start_date=datetime(2026, 1, 1))
def partition_fanout():
    @task
    def list_partitions(ds=None) -> list[str]:
        # `ds` is injected by Airflow as the logical date (YYYY-MM-DD)
        return [f"{ds}-r{i}" for i in range(5)]

    @task
    def process(partition: str) -> dict:
        # transform & write the partition, return a small summary
        return {"partition": partition, "rows": 100_000}

    @task
    def summarise(summaries: list[dict]) -> int:
        total = sum(s["rows"] for s in summaries)
        print(f"total rows = {total}")
        return total

    summaries = process.expand(partition=list_partitions())  # fan out
    summarise(summaries)                                     # fan in


partition_fanout()
```
Step-by-step explanation.
- `list_partitions` returns a list of 5 strings, pushed as one XCom.
- `process.expand(partition=list_partitions())` creates 5 mapped `process` task instances; each gets one item from the list.
- Each mapped `process(partition=X)` runs independently and returns a dict (its own XCom).
- `summarise(summaries)` depends on every mapped `process` instance. Airflow collects their return values into a lazily loaded, list-like sequence and passes it as `summaries`.
- `summarise` returns the total, which is the final XCom.
Output.
| Task | XCom return |
|---|---|
| list_partitions | `["2026-05-27-r0", ..., "2026-05-27-r4"]` |
| process[0..4] | each → `{"partition": "...", "rows": 100000}` |
| summarise | 500000 |
Rule of thumb. Use XCom for coordination metadata, not for data. Pass a path, a row-count, a status, a checkpoint id — not a DataFrame.
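One way to enforce that rule is a small guard around whatever a task returns. `small_xcom` and the 48 KB budget are hypothetical choices, not Airflow settings; the helper simply measures the JSON-encoded size and tells the author to write big data to storage and return a path instead.

```python
# Hypothetical guard: keep XCom values to small coordination metadata.
import json

XCOM_BUDGET_BYTES = 48 * 1024  # assumed budget; choose one that suits your metadata DB


def small_xcom(value):
    """Return value unchanged if its JSON form fits the budget, else raise."""
    size = len(json.dumps(value).encode("utf-8"))
    if size > XCOM_BUDGET_BYTES:
        raise ValueError(
            f"XCom payload is {size} bytes; write the data to storage and return its path"
        )
    return value


summary = small_xcom({"path": "s3://lake/orders/dt=2026-05-27/part-0.parquet", "rows": 100_000})

try:
    small_xcom({"rows": list(range(100_000))})  # about 0.6 MB of JSON: rejected
except ValueError as exc:
    print(exc)
```

Inside a `@task` you would end with `return small_xcom({...})`, so an oversized return fails loudly in the producing task instead of bloating the metadata DB.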
Airflow interview question on XCom backends for big payloads
The probe: "What if process returns a 200 MB DataFrame instead of a dict?"
Solution Using a custom S3-backed XCom backend
```python
# airflow.cfg [core] xcom_backend, or the environment variable:
# AIRFLOW__CORE__XCOM_BACKEND=myorg.airflow.s3_xcom.S3XComBackend

# myorg/airflow/s3_xcom.py
import pickle
import uuid

import boto3
from airflow.models.xcom import BaseXCom

S3 = boto3.client("s3")
BUCKET = "airflow-xcom"


class S3XComBackend(BaseXCom):
    @staticmethod
    def serialize_value(value, **_):
        # store the real payload in S3 ...
        key = f"xcom/{uuid.uuid4()}.pkl"
        S3.put_object(Bucket=BUCKET, Key=key, Body=pickle.dumps(value))
        # ... and only the short pointer in the metadata DB
        return BaseXCom.serialize_value(f"s3://{BUCKET}/{key}")

    @staticmethod
    def deserialize_value(result):
        ref = BaseXCom.deserialize_value(result)
        bucket, key = ref.replace("s3://", "").split("/", 1)
        return pickle.loads(S3.get_object(Bucket=bucket, Key=key)["Body"].read())
```
Step-by-step trace.
| Step | Action | Effect |
|---|---|---|
| 1 | task returns a 200 MB DataFrame | `serialize_value` is called |
| 2 | the DataFrame is pickled and put to S3 at `xcom/{uuid}.pkl` | one S3 PUT, ~200 MB |
| 3 | the pointer `s3://airflow-xcom/xcom/{uuid}.pkl` is JSON-serialized into the metadata DB | the DB row stays small (under ~100 bytes) |
| 4 | a downstream task consumes the value; `deserialize_value` is called | one S3 GET; the DataFrame is returned to user code |
Output:
| Approach | Metadata DB row size | Max payload | Cost |
|---|---|---|---|
| Default (DB-backed XCom) | size of the JSON value | bounded by the DB column type; keep it to KBs in practice | DB I/O |
| S3-backed XCom | ~100 bytes (pointer) | bounded only by S3 object limits | one S3 PUT + GET per task pair |
Why this works — concept by concept:
- Decouple metadata from data: the metadata DB only ever sees a pointer; the bulk goes to object storage that's built for it.
- Pluggable XCom backend: Airflow's `AIRFLOW__CORE__XCOM_BACKEND` setting lets you swap the serialization layer without changing any DAG code.
- Pickle for arbitrary Python objects: fine inside a tight Airflow / Python version boundary and only for data you trust; switch to Parquet/Arrow for cross-language interop.
- Lifecycle: orphaned S3 objects need a janitor (a TTL lifecycle rule on the bucket or a daily sweep DAG), because clearing XComs in Airflow doesn't delete anything in S3.
- Cost: one S3 PUT + GET per task pair, which is negligible at most scales and cheaper than blowing up the metadata DB.
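The pointer pattern and the janitor can be exercised locally with a toy model: a dict stands in for the bucket, only the short URI would go to the metadata DB, and `sweep` plays the daily cleanup job. The bucket name, key layout, and TTL are assumptions that mirror the S3 backend sketch; no boto3 or Airflow is involved.

```python
# Toy pointer-backend model: dict as object store, URI as the DB value, TTL janitor.
import pickle
import time
import uuid
from typing import Optional

BUCKET = "airflow-xcom"  # assumed bucket name
object_store: dict[str, tuple[float, bytes]] = {}  # key -> (created_at, payload)


def serialize_value(value) -> str:
    """Store the payload in the 'object store'; return the pointer the DB would keep."""
    key = f"xcom/{uuid.uuid4()}.pkl"
    object_store[key] = (time.time(), pickle.dumps(value))
    return f"s3://{BUCKET}/{key}"


def deserialize_value(ref: str):
    """Follow the pointer back to the payload."""
    _bucket, key = ref[len("s3://"):].split("/", 1)
    return pickle.loads(object_store[key][1])


def sweep(ttl_s: float, now: Optional[float] = None) -> int:
    """Janitor: delete objects older than ttl_s seconds; return how many were deleted."""
    now = time.time() if now is None else now
    stale = [k for k, (created, _) in object_store.items() if now - created > ttl_s]
    for k in stale:
        del object_store[k]
    return len(stale)


ref = serialize_value({"rows": list(range(10_000))})
print(ref, f"({len(ref)} characters stored in the metadata DB)")
print(deserialize_value(ref)["rows"][-1])  # 9999
# pretend eight days have passed and sweep with a seven-day TTL
print(sweep(ttl_s=7 * 86400, now=time.time() + 8 * 86400))  # 1
```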
5. The scheduler, the executor, and parallelism
The scheduler and executor decide production reliability: pick the executor that matches your workload shape
The Airflow components.
- Webserver — the UI. Reads from the metadata DB; not on the critical path.
- Scheduler — parses DAG files, schedules task instances, hands them to the executor.
- Executor — runs tasks. Pluggable.
- Metadata DB — Postgres (recommended) or MySQL — stores DAG runs, task instances, XComs, connections.
- Triggerer — async process that handles deferrable sensors / operators.
Executor choices.
- `SequentialExecutor`: one task at a time in a single process. Dev only, and the only executor that works with a SQLite metadata DB.
- `LocalExecutor`: multiple tasks in parallel on the scheduler host, with a Postgres or MySQL metadata DB. Fine for small teams.
- `CeleryExecutor`: a distributed worker pool fed by a Redis or RabbitMQ broker. The steady-state production default.
- `KubernetesExecutor`: each task runs in its own pod. Elastic, isolated, no idle workers. Great for spiky workloads.
- `CeleryKubernetesExecutor`: a hybrid; Celery for steady state, Kubernetes pods for tasks sent to a dedicated queue.
Scheduler tuning knobs.
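- `parallelism`: cap on concurrently running task instances per scheduler (the whole deployment when you run a single scheduler).
- `max_active_runs_per_dag`: default limit on concurrent runs of a single DAG (overridable per DAG with `max_active_runs`).
- `max_active_tasks_per_dag` (formerly `dag_concurrency`): default limit on concurrently running tasks of a single DAG, counted across all of its active runs in Airflow 2.x (overridable per DAG with `max_active_tasks`).
- `scheduler_heartbeat_sec`: how often the scheduler runs its heartbeat loop; default 5 s.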
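How the caps interact is easiest to see as arithmetic. The sketch below is a simplification that ignores pools, queues, per-task limits, and tasks already running: each DAG contributes at most `max_active_tasks_per_dag` of its ready tasks, and the total is then clipped by `parallelism` and by the executor's free slots. The DAG names and numbers are illustrative.

```python
# Simplified arithmetic for how concurrency caps combine (ignores pools and queues).
def runnable_now(ready_per_dag: dict[str, int], parallelism: int,
                 max_active_tasks_per_dag: int, executor_slots: int) -> int:
    """How many ready task instances could start, assuming nothing is running yet."""
    per_dag = sum(min(ready, max_active_tasks_per_dag) for ready in ready_per_dag.values())
    return min(per_dag, parallelism, executor_slots)


ready = {"warehouse_load": 200, "orders_daily_etl": 3, "partition_fanout": 5}
print(runnable_now(ready, parallelism=32, max_active_tasks_per_dag=16, executor_slots=48))  # 24
```

Here `warehouse_load` is throttled to 16 by the per-DAG cap, so the total (16 + 3 + 5) stays under both `parallelism` and the executor's slots; lower `parallelism` to 20 and it becomes the binding constraint.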
Worked example — Celery vs Kubernetes executor choice
Detailed explanation. A common interview probe: "We run 2,000 DAGs, mostly short pure-Python tasks, but ~50 are long ML jobs with custom GPU containers. Which executor?"
Question. Pick an executor strategy and justify it.
Code (hybrid CeleryKubernetes setup).
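```python
# airflow.cfg
# [core]
# executor = CeleryKubernetesExecutor
# [celery_kubernetes_executor]
# kubernetes_queue = kubernetes
# ("kubernetes" is the default queue name for the Kubernetes side)
from airflow.operators.python import PythonOperator


def train():
    ...  # hypothetical training entry point


def k8s_pod_spec_with_gpu():
    ...  # hypothetical helper: build a kubernetes.client V1Pod that requests a GPU


# In a DAG: send GPU tasks to the kubernetes queue
ml_train = PythonOperator(
    task_id="ml_train",
    python_callable=train,
    queue="kubernetes",  # routes this task to the Kubernetes side
    executor_config={"pod_override": k8s_pod_spec_with_gpu()},
)
# Default tasks (no `queue=`) go to Celery workers.
```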
Step-by-step explanation.
- `CeleryKubernetesExecutor` runs both paths side by side. Tasks with `queue="kubernetes"` go to Kubernetes; everything else goes to Celery.
- The Celery worker pool sustains the 2,000-DAG steady-state load: workers stay warm, with no per-task pod startup overhead.
- The 50 GPU jobs get dedicated pods with GPU resource requests; each pod lives only as long as its task, so no GPU sits idle between runs.
- Failure isolation: a misbehaving ML container can't OOM-kill a Celery worker.
- Cost: the Celery pool is right-sized to the steady state; the Kubernetes autoscaler picks up the GPU pods on demand.
Output (executor decision matrix).
| Workload shape | Right executor |
|---|---|
| Dev / single laptop | LocalExecutor |
| Small team, 10s of DAGs, no isolation needs | LocalExecutor |
| Production steady-state, 100s of DAGs, Python-heavy | CeleryExecutor |
| Spiky workloads, varied resource needs | KubernetesExecutor |
| Mixed steady + spiky / GPU / isolated | CeleryKubernetesExecutor |
Rule of thumb. Celery for steady state, Kubernetes for elastic / isolated, hybrid for mixed. The exam-trap answer is "always KubernetesExecutor" — wrong if your tasks are all short.
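The decision matrix can be written down as a function, which is a handy way to state the reasoning out loud in an interview. The inputs and the 100-DAG threshold are made-up simplifications, not an official Airflow rule.

```python
# The executor decision matrix as a function (illustrative thresholds).
def choose_executor(dev_only: bool, dag_count: int, mostly_spiky: bool,
                    isolated_share: float) -> str:
    """isolated_share: fraction of workloads needing GPUs, their own image, or elastic capacity."""
    if dev_only:
        return "LocalExecutor"
    if mostly_spiky:
        return "KubernetesExecutor"
    if isolated_share > 0:
        return "CeleryKubernetesExecutor"  # steady core plus a few isolated jobs
    if dag_count < 100:
        return "LocalExecutor"
    return "CeleryExecutor"


# the worked example: 2,000 DAGs, ~50 of them GPU/ML jobs
print(choose_executor(dev_only=False, dag_count=2000, mostly_spiky=False,
                      isolated_share=50 / 2000))  # CeleryKubernetesExecutor
```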
Airflow interview question on scheduler scaling
A senior probe: "Our scheduler is at 100% CPU and DAGs are not being parsed fast enough. What knobs?"
Solution Using multi-scheduler HA + tuned parser settings
```ini
# airflow.cfg
# HA: run 2-3 scheduler processes against the same metadata DB
# (Airflow 2.0+ supports active/active schedulers; no flag is needed here).
# Keep comments on their own lines rather than after a value.

[scheduler]
job_heartbeat_sec = 5
scheduler_heartbeat_sec = 5
# tune to the available CPU cores
parsing_processes = 2
# how often (seconds) to list the DAGs folder for new files
dag_dir_list_interval = 300
file_parsing_sort_mode = modified_time
# don't re-parse a DAG file more often than every 30 s
min_file_process_interval = 30

[core]
parallelism = 256
max_active_tasks_per_dag = 32
```
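A practical pitfall with `airflow.cfg`-style files: Python's standard `configparser` does not strip inline comments unless it is configured to, so a trailing `# ...` stays inside the value. The sketch assumes Airflow's config parser keeps that stdlib default, which is why putting comments on their own lines is the safe habit.

```python
# Demonstrates stdlib configparser behaviour with inline vs full-line comments.
import configparser

text = """
[scheduler]
parsing_processes = 2 # tune based on CPU
# don't re-parse a DAG file more often than every 30s
min_file_process_interval = 30
"""

cfg = configparser.ConfigParser()  # default: inline_comment_prefixes=None
cfg.read_string(text)

print(repr(cfg.get("scheduler", "parsing_processes")))       # '2 # tune based on CPU'
print(cfg.getint("scheduler", "min_file_process_interval"))  # 30
try:
    cfg.getint("scheduler", "parsing_processes")
except ValueError as exc:
    print("not an int:", exc)
```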
Step-by-step trace.
| Step | Knob | Effect |
|---|---|---|
| 1 | run 2–3 scheduler replicas | active/active HA; the failure of one scheduler doesn't pause scheduling |
| 2 | `parsing_processes=2` per scheduler | DAG file parsing is parallelised |
| 3 | `min_file_process_interval=30` | stops the scheduler from re-parsing each DAG file on every loop |
| 4 | `parallelism=256` | task concurrency cap raised |
| 5 | metadata DB tuned with proper indexes + a Postgres connection pool | the DB stops being the bottleneck |
Output:
| Metric | Before | After |
|---|---|---|
| Scheduler CPU | 100% | ~60% per replica |
| DAG parse lag | 90s | < 10s |
| Task slot pickup | 5s | < 1s |
| Resilience | single scheduler | active/active HA |
Why this works — concept by concept:
- Active/active scheduler HA — Airflow 2.0+ schedulers coordinate via the metadata DB. Multiple replicas split parse + scheduling work and tolerate single-process loss.
- Tuned parser settings — `parsing_processes` and `min_file_process_interval` are the two knobs that decide how much CPU the parser consumes. Tune to your DAG count.
- `parallelism` as a concurrency cap — protects the metadata DB and the executor from runaway concurrency. In Airflow 2 it applies per scheduler, so N replicas allow up to N × `parallelism` running tasks.
- Metadata DB as the real bottleneck — a Postgres connection pooler (e.g. PgBouncer) plus proper indexes (`dag_id`, `execution_date`, `task_id`, `dag_run_id`) move the limit much higher than the defaults.
- Cost — each extra scheduler adds its own metadata DB load, so N schedulers cost roughly N× the DB traffic; usually worth it for steady-state production.
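A back-of-envelope check makes the parser tuning concrete. The sketch below uses a deliberately simplified model: every DAG file is re-parsed once per `min_file_process_interval`, the work is spread evenly over `parsing_processes`, and folder listing and scheduling overhead are ignored. The 600-file deployment is a hypothetical example.

```python
def parse_budget(dag_files: int, min_file_process_interval_s: float,
                 parsing_processes: int) -> dict[str, float]:
    """Simplified steady-state parse load (ignores dag_dir_list_interval and scheduling)."""
    files_per_second = dag_files / min_file_process_interval_s
    per_process_rate = files_per_second / parsing_processes
    return {
        "files_per_second": files_per_second,
        "per_process_rate": per_process_rate,
        # if one file takes longer than this to parse, parse lag grows
        "seconds_per_file_budget": 1 / per_process_rate,
    }


if __name__ == "__main__":
    print(parse_budget(dag_files=600, min_file_process_interval_s=30, parsing_processes=2))
```

With 600 files and a 30 s interval the parser must sustain 20 files/s, i.e. 10 files/s per process, so each file gets about 0.1 s. If top-level DAG code is slower than that, the options are faster DAG files, more `parsing_processes`, or a longer `min_file_process_interval`.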
6. Retries, SLAs, backfills, and idempotency
Retries and idempotency are the production-realism round: answers here separate juniors from seniors.
Every senior Airflow data engineering interview probes the operational story. Retries without idempotency are broken. SLAs without alerts are theatre. Backfills without partition-aware tasks corrupt data.
Retries — the four-knob recipe.
- `retries=3` — number of retries after the first failed attempt (up to four attempts in total).
- `retry_delay=timedelta(minutes=5)` — base delay between retries.
- `retry_exponential_backoff=True` — roughly double the delay on each retry (about 5m, 10m, 20m, ..., plus some jitter).
- `max_retry_delay=timedelta(minutes=30)` — cap the exponential growth.
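The four knobs usually live in `default_args`. The sketch below shows that dict (plain Python, so it runs without Airflow installed) plus a helper that previews the waits. The helper assumes a simple doubling schedule capped at `max_retry_delay`; Airflow's own calculation also adds jitter, so treat the numbers as approximate. `preview_retry_delays` is a hypothetical name.

```python
from datetime import timedelta

# Passed as @dag(default_args=default_args, ...) so every task inherits them.
default_args = {
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "retry_exponential_backoff": True,
    "max_retry_delay": timedelta(minutes=30),
}


def preview_retry_delays(retries: int, retry_delay: timedelta,
                         max_retry_delay: timedelta,
                         exponential: bool = True) -> list[timedelta]:
    """Approximate wait before each retry (no jitter)."""
    delays = []
    for attempt in range(retries):
        delay = retry_delay * (2 ** attempt) if exponential else retry_delay
        delays.append(min(delay, max_retry_delay))
    return delays


if __name__ == "__main__":
    waits = preview_retry_delays(
        5, default_args["retry_delay"], default_args["max_retry_delay"]
    )
    for i, d in enumerate(waits, start=1):
        print(f"retry {i}: wait {d}")
```

With the defaults above the three retries wait roughly 5, 10 and 20 minutes; the 30-minute cap only starts to bite from a fourth retry onward.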
Idempotency — the core invariant.
- A task is idempotent if running it twice produces the same end state.
- For loads — use MERGE on a natural key.
- For appends — partition by `ds` and overwrite the partition.
- For external API calls — use an idempotency key.
- Tasks that aren't idempotent should NOT have `retries > 0`.
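The partition-overwrite pattern can be shown with Python's built-in `sqlite3` standing in for the warehouse. This is an illustrative sketch: the `events` table and `load_partition` helper are hypothetical, and a real warehouse would do the same thing with delete-then-insert in one transaction or an `INSERT OVERWRITE` where the engine supports it.

```python
import sqlite3


def load_partition(conn: sqlite3.Connection, ds: str,
                   rows: list[tuple[str, int]]) -> None:
    """Replace the whole ds partition atomically, so a retry cannot duplicate rows."""
    with conn:  # one transaction: commits on success, rolls back on error
        conn.execute("DELETE FROM events WHERE ds = ?", (ds,))
        conn.executemany(
            "INSERT INTO events (ds, user_id, clicks) VALUES (?, ?, ?)",
            [(ds, user_id, clicks) for user_id, clicks in rows],
        )


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE events (ds TEXT, user_id TEXT, clicks INTEGER)")
    batch = [("u1", 3), ("u2", 5)]
    load_partition(conn, "2026-01-01", batch)
    load_partition(conn, "2026-01-01", batch)  # simulated retry
    print(conn.execute("SELECT COUNT(*) FROM events").fetchone()[0])  # 2, not 4
```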
Backfills — re-run historical runs.
- `airflow dags backfill -s 2026-01-01 -e 2026-01-31 my_dag` — re-run the daily DAG for every day in the window.
- `catchup=True` — when the DAG is unpaused, automatically run every missing schedule from `start_date` to now.
- Default to `catchup=False` for new DAGs unless you specifically need backfills.
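Before launching a backfill it helps to enumerate which runs it will create. The sketch below assumes a plain daily schedule and treats both ends of the `-s`/`-e` window as inclusive (one run per day in the window); `backfill_ds_values` is a hypothetical helper, not an Airflow API.

```python
from datetime import date, timedelta


def backfill_ds_values(start: date, end: date) -> list[str]:
    """One 'ds' (YYYY-MM-DD) per daily run, start and end inclusive."""
    if end < start:
        raise ValueError("end must not be before start")
    days = (end - start).days + 1
    return [(start + timedelta(days=i)).isoformat() for i in range(days)]


if __name__ == "__main__":
    runs = backfill_ds_values(date(2026, 1, 1), date(2026, 1, 31))
    print(len(runs), runs[0], runs[-1])  # 31 2026-01-01 2026-01-31
```

If each of those runs writes only its own `ds` partition with an overwrite, re-running any subset of them is safe.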
SLAs and alerts.
- `sla=timedelta(hours=1)` on a task — if the task isn't done within the SLA window, Airflow emits an SLA miss event.
- Plug into `sla_miss_callback` or hook into your alerting (PagerDuty, Slack, OpsGenie).
- For binary "task failed" alerts, use `on_failure_callback` or a downstream `EmailOperator` with `TriggerRule.ONE_FAILED`.
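It helps to know where the SLA clock starts. The sketch below is a simplified model that mirrors the nightly example in this section: the deadline is the time the run is due (02:00) plus the task's `sla`, not the moment the task itself started. Airflow's internal SLA check has more bookkeeping than this; the function names and dates are hypothetical.

```python
from datetime import datetime, timedelta
from typing import Optional


def sla_deadline(scheduled_run_time: datetime, sla: timedelta) -> datetime:
    """Simplified model: the clock starts when the run is due, not when the task starts."""
    return scheduled_run_time + sla


def missed_slas(scheduled_run_time: datetime,
                slas: dict[str, timedelta],
                finished_at: dict[str, Optional[datetime]],
                now: datetime) -> list[str]:
    """Task ids whose deadline has passed without the task finishing in time."""
    missed = []
    for task_id, sla in slas.items():
        deadline = sla_deadline(scheduled_run_time, sla)
        done = finished_at.get(task_id)
        if now > deadline and (done is None or done > deadline):
            missed.append(task_id)
    return missed


if __name__ == "__main__":
    run_due = datetime(2026, 1, 2, 2, 0)  # hypothetical 02:00 run
    slas = {"extract": timedelta(hours=2), "transform": timedelta(hours=3)}
    running = {"extract": None, "transform": None}
    print(missed_slas(run_due, slas, running, now=datetime(2026, 1, 2, 4, 30)))  # ['extract']
    print(missed_slas(run_due, slas, running, now=datetime(2026, 1, 2, 5, 30)))  # ['extract', 'transform']
```

Because the clock starts at the scheduled time, a run that starts late can breach an SLA even when the task itself is fast.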
Worked example — make a non-idempotent task idempotent
Detailed explanation. A naive `INSERT INTO orders` produces duplicate rows whenever the task is retried or re-run. The senior answer is MERGE keyed on `order_id`, or delete-then-insert keyed on partition.
Question. Convert a duplicate-prone INSERT load into an idempotent MERGE.
Input (anti-pattern).
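```python
from airflow.decorators import task
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook


@task
def load_bad(rows):
    hook = SnowflakeHook(snowflake_conn_id="warehouse")
    for r in rows:
        hook.run(f"INSERT INTO orders VALUES ({r['order_id']}, '{r['status']}')")
    # ⚠️ retry-on-failure → duplicate rows (and f-string SQL invites injection)
```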
Code (idempotent MERGE).
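```python
from airflow.decorators import task
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook

@task
def load_idempotent(rows):
    hook = SnowflakeHook(snowflake_conn_id="warehouse")
    # One connection = one Snowflake session, so the TEMP table stays visible
    # (separate hook.run() / insert_rows() calls each open their own session).
    conn = hook.get_conn()
    try:
        cur = conn.cursor()
        # write to an empty staging table with the target's schema first
        cur.execute("CREATE OR REPLACE TEMP TABLE stg AS SELECT * FROM orders WHERE 1=0")
        cur.executemany(
            "INSERT INTO stg (order_id, status) VALUES (%s, %s)",
            [(r["order_id"], r["status"]) for r in rows],
        )
        cur.execute("""
            MERGE INTO orders t
            USING stg s
              ON t.order_id = s.order_id
            WHEN MATCHED THEN UPDATE SET status = s.status
            WHEN NOT MATCHED THEN INSERT (order_id, status) VALUES (s.order_id, s.status)
        """)
    finally:
        conn.close()
```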
Step-by-step explanation.
- Create a temporary staging table with the same schema (empty).
- Bulk-load `rows` into the staging table.
- `MERGE` keyed on `order_id` — `INSERT` for new rows, `UPDATE` for existing rows.
- The temp table is dropped automatically at session end.
- Running this task twice produces the same end state — the second run finds every row already present and just updates `status` (no-op if unchanged).
Output (after two runs).
| Approach | Rows after 1st run | Rows after 2nd run |
|---|---|---|
| Bad INSERT | N | 2N (duplicated) |
| Idempotent MERGE | N | N |
Rule of thumb. Every task that writes to a persistent store should be either MERGE-keyed or partition-overwrite. If you can't make it idempotent, set retries=0 and rely on manual re-run after fixing.
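The N vs 2N table can be reproduced locally with `sqlite3` as a stand-in warehouse. SQLite has no `MERGE`, so this sketch swaps in its `INSERT ... ON CONFLICT ... DO UPDATE` upsert, which gives the same idempotent end state when keyed on `order_id`. Table and function names are illustrative.

```python
import sqlite3

ROWS = [(1, "paid"), (2, "shipped"), (3, "paid")]


def insert_plain(conn: sqlite3.Connection, rows) -> None:
    """Plain INSERT: every retry appends the batch again."""
    with conn:
        conn.executemany("INSERT INTO orders_bad (order_id, status) VALUES (?, ?)", rows)


def upsert(conn: sqlite3.Connection, rows) -> None:
    """Upsert keyed on order_id: a retry updates rows instead of duplicating them."""
    with conn:
        conn.executemany(
            """
            INSERT INTO orders (order_id, status) VALUES (?, ?)
            ON CONFLICT(order_id) DO UPDATE SET status = excluded.status
            """,
            rows,
        )


def row_count(conn: sqlite3.Connection, table: str) -> int:
    return conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE orders_bad (order_id INTEGER, status TEXT)")
    conn.execute("CREATE TABLE orders (order_id INTEGER PRIMARY KEY, status TEXT)")
    for _ in range(2):  # first run + one retry
        insert_plain(conn, ROWS)
        upsert(conn, ROWS)
    print(row_count(conn, "orders_bad"), row_count(conn, "orders"))  # 6 3
```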
Airflow interview question on SLA misses
The probe: "Our nightly DAG misses its 04:00 SLA twice a week — how would you diagnose?"
Solution Using SLA callbacks + task-level alerts
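```python
from datetime import datetime, timedelta

from airflow.decorators import dag, task


def send_pagerduty(summary, details):
    """Hypothetical placeholder for your alerting client (not an Airflow API)."""
    ...


def sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis):
    # keep this fast: it runs inside the scheduler, not on a worker
    for sla in slas:
        send_pagerduty(
            summary=f"SLA miss: {sla.dag_id}.{sla.task_id}",
            details={"execution_date": str(sla.execution_date)},
        )


@dag(
    dag_id="nightly_etl",
    schedule="0 2 * * *",
    catchup=False,
    sla_miss_callback=sla_miss_callback,
    start_date=datetime(2026, 1, 1),
)
def nightly_etl():
    @task(sla=timedelta(hours=2))
    def extract(): ...

    @task(sla=timedelta(hours=3))
    def transform(): ...

    # ...
    extract() >> transform()


nightly_etl()
```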
Step-by-step trace.
| Time | Event | Effect |
|---|---|---|
| 02:00 | DAG run starts | tasks queued |
| 04:00 | extract not yet done | scheduler emits SLA-miss event for extract (sla=2h) |
| 04:00 | sla_miss_callback invoked | PagerDuty alert fired |
| 05:00 | transform not yet done | scheduler emits SLA-miss event for transform (sla=3h) |
| 05:30 | both tasks complete | task statuses succeed; SLA events are historical |
Output:
| Diagnosis path | Tool | What you learn |
|---|---|---|
| 1. Recent run durations | Airflow UI grid | which task is dragging |
| 2. Per-task logs | task instance log | what the task is doing slowly |
| 3. Worker resource pressure | metric system | CPU / memory contention |
| 4. Upstream data delays | source-system metrics | input not ready in time |
Why this works — concept by concept:
- Task-level SLA — defined per task and measured from the DAG run's scheduled time, not from when the task started. A run that starts late can therefore breach the SLA even if the task itself is fast, which is why the diagnosis starts from run durations.
- `sla_miss_callback` — runs inside the scheduler (its DAG-processing loop), not on a worker; keep it short and async (just emit, don't wait for a response).
- Diagnostic path — UI grid → task log → worker metrics → upstream data. Walking that ladder is what senior interviewers want to see.
- Cost — SLA evaluation happens in the scheduler loop; lightweight by design. Callbacks must be fast or they'll back up the loop.
7. Modern Airflow — dynamic task mapping, datasets, deferrable operators
Three patterns every senior Airflow interview round expects you to know
Dynamic Task Mapping (Airflow 2.3+).
- `task.expand(arg=upstream_xcom)` — runtime fan-out across a list.
- `task.partial(arg=fixed).expand(arg=...)` — combine fixed args with mapped args.
- Mapped task instances are first-class — each has its own retry, log, UI cell.
- Replaces the legacy "generate-DAG-from-config" anti-pattern.
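The fan-out semantics can be modelled without Airflow. This is a pure-Python illustration, not Airflow's implementation: `partial()` arguments are copied into every instance, and mapping over several arguments yields one instance per combination (a cross product), each with its own `map_index`. The ordering here is simply `itertools.product` order, and the argument names are hypothetical.

```python
from itertools import product
from typing import Any


def expand_instances(partial_kwargs: dict[str, Any], **mapped: list[Any]) -> list[dict[str, Any]]:
    """Model of task.partial(**partial_kwargs).expand(**mapped): one dict per mapped instance."""
    names = list(mapped)
    instances = []
    for map_index, combo in enumerate(product(*(mapped[n] for n in names))):
        kwargs = {**partial_kwargs, **dict(zip(names, combo))}
        instances.append({"map_index": map_index, "kwargs": kwargs})
    return instances


if __name__ == "__main__":
    for inst in expand_instances({"bucket": "lake"},
                                 region=["us", "eu"],
                                 ds=["2026-01-01", "2026-01-02"]):
        print(inst)  # 2 regions x 2 dates = 4 mapped instances
```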
Datasets-driven scheduling (Airflow 2.4+).
- Declare datasets — `clicks = Dataset("s3://lake/clicks")`.
- Producer DAGs declare `outlets=[clicks]` on the producing task.
- Consumer DAGs use `schedule=[clicks]` — they run when the dataset is updated.
- Removes the need for cross-DAG `ExternalTaskSensor` patterns.
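A consumer can also list several datasets. A DAG with `schedule=[a, b]` runs once every listed dataset has been updated since it was last queued, and repeated updates of the same dataset in the meantime collapse into one run. The toy model below illustrates that bookkeeping only; Airflow actually tracks dataset events in the metadata DB, and the `orders` dataset is hypothetical.

```python
class DatasetConsumer:
    """Toy model: a DAG with schedule=[...] fires once every listed dataset has a fresh event."""

    def __init__(self, dag_id: str, datasets: list[str]):
        self.dag_id = dag_id
        self.datasets = set(datasets)
        self.pending: set[str] = set()  # datasets updated since the last triggered run
        self.runs = 0

    def on_dataset_event(self, uri: str) -> bool:
        """Record an update; return True if this event triggers a new DAG run."""
        if uri not in self.datasets:
            return False
        self.pending.add(uri)
        if self.pending == self.datasets:
            self.pending.clear()
            self.runs += 1
            return True
        return False


if __name__ == "__main__":
    features = DatasetConsumer("build_features", ["s3://lake/clicks", "s3://lake/orders"])
    print(features.on_dataset_event("s3://lake/clicks"))  # False: still waiting on orders
    print(features.on_dataset_event("s3://lake/clicks"))  # False: repeat update collapses
    print(features.on_dataset_event("s3://lake/orders"))  # True: both updated, run queued
```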
Deferrable operators (Airflow 2.2+).
- `deferrable=True` on supported operators (S3KeySensor, BigQueryInsertJobOperator, etc.) hands the wait off to the Triggerer.
- The Triggerer is an async process — it can handle thousands of concurrent waits on a single core.
- The worker slot is freed entirely while the task is deferred; big cost savings for long waits.
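One Triggerer can hold many waits because each wait is an `asyncio` coroutine that yields control while sleeping, instead of a thread or worker slot blocked in `time.sleep`. The sketch below is only an analogy in plain `asyncio` (it does not use Airflow's trigger classes): many simulated "file arrival" waits share a single event loop on one thread. The S3 keys are hypothetical.

```python
import asyncio
import random
import threading


async def wait_for_file(key: str, ready_after: float, poke_interval: float = 0.01) -> str:
    """Poll a simulated condition without blocking the event loop."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + ready_after
    while loop.time() < deadline:  # stand-in for "does the S3 key exist yet?"
        await asyncio.sleep(poke_interval)  # yields so other waits can progress
    return key


async def run_waits(n: int) -> tuple[list[str], set[int]]:
    threads: set[int] = set()

    async def tracked(i: int) -> str:
        threads.add(threading.get_ident())  # record which thread runs this wait
        return await wait_for_file(f"s3://lake/clicks/part-{i}", random.uniform(0.05, 0.2))

    keys = await asyncio.gather(*(tracked(i) for i in range(n)))
    return list(keys), threads


if __name__ == "__main__":
    keys, threads = asyncio.run(run_waits(1000))
    print(len(keys), "waits finished on", len(threads), "thread(s)")
```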
Airflow 3 highlights (Airflow 3.0 went GA in April 2025).
- Decoupled execution API — workers don't talk directly to the metadata DB.
- Datasets are renamed Assets, with richer event semantics; DAGs themselves become versioned.
- Improved scheduler throughput; UI refresh.
Worked example — producer DAG + two consumer DAGs via Datasets
Detailed explanation. Replace a tangled web of ExternalTaskSensor cross-DAG waits with dataset-driven scheduling.
Question. Producer DAG build_clicks_daily writes to s3://lake/clicks/dt=*. Two consumer DAGs (refresh_dashboard, build_features) need to run after the producer.
Code.
```python
from datetime import datetime

from airflow.datasets import Dataset
from airflow.decorators import dag, task

clicks = Dataset("s3://lake/clicks")


# producer DAG
@dag(dag_id="build_clicks_daily", schedule="0 2 * * *", catchup=False,
     start_date=datetime(2026, 1, 1))
def build_clicks_daily():
    @task(outlets=[clicks])  # declare the dataset write
    def build():
        # ... write to s3://lake/clicks/dt=YYYY-MM-DD/ ...
        return "ok"

    build()


build_clicks_daily()


# consumer DAG 1
@dag(dag_id="refresh_dashboard", schedule=[clicks], catchup=False,
     start_date=datetime(2026, 1, 1))
def refresh_dashboard():
    @task
    def refresh():
        ...

    refresh()


refresh_dashboard()


# consumer DAG 2
@dag(dag_id="build_features", schedule=[clicks], catchup=False,
     start_date=datetime(2026, 1, 1))
def build_features():
    @task
    def build():
        ...

    build()


build_features()
```
Step-by-step explanation.
- `clicks = Dataset("s3://lake/clicks")` — a logical handle. Airflow stores this as a dataset entity; it never reads or checks the S3 path itself.
- The producer's `build` task declares `outlets=[clicks]` — Airflow records "this task updates `clicks`."
- When `build` succeeds, Airflow fires a dataset event.
- Both consumer DAGs have `schedule=[clicks]` — they're listening. The scheduler creates a DAG run for each consumer right away, on its next loop.
- No `ExternalTaskSensor` needed; no cross-DAG dependency declared by hand.
Output (DAG run timeline).
| t | DAG | Event |
|---|---|---|
| 02:00 | build_clicks_daily | scheduled run starts |
| 02:18 | build_clicks_daily.build | success → dataset clicks updated |
| 02:18 | refresh_dashboard | new run scheduled |
| 02:18 | build_features | new run scheduled |
| 02:18 | ... | both consumers run in parallel |
Rule of thumb. Whenever you reach for ExternalTaskSensor across DAGs that share a logical dataset, reach for Dataset instead.
Airflow interview question on choosing between dynamic mapping and a for-loop
The probe: "Why use .expand() instead of a Python for-loop to create N similar tasks?"
Solution Using .expand() for runtime-known lists
```python
from airflow.decorators import dag, task


# Anti-pattern — for-loop at DAG-parse time
@dag(...)  # DAG arguments elided
def for_loop_dag():
    @task
    def process(p): ...

    partitions = ["a", "b", "c"]  # known at parse time
    for p in partitions:
        process.override(task_id=f"process_{p}")(p)


for_loop_dag()


# Modern — runtime-known list
@dag(...)  # DAG arguments elided
def expand_dag():
    @task
    def list_partitions() -> list[str]:
        # returns whatever is in the source today (runtime)
        return ["a", "b", "c", "d"]

    @task
    def process(p): ...

    process.expand(p=list_partitions())


expand_dag()
```
Step-by-step trace.
| Strategy | Determined at | Adds a new partition without code change | UI cell per partition |
|---|---|---|---|
| For-loop | DAG-parse time | no — requires redeploy | yes |
| .expand() | DAG-run time | yes — list_partitions returns the new value | yes |
Output:
| Use case | Pick |
|---|---|
| Fixed list known at deploy time | for-loop is fine |
| List comes from a query / API call | .expand() |
| Hundreds of items, frequent changes | .expand() |
| Need per-item retry / log isolation | both work; .expand() is the modern default |
Why this works — concept by concept:
- Parse-time vs run-time — a for-loop runs at DAG-parse time and bakes the partitions into the graph. `.expand()` defers the decision to DAG-run time, picking up changes without redeploys.
- Mapped task instances are first-class — every `.expand()` instance has its own retry, its own log, its own UI cell. Same fidelity as a for-loop, less code.
- Bounded parallelism — `.expand()` respects `max_active_tasks_per_dag`, and `[core] max_map_length` (default 1024) caps how many instances a single mapped task may create, so you can't accidentally spawn 10,000 concurrent tasks.
- Modern default — combined with TaskFlow, `.expand()` is the cleanest way to express fan-out in Airflow 2.3+.
- Cost — `list_partitions` runs once per DAG run; `.expand()` records one row per mapped instance in the metadata DB.
Choosing the right Airflow primitive (cheat sheet)
- Need to wait for a file / partition? Use a deferrable sensor (`deferrable=True`) or reschedule mode (`mode="reschedule"`), not a Python polling loop.
- Need to run an arbitrary container? `KubernetesPodOperator` — sealed deps, per-task resources, isolation.
- Need to fan out over a runtime-known list? `task.expand()` — never DAG-generation-from-config.
- Need to chain a downstream DAG after another? `schedule=[Dataset(...)]` — not `ExternalTaskSensor`.
- Need parameter passing between tasks? TaskFlow `@task` + function return value — XCom under the hood.
- Need big-payload parameter passing? Custom XCom backend writing to S3 / GCS, pointer in metadata DB.
- Need scheduled retries on transient failure? `retries=3` + `retry_exponential_backoff=True` + `max_retry_delay=...`.
- Need safe warehouse writes when a task may run more than once? MERGE on the natural key — every task must be idempotent before retries are safe.
- Need an alert on failure? Downstream task with `TriggerRule.ONE_FAILED`, or `on_failure_callback` for cleaner separation.
- Need cross-region HA? Multi-scheduler Airflow + replicated metadata DB.
Frequently asked questions
What is a DAG in Apache Airflow, and why does it have to be acyclic?
A DAG (Directed Acyclic Graph) is the unit of scheduling in Airflow — a Python file declaring a set of tasks and their dependencies. It must be acyclic because Airflow needs a deterministic topological order to run the tasks; a cycle (task A depends on task B which depends on A) has no valid order. The "directed" part means every dependency is one-way — extract >> transform means transform runs after extract, never the reverse.
What's the difference between an Airflow Operator, a Task, and a Sensor?
An Operator is a Python class that defines what a task does (e.g. PythonOperator, BashOperator). A Task is a parameterised instance of an Operator inside a DAG — the operator is the recipe, the task is the meal. A Sensor is a specialised Operator that waits for an external condition (file arrives, time passes, upstream DAG completes) before its downstream tasks can run. Deferrable sensors offload the wait to the Triggerer process and don't hold a worker slot.
How does Airflow pass data between tasks (XComs vs TaskFlow)?
XComs (cross-communication) are small key-value records in the metadata DB; tasks xcom_push and xcom_pull to share scalars and small JSON payloads. TaskFlow (the modern @task decorator API) wraps XComs in a Pythonic interface — you return a value from one task and accept it as an argument in another, and Airflow handles push/pull automatically. For big payloads, configure a custom XCom backend that writes to S3/GCS and keeps only a pointer in the metadata DB.
Which Airflow executor should I use in production — Local, Celery, or Kubernetes?
LocalExecutor is fine for dev or very small deployments. CeleryExecutor is the steady-state production default — a distributed worker pool, warm workers, low per-task overhead. KubernetesExecutor spawns a pod per task — best for spiky workloads, isolation, or per-task custom dependencies. CeleryKubernetesExecutor combines both: Celery for the steady-state load, K8s pods for tasks tagged with a specific queue. Pick based on workload shape: warm + frequent → Celery, spiky + isolated → Kubernetes.
What's the right way to handle retries and backfills in Airflow?
For retries: set retries=3 with retry_exponential_backoff=True and max_retry_delay=timedelta(minutes=30). Every retryable task must be idempotent — use MERGE on a natural key, partition-overwrite, or an idempotency key. For backfills: default new DAGs to catchup=False to avoid surprise floods of historical runs. Run explicit backfills via airflow dags backfill -s ... -e ... when you actually need to fill a gap. Idempotency is the precondition for both retries and backfills to be safe.
How does dynamic task mapping work and when should I use it?
task.expand(arg=upstream_xcom) instantiates N mapped task instances at runtime, one per item in the upstream list. Each mapped instance is a first-class task with its own retry, log, and UI cell. Use it when the list is known only at DAG-run time (e.g. "list every partition that arrived today") or when the list is large enough that hard-coding it in the DAG file would be unwieldy. It replaces the legacy "DAG-generation-from-config" anti-pattern.
Practice on PipeCode
- Drill the streaming practice library → for orchestration-shaped pipeline questions.
- Rehearse real-time analytics drills → for dataset-driven scheduling and DAG triggering.
- Sharpen Python streaming problems → when the interviewer wants TaskFlow code, not pseudocode.
- For the broader interview surface, read top data engineering interview questions →.
- Stack the prerequisites with the only 5 skills you need to become a data engineer →.
- Reinforce the compute side with Apache Spark internals for DE interviews →.
- For the design-round muscles, work through ETL system design for DE interviews →.
- To pair Airflow with table modelling, browse data modelling for DE interviews →.
Pipecode.ai is Leetcode for Data Engineering — every Airflow concept above ships with hands-on practice rooms where you design DAGs, configure real executors, and trace SLA misses. Start with the streaming library and work outward; PipeCode pairs every reading with 450+ DE-focused problems and a real-time scoring engine.




