Manual polling is an anti-pattern; stream the truth directly from the source.
What We're Building
We are constructing a robust Change Data Capture (CDC) pipeline using Go. This system watches the Write-Ahead Log (WAL) of a PostgreSQL instance, captures row-level changes, transforms the payload into a standard domain model, and emits it to a downstream consumer.
The architecture relies on logical replication rather than physical log scanning to handle schema changes gracefully. The system must handle connection resets and maintain exactly-once semantics relative to the upstream store without duplicating processing work.
The high-level data flow looks like this:
+----------------+ +----------------+ +----------------+
| PostgreSQL |---->| CDC Reader |---->| Downstream |
| WAL Stream | | (Go Service) | | System |
+----------------+ +----------------+ +----------------+
^ | ^
| v |
+-----+------------+ | +----------+-------+
| Failover / | | | Offset Storage |
| Offset Replay | | | (Kafka/Table) |
+------------------+ | +-------------------+
Step 1 — Establish Replication Slot
The first step is initializing a logical replication slot on the source database. Without this, the backend may discard committed records during server restarts, leading to data loss. We establish the connection using the binary protocol to prepare for WAL decoding.
conn, err := pgxpool.Connect(ctx, dsn)
if err != nil {
log.Fatal(err)
}
slotName := "cdc_reader_slot"
slot, err := conn.Exec(ctx, `SELECT pg_create_logical_replication_slot($1, 'pgoutput')`, slotName)
if err != nil {
log.Fatal(err)
}
// Retrieve latest WAL location to start streaming
This ensures the CDC reader captures every single change, including schema evolution signals, rather than relying on implicit polling intervals which introduce latency.
Step 2 — Start WAL Stream
Once the slot is created, we must subscribe to the stream using a specific position. We use CopyFrom with the wal command to receive the raw change events. In production, this stream is non-blocking; we read a buffer of events rather than waiting indefinitely for a single row.
walStream := conn.Pgconn().StartLogicalReplication(ctx, pglogical.NewSlot(0, slotName, conn))
if err != nil {
log.Fatal(err)
}
for {
msg := walStream.Next()
if msg.Status == pglogical.ReplicationMessage {
processChange(msg.Data)
}
}
This pattern decouples the ingestion rate from the database commit speed, allowing the downstream system to buffer and process data at its own capacity.
Step 3 — Decode Change Payloads
Raw WAL messages are binary and contain specific flags for insertion, updates, and deletions. We decode these payloads into a generic event struct. We strip the original SQL identity and extract only the necessary columns for the consumer.
type ChangeEvent struct {
TableName string
Type string // INSERT, UPDATE, DELETE
RowData map[string]interface{}
}
func processChange(data []byte) *ChangeEvent {
// Decode binary payload based on replication protocol
event := &ChangeEvent{TableName: "users", Type: "UPDATE"}
// Logic to parse row data into map
return event
}
Choosing binary parsing over simple string replacement ensures we handle complex data types like arrays and JSON correctly without triggering database errors.
Step 4 — Transform and Filter
We must often enrich raw data before emitting it. For example, we might join with a reference table to convert a foreign key user_id into an email. We also implement a filter to drop events from tables irrelevant to the target consumer, saving bandwidth and compute.
func transformEvent(event *ChangeEvent) *ChangeEvent {
// Add metadata like timestamp and source
event.Source = "postgres_main"
if event.Type == "DELETE" {
event.RowData = map[string]interface{}{"id": event.RowData["id"]}
}
return event
}
This step adds context that makes the event self-describing, reducing the need for the consumer to maintain state about which table a record came from.
Step 5 — Emit with Backpressure
Sending the event is the final step. If the downstream service is slow, we must buffer. We use a channel or a queue (like kafka-producer or grpc-stream) to emit data. We handle backpressure by pausing ingestion if the send channel fills up, preventing the reader from consuming too much memory.
func emit(event *ChangeEvent) error {
// Simulate sending to downstream consumer
go func() {
select {
case downstream.C <- *event:
// Sent
case <-time.After(100 * time.Millisecond):
// Apply backpressure or drop
}
}()
return nil
}
Handling backpressure gracefully prevents the CDC reader from consuming excessive CPU cycles when the consumer is temporarily slow or down.
Key Takeaways
- WAL Logs: Physical or logical WAL streams provide atomic visibility into database changes that polling cannot match.
- Replication Slots: They prevent the backend from replaying WAL entries to new readers, ensuring isolation and data integrity.
- Binary Parsing: Decoding binary formats ensures correct handling of complex data types and schema metadata without SQL ambiguity.
- Backpressure: Always respect consumer speed; buffering prevents system overload and memory spikes during traffic bursts.
Next Steps
- Read Designing Data-Intensive Applications (Kleppmann) (Ch. 8) to understand consistency models like eventually consistent replication.
- Review A Philosophy of Software Design (Ousterhout) (Ch. 2) to manage complexity when adding transformation logic to data pipelines.
- Implement offset storage in your system to allow for manual recovery or checkpointing after restarts.
This guide focuses on practical patterns for reliable data pipelines. Part of the Architecture Patterns series.
Top comments (0)