Eric Kahindi

Implementing a CDC pipeline with Debezium

Introduction

This project is a real-time data pipeline that captures, processes, and visualises cryptocurrency market data from Binance. It uses Apache Kafka, PostgreSQL, Cassandra, and Grafana to build a scalable, event-driven architecture for financial data processing.

For more detail, visit the repository, which has the full picture and all the source files.

Technology Stack

  • Data Ingestion: Python with Binance API
  • Stream Processing: Apache Kafka
  • Change Data Capture: Debezium PostgreSQL Connector
  • Operational Database: PostgreSQL
  • Analytical Database: Apache Cassandra
  • Visualization: Grafana
  • Orchestration: Docker & Docker Compose

Architecture

Data Flow Patterns

Write Path:

Binance API → ETL Service → PostgreSQL Insert
                              ↓
                         Debezium CDC
                              ↓
                         Kafka Topics
                              ↓
                         Cassandra Sink

Read Path:

Cassandra → Grafana Data Sources → Dashboards

Component Interaction Flow

  1. Source Layer (Binance API)

    • Exposes RESTful endpoints for market data
    • No built-in event streaming capability
    • Requires polling via ETL producer
  2. Ingestion Layer (PostgreSQL + Binance ETL)

    • Receives transformed data from API
    • Runs with wal_level=logical so the Write-Ahead Log (WAL) can feed CDC
    • Maintains a logical replication slot so Debezium can resume from its last confirmed WAL position
  3. Streaming Layer (Apache Kafka)

    • Decouples producers from consumers
    • Provides topic-based pub/sub model
    • Enables event replay and stream processing
  4. Persistence Layer (Cassandra)

    • Optimized for time-series analytical queries
    • Distributed storage for fault tolerance
    • Supports high-throughput writes
  5. Visualization Layer (Grafana)

    • Queries both PostgreSQL and Cassandra
    • Real-time dashboard rendering
    • Alert configuration capabilities

Key Components

1. Binance ETL Producer

The ETL producer is the entry point of the pipeline and orchestrates data collection from Binance.
It first identifies the top 5 USDT-quoted gainers over the last 24 hours, then fetches detailed market data for each of them:

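import logging

import requests


def fetch_top5_24hr() -> tuple[list, list]:
    """Return the top 5 USDT gainers as (full ticker rows, symbol names)."""
    try:
        logging.info('Starting to load top 5 data ...')
        response = requests.get("https://api.binance.com/api/v3/ticker/24hr", timeout=10)
        response.raise_for_status()
        data = response.json()
    except requests.RequestException:
        # Log the failure and let the caller decide whether to retry
        logging.exception('Failed to fetch 24hr ticker data')
        raise

    # Keep liquid USDT-quoted pairs that currently have asks on the book
    usdt_pairs = [
        item for item in data
        if item['symbol'].endswith('USDT')
        and float(item['quoteVolume']) > 1000000
        and float(item['askQty']) > 0.0
    ]
    # Sort by 24-hour percentage change, biggest gainers first
    sorted_data = sorted(
        usdt_pairs,
        key=lambda x: float(x["priceChangePercent"]),
        reverse=True
    )

    top_5 = [x['symbol'] for x in sorted_data[:5]]

    # Return both the full ticker rows and the bare symbol names
    return sorted_data[:5], top_5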

Once the top 5 symbols are identified, the producer fetches and transforms three further types of market data for each symbol:

  • Kline Data (OHLCV Candlestick Data) - Fetching 5-minute candlestick data for the last 24 hours. Uses interval='5m' to capture granular price movements.
  • Recent Trades (Individual Trade Transactions) - Fetching up to 300 most recent trades. Each trade records its ID, price, quantity, timestamp, and whether the buyer was the maker.
  • Order Book (Bid/Ask Levels) - Fetching the current order book with up to 300 levels on each side. Returns: {'bids': [[price, qty], ...], 'asks': [[price, qty], ...]}
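
As a rough illustration of the kline step, here is a minimal sketch of how the request parameters and the row parsing could look. The helper names klines_params and parse_kline are hypothetical and are not taken from the repository; the field order follows the array that Binance returns from GET /api/v3/klines, and the dictionary keys are chosen to match the column names used later in this post.

```python
from datetime import datetime, timezone

# Field order of one kline row as returned by GET /api/v3/klines.
# The API sends a 12th, unused field that this sketch ignores.
KLINE_FIELDS = (
    "k_open_time", "open", "high", "low", "close", "volume",
    "k_close_time", "quote_asset_volume", "number_of_trades",
    "tb_base_volume", "tb_quote_volume",
)
TIME_FIELDS = ("k_open_time", "k_close_time")


def klines_params(symbol: str, interval_minutes: int = 5, hours: int = 24) -> dict:
    """Query parameters covering `hours` of candles (hypothetical helper)."""
    return {
        "symbol": symbol,
        "interval": f"{interval_minutes}m",
        # 24 hours of 5-minute candles = 288 rows
        "limit": hours * 60 // interval_minutes,
    }


def parse_kline(row: list) -> dict:
    """Turn one raw kline array into a typed dict (hypothetical helper)."""
    record = dict(zip(KLINE_FIELDS, row))  # zip drops the trailing unused field
    for name in TIME_FIELDS:
        # Binance timestamps are milliseconds since the Unix epoch
        record[name] = datetime.fromtimestamp(record[name] / 1000, tz=timezone.utc)
    for name in KLINE_FIELDS:
        if name not in TIME_FIELDS and name != "number_of_trades":
            record[name] = float(record[name])  # prices and volumes arrive as strings
    return record
```

The parameters would be passed to something like requests.get("https://api.binance.com/api/v3/klines", params=klines_params("BTCUSDT")), and each row of the JSON response run through parse_kline before loading.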

The main function below then runs the fetch and transform functions for each symbol. A full run completes in approximately 5-10 seconds.

if __name__ == "__main__":
    # Initialize Cassandra schema
    setup_cassandra()

    # Fetch top 5 gainers (runs once)
    data, top_5 = fetch_top5_24hr()
    transform_load_top5_24hr(data)

    # For each of the top 5 symbols (top_5 is a list of symbol strings)
    for ranking, symbol in enumerate(top_5, start=1):

        # Fetch and load klines (24 hours of 5-min data)
        klines = fetch_klines_24hrs(symbol)
        transform_load_klines_data(klines, symbol, ranking)

        # Fetch and load recent trades (300 most recent)
        trades = fetch_recent_trades(symbol)
        transform_load_recent_trades(trades, symbol, ranking)

        # Fetch and load order book (bid/ask levels)
        ob = fetch_OB(symbol)
        transform_load_OB_data(ob, symbol, ranking)

2. PostgreSQL

PostgreSQL serves as the operational database, the first landing zone for all market data. Every committed insert is written to the WAL, which is what lets Debezium capture each change downstream.

Database Initialization

# docker-compose.yml excerpt
  postgres:
    image: debezium/postgres:15
    container_name: postgres
    ports:
      - "5432:5432"
    command: postgres -c wal_level=logical -c max_wal_senders=10 -c max_replication_slots=10
    environment:
      POSTGRES_USER: dbz
      POSTGRES_PASSWORD: dbz
      POSTGRES_DB: cap_stock_db

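Critical setting: wal_level=logical enables logical decoding of the WAL, which Debezium CDC requires. The max_wal_senders and max_replication_slots settings leave room for the connector's replication connection and its slot.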
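
If you want to confirm the setting before registering the connector, a small check along these lines can help. This is a sketch under assumptions: check_wal_level is a hypothetical helper, and it expects any DB-API cursor (for example one from psycopg2) connected to the postgres service.

```python
def check_wal_level(cursor) -> str:
    """Return the server's wal_level, raising if it is not 'logical'.

    `cursor` is any DB-API cursor connected to the PostgreSQL instance.
    """
    cursor.execute("SHOW wal_level;")
    (level,) = cursor.fetchone()
    if level != "logical":
        raise RuntimeError(f"wal_level is {level!r}; Debezium needs 'logical'")
    return level


# Example usage (assumes psycopg2 is installed and the compose stack is up):
#
#   import psycopg2
#   conn = psycopg2.connect(host="localhost", port=5432,
#                           user="dbz", password="dbz", dbname="cap_stock_db")
#   with conn.cursor() as cur:
#       print(check_wal_level(cur))
```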

3. Debezium PostgreSQL Connector: The CDC Engine

Debezium is the change data capture engine that transforms PostgreSQL into an event publisher.

How CDC Works

  1. Logical Replication Slot: Debezium creates a slot that tracks its position in the WAL
  2. WAL Scanning: It reads the Write-Ahead Log sequentially from that position
  3. Event Decoding: It converts each log entry into a JSON change event
  4. Kafka Publishing: It sends each event to a topic named <topic.prefix>.<schema>.<table>, for example cap_stock.public.kline_data
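
To make the decoded events more concrete, the sketch below parses a trimmed Debezium change event and pulls out the new row state. The sample values are invented for illustration, and extract_after is a hypothetical helper. The envelope fields (before, after, source, op, ts_ms) are the standard Debezium ones, and the logic is roughly what the ExtractNewRecordState transform does on the sink side.

```python
import json
from typing import Optional

# A trimmed, illustrative change event for an INSERT into public.kline_data.
# Real events also carry a "schema" section and more "source" metadata.
SAMPLE_EVENT = json.dumps({
    "payload": {
        "before": None,
        "after": {"symbol": "BTCUSDT", "open": 100.0, "close": 101.5, "ranking": 1},
        "source": {"db": "cap_stock_db", "schema": "public", "table": "kline_data"},
        "op": "c",          # c = create, u = update, d = delete, r = snapshot read
        "ts_ms": 1700000000000,
    }
})


def extract_after(raw_event: str) -> Optional[dict]:
    """Return the row state after the change, or None for deletes."""
    event = json.loads(raw_event)
    # With converter schemas disabled, the envelope sits at the top level
    payload = event.get("payload", event)
    if payload["op"] in ("c", "u", "r"):
        return payload["after"]
    return None


row = extract_after(SAMPLE_EVENT)
print(row["symbol"], row["close"])
```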

Docker setup

  connect:
    image: quay.io/debezium/connect:3.1
    container_name: connect
    depends_on: [kafka]
    ports: ["8083:8083"]
    environment:
      #Bootstrap
      BOOTSTRAP_SERVERS: kafka:9092
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: connect_configs
      OFFSET_STORAGE_TOPIC: connect_offsets
      STATUS_STORAGE_TOPIC: connect_statuses
      HOST_NAME: "connect"
      ADVERTISED_HOST_NAME: "connect"
      ADVERTISED_PORT: "8083"
      KAFKA_CONNECT_PLUGIN_PATH: /kafka/connect,/kafka/plugins,/kafka/plugins/kafka-connect-cassandra-sink-1.7.3,/debezium/connect

    volumes:
      - ./plugins/kafka-connect-cassandra-sink-1.7.3:/kafka/connect/kafka-connect-cassandra-sink-1.7.3

Connector Configuration

{
  "name": "cap-stock-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "dbz",
    "database.password": "dbz",
    "database.dbname": "cap_stock_db",
    "topic.prefix": "cap_stock",
    "slot.name": "cap_stock_slot",
    "publication.autocreate.mode": "filtered",
    "table.include.list": "public.kline_data, public.order_book, public.recent_trades, public.top_24hr",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "schema.history.internal.kafka.topic": "schema-changes.cap_stock_db"
  }
}

4. Apache Kafka: The Event Streaming Backbone

Kafka acts as the central event hub, decoupling data producers (PostgreSQL via Debezium) from consumers (Cassandra Sink Connector).

Kafka Setup

  zookeeper:
    image: quay.io/debezium/zookeeper:3.1
    container_name: zookeeper
    ports: ["2181:2181"]

  kafka:
    image: quay.io/debezium/kafka:3.1
    container_name: kafka
    depends_on: [zookeeper]
    ports: ["29092:29092"]
    environment:
      KAFKA_BROKER_ID: 1
      ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:29092
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,EXTERNAL://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  akhq:
    image: tchiotludo/akhq:latest
    ports: ["8080:8080"]
    environment:
      AKHQ_CONFIGURATION: |
        akhq:
          connections:
            local:
              properties:
                bootstrap.servers: "kafka:9092"
              connect:
                - name: "connect"
                  url: "http://connect:8083"
    depends_on: [kafka, connect]


Four topics are created:

Topic                             Partitions   Retention   Purpose
cap_stock.public.kline_data       5            7 days      Candlestick events
cap_stock.public.recent_trades    5            7 days      Trade events
cap_stock.public.order_book       5            7 days      Order book snapshots
cap_stock.public.top_24hr         5            7 days      24hr statistics
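
The topic names follow directly from the connector configuration: each one is the topic.prefix followed by the schema-qualified table name. A minimal sketch, with topic_names as a hypothetical helper:

```python
def topic_names(topic_prefix: str, table_include_list: str) -> list[str]:
    """Derive the Debezium topic for each table in table.include.list."""
    tables = [t.strip() for t in table_include_list.split(",") if t.strip()]
    # Each entry is already schema-qualified, e.g. "public.kline_data"
    return [f"{topic_prefix}.{table}" for table in tables]


topics = topic_names(
    "cap_stock",
    "public.kline_data, public.order_book, public.recent_trades, public.top_24hr",
)
print(topics)
```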

To verify, navigate to localhost:8080, where the Kafka UI (AKHQ) lists the topics and connectors.

5. Cassandra Database: The Analytical Store

Cassandra is the analytical store behind the dashboard. It is designed for high write throughput and fast range queries within a partition, which suits time-series data.
Before anything else, create the keyspace and tables, or you may spend hours debugging why your queries work in the terminal but nowhere else.

Cassandra Initialization

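import logging
from datetime import datetime

from cassandra.cluster import Cluster
from dotenv import load_dotenv


def setup_cassandra() -> None: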

    logging.basicConfig(
        level=logging.INFO,
        format= '%(asctime)s|%(levelname)s|%(name)s|%(message)s',
        handlers = [
            logging.FileHandler(f"cassandra_setup{datetime.now().strftime('%Y%m%d')}.log"),
            logging.StreamHandler()
        ]
    )

    load_dotenv()

    logging.info('Creating the connection ...')
    # connecting to cassandra 
    cluster = Cluster(['cassandra'])
    session = cluster.connect()

    logging.info('Connected ...')
    logging.info('Creating the keyspace cap_stock ...')
    # create the key space 
    create_keyspace_query = """
    CREATE KEYSPACE IF NOT EXISTS cap_stock WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor' : 1 };
    """
    session.execute(create_keyspace_query)

    use_keyspace_query= """
    USE cap_stock;
    """
    session.execute(use_keyspace_query)

    logging.info('using the created keyspace ...')

    logging.info('Creating the tables')

    create_kline_query = """
    CREATE TABLE IF NOT EXISTS cap_stock.kline_data (
    symbol              text,
    k_open_time         timestamp,
    k_close_time        timestamp,
    open                double,
    high                double,
    low                 double,
    close               double,
    volume              double,
    quote_asset_volume  double,
    number_of_trades    int,
    tb_base_volume      double,
    tb_quote_volume     double,
    ranking             int,
    time_collected      timestamp,
    PRIMARY KEY ((symbol), k_open_time)
    ) WITH CLUSTERING ORDER BY (k_open_time DESC)
    AND compaction = {
        'class': 'TimeWindowCompactionStrategy',
        'compaction_window_unit': 'DAYS',
        'compaction_window_size': '1'
    };

    """
    create_OB_query =   """
    CREATE TABLE IF NOT EXISTS cap_stock.order_book (
    symbol          text,
    side            text,       -- 'bids' or 'asks'
    ranking         int,        -- rank of the crypto in top performers
    price           double,
    quantity        double,
    time_collected  timestamp,  -- when this snapshot was taken
    PRIMARY KEY ((symbol, side, time_collected), price)
    ) WITH CLUSTERING ORDER BY (price DESC);

    """

    create_recent_trades_query = """
    CREATE TABLE IF NOT EXISTS cap_stock.recent_trades(
    symbol          text,
    trade_time      timestamp,  
    trade_id        bigint,      
    price           double,
    qty             double,
    quote_qty       double,
    is_buyer_maker  boolean,
    is_best_match   boolean,
    ranking         int,
    time_collected  timestamp,  
    PRIMARY KEY ((symbol), trade_time, trade_id)
    ) WITH CLUSTERING ORDER BY (trade_time DESC, trade_id DESC)
    AND compaction = {
        'class': 'TimeWindowCompactionStrategy',
        'compaction_window_unit': 'HOURS',
        'compaction_window_size': '1'
    };
    """

    create_top_24hrs = """
    CREATE TABLE IF NOT EXISTS cap_stock.top_24hr (
    symbol                 text,
    time_collected         timestamp,    

    price_change           double,
    price_change_percent   double,
    weighted_avg_price     double,
    prev_close_price       double,
    last_price             double,
    last_qty               double,
    bid_price              double,
    bid_qty                double,
    ask_price              double,
    ask_qty                double,
    open_price             double,
    high_price             double,
    low_price              double,
    volume                 double,
    quote_volume           double,

    open_time              timestamp,    
    close_time             timestamp,    
    first_id               bigint,
    last_id                bigint,

    PRIMARY KEY ((symbol), time_collected)
    ) WITH CLUSTERING ORDER BY (time_collected DESC)
    AND compaction = {
        'class': 'TimeWindowCompactionStrategy',
        'compaction_window_unit': 'HOURS',
        'compaction_window_size': '1'
    };
    """
    session.execute(create_top_24hrs)
    session.execute(create_recent_trades_query)
    session.execute(create_OB_query)
    session.execute(create_kline_query)
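
The primary key choices above shape how the tables can be read. Because kline_data is partitioned by symbol and clustered by k_open_time in descending order, the newest candles for one symbol come back first without any sorting. The sketch below shows such a read; latest_klines is a hypothetical helper, and session is assumed to be a cassandra-driver Session like the one created in setup_cassandra.

```python
def latest_klines(session, symbol: str, limit: int = 12) -> list:
    """Fetch the newest candles for one symbol (single-partition read)."""
    query = (
        "SELECT k_open_time, open, high, low, close, volume "
        "FROM cap_stock.kline_data "
        "WHERE symbol = %s "   # partition key: only one partition is read
        "LIMIT %s"             # rows are already newest-first (DESC clustering)
    )
    # The driver substitutes %s placeholders for simple (non-prepared) statements
    return list(session.execute(query, (symbol, limit)))


# Example usage (assumes the cassandra service is reachable):
#
#   from cassandra.cluster import Cluster
#   session = Cluster(['cassandra']).connect()
#   for row in latest_klines(session, "BTCUSDT"):
#       print(row.k_open_time, row.close)
```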

6. Kafka Connect (the same Debezium Connect container, reused)

Kafka Connect is the middleware that runs connectors to move data between systems.

Cassandra Sink Connector Configuration

// register_cassandra.json
{
  "name": "cassandra_sink",
  "config": {
    "connector.class": "com.datastax.oss.kafka.sink.CassandraSinkConnector",
    "tasks.max": "1",

    "topics": "cap_stock.public.kline_data, cap_stock.public.recent_trades ,cap_stock.public.top_24hr, cap_stock.public.order_book ",

    "contactPoints": "cassandra",
    "loadBalancing.localDc": "datacenter1",


    "topic.cap_stock.public.kline_data.cap_stock.kline_data.mapping": "symbol=value.symbol,k_open_time=value.k_open_time,k_close_time=value.k_close_time,open=value.open,high=value.high,low=value.low,close=value.close,volume=value.volume,quote_asset_volume=value.quote_asset_volume,number_of_trades=value.number_of_trades,tb_base_volume=value.tb_base_volume,tb_quote_volume=value.tb_quote_volume,ranking=value.ranking,time_collected=value.time_collected",

    "topic.cap_stock.public.recent_trades.cap_stock.recent_trades.mapping":"symbol=value.symbol,trade_time=value.time,trade_id=value.id,price=value.price,qty=value.qty,quote_qty=value.quote_qty,is_buyer_maker=value.is_buyer_maker,is_best_match=value.is_best_match,ranking=value.ranking,time_collected=value.time_collected",

    "topic.cap_stock.public.top_24hr.cap_stock.top_24hr.mapping":"symbol=value.symbol, time_collected=value.time_collected,price_change=value.price_change,price_change_percent=value.price_change_percent,weighted_avg_price=value.weighted_avg_price,prev_close_price=value.prev_close_price,last_price=value.last_price,last_qty=value.last_qty,bid_price=value.bid_price,bid_qty=value.bid_qty,ask_price=value.ask_price,ask_qty=value.ask_qty,open_price=value.open_price,high_price=value.high_price,low_price=value.low_price,volume=value.volume,quote_volume=value.quote_volume,open_time=value.open_time,close_time=value.close_time,first_id=value.first_id,last_id=value.last_id",

    "topic.cap_stock.public.order_book.cap_stock.order_book.mapping": "symbol=value.symbol, side=value.side, ranking=value.ranking, price=value.price, quantity=value.quantity, time_collected=value.time_collected",


    "key.converter":"org.apache.kafka.connect.storage.StringConverter",
    "transforms":"unwrap",
    "transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState"
  }
}
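
The mapping strings are long and easy to mistype, so it can help to generate them. The sketch below builds a setting name and mapping value in the topic.<topic>.<keyspace>.<table>.mapping format used above. Both helpers are hypothetical; the renames argument covers columns whose Cassandra name differs from the field name in the Kafka record, as with trade_time and trade_id.

```python
from typing import Optional


def mapping_key(topic: str, keyspace: str, table: str) -> str:
    """Name of the sink setting that maps one topic to one table."""
    return f"topic.{topic}.{keyspace}.{table}.mapping"


def build_mapping(columns: list, renames: Optional[dict] = None) -> str:
    """Build 'column=value.field' pairs for the Cassandra sink.

    `renames` maps a Cassandra column to the record field that feeds it
    when the two names differ.
    """
    renames = renames or {}
    return ",".join(f"{col}=value.{renames.get(col, col)}" for col in columns)


key = mapping_key("cap_stock.public.recent_trades", "cap_stock", "recent_trades")
value = build_mapping(
    ["symbol", "trade_time", "trade_id", "price"],
    renames={"trade_time": "time", "trade_id": "id"},
)
print(key)
print(value)
```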

Connector Registration

The bash script below registers both connectors with Kafka Connect using the configuration files above. Once they are registered, Debezium streams PostgreSQL changes into Kafka, and the Cassandra sink writes those events from Kafka into Cassandra.

Verify the status of the connectors at localhost:8080.

#!/bin/bash

# Register Postgres and Cassandra connectors to Kafka Connect

curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d @register-postgres.json

echo "✓ Postgres connector registered"

curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d @register_cassandra.json

echo "✓ Cassandra connector registered"

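
Note that the echo lines print even when a request fails, so it is worth checking the connector state afterwards. Besides the UI, Kafka Connect exposes GET /connectors/<name>/status on its REST API. The sketch below reads that endpoint; fetch_status and is_healthy are hypothetical helpers, and the base URL assumes the port mapping from the compose file.

```python
import json
from urllib.request import urlopen


def fetch_status(name: str, base_url: str = "http://localhost:8083") -> dict:
    """Read a connector's status document from the Kafka Connect REST API."""
    with urlopen(f"{base_url}/connectors/{name}/status") as resp:
        return json.load(resp)


def is_healthy(status: dict) -> bool:
    """True when the connector and at least one task exist and all are RUNNING."""
    tasks = status.get("tasks", [])
    states = [status["connector"]["state"]] + [task["state"] for task in tasks]
    return bool(tasks) and all(state == "RUNNING" for state in states)


# Example usage (assumes the connect service is up):
#
#   for name in ("cap-stock-connector", "cassandra_sink"):
#       print(name, is_healthy(fetch_status(name)))
```

A task in the FAILED state also carries a trace field in the status document, which is usually the quickest way to see why a sink mapping was rejected.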

7. Grafana: Real-Time Visualisation

Grafana provides the user-facing dashboards that display market data in real-time.

Grafana Configuration

  grafana:
    image: grafana/grafana:main-ubuntu
    container_name: grafana
    environment:
      GF_SECURITY_ADMIN_USER: admin
      GF_SECURITY_ADMIN_PASSWORD: admin
    ports:
      - "3001:3000"
    volumes:
      - ./grafana_data:/var/lib/grafana
    depends_on:
      - postgres

Once Grafana is running, create a new dashboard, open its settings, and add a variable named symbol. Then add panels that use the sample queries below.
The variable lets you switch between different crypto symbols from the dashboard.

Sample Dashboard Queries

--- Klines
SELECT symbol,
  k_open_time AS time,
  open,
  high,
  low,
  close,
  volume
FROM cap_stock.kline_data
WHERE symbol = '$symbol'
  AND k_open_time >= ${__from}
  AND k_open_time <= ${__to}


--- Winners at the moment 
SELECT  symbol,
       price_change_percent,
       volume
FROM cap_stock.top_24hr 
WHERE time_collected >= ${__from}
  AND time_collected <= ${__to}
ALLOW FILTERING;


Results

Conclusion: A Data Engineering Success

The Binance Top Crypto Pipeline implements a resilient, event-driven data architecture. It combines a relational anchor (PostgreSQL), an immutable log (Kafka), and a distributed analytical store (Cassandra), connected through Change Data Capture (Debezium).
