DEV Community

Cover image for How to Data Engineer the ETLFunnel Way
Vivek Burman
Vivek Burman

Posted on

How to Data Engineer the ETLFunnel Way

Part 1 — Idempotency, Retry, and Recovery

Modern data engineering isn't about moving data from A to B — it's about doing it correctly every single time. The deeper a pipeline goes, the more fragile it becomes. A single failed batch can leave datasets half-written, cause duplicates downstream, or silently skip records. The hardest part isn't transformation logic — it's keeping the system consistent when things go wrong. At ETLFunnel, we solve these challenges to ensure reliable, scalable data pipelines for developers and teams.

The Problem: When Retry Becomes Corruption

A failed task is easy to rerun, but doing so safely isn't.
If a data step partially commits results before crashing, the next retry might reprocess the same data again — duplicating rows or overwriting the wrong partitions. Traditional batch pipelines (like simple cron + SQL scripts) have no built-in concept of idempotency — they only know how to rerun.

Retries without idempotency create corruption.
Retries without structured state tracking create blind spots.
Retries without visibility create operational debt.

Dead-letter queues (DLQs) are supposed to help here, but most implementations are crude: a folder of failed records, no schema, no context, no replay mechanism. They collect data but don't provide an actual recovery path.

Data engineers end up writing retry scripts, marking rows manually, or purging partial results — a process that doesn't scale or audit well.


The Solution: Declarative Checkpoints and Backlogs

The fix begins with a change in mindset: a pipeline must remember where it was and record what failed — consistently.

Instead of treating idempotency as a postmortem patch, we treat it as a first-class citizen of execution. Each step carries three key definitions:

Checkpoint Hook— The checkpoint hook receives a *models.CheckpointProps. Read the current context and records from that struct, resolve an auxiliary DB engine via CastAsPostgresDBConnection(...), and persist a compact checkpoint row (pipeline identifier + last-processed id + timestamp). The hook returns a *models.CheckpointTune indicating whether the pipeline should continue or stop. The example below shows a conservative, idempotent upsert into a pipeline_checkpoints table and logs useful events via the provided ILoggerContract.

import (
 "context"
 "fmt"
 "time"

 "etlfunnel/execution/models"

 "github.com/jackc/pgx/v5"
)

// Checkpoint persists last processed ID to Postgres.
func Checkpoint(param *models.CheckpointProps) (*models.CheckpointTune, error) {
 if param == nil || param.Ctx == nil || len(param.Records) == 0 {
  return &models.CheckpointTune{Action: models.ActionContinue}, nil
 }

 ctx := param.Ctx.GetContext()
 logger := param.Logger

 // Extract last ID
 lastRec := param.Records[len(param.Records)-1]
 lastIDRaw, ok := lastRec["id"]
 if !ok {
  logger.Warn("No 'id' field; skipping persist")
  return &models.CheckpointTune{Action: models.ActionContinue}, nil
 }
 lastIDStr := fmt.Sprintf("%v", lastIDRaw)

 // Resolve Postgres connection (auxiliary or fallback to dest)
 var engine models.IDatabaseEngine
 if param.AuxiliaryDBConnMap != nil {
  if e, ok := param.AuxiliaryDBConnMap["checkpoint_db"]; ok {
   engine = e
  }
 }
 if engine == nil {
  engine = param.DestDBConn
 }
 pgConn, err := models.CastAsPostgresDBConnection(engine)
 if err != nil {
  return &models.CheckpointTune{Action: models.ActionStop}, fmt.Errorf("cast failed: %w", err)
 }

 // Ensure table and upsert
 createSQL := `CREATE TABLE IF NOT EXISTS pipeline_checkpoints (
  pipeline_name TEXT PRIMARY KEY, last_id TEXT, updated_at TIMESTAMP WITH TIME ZONE
 )`
 pgConn.Exec(ctx, createSQL)

 pipelineName := getPipelineName(param.Ctx) // Helper to derive name
 upsertSQL := `INSERT INTO pipeline_checkpoints (pipeline_name, last_id, updated_at)
  VALUES ($1, $2, $3) ON CONFLICT (pipeline_name) DO UPDATE SET last_id = EXCLUDED.last_id, updated_at = EXCLUDED.updated_at`
 if _, err := pgConn.Exec(ctx, upsertSQL, pipelineName, lastIDStr, time.Now().UTC()); err != nil {
  logger.Error("Persist failed")
  return &models.CheckpointTune{Action: models.ActionStop}, err
 }

 logger.Info(fmt.Sprintf("Checkpoint for %s at %s", pipelineName, lastIDStr))
 return &models.CheckpointTune{Action: models.ActionContinue}, nil
}
Enter fullscreen mode Exit fullscreen mode

