César Fabián CHÁVEZ LINARES
Building Real-Time Data Pipelines with Debezium and Kafka: A Practical Guide

Introduction

In today's data-driven world, maintaining synchronized data across different systems is crucial. Change Data Capture (CDC) has emerged as a powerful pattern for tracking and propagating changes from your database in real-time. In this guide, we'll build a practical example using Debezium and Apache Kafka to create a robust CDC pipeline.

What We'll Build

We'll create a simple e-commerce scenario where order updates in a PostgreSQL database are automatically synchronized with an Elasticsearch instance for real-time search capabilities. This setup demonstrates a common real-world use case for CDC.

Prerequisites

  • Docker and Docker Compose
  • Java 11 or higher
  • Maven
  • Git
  • PostgreSQL client (psql)
  • curl (for testing)

Architecture Overview

Our architecture consists of several components:

  1. PostgreSQL database (source)
  2. Debezium connector
  3. Apache Kafka
  4. Kafka Connect
  5. Elasticsearch (target)
  6. Simple Spring Boot application for testing

The data flow between these components is shown in the following Mermaid diagram:

graph LR
    A[PostgreSQL] -->|Debezium| B[Kafka Connect]
    B -->|Events| C[Kafka]
    C -->|Sink Connector| D[Elasticsearch]
    E[Spring Boot App] -->|Writes| A
    D -->|Search| E

Implementation Steps

1. Setting Up the Environment

First, let's create our project structure:

mkdir cdc-demo
cd cdc-demo
git init

Create a docker-compose.yml file:

version: '3'
services:
  postgres:
    image: debezium/postgres:13
    ports:
      - "5432:5432"
    environment:
      - POSTGRES_DB=inventory
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
    volumes:
      - ./postgres/init:/docker-entrypoint-initdb.d

  kafka:
    image: confluentinc/cp-kafka:7.3.0
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    depends_on:
      - zookeeper

  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  connect:
    image: debezium/connect:2.1
    ports:
      - "8083:8083"
    environment:
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: my_connect_configs
      OFFSET_STORAGE_TOPIC: my_connect_offsets
      STATUS_STORAGE_TOPIC: my_connect_statuses
      BOOTSTRAP_SERVERS: kafka:29092
    depends_on:
      - kafka
      - postgres

  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.17.0
    ports:
      - "9200:9200"
    environment:
      - discovery.type=single-node
      - ES_JAVA_OPTS=-Xms512m -Xmx512m
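Once the stack is up (docker-compose up -d), confirm that Kafka Connect is reachable before registering any connectors; the root endpoint of its REST API returns version information:

# Kafka Connect can take a minute before it starts listening on port 8083
curl -s http://localhost:8083/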

2. Creating the Database Schema

Create postgres/init/init.sql:

CREATE TABLE orders (
    id SERIAL PRIMARY KEY,
    customer_id INTEGER NOT NULL,
    order_date TIMESTAMP NOT NULL,
    status VARCHAR(50) NOT NULL,
    total_amount DECIMAL(10,2) NOT NULL
);

ALTER TABLE orders REPLICA IDENTITY FULL;
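REPLICA IDENTITY FULL makes UPDATE and DELETE events carry the complete previous row rather than just the primary key. The debezium/postgres image already ships with logical replication enabled; if you substitute a stock PostgreSQL image, verify the setting before going further:

psql -h localhost -U postgres -d inventory -c "SHOW wal_level;"
# Debezium requires "logical"; on a stock image set wal_level=logical and restart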

3. Configuring Debezium

After starting the containers, register the Debezium connector through the Kafka Connect REST API. Note that Debezium 2.x replaced the older database.server.name property with topic.prefix; the prefix becomes the first segment of every change-event topic name:

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
    "name": "inventory-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "postgres",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "postgres",
        "database.dbname": "inventory",
        "topic.prefix": "dbserver1",
        "table.include.list": "public.orders",
        "plugin.name": "pgoutput"
    }
}'
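You can confirm that the connector and its task came up correctly through the same REST API:

curl -s http://localhost:8083/connectors/inventory-connector/status
# Both the connector and its single task should report state "RUNNING"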

4. Spring Boot Application

Create a new Spring Boot project with the following dependencies:

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-jpa</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.postgresql</groupId>
        <artifactId>postgresql</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
</dependencies>
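The application also needs datasource settings. A minimal application.properties, assuming the PostgreSQL credentials and ports from the docker-compose file above (the Kafka property is only needed if you add a consumer later):

spring.datasource.url=jdbc:postgresql://localhost:5432/inventory
spring.datasource.username=postgres
spring.datasource.password=postgres
# The orders table is created by init.sql, so no schema generation is needed
spring.jpa.hibernate.ddl-auto=none
spring.kafka.bootstrap-servers=localhost:9092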

Create the Order entity:

@Entity
@Table(name = "orders")
public class Order {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    private Long customerId;
    private LocalDateTime orderDate;
    private String status;
    private BigDecimal totalAmount;

    // Getters, setters, and constructors
}
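The controller in the next step injects an OrderRepository, which is not shown in the original project; a plain Spring Data JPA repository is all that is required:

import org.springframework.data.jpa.repository.JpaRepository;

// Spring Data generates the implementation at runtime
public interface OrderRepository extends JpaRepository<Order, Long> {
}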

Create a REST controller for testing:

@RestController
@RequestMapping("/api/orders")
public class OrderController {
    private final OrderRepository orderRepository;

    public OrderController(OrderRepository orderRepository) {
        this.orderRepository = orderRepository;
    }

    @PostMapping
    public Order createOrder(@RequestBody Order order) {
        order.setOrderDate(LocalDateTime.now());
        return orderRepository.save(order);
    }

    @PutMapping("/{id}")
    public Order updateOrder(@PathVariable Long id, @RequestBody Order order) {
        return orderRepository.findById(id)
            .map(existingOrder -> {
                existingOrder.setStatus(order.getStatus());
                existingOrder.setTotalAmount(order.getTotalAmount());
                return orderRepository.save(existingOrder);
            })
            .orElseThrow(() -> new ResponseStatusException(HttpStatus.NOT_FOUND));
    }
}

5. Testing the Pipeline

  1. Start all containers:

docker-compose up -d

  2. Create a test order:

curl -X POST http://localhost:8080/api/orders \
  -H "Content-Type: application/json" \
  -d '{
    "customerId": 1,
    "status": "NEW",
    "totalAmount": 99.99
  }'

  3. Update the order:

curl -X PUT http://localhost:8080/api/orders/1 \
  -H "Content-Type: application/json" \
  -d '{
    "status": "PROCESSING",
    "totalAmount": 99.99
  }'

  4. Check the Kafka topic to verify the CDC events:

docker-compose exec kafka kafka-console-consumer \
  --bootstrap-server kafka:29092 \
  --topic dbserver1.public.orders \
  --from-beginning
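The architecture diagram also calls for an Elasticsearch sink. The stock debezium/connect image does not bundle the Confluent Elasticsearch sink connector, so treat the following registration as a sketch that assumes you have added that plugin to the Connect image (for example via confluent-hub); the connector name and transform chain are illustrative:

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
    "name": "elasticsearch-sink",
    "config": {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "topics": "dbserver1.public.orders",
        "connection.url": "http://elasticsearch:9200",
        "transforms": "unwrap,key",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
        "transforms.key.field": "id",
        "key.ignore": "false",
        "schema.ignore": "true"
    }
}'

Once it is running, each order should appear as a document in the index named after the topic:

curl -s "http://localhost:9200/dbserver1.public.orders/_search?pretty"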

Common Challenges and Solutions

  1. Data Consistency

    • Use transaction logs for accurate change capture
    • Implement idempotent consumers (see the sketch after this list)
    • Handle out-of-order events
  2. Performance Optimization

    • Batch updates when possible
    • Monitor Kafka partition lag
    • Tune PostgreSQL replication slots
  3. Error Handling

    • Implement dead letter queues
    • Set up proper monitoring and alerting
    • Create retry mechanisms
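
To make the first point concrete, here is a minimal sketch of an idempotent consumer built on the spring-kafka dependency already declared in the pom. It assumes the default Debezium JSON envelope (a payload object with before/after fields) and the spring.kafka.bootstrap-servers setting shown earlier; the upsert helper is hypothetical. Because writes are keyed by the primary key, replaying the same event leaves the target unchanged:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class OrderChangeConsumer {

    private final ObjectMapper mapper = new ObjectMapper();

    // Topic name = topic.prefix + schema + table, as configured for the connector
    @KafkaListener(topics = "dbserver1.public.orders", groupId = "order-search-sync")
    public void onChange(ConsumerRecord<String, String> record) throws Exception {
        String message = record.value();
        if (message == null) {
            return; // tombstone record that follows a delete; nothing to do
        }
        JsonNode after = mapper.readTree(message).path("payload").path("after");
        if (after.isNull() || after.isMissingNode()) {
            return; // delete event: "after" is null; handle removal here if needed
        }
        long id = after.path("id").asLong();
        // Upsert by primary key: re-processing the same event overwrites the same
        // document, so redeliveries and replays are harmless (idempotent).
        upsertIntoSearchIndex(id, after);
    }

    // Hypothetical helper; in this guide the real sink is the Elasticsearch connector.
    private void upsertIntoSearchIndex(long id, JsonNode row) {
        System.out.printf("Upserting order %d with status %s%n", id, row.path("status").asText());
    }
}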

Best Practices

  1. Schema Evolution

    • Use Avro for schema management
    • Plan for backward/forward compatibility
    • Test schema changes thoroughly
  2. Monitoring

    • Track replication lag
    • Monitor Kafka consumer group offsets (an example check follows this list)
    • Set up alerts for connector failures
  3. Security

    • Use SSL/TLS for communication
    • Implement proper authentication
    • Follow least privilege principle
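
For the consumer offset point, the tooling already inside the Kafka container is enough for a quick check. Kafka Connect sink connectors consume under a group named connect-<connector name>, so for the elasticsearch-sink sketched earlier:

docker-compose exec kafka kafka-consumer-groups \
  --bootstrap-server kafka:29092 \
  --describe --group connect-elasticsearch-sink
# The LAG column shows how far the sink is behind the change-event topic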

Conclusion

CDC with Debezium and Kafka provides a robust solution for real-time data synchronization. This setup can be extended to handle more complex scenarios like:

  • Multi-region deployment
  • Multiple target systems
  • Complex transformation pipelines
  • High availability requirements
