Airflow 3 is the biggest Apache Airflow release since 2.0 shipped in December 2020, and, five years later, every senior interviewer probing an orchestration answer expects a crisp take on what actually changed along four axes. The 3.0 release landed in April 2025 with Edge Workers, DAG versioning, MLOps primitives, and a unified UI; the 3.1 point releases through late 2025 and into 2026 polished the migration story, hardened the reverse-connect worker, and closed the gap on the Assets rename. The engineering question is no longer "should we adopt Airflow 3?" (the release notes and the deprecation clock have already answered that) but which axis of Airflow 3.x matters for your workload and how you sequence a safe upgrade from 2.x.
This guide is the senior-DE walkthrough you wished existed the first time an interviewer asked "explain how an Airflow Edge Worker reverse-connects to the API and why the control plane never needs to open a port", "walk me through DAG versioning semantics and how a backfill picks the historical code", or "what does Airflow MLOps actually give me that 2.x with a KubernetesPodOperator did not?" It covers why 3.0 is the biggest release since 2.0, the Edge Worker reverse-connect model for on-prem and data-sovereignty pipelines, the DAG versioning story with task-instance-to-version binding, the MLOps primitives built on Airflow Assets (renamed from Datasets) with Vertex / MLflow / Mosaic hooks, and the unified UI plus the migration path from 2.x, starting with the Airflow 3 upgrade checks (Ruff's `AIR3xx` rules). Each section pairs a teaching block with a Solution-Tail interview answer: code, a step-by-step trace, an output table, then a concept-by-concept breakdown of why it works.
When you want hands-on reps immediately after reading, drill the ETL practice library, rehearse on the SQL practice library, and sharpen the orchestration axis with the optimization practice library.
On this page
- Why Airflow 3 is the biggest release since 2.0
- Edge Workers — remote / on-prem execution
- DAG versioning + change history
- MLOps primitives — Assets, model registry integration
- UI redesign + Edge deploy + migration path
- Cheat sheet — Airflow 3 recipes
- Frequently asked questions
- Practice on PipeCode
1. Why Airflow 3 is the biggest release since 2.0
The four themes — Edge Workers, DAG versioning, MLOps primitives, unified UI — and why senior interviewers open with them
The one-sentence invariant: Airflow 3.0 is the release that finally answers the four questions the community has been asking since 2.0 — how do I run a worker outside my cluster without opening a port, how do I know which version of a DAG actually ran a given task instance, how do I make Airflow an MLOps citizen rather than an ML afterthought, and how do I stop juggling three separate UI tabs to find one DAG run. Every other 3.x feature — the Assets rename, the new scheduler internals, the deprecated-API cleanup — is either a consequence of these four themes or the plumbing that makes them possible.
The four axes interviewers actually probe.
- Workers. Edge Workers are the marquee 3.0 feature — an Airflow worker running outside the main cluster (edge site, on-prem, air-gapped) that reverse-connects to the Airflow API rather than the API reaching in. The senior signal is naming the reverse-connect semantics without prompting and explaining why it eliminates the inbound-port requirement.
- Versioning. DAG versioning ties every task instance to the DAG version it ran under. Backfills replay the historical code, not the current code. Interviewers ask "what happens if I edit the DAG, then run a backfill for last week?" — the senior answer is "the backfill runs the version that was live last week," not "the backfill runs the current code."
- MLOps. Datasets are renamed to Assets in Airflow 3, and Assets are the shared vocabulary for lineage, freshness, and downstream triggers. The MLOps story is Assets plus model-registry hooks (Vertex AI, MLflow, Mosaic) plus asset-scheduled DAGs. The senior probe is "why is this different from just calling a `KubernetesPodOperator`?"; the answer is lineage and trigger semantics as first-class primitives.
- UI / DX. The 3.x UI collapses the Graph, Grid, Calendar, and per-DAG-run views into a single lens with Assets, Versions, and Task Groups tabs. The Airflow 3 upgrade checks (Ruff's `AIR3xx` rules, e.g. `ruff check --preview --select AIR301,AIR302 dags/`) flag every removed or relocated API in your 2.x DAGs before you cut over.
Why 3.0 matters even if you're not upgrading tomorrow.
- Deprecation clock. Airflow 2.x is in maintenance-only mode. New provider packages, new integrations, and Python 3.13+ support ship on the 3.x line. Staying on 2.x is a slow bit-rot problem, not an active choice.
- The reverse-connect model. The Edge Worker's reverse-connect story is the first practical answer to hybrid on-prem + cloud Airflow deployments. Teams running Celery or Kubernetes workers in a private data centre for GDPR, HIPAA, or data-sovereignty reasons have historically fought networking; Edge Workers make it a config-only decision.
- Reproducibility. DAG versioning solves the "which code ran last Tuesday's job?" question that every audit and every post-mortem asks. Before 3.0, the answer was "whatever is on the current `main` branch; hope you tagged it." After 3.0, the answer is a query against the metadata database.
- MLOps as first-class. MLOps teams have been bolting Airflow onto model-training pipelines with custom operators and out-of-band lineage tools since 2019. 3.0's Asset model plus registry hooks reduce the glue code to near-zero for the common case.
- UI unification. The old UI split every question across three or four views. The new UI answers "what happened here, what version, what assets" in a single lens. The productivity delta on an incident is measurable in minutes per event.
What Airflow 2.x users actually have to change.
- DAG import compatibility. Most 2.x DAGs import unchanged. The ones that break use the removed `SubDagOperator`, custom XCom backends, or provider modules that moved between packages.
- Datasets → Assets rename. `from airflow.datasets import Dataset` becomes `from airflow.sdk import Asset`. The old import still works with a deprecation warning through 3.1.
- Executor config. The new Edge Worker executor lives beside Celery / Kubernetes; no forced migration.
- API deprecations. The old experimental REST API (deprecated since 2.0) is fully removed in 3.0. Anyone still using the experimental endpoints must switch to the stable API.
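For a DAG repo that only uses the plain single-name import, the rename can be scripted. The sketch below is a hypothetical regex helper (`rewrite_dataset_imports` is not an Airflow tool) and assumes DAG files use `from airflow.datasets import Dataset` plus bare `Dataset(...)` calls; aliased imports, multi-name imports, and `airflow.datasets.Dataset` attribute access are out of scope, which is why a real migration should lean on the Ruff `AIR3xx` rules instead.

```python
import re

# Hypothetical helper. Assumption: DAG files use the plain single-name 2.x import and bare
# Dataset(...) calls; aliases, multi-name imports and attribute access are NOT handled.
IMPORT_RE = re.compile(r"^from airflow\.datasets import Dataset\b", re.MULTILINE)
USAGE_RE = re.compile(r"\bDataset\(")


def rewrite_dataset_imports(source: str) -> str:
    """Return DAG source with the Dataset import and constructor calls renamed to Asset."""
    if not IMPORT_RE.search(source):
        return source  # files without the 2.x import are left untouched
    source = IMPORT_RE.sub("from airflow.sdk import Asset", source)
    return USAGE_RE.sub("Asset(", source)


old = '''from airflow.datasets import Dataset

orders = Dataset("s3://lake/orders/")
'''
print(rewrite_dataset_imports(old))
```

Gating the rewrite on the import being present keeps the helper from touching unrelated classes that happen to be called `Dataset`.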
What interviewers listen for.
- Do you say "Edge Workers reverse-connect to the API" in the first sentence when asked about remote execution? — senior signal.
- Do you mention that task instances are tied to the DAG version they ran under, not the current version? — senior signal.
- Do you push back on "Airflow is just an orchestrator, why do we need MLOps primitives?" with the Asset lineage + trigger argument? — required answer.
- Do you describe the upgrade path as "run the Airflow 3 upgrade checks first, fix the findings, then cut over" rather than as a big-bang migration? (required answer)
Worked example — the four axes on a real orchestration answer
Detailed explanation. A senior interviewer opens with an intentionally vague prompt: "walk me through what's new in Airflow 3 and why I should care." The bad answer meanders through the release notes. The senior answer picks four axes — workers, versioning, MLOps, UI — and gives one concrete example for each, framed against a workload the interviewer probably runs.
- Workers. Edge Workers unlock hybrid deployments — a Snowflake cluster in the cloud, a customer-hosted Postgres in a private data centre.
- Versioning. Backfills now replay the exact code that ran, not the current `main`.
- MLOps. Assets are the shared vocabulary; model-registry hooks close the loop.
- UI. One lens per DAG run — Graph + Grid + Assets + Versions in a single view.
Question. A senior DE interviewer asks: "give me one-sentence pitches for the four biggest Airflow 3 features and one workload each unblocks that 2.x could not." Structure the answer.
Input.
| Axis | 2.x limitation | 3.x fix |
|---|---|---|
| Workers | Celery/K8s workers assumed in-cluster; no clean reverse-connect | Edge Worker reverse-connects to API |
| Versioning | Task instances not tied to DAG version | Every task instance stamped with its DAG version |
| MLOps | Custom operators + out-of-band lineage | Assets + registry hooks first-class |
| UI | Graph / Grid / Calendar / DAG-run views scattered | Unified UI with Assets, Versions, Task Groups tabs |
Code.
# Sketch of the four-axes answer, as a Python dict for structured recall
answer = {
"workers": {
"pitch": "Edge Workers run outside the cluster and reverse-connect to the Airflow API — no inbound port needed.",
"unblocks": "hybrid on-prem + cloud pipelines, edge IoT ETL, GDPR data-sovereignty workloads",
},
"versioning": {
"pitch": "DAG versioning ties every task instance to the version of the DAG code that ran it — backfills replay history exactly.",
"unblocks": "auditable reproducibility, correct backfills after code changes, incident forensics",
},
"mlops": {
"pitch": "Assets (renamed from Datasets) are the lineage + freshness + trigger primitive; registry hooks connect Vertex / MLflow / Mosaic.",
"unblocks": "training pipelines with real lineage, asset-scheduled retraining, model promotion workflows",
},
"ui": {
"pitch": "One unified UI collapses Graph / Grid / Calendar into a single lens with Assets, Versions, and Task Groups tabs.",
"unblocks": "one-click incident triage, per-run assets and versions visible without tab-switching",
},
}
for axis, spec in answer.items():
    print(f"{axis.upper():>10}: {spec['pitch']}")
    print(f" unblocks: {spec['unblocks']}")
Step-by-step explanation.
- Each axis gets a one-sentence pitch that names the primitive (Edge Worker, DAG versioning, Assets, unified UI) and its behaviour (reverse-connect, task-instance-to-version binding, lineage + triggers, single lens).
- Each axis names a workload that 2.x could not cleanly support — hybrid deployments, correct backfills after code edits, MLOps pipelines, one-click incident triage.
- The structure is memorable: pitch, unblocks, next axis. The senior signal is not rattling off ten features; it is naming the four that shift the architectural conversation.
- The `answer` dict doubles as a study aid: write it once, memorise the pitches, redeploy in every 3.x interview.
- The unblocks column is where interviewers probe deeper: "tell me more about hybrid deployments" leads directly to the Edge Worker reverse-connect story.
Output.
| Axis | Pitch (1 sentence) | Unblocks |
|---|---|---|
| Workers | Edge Workers reverse-connect to the API | Hybrid on-prem + cloud |
| Versioning | Task instance stamped with DAG version | Correct backfills after edits |
| MLOps | Assets + registry hooks first-class | Model-training pipelines |
| UI | Single lens, all tabs collapsed | One-click incident triage |
Rule of thumb. Structure every Airflow 3 answer around the four axes — workers, versioning, MLOps, UI. Interviewers use the same taxonomy; matching it signals "I've read the release notes with intent."
Worked example — deciding whether to upgrade a 2.x deployment now
Detailed explanation. A team runs Airflow 2.9 in production with 400 DAGs, Celery executor, and no immediate MLOps roadmap. The engineering manager asks whether to upgrade to 3.x now or wait. The senior answer weighs the concrete signals: deprecation clock, feature dependencies, Python version support, provider-package roadmap, and team bandwidth. It is not "just upgrade, 3.x is better."
- Signal 1. Python 3.13 support ships on 3.x only; the 2.x line, now in maintenance mode, tops out at Python 3.12.
- Signal 2. New provider packages (Snowflake 6.x, dbt Cloud 4.x) land on 3.x first.
- Signal 3. Any hybrid on-prem workload benefits immediately from Edge Workers.
- Signal 4. MLOps roadmap = 3.x now; no MLOps roadmap = 3.x within 12 months anyway.
Question. A team runs Airflow 2.9 with 400 DAGs, Celery, no MLOps roadmap. Enumerate the decision criteria for "upgrade now vs upgrade in 6 months" and reach a recommendation.
Input.
| Criterion | Value | Weight |
|---|---|---|
| Current version | 2.9 | — |
| DAG count | 400 | — |
| Executor | Celery | — |
| Python version | 3.11 | — |
| MLOps roadmap | none | low |
| Hybrid / edge workload | none | low |
| Provider package roadmap | Snowflake 6.x on 3.x only | high |
| Team bandwidth | 1 senior DE-quarter | — |
Code.
# Decision helper: return "upgrade now", "upgrade in 6 months", or a defer message
def _ver(v: str) -> tuple:
    # Parse "2.9" -> (2, 9) so versions compare numerically; as strings, "2.10" < "2.8" is True
    return tuple(int(part) for part in v.split("."))
def upgrade_decision(current_version: str,
                     dag_count: int,
                     mlops_roadmap: bool,
                     hybrid_workload: bool,
                     provider_dependency: bool,
                     python_target: str,
                     bandwidth_quarters: float) -> str:
    signals = 0
    if mlops_roadmap: signals += 3
    if hybrid_workload: signals += 3
    if provider_dependency: signals += 2
    if _ver(python_target) >= (3, 12): signals += 2
    if _ver(current_version) < (2, 8): signals += 1  # further behind = more risk
    # Bandwidth check: hard gate, ~1 senior-DE quarter per 500 DAGs (floor of 0.25)
    est_effort = max(0.25, dag_count / 500)
    if bandwidth_quarters < est_effort:
        return f"defer; need {est_effort:.2f} quarters but have {bandwidth_quarters:.2f}"
    return "upgrade now" if signals >= 3 else "upgrade in 6 months"
print(upgrade_decision(
current_version="2.9",
dag_count=400,
mlops_roadmap=False,
hybrid_workload=False,
provider_dependency=True,
python_target="3.12",
bandwidth_quarters=1.0,
))
Step-by-step explanation.
- The helper collects five weighted signals: MLOps roadmap, hybrid workload, provider dependency, Python target, and current-version drift. Each signal contributes points; a total of 3 or more is "upgrade now" territory.
- The bandwidth check is a hard gate: even if the signals scream upgrade, insufficient bandwidth means "defer until you have the DE-quarters to do it safely." Rule of thumb is one senior-DE quarter per ~500 DAGs, including regression testing.
- For the input case (no MLOps, no hybrid, Snowflake provider on 3.x only, Python 3.12 target, 1 quarter bandwidth): signals = 2 (provider) + 2 (Python) = 4, above 3, and bandwidth (1.0) ≥ effort (0.8). Result: upgrade now.
- If the same team had no provider dependency and stayed on Python 3.11, signals would be 0 and the recommendation would flip to "upgrade in 6 months" — buy time for the ecosystem to settle, watch for 3.2 or 3.3 point releases.
- The scoring is not a substitute for judgement; it is a structured way to defend the recommendation to leadership. The signals are the same for every 2.x-to-3.x conversation in the industry.
Output.
| Scenario | Signal total | Bandwidth check | Recommendation |
|---|---|---|---|
| No MLOps, no hybrid, no provider dep, Python 3.11 | 0 | pass | Upgrade in 6 months |
| No MLOps, no hybrid, Snowflake 6.x on 3.x, Python 3.12 | 4 | pass | Upgrade now |
| Hybrid on-prem workload, MLOps roadmap | 6 | pass | Upgrade now |
| Any scenario with < 0.8 quarter bandwidth | any | fail | Defer until bandwidth |
Rule of thumb. Upgrade now if a hybrid workload or an MLOps roadmap applies (either one alone scores 3), or if any two of {provider dependency, Python 3.12+ target, running older than 2.8} apply. Otherwise, wait 6 months and re-evaluate; the deprecation clock will catch up regardless.
Worked example — reading the 3.x release notes with a senior lens
Detailed explanation. The 3.0 release notes are hundreds of bullet points; the 3.1 and 3.2 notes add more. A senior reader does not read line-by-line — they scan for the four axes (workers, versioning, MLOps, UI) and the deprecations that will bite their DAGs. Walk through the mental model for scanning a release-notes page for a specific 2.x deployment.
- Scan pattern. Ctrl-F: "edge", "version", "asset" (or "dataset"), "UI", "deprecated", "removed".
- Impact triage. Anything in "removed" that your DAGs import → hard block. Anything in "deprecated" → migration ticket. Anything in "new" that matches your roadmap → adoption plan.
- Version-to-version. 3.0 = big-bang features; 3.1 = polish + more providers; 3.2+ = expected AI/ML operator expansion.
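The Ctrl-F pass can be sketched as a small keyword classifier that proposes an (axis, category) pair for each bullet, leaving a human to confirm every hit. The regex patterns and the `triage` function are illustrative assumptions, and the sample bullets are paraphrases used in this section rather than exact release-note text.

```python
import re

# Axis patterns are checked in order; the first match wins, otherwise the bullet is "other".
AXIS_PATTERNS = {
    "workers": r"\bedge\b",
    "versioning": r"\bversion",
    "mlops": r"\b(asset|dataset)s?\b",
    "ui": r"\bui\b",
}
# "removed" is checked before "deprecated"/"renamed": a hard block outranks a migration ticket.
CATEGORY_PATTERNS = (("block", r"\bremoved\b"), ("migrate", r"\b(deprecated|renamed)\b"))


def triage(bullet: str) -> tuple:
    """Return (axis, category) for one release-note bullet; anything new defaults to adopt."""
    text = bullet.lower()
    axis = next((a for a, p in AXIS_PATTERNS.items() if re.search(p, text)), "other")
    category = next((c for c, p in CATEGORY_PATTERNS if re.search(p, text)), "adopt")
    return axis, category


for line in ["Edge Worker executor available",
             "Dataset renamed to Asset",
             "SubDagOperator removed; refactor to TaskGroup"]:
    print(triage(line), line)
```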
Question. A team on Airflow 2.9 wants a 30-minute triage of the 3.0 release notes. Produce the scan checklist and the categorisation of findings.
Input.
| Column | Value |
|---|---|
| Current version | 2.9 |
| Target version | 3.0 |
| Notes URL | airflow.apache.org/docs/apache-airflow/3.0/release_notes.html |
| Scan budget | 30 minutes |
Code.
# Scan helper: walk the release notes, bucket findings into (adopt, migrate, block)
from dataclasses import dataclass, field
from typing import List
@dataclass
class ScanFinding:
    axis: str      # workers | versioning | mlops | ui | other
    category: str  # adopt | migrate | block
    quote: str
    ticket: str = ""
@dataclass
class ScanReport:
    findings: List[ScanFinding] = field(default_factory=list)
    def add(self, axis, category, quote):
        self.findings.append(ScanFinding(axis, category, quote))
    def summary(self):
        buckets = {"adopt": [], "migrate": [], "block": []}
        for f in self.findings:
            buckets[f.category].append(f)
        return buckets
report = ScanReport()
report.add("workers", "adopt", "Edge Worker executor available for reverse-connect deployments")
report.add("versioning", "adopt", "DAG versioning enabled by default; task instances stamped with version_id")
report.add("mlops", "migrate", "Dataset renamed to Asset; airflow.datasets deprecated (removed in 3.2)")
report.add("ui", "adopt", "New React-based unified UI is the default; the legacy UI is removed")
report.add("other", "block", "SubDagOperator removed; refactor to TaskGroup")
report.add("other", "block", "Experimental REST API removed; switch to stable API")
for category, items in report.summary().items():
    print(f"[{category}]")
    for i in items:
        print(f" - {i.axis}: {i.quote}")
Step-by-step explanation.
- The scan proceeds axis-by-axis. For each axis, Ctrl-F the release notes for the relevant keyword and copy any matching bullet into the report.
- Every finding is categorised into `adopt`, `migrate`, or `block`. `adopt` = new feature to consider; `migrate` = existing DAG code needs an edit but no immediate outage; `block` = existing DAG code will fail on 3.0.
- The example report captures the six most impactful items: two adopts (Edge Worker, DAG versioning), one migrate (Dataset → Asset), one adopt (unified UI), and two blocks (SubDagOperator, experimental API).
- The `block` category feeds the pre-upgrade checklist: every block-tier finding must have a fixed PR in `main` before the cutover.
- The `migrate` category feeds the post-upgrade backlog: rewrites happen on the 3.x line before the deprecation removes the compatibility shim.
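The two routing rules above (blocks gate the cutover, migrates become backlog) fit in a few lines. This is a sketch with a hypothetical `fixed_pr` field standing in for "a fixed PR merged to `main`"; the PR identifiers are placeholders.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Finding:
    category: str                   # adopt | migrate | block
    quote: str
    fixed_pr: Optional[str] = None  # hypothetical: PR merged to main that resolves the finding


def cutover_blockers(findings: List[Finding]) -> List[str]:
    """Block-tier findings with no merged fix; the cutover is allowed only when this is empty."""
    return [f.quote for f in findings if f.category == "block" and not f.fixed_pr]


def post_upgrade_backlog(findings: List[Finding]) -> List[str]:
    """Migrate-tier findings become tickets to finish on 3.x before the shim disappears."""
    return [f.quote for f in findings if f.category == "migrate"]


findings = [
    Finding("block", "SubDagOperator removed", fixed_pr="PR-1"),  # placeholder PR id
    Finding("block", "Experimental REST API removed"),
    Finding("migrate", "Dataset renamed to Asset"),
    Finding("adopt", "Edge Worker executor available"),
]
print("blockers:", cutover_blockers(findings))
print("backlog:", post_upgrade_backlog(findings))
```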
Output.
| Category | Count | Meaning |
|---|---|---|
| adopt | 3 | New features to consider (Edge Worker, DAG versioning, unified UI) |
| migrate | 1 | Rename Dataset → Asset in existing DAG code (before 3.2 removal) |
| block | 2 | Hard blockers: SubDagOperator and experimental REST API removed |
Rule of thumb. Every 2.x → 3.x scan produces roughly 3–6 findings per axis. Ship a triage doc with adopt/migrate/block buckets before you touch the runtime. The rewriting effort is proportional to the block count, not the total notes length.
Senior interview question on the shape of Airflow 3
A senior interviewer often opens with: "You've been running Airflow 2.x for four years. Your CTO reads about airflow 3 at a conference and asks whether the team should upgrade. Walk me through the four themes of the release, the upgrade signals, and how you'd sequence a 6-month migration."
Solution Using the four-axes framing + the signal-based decision + a phased rollout
# Six-month 2.x → 3.x migration plan, expressed as a phased checklist
from dataclasses import dataclass, field
from typing import List
@dataclass
class Phase:
    month: int
    name: str
    axis: str
    activities: List[str]
    exit_criteria: List[str]
phases = [
Phase(1, "Discovery & triage", "all", [
"Run the Airflow 3 upgrade checks (ruff check --preview --select AIR301,AIR302) against every 2.x DAG",
"Categorise findings into adopt / migrate / block",
"Inventory Edge / MLOps / versioning use cases",
], [
"Triage doc merged; block-tier list has an owner per DAG",
]),
Phase(2, "Deprecation clean-up", "all", [
"Fix every block-tier import (SubDagOperator → TaskGroup, experimental → stable API)",
"Rewrite XCom custom backends against the 3.x interface",
"Ship on the 2.x line first — bug-for-bug compatible",
], [
"All 400 DAGs pass the Airflow 3 upgrade checks with zero findings",
]),
Phase(3, "Staging on 3.x", "all", [
"Stand up a 3.x staging cluster (same executor as prod)",
"Deploy the DAG bundle unchanged (2.x compat shims still active)",
"Run parallel DAG runs against staging for 2 weeks",
], [
"Staging parity: same task success rate, same runtime, no new errors",
]),
Phase(4, "Adopt DAG versioning", "versioning", [
"Enable DAG versioning (default in 3.0; verify metadata migration ran)",
"Add version_id filter to lineage / audit dashboards",
"Test a backfill of a week-old run: verify historical code runs",
], [
"Backfill test passes; audit dashboard shows version_id per task instance",
]),
Phase(5, "Assets rename + optional Edge / MLOps", "mlops+workers", [
"Rewrite Dataset imports to Asset (deprecation warnings turn into errors in 3.2)",
"For any hybrid workload: deploy an Edge Worker to the remote site",
"For any ML pipeline: adopt Asset lineage + registry hook",
], [
"Zero deprecation warnings in staging log; edge worker Helm chart merged",
]),
Phase(6, "Prod cutover", "all", [
"Cut over prod to 3.x (blue/green: 3.x runs in parallel for 48 hours)",
"Move on-call runbooks and bookmarks to the new unified UI (the default UI in 3.x)",
"Retire the 2.x cluster after 2 weeks of clean prod",
], [
"3.x carries 100% of prod traffic; 2.x fully decommissioned",
]),
]
for p in phases:
    print(f"Month {p.month}: {p.name} ({p.axis})")
    for a in p.activities:
        print(f" - {a}")
    for e in p.exit_criteria:
        print(f" ✓ {e}")
Step-by-step trace.
| Month | Phase | Axis | Exit criterion |
|---|---|---|---|
| 1 | Discovery & triage | all | Triage doc, block-list ownership |
| 2 | Deprecation clean-up | all | 0 upgrade-check findings on 2.x |
| 3 | Staging on 3.x | all | Parity across all DAGs |
| 4 | Adopt DAG versioning | versioning | Backfill of week-old run passes |
| 5 | Assets + optional Edge/MLOps | mlops+workers | 0 deprecation warnings |
| 6 | Prod cutover | all | 3.x = 100% traffic; 2.x decommissioned |
After 6 months, the deployment runs on 3.x with DAG versioning on, Assets renamed, the unified UI enabled, and (if the workload warranted) at least one Edge Worker in production. The team has the vocabulary — workers, versioning, MLOps, UI — to answer every 3.x interview question fluently.
Output:
| Deliverable | Status after 6 months |
|---|---|
| Airflow 3 upgrade checks | 0 findings |
| DAG versioning | on, per-task-instance |
| Assets rename | complete |
| Unified UI | enabled |
| Edge Workers | 1+ deployed (if applicable) |
| 2.x cluster | decommissioned |
Why this works — concept by concept:
- Four-axes framing — every 3.x conversation collapses onto workers, versioning, MLOps, UI. Aligning the plan to the same axes makes cross-team communication trivial (each axis has an owner, each owner has a phase).
- Signal-based decision — the upgrade decision is not a gut call; it is a scored decision against MLOps roadmap, hybrid workload, provider dependencies, and Python target. Signals ≥ 3 → upgrade now.
- Phased rollout — six phases, one per month, each with explicit activities and exit criteria. The most common failure mode of Airflow upgrades is skipping the staging phase; the plan makes that impossible by design.
- Backfill-of-week-old-run gate — the DAG versioning phase's exit criterion tests the feature you're actually buying: a backfill has to replay the version that was live a week ago. If it doesn't, you've mis-migrated.
- Cost: six senior-DE months for a 400-DAG deployment; roughly 0.015 months per DAG. Cost scales with DAG count and block-tier finding count, not with total feature area. Skipping the upgrade checks is the biggest cost multiplier because it defers block-tier work into the cutover window.
2. Edge Workers — remote / on-prem execution
airflow edge worker reverse-connects to the API — no inbound port on your control plane, no VPN gymnastics
The mental model in one line: an Edge Worker is a task-executing process running outside the Airflow cluster (edge site, on-prem, air-gapped facility) that opens an outbound connection to the Airflow API, polls for work, executes tasks locally, and pushes results back — the control plane never needs an inbound port, and the network story collapses to "the worker can reach the API". Every other Edge-related interview question is a consequence of this reverse-connect topology.
Why Edge Workers exist.
- The 2.x pain. Celery workers assumed a shared broker (Redis / RabbitMQ) reachable by both the scheduler and the worker. Kubernetes executors assumed the workers ran in the same cluster. Neither model worked cleanly for a worker in a customer data centre or an edge site.
- The workaround before 3.0. Teams tunnelled the broker over a VPN, or ran a full Airflow deployment per site (with cross-site XCom syncing), or shelled out to a queue-and-agent pattern of their own invention. Each was operationally expensive.
- The 3.0 fix. The Edge Worker is a first-class executor. The worker process opens an HTTP/HTTPS connection to the API, polls for work, executes tasks with its own Python runtime, and posts results back. The API is the only network surface required.
The reverse-connect wire protocol.
- Registration. On startup, the Edge Worker calls `POST /api/v1/edge/register` with its identity, capabilities (queue tags), and API token.
- Poll. The worker long-polls `GET /api/v1/edge/dequeue?queue=on_prem&worker_id=...` for work. If a task matches, the API returns the task instance descriptor.
- Execute. The worker runs the task locally against its own Python environment, which can be entirely different from the scheduler's (separate Python version, separate provider packages, separate secrets).
- Report. The worker POSTs progress (state transitions, XCom pushes, logs) to `/api/v1/edge/report`.
- Heartbeat. A concurrent thread heartbeats to `/api/v1/edge/heartbeat` every 5 seconds; the scheduler marks the worker unhealthy after 3 missed beats.
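To make the register / poll / execute / report cycle concrete, here is an in-process simulation. `FakeEdgeApi` and `run_worker_once` are hypothetical stand-ins, not the Edge provider's classes, and there is no networking; the point is the direction of every call. The worker always initiates, and the API only answers, which is why the control plane never needs an inbound port.

```python
from collections import deque
from dataclasses import dataclass, field


@dataclass
class FakeEdgeApi:
    """Hypothetical control-plane stub: it only ever answers calls, never initiates one."""
    queues: dict = field(default_factory=dict)   # queue name -> deque of task ids
    workers: dict = field(default_factory=dict)  # worker id -> set of queues it serves
    results: dict = field(default_factory=dict)  # task id -> reported state

    def register(self, worker_id, queues):
        self.workers[worker_id] = set(queues)

    def dequeue(self, worker_id):
        for q in sorted(self.workers[worker_id]):
            pending = self.queues.get(q)
            if pending:
                return pending.popleft()
        return None  # nothing on this worker's queues


    def report(self, task_id, state):
        self.results[task_id] = state


def run_worker_once(api, worker_id, queues, execute):
    """Register, then poll until no matching work is left, reporting each result."""
    api.register(worker_id, queues)
    while (task_id := api.dequeue(worker_id)) is not None:
        try:
            execute(task_id)  # runs in the worker's own Python environment
            api.report(task_id, "success")
        except Exception:
            api.report(task_id, "failed")


api = FakeEdgeApi(queues={"on_prem": deque(["t1", "t2"]), "cloud": deque(["t3"])})
run_worker_once(api, "nuc-01", ["on_prem"], execute=lambda tid: None)
print(api.results)  # only on_prem tasks were picked up; t3 waits for a cloud worker
```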
Use cases that Edge Workers unblock.
- GDPR / data sovereignty. The pipeline processes EU customer data that must not leave EU boundaries. The Edge Worker runs in Frankfurt; the Airflow control plane lives wherever the team wants (typically the cloud). Only task metadata (state, logs, XCom) crosses the boundary.
- HIPAA / on-prem sensitive data. A hospital keeps patient data on-prem. The Edge Worker runs in the hospital's data centre; the Airflow control plane is a hosted SaaS Airflow (e.g. Astronomer). The pipeline orchestrates cleanly without exposing patient data.
- Edge IoT ETL. A factory has sensors producing terabytes per day. An Edge Worker running on a small industrial box aggregates the sensor stream locally, sends only summaries to the cloud, and closes the loop.
- Hybrid cloud. Some tasks run on AWS (S3 → Snowflake), others on a customer-hosted Postgres in a private data centre. Two workers — one in AWS, one on-prem — both talk to the same Airflow control plane.
What the network story looks like.
- Outbound only. The Edge Worker opens exactly one HTTPS connection to `https://airflow.example.com/api/v1/edge/*`. That is the entire network requirement.
- No inbound port. The Airflow control plane does not reach out to the worker; the worker reaches in. Firewalls stay closed.
- TLS + token. The auth model is a bearer token issued from the Airflow UI plus TLS for confidentiality.
- Optional mTLS. For higher-trust environments, mTLS client certs replace the token.
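From the worker's side, "outbound HTTPS plus a bearer token, optionally mTLS" can be sketched with the standard library alone. The endpoint path is the one used in this section, the token is a placeholder, and `build_poll_request` / `build_tls_context` are hypothetical helpers; the Edge provider's actual client code may differ. Nothing here touches the network.

```python
import ssl
import urllib.parse
import urllib.request
from typing import Optional


def build_poll_request(api_url: str, token: str, queue: str, worker_id: str) -> urllib.request.Request:
    """Build (but do not send) the worker's outbound poll request."""
    if not api_url.startswith("https://"):
        raise ValueError("Edge Worker traffic should only leave over HTTPS")
    query = urllib.parse.urlencode({"queue": queue, "worker_id": worker_id})
    return urllib.request.Request(
        f"{api_url.rstrip('/')}/api/v1/edge/dequeue?{query}",
        headers={"Authorization": f"Bearer {token}"},  # token auth on an outbound call
        method="GET",
    )


def build_tls_context(client_cert: Optional[str] = None,
                      client_key: Optional[str] = None) -> ssl.SSLContext:
    """Verify the server certificate; optionally present a client certificate (mTLS)."""
    ctx = ssl.create_default_context()  # CERT_REQUIRED and hostname checking by default
    if client_cert:
        ctx.load_cert_chain(certfile=client_cert, keyfile=client_key)
    return ctx


req = build_poll_request("https://airflow.example.com", "TOKEN", "on_prem", "nuc-01")
print(req.get_method(), req.full_url)
# Sending it would look like: urllib.request.urlopen(req, context=build_tls_context(), timeout=60)
```

Swapping the token for mTLS only changes the context (`load_cert_chain`), not the direction of the connection.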
Common interview probes on Edge Workers.
- "Walk me through what happens when an Edge Worker starts up." — registration + long-poll + heartbeat.
- "Which network holes do I have to open?" — outbound HTTPS to the API only.
- "How does it differ from Celery over a VPN?" — no shared broker, no bidirectional network, no VPN.
- "What happens if the worker loses connectivity for 30 minutes?": running tasks fail with a `worker_lost` state; the scheduler re-queues them.
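The liveness rule and the re-queue behaviour can be sketched as two pure functions. `find_lost_workers` and `requeue_from_lost` are hypothetical names, and the 5-second interval and 3-beat allowance are the numbers quoted in this section, not verified provider defaults.

```python
HEARTBEAT_INTERVAL_S = 5   # interval quoted in the protocol description
MISSED_BEATS_ALLOWED = 3   # worker is unhealthy after this many missed beats


def find_lost_workers(last_beat: dict, now: float) -> set:
    """Workers whose last heartbeat is older than interval * allowed misses."""
    cutoff = HEARTBEAT_INTERVAL_S * MISSED_BEATS_ALLOWED
    return {w for w, ts in last_beat.items() if now - ts > cutoff}


def requeue_from_lost(running: dict, lost: set) -> list:
    """Task ids that were running on lost workers and should be scheduled again."""
    return sorted(t for t, w in running.items() if w in lost)


last_beat = {"nuc-01": 100.0, "eu-edge-1": 88.0}  # epoch seconds of last heartbeat
running = {"t1": "nuc-01", "t2": "eu-edge-1", "t3": "eu-edge-1"}
lost = find_lost_workers(last_beat, now=104.0)
print(lost, requeue_from_lost(running, lost))
```

A 30-minute outage (1,800 s) blows far past the 15-second window, so every task on that worker lands in the re-queue list.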
Worked example — deploying an Edge Worker on an on-prem factory box
Detailed explanation. A factory data engineering team runs sensor-ETL DAGs against local telemetry. The control plane is a hosted Astronomer Airflow 3.1 in the cloud. The team deploys an Edge Worker on an Intel NUC in the factory, gives it access to the local Modbus PLC network, and lets it execute only tasks tagged queue=factory. Walk through the deploy: install, configure, register, run.
- Hardware. Intel NUC, 32 GB RAM, Ubuntu 24.04, Python 3.12.
- Network. Outbound HTTPS to the Astronomer control plane only.
- Tagging. DAGs tag the sensor-ETL tasks with `queue=factory`; the Edge Worker dequeues only from `factory`.
Question. Write the systemd unit and the pip install command for an Edge Worker on the NUC. Show how the DAG tags a task for the factory queue.
Input.
| Component | Value |
|---|---|
| Airflow API URL | https://airflow.factory-team.astronomer.io |
| Worker API token | env var AIRFLOW_EDGE_TOKEN (issued from UI) |
| Queue | factory |
| Concurrency | 4 |
Code.
# NUC-side install
python3.12 -m venv /opt/airflow-edge/venv
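/opt/airflow-edge/venv/bin/pip install \
"apache-airflow==3.1.2" \
"apache-airflow-providers-edge3>=1.0" \
"pymodbus>=3.6"
# Create the config directory and an owner-only (chmod 600) secrets file for the API token
mkdir -p /etc/airflow-edge
install -m 600 -o airflow -g airflow /dev/null /etc/airflow-edge/secrets.env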
# /etc/systemd/system/airflow-edge-worker.service
[Unit]
Description=Airflow Edge Worker (factory queue)
After=network-online.target
Wants=network-online.target
[Service]
Type=simple
User=airflow
Group=airflow
Environment="AIRFLOW__EDGE__API_URL=https://airflow.factory-team.astronomer.io"
Environment="AIRFLOW__EDGE__QUEUES=factory"
Environment="AIRFLOW__EDGE__CONCURRENCY=4"
EnvironmentFile=/etc/airflow-edge/secrets.env
ExecStart=/opt/airflow-edge/venv/bin/airflow edge worker
Restart=always
RestartSec=5
[Install]
WantedBy=multi-user.target
# DAG side: tag the sensor ETL for the factory queue
from datetime import datetime
from airflow.sdk import dag, task
@dag(
    dag_id="factory_sensor_etl",
    schedule="@hourly",
    start_date=datetime(2026, 1, 1),
    catchup=False,
    default_args={"queue": "factory"},  # every task runs on the factory Edge Worker
)
def sensor_etl():
    @task
    def scrape_and_summarise() -> dict:
        # Runs on the NUC; the Modbus PLC is on the factory LAN, reachable from here
        from pymodbus.client import ModbusTcpClient
        client = ModbusTcpClient("10.20.30.40", port=502)
        if not client.connect():
            raise RuntimeError("PLC not reachable on the factory LAN")
        try:
            rr = client.read_holding_registers(0, count=100)
        finally:
            client.close()
        if rr.isError():
            raise RuntimeError(f"Modbus read failed: {rr}")
        # Aggregate here: the return value becomes an XCom stored in the cloud metadata DB
        regs = rr.registers
        return {"avg": sum(regs) / len(regs), "n": len(regs)}
    @task
    def push_summary(summary: dict) -> None:
        # Receives only the aggregate; upload it from the NUC
        # ... post to S3 / Snowflake ...
        print(f"factory summary: {summary}")
    push_summary(scrape_and_summarise())
sensor_etl()
Step-by-step explanation.
- The pip install layers `apache-airflow`, the Edge provider package, and any workload-specific libs (`pymodbus` here). Everything runs in a venv on the NUC, with no dependency on the scheduler's Python environment.
- The systemd unit exports the three critical env vars: `AIRFLOW__EDGE__API_URL` (where to reverse-connect), `AIRFLOW__EDGE__QUEUES` (which queues to dequeue from), and `AIRFLOW__EDGE__CONCURRENCY` (parallel task slots). The API token lives in a secrets file with `chmod 600`.
- On startup, the worker registers with the API, then long-polls `factory` for work. The scheduler in Astronomer sees the worker in the UI (name, capabilities, heartbeat) exactly as it would a Celery worker.
- The DAG tags the entire DAG (via `default_args={"queue": "factory"}`) so every task instance is dispatched only to the factory Edge Worker. Cloud-side workers do not serve the `factory` queue, so the scheduler never sends factory tasks to them.
- The task runs on the NUC. `pymodbus` reads the local PLC over the factory LAN, and the same task reduces the registers to an average before returning, because a task's return value is stored as an XCom in the cloud-hosted metadata database. No raw PLC data leaves the factory; only the aggregate summary crosses the boundary.
Output.
| Layer | Location | Data flow |
|---|---|---|
| Scheduler / webserver / metadata | Astronomer cloud | reads task state |
| Edge Worker (factory queue) | NUC in factory | executes DAG task |
| PLC / Modbus | Factory LAN | read by Edge Worker only |
| Summary (S3) | AWS | posted from Edge Worker |
| Network requirement | Outbound HTTPS from NUC → API | one connection |
Rule of thumb. Every Edge Worker deployment answers three questions before the systemd unit is written: (1) which queue tag does this worker serve? (2) what are the local dependencies (Modbus, private CA, mounted volume)? (3) what is the auth token and how is it rotated? Skipping any of the three is how deployments break at 2 AM.
Worked example — Edge Worker for GDPR data sovereignty
Detailed explanation. An analytics team processes EU customer data that, by legal mandate, must not leave the EU. The control plane is a hosted Airflow in the US; the workers must run in Frankfurt. An Edge Worker in EU-Central-1 dequeues only EU-tagged tasks, processes them against an EU-local S3 bucket and an EU-local Snowflake, and returns only aggregate metrics (never PII) to the US-hosted metadata.
- Legal constraint. No PII leaves the EU perimeter.
- Architecture. Airflow control plane in US; Edge Worker in EU; EU worker sees EU data.
- Metadata leakage. Task state, DAG version, XCom values do cross to US metadata — so XCom must contain only aggregates, never raw PII.
Question. Design the DAG + Edge Worker configuration so PII never touches the US control plane. Show the XCom guardrail.
Input.
| Component | Value |
|---|---|
| Control plane region | US-East-1 |
| EU Edge Worker region | EU-Central-1 |
| EU data source | s3://eu-customers/... (EU bucket) |
| Sensitivity | GDPR — no PII to US |
Code.
```python
# DAG side: EU-tagged tasks + XCom aggregate-only contract
from datetime import datetime

from airflow.decorators import dag, task
from airflow.exceptions import AirflowFailException


@dag(
    dag_id="eu_customer_analytics",
    schedule="@daily",
    start_date=datetime(2026, 1, 1),
    catchup=False,
)
def eu_analytics():
    @task(queue="eu_central")
    def compute_eu_aggregate() -> dict:
        # Runs on the EU Edge Worker
        import boto3

        s3 = boto3.client("s3", region_name="eu-central-1")
        obj = s3.get_object(Bucket="eu-customers", Key="events/2026-06-22.parquet")
        # ... compute aggregate on EU worker ...
        aggregate = {"n_users": 12345, "total_events": 987654}
        # Guardrail: never return raw PII to XCom (which lives in US metadata)
        for k, v in aggregate.items():
            if isinstance(v, str) and "@" in v:
                raise AirflowFailException("guardrail: PII detected in XCom payload")
        return aggregate

    @task(queue="us_default")
    def load_to_us_dashboard(agg: dict) -> None:
        # Runs on US worker; receives only aggregates
        print("US dashboard update:", agg)

    load_to_us_dashboard(compute_eu_aggregate())


eu_analytics()
```
```yaml
# EU Edge Worker Helm values (Astronomer Helm chart)
edgeWorker:
  enabled: true
  queues:
    - eu_central
  region: eu-central-1
  concurrency: 6
  env:
    - name: AWS_DEFAULT_REGION
      value: eu-central-1
    - name: SNOWFLAKE_ACCOUNT
      value: eu.snowflakecomputing.com
  secretsRef: eu-secrets
```
Step-by-step explanation.
- The DAG splits tasks by queue: `compute_eu_aggregate` runs on the EU Edge Worker (queue `eu_central`), and `load_to_us_dashboard` runs on the default US worker. The scheduler enforces the split.
- The EU task reads raw customer data from the EU-local S3 bucket. This data never touches the US worker; it exists only in the EU Edge Worker's memory during task execution.
- The XCom guardrail is the critical GDPR moment: only aggregates return from the EU task. Checking string values for `@` is a deliberately naive PII heuristic (it catches email addresses and little else); production teams run a full PII scanner (Microsoft Presidio, Datadog Sensitive Data Scanner) as a mandatory pre-XCom check.
- The US worker picks up the aggregate XCom and updates the dashboard. No PII crossed the boundary; the compliance auditor can point at the queue split and the XCom guardrail.
- The Helm values pin the Edge Worker to EU-Central-1, wire in the EU-specific AWS and Snowflake endpoints, and reference an EU secrets store. A systemd deployment is functionally equivalent; Helm is more common in Astronomer and self-hosted Kubernetes deployments.
Output.
| Task | Runs on | Data touched | XCom (US-visible) |
|---|---|---|---|
| compute_eu_aggregate | EU Edge Worker | Raw EU PII | Aggregate only (guarded) |
| load_to_us_dashboard | US default worker | Aggregate | Aggregate |
Rule of thumb. For GDPR/HIPAA workloads, the Edge Worker is the mechanism; the XCom guardrail is the policy. Ship both. A worker in EU without XCom PII checks is a compliance incident waiting to happen.
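One way to make the guardrail hard to forget is to wrap task functions in a decorator that scans the return value before it is handed onward. The sketch below is hypothetical: `xcom_guarded` and `PIIGuardrailError` are illustrative names, not Airflow APIs, and the patterns are deliberately minimal. In a real DAG you would raise `AirflowFailException` instead and place the decorator underneath `@task`, so the scan runs inside the task on the worker.

```python
import functools
import re

# Illustrative patterns only; a production deployment would use a real PII scanner.
PII_PATTERNS = [
    re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}"),  # email address
    re.compile(r"\b\d{3}-\d{2}-\d{4}\b"),  # US SSN format
]


class PIIGuardrailError(Exception):
    """Raised when a task tries to return something that looks like PII."""


def _iter_strings(value):
    """Yield every string nested anywhere inside dicts (keys and values), lists, tuples, sets."""
    if isinstance(value, str):
        yield value
    elif isinstance(value, dict):
        for k, v in value.items():
            yield from _iter_strings(k)
            yield from _iter_strings(v)
    elif isinstance(value, (list, tuple, set)):
        for item in value:
            yield from _iter_strings(item)


def xcom_guarded(fn):
    """Scan the wrapped function's return value and refuse to hand PII onward."""

    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        result = fn(*args, **kwargs)
        for s in _iter_strings(result):
            if any(p.search(s) for p in PII_PATTERNS):
                raise PIIGuardrailError(f"guardrail: PII-like value returned by {fn.__name__}")
        return result

    return wrapper


@xcom_guarded
def compute_eu_aggregate() -> dict:
    return {"n_users": 12345, "total_events": 987654}


print(compute_eu_aggregate())  # {'n_users': 12345, 'total_events': 987654}
```

Because the check lives in a decorator rather than in each task body, a code reviewer only has to confirm that every cross-boundary task carries it, which is a much easier audit than reading every return statement.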
Worked example — worker loses connectivity mid-task
Detailed explanation. An Edge Worker running a 20-minute DAG task loses its uplink 5 minutes in. The scheduler stops receiving heartbeats and marks the worker as worker_lost after 15 seconds. The task instance moves to a retry state; when the worker reconnects, it may or may not be able to resume — the design decision is idempotency + retry, not resumability.
- Failure mode. Worker heartbeat stops; the scheduler marks the worker lost; the task fails with `worker_lost`.
- Recovery. The task retries per the DAG's `retries` config. The retry may land on the same worker (if it reconnects) or on a different worker serving the same queue.
- Contract. Tasks must be idempotent, designed to run twice without duplicating output.
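The detection rule used in this example (5-second heartbeats, worker lost after 3 missed beats) reduces to a single comparison. A minimal sketch, assuming the scheduler compares the time since the last heartbeat against interval × missed beats; `is_worker_lost` and its defaults are illustrative, not the edge provider's actual configuration names.

```python
def is_worker_lost(last_heartbeat_s: float, now_s: float,
                   heartbeat_interval_s: float = 5.0, missed_beats: int = 3) -> bool:
    """Return True once more than `missed_beats` heartbeat intervals pass in silence."""
    silence = now_s - last_heartbeat_s
    return silence > heartbeat_interval_s * missed_beats


# The uplink drops right after a heartbeat at t=300 s.
print(is_worker_lost(300.0, 310.0))  # False: 10 s of silence, 2 missed beats
print(is_worker_lost(300.0, 316.0))  # True: 16 s of silence exceeds the 15 s threshold
```

The threshold is a trade-off: a shorter one reacts faster but turns brief Wi-Fi or VPN blips into retries, which is another reason the tasks behind it must be idempotent.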
Question. Configure the DAG so a mid-task network partition results in a clean retry with no duplicate data. Show the idempotency pattern.
Input.
| Parameter | Value |
|---|---|
| Task | ETL from PLC → S3 |
| Task duration | 20 minutes |
| Heartbeat interval | 5 seconds |
| worker_lost threshold | 15 seconds (3 missed beats) |
| DAG retries | 3 |
Code.
```python
from datetime import datetime, timedelta
from typing import Optional

from airflow.decorators import dag, task


@dag(
    dag_id="factory_idempotent_etl",
    schedule="@hourly",
    start_date=datetime(2026, 1, 1),
    catchup=False,
    default_args={
        "queue": "factory",
        "retries": 3,
        "retry_delay": timedelta(minutes=1),
        "retry_exponential_backoff": True,
    },
)
def factory_etl():
    @task
    def extract_and_write(run_id: Optional[str] = None) -> str:
        """Idempotent extract: key by run_id, so a retry overwrites, not duplicates.

        Airflow injects context values into parameters whose names match
        context keys, so run_id is filled in at run time.
        """
        import io

        import boto3
        import pandas as pd
        from pymodbus.client import ModbusTcpClient

        # Read PLC (a read has no side effects, so it is safe to repeat)
        client = ModbusTcpClient("10.20.30.40", port=502)
        client.connect()
        try:
            rr = client.read_holding_registers(0, count=100)
        finally:
            client.close()

        # Write to a run_id-keyed S3 path (idempotent by construction)
        buf = io.BytesIO()
        pd.DataFrame({"reg": rr.registers}).to_parquet(buf)  # needs pyarrow or fastparquet
        s3 = boto3.client("s3")
        key = f"raw/plc/{run_id}.parquet"  # run_id, not timestamp; safe on retry
        s3.put_object(Bucket="factory", Key=key, Body=buf.getvalue())
        return key

    extract_and_write()


factory_etl()
```
Step-by-step explanation.
- The task writes to `s3://factory/raw/plc/{run_id}.parquet`. The `run_id` is a stable per-DAG-run identifier, so a retry writes to the same key. A single S3 `PutObject` is atomic (an interrupted upload never leaves a partial object visible), and a repeated PUT to the same key simply replaces the object, so there is exactly one final object either way.
- `retries=3` with exponential backoff gives the task three retries after the first attempt (four attempts in total). The retries are spread over at least ~7 minutes of cumulative backoff (1m + 2m + 4m, before any per-task offset Airflow adds), so a partition that heals within that window is ridden out.
- When the scheduler marks the worker lost, the task instance moves to `up_for_retry`. On a later scheduler tick it is re-queued, and the next Edge Worker serving the `factory` queue picks it up.
- Critically, the retry does not resume the failed attempt: Airflow retries at the task boundary, not inside the operator. The task code must be safe to re-run from scratch.
- The key design tension: writing to a timestamp-based key (e.g. `raw/plc/{now.iso}.parquet`) would create a new file on every retry, duplicating data downstream. Always key on `run_id` or `logical_date`, never on wall-clock time.
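The difference between the two keying strategies shows up clearly if an in-memory dict stands in for the S3 bucket (an assumption made so the sketch runs without AWS). Each loop iteration simulates an attempt whose write landed but whose success was never reported, for example because the uplink dropped just after the PUT. The function names and the sample `RUN_ID` are hypothetical.

```python
from datetime import datetime, timedelta

RUN_ID = "scheduled__2026-06-01T10:00:00+00:00"  # example run_id for one hourly run


def write_by_run_id(bucket: dict, run_id: str, payload: bytes) -> str:
    key = f"raw/plc/{run_id}.parquet"
    bucket[key] = payload  # a PUT to an existing key replaces the object
    return key


def write_by_wall_clock(bucket: dict, now: datetime, payload: bytes) -> str:
    key = f"raw/plc/{now.isoformat()}.parquet"
    bucket[key] = payload  # every attempt gets a fresh key
    return key


good_bucket, bad_bucket = {}, {}
start = datetime(2026, 6, 1, 10, 0)
for attempt in range(3):  # first try + two retries after worker_lost
    attempt_time = start + timedelta(minutes=attempt * 3)
    write_by_run_id(good_bucket, RUN_ID, b"registers")
    write_by_wall_clock(bad_bucket, attempt_time, b"registers")

print(len(good_bucket))  # 1
print(len(bad_bucket))   # 3
```

Downstream readers that list the `raw/plc/` prefix would see three copies of the same hour in the wall-clock version, which is exactly the duplicate-data failure the run_id key avoids.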
Output.
| Attempt | Worker state | S3 key written | Downstream cost |
|---|---|---|---|
| 1 | worker_lost mid-write | nothing visible (PutObject is atomic) | none |
| 2 | worker_lost | nothing visible | none |
| 3 | success | raw/plc/{run_id}.parquet (final) | one canonical object |
Rule of thumb. Every Edge Worker task must be idempotent — key writes by run_id, not by wall-clock time. Combined with retries=3 + exponential backoff, the pipeline heals through any transient partition without operator intervention or duplicated data.
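The backoff arithmetic behind that rule can be checked with a small model that treats retry n as waiting `retry_delay × 2^(n−1)`. This is a simplification under stated assumptions: Airflow's real computation can add a per-task-instance offset on top and honours `max_retry_delay`, so treat these figures as lower bounds. `min_backoff_schedule` is a hypothetical helper.

```python
from datetime import timedelta
from typing import Optional


def min_backoff_schedule(retry_delay: timedelta, retries: int,
                         max_retry_delay: Optional[timedelta] = None) -> list:
    """Lower-bound wait before each retry: retry_delay * 2**(n - 1), optionally capped."""
    waits = []
    for n in range(1, retries + 1):
        wait = retry_delay * (2 ** (n - 1))
        if max_retry_delay is not None:
            wait = min(wait, max_retry_delay)
        waits.append(wait)
    return waits


waits = min_backoff_schedule(timedelta(minutes=1), retries=3)
print([int(w.total_seconds() // 60) for w in waits])  # [1, 2, 4]
print(sum(waits, timedelta()))  # 0:07:00
```

Raising `retries` extends the healing window exponentially, so a site with known multi-hour outages is usually better served by a `max_retry_delay` cap plus more retries than by a large base delay.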
Senior interview question on Edge Worker architecture
A senior interviewer might ask: "You need to run Airflow tasks in a customer's on-prem data centre for a HIPAA workload, with the control plane in your cloud. Walk me through the Edge Worker architecture, the network requirements, and the failure modes you'd guard against."
Solution Using Edge Worker + queue split + XCom PII guardrail + idempotent tasks
```python
# Complete architecture: DAG + Edge Worker deploy + guardrails
import re
from datetime import datetime, timedelta
from typing import Optional

from airflow.decorators import dag, task
from airflow.exceptions import AirflowFailException

# ---- guardrail: apply to every value a hospital-side task returns ----
PII_PATTERNS = [
    re.compile(r"\b\d{3}-\d{2}-\d{4}\b"),             # SSN
    re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+"),  # email
    re.compile(r"\b\d{16}\b"),                        # credit-card-ish
]


def scrub_xcom(payload):
    """Reject XCom payloads that contain plausibly-PII strings."""

    def scan(v):
        if isinstance(v, str):
            if any(pat.search(v) for pat in PII_PATTERNS):
                raise AirflowFailException("guardrail: PII in XCom")
        elif isinstance(v, dict):
            for x in v.values():
                scan(x)
        elif isinstance(v, (list, tuple)):
            for x in v:
                scan(x)

    scan(payload)
    return payload


@dag(
    dag_id="hospital_hipaa_etl",
    schedule="@hourly",
    start_date=datetime(2026, 1, 1),
    catchup=False,
    default_args={
        "retries": 3,
        "retry_delay": timedelta(minutes=1),
        "retry_exponential_backoff": True,
    },
)
def hospital_etl():
    @task(queue="hospital_edge")
    def extract_patient_events(run_id: Optional[str] = None) -> dict:
        # Runs inside the hospital data centre on the Edge Worker.
        # Airflow injects run_id by parameter name; use it to key any writes.
        # ... query hospital EHR system ...
        raw = {"n_patients": 4321, "n_events": 98765}  # aggregate only
        return scrub_xcom(raw)

    @task(queue="cloud_default")
    def update_cloud_dashboard(agg: dict) -> None:
        # Runs in the cloud; only aggregates arrive
        print("Cloud dashboard:", agg)

    update_cloud_dashboard(extract_patient_events())


hospital_etl()
```
```yaml
# Hospital-side Helm values: Edge Worker deploy
edgeWorker:
  enabled: true
  queues: ["hospital_edge"]
  concurrency: 8
  env:
    - name: AIRFLOW__EDGE__API_URL
      value: https://cloud-airflow.example.com
    - name: AIRFLOW__EDGE__HEARTBEAT_SECONDS
      value: "5"
  secretsRef: hospital-secrets
  networkPolicy:
    egress:
      - to: cloud-airflow.example.com
        ports: [443]
    ingress: []  # explicitly none
```
Step-by-step trace.
| Question | Answer | Reasoning |
|---|---|---|
| Where does the Edge Worker run? | Hospital data centre | HIPAA data cannot leave the perimeter |
| Which port does the hospital open? | None inbound; only outbound HTTPS/443 to the API | reverse-connect topology |
| How is PII scrubbed? | scrub_xcom() is applied to every value a hospital-side task returns | policy enforced at the DAG boundary |
| Idempotency? | run_id-keyed writes + retries=3 | worker_lost recovers cleanly |
| Auth? | Bearer token in secretsRef; TLS mandatory | zero-trust between hospital and cloud |
| Failure of worker connectivity? | worker_lost → retry with backoff | idempotent tasks re-run safely |
After the rollout, the hospital's compliance officer signs off (no PII crosses the boundary; no inbound port opened), the cloud on-call watches the hourly DAG runs, and the Edge Worker heartbeat stays green. HIPAA compliance is preserved by the architecture, not by a manual review.
Output:
| Property | Value |
|---|---|
| Control plane | Cloud Airflow 3.1 |
| Edge Worker | Hospital DC, queue=hospital_edge |
| Inbound ports opened | 0 |
| PII in cloud metadata | 0 |
| Idempotency contract | run_id-keyed writes |
| worker_lost recovery | automatic (3 retries + exponential backoff) |
Why this works — concept by concept:
- Reverse-connect topology: the Edge Worker calls home. The cloud never dials into the hospital, so the hospital's firewall stays closed to inbound traffic and the zero-trust posture holds.
- Queue split: `hospital_edge` and `cloud_default` are separate queues, and the scheduler dispatches by queue. Tasks that touch PII carry the hospital queue, and the hospital Edge Worker serves only that queue, so nothing else executes on it.
- XCom guardrail: `scrub_xcom` is applied to every value a hospital-side task returns. It is the last line of policy defence between task-local data and cloud-visible metadata, so it only works if no task skips it.
- Idempotency + retries: every task keys writes on `run_id`; `retries=3` with exponential backoff heals through transient partitions, so `worker_lost` becomes a routine event, not an incident.
- Cost: an Edge Worker adds one process per site plus one long-poll connection. The XCom guardrail is O(size of the XCom payload × number of patterns). Idempotency relies on the storage layer's overwrite-on-PUT semantics (S3 and GCS both provide this natively). The architecture cost is dominated by the policy work (compliance review, PII scanner tuning), not the runtime cost.
3. DAG versioning + change history
dag versioning ties every task instance to the DAG version that ran it — backfills replay historical code, not main
The mental model in one line: Airflow 3 stamps every parsed DAG file into the metadata database with a version identifier, associates every task instance with the version it ran under, and — when you trigger a backfill — replays the historical DAG version rather than the current main. The primitive answers the "which code ran last Tuesday?" question that every audit, every post-mortem, and every compliance officer asks, and it eliminates the entire class of bugs where a backfill silently runs different logic than the original run.
How versioning works under the hood.
- DAG hash. Every time the scheduler parses a DAG file, it hashes the parsed structure (task graph, operator arguments, schedule) plus the source-code SHA. A change in either produces a new version.
- Version table. The metadata DB has a `dag_version` table with columns `id`, `dag_id`, `created_at`, `source_hash`, `structure_hash`, and (optionally) `bundle_url`, which links to the DAG source bundle in object storage.
- Task-instance binding. The `task_instance` table gains a `dag_version_id` foreign key. Every task instance is stamped at creation time with the version that produced it.
- Bundle storage. By default, DAG source is stored in the local file system where the scheduler parses it. For production, the recommended pattern is DAG source in S3 (or GCS/ABS), with the `AIRFLOW__DAG_BUNDLES__` settings pointing to versioned buckets.
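The hash-and-compare rule can be sketched in a few lines. The toy registry below is illustrative only: it assumes a version is identified by a structure hash plus a source hash, as described above, and mimics a `dag_version` table with an in-memory list. `ToyVersionTable` and `register_parse` are hypothetical names, and Airflow's real serialization and hashing differ in detail.

```python
import hashlib
import json
from datetime import datetime


def _sha256(text: str) -> str:
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


class ToyVersionTable:
    """In-memory stand-in for a dag_version table (illustrative only)."""

    def __init__(self):
        self.rows = []  # each row: dict(id, dag_id, created_at, structure_hash, source_hash)

    def latest(self, dag_id: str):
        rows = [r for r in self.rows if r["dag_id"] == dag_id]
        return rows[-1] if rows else None

    def register_parse(self, dag_id: str, structure: dict, source: str, parsed_at: datetime) -> int:
        """Insert a new version only if the structure or source hash changed."""
        structure_hash = _sha256(json.dumps(structure, sort_keys=True))
        source_hash = _sha256(source)
        current = self.latest(dag_id)
        if current and (current["structure_hash"], current["source_hash"]) == (structure_hash, source_hash):
            return current["id"]  # same hashes: reuse the existing version
        row = {
            "id": len(self.rows) + 1,
            "dag_id": dag_id,
            "created_at": parsed_at,
            "structure_hash": structure_hash,
            "source_hash": source_hash,
        }
        self.rows.append(row)
        return row["id"]


table = ToyVersionTable()
structure = {"tasks": ["extract", "transform", "load"], "schedule": "@daily"}
v1 = table.register_parse("daily_sales_etl", structure, "src v1", datetime(2026, 6, 1))
v1_again = table.register_parse("daily_sales_etl", structure, "src v1", datetime(2026, 6, 2))
renamed = {"tasks": ["extract", "clean", "load"], "schedule": "@daily"}
v2 = table.register_parse("daily_sales_etl", renamed, "src v2", datetime(2026, 6, 3))
print(v1, v1_again, v2)  # 1 1 2
```

`json.dumps(..., sort_keys=True)` matters here: without a canonical serialization, two identical structures could hash differently and every parse would mint a spurious version.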
What versioning enables.
- Reproducible backfills. `airflow dags backfill --dag-id my_dag --logical-date 2026-05-01` looks up the DAG version that was live on 2026-05-01, fetches the historical source, and runs that code, not the code in `main` today.
- Audit trail. For any task instance, the UI (and API) shows which version ran, when the version was created, and (with bundle storage) the exact source code.
- Change history. The Versions tab in the new UI shows a scrollable list of every DAG version with diffs between adjacent versions.
- Rollback. A bad DAG version can be marked `deprecated=true`; future scheduling picks the previous good version until the DAG is re-parsed with a fix.
The scheduler's behaviour when a DAG file changes.
- T0. The scheduler parses the DAG file, computes hashes, and finds they match an existing version, so no new version is created.
- T1. A developer commits a change (renames a task, adds a step). The scheduler re-parses on the next tick, computes new hashes, and inserts a new `dag_version` row.
- T2. From T1 onward, every new task instance is stamped with the new version_id. Task instances created before T1 keep their old version_id.
- T3. A backfill of an interval before T1 uses the pre-T1 version; a backfill of an interval after T1 uses the post-T1 version.
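The T0–T3 rule amounts to "the newest version created at or before the interval being replayed". A minimal sketch using `bisect` over creation timestamps; `version_for` is a hypothetical helper that encodes the rule as this section describes it, not code taken from the scheduler.

```python
import bisect
from datetime import datetime


def version_for(logical_date: datetime, versions: list) -> int:
    """Return the id of the newest version with created_at <= logical_date.

    `versions` is a list of (created_at, version_id) tuples sorted by created_at.
    """
    created = [c for c, _ in versions]
    idx = bisect.bisect_right(created, logical_date) - 1
    if idx < 0:
        raise LookupError(f"no DAG version existed at {logical_date:%Y-%m-%d}")
    return versions[idx][1]


versions = [
    (datetime(2026, 6, 1), 6),
    (datetime(2026, 6, 11), 7),  # T1: a developer's change lands
    (datetime(2026, 6, 17), 8),
]
print(version_for(datetime(2026, 6, 14), versions))  # 7
print(version_for(datetime(2026, 6, 20), versions))  # 8
```

`bisect_right` is what makes the boundary inclusive: a version created at exactly the logical date is the one that applies, matching the `created_at <= logical_date` condition.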
Common interview probes on DAG versioning.
- "Walk me through what happens when I edit a DAG file." — new version row on next parse; new task instances stamped with new version_id.
- "If I backfill last Tuesday, does it run the current code or the code from last Tuesday?" — historical code (via version_id lookup).
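- "How does versioning interact with dynamic task mapping?" The mapped task's structure (which upstream feeds it, the operator, its arguments) is part of the version hash, so changing the mapping structure creates a new version; a change in the number or values of mapped items does not.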
- "What if the DAG source bundle is not in object storage?" — versioning still works structurally, but the source snapshot for a historical version may be lost. Recommendation: enable bundle storage.
Worked example — enabling DAG versioning + bundle storage
Detailed explanation. DAG versioning is on by default in Airflow 3.0, but the source-bundle storage is not — the metadata DB stores the version hash but not the historical DAG file itself. Configure S3 bundle storage so the full source is preserved for every version, then verify a backfill replays the historical code.
- Default. Version hash yes, source snapshot no.
- Recommendation. Enable S3 bundle storage so historical DAG files are always retrievable.
- Cost. Roughly 10–100 KB per DAG version. A 400-DAG deployment with 10 versions per DAG per month produces 48,000 versions a year, or roughly 0.5–5 GB/year. Negligible for object storage.
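To size bundle storage for a specific deployment, multiply versions per year by bytes per version. A minimal helper; the per-version sizes plugged in below are the rough 10–100 KB range from the list above, not measured values, and `bundle_storage_gb_per_year` is a hypothetical name.

```python
def bundle_storage_gb_per_year(n_dags: int, versions_per_dag_per_month: float,
                               kb_per_version: float) -> float:
    """Decimal GB of bundle snapshots written per year (1 GB = 1,000,000 KB)."""
    versions_per_year = n_dags * versions_per_dag_per_month * 12
    return versions_per_year * kb_per_version / 1_000_000


for kb in (10, 100):
    print(f"{kb:>3} KB/version -> {bundle_storage_gb_per_year(400, 10, kb):.2f} GB/year")
# prints:
#  10 KB/version -> 0.48 GB/year
# 100 KB/version -> 4.80 GB/year
```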
Question. Configure the scheduler to store DAG source bundles in S3, then trigger a backfill of a 3-week-old run and verify it uses the historical code.
Input.
| Component | Value |
|---|---|
| Airflow version | 3.1 |
| Bundle storage | s3://airflow-dag-bundles/prod/ |
| DAG being backfilled | daily_sales_etl |
| Backfill logical date | 3 weeks ago |
Code.
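```ini
# airflow.cfg: enable bundle storage
[dag_bundles]
default_bundle = s3_prod

[dag_bundles.s3_prod]
type = s3
bucket = airflow-dag-bundles
prefix = prod/
aws_conn_id = aws_default
# Snapshot every parsed DAG on every version bump.
# (Comment kept on its own line: Python's configparser keeps inline "# ..." text
# as part of the value unless inline comment prefixes are enabled.)
snapshot_on_parse = true
```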
```bash
# Trigger the backfill via CLI
airflow dags backfill \
  --dag-id daily_sales_etl \
  --logical-date 2026-06-01 \
  --end-date 2026-06-01 \
  --reset-dagruns

# Verify which version the scheduler picked
airflow dags versions daily_sales_etl \
  --at-date 2026-06-01 \
  --format json
```
```sql
-- Directly against the Airflow metadata DB
SELECT ti.dag_id,
       ti.task_id,
       ti.run_id,
       ti.dag_version_id,
       dv.created_at  AS version_created,
       dv.source_hash AS source_sha
FROM task_instance ti
JOIN dag_version dv ON dv.id = ti.dag_version_id
WHERE ti.dag_id = 'daily_sales_etl'
  AND ti.run_id LIKE 'backfill__2026-06-01%'
ORDER BY ti.task_id;
```
Step-by-step explanation.
- The `[dag_bundles]` section names a default bundle (`s3_prod`). The `snapshot_on_parse = true` flag makes the scheduler upload the DAG file to S3 every time it detects a new version. A missed upload means a permanently unrecoverable historical version, so keep this on.
- The `airflow dags backfill` command with a `--logical-date` of 2026-06-01 triggers a run for that date. Internally, the scheduler queries `dag_version` for the version live at 2026-06-01, fetches the source from S3, and runs that code.
- `airflow dags versions --at-date 2026-06-01` prints the metadata for the backfill's version: created_at, source_hash, and a URL back to the S3 object. This is the audit artefact.
- The SQL query reads the metadata DB directly for the task instances the backfill produced; every `dag_version_id` on those rows points at the historical version, confirming the replay behaviour.
- If bundle storage were off, step 2 would still work structurally (the version_id is present), but step 3 could not fetch the source, and the operational value of versioning would be greatly diminished.
Output.
| Task | Backfill run_id | dag_version_id | Version created |
|---|---|---|---|
| extract | backfill__2026-06-01... | v7 (created 2026-05-30) | 2 days before logical_date |
| transform | backfill__2026-06-01... | v7 | 2 days before logical_date |
| load | backfill__2026-06-01... | v7 | 2 days before logical_date |
Rule of thumb. DAG versioning is only half a feature without bundle storage. Enable S3/GCS/ABS bundles on day one and confirm via an intentional backfill that the historical source is actually replayed.
Worked example — a bad deploy and a version rollback
Detailed explanation. A DAG edit ships a bug: a rename introduces a silent data type mismatch. The scheduler creates version v12 (buggy); several DAG runs fail with cryptic errors. Rather than roll back the git repo (which is slow and creates a new v13), the on-call marks v12 as deprecated=true in the metadata DB, and future scheduling picks v11 (last known good).
- Bug. Task renamed `parse_dates` to `parse_dates_v2`; a downstream task still references `parse_dates`.
- Symptom. Task instances fail with `KeyError: 'parse_dates'` in the XCom pull.
- Rollback (fast). Mark v12 deprecated; the scheduler auto-picks v11 on the next run.
- Rollback (slow). Revert the git commit, wait for the parse, get v13 = v11.
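The fast-rollback rule (run the newest version that is not deprecated) is small enough to model directly. This is an illustrative sketch of the behaviour described in this worked example, with hypothetical names (`DagVersion`, `active_version`) whose fields mirror the verification query later in the example; it is not scheduler source code.

```python
from dataclasses import dataclass
from datetime import datetime


@dataclass
class DagVersion:
    id: int
    created_at: datetime
    deprecated: bool = False


def active_version(versions: list) -> DagVersion:
    """Newest non-deprecated version: the 'latest non-deprecated' rule."""
    candidates = [v for v in versions if not v.deprecated]
    if not candidates:
        raise LookupError("every version is deprecated; ship a fix")
    return max(candidates, key=lambda v: v.created_at)


history = [
    DagVersion(11, datetime(2026, 6, 20, 9, 0)),
    DagVersion(12, datetime(2026, 6, 22, 14, 22)),
]
print(active_version(history).id)  # 12: the buggy deploy is live
history[1].deprecated = True       # on-call deprecates v12
print(active_version(history).id)  # 11: scheduling falls back to the last good version
```

Note that deprecation never deletes a row: un-deprecating v12 later just flips the flag back, which is what keeps this rollback reversible and auditable.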
Question. Show the SQL/CLI to mark v12 deprecated, verify v11 is picked next, and design the alert that would have caught this earlier.
Input.
| Component | Value |
|---|---|
| DAG | customer_events_etl |
| Bad version | v12 (deployed 14:22 UTC) |
| Last good version | v11 |
| Symptom | Task fail rate 100% since 14:22 |
Code.
```bash
# Fast rollback: deprecate v12 via CLI
airflow dags versions deprecate customer_events_etl --version-id 12 \
  --reason "silent XCom key rename; task fail rate 100%"

# Verify scheduler picks v11 on next run
airflow dags versions customer_events_etl --active

# Trigger a fresh DAG run to confirm
airflow dags trigger customer_events_etl
```
```sql
-- Directly: mark v12 deprecated
UPDATE dag_version
SET deprecated        = true,
    deprecated_at     = now(),
    deprecated_by     = 'oncall-rollback',
    deprecated_reason = 'silent XCom key rename; 100% fail rate'
WHERE dag_id = 'customer_events_etl'
  AND id = 12;

-- Verify: latest non-deprecated version
SELECT id, created_at, source_hash
FROM dag_version
WHERE dag_id = 'customer_events_etl'
  AND deprecated = false
ORDER BY created_at DESC
LIMIT 1;
-- Expect: id = 11
```
```python
# The alert that should have caught this: task failure rate per DAG version
from datetime import datetime

from airflow.decorators import dag, task

FAILURE_RATE_SQL = """
    SELECT dv.dag_id,
           dv.id AS version_id,
           COUNT(*) FILTER (WHERE ti.state = 'failed')::float / COUNT(*) AS fail_rate,
           COUNT(*) AS n
    FROM task_instance ti
    JOIN dag_version dv ON dv.id = ti.dag_version_id
    WHERE ti.start_date > now() - INTERVAL '1 hour'  -- matches the @hourly schedule
      AND dv.created_at > now() - INTERVAL '2 hours'
    GROUP BY dv.dag_id, dv.id
    HAVING COUNT(*) >= 5
       AND COUNT(*) FILTER (WHERE ti.state = 'failed')::float / COUNT(*) > 0.5
"""


@dag(schedule="@hourly", start_date=datetime(2026, 1, 1), catchup=False)
def dag_version_health_alert():
    @task
    def check_failure_rate_per_version() -> None:
        import psycopg2

        conn = psycopg2.connect("postgres://.../airflow")
        try:
            with conn.cursor() as cur:
                cur.execute(FAILURE_RATE_SQL)
                for dag_id, version_id, fail_rate, n in cur.fetchall():
                    # Page on-call (print stands in for the pager integration)
                    print(f"ALERT: {dag_id} v{version_id} fail_rate={fail_rate:.0%} n={n}")
        finally:
            conn.close()

    check_failure_rate_per_version()


dag_version_health_alert()
```
Step-by-step explanation.
- The `airflow dags versions deprecate` CLI marks v12 as bad. On its next parse cycle (~30 seconds), the scheduler sees `deprecated=true` and falls back to the newest non-deprecated version, v11.
- Directly in SQL, `UPDATE dag_version SET deprecated = true WHERE id = 12` achieves the same thing. The CLI is the operational path; the SQL is the "what actually happens" answer for an interview.
- `airflow dags trigger customer_events_etl` immediately fires a new DAG run using v11. This confirms the rollback took effect without waiting for the next scheduled tick.
- The alert DAG runs every hour, queries the last hour of task instances joined against `dag_version` rows created in the last 2 hours (i.e. new versions), and pages on-call if any version has a failure rate above 50% over at least 5 task instances. Matching the lookback window to the schedule leaves no blind spot between runs, so a bad deploy pages within one hour.
- The rollback is fast (seconds), reversible (un-deprecate after fixing), and preserves the audit trail: v12 stays in the DB, marked deprecated with a reason, so the post-mortem can reference it.
Output.
| Time | Event | Active version | Fail rate |
|---|---|---|---|
| 14:22 UTC | Deploy v12 | v12 | 100% |
| 15:00 UTC | Alert fires (first hourly check after the deploy) | v12 | 100% |
| 15:02 UTC | On-call deprecates v12 | v11 (rollback) | 0% |
| 15:04 UTC | Trigger clean run | v11 | 0% |
| Next day | Fixed commit ships v13 | v13 (v12 stays deprecated) | 0% |
Rule of thumb. DAG versioning gives you a deprecation-based rollback that is faster and safer than a git revert. Ship the version-health alert on day one so bad deploys page within an hour, not the next morning.
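The alert's threshold logic can be unit-tested without a database by running it over plain rows. A minimal sketch, assuming each row is `(version_id, state)` for task instances in the lookback window; the 5-instance minimum and 50% threshold match the `HAVING` clause above, and `versions_to_page` is a hypothetical helper.

```python
from collections import defaultdict


def versions_to_page(rows, min_instances=5, max_fail_rate=0.5):
    """Return {version_id: fail_rate} for versions breaching the threshold."""
    totals = defaultdict(int)
    failures = defaultdict(int)
    for version_id, state in rows:
        totals[version_id] += 1
        if state == "failed":
            failures[version_id] += 1
    return {
        v: failures[v] / totals[v]
        for v in totals
        if totals[v] >= min_instances and failures[v] / totals[v] > max_fail_rate
    }


window = [(11, "success")] * 8 + [(12, "failed")] * 6 + [(13, "failed")] * 2
print(versions_to_page(window))  # {12: 1.0}
```

Version 13 fails every instance but is not paged because it has only two samples; the minimum-count guard trades a little detection latency for far fewer false pages on brand-new versions.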
Worked example — dynamic task mapping + versioning
Detailed explanation. Dynamic task mapping (.expand() / .expand_kwargs()) creates a variable number of task instances at run time. Versioning treats the structure of the map (the upstream that produces the mapping, the operator, the arguments) as part of the version hash, so a change to the mapping logic bumps the version. But the actual mapped values are runtime data, not structure.
- Structure counts. Changing `.expand(input=upstream_task.output)` to `.expand(input=another_task.output)` creates a new version.
- Values don't. The number of mapped tasks changing from 5 to 500 because upstream returned more data leaves the version unchanged.
- Consequence. A backfill of a 3-week-old run replays the version that was live then, with the same structure, but the mapped values re-materialise from the upstream task's XCom at run time.
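A toy illustration of "structure counts, values don't": hash only the mapping's wiring (which upstream feeds which task, and under which argument name) and never the mapped values. `mapping_structure_hash` and `run_signature` are hypothetical; the point is which inputs go into the hash, not how Airflow serializes a mapped task.

```python
import hashlib
import json


def mapping_structure_hash(upstream_task: str, mapped_task: str, arg_name: str) -> str:
    """Hash the wiring of an .expand() call; mapped values are deliberately excluded."""
    wiring = {"upstream": upstream_task, "mapped": mapped_task, "arg": arg_name}
    return hashlib.sha256(json.dumps(wiring, sort_keys=True).encode("utf-8")).hexdigest()


def run_signature(upstream_task: str, mapped_task: str, arg_name: str, mapped_values: list):
    """(structure hash, number of mapped task instances) for one DAG run."""
    return mapping_structure_hash(upstream_task, mapped_task, arg_name), len(mapped_values)


run_a = run_signature("list_active_tenants", "process_tenant", "tenant_id", [1, 2, 3, 4, 5])
run_b = run_signature("list_active_tenants", "process_tenant", "tenant_id", list(range(500)))
swapped = run_signature("list_premium_tenants", "process_tenant", "tenant_id", [1, 2])

print(run_a[0] == run_b[0], run_a[1], run_b[1])  # True 5 500: same version, different fan-out
print(run_a[0] == swapped[0])  # False: the map source changed, so a new version
```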
Question. Show a dynamic-mapping DAG and explain what happens on backfill when (a) the mapping structure changed since original, (b) only the upstream data changed.
Input.
| Scenario | Structure changed? | Upstream data changed? |
|---|---|---|
| A | yes (map source swapped) | irrelevant |
| B | no | yes (upstream returns different values) |
Code.
```python
from datetime import datetime

from airflow.decorators import dag, task


@dag(
    dag_id="tenant_shard_etl",
    schedule="@daily",
    start_date=datetime(2026, 1, 1),
    catchup=False,
)
def tenant_shards():
    @task
    def list_active_tenants() -> list[int]:
        # Reads the tenants table; returns e.g. [1, 2, 3, 4, 5]
        return [1, 2, 3, 4, 5]

    @task
    def process_tenant(tenant_id: int) -> None:
        # Runs once per tenant_id emitted by list_active_tenants
        print(f"processing tenant {tenant_id}")

    process_tenant.expand(tenant_id=list_active_tenants())


tenant_shards()
```
```sql
-- Verify what a backfill replays
SELECT ti.dag_id,
       ti.task_id,
       ti.map_index,
       ti.dag_version_id,
       ti.rendered_map_index
FROM task_instance ti
WHERE ti.dag_id = 'tenant_shard_etl'
  AND ti.run_id LIKE 'backfill__2026-06-01%'
ORDER BY ti.map_index;

-- Compare v-old (backfill) vs v-current
SELECT id, created_at, structure_hash
FROM dag_version
WHERE dag_id = 'tenant_shard_etl'
ORDER BY created_at DESC;
```
Step-by-step explanation.
- In scenario A (structure changed), suppose the current DAG swaps `list_active_tenants` for `list_premium_tenants`. This changes the structure hash and creates a new version. A backfill of a 3-week-old date looks up the version live then, which used `list_active_tenants`, so the historical structure runs.
- In scenario B (only data changed), the DAG structure is unchanged and the backfill uses the same version. But `list_active_tenants` re-runs at backfill time, so today's tenant list is what gets mapped, not the list from 3 weeks ago.
- This is subtle but important: structure is versioned; data is re-evaluated. If historical fidelity of the mapped values matters (e.g. reproducing an exact billing calculation), the upstream must itself read from a stable, versioned data source (event table, snapshot).
- The `map_index` column in `task_instance` records each instance's position within the map; `rendered_map_index` records the human-readable label rendered from `map_index_template`, if one is set. Both are preserved for the backfill run.
- The `structure_hash` column in `dag_version` is what the scheduler compares to detect structural changes. A hash miss creates a new version; a hash hit reuses the current one.
Output.
| Scenario | Structure hash | Backfill behaviour |
|---|---|---|
| A: structure changed | new hash → v-new | Historical version replays; original structure |
| B: only data changed | same hash → v-current | Same version; upstream re-evaluates data |
Rule of thumb. DAG versioning captures structural fidelity, not data fidelity. For workloads where backfill data must be historically exact (billing, compliance), the upstream that feeds the mapping must itself be idempotent-on-date — e.g., read from a dt='2026-06-01' partition, not from a live table.
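The difference between a live-table read and a date-partitioned read is easy to demonstrate with an in-memory stand-in. The `TENANT_SNAPSHOTS` dict below plays the role of a `dt=`-partitioned tenants table (an assumption for illustration, as are the function names). The partitioned reader returns the same tenants for 2026-06-01 no matter when the backfill runs, while the live reader returns whatever the table holds today.

```python
from datetime import date

# Hypothetical daily snapshots of the tenants table, keyed by partition date.
TENANT_SNAPSHOTS = {
    date(2026, 6, 1): [1, 2, 3, 4, 5],
    date(2026, 6, 22): [1, 2, 3, 4, 5, 6, 7],  # two tenants onboarded since
}
LIVE_TENANTS = TENANT_SNAPSHOTS[date(2026, 6, 22)]  # what a live table shows today


def list_tenants_live() -> list:
    return list(LIVE_TENANTS)  # ignores the logical date: not reproducible


def list_tenants_as_of(logical_date: date) -> list:
    """Read the dt=<logical_date> partition; fail loudly if it was never written."""
    try:
        return list(TENANT_SNAPSHOTS[logical_date])
    except KeyError:
        raise LookupError(f"missing partition dt={logical_date.isoformat()}") from None


backfill_date = date(2026, 6, 1)
print(list_tenants_live())                # [1, 2, 3, 4, 5, 6, 7]
print(list_tenants_as_of(backfill_date))  # [1, 2, 3, 4, 5]
```

Failing on a missing partition, rather than silently falling back to the live table, is the design choice that keeps a backfill from quietly computing against today's data.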
Senior interview question on DAG versioning semantics
A senior interviewer might ask: "You maintain a 400-DAG Airflow 3 deployment. Someone edits a DAG on Monday, another edit lands Wednesday, and on Friday an auditor asks you to reproduce the Sunday-before-last DAG run exactly. Walk me through the versioning behaviour, the SQL you'd run to confirm, and the failure mode where the reproduction is not exact."
Solution Using the version_id lookup + bundle-stored source + upstream-idempotency check
```python
# End-to-end reproduction walkthrough: code + SQL + verification

# Step 1: find the version_id for the target date
LOOKUP_SQL = """
SELECT dv.id AS version_id,
       dv.created_at,
       dv.source_hash,
       dv.bundle_url
FROM dag_version dv
WHERE dv.dag_id = %s
  AND dv.created_at <= %s
  AND (dv.deprecated_at IS NULL OR dv.deprecated_at > %s)
ORDER BY dv.created_at DESC
LIMIT 1;
"""


# Step 2: fetch the historical source from bundle storage
def fetch_bundle(bundle_url: str) -> bytes:
    import boto3
    from urllib.parse import urlparse

    u = urlparse(bundle_url)  # s3://airflow-dag-bundles/prod/dag_v7.py
    s3 = boto3.client("s3")
    r = s3.get_object(Bucket=u.netloc, Key=u.path.lstrip("/"))
    return r["Body"].read()


# Step 3: trigger the backfill against that version
def backfill_at(dag_id: str, logical_date: str) -> None:
    # Airflow 3.1 CLI: backfill picks the version by created_at <= logical_date
    import subprocess

    subprocess.check_call([
        "airflow", "dags", "backfill",
        "--dag-id", dag_id,
        "--logical-date", logical_date,
        "--end-date", logical_date,
        "--reset-dagruns",
    ])


# Step 4: verify the backfill used the expected version
VERIFY_SQL = """
SELECT ti.task_id,
       ti.dag_version_id,
       ti.state,
       dv.source_hash
FROM task_instance ti
JOIN dag_version dv ON dv.id = ti.dag_version_id
WHERE ti.dag_id = %s
  AND ti.run_id LIKE %s
ORDER BY ti.task_id;
"""
```
```sql
-- Concrete run against a real audit request:
-- "Reproduce sunday_2026_06_14 exactly"

-- Step 1: pick version
SELECT id, created_at, source_hash, bundle_url
FROM dag_version
WHERE dag_id = 'daily_sales_etl'
  AND created_at <= '2026-06-14 00:00:00'
  AND (deprecated_at IS NULL OR deprecated_at > '2026-06-14 00:00:00')
ORDER BY created_at DESC
LIMIT 1;
-- returns: id=7, created_at=2026-06-11, source_hash=abc123, bundle_url=s3://.../dag_v7.py

-- Step 4: verify after backfill
SELECT ti.task_id, ti.dag_version_id, ti.state, dv.source_hash
FROM task_instance ti
JOIN dag_version dv ON dv.id = ti.dag_version_id
WHERE ti.dag_id = 'daily_sales_etl'
  AND ti.run_id LIKE 'backfill__2026-06-14%'
ORDER BY ti.task_id;
-- Expected: every row has dag_version_id = 7, source_hash = abc123
```
Step-by-step trace.
| Step | Command | Result |
|---|---|---|
| 1 | Lookup version at 2026-06-14 | v7, created 2026-06-11, source_hash abc123 |
| 2 | Fetch bundle from S3 | historical DAG source retrieved |
| 3 | Trigger backfill for 2026-06-14 | scheduler runs v7 code |
| 4 | Verify task_instance rows | every task stamped dag_version_id=7 |
| 5 | Compare aggregate output to original run | should match if upstream data is idempotent |
After the reproduction, the auditor gets a signed artefact: "this backfill used dag_version_id=7 (source_hash abc123), pinned to the version live on the original logical date, and the task-instance stamps confirm the replay." The reproduction is exact on structure, and exact on data as long as the upstream data source is idempotent-on-date.
Output:
| Property | Original run | Backfill |
|---|---|---|
| dag_version_id | 7 | 7 |
| source_hash | abc123 | abc123 |
| task graph | identical | identical |
| Upstream data | as of 2026-06-14 | as of query time (danger zone) |
| Aggregate outputs match? | — | yes iff upstream is date-idempotent |
Why this works — concept by concept:
- version_id lookup: the scheduler picks the version whose created_at <= logical_date AND (not deprecated OR deprecated after logical_date). This is deterministic and audit-provable.
- Bundle-stored source: the historical DAG file lives in S3, keyed by version_id. Without bundle storage, the version_id is only a hash and the source is lost; reproduction becomes impossible if the git history has been amended.
- Task-instance stamping: every replayed task instance carries the dag_version_id. The auditor's SQL against task_instance is the confirmation artefact; it survives Airflow restarts and metadata migrations.
- Upstream data idempotency (the failure mode): versioning captures code, not data. If the DAG reads from a live table that has been updated since the original run, the backfill computes different results with the same code. The interview signal is naming this failure mode without prompting.
- Cost: bundle storage is roughly 100 KB per version, so the total scales as 100 KB × versions × DAGs. The metadata DB adds one small table (dag_version) and one column on task_instance. The lookup query costs O(log versions) via the index on (dag_id, created_at). The reproduction cost is O(1) in engineering time: the whole feature is designed to make audits routine, not exceptional.
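A back-of-envelope version of the cost bullet, assuming the ~100 KB-per-version figure and an illustrative fleet of 400 DAGs with 50 stored versions each (both fleet numbers are hypothetical):

```python
def bundle_storage_bytes(dags: int, versions_per_dag: int, kb_per_version: float = 100.0) -> float:
    """Total bundle storage = kb_per_version x versions x DAGs, in bytes (1 KB = 1000 B)."""
    return dags * versions_per_dag * kb_per_version * 1000


# Hypothetical fleet: 400 DAGs x 50 versions x 100 KB
total = bundle_storage_bytes(dags=400, versions_per_dag=50)
print(f"{total / 1e9:.1f} GB")  # 2.0 GB
```

At this assumed scale the bundle bucket stays in the low-gigabyte range, which is consistent with the bullet treating storage as a minor cost next to engineering time.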
4. MLOps primitives — Assets, model registry integration
airflow assets (renamed from Datasets) are the lineage + freshness + trigger primitive that makes airflow mlops real
The mental model in one line: Airflow 3 renames Datasets to Assets and elevates them from a scheduling hint to a first-class lineage-and-trigger primitive — Assets carry URIs, freshness metadata, and lineage edges; DAGs subscribe to Asset updates via schedule=[Asset(...)]; and dedicated hooks in the airflow.providers.vertex_ai, airflow.providers.mlflow, and airflow.providers.mosaic packages complete the MLOps loop by registering trained models against external registries the moment training finishes. The primitive is what turns a hand-rolled ML DAG into a production MLOps pipeline.
What Assets add over the 2.x Datasets story.
- Renamed and elevated. airflow.datasets.Dataset → airflow.assets.Asset. The 2.x import still works with a deprecation warning through 3.1; removed in 3.2.
- URI-first identity. An Asset is identified by a URI (s3://bucket/path/features.parquet, snowflake://ACCOUNT/DB.SCHEMA.TABLE, bigquery://project.dataset.table). The URI is the canonical primary key across DAGs.
- Freshness metadata. Every Asset write records a logical_date, an optional content_hash, and an optional row/size delta. Downstream DAGs can require an Asset to be fresh within N minutes before triggering.
- Lineage edges. The Assets tab in the UI shows the full dependency graph: which DAG produced this Asset, which DAGs consume it, and when it was last updated.
- Asset-scheduled DAGs. schedule=[Asset("s3://...")] triggers the DAG the moment any upstream writer updates that Asset. Multi-Asset schedules (schedule=[Asset(A), Asset(B)]) give fan-in semantics: the DAG runs once every listed Asset has been updated since its previous run.
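The fan-in rule is easiest to see in a toy model: a consumer with schedule=[A, B] fires only after both A and B have been updated since its last run, while a consumer of A alone fires on every A update. This is an in-memory sketch for reasoning and testing, not Airflow's scheduler; the class, its methods, and the URIs are hypothetical.

```python
from collections import defaultdict


class ToyAssetScheduler:
    """Toy model of Asset-scheduled DAGs (hypothetical, not Airflow internals)."""

    def __init__(self) -> None:
        self.schedules: dict[str, set[str]] = {}              # dag_id -> required asset URIs
        self.pending: dict[str, set[str]] = defaultdict(set)   # dag_id -> URIs updated since last run
        self.queued: list[str] = []                            # DAG runs queued, in order

    def register(self, dag_id: str, asset_uris: list[str]) -> None:
        self.schedules[dag_id] = set(asset_uris)

    def asset_updated(self, uri: str) -> None:
        for dag_id, required in self.schedules.items():
            if uri not in required:
                continue
            self.pending[dag_id].add(uri)
            # Fan-in: queue a run only once every required asset has a fresh update.
            if self.pending[dag_id] >= required:
                self.queued.append(dag_id)
                self.pending[dag_id].clear()


sched = ToyAssetScheduler()
sched.register("train", ["s3://a", "s3://b"])
sched.register("report", ["s3://a"])
sched.asset_updated("s3://a")  # report fires; train still waits for s3://b
sched.asset_updated("s3://a")  # report fires again; train still waits
sched.asset_updated("s3://b")  # train fires
print(sched.queued)  # ['report', 'report', 'train']
```

Note that two updates to s3://a before s3://b still produce a single train run in this model, because pending tracks "updated since last run", not a count of events.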
The MLOps primitives that build on Assets.
- Model-registry hooks. VertexAIModelRegisterOperator, MLflowRegisterModelOperator, MosaicRegisterModelOperator (and equivalents for SageMaker, Databricks). Each takes an Asset representing the trained model artefact and registers it against the target registry with metadata.
- Feature-store integration. FeastMaterializeOperator, TectonMaterializeOperator: Asset-aware feature materialisation that emits an Asset update on completion.
- Evaluation gates. ModelEvaluationSensor waits for evaluation metrics to be produced (as Assets); ModelPromotionOperator promotes a model version to production in the registry only if the evaluation metrics exceed a threshold.
- Batch inference DAGs. Inference DAGs subscribe to the model Asset (schedule=[Asset("mlflow://models/customer_churn/latest")]) and re-run whenever a new production version lands.
Lineage + freshness — the two axes of Asset semantics.
- Lineage. Every Asset knows its upstream DAG (producer_dag_id) and its downstream DAG subscriptions (consumer_dag_ids). The metadata DB stores the edges; the UI visualises them.
- Freshness. Each Asset write carries a timestamp and, optionally, a freshness_expected_within hint. Consumers can require the Asset to have been updated within the last N minutes or hours; otherwise the AssetFreshnessSensor blocks.
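The consumer-side freshness rule reduces to comparing the last update timestamp against the expected window. A minimal sketch, assuming timezone-aware UTC timestamps; is_fresh is a hypothetical helper that only mirrors the rule the freshness sensor described above is meant to enforce, not that sensor's actual signature.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def is_fresh(last_updated: Optional[datetime], now: datetime, within: timedelta) -> bool:
    """True if the asset was updated no more than `within` before `now`."""
    if last_updated is None:  # never written: treat as stale
        return False
    return now - last_updated <= within


now = datetime(2026, 6, 14, 15, 0, tzinfo=timezone.utc)
print(is_fresh(datetime(2026, 6, 14, 14, 10, tzinfo=timezone.utc), now, timedelta(hours=1)))  # True
print(is_fresh(datetime(2026, 6, 14, 13, 30, tzinfo=timezone.utc), now, timedelta(hours=1)))  # False
```

Treating a never-written Asset as stale is a deliberate design choice here: a brand-new consumer should wait for a first write rather than run on nothing.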
Common interview probes on Airflow MLOps.
- "What's the difference between Datasets and Assets?": rename + URI-first + freshness + lineage edges.
- "How does an MLflow model registration integrate with an Airflow DAG?": MLflowRegisterModelOperator consumes a trained-model Asset, registers it, and emits a new Asset for the registered version.
- "How do you gate model promotion to production?": a sensor waits for evaluation Assets, an operator promotes only if the metrics pass, and it emits a promoted-to-production Asset that downstream inference DAGs subscribe to.
- "Can I still use KubernetesPodOperator for custom training?": yes; you emit an Asset at the end via outlets=[Asset(...)] and downstream DAGs subscribe as normal.
Worked example — an Asset-scheduled training DAG
Detailed explanation. A team trains a customer-churn model daily. The training data is a Parquet file in S3 updated hourly by a separate ETL DAG. Rather than schedule the training DAG on a fixed cron and pray the data is fresh, use an Asset-scheduled DAG that triggers whenever the training data Asset updates.
- Producer DAG. feature_engineering writes s3://features/customer_churn/features.parquet hourly.
- Consumer DAG. train_churn_model subscribes to the Asset and triggers the moment the Asset updates.
- Registry. The trained model is registered against MLflow.
Question. Write both DAGs and show how the Asset flows through them. Explain the freshness guarantee.
Input.
| Component | Value |
|---|---|
| Training data Asset URI | s3://features/customer_churn/features.parquet |
| Model artefact Asset URI | s3://models/customer_churn/model.pkl |
| Registry | MLflow at https://mlflow.example.com |
Code.
```python
# DAG 1: producer
from datetime import datetime

from airflow.decorators import dag, task
from airflow.assets import Asset

FEATURES = Asset("s3://features/customer_churn/features.parquet")


@dag(
    dag_id="feature_engineering",
    schedule="@hourly",
    start_date=datetime(2026, 1, 1),
    catchup=False,
)
def feature_engineering():
    @task(outlets=[FEATURES])
    def compute_and_write_features() -> None:
        # ... read raw events, compute features, write parquet ...
        # `outlets=[FEATURES]` marks this task as an Asset producer
        pass

    compute_and_write_features()


feature_engineering()
```

```python
# DAG 2: consumer, Asset-scheduled
from datetime import datetime

from airflow.decorators import dag, task
from airflow.assets import Asset
from airflow.providers.mlflow.operators.registry import MLflowRegisterModelOperator

FEATURES = Asset("s3://features/customer_churn/features.parquet")
MODEL = Asset("s3://models/customer_churn/model.pkl")


@dag(
    dag_id="train_churn_model",
    schedule=[FEATURES],  # trigger the moment FEATURES updates
    start_date=datetime(2026, 1, 1),
    catchup=False,
)
def train_churn_model():
    @task(outlets=[MODEL])
    def train() -> str:
        # ... load features, train sklearn model, pickle to s3 ...
        return "s3://models/customer_churn/model.pkl"

    register = MLflowRegisterModelOperator(
        task_id="register",
        model_artifact_uri="{{ ti.xcom_pull(task_ids='train') }}",
        model_name="customer_churn",
        mlflow_tracking_uri="https://mlflow.example.com",
    )

    train() >> register


train_churn_model()
```
Step-by-step explanation.
- The producer DAG (feature_engineering) runs hourly. Its single task declares outlets=[FEATURES], which tells Airflow that when this task succeeds, the FEATURES Asset should be marked as updated. The metadata DB records an Asset-update event.
- The consumer DAG (train_churn_model) uses schedule=[FEATURES] instead of a cron string. Airflow's scheduler treats Asset-update events as triggers; the moment the producer emits an update, the consumer is queued.
- If the producer runs at 14:00, 15:00, and 16:00, the consumer is queued shortly after each producer task succeeds (14:00+ε, 15:00+ε, 16:00+ε): synchronised with the data, not with the wall clock. If the producer misses an hour (upstream ETL delay), the consumer skips that hour too. No more "training ran but on stale data" incidents.
- The consumer's train task also declares outlets=[MODEL], so training emits a new Asset. Downstream inference DAGs can subscribe to MODEL and re-run whenever a new model ships.
- MLflowRegisterModelOperator receives the model artefact URI from XCom and registers it in MLflow. In production, the registered version becomes discoverable by inference services; the Airflow Asset serves as the trigger source and the MLflow registry serves as the discovery layer.
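The data-synchronised behaviour can be checked with a small simulation that contrasts Asset triggering with a cron schedule. A sketch under stated assumptions: producer success is idealised to the top of the hour, each success queues one consumer run after a fixed 5-second lag (a hypothetical figure; real delay depends on the scheduler loop), and both helper names are hypothetical.

```python
from datetime import datetime, timedelta
from typing import Optional


def asset_triggered_runs(producer_successes: list[datetime],
                         lag: timedelta = timedelta(seconds=5)) -> list[datetime]:
    """One consumer run per successful producer run, queued `lag` after it.
    A producer hour that never succeeds emits no Asset event, so no run."""
    return [t + lag for t in sorted(producer_successes)]


def cron_data_ages(producer_successes: list[datetime],
                   cron_times: list[datetime]) -> list[Optional[timedelta]]:
    """For comparison: age of the newest feature write seen by each cron run."""
    ages = []
    for run_at in cron_times:
        written = [t for t in producer_successes if t <= run_at]
        ages.append(run_at - max(written) if written else None)
    return ages


# The 15:00 producer run was delayed upstream and never succeeded.
successes = [datetime(2026, 6, 14, 14, 0), datetime(2026, 6, 14, 16, 0)]
print([t.strftime("%H:%M:%S") for t in asset_triggered_runs(successes)])
# ['14:00:05', '16:00:05']

crons = [datetime(2026, 6, 14, h, 30) for h in (14, 15, 16)]
print([str(a) for a in cron_data_ages(successes, crons)])
# ['0:30:00', '1:30:00', '0:30:00']
```

The 15:30 cron run trains on 90-minute-old features without noticing; the Asset-scheduled consumer simply does not run for the missed hour.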
Output.
| Event | Timestamp | Effect |
|---|---|---|
| feature_engineering runs | 14:00 | FEATURES asset updated |
| train_churn_model queued | 14:00+ε | Asset-scheduled trigger |
| train_churn_model completes | 14:15 | MODEL asset updated; MLflow gets new version |
| Inference DAG queued (if subscribed) | 14:15+ε | Runs on fresh model |
Rule of thumb. Asset-scheduled DAGs replace cron schedules for any pipeline where upstream data freshness matters more than wall-clock time. The Asset is the contract; the DAGs on either side are the implementations.
Worked example — model promotion gate with evaluation Assets
Detailed explanation. A trained model must clear an evaluation gate (accuracy > 0.85) before promotion to production. The pipeline: train → evaluate → gate → promote. The gate is a sensor that waits for the evaluation Asset and checks the metrics; the promotion operator only runs if the gate passes. The production model Asset is what inference DAGs subscribe to.
- Metrics threshold. Accuracy > 0.85 on the holdout set.
- Promotion. Set the MLflow model stage from staging to production.
- Downstream. Inference DAGs subscribed to the production Asset re-run.
Question. Design the DAG with the evaluation gate. Show what happens on a passing and a failing model.
Input.
| Component | Value |
|---|---|
| Model Asset | mlflow://models/customer_churn |
| Evaluation Asset | s3://eval/customer_churn/metrics.json |
| Threshold | accuracy > 0.85 |
| Promotion target | mlflow://models/customer_churn?stage=production |
Code.
```python
from datetime import datetime

from airflow.decorators import dag, task
from airflow.assets import Asset
from airflow.exceptions import AirflowSkipException
from airflow.providers.mlflow.operators.registry import (
    MLflowRegisterModelOperator,
    MLflowPromoteModelOperator,
)

FEATURES = Asset("s3://features/customer_churn/features.parquet")
STAGED_MODEL = Asset("mlflow://models/customer_churn?stage=staging")
METRICS = Asset("s3://eval/customer_churn/metrics.json")
PROD_MODEL = Asset("mlflow://models/customer_churn?stage=production")


@dag(
    dag_id="train_evaluate_promote_churn",
    schedule=[FEATURES],
    start_date=datetime(2026, 1, 1),
    catchup=False,
)
def train_eval_promote():
    @task(outlets=[STAGED_MODEL])
    def train_and_stage() -> str:
        # ... train + register in MLflow as `staging` ...
        return "mlflow://models/customer_churn?version=42&stage=staging"

    @task(outlets=[METRICS])
    def evaluate(model_uri: str) -> dict:
        # ... load holdout set, score model ...
        return {"accuracy": 0.87, "roc_auc": 0.91}

    @task
    def gate(metrics: dict) -> None:
        if metrics["accuracy"] <= 0.85:
            raise AirflowSkipException(f"gate failed: accuracy={metrics['accuracy']:.3f}")

    promote = MLflowPromoteModelOperator(
        task_id="promote",
        model_name="customer_churn",
        source_stage="staging",
        target_stage="production",
        outlets=[PROD_MODEL],
    )

    staged = train_and_stage()
    metrics = evaluate(staged)
    gate(metrics) >> promote


train_eval_promote()
```
Step-by-step explanation.
- train_and_stage trains the model and registers it to MLflow as staging. outlets=[STAGED_MODEL] marks the staged model Asset as updated; downstream monitoring dashboards subscribed to STAGED_MODEL can pick this up.
- evaluate scores the staged model against the holdout set and writes metrics to S3 (outlets=[METRICS] marks that Asset updated). The metrics are also returned as an XCom for the gate task.
- gate is the guard: if accuracy ≤ 0.85, it raises AirflowSkipException, which marks the task as skipped rather than failed. With the default trigger_rule (all_success), the downstream promote task is skipped too. If accuracy > 0.85, gate succeeds and promote runs.
- MLflowPromoteModelOperator transitions the model from staging to production via the MLflow REST API. outlets=[PROD_MODEL] marks the production Asset updated, so any inference DAG subscribed to PROD_MODEL fires next.
- The elegance: the gate is inside the DAG, but the promotion effect propagates outside the DAG via the Asset. Downstream DAGs never need to know the promotion happened; they subscribe to the Asset and Airflow handles the trigger.
Output.
| Scenario | accuracy | gate | promote runs? | production Asset updated? |
|---|---|---|---|---|
| Pass | 0.87 | success | yes | yes |
| Fail | 0.82 | skipped | no (skipped) | no |
| Borderline | 0.85 | skipped (≤ threshold) | no | no |
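The gate's policy can be pulled out into a pure function so the threshold, including the borderline case, is unit-testable without running a DAG. A sketch, assuming the strict accuracy > 0.85 rule from the input table; gate_decision is a hypothetical helper, and inside the DAG its "skip" branch corresponds to raising AirflowSkipException.

```python
THRESHOLD = 0.85


def gate_decision(accuracy: float, threshold: float = THRESHOLD) -> str:
    """Return 'promote' only when accuracy strictly exceeds the threshold."""
    return "promote" if accuracy > threshold else "skip"


for scenario, acc in [("Pass", 0.87), ("Fail", 0.82), ("Borderline", 0.85)]:
    print(scenario, acc, gate_decision(acc))
# Pass 0.87 promote
# Fail 0.82 skip
# Borderline 0.85 skip
```

Keeping the comparison in one function avoids the common drift where the DAG uses <= and a dashboard uses <, so the two disagree on exactly the borderline row of the table.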
Rule of thumb. Evaluation gates + outlets=[production_asset] is the canonical model-promotion pattern in Airflow 3. The Asset is the trigger for downstream inference; the gate is the policy; the operator is the mechanism.
Worked example — Feast feature materialisation Asset chain
Detailed explanation. A production ML stack uses Feast as its feature store. The feature-materialisation DAG reads raw events, materialises features into the online store (Redis) and offline store (S3), and emits an Asset for each. Downstream training DAGs subscribe to the offline-store Asset; inference services read from the online store. The DAG-side integration uses FeastMaterializeOperator.
- Upstream. Raw event Asset (s3://raw/events/{ds}/*.parquet).
- Feast operator. Materialises features into online (Redis) + offline (S3) stores.
- Downstream training Asset. s3://feast/offline/customer_churn/features.parquet.
Question. Write the DAG. Show how the training DAG subscribes to the Feast-materialised Asset.
Input.
| Component | Value |
|---|---|
| Feast repo | git@github.com:team/feast-repo.git |
| Online store | Redis at redis://feast-online:6379 |
| Offline store | s3://feast/offline/... |
Code.
```python
from datetime import datetime, timedelta

from airflow.decorators import dag, task
from airflow.assets import Asset
from airflow.providers.feast.operators.materialize import FeastMaterializeOperator

RAW_EVENTS = Asset("s3://raw/events/latest.parquet")
OFFLINE_FEATURES = Asset("s3://feast/offline/customer_churn/features.parquet")


@dag(
    dag_id="feast_materialize_customer_churn",
    schedule=[RAW_EVENTS],
    start_date=datetime(2026, 1, 1),
    catchup=False,
    default_args={"retries": 2},
)
def feast_materialize():
    materialize = FeastMaterializeOperator(
        task_id="materialize",
        feature_views=["customer_churn_features"],
        start_ts="{{ data_interval_start }}",
        end_ts="{{ data_interval_end }}",
        feast_repo="/opt/airflow/feast_repo",
        outlets=[OFFLINE_FEATURES],
    )
    materialize


feast_materialize()
```

```python
# Training DAG subscribes to the Feast-materialised offline Asset
from datetime import datetime

from airflow.assets import Asset
from airflow.decorators import dag, task

OFFLINE_FEATURES = Asset("s3://feast/offline/customer_churn/features.parquet")
MODEL = Asset("mlflow://models/customer_churn?stage=staging")


@dag(
    dag_id="train_churn_from_feast",
    schedule=[OFFLINE_FEATURES],
    start_date=datetime(2026, 1, 1),
    catchup=False,
)
def train_churn():
    @task(outlets=[MODEL])
    def train() -> str:
        # ... load from OFFLINE_FEATURES, train, register in MLflow ...
        return "mlflow://models/customer_churn?version=n&stage=staging"

    train()


train_churn()
```
Step-by-step explanation.
- feast_materialize_customer_churn subscribes to RAW_EVENTS. Whenever a raw-events Asset update lands, the Feast materialisation kicks off. The DAG is Asset-scheduled, not cron-scheduled.
- FeastMaterializeOperator runs the equivalent of feast materialize <start> <end> against the repo over the window passed in start_ts/end_ts, materialising the specified feature views into both the online (Redis) and offline (S3) stores. outlets=[OFFLINE_FEATURES] marks the offline Asset updated on success.
- train_churn_from_feast subscribes to OFFLINE_FEATURES. The moment Feast finishes materialising, the training DAG queues. The train task reads from the offline store, trains, and emits a new MODEL Asset.
- The dependency graph in the Assets tab of the UI now shows RAW_EVENTS → feast_materialize → OFFLINE_FEATURES → train_churn → MODEL. This is the lineage senior interviewers ask about: visible, queryable, unambiguous.
- The start_ts/end_ts templates tie the materialisation window to the DAG run's data interval, so backfills materialise the historically correct window.
Output.
| DAG | Asset produced | Asset consumed |
|---|---|---|
| raw_events_etl (upstream) | RAW_EVENTS | — |
| feast_materialize | OFFLINE_FEATURES | RAW_EVENTS |
| train_churn | MODEL | OFFLINE_FEATURES |
| inference_churn (downstream) | predictions | MODEL |
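Because every edge in the table above is an Asset, impact analysis becomes a graph walk: "if RAW_EVENTS is late, which DAGs stall?" A sketch that loads the table's rows as an in-memory edge list; in practice the edges would come from the UI or the metadata DB, and that query is not shown. The helper name is hypothetical.

```python
from collections import deque

# (dag, produces, consumes) rows copied from the table above.
EDGES = [
    ("raw_events_etl", "RAW_EVENTS", None),
    ("feast_materialize", "OFFLINE_FEATURES", "RAW_EVENTS"),
    ("train_churn", "MODEL", "OFFLINE_FEATURES"),
    ("inference_churn", "predictions", "MODEL"),
]


def downstream_dags(asset: str, edges=EDGES) -> list[str]:
    """Breadth-first walk: every DAG transitively triggered by an update to `asset`."""
    consumers: dict[str, list[tuple[str, str]]] = {}
    for dag, produces, consumes in edges:
        if consumes is not None:
            consumers.setdefault(consumes, []).append((dag, produces))

    seen, order, queue = set(), [], deque([asset])
    while queue:
        current = queue.popleft()
        for dag, produces in consumers.get(current, []):
            if dag not in seen:  # guard against cycles and diamonds
                seen.add(dag)
                order.append(dag)
                queue.append(produces)
    return order


print(downstream_dags("RAW_EVENTS"))
# ['feast_materialize', 'train_churn', 'inference_churn']
```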
Rule of thumb. Feature store + Airflow 3 Assets = a fully wired MLOps lineage graph with zero glue code. The FeastMaterializeOperator (or TectonMaterializeOperator) is the bridge; the Asset is the contract; the schedule is the trigger.
Senior interview question on Airflow MLOps architecture
A senior interviewer might ask: "Design an Airflow 3 MLOps pipeline that trains a churn model daily, evaluates it on a holdout set, promotes to production only if accuracy exceeds a threshold, and re-triggers a batch inference DAG whenever a new production model ships. Walk me through the DAGs, the Assets, and the failure modes."
Solution Using Asset chains + evaluation gate + registry hook + Asset-scheduled inference
```python
# Complete four-DAG MLOps pipeline
from datetime import datetime, timedelta

from airflow.decorators import dag, task
from airflow.assets import Asset
from airflow.exceptions import AirflowSkipException
from airflow.providers.mlflow.operators.registry import (
    MLflowRegisterModelOperator,
    MLflowPromoteModelOperator,
)
from airflow.providers.feast.operators.materialize import FeastMaterializeOperator

# ---- Assets ----
RAW = Asset("s3://raw/events/latest.parquet")
FEATURES = Asset("s3://feast/offline/customer_churn/features.parquet")
STAGED = Asset("mlflow://models/customer_churn?stage=staging")
METRICS = Asset("s3://eval/customer_churn/metrics.json")
PROD = Asset("mlflow://models/customer_churn?stage=production")


# ---- DAG 1: materialise features ----
@dag(dag_id="mlops_materialize",
     schedule=[RAW],
     start_date=datetime(2026, 1, 1),
     catchup=False)
def materialize():
    FeastMaterializeOperator(
        task_id="mat",
        feature_views=["customer_churn_features"],
        feast_repo="/opt/airflow/feast_repo",
        outlets=[FEATURES],
    )


materialize()


# ---- DAG 2: train + register staging ----
@dag(dag_id="mlops_train",
     schedule=[FEATURES],
     start_date=datetime(2026, 1, 1),
     catchup=False,
     default_args={"retries": 2, "retry_delay": timedelta(minutes=5)})
def train():
    @task(outlets=[STAGED])
    def train_task() -> str:
        return "mlflow://models/customer_churn?version=n&stage=staging"

    train_task()


train()


# ---- DAG 3: evaluate + promote if pass ----
@dag(dag_id="mlops_evaluate_promote",
     schedule=[STAGED],
     start_date=datetime(2026, 1, 1),
     catchup=False)
def evaluate_promote():
    @task(outlets=[METRICS])
    def eval_task() -> dict:
        return {"accuracy": 0.87, "roc_auc": 0.91}

    @task
    def gate_task(m: dict) -> None:
        if m["accuracy"] <= 0.85:
            raise AirflowSkipException(f"gate failed: acc={m['accuracy']}")

    promote = MLflowPromoteModelOperator(
        task_id="promote",
        model_name="customer_churn",
        source_stage="staging",
        target_stage="production",
        outlets=[PROD],
    )

    m = eval_task()
    gate_task(m) >> promote


evaluate_promote()


# ---- DAG 4: batch inference re-runs on every prod update ----
@dag(dag_id="mlops_infer",
     schedule=[PROD],
     start_date=datetime(2026, 1, 1),
     catchup=False)
def infer():
    @task
    def infer_task() -> None:
        # ... load PROD model, score, write predictions ...
        pass

    infer_task()


infer()
```
Step-by-step trace.
| DAG | Consumes | Produces | Trigger |
|---|---|---|---|
| mlops_materialize | RAW | FEATURES | Asset (raw events landing) |
| mlops_train | FEATURES | STAGED | Asset (features materialised) |
| mlops_evaluate_promote | STAGED | METRICS, PROD (if pass) | Asset (staged model) |
| mlops_infer | PROD | (predictions) | Asset (production promotion) |
After the pipeline stabilises, the daily loop is: raw events land → features materialise → training runs → evaluation runs → promotion happens (or skips) → inference re-runs on the new production model. Every step is Asset-scheduled; no cron; freshness is guaranteed by the Asset contract; lineage is queryable in the UI.
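The daily loop can be traced end to end with a toy cascade in which each DAG runs when its input Asset updates and emits its output Asset on success. A sketch under stated assumptions: the chain mirrors the trace table, the gate is the only conditional step, METRICS is left out because nothing downstream consumes it, and the helper name and the PREDICTIONS label are hypothetical.

```python
# Chain from the trace table: (dag_id, consumes, produces)
CHAIN = [
    ("mlops_materialize", "RAW", "FEATURES"),
    ("mlops_train", "FEATURES", "STAGED"),
    ("mlops_evaluate_promote", "STAGED", "PROD"),
    ("mlops_infer", "PROD", "PREDICTIONS"),
]


def run_daily_loop(accuracy: float, threshold: float = 0.85) -> list[str]:
    """Return the DAG runs triggered by one RAW update."""
    updated = {"RAW"}
    ran = []
    for dag_id, consumes, produces in CHAIN:
        if consumes not in updated:
            continue  # no Asset event, no run
        ran.append(dag_id)
        if dag_id == "mlops_evaluate_promote" and not accuracy > threshold:
            continue  # gate skipped: metrics written, PROD not updated
        updated.add(produces)
    return ran


print(run_daily_loop(0.87))  # all four DAGs run
print(run_daily_loop(0.82))  # mlops_infer never fires
```

The failing case shows the "no stale-model incidents" property from the other side: when promotion is skipped, inference is not re-run against an unpromoted model.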
Output:
| Metric | Value |
|---|---|
| DAGs in pipeline | 4 |
| Schedules used | 0 cron / 4 Asset-scheduled |
| Failure modes handled | training retry, gate skip, missing raw data (no trigger) |
| Lineage visible in UI | full graph |
| Model promotion gated | yes (accuracy > 0.85) |
| Registry integration | MLflow (register + promote) |
Why this works — concept by concept:
- Asset chains as the contract: every DAG boundary is an Asset. The chain RAW → FEATURES → STAGED → PROD is the pipeline; the DAGs are just the mechanism. Adding a new step means adding one more Asset to the chain, not refactoring schedules.
- Evaluation gate as a skip: AirflowSkipException is the idiomatic way to say "we don't want to promote this run"; it is not a failure. The DAG run still finishes as success when the gate skips; only the promotion is elided.
- Registry hook as an operator: MLflowPromoteModelOperator is a first-class operator, not an MLflow CLI call shelled out from Bash. The integration is stable and testable, and the operator emits an Asset update for the promotion.
- Asset-scheduled inference: inference is not on cron. It runs the moment a new production model ships. No stale-model incidents.
- Cost: four DAGs, one Asset per boundary, one Feast and one MLflow provider. Operational cost is O(1): the Asset scheduler is part of the core Airflow runtime. Metadata cost is one row per Asset per update, indexed on (asset_uri, updated_at). The savings versus the 2.x custom-cron-and-XCom-signalling equivalent are measurable in engineer-weeks per pipeline.
5. UI redesign + Edge deploy + migration path
The unified UI, the Edge worker Helm chart, and the airflow db upgradecheck airflow 3 migration path — the three things every 2.x-to-3.x cutover touches
The mental model in one line: Airflow 3 ships a unified UI that collapses the Graph / Grid / Calendar / DAG-run views into a single lens with Assets, Versions, and Task Groups tabs; provides a canonical Helm chart and systemd path for edge worker deployment; and gives you airflow db upgradecheck, the linter that surfaces every deprecated API in your 2.x DAGs before you cut over. The entire 2.x → 3.x migration therefore reduces to (a) run the checker, (b) fix the errors on 2.x first, (c) cut over with confidence. The UI redesign is the daily productivity win; the upgrade checker is the safety net that gets senior engineers to actually adopt 3.x.
The unified UI — one lens, three tabs.
- The old UI's problem. In 2.x, you'd look at a failed task and cross-reference Graph (for structure), Grid (for run history), Calendar (for scheduling gaps), and the Datasets page (for lineage). Four tabs, four mental context switches.
- The 3.x UI. A single DAG-run view with tabs at the top: Overview (Graph + Grid combined), Task Groups (collapsible hierarchical view), Versions (this run's DAG version + diff to previous), Assets (produced/consumed by this run).
- The flag. In 3.0, the new UI shipped behind AIRFLOW__UI__NEW_UI=true for opt-in. In 3.1 it is the default; the legacy UI is available via a ?legacy=1 query param.
- What senior users notice first. Time to diagnose a failed task drops from ~5 minutes to under 60 seconds, because the version and Asset context sits right next to the task-instance detail.
Edge Worker deployment — Helm chart and systemd.
- Helm chart. The official apache-airflow/airflow Helm chart adds an edgeWorker section (mirroring the existing workers section). Set edgeWorker.enabled: true and provide apiUrl, queues, concurrency, and secretsRef.
- systemd path. For on-prem or non-K8s deploys, a systemd unit runs airflow edge worker with env vars. Same behaviour, different packaging.
- Astronomer / MWAA / Cloud Composer. Managed Airflow services expose Edge Worker registration via a UI-issued token; the operational path is identical to self-hosted.
- Scale story. At least one Edge Worker pod per queue per site (two if the site needs HA). Concurrency = local task slots per pod; total capacity = pods × concurrency.
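The scale rule is simple arithmetic, and the interesting number is what remains when a pod dies. A sketch with a hypothetical two-site fleet; the site names, pod counts, and helper names are illustrative only.

```python
def site_capacity(pods: int, concurrency: int) -> int:
    """Concurrent task slots at one site: pods x per-pod concurrency."""
    return pods * concurrency


def capacity_after_pod_loss(pods: int, concurrency: int, lost: int = 1) -> int:
    """Slots left while `lost` pods are down (never below zero)."""
    return site_capacity(max(pods - lost, 0), concurrency)


# Hypothetical fleet: queue -> (pods, concurrency per pod)
SITES = {"factory": (2, 4), "warehouse": (1, 8)}

for queue, (pods, conc) in SITES.items():
    print(queue, site_capacity(pods, conc), capacity_after_pod_loss(pods, conc))
# factory 8 4
# warehouse 8 0
```

Both sites have the same nominal capacity, but the single-pod warehouse site drops to zero on one failure, which is the argument for running two replicas per site.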
The airflow 3 migration path.
- Step 1: inventory. airflow dags list on 2.x plus airflow db upgradecheck --run-all prints every deprecated import, every removed operator, and every incompatible custom XCom backend.
- Step 2: fix on 2.x first. Every finding gets a PR against the 2.x branch: SubDagOperator → TaskGroup; airflow.datasets → airflow.assets (works on 2.9+ via compat shim); custom XCom backends updated to the 3.x interface.
- Step 3: parallel staging. Stand up a 3.x staging cluster with the same executor, same connections, same variables. Deploy the same DAG bundle. Run in parallel for 1–2 weeks.
- Step 4: cutover. Point production traffic to 3.x. Retire 2.x after 2 weeks of clean prod.
- Step 5: adopt. Enable DAG versioning bundle storage, migrate Datasets imports to Assets, deploy Edge Workers as workloads require, turn on the new UI.
The upgrade checker (airflow db upgradecheck).
- What it scans. Every DAG file in the DAGs folder, plus the metadata DB for deprecated executor configs, deprecated auth backends, and deprecated provider packages.
- What it reports. A categorised list: ERROR (must fix before cutover), WARNING (should fix before 3.2), INFO (nice-to-have).
- CI integration. Run it in CI on every PR; block merges that introduce ERRORs.
- The escape hatch. For findings you can't fix immediately, airflow db upgradecheck --ignore <rule> suppresses the rule with a tracked ignore-file, never a silent skip.
Common interview probes on the UI + migration.
- "What's new in the Airflow 3 UI?" — unified lens, Assets/Versions/Task Groups tabs.
- "How do you migrate a 2.x deployment?" — upgradecheck → fix on 2.x → staging parallel → cutover → adopt.
- "What's the fastest 3.x adoption you'd recommend?" — 6 months for a 400-DAG deployment.
- "What breaks in the migration?" — SubDagOperator (removed), experimental REST API (removed), custom XCom backends (interface changed).
Worked example — running the upgrade checker on a 2.x deployment
Detailed explanation. A team wants to inventory their 2.9 deployment before starting the 3.x migration. They run airflow db upgradecheck --run-all --format json in CI and process the output into a tracking spreadsheet. Every finding gets an owner, a category, and a due date.
- Command. airflow db upgradecheck --run-all --format json > findings.json.
- Output. JSON list of findings: rule_id, severity, dag_id, file, line, message.
- Triage. Findings sorted by severity, filtered by rule_id, grouped by owning team.
Question. Write the CI job and the triage helper. Show what the output looks like for a real 2.x deployment with 400 DAGs.
Input.
| Component | Value |
|---|---|
| Airflow version | 2.9.4 |
| DAGs folder | /opt/airflow/dags |
| CI system | GitHub Actions |
| Reporting | Sheet + Slack |
Code.
```yaml
# .github/workflows/airflow-upgradecheck.yml
name: Airflow 3 upgrade checker

on:
  pull_request:
    paths:
      - "dags/**"
      - "airflow.cfg"
  schedule:
    - cron: "0 12 * * MON"  # weekly Monday scan

jobs:
  upgradecheck:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Install Airflow 2.9 with upgrade checker
        run: |
          python -m venv .venv
          source .venv/bin/activate
          pip install "apache-airflow==2.9.4" "apache-airflow-upgrade-check>=3.0"
      - name: Run upgradecheck
        run: |
          source .venv/bin/activate
          airflow db upgradecheck --run-all --format json > findings.json
      - name: Triage & block on ERROR
        run: |
          python scripts/triage_upgradecheck.py findings.json
      # always(): keep the findings even when the triage step fails the job
      - uses: actions/upload-artifact@v4
        if: always()
        with:
          name: upgradecheck-findings
          path: findings.json
```
```python
# scripts/triage_upgradecheck.py
import json
import sys
from collections import Counter

path = sys.argv[1]
with open(path) as fh:
    findings = json.load(fh)

by_severity = Counter(f["severity"] for f in findings)
by_rule = Counter(f["rule_id"] for f in findings)

print("Upgrade check findings:")
for sev, n in by_severity.most_common():
    print(f"  {sev:>8}: {n}")

print("\nTop rules:")
for rule, n in by_rule.most_common(10):
    print(f"  {rule:>40}: {n}")

# Fail CI if any ERRORs
errors = [f for f in findings if f["severity"] == "ERROR"]
if errors:
    print(f"\n{len(errors)} ERROR-severity findings: blocking merge")
    for e in errors[:5]:
        # file:line locates the finding; dag_id may be absent for config/DB checks
        print(f"  {e.get('file', '?')}:{e.get('line', '?')} [{e.get('dag_id', '-')}] "
              f"{e['rule_id']} {e['message']}")
    sys.exit(1)
```
Step-by-step explanation.
- The CI workflow installs 2.9 plus the upgrade-check package into a scratch venv, then runs airflow db upgradecheck --run-all against the DAGs folder. The --run-all flag enables every rule; --format json produces machine-readable output.
- The triage script summarises findings by severity and by rule_id. Common rule_ids in a 2.9 deployment: SubDagOperatorRule (SubDagOperator removed), ExperimentalApiRule (experimental REST API removed), DatasetImportRule (rename to Assets), AuthBackendRule (deprecated auth backends).
- The CI job fails if any ERROR-severity finding exists. This blocks upgrade-blocker regressions at merge time: a PR that adds a SubDagOperator import fails CI immediately.
- The weekly cron scan catches drift: even PRs that don't touch DAGs sometimes bring in provider-package changes that surface a new finding. The weekly run keeps the tracking sheet honest.
- The uploaded artefact preserves the full JSON for offline analysis. Teams import it into a tracking spreadsheet (or Jira, or Linear) and assign owners.
Output.
| Severity | Count |
|---|---|
| ERROR | 8 |
| WARNING | 42 |
| INFO | 137 |

| Rank | Rule | Findings |
|---|---|---|
| 1 | DatasetImportRule | 68 |
| 2 | SubDagOperatorRule | 5 |
| 3 | ExperimentalApiRule | 3 |
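The CI script's triage logic can be factored into a pure function, which makes it unit-testable and gives the tracked ignore-file escape hatch a concrete shape. A sketch, assuming findings are dicts with the rule_id and severity keys listed earlier and that the ignore file has already been parsed into a set of rule_ids (that file format is hypothetical). The sample is a tiny toy, not the 400-DAG numbers above.

```python
from collections import Counter


def triage(findings: list[dict], ignored_rules: frozenset = frozenset()) -> dict:
    """Summarise findings, drop explicitly ignored rules, and decide whether to block."""
    kept = [f for f in findings if f["rule_id"] not in ignored_rules]
    by_severity = Counter(f["severity"] for f in kept)
    by_rule = Counter(f["rule_id"] for f in kept)
    return {
        "by_severity": dict(by_severity),
        "top_rules": by_rule.most_common(3),
        "ignored": len(findings) - len(kept),  # visible, never a silent skip
        "block": by_severity["ERROR"] > 0,
    }


# Toy sample: 8 ERROR findings across two rules plus 2 WARNINGs.
sample = (
    [{"rule_id": "SubDagOperatorRule", "severity": "ERROR"}] * 5
    + [{"rule_id": "ExperimentalApiRule", "severity": "ERROR"}] * 3
    + [{"rule_id": "DatasetImportRule", "severity": "WARNING"}] * 2
)
report = triage(sample)
print(report["by_severity"], report["block"])  # {'ERROR': 8, 'WARNING': 2} True
```

Reporting the ignored count alongside the summary keeps suppressed rules visible in every CI run, which is what separates a tracked ignore-file from a silent skip.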
Rule of thumb. Ship the upgrade checker in CI before you plan the migration timeline. The ERROR count is the size of your block-tier work; the WARNING count sizes your post-cutover backlog. Numbers before dates.
Worked example — deploying an Edge Worker via Helm
Detailed explanation. A team runs their control plane in EKS (Elastic Kubernetes Service on AWS) and wants to add an Edge Worker pool for their on-prem sensor pipeline. The on-prem site runs a small K3s cluster. Deploy the Edge Worker via the official Helm chart with values pointing at the cloud control plane's API.
- Control plane. EKS in us-east-1, Airflow API at airflow.example.com.
- Edge site. K3s in the factory, needs an Edge Worker for queue=factory.
- Auth. Bearer token issued from the Airflow UI, stored as a K8s secret.
Question. Write the Helm values file for the Edge Worker deploy. Show the install command and the smoke test.
Input.
| Component | Value |
|---|---|
| Helm chart | apache-airflow/airflow |
| Chart version | ≥ 1.16 (Airflow 3 support) |
| Edge Worker queue | factory |
| Concurrency | 4 |
Code.
```yaml
# edge-worker-values.yaml
airflowVersion: 3.1.2

# Turn everything else off: this cluster only runs Edge Workers
scheduler:
  enabled: false
webserver:
  enabled: false
triggerer:
  enabled: false
postgresql:
  enabled: false
redis:
  enabled: false

edgeWorker:
  enabled: true
  replicas: 2
  queues:
    - factory
  concurrency: 4
  env:
    - name: AIRFLOW__EDGE__API_URL
      value: https://airflow.example.com
    - name: AIRFLOW__EDGE__HEARTBEAT_SECONDS
      value: "5"
    - name: AIRFLOW__EDGE__LOG_UPLOAD_URL
      value: https://airflow.example.com/api/v1/edge/logs
  extraEnvFrom:
    - secretRef:
        name: edge-worker-secrets
  resources:
    requests:
      cpu: 500m
      memory: 1Gi
    limits:
      cpu: 2000m
      memory: 4Gi
  nodeSelector:
    role: airflow-edge
```
```bash
# Install
kubectl create ns airflow-edge
kubectl -n airflow-edge create secret generic edge-worker-secrets \
  --from-literal=AIRFLOW__EDGE__API_TOKEN='eyJhbGciOi...'

helm repo add apache-airflow https://airflow.apache.org
helm repo update
helm upgrade --install airflow-edge apache-airflow/airflow \
  --namespace airflow-edge \
  --version 1.16.0 \
  --values edge-worker-values.yaml

# Smoke test: check the worker registered with the control plane
kubectl -n airflow-edge logs -l component=edge-worker --tail=50

# Verify from the control plane
airflow edge workers list
# +-------------+---------+--------+---------+-------------+
# | worker_id   | queue   | state  | last_hb | concurrency |
# +-------------+---------+--------+---------+-------------+
# | edge-abc123 | factory | active | 2s ago  | 4           |
# | edge-def456 | factory | active | 3s ago  | 4           |
# +-------------+---------+--------+---------+-------------+
```
Step-by-step explanation.
- The values file turns off every component except edgeWorker: no scheduler, no webserver, no metadata DB. This K3s cluster runs Edge Workers only; the control plane lives elsewhere.
- edgeWorker.replicas: 2 runs two worker pods per site for HA. Each pod runs up to 4 concurrent tasks, so total site capacity is 2 × 4 = 8 concurrent tasks. If one pod fails, the other keeps working; the control plane sees the failed pod's heartbeat stop and re-queues its in-flight tasks.
- The env block sets the API URL and heartbeat interval. AIRFLOW__EDGE__HEARTBEAT_SECONDS=5 is the cadence used here. Anything faster wastes API cycles, and anything slower delays worker-lost detection.
- The extraEnvFrom block pulls the API token from a K8s secret. The secret is created out-of-band via kubectl create secret, so secrets never live in the values file in git.
- The smoke test has two parts: (a) kubectl logs shows the worker's registration log line, and (b) airflow edge workers list on the control plane shows the worker registered, active, and heartbeating. Both must pass before the deploy counts as successful.
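The capacity arithmetic and the worker-lost behaviour in the steps above can be modelled in a few lines of Python. This is a toy model of the described behaviour, not the Edge provider's internals. The Worker class, the site_capacity and reap_lost_workers helpers, and the rule that three missed heartbeats mark a worker lost are all assumptions for illustration.

```python
from dataclasses import dataclass, field

HEARTBEAT_SECONDS = 5
# Assumption: a worker is declared lost after missing this many heartbeats.
MISSED_HEARTBEATS_BEFORE_LOST = 3


@dataclass
class Worker:
    worker_id: str
    queue: str
    concurrency: int
    last_heartbeat: float  # seconds, on the control plane's clock
    in_flight: list[str] = field(default_factory=list)


def site_capacity(workers: list[Worker], queue: str) -> int:
    """Total concurrent task slots for a queue: sum of per-pod concurrency."""
    return sum(w.concurrency for w in workers if w.queue == queue)


def reap_lost_workers(workers: list[Worker], now: float) -> tuple[list[Worker], list[str]]:
    """Split workers into healthy ones and the task ids to re-queue from lost ones."""
    deadline = HEARTBEAT_SECONDS * MISSED_HEARTBEATS_BEFORE_LOST
    healthy, requeue = [], []
    for w in workers:
        if now - w.last_heartbeat > deadline:
            requeue.extend(w.in_flight)  # lost: its tasks go back on the queue
        else:
            healthy.append(w)
    return healthy, requeue
```

In this model, detection latency is roughly HEARTBEAT_SECONDS × MISSED_HEARTBEATS_BEFORE_LOST (15 s here). That is why a slower heartbeat directly delays re-queuing.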
Output.
| Component | Value |
|---|---|
| Pods | 2 × edgeWorker |
| Queue | factory |
| Concurrency per pod | 4 |
| Total site capacity | 8 concurrent tasks |
| Auth | Bearer token via secret |
| Heartbeat cadence | 5s |
| Control-plane visibility | 2 active workers |
Rule of thumb. Helm-deploy Edge Workers as their own release, in their own namespace, on their own cluster. Never mix them with the control-plane Helm release — the concerns are entirely separate, and the operational separation makes upgrades independent.
Worked example — a 2.x → 3.x cutover weekend
Detailed explanation. A team has done all the prep — upgradecheck passes, staging has parity, block-tier findings are fixed. The cutover weekend is 48 hours. Walk through the hour-by-hour plan: Friday 18:00 freeze, Saturday morning cutover, weekend soak, Monday morning declare-victory or rollback.
- Friday 18:00. Freeze DAG changes on both branches.
- Saturday 08:00. Blue/green flip: DNS points to 3.x cluster.
- Saturday–Sunday. Soak; watch dashboards; keep 2.x hot for rollback.
- Monday 09:00. Declare victory or roll back.
Question. Produce the runbook for the 48-hour cutover, with rollback triggers and success criteria.
Input.
| Parameter | Value |
|---|---|
| Cutover window | Fri 18:00 → Mon 09:00 |
| Rollback trigger | task fail-rate > 5% for 30 min |
| Success criterion | 48h at fail-rate ≤ 1% |
Code.
```text
Cutover runbook: 2.9 → 3.1
==========================
Friday
  18:00  DAG freeze on both branches; no merges to main until Monday
  18:15  Final upgradecheck on 2.x snapshot; expect 0 ERRORs
  18:30  Back up Airflow metadata DB (pg_dump)
  19:00  Deploy 3.1 to blue cluster (parallel to 2.9 green)
  19:30  Run smoke DAG on blue; verify all 400 DAGs parse
Saturday
  08:00  Blue/green flip: DNS points to 3.1 cluster
         Turn on unified UI (AIRFLOW__UI__NEW_UI=true)
  08:15  Watch task fail-rate dashboard; alert threshold 5% for 30m
  10:00  Manual spot-check: pick 20 diverse DAG runs, verify success + version stamp
  14:00  Rehearsal: kill an Edge Worker pod, confirm re-queue behaviour
  18:00  Sitrep #1: overall fail-rate, saturation, MLOps asset triggers
Sunday
  10:00  Sitrep #2
  14:00  Rehearsal: pretend rollback (do not execute); verify DNS flip time <60s
  18:00  Sitrep #3
Monday
  09:00  If 48h fail-rate ≤ 1%: declare victory
           → decommission 2.9 cluster over the following week
           → announce migration complete to stakeholders
           → open backlog tickets for post-cutover adoption:
               - migrate remaining Dataset imports to Asset
               - deploy production DAG versioning bundle
               - deploy first Edge Worker to hybrid site
         Else: roll back
           → DNS flip back to 2.9 green cluster
           → restore metadata DB from Friday 18:30 backup
           → post-mortem within 48 hours
```
```python
# Cutover watchdog: page on fail-rate spike
import sys
import time
from datetime import datetime, timezone

import psycopg2

FAIL_RATE_THRESHOLD = 0.05  # 5%
WINDOW_MINUTES = 30
POLL_SECONDS = 60


def fail_rate(conn) -> float:
    """Share of task instances started in the trailing window that failed."""
    with conn.cursor() as cur:
        cur.execute(
            """
            SELECT COUNT(*) FILTER (WHERE state = 'failed')::float
                   / NULLIF(COUNT(*), 0)
            FROM task_instance
            WHERE start_date > now() - %s * INTERVAL '1 minute'
            """,
            (WINDOW_MINUTES,),
        )
        return cur.fetchone()[0] or 0.0


def main():
    conn = psycopg2.connect("postgres://.../airflow")
    conn.autocommit = True  # don't hold one transaction open for the whole soak
    while True:
        rate = fail_rate(conn)
        print(f"{datetime.now(timezone.utc).isoformat()} fail_rate={rate:.2%}")
        if rate > FAIL_RATE_THRESHOLD:
            # Page on-call, then exit: a human decides whether to roll back
            sys.stderr.write(f"ROLLBACK ALERT: fail_rate={rate:.2%}\n")
            sys.exit(1)
        time.sleep(POLL_SECONDS)


if __name__ == "__main__":
    main()
```
Step-by-step explanation.
- Friday's freeze prevents "one more merge" from destabilising the migration. Both 2.x and 3.x branches are frozen; the DB backup is the rollback safety net.
- Saturday 08:00 is the go-live moment. A DNS flip is preferred over a gradual traffic split because Airflow doesn't cleanly support two clusters sharing a metadata DB. The 3.1 cluster therefore gets its own DB, restored from the Friday backup and upgraded to the 3.x schema with airflow db migrate.
- The fail-rate watchdog polls every minute. A 5% threshold over a 30-minute window is aggressive enough to catch a real regression and conservative enough to avoid pages on transient blips.
- The rehearsal steps (Saturday 14:00, Sunday 14:00) exercise the parts of the runbook that operational muscle-memory forgets: killing an Edge Worker pod, verifying DNS flip timing, checking metadata restore semantics.
- Monday 09:00 is the decision point. Declare victory or roll back. The 48-hour fail-rate threshold (≤ 1%) is the objective criterion; the sitreps are the confidence check. Rollback in this framework is a routine operation, not a heroic act.
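The watchdog above pages on the first poll where the trailing 30-minute fail-rate exceeds 5%. You could instead read the rollback trigger literally, as "above 5% for 30 minutes". In that case a stricter check fires only when every poll in the trailing window breaches. Below is a minimal sketch of that variant. The SustainedBreach class is hypothetical and would be fed one fail_rate(conn) reading per poll.

```python
from collections import deque


class SustainedBreach:
    """Fire only when every sample in the trailing window exceeds the threshold."""

    def __init__(self, threshold: float, window_seconds: int, poll_seconds: int):
        self.threshold = threshold
        # Number of consecutive polls that must all breach (30 for 30 min / 60 s)
        self.needed = window_seconds // poll_seconds
        self.samples = deque(maxlen=self.needed)  # oldest sample drops off

    def observe(self, rate: float) -> bool:
        """Record one fail-rate reading; return True when the breach is sustained."""
        self.samples.append(rate)
        return len(self.samples) == self.needed and all(
            r > self.threshold for r in self.samples
        )
```

Which reading to use is a judgment call. The rolling-rate version pages sooner. The sustained version ignores short spikes, but a single good poll also resets its clock.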
Output.
| Time | Event | Success signal |
|---|---|---|
| Fri 19:30 | Blue cluster live in parallel | All 400 DAGs parse |
| Sat 08:00 | DNS flip | Task fail-rate < 5% |
| Sat 10:00 | Spot-check 20 DAGs | All succeed with version stamp |
| Sat 18:00 | Sitrep #1 | Fail-rate ≤ 1% |
| Mon 09:00 | Decision | 48h fail-rate ≤ 1% → victory |
Rule of thumb. Cutover weekends are a runbook problem, not an engineering problem. Ship the runbook + the watchdog + the rehearsals a week before Friday; the actual weekend is boring by design.
Senior interview question on the migration path
A senior interviewer might ask: "You've inherited a 2.9 Airflow deployment with 400 DAGs, a large custom XCom backend, several SubDagOperator-based DAGs, and no CI upgrade checker. Walk me through the migration to 3.1, the order of operations, the rollback strategy, and what you'd adopt on day 1 of the new cluster."
Solution Using upgradecheck-first + fix-on-2.x + parallel staging + Asset-first adoption
```text
Migration plan: 2.9 → 3.1 for a 400-DAG deployment
==================================================
Phase A: Foundation (weeks 1–2)
  A1  Ship airflow db upgradecheck to CI (blocks ERROR-severity merges)
  A2  Inventory findings: expect ~8 ERROR, ~42 WARNING, ~137 INFO
  A3  Assign each ERROR to a DAG owner with a fixed-by date
  A4  Rewrite custom XCom backend against 3.x interface (biggest single risk)
Phase B: Block-tier remediation (weeks 3–6)
  B1  Refactor every SubDagOperator to TaskGroup (5 DAGs)
  B2  Migrate every experimental-REST-API caller to the stable API (3 clients)
  B3  Rename airflow.datasets → airflow.assets in DAG imports (68 DAGs)
  B4  Re-run upgradecheck; expect 0 ERROR
Phase C: Parallel staging (weeks 7–8)
  C1  Stand up 3.1 staging cluster with identical executor + connections + variables
  C2  Deploy DAG bundle to staging; run every DAG at least once
  C3  Compare task success rate, runtime, XCom shape against 2.9 baseline
  C4  Sign-off gate: 100% DAGs green in staging for 48 consecutive hours
Phase D: Cutover weekend (week 9)
  D1  Friday freeze; back up metadata DB
  D2  Saturday DNS flip; unified UI on
  D3  48h soak with watchdog; rollback trigger = fail-rate > 5% for 30m
  D4  Monday decision: victory or rollback
Phase E: Day-1 adoption (week 10+)
  E1  Enable DAG versioning bundle storage (S3)
  E2  Ship the version-health alert (task fail-rate per new version)
  E3  Deploy first Edge Worker (for the hybrid customer workload)
  E4  Rewrite one training DAG against Assets + MLflow (proof-of-concept for MLOps team)
```
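Step B3 is mechanical enough to script. The sketch below is a naive codemod. It assumes the plain from airflow.datasets import Dataset form and rewrites it to the from airflow.assets import Asset path this article names, so confirm the target import against the migration guide for your exact 3.x version. The regexes do not handle aliased or multi-name imports. They will also rewrite Dataset( inside strings or comments, so review the diff.

```python
import re
from pathlib import Path

# Naive rules: only the exact single-name import form is recognised.
IMPORT_RE = re.compile(r"^from airflow\.datasets import Dataset[ \t]*$", re.MULTILINE)
CALL_RE = re.compile(r"\bDataset\(")  # \b keeps MyDataset( untouched


def rewrite_source(src: str) -> tuple[str, bool]:
    """Return (new_source, changed) for one DAG file's text."""
    if not IMPORT_RE.search(src):
        return src, False  # file doesn't import Dataset the simple way; skip it
    new = IMPORT_RE.sub("from airflow.assets import Asset", src)
    new = CALL_RE.sub("Asset(", new)
    return new, new != src


def rewrite_tree(root: str, dry_run: bool = True) -> list[Path]:
    """Rewrite every *.py under root; return the paths that changed (or would)."""
    changed = []
    for path in sorted(Path(root).rglob("*.py")):
        new, did_change = rewrite_source(path.read_text())
        if did_change:
            changed.append(path)
            if not dry_run:
                path.write_text(new)
    return changed
```

Run rewrite_tree("dags/") in dry-run mode first to list the files that would change, then run it with dry_run=False. The dags/ path is hypothetical.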
Step-by-step trace.
| Phase | Weeks | Exit gate | Risk |
|---|---|---|---|
| A: Foundation | 1–2 | Findings inventory + owners assigned | XCom backend surprises |
| B: Block-tier | 3–6 | 0 ERROR in upgradecheck | SubDag rewrites; regression on Asset rename |
| C: Staging | 7–8 | 100% DAG parity for 48h | Missing staging connections |
| D: Cutover | 9 | 48h fail-rate ≤ 1% | Watchdog false negatives |
| E: Adoption | 10+ | Versioning + Edge + MLOps proof | Team capacity |
After phase E is running, the deployment is a fully modern Airflow 3.1 cluster with the unified UI, DAG versioning + bundle storage, at least one Edge Worker in production, and a paved path for the MLOps team to migrate training DAGs one at a time. The migration is complete; adoption is ongoing.
Output:
| Property | Before | After |
|---|---|---|
| Airflow version | 2.9.4 | 3.1.2 |
| upgradecheck errors | ~8 | 0 |
| Custom XCom backend | 2.x interface | 3.x interface |
| SubDagOperator use | 5 DAGs | 0 (TaskGroups) |
| Dataset imports | 68 | 0 (Assets) |
| DAG versioning | off | on with bundle storage |
| Edge Workers | 0 | 1 site (customer workload) |
| Unified UI | off | on |
Why this works — concept by concept:
- Upgradecheck-first — the migration plan starts with instrumentation, not code changes. The ERROR count sizes the block-tier work; the WARNING count sizes the post-cutover backlog. Numbers before dates.
- Fix-on-2.x — every block-tier fix ships on the 2.9 branch first. This means the fixes are stress-tested against the current production workload before the 3.1 cutover ever happens. Big-bang migrations fail; incremental de-risking succeeds.
- Parallel staging — a 3.1 cluster running the same DAGs against staged data for 2 weeks catches almost every latent incompatibility. The 48h all-green gate is the objective criterion.
- Cutover weekend as a boring runbook — the actual go-live is a DNS flip + 48h watch. The interesting engineering happened in phases A through C.
- Cost — 9 weeks calendar time, ~2 senior DEs. The adoption phase (versioning, Edge, MLOps) continues indefinitely at a lower burn rate. Skipping upgradecheck typically doubles the timeline because block-tier findings surface in staging instead of in phase A.
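Phase E2's version-health alert reduces to a group-by over task-instance rows. The cheat sheet below gives its thresholds: versions created in the last 2 hours, at least 5 runs, and a 50% fail-rate. Here is a minimal sketch over rows you might fetch with a query joining task_instance to dag_version. The TaskRun shape and the unhealthy_versions function are hypothetical.

```python
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime, timedelta

RECENT = timedelta(hours=2)
MIN_RUNS = 5
FAIL_RATE_THRESHOLD = 0.5


@dataclass(frozen=True)
class TaskRun:
    dag_id: str
    dag_version_id: int
    version_created_at: datetime
    state: str  # e.g. "success" or "failed"


def unhealthy_versions(runs: list[TaskRun], now: datetime) -> list[tuple[str, int, float]]:
    """Return (dag_id, version_id, fail_rate) for recent versions over the threshold."""
    totals = defaultdict(lambda: [0, 0])  # (dag_id, version) -> [failed, total]
    for r in runs:
        if now - r.version_created_at > RECENT:
            continue  # only judge versions deployed in the last 2 hours
        key = (r.dag_id, r.dag_version_id)
        totals[key][1] += 1
        if r.state == "failed":
            totals[key][0] += 1
    alerts = []
    for (dag_id, version), (failed, total) in sorted(totals.items()):
        rate = failed / total
        if total >= MIN_RUNS and rate >= FAIL_RATE_THRESHOLD:
            alerts.append((dag_id, version, rate))
    return alerts
```

Evaluated hourly, any non-empty result would kick off the runbook: deprecate the version, trigger a clean run, and file a bug.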
Cheat sheet — Airflow 3 recipes
- When to upgrade from 2.x. Any two of {hybrid workload, MLOps roadmap, Python 3.12+ target, provider dependency on 3.x-only package} = upgrade now. Otherwise, upgrade within 12 months; the 2.x line is maintenance-only. Do not upgrade without CI upgrade-checker instrumentation first.
- Edge Worker startup (systemd). ExecStart=/opt/airflow-edge/venv/bin/airflow edge worker with AIRFLOW__EDGE__API_URL=https://airflow.example.com, AIRFLOW__EDGE__QUEUES=factory, AIRFLOW__EDGE__CONCURRENCY=4. Bearer token from EnvironmentFile=/etc/airflow-edge/secrets.env. Only outbound HTTPS/443 required.
- Edge Worker startup (Helm). edgeWorker.enabled: true in the official apache-airflow/airflow chart; provide apiUrl, queues, concurrency, secretsRef. Deploy in its own namespace, separate from the control-plane Helm release. replicas: 2 for site HA.
- DAG version history query. SELECT ti.dag_id, ti.task_id, ti.run_id, ti.dag_version_id, dv.created_at, dv.source_hash FROM task_instance ti JOIN dag_version dv ON dv.id = ti.dag_version_id WHERE ti.dag_id = %s ORDER BY ti.start_date DESC LIMIT 100; The dag_version_id on every task_instance row is the audit artefact.
- DAG versioning bundle storage. [dag_bundles.s3_prod] type=s3, bucket=airflow-dag-bundles, prefix=prod/, snapshot_on_parse=true. Every DAG source is snapshotted on every version bump. Without this, historical DAG source is not recoverable and versioning is only half a feature.
- Bad-deploy rollback via deprecation. airflow dags versions deprecate <dag_id> --version-id <N> --reason "..." marks a version bad; the scheduler auto-falls-back to the newest non-deprecated version. Faster and safer than a git revert; reversible with airflow dags versions undeprecate.
- Asset (formerly Dataset) declaration. from airflow.assets import Asset; FEATURES = Asset("s3://features/customer_churn/features.parquet"). Emit with @task(outlets=[FEATURES]). Consume with @dag(schedule=[FEATURES], ...). Asset URIs are the canonical primary key across DAGs.
- Airflow 2 → 3 upgrade checker. pip install "apache-airflow-upgrade-check>=3.0" then airflow db upgradecheck --run-all --format json. Ship in CI; block PRs on ERROR severity. A weekly cron scan catches drift from provider-package changes.
- Migration checklist (9-week plan). Weeks 1–2: instrument upgradecheck. Weeks 3–6: fix block-tier findings on 2.x. Weeks 7–8: parallel staging 3.x. Week 9: cutover weekend. Week 10+: enable DAG versioning bundle storage, adopt Assets, deploy first Edge Worker.
- Asset-scheduled DAG. @dag(schedule=[Asset("s3://...")], ...) triggers on Asset update. Multi-Asset schedules (schedule=[Asset(A), Asset(B)]) support fan-in semantics. Never mix schedule=[...] with a cron; Assets win.
- Model-promotion gate. AirflowSkipException inside a gate task; downstream MLflowPromoteModelOperator with outlets=[PROD_ASSET]. Downstream inference DAGs subscribe to PROD_ASSET. Idiomatic MLOps promotion, no cron.
- Idempotent Edge task pattern. Key writes on run_id, not on wall-clock time. retries=3 with exponential backoff: retry_delay=timedelta(minutes=1), retry_exponential_backoff=True. worker_lost becomes a routine event, not an incident.
- XCom PII guardrail for Edge Workers. A pre-XCom hook scans strings for SSN/email/credit-card patterns and raises AirflowFailException on a hit. Combined with per-queue routing, this gives compliance officers a signed-off GDPR/HIPAA architecture with no ad-hoc reviews.
- Cutover watchdog. Task fail-rate > 5% for 30 minutes → page on-call → human decides rollback. DNS-flip rollback in <60 seconds. 48-hour soak with all-green ≤ 1% fail-rate = declare victory. Boring by design.
- Version-health alert. Task fail-rate per DAG version, evaluated hourly against versions created in the last 2 hours. Threshold 50% fail-rate over ≥5 runs. Catches bad deploys within 1 hour; runbook: deprecate the version, trigger a clean run, file bug.
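Here is a minimal sketch of the PII guardrail's scanning logic. The patterns are illustrative, not a vetted PII rule set. A Luhn check is added to cut false positives on long digit runs. PIIDetected stands in for airflow.exceptions.AirflowFailException so the sketch runs without Airflow installed. Where you call guard_xcom is an assumption about your hook design, for example on a task's return value before it becomes an XCom.

```python
import re

# Illustrative patterns only; real PII detection needs a reviewed rule set.
SSN_RE = re.compile(r"\b\d{3}-\d{2}-\d{4}\b")
EMAIL_RE = re.compile(r"\b[\w.+-]+@[\w-]+\.[\w.-]+\b")
CARD_RE = re.compile(r"\b(?:\d[ -]?){13,19}\b")  # 13-19 digits, optional separators


class PIIDetected(Exception):
    """Stand-in for airflow.exceptions.AirflowFailException in this sketch."""


def luhn_ok(digits: str) -> bool:
    """Luhn checksum: double every second digit from the right."""
    total = 0
    for i, ch in enumerate(reversed(digits)):
        d = int(ch)
        if i % 2 == 1:
            d *= 2
            if d > 9:
                d -= 9
        total += d
    return total % 10 == 0


def find_pii(value) -> list[str]:
    """Return the PII categories found in any string nested inside value."""
    hits = set()
    if isinstance(value, str):
        if SSN_RE.search(value):
            hits.add("ssn")
        if EMAIL_RE.search(value):
            hits.add("email")
        for m in CARD_RE.finditer(value):
            digits = re.sub(r"[ -]", "", m.group())
            if 13 <= len(digits) <= 19 and luhn_ok(digits):
                hits.add("credit_card")
    elif isinstance(value, dict):
        for v in value.values():
            hits.update(find_pii(v))
    elif isinstance(value, (list, tuple, set)):
        for v in value:
            hits.update(find_pii(v))
    return sorted(hits)


def guard_xcom(value):
    """Raise before a value is pushed to XCom if it appears to contain PII."""
    hits = find_pii(value)
    if hits:
        raise PIIDetected(f"XCom value blocked, matched: {', '.join(hits)}")
    return value
```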
Frequently asked questions
What's new in airflow 3 — the one-paragraph pitch?
Airflow 3.0 (April 2025) is the biggest release since 2.0 and centres on four themes: Edge Workers (task-executing processes running outside the main cluster that reverse-connect to the Airflow API — no inbound port required, unlocking hybrid on-prem + cloud, GDPR, HIPAA workloads); DAG versioning (every task instance stamped with the DAG version that ran it, so backfills replay historical code rather than the current main); MLOps primitives built on Assets (renamed from Datasets — URI-first, freshness-aware, lineage-carrying), with first-class hooks for Vertex AI, MLflow, and Mosaic model registries; and a unified UI that collapses Graph / Grid / Calendar into a single lens with Assets, Versions, and Task Groups tabs. Beneath those four, the release also delivers a new upgrade checker (airflow db upgradecheck), improved provider packaging, Python 3.13 support, and the removal of long-deprecated APIs (experimental REST, SubDagOperator).
Edge Worker vs KubernetesExecutor — when do I pick each?
KubernetesExecutor is the right choice when your workers should run in the same cluster as your scheduler and you want per-task pod isolation with autoscaling. It assumes the scheduler can create pods in a Kubernetes cluster it can reach — great for in-cluster workloads, cloud-native deployments, and teams that already run K8s. Edge Worker is the right choice when your workers need to run outside the scheduler's cluster — a customer data centre, an edge/IoT site, a GDPR/HIPAA-compliant regional cluster, or an on-prem factory. Edge Workers reverse-connect to the Airflow API over outbound HTTPS/443; no inbound port on the worker side, no VPN, no shared broker. In practice, most 3.x deployments use both — KubernetesExecutor for the in-cluster workload and Edge Workers for the hybrid or remote workload. Queue tags (queue=factory, queue=eu_central) route DAG tasks to the right executor.
Do I need to rewrite my 2.x DAGs to move to 3.x?
For the vast majority of DAGs, no. About 90% of typical 2.x DAGs import unchanged on 3.x. The 10% that do need edits fall into three buckets. First, hard removals: SubDagOperator → TaskGroup; the experimental REST API → the stable API; custom XCom backends → the 3.x interface. Second, a rename with a compat shim: from airflow.datasets import Dataset → from airflow.assets import Asset. The old import still works with a deprecation warning through 3.1 and is hard-removed in 3.2. Third, auth backend changes, since a couple of deprecated auth backends were removed. The right workflow is to install the upgrade checker (pip install "apache-airflow-upgrade-check>=3.0", quoted so the shell does not treat >= as a redirect) and run airflow db upgradecheck --run-all --format json against your 2.x deployment. Then triage the output into ERROR (must fix before cutover), WARNING (should fix before 3.2), and INFO (nice-to-have). The ERROR count sizes your migration work; the WARNING count sizes your post-cutover backlog.
Are Datasets still called Datasets in Airflow 3?
No — they're Assets in airflow 3, and the rename is more than cosmetic. The 2.x Dataset was a scheduling hint; the 3.x Asset is a first-class lineage-and-trigger primitive with a canonical URI, freshness metadata, produced-by / consumed-by edges in the metadata DB, and a dedicated Assets tab in the UI. The import path is from airflow.assets import Asset; the old from airflow.datasets import Dataset still works through Airflow 3.1 with a deprecation warning and is fully removed in 3.2. Usage patterns transfer directly: @task(outlets=[Asset("s3://...")]) marks a task as an Asset producer; @dag(schedule=[Asset("s3://...")]) triggers a DAG on Asset update. The MLOps story is built on top of Assets — model-registry hooks (Vertex, MLflow, Mosaic) consume and emit Assets, and asset-scheduled inference DAGs re-run automatically when a new production model ships.
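A list schedule such as @dag(schedule=[Asset(A), Asset(B)]) has fan-in behaviour: the DAG runs once every subscribed Asset has been updated since its last run. Below is a toy model of those all-of semantics, not Airflow's scheduler code. The AssetFanIn class is hypothetical.

```python
class AssetFanIn:
    """Model of a DAG scheduled on several Asset URIs with all-of (fan-in) semantics."""

    def __init__(self, uris: list[str]):
        self.uris = set(uris)
        self.pending = set()  # subscribed assets updated since the last triggered run

    def on_asset_update(self, uri: str) -> bool:
        """Record an update; return True when a DAG run should be created."""
        if uri not in self.uris:
            return False  # this DAG does not subscribe to that asset
        self.pending.add(uri)  # repeated updates to one asset collapse together
        if self.pending == self.uris:
            self.pending.clear()  # the new run consumes one update from every asset
            return True
        return False
```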
What is dag versioning actually good for?
Three concrete wins. Reproducible backfills — when you trigger airflow dags backfill --logical-date 2026-05-01, the scheduler looks up the DAG version that was live on 2026-05-01 and replays that code, not the current main. This eliminates the "backfill silently ran different logic" bug class that has plagued every 2.x deployment. Audit trails — every task instance in the metadata DB carries a dag_version_id; auditors and post-mortems can query "which code ran this task instance?" and get a deterministic answer. Fast bad-deploy rollback — a bad DAG version can be marked deprecated (airflow dags versions deprecate <dag_id> --version-id N), and the scheduler auto-falls-back to the newest non-deprecated version. This is faster and safer than a git revert (which would create yet another version and delay recovery). To get the full value, pair versioning with bundle storage ([dag_bundles.s3_prod] type=s3, snapshot_on_parse=true) so historical DAG source is preserved in S3/GCS/ABS; without that, versioning captures the hash but not the source, and reproducibility is only partial.
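The backfill lookup described above amounts to "the newest version created at or before the logical date". Here is a minimal sketch using bisect. version_live_at is a hypothetical helper. Airflow's own resolution logic may weigh other factors, such as the deprecation flag mentioned above.

```python
from bisect import bisect_right
from datetime import datetime


def version_live_at(versions: list[tuple[datetime, int]], logical_date: datetime) -> int:
    """Return the id of the newest version created at or before logical_date.

    versions: (created_at, dag_version_id) pairs, in any order.
    """
    ordered = sorted(versions)  # sort by created_at
    created = [c for c, _ in ordered]
    idx = bisect_right(created, logical_date)  # count of versions created <= date
    if idx == 0:
        raise LookupError(f"no DAG version existed at {logical_date.isoformat()}")
    return ordered[idx - 1][1]
```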
Which release should I upgrade to first — 3.0 or 3.1?
Go to 3.1, and if the timeline allows, wait for the latest 3.1.x point release before cutover. 3.0 (April 2025) shipped the big features — Edge Workers, DAG versioning, Assets, unified UI — but 3.0.0–3.0.3 had rough edges around the Assets rename compatibility shim, some Edge Worker heartbeat edge cases, and provider-package interop. 3.1 (landing late 2025) polishes those, adds the unified UI as default (not behind a flag), stabilises the DAG versioning bundle storage path, and expands the MLOps operator catalogue. For a new deployment, 3.1.2+ is the current recommended target. For an existing 2.9 deployment migrating in 2026, going directly to 3.1 skips the 3.0 stability tax; the migration effort is nearly identical for either target.
Practice on PipeCode
- Drill the ETL practice library → for the orchestration, backfill, and Asset-scheduling problems senior interviewers love.
- Rehearse on the SQL practice library → for the metadata-query, audit-trail, and version-history problems that Airflow 3's versioning story makes routine.
- Sharpen the platform axis with the optimization practice library → for the pool-sizing, worker-scaling, and cutover-tuning problems that decide production Airflow health.
- Stack the prerequisites against PipeCode's broader 450+ data-engineering catalogue to anchor the workers-versioning-MLOps-UI framing against real graded inputs.
Lock in Airflow 3 muscle memory
Release notes describe features. PipeCode drills describe the decision — when Edge Workers replace a VPN, when DAG versioning saves an audit, when Assets replace a cron. Pipecode.ai is Leetcode for Data Engineering — pattern-first practice tuned for the production trade-offs senior data engineers actually face.