---
title: "Stop Playing Data Detective: Automate Pipeline Dependency Tracing with DataLineage"
published: false
tags: [dataengineering, python, tutorial, dataquality]
---
# Stop Playing Data Detective: Automate Pipeline Dependency Tracing with DataLineage
You know the feeling. It's 2pm on a Tuesday. Someone changed a column name in a source table. By 4pm, three dashboards are broken, a Spark job is throwing cryptic NullPointerExceptions, and your Slack DMs look like a crime scene.
The culprit isn't the schema change — it's the fact that nobody *knew* what depended on that column.
This tutorial walks you through wiring DataLineage into your stack so that the next time a schema shifts, you're the person who already has the answer before anyone asks the question.
---
## What We're Building
By the end of this post, you'll have a working Python script that:
1. Traces dependencies across a mixed pipeline (dbt + Airflow + custom ETL)
2. Stores a lineage graph you can query later
3. Runs an impact analysis *before* a schema change ships
We'll use three endpoints throughout:
- `POST /lineage/trace` — discover and register dependencies
- `GET /lineage/{id}` — retrieve a stored lineage graph
- `POST /lineage/impact` — simulate the blast radius of a change
---
## Prerequisites
```bash
pip install requests python-dotenv
```
Set up your environment:
```bash
# .env
DATALINEAGE_API_KEY=your_api_key_here
DATALINEAGE_BASE_URL=https://api.datalineage.io/v1
```
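
Every script in this tutorial reads these two variables, so it can help to fail fast when one is missing. The following is a minimal sketch under that assumption; `missing_settings` and `require_settings` are hypothetical helpers written for this post, not part of any DataLineage SDK.

```python
from typing import Mapping

# The two variable names used in the .env file above.
REQUIRED_SETTINGS = ("DATALINEAGE_API_KEY", "DATALINEAGE_BASE_URL")


def missing_settings(env: Mapping[str, str]) -> list[str]:
    """Return the required variable names that are unset or blank in env."""
    return [name for name in REQUIRED_SETTINGS if not env.get(name, "").strip()]


def require_settings(env: Mapping[str, str]) -> None:
    """Raise RuntimeError naming every missing variable (hypothetical helper)."""
    missing = missing_settings(env)
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
```

Calling `require_settings(os.environ)` right after `load_dotenv()` would turn a confusing authentication failure into a clear startup error.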
---
## Step 1: Build Your API Client
Before touching any pipeline logic, let's build a thin client wrapper. This keeps auth and error handling in one place — a pattern you'll thank yourself for at 2am.
```python
# lineage_client.py
import os

import requests
from dotenv import load_dotenv

load_dotenv()


class LineageClient:
    def __init__(self):
        self.base_url = os.getenv("DATALINEAGE_BASE_URL")
        self.headers = {
            "Authorization": f"Bearer {os.getenv('DATALINEAGE_API_KEY')}",
            "Content-Type": "application/json",
        }

    def _request(self, method: str, path: str, timeout: int, **kwargs) -> dict:
        """Send a request; network failures surface as RuntimeError."""
        # Connection errors are raised by the request call itself, so they
        # have to be caught here rather than in _handle_response.
        try:
            response = requests.request(
                method,
                f"{self.base_url}{path}",
                headers=self.headers,
                timeout=timeout,
                **kwargs,
            )
        except requests.exceptions.ConnectionError as e:
            raise RuntimeError(
                "Could not reach DataLineage API. Check your DATALINEAGE_BASE_URL."
            ) from e
        except requests.exceptions.Timeout as e:
            raise RuntimeError(f"Request to {path} timed out after {timeout}s.") from e
        return self._handle_response(response)

    def _handle_response(self, response: requests.Response) -> dict:
        """Centralized response handling with meaningful error messages."""
        try:
            response.raise_for_status()
        except requests.exceptions.HTTPError as e:
            try:
                error_body = response.json()
            except ValueError:  # the error body was not JSON
                error_body = {}
            raise RuntimeError(
                f"API error {response.status_code}: "
                f"{error_body.get('message', str(e))}"
            ) from e
        return response.json()

    def trace(self, payload: dict) -> dict:
        return self._request("POST", "/lineage/trace", timeout=30, json=payload)

    def get_lineage(self, lineage_id: str) -> dict:
        return self._request("GET", f"/lineage/{lineage_id}", timeout=15)

    def impact(self, payload: dict) -> dict:
        return self._request("POST", "/lineage/impact", timeout=30, json=payload)
```
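
Because the wrapper funnels every failure into `RuntimeError`, transient network hiccups can be retried in one place. Here is a minimal sketch of exponential backoff; `call_with_retries` is a hypothetical helper, and since the wrapper does not distinguish error types, this version also retries client errors such as a bad API key, which a production version would likely want to skip.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_retries(
    call: Callable[[], T],
    attempts: int = 3,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Run call(), retrying on RuntimeError with exponential backoff.

    The delay doubles after each failed attempt. The error from the final
    attempt is re-raised unchanged.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(attempts):
        try:
            return call()
        except RuntimeError:
            if attempt == attempts - 1:
                raise
            sleep(base_delay * 2 ** attempt)
    raise AssertionError("unreachable")
```

With the client above, usage might look like `call_with_retries(lambda: client.get_lineage(lineage_id))`. The `sleep` parameter exists so tests can pass a stub instead of actually waiting.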
---
## Step 2: Trace Your First Pipeline
Now let's register a real dependency graph. The payload describes your pipeline topology — what tools are involved and which assets connect them.
```python
# trace_pipeline.py
from lineage_client import LineageClient

client = LineageClient()

pipeline_payload = {
    "pipeline_name": "customer_revenue_pipeline",
    "environment": "production",
    "sources": [
        {
            "tool": "custom_etl",
            "asset": "raw.stripe_events",
            "schema": {
                "customer_id": "string",
                "amount_cents": "integer",
                "event_timestamp": "timestamp"
            }
        }
    ],
    "transformations": [
        {
            "tool": "dbt",
            "model": "stg_stripe_events",
            "depends_on": ["raw.stripe_events"],
            "columns_used": ["customer_id", "amount_cents", "event_timestamp"]
        },
        {
            "tool": "dbt",
            "model": "fct_customer_revenue",
            "depends_on": ["stg_stripe_events"]
        }
    ],
    "consumers": [
        {
            "tool": "airflow",
            "dag_id": "revenue_reporting_dag",
            "depends_on": ["fct_customer_revenue"]
        },
        {
            "tool": "spark",
            "job_name": "ml_feature_extraction",
            "depends_on": ["fct_customer_revenue"],
            "columns_used": ["customer_id", "amount_cents"]
        }
    ]
}

try:
    result = client.trace(pipeline_payload)
    lineage_id = result["lineage_id"]
    print("✅ Lineage registered successfully.")
    print(f" Lineage ID: {lineage_id}")
    print(f" Nodes discovered: {result['node_count']}")
    print(f" Edges mapped: {result['edge_count']}")
except RuntimeError as e:
    print(f"❌ Trace failed: {e}")
```
**Expected output:**
```plaintext
✅ Lineage registered successfully.
 Lineage ID: lin_7f3a9c2e
 Nodes discovered: 5
 Edges mapped: 4
```
Save that `lineage_id`. It's your handle to everything downstream.
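
A typo in a `depends_on` entry is easy to make and hard to spot, so a local sanity check before calling the API can be worthwhile. The sketch below assumes the payload shape used in this post; `declared_edges` and `unknown_upstreams` are hypothetical helpers, and the API may compute its own node and edge counts differently.

```python
def declared_edges(payload: dict) -> list[tuple[str, str]]:
    """List (upstream, downstream) pairs from every depends_on entry."""
    edges = []
    for section in ("transformations", "consumers"):
        for item in payload.get(section, []):
            # Each tool names its asset with a different key in this payload.
            name = item.get("model") or item.get("dag_id") or item.get("job_name")
            for upstream in item.get("depends_on", []):
                edges.append((upstream, name))
    return edges


def unknown_upstreams(payload: dict) -> list[str]:
    """Return depends_on targets that no source or transformation declares."""
    known = {s["asset"] for s in payload.get("sources", [])}
    known |= {t["model"] for t in payload.get("transformations", [])}
    return sorted({up for up, _ in declared_edges(payload) if up not in known})


# Trimmed copy of the tutorial payload (schemas and columns omitted).
sample_payload = {
    "sources": [{"tool": "custom_etl", "asset": "raw.stripe_events"}],
    "transformations": [
        {"tool": "dbt", "model": "stg_stripe_events",
         "depends_on": ["raw.stripe_events"]},
        {"tool": "dbt", "model": "fct_customer_revenue",
         "depends_on": ["stg_stripe_events"]},
    ],
    "consumers": [
        {"tool": "airflow", "dag_id": "revenue_reporting_dag",
         "depends_on": ["fct_customer_revenue"]},
        {"tool": "spark", "job_name": "ml_feature_extraction",
         "depends_on": ["fct_customer_revenue"]},
    ],
}

print(len(declared_edges(sample_payload)))  # 4 declared dependency links
print(unknown_upstreams(sample_payload))    # [] means every reference resolves
```

If `unknown_upstreams` returns anything, the payload references an asset that nothing in it declares, which is usually a misspelling.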
---
## Step 3: Inspect the Dependency Graph
Got your ID? Let's pull the full graph and make it human-readable.
```python
# inspect_lineage.py
from lineage_client import LineageClient

client = LineageClient()
LINEAGE_ID = "lin_7f3a9c2e"  # from Step 2


def print_dependency_tree(lineage: dict):
    """Render a simple ASCII dependency tree from the lineage graph."""
    nodes = {n["id"]: n for n in lineage["nodes"]}
    edges = lineage["edges"]  # list of {"from": id, "to": id}

    # Find root nodes (no incoming edges)
    targets = {e["to"] for e in edges}
    roots = [n for n in nodes if n not in targets]

    def render(node_id, depth=0):
        node = nodes[node_id]
        prefix = "  " * depth + ("└─ " if depth > 0 else "")
        print(f"{prefix}[{node['tool']}] {node['asset_name']}")
        children = [e["to"] for e in edges if e["from"] == node_id]
        for child in children:
            render(child, depth + 1)

    print("\n📊 Dependency Tree:")
    print("=" * 40)
    for root in roots:
        render(root)


try:
    lineage = client.get_lineage(LINEAGE_ID)
    print_dependency_tree(lineage)
    print(f"\n Last updated: {lineage['updated_at']}")
    print(f" Health status: {lineage['health_status']}")
except RuntimeError as e:
    print(f"❌ Could not retrieve lineage: {e}")
```

**Expected output:**

```plaintext
📊 Dependency Tree:
========================================
[custom_etl] raw.stripe_events
  └─ [dbt] stg_stripe_events
    └─ [dbt] fct_customer_revenue
      └─ [airflow] revenue_reporting_dag
      └─ [spark] ml_feature_extraction

 Last updated: 2024-11-12T14:32:01Z
 Health status: healthy
```
This is the map that saves you on Tuesday afternoons.
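
The same edge list also answers a narrower question: what sits downstream of one particular asset? Below is a minimal breadth-first sketch, assuming the `{"from": id, "to": id}` edge shape shown in Step 3. For illustration the node ids are taken to be the asset names, which the real API may not guarantee, and `downstream_of` is a hypothetical helper.

```python
from collections import deque


def downstream_of(edges: list[dict], start: str) -> list[str]:
    """Return every node reachable from start, in breadth-first order."""
    children: dict[str, list[str]] = {}
    for edge in edges:
        children.setdefault(edge["from"], []).append(edge["to"])

    seen = {start}
    order = []
    queue = deque([start])
    while queue:
        node = queue.popleft()
        for child in children.get(node, []):
            if child not in seen:  # visit each node once, even with cycles
                seen.add(child)
                order.append(child)
                queue.append(child)
    return order


# Edges matching the tree above (ids assumed equal to asset names).
sample_edges = [
    {"from": "raw.stripe_events", "to": "stg_stripe_events"},
    {"from": "stg_stripe_events", "to": "fct_customer_revenue"},
    {"from": "fct_customer_revenue", "to": "revenue_reporting_dag"},
    {"from": "fct_customer_revenue", "to": "ml_feature_extraction"},
]

print(downstream_of(sample_edges, "stg_stripe_events"))
```

Unlike the recursive renderer, the `seen` set keeps this traversal from looping forever if the graph ever contains a cycle, and from listing a node twice when two paths lead to it.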
---
## Step 4: Run Impact Analysis Before Shipping a Change
Here's where DataLineage earns its keep. Before you rename `amount_cents` to `amount_usd` in your source schema, ask the API what breaks.
```python
# impact_analysis.py
from lineage_client import LineageClient

client = LineageClient()

proposed_change = {
    "lineage_id": "lin_7f3a9c2e",
    "change_type": "column_rename",
    "target_asset": "raw.stripe_events",
    "change_details": {
        "column": "amount_cents",
        "rename_to": "amount_usd"
    }
}


def render_impact_report(impact: dict):
    severity_icons = {"high": "🔴", "medium": "🟡", "low": "🟢"}

    print("\n⚡ Impact Analysis Report")
    print("=" * 40)
    print(f"Proposed change: {impact['change_summary']}")
    print(f"Affected assets: {impact['total_affected']}\n")

    for affected in impact["affected_assets"]:
        icon = severity_icons.get(affected["severity"], "⚪")
        print(f"{icon} [{affected['tool']}] {affected['asset_name']}")
        print(f" Reason: {affected['reason']}")
        print(f" Columns at risk: {', '.join(affected['columns_at_risk'])}")
        print()

    if impact["total_affected"] == 0:
        print("✅ No downstream consumers affected. Safe to ship.")


try:
    impact = client.impact(proposed_change)
    render_impact_report(impact)
except RuntimeError as e:
    print(f"❌ Impact analysis failed: {e}")
```
**Expected output:**
```plaintext
⚡ Impact Analysis Report
========================================
Proposed change: Rename 'amount_cents' → 'amount_usd' in raw.stripe_events
Affected assets: 2

🔴 [spark] ml_feature_extraction
 Reason: Directly references column 'amount_cents'
 Columns at risk: amount_cents

🟡 [dbt] stg_stripe_events
 Reason: Selects all columns from source; may inherit rename
 Columns at risk: amount_cents
```
Two affected assets, surfaced in seconds. Before you type a single migration script.
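
On a larger graph the report can run long, so it may help to sort the worst offenders to the top and print a one-line tally. Here is a small sketch, assuming the `affected_assets` shape and the `high`/`medium`/`low` labels from the output above; both helper names are hypothetical.

```python
from collections import Counter

SEVERITY_ORDER = {"high": 0, "medium": 1, "low": 2}


def sort_by_severity(affected_assets: list[dict]) -> list[dict]:
    """Order assets from most to least severe; unknown labels go last."""
    return sorted(
        affected_assets,
        key=lambda a: SEVERITY_ORDER.get(a.get("severity"), len(SEVERITY_ORDER)),
    )


def severity_counts(affected_assets: list[dict]) -> dict[str, int]:
    """Count affected assets per severity label."""
    return dict(Counter(a.get("severity", "unknown") for a in affected_assets))


# Two entries shaped like the expected output above (fields trimmed).
sample_assets = [
    {"tool": "dbt", "asset_name": "stg_stripe_events", "severity": "medium"},
    {"tool": "spark", "asset_name": "ml_feature_extraction", "severity": "high"},
]

for asset in sort_by_severity(sample_assets):
    print(asset["severity"], asset["asset_name"])
print(severity_counts(sample_assets))
```

Python's `sorted` is stable, so assets with the same severity keep the order the API returned them in.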
---
## Putting It All Together: A Pre-Deployment Hook
Combine everything into a script you can run in CI before any schema migration merges:
```python
# pre_deploy_check.py
import sys

from lineage_client import LineageClient


def run_pre_deploy_check(lineage_id: str, change_payload: dict) -> bool:
    client = LineageClient()
    print(f"🔍 Running impact analysis for lineage: {lineage_id}")

    try:
        impact = client.impact({"lineage_id": lineage_id, **change_payload})
        high_severity = [
            a for a in impact["affected_assets"]
            if a["severity"] == "high"
        ]

        if high_severity:
            print(f"🚫 Deployment blocked: {len(high_severity)} high-severity impact(s) detected.")
            for asset in high_severity:
                print(f" - [{asset['tool']}] {asset['asset_name']}: {asset['reason']}")
            return False

        print("✅ No high-severity impacts. Deployment cleared.")
        return True

    except RuntimeError as e:
        # Fail closed: if the analysis cannot run, do not let the change through.
        print(f"⚠️ Could not complete impact analysis: {e}")
        print(" Blocking deployment as a precaution.")
        return False


if __name__ == "__main__":
    cleared = run_pre_deploy_check(
        lineage_id="lin_7f3a9c2e",
        change_payload={
            "change_type": "column_rename",
            "target_asset": "raw.stripe_events",
            "change_details": {"column": "amount_cents", "rename_to": "amount_usd"}
        }
    )
    # A non-zero exit code fails the CI step.
    sys.exit(0 if cleared else 1)
```
Drop this in your CI pipeline. Any schema change with a high-severity downstream impact now fails the build before it merges, provided the registered lineage is up to date.
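
The script above hardcodes the lineage ID and the rename, which is fine for a demo but awkward in CI. One way to parameterize it is with `argparse`. This is a sketch only: the flag names are assumptions made for this post, and the payload keys mirror the ones used in Step 4.

```python
import argparse
from typing import Optional


def parse_args(argv: Optional[list[str]] = None) -> argparse.Namespace:
    """Parse the CLI flags (flag names are illustrative, not a standard)."""
    parser = argparse.ArgumentParser(description="Pre-deploy lineage impact check")
    parser.add_argument("--lineage-id", required=True)
    parser.add_argument("--asset", required=True, help="Asset being changed")
    parser.add_argument("--column", required=True, help="Column being renamed")
    parser.add_argument("--rename-to", required=True, help="New column name")
    return parser.parse_args(argv)


def build_change_payload(args: argparse.Namespace) -> dict:
    """Build the column_rename payload in the shape used in Step 4."""
    return {
        "change_type": "column_rename",
        "target_asset": args.asset,
        "change_details": {"column": args.column, "rename_to": args.rename_to},
    }
```

The `__main__` block of `pre_deploy_check.py` could then call `args = parse_args()` and pass `args.lineage_id` and `build_change_payload(args)` to `run_pre_deploy_check`.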
---
## What's Next
You've got the foundation. From here, consider:
- **Scheduling `trace` calls** in Airflow after each pipeline run to keep lineage fresh
- **Alerting on `health_status` changes** from `GET /lineage/{id}` — catch drift before users do
- **Extending the impact payload** with `change_type: "column_drop"` for deletion risk analysis
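
For the alerting idea, the core logic is just comparing the latest `health_status` with the one seen last time. The sketch below takes the fetch function as a parameter so it stays independent of the client; `health_alert` is a hypothetical helper, and `degraded` is an assumed status value, since only `healthy` appears in this post.

```python
from typing import Callable, Optional


def health_alert(
    fetch_lineage: Callable[[str], dict],
    lineage_id: str,
    last_status: Optional[str],
) -> tuple[str, Optional[str]]:
    """Fetch the current health_status and describe any change.

    Returns (current_status, message). The message is None when the status
    is unchanged or when there is no earlier status to compare against.
    """
    current = fetch_lineage(lineage_id)["health_status"]
    if last_status is None or current == last_status:
        return current, None
    return current, f"Lineage {lineage_id} health changed: {last_status} -> {current}"


# Stand-in for client.get_lineage, returning an assumed status value.
def fake_fetch(lineage_id: str) -> dict:
    return {"health_status": "degraded"}


status, message = health_alert(fake_fetch, "lin_7f3a9c2e", last_status="healthy")
print(message)
```

In a scheduled job you would pass `client.get_lineage` as `fetch_lineage`, persist the returned status between runs, and forward any non-empty message to your alerting channel.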
The goal isn't just knowing what broke — it's building a system where you know *before* it breaks. That's the difference between being reactive and being the engineer everyone trusts to ship safely.
Now go enjoy your Tuesday afternoons.