Vignesh
How to Add a Data Quality Gate to Your Airflow Pipeline in 5 Minutes

Your Airflow DAG ran successfully last night. Your dashboard is broken this morning.

Somewhere between the upstream API and your warehouse, a field went null, a type changed, or a schema drifted — and nothing caught it. dbt tests and Great Expectations are great, but they run after the data is already written. By the time they flag an issue, bad rows have been in production for hours.

This post shows how to add a real-time quality gate before the write happens — using DataScreenIQ inside an Airflow DAG.


The problem with post-insert validation

Most data quality tools follow the same pattern:

Extract → Load → Transform → Test

The test step catches issues — but only after the data is already in your warehouse. The feedback loop is slow. The damage is already done.

What we actually want is:

Extract → Screen → Load (only if clean) → Transform
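The rest of this post wires that pattern into a real DAG. As a quick illustration of the shape first, here is a runnable toy version in which `screen()` is a stand-in null check, not the DataScreenIQ client:

```python
# Toy version of the Extract → Screen → Load pattern. screen() here is a
# stand-in null check for illustration, not the DataScreenIQ client.
def screen(rows: list) -> dict:
    """Block the batch if more than 20% of rows are missing 'email'."""
    nulls = sum(1 for r in rows if r.get("email") is None)
    status = "BLOCK" if nulls > 0.2 * max(len(rows), 1) else "PASS"
    return {"status": status}

def load_to_warehouse(rows: list) -> None:
    print(f"loading {len(rows)} rows")  # warehouse write goes here

def pipeline(rows: list) -> dict:
    report = screen(rows)
    if report["status"] == "BLOCK":
        # Bad rows never reach the warehouse; the failure is immediate.
        raise ValueError("quality gate failed, nothing was loaded")
    load_to_warehouse(rows)
    return report
```

The key property: the load step is only reachable when the screen says the batch is clean.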


Setup

pip install datascreeniq apache-airflow

Get a free API key at datascreeniq.com (500K rows/month, no credit card).

Set your environment variable:

export DATASCREENIQ_API_KEY=dsiq_live_...

The basic quality gate task

import logging

import datascreeniq as dsiq

def quality_gate(rows: list, source: str) -> dict:
    """Drop this into any DAG as a quality gate."""
    client = dsiq.Client()  # reads DATASCREENIQ_API_KEY from env
    report = client.screen(rows, source=source)

    if report.is_blocked:
        raise ValueError(
            f"Data quality gate FAILED for '{source}': "
            f"health={report.health_pct}, "
            f"issues={report.type_mismatches or report.null_rates}"
        )

    if report.is_warn:
        logging.warning(
            f"Data quality WARNING for '{source}': {report.summary()}"
        )

    return report.to_dict()

Full Airflow DAG example

from datetime import datetime

from airflow import DAG
from airflow.decorators import task
import datascreeniq as dsiq
import requests

default_args = {"owner": "airflow", "retries": 1}

with DAG(
    dag_id="orders_pipeline_with_quality_gate",
    default_args=default_args,
    schedule="@daily",  # `schedule_interval` and `days_ago` are deprecated
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:

    @task
    def extract_orders() -> list:
        """Extract rows from upstream API."""
        response = requests.get("https://api.example.com/orders", timeout=30)
        response.raise_for_status()  # fail fast on HTTP errors
        return response.json()["data"]

    @task
    def screen_data_quality(rows: list) -> dict:
        """
        Quality gate — screens rows before they reach the warehouse.
        Raises ValueError on BLOCK, logs warning on WARN.
        """
        client = dsiq.Client()
        report = client.screen(rows, source="orders")

        if report.is_blocked:
            raise ValueError(
                f"Pipeline blocked: {report.summary()}\n"
                f"Type mismatches: {report.type_mismatches}\n"
                f"Null rates: {report.null_rates}"
            )

        if report.is_warn:
            print(f"WARNING: {report.summary()}")

        return report.to_dict()

    @task
    def load_to_warehouse(rows: list, quality_report: dict):
        """
        Only runs if quality gate passed.
        quality_report is passed for logging/auditing.
        """
        print(f"Loading {len(rows)} rows — quality: {quality_report['status']}")
        # your warehouse load logic here

    # DAG wiring
    rows = extract_orders()
    report = screen_data_quality(rows)
    load_to_warehouse(rows, report)

What the report looks like

When a batch has issues, you get a structured report back:

# report.summary() output:
🚨 BLOCK | Health: 34.0% | Rows: 1200 | 
Type mismatches: amount | Null rate: email=67% | (8ms)

# Structured access:
report.status           # "BLOCK"
report.health_pct       # "34.0%"
report.type_mismatches  # ["amount"]
report.null_rates       # {"email": 0.67}
report.drift            # schema drift events vs baseline

Handling the BLOCK in production

When the gate blocks, you have a few options depending on your pipeline:

@task
def screen_data_quality(rows: list) -> dict:
    client = dsiq.Client()
    report = client.screen(rows, source="orders")

    if report.is_blocked:
        # Option 1: Halt the pipeline (raises, DAG fails)
        raise ValueError(f"Quality gate failed: {report.summary()}")

        # Option 2: Send to dead-letter queue instead
        # send_to_dlq(rows, reason=report.summary())
        # return {"status": "QUARANTINED", "report": report.to_dict()}

        # Option 3: Alert and continue (not recommended for critical data)
        # alert_slack(report.summary())

    return report.to_dict()
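If you go with Option 2, `send_to_dlq` is yours to define (it is not part of the DataScreenIQ SDK). A minimal file-based sketch, assuming a local quarantine directory is acceptable; swap in S3, Kafka, or similar in production:

```python
import json
import time
from pathlib import Path

def send_to_dlq(rows: list, reason: str, dlq_dir: str = "dlq") -> str:
    """Quarantine a bad batch as a JSON file for later inspection or replay.

    Stores the gate's reason alongside the rows so the on-call engineer
    can see *why* the batch was blocked without re-running the screen.
    """
    Path(dlq_dir).mkdir(parents=True, exist_ok=True)
    path = Path(dlq_dir) / f"orders_{int(time.time() * 1000)}.json"
    path.write_text(json.dumps({"reason": reason, "rows": rows}))
    return str(path)
```

The task would then return a `QUARANTINED` status instead of raising, and downstream tasks can branch on it.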

Drift detection across batches

After the first run, every subsequent batch is compared against your stored schema baseline.

This catches issues like:

  • A field that was always number suddenly arriving as string
  • A column that was never null now showing 40% nulls
  • A new field appearing that wasn't in the original schema

# drift events in the report
for event in report.drift:
    print(f"{event['kind']}: {event['field']} — {event['detail']}")

# field_added: user_age — New field not in previous schema
# type_changed: amount — number → string  
# null_spike: email — null rate increased from 2% to 67%
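Conceptually, drift detection reduces to comparing per-field stats against a stored baseline. Here is a rough self-contained sketch of the idea; it illustrates the mechanism, it is not DataScreenIQ's actual implementation:

```python
def field_stats(rows: list) -> dict:
    """Per-field null count, observed value types, and row count for a batch."""
    stats = {}
    for row in rows:
        for field, value in row.items():
            s = stats.setdefault(field, {"nulls": 0, "types": set(), "n": 0})
            s["n"] += 1
            if value is None:
                s["nulls"] += 1
            else:
                s["types"].add(type(value).__name__)
    return stats

def diff_batches(baseline: dict, current: dict, null_jump: float = 0.3) -> list:
    """Emit drift events: new fields, type changes, and null-rate spikes."""
    events = []
    for field, cur in current.items():
        base = baseline.get(field)
        if base is None:
            events.append({"kind": "field_added", "field": field})
            continue
        if cur["types"] - base["types"]:
            events.append({"kind": "type_changed", "field": field})
        if cur["nulls"] / cur["n"] - base["nulls"] / base["n"] > null_jump:
            events.append({"kind": "null_spike", "field": field})
    return events
```

A real implementation would persist the baseline between runs and track rates rather than raw counts, but the comparison logic is the same shape.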

Custom thresholds per source

Different sources have different tolerance levels. Override the defaults inline:

report = client.screen(
    rows,
    source="orders",
    options={
        "thresholds": {
            "null_rate_warn": 0.05,   # warn if >5% nulls
            "null_rate_block": 0.2,   # block if >20% nulls
            "type_mismatch_block": 0.05,
        }
    }
)
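The threshold semantics are simple: a measured null rate below `null_rate_warn` passes, between the two thresholds warns, and above `null_rate_block` blocks. As a self-contained sketch of that mapping (mirroring the behavior, not the library's code):

```python
def null_status(null_rate: float, warn: float = 0.05, block: float = 0.2) -> str:
    """Map a field's null rate to PASS / WARN / BLOCK given two thresholds."""
    if null_rate > block:
        return "BLOCK"
    if null_rate > warn:
        return "WARN"
    return "PASS"
```

Tighter thresholds for critical sources (billing, orders) and looser ones for noisy sources (clickstream) is the usual pattern.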

Screening a CSV or DataFrame instead

If your DAG extracts files rather than API rows:

# CSV file
report = client.screen_file("orders.csv", source="orders")

# pandas DataFrame
import pandas as pd
df = pd.read_csv("orders.csv")
report = client.screen_dataframe(df, source="orders")
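If you would rather normalize a CSV into the list-of-dicts shape that `client.screen()` takes, the standard library is enough. Note that mapping empty strings to `None` (so null-rate checks see them) is an assumption about your data, not something the SDK requires:

```python
import csv

def csv_to_rows(path: str) -> list:
    """Read a CSV into a list of dicts, treating empty cells as None."""
    with open(path, newline="") as f:
        return [
            {k: (v if v != "" else None) for k, v in row.items()}
            for row in csv.DictReader(f)
        ]
```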

Summary

The pattern is simple:

  1. Extract your data as usual
  2. Pass rows through client.screen() before the load step
  3. Raise on BLOCK, log on WARN, proceed on PASS
  4. Every subsequent batch gets drift-checked against your baseline automatically

No infrastructure changes. No new database. No config files.
Just a quality gate that takes 5 minutes to add and catches
issues your downstream tests never will.


Install:

pip install datascreeniq

Free tier: 500K rows/month — datascreeniq.com

SDK + more examples:
github.com/AppDevIQ/datascreeniq-python


Have questions or a different integration pattern you'd like
covered? Drop a comment below.
