Getting real-time updates in data across various systems has become an important aspect for businesses. It can be for synchronizing data, monitoring or integrating systems, when we say real-time, it is crucial to have the ability to act on a change. How do we achieve this efficiently and seamlessly?
In this article we will explore the Change Data Capture (CDC) a strong pattern for identifying and tracking row level changes in databases. We will apply this pattern in the sense that is scalable, reliable and easily manageable in a cloud infrastructure.
CDC is a pattern that allows users to determine and track all the changes (the details of the change) from the dataset.
Debezium is an open source distributed platform for CDC Debezium.io. It captures records in a transaction log all row-level changes which are already committed to each table in the database. Debezium contained CDC systems require Kafka and Zookeeper nodes to run in which Debezium records all events in a persistent, replicated and partitioned way.
Debezium can connect many different database systems but in this article, we will cover only MySQL, replacing Kafka & Zookeeper nodes with Azure Event Hubs while using Debezium.
Azure Event Hubs Setup
- Create a new Event Hubs Namespace: Start by navigating to the 'Event Hubs' section within the Azure Portal. Click on 'Create a resource', select 'Event Hubs', and then 'Create Event Hubs Namespace'.
- Configure details: Fill in the necessary details such as the namespace name. Select the appropriate subscription and resource group where you want to house your Event Hubs.
The 'Standard' tier will automatically enable the Kafka endpoint. This is essential for compatibility with Kafka Connect with Debezium.
- Review and Create: After reviewing your settings to ensure they're as desired, proceed to create the namespace. Upon validation, the 'Create' button will be enabled, and you can click it to finalize the setup.
With these steps, Event Hubs is now ready to handle CDC event streams from MySQL from Debezium. Compared to Kafka setup and maintenance, this is a simpler choice for Azure developers. You can also check the related documentation from What is Azure Event Hubs for Apache Kafka.
MySQL Setup
Since this article covers PoC implementation of CDC setup, I have used the sample image provided by Debezium, MySQL Example Image.
mysql:
image: debezium/example-mysql:1.6
environment:
MYSQL_ROOT_PASSWORD: rootpassword
MYSQL_USER: mysqluser
MYSQL_PASSWORD: mysqlpw
ports:
- "3306:3306"
volumes:
- mysql_data:/var/lib/mysql
This will enable an example MySQL database server with a simple Inventory database, useful for our PoC purpose.
The necessary configuration is already enabled in the example database server provided by Debezium but in real cases the binary log format of MySQL server should be 'ROW' based. You can query the current setup by:
SHOW VARIABLES WHERE variable_name IN ('log_bin', 'binlog_format');
Variable_name | Value |
---|---|
binlog_format | ROW |
log_bin | ON |
Binary logging is enabled by default from MySQL 8.0 (the log_bin system variable is set to ON). More information for MySQL binary logging
you can set the binlog_format by
SET GLOBAL binlog_format = 'ROW';
Lastly, you need to grant necessary privileges to the user to connect the database changes by:
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO "mysqluser";
FLUSH PRIVILEGES;
With those changes the database is now ready for Debezium to watch changes.
Debezium Setup
Setting up Debezium is nothing more than setting up a Kafka connect with a little bit difference.
For this purpose I have used debezium/connect docker image.
connect:
image: debezium/connect:1.9.6.Final
ports:
- "8083:8083"
environment:
GROUP_ID: 1
CONFIG_STORAGE_TOPIC: my_connect_configs
OFFSET_STORAGE_TOPIC: my_connect_offsets
STATUS_STORAGE_TOPIC: my_connect_statuses
BOOTSTRAP_SERVERS: cdc-kafka-endpoint.servicebus.windows.net:9093
CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: false
CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: false
CONNECT_OFFSET_STORAGE_PARTITIONS: 1
CONNECT_STATUS_STORAGE_PARTITIONS: 1
CONNECT_REQUEST_TIMEOUT_MS: 60000
CONNECT_SECURITY_PROTOCOL: SASL_SSL
CONNECT_SASL_MECHANISM: PLAIN
CONNECT_SASL_JAAS_CONFIG: org.apache.kafka.common.security.plain.PlainLoginModule required username="$$ConnectionString" password="<your-azure-event-hubs-connection-string>";
CONNECT_PRODUCER_SECURITY_PROTOCOL: SASL_SSL
CONNECT_PRODUCER_SASL_MECHANISM: PLAIN
CONNECT_PRODUCER_SASL_JAAS_CONFIG: org.apache.kafka.common.security.plain.PlainLoginModule required username="$$ConnectionString" password="<your-azure-event-hubs-connection-string>";
CONNECT_CONSUMER_SECURITY_PROTOCOL: SASL_SSL
CONNECT_CONSUMER_SASL_MECHANISM: PLAIN
CONNECT_CONSUMER_SASL_JAAS_CONFIG: org.apache.kafka.common.security.plain.PlainLoginModule required username="$$ConnectionString" password="Endpoint=<your-azure-event-hubs-connection-string>";
You can create a new SAS Policy for connecting your event hubs for Debezium to create/send/listen to the event hubs created. For this you need to navigate Settings > Shared access policies > + Add and than copy the value for Connection string-primary key
with this setup it should create necessary Event Hubs (equivalent of Kafka topics) in your Azure Event Hub namespace 'cdc-kafka-endpoint' automatically.
Adding a connector to debezium/connect
You can connect as many connectors you want to your Debezium service, each connector can represent a database connection.
Important note: Debezium creates a topic per database per schema and per table. Which means that you will get so many number of Event Hubs equivalent to the number of tables in your database. This might be not ideal for your Azure setup since standard Event Hubs Namespace allows 10 Event Hubs per namespace.
To overcome this problem, we will use the Reroute functionality of Debezium which will aggregate topics based on the regex that is provided.
{
"name": "inventory-connector-azure",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "mysqluser",
"database.password": "mysqlpw",
"database.server.id": "184054",
"database.server.name": "dbserver1",
"database.include.list": "inventory",
"database.history.kafka.bootstrap.servers": "cdc-kafka-endpoint.servicebus.windows.net:9093",
"database.history.kafka.topic": "dbhistory.inventory",
"database.history.producer.security.protocol": "SASL_SSL",
"database.history.producer.sasl.mechanism": "PLAIN",
"database.history.producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username='$ConnectionString' password='<your-azure-event-hubs-connection-string>';",
"database.history.consumer.security.protocol": "SASL_SSL",
"database.history.consumer.sasl.mechanism": "PLAIN",
"database.history.consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username='$ConnectionString' password='<your-azure-event-hubs-connection-string>';",
"transforms": "Reroute",
"transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter",
"transforms.Reroute.topic.regex": ".*",
"transforms.Reroute.topic.replacement": "inventory_all"
}
}
More information on topic rerouting can be found here
Now we need to register this connector to Debezium. We will use curl for this purpose:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \ http://localhost:8083/connectors/ -d @register-mysql-azure-event-hubs.json
This will add the connector to the Debezium and will start creating event hubs and streaming changes to the relevant event hubs.
Check the status of your registered connector:
curl -i GET http://localhost:8083/connectors
Also check the status of the connector:
curl -i GET http://localhost:8083/connectors/inventory-connector-azure/status
you will receive the status as running:
{
"name": "inventory-connector-azure",
"connector": {
"state": "RUNNING",
"worker_id": "172.22.0.3:8083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "172.22.0.3:8083"
}
],
"type": "source"
}
After this connector has been added to Debezium, respective event hubs should be created as follows
Consuming Events
To consume event I have opted for Azure Functions since it has a nice developer experience and convenient.
namespace CDC.ConsumerFunction;
public static class ConsumerFunction
{
[FunctionName("ConsumerFunction")]
public static async Task RunAsync([EventHubTrigger("%EventHubInventoryAllChanges%", Connection = "EventHubConnectionAppSetting")] string myEventHubMessage,
ILogger log)
{
log.LogInformation("Processed a message: {MyEventHubMessage}", myEventHubMessage);
}
}
local.settings.json
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"FUNCTIONS_WORKER_RUNTIME": "dotnet",
"EventHubConnectionAppSetting": "<your-azure-event-hubs-connection-string>",
"EventHubInventoryAllChanges": "inventory_all"
}
}
to use the development storage and that way running the function in your local, you need azurite running on your local, Azurite
Here is an example result for a value change in 'addresses' table:
281 Riverside Drive
-> 282 Riverside Drive
{
"before": {
"id": 12,
"customer_id": 1002,
"street": "281 Riverside Drive",
"city": "Augusta",
"state": "Georgia",
"zip": "30901",
"type": "BILLING"
},
"after": {
"id": 12,
"customer_id": 1002,
"street": "282 Riverside Drive",
"city": "Augusta",
"state": "Georgia",
"zip": "30901",
"type": "BILLING"
},
"source": {
"version": "1.9.6.Final",
"connector": "mysql",
"name": "dbserver1",
"ts_ms": 1700492940000,
"snapshot": "false",
"db": "inventory",
"sequence": null,
"table": "addresses",
"server_id": 223344,
"gtid": null,
"file": "mysql-bin.000031",
"pos": 705,
"row": 0,
"thread": 59,
"query": null
},
"op": "u",
"ts_ms": 1700492940959,
"transaction": null
}
We've walked through the steps of configuring each component, from establishing a MySQL environment using Docker to deploying Debezium for change data capture, and finally leveraging Azure Event Hubs and Azure Functions for efficient event streaming and processing. This setup showcases how we can use Azure cloud tools and technologies for creating an efficient CDC pipeline.
Top comments (0)