It was 3:17 AM when my phone buzzed. I grabbed it, squinting at the screen: "Pipeline failed: account_summary."
Still half-asleep, I opened my laptop and pulled up the logs. The error message stared back at me: "Relation 'intermediate_accounts' does not exist."
Wait, what? That table should exist. The intermediate layer runs before the marts layer. Why is it missing?
Then I saw it. The intermediate job had failed silently 20 minutes earlier. The marts job ran anyway, looking for a table that wasn't there. The orchestration had failed.
This is the moment I realized: you can have perfect transformations and blazing-fast incremental processing (see Part 2), but if your orchestration is broken, your pipeline is a ticking time bomb.
Let me show you how we fixed it.
The Orchestration Problem
Here's what I got wrong for way too long: I focused on the transformations and forgot about the orchestration.
I had perfect SQL, clean architecture, and blazing-fast incremental processing. But my jobs were held together with cron jobs and shell scripts. And that's how I ended up debugging at 3 AM.
Let me show you what I learned.
The Naive Approach: Cron Jobs
Our pipeline started with cron jobs. Simple, right?
# crontab
0 2 * * * /scripts/run_source.sh
5 2 * * * /scripts/run_staging.sh
10 2 * * * /scripts/run_snapshots.sh
15 2 * * * /scripts/run_intermediate.sh
20 2 * * * /scripts/run_marts.sh
Each job runs 5 minutes after the previous one. Plenty of time, right?
What could go wrong?
Everything.
Scenario 1: The source job takes 7 minutes instead of 4. The staging job starts before source finishes. Chaos.
Scenario 2: The intermediate job fails. The marts job runs anyway, using stale data. Nobody notices for three days.
Scenario 3: You need to rerun just the marts layer. You have to manually figure out which script to run and in what order.
Scenario 4: Someone asks "when did this job last run successfully?" You grep through logs for 20 minutes.
Cron jobs work for simple tasks. For data pipelines? They're a disaster waiting to happen.
Enter Dagster: Asset-Centric Orchestration
Switching to Dagster changed everything for us. Not because Dagster is magic, but because it forced me to think about data, not tasks.
Here's the mental shift: Instead of "run this script, then that script," you think "this data depends on that data."
Let me show you what this looks like in practice:
import pandas as pd
from dagster import asset

@asset(group_name="ingestion")
def customers_raw(context):
    """Ingest customer data from CSV"""
    df = pd.read_csv("data/customers.csv")
    return df

@asset(deps=[customers_raw])  # Wait for customers_raw
def dbt_transformations(context, dbt):
    """Run all DBT models"""
    dbt.cli(["build"], context=context)

@asset(deps=[dbt_transformations])  # Wait for transformations
def account_summary_csv(context):
    """Export results to CSV"""
    # Read from the database and export
    ...
Notice what's different? We're not saying "run at 2:05 AM." We're saying "this asset depends on that asset."
Dagster figures out the order. If customers_raw fails, dbt_transformations doesn't run. If dbt_transformations fails, account_summary_csv doesn't run. The failure stops where it happens instead of cascading downstream.
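To actually run these assets on a schedule, they get wired into a Definitions object. Here's a minimal sketch of what that can look like (the job name, schedule, and asset list are illustrative, not lifted from our repo):

from dagster import AssetSelection, Definitions, ScheduleDefinition, define_asset_job

# One job that materializes every asset, in dependency order
nightly_job = define_asset_job(
    name="nightly_pipeline",
    selection=AssetSelection.all(),
)

defs = Definitions(
    assets=[customers_raw, dbt_transformations, account_summary_csv],
    jobs=[nightly_job],
    schedules=[
        # A single 2 AM trigger; Dagster works out the ordering from the deps
        ScheduleDefinition(job=nightly_job, cron_schedule="0 2 * * *"),
    ],
)

One cron expression for the whole pipeline instead of five staggered ones. Downstream assets start when their upstreams finish, not when a clock says so.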
The Asset Lineage View
Here's where Dagster really shines. You get a visual graph of your entire pipeline:
customers_raw ──┐
                ├──> dbt_transformations ──┬──> account_summary_csv
accounts_raw ───┘                          ├──> account_summary_parquet
                                           └──> data_quality_report
This isn't just pretty. It's functional. Click on any asset and you see:
- When it last ran
- How long it took
- What data it produced
- What depends on it
- The full logs
That 3 AM debugging session? Would have taken 2 minutes instead of 20 with this visibility.
Retry Logic: Because Things Fail
Networks time out. Databases get overloaded. Cloud services have hiccups. I learned this during a Databricks outage last month.
Our pipeline failed. Then it retried. And succeeded. I didn't even know there was an outage until I checked the logs later.
Here's the retry strategy that saved us:
import time

from dagster import asset, Backoff, RetryPolicy

MAX_RETRIES = 3  # retries after the first try, so four attempts in total
BASE_DELAY = 1   # seconds

@asset(
    retry_policy=RetryPolicy(
        max_retries=3,
        delay=1,                      # Start with 1 second
        backoff=Backoff.EXPONENTIAL,  # Double each time
    )
)
def account_summary_to_databricks(context, databricks):
    """Load data to Databricks with retry logic"""
    # The loop below spells out what the RetryPolicy above declares
    for attempt in range(MAX_RETRIES + 1):
        try:
            # Attempt to load data (df is the dataframe produced upstream)
            databricks.load_data(df, "account_summary")
            return
        except (ConnectionError, TimeoutError):
            if attempt < MAX_RETRIES:
                wait_time = BASE_DELAY * (2 ** attempt)  # Exponential backoff: 1s, 2s, 4s
                context.log.warning(f"Attempt {attempt + 1} failed, retrying in {wait_time}s")
                time.sleep(wait_time)
            else:
                raise
- First attempt fails: Wait 1 second, retry
- Second attempt fails: Wait 2 seconds, retry
- Third attempt fails: Wait 4 seconds, retry
- Fourth attempt fails: Give up, alert humans
This pattern saved us during that outage. The first few attempts failed, but by the time the third retry happened, Databricks was back up. The pipeline succeeded without waking me up.
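One honest caveat: the manual loop and the RetryPolicy overlap. If you're happy retrying the whole step, the policy alone is enough. Here's a sketch of that leaner version (databricks.load_data is the same project-specific resource call as above, and the export path and jitter setting are my additions, not something from our pipeline):

import pandas as pd
from dagster import asset, Backoff, Jitter, RetryPolicy

@asset(
    deps=["account_summary_csv"],
    retry_policy=RetryPolicy(
        max_retries=3,
        delay=1,
        backoff=Backoff.EXPONENTIAL,
        jitter=Jitter.PLUS_MINUS,  # randomize waits so parallel retries don't stampede
    ),
)
def account_summary_to_databricks(context, databricks):
    """Raise on any failure and let Dagster re-run the whole step."""
    df = pd.read_csv("exports/account_summary.csv")  # hypothetical export path
    databricks.load_data(df, "account_summary")      # same project-specific resource as above

Either way you get the same backoff behavior; the difference is just who owns the loop, your code or the orchestrator.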
Data Quality: Trust But Verify
Fast pipelines are useless if they produce wrong results. I learned this the embarrassing way.
A business user asked why the interest calculations looked off. I checked the code. Looked fine. I checked the data. Looked fine. Then I dug deeper.
Turns out, we had a bug in the staging layer. Some boolean values weren't being standardized correctly. The pipeline ran successfully every day, producing wrong results every day. For three weeks.
That's when I became obsessed with testing.
Layer 1: Schema Tests
First line of defense: make sure the data structure is correct.
models:
  - name: stg_customer
    columns:
      - name: customer_id
        tests:
          - unique
          - not_null
      - name: has_loan_flag
        tests:
          - accepted_values:
              values: [true, false]
These tests run after every transformation. If customer_id has duplicates, the pipeline fails. If has_loan_flag has a value other than true/false, the pipeline fails.
Fail fast, fail loud.
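In our pipeline these tests run as part of dbt build, so a failing test halts everything downstream automatically. If you wanted the same behavior as an explicit step, a rough sketch might look like this (the "staging" selector and the asset name are made up for illustration) — dbt exits non-zero when any test fails, and raising turns that into a stopped pipeline:

import subprocess

from dagster import asset, Failure

@asset(deps=["dbt_transformations"])
def staging_schema_tests(context):
    """Run the staging-layer schema tests and stop the pipeline if any fail."""
    # dbt returns a non-zero exit code when any test fails,
    # which is what lets the orchestrator halt downstream assets
    result = subprocess.run(
        ["dbt", "test", "--select", "staging"],
        capture_output=True,
        text=True,
    )
    context.log.info(result.stdout)
    if result.returncode != 0:
        raise Failure(description="Schema tests failed in the staging layer")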
Layer 2: Relationship Tests
Make sure data relationships are valid.
models:
  - name: int_account_with_customer
    columns:
      - name: customer_id
        tests:
          - relationships:
              to: ref('stg_customer')
              field: customer_id
This ensures every account has a valid customer. No orphaned records, no broken foreign keys.
Layer 3: Business Logic Tests
Make sure calculations are correct.
models:
  - name: account_summary
    columns:
      - name: interest_rate_pct
        tests:
          - positive_value  # Custom test
          - accepted_range:
              min_value: 0.01
              max_value: 0.025
      - name: new_balance_amount
        tests:
          - positive_value
Interest rates should be between 1% and 2.5%. If we see 25% or 0.001%, something's wrong.
Layer 4: Freshness Tests
Make sure data is recent.
sources:
  - name: raw
    tables:
      - name: customers_raw
        freshness:
          warn_after: {count: 24, period: hour}
          error_after: {count: 48, period: hour}
If customer data hasn't been updated in 24 hours, warn us. If it's been 48 hours, fail the pipeline.
This catches issues where ingestion jobs silently fail. The pipeline keeps running with stale data until the freshness test catches it.
The Quality Report
After every run, we generate a quality report:
{
  "timestamp": "2024-12-01T02:30:00",
  "summary": {
    "total_tests": 40,
    "passed": 38,
    "failed": 2,
    "pass_rate": 95.0
  },
  "failures": [
    {
      "test": "unique_customer_id",
      "model": "stg_customer",
      "message": "Found 3 duplicate customer IDs: [101, 205, 309]"
    },
    {
      "test": "positive_value_balance",
      "model": "stg_account",
      "message": "Found 1 negative balance: account A042 has balance -150.00"
    }
  ]
}
This report goes to our Slack channel. If tests fail, we investigate before the data reaches production.
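The Slack piece is less magic than it sounds. A minimal sketch, assuming the report above has been written to quality_report.json and SLACK_WEBHOOK_URL points at an incoming webhook (both names are mine, not from the project):

import json
import os

import requests

def post_quality_report(report_path: str = "quality_report.json") -> None:
    """Post a short pass/fail summary of the quality report to Slack."""
    with open(report_path) as f:
        report = json.load(f)

    summary = report["summary"]
    lines = [
        f"Data quality: {summary['passed']}/{summary['total_tests']} tests passed "
        f"({summary['pass_rate']:.1f}%)"
    ]
    # One line per failure so the channel sees what broke without opening logs
    for failure in report.get("failures", []):
        lines.append(f":x: {failure['model']} / {failure['test']}: {failure['message']}")

    requests.post(os.environ["SLACK_WEBHOOK_URL"], json={"text": "\n".join(lines)})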
The Quarantine Pattern
Sometimes you can't fix bad data immediately. Maybe it's a weekend, or the source system is down, or you need business input on how to handle it.
We use a quarantine pattern:
-- stg_customer.sql
-- Good records go to stg_customer
SELECT *
FROM src_customer
WHERE customer_id IS NOT NULL
  AND customer_name IS NOT NULL

-- quarantine_stg_customer.sql
-- Bad records go to quarantine
SELECT
    *,
    CASE
        WHEN customer_id IS NULL THEN 'missing_customer_id'
        WHEN customer_name IS NULL THEN 'missing_customer_name'
    END AS quarantine_reason,
    CURRENT_TIMESTAMP() AS quarantined_at
FROM src_customer
WHERE customer_id IS NULL
   OR customer_name IS NULL
Bad records don't break the pipeline. They go to a quarantine table where we can review them later. The pipeline continues with good data.
This saved us when our source system started sending records with null IDs. Instead of failing the entire pipeline, we quarantined those records and processed everything else. We fixed the source system later and reprocessed the quarantined records.
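Quarantine only works if someone actually looks at the quarantine table. A small watchdog helps; here's a rough sketch against DuckDB (the database file name is an assumption):

import duckdb
from dagster import asset

@asset(deps=["dbt_transformations"])
def quarantine_watchdog(context):
    """Surface quarantined customer records instead of letting them pile up quietly."""
    con = duckdb.connect("pipeline.duckdb", read_only=True)  # hypothetical database file
    count, = con.execute("SELECT COUNT(*) FROM quarantine_stg_customer").fetchone()
    con.close()

    if count > 0:
        context.log.warning(f"{count} customer records are sitting in quarantine - review needed")
    context.add_output_metadata({"quarantined_records": count})

It doesn't fail the run; it just makes the quarantined rows impossible to ignore.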
Monitoring: Know What's Happening
I learned to track three key metrics:
1. Run duration:
context.add_output_metadata({
    "duration_seconds": end_time - start_time,
    "records_processed": len(df)
})
If a job that usually takes 6 seconds suddenly takes 60 seconds, something's wrong.
2. Record counts:
context.add_output_metadata({
    "input_records": len(input_df),
    "output_records": len(output_df),
    "filtered_records": len(input_df) - len(output_df)
})
If we usually process 50 records and suddenly process 5,000, something's wrong.
3. Data quality:
context.add_output_metadata({
    "null_count": df.isnull().sum().sum(),
    "duplicate_count": len(df) - len(df.drop_duplicates())
})
If null counts spike, something's wrong.
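Putting those together, here's roughly how the account_summary_csv asset from earlier can attach these metrics in one place (the DuckDB file and export path are placeholders, not our real paths):

import time

import duckdb
from dagster import asset

@asset(deps=["dbt_transformations"])
def account_summary_csv(context):
    """Export the mart to CSV and attach run metrics for the dashboard."""
    start_time = time.time()

    con = duckdb.connect("pipeline.duckdb", read_only=True)
    df = con.execute("SELECT * FROM account_summary").df()
    con.close()

    df.to_csv("exports/account_summary.csv", index=False)

    context.add_output_metadata({
        # A job that jumps from 6 seconds to 60 seconds is a red flag
        "duration_seconds": round(time.time() - start_time, 2),
        # So is a sudden spike or drop in record counts
        "records_processed": len(df),
        # And so are nulls or duplicates that weren't there yesterday
        "null_count": int(df.isnull().sum().sum()),
        "duplicate_count": int(len(df) - len(df.drop_duplicates())),
    })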
These metrics go to a dashboard. I check it every morning. If something looks off, I investigate.
The 3 AM Incident: Resolved
Remember that 3 AM failure from the beginning? Here's how proper orchestration would have prevented it:
Before:
- Intermediate job failed silently
- Marts job ran anyway
- Used stale data
- Nobody noticed for hours
After (with proper orchestration):
- Intermediate job fails
- Dagster stops the pipeline
- Marts job doesn't run
- Slack alert: "Pipeline stopped at intermediate layer"
- Error is visible immediately
- Fix takes 5 minutes instead of 3 hours
The fix was simple: the job ran out of memory, so I increased the allocation. But I only caught it quickly because of proper orchestration.
The Checklist
If you want reliable pipelines, you need:
✅ Dependency management: Jobs run in the right order
✅ Failure isolation: One failure doesn't cascade
✅ Retry logic: Transient failures resolve automatically
✅ Data quality tests: Catch issues before production
✅ Quarantine pattern: Bad data doesn't break the pipeline
✅ Monitoring: Know what's happening in real-time
✅ Alerting: Get notified when things go wrong
✅ Observability: Debug issues quickly
Without these, you're flying blind.
What We Learned
1. Orchestration is not optional: Cron jobs work for simple tasks. For data pipelines, use a proper orchestrator.
2. Test everything: Schema, relationships, business logic, freshness. If you don't test it, it will break.
3. Fail fast: Better to catch issues early than to produce wrong results.
4. Make debugging easy: When things break (and they will), you need to diagnose quickly.
5. Automate recovery: Retry transient failures. Quarantine bad data. Don't wake humans for things machines can handle.
The Final Architecture
Here's what a complete, production-ready pipeline looks like:
CSV Files
    ↓
[Ingestion Assets]
    ↓  (Dagster orchestration)
[DBT Transformations]
    ├─ Source Layer (raw data)
    ├─ Staging Layer (cleaned data)
    ├─ Snapshots (SCD2 history)
    ├─ Intermediate Layer (joins)
    └─ Marts Layer (analytics)
    ↓
[Data Quality Tests] (40+ tests)
    ↓
[Output Assets]
    ├─ CSV exports
    ├─ Parquet files
    └─ Databricks tables
    ↓
[Quality Report]
    └─ Slack notification
Every step is orchestrated. Every layer is tested. Every failure is caught. Every metric is tracked.
The Results
Here's what changed after we implemented proper orchestration and data quality:
Before:
- Pipeline failures: 2-3 per week
- Mean time to detection: 4 hours
- Mean time to resolution: 2 hours
- Data quality issues in production: Weekly
- On-call stress level: High
- 3 AM wake-ups: Too many
After:
- Pipeline failures: 1-2 per month
- Mean time to detection: 2 minutes
- Mean time to resolution: 15 minutes
- Data quality issues in production: None in 6 months
- On-call stress level: Low
- 3 AM wake-ups: Zero
The pipeline isn't perfect. Things still break. But when they do, I know immediately, and I can fix them quickly. Usually before anyone else even notices.
Closing Thoughts
Building a data pipeline is easy. Building a reliable data pipeline is hard.
The transformations are the easy part. The orchestration, testing, monitoring, and error handling—that's where the real work is.
But it's worth it. Because a pipeline that runs reliably at 3 AM, catches issues before production, and recovers from failures automatically? That's the difference between a script and a production system.
And that's what lets me sleep through the night instead of waking up to Slack alerts.
This is Part 3 of a 3-part series on modern data pipeline architecture.
Part 1: Modern Data Pipelines - Why Five Layers Changed Everything
Part 2: The Day Our Pipeline Went From 10 Minutes to 6 Seconds
Want to see the full code? Check out the GitHub repository with complete source code, documentation, and production metrics.
Tech Stack: Dagster • DBT • DuckDB • Databricks • Python • Docker
What's your worst pipeline incident?
How did you fix it? What lessons did you learn? Drop a comment below—I'd love to hear your war stories! 👇