<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: Nirav Bhadradiya</title>
    <description>The latest articles on DEV Community by Nirav Bhadradiya (@niravbhadradiya).</description>
    <link>https://dev.to/niravbhadradiya</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F1042119%2F32085bc7-8ff8-43b1-a62f-5f46469f8457.png</url>
      <title>DEV Community: Nirav Bhadradiya</title>
      <link>https://dev.to/niravbhadradiya</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/niravbhadradiya"/>
    <language>en</language>
    <item>
      <title>Streaming MySQL data using Debezium, Strimzi to Azure Eventhub</title>
      <dc:creator>Nirav Bhadradiya</dc:creator>
      <pubDate>Thu, 09 Mar 2023 20:02:20 +0000</pubDate>
      <link>https://dev.to/niravbhadradiya/streaming-mysql-data-using-debezium-strimzi-to-azure-eventhub-497g</link>
      <guid>https://dev.to/niravbhadradiya/streaming-mysql-data-using-debezium-strimzi-to-azure-eventhub-497g</guid>
      <description>&lt;h2&gt;
  
  
  Motivation:
&lt;/h2&gt;

&lt;p&gt;Over the last couple of days I have been setting up infrastructure that streams near real-time database changes from an on-premises MySQL server to a Kafka-enabled Azure Event Hubs namespace.&lt;/p&gt;

&lt;p&gt;I struggled a lot to connect MySQL to Azure Event Hubs through Debezium running on Azure Kubernetes Service (AKS). Plenty of articles cover connecting Debezium to Kafka, but few cover Azure Event Hubs, so I am writing this down for others.&lt;/p&gt;

&lt;p&gt;I hope this helps!&lt;/p&gt;

&lt;h2&gt;
  
  
  High Level Architecture:
&lt;/h2&gt;

&lt;p&gt;The following diagram shows the high-level architecture.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fgynxlfxbn5kpwli1pdst.PNG" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fgynxlfxbn5kpwli1pdst.PNG" alt="MySQL-Debeium-KafkaConnect-AzureEventhub"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Tools Involved:
&lt;/h2&gt;

&lt;p&gt;I will skip setting up AKS and Event Hubs and focus directly on connecting MySQL to Azure Event Hubs through Kafka Connect.&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;&lt;a href="https://learn.microsoft.com/en-us/azure/aks/learn/quick-kubernetes-deploy-cli" rel="noopener noreferrer"&gt;Azure Kubernetes Service&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://learn.microsoft.com/en-us/azure/event-hubs/event-hubs-create" rel="noopener noreferrer"&gt;Azure Eventhub - Kafka Enabled ( Standard SKU )&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;
&lt;a href="https://strimzi.io/docs/operators/latest/overview.html" rel="noopener noreferrer"&gt;Strimzi Kubernetes Operator&lt;/a&gt; : Strimzi is an open-source tool that helps manage and maintain Kafka clusters. Strimzi offers several operators, including ZooKeeper, Kafka Connect, Kafka MirrorMaker, and Kafka Exporter.&lt;/li&gt;
&lt;li&gt;
&lt;a href="https://debezium.io/documentation/reference/stable/connectors/mysql.html" rel="noopener noreferrer"&gt;Debezium MySQL Connector&lt;/a&gt; : Debezium is a set of distributed services to capture changes in your databases so that your applications can see those changes and respond to them. Debezium records all row-level changes within each database table in a change event stream, and applications simply read these streams to see the change events in the same order in which they occurred.&lt;/li&gt;
&lt;li&gt;&lt;a href="https://learn.microsoft.com/en-us/azure/container-registry/container-registry-get-started-portal?tabs=azure-cli" rel="noopener noreferrer"&gt;Azure Container Registry&lt;/a&gt;&lt;/li&gt;
&lt;/ol&gt;

&lt;h2&gt;
  
  
  Let's start:
&lt;/h2&gt;

&lt;h3&gt;
  
  
  Installing Strimzi Operator
&lt;/h3&gt;

&lt;p&gt;First, deploy the Strimzi operator on the Azure Kubernetes Service cluster.&lt;/p&gt;

&lt;p&gt;You need &lt;code&gt;kubectl&lt;/code&gt; and &lt;code&gt;helm&lt;/code&gt; installed and configured to access your cluster.&lt;/p&gt;

&lt;p&gt;Add the Strimzi Helm chart repository:&lt;br&gt;
&lt;code&gt;helm repo add strimzi https://strimzi.io/charts/&lt;/code&gt;&lt;/p&gt;

&lt;p&gt;Then install the Strimzi Cluster Operator. Helm 3 requires a release name, here &lt;code&gt;strimzi-kafka-operator&lt;/code&gt;:&lt;br&gt;
&lt;code&gt;helm install strimzi-kafka-operator strimzi/strimzi-kafka-operator&lt;/code&gt;&lt;/p&gt;
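&lt;p&gt;Before moving on, you can check that the operator is running and that the Strimzi CRDs used later exist. This is a minimal check. It assumes the chart was installed into the current namespace with default values, where the operator deployment is named &lt;code&gt;strimzi-cluster-operator&lt;/code&gt;.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;# wait until the operator deployment has rolled out
kubectl rollout status deployment/strimzi-cluster-operator --timeout=300s

# the KafkaConnect and KafkaConnector CRDs used below should now exist
kubectl get crd kafkaconnects.kafka.strimzi.io kafkaconnectors.kafka.strimzi.io
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;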
&lt;h3&gt;
  
  
  Create MySQL Database
&lt;/h3&gt;

&lt;p&gt;To mimic the on-premises database, we spin up MySQL on the cluster using Debezium's example image, which ships with a sample &lt;code&gt;inventory&lt;/code&gt; database.&lt;/p&gt;

&lt;p&gt;mysql.yml&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;apiVersion: v1
kind: Service
metadata:
  name: mysql
spec:
  ports:
  - port: 3306
  selector:
    app: mysql
  clusterIP: None
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: mysql
spec:
  selector:
    matchLabels:
      app: mysql
  strategy:
    type: Recreate
  template:
    metadata:
      labels:
        app: mysql
    spec:
      containers:
      - image: quay.io/debezium/example-mysql:2.1
        name: mysql
        env:
        - name: MYSQL_ROOT_PASSWORD
          value: debezium
        - name: MYSQL_USER
          value: mysqluser
        - name: MYSQL_PASSWORD
          value: mysqlpw
        ports:
        - containerPort: 3306
          name: mysql
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;code&gt;kubectl apply -f mysql.yml&lt;/code&gt;&lt;/p&gt;
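&lt;p&gt;Debezium reads MySQL's binary log, which must be enabled and use the ROW format. The following is a quick sanity check. It assumes the &lt;code&gt;mysql&lt;/code&gt; deployment and example credentials from &lt;code&gt;mysql.yml&lt;/code&gt; above. You would expect &lt;code&gt;log_bin&lt;/code&gt; to be ON and &lt;code&gt;binlog_format&lt;/code&gt; to be ROW.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;# wait for the MySQL pod to become available
kubectl rollout status deployment/mysql --timeout=300s

# print the binlog settings Debezium depends on
kubectl exec deploy/mysql -- mysql -uroot -pdebezium \
  -e "SHOW VARIABLES WHERE Variable_name IN ('log_bin', 'binlog_format');"
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;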

&lt;h3&gt;
  
  
  Create Eventhub Secret
&lt;/h3&gt;

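&lt;p&gt;Kafka Connect authenticates to Event Hubs with the namespace connection string, so store it in a Kubernetes secret.&lt;/p&gt;

&lt;p&gt;eventhub-secret.yml&lt;br&gt;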
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;
apiVersion: v1
kind: Secret
metadata:
  name: eventhubssecret
type: Opaque
stringData:
  eventhubsuser: $ConnectionString
  eventhubspassword: Endpoint=sb://kafkacheck.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey={YOUR_SHARD_KEY}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;code&gt;kubectl apply -f eventhub-secret.yml&lt;/code&gt;&lt;/p&gt;
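&lt;p&gt;Alternatively, you can create the same secret without writing the key to a file. This sketch assumes the Azure CLI is logged in. &lt;code&gt;{YOUR_RESOURCE_GROUP}&lt;/code&gt; is a placeholder, and &lt;code&gt;kafkacheck&lt;/code&gt; is the namespace used throughout this post. Note the single quotes around &lt;code&gt;$ConnectionString&lt;/code&gt;: without them the shell would expand it as an (empty) variable.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;# fetch the namespace-level connection string ({YOUR_RESOURCE_GROUP} is a placeholder)
EH_CONN=$(az eventhubs namespace authorization-rule keys list \
  --resource-group {YOUR_RESOURCE_GROUP} \
  --namespace-name kafkacheck \
  --name RootManageSharedAccessKey \
  --query primaryConnectionString -o tsv)

# single quotes keep $ConnectionString literal
kubectl create secret generic eventhubssecret \
  --from-literal=eventhubsuser='$ConnectionString' \
  --from-literal=eventhubspassword="$EH_CONN"
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;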

&lt;h3&gt;
  
  
  Create Container Registry Secret
&lt;/h3&gt;

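&lt;p&gt;Strimzi builds a custom Kafka Connect image that contains the Debezium plugin. It needs registry credentials to push that image to Azure Container Registry (ACR), and the Connect pods need them to pull it.&lt;/p&gt;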

&lt;p&gt;For this demo we enable the ACR admin account and use it to &lt;a href="https://learn.microsoft.com/en-us/azure/container-registry/container-registry-authentication?tabs=azure-cli#admin-account" rel="noopener noreferrer"&gt;pull from and push to ACR&lt;/a&gt;:&lt;/p&gt;

&lt;p&gt;&lt;code&gt;az acr update -n niravbcheckml --admin-enabled true&lt;/code&gt;&lt;/p&gt;
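&lt;p&gt;The KafkaConnect resource below refers to a secret named &lt;code&gt;my-registry-credentials&lt;/code&gt;, so it has to be created from the admin credentials. This sketch assumes the registry &lt;code&gt;niravbcheckml&lt;/code&gt; from this post and a logged-in Azure CLI.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;ACR_NAME=niravbcheckml
# read the admin user name and the first admin password of the registry
ACR_USER=$(az acr credential show -n "$ACR_NAME" --query username -o tsv)
ACR_PASS=$(az acr credential show -n "$ACR_NAME" --query "passwords[0].value" -o tsv)

# docker-registry secret, used as pushSecret (build) and imagePullSecrets (Connect pods)
kubectl create secret docker-registry my-registry-credentials \
  --docker-server="$ACR_NAME.azurecr.io" \
  --docker-username="$ACR_USER" \
  --docker-password="$ACR_PASS"
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;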

&lt;h3&gt;
  
  
  Deploy Kafka Connect Object
&lt;/h3&gt;

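&lt;p&gt;&lt;code&gt;KafkaConnect&lt;/code&gt; is a custom resource (CRD) that becomes available once Strimzi is installed on your cluster. Besides running the Kafka Connect workers, its &lt;code&gt;build&lt;/code&gt; section tells Strimzi to build a container image containing the connector plugins you need: the Debezium MySQL plugin for MySQL, the Debezium SQL Server plugin for MS SQL, and so on.&lt;/p&gt;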

&lt;p&gt;kafkaconnect.yml&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: debezium-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  version: 3.4.0
  replicas: 1
  # your Kafka-enabled Event Hubs namespace acts as the broker; its Kafka endpoint listens on port 9093
  bootstrapServers: kafkacheck.servicebus.windows.net:9093
  config:
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
  # the most important part of the setup: SASL PLAIN authentication against Event Hubs,
  # with the literal user name "$ConnectionString" and the connection string (from the secret) as password
  authentication:
    type: plain
    username: $ConnectionString
    passwordSecret:
      secretName: eventhubssecret
      password: eventhubspassword
  # this took me ages to figure out: Event Hubs only accepts TLS connections, so the tls property must be present
  # even without custom certificates; otherwise the connection to Azure Event Hubs fails
  tls:
    trustedCertificates: []
  # Strimzi builds the Connect image with the Debezium plugin and pushes it to ACR using the registry secret
  build:
    output:
      type: docker
      image: niravbcheckml.azurecr.io/connect:latest
      pushSecret: my-registry-credentials
    plugins:
      - name: debezium-mysql-connector
        artifacts:
          - type: tgz
            url: https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/2.0.0.Final/debezium-connector-mysql-2.0.0.Final-plugin.tar.gz
  template:
    pod:
      imagePullSecrets:
        - name: my-registry-credentials
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;code&gt;kubectl apply -f kafkaconnect.yml&lt;/code&gt;&lt;/p&gt;
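&lt;p&gt;Strimzi first runs a build that downloads the plugin, builds the image, and pushes it to ACR, and only then starts the Connect pod. This step can therefore take a few minutes. One way to wait for it relies on the &lt;code&gt;Ready&lt;/code&gt; condition Strimzi sets on its resources. The timeout is an arbitrary choice.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;# block until Strimzi reports the Connect cluster as Ready
kubectl wait kafkaconnect/debezium-connect-cluster --for=condition=Ready --timeout=600s

# if it does not become Ready, inspect status.conditions for the reason
kubectl get kafkaconnect debezium-connect-cluster -o yaml
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;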

&lt;h3&gt;
  
  
  Deploy KafkaConnector
&lt;/h3&gt;

&lt;p&gt;Now deploy the KafkaConnector, which runs the Debezium MySQL plugin on the Connect cluster and captures changes from the source database.&lt;/p&gt;

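&lt;p&gt;NOTE: The source database needs some preparation before Debezium can read from it, such as enabling the binary log in ROW format and creating a user with replication privileges. Refer to the &lt;a href="https://debezium.io/documentation/reference/stable/connectors/mysql.html" rel="noopener noreferrer"&gt;Debezium MySQL connector documentation&lt;/a&gt;. The Debezium example image used in this post is already prepared.&lt;/p&gt;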

&lt;p&gt;KafkaConnector.yml&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: debezium-connector-mysql
  labels:
    strimzi.io/cluster: debezium-connect-cluster
spec:
  class: io.debezium.connector.mysql.MySqlConnector
  tasksMax: 1
  config:
    tasks.max: 1
    database.hostname: mysql
    database.port: 3306
    database.user: root
    database.password: debezium # this can be stored as secret
    database.server.id: 184054
    topic.prefix: mysql
    database.include.list: inventory
    schema.history.internal.kafka.bootstrap.servers: kafkacheck.servicebus.windows.net:9093
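    # dedicated internal topic for Debezium's schema (DDL) history; it must not be shared
    # with a table's change-event topic such as mysql.inventory.customers
    schema.history.internal.kafka.topic: schema-changes.inventory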
    schema.history.internal.consumer.security.protocol: "SASL_SSL"
    schema.history.internal.consumer.sasl.mechanism: "PLAIN"
    schema.history.internal.consumer.sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://kafkacheck.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey={YOUR_SHARED_KEY}";
    schema.history.internal.producer.security.protocol: "SASL_SSL"
    schema.history.internal.producer.sasl.mechanism: "PLAIN"
    schema.history.internal.producer.sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://kafkacheck.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey={YOUR_SHARED_KEY}";
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;code&gt;kubectl apply -f KafkaConnector.yml&lt;/code&gt;&lt;/p&gt;
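&lt;p&gt;To confirm that the connector started, check the status that Strimzi copies from the Kafka Connect REST API into the KafkaConnector resource. This is a sketch; the reported state would be expected to be &lt;code&gt;RUNNING&lt;/code&gt;.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;# wait for the connector resource to become Ready
kubectl wait kafkaconnector/debezium-connector-mysql --for=condition=Ready --timeout=300s

# print the connector state reported by Kafka Connect
kubectl get kafkaconnector debezium-connector-mysql \
  -o jsonpath='{.status.connectorStatus.connector.state}'
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;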

&lt;h2&gt;
  
  
  Time to test the setup
&lt;/h2&gt;

&lt;h3&gt;
  
  
  Connect to MySQL
&lt;/h3&gt;

&lt;p&gt;In one terminal, start a MySQL client pod and connect to the database:&lt;/p&gt;

&lt;p&gt;&lt;code&gt;kubectl run -it --rm --image=mysql:8.0 --restart=Never --env MYSQL_ROOT_PASSWORD=debezium mysqlterm -- mysql -hmysql -P3306 -uroot -pdebezium&lt;/code&gt;&lt;/p&gt;

&lt;p&gt;In another terminal, check the logs of the Kafka Connect pod. First list the pods:&lt;/p&gt;

&lt;p&gt;&lt;code&gt;kubectl get pods&lt;/code&gt;&lt;/p&gt;

&lt;p&gt;Then follow the logs of the &lt;code&gt;debezium-connect-cluster-connect&lt;/code&gt; pod (your pod name will differ):&lt;br&gt;
&lt;code&gt;kubectl logs debezium-connect-cluster-connect-69f54bcb6d-h5pkj --follow&lt;/code&gt;&lt;/p&gt;
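&lt;p&gt;The pod name changes on every rollout, so you can also select the pod by label. This assumes Strimzi's usual &lt;code&gt;strimzi.io/name&lt;/code&gt; label, which for this cluster should be &lt;code&gt;debezium-connect-cluster-connect&lt;/code&gt;.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;# follow the Connect pod logs without copying the generated pod name
kubectl logs -f -l strimzi.io/name=debezium-connect-cluster-connect
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;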

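&lt;p&gt;Now update a row in the &lt;code&gt;customers&lt;/code&gt; table of the &lt;code&gt;inventory&lt;/code&gt; database, which is what we configured the KafkaConnector to capture (&lt;code&gt;database.include.list: inventory&lt;/code&gt;).&lt;/p&gt;

&lt;p&gt;Adjust &lt;code&gt;database.include.list&lt;/code&gt; (or &lt;code&gt;table.include.list&lt;/code&gt;) to capture the databases and tables you need.&lt;/p&gt;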

&lt;p&gt;Inside the MySQL terminal:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;mysql&amp;gt; use inventory;
mysql&amp;gt; select * from customers;
+------+------------+--------------+-----------------------+
| id   | first_name | last_name    | email                 |
+------+------------+--------------+-----------------------+
| 1001 | Nirav4     | Bhadradiya29 | sally.thomas@acme.com |
| 1002 | George     | Bailey       | gbailey@foobar.com    |
mysql&amp;gt; update customers set first_name='Nirav12345' WHERE id=1001;
Query OK, 1 row affected (0.01 sec)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;The Debezium MySQL connector should detect this update, and the Kafka Connect log should show that the record was sent to Event Hubs:&lt;/p&gt;

&lt;p&gt;&lt;code&gt;2023-03-09 19:43:28,511 INFO [debezium-connector-mysql|task-0] 13 records sent during previous 31:03:09.991, last recorded offset of {server=mysql} partition is {transaction_id=null, ts_sec=1678391008, file=mysql-bin.000003, pos=26703, row=1, server_id=223344, event=2} (io.debezium.connector.common.BaseSourceTask) [task-thread-debezium-connector-mysql-0]&lt;/code&gt;&lt;/p&gt;

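&lt;p&gt;A consumer of the &lt;code&gt;mysql.inventory.customers&lt;/code&gt; topic should then receive a change event for this row. Here is an excerpt of its payload, showing the row before and after the update:&lt;/p&gt;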

&lt;p&gt;&lt;code&gt;**{"before":{"id":1001,"first_name":"Nirav4","last_name":"Bhadradiya29","email":"sally.thomas@acme.com"},"after":{"id":1001,"first_name":"Nirav12345","last_name":"Bhadradiya29","email":"sally.thomas@acme.com"}**&lt;br&gt;
&lt;/code&gt;&lt;/p&gt;
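&lt;p&gt;Debezium names its topics &lt;code&gt;topic.prefix.database.table&lt;/code&gt;, and Event Hubs exposes each Kafka topic as an event hub. You can therefore also confirm from the Azure side that the topic exists. This is a sketch; &lt;code&gt;{YOUR_RESOURCE_GROUP}&lt;/code&gt; is a placeholder.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;# list the event hubs (Kafka topics) in the namespace;
# mysql.inventory.customers should be among them, next to the Connect internal topics
az eventhubs eventhub list \
  --resource-group {YOUR_RESOURCE_GROUP} \
  --namespace-name kafkacheck \
  --query "[].name" -o tsv
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;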

&lt;p&gt;Here is a sample C# &lt;a href="https://github.com/Nirav-Bhadradiya/azure-eventhub-receiver-sender" rel="noopener noreferrer"&gt;GitHub repository&lt;/a&gt; for sending and receiving events with Azure Event Hubs.&lt;/p&gt;

&lt;p&gt;The repository containing all Kubernetes manifests used in this post:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://github.com/Nirav-Bhadradiya/strimzi-debezium-kafkaconnect-eventhub" rel="noopener noreferrer"&gt;strimzi-debezium-kafkaconnect-eventhub&lt;/a&gt;&lt;/p&gt;

</description>
      <category>azureeventhub</category>
      <category>strimzi</category>
      <category>kubernetes</category>
      <category>debezium</category>
    </item>
  </channel>
</rss>
