Part 3 — Checkpoint and Backlog: Building Visibility Into Your Data Pipeline
In Part 1, we covered idempotency, retries, and recovery — the foundational mechanisms that make pipelines resilient to failure.
In Part 2, we explored dynamic orchestration — how to scale ETL workloads intelligently based on available hardware and data volume.
But here's the critical question we haven't answered yet:
When your pipeline runs successfully, how do you track what actually committed?
When it fails, where did the failures go, and how do you replay them?
That's where checkpoint and backlog hooks come in — the twin pillars of pipeline observability and incident management.
The Problem
You've built a robust ETL pipeline. It extracts data from Postgres, transforms it, and loads it into Elasticsearch. Everything works… until you need to answer these questions:
- "Which records were successfully committed in the last run?"
- "Did the pipeline skip any records during transformation?"
- "What happens when a destination write fails?"
- "Can I replay failed records without reprocessing the entire dataset?"
Without structured tracking, you're flying blind. Success and failure both happen in a black box. You have logs, sure — but logs are unstructured, hard to query, and don't give you a recovery path.
This is where checkpoint and backlog hooks transform your pipeline from a fire-and-forget script into a fully auditable, recoverable data system.
The Concept: Checkpoint and Backlog Hooks
Checkpoint hooks fire automatically after every successful write to the destination. They answer the question: "What just committed?"
Backlog hooks fire automatically when a write to the destination fails. They answer the question: "What just failed, and how do I handle it?"
Together, they provide:
- Data lineage tracking — every committed record is logged with metadata.
- Incident management — every failed record is persisted to a structured dead-letter queue (DLQ) with failure context.
- Replay capability — backlogged records can be reprocessed as a separate flow, independent of the main pipeline.
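To make the dispatch idea concrete, here is a minimal, self-contained sketch of a write loop that routes each outcome to exactly one of the two hooks. The `Hooks` struct, `WriteBatches`, and `ErrWriteFailed` are hypothetical names chosen for illustration; they are not the etlfunnel API.

```go
package main

import "errors"

// Record is a generic row, mirroring the map-based records used in this article.
type Record map[string]interface{}

// ErrWriteFailed is a sample error a destination writer might return (illustrative only).
var ErrWriteFailed = errors.New("destination write failed")

// Hooks bundles the two callbacks. Hypothetical shape, not the etlfunnel types.
type Hooks struct {
	OnCommit  func(batch []Record)            // checkpoint: "what just committed?"
	OnFailure func(batch []Record, err error) // backlog: "what just failed?"
}

// WriteBatches writes each batch and routes the outcome to exactly one hook.
// It returns how many batches committed and how many were backlogged.
func WriteBatches(batches [][]Record, write func([]Record) error, h Hooks) (committed, backlogged int) {
	for _, batch := range batches {
		if err := write(batch); err != nil {
			h.OnFailure(batch, err)
			backlogged++
			continue // one failed batch does not stop the run
		}
		h.OnCommit(batch)
		committed++
	}
	return committed, backlogged
}
```

The point of the design is that success and failure are handled symmetrically: every batch ends up in one hook or the other, so nothing disappears silently.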
Let's see how this works in practice.
The Implementation
Checkpoint Hook: Tracking Success
A checkpoint hook is invoked every time data is successfully written to the destination. It receives:
- The committed record(s)
- Access to source, destination, and auxiliary databases
- The pipeline execution context
Here's a real implementation:
import (
	"encoding/json"
	"time"

	"etlfunnel/database/cast"
	"etlfunnel/execution/models"

	"go.uber.org/zap"
	// The package that provides client.Conn is not shown in the original listing;
	// import the MySQL client package your etlfunnel setup uses.
)

// Checkpoint runs after every successful destination write and records
// each committed record in an audit table on the auxiliary MySQL database.
func Checkpoint(param *models.CheckpointProps) (*models.CheckpointTune, error) {
	// Nothing to audit: let the pipeline continue.
	if param == nil || param.Ctx == nil || len(param.Records) == 0 {
		return &models.CheckpointTune{Action: models.ActionContinue}, nil
	}
	param.Logger.Info("Checkpoint triggered", zap.Int("record_count", len(param.Records)))

	// Cast auxiliary MySQL connection for audit logging
	mysqlConn, err := cast.CastAsMySQLDBConnection(param.AuxiliaryDBConnMap["mysql"])
	if err != nil {
		param.Logger.Error("Failed to cast MySQL connection", zap.Error(err))
		return nil, err
	}

	// Process each committed record
	for _, record := range param.Records {
		auditEntry := map[string]interface{}{
			"pipeline_name":     param.Ctx.GetName(),
			"record_id":         record["id"],
			"commit_timestamp":  time.Now().UTC(),
			"source_table":      record["_source_table"],
			"destination_table": record["_destination_table"],
			"processing_status": "committed",
		}

		// Serialize the full payload; do not silently drop a marshalling error.
		recordJSON, err := json.Marshal(record)
		if err != nil {
			param.Logger.Error("Failed to serialize record", zap.Error(err))
			return nil, err
		}

		// Persist to audit log
		query := `
			INSERT INTO pipeline_audit_log
			(pipeline_name, record_id, commit_timestamp, record_data, processing_status)
			VALUES (?, ?, ?, ?, ?)
		`
		_, err = mysqlConn.Exec(query,
			auditEntry["pipeline_name"],
			auditEntry["record_id"],
			auditEntry["commit_timestamp"],
			string(recordJSON),
			auditEntry["processing_status"],
		)
		if err != nil {
			param.Logger.Error("Failed to write audit log", zap.Error(err))
			return nil, err
		}
	}

	// Update pipeline statistics. A stats failure is logged but does not stop the pipeline.
	if err := updatePipelineStats(mysqlConn, param.Ctx.GetName(), len(param.Records)); err != nil {
		param.Logger.Error("Failed to update pipeline stats", zap.Error(err))
	}
	return &models.CheckpointTune{Action: models.ActionContinue}, nil
}

// updatePipelineStats upserts the last commit time and adds recordCount
// to the running total for the pipeline.
func updatePipelineStats(conn *client.Conn, pipelineName string, recordCount int) error {
	query := `
		INSERT INTO pipeline_stats (pipeline_name, last_commit_time, record_count)
		VALUES (?, ?, ?)
		ON DUPLICATE KEY UPDATE
			last_commit_time = VALUES(last_commit_time),
			record_count = record_count + VALUES(record_count)
	`
	_, err := conn.Exec(query, pipelineName, time.Now().UTC(), recordCount)
	return err
}
What does this give you?
- Full audit trail — every committed record is logged with timestamp, source, destination, and full payload.
- Pipeline metrics — track throughput, last commit time, and record count per pipeline.
- Event-driven workflows — trigger alerts or downstream processes for specific record types (e.g., high-value transactions).
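As an illustration of the event-driven case, a checkpoint hook could filter the committed batch before raising an alert. The sketch below is self-contained and makes two assumptions that do not come from the code above: records carry an `amount` field, and that field is a `float64`.

```go
package main

// Record mirrors the map-based records handled by the checkpoint hook.
type Record map[string]interface{}

// HighValue returns the committed records whose "amount" exceeds threshold.
// The "amount" field name and its float64 type are assumptions for illustration.
// Records without a usable amount are skipped rather than treated as errors.
func HighValue(records []Record, threshold float64) []Record {
	var out []Record
	for _, r := range records {
		amount, ok := r["amount"].(float64)
		if ok && amount > threshold {
			out = append(out, r)
		}
	}
	return out
}
```

Inside a checkpoint hook, the result of a call such as `HighValue(records, 10000)` could be handed to whatever alerting or downstream mechanism you already use.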
Backlog Hook: Handling Failure
A backlog hook is invoked when a write to the destination fails. It receives:
- The failed record(s)
- Access to auxiliary databases for DLQ storage
- The pipeline execution context
Here's how you implement it:
import (
	"encoding/json"
	"time"

	"etlfunnel/database/cast"
	"etlfunnel/execution/models"

	"go.uber.org/zap"
	// The package that provides client.Conn is not shown in the original listing;
	// import the MySQL client package your etlfunnel setup uses.
)

// Backlog runs when a destination write fails and stores each failed
// record in a dead-letter table on the auxiliary MySQL database.
func Backlog(param *models.BacklogProps) (*models.BacklogTune, error) {
	// Nothing to backlog: let the pipeline continue.
	if param == nil || param.Ctx == nil || len(param.Records) == 0 {
		return &models.BacklogTune{Action: models.ActionContinue}, nil
	}
	param.Logger.Error("Write failure detected", zap.Int("failed_record_count", len(param.Records)))

	// Cast auxiliary MySQL connection for DLQ storage
	mysqlConn, err := cast.CastAsMySQLDBConnection(param.AuxiliaryDBConnMap["mysql"])
	if err != nil {
		param.Logger.Error("Failed to cast MySQL connection for backlog", zap.Error(err))
		return nil, err
	}

	// Process each failed record
	for _, record := range param.Records {
		// Serialize the full payload; do not silently drop a marshalling error.
		recordJSON, err := json.Marshal(record)
		if err != nil {
			param.Logger.Error("Failed to serialize backlog record", zap.Error(err))
			return nil, err
		}

		// Persist failed record to DLQ with retry_count 0 and status 'pending'
		query := `
			INSERT INTO failed_records
			(pipeline_name, record_id, record_data, failure_timestamp, retry_count, status)
			VALUES (?, ?, ?, ?, 0, 'pending')
		`
		_, err = mysqlConn.Exec(query,
			param.Ctx.GetName(),
			record["id"],
			string(recordJSON),
			time.Now().UTC(),
		)
		if err != nil {
			param.Logger.Error("Failed to store backlog record", zap.Error(err))
			return nil, err
		}
	}

	// Update failure statistics. A stats failure is logged but does not stop the pipeline.
	if err := updateFailureStats(mysqlConn, param.Ctx.GetName(), len(param.Records)); err != nil {
		param.Logger.Error("Failed to update failure stats", zap.Error(err))
	}
	param.Logger.Info("Records successfully backlogged. Continuing pipeline.",
		zap.Int("backlogged_count", len(param.Records)))
	return &models.BacklogTune{Action: models.ActionContinue}, nil
}

// updateFailureStats upserts the last failure time and adds failureCount
// to the running total for the pipeline.
func updateFailureStats(conn *client.Conn, pipelineName string, failureCount int) error {
	query := `
		INSERT INTO pipeline_failure_stats (pipeline_name, last_failure_time, failure_count)
		VALUES (?, ?, ?)
		ON DUPLICATE KEY UPDATE
			last_failure_time = VALUES(last_failure_time),
			failure_count = failure_count + VALUES(failure_count)
	`
	_, err := conn.Exec(query, pipelineName, time.Now().UTC(), failureCount)
	return err
}
What does this give you?
- Structured DLQ — failed records are stored in a queryable table, not just scattered in logs.
- Failure metrics — track failure rates, last failure time, and failure count per pipeline.
- Incident triage — identify and prioritize critical failures for immediate attention.
- Safe continuation — the main pipeline continues running even when individual writes fail.
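Once both hooks maintain counters, a failure rate is a one-line calculation. The following sketch shows one way to derive it and compare it against an alert threshold. The function names and the idea of a threshold are assumptions for illustration. The inputs would come from counters like `record_count` and `failure_count` in the stats tables above.

```go
package main

// FailureRate returns the share of attempted writes that failed, in [0, 1].
// It returns 0 when nothing was attempted, to avoid dividing by zero.
func FailureRate(committed, failed int) float64 {
	total := committed + failed
	if total == 0 {
		return 0
	}
	return float64(failed) / float64(total)
}

// ShouldAlert reports whether the failure rate has reached the given threshold.
func ShouldAlert(committed, failed int, threshold float64) bool {
	return FailureRate(committed, failed) >= threshold
}
```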
The Replay Flow: Turning Failures Into Recovery
Here's where it all comes together.
Failed records in the failed_records table don't just sit there. You can build a separate replay flow that:
- Reads records from the DLQ (with status = 'pending')
- Applies the same transformations as the main pipeline
- Attempts to rewrite to the destination
- Updates the DLQ status to 'resolved' or 'failed_again'
- Tracks replay progress with its own checkpoints
This is a first-class recovery mechanism — not an ad-hoc script, but a structured flow with the same guarantees as your main pipeline.
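The replay steps above can be sketched with an in-memory stand-in for the failed_records table. This is a simplified illustration under stated assumptions: `DLQEntry`, `Replay`, and `ErrStillFailing` are hypothetical names, and the transform and write callbacks stand in for your real pipeline stages. A production version would read and update rows in the DLQ table instead of a slice.

```go
package main

import "errors"

// ErrStillFailing is a sample error for a destination that is still unavailable.
var ErrStillFailing = errors.New("destination still unavailable")

// DLQEntry mirrors a row of the failed_records table (hypothetical Go shape).
type DLQEntry struct {
	RecordID   int
	Data       map[string]interface{}
	RetryCount int
	Status     string // "pending", "resolved", or "failed_again"
}

// Replay processes only pending entries: transform, write again, update status.
// It mutates entries in place and returns how many were resolved or failed again.
func Replay(
	entries []DLQEntry,
	transform func(map[string]interface{}) map[string]interface{},
	write func(map[string]interface{}) error,
) (resolved, failedAgain int) {
	for i := range entries {
		e := &entries[i]
		if e.Status != "pending" {
			continue // already handled in an earlier replay
		}
		e.RetryCount++
		if err := write(transform(e.Data)); err != nil {
			e.Status = "failed_again"
			failedAgain++
			continue
		}
		e.Status = "resolved"
		resolved++
	}
	return resolved, failedAgain
}
```

Because only pending entries are touched, running the replay twice does not write resolved records a second time, which keeps the flow consistent with the idempotency goals from Part 1.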
Why This Matters
Checkpoint and backlog hooks transform your pipeline from a black box into a glass box.
You get:
- Full visibility — every committed record is tracked. Every failed record is captured.
- Auditable lineage — trace any record from source to destination, with timestamps and metadata.
- Deterministic recovery — replay failed records without reprocessing the entire dataset.
- Operational confidence — know exactly what succeeded, what failed, and what's pending.
This is the difference between pipelines that move data and pipelines that move data reliably.
Ready to Build Auditable, Recoverable ETL Pipelines? Visit etlfunnel.com today to sign up for a free trial of our SaaS platform and transform your data engineering workflows.