Ahmed Moussa

How to Track Data Pipeline Dependencies Automatically with DataLineage

---
title: "Stop Playing Data Detective: Automate Pipeline Dependency Tracing with DataLineage"
published: false
tags: [dataengineering, python, tutorial, dataquality]
---

# Stop Playing Data Detective: Automate Pipeline Dependency Tracing with DataLineage

You know the feeling. It's 2pm on a Tuesday. Someone changed a column name in a source table. By 4pm, three dashboards are broken, a Spark job is throwing cryptic NullPointerExceptions, and your Slack DMs look like a crime scene.

The culprit isn't the schema change — it's the fact that nobody *knew* what depended on that column.

This tutorial walks you through wiring DataLineage into your stack so that the next time a schema shifts, you're the person who already has the answer before anyone asks the question.

---

## What We're Building

By the end of this post, you'll have a working Python script that:

1. Traces dependencies across a mixed pipeline (dbt + Airflow + custom ETL)
2. Stores a lineage graph you can query later
3. Runs an impact analysis *before* a schema change ships

We'll use three endpoints throughout:
- `POST /lineage/trace` — discover and register dependencies
- `GET /lineage/{id}` — retrieve a stored lineage graph
- `POST /lineage/impact` — simulate the blast radius of a change

---

## Prerequisites

```bash
pip install requests python-dotenv
```


Set up your environment:

```bash
# .env
DATALINEAGE_API_KEY=your_api_key_here
DATALINEAGE_BASE_URL=https://api.datalineage.io/v1
```

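Every script below reads these two variables, so it can help to fail fast when one is missing. Here is a minimal sketch of that check. `load_lineage_config` is a hypothetical helper name, not part of DataLineage; only the two variable names come from the `.env` file above.

```python
import os


def load_lineage_config(env=None):
    """Return (base_url, api_key), raising early if either variable is unset.

    Hypothetical helper for illustration; only the variable names
    come from the .env file in this tutorial.
    """
    env = os.environ if env is None else env
    required = ("DATALINEAGE_BASE_URL", "DATALINEAGE_API_KEY")
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    # Strip a trailing slash so f"{base_url}/lineage/trace" never doubles it.
    return env["DATALINEAGE_BASE_URL"].rstrip("/"), env["DATALINEAGE_API_KEY"]
```

Call it once after `load_dotenv()` and a typo in `.env` shows up as a clear error instead of a confusing request failure later.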

---

## Step 1: Build Your API Client

Before touching any pipeline logic, let's build a thin client wrapper. This keeps auth and error handling in one place — a pattern you'll thank yourself for at 2am.

```python
# lineage_client.py

import os

import requests
from dotenv import load_dotenv

load_dotenv()


class LineageClient:
    def __init__(self):
        self.base_url = os.getenv("DATALINEAGE_BASE_URL")
        self.headers = {
            "Authorization": f"Bearer {os.getenv('DATALINEAGE_API_KEY')}",
            "Content-Type": "application/json",
        }

    def _request(self, method: str, path: str, timeout: int, **kwargs) -> dict:
        """Send a request and route the result through shared error handling."""
        try:
            response = requests.request(
                method,
                f"{self.base_url}{path}",
                headers=self.headers,
                timeout=timeout,
                **kwargs,
            )
        except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e:
            # Connection problems are raised while sending, before any response exists.
            raise RuntimeError(
                "Could not reach DataLineage API. Check your DATALINEAGE_BASE_URL."
            ) from e
        return self._handle_response(response)

    def _handle_response(self, response: requests.Response) -> dict:
        """Centralized response handling with meaningful error messages."""
        try:
            response.raise_for_status()
            return response.json()
        except requests.exceptions.HTTPError as e:
            try:
                error_body = response.json() if response.content else {}
            except ValueError:
                error_body = {}  # the error body was not JSON
            raise RuntimeError(
                f"API error {response.status_code}: "
                f"{error_body.get('message', str(e))}"
            ) from e

    def trace(self, payload: dict) -> dict:
        return self._request("POST", "/lineage/trace", timeout=30, json=payload)

    def get_lineage(self, lineage_id: str) -> dict:
        return self._request("GET", f"/lineage/{lineage_id}", timeout=15)

    def impact(self, payload: dict) -> dict:
        return self._request("POST", "/lineage/impact", timeout=30, json=payload)
```

---

## Step 2: Trace Your First Pipeline

Now let's register a real dependency graph. The payload describes your pipeline topology — what tools are involved and which assets connect them.

```python
# trace_pipeline.py

from lineage_client import LineageClient

client = LineageClient()

pipeline_payload = {
    "pipeline_name": "customer_revenue_pipeline",
    "environment": "production",
    "sources": [
        {
            "tool": "custom_etl",
            "asset": "raw.stripe_events",
            "schema": {
                "customer_id": "string",
                "amount_cents": "integer",
                "event_timestamp": "timestamp",
            },
        }
    ],
    "transformations": [
        {
            "tool": "dbt",
            "model": "stg_stripe_events",
            "depends_on": ["raw.stripe_events"],
            "columns_used": ["customer_id", "amount_cents", "event_timestamp"],
        },
        {
            "tool": "dbt",
            "model": "fct_customer_revenue",
            "depends_on": ["stg_stripe_events"],
        },
    ],
    "consumers": [
        {
            "tool": "airflow",
            "dag_id": "revenue_reporting_dag",
            "depends_on": ["fct_customer_revenue"],
        },
        {
            "tool": "spark",
            "job_name": "ml_feature_extraction",
            "depends_on": ["fct_customer_revenue"],
            "columns_used": ["customer_id", "amount_cents"],
        },
    ],
}

try:
    result = client.trace(pipeline_payload)
    lineage_id = result["lineage_id"]
    print("✅ Lineage registered successfully.")
    print(f"   Lineage ID: {lineage_id}")
    print(f"   Nodes discovered: {result['node_count']}")
    print(f"   Edges mapped: {result['edge_count']}")
except RuntimeError as e:
    print(f"❌ Trace failed: {e}")
```


**Expected output:**

```plaintext
✅ Lineage registered successfully.
   Lineage ID: lin_7f3a9c2e
   Nodes discovered: 5
   Edges mapped: 6
```


Save that `lineage_id`. It's your handle to everything downstream.
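
One way to "save" it is a small JSON file that later scripts read instead of hard-coding the ID. This is only a sketch: the file name `.lineage_ids.json` and both helper names are assumptions made for illustration, not something DataLineage provides.

```python
import json
from pathlib import Path


def save_lineage_id(pipeline_name, lineage_id, path=".lineage_ids.json"):
    """Record the lineage ID for a pipeline, keeping any IDs already stored."""
    store = Path(path)
    ids = json.loads(store.read_text()) if store.exists() else {}
    ids[pipeline_name] = lineage_id
    store.write_text(json.dumps(ids, indent=2))


def load_lineage_id(pipeline_name, path=".lineage_ids.json"):
    """Look up a saved lineage ID.

    Raises FileNotFoundError if nothing was saved yet,
    or KeyError if this pipeline has no entry.
    """
    return json.loads(Path(path).read_text())[pipeline_name]
```

In `trace_pipeline.py` you could call `save_lineage_id("customer_revenue_pipeline", lineage_id)` right after a successful trace, then use `load_lineage_id(...)` in the scripts that follow.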

---

## Step 3: Inspect the Dependency Graph

Got your ID? Let's pull the full graph and make it human-readable.

```python
# inspect_lineage.py

from lineage_client import LineageClient

client = LineageClient()
LINEAGE_ID = "lin_7f3a9c2e"  # from Step 2


def print_dependency_tree(lineage: dict):
    """Render a simple ASCII dependency tree from the lineage graph."""
    nodes = {n["id"]: n for n in lineage["nodes"]}
    edges = lineage["edges"]  # list of {"from": id, "to": id}

    # Find root nodes (no incoming edges)
    targets = {e["to"] for e in edges}
    roots = [n for n in nodes if n not in targets]

    def render(node_id, depth=0):
        node = nodes[node_id]
        prefix = "  " * depth + ("└─ " if depth > 0 else "")
        print(f"{prefix}[{node['tool']}] {node['asset_name']}")
        children = [e["to"] for e in edges if e["from"] == node_id]
        for child in children:
            render(child, depth + 1)

    print("\n📊 Dependency Tree:")
    print("=" * 40)
    for root in roots:
        render(root)


try:
    lineage = client.get_lineage(LINEAGE_ID)
    print_dependency_tree(lineage)
    print(f"\n   Last updated: {lineage['updated_at']}")
    print(f"   Health status: {lineage['health_status']}")
except RuntimeError as e:
    print(f"❌ Could not retrieve lineage: {e}")
```


**Expected output:**

```plaintext

📊 Dependency Tree:
========================================
[custom_etl] raw.stripe_events
  └─ [dbt] stg_stripe_events
    └─ [dbt] fct_customer_revenue
      └─ [airflow] revenue_reporting_dag
      └─ [spark] ml_feature_extraction

   Last updated: 2024-11-12T14:32:01Z
   Health status: healthy
```


This is the map that saves you on Tuesday afternoons.
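
Once the graph is on your machine, you can also answer "what sits downstream of X?" locally. The sketch below assumes the same `edges` shape used in `print_dependency_tree` (a list of `{"from": id, "to": id}`). The node IDs in the sample are made up for illustration, and `downstream_of` is a hypothetical helper.

```python
from collections import deque


def downstream_of(lineage, start_id):
    """Return IDs of every node reachable from start_id, in breadth-first order."""
    children = {}
    for edge in lineage["edges"]:
        children.setdefault(edge["from"], []).append(edge["to"])

    seen = {start_id}
    order = []
    queue = deque([start_id])
    while queue:
        current = queue.popleft()
        for child in children.get(current, []):
            if child not in seen:  # guards against shared children and cycles
                seen.add(child)
                order.append(child)
                queue.append(child)
    return order


# Sample graph mirroring the pipeline from Step 2; these IDs are invented.
sample = {
    "edges": [
        {"from": "raw", "to": "stg"},
        {"from": "stg", "to": "fct"},
        {"from": "fct", "to": "dag"},
        {"from": "fct", "to": "spark"},
    ]
}
print(downstream_of(sample, "stg"))  # ['fct', 'dag', 'spark']
```

Unlike the recursive `render`, this visits each node once, so a graph with cycles or shared children will not loop or print duplicates.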

---

## Step 4: Run Impact Analysis Before Shipping a Change

Here's where DataLineage earns its keep. Before you rename `amount_cents` to `amount_usd` in your source schema, ask the API what breaks.

```python
# impact_analysis.py

from lineage_client import LineageClient

client = LineageClient()

proposed_change = {
    "lineage_id": "lin_7f3a9c2e",
    "change_type": "column_rename",
    "target_asset": "raw.stripe_events",
    "change_details": {
        "column": "amount_cents",
        "rename_to": "amount_usd",
    },
}


def render_impact_report(impact: dict):
    severity_icons = {"high": "🔴", "medium": "🟡", "low": "🟢"}

    print("\n⚡ Impact Analysis Report")
    print("=" * 40)
    print(f"Proposed change: {impact['change_summary']}")
    print(f"Affected assets: {impact['total_affected']}\n")

    for affected in impact["affected_assets"]:
        icon = severity_icons.get(affected["severity"], "⚪")
        print(f"{icon} [{affected['tool']}] {affected['asset_name']}")
        print(f"     Reason: {affected['reason']}")
        print(f"     Columns at risk: {', '.join(affected['columns_at_risk'])}")
        print()

    if impact["total_affected"] == 0:
        print("✅ No downstream consumers affected. Safe to ship.")


try:
    impact = client.impact(proposed_change)
    render_impact_report(impact)
except RuntimeError as e:
    print(f"❌ Impact analysis failed: {e}")
```


**Expected output:**

```plaintext

⚡ Impact Analysis Report
========================================
Proposed change: Rename 'amount_cents' → 'amount_usd' in raw.stripe_events
Affected assets: 2

🔴 [spark] ml_feature_extraction
     Reason: Directly references column 'amount_cents'
     Columns at risk: amount_cents

🟡 [dbt] stg_stripe_events
     Reason: Selects all columns from source; may inherit rename
     Columns at risk: amount_cents

```


Two affected assets, surfaced in seconds. Before you type a single migration script.
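
On a larger graph the report can get long, so you may want the worst problems first and a quick per-severity count. A rough sketch, assuming only the `severity` field and the `high` / `medium` / `low` levels used in `render_impact_report`. The helper names are hypothetical.

```python
SEVERITY_RANK = {"high": 0, "medium": 1, "low": 2}


def sort_by_severity(affected_assets):
    """Order affected assets from most to least severe; unknown levels go last."""
    return sorted(
        affected_assets,
        key=lambda a: SEVERITY_RANK.get(a["severity"], len(SEVERITY_RANK)),
    )


def count_by_severity(affected_assets):
    """Count affected assets per severity level."""
    counts = {}
    for asset in affected_assets:
        counts[asset["severity"]] = counts.get(asset["severity"], 0) + 1
    return counts


# Sample entries shaped like the report above, trimmed to the fields used here.
sample_assets = [
    {"asset_name": "stg_stripe_events", "severity": "medium"},
    {"asset_name": "ml_feature_extraction", "severity": "high"},
]
print([a["asset_name"] for a in sort_by_severity(sample_assets)])
# ['ml_feature_extraction', 'stg_stripe_events']
print(count_by_severity(sample_assets))  # {'medium': 1, 'high': 1}
```

Passing `sort_by_severity(impact["affected_assets"])` into the rendering loop would keep the red entries at the top of the report.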

---

## Putting It All Together: A Pre-Deployment Hook

Combine everything into a script you can run in CI before any schema migration merges:

```python
# pre_deploy_check.py

import sys

from lineage_client import LineageClient


def run_pre_deploy_check(lineage_id: str, change_payload: dict) -> bool:
    client = LineageClient()

    print(f"🔍 Running impact analysis for lineage: {lineage_id}")

    try:
        impact = client.impact({"lineage_id": lineage_id, **change_payload})
        high_severity = [
            a for a in impact["affected_assets"]
            if a["severity"] == "high"
        ]

        if high_severity:
            print(f"🚫 Deployment blocked: {len(high_severity)} high-severity impact(s) detected.")
            for asset in high_severity:
                print(f"   - [{asset['tool']}] {asset['asset_name']}: {asset['reason']}")
            return False

        print("✅ No high-severity impacts. Deployment cleared.")
        return True

    except RuntimeError as e:
        print(f"⚠️  Could not complete impact analysis: {e}")
        print("   Blocking deployment as a precaution.")
        return False


if __name__ == "__main__":
    cleared = run_pre_deploy_check(
        lineage_id="lin_7f3a9c2e",
        change_payload={
            "change_type": "column_rename",
            "target_asset": "raw.stripe_events",
            "change_details": {"column": "amount_cents", "rename_to": "amount_usd"},
        },
    )
    # A non-zero exit code fails the CI step and blocks the merge.
    sys.exit(0 if cleared else 1)
```


Drop this in your CI pipeline. Any schema change the API flags as high-severity for a registered consumer fails the build before it reaches production.
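
In CI you probably don't want the change hard-coded in the script. One possible approach is to pass the lineage ID and a JSON change file on the command line. The flag names `--lineage-id` and `--change-file` and both helpers are assumptions made for this sketch. The required keys are simply the ones the payload above uses.

```python
import argparse
import json


def parse_args(argv=None):
    """Parse CLI flags; argv=None means read from sys.argv."""
    parser = argparse.ArgumentParser(description="Pre-deploy lineage impact check")
    parser.add_argument("--lineage-id", required=True)
    parser.add_argument(
        "--change-file",
        required=True,
        help="Path to a JSON file holding the change payload",
    )
    return parser.parse_args(argv)


def load_change_payload(path):
    """Read the change payload and check it has the keys used in this tutorial."""
    with open(path, encoding="utf-8") as f:
        payload = json.load(f)
    required = {"change_type", "target_asset", "change_details"}
    missing = required.difference(payload)
    if missing:
        raise ValueError(f"Change file is missing keys: {sorted(missing)}")
    return payload
```

The `__main__` block of `pre_deploy_check.py` could then call `run_pre_deploy_check(args.lineage_id, load_change_payload(args.change_file))`, with each migration PR shipping its own change file.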

---

## What's Next

You've got the foundation. From here, consider:

- **Scheduling `trace` calls** in Airflow after each pipeline run to keep lineage fresh
- **Alerting on `health_status` changes** from `GET /lineage/{id}` — catch drift before users do
- **Extending the impact payload** with `change_type: "column_drop"` for deletion risk analysis
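
For the `health_status` idea, the core logic is just "compare with the last value you saw". Here is a small sketch, assuming only the `health_status` field shown in Step 3. The `notify` callback and the `"degraded"` value are placeholders invented for illustration, since this post only shows `healthy`.

```python
def check_health(previous_status, lineage, notify):
    """Call notify when health_status differs from the last seen value.

    Returns the current status so the caller can store it for the next poll.
    """
    current = lineage["health_status"]
    if previous_status is not None and current != previous_status:
        notify(f"Lineage health changed: {previous_status} -> {current}")
    return current


# Simulated polls; in practice `lineage` would come from client.get_lineage(...).
alerts = []
status = check_health(None, {"health_status": "healthy"}, alerts.append)
status = check_health(status, {"health_status": "healthy"}, alerts.append)
status = check_health(status, {"health_status": "degraded"}, alerts.append)  # made-up value
print(alerts)  # ['Lineage health changed: healthy -> degraded']
```

Swap `alerts.append` for whatever posts to your team's channel, and run the check on a schedule.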

The goal isn't just knowing what broke — it's building a system where you know *before* it breaks. That's the difference between being reactive and being the engineer everyone trusts to ship safely.

Now go enjoy your Tuesday afternoons.