DEV Community

Ahmed Moussa
Ahmed Moussa

Posted on

How to Track Data Pipeline Dependencies Automatically with DataLineage

---
title: "Stop Playing Data Detective: Automated Lineage Tracing Across Your Entire Pipeline Stack"
published: false
tags: [dataengineering, python, dbt, tutorial]
---

# Stop Playing Data Detective: Automated Lineage Tracing Across Your Entire Pipeline Stack

Picture this: it's 4:47 PM on a Friday. Someone renamed a column in a source table. Your Slack is on fire. Three dashboards are broken, an Airflow DAG is throwing cryptic errors, and a Spark job silently swallowed bad data for the last six hours. You're about to spend your weekend playing data detective — manually grepping through YAML files, SQL transforms, and Python scripts trying to answer one deceptively simple question:

**What broke, and what else is about to?**

This is the problem DataLineage was built to eliminate. In this tutorial, we'll wire it into a realistic multi-tool pipeline (dbt + Airflow + Spark) and watch it answer that Friday-afternoon question in seconds.

---

## What We're Building

We'll simulate a pipeline that looks like most production data stacks:

Enter fullscreen mode Exit fullscreen mode

raw_orders (Postgres)
└── dbt: stg_orders
└── dbt: fct_orders
├── Airflow DAG: daily_revenue_report
└── Spark job: customer_ltv_model


When `raw_orders` gets a schema change, we'll use DataLineage to instantly surface every downstream consumer — before they surface themselves as incidents.

---

## Prerequisites

- Python 3.9+
- A DataLineage account and API key (set as `DATALINEAGE_API_KEY` in your environment)
- `requests` and `python-dotenv` installed

Enter fullscreen mode Exit fullscreen mode


bash
pip install requests python-dotenv


---

## Step 1: Register Your Pipeline Assets

Before DataLineage can trace anything, it needs to know what exists. Think of this as drawing the map before you navigate it.

We'll register each node in our pipeline using the `POST /lineage/trace` endpoint. This endpoint doesn't just store metadata — it actively discovers dependency relationships between the assets you register.

Enter fullscreen mode Exit fullscreen mode


python
import os
import requests
from dotenv import load_dotenv

load_dotenv()

API_KEY = os.getenv("DATALINEAGE_API_KEY")
BASE_URL = "https://api.datalineage.io/v1"

HEADERS = {
"Authorization": f"Bearer {API_KEY}",
"Content-Type": "application/json"
}

def register_asset(asset_type: str, name: str, source_ref: str, upstream: list[str] = None) -> dict:
"""Register a pipeline asset and its upstream dependencies."""
payload = {
"asset_type": asset_type, # "table", "dbt_model", "dag", "spark_job"
"name": name,
"source_ref": source_ref, # file path, table name, or DAG id
"upstream_assets": upstream or []
}

response = requests.post(
    f"{BASE_URL}/lineage/trace",
    headers=HEADERS,
    json=payload
)

if response.status_code == 422:
    raise ValueError(f"Invalid asset definition: {response.json()['detail']}")
if response.status_code != 201:
    response.raise_for_status()

return response.json()
Enter fullscreen mode Exit fullscreen mode

Register the source table

raw_orders = register_asset(
asset_type="table",
name="raw_orders",
source_ref="postgres://warehouse/raw/orders"
)
print(f"Registered source: {raw_orders['id']}")

Register dbt models, referencing upstream assets by name

stg_orders = register_asset(
asset_type="dbt_model",
name="stg_orders",
source_ref="models/staging/stg_orders.sql",
upstream=["raw_orders"]
)

fct_orders = register_asset(
asset_type="dbt_model",
name="fct_orders",
source_ref="models/marts/fct_orders.sql",
upstream=["stg_orders"]
)

Register downstream consumers

register_asset(
asset_type="dag",
name="daily_revenue_report",
source_ref="dags/daily_revenue_report.py",
upstream=["fct_orders"]
)

register_asset(
asset_type="spark_job",
name="customer_ltv_model",
source_ref="jobs/customer_ltv.py",
upstream=["fct_orders"]
)

print("Pipeline graph registered successfully.")


**Expected output:**
Enter fullscreen mode Exit fullscreen mode


plaintext
Registered source: asset_7f3a2c1b
Pipeline graph registered successfully.


> **Best practice:** Automate this registration step in your CI/CD pipeline. Every time a new dbt model or DAG is merged, register it automatically. Stale lineage graphs are almost as dangerous as no lineage graphs.

---

## Step 2: Retrieve and Visualize the Lineage Graph

Now let's pull the full lineage graph for `raw_orders` and see what DataLineage discovered. The `GET /lineage/{id}` endpoint returns the complete dependency tree — both upstream and downstream — for any registered asset.

Enter fullscreen mode Exit fullscreen mode


python
def get_lineage(asset_id: str, direction: str = "downstream") -> dict:
"""
Fetch the lineage graph for an asset.
direction: "upstream", "downstream", or "both"
"""
response = requests.get(
f"{BASE_URL}/lineage/{asset_id}",
headers=HEADERS,
params={"direction": direction, "depth": 10} # depth: how many hops to traverse
)

if response.status_code == 404:
    raise LookupError(f"Asset {asset_id} not found. Has it been registered?")
if response.status_code != 200:
    response.raise_for_status()

return response.json()
Enter fullscreen mode Exit fullscreen mode

