Same pattern, third time in six months.
DAG runs green. Airflow shows all tasks succeeded. I wake up to a Slack message saying the revenue dashboard is broken. I dig through logs, trace it back to an extract job from four hours ago — a field that was always numeric now has strings mixed in. 48,000 rows loaded. Every downstream model is wrong.
The fix was obvious in hindsight: the quality check needs to happen before the INSERT, not after it. dbt tests, Great Expectations, warehouse constraints — they're all good tools, but they validate data that's already been written. By the time they flag an issue, the damage is done.
So I built a quality gate task that sits between extract and load. Here's exactly what I did.
## The pattern

Before:

```
extract → load → warehouse ✓
                 [ bad rows sitting in production ]
                 [ dashboard broken at 9am ]
```

After:

```
extract → screen → PASS  → load → warehouse ✓
                 → WARN  → load + flag for review
                 → BLOCK → dead-letter queue, pipeline stops
```
One task. No custom validation logic. No rules to write and maintain.
## Setup

Install the SDK:

```shell
pip install datascreeniq
```

Add your API key to Airflow Variables or environment (free tier at datascreeniq.com — 500K rows/month, no credit card):

```shell
export DATASCREENIQ_API_KEY="dsiq_live_..."
```
## The DAG

```python
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago

import datascreeniq as dsiq
from datascreeniq.exceptions import DataQualityError


@dag(schedule_interval="@hourly", start_date=days_ago(1), catchup=False)
def orders_pipeline():
    @task
    def extract():
        return fetch_from_source()  # your existing extract logic

    @task
    def quality_gate(rows):
        client = dsiq.Client()  # reads DATASCREENIQ_API_KEY from env
        try:
            client.screen(rows, source="orders").raise_on_block()
        except DataQualityError as e:
            # Any uncaught exception fails the task, so the DAG stops here
            raise ValueError(f"Quality gate failed: {e.report.summary()}")
        return rows  # only passes through on PASS or WARN

    @task
    def load(rows):
        insert_to_warehouse(rows)  # only runs if quality_gate passed

    rows = extract()
    clean = quality_gate(rows)
    load(clean)


dag = orders_pipeline()
```
That's the whole thing. Three tasks, one new one in the middle.
## What it actually checks

The `screen` call runs 18 checks in a single pass on a deterministic sample of your rows:

- Null rates — columns with too many missing values
- Type mismatches — a field that's always been numeric now has strings
- Schema drift — new fields appeared, known fields disappeared, a type changed from the baseline
- Empty string rates — fields full of `""` instead of `null`
- Duplicate rates — cardinality collapse
- Outliers — numeric values beyond 1.5× IQR
- Timestamp staleness — most recent timestamp older than expected
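The outlier check is standard Tukey fencing. As a point of reference, here is a minimal local sketch of that rule — illustrative only, not the service's implementation:

```python
def iqr_outliers(values, k=1.5):
    """Flag values outside [Q1 - k*IQR, Q3 + k*IQR] (Tukey's rule)."""
    xs = sorted(values)
    n = len(xs)

    def quartile(q):
        # Linear interpolation between the two closest ranks
        pos = q * (n - 1)
        lo, hi = int(pos), min(int(pos) + 1, n - 1)
        return xs[lo] + (pos - lo) * (xs[hi] - xs[lo])

    q1, q3 = quartile(0.25), quartile(0.75)
    iqr = q3 - q1
    lo_fence, hi_fence = q1 - k * iqr, q3 + k * iqr
    return [v for v in values if v < lo_fence or v > hi_fence]

amounts = [12.0, 14.5, 13.2, 11.8, 15.0, 13.9, 980.0]
print(iqr_outliers(amounts))  # → [980.0]
```

With `k=1.5` the fences here land at roughly 9.4 and 18.0, so only the 980.0 row is flagged.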
The response comes back in under 10ms (runs on Cloudflare Workers at the edge, no cold starts). Your pipeline barely notices it.
The response looks like this:
```json
{
  "status": "BLOCK",
  "health_score": 0.34,
  "decision": {
    "action": "BLOCK",
    "reason": "Type mismatch in 'amount'; High null rate in 'email' (50%)"
  },
  "issues": {
    "type_mismatches": {
      "amount": {
        "expected": "number",
        "found": ["string"],
        "sample_value": "broken",
        "severity": "critical"
      }
    },
    "null_rates": {
      "email": { "actual": 0.50, "threshold": 0.30, "severity": "critical" }
    }
  },
  "drift": [
    {
      "field": "amount",
      "kind": "type_changed",
      "severity": "block",
      "detail": "Field 'amount' changed from number to mixed"
    }
  ],
  "latency_ms": 7
}
```
The decision.reason field is what ends up in your Airflow task failure message — so when the pipeline stops, you know immediately what went wrong and in which field.
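If you're building the failure message yourself from the raw response, it reduces to pulling out the `decision` object. A minimal sketch, using the field names from the sample above (the SDK's report object is assumed to wrap the same JSON):

```python
def failure_message(response: dict) -> str:
    """Build a one-line task failure message from a screen response."""
    decision = response.get("decision", {})
    action = decision.get("action", "UNKNOWN")
    reason = decision.get("reason", "no reason given")
    return f"[{action}] {reason}"

response = {
    "status": "BLOCK",
    "decision": {
        "action": "BLOCK",
        "reason": "Type mismatch in 'amount'; High null rate in 'email' (50%)",
    },
}
print(failure_message(response))
# → [BLOCK] Type mismatch in 'amount'; High null rate in 'email' (50%)
```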
## Handling WARN vs BLOCK

By default:

- PASS — pipeline continues, nothing logged
- WARN — pipeline continues, issue is in the report
- BLOCK — `raise_on_block()` raises `DataQualityError`, task fails, DAG stops
If you want WARN to also stop the pipeline (useful for financial data pipelines where you want zero tolerance):
```python
@task
def quality_gate(rows):
    client = dsiq.Client()
    report = client.screen(rows, source="payments")
    if report.is_warn:
        raise ValueError(f"Quality warning on payments: {report.summary()}")
    if report.is_blocked:
        raise ValueError(f"Quality blocked on payments: {report.summary()}")
    return rows
```
## Per-source thresholds
The default thresholds are reasonable for most sources — WARN at 30% null, BLOCK at 70%. But a payments pipeline and an events pipeline have completely different tolerances.
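The WARN/BLOCK escalation is just a two-threshold comparison. A minimal sketch of that decision logic, using the default 30%/70% null-rate cutoffs (illustrative only, not the service's code):

```python
def classify_null_rate(null_rate, warn=0.30, block=0.70):
    """Map a column's null rate to a gate decision using two thresholds."""
    if null_rate > block:
        return "BLOCK"
    if null_rate > warn:
        return "WARN"
    return "PASS"

print(classify_null_rate(0.10))  # → PASS
print(classify_null_rate(0.50))  # → WARN (above 30%, below 70%)
print(classify_null_rate(0.75))  # → BLOCK
```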
You can set per-source thresholds inline:
```python
# Financial data — very tight
client.screen(
    rows,
    source="payments",
    options={
        "thresholds": {
            "null_rate_warn": 0.01,     # WARN if > 1% nulls
            "null_rate_block": 0.02,    # BLOCK if > 2% nulls
            "type_mismatch_warn": 0.0,  # WARN on any type mismatch at all
            "type_mismatch_block": 0.01,
        }
    },
)

# Event tracking — high null rate is normal
client.screen(
    rows,
    source="events",
    options={
        "thresholds": {
            "null_rate_warn": 0.50,   # user_agent nulls are expected
            "null_rate_block": 0.80,
        }
    },
)
```
Or set them once in the dashboard and they apply automatically to every future call for that source.
## The part that surprised me
The obvious failures — the ones you'd catch anyway — aren't what makes this worth it.
What makes it worth it is the slow drift. A null rate creeping up 2% per week. A field that was 99% numeric and is now 94% numeric. Nobody notices either of those until it's been wrong for a month and your ML model has been training on garbage.
Because the baseline is set on the first run and every subsequent batch is compared against it, even small shifts get caught. That's the thing dbt tests don't do — they test what you wrote rules for. This catches what you didn't know to write rules for.
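The baseline idea can be sketched locally: record a per-field profile on the first batch, then compare every later batch against it. A minimal sketch for the "99% numeric → 94% numeric" case (illustrative only — the service's actual baseline logic isn't documented here):

```python
def numeric_rate(values):
    """Fraction of non-null values that are numbers."""
    present = [v for v in values if v is not None]
    if not present:
        return 0.0
    numeric = sum(1 for v in present if isinstance(v, (int, float)))
    return numeric / len(present)

def drifted(baseline_rate, current_rate, tolerance=0.02):
    """Flag a drop larger than the tolerance against the stored baseline."""
    return (baseline_rate - current_rate) > tolerance

baseline = numeric_rate([10, 20, 30, 40, 50])       # 1.0 on the first run
current = numeric_rate([10, 20, "30", "oops", 50])  # 0.6 — strings crept in
print(drifted(baseline, current))  # → True
```

No per-field rule was ever written; the comparison against the first-run profile is what surfaces the shift.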
## Resources
Free tier is 500K rows/month. Enough for most pipelines to run for months before needing to upgrade.
Curious how others are handling this. Are you doing quality checks pre-load, post-load, or both? And if pre-load — what are you using?