DEV Community

Thesius Code

Posted on • Originally published at datanest-stores.pages.dev

CDC Replication Toolkit: CDC Replication Guide


A comprehensive guide to setting up and operating change data capture pipelines with the CDC Replication Toolkit.

By Datanest Digital


Table of Contents

  1. What Is CDC?
  2. Debezium Setup
  3. Pipeline Architecture
  4. Configuration Walkthrough
  5. Schema Mapping
  6. Exactly-Once Delivery
  7. Schema Evolution
  8. Monitoring & Alerting
  9. Troubleshooting

What Is CDC?

Change Data Capture (CDC) is a pattern that tracks row-level changes (inserts, updates, deletes) in a source database and streams them to downstream systems. Instead of periodic full-table loads, CDC provides near-real-time replication with minimal source impact.

Why CDC over batch ETL?

| Aspect | Batch ETL | CDC |
| --- | --- | --- |
| Latency | Minutes to hours | Seconds to minutes |
| Source load | High (full scans) | Minimal (log reading) |
| Data freshness | Stale between loads | Near real-time |
| Delete detection | Requires diff logic | Native support |
| Network traffic | Entire table each run | Only changes |

Debezium Setup

Debezium is the most widely used open-source CDC connector. It reads database transaction logs (PostgreSQL's WAL, MySQL's binlog, SQL Server's CDC tables) and publishes change events to Kafka.

Prerequisites

  • Apache Kafka cluster (or Confluent Cloud / MSK)
  • Debezium Connect instance (Kafka Connect with Debezium plugins)
  • Source database with logical replication enabled

PostgreSQL Example

  1. Enable logical replication in postgresql.conf:
wal_level = logical
max_replication_slots = 4
max_wal_senders = 4
  2. Create a replication user:
CREATE ROLE debezium WITH REPLICATION LOGIN PASSWORD 'secret';
GRANT SELECT ON ALL TABLES IN SCHEMA public TO debezium;
  3. Deploy the Debezium connector:
{
  "name": "postgres-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "db-host",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "secret",
    "database.dbname": "mydb",
    "topic.prefix": "dbserver1",
    "table.include.list": "public.orders,public.customers",
    "plugin.name": "pgoutput",
    "slot.name": "debezium_slot"
  }
}
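The connector JSON above is registered with the Kafka Connect REST API (`POST /connectors`). A minimal Python sketch, where the function names, defaults, and Connect URL are illustrative rather than part of the toolkit:

```python
import json
import urllib.request

def make_connector_payload(name="postgres-connector", host="db-host", dbname="mydb"):
    # Mirrors the connector config shown above.
    return {
        "name": name,
        "config": {
            "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
            "database.hostname": host,
            "database.port": "5432",
            "database.user": "debezium",
            "database.password": "secret",
            "database.dbname": dbname,
            "topic.prefix": "dbserver1",
            "table.include.list": "public.orders,public.customers",
            "plugin.name": "pgoutput",
            "slot.name": "debezium_slot",
        },
    }

def register(payload, connect_url="http://localhost:8083"):
    # POST /connectors is the standard Kafka Connect registration endpoint.
    req = urllib.request.Request(
        f"{connect_url}/connectors",
        data=json.dumps(payload).encode(),
        headers={"Content-Type": "application/json"},
    )
    return urllib.request.urlopen(req)
```

Calling `register(make_connector_payload())` requires a reachable Kafka Connect host; a `201 Created` response means the connector was accepted.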

MySQL Example

Enable binlog in my.cnf:

server-id = 1
log_bin = mysql-bin
binlog_format = ROW
binlog_row_image = FULL

SQL Server Example

Enable CDC on the database and tables:

EXEC sys.sp_cdc_enable_db;
EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'orders', @role_name = NULL;

Pipeline Architecture

Source DB → Debezium → Kafka → Spark Structured Streaming → Delta Lake
             (log)    (events)   (parse + transform)       (MERGE INTO)

The toolkit processes events in this order:

  1. DebeziumParser reads raw Kafka JSON values and extracts the envelope fields (op, before, after, source metadata)
  2. SchemaMapper converts source types to Delta-compatible PySpark types and renames columns to snake_case
  3. MergeApplier applies changes via MERGE INTO — inserts, updates, and deletes in a single atomic operation
  4. OffsetManager tracks Kafka offsets and database LSN positions for resumability
  5. ReplicationMonitor records throughput, lag, and error metrics
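The first step, envelope extraction, can be illustrated in plain Python. This is a hypothetical `parse_envelope` helper, not the toolkit's actual `DebeziumParser` (which operates on Spark columns via `from_json`):

```python
import json

def parse_envelope(raw: bytes) -> dict:
    """Unwrap a Debezium change event from a raw Kafka message value."""
    doc = json.loads(raw)
    # Depending on converter settings, the event may be wrapped in "payload".
    payload = doc.get("payload", doc)
    return {
        "op": payload["op"],            # c=create, u=update, d=delete, r=snapshot read
        "before": payload.get("before"),
        "after": payload.get("after"),
        "source_ts_ms": payload["source"]["ts_ms"],
        "table": payload["source"]["table"],
    }

# Example update event for public.orders:
event = parse_envelope(json.dumps({
    "payload": {
        "op": "u",
        "before": {"id": 1, "status": "new"},
        "after": {"id": 1, "status": "paid"},
        "source": {"ts_ms": 1700000000000, "table": "orders"},
    }
}).encode())
```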

Configuration Walkthrough

Main config (configs/replication_config.yaml)

source:
  type: postgres                    # Database type
  kafka_bootstrap_servers: "..."    # Kafka brokers
  topic_prefix: "dbserver1"         # Debezium topic prefix
  tables:                           # Tables to replicate
    - "public.orders"

target:
  catalog: "main"                   # Unity Catalog catalog
  schema: "bronze"                  # Target schema

streaming:
  checkpoint_location: "/mnt/checkpoints/cdc"
  trigger_interval: "30 seconds"    # Micro-batch interval

Source mappings (configs/source_mappings/*.yaml)

Type mapping files define how source database types convert to PySpark types:

type_mappings:
  - source_types: ["varchar", "text", "char"]
    target_type: "string"
  - source_types: ["int4", "integer"]
    target_type: "integer"
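Once parsed, a mapping file is just a list of rules, and lookup is a first-match scan. The structure below is the YAML shown above loaded into memory (inlined here to stay dependency-free); the `map_type` helper and its string-fallback behaviour are assumptions, not the toolkit's documented API:

```python
# The parsed equivalent of the type_mappings YAML above.
type_mappings = [
    {"source_types": ["varchar", "text", "char"], "target_type": "string"},
    {"source_types": ["int4", "integer"], "target_type": "integer"},
]

def map_type(source_type: str, default: str = "string") -> str:
    # Return the first rule whose source_types list contains the input type.
    for rule in type_mappings:
        if source_type in rule["source_types"]:
            return rule["target_type"]
    return default  # assumed fallback; the real toolkit may raise instead
```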

Schema Mapping

The SchemaMapper handles three transformations:

1. Column name conversion

Source columns in camelCase or PascalCase are automatically converted to snake_case:

  • firstName → first_name
  • OrderID → order_id
  • HTTPSEnabled → https_enabled
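All three cases, including acronym runs like `HTTPSEnabled`, can be handled with a single regex. This is one possible implementation, not necessarily what `SchemaMapper` does internally:

```python
import re

# Split before an uppercase letter that either follows a lowercase/digit
# (firstName) or starts a new word after an acronym run (HTTPSEnabled).
_SNAKE = re.compile(r"(?<=[a-z0-9])(?=[A-Z])|(?<=[A-Z])(?=[A-Z][a-z])")

def to_snake_case(name: str) -> str:
    return _SNAKE.sub("_", name).lower()
```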

2. Type conversion

Each source database type maps to a PySpark equivalent. See configs/source_mappings/ for the full mappings. Common conversions:

| PostgreSQL | MySQL | SQL Server | PySpark |
| --- | --- | --- | --- |
| varchar | varchar | nvarchar | StringType |
| int4 | int | int | IntegerType |
| timestamp | datetime | datetime2 | TimestampType |
| boolean | bit | bit | BooleanType |
| jsonb | json | nvarchar(max) | StringType |

3. Computed columns

The mapper adds metadata columns automatically:

  • _ingested_at — timestamp when the row was processed
  • _source_system — source database type (e.g. "postgres")
  • _row_hash — SHA-256 hash of all columns for change detection
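In plain Python the three columns look roughly like this (the real mapper would express them as PySpark column expressions; `add_metadata` is an illustrative name):

```python
import hashlib
from datetime import datetime, timezone

def add_metadata(row: dict, source_system: str = "postgres") -> dict:
    # Hash a canonical key=value representation of the business columns only,
    # so the same row content always produces the same hash.
    canonical = "|".join(f"{k}={row[k]}" for k in sorted(row))
    return {
        **row,
        "_ingested_at": datetime.now(timezone.utc).isoformat(),
        "_source_system": source_system,
        "_row_hash": hashlib.sha256(canonical.encode()).hexdigest(),
    }
```

Because the hash covers only the source columns, comparing `_row_hash` values is enough to detect whether a row actually changed.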

Exactly-Once Delivery

CDC pipelines must guarantee that every change is applied exactly once to the target. The toolkit achieves this through:

Spark checkpointing

Structured Streaming checkpoints track which Kafka offsets have been processed. On restart, processing resumes from the last committed checkpoint.

Idempotent MERGE

The MergeApplier uses MERGE INTO which is inherently idempotent — re-processing the same event produces the same result because the merge condition matches on primary keys.
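The idempotency argument can be modelled with a dict keyed by primary key: replaying an event overwrites the row with identical values (or re-deletes an already-deleted key), so the end state is unchanged. A toy model, not Delta Lake code:

```python
def apply_event(target: dict, event: dict) -> None:
    # Match on the primary key, exactly like the MERGE condition.
    key = (event["before"] if event["op"] == "d" else event["after"])["id"]
    if event["op"] == "d":
        target.pop(key, None)          # delete is a no-op if already gone
    else:
        target[key] = event["after"]   # upsert overwrites with the same values

target = {}
evt = {"op": "c", "after": {"id": 1, "status": "new"}, "before": None}
apply_event(target, evt)
apply_event(target, evt)  # replayed duplicate: state is unchanged
```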

Deduplication

Within a micro-batch, the applier deduplicates events by key column, keeping only the latest event (by source_ts_ms). This handles cases where Kafka delivers duplicate messages.
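The dedup rule is "last writer wins per key". A self-contained sketch (the toolkit would do this with a PySpark window over the key columns ordered by `source_ts_ms`):

```python
def dedup_latest(events, key_columns=("id",)):
    # Keep only the newest event per key, ordered by source_ts_ms.
    latest = {}
    for e in events:
        image = e["after"] or e["before"]  # delete events carry only "before"
        key = tuple(image[c] for c in key_columns)
        if key not in latest or e["source_ts_ms"] > latest[key]["source_ts_ms"]:
            latest[key] = e
    return list(latest.values())

batch = [
    {"op": "c", "after": {"id": 1, "v": "a"}, "before": None, "source_ts_ms": 100},
    {"op": "u", "after": {"id": 1, "v": "b"}, "before": {"id": 1, "v": "a"}, "source_ts_ms": 200},
]
```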

Offset tracking

The OffsetManager persists Kafka offsets and database LSN positions to a Delta table, providing an additional resumability layer independent of Spark checkpoints.


Schema Evolution

When source tables change (new columns, type changes), CDC pipelines need to handle those changes gracefully.

Adding columns

Debezium automatically includes new columns in the after image. The toolkit's from_json parsing will set unknown columns to null until the target schema is updated.

Recommended approach:

  1. Add the column to the target Delta table: ALTER TABLE main.bronze.orders ADD COLUMN new_col STRING
  2. Update the source mapping YAML if needed
  3. The next batch will populate the new column automatically

Removing columns

Removed source columns will appear as null in the Debezium after image. No action needed unless you want to drop the column from the target.

Type changes

Type changes require careful handling. Use SchemaMapper.validate_schema_compatibility() to detect mismatches before they cause pipeline failures.


Monitoring & Alerting

Metrics table

The ReplicationMonitor writes per-batch metrics to main.cdc_meta.replication_metrics:

| Column | Description |
| --- | --- |
| table_name | Source table |
| event_count | Events in the batch |
| duration_ms | Processing time |
| errors | Error count |
| events_per_second | Computed throughput |
| recorded_at | Timestamp |
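The computed throughput column is simple arithmetic over the other two columns; a sketch (function name assumed, with a guard for zero-duration batches):

```python
def events_per_second(event_count: int, duration_ms: int) -> float:
    # Convert milliseconds to seconds before dividing.
    return event_count / (duration_ms / 1000) if duration_ms else 0.0
```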

Key metrics to watch

  • Replication lag — time since last successful batch per table
  • Throughput — events processed per second
  • Error rate — percentage of batches with errors
  • Offset gap — difference between latest Kafka offset and committed offset

Alert thresholds

Configure in replication_config.yaml:

monitoring:
  max_lag_seconds: 300       # 5 minutes
  max_error_rate_percent: 5  # 5% of batches

Use the check_alerts() method or the Replication Status notebook for automated monitoring.
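A hypothetical sketch of the kind of logic `check_alerts()` applies against those two thresholds (the signature and return shape here are assumptions, not the toolkit's actual interface):

```python
from datetime import datetime, timezone

def check_alerts(last_batch_at, error_batches, total_batches,
                 max_lag_seconds=300, max_error_rate_percent=5, now=None):
    now = now or datetime.now(timezone.utc)
    alerts = []
    # Replication lag: time since the last successful batch.
    lag = (now - last_batch_at).total_seconds()
    if lag > max_lag_seconds:
        alerts.append(f"lag {lag:.0f}s exceeds {max_lag_seconds}s")
    # Error rate: percentage of batches that recorded errors.
    rate = 100 * error_batches / total_batches if total_batches else 0
    if rate > max_error_rate_percent:
        alerts.append(f"error rate {rate:.1f}% exceeds {max_error_rate_percent}%")
    return alerts
```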


Troubleshooting

Pipeline not processing events

  1. Check Kafka connectivity: verify kafka_bootstrap_servers is reachable
  2. Verify topic exists: kafka-topics.sh --list --bootstrap-server broker:9092
  3. Check Debezium connector status: curl localhost:8083/connectors/postgres-connector/status
  4. Review Spark streaming query status in the Databricks UI

High replication lag

  1. Increase max_offsets_per_trigger to process more events per batch
  2. Decrease trigger_interval for more frequent processing
  3. Scale up the Databricks cluster
  4. Check for slow MERGE operations (large target tables may need Z-ORDER optimisation)

Schema mismatch errors

  1. Run SchemaMapper.validate_schema_compatibility() to identify mismatches
  2. Update the target table schema to match
  3. Update source mapping YAML if type conversions have changed

Duplicate records in target

  1. Verify key_columns are correctly configured for each table
  2. Ensure deduplication is enabled in MergeConfig.enable_dedup
  3. Check that the merge condition uses all primary key columns

By Datanest Digital | Version 1.0.0 | $49


This is 1 of 11 resources in the Data Pipeline Pro toolkit. Get the complete [CDC Replication Toolkit] with all files, templates, and documentation for $49.

Get the Full Kit →

Or grab the entire Data Pipeline Pro bundle (11 products) for $169 — save 30%.

Get the Complete Bundle →

