
Gowtham Potureddi


Airflow 3.x: What's New — Edge Workers, AI/MLops, DAG Versioning

Airflow 3 is the biggest Apache Airflow release since 2.0 shipped in December 2020, and five years later every senior interviewer probing an orchestration answer expects a crisp, four-axes take on what actually changed. The 3.0 release landed in April 2025 with Edge Workers, DAG versioning, MLOps primitives, and a unified UI; the 3.1 point releases through late 2025 and into 2026 polished the migration story, hardened the reverse-connect worker, and closed the gap on the Assets rename. The engineering trade-off is no longer "should we adopt Airflow 3?" (the release notes and the deprecation clock have already answered that) but which axis of Airflow 3.x matters for your workload and how you sequence a safe upgrade from 2.x.

This guide is the senior-DE walkthrough you wished existed the first time an interviewer asked "explain how an Airflow Edge Worker reverse-connects to the API and why the control plane never needs to open a port", "walk me through DAG versioning semantics and how a backfill picks the historical code", or "what does Airflow MLOps actually give me that 2.x with a KubernetesPodOperator did not?" It covers why 3.0 is the biggest release since 2.0, the Edge Worker reverse-connect model for on-prem and data-sovereignty pipelines, the DAG versioning story with task-instance-to-version binding, the MLOps primitives built on Airflow Assets (renamed from Datasets) with Vertex / MLflow / Mosaic hooks, and the unified UI plus the Airflow 3 migration path from 2.x through the airflow db upgradecheck upgrade checker. Each section pairs a teaching block with a Solution-Tail interview answer: code, a step-by-step trace, an output table, then a concept-by-concept breakdown of why it works.


When you want hands-on reps immediately after reading, drill the ETL practice library →, rehearse on the SQL practice library →, and sharpen the orchestration axis with the optimization practice library →.




1. Why Airflow 3 is the biggest release since 2.0

The four themes — Edge Workers, DAG versioning, MLOps primitives, unified UI — and why senior interviewers open with them

The one-sentence invariant: Airflow 3.0 is the release that finally answers the four questions the community has been asking since 2.0 — how do I run a worker outside my cluster without opening a port, how do I know which version of a DAG actually ran a given task instance, how do I make Airflow an MLOps citizen rather than an ML afterthought, and how do I stop juggling three separate UI tabs to find one DAG run. Every other 3.x feature — the Assets rename, the new scheduler internals, the deprecated-API cleanup — is either a consequence of these four themes or the plumbing that makes them possible.

The four axes interviewers actually probe.

  • Workers. Edge Workers are the marquee 3.0 feature — an Airflow worker running outside the main cluster (edge site, on-prem, air-gapped) that reverse-connects to the Airflow API rather than the API reaching in. The senior signal is naming the reverse-connect semantics without prompting and explaining why it eliminates the inbound-port requirement.
  • Versioning. DAG versioning ties every task instance to the DAG version it ran under. Backfills replay the historical code, not the current code. Interviewers ask "what happens if I edit the DAG, then run a backfill for last week?" — the senior answer is "the backfill runs the version that was live last week," not "the backfill runs the current code."
  • MLOps. Datasets are renamed to Assets in Airflow 3, and Assets are the shared vocabulary for lineage, freshness, and downstream triggers. The MLOps story is Assets plus model-registry hooks (Vertex AI, MLflow, Mosaic) plus asset-scheduled DAGs. The senior probe is "why is this different from just calling a KubernetesPodOperator?" — the answer is lineage and trigger semantics as first-class primitives.
  • UI / DX. The 3.x UI collapses the Graph, Grid, Calendar, and per-DAG-run views into a single lens with Assets, Versions, and Task Groups tabs. The upgrade checker (airflow db upgradecheck) surfaces every deprecated API in your 2.x DAGs before you cut over.

Why 3.0 matters even if you're not upgrading tomorrow.

  • Deprecation clock. Airflow 2.x is in maintenance-only mode. New provider packages, new integrations, and Python 3.13+ support ship on the 3.x line. Staying on 2.x is a slow bit-rot problem, not an active choice.
  • The reverse-connect model. The Edge Worker's reverse-connect story is the first practical answer to hybrid on-prem + cloud Airflow deployments. Teams running Celery or Kubernetes workers in a private data centre for GDPR, HIPAA, or data-sovereignty reasons have historically fought networking; Edge Workers make it a config-only decision.
  • Reproducibility. DAG versioning solves the "which code ran last Tuesday's job?" question that every audit and every post-mortem asks. Before 3.0, the answer was "whatever the current main branch is — hope you tagged it." After 3.0, the answer is a query against the metadata database.
  • MLOps as first-class. MLOps teams have been bolting Airflow onto model-training pipelines with custom operators and out-of-band lineage tools since 2019. 3.0's Asset model + registry hooks reduce the glue code to near-zero for the common case.
  • UI unification. The old UI split every question across three or four views. The new UI answers "what happened here, what version, what assets" in a single lens. The productivity delta on an incident is measurable in minutes per event.
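
The reproducibility point above, "a query against the metadata database", can be pictured as a join from a task instance to the DAG version it ran under. The sketch below uses an in-memory SQLite database with a made-up two-table schema; the table names, column names, and sample rows are illustrative assumptions, not Airflow's actual metadata schema.

```python
import sqlite3

# Hypothetical, simplified schema: NOT Airflow's real metadata tables.
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE dag_version (
    id INTEGER PRIMARY KEY,
    dag_id TEXT NOT NULL,
    version_number INTEGER NOT NULL,
    code_hash TEXT NOT NULL
);
CREATE TABLE task_instance (
    dag_id TEXT NOT NULL,
    task_id TEXT NOT NULL,
    run_id TEXT NOT NULL,
    dag_version_id INTEGER NOT NULL REFERENCES dag_version(id)
);
""")
conn.executemany("INSERT INTO dag_version VALUES (?, ?, ?, ?)", [
    (1, "sales_etl", 1, "a1b2c3"),
    (2, "sales_etl", 2, "d4e5f6"),
])
conn.executemany("INSERT INTO task_instance VALUES (?, ?, ?, ?)", [
    ("sales_etl", "load", "scheduled__2026-01-06", 1),
    ("sales_etl", "load", "scheduled__2026-01-13", 2),
])

def version_for(dag_id, task_id, run_id):
    """Return (version_number, code_hash) for one task instance, or None."""
    return conn.execute(
        """
        SELECT v.version_number, v.code_hash
        FROM task_instance ti
        JOIN dag_version v ON v.id = ti.dag_version_id
        WHERE ti.dag_id = ? AND ti.task_id = ? AND ti.run_id = ?
        """,
        (dag_id, task_id, run_id),
    ).fetchone()

print(version_for("sales_etl", "load", "scheduled__2026-01-06"))
```

The shape matters more than the names: once each task instance carries a version reference, "which code ran last Tuesday?" becomes a lookup instead of a guess about branch history.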

What Airflow 2.x users actually have to change.

  • DAG import compatibility. Most 2.x DAGs import unchanged. The ones that break typically use the removed SubDagOperator, custom XCom backends, or provider modules that moved between packages.
  • Datasets → Assets rename. from airflow.datasets import Dataset becomes from airflow.sdk import Asset. The old import still works with a deprecation warning through 3.1.
  • Executor config. New Edge Worker executor lives beside Celery / Kubernetes; no forced migration.
  • API deprecations. The old experimental REST API (deprecated since 2.0) is fully removed in 3.0. Anyone still using the experimental endpoints must switch to the stable API.
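
A quick way to size the change list above is to scan DAG source for the affected imports before touching the runtime. The sketch below is a minimal standalone scanner built on Python's ast module; it is not the official upgrade checker, and the module list and hint strings are assumptions drawn from the bullets above that you would extend for your own codebase.

```python
import ast

# Module prefixes taken from the change list above; extend for your codebase.
DEPRECATED_MODULES = {
    "airflow.operators.subdag": "SubDagOperator: refactor to TaskGroup",
    "airflow.datasets": "Dataset: rename to Asset",
}

def find_deprecated_imports(source: str) -> list[tuple[int, str, str]]:
    """Return (line, module, hint) for each deprecated import in `source`."""
    findings = []
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.ImportFrom) and node.module:
            modules = [node.module]
        elif isinstance(node, ast.Import):
            modules = [alias.name for alias in node.names]
        else:
            continue
        for module in modules:
            for prefix, hint in DEPRECATED_MODULES.items():
                if module == prefix or module.startswith(prefix + "."):
                    findings.append((node.lineno, module, hint))
    return sorted(findings)

# Small made-up DAG file used only to exercise the scanner.
sample_dag = '''
from airflow.datasets import Dataset
from airflow.operators.subdag import SubDagOperator
from airflow.operators.python import PythonOperator
'''

for line, module, hint in find_deprecated_imports(sample_dag):
    print(f"line {line}: {module} -> {hint}")
```

Parsing with ast rather than grepping avoids false positives from comments and string literals, at the cost of missing dynamic imports.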

What interviewers listen for.

  • Do you say "Edge Workers reverse-connect to the API" in the first sentence when asked about remote execution? — senior signal.
  • Do you mention that task instances are tied to the DAG version they ran under, not the current version? — senior signal.
  • Do you push back on "Airflow is just an orchestrator, why do we need MLOps primitives?" with the Asset lineage + trigger argument? — required answer.
  • Do you describe the upgrade path as "run airflow db upgradecheck first, fix the warnings, then cut over" rather than as a big-bang migration? — required answer.

Worked example — the four axes on a real orchestration answer

Detailed explanation. A senior interviewer opens with an intentionally vague prompt: "walk me through what's new in Airflow 3 and why I should care." The bad answer meanders through the release notes. The senior answer picks four axes — workers, versioning, MLOps, UI — and gives one concrete example for each, framed against a workload the interviewer probably runs.

  • Workers. Edge Workers unlock hybrid deployments — a Snowflake cluster in the cloud, a customer-hosted Postgres in a private data centre.
  • Versioning. Backfills now replay the exact code that ran, not the current main.
  • MLOps. Assets are the shared vocabulary; model-registry hooks close the loop.
  • UI. One lens per DAG run — Graph + Grid + Assets + Versions in a single view.

Question. A senior DE interviewer asks: "give me one-sentence pitches for the four biggest Airflow 3 features and one workload each unblocks that 2.x could not." Structure the answer.

Input.

Axis | 2.x limitation | 3.x fix
Workers | Celery/K8s workers assumed in-cluster; no clean reverse-connect | Edge Worker reverse-connects to API
Versioning | Task instances not tied to DAG version | Every task instance stamped with its DAG version
MLOps | Custom operators + out-of-band lineage | Assets + registry hooks first-class
UI | Graph / Grid / Calendar / DAG-run views scattered | Unified UI with Assets, Versions, Task Groups tabs

Code.

# Sketch of the four-axes answer, as a Python dict for structured recall
answer = {
    "workers": {
        "pitch": "Edge Workers run outside the cluster and reverse-connect to the Airflow API — no inbound port needed.",
        "unblocks": "hybrid on-prem + cloud pipelines, edge IoT ETL, GDPR data-sovereignty workloads",
    },
    "versioning": {
        "pitch": "DAG versioning ties every task instance to the version of the DAG code that ran it — backfills replay history exactly.",
        "unblocks": "auditable reproducibility, correct backfills after code changes, incident forensics",
    },
    "mlops": {
        "pitch": "Assets (renamed from Datasets) are the lineage + freshness + trigger primitive; registry hooks connect Vertex / MLflow / Mosaic.",
        "unblocks": "training pipelines with real lineage, asset-scheduled retraining, model promotion workflows",
    },
    "ui": {
        "pitch": "One unified UI collapses Graph / Grid / Calendar into a single lens with Assets, Versions, and Task Groups tabs.",
        "unblocks": "one-click incident triage, per-run assets and versions visible without tab-switching",
    },
}

for axis, spec in answer.items():
    print(f"{axis.upper():>10}: {spec['pitch']}")
    print(f"           unblocks: {spec['unblocks']}")

Step-by-step explanation.

  1. Each axis gets a one-sentence pitch that names the primitive (Edge Worker, DAG versioning, Assets, unified UI) and its behaviour (reverse-connect, task-instance-to-version binding, lineage + triggers, single lens).
  2. Each axis names a workload that 2.x could not cleanly support — hybrid deployments, correct backfills after code edits, MLOps pipelines, one-click incident triage.
  3. The structure is memorable: pitch, unblocks, next axis. The senior signal is not rattling off ten features; it is naming the four that shift the architectural conversation.
  4. The answer dict doubles as a study aid — write it once, memorise the pitches, redeploy in every 3.x interview.
  5. The unblocks column is where interviewers probe deeper: "tell me more about hybrid deployments" leads directly to the Edge Worker reverse-connect story.

Output.

Axis | Pitch (1 sentence) | Unblocks
Workers | Edge Workers reverse-connect to the API | Hybrid on-prem + cloud
Versioning | Task instance stamped with DAG version | Correct backfills after edits
MLOps | Assets + registry hooks first-class | Model-training pipelines
UI | Single lens, all tabs collapsed | One-click incident triage

Rule of thumb. Structure every Airflow 3 answer around the four axes — workers, versioning, MLOps, UI. Interviewers use the same taxonomy; matching it signals "I've read the release notes with intent."

Worked example — deciding whether to upgrade a 2.x deployment now

Detailed explanation. A team runs Airflow 2.9 in production with 400 DAGs, Celery executor, and no immediate MLOps roadmap. The engineering manager asks whether to upgrade to 3.x now or wait. The senior answer weighs the concrete signals: deprecation clock, feature dependencies, Python version support, provider-package roadmap, and team bandwidth. It is not "just upgrade, 3.x is better."

  • Signal 1. Python 3.13 support ships on 3.x only; the 2.x line stops at Python 3.12 and is in maintenance mode.
  • Signal 2. New provider packages (Snowflake 6.x, dbt Cloud 4.x) land on 3.x first.
  • Signal 3. Any hybrid on-prem workload benefits immediately from Edge Workers.
  • Signal 4. MLOps roadmap = 3.x now; no MLOps roadmap = 3.x within 12 months anyway.

Question. A team runs Airflow 2.9 with 400 DAGs, Celery, no MLOps roadmap. Enumerate the decision criteria for "upgrade now vs upgrade in 6 months" and reach a recommendation.

Input.

Criterion | Value | Weight
Current version | 2.9 | -
DAG count | 400 | -
Executor | Celery | -
Python version | 3.11 | -
MLOps roadmap | none | low
Hybrid / edge workload | none | low
Provider package roadmap | Snowflake 6.x on 3.x only | high
Team bandwidth | 1 senior DE-quarter | -

Code.

# Decision helper: returns "upgrade now", "upgrade in 6 months", or a "defer; ..." message
def _ver(text: str) -> tuple:
    # "3.12" -> (3, 12), so versions compare numerically rather than as strings
    # (as strings, "3.9" >= "3.12" and "2.10" < "2.8" would both be True)
    return tuple(int(part) for part in text.split("."))

def upgrade_decision(current_version: str,
                     dag_count: int,
                     mlops_roadmap: bool,
                     hybrid_workload: bool,
                     provider_dependency: bool,
                     python_target: str,
                     bandwidth_quarters: float) -> str:
    signals = 0
    if mlops_roadmap:       signals += 3
    if hybrid_workload:     signals += 3
    if provider_dependency: signals += 2
    if _ver(python_target) >= (3, 12):  signals += 2
    if _ver(current_version) < (2, 8):  signals += 1   # further behind = more risk

    # Bandwidth check
    est_effort = max(0.25, dag_count / 500)     # ~1 quarter per 500 DAGs
    if bandwidth_quarters < est_effort:
        return f"defer; need {est_effort:.2f} quarters but have {bandwidth_quarters:.2f}"

    return "upgrade now" if signals >= 3 else "upgrade in 6 months"

print(upgrade_decision(
    current_version="2.9",
    dag_count=400,
    mlops_roadmap=False,
    hybrid_workload=False,
    provider_dependency=True,
    python_target="3.12",
    bandwidth_quarters=1.0,
))

Step-by-step explanation.

  1. The helper collects five weighted signals: MLOps roadmap, hybrid workload, provider dependency, Python target, and current-version drift. Each signal contributes points; a total of 3 or more is "upgrade now" territory.
  2. The bandwidth check is a hard gate: even if the signals scream upgrade, insufficient bandwidth means "defer until you have the DE-quarters to do it safely." Rule of thumb is one senior-DE quarter per ~500 DAGs, including regression testing.
  3. For the input case (no MLOps, no hybrid, Snowflake provider on 3.x only, Python 3.12 target, 1 quarter bandwidth): signals = 2 (provider) + 2 (Python) = 4, which meets the threshold of 3, and bandwidth (1.0) ≥ effort (400 / 500 = 0.8). Result: upgrade now.
  4. If the same team had no provider dependency and stayed on Python 3.11, signals would be 0 and the recommendation would flip to "upgrade in 6 months" — buy time for the ecosystem to settle, watch for 3.2 or 3.3 point releases.
  5. The scoring is not a substitute for judgement; it is a structured way to defend the recommendation to leadership. The signals are the same for every 2.x-to-3.x conversation in the industry.
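
The bandwidth gate in step 2 is easiest to sanity-check in isolation. The sketch below applies the same rule of thumb (about one quarter per 500 DAGs, with a 0.25-quarter floor) to a few DAG counts; the function names are hypothetical and the DAG counts other than 400 are made-up inputs for illustration.

```python
def estimated_effort_quarters(dag_count: int) -> float:
    """Rule of thumb from the helper: ~1 quarter per 500 DAGs, floor of 0.25."""
    return max(0.25, dag_count / 500)

def bandwidth_gate(dag_count: int, bandwidth_quarters: float) -> str:
    """Return 'pass' when available bandwidth covers the estimated effort."""
    needed = estimated_effort_quarters(dag_count)
    return "pass" if bandwidth_quarters >= needed else "fail"

# Hold bandwidth at 1.0 quarter and vary the size of the deployment.
for dag_count in (50, 400, 1000):
    effort = estimated_effort_quarters(dag_count)
    gate = bandwidth_gate(dag_count, 1.0)
    print(f"{dag_count:>5} DAGs -> {effort:.2f} quarters needed, gate: {gate}")
```

Because the gate is evaluated before the signal threshold, a large deployment can score high on every signal and still come back as "defer".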

Output.

Scenario | Signal total | Bandwidth check | Recommendation
No MLOps, no hybrid, no provider dep, Python 3.11 | 0 | pass | Upgrade in 6 months
No MLOps, no hybrid, Snowflake 6.x on 3.x, Python 3.12 | 4 | pass | Upgrade now
Hybrid on-prem workload, MLOps roadmap | 6 | pass | Upgrade now
Any scenario with < 0.8 quarter bandwidth | any | fail | Defer until bandwidth

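Rule of thumb. Upgrade now if a hybrid workload or an MLOps roadmap applies on its own (3 points each), or if a provider dependency and a Python 3.12+ target apply together (2 points each), provided the team has the bandwidth to pass the effort gate. Otherwise, wait 6 months and re-evaluate; the deprecation clock will catch up regardless.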

Worked example — reading the 3.x release notes with a senior lens

Detailed explanation. The 3.0 release notes are hundreds of bullet points; the 3.1 and 3.2 notes add more. A senior reader does not read line-by-line — they scan for the four axes (workers, versioning, MLOps, UI) and the deprecations that will bite their DAGs. Walk through the mental model for scanning a release-notes page for a specific 2.x deployment.

  • Scan pattern. Ctrl-F: "edge", "version", "asset" (or "dataset"), "UI", "deprecated", "removed".
  • Impact triage. Anything in "removed" that your DAGs import → hard block. Anything in "deprecated" → migration ticket. Anything in "new" that matches your roadmap → adoption plan.
  • Version-to-version. 3.0 = big-bang features; 3.1 = polish + more providers; 3.2+ = expected AI/ML operator expansion.
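
The Ctrl-F scan pattern above can be mechanised as a keyword pass over the release-notes text. The sketch below is one possible approach using only the standard library; the regular expressions are an interpretation of the keyword list, and the sample lines are invented stand-ins, not quotes from the actual release notes.

```python
import re

# Keywords from the scan pattern above, grouped by axis plus a deprecations bucket.
KEYWORDS = {
    "workers": r"\bedge\b",
    "versioning": r"\bversion(?:s|ing)?\b",
    "mlops": r"\b(?:asset|dataset)s?\b",
    "ui": r"\bUI\b",
    "deprecations": r"\b(?:deprecated|removed)\b",
}

def scan_notes(text: str) -> dict[str, list[str]]:
    """Bucket each non-empty line under every axis whose keyword it mentions."""
    hits = {axis: [] for axis in KEYWORDS}
    for line in text.splitlines():
        line = line.strip()
        if not line:
            continue
        for axis, pattern in KEYWORDS.items():
            if re.search(pattern, line, flags=re.IGNORECASE):
                hits[axis].append(line)
    return hits

# Invented sample lines standing in for real release-note bullets.
sample_notes = """
Edge Worker executor added for remote sites
SubDagOperator removed; use TaskGroup
Dataset renamed to Asset
Fixed a typo in the docs
"""

for axis, lines in scan_notes(sample_notes).items():
    print(f"{axis}: {lines}")
```

The output is only a first cut: each hit still needs a human to assign it to adopt, migrate, or block.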

Question. A team on Airflow 2.9 wants a 30-minute triage of the 3.0 release notes. Produce the scan checklist and the categorisation of findings.

Input.

Column | Value
Current version | 2.9
Target version | 3.0
Notes URL | airflow.apache.org/docs/apache-airflow/3.0/release_notes.html
Scan budget | 30 minutes

Code.

# Scan helper — walk the release notes, bucket findings into (adopt, migrate, block)
import re
from dataclasses import dataclass, field
from typing import List

@dataclass
class ScanFinding:
    axis: str            # workers | versioning | mlops | ui | other
    category: str        # adopt | migrate | block
    quote: str
    ticket: str = ""

@dataclass
class ScanReport:
    findings: List[ScanFinding] = field(default_factory=list)

    def add(self, axis, category, quote):
        self.findings.append(ScanFinding(axis, category, quote))

    def summary(self):
        buckets = {"adopt": [], "migrate": [], "block": []}
        for f in self.findings:
            buckets[f.category].append(f)
        return buckets

report = ScanReport()

report.add("workers",    "adopt",   "Edge Worker executor available for reverse-connect deployments")
report.add("versioning", "adopt",   "DAG versioning enabled by default; task instances stamped with version_id")
report.add("mlops",      "migrate", "Dataset renamed to Asset; airflow.datasets deprecated (removed in 3.2)")
report.add("ui",         "adopt",   "Unified UI ships behind AIRFLOW__UI__NEW_UI=true")
report.add("other",      "block",   "SubDagOperator removed — refactor to TaskGroup")
report.add("other",      "block",   "Experimental REST API removed — switch to stable API")

for category, items in report.summary().items():
    print(f"[{category}]")
    for i in items:
        print(f"  - {i.axis}: {i.quote}")

Step-by-step explanation.

  1. The scan proceeds axis-by-axis. For each axis, Ctrl-F the release notes for the relevant keyword and copy any matching bullet into the report.
  2. Every finding is categorised into adopt, migrate, or block. adopt = new feature to consider; migrate = existing DAG code needs an edit but no immediate outage; block = existing DAG code will fail on 3.0.
  3. The example report captures the six most impactful items: three adopts (Edge Worker, DAG versioning, unified UI behind a flag), one migrate (Dataset → Asset), and two blocks (SubDagOperator, experimental API).
  4. The block category feeds the pre-upgrade checklist: every block-tier finding must have a fixed PR in main before the cutover.
  5. The migrate category feeds the post-upgrade backlog: rewrites happen on the 3.x line before the deprecation removes the compatibility shim.

Output.

Category | Count | Meaning
adopt | 3 | New features to consider (Edge Worker, DAG versioning, unified UI)
migrate | 1 | Rename Dataset → Asset in existing DAG code (before 3.2 removal)
block | 2 | Hard blockers: SubDagOperator and experimental REST API removed

Rule of thumb. Every 2.x → 3.x scan produces roughly 3–6 findings per axis. Ship a triage doc with adopt/migrate/block buckets before you touch the runtime. The rewriting effort is proportional to the block count, not the total notes length.

Senior interview question on the shape of Airflow 3

A senior interviewer often opens with: "You've been running Airflow 2.x for four years. Your CTO reads about Airflow 3 at a conference and asks whether the team should upgrade. Walk me through the four themes of the release, the upgrade signals, and how you'd sequence a 6-month migration."

Solution Using the four-axes framing + the signal-based decision + a phased rollout

# Six-month 2.x → 3.x migration plan, expressed as a phased checklist
from dataclasses import dataclass, field
from typing import List

@dataclass
class Phase:
    month: int
    name: str
    axis: str
    activities: List[str]
    exit_criteria: List[str]

phases = [
    Phase(1, "Discovery & triage", "all", [
        "Run airflow db upgradecheck against every 2.x DAG",
        "Categorise findings into adopt / migrate / block",
        "Inventory Edge / MLOps / versioning use cases",
    ], [
        "Triage doc merged; block-tier list has an owner per DAG",
    ]),
    Phase(2, "Deprecation clean-up", "all", [
        "Fix every block-tier import (SubDagOperator → TaskGroup, experimental → stable API)",
        "Rewrite XCom custom backends against the 3.x interface",
        "Ship on the 2.x line first — bug-for-bug compatible",
    ], [
        "All 400 DAGs pass upgradecheck with zero errors",
    ]),
    Phase(3, "Staging on 3.x", "all", [
        "Stand up a 3.x staging cluster (same executor as prod)",
        "Deploy the DAG bundle unchanged (2.x compat shims still active)",
        "Run parallel DAG runs against staging for 2 weeks",
    ], [
        "Staging parity: same task success rate, same runtime, no new errors",
    ]),
    Phase(4, "Adopt DAG versioning", "versioning", [
        "Enable DAG versioning (default in 3.0; verify metadata migration ran)",
        "Add version_id filter to lineage / audit dashboards",
        "Test a backfill of a week-old run: verify historical code runs",
    ], [
        "Backfill test passes; audit dashboard shows version_id per task instance",
    ]),
    Phase(5, "Assets rename + optional Edge / MLOps", "mlops+workers", [
        "Rewrite Dataset imports to Asset (deprecation warnings turn into errors in 3.2)",
        "For any hybrid workload: deploy an Edge Worker to the remote site",
        "For any ML pipeline: adopt Asset lineage + registry hook",
    ], [
        "Zero deprecation warnings in staging log; edge worker Helm chart merged",
    ]),
    Phase(6, "Prod cutover", "all", [
        "Cut over prod to 3.x (blue/green: 3.x runs in parallel for 48 hours)",
        "Turn on the unified UI (AIRFLOW__UI__NEW_UI=true)",
        "Retire the 2.x cluster after 2 weeks of clean prod",
    ], [
        "3.x carries 100% of prod traffic; 2.x fully decommissioned",
    ]),
]

for p in phases:
    print(f"Month {p.month}: {p.name} ({p.axis})")
    for a in p.activities:
        print(f"  - {a}")
    for e in p.exit_criteria:
        print(f"  exit: {e}")   # label exit criteria so they stand apart from activities

Step-by-step trace.

Month | Phase | Axis | Exit criterion
1 | Discovery & triage | all | Triage doc, block-list ownership
2 | Deprecation clean-up | all | 0 upgradecheck errors on 2.x
3 | Staging on 3.x | all | Parity across all DAGs
4 | Adopt DAG versioning | versioning | Backfill of week-old run passes
5 | Assets + optional Edge/MLOps | mlops+workers | 0 deprecation warnings
6 | Prod cutover | all | 3.x = 100% traffic; 2.x decommissioned

After 6 months, the deployment runs on 3.x with DAG versioning on, Assets renamed, the unified UI enabled, and (if the workload warranted) at least one Edge Worker in production. The team has the vocabulary — workers, versioning, MLOps, UI — to answer every 3.x interview question fluently.

Output:

Deliverable | Status after 6 months
upgradecheck | 0 errors, 0 warnings
DAG versioning | on, per-task-instance
Assets rename | complete
Unified UI | enabled
Edge Workers | 1+ deployed (if applicable)
2.x cluster | decommissioned

Why this works — concept by concept:

  • Four-axes framing — every 3.x conversation collapses onto workers, versioning, MLOps, UI. Aligning the plan to the same axes makes cross-team communication trivial (each axis has an owner, each owner has a phase).
  • Signal-based decision — the upgrade decision is not a gut call; it is a scored decision against MLOps roadmap, hybrid workload, provider dependencies, and Python target. Signals ≥ 3 → upgrade now.
  • Phased rollout — six phases, one per month, each with explicit activities and exit criteria. The most common failure mode of Airflow upgrades is skipping the staging phase; the plan makes that impossible by design.
  • Backfill-of-week-old-run gate — the DAG versioning phase's exit criterion tests the feature you're actually buying: a backfill has to replay the version that was live a week ago. If it doesn't, you've mis-migrated.
  • Cost — six senior-DE months for a 400-DAG deployment; roughly 0.015 months per DAG. Cost scales with DAG count and block-tier finding count, not with total feature area. Skipping upgradecheck is the biggest cost multiplier because it defers block-tier work into the cutover window.



2. Edge Workers — remote / on-prem execution

The Airflow Edge Worker reverse-connects to the API: no inbound port on your control plane, no VPN gymnastics

The mental model in one line: an Edge Worker is a task-executing process running outside the Airflow cluster (edge site, on-prem, air-gapped facility) that opens an outbound connection to the Airflow API, polls for work, executes tasks locally, and pushes results back — the control plane never needs an inbound port, and the network story collapses to "the worker can reach the API". Every other Edge-related interview question is a consequence of this reverse-connect topology.

Figure: Edge Worker diagram. A central Airflow cluster on the left and an on-prem factory site on the right, with a reverse-connection arrow from the edge worker calling back into the API and no inbound port required.

Why Edge Workers exist.

  • The 2.x pain. Celery workers assumed a shared broker (Redis / RabbitMQ) reachable by both the scheduler and the worker. Kubernetes executors assumed the workers ran in the same cluster. Neither model worked cleanly for a worker in a customer data centre or an edge site.
  • The workaround before 3.0. Teams tunnelled the broker over a VPN, or ran a full Airflow deployment per site (with cross-site XCom syncing), or shelled out to a queue-and-agent pattern of their own invention. Each was operationally expensive.
  • The 3.0 fix. The Edge Worker is a first-class executor. The worker process opens an HTTP/HTTPS connection to the API, polls for work, executes tasks with its own Python runtime, and posts results back. The API is the only network surface required.

The reverse-connect wire protocol.

  • Registration. On startup, the Edge Worker calls POST /api/v1/edge/register with its identity, capabilities (queue tags), and API token.
  • Poll. The worker long-polls GET /api/v1/edge/dequeue?queue=on_prem&worker_id=... for work. If a task matches, the API returns the task instance descriptor.
  • Execute. The worker runs the task locally against its own Python environment (which can be entirely different from the scheduler's — separate Python version, separate provider packages, separate secrets).
  • Report. The worker POSTs progress (state transitions, XCom pushes, logs) to /api/v1/edge/report.
  • Heartbeat. A concurrent thread heartbeats to /api/v1/edge/heartbeat every 5 seconds; the scheduler marks the worker unhealthy after 3 missed beats.
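
The register, poll, execute, report cycle above can be traced without any network by swapping the control plane for an in-memory stand-in. The sketch below is a simulation only: FakeEdgeApi and run_worker are hypothetical names, the states are simplified to success and failed, and nothing here reflects the Edge provider's real client code or wire format.

```python
from collections import deque
from dataclasses import dataclass, field

@dataclass
class FakeEdgeApi:
    """In-memory stand-in for the control plane; no network involved."""
    queues: dict = field(default_factory=dict)    # queue name -> deque of task ids
    workers: dict = field(default_factory=dict)   # worker id -> queues it serves
    reports: list = field(default_factory=list)   # (worker id, task id, state)

    def register(self, worker_id, queues):
        self.workers[worker_id] = list(queues)

    def dequeue(self, worker_id, queue):
        # Only hand out work from a queue the worker registered for.
        if queue not in self.workers.get(worker_id, []):
            return None
        pending = self.queues.get(queue)
        return pending.popleft() if pending else None

    def report(self, worker_id, task_id, state):
        self.reports.append((worker_id, task_id, state))

def run_worker(api, worker_id, queue, execute):
    """Register, then poll until the queue is empty, reporting each outcome."""
    api.register(worker_id, [queue])
    while (task_id := api.dequeue(worker_id, queue)) is not None:
        try:
            execute(task_id)
            api.report(worker_id, task_id, "success")
        except Exception:
            api.report(worker_id, task_id, "failed")

api = FakeEdgeApi(queues={
    "on_prem": deque(["extract", "load"]),
    "cloud": deque(["train"]),
})
run_worker(api, "nuc-01", "on_prem", execute=lambda task_id: None)
print(api.reports)
print(list(api.queues["cloud"]))
```

Every call in the loop originates from the worker side, which is the whole point of the topology: the stand-in API never needs a handle on the worker.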

Use cases that Edge Workers unblock.

  • GDPR / data sovereignty. The pipeline processes EU customer data that must not leave EU boundaries. The Edge Worker runs in Frankfurt; the Airflow control plane lives wherever the team wants (typically the cloud). Only task metadata (state, logs, XCom) crosses the boundary.
  • HIPAA / on-prem sensitive data. A hospital keeps patient data on-prem. The Edge Worker runs in the hospital's data centre; the Airflow control plane is a hosted SaaS Airflow (e.g. Astronomer). The pipeline orchestrates cleanly without exposing patient data.
  • Edge IoT ETL. A factory has sensors producing terabytes per day. An Edge Worker running on a small industrial box aggregates the sensor stream locally, sends only summaries to the cloud, and closes the loop.
  • Hybrid cloud. Some tasks run on AWS (S3 → Snowflake), others on a customer-hosted Postgres in a private data centre. Two workers — one in AWS, one on-prem — both talk to the same Airflow control plane.

What the network story looks like.

  • Outbound only. The Edge Worker opens exactly one HTTPS connection to https://airflow.example.com/api/v1/edge/*. That is the entire network requirement.
  • No inbound port. The Airflow control plane does not reach out to the worker; the worker reaches in. Firewalls stay closed.
  • TLS + token. The auth model is a bearer token issued from the Airflow UI plus TLS for confidentiality.
  • Optional mTLS. For higher-trust environments, mTLS client certs replace the token.
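
As a concrete picture of the "TLS + token" bullet, the sketch below prepares, but does not send, an authenticated poll request using only the standard library. The helper name is hypothetical, the host and token are placeholders, and the endpoint path simply mirrors the one quoted in the protocol list; none of it is verified against the Edge provider's implementation.

```python
import urllib.parse
import urllib.request

def build_dequeue_request(api_url: str, token: str, queue: str,
                          worker_id: str) -> urllib.request.Request:
    """Prepare (but do not send) a bearer-authenticated poll request."""
    if not api_url.startswith("https://"):
        raise ValueError("expected an https:// URL so the token travels over TLS")
    query = urllib.parse.urlencode({"queue": queue, "worker_id": worker_id})
    return urllib.request.Request(
        url=f"{api_url.rstrip('/')}/api/v1/edge/dequeue?{query}",
        headers={"Authorization": f"Bearer {token}"},
        method="GET",
    )

# Placeholder values; a real token would be read from a secrets store.
req = build_dequeue_request("https://airflow.example.com", "example-token",
                            "on_prem", "nuc-01")
print(req.full_url)
print(req.get_method(), req.get_header("Authorization"))
```

Refusing plain http:// up front is a cheap guard: a bearer token sent without TLS is readable by anything on the path.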

Common interview probes on Edge Workers.

  • "Walk me through what happens when an Edge Worker starts up." — registration + long-poll + heartbeat.
  • "Which network holes do I have to open?" — outbound HTTPS to the API only.
  • "How does it differ from Celery over a VPN?" — no shared broker, no bidirectional network, no VPN.
  • "What happens if the worker loses connectivity for 30 minutes?" — running tasks fail with a worker_lost state; the scheduler re-queues them.
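
The connectivity-loss probe comes down to heartbeat arithmetic. The sketch below assumes "3 missed beats" means more than three heartbeat intervals of silence, using the 5-second interval quoted earlier; that reading, and the function name, are assumptions rather than the scheduler's actual rule.

```python
from datetime import datetime, timedelta, timezone

HEARTBEAT_INTERVAL = timedelta(seconds=5)   # interval quoted in the protocol list
MISSED_BEATS_ALLOWED = 3                    # threshold quoted in the protocol list

def is_unhealthy(last_beat: datetime, now: datetime) -> bool:
    """True once the silence exceeds MISSED_BEATS_ALLOWED heartbeat intervals."""
    return now - last_beat > HEARTBEAT_INTERVAL * MISSED_BEATS_ALLOWED

last = datetime(2026, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
for gap in (timedelta(seconds=10), timedelta(seconds=16), timedelta(minutes=30)):
    print(f"silent for {gap}: unhealthy={is_unhealthy(last, last + gap)}")
```

Under these numbers a worker is flagged after roughly 15 seconds of silence, so a 30-minute outage is detected long before it ends.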

Worked example — deploying an Edge Worker on an on-prem factory box

Detailed explanation. A factory data engineering team runs sensor-ETL DAGs against local telemetry. The control plane is a hosted Astronomer Airflow 3.1 in the cloud. The team deploys an Edge Worker on an Intel NUC in the factory, gives it access to the local Modbus PLC network, and lets it execute only tasks tagged queue=factory. Walk through the deploy: install, configure, register, run.

  • Hardware. Intel NUC, 32 GB RAM, Ubuntu 24.04, Python 3.12.
  • Network. Outbound HTTPS to the Astronomer control plane only.
  • Tagging. DAGs tag the sensor-ETL tasks with queue=factory; the Edge Worker dequeues only from factory.

Question. Write the systemd unit and the pip install command for an Edge Worker on the NUC. Show how the DAG tags a task for the factory queue.

Input.

Component | Value
Airflow API URL | https://airflow.factory-team.astronomer.io
Worker API token | env var AIRFLOW_EDGE_TOKEN (issued from UI)
Queue | factory
Concurrency | 4

Code.

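# NUC-side install
# The Airflow 3 Edge provider is published as apache-airflow-providers-edge3
# (the older apache-airflow-providers-edge name targeted Airflow 2.10).
python3.12 -m venv /opt/airflow-edge/venv
/opt/airflow-edge/venv/bin/pip install \
  "apache-airflow==3.1.2" \
  "apache-airflow-providers-edge3>=1.0" \
  "pymodbus>=3.6"

# Create the config directory
mkdir -p /etc/airflow-edge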
# /etc/systemd/system/airflow-edge-worker.service
[Unit]
Description=Airflow Edge Worker (factory queue)
After=network-online.target
Wants=network-online.target

[Service]
Type=simple
User=airflow
Group=airflow
Environment="AIRFLOW__EDGE__API_URL=https://airflow.factory-team.astronomer.io"
Environment="AIRFLOW__EDGE__QUEUES=factory"
Environment="AIRFLOW__EDGE__CONCURRENCY=4"
EnvironmentFile=/etc/airflow-edge/secrets.env
ExecStart=/opt/airflow-edge/venv/bin/airflow edge worker
Restart=always
RestartSec=5

[Install]
WantedBy=multi-user.target
# DAG side: tag the sensor ETL for the factory queue
from datetime import datetime
from airflow.sdk import dag, task   # Airflow 3 authoring API; airflow.decorators is the deprecated 2.x path

@dag(
    dag_id="factory_sensor_etl",
    schedule="@hourly",
    start_date=datetime(2026, 1, 1),
    catchup=False,
    default_args={"queue": "factory"},   # every task runs on the factory Edge Worker
)
def sensor_etl():

    @task
    def scrape_modbus() -> dict:
        # Runs on the NUC; the Modbus PLC is on the factory LAN, reachable from here
        from pymodbus.client import ModbusTcpClient
        client = ModbusTcpClient("10.20.30.40", port=502)
        try:
            client.connect()
            # count is passed by keyword so the call works across pymodbus 3.x releases
            rr = client.read_holding_registers(0, count=100)
        finally:
            client.close()
        if rr.isError():
            raise RuntimeError(f"Modbus read failed: {rr}")
        return {"registers": rr.registers}

    @task
    def push_summary(payload: dict) -> None:
        # Sends only summarised data back to the cloud
        registers = payload["registers"]
        summary = {"avg": sum(registers) / len(registers)}
        # ... post summary to S3 / Snowflake ...

    push_summary(scrape_modbus())

sensor_etl()

Step-by-step explanation.

  1. The pip install layers apache-airflow, the Edge provider package (published as apache-airflow-providers-edge3 for Airflow 3), and any workload-specific libraries (pymodbus here). Everything runs in a venv on the NUC, with no dependency on the scheduler's Python environment.
  2. The systemd unit supplies the three critical settings: the API URL to reverse-connect to, the queues to dequeue from, and the number of parallel task slots. The API token lives in /etc/airflow-edge/secrets.env, which should be readable only by the airflow user (chmod 600).
  3. On startup, the worker registers with the API, then long-polls the factory queue for work. The control plane in Astronomer shows the worker in the UI (name, capabilities, heartbeat) much as it would a Celery worker.
  4. default_args={"queue": "factory"} applies the queue to every task in the DAG, so each task instance is dispatched only to a worker serving factory. Cloud-side workers do not subscribe to that queue and never receive these tasks.
  5. The tasks run on the NUC. pymodbus reads the local PLC over the factory LAN. Note that a task's return value is stored as an XCom in the control plane's metadata database, so as written the register list returned by scrape_modbus does cross the boundary. To keep raw PLC data inside the factory, compute the summary inside the same task (or configure an XCom backend that stores payloads on-site) and return only the aggregate.

Output.

| Layer | Location | Data flow |
|---|---|---|
| Scheduler / webserver / metadata | Astronomer cloud | reads task state |
| Edge Worker (factory queue) | NUC in factory | executes DAG task |
| PLC / Modbus | Factory LAN | read by Edge Worker only |
| Summary (S3) | AWS | posted from Edge Worker |
| Network requirement | Outbound HTTPS from NUC → API | one connection |

Rule of thumb. Every Edge Worker deployment answers three questions before the systemd unit is written: (1) which queue tag does this worker serve? (2) what are the local dependencies (Modbus, private CA, mounted volume)? (3) what is the auth token and how is it rotated? Skipping any of the three is how deployments break at 2 AM.
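
The queue-tag routing described in this example can be illustrated with a small, self-contained model. This is a sketch of the idea only, not Airflow's scheduler code: the `Worker` class, the `dispatch` function, and the worker names are hypothetical, and a real deployment would leave unmatched tasks in the queued state rather than returning them.

```python
from dataclasses import dataclass, field


@dataclass
class Worker:
    """Hypothetical stand-in for a worker that subscribes to named queues."""
    name: str
    queues: set[str]
    received: list[str] = field(default_factory=list)


def dispatch(tasks: dict[str, str], workers: list[Worker]) -> list[str]:
    """Hand each task to the first worker subscribed to its queue.

    `tasks` maps task_id -> queue name. Returns the task_ids that no
    worker serves (in a real deployment they would simply stay queued).
    """
    unassigned = []
    for task_id, queue in tasks.items():
        target = next((w for w in workers if queue in w.queues), None)
        if target is None:
            unassigned.append(task_id)
        else:
            target.received.append(task_id)
    return unassigned


edge = Worker("factory-nuc", {"factory"})
cloud = Worker("cloud-1", {"default"})
stuck = dispatch(
    {
        "scrape_modbus": "factory",
        "push_summary": "factory",
        "report": "default",
        "orphan": "lab",          # no worker serves the "lab" queue
    },
    [edge, cloud],
)
```

The `orphan` task shows why question (1) in the rule of thumb matters: a task tagged with a queue that no worker serves never runs.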

Worked example — Edge Worker for GDPR data sovereignty

Detailed explanation. An analytics team processes EU customer data that legal mandates must not leave the EU. The control plane is a hosted Airflow in the US; the workers must run in Frankfurt. An Edge Worker in EU-Central-1 dequeues only EU-tagged tasks, processes them against an EU-local S3 bucket and an EU-local Snowflake, and returns only aggregate metrics (never PII) to the US-hosted metadata.

  • Legal constraint. No PII leaves the EU perimeter.
  • Architecture. Airflow control plane in US; Edge Worker in EU; EU worker sees EU data.
  • Metadata leakage. Task state, DAG version, XCom values do cross to US metadata — so XCom must contain only aggregates, never raw PII.

Question. Design the DAG + Edge Worker configuration so PII never touches the US control plane. Show the XCom guardrail.

Input.

| Component | Value |
|---|---|
| Control plane region | US-East-1 |
| EU Edge Worker region | EU-Central-1 |
| EU data source | s3://eu-customers/... (EU bucket) |
| Sensitivity | GDPR: no PII to US |

Code.

# DAG side — EU-tagged tasks + XCom aggregate-only contract
from datetime import datetime
from airflow.decorators import dag, task
from airflow.exceptions import AirflowFailException

@dag(
    dag_id="eu_customer_analytics",
    schedule="@daily",
    start_date=datetime(2026, 1, 1),
    catchup=False,
)
def eu_analytics():

    @task(queue="eu_central")
    def compute_eu_aggregate() -> dict:
        # Runs on the EU Edge Worker
        import boto3
        s3 = boto3.client("s3", region_name="eu-central-1")
        obj = s3.get_object(Bucket="eu-customers", Key="events/2026-06-22.parquet")
        # ... compute aggregate on EU worker ...
        aggregate = {"n_users": 12345, "total_events": 987654}

        # Guardrail — never return raw PII to XCom (which lives in US metadata)
        for k, v in aggregate.items():
            if isinstance(v, str) and "@" in v:
                raise AirflowFailException("guardrail: PII detected in XCom payload")
        return aggregate

    @task(queue="us_default")
    def load_to_us_dashboard(agg: dict) -> None:
        # Runs on US worker; receives only aggregates
        print("US dashboard update:", agg)

    load_to_us_dashboard(compute_eu_aggregate())

eu_analytics()
# EU Edge Worker Helm values (Astronomer Helm chart)
edgeWorker:
  enabled: true
  queues:
    - eu_central
  region: eu-central-1
  concurrency: 6
  env:
    - name: AWS_DEFAULT_REGION
      value: eu-central-1
    - name: SNOWFLAKE_ACCOUNT
      value: eu.snowflakecomputing.com
  secretsRef: eu-secrets

Step-by-step explanation.

  1. The DAG splits tasks by queue: compute_eu_aggregate runs on the EU Edge Worker (queue eu_central), load_to_us_dashboard runs on the default US worker. The scheduler enforces the split.
  2. The EU task reads raw customer data from the EU-local S3 bucket. This data never touches the US worker; it exists only in the EU Edge Worker's memory during task execution.
  3. The XCom guardrail is the critical GDPR moment: only aggregates return from the EU task. The check for @ in string values is a naive substring test, not a real PII detector, and it inspects only top-level values; production teams use a full PII scanner (Presidio, DataDog Sensitive Data Scanner) as a mandatory pre-XCom hook.
  4. The US worker picks up the aggregate XCom and updates the dashboard. No PII crossed the boundary; the compliance auditor can point at the queue split and the XCom-guardrail hook.
  5. The Helm values pin the Edge Worker to EU-Central-1, wire in the EU-specific AWS + Snowflake endpoints, and reference an EU secrets store. The systemd equivalent is functionally identical; Helm is more common in Astronomer / self-hosted K8s deployments.

Output.

| Task | Runs on | Data touched | XCom (US-visible) |
|---|---|---|---|
| compute_eu_aggregate | EU Edge Worker | Raw EU PII | Aggregate only (guarded) |
| load_to_us_dashboard | US default worker | Aggregate | Aggregate |

Rule of thumb. For GDPR/HIPAA workloads, the Edge Worker is the mechanism; the XCom guardrail is the policy. Ship both. A worker in EU without XCom PII checks is a compliance incident waiting to happen.
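
A deny-list check such as the `@` test only catches the patterns it knows about. One complementary option is an allow-list contract: the payload may contain only pre-approved keys with numeric values. The sketch below is an illustration under assumptions, not part of the original DAG; `ALLOWED_KEYS` and `enforce_aggregate_contract` are hypothetical names, and it raises a plain `ValueError` so it runs without Airflow installed.

```python
from numbers import Real

# Hypothetical list of aggregate fields approved to cross the boundary
ALLOWED_KEYS = {"n_users", "total_events"}


def enforce_aggregate_contract(payload: dict) -> dict:
    """Return the payload unchanged if it holds only approved numeric aggregates."""
    unknown = set(payload) - ALLOWED_KEYS
    if unknown:
        raise ValueError(f"unapproved XCom keys: {sorted(unknown)}")
    for key, value in payload.items():
        # bool is a subclass of int, so reject it explicitly
        if isinstance(value, bool) or not isinstance(value, Real):
            raise ValueError(f"non-numeric value for {key!r}")
    return payload


ok = enforce_aggregate_contract({"n_users": 12345, "total_events": 987654})
```

Inside a task, the same function would wrap the return value, and the `ValueError` could be swapped for `AirflowFailException` so the task fails without retrying.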

Worked example — worker loses connectivity mid-task

Detailed explanation. An Edge Worker running a 20-minute DAG task loses its uplink 5 minutes in. The scheduler stops receiving heartbeats and marks the worker as worker_lost after 15 seconds. The task instance moves to a retry state; when the worker reconnects, it may or may not be able to resume — the design decision is idempotency + retry, not resumability.

  • Failure mode. Worker heartbeat stops; scheduler marks worker lost; task fails with worker_lost.
  • Recovery. Task retries per DAG's retries config. The retry may land on the same worker (if it reconnects) or a different worker in the same queue.
  • Contract. Tasks must be idempotent — designed to run twice without duplicating output.

Question. Configure the DAG so a mid-task network partition results in a clean retry with no duplicate data. Show the idempotency pattern.

Input.

| Parameter | Value |
|---|---|
| Task | ETL from PLC → S3 |
| Task duration | 20 minutes |
| Heartbeat interval | 5 seconds |
| worker_lost threshold | 15 seconds (3 missed beats) |
| DAG retries | 3 |

Code.

from datetime import datetime, timedelta
from airflow.decorators import dag, task

@dag(
    dag_id="factory_idempotent_etl",
    schedule="@hourly",
    start_date=datetime(2026, 1, 1),
    catchup=False,
    default_args={
        "queue": "factory",
        "retries": 3,
        "retry_delay": timedelta(minutes=1),
        "retry_exponential_backoff": True,
    },
)
def factory_etl():

    @task
    def extract_and_write(run_id: str | None = None) -> str:   # Airflow injects run_id from the task context
        """Idempotent extract — key by run_id, so a retry overwrites, not duplicates."""
        import boto3, io, pandas as pd
        from pymodbus.client import ModbusTcpClient

        # Read PLC (idempotent)
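        client = ModbusTcpClient("10.20.30.40", port=502)
        client.connect()
        rr = client.read_holding_registers(0, count=100)   # keyword form works across pymodbus 3.x
        client.close()
        if rr.isError():
            raise RuntimeError(f"Modbus read failed: {rr}")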

        # Write to a run_id-keyed S3 path (idempotent by construction)
        buf = io.BytesIO()
        pd.DataFrame({"reg": rr.registers}).to_parquet(buf)
        buf.seek(0)

        s3 = boto3.client("s3")
        key = f"raw/plc/{run_id}.parquet"    # run_id, not timestamp; safe on retry
        s3.put_object(Bucket="factory", Key=key, Body=buf.read())
        return key

    extract_and_write()

factory_etl()

Step-by-step explanation.

  1. The task writes to s3://factory/raw/plc/{run_id}.parquet. The run_id is a stable per-DAG-run identifier, so a retry writes to the same key. An S3 PUT is atomic: an upload cut off by the disconnect leaves no object behind, and a completed retry simply overwrites whatever the key held before. Either way, there is exactly one final object.
  2. retries=3 allows three retries after the first attempt, four attempts in total. With a one-minute base delay and exponential backoff the nominal waits are 1m, 2m and 4m, so a partition that heals within about 7 minutes is covered (Airflow adds jitter, so actual delays can run longer).
  3. When the scheduler marks the worker lost, the task instance moves to up_for_retry. On the next scheduler tick, it is re-queued; the next Edge Worker in the factory queue picks it up.
  4. Critically, the retry does not try to resume the failed attempt — Airflow retries at the task boundary, not the operator internals. The task code must be safe to re-run from scratch.
  5. The key design tension: writing to a timestamp-based key (e.g. raw/plc/{now.iso}.parquet) would create a new file on retry, resulting in duplicated data downstream. Always key on run_id or logical_date, not on wall-clock time.
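
The retry budget in step 2 is simple doubling arithmetic. The helper below is a simplified model for checking that arithmetic, assuming the delay doubles exactly on each retry; `nominal_backoff_minutes` is a hypothetical name, and Airflow's real delays also include jitter and are capped by `max_retry_delay`.

```python
def nominal_backoff_minutes(retries: int, base_minutes: float = 1.0) -> list[float]:
    """Wait before each retry if the delay simply doubles: base, 2*base, 4*base, ..."""
    return [base_minutes * 2 ** i for i in range(retries)]


waits = nominal_backoff_minutes(3)   # one entry per retry
total_wait = sum(waits)              # total time covered by the retry budget
```

With `retries=3` and a one-minute base this gives waits of 1, 2 and 4 minutes, 7 minutes in total.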

Output.

| Attempt | Worker state | S3 key written | Downstream cost |
|---|---|---|---|
| 1 | worker_lost mid-write | raw/plc/{run_id}.parquet (upload aborted, no object) | none |
| 2 | worker_lost | raw/plc/{run_id}.parquet (upload aborted, no object) | none |
| 3 | success | raw/plc/{run_id}.parquet (final) | one canonical object |

Rule of thumb. Every Edge Worker task must be idempotent — key writes by run_id, not by wall-clock time. Combined with retries=3 + exponential backoff, the pipeline heals through any transient partition without operator intervention or duplicated data.
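
The difference between the two keying strategies is easy to demonstrate with an in-memory dictionary standing in for the bucket. This is an illustrative sketch, not the DAG's code: both writer functions and the sample run_id are hypothetical.

```python
from datetime import datetime, timezone


def write_keyed_by_run(store: dict, run_id: str, payload: bytes) -> str:
    """Idempotent: the key depends only on the DAG run, so a retry overwrites."""
    key = f"raw/plc/{run_id}.parquet"
    store[key] = payload
    return key


def write_keyed_by_clock(store: dict, payload: bytes, now: datetime) -> str:
    """Not idempotent: every attempt produces a new key."""
    key = f"raw/plc/{now.isoformat()}.parquet"
    store[key] = payload
    return key


run_store: dict = {}
clock_store: dict = {}
for attempt in range(2):   # original attempt plus one retry
    write_keyed_by_run(run_store, "scheduled__2026-06-22T10:00:00+00:00", b"registers")
    write_keyed_by_clock(
        clock_store,
        b"registers",
        datetime(2026, 6, 22, 10, attempt, tzinfo=timezone.utc),  # retry runs a minute later
    )
```

After two attempts `run_store` holds one object while `clock_store` holds two, which is the duplicate that downstream consumers would read twice.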

Senior interview question on Edge Worker architecture

A senior interviewer might ask: "You need to run Airflow tasks in a customer's on-prem data centre for a HIPAA workload, with the control plane in your cloud. Walk me through the Edge Worker architecture, the network requirements, and the failure modes you'd guard against."

Solution Using Edge Worker + queue split + XCom PII guardrail + idempotent tasks

# Complete architecture — DAG + Edge Worker deploy + guardrails
from datetime import datetime, timedelta
from airflow.decorators import dag, task
from airflow.exceptions import AirflowFailException

# ---- guardrail: call on every value a task returns, before it becomes an XCom ----
def scrub_xcom(payload):
    """Reject XCom payloads that contain plausibly-PII strings."""
    import re
    PII_PATTERNS = [
        r"\b\d{3}-\d{2}-\d{4}\b",            # SSN
        r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+", # email
        r"\b\d{16}\b",                        # credit-card-ish
    ]
    def scan(v):
        if isinstance(v, str):
            for pat in PII_PATTERNS:
                if re.search(pat, v):
                    raise AirflowFailException("guardrail: PII in XCom")
        elif isinstance(v, dict):
            for x in v.values(): scan(x)
        elif isinstance(v, list):
            for x in v: scan(x)
    scan(payload)
    return payload

@dag(
    dag_id="hospital_hipaa_etl",
    schedule="@hourly",
    start_date=datetime(2026, 1, 1),
    catchup=False,
    default_args={
        "retries": 3,
        "retry_delay": timedelta(minutes=1),
        "retry_exponential_backoff": True,
    },
)
def hospital_etl():

    @task(queue="hospital_edge")
    def extract_patient_events(run_id: str | None = None) -> dict:   # Airflow injects run_id from the task context
        # Runs inside the hospital data centre on the Edge Worker
        # ... query hospital EHR system ...
        raw = {"n_patients": 4321, "n_events": 98765}      # aggregate only
        return scrub_xcom(raw)

    @task(queue="cloud_default")
    def update_cloud_dashboard(agg: dict) -> None:
        # Runs in the cloud; only aggregates arrive
        print("Cloud dashboard:", agg)

    update_cloud_dashboard(extract_patient_events())

hospital_etl()
# Hospital-side Helm values — Edge Worker deploy
edgeWorker:
  enabled: true
  queues: ["hospital_edge"]
  concurrency: 8
  env:
    - name: AIRFLOW__EDGE__API_URL
      value: https://cloud-airflow.example.com
    - name: AIRFLOW__EDGE__HEARTBEAT_INTERVAL
      value: "5"
  secretsRef: hospital-secrets
  networkPolicy:
    egress:
      - to: cloud-airflow.example.com
        ports: [443]
    ingress: []   # explicitly none

Step-by-step trace.

| Question | Answer | Reasoning |
|---|---|---|
| Where does the Edge Worker run? | Hospital data centre | HIPAA data cannot leave the perimeter |
| Which port does the hospital open? | None inbound; only outbound HTTPS/443 to API | reverse-connect topology |
| How is PII scrubbed? | scrub_xcom() wraps the return value of every hospital-side task | policy enforced at the DAG boundary |
| Idempotency? | run_id-keyed writes + retries=3 | worker_lost recovers cleanly |
| Auth? | Bearer token in secretsRef; TLS mandatory | zero-trust between hospital and cloud |
| Failure of worker connectivity? | worker_lost → retry with backoff | idempotent tasks re-run safely |

After the rollout, the hospital's compliance officer signs off (no PII crosses the boundary; no inbound port opened), the cloud on-call watches the DAG run per hour, and the Edge Worker heartbeat stays green. HIPAA is preserved by the architecture, not by a manual review.

Output:

| Property | Value |
|---|---|
| Control plane | Cloud Airflow 3.1 |
| Edge Worker | Hospital DC, queue=hospital_edge |
| Inbound ports opened | 0 |
| PII in cloud metadata | 0 |
| Idempotency contract | run_id-keyed writes |
| worker_lost recovery | automatic (3 retries + exponential backoff) |

Why this works — concept by concept:

  • Reverse-connect topology — the Edge Worker calls home. The cloud never dials into the hospital; the hospital's firewall stays closed. Zero-trust holds.
  • Queue split: hospital_edge and cloud_default are separate queues, and the scheduler dispatches by queue tag. Tasks that touch PII carry the hospital tag; nothing else can execute on that worker.
  • XCom guardrail: scrub_xcom is called on the return value of every hospital-side task, so it has to be applied explicitly in each one. It is the last line of policy defence between task-local data and cloud-visible metadata.
  • Idempotency + retries — every task keys writes on run_id; retries=3 with exponential backoff heals through transient partitions. worker_lost becomes a routine event, not an incident.
  • Cost — Edge Worker adds one process per site + one long-poll connection. XCom guardrail is O(size of XCom payload). Idempotency shifts write cost to the storage layer's PUT idempotency (S3, GCS both handle this natively). The architecture cost is dominated by the policy work (compliance review, PII scanner tuning), not the runtime cost.



3. DAG versioning + change history

dag versioning ties every task instance to the DAG version that ran it — backfills replay historical code, not main

The mental model in one line: Airflow 3 stamps every parsed DAG file into the metadata database with a version identifier, associates every task instance with the version it ran under, and — when you trigger a backfill — replays the historical DAG version rather than the current main. The primitive answers the "which code ran last Tuesday?" question that every audit, every post-mortem, and every compliance officer asks, and it eliminates the entire class of bugs where a backfill silently runs different logic than the original run.

Iconographic DAG versioning diagram — a stack of three DAG snapshots labelled v1, v2, v3 tied to a timeline of task-instances, with a backfill arrow selecting v1 for a 3-week-old run.

How versioning works under the hood.

  • DAG hash. Every time the scheduler parses a DAG file, it hashes the parsed structure (task graph, operator arguments, schedule) plus the source-code SHA. A change in either produces a new version.
  • Version table. The metadata DB has a dag_version table with columns dag_id, version_id, created_at, source_hash, structure_hash, and (optionally) bundle_ref linking to the DAG source bundle in object storage.
  • Task-instance binding. The task_instance table gains a dag_version_id foreign key. Every task instance is stamped at creation time with the version that produced it.
  • Bundle storage. By default, DAG source is stored in the local file system where the scheduler parses it. For production, the recommended pattern is DAG-source-in-S3 (or GCS/ABS) with AIRFLOW__DAG_BUNDLES__ pointing to versioned buckets.
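
The hash-and-compare logic in the bullets above can be sketched in a few lines. This is a toy model under assumptions, not Airflow's implementation: `structure_hash`, `source_hash` and `VersionLog` are hypothetical names, and the real serializer hashes far more than a task graph and a schedule string.

```python
import hashlib
import json


def structure_hash(tasks: dict[str, list[str]], schedule: str) -> str:
    """Hash the task graph (task_id -> upstream task_ids) plus the schedule."""
    canonical = json.dumps(
        {"tasks": {t: sorted(up) for t, up in tasks.items()}, "schedule": schedule},
        sort_keys=True,   # stable ordering so equal graphs hash equally
    )
    return hashlib.sha256(canonical.encode()).hexdigest()


def source_hash(source: str) -> str:
    """Hash the raw DAG file contents."""
    return hashlib.sha256(source.encode()).hexdigest()


class VersionLog:
    """Append a new version only when either hash differs from the latest one."""

    def __init__(self) -> None:
        self.versions: list[tuple[str, str]] = []

    def record_parse(self, source: str, tasks: dict[str, list[str]], schedule: str) -> int:
        fingerprint = (source_hash(source), structure_hash(tasks, schedule))
        if not self.versions or self.versions[-1] != fingerprint:
            self.versions.append(fingerprint)
        return len(self.versions)   # current version number, starting at 1


log = VersionLog()
graph = {"extract": [], "load": ["extract"]}
v_first = log.record_parse("print('v1')", graph, "@daily")
v_reparse = log.record_parse("print('v1')", graph, "@daily")            # nothing changed
v_new_source = log.record_parse("print('v2')", graph, "@daily")          # source changed
v_new_graph = log.record_parse("print('v2')", {**graph, "audit": ["load"]}, "@daily")
```

Re-parsing an unchanged file leaves the version number alone, while a source edit or a graph edit each produce a new version.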

What versioning enables.

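  • Reproducible backfills. A backfill for 2026-05-01 (airflow backfill create --dag-id my_dag --from-date 2026-05-01 --to-date 2026-05-01 in Airflow 3, which replaces the 2.x airflow dags backfill command) looks up the DAG version that was live on 2026-05-01, fetches the historical source, and runs that code rather than the code in main today.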
  • Audit trail. For any task instance, the UI (and API) shows which version ran, when the version was created, and (with bundle storage) the exact source code.
  • Change history. The Versions tab in the new UI shows a scrollable list of every DAG version with diffs between adjacent versions.
  • Rollback. A bad DAG version can be marked deprecated=true; future scheduling picks the previous good version until the DAG is re-parsed with a fix.

The scheduler's behaviour when a DAG file changes.

  • T0. Scheduler parses the DAG file, computes hashes, sees they match an existing version → no new version created.
  • T1. Developer commits a change (renames a task, adds a step). Scheduler re-parses on the next tick, computes new hashes, inserts a new dag_version row.
  • T2. From T1 onward, every new task instance is stamped with the new version_id. Task instances created before T1 keep their old version_id.
  • T3. A backfill of an interval before T1 uses the pre-T1 version; a backfill of an interval after T1 uses the post-T1 version.
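
The T0 to T3 timeline reduces to one lookup: given a date, find the newest version created on or before it. A minimal sketch of that lookup follows, with a hypothetical `version_live_at` helper and made-up version numbers and dates; it illustrates the rule described above rather than the scheduler's actual query.

```python
from bisect import bisect_right
from datetime import datetime
from typing import Optional


def version_live_at(versions: list[tuple[datetime, int]], when: datetime) -> Optional[int]:
    """Return the id of the newest version with created_at <= when.

    `versions` must be sorted ascending by created_at. Returns None when
    the date precedes the first recorded version.
    """
    created = [created_at for created_at, _ in versions]
    idx = bisect_right(created, when)
    return versions[idx - 1][1] if idx else None


history = [
    (datetime(2026, 5, 20), 6),
    (datetime(2026, 5, 30), 7),
    (datetime(2026, 6, 10), 8),
]
backfill_version = version_live_at(history, datetime(2026, 6, 1))
```

A backfill for 2026-06-01 resolves to version 7, the last version created before that date, even though version 8 is current.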

Common interview probes on DAG versioning.

  • "Walk me through what happens when I edit a DAG file." — new version row on next parse; new task instances stamped with new version_id.
  • "If I backfill last Tuesday, does it run the current code or the code from last Tuesday?" — historical code (via version_id lookup).
  • "How does versioning interact with dynamic task mapping?" — the mapped task's structure is part of the version hash; dynamic changes create new versions.
  • "What if the DAG source bundle is not in object storage?" — versioning still works structurally, but the source snapshot for a historical version may be lost. Recommendation: enable bundle storage.

Worked example — enabling DAG versioning + bundle storage

Detailed explanation. DAG versioning is on by default in Airflow 3.0, but the source-bundle storage is not — the metadata DB stores the version hash but not the historical DAG file itself. Configure S3 bundle storage so the full source is preserved for every version, then verify a backfill replays the historical code.

  • Default. Version hash yes, source snapshot no.
  • Recommendation. Enable S3 bundle storage so historical DAG files are always retrievable.
  • Cost. Roughly 10–100 KB per DAG version; a 400-DAG deployment with 10 versions per DAG per month produces 48,000 versions a year, or about 0.5–4.8 GB/year. Negligible.
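
A quick check of the storage arithmetic, using the figures from the bullet above (400 DAGs, 10 versions per DAG per month, 10–100 KB per version). The constant and function names are illustrative, and 1 MB is taken as 1,000 KB.

```python
DAGS = 400
VERSIONS_PER_DAG_PER_MONTH = 10
MONTHS_PER_YEAR = 12

versions_per_year = DAGS * VERSIONS_PER_DAG_PER_MONTH * MONTHS_PER_YEAR


def yearly_storage_mb(kb_per_version: float) -> float:
    """Total bundle storage per year in MB for a given snapshot size."""
    return versions_per_year * kb_per_version / 1000


low_mb = yearly_storage_mb(10)     # small DAG files
high_mb = yearly_storage_mb(100)   # large DAG files
```

That works out to 48,000 snapshots a year, between 480 MB and 4,800 MB depending on file size, which is still small next to typical object-storage budgets.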

Question. Configure the scheduler to store DAG source bundles in S3, then trigger a backfill of a 3-week-old run and verify it uses the historical code.

Input.

| Component | Value |
|---|---|
| Airflow version | 3.1 |
| Bundle storage | s3://airflow-dag-bundles/prod/ |
| DAG being backfilled | daily_sales_etl |
| Backfill logical date | 3 weeks ago |

Code.

# airflow.cfg — enable bundle storage
[dag_bundles]
default_bundle = s3_prod

[dag_bundles.s3_prod]
type            = s3
bucket          = airflow-dag-bundles
prefix          = prod/
aws_conn_id     = aws_default
snapshot_on_parse = true    # snapshot every parsed DAG on every version bump
# Trigger the backfill via CLI (Airflow 3 replaced `airflow dags backfill` with `airflow backfill create`)
airflow backfill create \
  --dag-id daily_sales_etl \
  --from-date 2026-06-01 \
  --to-date   2026-06-01 \
  --reprocess-behavior completed

# Verify which version the scheduler picked
airflow dags versions daily_sales_etl \
  --at-date 2026-06-01 \
  --format json
-- Directly against the Airflow metadata DB
SELECT ti.dag_id,
       ti.task_id,
       ti.run_id,
       ti.dag_version_id,
       dv.created_at   AS version_created,
       dv.source_hash  AS source_sha
FROM   task_instance ti
JOIN   dag_version   dv ON dv.id = ti.dag_version_id
WHERE  ti.dag_id = 'daily_sales_etl'
  AND  ti.run_id LIKE 'backfill__2026-06-01%'
ORDER  BY ti.task_id;

Step-by-step explanation.

  1. The [dag_bundles] section names a default bundle (s3_prod). The snapshot_on_parse = true flag makes the scheduler upload the DAG file to S3 every time it detects a new version. A missed upload means a permanently un-recoverable historical version, so keep this on.
  2. The backfill command (airflow backfill create in Airflow 3, the successor to the 2.x airflow dags backfill) with a date range of 2026-06-01 to 2026-06-01 triggers a run for that date. Internally, the scheduler queries dag_version for the version live at 2026-06-01, fetches the source from S3, and runs that code.
  3. airflow dags versions --at-date 2026-06-01 prints the version metadata for the backfill's version — created_at, source_hash, and a URL back to the S3 object. This is the audit artefact.
  4. The SQL query directly reads the metadata DB for the task instances the backfill produced; every dag_version_id on those rows points at the historical version, confirming the replay behaviour.
  5. If bundle storage were off, step 2 would still work structurally (version_id present), but step 3 could not fetch source, and the operational value of versioning would be greatly diminished.

Output.

| Task | Backfill run_id | dag_version_id | Version created |
|---|---|---|---|
| extract | backfill__2026-06-01... | v7 (created 2026-05-30) | 2 days before logical_date |
| transform | backfill__2026-06-01... | v7 | 2 days before logical_date |
| load | backfill__2026-06-01... | v7 | 2 days before logical_date |

Rule of thumb. DAG versioning is only half a feature without bundle storage. Enable S3/GCS/ABS bundles on day one and confirm via an intentional backfill that the historical source is actually replayed.

Worked example — a bad deploy and a version rollback

Detailed explanation. A DAG edit ships a bug: a rename introduces a silent data type mismatch. The scheduler creates version v12 (buggy); several DAG runs fail with cryptic errors. Rather than roll back the git repo (which is slow and creates a new v13), the on-call marks v12 as deprecated=true in the metadata DB, and future scheduling picks v11 (last known good).

  • Bug. Task renamed parse_dates to parse_dates_v2; downstream task still references parse_dates.
  • Symptom. Task instances fail with KeyError: 'parse_dates' in XCom pull.
  • Rollback (fast). Mark v12 deprecated; scheduler auto-picks v11 on next run.
  • Rollback (slow). Revert the git commit, wait for parse, get v13 = v11.

Question. Show the SQL/CLI to mark v12 deprecated, verify v11 is picked next, and design the alert that would have caught this earlier.

Input.

| Component | Value |
|---|---|
| DAG | customer_events_etl |
| Bad version | v12 (deployed 14:22 UTC) |
| Last good version | v11 |
| Symptom | Task fail rate 100% since 14:22 |

Code.

# Fast rollback — deprecate v12 via CLI
airflow dags versions deprecate customer_events_etl --version-id 12 \
  --reason "silent XCom key rename; task fail rate 100%"

# Verify scheduler picks v11 on next run
airflow dags versions customer_events_etl --active

# Trigger a fresh DAG run to confirm
airflow dags trigger customer_events_etl
-- Directly: mark v12 deprecated
UPDATE dag_version
SET    deprecated    = true,
       deprecated_at = now(),
       deprecated_by = 'oncall-rollback',
       deprecated_reason = 'silent XCom key rename; 100% fail rate'
WHERE  dag_id     = 'customer_events_etl'
  AND  id         = 12;

-- Verify: latest non-deprecated version
SELECT id, created_at, source_hash
FROM   dag_version
WHERE  dag_id = 'customer_events_etl'
  AND  deprecated = false
ORDER  BY created_at DESC
LIMIT  1;
-- Expect: id = 11
# The alert that should have caught this — task failure rate per DAG version
from airflow.decorators import dag, task
from datetime import datetime, timedelta

@dag(schedule="@hourly", start_date=datetime(2026, 1, 1), catchup=False)
def dag_version_health_alert():

    @task
    def check_failure_rate_per_version() -> None:
        import psycopg2
        conn = psycopg2.connect("postgres://.../airflow")
        cur  = conn.cursor()
        cur.execute("""
            SELECT dv.dag_id,
                   dv.id AS version_id,
                   COUNT(*) FILTER (WHERE ti.state = 'failed')::float / COUNT(*) AS fail_rate,
                   COUNT(*) AS n
            FROM   task_instance ti
            JOIN   dag_version   dv ON dv.id = ti.dag_version_id
            WHERE  ti.start_date > now() - INTERVAL '30 minutes'
              AND  dv.created_at > now() - INTERVAL '2 hours'
            GROUP  BY dv.dag_id, dv.id
            HAVING COUNT(*) >= 5
              AND  COUNT(*) FILTER (WHERE ti.state = 'failed')::float / COUNT(*) > 0.5
        """)
        for row in cur.fetchall():
            # Page on-call
            print(f"ALERT: {row[0]} v{row[1]} fail_rate={row[2]:.0%} n={row[3]}")

    check_failure_rate_per_version()

dag_version_health_alert()

Step-by-step explanation.

  1. The airflow dags versions deprecate CLI marks v12 as bad. The scheduler, on its next parse cycle (~30 seconds), sees deprecated=true and falls back to the newest non-deprecated version — v11.
  2. Directly in SQL: UPDATE dag_version SET deprecated = true WHERE id = 12 achieves the same thing. The CLI is the operational path; the SQL is the "what actually happens" answer for an interview.
  3. airflow dags trigger customer_events_etl immediately fires a new DAG run using v11. This is the confirmation that rollback took effect — no waiting for the next scheduled tick.
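  4. The alert DAG runs every hour, queries the last 30 minutes of task instances joined against dag_version rows created in the last 2 hours (i.e. new versions), and pages on-call if any version has a failure rate above 50% across at least 5 task instances. Because the 30-minute lookback is shorter than the one-hour schedule interval, failures in the first half of each hour go unseen; either widen the window to match the schedule or run the check every 10 to 15 minutes so a bad deploy is caught well within the hour.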
  5. The rollback is fast (seconds), reversible (undeprecate later after fixing), and preserves the audit trail — v12 is still in the DB, marked deprecated with a reason, so the post-mortem can reference it.

Output.

| Time | Event | Active version | Fail rate |
|---|---|---|---|
| 14:22 UTC | Deploy v12 | v12 | 100% |
| 14:40 UTC | Alert fires | v12 | 100% |
| 14:42 UTC | On-call deprecates v12 | v11 (rollback) | 0% |
| 14:44 UTC | Trigger clean run | v11 | 0% |
| Next day | Fixed commit ships v13 | v13 (v12 stays deprecated) | 0% |

Rule of thumb. DAG versioning gives you a deprecation-based rollback that is faster and safer than a git revert. Ship the version-health alert on day one so bad deploys page within an hour, not the next morning.
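
Both halves of this worked example, the fallback to the newest non-deprecated version and the paging threshold, are small pure functions. The sketch below models them in isolation; `Version`, `active_version` and `should_page` are hypothetical names, and the thresholds mirror the HAVING clause in the alert query (at least 5 task instances, more than 50% failed).

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Version:
    number: int
    deprecated: bool = False


def active_version(versions: list[Version]) -> Optional[Version]:
    """Newest version that is not marked deprecated, or None if all are."""
    live = [v for v in versions if not v.deprecated]
    return max(live, key=lambda v: v.number) if live else None


def should_page(failed: int, total: int, min_runs: int = 5, threshold: float = 0.5) -> bool:
    """Page only with enough samples and a failure rate strictly above the threshold."""
    return total >= min_runs and failed / total > threshold


v11, v12 = Version(11), Version(12)
before_rollback = active_version([v11, v12]).number
v12.deprecated = True                      # the on-call rollback
after_rollback = active_version([v11, v12]).number
```

The minimum-sample guard keeps a single failed task on a brand-new version from paging anyone, at the cost of a slightly slower alert on low-volume DAGs.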

Worked example — dynamic task mapping + versioning

Detailed explanation. Dynamic task mapping (.expand() / .expand_kwargs()) creates a variable number of task instances at run time. Versioning treats the structure of the map (the upstream that produces the mapping, the operator, the arguments) as part of the version hash, so a change to the mapping logic bumps the version. But the actual mapped values are runtime data, not structure.

  • Structure counts. Changing .expand(input=upstream_task.output) to .expand(input=another_task.output) = new version.
  • Values don't. The count of mapped tasks changing from 5 to 500 because upstream returned more data = same version.
  • Consequence. A backfill of a 3-week-old run replays the version that was live then, with the same structure — but the mapped values re-materialise from the upstream task's XCom at run time.

Question. Show a dynamic-mapping DAG and explain what happens on backfill when (a) the mapping structure changed since original, (b) only the upstream data changed.

Input.

| Scenario | Structure changed? | Upstream data changed? |
|---|---|---|
| A | yes (map source swapped) | irrelevant |
| B | no | yes (upstream returns different values) |

Code.

from datetime import datetime
from airflow.decorators import dag, task

@dag(
    dag_id="tenant_shard_etl",
    schedule="@daily",
    start_date=datetime(2026, 1, 1),
    catchup=False,
)
def tenant_shards():

    @task
    def list_active_tenants() -> list[int]:
        # Reads the tenants table; returns e.g. [1, 2, 3, 4, 5]
        return [1, 2, 3, 4, 5]

    @task
    def process_tenant(tenant_id: int) -> None:
        # Runs once per tenant_id emitted by list_active_tenants
        print(f"processing tenant {tenant_id}")

    process_tenant.expand(tenant_id=list_active_tenants())

tenant_shards()
-- Verify what a backfill replays
SELECT ti.dag_id,
       ti.task_id,
       ti.map_index,
       ti.dag_version_id,
       ti.rendered_map_index
FROM   task_instance ti
WHERE  ti.dag_id = 'tenant_shard_etl'
  AND  ti.run_id LIKE 'backfill__2026-06-01%'
ORDER  BY ti.map_index;

-- Compare v-old (backfill) vs v-current
SELECT id, created_at, structure_hash
FROM   dag_version
WHERE  dag_id = 'tenant_shard_etl'
ORDER  BY created_at DESC;

Step-by-step explanation.

  1. In scenario A (structure changed), the current DAG might swap list_active_tenants for list_premium_tenants. This changes the structure hash → new version. A backfill of a 3-week-old date looks up the version live then, which used list_active_tenants. The historical structure runs.
  2. In scenario B (only data changed), the DAG structure is unchanged. The backfill uses the same version. But list_active_tenants re-runs at backfill time — meaning today's tenants list is what gets mapped, not the tenant list from 3 weeks ago.
  3. This is a subtle but important point: structure is versioned; data is re-evaluated. If historical fidelity of the mapped values matters (e.g. reproducing an exact billing calculation), the upstream must itself read from a stable, versioned data source (event table, snapshot).
  4. The map_index column in task_instance records the position within the map; rendered_map_index holds the human-readable label for that mapped instance, which is populated when the task defines a map_index_template. Both are preserved for the backfill run.
  5. The structure_hash column in dag_version is what the scheduler compares to detect structural changes. A hash miss creates a new version; a hash hit reuses the current one.

Output.

| Scenario | Structure hash | Backfill behaviour |
|---|---|---|
| A: structure changed | new hash → v-new | Historical version replays; original structure |
| B: only data changed | same hash → v-current | Same version; upstream re-evaluates data |

Rule of thumb. DAG versioning captures structural fidelity, not data fidelity. For workloads where backfill data must be historically exact (billing, compliance), the upstream that feeds the mapping must itself be idempotent-on-date — e.g., read from a dt='2026-06-01' partition, not from a live table.
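
The difference between a live read and a date-partitioned read can be shown without Airflow at all. In the sketch below the tenant data, `SNAPSHOTS`, `LIVE_TABLE` and both functions are hypothetical stand-ins for the upstream task's data source.

```python
# Hypothetical dt-partitioned snapshots of the tenants table
SNAPSHOTS = {
    "2026-06-01": [1, 2, 3],
    "2026-06-22": [1, 2, 3, 4, 5],
}
# Hypothetical live table as it looks today
LIVE_TABLE = [1, 2, 3, 4, 5]


def tenants_from_live_table(logical_date: str) -> list[int]:
    """Ignores the date: a backfill maps over today's tenants."""
    return list(LIVE_TABLE)


def tenants_from_partition(logical_date: str) -> list[int]:
    """Idempotent on date: a backfill maps over the tenants as of that day."""
    return list(SNAPSHOTS[logical_date])


backfill_live = tenants_from_live_table("2026-06-01")
backfill_exact = tenants_from_partition("2026-06-01")
```

Backfilling 2026-06-01 from the live table fans out to five tenants, two of which did not exist on that date; reading the partition reproduces the original three.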

Senior interview question on DAG versioning semantics

A senior interviewer might ask: "You maintain a 400-DAG Airflow 3 deployment. Someone edits a DAG on Monday, another edit lands Wednesday, and on Friday an auditor asks you to reproduce the Sunday-before-last DAG run exactly. Walk me through the versioning behaviour, the SQL you'd run to confirm, and the failure mode where the reproduction is not exact."

Solution Using the version_id lookup + bundle-stored source + upstream-idempotency check

# End-to-end reproduction walkthrough — code + SQL + verification
from datetime import datetime

# Step 1 — find the version_id for the target date
LOOKUP_SQL = """
SELECT dv.id     AS version_id,
       dv.created_at,
       dv.source_hash,
       dv.bundle_url
FROM   dag_version dv
WHERE  dv.dag_id = %s
  AND  dv.created_at <= %s
  AND  (dv.deprecated_at IS NULL OR dv.deprecated_at > %s)
ORDER  BY dv.created_at DESC
LIMIT  1;
"""

# Step 2 — fetch the historical source from bundle storage
def fetch_bundle(bundle_url: str) -> bytes:
    import boto3
    from urllib.parse import urlparse
    u = urlparse(bundle_url)     # s3://airflow-dag-bundles/prod/dag_v7.py
    s3 = boto3.client("s3")
    r  = s3.get_object(Bucket=u.netloc, Key=u.path.lstrip("/"))
    return r["Body"].read()

# Step 3 — trigger the backfill against that version
def backfill_at(dag_id: str, logical_date: str) -> None:
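    # Airflow 3 CLI: `airflow backfill create` replaced `airflow dags backfill`.
    # Per this walkthrough, the backfill picks the version by created_at ≤ logical_date.
    import subprocess
    subprocess.check_call([
        "airflow", "backfill", "create",
        "--dag-id", dag_id,
        "--from-date", logical_date,
        "--to-date",   logical_date,
        "--reprocess-behavior", "completed",   # re-run dates that already have a run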
    ])

# Step 4 — verify the backfill used the expected version
VERIFY_SQL = """
SELECT ti.task_id,
       ti.dag_version_id,
       ti.state,
       dv.source_hash
FROM   task_instance ti
JOIN   dag_version   dv ON dv.id = ti.dag_version_id
WHERE  ti.dag_id  = %s
  AND  ti.run_id LIKE %s
ORDER  BY ti.task_id;
"""
-- Concrete run against a real audit request
-- "Reproduce sunday_2026_06_14 exactly"
-- Step 1 — pick version
SELECT id, created_at, source_hash, bundle_url
FROM   dag_version
WHERE  dag_id = 'daily_sales_etl'
  AND  created_at <= '2026-06-14 00:00:00'
  AND  (deprecated_at IS NULL OR deprecated_at > '2026-06-14 00:00:00')
ORDER  BY created_at DESC
LIMIT  1;
-- returns: id=7, created_at=2026-06-11, source_hash=abc123, bundle_url=s3://.../dag_v7.py

-- Step 4 — verify after backfill
SELECT ti.task_id, ti.dag_version_id, ti.state, dv.source_hash
FROM   task_instance ti
JOIN   dag_version   dv ON dv.id = ti.dag_version_id
WHERE  ti.dag_id = 'daily_sales_etl'
  AND  ti.run_id LIKE 'backfill__2026-06-14%'
ORDER  BY ti.task_id;
-- Expected: every row has dag_version_id = 7, source_hash = abc123

Step-by-step trace.

| Step | Command | Result |
| --- | --- | --- |
| 1 | Lookup version at 2026-06-14 | v7, created 2026-06-11, source_hash abc123 |
| 2 | Fetch bundle from S3 | historical DAG source retrieved |
| 3 | Trigger backfill for 2026-06-14 | scheduler runs v7 code |
| 4 | Verify task_instance rows | every task stamped dag_version_id=7 |
| 5 | Compare aggregate output to original run | should match if upstream data is idempotent |

After the reproduction, the auditor gets a signed artefact: "this backfill used dag_version_id=7 (source_hash abc123), pinned to the version live on the original logical date, and the task-instance stamp confirms the replay." The reproduction is exact on structure, and exact on data only if the upstream data source is idempotent-on-date.

Output:

| Property | Original run | Backfill |
| --- | --- | --- |
| dag_version_id | 7 | 7 |
| source_hash | abc123 | abc123 |
| task graph | identical | identical |
| Upstream data | as of 2026-06-14 | as of query time (danger zone) |
| Aggregate outputs match? | | yes iff upstream is date-idempotent |

Why this works — concept by concept:

  • version_id lookup — the scheduler picks the version whose created_at <= logical_date AND (not deprecated OR deprecated after logical_date). This is deterministic and audit-provable.
  • Bundle-stored source — the historical DAG file lives in S3, keyed by version_id. Without bundle storage, the version_id is a hash but the source is lost — reproduction becomes impossible if the git history has been amended.
  • Task-instance stamping — every replayed task instance carries the dag_version_id. The auditor's SQL against task_instance is the confirmation artefact; it survives Airflow restarts and metadata migrations.
  • Upstream data idempotency (the failure mode) — versioning captures code, not data. If the DAG reads from a live table that has been updated since the original run, the backfill computes different results with the same code. The interview signal is naming this failure mode without prompting.
  • Cost — bundle storage costs ~100 KB per version × versions × DAGs. Metadata DB adds one small table (dag_version) and one column on task_instance. Query cost for lookup is O(log versions) via the index on (dag_id, created_at). The reproduction cost is O(1) in engineering time — the whole feature is designed to make audits routine, not exceptional.
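
The lookup rule in the first bullet can be mirrored in plain Python, which is handy for unit-testing the selection logic away from the database. This is a sketch under stated assumptions: the `DagVersion` dataclass and `version_live_at` helper are hypothetical, and versions 6 and 8 with their dates are made-up neighbours for the v7 row used in the walkthrough.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional

@dataclass(frozen=True)
class DagVersion:
    id: int
    created_at: datetime
    deprecated_at: Optional[datetime]

def version_live_at(versions, logical_date):
    """Return the newest version created on or before logical_date
    that was not yet deprecated at that time, or None if there is none."""
    candidates = [
        v for v in versions
        if v.created_at <= logical_date
        and (v.deprecated_at is None or v.deprecated_at > logical_date)
    ]
    return max(candidates, key=lambda v: v.created_at, default=None)

versions = [
    DagVersion(6, datetime(2026, 6, 1), datetime(2026, 6, 11)),   # hypothetical
    DagVersion(7, datetime(2026, 6, 11), datetime(2026, 6, 15)),
    DagVersion(8, datetime(2026, 6, 15), None),                   # hypothetical
]
picked = version_live_at(versions, datetime(2026, 6, 14))
print(picked.id)
```

The filter matches the SQL `WHERE` clause term by term, and `max(..., key=created_at)` plays the role of `ORDER BY created_at DESC LIMIT 1`.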


4. MLOps primitives — Assets, model registry integration

airflow assets (renamed from Datasets) are the lineage + freshness + trigger primitive that makes airflow mlops real

The mental model in one line: Airflow 3 renames Datasets to Assets and elevates them from a scheduling hint to a first-class lineage-and-trigger primitive — Assets carry URIs, freshness metadata, and lineage edges; DAGs subscribe to Asset updates via schedule=[Asset(...)]; and dedicated hooks in the airflow.providers.vertex_ai, airflow.providers.mlflow, and airflow.providers.mosaic packages complete the MLOps loop by registering trained models against external registries the moment training finishes. The primitive is what turns a hand-rolled ML DAG into a production MLOps pipeline.

Iconographic MLOps diagram — an asset-lineage chain feeding a Vertex/MLflow model-registry medallion, with a training DAG kicking off downstream when the training-asset updates.

What Assets add over the 2.x Datasets story.

  • Renamed and elevated. airflow.datasets.Dataset → airflow.assets.Asset (released 3.x builds expose the class publicly as airflow.sdk.Asset). The 2.x import still works with a deprecation warning through 3.1; removed in 3.2.
  • URI-first identity. An Asset is identified by a URI (s3://bucket/path/features.parquet, snowflake://ACCOUNT/DB.SCHEMA.TABLE, bigquery://project.dataset.table). The URI is the canonical primary key across DAGs.
  • Freshness metadata. Every Asset write records a logical_date, an optional content_hash, and an optional row/size delta. Downstream DAGs can require an Asset to be fresh within N minutes before triggering.
  • Lineage edges. The Assets tab in the UI shows the full dependency graph — which DAG produced this Asset, which DAGs consume it, when it was last updated.
  • Asset-scheduled DAGs. schedule=[Asset("s3://...")] triggers the DAG the moment any upstream writer updates that Asset. Multi-Asset schedules (schedule=[Asset(A), Asset(B)]) support fan-in semantics.
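
The fan-in behaviour of a multi-Asset schedule can be pictured with a toy model, assuming a list of Assets means "wait until every one of them has updated since the last run". The `FanInSchedule` class and the bucket URIs are hypothetical and only simulate the bookkeeping; they are not Airflow code.

```python
class FanInSchedule:
    """Toy model: trigger once every subscribed Asset URI has updated since the last run."""

    def __init__(self, uris):
        self.required = set(uris)
        self.pending = set()

    def on_update(self, uri):
        """Record an update; return True when the consumer DAG should be queued."""
        if uri in self.required:
            self.pending.add(uri)
        if self.pending == self.required:
            self.pending.clear()  # reset for the next cycle
            return True
        return False

sched = FanInSchedule(["s3://bucket/a.parquet", "s3://bucket/b.parquet"])
results = [
    sched.on_update("s3://bucket/a.parquet"),  # only A so far
    sched.on_update("s3://bucket/a.parquet"),  # A again, B still missing
    sched.on_update("s3://bucket/b.parquet"),  # both seen: trigger
    sched.on_update("s3://bucket/b.parquet"),  # new cycle, A missing
]
print(results)
```

Repeated updates to one Asset do not trigger the consumer on their own; the run is queued only when the slowest input arrives.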

The MLOps primitives that build on Assets.

  • Model-registry hooks. VertexAIModelRegisterOperator, MLflowRegisterModelOperator, MosaicRegisterModelOperator (and equivalents for SageMaker, Databricks). Each takes an Asset representing the trained model artefact and registers it against the target registry with metadata.
  • Feature-store integration. FeastMaterializeOperator, TectonMaterializeOperator — Asset-aware feature materialisation that emits an Asset update on completion.
  • Evaluation gates. ModelEvaluationSensor waits for evaluation metrics to be produced (as Assets); ModelPromotionOperator promotes a model version to production in the registry only if the evaluation metrics exceed a threshold.
  • Batch inference DAGs. Inference DAGs subscribe to the model Asset (schedule=[Asset("mlflow://models/customer_churn/latest")]) and re-run whenever a new production version lands.

Lineage + freshness — the two axes of Asset semantics.

  • Lineage. Every Asset knows its upstream DAG (producer_dag_id) and its downstream DAG subscriptions (consumer_dag_ids). The metadata DB stores the edges; the UI visualises them.
  • Freshness. Each Asset write carries a timestamp and (optionally) a freshness_expected_within hint. Consumers can require the Asset to have been updated within the last N minutes/hours; the sensor AssetFreshnessSensor blocks otherwise.
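
The freshness rule reduces to a timestamp comparison. The sketch below is a plain-Python stand-in for that check, not the sensor itself; `is_fresh` and the example times are hypothetical.

```python
from datetime import datetime, timedelta

def is_fresh(last_updated: datetime, now: datetime, max_age: timedelta) -> bool:
    """True when the Asset was updated within max_age of now."""
    return now - last_updated <= max_age

last_write = datetime(2026, 1, 1, 14, 0)
# 30 minutes after the write, inside a 1-hour freshness window
fresh_at_1430 = is_fresh(last_write, datetime(2026, 1, 1, 14, 30), timedelta(hours=1))
# 90 minutes after the write, outside the window
fresh_at_1530 = is_fresh(last_write, datetime(2026, 1, 1, 15, 30), timedelta(hours=1))
print(fresh_at_1430, fresh_at_1530)
```

A consumer that requires freshness would proceed in the first case and block or reschedule in the second.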

Common interview probes on Airflow MLOps.

  • "What's the difference between Datasets and Assets?" — rename + URI-first + freshness + lineage edges.
  • "How does an MLflow model registration integrate with an Airflow DAG?" — MLflowRegisterModelOperator consumes a trained-model Asset, registers, emits a new Asset for the registered version.
  • "How do you gate model promotion to production?" — sensor waits for evaluation Assets, operator promotes only if metrics pass, emits a promoted-to-production Asset that downstream inference DAGs subscribe to.
  • "Can I still use KubernetesPodOperator for custom training?" — yes; you emit an Asset at the end via outlets=[Asset(...)] and downstream DAGs subscribe as normal.

Worked example — an Asset-scheduled training DAG

Detailed explanation. A team trains a customer-churn model daily. The training data is a Parquet file in S3 updated hourly by a separate ETL DAG. Rather than schedule the training DAG on a fixed cron and pray the data is fresh, use an Asset-scheduled DAG that triggers whenever the training data Asset updates.

  • Producer DAG. feature_engineering writes s3://features/customer_churn/features.parquet hourly.
  • Consumer DAG. train_churn_model subscribes to the Asset; triggers the moment the Asset updates.
  • Registry. Trained model registered against MLflow.

Question. Write both DAGs and show how the Asset flows through them. Explain the freshness guarantee.

Input.

| Component | Value |
| --- | --- |
| Training data Asset URI | s3://features/customer_churn/features.parquet |
| Model artefact Asset URI | s3://models/customer_churn/model.pkl |
| Registry | MLflow at https://mlflow.example.com |

Code.

# DAG 1 — producer
from datetime import datetime
from airflow.decorators import dag, task
from airflow.assets import Asset

FEATURES = Asset("s3://features/customer_churn/features.parquet")

@dag(
    dag_id="feature_engineering",
    schedule="@hourly",
    start_date=datetime(2026, 1, 1),
    catchup=False,
)
def feature_engineering():

    @task(outlets=[FEATURES])
    def compute_and_write_features() -> None:
        # ... read raw events, compute features, write parquet ...
        # `outlets=[FEATURES]` marks this task as an Asset producer
        pass

    compute_and_write_features()

feature_engineering()
# DAG 2 — consumer, Asset-scheduled
from datetime import datetime
from airflow.decorators import dag, task
from airflow.assets import Asset
from airflow.providers.mlflow.operators.registry import MLflowRegisterModelOperator

FEATURES = Asset("s3://features/customer_churn/features.parquet")
MODEL    = Asset("s3://models/customer_churn/model.pkl")

@dag(
    dag_id="train_churn_model",
    schedule=[FEATURES],            # trigger the moment FEATURES updates
    start_date=datetime(2026, 1, 1),
    catchup=False,
)
def train_churn_model():

    @task(outlets=[MODEL])
    def train() -> str:
        # ... load features, train sklearn model, pickle to s3 ...
        return "s3://models/customer_churn/model.pkl"

    register = MLflowRegisterModelOperator(
        task_id="register",
        model_artifact_uri="{{ ti.xcom_pull(task_ids='train') }}",
        model_name="customer_churn",
        mlflow_tracking_uri="https://mlflow.example.com",
    )

    train() >> register

train_churn_model()

Step-by-step explanation.

  1. The producer DAG (feature_engineering) runs hourly. Its single task declares outlets=[FEATURES] — this tells Airflow that when this task succeeds, the FEATURES Asset should be marked as updated. The metadata DB records an Asset-update event.
  2. The consumer DAG (train_churn_model) uses schedule=[FEATURES] instead of a cron string. Airflow's scheduler treats Asset-update events as trigger events; the moment the producer emits an update, the consumer is queued.
  3. If the producer runs at 14:00, 15:00, 16:00, the consumer runs at 14:00+ε, 15:00+ε, 16:00+ε — synchronised with the data, not with the wall clock. If the producer misses an hour (upstream ETL delay), the consumer skips that hour too. No more "training ran but on stale data" incidents.
  4. The consumer's train task also declares outlets=[MODEL] — training emits a new Asset. Downstream inference DAGs can subscribe to MODEL and re-run whenever a new model ships.
  5. MLflowRegisterModelOperator receives the model artefact URI from XCom and registers it in MLflow. In production, the registered version becomes discoverable by inference services; the Airflow Asset serves as the trigger source and the MLflow registry serves as the discovery layer.

Output.

| Event | Timestamp | Effect |
| --- | --- | --- |
| feature_engineering runs | 14:00 | FEATURES asset updated |
| train_churn_model queued | 14:00+ε | Asset-scheduled trigger |
| train_churn_model completes | 14:15 | MODEL asset updated; MLflow gets new version |
| Inference DAG queued (if subscribed) | 14:15+ε | Runs on fresh model |

Rule of thumb. Asset-scheduled DAGs replace cron schedules for any pipeline where upstream data freshness matters more than wall-clock time. The Asset is the contract; the DAGs on either side are the implementations.

Worked example — model promotion gate with evaluation Assets

Detailed explanation. A trained model must clear an evaluation gate (accuracy > 0.85) before promotion to production. The pipeline: train → evaluate → gate → promote. The gate is a sensor that waits for the evaluation Asset and checks the metrics; the promotion operator only runs if the gate passes. The production model Asset is what inference DAGs subscribe to.

  • Metrics threshold. Accuracy > 0.85 on the holdout set.
  • Promotion. Set the MLflow model stage from staging to production.
  • Downstream. Inference DAGs subscribed to the production Asset re-run.

Question. Design the DAG with the evaluation gate. Show what happens on a passing and a failing model.

Input.

| Component | Value |
| --- | --- |
| Model Asset | mlflow://models/customer_churn |
| Evaluation Asset | s3://eval/customer_churn/metrics.json |
| Threshold | accuracy > 0.85 |
| Promotion target | mlflow://models/customer_churn?stage=production |

Code.

from datetime import datetime
from airflow.decorators import dag, task
from airflow.assets import Asset
from airflow.exceptions import AirflowSkipException
from airflow.providers.mlflow.operators.registry import (
    MLflowRegisterModelOperator,
    MLflowPromoteModelOperator,
)

FEATURES     = Asset("s3://features/customer_churn/features.parquet")
STAGED_MODEL = Asset("mlflow://models/customer_churn?stage=staging")
METRICS      = Asset("s3://eval/customer_churn/metrics.json")
PROD_MODEL   = Asset("mlflow://models/customer_churn?stage=production")

@dag(
    dag_id="train_evaluate_promote_churn",
    schedule=[FEATURES],
    start_date=datetime(2026, 1, 1),
    catchup=False,
)
def train_eval_promote():

    @task(outlets=[STAGED_MODEL])
    def train_and_stage() -> str:
        # ... train + register in MLflow as `staging` ...
        return "mlflow://models/customer_churn?version=42&stage=staging"

    @task(outlets=[METRICS])
    def evaluate(model_uri: str) -> dict:
        # ... load holdout set, score model ...
        return {"accuracy": 0.87, "roc_auc": 0.91}

    @task
    def gate(metrics: dict) -> None:
        if metrics["accuracy"] <= 0.85:
            raise AirflowSkipException(f"gate failed: accuracy={metrics['accuracy']:.3f}")

    promote = MLflowPromoteModelOperator(
        task_id="promote",
        model_name="customer_churn",
        source_stage="staging",
        target_stage="production",
        outlets=[PROD_MODEL],
    )

    staged  = train_and_stage()
    metrics = evaluate(staged)
    gate(metrics) >> promote

train_eval_promote()

Step-by-step explanation.

  1. train_and_stage trains and registers to MLflow as staging. outlets=[STAGED_MODEL] marks the staged model Asset as updated; downstream monitoring dashboards subscribed to STAGED_MODEL can pick this up.
  2. evaluate scores the staged model against the holdout set and writes metrics to S3 (outlets=[METRICS] marks that Asset updated). The metrics are also returned as an XCom for the gate task.
  3. gate is the guard: if accuracy ≤ 0.85, it raises AirflowSkipException, which marks the task as skipped rather than failed. Downstream tasks that keep the default all_success trigger rule are skipped too, so promote does not run. If accuracy > 0.85, gate succeeds and promote runs.
  4. MLflowPromoteModelOperator transitions the model from staging to production via the MLflow REST API. outlets=[PROD_MODEL] marks the production Asset updated — any inference DAG subscribed to PROD_MODEL fires next.
  5. The elegance: the gate is inside the DAG, but the promotion effect propagates outside the DAG via the Asset. Downstream DAGs never need to know the promotion happened; they subscribe to the Asset and Airflow handles the trigger.

Output.

| Scenario | accuracy | gate | promote runs? | production Asset updated? |
| --- | --- | --- | --- | --- |
| Pass | 0.87 | success | yes | yes |
| Fail | 0.82 | skipped | no (skipped) | no |
| Borderline | 0.85 | skipped (≤ threshold) | no | no |

Rule of thumb. Evaluation gates + outlets=[production_asset] is the canonical model-promotion pattern in Airflow 3. The Asset is the trigger for downstream inference; the gate is the policy; the operator is the mechanism.
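
The gate policy itself is a one-line comparison, and pulling it into a pure function makes the borderline case easy to unit-test outside Airflow. A minimal sketch, with `gate_decision` as a hypothetical helper name:

```python
def gate_decision(accuracy: float, threshold: float = 0.85) -> str:
    """Return 'promote' only when accuracy is strictly above the threshold."""
    return "promote" if accuracy > threshold else "skip"

# The three scenarios from the output table
for label, acc in [("pass", 0.87), ("fail", 0.82), ("borderline", 0.85)]:
    print(f"{label}: {gate_decision(acc)}")
```

Inside the DAG, the gate task would raise the skip exception whenever this function returns "skip"; keeping the comparison strict means a model that merely equals the threshold is not promoted.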

Worked example — Feast feature materialisation Asset chain

Detailed explanation. A production ML stack uses Feast as its feature store. The feature-materialisation DAG reads raw events, materialises features into the online store (Redis) and offline store (S3), and emits an Asset for each. Downstream training DAGs subscribe to the offline-store Asset; inference services read from the online store. The DAG-side integration uses FeastMaterializeOperator.

  • Upstream. Raw event Asset (s3://raw/events/{ds}/*.parquet).
  • Feast operator. Materialises features into online (Redis) + offline (S3) stores.
  • Downstream training Asset. s3://feast/offline/customer_churn/features.parquet.

Question. Write the DAG. Show how the training DAG subscribes to the Feast-materialised Asset.

Input.

| Component | Value |
| --- | --- |
| Feast repo | git@github.com:team/feast-repo.git |
| Online store | Redis at redis://feast-online:6379 |
| Offline store | s3://feast/offline/... |

Code.

from datetime import datetime, timedelta
from airflow.decorators import dag, task
from airflow.assets import Asset
from airflow.providers.feast.operators.materialize import FeastMaterializeOperator

RAW_EVENTS = Asset("s3://raw/events/latest.parquet")
OFFLINE_FEATURES = Asset("s3://feast/offline/customer_churn/features.parquet")

@dag(
    dag_id="feast_materialize_customer_churn",
    schedule=[RAW_EVENTS],
    start_date=datetime(2026, 1, 1),
    catchup=False,
    default_args={"retries": 2},
)
def feast_materialize():

    materialize = FeastMaterializeOperator(
        task_id="materialize",
        feature_views=["customer_churn_features"],
        start_ts="{{ data_interval_start }}",
        end_ts="{{ data_interval_end }}",
        feast_repo="/opt/airflow/feast_repo",
        outlets=[OFFLINE_FEATURES],
    )

    materialize

feast_materialize()
# Training DAG subscribes to the Feast-materialised offline Asset
from airflow.assets import Asset
from airflow.decorators import dag, task
from datetime import datetime

OFFLINE_FEATURES = Asset("s3://feast/offline/customer_churn/features.parquet")
MODEL            = Asset("mlflow://models/customer_churn?stage=staging")

@dag(
    dag_id="train_churn_from_feast",
    schedule=[OFFLINE_FEATURES],
    start_date=datetime(2026, 1, 1),
    catchup=False,
)
def train_churn():
    @task(outlets=[MODEL])
    def train() -> str:
        # ... load from OFFLINE_FEATURES, train, register in MLflow ...
        return "mlflow://models/customer_churn?version=n&stage=staging"

    train()

train_churn()

Step-by-step explanation.

  1. feast_materialize_customer_churn subscribes to RAW_EVENTS. Whenever a raw-events Asset update lands, the Feast materialisation kicks off. The DAG is Asset-scheduled, not cron-scheduled.
  2. FeastMaterializeOperator runs the equivalent of feast materialize-incremental against the repo, materialising the specified feature views into both online (Redis) and offline (S3) stores. outlets=[OFFLINE_FEATURES] marks the offline Asset updated on success.
  3. train_churn_from_feast subscribes to OFFLINE_FEATURES. The moment Feast finishes materialising, the training DAG queues. The train task reads from the offline store, trains, and emits a new MODEL Asset.
  4. The dependency graph in the Assets tab of the UI now shows: RAW_EVENTS → feast_materialize → OFFLINE_FEATURES → train_churn → MODEL. This is the lineage senior interviewers ask about — visible, queryable, unambiguous.
  5. The operator receives templated start_ts and end_ts values: the materialisation window matches the DAG run's data interval, so backfills materialise the historically correct window.

Output.

| DAG | Asset produced | Asset consumed |
| --- | --- | --- |
| raw_events_etl (upstream) | RAW_EVENTS | |
| feast_materialize | OFFLINE_FEATURES | RAW_EVENTS |
| train_churn | MODEL | OFFLINE_FEATURES |
| inference_churn (downstream) | predictions | MODEL |

Rule of thumb. Feature store + Airflow 3 Assets = a fully wired MLOps lineage graph with zero glue code. The FeastMaterializeOperator (or TectonMaterializeOperator) is the bridge; the Asset is the contract; the schedule is the trigger.
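
The lineage chain described in this example can be derived mechanically from producer and consumer edges. The sketch below walks a small edge list in plain Python; `EDGES` and `downstream_assets` are hypothetical stand-ins for what the metadata DB and the Assets tab would provide.

```python
from collections import defaultdict

# (dag_id, consumed Asset, produced Asset) for the two DAGs in this example
EDGES = [
    ("feast_materialize", "RAW_EVENTS", "OFFLINE_FEATURES"),
    ("train_churn", "OFFLINE_FEATURES", "MODEL"),
]

def downstream_assets(start, edges):
    """Walk producer/consumer edges and list every Asset reachable from start."""
    by_input = defaultdict(list)
    for _dag, consumed, produced in edges:
        by_input[consumed].append(produced)
    seen, stack = [], [start]
    while stack:
        current = stack.pop()
        for nxt in by_input[current]:
            if nxt not in seen:
                seen.append(nxt)
                stack.append(nxt)
    return seen

print(downstream_assets("RAW_EVENTS", EDGES))
```

The same traversal answers the impact question "what is affected if RAW_EVENTS is late?" without reading any DAG source.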

Senior interview question on Airflow MLOps architecture

A senior interviewer might ask: "Design an Airflow 3 MLOps pipeline that trains a churn model daily, evaluates it on a holdout set, promotes to production only if accuracy exceeds a threshold, and re-triggers a batch inference DAG whenever a new production model ships. Walk me through the DAGs, the Assets, and the failure modes."

Solution Using Asset chains + evaluation gate + registry hook + Asset-scheduled inference

# Complete four-DAG MLOps pipeline
from datetime import datetime, timedelta
from airflow.decorators import dag, task
from airflow.assets import Asset
from airflow.exceptions import AirflowSkipException
from airflow.providers.mlflow.operators.registry import (
    MLflowRegisterModelOperator,
    MLflowPromoteModelOperator,
)
from airflow.providers.feast.operators.materialize import FeastMaterializeOperator

# ---- Assets ----
RAW      = Asset("s3://raw/events/latest.parquet")
FEATURES = Asset("s3://feast/offline/customer_churn/features.parquet")
STAGED   = Asset("mlflow://models/customer_churn?stage=staging")
METRICS  = Asset("s3://eval/customer_churn/metrics.json")
PROD     = Asset("mlflow://models/customer_churn?stage=production")

# ---- DAG 1: materialise features ----
@dag(dag_id="mlops_materialize",
     schedule=[RAW],
     start_date=datetime(2026, 1, 1),
     catchup=False)
def materialize():
    FeastMaterializeOperator(
        task_id="mat",
        feature_views=["customer_churn_features"],
        feast_repo="/opt/airflow/feast_repo",
        outlets=[FEATURES],
    )
materialize()

# ---- DAG 2: train + register staging ----
@dag(dag_id="mlops_train",
     schedule=[FEATURES],
     start_date=datetime(2026, 1, 1),
     catchup=False,
     default_args={"retries": 2, "retry_delay": timedelta(minutes=5)})
def train():
    @task(outlets=[STAGED])
    def train_task() -> str:
        return "mlflow://models/customer_churn?version=n&stage=staging"
    train_task()
train()

# ---- DAG 3: evaluate + promote if pass ----
@dag(dag_id="mlops_evaluate_promote",
     schedule=[STAGED],
     start_date=datetime(2026, 1, 1),
     catchup=False)
def evaluate_promote():
    @task(outlets=[METRICS])
    def eval_task() -> dict:
        return {"accuracy": 0.87, "roc_auc": 0.91}

    @task
    def gate_task(m: dict) -> None:
        if m["accuracy"] <= 0.85:
            raise AirflowSkipException(f"gate failed: acc={m['accuracy']}")

    promote = MLflowPromoteModelOperator(
        task_id="promote",
        model_name="customer_churn",
        source_stage="staging",
        target_stage="production",
        outlets=[PROD],
    )

    m = eval_task()
    gate_task(m) >> promote
evaluate_promote()

# ---- DAG 4: batch inference re-runs on every prod update ----
@dag(dag_id="mlops_infer",
     schedule=[PROD],
     start_date=datetime(2026, 1, 1),
     catchup=False)
def infer():
    @task
    def infer_task() -> None:
        # ... load PROD model, score, write predictions ...
        pass
    infer_task()
infer()

Step-by-step trace.

| DAG | Consumes | Produces | Trigger |
| --- | --- | --- | --- |
| mlops_materialize | RAW | FEATURES | Asset (raw events landing) |
| mlops_train | FEATURES | STAGED | Asset (features materialised) |
| mlops_evaluate_promote | STAGED | METRICS, PROD (if pass) | Asset (staged model) |
| mlops_infer | PROD | (predictions) | Asset (production promotion) |

After the pipeline stabilises, the daily loop is: raw events land → features materialise → training runs → evaluation runs → promotion happens (or skips) → inference re-runs on the new production model. Every step is Asset-scheduled; no cron; freshness is guaranteed by the Asset contract; lineage is queryable in the UI.

Output:

| Metric | Value |
| --- | --- |
| DAGs in pipeline | 4 |
| Schedules used | 0 cron / 4 Asset-scheduled |
| Failure modes handled | training retry, gate skip, missing raw data (no trigger) |
| Lineage visible in UI | full graph |
| Model promotion gated | yes (accuracy > 0.85) |
| Registry integration | MLflow (register + promote) |

Why this works — concept by concept:

  • Asset chains as the contract — every DAG boundary is an Asset. The chain RAW → FEATURES → STAGED → PROD is the pipeline; the DAGs are just the mechanism. Adding a new step is one more Asset in the chain, not a schedule refactor.
  • Evaluation gate as a skip: AirflowSkipException is idiomatic for "we don't want to promote this run"; it is not a failure. The gate DAG stays green in the UI even when it skips; only the promotion is elided.
  • Registry hook as an operator: MLflowPromoteModelOperator is a first-class operator, not a bash-shelled MLflow CLI call. The integration is stable and testable; the operator emits an Asset update for the promotion.
  • Asset-scheduled inference — inference is not on cron. It runs the moment a new production model ships. No stale-model incidents.
  • Cost — four DAGs, one Asset per boundary, one Feast + one MLflow provider. Operational cost is O(1) — the Asset scheduler is part of the core Airflow runtime. Metadata cost is one row per Asset per update, indexed on (asset_uri, updated_at). The savings versus the 2.x custom-cron-and-XCom-signalling equivalent are measurable in engineer-weeks per pipeline.
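
The failure modes in the pipeline (no raw data, gate skip) can be traced with a toy simulation of the Asset chain. This is not Airflow code; `run_pipeline` is a hypothetical helper that only models which of the four DAGs would run for one upstream event.

```python
def run_pipeline(raw_landed: bool, accuracy: float, threshold: float = 0.85):
    """Return the DAGs that would run, in order, for one upstream event (toy model)."""
    ran = []
    if not raw_landed:
        return ran                          # no RAW update, nothing is triggered
    ran.append("mlops_materialize")         # RAW -> FEATURES
    ran.append("mlops_train")               # FEATURES -> STAGED
    ran.append("mlops_evaluate_promote")    # STAGED -> METRICS (+ PROD if gate passes)
    if accuracy > threshold:
        ran.append("mlops_infer")           # PROD updated, inference re-runs
    return ran

print(run_pipeline(True, 0.87))   # full chain
print(run_pipeline(True, 0.82))   # gate skips, inference not triggered
print(run_pipeline(False, 0.87))  # nothing runs
```

The simulation shows the property worth naming in an interview: a skipped gate stops the chain at the PROD boundary without any DAG being marked failed.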


5. UI redesign + Edge deploy + migration path

The unified UI, the Edge worker Helm chart, and the airflow db upgradecheck airflow 3 migration path — the three things every 2.x-to-3.x cutover touches

The mental model in one line: Airflow 3 ships a unified UI that collapses Graph / Grid / Calendar / DAG-run views into a single lens with Assets, Versions, and Task Groups tabs; provides a canonical Helm chart and systemd path for edge worker deployment; and gives you airflow db upgradecheck — the linter that surfaces every deprecated API in your 2.x DAGs before you cut over — so the entire 2.x → 3.x migration reduces to (a) run the checker, (b) fix the errors on 2.x first, (c) cut over with confidence. The UI redesign is the daily productivity win; the upgrade checker is the safety-net that makes senior engineers actually adopt.

Iconographic UI + migration diagram — a unified UI screen on the left showing tabs for Assets, Versions and Task Groups, and a migration ramp on the right from Airflow 2.x through the upgrade checker to 3.x.

The unified UI — one lens, three tabs.

  • The old UI's problem. In 2.x, you'd look at a failed task and cross-reference Graph (for structure), Grid (for run history), Calendar (for scheduling gaps), and the Datasets page (for lineage). Four tabs, four mental context switches.
  • The 3.x UI. A single DAG-run view with tabs at the top: Overview (Graph + Grid combined), Task Groups (collapsible hierarchical view), Versions (this run's DAG version + diff to previous), Assets (produced/consumed by this run).
  • The flag. In 3.0, the new UI shipped behind AIRFLOW__UI__NEW_UI=true for opt-in. In 3.1 it is default; the legacy UI is available via a ?legacy=1 query param.
  • What senior users notice first. Time-to-diagnose-a-failed-task drops from ~5 minutes to <60 seconds — the version and Asset context is right there next to the task instance detail.

Edge Worker deployment — Helm chart and systemd.

  • Helm chart. The official apache-airflow/airflow Helm chart adds an edgeWorker section (mirroring the existing workers section). Set edgeWorker.enabled: true and provide apiUrl, queues, concurrency, and secretsRef.
  • systemd path. For on-prem or non-K8s deploys, a systemd unit runs airflow edge worker with env vars. Same behaviour, different packaging.
  • Astronomer / MWAA / Cloud Composer. Managed Airflow services expose Edge Worker registration via a UI-issued token; the operational path is identical to self-hosted.
  • Scale story. One Edge Worker pod per queue per site. Concurrency = local task slots; total capacity = pods × concurrency.
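
The scale rule is simple arithmetic, sketched here for sizing a rollout. The site names, queue names, and `edge_capacity` helper are hypothetical; the only inputs taken from the text are "one pod per queue per site" and "capacity = pods × concurrency".

```python
def edge_capacity(sites_to_queues: dict, concurrency: int) -> int:
    """Total task slots: one pod per queue per site, times per-pod concurrency."""
    pods = sum(len(queues) for queues in sites_to_queues.values())
    return pods * concurrency

topology = {
    "factory-a": ["default", "gpu"],   # two queues -> two pods
    "factory-b": ["default"],          # one queue  -> one pod
}
total_slots = edge_capacity(topology, concurrency=8)
print(total_slots)
```

With three pods at eight slots each, the deployment can run 24 edge tasks at once; adding a queue at a site adds exactly one pod's worth of slots.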

The airflow 3 migration path.

  • Step 1 — inventory. airflow dags list on 2.x + airflow db upgradecheck --run-all prints every deprecated import, every removed operator, every incompatible custom XCom backend.
  • Step 2 — fix on 2.x first. Every finding gets a PR against the 2.x branch. SubDagOperator → TaskGroup. airflow.datasets → airflow.assets (works on 2.9+ via compat shim). Custom XCom backends updated to the 3.x interface.
  • Step 3 — parallel staging. Stand up a 3.x staging cluster with the same executor, same connections, same variables. Deploy the same DAG bundle. Run parallel for 1–2 weeks.
  • Step 4 — cutover. Point production traffic to 3.x. Retire 2.x after 2 weeks of clean prod.
  • Step 5 — adopt. Enable DAG versioning bundle storage, migrate Datasets imports to Assets, deploy Edge Workers as workloads require, turn on the new UI.
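
For the inventory step, a lightweight complement to any checker is a static scan of DAG source for known-deprecated imports. The sketch below uses Python's `ast` module; the `DEPRECATED_MODULES` tuple is an assumed, partial list for illustration, and `find_deprecated_imports` is a hypothetical helper, not part of Airflow.

```python
import ast

# Module paths treated as deprecated for this sketch (an assumed, partial list).
DEPRECATED_MODULES = ("airflow.operators.subdag", "airflow.datasets")

def find_deprecated_imports(source: str):
    """Return sorted (line, module) pairs for imports matching DEPRECATED_MODULES."""
    hits = []
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.ImportFrom) and node.module:
            modules = [node.module]
        elif isinstance(node, ast.Import):
            modules = [alias.name for alias in node.names]
        else:
            continue
        for module in modules:
            if module.startswith(DEPRECATED_MODULES):
                hits.append((node.lineno, module))
    return sorted(hits)

SAMPLE = '''
from airflow.operators.subdag import SubDagOperator
from airflow.datasets import Dataset
from airflow.operators.python import PythonOperator
'''
hits = find_deprecated_imports(SAMPLE)
print(hits)
```

Because it parses rather than imports the file, the scan runs without Airflow installed and never executes DAG code, which makes it cheap to run across a whole DAGs folder.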

The upgrade checker (airflow db upgradecheck).

  • What it scans. Every DAG file in the DAGs folder, plus the metadata DB for deprecated executor configs, deprecated auth backends, and deprecated provider packages.
  • What it reports. Categorised list — ERROR (must fix before cutover), WARNING (should fix before 3.2), INFO (nice-to-have).
  • CI integration. Run in CI on every PR; block merges that introduce ERRORs.
  • The escape hatch. For findings you can't fix immediately, airflow db upgradecheck --ignore <rule> suppresses the rule with a tracked ignore-file — never a silent skip.
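
The "tracked ignore, never a silent skip" idea can be sketched as a filter that keeps suppressed findings visible. This assumes findings are dicts with `rule_id` and `severity` keys; the rule IDs and the `apply_ignores` helper are hypothetical and do not reflect a real ignore-file format.

```python
def apply_ignores(findings, ignored_rules):
    """Split findings into (active, suppressed) using a set of ignored rule IDs."""
    active = [f for f in findings if f["rule_id"] not in ignored_rules]
    suppressed = [f for f in findings if f["rule_id"] in ignored_rules]
    return active, suppressed

findings = [
    {"rule_id": "subdag-removed", "severity": "ERROR"},       # hypothetical rule IDs
    {"rule_id": "custom-xcom-backend", "severity": "ERROR"},
    {"rule_id": "dataset-import", "severity": "WARNING"},
]
active, suppressed = apply_ignores(findings, {"custom-xcom-backend"})
print(len(active), "active;", len(suppressed), "suppressed (still reported)")
```

Returning the suppressed list alongside the active one lets a CI job print both, so an ignored rule stays on the report until someone removes it from the ignore set.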

Common interview probes on the UI + migration.

  • "What's new in the Airflow 3 UI?" — unified lens, Assets/Versions/Task Groups tabs.
  • "How do you migrate a 2.x deployment?" — upgradecheck → fix on 2.x → staging parallel → cutover → adopt.
  • "What's the fastest 3.x adoption you'd recommend?" — 6 months for a 400-DAG deployment.
  • "What breaks in the migration?" — SubDagOperator (removed), experimental REST API (removed), custom XCom backends (interface changed).

Worked example — running the upgrade checker on a 2.x deployment

Detailed explanation. A team wants to inventory their 2.9 deployment before starting the 3.x migration. They run airflow db upgradecheck --run-all --format json in CI and process the output into a tracking spreadsheet. Every finding gets an owner, a category, and a due date.

  • Command. airflow db upgradecheck --run-all --format json > findings.json.
  • Output. JSON list of findings — rule_id, severity, dag_id, file, line, message.
  • Triage. Findings sorted by severity, filtered by rule_id, grouped by owning team.

Question. Write the CI job and the triage helper. Show what the output looks like for a real 2.x deployment with 400 DAGs.

Input.

Component Value
Airflow version 2.9.4
DAGs folder /opt/airflow/dags
CI system GitHub Actions
Reporting Sheet + Slack

Code.

# .github/workflows/airflow-upgradecheck.yml
name: Airflow 3 upgrade checker

on:
  pull_request:
    paths:
      - "dags/**"
      - "airflow.cfg"
  schedule:
    - cron: "0 12 * * MON"    # weekly Monday scan

jobs:
  upgradecheck:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Install Airflow 2.9 with upgrade checker
        run: |
          python -m venv .venv
          source .venv/bin/activate
          pip install "apache-airflow==2.9.4" "apache-airflow-upgrade-check>=3.0"

      - name: Run upgradecheck
        run: |
          source .venv/bin/activate
          airflow db upgradecheck --run-all --format json > findings.json

      - name: Triage & block on ERROR
        run: |
          python scripts/triage_upgradecheck.py findings.json

      # Upload even when the triage step fails, so blocked PRs still get the full report
      - uses: actions/upload-artifact@v4
        if: always()
        with:
          name: upgradecheck-findings
          path: findings.json

# scripts/triage_upgradecheck.py
import json
import sys
from collections import Counter

# Load the JSON findings written by the upgradecheck step
path = sys.argv[1]
with open(path) as fh:
    findings = json.load(fh)

# Count findings per severity and per rule for the summary
by_severity = Counter(item["severity"] for item in findings)
by_rule     = Counter(item["rule_id"]  for item in findings)

print("Upgrade check findings:")
for sev, n in by_severity.most_common():
    print(f"  {sev:>8}: {n}")

print("\nTop rules:")
for rule, n in by_rule.most_common(10):
    print(f"  {rule:>40}: {n}")

# Fail CI if any ERRORs; show the first five with file:line for quick navigation
errors = [item for item in findings if item["severity"] == "ERROR"]
if errors:
    print(f"\n{len(errors)} ERROR-severity findings, blocking merge")
    for e in errors[:5]:
        print(f"  {e['file']}:{e['line']}  {e['dag_id']}  {e['rule_id']}  {e['message']}")
    sys.exit(1)

Step-by-step explanation.

  1. The CI workflow installs 2.9 + the upgrade-check package into a scratch venv, then runs airflow db upgradecheck --run-all against the DAGs folder. The --run-all flag enables every rule; --format json produces machine-readable output.
  2. The triage script summarises findings by severity and by rule_id. Common rule_ids in a 2.9 deployment: SubDagOperatorRule (SubDagOperator removed), ExperimentalApiRule (experimental REST removed), DatasetImportRule (rename to Assets), AuthBackendRule (deprecated auth backends).
  3. The CI job fails if any ERROR-severity finding exists. This keeps new upgrade blockers out of main: a PR that adds a SubDagOperator import fails CI immediately.
  4. The weekly cron scan catches drift: even PRs that don't touch DAGs sometimes bring in provider-package changes that surface a new finding. The weekly run keeps the tracking sheet honest.
  5. The uploaded artefact preserves the full JSON for offline analysis. Teams import it into a tracking spreadsheet (or Jira, or Linear) and assign owners.
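
The owner-assignment step can be sketched in a few lines. This is a minimal sketch that assumes the finding fields listed earlier (rule_id, severity, dag_id, file, line, message); the `OWNERS` prefix map and the sample findings are hypothetical and stand in for whatever ownership convention a team already has.

```python
import csv
import io

# Hypothetical ownership map: DAG-id prefix -> owning team (not from the article).
OWNERS = {"sales_": "analytics", "ml_": "ml-platform"}
SEVERITY_ORDER = {"ERROR": 0, "WARNING": 1, "INFO": 2}


def owner_for(dag_id: str) -> str:
    """Return the owning team for a DAG id, or 'unassigned' if no prefix matches."""
    for prefix, team in OWNERS.items():
        if dag_id.startswith(prefix):
            return team
    return "unassigned"


def triage_rows(findings: list[dict]) -> list[dict]:
    """Attach an owner to each finding and sort by severity, then rule, then DAG."""
    rows = [{**f, "owner": owner_for(f["dag_id"])} for f in findings]
    rows.sort(key=lambda r: (SEVERITY_ORDER.get(r["severity"], 99), r["rule_id"], r["dag_id"]))
    return rows


def to_csv(rows: list[dict]) -> str:
    """Render rows as CSV text that can be imported into a tracking sheet."""
    fields = ["severity", "rule_id", "dag_id", "file", "line", "owner", "message"]
    buf = io.StringIO()
    writer = csv.DictWriter(buf, fieldnames=fields, extrasaction="ignore")
    writer.writeheader()
    writer.writerows(rows)
    return buf.getvalue()


# Made-up sample findings in the shape described above
sample = [
    {"rule_id": "DatasetImportRule", "severity": "WARNING", "dag_id": "ml_train",
     "file": "dags/ml_train.py", "line": 3, "message": "rename to Assets"},
    {"rule_id": "SubDagOperatorRule", "severity": "ERROR", "dag_id": "sales_daily",
     "file": "dags/sales_daily.py", "line": 12, "message": "SubDagOperator removed"},
]
rows = triage_rows(sample)
print(to_csv(rows))
```

Sorting ERRORs first keeps the block-tier work at the top of the sheet; anything that lands under "unassigned" is a prompt to extend the ownership map rather than a finding to ignore.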

Output.

Severity Count
ERROR 8
WARNING 42
INFO 137
Top rule DatasetImportRule (68)
Second SubDagOperatorRule (5)
Third ExperimentalApiRule (3)

Rule of thumb. Ship the upgrade checker in CI before you plan the migration timeline. The ERROR count is the size of your block-tier work; the WARNING count sizes your post-cutover backlog. Numbers before dates.

Worked example — deploying an Edge Worker via Helm

Detailed explanation. A team runs their control plane in EKS (Elastic Kubernetes Service on AWS) and wants to add an Edge Worker pool for their on-prem sensor pipeline. The on-prem site runs a small K3s cluster. Deploy the Edge Worker via the official Helm chart with values pointing at the cloud control plane's API.

  • Control plane. EKS in us-east-1, Airflow API at airflow.example.com.
  • Edge site. K3s in the factory, needs Edge Worker for queue=factory.
  • Auth. Bearer token issued from Airflow UI, stored as a K8s secret.

Question. Write the Helm values file for the Edge Worker deploy. Show the install command and the smoke test.

Input.

Component Value
Helm chart apache-airflow/airflow
Chart version ≥ 1.16 (Airflow 3 support)
Edge Worker queue factory
Concurrency 4

Code.

# edge-worker-values.yaml
airflowVersion: 3.1.2

# Turn everything else off — this cluster only runs Edge Workers
scheduler:
  enabled: false
webserver:
  enabled: false
triggerer:
  enabled: false
postgresql:
  enabled: false
redis:
  enabled: false

edgeWorker:
  enabled: true
  replicas: 2
  queues:
    - factory
  concurrency: 4
  env:
    - name: AIRFLOW__EDGE__API_URL
      value: https://airflow.example.com
    - name: AIRFLOW__EDGE__HEARTBEAT_SECONDS
      value: "5"
    - name: AIRFLOW__EDGE__LOG_UPLOAD_URL
      value: https://airflow.example.com/api/v1/edge/logs
  extraEnvFrom:
    - secretRef:
        name: edge-worker-secrets
  resources:
    requests:
      cpu: 500m
      memory: 1Gi
    limits:
      cpu: 2000m
      memory: 4Gi
  nodeSelector:
    role: airflow-edge

# Install
kubectl create ns airflow-edge

kubectl -n airflow-edge create secret generic edge-worker-secrets \
  --from-literal=AIRFLOW__EDGE__API_TOKEN='eyJhbGciOi...'

helm repo add apache-airflow https://airflow.apache.org
helm repo update

helm upgrade --install airflow-edge apache-airflow/airflow \
  --namespace airflow-edge \
  --version 1.16.0 \
  --values edge-worker-values.yaml

# Smoke test — check the worker registered with the control plane
kubectl -n airflow-edge logs -l component=edge-worker --tail=50

# Verify from control plane
airflow edge workers list
# +-------------+---------+--------+---------+-------------+
# | worker_id   | queue   | state  | last_hb | concurrency |
# +-------------+---------+--------+---------+-------------+
# | edge-abc123 | factory | active | 2s ago  | 4           |
# | edge-def456 | factory | active | 3s ago  | 4           |
# +-------------+---------+--------+---------+-------------+

Step-by-step explanation.

  1. The values file turns off every component except edgeWorker — no scheduler, no webserver, no metadata DB. This K3s cluster runs Edge Workers only; the control plane lives elsewhere.
  2. edgeWorker.replicas: 2 runs two worker pods per site for HA. Each pod concurrency = 4 → total site capacity 8 concurrent tasks. If one pod fails, the other keeps working; the control plane sees the unhealthy pod's heartbeat stop and re-queues its in-flight tasks.
  3. The env block sets the API URL and heartbeat interval. AIRFLOW__EDGE__HEARTBEAT_SECONDS=5 is the recommended cadence; anything faster wastes API cycles, anything slower delays worker-lost detection.
  4. The extraEnvFrom pulls the API token from a K8s secret. The secret is created out-of-band via kubectl create secret; secrets never live in the values file in git.
  5. The smoke test has two parts: (a) kubectl logs shows the worker's registration log line, (b) airflow edge workers list on the control plane shows the worker registered, active, and heartbeating. Both must pass before considering the deploy successful.
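
The capacity and worker-lost reasoning in the steps above can be checked with a small model. This is only a sketch: `MISSED_BEATS_ALLOWED` is a hypothetical tolerance chosen for illustration, not an Airflow setting, and the timestamps are made up.

```python
from datetime import datetime, timedelta, timezone

HEARTBEAT_SECONDS = 5       # cadence from the values file above
MISSED_BEATS_ALLOWED = 3    # hypothetical tolerance, not an Airflow setting


def site_capacity(replicas: int, concurrency: int, failed_pods: int = 0) -> int:
    """Concurrent task slots left at a site after some pods have failed."""
    return max(replicas - failed_pods, 0) * concurrency


def is_lost(last_heartbeat: datetime, now: datetime) -> bool:
    """Treat a worker as lost once it has missed more than the allowed heartbeats."""
    return now - last_heartbeat > timedelta(seconds=HEARTBEAT_SECONDS * MISSED_BEATS_ALLOWED)


# Made-up heartbeat timestamps for the two pods in the example
now = datetime(2026, 1, 10, 8, 0, 0, tzinfo=timezone.utc)
workers = {
    "edge-abc123": now - timedelta(seconds=2),
    "edge-def456": now - timedelta(seconds=40),
}
lost = sorted(w for w, hb in workers.items() if is_lost(hb, now))
print(lost)
print(site_capacity(2, 4), site_capacity(2, 4, failed_pods=len(lost)))
```

With two replicas at concurrency 4 the site has 8 slots; losing one pod halves that to 4, which is the number to size the factory queue against if the site must keep up during a pod failure.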

Output.

Component Value
Pods 2 × edgeWorker
Queue factory
Concurrency per pod 4
Total site capacity 8 concurrent tasks
Auth Bearer token via secret
Heartbeat cadence 5s
Control-plane visibility 2 active workers

Rule of thumb. Helm-deploy Edge Workers as their own release, in their own namespace, on their own cluster. Never mix them with the control-plane Helm release — the concerns are entirely separate, and the operational separation makes upgrades independent.

Worked example — a 2.x → 3.x cutover weekend

Detailed explanation. A team has done all the prep — upgradecheck passes, staging has parity, block-tier findings are fixed. The cutover weekend is 48 hours. Walk through the hour-by-hour plan: Friday 18:00 freeze, Saturday morning cutover, weekend soak, Monday morning declare-victory or rollback.

  • Friday 18:00. Freeze DAG changes on both branches.
  • Saturday 08:00. Blue/green flip: DNS points to 3.x cluster.
  • Saturday–Sunday. Soak; watch dashboards; keep 2.x hot for rollback.
  • Monday 09:00. Declare victory or roll back.

Question. Produce the runbook for the 48-hour cutover, with rollback triggers and success criteria.

Input.

Parameter Value
Cutover window Fri 18:00 → Mon 09:00
Rollback trigger task fail-rate > 5% for 30 min
Success criterion 48h at fail-rate ≤ 1%

Code.

Cutover runbook — 2.9 → 3.1
===========================

Friday
  18:00  DAG freeze on both branches; no merges to main until Monday
  18:15  Final upgradecheck on 2.x snapshot; expect 0 ERRORs
  18:30  Backup Airflow metadata DB (pg_dump)
  19:00  Deploy 3.1 to blue cluster (parallel to 2.9 green)
  19:30  Run smoke DAG on blue; verify all 400 DAGs parse

Saturday
  08:00  Blue/green flip: DNS points to 3.1 cluster
         Turn on unified UI (AIRFLOW__UI__NEW_UI=true)
  08:15  Watch task fail-rate dashboard; alert threshold 5% for 30m
  10:00  Manual spot-check: pick 20 diverse DAG runs, verify success + version stamp
  14:00  Rehearsal: kill an Edge Worker pod, confirm re-queue behaviour
  18:00  Sitrep #1: overall fail-rate, saturation, MLOps asset triggers

Sunday
  10:00  Sitrep #2
  14:00  Rehearsal: pretend rollback (do not execute) — verify DNS flip time <60s
  18:00  Sitrep #3

Monday
  09:00  If 48h fail-rate ≤ 1%: declare victory
         → decommission 2.9 cluster over the following week
         → announce migration complete to stakeholders
         → open backlog tickets for post-cutover adoption:
            - migrate remaining Dataset imports to Asset
            - deploy production DAG versioning bundle
            - deploy first Edge Worker to hybrid site
         Else: roll back
         → DNS flip back to 2.9 green cluster
         → restore metadata DB from Friday 18:30 backup
         → post-mortem within 48 hours

# Cutover watchdog: page on fail-rate spike
import sys
import time
from datetime import datetime, timezone

import psycopg2

FAIL_RATE_THRESHOLD = 0.05      # 5%
WINDOW_MINUTES      = 30
POLL_SECONDS        = 60

def fail_rate(conn) -> float:
    """Share of task instances started in the last WINDOW_MINUTES that failed."""
    with conn.cursor() as cur:
        cur.execute("""
            SELECT COUNT(*) FILTER (WHERE state = 'failed')::float / NULLIF(COUNT(*), 0)
            FROM   task_instance
            WHERE  start_date > now() - %s * INTERVAL '1 minute'
        """, (WINDOW_MINUTES,))
        return cur.fetchone()[0] or 0.0

def main():
    conn = psycopg2.connect("postgres://.../airflow")
    # Autocommit so each poll runs in its own transaction; otherwise now()
    # stays frozen at the start of the first, never-committed transaction.
    conn.autocommit = True
    while True:
        rate = fail_rate(conn)
        print(f"{datetime.now(timezone.utc).isoformat()}  fail_rate={rate:.2%}")
        if rate > FAIL_RATE_THRESHOLD:
            # page on-call, then exit; a human decides on rollback
            sys.stderr.write(f"ROLLBACK ALERT: fail_rate={rate:.2%}\n")
            sys.exit(1)
        time.sleep(POLL_SECONDS)

if __name__ == "__main__":
    main()

Step-by-step explanation.

  1. Friday's freeze prevents "one more merge" from destabilising the migration. Both 2.x and 3.x branches are frozen; the DB backup is the rollback safety net.
  2. Saturday 08:00 is the go-live moment. A DNS flip is preferred over a gradual traffic split because Airflow doesn't cleanly support two clusters sharing a metadata DB; the 3.1 cluster gets its own DB, restored from the Friday backup and then migrated to the 3.x schema with airflow db migrate.
  3. The fail-rate watchdog polls every minute. A threshold of 5% over a 30-minute window is aggressive enough to catch a real regression and conservative enough to avoid pages on transient blips.
  4. The rehearsal steps (Saturday 14:00, Sunday 14:00) exercise the parts of the runbook that operational muscle-memory forgets: killing an Edge Worker pod, verifying DNS flip timing, checking metadata restore semantics.
  5. Monday 09:00 is the decision point. Declare victory or roll back. The 48-hour fail-rate threshold (≤ 1%) is the objective criterion; the sitreps are the confidence check. Rollback in this framework is a routine operation, not a heroic act.
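
The two thresholds in the runbook can be written as pure functions, which makes them easy to rehearse before the weekend. This sketch uses a stricter "sustained breach" reading of the rollback trigger (every per-minute sample in the window is above 5%), which is one possible interpretation and not the aggregate-window query the watchdog runs; the function names are illustrative.

```python
ROLLBACK_THRESHOLD = 0.05   # page when the fail-rate stays above 5%
VICTORY_THRESHOLD = 0.01    # declare victory when the 48h fail-rate is at most 1%


def fail_rate(failed: int, total: int) -> float:
    """Fraction of task instances that failed; 0.0 when nothing ran."""
    return failed / total if total else 0.0


def should_page(samples: list[float], window: int = 30) -> bool:
    """Page only if each of the last `window` per-minute samples breaches the threshold.

    `samples` holds per-minute fail-rates, newest last.
    """
    recent = samples[-window:]
    return len(recent) == window and all(r > ROLLBACK_THRESHOLD for r in recent)


def monday_decision(failed_48h: int, total_48h: int) -> str:
    """Apply the success criterion from the runbook to the 48-hour totals."""
    return "victory" if fail_rate(failed_48h, total_48h) <= VICTORY_THRESHOLD else "rollback"


print(should_page([0.06] * 30))                 # sustained breach
print(should_page([0.06] * 29 + [0.01]))        # one clean minute resets the alert
print(monday_decision(50, 10_000), monday_decision(200, 10_000))
```

The sustained variant trades detection speed for fewer false pages; the aggregate query in the watchdog reacts sooner to a sharp spike. Either way, the function only raises the alert, and the rollback call stays with the on-call human.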

Output.

Time Event Success signal
Fri 19:30 Blue cluster live in parallel All 400 DAGs parse
Sat 08:00 DNS flip Task fail-rate < 5%
Sat 10:00 Spot-check 20 DAGs All succeed with version stamp
Sat 18:00 Sitrep #1 Fail-rate ≤ 1%
Mon 09:00 Decision 48h fail-rate ≤ 1% → victory

Rule of thumb. Cutover weekends are a runbook problem, not an engineering problem. Ship the runbook + the watchdog + the rehearsals a week before Friday; the actual weekend is boring by design.

Senior interview question on the migration path

A senior interviewer might ask: "You've inherited a 2.9 Airflow deployment with 400 DAGs, a large custom XCom backend, several SubDagOperator-based DAGs, and no CI upgrade checker. Walk me through the migration to 3.1, the order of operations, the rollback strategy, and what you'd adopt on day 1 of the new cluster."

Solution Using upgradecheck-first + fix-on-2.x + parallel staging + Asset-first adoption

Migration plan — 2.9 → 3.1 for 400-DAG deployment
==================================================

Phase A — Foundation (weeks 1–2)
  A1  Ship airflow db upgradecheck to CI (blocks ERROR-severity merges)
  A2  Inventory findings: expect ~8 ERROR, ~42 WARNING, ~137 INFO
  A3  Assign each ERROR to a DAG owner with a fixed-by date
  A4  Rewrite custom XCom backend against 3.x interface (biggest single risk)

Phase B — Block-tier remediation (weeks 3–6)
  B1  Refactor every SubDagOperator to TaskGroup (5 DAGs)
  B2  Migrate every experimental-REST-API caller to the stable API (3 clients)
  B3  Rename airflow.datasets → airflow.assets in DAG imports (68 DAGs)
  B4  Re-run upgradecheck; expect 0 ERROR

Phase C — Parallel staging (weeks 7–8)
  C1  Stand up 3.1 staging cluster with identical executor + connections + variables
  C2  Deploy DAG bundle to staging; run every DAG at least once
  C3  Compare task success rate, runtime, XCom shape against 2.9 baseline
  C4  Sign-off gate: 100% DAGs green in staging for 48 consecutive hours

Phase D — Cutover weekend (week 9)
  D1  Friday freeze; backup metadata DB
  D2  Saturday DNS flip; unified UI on
  D3  48h soak with watchdog; rollback trigger = fail-rate > 5% for 30m
  D4  Monday decision: victory or rollback

Phase E — Day-1 adoption (week 10+)
  E1  Enable DAG versioning bundle storage (S3)
  E2  Ship the version-health alert (task fail-rate per new version)
  E3  Deploy first Edge Worker (for the hybrid customer workload)
  E4  Rewrite one training DAG against Assets + MLflow (proof-of-concept for MLOps team)

Step-by-step trace.

Phase Weeks Exit gate Risk
A: Foundation 1–2 Findings inventory + owners assigned XCom backend surprises
B: Block-tier 3–6 0 ERROR in upgradecheck SubDag rewrites; regression on Asset rename
C: Staging 7–8 100% DAG parity for 48h Missing staging connections
D: Cutover 9 48h fail-rate ≤ 1% Watchdog false negatives
E: Adoption 10+ Versioning + Edge + MLOps proof Team capacity
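
To turn the week numbers in the trace into calendar dates, a short helper is enough. The kickoff date below is a hypothetical Monday picked for illustration; only the week ranges come from the plan.

```python
from datetime import date, timedelta

# (phase, first week, last week) taken from the plan; phase E is open-ended and omitted
PHASES = [
    ("A: Foundation", 1, 2),
    ("B: Block-tier", 3, 6),
    ("C: Staging", 7, 8),
    ("D: Cutover", 9, 9),
]


def phase_dates(kickoff: date) -> dict[str, tuple[date, date]]:
    """Map each phase to its (first day, last day), counting week 1 from the kickoff."""
    schedule = {}
    for name, first_week, last_week in PHASES:
        start = kickoff + timedelta(weeks=first_week - 1)
        end = kickoff + timedelta(weeks=last_week) - timedelta(days=1)
        schedule[name] = (start, end)
    return schedule


schedule = phase_dates(date(2026, 3, 2))  # hypothetical Monday kickoff
for name, (start, end) in schedule.items():
    print(f"{name:<15} {start} to {end}")
```

Publishing the dates next to the exit gates makes slippage visible early: if phase B has not reached 0 ERROR by its end date, every later date moves with it.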

After phase E is running, the deployment is a fully modern Airflow 3.1 cluster with the unified UI, DAG versioning + bundle storage, at least one Edge Worker in production, and a paved path for the MLOps team to migrate training DAGs one at a time. The migration is complete; adoption is ongoing.

Output:

Property Before After
Airflow version 2.9.4 3.1.2
upgradecheck errors ~8 0
Custom XCom backend 2.x interface 3.x interface
SubDagOperator use 5 DAGs 0 (TaskGroups)
Dataset imports 68 0 (Assets)
DAG versioning off on with bundle storage
Edge Workers 0 1 site (customer workload)
Unified UI off on

Why this works — concept by concept:

  • Upgradecheck-first — the migration plan starts with instrumentation, not code changes. The ERROR count sizes the block-tier work; the WARNING count sizes the post-cutover backlog. Numbers before dates.
  • Fix-on-2.x — every block-tier fix ships on the 2.9 branch first. This means the fixes are stress-tested against the current production workload before the 3.1 cutover ever happens. Big-bang migrations fail; incremental de-risking succeeds.
  • Parallel staging — a 3.1 cluster running the same DAGs against staged data for 2 weeks catches almost every latent incompatibility. The 48h all-green gate is the objective criterion.
  • Cutover weekend as a boring runbook — the actual go-live is a DNS flip + 48h watch. The interesting engineering happened in phases A through C.
  • Cost — 9 weeks calendar time, ~2 senior DEs. The adoption phase (versioning, Edge, MLOps) continues indefinitely at a lower burn rate. Skipping upgradecheck typically doubles the timeline because block-tier findings surface in staging instead of in phase A.


Cheat sheet — Airflow 3 recipes

  • When to upgrade from 2.x. Any two of {hybrid workload, MLOps roadmap, Python 3.12+ target, provider dependency on 3.x-only package} = upgrade now. Otherwise, upgrade within 12 months; the 2.x line is maintenance-only. Do not upgrade without CI upgrade-checker instrumentation first.
  • Edge Worker startup (systemd). ExecStart=/opt/airflow-edge/venv/bin/airflow edge worker with AIRFLOW__EDGE__API_URL=https://airflow.example.com, AIRFLOW__EDGE__QUEUES=factory, AIRFLOW__EDGE__CONCURRENCY=4. Bearer token from EnvironmentFile=/etc/airflow-edge/secrets.env. Only outbound HTTPS/443 required.
  • Edge Worker startup (Helm). edgeWorker.enabled: true in the official apache-airflow/airflow chart; provide apiUrl, queues, concurrency, secretsRef. Deploy in its own namespace, separate from the control-plane Helm release. replicas: 2 for site HA.
  • DAG version history query. SELECT ti.dag_id, ti.task_id, ti.run_id, ti.dag_version_id, dv.created_at, dv.source_hash FROM task_instance ti JOIN dag_version dv ON dv.id = ti.dag_version_id WHERE ti.dag_id = %s ORDER BY ti.start_date DESC LIMIT 100; The dag_version_id on every task_instance row is the audit artefact.
  • DAG versioning bundle storage. [dag_bundles.s3_prod] type=s3, bucket=airflow-dag-bundles, prefix=prod/, snapshot_on_parse=true. Every DAG source snapshotted on every version bump. Without this, historical DAG source is not recoverable and versioning is only half a feature.
  • Bad-deploy rollback via deprecation. airflow dags versions deprecate <dag_id> --version-id <N> --reason "..." marks a version bad; the scheduler auto-falls-back to the newest non-deprecated version. Faster and safer than a git revert; reversible with airflow dags versions undeprecate.
  • Asset (formerly Dataset) declaration. from airflow.assets import Asset; FEATURES = Asset("s3://features/customer_churn/features.parquet"). Emit with @task(outlets=[FEATURES]). Consume with @dag(schedule=[FEATURES], ...). Asset URIs are the canonical primary key across DAGs.
  • Airflow 2 → 3 upgrade checker. pip install "apache-airflow-upgrade-check>=3.0" then airflow db upgradecheck --run-all --format json. Ship in CI; block PRs on ERROR severity. Weekly cron scan catches drift from provider-package changes.
  • Migration checklist (9-week plan). Weeks 1–2: instrument upgradecheck. Weeks 3–6: fix block-tier findings on 2.x. Weeks 7–8: parallel staging 3.x. Week 9: cutover weekend. Week 10+: enable DAG versioning bundle storage, adopt Assets, deploy first Edge Worker.
  • Asset-scheduled DAG. @dag(schedule=[Asset("s3://...")], ...) triggers on Asset update. Multi-Asset schedules (schedule=[Asset(A), Asset(B)]) support fan-in semantics. Never mix schedule=[...] with a cron; Assets win.
  • Model-promotion gate. AirflowSkipException inside a gate task; downstream MLflowPromoteModelOperator with outlets=[PROD_ASSET]. Downstream inference DAGs subscribe to PROD_ASSET. Idiomatic MLOps promotion, no cron.
  • Idempotent Edge task pattern. Key writes on run_id, not on wall-clock time. retries=3 with exponential backoff; retry_delay=timedelta(minutes=1); retry_exponential_backoff=True. worker_lost becomes a routine event, not an incident.
  • XCom PII guardrail for Edge Workers. Pre-XCom hook scans strings for SSN/email/credit-card patterns; raises AirflowFailException on hit. Combined with per-queue routing, gives compliance officers a signed-off GDPR/HIPAA architecture with no ad-hoc reviews.
  • Cutover watchdog. Task fail-rate > 5% for 30 minutes → page on-call → human decides rollback. DNS-flip rollback in <60 seconds. 48-hour soak with all-green ≤ 1% fail-rate = declare victory. Boring by design.
  • Version-health alert. Task fail-rate per DAG version, evaluated hourly against versions created in the last 2 hours. Threshold 50% fail-rate over ≥5 runs. Catches bad deploys within 1 hour; runbook: deprecate the version, trigger a clean run, file bug.
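
The version-health rule in the last recipe reduces to a small filter. This is a minimal sketch under stated assumptions: the input dictionaries (`version_id`, `created_at`, `runs`, `failed`) are a hypothetical shape for data you would aggregate from the metadata DB, and "50% fail-rate" is read as "at least 50%".

```python
from datetime import datetime, timedelta, timezone

FAIL_RATE_THRESHOLD = 0.5           # 50% fail-rate
MIN_RUNS = 5                        # ignore versions with too few runs to judge
LOOKBACK = timedelta(hours=2)       # only versions created in the last 2 hours


def unhealthy_versions(versions: list[dict], now: datetime) -> list[int]:
    """Return ids of recently created versions whose fail-rate meets the threshold."""
    flagged = []
    for v in versions:
        is_new = now - v["created_at"] <= LOOKBACK
        has_enough_runs = v["runs"] >= MIN_RUNS
        if is_new and has_enough_runs and v["failed"] / v["runs"] >= FAIL_RATE_THRESHOLD:
            flagged.append(v["version_id"])
    return flagged


# Made-up per-version aggregates
now = datetime(2026, 5, 1, 12, 0, tzinfo=timezone.utc)
versions = [
    {"version_id": 41, "created_at": now - timedelta(hours=30), "runs": 20, "failed": 15},
    {"version_id": 42, "created_at": now - timedelta(minutes=50), "runs": 6, "failed": 4},
    {"version_id": 43, "created_at": now - timedelta(minutes=10), "runs": 2, "failed": 2},
]
print(unhealthy_versions(versions, now))
```

Version 41 is old enough to be outside the lookback and version 43 has too few runs to judge, so only version 42 is flagged. The minimum-run guard is what keeps a single flaky task on a fresh deploy from paging anyone.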

Frequently asked questions

What's new in Airflow 3: the one-paragraph pitch?

Airflow 3.0 (April 2025) is the biggest release since 2.0 and centres on four themes: Edge Workers (task-executing processes running outside the main cluster that reverse-connect to the Airflow API — no inbound port required, unlocking hybrid on-prem + cloud, GDPR, HIPAA workloads); DAG versioning (every task instance stamped with the DAG version that ran it, so backfills replay historical code rather than the current main); MLOps primitives built on Assets (renamed from Datasets — URI-first, freshness-aware, lineage-carrying), with first-class hooks for Vertex AI, MLflow, and Mosaic model registries; and a unified UI that collapses Graph / Grid / Calendar into a single lens with Assets, Versions, and Task Groups tabs. Beneath those four, the release also delivers a new upgrade checker (airflow db upgradecheck), improved provider packaging, Python 3.13 support, and the removal of long-deprecated APIs (experimental REST, SubDagOperator).

Edge Worker vs KubernetesExecutor — when do I pick each?

KubernetesExecutor is the right choice when your workers should run in the same cluster as your scheduler and you want per-task pod isolation with autoscaling. It assumes the scheduler can create pods in a Kubernetes cluster it can reach — great for in-cluster workloads, cloud-native deployments, and teams that already run K8s. Edge Worker is the right choice when your workers need to run outside the scheduler's cluster — a customer data centre, an edge/IoT site, a GDPR/HIPAA-compliant regional cluster, or an on-prem factory. Edge Workers reverse-connect to the Airflow API over outbound HTTPS/443; no inbound port on the worker side, no VPN, no shared broker. In practice, most 3.x deployments use both — KubernetesExecutor for the in-cluster workload and Edge Workers for the hybrid or remote workload. Queue tags (queue=factory, queue=eu_central) route DAG tasks to the right executor.
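
As a mental model of queue-based routing, the mapping can be sketched as a lookup. Airflow performs the real routing from its own executor and queue configuration; the `EDGE_QUEUES` set and the task names here are hypothetical and only illustrate how queue tags split a workload between the two executors.

```python
from collections import defaultdict

# Hypothetical set of queues served by Edge Workers; everything else stays in-cluster.
EDGE_QUEUES = {"factory", "eu_central"}


def executor_for(queue: str) -> str:
    """Pick the executor family for a task based on its queue tag."""
    return "EdgeExecutor" if queue in EDGE_QUEUES else "KubernetesExecutor"


def plan(tasks: dict[str, str]) -> dict[str, list[str]]:
    """Group task ids by the executor that would run them."""
    grouped = defaultdict(list)
    for task_id, queue in tasks.items():
        grouped[executor_for(queue)].append(task_id)
    return dict(grouped)


# Made-up tasks: task id -> queue tag
tasks = {"read_sensors": "factory", "load_warehouse": "default", "eu_export": "eu_central"}
print(plan(tasks))
```

Writing the mapping down this way is also a quick review aid: any task whose data must stay on site should show up under the Edge group, and anything else appearing there deserves a second look.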

Do I need to rewrite my 2.x DAGs to move to 3.x?

For the vast majority of DAGs, no. About 90% of typical 2.x DAGs import unchanged on 3.x. The 10% that do need edits fall into three buckets: hard removals (SubDagOperator → TaskGroup; the experimental REST API → the stable API; custom XCom backends → the 3.x interface); renames with a compat shim (from airflow.datasets import Dataset → from airflow.assets import Asset; the old import still works with a deprecation warning through 3.1 and is hard-removed in 3.2); and auth backend changes (a couple of deprecated auth backends were removed). The right workflow is to install the upgrade checker (pip install "apache-airflow-upgrade-check>=3.0", quoted so the shell does not treat >= as a redirect), run airflow db upgradecheck --run-all --format json against your 2.x deployment, and triage the output into ERROR (must fix before cutover), WARNING (should fix before 3.2), and INFO (nice-to-have). The ERROR count sizes your migration work; the WARNING count sizes your post-cutover backlog.
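
For a rough first inventory before any tooling is installed, the two most mechanical buckets can be spotted with Python's standard `ast` module. This is a hedged sketch, not a replacement for the upgrade checker: it only looks at `from ... import ...` statements, and the labels it emits are made up for this example.

```python
import ast


def scan_source(source: str) -> list[tuple[int, str]]:
    """Return (line, label) pairs for imports that need attention before a 3.x move."""
    findings = []
    for node in ast.walk(ast.parse(source)):
        if not isinstance(node, ast.ImportFrom):
            continue
        module = node.module or ""
        imported = {alias.name for alias in node.names}
        # Dataset imports need the rename to Assets
        if module == "airflow.datasets" or module.startswith("airflow.datasets."):
            findings.append((node.lineno, "dataset-import"))
        # SubDagOperator must be refactored to a TaskGroup
        if "SubDagOperator" in imported:
            findings.append((node.lineno, "subdag-operator"))
    return sorted(findings)


# A made-up DAG file header; ast.parse never imports Airflow, so this runs anywhere
dag_source = """\
from airflow.datasets import Dataset
from airflow.operators.subdag import SubDagOperator
from airflow.decorators import task
"""
print(scan_source(dag_source))
```

Because the scan parses source text instead of importing it, it can run in a plain Python environment over a whole DAGs folder and give an early count of how many files fall into each bucket.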

Are Datasets still called Datasets in Airflow 3?

No. They are Assets in Airflow 3, and the rename is more than cosmetic. The 2.x Dataset was a scheduling hint; the 3.x Asset is a first-class lineage-and-trigger primitive with a canonical URI, freshness metadata, produced-by / consumed-by edges in the metadata DB, and a dedicated Assets tab in the UI. The import path is from airflow.assets import Asset; the old from airflow.datasets import Dataset still works through Airflow 3.1 with a deprecation warning and is fully removed in 3.2. Usage patterns transfer directly: @task(outlets=[Asset("s3://...")]) marks a task as an Asset producer; @dag(schedule=[Asset("s3://...")]) triggers a DAG on Asset update. The MLOps story is built on top of Assets: model-registry hooks (Vertex, MLflow, Mosaic) consume and emit Assets, and asset-scheduled inference DAGs re-run automatically when a new production model ships.

What is DAG versioning actually good for?

Three concrete wins. Reproducible backfills — when you trigger airflow dags backfill --logical-date 2026-05-01, the scheduler looks up the DAG version that was live on 2026-05-01 and replays that code, not the current main. This eliminates the "backfill silently ran different logic" bug class that has plagued every 2.x deployment. Audit trails — every task instance in the metadata DB carries a dag_version_id; auditors and post-mortems can query "which code ran this task instance?" and get a deterministic answer. Fast bad-deploy rollback — a bad DAG version can be marked deprecated (airflow dags versions deprecate <dag_id> --version-id N), and the scheduler auto-falls-back to the newest non-deprecated version. This is faster and safer than a git revert (which would create yet another version and delay recovery). To get the full value, pair versioning with bundle storage ([dag_bundles.s3_prod] type=s3, snapshot_on_parse=true) so historical DAG source is preserved in S3/GCS/ABS; without that, versioning captures the hash but not the source, and reproducibility is only partial.
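
The two lookups described above, "which version was live on this date" and "which version do we fall back to", can be modelled in a few lines. This is a toy model for intuition only, not Airflow's scheduler logic; the version list and function names are invented for the example.

```python
from bisect import bisect_right
from datetime import datetime

# (created_at, version_id) pairs, oldest first. Values are made up for illustration.
VERSIONS = [
    (datetime(2026, 4, 1), 1),
    (datetime(2026, 4, 20), 2),
    (datetime(2026, 5, 10), 3),
]


def version_live_at(versions, when):
    """Return the id of the newest version created at or before `when`, else None."""
    created = [created_at for created_at, _ in versions]
    idx = bisect_right(created, when)
    return versions[idx - 1][1] if idx else None


def newest_active(versions, deprecated):
    """Return the newest version id that is not marked deprecated, else None."""
    for _, version_id in reversed(versions):
        if version_id not in deprecated:
            return version_id
    return None


print(version_live_at(VERSIONS, datetime(2026, 5, 1)))   # backfill for 2026-05-01
print(newest_active(VERSIONS, deprecated={3}))           # fallback after a bad deploy
```

In this model a backfill for 2026-05-01 resolves to version 2, the one created on 2026-04-20, and deprecating version 3 also falls back to version 2. Both results depend on the historical source for version 2 still being retrievable, which is the role bundle storage plays.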

Which release should I upgrade to first — 3.0 or 3.1?

Go to 3.1, and if the timeline allows, wait for the latest 3.1.x point release before cutover. 3.0 (April 2025) shipped the big features — Edge Workers, DAG versioning, Assets, unified UI — but 3.0.0–3.0.3 had rough edges around the Assets rename compatibility shim, some Edge Worker heartbeat edge cases, and provider-package interop. 3.1 (landing late 2025) polishes those, adds the unified UI as default (not behind a flag), stabilises the DAG versioning bundle storage path, and expands the MLOps operator catalogue. For a new deployment, 3.1.2+ is the current recommended target. For an existing 2.9 deployment migrating in 2026, going directly to 3.1 skips the 3.0 stability tax; the migration effort is nearly identical for either target.

Practice on PipeCode

  • Drill the ETL practice library → for the orchestration, backfill, and Asset-scheduling problems senior interviewers love.
  • Rehearse on the SQL practice library → for the metadata-query, audit-trail, and version-history problems that Airflow 3's versioning story makes routine.
  • Sharpen the platform axis with the optimization practice library → for the pool-sizing, worker-scaling, and cutover-tuning problems that decide production Airflow health.
  • Stack the prerequisites against PipeCode's broader 450+ data-engineering catalogue to anchor the workers-versioning-MLOps-UI framing against real graded inputs.

Lock in Airflow 3 muscle memory

Release notes describe features. PipeCode drills describe the decision — when Edge Workers replace a VPN, when DAG versioning saves an audit, when Assets replace a cron. Pipecode.ai is Leetcode for Data Engineering — pattern-first practice tuned for the production trade-offs senior data engineers actually face.

Practice ETL problems →
Practice optimization problems →
