Streaming MySQL data to Azure Event Hubs using Debezium and Strimzi

Motivation:

For the last couple of days I was trying to set up an infrastructure that could give us near real-time database changes from an on-premises MySQL server into Kafka-enabled Azure Event Hubs.

I struggled a lot to connect MySQL via Debezium running on Azure Kubernetes Service to Azure Event Hubs. There are plenty of articles about connecting Debezium to Kafka, but not to Azure Event Hubs, which is why I am writing this down for others.

I hope this helps!

High Level Architecture:

Following is the high-level architecture:

MySQL → Debezium → Kafka Connect → Azure Event Hubs

Tools Involved:

I will skip setting up AKS and Event Hubs, and focus directly on how to connect MySQL via Kafka Connect to Azure Event Hubs.

  1. Azure Kubernetes Service
  2. Azure Event Hubs - Kafka-enabled (Standard SKU; the Kafka endpoint requires Standard tier or above)
  3. Strimzi Kubernetes Operator : Strimzi is an open-source tool for running and managing Kafka clusters on Kubernetes. Its operators manage Kafka itself along with components such as ZooKeeper, Kafka Connect, Kafka MirrorMaker, and Kafka Exporter.
  4. Debezium MySQL Connector : Debezium is a set of distributed services that capture changes in your databases so that your applications can see those changes and respond to them. Debezium records all row-level changes within each database table in a change event stream, and applications simply read these streams to see the change events in the same order in which they occurred.
  5. Azure Container Registry

Let's start:

Installing Strimzi Operator

First we need to deploy the Strimzi operator on the Azure Kubernetes Service cluster.

You need kubectl and helm configured on your system.

First add the Strimzi chart repository:
helm repo add strimzi https://strimzi.io/charts/

Then install the Strimzi Cluster Operator (Helm 3 requires a release name):
helm install strimzi-kafka-operator strimzi/strimzi-kafka-operator
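
Before going further, it's worth confirming that the operator pod is running and the Strimzi CRDs are registered. A quick sanity check (the label below matches the default chart metadata, so adjust it if your release differs):

# the operator pod should reach Running state
kubectl get pods -l name=strimzi-cluster-operator

# the KafkaConnect / KafkaConnector CRDs should be listed
kubectl get crds | grep strimzi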

Create MySQL Database

We need a MySQL database to mimic this scenario, so we will spin one up on our cluster.

mysql.yml

apiVersion: v1
kind: Service
metadata:
  name: mysql
spec:
  ports:
  - port: 3306
  selector:
    app: mysql
  clusterIP: None
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: mysql
spec:
  selector:
    matchLabels:
      app: mysql
  strategy:
    type: Recreate
  template:
    metadata:
      labels:
        app: mysql
    spec:
      containers:
      - image: quay.io/debezium/example-mysql:2.1
        name: mysql
        env:
        - name: MYSQL_ROOT_PASSWORD
          value: debezium
        - name: MYSQL_USER
          value: mysqluser
        - name: MYSQL_PASSWORD
          value: mysqlpw
        ports:
        - containerPort: 3306
          name: mysql

kubectl apply -f mysql.yml
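
After applying, confirm the MySQL pod is up before wiring anything to it:

kubectl get pods -l app=mysql
# once Running, the headless service 'mysql' resolves in-cluster on port 3306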

Create Eventhub Secret

eventhub-secret.yml


apiVersion: v1
kind: Secret
metadata:
  name: eventhubssecret
type: Opaque
stringData:
  # for the Event Hubs Kafka endpoint, the SASL username is the literal string $ConnectionString
  eventhubsuser: $ConnectionString
  eventhubspassword: Endpoint=sb://kafkacheck.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey={YOUR_SHARED_KEY}

kubectl apply -f eventhub-secret.yml
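
If you don't have the connection string handy, you can pull it with the Azure CLI (the resource group name below is a placeholder; the namespace matches this demo):

az eventhubs namespace authorization-rule keys list \
  --resource-group <your-resource-group> \
  --namespace-name kafkacheck \
  --name RootManageSharedAccessKey \
  --query primaryConnectionString -o tsv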

Create Container Registry Secret

We need a container registry secret so that Kafka Connect can push the built connector image to ACR and the cluster can pull it from there.

For this demo we will use the ACR admin account for both push and pull:

az acr update -n niravbcheckml --admin-enabled true
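
The KafkaConnect resource below references a Kubernetes secret named my-registry-credentials for both pushing and pulling the image, so create it from the admin credentials first (a minimal sketch using the Azure CLI; the registry name matches this demo):

# fetch the ACR admin password
ACR_PASSWORD=$(az acr credential show -n niravbcheckml --query "passwords[0].value" -o tsv)

# docker-registry secret referenced as pushSecret and imagePullSecret by KafkaConnect
kubectl create secret docker-registry my-registry-credentials \
  --docker-server=niravbcheckml.azurecr.io \
  --docker-username=niravbcheckml \
  --docker-password=$ACR_PASSWORD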

Deploy Kafka Connect Object

KafkaConnect is a CRD that becomes available once you install Strimzi on your cluster, and it can build a Connect image containing the plugin you want: for MySQL you would build in the Debezium MySQL connector, for MS SQL the SQL Server connector, and so on.

kafkaconnect.yml

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: debezium-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  version: 3.4.0
  replicas: 1
  # mention your kafka enabled azure eventhub here as a broker
  bootstrapServers: kafkacheck.servicebus.windows.net:9093
  config:
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
  # this is the most important part of the setup: this is how Kafka Connect authenticates to Event Hubs with its connection string
  authentication:
    type: plain
    username: $ConnectionString
    passwordSecret:
      secretName: eventhubssecret
      password: eventhubspassword
  # it took me ages to find out that even though no custom certificates are involved, this property must be present with an empty list, otherwise it won't connect to Azure Event Hubs
  tls:
    trustedCertificates: []
  # strimzi will build the container image and push it to ACR using the secret we created
  build:
    output:
      type: docker
      image: niravbcheckml.azurecr.io/connect:latest
      pushSecret: my-registry-credentials
    plugins:
      - name: debezium-mysql-connector
        artifacts:
          - type: tgz
            url: https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/2.0.0.Final/debezium-connector-mysql-2.0.0.Final-plugin.tar.gz
  template:
    pod:
      imagePullSecrets:
        - name: my-registry-credentials

kubectl apply -f kafkaconnect.yml
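
Strimzi will first run a build pod that assembles the connector image and pushes it to ACR, then start the Connect pod itself. You can watch this happen and confirm the resource reports Ready (pod names are generated, so yours will differ):

kubectl get pods -w
# expect debezium-connect-cluster-connect-build first, then debezium-connect-cluster-connect-...

kubectl get kafkaconnect debezium-connect-cluster -o jsonpath='{.status.conditions}'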

Deploy KafkaConnector

Now we will deploy the connector, which uses the Debezium plugin to detect changes on the source database system.

NOTE: There are steps to follow on the source database (binlog configuration, user privileges) before you can use this; please refer to the Debezium setup guide.

KafkaConnector.yml

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: debezium-connector-mysql
  labels:
    strimzi.io/cluster: debezium-connect-cluster
spec:
  class: io.debezium.connector.mysql.MySqlConnector
  tasksMax: 1
  config:
    database.hostname: mysql
    database.port: 3306
    database.user: root
    database.password: debezium # this can be stored as secret
    database.server.id: 184054
    topic.prefix: mysql
    database.include.list: inventory
    schema.history.internal.kafka.bootstrap.servers: kafkacheck.servicebus.windows.net:9093
    schema.history.internal.kafka.topic: mysql.inventory.customers
    schema.history.internal.consumer.security.protocol: "SASL_SSL"
    schema.history.internal.consumer.sasl.mechanism: "PLAIN"
    schema.history.internal.consumer.sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://kafkacheck.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey={YOUR_SHARED_KEY}";
    schema.history.internal.producer.security.protocol: "SASL_SSL"
    schema.history.internal.producer.sasl.mechanism: "PLAIN"
    schema.history.internal.producer.sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://kafkacheck.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey={YOUR_SHARED_KEY}";

kubectl apply -f KafkaConnector.yml
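
Strimzi exposes the connector state on the KafkaConnector resource itself, so you can verify it reached Ready before testing:

kubectl get kafkaconnector
# READY should show True; for details:
kubectl get kafkaconnector debezium-connector-mysql -o jsonpath='{.status.conditions}'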

Time to test the setup

Connect to MySQL

In one terminal, connect to MySQL:

kubectl run -it --rm --image=mysql:8.0 --restart=Never --env MYSQL_ROOT_PASSWORD=debezium mysqlterm -- mysql -hmysql -P3306 -uroot -pdebezium

In another terminal, check the logs of the Kafka Connect pod:

kubectl get pods

Get the connect cluster pod and follow its logs (the pod name suffix is generated, so yours will differ):
kubectl logs debezium-connect-cluster-connect-69f54bcb6d-h5pkj --follow

Now update a row of the customers table under the inventory database, as that is what we configured in the KafkaConnector object; the connector will listen for changes on the databases/tables you included.

Inside the MySQL terminal:

mysql> use inventory;

mysql> select * from customers;
+------+------------+--------------+-----------------------+
| id   | first_name | last_name    | email                 |
+------+------------+--------------+-----------------------+
| 1001 | Nirav4     | Bhadradiya29 | sally.thomas@acme.com |
| 1002 | George     | Bailey       | gbailey@foobar.com    |
+------+------------+--------------+-----------------------+

mysql> update customers set first_name='Nirav12345' where id=1001;
Query OK, 1 row affected (0.01 sec)

This update should be detected by the Debezium MySQL plugin running as a connector, and you should see Kafka Connect post it to Event Hubs:

2023-03-09 19:43:28,511 INFO [debezium-connector-mysql|task-0] 13 records sent during previous 31:03:09.991, last recorded offset of {server=mysql} partition is {transaction_id=null, ts_sec=1678391008, file=mysql-bin.000003, pos=26703, row=1, server_id=223344, event=2} (io.debezium.connector.common.BaseSourceTask) [task-thread-debezium-connector-mysql-0]

and you should receive an event for the change to this row (trimmed here to the before/after fields):

{"before":{"id":1001,"first_name":"Nirav4","last_name":"Bhadradiya29","email":"sally.thomas@acme.com"},"after":{"id":1001,"first_name":"Nirav12345","last_name":"Bhadradiya29","email":"sally.thomas@acme.com"}}

Here is a sample GitHub repo for sending and receiving events from Azure Event Hubs, written in C#.

And the repo containing all the Kubernetes objects for these deployments:

strimzi-debezium-kafkaconnect-eventhub
