DEV Community

Aniket Abhishek Soni

Posted on

Stop letting bad data break your production pipelines

Two years ago, a bad batch of FX rates from a vendor hit our silver layer, wiped out the historical aggregates, and triggered a margin call for a client. We spent 48 hours manually re-processing files, running DELETE statements on partitioned tables, and praying the S3 lifecycle policies didn't catch our backup snapshots.

Today, when the same kind of corrupted file hits the pipeline, the Delta Live Tables (DLT) expectation catches it, the offending records are shunted into a quarantine table, and the pipeline finishes with a "success" status while alerting me to the specific rows. No downtime, no manual data surgery, no heart palpitations.

Why I chose this topic: In financial services, "data quality" isn't a suggestion—it's a compliance requirement. I’ve spent too many weekends cleaning up messes that should have been caught at the gate, and I want to save you from the same fate.

It was 3:14 AM on a Tuesday. PagerDuty went off like a siren in my bedroom. The dashboard was bleeding red: the daily_ledger_agg table, which powers our regulatory reporting to the SEC, had hit a null value in a currency code column. This shouldn't have been possible. The schema was enforced, the DDL was solid, yet somehow, a join was producing rows with NULL keys, which cascaded into a NaN in our risk models.

What we saw

The symptoms were deceptive. The job logs showed the Spark cluster scaling perfectly. Memory usage was nominal. The failure wasn't a crash; it was a "silent" corruption.

My first instinct was to blame the upstream merge logic. I spent two hours digging into the Spark plans, convinced we had a partitioning skew that was causing a hash join to drop records. I even bumped spark.sql.shuffle.partitions from 200 to 1000, thinking it was a resource contention issue. It wasn't. The data was simply bad at the source, and our pipeline, being "resilient," happily ingested the garbage and propagated it into the downstream analytical tables.

Photo by Artturi Jalli on Unsplash

Root cause

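The root cause was a configuration oversight in our DLT pipeline definition. We were using @dlt.expect instead of @dlt.expect_or_drop or @dlt.expect_or_fail (in DLT's SQL syntax, a bare EXPECT constraint instead of EXPECT ... ON VIOLATION DROP ROW or ON VIOLATION FAIL UPDATE).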

import dlt

@dlt.table
@dlt.expect("valid_currency", "currency_code IS NOT NULL")  # warn only: violating rows are still written
def silver_ledger():
    return dlt.read("bronze_ledger")

In DLT, a plain @dlt.expect is merely a warning: rows that violate it are still written to the target table, and the violation is only recorded as a metric in the pipeline's event log. The pipeline marked the run as "Succeeded," but our downstream dashboard was showing a 12% drift in total balance. We were essentially poisoning our own data lake, one row at a time. The code was "correct," but the intent of the data contract was ignored by the execution engine.
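To make the difference between the three behaviours concrete, here is a small pure-Python model of them. It is not the DLT API: OnViolation, apply_expectation, ExpectationFailed and has_currency are hypothetical names for this sketch, and rows are plain dicts rather than a Spark DataFrame. It only mirrors the outcomes: warn keeps the row, drop discards it, fail stops the update.

```python
from enum import Enum


class OnViolation(Enum):
    """Hypothetical labels for the three DLT expectation behaviours."""
    WARN = "warn"  # like @dlt.expect: keep the row, record the violation
    DROP = "drop"  # like @dlt.expect_or_drop: discard the row
    FAIL = "fail"  # like @dlt.expect_or_fail: stop the update


class ExpectationFailed(Exception):
    """Raised when a FAIL-policy expectation is violated."""


def apply_expectation(rows, name, predicate, policy):
    """Return (kept_rows, violation_count) for one expectation and policy."""
    kept, violations = [], 0
    for row in rows:
        if predicate(row):
            kept.append(row)
            continue
        violations += 1
        if policy is OnViolation.FAIL:
            raise ExpectationFailed(f"{name} violated by {row!r}")
        if policy is OnViolation.WARN:
            kept.append(row)  # the bad row still flows downstream
        # DROP: the row is not appended, so it disappears from the output
    return kept, violations


def has_currency(row):
    """Python equivalent of the SQL constraint 'currency_code IS NOT NULL'."""
    return row.get("currency_code") is not None
```

With a batch containing one NULL currency code, WARN keeps both rows and reports one violation, DROP keeps only the valid row, and FAIL raises. The WARN path is the one our bad rows took into daily_ledger_agg.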

Photo by Alex Oviedo on Unsplash

The fix

I refactored the pipeline to enforce strict quarantine patterns. I switched to @dlt.expect_or_drop for non-critical noise and @dlt.expect_or_fail for integrity-critical fields. More importantly, I implemented a secondary "quarantine" sink to capture what was dropped.

Here is what the pattern looks like now:

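import dlt
from pyspark.sql import functions as F

@dlt.table(
    name="silver_ledger",
    table_properties={"quality": "silver"}
)
@dlt.expect_or_drop("valid_currency", "currency_code IS NOT NULL")
def silver_ledger():
    return dlt.read("bronze_ledger")

@dlt.table(
    name="quarantine_ledger"
)
def quarantine_ledger():
    # Capture only the rows that fail the silver_ledger expectation.
    # This filter must stay the exact inverse of the expect_or_drop rule above.
    return (
        dlt.read("bronze_ledger")
        .filter("currency_code IS NULL")
        # Audit columns used by the quarantine dashboard
        .withColumn("_processed_at", F.current_timestamp())
        .withColumn("error_reason", F.lit("valid_currency: currency_code IS NULL"))
    )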

By separating the "clean" pipeline from the "rejected" stream, we gain visibility. When a row hits the quarantine table, it gets a _processed_at timestamp and an error_reason column. My SRE team now has a dedicated dashboard for this table. If the count(*) in quarantine_ledger > 0, they get a Slack alert. We don't stop the business, but we know exactly which records need manual intervention from the data vendor.
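The quarantine table only works if its filter stays the exact inverse of the silver expectation; if someone adds a rule to one side and forgets the other, rows silently vanish. One way to rule that out is to drive both outputs from a single rule set. Databricks documents a similar pattern (a rules dict passed to @dlt.expect_all_or_drop, with the quarantine filter built as NOT of the combined rules). The sketch below shows the idea in plain Python, assuming rows are dicts; RULES, split_batch and should_alert are hypothetical names, and valid_amount is an illustrative second rule, not one from our pipeline.

```python
from datetime import datetime, timezone

# Hypothetical rule set. One dict drives both outputs, so the clean and
# quarantine sides cannot drift apart. valid_amount is illustrative only.
RULES = {
    "valid_currency": lambda row: row.get("currency_code") is not None,
    "valid_amount": lambda row: row.get("amount") is not None,
}


def split_batch(rows, rules, processed_at=None):
    """Route each row to clean or quarantine; quarantined rows get audit columns."""
    processed_at = processed_at or datetime.now(timezone.utc)
    clean, quarantine = [], []
    for row in rows:
        failed = [name for name, check in rules.items() if not check(row)]
        if failed:
            # Same audit columns as quarantine_ledger: a timestamp and the reason(s)
            quarantine.append(
                {**row, "_processed_at": processed_at, "error_reason": ",".join(failed)}
            )
        else:
            clean.append(row)
    return clean, quarantine


def should_alert(quarantine):
    """Alert condition from the dashboard: any quarantined row at all."""
    return len(quarantine) > 0
```

Every input row lands in exactly one of the two outputs, and error_reason lists every rule it broke, which is the property a hand-written IS NULL filter can lose over time.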

What we changed so it never happens again

We stopped treating data quality as a post-hoc analysis task. We now treat it as a CI/CD contract.

First, we moved all DLT expectation definitions into a central registry (a YAML-based schema config). We no longer hardcode business logic expectations in the Python files. By centralizing these, we can run a "Dry Run" test suite in GitHub Actions that validates the schema against a sample of incoming Parquet files before the pipeline code is even deployed to the Databricks workspace.
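A minimal sketch of the dry-run idea, under several assumptions: the registry is a Python dict standing in for the parsed YAML file (parsing it would need a library such as PyYAML), the sample is a list of dicts rather than Parquet files, and SQLite from the standard library stands in for Spark SQL to evaluate the predicate strings. REGISTRY and dry_run are hypothetical names. Because the two SQL dialects differ, this only exercises simple, portable predicates such as IS NOT NULL or BETWEEN; anything Spark-specific still needs a test against a real Spark session.

```python
import sqlite3

# Hypothetical registry, standing in for the parsed YAML config:
# table name -> {expectation name: SQL predicate}.
REGISTRY = {
    "silver_ledger": {
        "valid_currency": "currency_code IS NOT NULL",
        "valid_amount": "amount IS NOT NULL",
    },
}


def dry_run(table, sample_rows, registry=REGISTRY):
    """Return {expectation name: number of sample rows that violate it}.

    SQLite stands in for Spark SQL, so only portable predicates are safe to
    check this way. A predicate that evaluates to NULL counts as a violation.
    """
    expectations = registry[table]
    if not sample_rows:
        return {name: 0 for name in expectations}
    columns = sorted({col for row in sample_rows for col in row})
    conn = sqlite3.connect(":memory:")
    try:
        column_list = ", ".join(f'"{c}"' for c in columns)
        conn.execute(f"CREATE TABLE sample ({column_list})")
        placeholders = ", ".join("?" for _ in columns)
        conn.executemany(
            f"INSERT INTO sample VALUES ({placeholders})",
            [tuple(row.get(c) for c in columns) for row in sample_rows],
        )
        results = {}
        for name, predicate in expectations.items():
            # CASE maps TRUE -> 0 and FALSE/NULL -> 1, so NULL is a violation
            (count,) = conn.execute(
                "SELECT COUNT(*) FROM sample "
                f"WHERE CASE WHEN ({predicate}) THEN 0 ELSE 1 END = 1"
            ).fetchone()
            results[name] = count
        return results
    finally:
        conn.close()
```

In CI, a non-zero count, or a predicate that references a column missing from the sample (SQLite raises OperationalError for that), would fail the job before anything reaches the Databricks workspace.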

Second, we implemented a "Circuit Breaker" pattern. If the number of records quarantined from a batch exceeds 5% of that batch's total record count, the pipeline is configured to fail explicitly. This handles the "silent corruption" case where a vendor sends a file that is 90% garbage.
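The circuit-breaker rule boils down to one comparison. Here is a hedged sketch in plain Python; check_circuit_breaker and CircuitBreakerTripped are hypothetical names, and the two counts are assumed to come from whatever the pipeline records for the batch (for example, rows routed to quarantine_ledger versus rows read from bronze_ledger). Exactly 5% does not trip it, because the rule says "exceeds".

```python
class CircuitBreakerTripped(RuntimeError):
    """Raised when too much of a batch lands in quarantine."""


def check_circuit_breaker(quarantined, total, threshold=0.05):
    """Raise if quarantined / total exceeds threshold (5% by default).

    Returns the observed ratio so callers can log it. An empty batch
    never trips the breaker.
    """
    if total <= 0:
        return 0.0
    ratio = quarantined / total
    if ratio > threshold:
        raise CircuitBreakerTripped(
            f"{quarantined}/{total} rows ({ratio:.1%}) quarantined; "
            f"limit is {threshold:.0%}"
        )
    return ratio
```

One possible way to wire this into DLT itself, offered as an assumption rather than a tested recipe, is to compute the ratio in a small aggregate table and attach @dlt.expect_or_fail to it, so a breach fails the update instead of only being logged.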

Third, we moved away from generic expect calls. Every table in our silver layer must have at least three mandatory expectations: not_null, unique_id, and range_check. If a developer forgets these, the pipeline fails the unit test suite in our CI environment. No exceptions.
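The mandatory-expectations rule is easy to enforce as an ordinary unit test over the central registry. This sketch assumes the registry can be reduced to a mapping from table names to sets of expectation names; SILVER_EXPECTATIONS, missing_mandatory and the test function are hypothetical, written in pytest style. One caveat: DLT expectations are evaluated row by row, so a uniqueness rule like unique_id generally has to be checked against an aggregate (for example, a grouped count per key) rather than as a single-row predicate.

```python
MANDATORY = ("not_null", "unique_id", "range_check")

# Hypothetical shape: table name -> names of the expectations it defines.
SILVER_EXPECTATIONS = {
    "silver_ledger": {"not_null", "unique_id", "range_check", "valid_currency"},
}


def missing_mandatory(registry, mandatory=MANDATORY):
    """Map each table to the mandatory expectation names it lacks."""
    gaps = {}
    for table, expectations in registry.items():
        missing = [name for name in mandatory if name not in expectations]
        if missing:
            gaps[table] = missing
    return gaps


def test_every_silver_table_has_mandatory_expectations():
    # Fails CI if any silver table is missing one of the three required checks
    assert missing_mandatory(SILVER_EXPECTATIONS) == {}
```

Returning the gaps per table, rather than a bare boolean, makes the CI failure message point straight at the table and the missing check.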

In a regulated environment, "good enough" is a liability. You either gate the data, or you accept that your production environment is essentially a lottery. Use your quarantine tables, make your expectations explicit, and stop letting bad data hide in your silver layer. It’s not about being pedantic; it’s about knowing your data is actually usable before the regulators come asking for a reconciliation report.


Tags: #data #engineering #dlt #pipelines

Cover photo by Brecht Corbeel on Unsplash.
