CDC Replication Guide
A comprehensive guide to setting up and operating change data capture pipelines with the CDC Replication Toolkit.
Table of Contents
- What Is CDC?
- Debezium Setup
- Pipeline Architecture
- Configuration Walkthrough
- Schema Mapping
- Exactly-Once Delivery
- Schema Evolution
- Monitoring & Alerting
- Troubleshooting
What Is CDC?
Change Data Capture (CDC) is a pattern that tracks row-level changes (inserts, updates, deletes) in a source database and streams them to downstream systems. Instead of periodic full-table loads, CDC provides near-real-time replication with minimal source impact.
Why CDC over batch ETL?
| Aspect | Batch ETL | CDC |
|---|---|---|
| Latency | Minutes to hours | Seconds to minutes |
| Source load | High (full scans) | Minimal (log reading) |
| Data freshness | Stale between loads | Near real-time |
| Delete detection | Requires diff logic | Native support |
| Network traffic | Entire table each run | Only changes |
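To make the change-event model concrete, here is a minimal sketch in plain Python of how a stream of row-level change events replays against a target table. The event shape loosely mirrors a Debezium envelope; all names here are illustrative, not toolkit APIs.

```python
# Minimal illustration of CDC semantics: a stream of row-level change
# events (insert / update / delete) replayed against a target "table"
# modelled as a dict keyed by primary key. Names are illustrative.

def apply_event(table, event):
    """Apply one change event to a dict keyed by primary key."""
    key = event["key"]
    if event["op"] in ("c", "u"):   # create or update: upsert the after image
        table[key] = event["after"]
    elif event["op"] == "d":        # delete: remove the row
        table.pop(key, None)
    return table

events = [
    {"op": "c", "key": 1, "after": {"id": 1, "status": "NEW"}},
    {"op": "u", "key": 1, "after": {"id": 1, "status": "PAID"}},
    {"op": "d", "key": 1, "after": None},
]

replica = {}
for e in events:
    apply_event(replica, e)

print(replica)  # {}  (the row was inserted, updated, then deleted)
```

Note how deletes are first-class events rather than something you have to infer by diffing snapshots, which is exactly the "delete detection" advantage in the table above.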
Debezium Setup
Debezium is the most widely used open-source CDC connector. It reads database transaction logs (WAL, binlog, CDC tables) and publishes events to Kafka.
Prerequisites
- Apache Kafka cluster (or Confluent Cloud / MSK)
- Debezium Connect instance (Kafka Connect with Debezium plugins)
- Source database with logical replication enabled
PostgreSQL Example
- Enable logical replication in `postgresql.conf`:

  ```ini
  wal_level = logical
  max_replication_slots = 4
  max_wal_senders = 4
  ```

- Create a replication user:

  ```sql
  CREATE ROLE debezium WITH REPLICATION LOGIN PASSWORD 'secret';
  GRANT SELECT ON ALL TABLES IN SCHEMA public TO debezium;
  ```

- Deploy the Debezium connector:

  ```json
  {
    "name": "postgres-connector",
    "config": {
      "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
      "database.hostname": "db-host",
      "database.port": "5432",
      "database.user": "debezium",
      "database.password": "secret",
      "database.dbname": "mydb",
      "topic.prefix": "dbserver1",
      "table.include.list": "public.orders,public.customers",
      "plugin.name": "pgoutput",
      "slot.name": "debezium_slot"
    }
  }
  ```
MySQL Example
Enable binlog in `my.cnf`:

```ini
server-id = 1
log_bin = mysql-bin
binlog_format = ROW
binlog_row_image = FULL
```
SQL Server Example
Enable CDC on the database and tables:

```sql
EXEC sys.sp_cdc_enable_db;
EXEC sys.sp_cdc_enable_table
    @source_schema = 'dbo',
    @source_name = 'orders',
    @role_name = NULL;
```
Pipeline Architecture
```
Source DB → Debezium → Kafka → Spark Structured Streaming → Delta Lake
  (log)     (events)              (parse + transform)       (MERGE INTO)
```
The toolkit processes events in this order:
1. `DebeziumParser` reads raw Kafka JSON values and extracts the envelope fields (`op`, `before`, `after`, source metadata)
2. `SchemaMapper` converts source types to Delta-compatible PySpark types and renames columns to snake_case
3. `MergeApplier` applies changes via `MERGE INTO`: inserts, updates, and deletes in a single atomic operation
4. `OffsetManager` tracks Kafka offsets and database LSN positions for resumability
5. `ReplicationMonitor` records throughput, lag, and error metrics
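The first step, envelope parsing, can be sketched in plain Python. This is a simplified stand-in for the toolkit's DebeziumParser: the field names (`op`, `before`, `after`, `source.ts_ms`) follow the standard Debezium envelope, but real events may also carry a schema block and transaction metadata.

```python
import json

def parse_envelope(raw_value):
    """Extract the core fields from a Debezium change-event JSON value.

    Simplified sketch: real events may also include a "schema" block
    and transaction metadata alongside the payload.
    """
    payload = json.loads(raw_value)
    # Some converters nest the envelope under a "payload" key
    envelope = payload.get("payload", payload)
    return {
        "op": envelope["op"],          # c=create, u=update, d=delete, r=snapshot read
        "before": envelope.get("before"),
        "after": envelope.get("after"),
        "source_ts_ms": envelope["source"]["ts_ms"],
    }

raw = json.dumps({
    "op": "u",
    "before": {"id": 7, "status": "NEW"},
    "after": {"id": 7, "status": "PAID"},
    "source": {"ts_ms": 1700000000000, "db": "mydb", "table": "orders"},
})

event = parse_envelope(raw)
print(event["op"], event["after"]["status"])  # u PAID
```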
Configuration Walkthrough
Main config (configs/replication_config.yaml)
```yaml
source:
  type: postgres                  # Database type
  kafka_bootstrap_servers: "..."  # Kafka brokers
  topic_prefix: "dbserver1"       # Debezium topic prefix
  tables:                         # Tables to replicate
    - "public.orders"

target:
  catalog: "main"                 # Unity Catalog catalog
  schema: "bronze"                # Target schema

streaming:
  checkpoint_location: "/mnt/checkpoints/cdc"
  trigger_interval: "30 seconds"  # Micro-batch interval
```
Source mappings (configs/source_mappings/*.yaml)
Type mapping files define how source database types convert to PySpark types:
```yaml
type_mappings:
  - source_types: ["varchar", "text", "char"]
    target_type: "string"
  - source_types: ["int4", "integer"]
    target_type: "integer"
```
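Loaded into memory, a mapping file reduces to a simple lookup. A sketch of that lookup in pure Python (the dict literal mirrors the YAML above and is hand-written here for illustration; the toolkit would load it with a YAML parser):

```python
# Type-mapping lookup built from the same structure as the YAML above.
TYPE_MAPPINGS = [
    {"source_types": ["varchar", "text", "char"], "target_type": "string"},
    {"source_types": ["int4", "integer"], "target_type": "integer"},
]

def map_type(source_type, default="string"):
    """Return the target type for a source DB type, case-insensitively."""
    for rule in TYPE_MAPPINGS:
        if source_type.lower() in rule["source_types"]:
            return rule["target_type"]
    return default  # fall back rather than fail on unknown source types

print(map_type("TEXT"))  # string
print(map_type("int4"))  # integer
print(map_type("uuid"))  # string (default fallback)
```

Falling back to a string type for unmapped source types is a common defensive choice in bronze layers, since strings lose no information and can be cast later.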
Schema Mapping
The SchemaMapper handles three transformations:
1. Column name conversion
Source columns in camelCase or PascalCase are automatically converted to snake_case:
- `firstName` → `first_name`
- `OrderID` → `order_id`
- `HTTPSEnabled` → `https_enabled`
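A regex-based conversion that handles all three cases above, including acronym runs, can be sketched as follows (illustrative; the toolkit's actual implementation may differ):

```python
import re

def to_snake_case(name):
    """Convert camelCase / PascalCase (including acronym runs) to snake_case."""
    # Insert "_" before an uppercase letter that follows a lowercase letter
    # or digit, or before the last capital of an acronym run that starts a
    # new word (e.g. the "E" in HTTPSEnabled).
    s = re.sub(r"(?<=[a-z0-9])(?=[A-Z])|(?<=[A-Z])(?=[A-Z][a-z])", "_", name)
    return s.lower()

print(to_snake_case("firstName"))     # first_name
print(to_snake_case("OrderID"))       # order_id
print(to_snake_case("HTTPSEnabled"))  # https_enabled
```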
2. Type conversion
Each source database type maps to a PySpark equivalent. See configs/source_mappings/ for the full mappings. Common conversions:
| PostgreSQL | MySQL | SQL Server | PySpark |
|---|---|---|---|
| `varchar` | `varchar` | `nvarchar` | `StringType` |
| `int4` | `int` | `int` | `IntegerType` |
| `timestamp` | `datetime` | `datetime2` | `TimestampType` |
| `boolean` | `bit` | `bit` | `BooleanType` |
| `jsonb` | `json` | `nvarchar(max)` | `StringType` |
3. Computed columns
The mapper adds metadata columns automatically:
- `_ingested_at`: timestamp when the row was processed
- `_source_system`: source database type (e.g. "postgres")
- `_row_hash`: SHA-256 hash of all columns for change detection
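The row-hash idea is easy to show in plain Python (an illustrative sketch of the principle; inside the pipeline the same computation would run over PySpark columns):

```python
import hashlib

def row_hash(row):
    """Deterministic SHA-256 over all column values, for change detection.

    Sorting by column name makes the hash independent of column order;
    the key=value form with a separator avoids concatenation ambiguity.
    """
    canonical = "|".join(f"{k}={row[k]}" for k in sorted(row))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()

a = row_hash({"id": 1, "status": "NEW"})
b = row_hash({"status": "NEW", "id": 1})   # same content, different column order
c = row_hash({"id": 1, "status": "PAID"})  # one value changed

print(a == b, a == c)  # True False
```

Comparing `_row_hash` values is cheaper than comparing every column when deciding whether an incoming event actually changes the stored row.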
Exactly-Once Delivery
CDC pipelines must guarantee that every change is applied exactly once to the target. The toolkit achieves this through:
Spark checkpointing
Structured Streaming checkpoints track which Kafka offsets have been processed. On restart, processing resumes from the last committed checkpoint.
Idempotent MERGE
The MergeApplier uses MERGE INTO which is inherently idempotent — re-processing the same event produces the same result because the merge condition matches on primary keys.
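The idempotence argument is easy to demonstrate: applying the same batch of keyed changes twice yields the same target state. A dict-based sketch of MERGE-style semantics (illustrative, not the toolkit's code):

```python
def merge_into(target, changes, key="id"):
    """MERGE-style upsert/delete keyed on the primary key.

    Idempotent: re-applying the same change batch leaves the target
    unchanged, because rows are matched by key rather than appended.
    """
    for change in changes:
        k = change["row"][key]
        if change["op"] == "d":
            target.pop(k, None)
        else:  # insert and update both become a keyed upsert
            target[k] = change["row"]
    return target

batch = [
    {"op": "c", "row": {"id": 1, "status": "NEW"}},
    {"op": "u", "row": {"id": 1, "status": "PAID"}},
    {"op": "d", "row": {"id": 2}},
]

initial = {2: {"id": 2, "status": "OLD"}}
once = merge_into(dict(initial), batch)
twice = merge_into(dict(once), batch)
print(once == twice)  # True: re-processing the batch is a no-op
```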
Deduplication
Within a micro-batch, the applier deduplicates events by key column, keeping only the latest event (by source_ts_ms). This handles cases where Kafka delivers duplicate messages.
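Keep-latest-per-key deduplication can be sketched as follows (field names follow the Debezium envelope; the function and its parameters are illustrative):

```python
def dedupe_latest(events, key_columns=("id",)):
    """Keep only the newest event per key within a micro-batch.

    Newest = highest source_ts_ms; ties go to the later arrival, which
    matters when two updates to one row share the same millisecond.
    """
    latest = {}
    for e in events:
        k = tuple(e["key"][c] for c in key_columns)
        if k not in latest or e["source_ts_ms"] >= latest[k]["source_ts_ms"]:
            latest[k] = e
    return list(latest.values())

events = [
    {"key": {"id": 1}, "op": "u", "source_ts_ms": 100},
    {"key": {"id": 1}, "op": "u", "source_ts_ms": 300},
    {"key": {"id": 1}, "op": "u", "source_ts_ms": 300},  # Kafka redelivery
    {"key": {"id": 2}, "op": "c", "source_ts_ms": 200},
]

deduped = dedupe_latest(events)
print(len(deduped))  # 2: one surviving event per key
```

Without this step, two updates to the same key in one batch could race inside the merge, or a MERGE engine could reject the batch for matching one target row twice.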
Offset tracking
The OffsetManager persists Kafka offsets and database LSN positions to a Delta table, providing an additional resumability layer independent of Spark checkpoints.
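The offset-tracking contract boils down to "commit only after a successful batch, resume from the last commit". A minimal in-memory sketch (the class and method names here are illustrative; the real OffsetManager persists positions to a Delta table):

```python
class InMemoryOffsetManager:
    """Illustrative stand-in for a persistent offset store.

    A dict plays the role of the Delta table to show the
    commit/resume contract.
    """

    def __init__(self):
        self._committed = {}  # (topic, partition) -> position record

    def commit(self, topic, partition, next_offset, lsn=None):
        # Called only after a batch is fully applied to the target,
        # so a crash before commit means the batch is re-read (and the
        # idempotent MERGE absorbs the replay).
        self._committed[(topic, partition)] = {"offset": next_offset, "lsn": lsn}

    def resume_position(self, topic, partition):
        # Where to restart reading; 0 means "from the beginning".
        entry = self._committed.get((topic, partition))
        return entry["offset"] if entry else 0

mgr = InMemoryOffsetManager()
mgr.commit("dbserver1.public.orders", 0, 1042, lsn="0/16B3748")
print(mgr.resume_position("dbserver1.public.orders", 0))  # 1042
print(mgr.resume_position("dbserver1.public.orders", 1))  # 0
```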
Schema Evolution
When source tables change (new columns, type changes), CDC pipelines need to handle it gracefully.
Adding columns
Debezium automatically includes new columns in the `after` image. Because the toolkit parses events with a fixed schema via `from_json`, columns not yet in that schema are ignored until the target schema is updated.
Recommended approach:
- Add the column to the target Delta table:

  ```sql
  ALTER TABLE main.bronze.orders ADD COLUMN new_col STRING;
  ```

- Update the source mapping YAML if needed
- The next batch will populate the new column automatically
Removing columns
Columns removed at the source stop appearing in the Debezium `after` image, so they parse as null in the target. No action is needed unless you want to drop the column from the target table.
Type changes
Type changes require careful handling. Use SchemaMapper.validate_schema_compatibility() to detect mismatches before they cause pipeline failures.
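At its core, such a check compares column-to-type maps and reports the differences. A sketch of what that validation might look like (illustrative function, not the toolkit's actual signature):

```python
def check_schema_compatibility(source_schema, target_schema):
    """Compare two {column: type} dicts and report drift.

    Illustrative sketch; a production check would also consider safe
    widenings (e.g. int -> long) rather than flagging every difference.
    """
    issues = []
    for col, src_type in source_schema.items():
        if col not in target_schema:
            issues.append(f"missing column in target: {col}")
        elif target_schema[col] != src_type:
            issues.append(
                f"type mismatch on {col}: source={src_type}, "
                f"target={target_schema[col]}"
            )
    return issues

source = {"id": "integer", "amount": "decimal", "note": "string"}
target = {"id": "integer", "amount": "string"}

for issue in check_schema_compatibility(source, target):
    print(issue)
# type mismatch on amount: source=decimal, target=string
# missing column in target: note
```

Running a check like this before each deployment (or on a schedule) turns surprise pipeline failures into planned schema migrations.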
Monitoring & Alerting
Metrics table
The ReplicationMonitor writes per-batch metrics to main.cdc_meta.replication_metrics:
| Column | Description |
|---|---|
| `table_name` | Source table |
| `event_count` | Events in the batch |
| `duration_ms` | Processing time in milliseconds |
| `errors` | Error count |
| `events_per_second` | Computed throughput |
| `recorded_at` | Batch timestamp |
Key metrics to watch
- Replication lag — time since last successful batch per table
- Throughput — events processed per second
- Error rate — percentage of batches with errors
- Offset gap — difference between latest Kafka offset and committed offset
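The derived metrics are simple arithmetic over the raw batch measurements. A sketch (the function and parameter names are illustrative; only the column names mirror the metrics table above):

```python
import time

def derive_metrics(event_count, duration_ms, last_batch_ts, now=None):
    """Compute throughput and lag from raw per-batch measurements."""
    now = now if now is not None else time.time()
    return {
        # Guard against division by zero for instantaneous batches
        "events_per_second": event_count / max(duration_ms / 1000.0, 1e-9),
        "lag_seconds": now - last_batch_ts,  # time since last successful batch
    }

m = derive_metrics(event_count=1500, duration_ms=3000,
                   last_batch_ts=1000.0, now=1120.0)
print(m["events_per_second"])  # 500.0
print(m["lag_seconds"])        # 120.0
```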
Alert thresholds
Configure in replication_config.yaml:
```yaml
monitoring:
  max_lag_seconds: 300        # 5 minutes
  max_error_rate_percent: 5   # 5% of batches
```
Use the check_alerts() method or the Replication Status notebook for automated monitoring.
Troubleshooting
Pipeline not processing events
- Check Kafka connectivity: verify `kafka_bootstrap_servers` is reachable
- Verify the topic exists: `kafka-topics.sh --list --bootstrap-server broker:9092`
- Check Debezium connector status: `curl localhost:8083/connectors/postgres-connector/status`
- Review the Spark streaming query status in the Databricks UI
High replication lag
- Increase `max_offsets_per_trigger` to process more events per batch
- Decrease `trigger_interval` for more frequent processing
- Scale up the Databricks cluster
- Check for slow MERGE operations (large target tables may need Z-ORDER optimisation)
Schema mismatch errors
- Run `SchemaMapper.validate_schema_compatibility()` to identify mismatches
- Update the target table schema to match
- Update the source mapping YAML if type conversions have changed
Duplicate records in target
- Verify `key_columns` are correctly configured for each table
- Ensure deduplication is enabled via `MergeConfig.enable_dedup`
- Check that the merge condition uses all primary key columns
By Datanest Digital | Version 1.0.0 | $49
This is 1 of 11 resources in the Data Pipeline Pro toolkit. Get the complete [CDC Replication Toolkit] with all files, templates, and documentation for $49.
Or grab the entire Data Pipeline Pro bundle (11 products) for $169 — save 30%.