Shahab Ranjbary

Tracking User Account Changes in Real-Time: A Debezium and ClickHouse Implementation

Introduction

In modern e-commerce applications, tracking changes to critical data like user account types in real time is essential for business intelligence and regulatory compliance. Traditional databases often overwrite old values, making historical analysis challenging. Enter Change Data Capture (CDC), a technique that captures and stores every change to your data, enabling a comprehensive audit trail and real-time analytics.

Scenario Overview

Imagine you manage a PostgreSQL database for an e-commerce platform where user account types (Bronze, Silver, Gold) frequently change. However, PostgreSQL only retains the latest values, making it difficult to analyze past states of user accounts. To address this, we'll use Debezium and ClickHouse to track and store all account-type changes with timestamps.

Setting Up Your Environment

Getting the project from Git

To get started, clone the postgres-cdc-clickhouse GitHub repository and launch the environment using Docker Compose:

git clone https://github.com/ranjbaryshahab/postgres-cdc-clickhouse
cd postgres-cdc-clickhouse
docker-compose up

Creating the PostgreSQL Table

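First, define the users table in PostgreSQL. Setting REPLICA IDENTITY FULL makes PostgreSQL log the complete previous row on every update and delete, which Debezium needs in order to fill in the before values of a change event: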

CREATE TABLE users (
    user_id SERIAL PRIMARY KEY,
    username VARCHAR(50) NOT NULL,
    account_type VARCHAR(20) NOT NULL,
    updated_at TIMESTAMP DEFAULT timezone('UTC', CURRENT_TIMESTAMP),
    created_at TIMESTAMP DEFAULT timezone('UTC', CURRENT_TIMESTAMP)
);

ALTER TABLE public.users REPLICA IDENTITY FULL;

Then insert some sample rows:

INSERT INTO users (username, account_type) VALUES
('user1', 'Bronze'),
('user2', 'Silver'),
('user3', 'Gold');

The table now contains three rows:

[Figure: Postgres result]

Setting Up Debezium Connector

Configure the Debezium PostgreSQL connector in Kafka Connect to capture changes from the users table:

curl --location --request POST 'http://localhost:8083/connectors' \
--header 'Content-Type: application/json' \
--data-raw '{
    "name": "raw_data.shop-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.dbname": "postgres",
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "database.history.kafka.topic": "schema-changes.shop",
        "database.hostname": "postgres",
        "database.password": "postgres",
        "database.port": "5432",
        "database.server.name": "shop",
        "database.user": "postgres",
        "name": "raw_data.shop-connector",
        "plugin.name": "pgoutput",
        "table.include.list": "public.users",
        "tasks.max": "1",
        "topic.creation.default.cleanup.policy": "delete",
        "topic.creation.default.partitions": "1",
        "topic.creation.default.replication.factor": "1",
        "topic.creation.default.retention.ms": "604800000",
        "topic.creation.enable": "true",
        "topic.prefix": "raw_data.shop",
        "database.history.skip.unparseable.ddl": "true",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "false",
        "value.converter.schemas.enable": "false",
        "snapshot.mode": "initial"
    }
}'
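Before looking at the topics, it can help to confirm that the connector actually started. The following is a minimal sketch, assuming the Kafka Connect REST API is reachable on localhost:8083 as in the curl call above. It reads the standard GET /connectors/{name}/status endpoint; is_healthy and fetch_status are hypothetical helper names, not part of the repository.

```python
import json
import urllib.request

CONNECT_URL = "http://localhost:8083"   # assumption: same endpoint as the curl call
CONNECTOR = "raw_data.shop-connector"   # connector name from the config above


def is_healthy(status: dict) -> bool:
    """Return True when the connector and every one of its tasks report RUNNING."""
    connector_ok = status.get("connector", {}).get("state") == "RUNNING"
    tasks = status.get("tasks", [])
    return connector_ok and bool(tasks) and all(
        task.get("state") == "RUNNING" for task in tasks
    )


def fetch_status(base_url: str = CONNECT_URL, name: str = CONNECTOR) -> dict:
    """Fetch the status document for one connector from Kafka Connect."""
    url = f"{base_url}/connectors/{name}/status"
    with urllib.request.urlopen(url, timeout=10) as resp:
        return json.load(resp)


# Usage, once the Docker Compose stack is up:
#   print(is_healthy(fetch_status()))
```

If a task reports FAILED, the same status document usually includes a trace field with the underlying exception.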

To see the result, open Redpanda Console at http://localhost:9080/topics, which lists the topics created by the Debezium connector.

[Figure: Redpanda Console data]

Tracking Changes in Real-Time

Now, let's simulate an update in the users table to see the change reflected in our Kafka topics and ClickHouse:

-- Update user with ID 1 to change the account type to 'Gold'
UPDATE users
SET account_type = 'Gold', updated_at = timezone('UTC', CURRENT_TIMESTAMP)
WHERE user_id = 1;

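When this update is made, Debezium captures the change and publishes an event to the Kafka topic raw_data.shop.public.users. Because the table uses REPLICA IDENTITY FULL, the event carries the full row before and after the change, including the previous and new values of the account_type field, along with the operation type (u for update).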
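To make the event structure concrete, here is a small sketch that parses one such event and pulls out the account-type change. The sample event is illustrative only: its values are made up, and metadata that a real Debezium event carries (such as source and ts_ms) is omitted. The function name account_type_change is hypothetical.

```python
import json
from typing import Optional, Tuple

# Illustrative event only; a real event also carries "source", "ts_ms", etc.
SAMPLE_EVENT = json.dumps({
    "before": {"user_id": 1, "username": "user1", "account_type": "Bronze"},
    "after": {"user_id": 1, "username": "user1", "account_type": "Gold"},
    "op": "u",
})


def account_type_change(raw_event: str) -> Optional[Tuple[int, str, str]]:
    """Return (user_id, old_type, new_type) for updates that change account_type."""
    event = json.loads(raw_event)
    if event.get("op") != "u":          # ignore inserts, deletes and snapshot reads
        return None
    before = event.get("before") or {}  # "before" is null for inserts
    after = event.get("after") or {}
    old, new = before.get("account_type"), after.get("account_type")
    if old is None or new is None or old == new:
        return None                     # update did not touch account_type
    return after["user_id"], old, new


print(account_type_change(SAMPLE_EVENT))  # (1, 'Bronze', 'Gold')
```

The same two conditions (operation is u, and the before and after account types differ) are what the ClickHouse materialized view in this article filters on.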

See the result in Redpanda Console:

[Figure: Redpanda Console tracking data]

Storing Changes in ClickHouse

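Store every account-type change in ClickHouse using three objects: a target table, a Kafka engine table that consumes the topic, and a materialized view that moves rows from one to the other: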

Create ClickHouse Database and Table

CREATE DATABASE shop;

CREATE TABLE shop.account_type_switch (
    user_id UInt32,
    username String,
    before_account_type String,
    after_account_type String,
    updated_at DateTime,
    created_at DateTime
) ENGINE = ReplacingMergeTree
ORDER BY (user_id, updated_at)
SETTINGS index_granularity = 8192;

Set Up Kafka Engine Table in ClickHouse

CREATE DATABASE kafka_shop;

CREATE TABLE kafka_shop.kafka__account_type_switch (
    `jsonString` String
) ENGINE = Kafka
SETTINGS kafka_broker_list = 'broker:29092',
        kafka_topic_list = 'raw_data.shop.public.users', 
        kafka_group_name = 'raw_date_clickhouse',
        kafka_format = 'JSONAsString';

Create Materialized View

CREATE MATERIALIZED VIEW kafka_shop.consumer__account_type_switch TO shop.account_type_switch (
    user_id UInt32,
    username String,
    before_account_type String,
    after_account_type String,
    updated_at DateTime,
    created_at DateTime
) AS 
SELECT
    JSONExtract(JSONExtract(jsonString, 'after', 'Nullable(String)'), 'user_id', 'Nullable(UInt32)') AS user_id,
    JSONExtract(JSONExtract(jsonString, 'after', 'Nullable(String)'), 'username', 'Nullable(String)') AS username,
    JSONExtract(JSONExtract(jsonString, 'before', 'Nullable(String)'), 'account_type', 'Nullable(String)') AS before_account_type,
    JSONExtract(JSONExtract(jsonString, 'after', 'Nullable(String)'), 'account_type', 'Nullable(String)') AS after_account_type,
    toDateTime(JSONExtract(JSONExtract(jsonString, 'after', 'Nullable(String)'), 'updated_at', 'Nullable(UInt64)') / 1000000) AS updated_at,
    toDateTime(JSONExtract(JSONExtract(jsonString, 'after', 'Nullable(String)'), 'created_at', 'Nullable(UInt64)') / 1000000) AS created_at
FROM kafka_shop.kafka__account_type_switch
WHERE (before_account_type != after_account_type) AND (JSONExtract(jsonString, 'op', 'Nullable(String)') = 'u');
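The division by 1000000 in the view relies on how Debezium encodes the two timestamp columns. Assuming the connector's default time.precision.mode, a PostgreSQL TIMESTAMP column with microsecond precision arrives as an integer count of microseconds since the Unix epoch. The sketch below shows the equivalent conversion; micros_to_utc is a hypothetical helper name.

```python
from datetime import datetime, timezone


def micros_to_utc(micros: int) -> datetime:
    """Convert microseconds since the Unix epoch to a UTC datetime.

    The fractional part is dropped, matching a DateTime column that
    stores whole seconds.
    """
    return datetime.fromtimestamp(micros // 1_000_000, tz=timezone.utc)


print(micros_to_utc(1_700_000_000_123_456).isoformat())
# 2023-11-14T22:13:20+00:00
```

If the connector is configured with a different time.precision.mode, the divisor in the view would need to change accordingly.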

Verify the data in ClickHouse

[Figure: ClickHouse result]
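As a hedged sketch, the captured rows can also be read back over ClickHouse's HTTP interface. The snippet assumes the interface is reachable at http://localhost:8123/ with the default user and no password, which depends on how the Compose file publishes ports. fetch_changes and parse_json_each_row are illustrative names.

```python
import json
import urllib.request

# Assumption: ClickHouse HTTP interface published on the default port 8123.
CLICKHOUSE_URL = "http://localhost:8123/"

QUERY = """
SELECT user_id, username, before_account_type, after_account_type, updated_at
FROM shop.account_type_switch
ORDER BY user_id, updated_at
FORMAT JSONEachRow
"""


def parse_json_each_row(body: str) -> list:
    """Parse a JSONEachRow response: one JSON object per non-empty line."""
    return [json.loads(line) for line in body.splitlines() if line.strip()]


def fetch_changes(base_url: str = CLICKHOUSE_URL) -> list:
    """POST the query to ClickHouse and return the rows as dictionaries."""
    request = urllib.request.Request(base_url, data=QUERY.encode("utf-8"))
    with urllib.request.urlopen(request, timeout=10) as resp:
        return parse_json_each_row(resp.read().decode("utf-8"))


# Usage, once the stack is up and at least one update has been captured:
#   for row in fetch_changes():
#       print(row)
```

After the update from the previous section, a row for user_id 1 with the before and after account types should appear in the output.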

Conclusion

Implementing CDC with Debezium and ClickHouse lets your e-commerce platform keep a complete history of user account type changes. That history supports detailed auditing, regulatory compliance, and analytics on user behavior and business trends.

By following these steps, you can enhance your data management practices and leverage real-time change tracking to drive informed decision-making in your business.
