DEV Community

Rizwan Saleem

Model-driven testing for complex data pipelines

Data pipelines are the lifeblood of modern analytics and ML workflows. When data travels from sources through transformations to analytics dashboards or ML models, correctness and reliability become non-negotiable. This tutorial walks through a practical, model-driven approach to testing data pipelines end-to-end. You’ll learn how to express expectations as live, versioned models; how to encode and run tests automatically; and how to integrate this workflow into CI/CD so data products stay trustworthy as they evolve.

Why a model-driven approach

  • Data is not just code: it’s structured, typed, and often probabilistic. You need a way to express expected shapes, distributions, and invariants.
  • Pipelines involve multiple stages: extraction, cleansing, enrichment, aggregation, and loading. A test that only checks the final output can miss upstream issues.
  • Versioning matters: changes in source schemas or transformation logic must be validated against a defined contract to prevent regressions.

Key idea: represent the “advertised” data contract as a set of models, each describing what the stage should produce, given what it consumes. Tests then compare actual data against these models and surface deviations early.
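
As a rough illustration of this idea, the sketch below models one stage's contract as a small dataclass and lists the deviations found in a batch of rows. The names `FieldSpec`, `StageContract`, and `deviations` are hypothetical, and the rows are plain dicts rather than DataFrames so the example stays dependency-free.

```python
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass(frozen=True)
class FieldSpec:
    name: str
    py_type: type
    required: bool = True


@dataclass(frozen=True)
class StageContract:
    stage: str
    version: str
    fields: List[FieldSpec]

    def deviations(self, rows: List[Dict[str, Any]]) -> List[str]:
        """Return readable deviations; an empty list means the batch conforms."""
        problems = []
        for i, row in enumerate(rows):
            for spec in self.fields:
                value = row.get(spec.name)
                if value is None:
                    if spec.required:
                        problems.append(f"row {i}: {spec.name} is missing")
                elif not isinstance(value, spec.py_type):
                    problems.append(
                        f"row {i}: {spec.name} expected {spec.py_type.__name__}, "
                        f"got {type(value).__name__}"
                    )
        return problems


# Hypothetical contract and batch, for illustration only
example_contract = StageContract(
    stage="example",
    version="1.0.0",
    fields=[FieldSpec("id", str), FieldSpec("amount", float)],
)
batch = [{"id": "a", "amount": 100.0}, {"id": "b", "amount": "50"}]
print(example_contract.deviations(batch))
```

Keeping the version on the contract itself is one way to make a breaking change visible in code review; other layouts would work just as well.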

The core concepts

  • Data contracts: formal, versioned schemas and invariants for each stage (shape, ranges, nullability, uniqueness, referential integrity).
  • Ground truth tests: deterministic checks against a known dataset or controlled seed data.
  • Statistical tests: checks on distributions, percentiles, or sample-based expectations for non-deterministic data.
  • Test doubles: synthetic data generators and mocks for external systems to enable isolated testing.
  • Test orchestration: a lightweight runner that can execute per-stage validations as part of CI/CD.
  • Observability hooks: metrics, logs, and alerts tied to test results to help triage failures quickly.

Planning your model-driven tests

1) Map the pipeline to stages

  • Ingest → Core cleanse → Enrichment → Aggregation → Sink
  • For each stage, define a data contract that captures:
    • Schema: field names, types, nullability
    • Constraints: unique keys, referential integrity
    • Quality metrics: ranges, allowed nulls, allowed distributions
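
The two constraint types named above can be checked with very little code. This is a minimal sketch, assuming rows are plain dicts; `duplicate_keys` and `orphan_keys` are hypothetical helper names, not part of any library.

```python
from collections import Counter
from typing import Any, Dict, List, Set


def duplicate_keys(rows: List[Dict[str, Any]], key: str) -> List[Any]:
    """Return key values that appear more than once (unique-key constraint)."""
    counts = Counter(row[key] for row in rows)
    return sorted(k for k, n in counts.items() if n > 1)


def orphan_keys(rows: List[Dict[str, Any]], key: str, valid: Set[Any]) -> List[Any]:
    """Return key values absent from the referenced set (referential integrity)."""
    return sorted({row[key] for row in rows} - valid)


# Illustrative data: "b" is duplicated and "p9" has no parent record
facts = [
    {"id": "a", "parent_id": "p1"},
    {"id": "b", "parent_id": "p2"},
    {"id": "b", "parent_id": "p9"},
]
print(duplicate_keys(facts, "id"))
print(orphan_keys(facts, "parent_id", {"p1", "p2"}))
```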

2) Define test targets

  • Deterministic datasets (seeded in CI)
  • Small synthetic datasets that cover edge cases
  • Production-like samples for statistical checks

3) Decide testing levels

  • Unit tests per stage (contract tests)
  • Integration tests across adjacent stages
  • End-to-end tests on a sample run
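
For the integration level, one inexpensive check is that an upstream stage's declared output covers what the next stage declares as input. The sketch below assumes schemas are lists of dicts with `name` and `type` keys; `incompatibilities` is a hypothetical helper.

```python
from typing import Dict, List

Schema = List[Dict[str, str]]


def incompatibilities(upstream_output: Schema, downstream_input: Schema) -> List[str]:
    """List downstream input fields that upstream does not supply as declared."""
    produced = {f["name"]: f["type"] for f in upstream_output}
    problems = []
    for field in downstream_input:
        name, expected = field["name"], field["type"]
        if name not in produced:
            problems.append(f"{name}: not produced upstream")
        elif produced[name] != expected:
            problems.append(
                f"{name}: upstream {produced[name]}, downstream expects {expected}"
            )
    return problems


# Illustrative schemas for two adjacent stages
stage_a_out = [{"name": "id", "type": "string"}, {"name": "ts", "type": "string"}]
stage_b_in = [
    {"name": "id", "type": "string"},
    {"name": "ts", "type": "timestamp"},
    {"name": "country", "type": "string"},
]
print(incompatibilities(stage_a_out, stage_b_in))
```

Because this compares declarations only, it needs no data and can run before any stage executes.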

4) Choose tooling that fits your stack

  • Python with pandas and pytest for data validation
  • SQL-based checks in your data warehouse (dbt tests, BigQuery QA queries)
  • Data-quality frameworks (Great Expectations, Deequ) if you prefer declarative contracts
  • Lightweight orchestration (Makefiles, GitHub Actions, or a simple Python script)

A concrete example

Imagine a data pipeline that processes e-commerce orders:

  • Source: orders.csv with fields order_id, user_id, amount, currency, created_at
  • Stage 1: cleansing ensures amount is non-negative, currency is valid, timestamps parsed
  • Stage 2: enrichment adds user_country from a user_dim table
  • Stage 3: aggregation computes daily revenue per country
  • Sink: daily_revenue table in a data warehouse

We’ll implement a model-driven test suite using Python, pandas, and Great Expectations-like concepts (simplified). The goal is to catch issues at each stage before they propagate.

Step 1: Define data contracts (models)

Create a contracts.yaml (or contracts.json) that encodes expectations per stage.

contracts.yaml

- stage: cleanse
  input_schema:
    - {name: order_id, type: string, required: true}
    - {name: user_id, type: string, required: true}
    - {name: amount, type: float, required: true}
    - {name: currency, type: string, required: true}
    - {name: created_at, type: string, required: true}
  output_schema:
    - {name: order_id, type: string, required: true}
    - {name: user_id, type: string, required: true}
    - {name: amount, type: float, min: 0.0}
    - {name: currency, type: string, allowed: [USD, EUR, GBP]}
    - {name: created_at, type: timestamp, required: true}
  invariants:
    - order_id unique
    - created_at not in future
- stage: enrich
  input_schema:
    - {name: order_id, type: string}
    - {name: user_id, type: string}
    - {name: amount, type: float}
    - {name: currency, type: string}
    - {name: created_at, type: timestamp}
  output_schema:
    - {name: order_id, type: string}
    - {name: user_id, type: string}
    - {name: amount, type: float}
    - {name: currency, type: string}
    - {name: created_at, type: timestamp}
    - {name: user_country, type: string, allowed: [US, GB, FR]}  # etc.
  invariants:
    - user_id exists in user_dim
- stage: aggregate_daily
  input_schema:
    - {name: order_id, type: string}
    - {name: user_country, type: string}
    - {name: amount, type: float}
    - {name: created_at, type: timestamp}
  output_schema:
    - {name: date, type: date}
    - {name: country, type: string}
    - {name: revenue, type: float}
  invariants:
    - revenue >= 0

This is intentionally lightweight. In practice, you’d formalize with your chosen format and tooling.
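
Because the contract file is itself hand-edited, it can help to lint its structure before running any data checks, for example to catch a mistyped `stage` key. The sketch below works on the already-parsed list of entries; `REQUIRED_KEYS` and `contract_file_errors` are assumptions made for illustration.

```python
from typing import Any, Dict, List

REQUIRED_KEYS = ("stage", "input_schema", "output_schema")  # assumed minimum


def contract_file_errors(contracts: List[Dict[str, Any]]) -> List[str]:
    """Return structural problems found in a parsed list of stage contracts."""
    errors = []
    seen = set()
    for i, contract in enumerate(contracts):
        for key in REQUIRED_KEYS:
            if key not in contract:
                errors.append(f"entry {i}: missing key '{key}'")
        stage = contract.get("stage")
        if stage is not None:
            if stage in seen:
                errors.append(f"entry {i}: duplicate stage '{stage}'")
            seen.add(stage)
        for section in ("input_schema", "output_schema"):
            for field in contract.get(section, []):
                if "name" not in field or "type" not in field:
                    errors.append(f"entry {i}: {section} field lacks name or type")
    return errors


# Parsed entries as yaml.safe_load might return them, with two deliberate mistakes
parsed = [
    {
        "stage": "cleanse",
        "input_schema": [{"name": "order_id", "type": "string"}],
        "output_schema": [{"name": "order_id"}],
    },
    {"step": "aggregate_daily", "input_schema": [], "output_schema": []},
]
for error in contract_file_errors(parsed):
    print(error)
```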

Step 2: Implement test utilities

Create a test_utils.py with helpers to load contracts, run stage tests, and report results.

test_utils.py
import pandas as pd
import yaml
from typing import Any, Dict, List

def load_contracts(path: str) -> List[Dict[str, Any]]:
    """Load the list of per-stage contracts from a YAML file."""
    with open(path, 'r') as f:
        return yaml.safe_load(f)

def assert_schema_match(df: pd.DataFrame, schema: List[Dict[str, Any]]):
    """Check that every contracted column exists and required columns have no nulls."""
    for field in schema:
        name = field['name']
        if name not in df.columns:
            raise AssertionError(f"Missing column: {name}")
        # Type checks are lightweight; you can extend as needed
        # Example: ensure non-nullability
        if field.get('required', False):
            if df[name].isnull().any():
                raise AssertionError(f"Column {name} has nulls but is required")
        # Additional constraints (min, max, allowed) can be added here as needed

def check_invariants(df: pd.DataFrame, invariants: List[str]):
    """Check the invariants listed as plain strings in contracts.yaml."""
    for inv in invariants:
        if inv == 'order_id unique':
            if df['order_id'].duplicated().any():
                raise AssertionError("order_id is not unique")
        # Extend with more invariant checks as needed

def run_stage_test(stage_name: str, df_in: pd.DataFrame, contract: Dict[str, Any]):
    """Validate one stage's data against that stage's contract entry."""
    out_schema = contract['output_schema']
    invariants = contract.get('invariants', [])
    # Here, you'd actually run the stage logic to get df_out.
    # For demonstration, df_in is treated as the stage's output.
    df_out = df_in  # placeholder
    assert_schema_match(df_out, out_schema)
    check_invariants(df_out, invariants)
    return df_out
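
The schema helper above leaves `min`, `max`, and `allowed` unimplemented. One way to fill that gap is sketched below, assuming the field dicts use those three optional keys as in the contract file; `constraint_violations` is a hypothetical name, and it accepts any iterable of values.

```python
import math
from typing import Any, Dict, Iterable, List


def constraint_violations(values: Iterable[Any], field: Dict[str, Any]) -> List[str]:
    """Check one column's values against optional min, max, and allowed keys."""
    name = field["name"]
    problems = []
    for i, value in enumerate(values):
        # Skip missing values; nullability belongs to the 'required' check.
        if value is None or (isinstance(value, float) and math.isnan(value)):
            continue
        if "min" in field and value < field["min"]:
            problems.append(f"{name}[{i}]={value} is below min {field['min']}")
        if "max" in field and value > field["max"]:
            problems.append(f"{name}[{i}]={value} is above max {field['max']}")
        if "allowed" in field and value not in field["allowed"]:
            problems.append(f"{name}[{i}]={value} is not in {field['allowed']}")
    return problems


amount_field = {"name": "amount", "type": "float", "min": 0.0}
currency_field = {"name": "currency", "type": "string", "allowed": ["USD", "EUR", "GBP"]}
print(constraint_violations([100.0, 50.0, -10.0], amount_field))
print(constraint_violations(["USD", "JPY"], currency_field))
```

Since a pandas Series is iterable, passing `df[name]` as `values` should also work, though that path is not exercised here.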

Step 3: Create synthetic test data

tests/data/seed_orders.csv
order_id,user_id,amount,currency,created_at
o1,u1,100.0,USD,2026-01-01 12:00:00
o2,u2,50.0,EUR,2026-01-01 12:05:00
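o3,u3,-10.0,GBP,2026-01-01 12:10:00

The third row (o3) has a negative amount on purpose: it is meant to violate the min: 0.0 constraint on amount in the cleanse contract.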

In a real test, you’d generate valid and edge-case rows programmatically to avoid manual edits.

Python script to generate seeds:
import pandas as pd

rows = [
    {"order_id": "o1", "user_id": "u1", "amount": 100.0, "currency": "USD", "created_at": "2026-01-01 12:00:00"},
    {"order_id": "o2", "user_id": "u2", "amount": 50.0, "currency": "EUR", "created_at": "2026-01-01 12:05:00"},
    {"order_id": "o3", "user_id": "u3", "amount": -10.0, "currency": "GBP", "created_at": "2026-01-01 12:10:00"},
]
pd.DataFrame(rows).to_csv("tests/data/seed_orders.csv", index=False)
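
To generate edge cases programmatically rather than by hand, a seeded generator keeps CI runs reproducible. This is a sketch using only the standard library; `make_seed_rows`, `rows_to_csv`, the seed value, and the specific edge cases chosen are all assumptions for illustration.

```python
import csv
import io
import random
from typing import Any, Dict, List

FIELDS = ["order_id", "user_id", "amount", "currency", "created_at"]


def make_seed_rows(n_valid: int, seed: int = 42) -> List[Dict[str, Any]]:
    """Build n_valid well-formed rows plus four deliberate edge cases."""
    if n_valid < 1:
        raise ValueError("n_valid must be at least 1")
    rng = random.Random(seed)  # fixed seed: same rows on every run
    rows = [
        {
            "order_id": f"o{i}",
            "user_id": f"u{rng.randint(1, 5)}",
            "amount": round(rng.uniform(1.0, 500.0), 2),
            "currency": rng.choice(["USD", "EUR", "GBP"]),
            "created_at": f"2026-01-01 12:{i % 60:02d}:00",
        }
        for i in range(1, n_valid + 1)
    ]
    base = rows[0]
    rows.append({**base, "order_id": "neg", "amount": -10.0})    # negative amount
    rows.append({**base, "order_id": "cur", "currency": "XXX"})  # unknown currency
    rows.append({**base, "order_id": "nul", "user_id": None})    # null user_id
    rows.append(dict(base))                                      # duplicate order_id
    return rows


def rows_to_csv(rows: List[Dict[str, Any]]) -> str:
    """Serialize rows to CSV text; None is written as an empty field."""
    buffer = io.StringIO()
    writer = csv.DictWriter(buffer, fieldnames=FIELDS)
    writer.writeheader()
    writer.writerows(rows)
    return buffer.getvalue()


seed_rows = make_seed_rows(3)
print(rows_to_csv(seed_rows))
```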

Step 4: Wire it into CI

  • Create a lightweight runner that executes per-stage tests against seed data.
  • Use a matrix of Python versions if you support multiple environments.
  • Fail the pipeline on any contract breach, and generate a human-friendly report.
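
The third point can be sketched as a runner that executes every stage check, collects failures instead of stopping at the first one, and returns an exit code for CI. The `run_checks` function and the two stand-in checks are hypothetical.

```python
from typing import Callable, Dict, List, Tuple

StageCheck = Callable[[], None]  # raises AssertionError on a contract breach


def run_checks(checks: Dict[str, StageCheck]) -> Tuple[int, List[str]]:
    """Run every stage check and return (exit_code, report_lines)."""
    report = []
    failed = 0
    for stage, check in checks.items():
        try:
            check()
            report.append(f"PASS  {stage}")
        except AssertionError as exc:
            failed += 1
            report.append(f"FAIL  {stage}: {exc}")
    report.append(f"{len(checks) - failed} passed, {failed} failed")
    return (1 if failed else 0), report


def cleanse_check() -> None:
    pass  # stand-in for a real per-stage validation


def enrich_check() -> None:
    raise AssertionError("user_id u9 not found in user_dim")


exit_code, lines = run_checks({"cleanse": cleanse_check, "enrich": enrich_check})
print("\n".join(lines))
# In a CI script, finish with sys.exit(exit_code) so the job fails on any breach.
```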

ci.yml (GitHub Actions example)
name: Data pipeline model tests
on:
  push:
    branches: [ main ]
  pull_request:
    branches: [ main ]
jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-python@v4
        with:
          python-version: '3.11'
      - name: Install deps
        run: |
          python -m pip install --upgrade pip
          pip install pyyaml pandas
      - name: Run data contract tests
        run: |
          python tests/run_contract_tests.py

tests/run_contract_tests.py
import pandas as pd
from test_utils import load_contracts, run_stage_test

contracts = load_contracts('contracts.yaml')

# Load seed data for the first stage
df_seed = pd.read_csv('tests/data/seed_orders.csv', parse_dates=['created_at'])

# Simulate a pipeline: cleanse stage.
# contracts is a list of per-stage entries, so look up the one named 'cleanse'.
cleanse_contract = next(c for c in contracts if c.get('stage') == 'cleanse')

# In a full pipeline, you would execute the actual stage logic to obtain df_cleanse
df_cleanse = df_seed.copy()

# Apply a simple cleanse: drop negative amounts (as a proxy)
df_cleanse = df_cleanse[df_cleanse['amount'] >= 0]

try:
    run_stage_test('cleanse', df_cleanse, cleanse_contract)
    print("Cleansing stage tests passed")
except AssertionError as e:
    print(f"Cleansing stage test failed: {e}")
    raise

# You would continue for enrich and aggregate stages similarly, chaining outputs
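
To show what chaining the remaining stages might look like, here is a dependency-free sketch of the enrich and aggregate steps with their invariants checked inline. The `user_dim` contents, the sample rows, and both function bodies are assumptions; real stage logic would replace them.

```python
from collections import defaultdict
from typing import Any, Dict, List

user_dim = {"u1": "US", "u2": "FR"}  # hypothetical lookup table

cleansed = [
    {"order_id": "o1", "user_id": "u1", "amount": 100.0, "created_at": "2026-01-01 12:00:00"},
    {"order_id": "o2", "user_id": "u2", "amount": 50.0, "created_at": "2026-01-01 12:05:00"},
    {"order_id": "o4", "user_id": "u1", "amount": 25.0, "created_at": "2026-01-02 09:00:00"},
]


def enrich(rows: List[Dict[str, Any]], dim: Dict[str, str]) -> List[Dict[str, Any]]:
    """Add user_country; invariant: every user_id exists in user_dim."""
    missing = sorted({r["user_id"] for r in rows} - set(dim))
    assert not missing, f"user_id not in user_dim: {missing}"
    return [{**r, "user_country": dim[r["user_id"]]} for r in rows]


def aggregate_daily(rows: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Sum revenue per (date, country); invariant: revenue >= 0."""
    totals = defaultdict(float)
    for r in rows:
        totals[(r["created_at"][:10], r["user_country"])] += r["amount"]
    out = [
        {"date": d, "country": c, "revenue": v}
        for (d, c), v in sorted(totals.items())
    ]
    assert all(row["revenue"] >= 0 for row in out), "revenue must be >= 0"
    return out


daily = aggregate_daily(enrich(cleansed, user_dim))
for row in daily:
    print(row)
```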

Practical tips

  • Start small: draft a single stage contract and a couple of seed rows. Validate, iterate.
  • Use strict contracts for critical data (financials, identifiers) and looser checks for exploratory data.
  • Automate test data generation to cover edge cases like nulls, duplicates, out-of-range values.
  • Separate data quality tests from business logic tests to keep failures actionable.
  • Integrate test observability: emit test run durations, counts of passing/failing rows, and a summary in PR checks.
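
For the observability tip, a per-stage summary can be as small as a dict serialized to JSON. The sketch below is one possible shape; `summarize_check` and its field names are hypothetical, and where the summary gets published (logs, PR comment, metrics backend) is left open.

```python
import json
import time
from typing import Any, Callable, Dict, Iterable


def summarize_check(
    stage: str,
    rows: Iterable[Dict[str, Any]],
    is_valid: Callable[[Dict[str, Any]], bool],
) -> Dict[str, Any]:
    """Count passing and failing rows for one stage and time the check."""
    start = time.perf_counter()
    passed = failed = 0
    for row in rows:
        if is_valid(row):
            passed += 1
        else:
            failed += 1
    return {
        "stage": stage,
        "rows_passed": passed,
        "rows_failed": failed,
        "duration_seconds": round(time.perf_counter() - start, 6),
    }


summary = summarize_check(
    "cleanse",
    [{"amount": 100.0}, {"amount": 50.0}, {"amount": -10.0}],
    lambda row: row["amount"] >= 0,
)
print(json.dumps(summary))
```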

Common pitfalls and how to avoid them

  • Overfitting tests to current data: keep seeds diverse and evolve contracts as the pipeline grows.

  • Slow tests on large datasets: test with bounded samples and synthetic data for unit tests; reserve full-scale validation for nightly runs or staging.

  • Unversioned contracts: treat tests as production-facing contracts; version them, deprecate old ones, and document breaking changes clearly.

Extending this approach

  • Use a dedicated data quality framework (e.g., Great Expectations) to express expectations as assertions and connect to data sources directly.

  • Add probabilistic checks: compare sample statistics to historical baselines with confidence intervals.

  • Introduce schema evolution testing: verify that downstream contracts remain compatible when upstream schemas change.
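
As one example of a probabilistic check, the sketch below tests whether a historical baseline mean falls within a few standard errors of the current sample mean. It relies on a normal approximation, and the default threshold `z=3.0` and the function name `mean_within_baseline` are assumptions to tune for your data.

```python
import math
import statistics
from typing import Sequence


def mean_within_baseline(
    sample: Sequence[float], baseline_mean: float, z: float = 3.0
) -> bool:
    """True if baseline_mean lies within z standard errors of the sample mean."""
    if len(sample) < 2:
        raise ValueError("need at least two observations")
    mean = statistics.fmean(sample)
    std_err = statistics.stdev(sample) / math.sqrt(len(sample))
    return abs(mean - baseline_mean) <= z * std_err


# Illustrative order amounts; mean is 100.0 and sample stdev is 2.0
amounts = [98.0, 102.0, 101.0, 99.0, 100.0, 103.0, 97.0, 100.0]
print(mean_within_baseline(amounts, baseline_mean=100.5))
print(mean_within_baseline(amounts, baseline_mean=110.0))
```

A check like this suits nightly runs on production-like samples better than per-commit unit tests, since small samples make the interval wide.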

If you’d like, I can tailor this model-driven testing approach to your stack (e.g., Spark, PostgreSQL, BigQuery, dbt). Tell me your pipeline stages, data sources, and your preferred tooling, and I’ll adapt the contracts and test runner accordingly. Would you like a ready-to-run template for your exact stack?

Rizwan Saleem | https://rizwansaleem.co
