Imagine we have Customer Relationship Management (CRM), sales, and inventory systems. As every system's responsibility is specific, each system stores only its domain-specific data in its own database.
Now, assume we need to generate some reports. Data is scattered in multiple databases; therefore, we must somehow gather data in one place.
One idea is to make API requests to each system and combine the responses later. However, synchronous communication between systems is a bad idea here: the reporting service would depend on the availability and latency of every other system. The good news is we can use an asynchronous, event-driven approach.
Our goal is to keep all service data in the reporting service so that it does not need to ask for related service data. To achieve this, we can publish events when data, e.g., CRM service's customer-related info, is created/updated/deleted, and the reporting service consumes the event and syncs it.
Although it seems a good idea, it might be a headache when a service has a lot of tables and data gets changed using a lot of internal APIs. For simplicity, imagine our inventory service has Product, Location, Supplier, and Stock tables. We have CREATE, UPDATE, and DELETE APIs for each table. Therefore, we need to publish the data change event from each API handler. On top of that, what if we have some scripts that we execute manually to change data? We also need to notify the reporting service about changes.
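To make the headache concrete, here is a minimal sketch of the manual approach. Everything in it (EventBus, ProductStore, the handler and script names) is a hypothetical, in-memory stand-in rather than code from a real service. It only illustrates that every handler must remember to publish, while a manual script that bypasses the handlers publishes nothing.

```typescript
// Hypothetical sketch of the manual "publish from every handler" approach.
// EventBus, ProductStore, and the function names are illustrative assumptions.
type ChangeType = 'CREATE' | 'UPDATE' | 'DELETE';

interface DataChangeEvent {
  table: string;
  type: ChangeType;
  id: string;
}

class EventBus {
  readonly published: DataChangeEvent[] = [];

  publish(event: DataChangeEvent): void {
    this.published.push(event);
  }
}

class ProductStore {
  readonly rows = new Map<string, { id: string; name: string }>();
}

// API handler: writes the row AND has to remember to publish the event.
function createProductHandler(store: ProductStore, bus: EventBus, id: string, name: string): void {
  store.rows.set(id, { id, name });
  bus.publish({ table: 'Product', type: 'CREATE', id });
}

// Manual script: changes data directly, so no event is ever published.
function manualRenameScript(store: ProductStore, id: string, name: string): void {
  const row = store.rows.get(id);
  if (row) {
    row.name = name;
  }
}

const store = new ProductStore();
const bus = new EventBus();
createProductHandler(store, bus, 'p1', 'Keyboard');
manualRenameScript(store, 'p1', 'Mechanical keyboard');

// The data changed twice, but the reporting service only heard about one change.
console.log(bus.published.length); // 1
```

Multiply this by four tables and three APIs each, and it is easy to see how one forgotten publish call leaves the reporting service silently out of date.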
Change data capture
In databases, change data capture is a set of software design patterns used to determine and track the data that has changed so that action can be taken using the changed data. - Wikipedia
Light-bulb moment, right? We are basically trying to build a system to capture every data change so that the reporting service always has the updated data.
Then I found a beautiful tool: Debezium.
Debezium
Just like the Watchers in the Marvel Comics universe, who observe and record events throughout the universe, Debezium watches the database, keeps track of changes, and makes it easy to send those changes to other systems in real time.
Whenever a new row is added, a row is updated, or a row is deleted, Debezium notices it immediately. It then packages up these changes and sends them as a continuous stream of events by leveraging the power of Apache Kafka.
PoC implementation
In our scenario, we need to integrate Debezium with each service’s database so that events get published continually for each row-level change. If there is a consumer in the reporting service, it can easily get the data and store it in its database.
But for a PoC project, we will assume there is only one service, and that same service will consume its own changes. Although that is a bit funny, it is enough to demonstrate the idea, so we are going to build it. As my favourite database is PostgreSQL, I will use it.
Project setup
Before diving in, a quick note: all the code is available on GitHub as a NestJS-based service.
Debezium has a nice tutorial, and I will basically follow it. But I won’t use ZooKeeper, as KRaft is production-ready and Kafka 4.0 removes ZooKeeper entirely. Moreover, I will use a docker-compose file to spin up the required containers in the simplest way possible.
version: '3.7'
networks:
  cdc-using-debezium-network:
    name: cdc-using-debezium-network
    driver: bridge
    external: false
services:
  cdc-using-debezium-postgres:
    image: debezium/postgres:11
    container_name: cdc-using-debezium-postgres
    hostname: cdc-using-debezium-postgres
    restart: always
    ports:
      - '5443:5432'
    environment:
      POSTGRES_PASSWORD: 123
      POSTGRES_USER: postgres
      POSTGRES_DB: cdc-using-debezium
    volumes:
      - 'cdc-using-debezium-postgres-data:/var/lib/postgresql/data'
    networks:
      - cdc-using-debezium-network
  cdc-using-debezium-kafka:
    image: bitnami/kafka:3.4
    container_name: cdc-using-debezium-kafka
    hostname: cdc-using-debezium-kafka
    restart: always
    ports:
      - '9092:9092'
    environment:
      KAFKA_CFG_NODE_ID: 1
      KAFKA_KRAFT_CLUSTER_ID: q0k00yjQRaqWmAAAZv955w # base64 UUID
      KAFKA_CFG_PROCESS_ROLES: controller,broker
      KAFKA_CFG_LISTENERS: INTERNAL://cdc-using-debezium-kafka:29092,CONTROLLER://cdc-using-debezium-kafka:29093,EXTERNAL://0.0.0.0:9092
      KAFKA_CFG_ADVERTISED_LISTENERS: INTERNAL://cdc-using-debezium-kafka:29092,EXTERNAL://localhost:9092
      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 1@cdc-using-debezium-kafka:29093
      KAFKA_CFG_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
    networks:
      - cdc-using-debezium-network
  cdc-using-debezium-connect:
    image: debezium/connect:2.3
    container_name: cdc-using-debezium-connect
    hostname: cdc-using-debezium-connect
    restart: always
    ports:
      - '8083:8083'
    environment:
      BOOTSTRAP_SERVERS: cdc-using-debezium-kafka:29092
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: my_connect_configs
      OFFSET_STORAGE_TOPIC: my_connect_offsets
      STATUS_STORAGE_TOPIC: my_connect_statuses
      ENABLE_DEBEZIUM_SCRIPTING: 'true'
    links:
      - cdc-using-debezium-kafka
      - cdc-using-debezium-postgres
    networks:
      - cdc-using-debezium-network
volumes:
  cdc-using-debezium-postgres-data:
    name: cdc-using-debezium-postgres-data
    driver: local
The above docker-compose.yml file defines and configures a set of Docker services that work together to set up a development environment. It sets up PostgreSQL as the source database, Kafka as the message broker, and Debezium Connect as the connector. It also defines a custom network that the services use to communicate with each other.
Once the containers are started, we can confirm that all the required services are up and running by executing the docker-compose ps command. However, we still need to verify a few settings.
| Name | State | Ports |
| --------------------------- | ----- | ------------------------------------------------------------ |
| cdc-using-debezium-postgres | Up | 0.0.0.0:5443->5432/tcp,:::5443->5432/tcp |
| cdc-using-debezium-kafka | Up | 0.0.0.0:9092->9092/tcp,:::9092->9092/tcp |
| cdc-using-debezium-connect | Up | 0.0.0.0:8083->8083/tcp,:::8083->8083/tcp, 8778/tcp, 9092/tcp |
Verify PostgreSQL
By running the below command, let’s check the Write-Ahead Logging (WAL) level.
docker exec cdc-using-debezium-postgres psql --username=postgres --dbname=cdc-using-debezium --command='SHOW wal_level'
The value should be logical. Otherwise (e.g., when the value is replica), Debezium won’t manage to capture the data changes.
One more configuration needs to be checked.
docker exec cdc-using-debezium-postgres psql --username=postgres --dbname=cdc-using-debezium --command='SHOW shared_preload_libraries'
The above command should return decoderbufs,wal2json.
Don’t worry; our Docker setup already did it for us. By the way, to get detailed information about those specific configs, we can check the official documentation here and here. The mentioned articles also explain what happens behind the scenes when capturing data changes. For us, it is just magic, but it’s the beauty of engineering. Worth reading!
Database setup
Our very simple DB will have only one table to keep users’ data: the User table, with id, email, and name columns.
The query to create the table can be found in my GitHub repository.
Verify Kafka Connect
By making an API request to Kafka Connect (port 8083), we can check that it is up and connected to our Kafka cluster.
curl localhost:8083 | jq '.'
I used jq just to prettify the JSON. Anyway, it will give the following response.
{
  "version": "3.4.0",
  "commit": "2e1947d240607d53",
  "kafka_cluster_id": "q0k00yjQRaqWmAAAZv955w"
}
Notice the cluster ID? It is the exact same value we defined in the docker-compose.yml file.
Deploying PostgreSQL connector
We are ready to deploy the Debezium PostgreSQL connector to start monitoring the cdc-using-debezium database.
The below curl command essentially sends a request to the Debezium Connect to create a connector that captures changes from a PostgreSQL database and publishes them to Kafka topics.
# IP=$(hostname -I | cut -d ' ' -f 1)
curl --location 'http://localhost:8083/connectors' \
  --header 'Accept: application/json' \
  --header 'Content-Type: application/json' \
  --data '{
    "name": "cdc-using-debezium-connector",
    "config": {
      "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
      "database.hostname": "192.168.1.110",
      "database.port": "5443",
      "database.user": "postgres",
      "database.password": "123",
      "database.dbname": "cdc-using-debezium",
      "database.server.id": "184054",
      "table.include.list": "public.User",
      "topic.prefix": "cdc-using-debezium-topic"
    }
  }'
In the above, 192.168.1.110 is my machine’s IP address, so please replace this with your IP address. Anyway, the Debezium tutorial explains the above configuration that we send as the API payload; that’s why I am not going to repeat it here.
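If you would rather build that payload in code than hand-edit JSON, a small helper can do it. The sketch below is only an illustration: buildConnectorRequest and its option names are hypothetical, and only the config keys themselves come from the payload above. It leaves out database.server.id, which, as far as I know, is a MySQL connector setting; treat that as an assumption and keep the key if in doubt.

```typescript
// Sketch: build the connector registration payload in code.
// buildConnectorRequest and ConnectorOptions are hypothetical helpers;
// the config keys mirror the curl payload shown above.
interface ConnectorOptions {
  name: string;
  hostname: string;
  port: number;
  user: string;
  password: string;
  dbname: string;
  tables: string[];
  topicPrefix: string;
}

interface ConnectorRequest {
  name: string;
  config: Record<string, string>;
}

function buildConnectorRequest(options: ConnectorOptions): ConnectorRequest {
  return {
    name: options.name,
    config: {
      'connector.class': 'io.debezium.connector.postgresql.PostgresConnector',
      'database.hostname': options.hostname,
      // Kafka Connect config values are strings, so the port is stringified.
      'database.port': String(options.port),
      'database.user': options.user,
      'database.password': options.password,
      'database.dbname': options.dbname,
      // Several tables are passed as one comma-separated list.
      'table.include.list': options.tables.join(','),
      'topic.prefix': options.topicPrefix,
    },
  };
}

const connectorRequest = buildConnectorRequest({
  name: 'cdc-using-debezium-connector',
  hostname: '192.168.1.110', // replace with your own IP address
  port: 5443,
  user: 'postgres',
  password: '123',
  dbname: 'cdc-using-debezium',
  tables: ['public.User'],
  topicPrefix: 'cdc-using-debezium-topic',
});

console.log(JSON.stringify(connectorRequest, null, 2));
```

The resulting JSON can be sent as the body of the same POST request to http://localhost:8083/connectors with any HTTP client.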
Verify Kafka
server=localhost:9092
docker exec cdc-using-debezium-kafka /opt/bitnami/kafka/bin/kafka-metadata-quorum.sh --bootstrap-server $server describe --status
docker exec cdc-using-debezium-kafka /opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server $server --list
The first command gets the status of the Kafka metadata quorum. The second lists the names of all the topics in the Kafka cluster.
In the output of the second command, we should see the topic cdc-using-debezium-topic.public.User. The topic name is the combination of the values of topic.prefix and table.include.list that we sent as the API payload while creating the connector.
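The naming rule can be sketched as a tiny function. This is a simplified illustration, not Debezium’s own code: topicNamesFor is a hypothetical helper, and it assumes table.include.list contains plain schema.table names (the real setting also accepts regular expressions, and characters that are invalid in topic names may be replaced).

```typescript
// Sketch of the topic naming rule: <topic.prefix>.<schema>.<table>.
// topicNamesFor is a hypothetical helper; it assumes literal schema.table entries.
function topicNamesFor(topicPrefix: string, tableIncludeList: string): string[] {
  return tableIncludeList
    .split(',')
    .map((table) => table.trim())
    .filter((table) => table.length > 0)
    .map((table) => `${topicPrefix}.${table}`);
}

const topicNames = topicNamesFor('cdc-using-debezium-topic', 'public.User');
console.log(topicNames); // [ 'cdc-using-debezium-topic.public.User' ]
```

A consumer can use the same helper to derive the topics it subscribes to instead of hard-coding them.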
Monitoring database changes
Now, we can capture every row-level change of the User table. Let’s execute some queries.
INSERT INTO "User" (email, name)
VALUES ('ehasan+1@firecrackervocabulary.com', CONCAT('name_', (random() * 1000)::INTEGER::VARCHAR))
ON CONFLICT (email) DO UPDATE SET name = EXCLUDED.name
RETURNING *;
INSERT INTO "User" (email, name)
VALUES ('ehasan+2@firecrackervocabulary.com', 'name_2');
UPDATE "User"
SET name = 'name_20'
WHERE email = 'ehasan+2@firecrackervocabulary.com';
DELETE
FROM "User"
WHERE email = 'ehasan+2@firecrackervocabulary.com';
Let's execute the Kafka-provided command to see all published events.
docker exec cdc-using-debezium-kafka /opt/bitnami/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic cdc-using-debezium-topic.public.User --from-beginning | jq '.'
Let's paste the generated output here, but as the produced output contains a lot of info, I am going to put curated data only. Don’t worry, a sample full JSON can be found here.
[
  {
    "payload": {
      "before": null,
      "after": {
        "id": "ad9c4382-db22-4635-a647-a13de4c5bdce",
        "email": "ehasan+1@firecrackervocabulary.com",
        "name": "name_882"
      },
      "source": {
        "db": "cdc-using-debezium",
        "table": "User"
      },
      "op": "c",
      "ts_ms": 1693046762871
    }
  },
  {
    "payload": {
      "before": null,
      "after": {
        "id": "4f259f0d-434d-4c73-90de-ee20f5635a3f",
        "email": "ehasan+2@firecrackervocabulary.com",
        "name": "name_2"
      },
      "op": "c",
      "ts_ms": 1693046762873
    }
  },
  {
    "payload": {
      "before": {
        "id": "4f259f0d-434d-4c73-90de-ee20f5635a3f",
        "email": "ehasan+2@firecrackervocabulary.com",
        "name": "name_2"
      },
      "after": {
        "id": "4f259f0d-434d-4c73-90de-ee20f5635a3f",
        "email": "ehasan+2@firecrackervocabulary.com",
        "name": "name_20"
      },
      "op": "u",
      "ts_ms": 1693046762874
    }
  },
  {
    "payload": {
      "before": {
        "id": "4f259f0d-434d-4c73-90de-ee20f5635a3f",
        "email": "ehasan+2@firecrackervocabulary.com",
        "name": "name_20"
      },
      "after": null,
      "op": "d",
      "ts_ms": 1693046762874
    }
  }
]
The JSON above represents a sequence of database changes – creations, updates, and deletions – in the "User" table. Cool!
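To read these events in code, it helps to give the envelope a type. The sketch below models only the curated fields shown above; DebeziumEvent, DebeziumPayload, and describeChange are names I made up for illustration, and the real envelope carries more fields. Besides c, u, and d, it also lists r, the code Debezium uses for rows read during the initial snapshot.

```typescript
// Sketch: a minimal type for the curated change event shown above.
// The type and function names are hypothetical; real events have more fields.
type Row = Record<string, unknown>;
type Operation = 'c' | 'u' | 'd' | 'r';

interface DebeziumPayload {
  before: Row | null;
  after: Row | null;
  source?: { db: string; table: string };
  op: Operation;
  ts_ms: number;
}

interface DebeziumEvent {
  payload: DebeziumPayload;
}

const VERBS: Record<Operation, string> = {
  c: 'created',
  u: 'updated',
  d: 'deleted',
  r: 'snapshotted',
};

// A delete has no "after", so fall back to "before" to find the row id.
function describeChange(event: DebeziumEvent): string {
  const { op, before, after } = event.payload;
  const row = after ?? before;
  return `${VERBS[op]} ${String(row?.id)}`;
}

const sampleEvents: DebeziumEvent[] = [
  { payload: { before: null, after: { id: 'u2', name: 'name_2' }, op: 'c', ts_ms: 1693046762873 } },
  {
    payload: {
      before: { id: 'u2', name: 'name_2' },
      after: { id: 'u2', name: 'name_20' },
      op: 'u',
      ts_ms: 1693046762874,
    },
  },
  { payload: { before: { id: 'u2', name: 'name_20' }, after: null, op: 'd', ts_ms: 1693046762874 } },
];

const descriptions = sampleEvents.map(describeChange);
console.log(descriptions); // [ 'created u2', 'updated u2', 'deleted u2' ]
```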
An important note: We might not get the complete previous data (within the before key) when executing an UPDATE or a DELETE query if PostgreSQL’s REPLICA IDENTITY is DEFAULT. So, we need to change it to FULL by executing the below query.
ALTER TABLE "User"
REPLICA IDENTITY FULL;
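One practical benefit of having the full previous row is that a consumer can work out which columns actually changed. Here is a minimal sketch of that idea; changedFields is a hypothetical helper, and it assumes column values are primitives that can be compared with strict equality.

```typescript
// Sketch: compute which columns changed between "before" and "after".
// changedFields is a hypothetical helper; it assumes primitive column values.
type RowData = Record<string, unknown>;

function changedFields(before: RowData | null, after: RowData | null): string[] {
  // Without both images (e.g. an insert, a delete, or a missing "before"),
  // there is nothing to compare.
  if (!before || !after) {
    return [];
  }
  return Object.keys(after).filter((column) => before[column] !== after[column]);
}

const beforeRow = { id: 'u2', email: 'ehasan+2@firecrackervocabulary.com', name: 'name_2' };
const afterRow = { id: 'u2', email: 'ehasan+2@firecrackervocabulary.com', name: 'name_20' };

console.log(changedFields(beforeRow, afterRow)); // [ 'name' ]
console.log(changedFields(null, afterRow)); // []
```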
Implementing consumer
As we see, Debezium is publishing an event to Kafka for every change, so our task is to subscribe to the topic in the reporting service.
I will use the KafkaJS library to achieve it quickly. Let’s just paste the code here.
import { Injectable, OnApplicationBootstrap, OnApplicationShutdown } from '@nestjs/common';
import { Consumer, Kafka } from 'kafkajs';

// Maps Debezium's single-letter operation codes to readable names.
enum OperationType {
  c = 'CREATE',
  u = 'UPDATE',
  d = 'DELETE',
}

@Injectable()
export class KafkaService implements OnApplicationShutdown, OnApplicationBootstrap {
  private readonly kafka: Kafka;
  private consumer?: Consumer;

  constructor() {
    this.kafka = new Kafka({
      brokers: ['localhost:9092'],
    });
  }

  async onApplicationBootstrap(): Promise<void> {
    await this.listen();
  }

  async onApplicationShutdown(): Promise<void> {
    // The consumer may not exist if the application stops before listen() runs.
    await this.consumer?.disconnect();
  }

  private async listen(): Promise<void> {
    const consumer = this.kafka.consumer({
      groupId: 'console-consumer-24440',
    });
    this.consumer = consumer;

    await consumer.connect();
    await consumer.subscribe({ topics: ['cdc-using-debezium-topic.public.User'], fromBeginning: true });
    await consumer.run({
      eachMessage: async ({ topic, message }) => {
        // Debezium follows each delete with a tombstone (null value); skip it.
        if (!message.value) {
          return;
        }
        const parsedMessage = JSON.parse(message.value.toString());
        const eventPayload = {
          topic,
          database: parsedMessage.payload.source.db,
          table: parsedMessage.payload.source.table,
          operationType: OperationType[parsedMessage.payload.op],
          data: parsedMessage.payload.after,
          previousData: parsedMessage.payload.before,
        };
        console.log({ eventPayload });
      },
    });
  }
}
The KafkaService class demonstrates how to consume messages from a Kafka topic and process them. The actual Kafka message consumption logic resides in the listen method.
When a message arrives, the eachMessage callback is triggered. The incoming message’s content is transformed into the eventPayload object. This object contains details about the event, including the topic, database, table, operation type, data after the operation, and previous data before the operation.
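The transformation itself can be pulled out into a pure function, which makes it easy to unit test without a running Kafka. The sketch below is one possible shape, not the code from the repository: toConsumedEvent and ConsumedEvent are hypothetical names. It also handles a case worth knowing about: by default, Debezium follows each delete event with a tombstone message whose value is null, so the parser returns null for it instead of throwing.

```typescript
// Sketch: a pure, testable version of the message-to-payload transformation.
// toConsumedEvent and ConsumedEvent are hypothetical names.
const OPERATION_LABELS: Record<string, string> = {
  c: 'CREATE',
  u: 'UPDATE',
  d: 'DELETE',
  r: 'READ', // rows read during the initial snapshot
};

interface ConsumedEvent {
  database?: string;
  table?: string;
  operationType: string;
  data: unknown;
  previousData: unknown;
}

// Accepts anything with toString() (e.g. a Buffer or a string), or null.
function toConsumedEvent(value: { toString(): string } | null): ConsumedEvent | null {
  // Tombstone message: nothing to parse.
  if (value === null) {
    return null;
  }
  const parsed = JSON.parse(value.toString());
  const payload = parsed?.payload;
  if (!payload) {
    return null;
  }
  return {
    database: payload.source?.db,
    table: payload.source?.table,
    operationType: OPERATION_LABELS[payload.op] ?? 'UNKNOWN',
    data: payload.after ?? null,
    previousData: payload.before ?? null,
  };
}

const rawMessage = JSON.stringify({
  payload: {
    before: null,
    after: { id: 'u1', name: 'name_882' },
    source: { db: 'cdc-using-debezium', table: 'User' },
    op: 'c',
    ts_ms: 1693046762871,
  },
});

console.log(toConsumedEvent(rawMessage));
console.log(toConsumedEvent(null)); // null
```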
Voilà! Now, the reporting service can persist the data into its database, and as we have all the data, we can easily generate various types of reports and perform analytical operations.
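As a rough sketch of what persisting could look like, the example below replays change events into an in-memory Map that stands in for the reporting database. The names (applyChange, ReportingStore) are hypothetical, and a real service would write to its own tables instead. Creates, updates, and snapshot reads are all treated as upserts, so replaying the same event twice leaves the store unchanged.

```typescript
// Sketch: apply change events to an in-memory stand-in for the reporting DB.
// applyChange and ReportingStore are hypothetical names.
type UserRow = { id: string } & Record<string, unknown>;
type ReportingStore = Map<string, UserRow>;

interface RowChange {
  op: 'c' | 'u' | 'd' | 'r';
  before: UserRow | null;
  after: UserRow | null;
}

function applyChange(store: ReportingStore, change: RowChange): void {
  if (change.op === 'd') {
    // A delete only carries the old row, so the id comes from "before".
    if (change.before) {
      store.delete(change.before.id);
    }
    return;
  }
  // Create, update, and snapshot read are all upserts keyed by id.
  if (change.after) {
    store.set(change.after.id, change.after);
  }
}

const reportingStore: ReportingStore = new Map();
const changes: RowChange[] = [
  { op: 'c', before: null, after: { id: 'u1', name: 'name_882' } },
  { op: 'c', before: null, after: { id: 'u2', name: 'name_2' } },
  { op: 'u', before: { id: 'u2', name: 'name_2' }, after: { id: 'u2', name: 'name_20' } },
  { op: 'd', before: { id: 'u2', name: 'name_20' }, after: null },
];

for (const change of changes) {
  applyChange(reportingStore, change);
}

console.log(reportingStore.size); // 1
console.log(reportingStore.get('u1')); // { id: 'u1', name: 'name_882' }
```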
Conclusion
Debezium may not be the panacea for all problems, but I believe that in scenarios where we need to propagate changed data, we can delegate that responsibility to it and just concentrate on our actual business logic.
I hope you enjoyed it. Thank you for reading it.
Top comments (8)
Hey Emtiaj, I just worked through this. It's a really well written post, and I successfully got your app up and running and logging consumed events 🎉
In docker compose I used version 16 of Postgres - SHOW shared_preload_libraries didn't return wal2json for me, but everything still worked. The guide is a solid foundation for me to build upon, so thank you :) I did it live on a 2 hour stream, with some commentary; if you like, I can share the link.
That is awesome!
Yes, please. Sharing is caring! 🫠
By the way, I had planned to use AWS Kinesis instead of Kafka and write another blog post, but laziness is all with me. 😭
It's at youtu.be/kGixHsxirmA 🤲 maybe it can be useful for someone.
Kinesis looks cool, but I'm very disconnected from the AWS world. Would it be a big advantage over Kafka?
I wanted to play with it as I heavily use SNS and SQS.
Just out of curiosity, you know!
Cool, I see :)
Also, thanks for the intro to NestJS. I'm researching it now. Do you use it professionally?
In my company, for my vocabulary flashcard app's day-to-day's implementation, I use it. 😁
A cool framework!
What if we have two tables related as many-to-many? In this case, events from the related tables can go to different partitions of the topic. How can we handle that? I tried to add a partition key to the record via the Debezium configuration, but it didn't work for me.
I tried partition routing, but I am still getting the same result, i.e., one of my user records goes to partition 0, the related m2m table's record goes to partition 1, and so forth. Here I am sharing my configuration:
"transforms.PartitionRouting.partition.payload.fields":"after.username",
Also, I saw in the docs that we can refer to these using change.username. Is that the correct way?
Also, can you please help with this: in a delete event the after object will be empty, and in a create event the before object will be empty. How can we handle these types of conditions?
@krishnesh Kumar, sorry to say that I haven't experienced this type of scenario; therefore, I cannot help here. If I manage to replicate this scenario and fix it, for sure, I will share it with you. In the meantime, if you find a solution, don't hesitate to share it here.
Thanks!