DEV Community

Arda Çetinkaya


Streamlining CDC with MySQL, Debezium, Azure Event Hubs, and Functions

Getting real-time updates of data across systems has become important for businesses, whether the goal is synchronizing data, monitoring, or integrating systems. In a real-time setting, the ability to act on a change as soon as it happens is crucial. How do we achieve this efficiently and seamlessly?

In this article, we will explore Change Data Capture (CDC), a strong pattern for identifying and tracking row-level changes in databases, and apply it in a way that is scalable, reliable, and easy to manage in a cloud infrastructure.

CDC is a pattern that lets you determine and track all changes to a dataset, including the details of each change.
Debezium (debezium.io) is an open-source distributed platform for CDC. It reads the database's transaction log and captures every committed row-level change for each table. A typical Debezium-based CDC system runs on Kafka and ZooKeeper nodes, where Debezium records all events in a persistent, replicated, and partitioned way.
Debezium can connect to many different database systems. In this article, we will cover only MySQL, and we will replace the Kafka and ZooKeeper nodes with Azure Event Hubs while still using Debezium.
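To make the idea of log-based CDC concrete, here is a toy Python model; it is not how Debezium is implemented. Committed changes are appended to an ordered log (standing in for MySQL's binary log), and a reader turns the entries into change events while remembering its offset, so it can resume after a restart without re-reading or skipping anything. The names ChangeLog and read_changes are hypothetical; only the op letters ("c", "u") follow Debezium's convention.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Tuple

Row = Dict[str, Any]


@dataclass
class ChangeLog:
    """Toy stand-in for a transaction log: an append-only list of committed row changes."""

    entries: List[Tuple[str, str, Optional[Row], Optional[Row]]] = field(default_factory=list)

    def commit(self, op: str, table: str, before: Optional[Row], after: Optional[Row]) -> None:
        # Only committed changes reach the log, which is what log-based CDC reads.
        self.entries.append((op, table, before, after))


def read_changes(log: ChangeLog, offset: int) -> Tuple[List[dict], int]:
    """Return the change events from `offset` onward and the new offset to store."""
    events = [
        {"op": op, "table": table, "before": before, "after": after}
        for op, table, before, after in log.entries[offset:]
    ]
    # Persisting this offset (Kafka Connect keeps it in its offsets topic) enables resuming.
    return events, len(log.entries)
```

For example, after committing an insert and reading from offset 0, a later update is the only event returned when reading again from the stored offset.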

Target high level diagram

Azure Event Hubs Setup

  • Create a new Event Hubs Namespace: Start by navigating to the 'Event Hubs' section within the Azure Portal. Click on 'Create a resource', select 'Event Hubs', and then 'Create Event Hubs Namespace'.

Azure Event Hubs Namespace create

  • Configure details: Fill in the necessary details such as the namespace name. Select the appropriate subscription and resource group where you want to house your Event Hubs.

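Select at least the 'Standard' pricing tier: the Kafka endpoint is not available on the 'Basic' tier, while the 'Standard' tier enables it automatically. This endpoint is essential, because Debezium runs on Kafka Connect and talks to Event Hubs through it.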

  • Review and Create: After reviewing your settings to ensure they're as desired, proceed to create the namespace. Upon validation, the 'Create' button will be enabled, and you can click it to finalize the setup.

Azure Event Hubs review and create

With these steps, Event Hubs is ready to receive the CDC event streams that Debezium captures from MySQL. Compared with setting up and maintaining your own Kafka cluster, this is a simpler option for Azure developers. For more details, see the Azure documentation page "What is Azure Event Hubs for Apache Kafka".

MySQL Setup

Since this article covers a proof-of-concept (PoC) CDC setup, I used the example MySQL image provided by Debezium (debezium/example-mysql):

services:
  mysql:
    image: debezium/example-mysql:1.6
    environment:
      MYSQL_ROOT_PASSWORD: rootpassword
      MYSQL_USER: mysqluser
      MYSQL_PASSWORD: mysqlpw
    ports:
      - "3306:3306"
    volumes:
      - mysql_data:/var/lib/mysql

# Named volumes must be declared at the top level of the Compose file
volumes:
  mysql_data:

This starts an example MySQL server with a simple inventory database, which is enough for our PoC.

The Debezium example server already has the necessary configuration, but on a real server you need to make sure that binary logging is enabled and that the binary log format is 'ROW'. You can query the current setup with:

SHOW VARIABLES WHERE variable_name IN ('log_bin', 'binlog_format');
Variable_name   Value
binlog_format   ROW
log_bin         ON

Binary logging is enabled by default starting with MySQL 8.0 (the log_bin system variable is set to ON). For more information, see the binary log section of the MySQL reference manual.

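If binlog_format is not ROW, you can change it as shown below. Note that SET GLOBAL only affects sessions opened after the change and is lost when the server restarts, so also set binlog_format=ROW in the server configuration file: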

SET GLOBAL binlog_format = 'ROW';
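If you want to script this check, the following Python sketch evaluates the rows returned by the SHOW VARIABLES query above, passed in as (name, value) pairs however your MySQL client library returns them, and lists what still needs fixing. The function name binlog_problems is hypothetical, and it only checks the two variables discussed here.

```python
from typing import Iterable, List, Tuple


def binlog_problems(rows: Iterable[Tuple[str, str]]) -> List[str]:
    """Return human-readable problems; an empty list means the binlog settings suit CDC."""
    # Normalize names to lower case and values to upper case for comparison.
    settings = {name.lower(): str(value).upper() for name, value in rows}
    problems = []
    if settings.get("log_bin") != "ON":
        problems.append("log_bin is not ON: enable binary logging")
    if settings.get("binlog_format") != "ROW":
        problems.append(f"binlog_format is {settings.get('binlog_format', 'unset')}, expected ROW")
    return problems
```

With the rows shown above (binlog_format ROW, log_bin ON), the function returns an empty list.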

Lastly, grant the user the privileges Debezium needs to read the database and its binary log:

GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO "mysqluser";
FLUSH PRIVILEGES;

With these changes, the database is ready for Debezium to capture changes.

Debezium Setup

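Setting up Debezium is little more than setting up Kafka Connect. The Debezium image ships with the Debezium connectors, and the only real difference here is that the worker connects to the Event Hubs Kafka endpoint instead of a Kafka cluster.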

For this purpose, I used the debezium/connect Docker image:

connect:
    image: debezium/connect:1.9.6.Final
    ports:
      - "8083:8083"
    environment:
      GROUP_ID: 1
      # Kafka Connect's internal topics; each one becomes an event hub in the namespace
      CONFIG_STORAGE_TOPIC: my_connect_configs
      OFFSET_STORAGE_TOPIC: my_connect_offsets
      STATUS_STORAGE_TOPIC: my_connect_statuses
      # Kafka endpoint of the Event Hubs namespace
      BOOTSTRAP_SERVERS: cdc-kafka-endpoint.servicebus.windows.net:9093
      # Quoted so Compose receives strings rather than YAML booleans
      CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: "false"
      CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
      CONNECT_OFFSET_STORAGE_PARTITIONS: 1
      CONNECT_STATUS_STORAGE_PARTITIONS: 1
      CONNECT_REQUEST_TIMEOUT_MS: 60000
      # SASL settings for the worker, then for its producer and consumer clients.
      # "$$" is Compose's escape for a literal "$", so the username is $ConnectionString.
      CONNECT_SECURITY_PROTOCOL: SASL_SSL
      CONNECT_SASL_MECHANISM: PLAIN
      CONNECT_SASL_JAAS_CONFIG: org.apache.kafka.common.security.plain.PlainLoginModule required username="$$ConnectionString" password="<your-azure-event-hubs-connection-string>";
      CONNECT_PRODUCER_SECURITY_PROTOCOL: SASL_SSL
      CONNECT_PRODUCER_SASL_MECHANISM: PLAIN
      CONNECT_PRODUCER_SASL_JAAS_CONFIG: org.apache.kafka.common.security.plain.PlainLoginModule required username="$$ConnectionString" password="<your-azure-event-hubs-connection-string>";
      CONNECT_CONSUMER_SECURITY_PROTOCOL: SASL_SSL
      CONNECT_CONSUMER_SASL_MECHANISM: PLAIN
      CONNECT_CONSUMER_SASL_JAAS_CONFIG: org.apache.kafka.common.security.plain.PlainLoginModule required username="$$ConnectionString" password="<your-azure-event-hubs-connection-string>";

Create a new shared access signature (SAS) policy that Debezium can use to create, send to, and listen to event hubs; creating event hubs requires the Manage claim, which includes Send and Listen. In the namespace, navigate to Settings > Shared access policies > + Add, and then copy the value of "Connection string-primary key".

Connection string for event hubs
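The Kafka settings in the connect service follow a fixed pattern, so they can be derived from the namespace name and the connection string. The Python sketch below, using a hypothetical helper named event_hubs_kafka_properties, builds the bootstrap server (<namespace>.servicebus.windows.net:9093) and the SASL PLAIN JAAS line with the literal username $ConnectionString, exactly as used in this article. The connection string in the usage note is a placeholder, not a real credential.

```python
def event_hubs_kafka_properties(namespace: str, connection_string: str) -> dict:
    """Kafka client properties for an Event Hubs namespace's Kafka endpoint (sketch)."""
    jaas = (
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        # The username is the literal string $ConnectionString; the password is the full
        # namespace connection string, which already starts with "Endpoint=".
        f'username="$ConnectionString" password="{connection_string}";'
    )
    return {
        "bootstrap.servers": f"{namespace}.servicebus.windows.net:9093",
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "PLAIN",
        "sasl.jaas.config": jaas,
    }
```

For example, event_hubs_kafka_properties("cdc-kafka-endpoint", "Endpoint=sb://...") yields the bootstrap server used above. When you paste the JAAS line into a Compose file, remember to escape "$" as "$$".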

With this setup, Kafka Connect automatically creates the event hubs it needs (the equivalent of Kafka topics) in your Event Hubs namespace, here 'cdc-kafka-endpoint'.

Event Hubs created by Debezium

Adding a connector to debezium/connect

You can add as many connectors as you want to your Debezium service; each connector typically represents one database connection.

Important note: by default, the Debezium MySQL connector writes the changes of each table to its own topic, named <server name>.<database name>.<table name>. This means you get one event hub per captured table, which may not fit your Azure setup, since a Standard tier namespace allows only 10 event hubs.

To work around this limit, we use Debezium's topic routing transformation (ByLogicalTableRouter, registered below under the name Reroute), which sends the records of every topic matching the given regex to a single replacement topic.


{
  "name": "inventory-connector-azure",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "mysqluser",
    "database.password": "mysqlpw",
    "database.server.id": "184054",
    "database.server.name": "dbserver1",
    "database.include.list": "inventory",
    "database.history.kafka.bootstrap.servers": "cdc-kafka-endpoint.servicebus.windows.net:9093",
    "database.history.kafka.topic": "dbhistory.inventory",
    "database.history.producer.security.protocol": "SASL_SSL",
    "database.history.producer.sasl.mechanism": "PLAIN",
    "database.history.producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username='$ConnectionString' password='<your-azure-event-hubs-connection-string>';",
    "database.history.consumer.security.protocol": "SASL_SSL",
    "database.history.consumer.sasl.mechanism": "PLAIN",
    "database.history.consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username='$ConnectionString' password='<your-azure-event-hubs-connection-string>';",
    "transforms": "Reroute",
    "transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter",
    "transforms.Reroute.topic.regex": ".*",
    "transforms.Reroute.topic.replacement": "inventory_all"
  }
}


More information on topic rerouting can be found in the Debezium documentation on topic routing.
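To see why a single regex is enough, the following Python sketch mimics what the Reroute transform does with the configuration above: a topic that the regex matches entirely is renamed to the replacement, and any other topic is left alone. It also counts the resulting event hubs against the 10-per-namespace Standard limit. The helper names and the table list are hypothetical, Python's re module stands in for Java regular expressions (group references are written \1 here instead of Java's $1), and the count covers only the topics passed in.

```python
import re
from typing import Iterable, Set

# Event hubs allowed per Standard tier namespace, as noted above.
STANDARD_TIER_EVENT_HUB_LIMIT = 10


def default_topic(server_name: str, database: str, table: str) -> str:
    # The MySQL connector's default data topic name: <server>.<database>.<table>.
    return f"{server_name}.{database}.{table}"


def reroute(topic: str, regex: str, replacement: str) -> str:
    # A topic is renamed only if the regex matches the whole topic name.
    match = re.fullmatch(regex, topic)
    if match is None:
        return topic
    # Expand group references in the replacement (Python syntax: \1).
    return match.expand(replacement)


def event_hubs_needed(data_topics: Iterable[str], other_topics: Iterable[str]) -> Set[str]:
    # Every distinct topic becomes its own event hub in the namespace.
    return set(data_topics) | set(other_topics)


def fits_standard_tier(event_hubs: Set[str]) -> bool:
    return len(event_hubs) <= STANDARD_TIER_EVENT_HUB_LIMIT
```

For instance, with four hypothetical tables plus the four other topics configured in this article (my_connect_configs, my_connect_offsets, my_connect_statuses, dbhistory.inventory), default naming needs 8 event hubs, while routing ".*" to "inventory_all" needs 5.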

Now register this connector with Debezium through the Kafka Connect REST API. Save the JSON above as register-mysql-azure-event-hubs.json and post it with curl:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
  http://localhost:8083/connectors/ -d @register-mysql-azure-event-hubs.json

This adds the connector to Debezium, which then creates the event hubs it needs and starts streaming changes to them.
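The same registration can be done from Python with only the standard library. This sketch, with the hypothetical helper build_register_request, builds the request that the curl command sends (POST /connectors with a JSON body containing name and config). It assumes the worker is reachable at localhost:8083 as in the compose file and does not send anything by itself.

```python
import json
import urllib.request


def build_register_request(connect_url: str, connector: dict) -> urllib.request.Request:
    """Build (but do not send) a POST /connectors request for a connector definition."""
    body = json.dumps(connector).encode("utf-8")
    return urllib.request.Request(
        f"{connect_url.rstrip('/')}/connectors",
        data=body,
        headers={"Accept": "application/json", "Content-Type": "application/json"},
        method="POST",
    )
```

To use it, load register-mysql-azure-event-hubs.json with json.load, pass the result to build_register_request("http://localhost:8083", ...), and send the request with urllib.request.urlopen; Kafka Connect answers with 201 Created when the connector is registered.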

List the registered connectors:

curl -i http://localhost:8083/connectors

Then check the status of the connector:

curl -i http://localhost:8083/connectors/inventory-connector-azure/status

If everything is configured correctly, both the connector and its task report RUNNING:

{
  "name": "inventory-connector-azure",
  "connector": {
    "state": "RUNNING",
    "worker_id": "172.22.0.3:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "172.22.0.3:8083"
    }
  ],
  "type": "source"
}
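For scripted checks, for example in a CI job or a container health check, the status response can be evaluated programmatically. The sketch below treats a connector as healthy only when the connector and all of its tasks report RUNNING, as in the response above. The function names are hypothetical; fetch_status performs the same GET /connectors/<name>/status call as curl.

```python
import json
import urllib.request


def fetch_status(connect_url: str, name: str) -> dict:
    """GET /connectors/<name>/status from the Kafka Connect REST API."""
    url = f"{connect_url.rstrip('/')}/connectors/{name}/status"
    with urllib.request.urlopen(url) as response:
        return json.load(response)


def is_healthy(status: dict) -> bool:
    """True only if the connector and every task are RUNNING and at least one task exists."""
    tasks = status.get("tasks", [])
    return (
        status.get("connector", {}).get("state") == "RUNNING"
        and len(tasks) > 0
        and all(task.get("state") == "RUNNING" for task in tasks)
    )


def failed_tasks(status: dict) -> list:
    # Kafka Connect reports failed tasks with state FAILED and an error trace.
    return [task for task in status.get("tasks", []) if task.get("state") == "FAILED"]
```

For example, is_healthy(fetch_status("http://localhost:8083", "inventory-connector-azure")) returns True for the response shown above.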

After the connector has been added to Debezium, the corresponding event hubs are created in the namespace:

auto created event hubs

Consuming Events

To consume the events, I opted for Azure Functions, since it offers a convenient developer experience and a built-in Event Hubs trigger.


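using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;

namespace CDC.ConsumerFunction;

public static class ConsumerFunction
{
    // Runs for each event on the hub named by the EventHubInventoryAllChanges app setting
    [FunctionName("ConsumerFunction")]
    public static Task RunAsync(
        [EventHubTrigger("%EventHubInventoryAllChanges%", Connection = "EventHubConnectionAppSetting")] string myEventHubMessage,
        ILogger log)
    {
        // The message body is the Debezium change event serialized as JSON
        log.LogInformation("Processed a message: {MyEventHubMessage}", myEventHubMessage);
        return Task.CompletedTask;
    }
}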

local.settings.json


{
    "IsEncrypted": false,
    "Values": {
        "AzureWebJobsStorage": "UseDevelopmentStorage=true",
        "FUNCTIONS_WORKER_RUNTIME": "dotnet",
        "EventHubConnectionAppSetting": "<your-azure-event-hubs-connection-string>",
        "EventHubInventoryAllChanges": "inventory_all"
    }
}

To use development storage (UseDevelopmentStorage=true) and run the function locally, you need Azurite, the local Azure Storage emulator, running on your machine.

Here is an example event for an update to the 'addresses' table, in which the street changes from "281 Riverside Drive" to "282 Riverside Drive":

{
  "before": {
    "id": 12,
    "customer_id": 1002,
    "street": "281 Riverside Drive",
    "city": "Augusta",
    "state": "Georgia",
    "zip": "30901",
    "type": "BILLING"
  },
  "after": {
    "id": 12,
    "customer_id": 1002,
    "street": "282 Riverside Drive",
    "city": "Augusta",
    "state": "Georgia",
    "zip": "30901",
    "type": "BILLING"
  },
  "source": {
    "version": "1.9.6.Final",
    "connector": "mysql",
    "name": "dbserver1",
    "ts_ms": 1700492940000,
    "snapshot": "false",
    "db": "inventory",
    "sequence": null,
    "table": "addresses",
    "server_id": 223344,
    "gtid": null,
    "file": "mysql-bin.000031",
    "pos": 705,
    "row": 0,
    "thread": 59,
    "query": null
  },
  "op": "u",
  "ts_ms": 1700492940959,
  "transaction": null
}

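Because the Reroute transform sends every table to the same inventory_all event hub, a consumer usually has to inspect source.table to decide how to handle each event. The Python sketch below (summarize_change is a hypothetical name) shows logic a consumer could apply to a message body like the one above: it maps Debezium's op codes (c = create, u = update, d = delete, r = read during a snapshot) to names and lists the columns whose values differ between before and after. It assumes schemas are disabled, as configured in the connect service, so the message is the bare change event, and it returns None for messages without an op field, such as schema change events.

```python
import json
from typing import Any, Dict, Optional

OPS = {"c": "create", "u": "update", "d": "delete", "r": "read"}


def summarize_change(message: str) -> Optional[Dict[str, Any]]:
    """Summarize a Debezium change event given as a JSON string without a schema envelope."""
    event = json.loads(message)
    op = event.get("op")
    if op is None:
        # Not a row-level data change event (for example, a schema change event).
        return None
    before = event.get("before") or {}
    after = event.get("after") or {}
    # Compare every column that appears in either image of the row.
    changed = {
        column: (before.get(column), after.get(column))
        for column in sorted(set(before) | set(after))
        if before.get(column) != after.get(column)
    }
    source = event.get("source", {})
    return {
        "operation": OPS.get(op, op),
        "table": f"{source.get('db')}.{source.get('table')}",
        "changed": changed,
    }
```

Applied to the event above, this reports an update on inventory.addresses whose only changed column is street. In the Azure Function itself, the same steps would be written in C#.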

We walked through configuring each component: setting up MySQL with Docker, deploying Debezium for change data capture, and using Azure Event Hubs and Azure Functions to stream and process the change events. This setup shows how Azure services can be combined to build an efficient CDC pipeline.
