Your Airflow DAG ran successfully last night. Your dashboard is broken this morning.
Somewhere between the upstream API and your warehouse, a field went null, a type changed, or a schema drifted — and nothing caught it. dbt tests and Great Expectations are great, but they run after the data is already written. By the time they flag an issue, bad rows have been in production for hours.
This post shows how to add a real-time quality gate before the write happens — using DataScreenIQ inside an Airflow DAG.
The problem with post-insert validation
Most data quality tools follow the same pattern:
Extract → Load → Transform → Test
The test step catches issues — but only after the data is already in your warehouse. The feedback loop is slow. The damage is already done.
What we actually want is:
Extract → Screen → Load (only if clean) → Transform
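To make the Screen step concrete before bringing in any SDK, here is a dependency-free sketch of the same idea: compute per-field null rates over a batch of dict rows and refuse to hand the rows to the load step if any rate crosses a cap. The threshold here is illustrative, not a DataScreenIQ default.

```python
def screen(rows: list[dict], block_at: float = 0.2) -> dict:
    """Compute per-field null rates and raise if any field crosses the cap."""
    counts: dict[str, int] = {}
    nulls: dict[str, int] = {}
    for row in rows:
        for field, value in row.items():
            counts[field] = counts.get(field, 0) + 1
            if value is None:
                nulls[field] = nulls.get(field, 0) + 1

    rates = {f: nulls.get(f, 0) / counts[f] for f in counts}
    bad = {f: r for f, r in rates.items() if r > block_at}
    if bad:
        # The load step never runs for this batch.
        raise ValueError(f"Quality gate blocked load: null rates {bad}")
    return rates
```

A real screening service adds type checks, drift baselines, and latency guarantees on top of this, but the contract is the same: clean batches pass through, dirty batches raise before the write.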
Setup
```bash
pip install datascreeniq apache-airflow
```

Get a free API key at datascreeniq.com (500K rows/month, no credit card).

Set your environment variable:

```bash
export DATASCREENIQ_API_KEY=dsiq_live_...
```
The basic quality gate task
```python
import logging

import datascreeniq as dsiq


def quality_gate(rows: list, source: str) -> dict:
    """Drop this into any DAG as a quality gate."""
    client = dsiq.Client()  # reads DATASCREENIQ_API_KEY from env
    report = client.screen(rows, source=source)
    if report.is_blocked:
        raise ValueError(
            f"Data quality gate FAILED for '{source}': "
            f"health={report.health_pct}, "
            f"issues={report.type_mismatches or report.null_rates}"
        )
    if report.is_warn:
        logging.warning(
            f"Data quality WARNING for '{source}': {report.summary()}"
        )
    return report.to_dict()
```
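If you want to unit-test this gate without network calls, a small stand-in for the report object is enough. `FakeReport` below is a hypothetical test double, not part of the SDK; its attributes simply mirror the fields the gate reads.

```python
class FakeReport:
    """Hypothetical test double mirroring the report fields the gate reads."""

    def __init__(self, status, health_pct="100.0%",
                 type_mismatches=None, null_rates=None):
        self.status = status
        self.health_pct = health_pct
        self.type_mismatches = type_mismatches or []
        self.null_rates = null_rates or {}

    @property
    def is_blocked(self):
        return self.status == "BLOCK"

    @property
    def is_warn(self):
        return self.status == "WARN"

    def summary(self):
        return f"{self.status} | Health: {self.health_pct}"

    def to_dict(self):
        return {"status": self.status, "health_pct": self.health_pct}
```

Patch `client.screen()` to return one of these in a test, then assert that a BLOCK status raises while PASS returns the dict.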
Full Airflow DAG example
```python
from datetime import datetime

import datascreeniq as dsiq
import requests

from airflow import DAG
from airflow.decorators import task

default_args = {"owner": "airflow", "retries": 1}

with DAG(
    dag_id="orders_pipeline_with_quality_gate",
    default_args=default_args,
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:

    @task
    def extract_orders() -> list:
        """Extract rows from upstream API."""
        response = requests.get("https://api.example.com/orders", timeout=30)
        response.raise_for_status()
        return response.json()["data"]

    @task
    def screen_data_quality(rows: list) -> dict:
        """
        Quality gate — screens rows before they reach the warehouse.
        Raises ValueError on BLOCK, logs a warning on WARN.
        """
        client = dsiq.Client()
        report = client.screen(rows, source="orders")
        if report.is_blocked:
            raise ValueError(
                f"Pipeline blocked: {report.summary()}\n"
                f"Type mismatches: {report.type_mismatches}\n"
                f"Null rates: {report.null_rates}"
            )
        if report.is_warn:
            print(f"WARNING: {report.summary()}")
        return report.to_dict()

    @task
    def load_to_warehouse(rows: list, quality_report: dict):
        """
        Only runs if the quality gate passed.
        quality_report is passed along for logging/auditing.
        """
        print(f"Loading {len(rows)} rows — quality: {quality_report['status']}")
        # your warehouse load logic here

    # DAG wiring
    rows = extract_orders()
    report = screen_data_quality(rows)
    load_to_warehouse(rows, report)
```
What the report looks like
When a batch has issues, you get a structured report back:
```python
# report.summary() output:
# 🚨 BLOCK | Health: 34.0% | Rows: 1200 | Type mismatches: amount | Null rate: email=67% | (8ms)

# Structured access:
report.status           # "BLOCK"
report.health_pct       # "34.0%"
report.type_mismatches  # ["amount"]
report.null_rates       # {"email": 0.67}
report.drift            # schema drift events vs baseline
```
Handling the BLOCK in production
When the gate blocks, you have a few options depending on your pipeline:

```python
@task
def screen_data_quality(rows: list) -> dict:
    client = dsiq.Client()
    report = client.screen(rows, source="orders")
    if report.is_blocked:
        # Option 1: Halt the pipeline (raises, DAG fails)
        raise ValueError(f"Quality gate failed: {report.summary()}")

        # Option 2: Send to a dead-letter queue instead
        # send_to_dlq(rows, reason=report.summary())
        # return {"status": "QUARANTINED", "report": report.to_dict()}

        # Option 3: Alert and continue (not recommended for critical data)
        # alert_slack(report.summary())
    return report.to_dict()
```
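`send_to_dlq` in Option 2 is left to you. A minimal file-based version might look like the sketch below; the path, file naming, and payload shape are all assumptions, and in production you would likely write to S3/GCS or a queue instead.

```python
import json
import time
from pathlib import Path


def send_to_dlq(rows: list, reason: str, dlq_dir: str = "/tmp/dlq/orders") -> str:
    """Quarantine a bad batch with its block reason for later inspection/replay."""
    Path(dlq_dir).mkdir(parents=True, exist_ok=True)
    path = Path(dlq_dir) / f"batch_{int(time.time())}.json"
    path.write_text(json.dumps({"reason": reason, "rows": rows}))
    return str(path)
```

Keeping the reason next to the raw rows matters: when someone triages the quarantine directory a day later, they should not have to re-run the screen to learn why the batch was held.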
Drift detection across batches
After the first run, every subsequent batch is compared against your stored schema baseline.
This catches issues like:
- A field that was always `number` suddenly arriving as `string`
- A column that was never null now showing 40% nulls
- A new field appearing that wasn't in the original schema
```python
# drift events in the report
for event in report.drift:
    print(f"{event['kind']}: {event['field']} — {event['detail']}")

# field_added: user_age — New field not in previous schema
# type_changed: amount — number → string
# null_spike: email — null rate increased from 2% to 67%
```
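The schema-level part of this comparison is conceptually simple: diff two `{field: type}` snapshots and emit drift events. The sketch below is illustrative, not the SDK's implementation, and null-rate spikes need per-field statistics rather than a schema diff, so they are omitted here.

```python
def diff_schemas(baseline: dict, current: dict) -> list[dict]:
    """Diff two {field: type} snapshots into drift events."""
    events = []
    for field in current:
        if field not in baseline:
            events.append({"kind": "field_added", "field": field,
                           "detail": "New field not in previous schema"})
        elif baseline[field] != current[field]:
            events.append({"kind": "type_changed", "field": field,
                           "detail": f"{baseline[field]} -> {current[field]}"})
    for field in baseline:
        if field not in current:
            events.append({"kind": "field_removed", "field": field,
                           "detail": "Field missing from current batch"})
    return events
```

The point of a stored baseline is that none of these checks require you to write a schema by hand: the first clean batch becomes the contract.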
Custom thresholds per source
Different sources have different tolerance levels. Override the defaults inline:

```python
report = client.screen(
    rows,
    source="orders",
    options={
        "thresholds": {
            "null_rate_warn": 0.05,   # warn if >5% nulls
            "null_rate_block": 0.2,   # block if >20% nulls
            "type_mismatch_block": 0.05,
        }
    }
)
```
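How the two tiers resolve to a status is straightforward: the worst severity across all checks wins. Here is an illustrative pure-Python equivalent for the null-rate checks; the real decision logic lives in the service and may differ.

```python
def resolve_status(null_rates: dict, thresholds: dict) -> str:
    """Resolve per-field null rates to PASS/WARN/BLOCK; worst severity wins."""
    status = "PASS"
    for field, rate in null_rates.items():
        if rate > thresholds["null_rate_block"]:
            return "BLOCK"  # any single blocking field fails the batch
        if rate > thresholds["null_rate_warn"]:
            status = "WARN"
    return status
```

Setting `null_rate_block` high and `null_rate_warn` low gives you an early-warning band: the DAG keeps running, but the WARN logs show degradation before it becomes an outage.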
Screening a CSV or DataFrame instead
If your DAG extracts files rather than API rows:
```python
# CSV file
report = client.screen_file("orders.csv", source="orders")

# pandas DataFrame
import pandas as pd

df = pd.read_csv("orders.csv")
report = client.screen_dataframe(df, source="orders")
```
Summary
The pattern is simple:
- Extract your data as usual
- Pass rows through `client.screen()` before the load step
- Raise on BLOCK, log on WARN, proceed on PASS
- Every subsequent batch gets drift-checked against your baseline automatically
No infrastructure changes. No new database. No config files. Just a quality gate that takes 5 minutes to add and catches issues your downstream tests never will.
Install:

```bash
pip install datascreeniq
```

Free tier: 500K rows/month — datascreeniq.com

SDK + more examples: github.com/AppDevIQ/datascreeniq-python
Have questions or a different integration pattern you'd like covered? Drop a comment below.