Karthikeyan Rajasekaran

Building Bulletproof Data Pipelines: Orchestration, Testing, and Monitoring (Part 3 of 3)

It was 3:17 AM when my phone buzzed. I grabbed it, squinting at the screen: "Pipeline failed: account_summary."

Still half-asleep, I opened my laptop and pulled up the logs. The error message stared back at me: "Relation 'intermediate_accounts' does not exist."

Wait, what? That table should exist. The intermediate layer runs before the marts layer. Why is it missing?

Then I saw it. The intermediate job had failed silently 20 minutes earlier. The marts job ran anyway, looking for a table that wasn't there. The orchestration had failed.

This is the moment I realized: you can have perfect transformations and blazing-fast incremental processing (see Part 2), but if your orchestration is broken, your pipeline is a ticking time bomb.

Let me show you how we fixed it.

The Orchestration Problem

Here's what I got wrong for way too long: I focused on the transformations and forgot about the orchestration.

I had perfect SQL, clean architecture, and blazing-fast incremental processing. But my jobs were held together with cron jobs and shell scripts. And that's how I ended up debugging at 3 AM.

Let me show you what I learned.

The Naive Approach: Cron Jobs

Our pipeline started with cron jobs. Simple, right?

# crontab
0 2 * * * /scripts/run_source.sh
5 2 * * * /scripts/run_staging.sh
10 2 * * * /scripts/run_snapshots.sh
15 2 * * * /scripts/run_intermediate.sh
20 2 * * * /scripts/run_marts.sh

Each job runs 5 minutes after the previous one. Plenty of time, right?

What could go wrong?

Everything.

Scenario 1: The source job takes 7 minutes instead of 4. The staging job starts before source finishes. Chaos.

Scenario 2: The intermediate job fails. The marts job runs anyway, using stale data. Nobody notices for three days.

Scenario 3: You need to rerun just the marts layer. You have to manually figure out which script to run and in what order.

Scenario 4: Someone asks "when did this job last run successfully?" You grep through logs for 20 minutes.

Cron jobs work for simple tasks. For data pipelines? They're a disaster waiting to happen.

Enter Dagster: Asset-Centric Orchestration

Switching to Dagster changed everything for us. Not because Dagster is magic, but because it forced me to think about data, not tasks.

Here's the mental shift: Instead of "run this script, then that script," you think "this data depends on that data."

Let me show you what this looks like in practice:

import pandas as pd
from dagster import asset
from dagster_dbt import DbtCliResource

@asset(group_name="ingestion")
def customers_raw(context):
    """Ingest customer data from CSV"""
    df = pd.read_csv("data/customers.csv")
    return df

@asset(deps=[customers_raw])  # Wait for customers_raw
def dbt_transformations(context, dbt: DbtCliResource):
    """Run all DBT models"""
    dbt.cli(["build"], context=context).wait()  # block until the dbt build finishes

@asset(deps=[dbt_transformations])  # Wait for transformations
def account_summary_csv(context):
    """Export results to CSV"""
    # Read from database and export

Notice what's different? We're not saying "run at 2:05 AM." We're saying "this asset depends on that asset."

Dagster figures out the order. If customers_raw fails, dbt_transformations doesn't run. If dbt_transformations fails, account_summary_csv doesn't run. The failure is contained instead of propagating downstream.
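To make this concrete, here's a rough sketch of how assets like these get wired into a Dagster deployment and put on a single schedule. The job name, schedule, and dbt project path are illustrative, not from our actual repo; Dagster still works out the run order from the asset dependencies, and the cron string only says when to kick things off:

from dagster import AssetSelection, Definitions, ScheduleDefinition, define_asset_job
from dagster_dbt import DbtCliResource

# Materialize every asset, in dependency order
daily_pipeline = define_asset_job(
    name="daily_pipeline",
    selection=AssetSelection.all(),
)

defs = Definitions(
    assets=[customers_raw, dbt_transformations, account_summary_csv],
    resources={"dbt": DbtCliResource(project_dir="path/to/dbt_project")},  # placeholder path
    jobs=[daily_pipeline],
    schedules=[
        # One schedule for the whole graph instead of five crontab entries
        ScheduleDefinition(job=daily_pipeline, cron_schedule="0 2 * * *"),
    ],
)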

The Asset Lineage View

Here's where Dagster really shines. You get a visual graph of your entire pipeline:

customers_raw ──┐
                ├──> dbt_transformations ──┬──> account_summary_csv
accounts_raw ───┘                          ├──> account_summary_parquet
                                           └──> data_quality_report

This isn't just pretty. It's functional. Click on any asset and you see:

  • When it last ran
  • How long it took
  • What data it produced
  • What depends on it
  • The full logs

That 3 AM debugging session? Would have taken 2 minutes instead of 20 with this visibility.

Retry Logic: Because Things Fail

Networks time out. Databases get overloaded. Cloud services have hiccups. I learned this during a Databricks outage last month.

Our pipeline failed. Then it retried. And succeeded. I didn't even know there was an outage until I checked the logs later.

Here's the retry strategy that saved us:

import time

from dagster import Backoff, RetryPolicy, asset

MAX_RETRIES = 3
BASE_DELAY = 1  # seconds

@asset(
    retry_policy=RetryPolicy(
        max_retries=MAX_RETRIES,
        delay=BASE_DELAY,            # start with 1 second
        backoff=Backoff.EXPONENTIAL  # double the delay each time
    )
)
def account_summary_to_databricks(context, databricks):
    """Load data to Databricks with retry logic.

    Dagster's RetryPolicy re-runs the whole asset on failure; the loop below
    applies the same exponential backoff to just the load call.
    """
    # df is the frame handed off by the upstream transformation (omitted here)
    for attempt in range(MAX_RETRIES + 1):
        try:
            databricks.load_data(df, "account_summary")
            return
        except ConnectionTimeout as e:  # whichever timeout error your client raises
            if attempt < MAX_RETRIES:
                wait_time = BASE_DELAY * (2 ** attempt)  # 1s, 2s, 4s
                context.log.warning(f"Attempt {attempt + 1} failed ({e}), retrying in {wait_time}s")
                time.sleep(wait_time)
            else:
                raise

First attempt fails: Wait 1 second, retry

Second attempt fails: Wait 2 seconds, retry

Third attempt fails: Wait 4 seconds, retry

Fourth attempt fails: Give up, alert humans

This pattern saved us during that outage. The first few attempts failed, but by the time the third retry happened, Databricks was back up. The pipeline succeeded without waking me up.

Data Quality: Trust But Verify

Fast pipelines are useless if they produce wrong results. I learned this the embarrassing way.

A business user asked why the interest calculations looked off. I checked the code. Looked fine. I checked the data. Looked fine. Then I dug deeper.

Turns out, we had a bug in the staging layer. Some boolean values weren't being standardized correctly. The pipeline ran successfully every day, producing wrong results every day. For three weeks.

That's when I became obsessed with testing.

Layer 1: Schema Tests

First line of defense: make sure the data structure is correct.

models:
  - name: stg_customer
    columns:
      - name: customer_id
        tests:
          - unique
          - not_null

      - name: has_loan_flag
        tests:
          - accepted_values:
              values: [true, false]

These tests run after every transformation. If customer_id has duplicates, the pipeline fails. If has_loan_flag has a value other than true/false, the pipeline fails.

Fail fast, fail loud.

Layer 2: Relationship Tests

Make sure data relationships are valid.

models:
  - name: int_account_with_customer
    columns:
      - name: customer_id
        tests:
          - relationships:
              to: ref('stg_customer')
              field: customer_id

This ensures every account has a valid customer. No orphaned records, no broken foreign keys.

Layer 3: Business Logic Tests

Make sure calculations are correct.

models:
  - name: account_summary
    columns:
      - name: interest_rate_pct
        tests:
          - positive_value  # Custom test
          - accepted_range:
              min_value: 0.01
              max_value: 0.025

      - name: new_balance_amount
        tests:
          - positive_value

Interest rates should be between 1% and 2.5%. If we see 25% or 0.001%, something's wrong.

Layer 4: Freshness Tests

Make sure data is recent.

sources:
  - name: raw
    tables:
      - name: customers_raw
        freshness:
          warn_after: {count: 24, period: hour}
          error_after: {count: 48, period: hour}

If customer data hasn't been updated in 24 hours, warn us. If it's been 48 hours, fail the pipeline.

This catches issues where ingestion jobs silently fail. The pipeline keeps running with stale data until the freshness test catches it.
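If you want to see what that check boils down to, here's a minimal sketch of the same idea in plain Python against DuckDB, assuming the raw table carries a loaded_at timestamp. The database path and column name are placeholders:

from datetime import datetime, timedelta

import duckdb

WARN_AFTER = timedelta(hours=24)
ERROR_AFTER = timedelta(hours=48)

con = duckdb.connect("pipeline.duckdb")  # placeholder path
last_loaded = con.execute("SELECT max(loaded_at) FROM customers_raw").fetchone()[0]

age = datetime.utcnow() - last_loaded  # assumes loaded_at is stored as naive UTC
if age > ERROR_AFTER:
    raise RuntimeError(f"customers_raw is {age} old -- failing the pipeline")
elif age > WARN_AFTER:
    print(f"WARNING: customers_raw is {age} old")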

The Quality Report

After every run, we generate a quality report:

{
  "timestamp": "2024-12-01T02:30:00",
  "summary": {
    "total_tests": 40,
    "passed": 38,
    "failed": 2,
    "pass_rate": 95.0
  },
  "failures": [
    {
      "test": "unique_customer_id",
      "model": "stg_customer",
      "message": "Found 3 duplicate customer IDs: [101, 205, 309]"
    },
    {
      "test": "positive_value_balance",
      "model": "stg_account",
      "message": "Found 1 negative balance: account A042 has balance -150.00"
    }
  ]
}

This report goes to our Slack channel. If tests fail, we investigate before the data reaches production.
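Shipping that report to Slack is a small script. Here's a rough sketch using a standard Slack incoming webhook; the report path and environment variable name are placeholders for however you store them:

import json
import os

import requests

# Placeholder locations -- adjust to wherever the pipeline writes its report.
with open("reports/quality_report.json") as f:
    report = json.load(f)

summary = report["summary"]
lines = [
    f"Data quality: {summary['passed']}/{summary['total_tests']} tests passed "
    f"({summary['pass_rate']:.1f}%)"
]
for failure in report["failures"]:
    lines.append(f":x: {failure['model']} / {failure['test']}: {failure['message']}")

# Slack incoming webhooks accept a simple JSON payload with a "text" field.
requests.post(os.environ["SLACK_WEBHOOK_URL"], json={"text": "\n".join(lines)}, timeout=10)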

The Quarantine Pattern

Sometimes you can't fix bad data immediately. Maybe it's a weekend, or the source system is down, or you need business input on how to handle it.

We use a quarantine pattern:

-- stg_customer.sql
-- Good records go to stg_customer
SELECT *
FROM src_customer
WHERE customer_id IS NOT NULL
  AND customer_name IS NOT NULL

-- Bad records go to quarantine
-- quarantine_stg_customer.sql
SELECT 
    *,
    CASE 
        WHEN customer_id IS NULL THEN 'missing_customer_id'
        WHEN customer_name IS NULL THEN 'missing_customer_name'
    END as quarantine_reason,
    CURRENT_TIMESTAMP() as quarantined_at
FROM src_customer
WHERE customer_id IS NULL
   OR customer_name IS NULL

Bad records don't break the pipeline. They go to a quarantine table where we can review them later. The pipeline continues with good data.

This saved us when our source system started sending records with null IDs. Instead of failing the entire pipeline, we quarantined those records and processed everything else. We fixed the source system later and reprocessed the quarantined records.
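Reviewing what landed in quarantine is then a quick query. A small sketch against DuckDB (the database path is a placeholder):

import duckdb

con = duckdb.connect("pipeline.duckdb")  # placeholder path
print(con.execute("""
    SELECT quarantine_reason, count(*) AS records, min(quarantined_at) AS first_seen
    FROM quarantine_stg_customer
    GROUP BY quarantine_reason
    ORDER BY records DESC
""").df())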

Monitoring: Know What's Happening

I learned to track three key metrics:

1. Run duration:

context.add_output_metadata({
    "duration_seconds": end_time - start_time,
    "records_processed": len(df)
})

If a job that usually takes 6 seconds suddenly takes 60 seconds, something's wrong.

2. Record counts:

context.add_output_metadata({
    "input_records": len(input_df),
    "output_records": len(output_df),
    "filtered_records": len(input_df) - len(output_df)
})

If we usually process 50 records and suddenly process 5,000, something's wrong.

3. Data quality:

context.add_output_metadata({
    "null_count": df.isnull().sum().sum(),
    "duplicate_count": len(df) - len(df.drop_duplicates())
})

If null counts spike, something's wrong.

These metrics go to a dashboard. I check it every morning. If something looks off, I investigate.
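For context, here's roughly what one of these assets looks like with all three metric groups attached. The asset body is a stand-in, not our real transformation:

import time

import pandas as pd
from dagster import asset

@asset
def account_summary(context, int_account_with_customer: pd.DataFrame):
    """Illustrative asset that records duration, record counts, and quality metrics."""
    start_time = time.time()

    input_df = int_account_with_customer
    output_df = input_df.dropna(subset=["customer_id"])  # stand-in transformation

    context.add_output_metadata({
        "duration_seconds": round(time.time() - start_time, 2),
        "input_records": len(input_df),
        "output_records": len(output_df),
        "filtered_records": len(input_df) - len(output_df),
        "null_count": int(output_df.isnull().sum().sum()),
        "duplicate_count": len(output_df) - len(output_df.drop_duplicates()),
    })
    return output_df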

The 3 AM Incident: Resolved

Remember that 3 AM failure from the beginning? Here's how proper orchestration would have prevented it:

Before:

  • Intermediate job failed silently
  • Marts job ran anyway
  • Failed against a table that wasn't there
  • A human had to piece together the root cause from raw logs at 3 AM

After (with proper orchestration):

  • Intermediate job fails
  • Dagster stops the pipeline
  • Marts job doesn't run
  • Slack alert: "Pipeline stopped at intermediate layer"
  • Error is visible immediately
  • Fix takes 5 minutes instead of 3 hours

The fix was simple: the job ran out of memory, so I increased the allocation. But I only caught it quickly because of proper orchestration.

The Checklist

If you want reliable pipelines, you need:

  • Dependency management: Jobs run in the right order
  • Failure isolation: One failure doesn't cascade
  • Retry logic: Transient failures resolve automatically
  • Data quality tests: Catch issues before production
  • Quarantine pattern: Bad data doesn't break the pipeline
  • Monitoring: Know what's happening in real time
  • Alerting: Get notified when things go wrong
  • Observability: Debug issues quickly

Without these, you're flying blind.

What We Learned

1. Orchestration is not optional: Cron jobs work for simple tasks. For data pipelines, use a proper orchestrator.

2. Test everything: Schema, relationships, business logic, freshness. If you don't test it, it will break.

3. Fail fast: Better to catch issues early than to produce wrong results.

4. Make debugging easy: When things break (and they will), you need to diagnose quickly.

5. Automate recovery: Retry transient failures. Quarantine bad data. Don't wake humans for things machines can handle.

The Final Architecture

Here's what a complete, production-ready pipeline looks like:

CSV Files
    ↓
[Ingestion Assets]
    ↓ (Dagster orchestration)
[DBT Transformations]
    ├─ Source Layer (raw data)
    ├─ Staging Layer (cleaned data)
    ├─ Snapshots (SCD2 history)
    ├─ Intermediate Layer (joins)
    └─ Marts Layer (analytics)
    ↓
[Data Quality Tests] (40+ tests)
    ↓
[Output Assets]
    ├─ CSV exports
    ├─ Parquet files
    └─ Databricks tables
    ↓
[Quality Report]
    └─ Slack notification

Every step is orchestrated. Every layer is tested. Every failure is caught. Every metric is tracked.

The Results

Here's what changed after we implemented proper orchestration and data quality:

Before:

  • Pipeline failures: 2-3 per week
  • Mean time to detection: 4 hours
  • Mean time to resolution: 2 hours
  • Data quality issues in production: Weekly
  • On-call stress level: High
  • 3 AM wake-ups: Too many

After:

  • Pipeline failures: 1-2 per month
  • Mean time to detection: 2 minutes
  • Mean time to resolution: 15 minutes
  • Data quality issues in production: None in 6 months
  • On-call stress level: Low
  • 3 AM wake-ups: Zero

The pipeline isn't perfect. Things still break. But when they do, I know immediately, and I can fix them quickly. Usually before anyone else even notices.

Closing Thoughts

Building a data pipeline is easy. Building a reliable data pipeline is hard.

The transformations are the easy part. The orchestration, testing, monitoring, and error handling—that's where the real work is.

But it's worth it. Because a pipeline that runs reliably at 3 AM, catches issues before production, and recovers from failures automatically? That's the difference between a script and a production system.

And that's what lets me sleep through the night instead of waking up to Slack alerts.


This is Part 3 of a 3-part series on modern data pipeline architecture.

Part 1: Modern Data Pipelines - Why Five Layers Changed Everything

Part 2: The Day Our Pipeline Went From 10 Minutes to 6 Seconds

Want to see the full code? Check out the GitHub repository with complete source code, documentation, and production metrics.

Tech Stack: Dagster • DBT • DuckDB • Databricks • Python • Docker


What's your worst pipeline incident?

How did you fix it? What lessons did you learn? Drop a comment below—I'd love to hear your war stories! 👇