The checkpoint commits after a successful batch, ensuring that the next retry begins exactly from the last consistent point.

Backlog Function — following the same pattern as the checkpoint function. It uses the *models.BacklogProps struct, persists failed records to an auxiliary Postgres DB, and returns a *models.BacklogTune indicating whether to continue or stop the pipeline.

import (
 "context"
 "encoding/json"
 "fmt"
 "time"

 "etlfunnel/execution/models"

 "github.com/jackc/pgx/v5"
)

// Backlog persists failed records to DLQ table.
func Backlog(param *models.BacklogProps) (*models.BacklogTune, error) {
 if param == nil || param.Ctx == nil || len(param.Records) == 0 {
  return &models.BacklogTune{Action: models.ActionContinue}, nil
 }

 ctx := param.Ctx.GetContext()
 logger := param.Logger

 // Resolve Postgres connection
 var engine models.IDatabaseEngine
 if param.AuxiliaryDBConnMap != nil {
  if e, ok := param.AuxiliaryDBConnMap["backlog_db"]; ok {
   engine = e
  }
 }
 if engine == nil {
  engine = param.DestDBConn
 }
 pgConn, err := models.CastAsPostgresDBConnection(engine)
 if err != nil {
  return &models.BacklogTune{Action: models.ActionStop}, fmt.Errorf("cast failed: %w", err)
 }

 // Ensure DLQ table
 createSQL := `CREATE TABLE IF NOT EXISTS pipeline_backlog (
  id SERIAL PRIMARY KEY, pipeline_name TEXT, record_data JSONB, error_time TIMESTAMP WITH TIME ZONE DEFAULT now()
 )`
 pgConn.Exec(ctx, createSQL)

 pipelineName := getPipelineName(param.Ctx)
 for _, rec := range param.Records {
  dataBytes, err := json.Marshal(rec)
  if err != nil {
   logger.Warn(fmt.Sprintf("Marshal failed: %v", err))
   continue
  }

  insertSQL := `INSERT INTO pipeline_backlog (pipeline_name, record_data, error_time) VALUES ($1, $2, $3)`
  if _, err := pgConn.Exec(ctx, insertSQL, pipelineName, dataBytes, time.Now().UTC()); err != nil {
   logger.Error("Insert failed")
   return &models.BacklogTune{Action: models.ActionStop}, err
  }
 }

 logger.Info(fmt.Sprintf("Backlog: %d records for %s", len(param.Records), pipelineName))
 return &models.BacklogTune{Action: models.ActionContinue}, nil
}
Enter fullscreen mode Exit fullscreen mode

Each backlog entry includes structured metadata: the step ID, failure reason, and serialized payload. Failed records are never retried automatically in the main pipeline. Instead, they form the source for separate replay flows.

Separate Replay Flow

Once records are persisted in the backlog, they can be processed as a dedicated flow:

  • This replay flow reads records from the DLQ or auxiliary database.
  • It applies the same transformations and business logic as the original pipeline.
  • It can also include additional validation, sampling, or recovery logic.
  • Progress in the replay flow is tracked independently using its own checkpoints, providing auditable, deterministic recovery.

By separating normal execution from replay, you avoid mixing failure handling with live processing, simplify debugging, and maintain idempotent guarantees across both flows. ETLFunnel automates this replay capability to make recovery seamless for on-premise and cloud deployments.


Why This Works

  • Idempotency by Design: Checkpoints persist logical progress, not just job status.
  • Structured Recovery: Backlogs store typed errors, making them queryable and auditable.
  • Replay as a First-Class Flow: Recovery is explicit and traceable, avoiding accidental reprocessing in the main pipeline.
  • Reviewable Operations: Failures aren't opaque; they're captured with metadata and visible through structured logs or UI.

This approach ensures that every record — successful or failed — has a deterministic, auditable path, while keeping the main pipeline clean and focused.


Ready to build resilient, idempotent ETL pipelines that handle failures gracefully? Visit etlfunnel.com today to sign up for a free trial of our SaaS platform and transform your data engineering workflows.

Top comments (0)