Most Airflow DAGs have zero data quality checks. The pipeline runs, data lands in the warehouse, and you find out something is wrong when a stakeholder asks why the dashboard numbers look off. Three days later.
Adding quality checks feels like a project: pick a tool, configure it, write checks for every table, maintain them as schemas change. So it never happens.
Here's how to add auto-generated data quality checks to any Airflow DAG in under 5 minutes. No configuration, no writing checks by hand.
Option 1: BashOperator (zero install beyond pip)
If you already have DQLens installed in your Airflow environment:
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

with DAG("my_pipeline", start_date=datetime(2026, 1, 1), schedule="@daily") as dag:
    load_data = BashOperator(
        task_id="load_data",
        bash_command="python load_script.py",
    )

    quality_check = BashOperator(
        task_id="quality_check",
        bash_command=(
            "dqlens init $DATABASE_URL --schema public && "
            "dqlens profile && "
            "dqlens run --ci --focus high"
        ),
        env={"DATABASE_URL": "postgresql://user:pass@host:5432/db"},
    )

    load_data >> quality_check
That's it. After your data loads, DQLens profiles every table, compares against the previous run, and fails the task if it finds HIGH severity problems.
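One practical tweak: rather than hardcoding credentials in the DAG file, you can pull the URL from an Airflow connection. A minimal sketch, assuming a connection named my_postgres exists and you're on Airflow 2.2+ (which exposes conn in the template context):

from airflow.operators.bash import BashOperator

quality_check = BashOperator(
    task_id="quality_check",
    bash_command=(
        "dqlens init $DATABASE_URL --schema public && "
        "dqlens profile && "
        "dqlens run --ci --focus high"
    ),
    # env is a templated field, so the secret is resolved at runtime rather
    # than at DAG parse time. Note: get_uri() may emit a postgres:// scheme;
    # adjust if your driver insists on postgresql://.
    env={"DATABASE_URL": "{{ conn.my_postgres.get_uri() }}"},
)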
Option 2: DQLensOperator (cleaner, typed)
pip install airflow-provider-dqlens
from airflow import DAG
from dqlens_airflow.operators import DQLensOperator
from datetime import datetime

with DAG("my_pipeline", start_date=datetime(2026, 1, 1), schedule="@daily") as dag:
    load_data = ...

    quality_check = DQLensOperator(
        task_id="quality_check",
        conn_id="my_postgres",
        schema="public",
        focus="high",
    )

    load_data >> quality_check
The operator reads your Airflow connection, profiles the database, and fails if problems are found. Results are pushed to XCom so downstream tasks can access them.
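If you'd rather have a failed check alert without halting the rest of the pipeline, plain Airflow trigger rules cover that. A sketch, where the publish task is a stand-in for whatever runs downstream:

from airflow.operators.bash import BashOperator
from airflow.utils.trigger_rule import TriggerRule

publish = BashOperator(
    task_id="publish",
    bash_command="python publish.py",
    # Runs whether quality_check passed or failed; the failure still shows
    # up in the UI and in alerting, but it no longer blocks publishing.
    trigger_rule=TriggerRule.ALL_DONE,
)

quality_check >> publish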
What it catches (without you writing anything)
On the first run, DQLens profiles your tables and stores a baseline. On every subsequent run, it compares and flags:
- Null rate spikes: email column went from 0.1% null to 12% null
- Row count anomalies: table grew 50% overnight (possible duplicate ingestion)
- Schema drift: a column was dropped or changed type
- Empty strings: columns that pass not-null checks but carry no information
- Freshness: data that hasn't been updated recently
Every finding has a severity level (HIGH / MEDIUM / LOW). The focus="high" setting (--focus high on the CLI) means only HIGH severity findings (FK violations, schema changes, major null spikes) fail the task; medium and low findings are logged but don't block the pipeline.
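For intuition, a null-rate spike finding boils down to comparing the current rate against the stored baseline. A toy sketch of the idea only, not DQLens internals; the thresholds here are invented for the example:

from typing import Optional

def null_spike_severity(baseline: float, current: float) -> Optional[str]:
    # Guard columns that were previously 100% clean.
    baseline = max(baseline, 0.0001)
    if current / baseline >= 10 and current >= 0.05:
        return "HIGH"
    if current / baseline >= 3:
        return "MEDIUM"
    return None

# The email example above: 0.1% null at baseline, 12% today -> HIGH.
assert null_spike_severity(0.001, 0.12) == "HIGH"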
Why not Great Expectations or Soda?
Both require you to write every check by hand. Great Expectations needs Python expectation suites. Soda needs YAML check definitions. For 200 tables, that's days of work and ongoing maintenance as schemas change.
DQLens generates checks automatically from your data. You add one task to your DAG and get coverage you never had to write.
Accessing results downstream
from airflow.operators.python import PythonOperator

def review_results(**context):
    results = context["ti"].xcom_pull(task_ids="quality_check")
    if results:
        print(f"Tables profiled: {results['tables_profiled']}")
        print(f"Findings: {results['findings_count']}")
        print(f"Passed: {results['passed_count']}")

review = PythonOperator(
    task_id="review",
    python_callable=review_results,
)

quality_check >> review
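You can also branch on what came back. A sketch with BranchPythonOperator, where the alert and all_clear task ids are placeholders; remember that medium/low findings don't fail the check, so the results dict can report findings even when the task succeeds:

from airflow.operators.empty import EmptyOperator
from airflow.operators.python import BranchPythonOperator

def route(**context):
    results = context["ti"].xcom_pull(task_ids="quality_check") or {}
    return "alert" if results.get("findings_count", 0) > 0 else "all_clear"

branch = BranchPythonOperator(task_id="branch_on_findings", python_callable=route)
alert = EmptyOperator(task_id="alert")          # swap in a Slack/email operator
all_clear = EmptyOperator(task_id="all_clear")

quality_check >> branch >> [alert, all_clear]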
Supported databases
PostgreSQL, DuckDB, SQLite, MySQL. The operator reads your Airflow connection type and builds the right connection URL automatically.
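If the connection doesn't exist yet, one way to create it is from the Airflow CLI (values are placeholders; the URI scheme determines the connection type):

airflow connections add my_postgres \
  --conn-uri 'postgres://user:pass@host:5432/db'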
Try it
pip install airflow-provider-dqlens
Add one task to your DAG. Run it. See what it finds.
GitHub: github.com/vahid110/airflow-provider-dqlens
Core engine: github.com/vahid110/dqlens
If your DAG loads data but doesn't check it, you're flying blind. One task fixes that.