Introduction
In modern e-commerce applications, tracking changes to critical data like user account types in real time is essential for business intelligence and regulatory compliance. Traditional databases often overwrite old values, making historical analysis challenging. Enter Change Data Capture (CDC), a technique that captures and stores every change to your data, enabling a comprehensive audit trail and real-time analytics.
Scenario Overview
Imagine you manage a PostgreSQL database for an e-commerce platform where user account types (Bronze, Silver, Gold) frequently change. However, PostgreSQL only retains the latest values, making it difficult to analyze past states of user accounts. To address this, we'll use Debezium and ClickHouse to track and store all account-type changes with timestamps.
Setting Up Your Environment
Getting the project from Git
To get started, clone the postgres-cdc-clickhouse GitHub repository and launch the environment using Docker Compose:
git clone https://github.com/ranjbaryshahab/postgres-cdc-clickhouse
cd postgres-cdc-clickhouse
docker-compose up
Creating the PostgreSQL Table
First, define the users table in PostgreSQL to store user data and track changes:
CREATE TABLE users (
user_id SERIAL PRIMARY KEY,
username VARCHAR(50) NOT NULL,
account_type VARCHAR(20) NOT NULL,
updated_at TIMESTAMP DEFAULT timezone('UTC', CURRENT_TIMESTAMP),
created_at TIMESTAMP DEFAULT timezone('UTC', CURRENT_TIMESTAMP)
);
ALTER TABLE public.users REPLICA IDENTITY FULL;
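Setting REPLICA IDENTITY FULL makes PostgreSQL write the complete old row to the WAL on every UPDATE and DELETE; without it, the "before" part of Debezium's change events would carry only the primary key. You can verify the setting with a quick catalog query (a sanity check, not part of the original setup):

```sql
-- relreplident: 'd' = default (primary key only), 'f' = FULL
SELECT relreplident
FROM pg_class
WHERE oid = 'public.users'::regclass;
```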
Then insert some sample data:
INSERT INTO users (username, account_type) VALUES
('user1', 'Bronze'),
('user2', 'Silver'),
('user3', 'Gold');
A quick SELECT * FROM users; now returns the three rows with their account types and UTC timestamps.
Setting Up Debezium Connector
Configure the Debezium PostgreSQL connector in Kafka Connect to capture changes from the users table:
curl --location --request POST 'http://localhost:8083/connectors' \
--header 'Content-Type: application/json' \
--data-raw '{
  "name": "raw_data.shop-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "postgres",
    "plugin.name": "pgoutput",
    "table.include.list": "public.users",
    "tasks.max": "1",
    "topic.prefix": "raw_data.shop",
    "topic.creation.enable": "true",
    "topic.creation.default.cleanup.policy": "delete",
    "topic.creation.default.partitions": "1",
    "topic.creation.default.replication.factor": "1",
    "topic.creation.default.retention.ms": "604800000",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
    "snapshot.mode": "initial"
  }
}'
You can see the result in the Redpanda Console: go to http://localhost:9080/topics to view the topics created by the Debezium connector.
Tracking Changes in Real-Time
Now, let's simulate an update to the users table to see the change reflected in our Kafka topic and in ClickHouse:
-- Update user with ID 1 to change the account type to 'Gold'
UPDATE users
SET account_type = 'Gold', updated_at = timezone('UTC', CURRENT_TIMESTAMP)
WHERE user_id = 1;
When this update is made, Debezium captures the change and produces an event in the Kafka topic. This event contains the previous and new values of the account_type field along with the operation type (u for update).
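With schemas.enable set to false, the message value is Debezium's standard change-event envelope without the schema wrapper. A trimmed, illustrative payload for the update above might look roughly like this (field values and timestamps are hypothetical; the timestamp fields are microseconds since the Unix epoch):

```json
{
  "before": {"user_id": 1, "username": "user1", "account_type": "Bronze", "updated_at": 1717000000000000, "created_at": 1717000000000000},
  "after": {"user_id": 1, "username": "user1", "account_type": "Gold", "updated_at": 1717000100000000, "created_at": 1717000000000000},
  "source": {"connector": "postgresql", "table": "users"},
  "op": "u",
  "ts_ms": 1717000100123
}
```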
Check the topic again in the Redpanda Console to inspect the update event.
Storing Changes in ClickHouse
Next, use ClickHouse to persist every change. We'll create a target table, a Kafka engine table that reads the Debezium topic, and a materialized view that connects the two:
Create ClickHouse Database and Table
CREATE DATABASE shop;
CREATE TABLE shop.account_type_switch (
user_id UInt32,
username String,
before_account_type String,
after_account_type String,
updated_at DateTime,
created_at DateTime
) ENGINE = ReplacingMergeTree
ORDER BY (user_id, updated_at)
SETTINGS index_granularity = 8192;
Set Up Kafka Engine Table in ClickHouse
CREATE DATABASE kafka_shop;
CREATE TABLE kafka_shop.kafka_account_type_switch (
    jsonString String
) ENGINE = Kafka
SETTINGS kafka_broker_list = 'broker:29092',
         kafka_topic_list = 'raw_data.shop.public.users',
         kafka_group_name = 'raw_date_clickhouse',
         kafka_format = 'JSONAsString';
Create Materialized View
CREATE MATERIALIZED VIEW kafka_shop.consumer_account_type_switch TO shop.account_type_switch (
user_id UInt32,
username String,
before_account_type String,
after_account_type String,
updated_at DateTime,
created_at DateTime
) AS
SELECT
JSONExtract(JSONExtract(jsonString, 'after', 'Nullable(String)'), 'user_id', 'Nullable(UInt32)') AS user_id,
JSONExtract(JSONExtract(jsonString, 'after', 'Nullable(String)'), 'username', 'Nullable(String)') AS username,
JSONExtract(JSONExtract(jsonString, 'before', 'Nullable(String)'), 'account_type', 'Nullable(String)') AS before_account_type,
JSONExtract(JSONExtract(jsonString, 'after', 'Nullable(String)'), 'account_type', 'Nullable(String)') AS after_account_type,
toDateTime(JSONExtract(JSONExtract(jsonString, 'after', 'Nullable(String)'), 'updated_at', 'Nullable(UInt64)') / 1000000) AS updated_at,
toDateTime(JSONExtract(JSONExtract(jsonString, 'after', 'Nullable(String)'), 'created_at', 'Nullable(UInt64)') / 1000000) AS created_at
FROM kafka_shop.kafka_account_type_switch
WHERE (before_account_type != after_account_type) AND (JSONExtract(jsonString, 'op', 'Nullable(String)') = 'u');
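The division by 1,000,000 in the view exists because Debezium serializes PostgreSQL TIMESTAMP columns as microseconds since the Unix epoch (its MicroTimestamp representation), while toDateTime expects seconds. A quick sanity check with an illustrative value (the rendered time assumes the server timezone is UTC):

```sql
-- 1700000000000000 µs / 1000000 = 1700000000 s
-- → 2023-11-14 22:13:20 when the server timezone is UTC
SELECT toDateTime(1700000000000000 / 1000000);
```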
Verify the Data in ClickHouse
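Once a few updates have flowed through the pipeline, query the target table. Because ReplacingMergeTree deduplicates rows sharing a sorting key only during background merges, adding FINAL forces a deduplicated view of the results (a minimal check, assuming the setup above is running):

```sql
SELECT *
FROM shop.account_type_switch FINAL
ORDER BY user_id, updated_at;
```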
Conclusion
Implementing CDC with Debezium and ClickHouse enables your e-commerce platform to maintain a complete user account type change history. This setup empowers detailed auditing, regulatory compliance, and advanced analytics, providing valuable insights into user behavior and business trends.
By following these steps, you can enhance your data management practices and leverage real-time change tracking to drive informed decision-making in your business.