DEV Community

longtk26

Debezium: CDC in Practice

If you've read the Overview CDC post, you already know why Change Data Capture is such a compelling pattern. Now it's time to get our hands dirty. In this guide, we'll walk through a complete, working Debezium setup — from standing up Kafka and Debezium Connect with Docker, all the way to watching live database changes flow into Elasticsearch in real time. Let's go! 🛠️


What is Debezium?

Debezium is an open-source project that provides a low-latency data streaming platform for Change Data Capture (CDC). It connects to your database, monitors its transaction log (e.g., PostgreSQL's Write-Ahead Log), and emits a stream of change events for every row-level modification.

Your applications then consume these events and react accordingly — syncing to a search index, invalidating a cache, updating a data warehouse, or triggering downstream workflows.

Key characteristics:

  • Open source under the Apache 2.0 license
  • Built on top of Kafka Connect — battle-tested, scalable, and extensible
  • Supports many popular databases: PostgreSQL, MySQL, MongoDB, SQL Server, Oracle, and more
  • Guarantees at-least-once delivery of change events with configurable offset management

Setup: PostgreSQL Connector for Tracking Database Changes

Step 1 – Create the Docker Network

All services in this setup share a common Docker bridge network. Create it once before starting any containers:

docker network create base-network
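
Running `docker network create` a second time fails because the network already exists. A minimal sketch of an idempotent variant follows; `ensure_network` is a hypothetical helper name, not a Docker command:

```shell
# Create the shared network only if it does not exist yet.
# ensure_network is a hypothetical helper, not part of the Docker CLI.
ensure_network() {
  name="$1"
  if docker network inspect "$name" >/dev/null 2>&1; then
    echo "network $name already exists"
  else
    docker network create "$name"
  fi
}
```

Call it as `ensure_network base-network` before starting the containers.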

Step 2 – Prepare the Custom Debezium Connect Image

We need a custom Debezium Connect image that includes the Confluent Elasticsearch Sink Connector plugin. Create a Dockerfile inside a folder named connect/:

FROM debezium/connect:3.0.0.Final

# Install the Confluent Elasticsearch Sink Connector for Kafka Connect.
ARG ELASTICSEARCH_SINK_VERSION=15.1.1

RUN set -eux; \
    curl -fsSL "https://hub-downloads.confluent.io/api/plugins/confluentinc/kafka-connect-elasticsearch/versions/${ELASTICSEARCH_SINK_VERSION}/confluentinc-kafka-connect-elasticsearch-${ELASTICSEARCH_SINK_VERSION}.zip" -o /tmp/es-sink.zip; \
    mkdir -p /kafka/connect/confluentinc-kafka-connect-elasticsearch; \
    unzip -q /tmp/es-sink.zip -d /kafka/connect/confluentinc-kafka-connect-elasticsearch; \
    rm -f /tmp/es-sink.zip

Note: If you don't need Elasticsearch, skip the custom image and use the official debezium/connect:3.0.0.Final image directly in your Compose file.

Step 3 – Prepare the Docker Compose File

Create a docker-compose.yml at the project root with the following content:

version: '3.9'
services:
  kafka-ui:
    container_name: kafka-ui
    image: ghcr.io/kafbat/kafka-ui:latest
    ports:
      - "8070:8080"
    depends_on:
      - kafka
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
      DYNAMIC_CONFIG_ENABLED: 'true'
    networks:
      - base-network

  kafka:
    image: apache/kafka:4.0.2
    container_name: kafka
    ports:
      - "9092:9092"
      - "9094:9094"
    environment:

      ############################################
      # KRaft Metadata & Node Identity
      ############################################

      # Unique ID for this Kafka node
      KAFKA_NODE_ID: 1

      # Unique Kafka cluster identifier (required for KRaft mode)
      KAFKA_CLUSTER_ID: 'energy-tracker-cluster-1'

      # Node roles — this instance acts as BOTH a broker and a controller
      KAFKA_PROCESS_ROLES: 'broker,controller'

      ############################################
      # Controller Quorum Configuration (Raft)
      ############################################

      # List of controller nodes that form the Raft quorum
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka:9093'

      # Listener name used by controllers for internal Raft communication
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'

      ############################################
      # Network Listeners (Internal, External & Controller)
      ############################################

      # Define all listener endpoints for this Kafka node
      KAFKA_LISTENERS: >
        PLAINTEXT://0.0.0.0:9092,
        EXTERNAL://0.0.0.0:9094,
        CONTROLLER://0.0.0.0:9093

      # What each listener advertises to connecting clients
      # Internal (Docker containers): kafka:9092
      # External (host machine): localhost:9094
      KAFKA_ADVERTISED_LISTENERS: >
        PLAINTEXT://kafka:9092,
        EXTERNAL://localhost:9094

      # Maps listener names to their security protocols
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: >
        CONTROLLER:PLAINTEXT,
        PLAINTEXT:PLAINTEXT,
        EXTERNAL:PLAINTEXT

      # Which listener brokers use for internal broker-to-broker communication
      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'

      ############################################
      # Single-Node Cluster Safety Settings
      # (replication factor > 1 would fail on a single node)
      ############################################
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1

      ############################################
      # Broker Defaults & Quality-of-Life Settings
      ############################################

      # Speed up consumer group startup
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0

      # Auto-create topics if they don't already exist
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'

      # Default number of partitions per topic
      KAFKA_NUM_PARTITIONS: 1

    networks:
      - base-network

  debezium-connect:
    build:
      context: ./connect
    container_name: debezium-connect
    ports:
      - "8083:8083"
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
      - CONFIG_STORAGE_REPLICATION_FACTOR=1
      - OFFSET_STORAGE_REPLICATION_FACTOR=1
      - STATUS_STORAGE_REPLICATION_FACTOR=1
      - REST_ADVERTISED_HOST_NAME=debezium-connect
      - KEY_CONVERTER=org.apache.kafka.connect.storage.StringConverter
      - VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter
      - VALUE_CONVERTER_SCHEMAS_ENABLE=false
      - PLUGIN_PATH=/kafka/connect,/usr/share/java,/usr/share/confluent-hub-components
    depends_on:
      - kafka
    networks:
      - base-network

  debezium-ui:
    container_name: debezium-ui
    image: debezium/debezium-ui
    ports:
      - "8080:8080"
    environment:
      - KAFKA_CONNECT_URIS=http://debezium-connect:8083
    depends_on:
      - debezium-connect
    networks:
      - base-network

networks:
  base-network:
    external: true

Service summary:

Service | Role
debezium-connect | Hosts the Kafka Connect worker. Connectors registered here read the PostgreSQL WAL and publish change events to Kafka topics.
debezium-ui | A web UI that simplifies connector management by wrapping the Debezium Connect REST API.
kafka | The message broker that durably stores change event streams. Runs in KRaft mode (no ZooKeeper required).
kafka-ui | A web UI for browsing Kafka topics, messages, and consumer groups, which is invaluable for debugging.

Step 4 – Start the Services

docker compose up -d --build

Once the containers are running, the web UIs are available at:

  • Debezium UI: http://localhost:8080
  • Kafka UI: http://localhost:8070
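
The Kafka Connect worker can take a little while to start accepting requests. The sketch below polls its REST API on port 8083 and then lists the loaded connector plugins; `wait_for_connect` and `list_connector_plugins` are hypothetical helper names, and the retry count and delay are assumptions:

```shell
# Poll the Kafka Connect REST API until it answers, or give up.
# Helper names, retry count, and delay are assumptions for illustration.
wait_for_connect() {
  url="${1:-http://localhost:8083}"
  attempts="${2:-30}"
  i=1
  while [ "$i" -le "$attempts" ]; do
    if curl -fsS "$url/connectors" >/dev/null 2>&1; then
      echo "Kafka Connect is up at $url"
      return 0
    fi
    sleep "${WAIT_SECONDS:-2}"
    i=$((i + 1))
  done
  echo "Kafka Connect did not respond at $url" >&2
  return 1
}

# List the connector plugin classes the worker has loaded.
list_connector_plugins() {
  curl -fsS "${1:-http://localhost:8083}/connector-plugins"
}
```

After `wait_for_connect` succeeds, the output of `list_connector_plugins` should mention both `io.debezium.connector.postgresql.PostgresConnector` and, if you built the custom image, `io.confluent.connect.elasticsearch.ElasticsearchSinkConnector`.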

Step 5 – Configure the PostgreSQL Connector via Debezium UI

Before creating the connector, make sure you have a users table in your PostgreSQL database.
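
If the table does not exist yet, something like the following could create it. This is only a sketch: the `id` column is implied by the sink configuration later in this guide, while the `name` and `email` columns are hypothetical. Logical decoding with pgoutput also requires PostgreSQL to run with `wal_level=logical`, so the second helper prints a query to check that.

```shell
# Print DDL for a minimal users table.
# Only "id" is implied by this guide; "name" and "email" are hypothetical columns.
users_table_sql() {
  cat <<'SQL'
CREATE TABLE IF NOT EXISTS users (
  id    SERIAL PRIMARY KEY,
  name  TEXT NOT NULL,
  email TEXT
);
SQL
}

# Print a query that shows the WAL level; Debezium's pgoutput mode needs "logical".
wal_level_sql() {
  echo "SHOW wal_level;"
}
```

Assuming PostgreSQL runs in a container named `postgresdb` with the user `postgres` and the database `template` (the values used in the connector configuration later in this guide), you could apply it with `users_table_sql | docker exec -i postgresdb psql -U postgres -d template`.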

Open Debezium UI at http://localhost:8080 and follow these steps to create a new PostgreSQL source connector:

  1. Click Create Connector.
  2. Choose the PostgreSQL connector type.
  3. Fill in the basic connection settings (hostname, port, database name, credentials).
  4. Configure the advanced settings (table filter; plugin name: pgoutput).
  5. Validate the configuration and finish.
  6. Confirm the connector is running.

Now try inserting a record into the users table and open Kafka UI at http://localhost:8070 to see the change event appear on the Kafka topic:
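
The same check also works from the command line. By default, Debezium names each topic `<topic.prefix>.<schema>.<table>`, so the sketch below builds that name and reads a few messages with the console consumer. The helper names are hypothetical, and the script path `/opt/kafka/bin` is an assumption about the apache/kafka image layout:

```shell
# Build the default Debezium topic name: <topic.prefix>.<schema>.<table>.
debezium_topic() {
  printf '%s.%s.%s\n' "$1" "$2" "$3"
}

# Read messages from a topic inside the kafka container.
# The /opt/kafka/bin path is an assumption about the apache/kafka image.
consume_topic() {
  docker exec kafka /opt/kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server kafka:9092 \
    --topic "$1" --from-beginning --max-messages "${2:-1}"
}
```

For example, with a topic prefix of `users_table` you would run `consume_topic "$(debezium_topic users_table public users)"`.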


Setup: Synchronization from PostgreSQL to Elasticsearch

With Debezium already streaming changes to Kafka, adding Elasticsearch as a sync target is straightforward — we just need to:

  1. Start an Elasticsearch (+ Kibana) stack
  2. Register a sink connector that reads from the Kafka topic and writes to an Elasticsearch index

Step 1 – Start Elasticsearch and Kibana

Create a separate docker-compose-elk.yml file:

services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.17.15
    container_name: elasticsearch
    ports:
      - "9200:9200"
    environment:
      - discovery.type=single-node
      - xpack.security.enabled=false
      - ES_JAVA_OPTS=-Xms512m -Xmx512m
    networks:
      - base-network

  kibana:
    image: docker.elastic.co/kibana/kibana:7.17.15
    container_name: kibana
    ports:
      - "5601:5601"
    environment:
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
    depends_on:
      - elasticsearch
    networks:
      - base-network

networks:
  base-network:
    external: true

Start the stack:

docker compose -f docker-compose-elk.yml up -d --build

Once the stack is up:

  • Elasticsearch: http://localhost:9200
  • Kibana: http://localhost:5601
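
To confirm Elasticsearch is ready before registering the sink, you can query its cluster health endpoint. The sketch below extracts the status field with sed, assuming the compact JSON that Elasticsearch returns by default; `es_cluster_status` is a hypothetical helper name:

```shell
# Print the cluster status (green, yellow, or red) from the health endpoint.
# Assumes the default compact JSON response; use jq instead if you have it.
es_cluster_status() {
  curl -fsS "${1:-http://localhost:9200}/_cluster/health" |
    sed -n 's/.*"status":"\([a-z]*\)".*/\1/p'
}
```

On a single node, a yellow status is usually expected once indices with replica shards exist, because there is no second node to host the replicas.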

Step 2 – Register the Connectors

Note: Make sure the users table exists in your PostgreSQL database before registering the connectors.

Use the following curl commands to register both connectors via the Debezium Connect REST API.

Elasticsearch Sink Connector — reads the users Kafka topic and writes to the users Elasticsearch index:

curl --location 'http://localhost:8083/connectors' \
--header 'Content-Type: application/json' \
--data '{
  "name": "elasticsearch-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "topics": "users",
    "connection.url": "http://elasticsearch:9200",

    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",

    "schema.ignore": "true",
    "key.ignore": "false",

    "write.method": "upsert",
    "behavior.on.null.values": "delete",

    "transforms": "extractKey,removeDeleted",
    "transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.extractKey.field": "id",

    "transforms.removeDeleted.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
    "transforms.removeDeleted.exclude": "__deleted",

    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "dlq-users",
    "errors.deadletterqueue.topic.replication.factor": "1"
  }
}'

PostgreSQL Source Connector — reads WAL changes from PostgreSQL and publishes them to the users Kafka topic:

curl --location 'http://localhost:8083/connectors' \
--header 'Content-Type: application/json' \
--data '{
  "name": "postgres-source-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgresdb",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "template",
    "topic.prefix": "users_table",
    "table.include.list": "public.users",
    "plugin.name": "pgoutput",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "transforms": "route,unwrap",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "users_table.public.users",
    "transforms.route.replacement": "users",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.unwrap.delete.handling.mode": "rewrite"
  }
}'
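
A `POST` to `/connectors` fails with a conflict if a connector with the same name is already registered. If you expect to re-run the registration, the Kafka Connect REST API also accepts `PUT /connectors/{name}/config`, which creates the connector or updates it in place. The sketch below assumes you saved only the inner `"config"` object to a JSON file; `upsert_connector` and the `CONNECT_URL` variable are hypothetical names:

```shell
# Create or update a connector from a JSON file that holds only the "config" object
# (without the outer "name"/"config" wrapper used by POST /connectors).
# upsert_connector and CONNECT_URL are hypothetical names.
upsert_connector() {
  name="$1"
  config_file="$2"
  curl -fsS -X PUT \
    -H 'Content-Type: application/json' \
    --data "@$config_file" \
    "${CONNECT_URL:-http://localhost:8083}/connectors/$name/config"
}
```

For example: `upsert_connector postgres-source-connector postgres-source.json`, where the file name is an assumption.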

Step 3 – Verify the Pipeline

After registering both connectors, check their status in Debezium UI and verify in Kafka UI that a consumer group has been created for the sink (Kafka Connect names it connect-elasticsearch-sink-connector by default).
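
The status is also available from the Kafka Connect REST API at `/connectors/{name}/status`. A connector can report RUNNING while one of its tasks has failed, so the sketch below collects every state in the response. The helper names are hypothetical, and the grep-based parsing assumes the compact JSON the API normally returns:

```shell
# Print every "state" field (connector and tasks) from the status endpoint.
# Helper names and CONNECT_URL are hypothetical; parsing assumes compact JSON.
connector_states() {
  curl -fsS "${CONNECT_URL:-http://localhost:8083}/connectors/$1/status" |
    grep -o '"state":"[A-Z_]*"'
}

# Succeed only if the connector and all of its tasks report RUNNING.
connector_is_healthy() {
  states="$(connector_states "$1")" || return 1
  [ -n "$states" ] && ! printf '%s\n' "$states" | grep -qv '"RUNNING"'
}
```

For example: `connector_is_healthy postgres-source-connector && connector_is_healthy elasticsearch-sink-connector && echo "pipeline is up"`.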

Now run some INSERT, UPDATE, or DELETE statements on the users table in PostgreSQL and follow the data through the pipeline:

1. Run a search against the users index in Kibana DevTools before inserting any data.

2. Insert a new record into the users table.

3. Check the users topic in Kafka UI; the change event should appear.

4. Query the users index in Elasticsearch via Kibana DevTools; the new document should be there.
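
The steps above can also be run from a terminal. This is a rough sketch: the helper names are hypothetical, and the `name` and `email` columns are assumptions about your users table, so adjust them to match your schema.

```shell
# Search the users index (the equivalent of GET users/_search in Kibana DevTools).
search_users() {
  curl -sS "${ES_URL:-http://localhost:9200}/users/_search?pretty"
}

# Print an INSERT statement for a test row.
# The name/email columns are hypothetical; values are not escaped (demo only).
insert_user_sql() {
  printf "INSERT INTO users (name, email) VALUES ('%s', '%s');\n" "$1" "$2"
}

# Print a DELETE statement for a row id, to exercise the delete path.
delete_user_sql() {
  printf 'DELETE FROM users WHERE id = %d;\n' "$1"
}
```

Assuming PostgreSQL runs in a container named `postgresdb`, run `search_users`, then `insert_user_sql Ada ada@example.com | docker exec -i postgresdb psql -U postgres -d template`, then `search_users` again. Before the first row arrives, the search may return an index-not-found error, since the users index might not have been created yet.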

That's the full pipeline working end-to-end: PostgreSQL → Debezium → Kafka → Elasticsearch. 🎉


Wrapping Up

You've just set up a fully functional CDC pipeline with Debezium! From a single row update in PostgreSQL, a change event flows through Kafka and lands in Elasticsearch — all automatically, with zero application code involved.

This is just the beginning. From here you can explore:

  • Adding more sink connectors (e.g., a data warehouse, another database)
  • Schema evolution strategies using a schema registry
  • Monitoring connector health and lag in production

Thank you so much for following along! I hope this guide saved you hours of trial and error. If you haven't read it yet, the Overview CDC post provides great background context on the concepts behind what we built here. Happy streaming! 🚀
