DEV Community

Dylan Dumont

Change Data Capture: Streaming Database Changes to Downstream Systems

Manual polling is an anti-pattern; stream the truth directly from the source.

What We're Building

We are constructing a robust Change Data Capture (CDC) pipeline using Go. This system watches the Write-Ahead Log (WAL) of a PostgreSQL instance, captures row-level changes, transforms the payload into a standard domain model, and emits it to a downstream consumer.

The architecture relies on logical replication rather than physical log scanning to handle schema changes gracefully. The system must survive connection resets by resuming from the last confirmed WAL position; in practice this gives at-least-once delivery, which the consumer turns into effectively exactly-once processing by deduplicating on the log position (LSN).

The high-level data flow looks like this:

+----------------+     +----------------+     +----------------+
|  PostgreSQL    |---->|   CDC Reader   |---->|  Downstream    |
|   WAL Stream   |     |  (Go Service)  |     |   System       |
+----------------+     +----------------+     +----------------+
        ^                      |                      ^
        |                      v                      |
+------------------+   +------------------+          |
|  Failover /      |   |  Offset Storage  |----------+
|  Offset Replay   |   |  (Kafka/Table)   |
+------------------+   +------------------+

Step 1 — Establish Replication Slot

The first step is initializing a logical replication slot on the source database. Without a slot, the server is free to recycle WAL segments the reader has not yet consumed, so changes can be lost across restarts or slow periods. We use the pgoutput plugin, which emits binary protocol messages ready for decoding.

conn, err := pgxpool.Connect(ctx, dsn)
if err != nil {
    log.Fatal(err)
}
defer conn.Close()

slotName := "cdc_reader_slot"
// pgoutput only streams tables covered by a publication, so create one
// alongside the slot ("cdc_pub" is referenced again in Step 2).
_, err = conn.Exec(ctx, `CREATE PUBLICATION cdc_pub FOR ALL TABLES`)
if err != nil {
    log.Fatal(err)
}
_, err = conn.Exec(ctx, `SELECT pg_create_logical_replication_slot($1, 'pgoutput')`, slotName)
if err != nil {
    log.Fatal(err)
}
// pg_current_wal_lsn() reports the latest WAL location to start streaming from.

This ensures the CDC reader captures every change, including the schema metadata that pgoutput relays, rather than relying on polling intervals, which introduce latency and can miss intermediate states.

Step 2 — Start WAL Stream

Once the slot is created, we subscribe to the stream from a specific position. Logical replication is driven by the streaming-replication protocol's START_REPLICATION command rather than ordinary queries; the sketch below assumes the jackc/pglogrepl helper library over a connection opened with replication=database. In production, we read a buffer of events in a loop rather than waiting indefinitely for a single row.

// Assumes conn is a *pgconn.PgConn opened with "replication=database" in the
// DSN, and startLSN is the position captured in Step 1.
err = pglogrepl.StartReplication(ctx, conn, slotName, startLSN,
    pglogrepl.StartReplicationOptions{PluginArgs: []string{
        "proto_version '1'", "publication_names 'cdc_pub'"}})
if err != nil {
    log.Fatal(err)
}
for {
    rawMsg, err := conn.ReceiveMessage(ctx)
    if err != nil {
        log.Fatal(err)
    }
    // Only XLogData frames carry row changes; skip keepalives and other traffic.
    if cd, ok := rawMsg.(*pgproto3.CopyData); ok && cd.Data[0] == pglogrepl.XLogDataByteID {
        xld, _ := pglogrepl.ParseXLogData(cd.Data[1:])
        processChange(xld.WALData)
    }
}

This pattern decouples the ingestion rate from the database commit speed, allowing the downstream system to buffer and process data at its own capacity.
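One way to make that decoupling concrete is a small buffer stage between the read loop and the processor. A minimal sketch, where the `rawMessage` type and buffer size are illustrative assumptions rather than part of the pgoutput protocol:

```go
type rawMessage []byte

// pump copies messages from the reader into a bounded buffer. When the
// buffer fills, the send blocks, which naturally pauses the upstream
// read loop instead of growing memory without limit.
func pump(in <-chan rawMessage, size int) <-chan rawMessage {
	out := make(chan rawMessage, size)
	go func() {
		defer close(out)
		for m := range in {
			out <- m
		}
	}()
	return out
}
```

The processor then ranges over the returned channel at its own pace, while the reader only advances as fast as the buffer drains.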

Step 3 — Decode Change Payloads

Raw WAL messages are binary and carry distinct message types for inserts, updates, and deletes. We decode these payloads into a generic event struct, dropping database-specific details and extracting only the columns the consumer needs.

type ChangeEvent struct {
    TableName string
    Type      string // INSERT, UPDATE, DELETE
    Source    string // originating system, filled in during transform
    RowData   map[string]interface{}
}

func processChange(data []byte) *ChangeEvent {
    // Decode the binary pgoutput payload: the Relation message supplies the
    // table name; Insert/Update/Delete messages carry the tuple data.
    event := &ChangeEvent{TableName: "users", Type: "UPDATE"} // placeholder values
    // TODO: parse the tuple columns into RowData.
    return event
}

Parsing the binary protocol rather than scraping textual output ensures complex data types like arrays and JSON are handled correctly, without lossy string conversions.
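For orientation, the first byte of every pgoutput message identifies its kind. A minimal dispatch over those tag bytes, as defined by PostgreSQL's logical replication message format:

```go
// messageKind maps a pgoutput tag byte to a human-readable label.
func messageKind(tag byte) string {
	switch tag {
	case 'B':
		return "BEGIN"
	case 'C':
		return "COMMIT"
	case 'R':
		return "RELATION" // schema metadata for the rows that follow
	case 'I':
		return "INSERT"
	case 'U':
		return "UPDATE"
	case 'D':
		return "DELETE"
	case 'T':
		return "TRUNCATE"
	default:
		return "UNKNOWN"
	}
}
```

A full decoder would branch on this tag and then parse the remainder of the payload accordingly; caching the most recent RELATION message per table is what lets the reader survive schema changes.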

Step 4 — Transform and Filter

We must often enrich raw data before emitting it. For example, we might join with a reference table to convert a foreign key user_id into an email. We also implement a filter to drop events from tables irrelevant to the target consumer, saving bandwidth and compute.

func transformEvent(event *ChangeEvent) *ChangeEvent {
    // Add metadata such as the source system so the event is self-describing.
    event.Source = "postgres_main"
    if event.Type == "DELETE" {
        // For deletes, only the key survives; the old row data is gone.
        event.RowData = map[string]interface{}{"id": event.RowData["id"]}
    }
    return event
}

This step adds context that makes the event self-describing, reducing the need for the consumer to maintain state about which table a record came from.
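The filtering mentioned above can be as simple as an allowlist check before emitting. A sketch, where the table names are assumed examples and `ChangeEvent` mirrors the struct from Step 3:

```go
// ChangeEvent mirrors the struct defined in Step 3.
type ChangeEvent struct {
	TableName string
	Type      string
	RowData   map[string]interface{}
}

// allowedTables is the set of tables the target consumer cares about.
var allowedTables = map[string]bool{"users": true, "orders": true}

// shouldEmit reports whether an event survives the table filter.
func shouldEmit(e *ChangeEvent) bool {
	return allowedTables[e.TableName]
}
```

Filtering before the transform step keeps irrelevant tables from ever touching the enrichment or network path.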

Step 5 — Emit with Backpressure

Sending the event is the final step. If the downstream service is slow, we must buffer. We emit through a channel or a queue (such as a Kafka producer or a gRPC stream) and handle backpressure by pausing ingestion when the send channel fills up, preventing the reader from consuming unbounded memory.

func emit(event *ChangeEvent) error {
    select {
    case downstream.C <- *event:
        return nil
    case <-time.After(100 * time.Millisecond):
        // Buffer is full: report the stall so the caller can pause the
        // WAL reader instead of silently dropping the event.
        return fmt.Errorf("downstream buffer full")
    }
}

Handling backpressure gracefully keeps the CDC reader's memory footprint bounded when the consumer is temporarily slow or down; meanwhile, the unacknowledged replication slot makes PostgreSQL retain the WAL, so nothing is lost.
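When a send fails, the caller should retry with backoff rather than drop the event, and only confirm the LSN upstream after a successful send. A hedged sketch, parameterized over the send function so it is independent of the transport (`ChangeEvent` again mirrors Step 3):

```go
import (
	"errors"
	"time"
)

// ChangeEvent mirrors the struct defined in Step 3.
type ChangeEvent struct {
	TableName string
	Type      string
	RowData   map[string]interface{}
}

// errBufferFull signals a stalled downstream, as in the emit example above.
var errBufferFull = errors.New("downstream buffer full")

// emitWithRetry retries a failed send with linear backoff. The event's
// WAL position must not be acknowledged until this returns nil.
func emitWithRetry(send func(*ChangeEvent) error, ev *ChangeEvent, tries int) error {
	var err error
	for i := 0; i < tries; i++ {
		if err = send(ev); err == nil {
			return nil
		}
		time.Sleep(time.Duration(i+1) * 10 * time.Millisecond)
	}
	return err
}
```

Because the slot holds WAL until the position is confirmed, a crash mid-retry simply replays the event on restart, which is why downstream deduplication matters.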

Key Takeaways

  • WAL Logs: Physical or logical WAL streams provide atomic visibility into database changes that polling cannot match.
  • Replication Slots: They prevent the server from recycling WAL entries the reader has not yet confirmed, ensuring no change is lost across restarts or disconnects.
  • Binary Parsing: Decoding binary formats ensures correct handling of complex data types and schema metadata without SQL ambiguity.
  • Backpressure: Always respect consumer speed; buffering prevents system overload and memory spikes during traffic bursts.

Next Steps

This guide focuses on practical patterns for reliable data pipelines. Part of the Architecture Patterns series.