def print_lineage_tree(node: dict, indent: int = 0) -> None:
"""Recursively print the lineage tree in a readable format."""
prefix = " " * indent + ("└── " if indent > 0 else "")
asset_type = node.get("asset_type", "unknown").upper()
print(f"{prefix}[{asset_type}] {node['name']}")

for child in node.get("downstream", []):
    print_lineage_tree(child, indent + 1)
Enter fullscreen mode Exit fullscreen mode

Fetch and display the full downstream graph

lineage = get_lineage(raw_orders["id"], direction="downstream")
print_lineage_tree(lineage["graph"])


**Expected output:**
Enter fullscreen mode Exit fullscreen mode


plaintext
[TABLE] raw_orders
└── [DBT_MODEL] stg_orders
└── [DBT_MODEL] fct_orders
└── [DAG] daily_revenue_report
└── [SPARK_JOB] customer_ltv_model


That's your entire pipeline dependency chain, rendered from a single API call. No YAML archaeology required.

---

## Step 3: Run an Impact Analysis Before a Schema Change

Here's where DataLineage earns its keep. Before you merge that migration that renames `order_total` to `total_amount`, run an impact analysis. The `POST /lineage/impact` endpoint simulates the blast radius of a proposed change.

Enter fullscreen mode Exit fullscreen mode


python
def analyze_impact(asset_id: str, changes: list[dict]) -> dict:
"""
Simulate the impact of schema changes before they happen.

changes: list of proposed modifications, e.g.:
    [{"type": "column_rename", "from": "order_total", "to": "total_amount"}]
"""
payload = {
    "asset_id": asset_id,
    "proposed_changes": changes
}

response = requests.post(
    f"{BASE_URL}/lineage/impact",
    headers=HEADERS,
    json=payload
)

if response.status_code == 400:
    raise ValueError(f"Malformed change spec: {response.json()['detail']}")
if response.status_code != 200:
    response.raise_for_status()

return response.json()
Enter fullscreen mode Exit fullscreen mode

Simulate renaming a column in raw_orders

proposed_changes = [
{"type": "column_rename", "from": "order_total", "to": "total_amount"},
{"type": "column_drop", "column": "legacy_discount_code"}
]

impact_report = analyze_impact(raw_orders["id"], proposed_changes)

print(f"\n{'='*50}")
print(f"IMPACT ANALYSIS REPORT")
print(f"{'='*50}")
print(f"Risk level: {impact_report['risk_level'].upper()}")
print(f"Affected assets: {impact_report['affected_count']}")
print(f"\nBreaking changes detected:")

for affected in impact_report["affected_assets"]:
print(f"\n ⚠ {affected['name']} ({affected['asset_type']})")
print(f" References: {', '.join(affected['affected_references'])}")
print(f" Severity: {affected['severity']}")


**Expected output:**
Enter fullscreen mode Exit fullscreen mode

plaintext

IMPACT ANALYSIS REPORT

Risk level: HIGH
Affected assets: 4

Breaking changes detected:

⚠ stg_orders (dbt_model)
References: order_total, legacy_discount_code
Severity: breaking

⚠ fct_orders (dbt_model)
References: order_total
Severity: breaking

⚠ daily_revenue_report (dag)
References: order_total
Severity: breaking

⚠ customer_ltv_model (spark_job)
References: order_total
Severity: warning


That's four assets flagged, with specific column references identified, before a single line of production code changed. The difference between a controlled migration and a Friday incident.

---

## Putting It All Together: A Pre-Migration Safety Check

Here's a utility function you can drop into any migration workflow or CI pipeline:

Enter fullscreen mode Exit fullscreen mode


python
def safe_migration_check(asset_id: str, changes: list[dict]) -> bool:
"""
Returns True if safe to proceed, False if breaking changes detected.
Designed to be used as a CI gate.
"""
try:
report = analyze_impact(asset_id, changes)
except (ValueError, requests.HTTPError) as e:
print(f"[ERROR] Impact analysis failed: {e}")
return False # Fail safe: block the migration if we can't assess impact

breaking = [a for a in report["affected_assets"] if a["severity"] == "breaking"]

if breaking:
    print(f"[BLOCKED] {len(breaking)} breaking change(s) detected. Fix before merging.")
    for asset in breaking:
        print(f"  - {asset['name']}: update references to {asset['affected_references']}")
    return False

print(f"[OK] No breaking changes. {report['affected_count']} assets may need review.")
return True
Enter fullscreen mode Exit fullscreen mode

Use as a migration gate

changes = [{"type": "column_rename", "from": "order_total", "to": "total_amount"}]
is_safe = safe_migration_check(raw_orders["id"], changes)
sys.exit(0 if is_safe else 1) # Integrate cleanly with CI/CD exit codes


---

## What You've Built

In about 80 lines of Python, you've replaced hours of manual dependency tracing with an automated safety net that:

1. **Maintains a living map** of your entire pipeline graph across dbt, Airflow, and Spark
2. **Answers "what does this asset feed?"** with a single API call
3. **Catches breaking changes before deployment** with specific, actionable impact reports

The Friday 4:47 PM scenario doesn't disappear — schema changes will always happen. But now, instead of discovering the blast radius after the fact, you know it before you merge.

That's not just better tooling. That's a fundamentally different relationship with your data infrastructure.

---

*DataLineage documentation: [docs.datalineage.io](https://docs.datalineage.io) | Questions? Drop them in the comments below.*
Enter fullscreen mode Exit fullscreen mode

Top comments (0)