# Airflow Best Practices Guide

A practical guide to designing production Airflow DAGs that are reliable,
testable, and maintainable.
## DAG Design Principles
### 1. Idempotency

Every task should be safe to re-run without side effects:

- Use `MERGE`/`INSERT OVERWRITE` instead of `INSERT`
- Partition target tables by execution date
- Use `replaceWhere` with Delta Lake
```python
# Good: idempotent partition overwrite
df.write.format("delta").mode("overwrite") \
    .option("replaceWhere", f"date = '{ds}'") \
    .saveAsTable("gold.daily_metrics")

# Bad: append creates duplicates on re-run
df.write.format("delta").mode("append").saveAsTable("gold.daily_metrics")
```
### 2. Atomicity
Each task should do one thing. If a task fails halfway through, the
state should be either "not started" or "fully complete" — never
partially complete.
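A common way to approximate this outside a database is staged-write-then-rename: write output to a temporary path, then promote it with a single atomic rename, so readers never observe a half-written file. A minimal sketch (the helper name and paths are illustrative, not an Airflow API):

```python
import os
import tempfile


def atomic_write(path: str, data: str) -> None:
    """Write to a temp file, then atomically rename it into place.

    Readers see either the old file or the complete new file, never a
    partial write (on POSIX, os.replace within one filesystem is atomic).
    """
    directory = os.path.dirname(path) or "."
    fd, tmp_path = tempfile.mkstemp(dir=directory)
    try:
        with os.fdopen(fd, "w") as f:
            f.write(data)
        os.replace(tmp_path, path)  # atomic swap into the final path
    except BaseException:
        os.unlink(tmp_path)  # leave no partial output behind
        raise
```

If the task dies mid-write, the target path is untouched and the re-run starts from a clean "not started" state.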
### 3. Small tasks over large monoliths
Break large processing into separate extract / transform / quality / load
tasks. This gives you:
- Granular retry (only re-run what failed)
- Clear observability (which step is slow?)
- Parallel execution where possible
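The granular-retry benefit can be illustrated with a toy runner (plain Python, not Airflow's API): steps that already succeeded are skipped on the next attempt, which is exactly what per-task retries give you over one monolithic task.

```python
def run_pipeline(steps, completed=None):
    """Run (name, callable) steps in order, skipping completed ones.

    Returns (completed_names, failed_name). On failure, call again with
    the returned set to re-run only the remaining steps -- the property
    Airflow provides by retrying individual tasks, not the whole DAG.
    """
    completed = set(completed or ())
    for name, fn in steps:
        if name in completed:
            continue
        try:
            fn()
        except Exception:
            return completed, name  # retry resumes from here
        completed.add(name)
    return completed, None
```

With one big task, a failure in "load" would force extract and transform to run again too.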
## XCom Best Practices
XCom is for small metadata, not large datasets.
| Good uses | Bad uses |
|---|---|
| File paths | DataFrames |
| Row counts | Full query results |
| Status strings | Large JSON payloads (>48KB) |
| Timestamps | Binary data |
```python
# Push small metadata
context["ti"].xcom_push(key="row_count", value=42)

# Pull in downstream task
count = context["ti"].xcom_pull(task_ids="transform", key="row_count")
```
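Because XCom values are serialized into the metadata database, it can pay to fail fast on oversized payloads rather than let them bloat the DB. A sketch of such a guard (the helper name is made up; the 48 KB threshold mirrors the table above):

```python
import json

XCOM_SOFT_LIMIT_BYTES = 48 * 1024  # keep payloads well under DB limits


def safe_xcom_value(value):
    """Return `value` if it serializes small enough for XCom; otherwise
    raise so the task fails loudly instead of stuffing the metadata DB."""
    size = len(json.dumps(value).encode("utf-8"))
    if size > XCOM_SOFT_LIMIT_BYTES:
        raise ValueError(
            f"XCom payload is {size} bytes; store it in object "
            "storage and push the path instead"
        )
    return value
```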
## Dynamic DAGs
Generate DAGs programmatically for multiple pipelines:
```python
from airflow import DAG
from airflow.operators.python import PythonOperator

PIPELINES = ["customers", "orders", "products"]

for pipeline in PIPELINES:
    dag_id = f"etl_{pipeline}"
    with DAG(dag_id=dag_id, schedule="@daily", ...) as dag:
        extract = PythonOperator(
            task_id="extract",
            python_callable=extract_fn,
            op_kwargs={"table": pipeline},
        )
        # ... more tasks
    globals()[dag_id] = dag  # Register in Airflow
```
**Warning:** Keep the number of dynamic DAGs reasonable (<100).
Each DAG adds scheduler overhead.
## Airflow 2.x Patterns
### TaskFlow API (Airflow 2.0+)
```python
from datetime import datetime

from airflow.decorators import dag, task


@dag(schedule="@daily", start_date=datetime(2025, 1, 1))
def my_pipeline():
    @task
    def extract() -> dict:
        return {"path": "/data/raw/file.json"}

    @task
    def transform(data: dict) -> int:
        return 42

    @task
    def load(row_count: int) -> None:
        print(f"Loaded {row_count} rows")

    data = extract()
    rows = transform(data)
    load(rows)


my_pipeline()
```
### Task Groups (Airflow 2.0+)

Replace SubDAGs with TaskGroups for visual grouping. Tasks inside a group
get prefixed IDs (e.g. `quality_checks.check_nulls`), and the group can be
wired into the DAG like a single task:
```python
from airflow.utils.task_group import TaskGroup

with TaskGroup("quality_checks") as qc:
    check_nulls = PythonOperator(...)
    check_dupes = PythonOperator(...)
    check_freshness = PythonOperator(...)
```
### Dynamic Task Mapping (Airflow 2.3+)
```python
@task
def get_partitions() -> list:
    return ["2025-01-01", "2025-01-02", "2025-01-03"]


@task
def process_partition(partition: str) -> None:
    print(f"Processing {partition}")


partitions = get_partitions()
process_partition.expand(partition=partitions)
```
## Testing DAGs
### Import test (catch syntax errors)
```python
def test_dag_imports():
    from airflow.models import DagBag

    dag_bag = DagBag(include_examples=False)
    assert len(dag_bag.import_errors) == 0
```
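To smoke-test a single DAG file rather than the whole DagBag, plain importlib does the same job of surfacing syntax and import errors (a sketch; the module name is arbitrary):

```python
import importlib.util


def import_dag_file(path: str):
    """Import a DAG file by path, raising on syntax/import errors --
    the same class of failures DagBag.import_errors would report."""
    spec = importlib.util.spec_from_file_location("dag_under_test", path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module
```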
### Task callable unit tests
Test your Python callables independently of Airflow:
```python
def test_transform():
    result = transform_data(input_path="/test/data.json")
    assert result["row_count"] > 0
```
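This works best when the callable itself has no Airflow imports. A sketch of what such a callable might look like (`transform_data` here is an illustrative stand-in that reads a JSON list of records):

```python
import json


def transform_data(input_path: str) -> dict:
    """Pure-Python callable: reads records, returns small metadata.

    No Airflow context is needed, so pytest can call it directly.
    """
    with open(input_path) as f:
        records = json.load(f)
    cleaned = [r for r in records if r.get("id") is not None]
    return {"row_count": len(cleaned)}
```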
## Common Anti-Patterns
| Anti-pattern | Better approach |
|---|---|
| Heavy processing at DAG parse time | Move to task execution |
| Storing data in XCom | Use S3/GCS intermediate storage |
| Using SubDAGs | Use TaskGroups (Airflow 2.0+) |
| `depends_on_past=True` everywhere | Use sensors or explicit checks |
| Hardcoded connections | Use Airflow Connections + Variables |
| Catching all exceptions silently | Let tasks fail — Airflow handles retries |
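On the hardcoded-connections point: Airflow resolves `AIRFLOW_CONN_<CONN_ID>` environment variables as connection URIs, so credentials never need to live in DAG code. A minimal sketch of that resolution (connection id and URI are illustrative; in real DAGs you would use a hook or `BaseHook.get_connection`):

```python
import os
from urllib.parse import urlparse


def get_conn_uri(conn_id: str) -> dict:
    """Resolve a connection the way Airflow's env-var backend does:
    AIRFLOW_CONN_<CONN_ID, upper-cased> holds a connection URI."""
    uri = os.environ[f"AIRFLOW_CONN_{conn_id.upper()}"]
    parts = urlparse(uri)
    return {
        "host": parts.hostname,
        "port": parts.port,
        "login": parts.username,
        "password": parts.password,
        "schema": parts.path.lstrip("/"),
    }
```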
By Datanest Digital | Version 1.0.0