Most data pipeline guides cover the happy path: extract data, transform it, load it to the destination. What they skip is everything that happens when the path isn't happy: the API that returns unexpected data, the transformation that fails partway through, the destination write that times out after writing 400 of 500 records.
This guide is the other half: how to handle errors correctly, how to monitor pipeline health, and how to make your pipeline re-runnable after a failure.
Step 1: Categorize Your Error Types
Before writing error handling code, decide which category an error belongs to. The handling is different for each type.
Transient errors are temporary conditions that resolve themselves: rate limit exceeded, connection timeout, destination temporarily unavailable. These should be retried automatically with exponential backoff. They should not fail the pipeline on first occurrence. After a configurable number of retries, they should escalate to an alert.
Structural errors are problems with the data itself: a required field is null, a value doesn't match the expected type, a foreign key doesn't exist in the destination. These records cannot be processed with the current transformation logic. They should be written to a dead-letter log (with the record content, error type, and timestamp) and skipped. The pipeline should continue processing other records. At the end of the run, alert if structural error count exceeds zero or a defined threshold.
Fatal errors are conditions that make continued execution meaningless: the source system is completely unavailable, authentication has failed, the destination schema has changed in a way that invalidates all records. These should fail the pipeline immediately, log the full context, and alert immediately. Do not attempt to continue.
The most common mistake is using a single broad exception handler that converts structural and fatal errors into transient ones. This produces the silent failure pattern where the pipeline "succeeds" by swallowing exceptions.
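One way to keep the three categories separate in code is to give each its own exception class and catch only the one you intend to handle. The sketch below is illustrative, not a prescribed design: `PipelineError`, `run_batch`, `handle_record`, and the list-based `dead_letters` are hypothetical names chosen for this example.

```python
class PipelineError(Exception):
    """Base class for errors the pipeline knows how to categorize."""


class TransientError(PipelineError):
    """Temporary condition; safe to retry with backoff."""


class StructuralError(PipelineError):
    """Problem with one record; log it to the dead-letter log and skip."""


class FatalError(PipelineError):
    """Continuing is meaningless; stop the run and alert."""


def run_batch(records, handle_record, dead_letters):
    """Process records, catching only the category we know how to skip."""
    results = []
    for record in records:
        try:
            results.append(handle_record(record))
        except StructuralError as exc:
            # Record the failure and keep processing the other records.
            dead_letters.append({"record": record, "error": str(exc)})
        # FatalError, exhausted TransientError, and unexpected exceptions
        # are deliberately not caught here, so they stop the run.
    return results
```

Because there is no bare `except Exception`, a fatal error or an ordinary bug cannot be mistaken for a skippable record.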
Step 2: Implement Retry Logic for Transient Errors
A basic exponential backoff implementation for transient errors:
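import logging
import random
import time

log = logging.getLogger(__name__)

class TransientError(Exception):
    """Temporary failure that is safe to retry (rate limit, timeout)."""

def with_retry(fn, max_attempts=5, base_delay=1.0, max_delay=60.0):
    for attempt in range(max_attempts):
        try:
            return fn()
        except TransientError as e:
            # Out of attempts: re-raise so the caller can escalate to an alert.
            if attempt == max_attempts - 1:
                raise
            # Exponential backoff plus up to 1s of jitter, capped at max_delay.
            delay = min(base_delay * (2 ** attempt) + random.uniform(0, 1), max_delay)
            log.warning(f"Transient error on attempt {attempt + 1}: {e}. Retrying in {delay:.1f}s")
            time.sleep(delay)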
The jitter (random.uniform(0, 1)) prevents multiple concurrent pipelines from all retrying at the same time, which can amplify load spikes on the source system.
The important detail: define TransientError as a specific exception class (or a set of HTTP status codes: 429, 503, 502) rather than catching all exceptions. Retrying a ValueError or KeyError is not useful and hides bugs.
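If you classify by HTTP status code, the mapping can live in one small function. This is a minimal sketch under assumptions: `TRANSIENT_STATUS_CODES`, `PermanentHTTPError`, and `raise_for_status_category` are hypothetical names, and treating every other 4xx/5xx code as non-retryable is a simplification you may want to refine for your API.

```python
# Status codes treated as transient, taken from the list above.
TRANSIENT_STATUS_CODES = frozenset({429, 502, 503})


class TransientError(Exception):
    """Temporary failure that is safe to retry."""


class PermanentHTTPError(Exception):
    """HTTP failure that retrying will not fix (hypothetical name)."""


def raise_for_status_category(status_code):
    """Raise the exception type that matches the response status."""
    if status_code in TRANSIENT_STATUS_CODES:
        raise TransientError(f"HTTP {status_code}")
    if status_code >= 400:
        # Assumption: all remaining error codes are not worth retrying.
        raise PermanentHTTPError(f"HTTP {status_code}")
    # Anything below 400 is treated as success; nothing to raise.
```

Calling this inside the function you pass to a retry wrapper means only 429, 502, and 503 responses trigger a retry; a 401 or 404 surfaces immediately.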
Step 3: Build a Dead-Letter Log for Structural Errors
Records that fail transformation should not be silently dropped. Write them to a dead-letter log table or file with enough context to investigate:
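import json

# transform(), utcnow(), current_run_id(), and StructuralError are assumed
# to be defined elsewhere in the pipeline.
def process_record(record, dead_letter_log):
    try:
        return transform(record)
    except StructuralError as e:
        # Keep enough context to investigate the record later.
        dead_letter_log.write({
            "timestamp": utcnow(),
            "source_id": record.get("id"),
            "error_type": type(e).__name__,
            "error_message": str(e),
            # default=str keeps logging from failing on dates and similar values.
            "raw_record": json.dumps(record, default=str),
            "pipeline_run_id": current_run_id(),
        })
        # None tells the caller to skip this record.
        return None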
The pipeline_run_id is critical for correlating dead-letter records with a specific run when debugging later.
At the end of each run, count dead-letter entries created during the run. If the count exceeds your threshold (zero for a stable pipeline, or a small absolute number), include the count in your run summary and trigger an alert.
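The end-of-run check might look like the following sketch. It assumes the dead-letter entries are available as a list of dictionaries with the `pipeline_run_id` and `error_type` keys used above; `summarize_dead_letters` and the shape of its return value are hypothetical.

```python
from collections import Counter


def summarize_dead_letters(entries, run_id, threshold=0):
    """Count dead-letter entries for one run and decide whether to alert."""
    run_entries = [e for e in entries if e["pipeline_run_id"] == run_id]
    by_type = Counter(e["error_type"] for e in run_entries)
    return {
        "run_id": run_id,
        "dead_letter_count": len(run_entries),
        "by_error_type": dict(by_type),
        # Alert only when the count exceeds the threshold (zero by default).
        "should_alert": len(run_entries) > threshold,
    }
```

Breaking the count down by error type makes the alert more useful: ten entries with the same error type usually point to one upstream change rather than ten unrelated problems.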

Step 4: Add Run-Level Metrics Logging
Write a metrics record at the end of each pipeline run. This is separate from error logging - it captures the overall run health:
# Capture the end time once so completed_at and duration_seconds agree.
completed_at = utcnow()
run_metrics = {
    "run_id": run_id,
    "pipeline_name": "crm_to_warehouse",
    "started_at": start_time,
    "completed_at": completed_at,
    "duration_seconds": (completed_at - start_time).total_seconds(),
    "records_extracted": extracted_count,
    "records_transformed": transformed_count,
    "records_loaded": loaded_count,
    "structural_errors": dead_letter_count,
    "transient_errors_retried": retry_count,
    # Any dead-lettered record downgrades the run to "partial".
    "status": "success" if dead_letter_count == 0 else "partial",
}
metrics_log.write(run_metrics)
With this log table, you can query:
- "How many records did this pipeline process yesterday vs the historical average?"
- "How many structural errors have accumulated this week?"
- "Which runs took significantly longer than usual?"
These queries are the basis for alerting that doesn't require anyone to look at logs manually.
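As an illustration, the third question can be answered with a single query. This sketch uses an in-memory SQLite database so it runs on its own; the `run_metrics` table layout, the sample rows, and the `slow_runs` helper are assumptions made for the example, and the "twice the average" rule is an arbitrary starting point.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """CREATE TABLE run_metrics (
           run_id TEXT PRIMARY KEY,
           pipeline_name TEXT,
           duration_seconds REAL,
           records_extracted INTEGER
       )"""
)
# Made-up sample rows: three normal runs and one slow one.
conn.executemany(
    "INSERT INTO run_metrics VALUES (?, ?, ?, ?)",
    [
        ("r1", "crm_to_warehouse", 60.0, 500),
        ("r2", "crm_to_warehouse", 62.0, 510),
        ("r3", "crm_to_warehouse", 58.0, 495),
        ("r4", "crm_to_warehouse", 300.0, 505),
    ],
)


def slow_runs(conn, pipeline_name, factor=2.0):
    """Return runs whose duration exceeds factor x the pipeline's average."""
    return conn.execute(
        """SELECT run_id, duration_seconds
           FROM run_metrics
           WHERE pipeline_name = ?
             AND duration_seconds > ? * (
                 SELECT AVG(duration_seconds)
                 FROM run_metrics
                 WHERE pipeline_name = ?
             )
           ORDER BY duration_seconds DESC""",
        (pipeline_name, factor, pipeline_name),
    ).fetchall()


print(slow_runs(conn, "crm_to_warehouse"))  # [('r4', 300.0)]
```

Note that the average here includes the slow run itself, which raises the bar; a production query would more likely compare each run against a trailing window of earlier runs.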
Step 5: Implement Schema Validation
Schema drift - unexpected changes to the source API's response format - is the most common cause of silent failures that aren't caught by error handling. Add explicit schema validation at the extraction step:
EXPECTED_FIELDS = {"id", "email", "created_at", "updated_at", "status"}

# log, alert(), and FatalError are assumed to be defined elsewhere in the pipeline.
def validate_schema(records):
    if not records:
        return
    # Only the first record is inspected; this assumes a uniform response shape.
    sample_keys = set(records[0].keys())
    new_fields = sample_keys - EXPECTED_FIELDS
    missing_fields = EXPECTED_FIELDS - sample_keys
    if new_fields:
        log.warning(f"New fields detected in source: {new_fields}")
        alert("Schema change detected", details={"new_fields": list(new_fields)})
    if missing_fields:
        log.error(f"Expected fields missing from source: {missing_fields}")
        raise FatalError(f"Required fields missing: {missing_fields}")
New fields are a warning (the API added something, you might want to include it). Missing required fields are a fatal error (the API removed something your transformation depends on).
"A pipeline that validates its input schema and alerts on changes costs almost nothing to build. But it's the single most valuable defensive measure you can add, because schema changes in source systems are the failure mode that catches teams off guard most consistently." - Dennis Traina, founder of 137Foundry
Step 6: Make the Pipeline Re-Runnable
A pipeline that can be safely re-run after a partial failure is worth significantly more than one that can't. Two properties make this possible:
Checkpointing. Store the high-water mark after each successful batch write. If the pipeline fails, restart from the last checkpoint rather than from the beginning. The checkpoint is typically a timestamp or sequence number stored in a persistent store (a database row, a file, a cache entry).
Idempotent loads. Use upsert semantics at the destination rather than insert. An upsert with a unique key (customer ID, order number, record hash) ensures that re-running a batch doesn't create duplicate records. This interacts with checkpointing: if your checkpoint has any overlap window (you re-process the last N records for safety), upserts ensure the overlap records are updated rather than duplicated.
Setting Up Alerting
With run metrics logging in place, alerting is straightforward. Three alert conditions cover most critical failures:
- Pipeline didn't run. Alert if no run has completed within expected_interval * 1.5 of the last successful run.
- Record count anomaly. Alert if records_extracted is more than 30% below the historical average for this pipeline and time window.
- Structural errors above threshold. Alert if structural_errors > 0 (or your defined threshold).
These three conditions catch the vast majority of real pipeline failures without requiring manual monitoring.
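The three conditions translate fairly directly into a single check that runs on a schedule. This is a sketch under assumptions: `evaluate_alerts`, its parameters, and the alert names are hypothetical, and the inputs are expected to come from the run metrics log described earlier.

```python
from datetime import datetime, timedelta, timezone


def evaluate_alerts(
    now,
    last_success_at,
    expected_interval,
    records_extracted,
    historical_avg,
    structural_errors,
    error_threshold=0,
):
    """Return the names of the alert conditions that currently hold."""
    alerts = []
    # 1. No completed run within 1.5x the expected interval.
    if now - last_success_at > expected_interval * 1.5:
        alerts.append("pipeline_did_not_run")
    # 2. Extracted count more than 30% below the historical average.
    if historical_avg > 0 and records_extracted < 0.7 * historical_avg:
        alerts.append("record_count_anomaly")
    # 3. Structural errors above the configured threshold.
    if structural_errors > error_threshold:
        alerts.append("structural_errors_above_threshold")
    return alerts


now = datetime(2024, 1, 2, 12, 0, tzinfo=timezone.utc)
print(
    evaluate_alerts(
        now=now,
        last_success_at=now - timedelta(hours=2),
        expected_interval=timedelta(hours=1),
        records_extracted=300,
        historical_avg=500,
        structural_errors=0,
    )
)  # ['pipeline_did_not_run', 'record_count_anomaly']
```

The first check has to run independently of the pipeline itself (for example from a separate scheduler), since a pipeline that never starts cannot report its own absence.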
How to Build an ETL Pipeline for Business Data Syncing covers the extraction and load design that this error handling layer builds on top of - incremental extraction, idempotent upserts, and checkpoint management in an integrated design.
For help building data pipelines with these operational properties from the start, https://137foundry.com works with businesses on both architecture and implementation. The data integration services cover the full pipeline lifecycle, including the monitoring setup that makes pipelines trustworthy in production.
For pipeline implementation, Python with SQLAlchemy is a common stack for custom ETL with relational databases. PostgreSQL can hold both pipeline operational state (dead-letter tables, run logs) and destination storage. For orchestration-level error handling and retry policies, Apache Airflow provides per-task retry configuration and failure branching.
