---
title: "Stop Playing Data Detective: Automated Lineage Tracing Across Your Entire Pipeline Stack"
published: false
tags: [dataengineering, python, dbt, tutorial]
---
# Stop Playing Data Detective: Automated Lineage Tracing Across Your Entire Pipeline Stack
Picture this: it's 4:47 PM on a Friday. Someone renamed a column in a source table. Your Slack is on fire. Three dashboards are broken, an Airflow DAG is throwing cryptic errors, and a Spark job silently swallowed bad data for the last six hours. You're about to spend your weekend playing data detective — manually grepping through YAML files, SQL transforms, and Python scripts trying to answer one deceptively simple question:
**What broke, and what else is about to?**
This is the problem DataLineage was built to eliminate. In this tutorial, we'll wire it into a realistic multi-tool pipeline (dbt + Airflow + Spark) and watch it answer that Friday-afternoon question in seconds.
---
## What We're Building
We'll simulate a pipeline that looks like most production data stacks:
```
raw_orders (Postgres)
└── dbt: stg_orders
    └── dbt: fct_orders
        ├── Airflow DAG: daily_revenue_report
        └── Spark job: customer_ltv_model
```
When `raw_orders` gets a schema change, we'll use DataLineage to instantly surface every downstream consumer — before they surface themselves as incidents.
---
## Prerequisites
- Python 3.9+
- A DataLineage account and API key (set as `DATALINEAGE_API_KEY` in your environment)
- `requests` and `python-dotenv` installed
```bash
pip install requests python-dotenv
```
---
## Step 1: Register Your Pipeline Assets
Before DataLineage can trace anything, it needs to know what exists. Think of this as drawing the map before you navigate it.
We'll register each node in our pipeline using the `POST /lineage/trace` endpoint. This endpoint doesn't just store metadata — it actively discovers dependency relationships between the assets you register.
```python
import os
from typing import Optional

import requests
from dotenv import load_dotenv

load_dotenv()

API_KEY = os.getenv("DATALINEAGE_API_KEY")
BASE_URL = "https://api.datalineage.io/v1"
HEADERS = {
    "Authorization": f"Bearer {API_KEY}",
    "Content-Type": "application/json",
}


def register_asset(
    asset_type: str,
    name: str,
    source_ref: str,
    upstream: Optional[list[str]] = None,
) -> dict:
    """Register a pipeline asset and its upstream dependencies."""
    payload = {
        "asset_type": asset_type,  # "table", "dbt_model", "dag", "spark_job"
        "name": name,
        "source_ref": source_ref,  # file path, table name, or DAG id
        "upstream_assets": upstream or [],
    }
    response = requests.post(
        f"{BASE_URL}/lineage/trace",
        headers=HEADERS,
        json=payload,
    )
    if response.status_code == 422:
        raise ValueError(f"Invalid asset definition: {response.json()['detail']}")
    if response.status_code != 201:
        response.raise_for_status()
    return response.json()


# Register the source table
raw_orders = register_asset(
    asset_type="table",
    name="raw_orders",
    source_ref="postgres://warehouse/raw/orders",
)
print(f"Registered source: {raw_orders['id']}")

# Register dbt models, referencing upstream assets by name
stg_orders = register_asset(
    asset_type="dbt_model",
    name="stg_orders",
    source_ref="models/staging/stg_orders.sql",
    upstream=["raw_orders"],
)
fct_orders = register_asset(
    asset_type="dbt_model",
    name="fct_orders",
    source_ref="models/marts/fct_orders.sql",
    upstream=["stg_orders"],
)

# Register downstream consumers
register_asset(
    asset_type="dag",
    name="daily_revenue_report",
    source_ref="dags/daily_revenue_report.py",
    upstream=["fct_orders"],
)
register_asset(
    asset_type="spark_job",
    name="customer_ltv_model",
    source_ref="jobs/customer_ltv.py",
    upstream=["fct_orders"],
)
print("Pipeline graph registered successfully.")
```
**Expected output:**
```plaintext
Registered source: asset_7f3a2c1b
Pipeline graph registered successfully.
```
> **Best practice:** Automate this registration step in your CI/CD pipeline. Every time a new dbt model or DAG is merged, register it automatically. Stale lineage graphs are almost as dangerous as no lineage graphs.
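
One way to approach that automation for the dbt part of the stack is to derive registration payloads from dbt's `target/manifest.json` artifact instead of listing models by hand. The sketch below is a rough illustration, not an official integration: `payloads_from_manifest` and `load_manifest` are hypothetical helpers, the payload shape simply mirrors the `register_asset` function above, and it assumes the dbt node and source names match the names you registered in DataLineage.

```python
import json
from pathlib import Path


def load_manifest(path: str = "target/manifest.json") -> dict:
    """Read the manifest that dbt writes after `dbt compile` or `dbt run`."""
    return json.loads(Path(path).read_text())


def payloads_from_manifest(manifest: dict) -> list[dict]:
    """Build one registration payload per dbt model (hypothetical helper).

    Assumption: upstream assets are referenced by their dbt `name`, which
    may not match how sources were registered in your own setup.
    """
    nodes = manifest.get("nodes", {})
    sources = manifest.get("sources", {})
    payloads = []
    for node in nodes.values():
        if node.get("resource_type") != "model":
            continue  # skip tests, seeds, snapshots, and so on
        upstream = []
        for dep_id in node.get("depends_on", {}).get("nodes", []):
            dep = nodes.get(dep_id) or sources.get(dep_id)
            if dep is not None:
                upstream.append(dep["name"])
        payloads.append({
            "asset_type": "dbt_model",
            "name": node["name"],
            "source_ref": node["original_file_path"],
            "upstream_assets": upstream,
        })
    return payloads


# A tiny hand-written manifest fragment, for illustration only.
sample_manifest = {
    "sources": {
        "source.shop.raw.raw_orders": {"name": "raw_orders"},
    },
    "nodes": {
        "model.shop.stg_orders": {
            "resource_type": "model",
            "name": "stg_orders",
            "original_file_path": "models/staging/stg_orders.sql",
            "depends_on": {"nodes": ["source.shop.raw.raw_orders"]},
        },
        "model.shop.fct_orders": {
            "resource_type": "model",
            "name": "fct_orders",
            "original_file_path": "models/marts/fct_orders.sql",
            "depends_on": {"nodes": ["model.shop.stg_orders"]},
        },
        "test.shop.not_null_stg_orders_id": {
            "resource_type": "test",
            "name": "not_null_stg_orders_id",
            "original_file_path": "models/staging/schema.yml",
            "depends_on": {"nodes": ["model.shop.stg_orders"]},
        },
    },
}

payloads = payloads_from_manifest(sample_manifest)
for payload in payloads:
    print(payload["name"], "<-", payload["upstream_assets"])
```

In a CI job, each payload could then be passed to `register_asset` after the merge that introduced the model.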
---
## Step 2: Retrieve and Visualize the Lineage Graph
Now let's pull the full lineage graph for `raw_orders` and see what DataLineage discovered. The `GET /lineage/{id}` endpoint returns the complete dependency tree — both upstream and downstream — for any registered asset.
```python
def get_lineage(asset_id: str, direction: str = "downstream") -> dict:
    """
    Fetch the lineage graph for an asset.

    direction: "upstream", "downstream", or "both"
    """
    response = requests.get(
        f"{BASE_URL}/lineage/{asset_id}",
        headers=HEADERS,
        params={"direction": direction, "depth": 10},  # depth: how many hops to traverse
    )
    if response.status_code == 404:
        raise LookupError(f"Asset {asset_id} not found. Has it been registered?")
    if response.status_code != 200:
        response.raise_for_status()
    return response.json()


def print_lineage_tree(node: dict, indent: int = 0) -> None:
    """Recursively print the lineage tree in a readable format."""
    prefix = "    " * indent + ("└── " if indent > 0 else "")
    asset_type = node.get("asset_type", "unknown").upper()
    print(f"{prefix}[{asset_type}] {node['name']}")
    for child in node.get("downstream", []):
        print_lineage_tree(child, indent + 1)


# Fetch and display the full downstream graph
lineage = get_lineage(raw_orders["id"], direction="downstream")
print_lineage_tree(lineage["graph"])
```
**Expected output:**
```plaintext
[TABLE] raw_orders
    └── [DBT_MODEL] stg_orders
        └── [DBT_MODEL] fct_orders
            └── [DAG] daily_revenue_report
            └── [SPARK_JOB] customer_ltv_model
```
That's your entire pipeline dependency chain, rendered from a single API call. No YAML archaeology required.
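If you want to feed the graph into alerts or a report rather than print it, it can help to flatten the nested response into rows first. The following is a minimal sketch that assumes the response nests children under a `downstream` key, as in the tree printer above; `flatten_downstream` is a hypothetical helper, and the sample graph is hand-written to mirror the expected output rather than captured from the API.

```python
def flatten_downstream(node: dict) -> list[tuple[int, str, str]]:
    """Walk the tree depth-first and return (depth, asset_type, name) rows."""
    rows = []
    stack = [(node, 0)]
    while stack:
        current, depth = stack.pop()
        rows.append((depth, current.get("asset_type", "unknown"), current["name"]))
        # Push children in reverse so they are visited in their original order.
        for child in reversed(current.get("downstream", [])):
            stack.append((child, depth + 1))
    return rows


# Hand-written sample shaped like the graph in this tutorial (an assumption).
sample_graph = {
    "asset_type": "table",
    "name": "raw_orders",
    "downstream": [{
        "asset_type": "dbt_model",
        "name": "stg_orders",
        "downstream": [{
            "asset_type": "dbt_model",
            "name": "fct_orders",
            "downstream": [
                {"asset_type": "dag", "name": "daily_revenue_report"},
                {"asset_type": "spark_job", "name": "customer_ltv_model"},
            ],
        }],
    }],
}

rows = flatten_downstream(sample_graph)

# Leaf consumers are the rows at the deepest level of this particular graph.
max_depth = max(depth for depth, _, _ in rows)
leaf_consumers = [name for depth, _, name in rows if depth == max_depth]
print(leaf_consumers)
```

An explicit stack avoids hitting Python's recursion limit on very deep graphs, which the recursive printer could in principle run into.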
---
## Step 3: Run an Impact Analysis Before a Schema Change
Here's where DataLineage earns its keep. Before you merge that migration that renames `order_total` to `total_amount`, run an impact analysis. The `POST /lineage/impact` endpoint simulates the blast radius of a proposed change.
```python
def analyze_impact(asset_id: str, changes: list[dict]) -> dict:
    """
    Simulate the impact of schema changes before they happen.

    changes: list of proposed modifications, e.g.:
        [{"type": "column_rename", "from": "order_total", "to": "total_amount"}]
    """
    payload = {
        "asset_id": asset_id,
        "proposed_changes": changes,
    }
    response = requests.post(
        f"{BASE_URL}/lineage/impact",
        headers=HEADERS,
        json=payload,
    )
    if response.status_code == 400:
        raise ValueError(f"Malformed change spec: {response.json()['detail']}")
    if response.status_code != 200:
        response.raise_for_status()
    return response.json()


# Simulate renaming one column and dropping another in raw_orders
proposed_changes = [
    {"type": "column_rename", "from": "order_total", "to": "total_amount"},
    {"type": "column_drop", "column": "legacy_discount_code"},
]
impact_report = analyze_impact(raw_orders["id"], proposed_changes)

print(f"\n{'=' * 50}")
print("IMPACT ANALYSIS REPORT")
print("=" * 50)
print(f"Risk level: {impact_report['risk_level'].upper()}")
print(f"Affected assets: {impact_report['affected_count']}")
print("\nBreaking changes detected:")
for affected in impact_report["affected_assets"]:
    print(f"\n  ⚠ {affected['name']} ({affected['asset_type']})")
    print(f"    References: {', '.join(affected['affected_references'])}")
    print(f"    Severity: {affected['severity']}")
```
**Expected output:**
```plaintext
==================================================
IMPACT ANALYSIS REPORT
==================================================
Risk level: HIGH
Affected assets: 4

Breaking changes detected:

  ⚠ stg_orders (dbt_model)
    References: order_total, legacy_discount_code
    Severity: breaking

  ⚠ fct_orders (dbt_model)
    References: order_total
    Severity: breaking

  ⚠ daily_revenue_report (dag)
    References: order_total
    Severity: breaking

  ⚠ customer_ltv_model (spark_job)
    References: order_total
    Severity: warning
```
That's four assets flagged, with specific column references identified, before a single line of production code changed. The difference between a controlled migration and a Friday incident.
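For a migration plan, it is often more useful to see the report grouped by severity and by column than asset by asset. The sketch below assumes the report fields shown in the output above (`affected_assets`, `severity`, `affected_references`); both helper names are hypothetical, and the sample report is hand-written to match that output, not a captured API response.

```python
from collections import defaultdict


def group_by_severity(report: dict) -> dict[str, list[str]]:
    """Map each severity level to the names of the assets reported at that level."""
    grouped = defaultdict(list)
    for asset in report.get("affected_assets", []):
        grouped[asset.get("severity", "unknown")].append(asset["name"])
    return dict(grouped)


def assets_by_column(report: dict) -> dict[str, list[str]]:
    """Map each affected column to the assets that reference it."""
    index = defaultdict(list)
    for asset in report.get("affected_assets", []):
        for column in asset.get("affected_references", []):
            index[column].append(asset["name"])
    return dict(index)


# Hand-written sample mirroring the expected output above (an assumption).
sample_report = {
    "risk_level": "high",
    "affected_count": 4,
    "affected_assets": [
        {"name": "stg_orders", "asset_type": "dbt_model", "severity": "breaking",
         "affected_references": ["order_total", "legacy_discount_code"]},
        {"name": "fct_orders", "asset_type": "dbt_model", "severity": "breaking",
         "affected_references": ["order_total"]},
        {"name": "daily_revenue_report", "asset_type": "dag", "severity": "breaking",
         "affected_references": ["order_total"]},
        {"name": "customer_ltv_model", "asset_type": "spark_job", "severity": "warning",
         "affected_references": ["order_total"]},
    ],
}

by_severity = group_by_severity(sample_report)
column_index = assets_by_column(sample_report)
print(by_severity)
print(column_index)
```

Grouped this way, the report reads as a work list: the `order_total` rename touches all four assets, while dropping `legacy_discount_code` only requires a change in `stg_orders`.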
---
## Putting It All Together: A Pre-Migration Safety Check
Here's a utility function you can drop into any migration workflow or CI pipeline:
```python
import sys

import requests


def safe_migration_check(asset_id: str, changes: list[dict]) -> bool:
    """
    Returns True if safe to proceed, False if breaking changes detected.
    Designed to be used as a CI gate.
    """
    try:
        report = analyze_impact(asset_id, changes)
    except (ValueError, requests.RequestException) as e:
        # RequestException also covers connection errors and timeouts,
        # not only HTTP error responses.
        print(f"[ERROR] Impact analysis failed: {e}")
        return False  # Fail safe: block the migration if we can't assess impact

    breaking = [a for a in report["affected_assets"] if a["severity"] == "breaking"]
    if breaking:
        print(f"[BLOCKED] {len(breaking)} breaking change(s) detected. Fix before merging.")
        for asset in breaking:
            references = ", ".join(asset["affected_references"])
            print(f"  - {asset['name']}: update references to {references}")
        return False

    print(f"[OK] No breaking changes. {report['affected_count']} assets may need review.")
    return True


# Use as a migration gate
changes = [{"type": "column_rename", "from": "order_total", "to": "total_amount"}]
is_safe = safe_migration_check(raw_orders["id"], changes)
sys.exit(0 if is_safe else 1)  # Integrate cleanly with CI/CD exit codes
```
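Since the gate blocks a merge whenever the API rejects a change spec, you may want to catch obvious mistakes locally before making the call. This is a minimal sketch under a strong assumption: it only knows the two change types used in this tutorial (`column_rename` and `column_drop`) and the fields they carry here. `validate_changes` and `REQUIRED_FIELDS` are hypothetical names, and the real API may accept other change types.

```python
# Required fields per change type, taken from the examples in this tutorial.
# Assumption: the real API may support more types and fields than these.
REQUIRED_FIELDS = {
    "column_rename": ("from", "to"),
    "column_drop": ("column",),
}


def validate_changes(changes: list[dict]) -> list[str]:
    """Return a list of human-readable problems; an empty list means no issues found."""
    errors = []
    for i, change in enumerate(changes):
        kind = change.get("type")
        if kind not in REQUIRED_FIELDS:
            errors.append(f"change {i}: unknown type {kind!r}")
            continue
        missing = [field for field in REQUIRED_FIELDS[kind] if not change.get(field)]
        if missing:
            errors.append(f"change {i}: {kind} is missing {', '.join(missing)}")
    return errors


good_changes = [
    {"type": "column_rename", "from": "order_total", "to": "total_amount"},
    {"type": "column_drop", "column": "legacy_discount_code"},
]
bad_changes = [
    {"type": "column_rename", "from": "order_total"},  # no "to" field
    {"type": "table_drop"},                            # type not used in this tutorial
]

good_errors = validate_changes(good_changes)
bad_errors = validate_changes(bad_changes)
for problem in bad_errors:
    print(problem)
```

Running a check like this before `safe_migration_check` keeps a typo in the change spec from looking like an API outage in your CI logs.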
---
## What You've Built
In a few short Python functions, you've replaced hours of manual dependency tracing with an automated safety net that:
1. **Maintains a living map** of your entire pipeline graph across dbt, Airflow, and Spark
2. **Answers "what does this asset feed?"** with a single API call
3. **Catches breaking changes before deployment** with specific, actionable impact reports
The Friday 4:47 PM scenario doesn't disappear — schema changes will always happen. But now, instead of discovering the blast radius after the fact, you know it before you merge.
That's not just better tooling. That's a fundamentally different relationship with your data infrastructure.
---
*DataLineage documentation: [docs.datalineage.io](https://docs.datalineage.io) | Questions? Drop them in the comments below.*