Motivation:
Over the last couple of days I was setting up an infrastructure to stream near real-time database changes from an on-premises MySQL server to a Kafka-enabled Azure Event Hub.
I struggled quite a bit to connect MySQL via Debezium, running on Azure Kubernetes Service, to Azure Event Hubs. There are plenty of articles about connecting to a plain Kafka cluster, but very few about Azure Event Hubs, which is why I am writing this down for others.
I hope this helps!
High Level Architecture:
Following is the high-level architecture.
Tools Involved:
I will skip setting up AKS and Event Hubs, and focus directly on how to connect MySQL via Kafka Connect to Azure Event Hubs.
- Azure Kubernetes Service
- Azure Event Hubs - Kafka enabled (Standard SKU)
- Strimzi Kubernetes Operator: Strimzi is an open-source project that helps deploy and manage Kafka clusters on Kubernetes. It provides operators covering components such as ZooKeeper, Kafka Connect, Kafka MirrorMaker, and Kafka Exporter.
- Debezium MySQL Connector: Debezium is a set of distributed services that capture changes in your databases so that your applications can see those changes and respond to them. Debezium records all row-level changes within each database table as a change event stream, and applications simply read these streams to see the change events in the same order in which they occurred.
- Azure Container Registry
Let's start:
Installing Strimzi Operator
First we need to deploy the Strimzi operator on the Azure Kubernetes Service cluster.
You need kubectl and helm configured for your system.
First, add the Strimzi chart repository.

```shell
helm repo add strimzi https://strimzi.io/charts/
```

Then install the Strimzi Cluster Operator (note that `helm install` requires a release name):

```shell
helm install strimzi-operator strimzi/strimzi-kafka-operator
```
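Before continuing, it is worth confirming that the operator is up. A quick check, assuming Strimzi's default `name=strimzi-cluster-operator` pod label:

```shell
# The operator pod should reach Running/Ready before we create any Strimzi resources
kubectl get pods -l name=strimzi-cluster-operator
```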
Create MySQL Database
We need a MySQL database to mimic this scenario, so we will spin one up on our cluster.
mysql.yml

```yaml
apiVersion: v1
kind: Service
metadata:
  name: mysql
spec:
  ports:
    - port: 3306
  selector:
    app: mysql
  clusterIP: None
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: mysql
spec:
  selector:
    matchLabels:
      app: mysql
  strategy:
    type: Recreate
  template:
    metadata:
      labels:
        app: mysql
    spec:
      containers:
        - image: quay.io/debezium/example-mysql:2.1
          name: mysql
          env:
            - name: MYSQL_ROOT_PASSWORD
              value: debezium
            - name: MYSQL_USER
              value: mysqluser
            - name: MYSQL_PASSWORD
              value: mysqlpw
          ports:
            - containerPort: 3306
              name: mysql
```

```shell
kubectl apply -f mysql.yml
```
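As a quick sanity check before wiring up the connector, you can wait for the MySQL pod to become ready (the `app=mysql` label comes from the Deployment above):

```shell
kubectl wait --for=condition=Ready pod -l app=mysql --timeout=120s
kubectl get svc mysql
```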
Create Eventhub Secret
The username for the Event Hubs Kafka endpoint is always the literal string $ConnectionString; the password is the namespace connection string itself.
eventhub-secret.yml

```yaml
apiVersion: v1
kind: Secret
metadata:
  name: eventhubssecret
type: Opaque
stringData:
  eventhubsuser: $ConnectionString
  eventhubspassword: Endpoint=sb://kafkacheck.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey={YOUR_SHARED_KEY}
```

```shell
kubectl apply -f eventhub-secret.yml
```
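If you don't have the connection string handy, you can fetch it with the Azure CLI. This sketch assumes the `kafkacheck` namespace and default `RootManageSharedAccessKey` rule used throughout this post; substitute your own resource group:

```shell
az eventhubs namespace authorization-rule keys list \
  --resource-group <YOUR_RESOURCE_GROUP> \
  --namespace-name kafkacheck \
  --name RootManageSharedAccessKey \
  --query primaryConnectionString -o tsv
```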
Create Container Registry Secret
We need a container registry secret so that KafkaConnect can push the container image it builds to ACR, and the cluster can pull it back.
For this demo we will use the ACR admin account to authenticate:

```shell
az acr update -n niravbcheckml --admin-enabled true
```
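With the admin account enabled, we can create the docker-registry secret that the KafkaConnect build below refers to as my-registry-credentials. A sketch that reads the admin credentials straight from ACR:

```shell
# Fetch the ACR admin username and password
ACR_USER=$(az acr credential show -n niravbcheckml --query username -o tsv)
ACR_PASS=$(az acr credential show -n niravbcheckml --query 'passwords[0].value' -o tsv)

# Create the secret Strimzi will use to push, and the pods to pull
kubectl create secret docker-registry my-registry-credentials \
  --docker-server=niravbcheckml.azurecr.io \
  --docker-username="$ACR_USER" \
  --docker-password="$ACR_PASS"
```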
Deploy Kafka Connect Object
KafkaConnect is a CRD that becomes available once you install Strimzi on your cluster, and it lets you build a Connect image with the plugins you need: for MySQL you add the MySQL connector plugin, for MS SQL the SQL Server plugin, and so on.
kafkaconnect.yml

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: debezium-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  version: 3.4.0
  replicas: 1
  # mention your Kafka-enabled Azure Event Hubs namespace here as the broker
  bootstrapServers: kafkacheck.servicebus.windows.net:9093
  config:
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
  # this is the most important part of the setup: it authenticates to Event Hubs
  # with its connection string
  authentication:
    type: plain
    username: $ConnectionString
    passwordSecret:
      secretName: eventhubssecret
      password: eventhubspassword
  # it took me ages to find out that even though we supply no certificates,
  # this tls block must be present, otherwise it won't connect to Azure Event Hubs
  tls:
    trustedCertificates: []
  # Strimzi will build the container image and push it to ACR with the secret we created
  build:
    output:
      type: docker
      image: niravbcheckml.azurecr.io/connect:latest
      pushSecret: my-registry-credentials
    plugins:
      - name: debezium-mysql-connector
        artifacts:
          - type: tgz
            url: https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/2.0.0.Final/debezium-connector-mysql-2.0.0.Final-plugin.tar.gz
  template:
    pod:
      imagePullSecrets:
        - name: my-registry-credentials
```

```shell
kubectl apply -f kafkaconnect.yml
```
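Strimzi first runs a build pod that assembles the plugin image and pushes it to ACR, and only then starts the Connect cluster. Pod names are generated, so watching by label is easiest:

```shell
# Watch the build pod finish, then the Connect pod come up
kubectl get pods -l strimzi.io/cluster=debezium-connect-cluster -w

# Inspect the KafkaConnect resource's readiness conditions
kubectl get kafkaconnect debezium-connect-cluster -o jsonpath='{.status.conditions}'
```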
Deploy KafkaConnector
Now we will deploy the connector, which uses the Debezium plugin to detect changes on the source database.
NOTE: There are steps to follow on the database before you can use this (such as enabling the binlog and granting the connector user the right privileges); please refer to the Debezium MySQL setup guide.
KafkaConnector.yml

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: debezium-connector-mysql
  labels:
    strimzi.io/cluster: debezium-connect-cluster
spec:
  class: io.debezium.connector.mysql.MySqlConnector
  tasksMax: 1
  config:
    tasks.max: 1
    database.hostname: mysql
    database.port: 3306
    database.user: root
    database.password: debezium # this can be stored as a secret
    database.server.id: 184054
    topic.prefix: mysql
    database.include.list: inventory
    schema.history.internal.kafka.bootstrap.servers: kafkacheck.servicebus.windows.net:9093
    schema.history.internal.kafka.topic: mysql.inventory.customers
    schema.history.internal.consumer.security.protocol: "SASL_SSL"
    schema.history.internal.consumer.sasl.mechanism: "PLAIN"
    schema.history.internal.consumer.sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://kafkacheck.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey={YOUR_SHARED_KEY}";
    schema.history.internal.producer.security.protocol: "SASL_SSL"
    schema.history.internal.producer.sasl.mechanism: "PLAIN"
    schema.history.internal.producer.sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://kafkacheck.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey={YOUR_SHARED_KEY}";
```

```shell
kubectl apply -f KafkaConnector.yml
```
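To verify that the connector registered and its task is running, inspect the status that Strimzi maintains on the KafkaConnector resource:

```shell
# Full status, including connector and task state reported by the Connect REST API
kubectl get kafkaconnector debezium-connector-mysql -o yaml

# Or a more readable summary of conditions and events
kubectl describe kafkaconnector debezium-connector-mysql
```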
Time to test the setup
Connect to MySQL
In one terminal, connect to MySQL:

```shell
kubectl run -it --rm --image=mysql:8.0 --restart=Never \
  --env MYSQL_ROOT_PASSWORD=debezium mysqlterm \
  -- mysql -hmysql -P3306 -uroot -pdebezium
```

In another terminal, check the logs of the KafkaConnect pod:

```shell
kubectl get pods
```

Get the Connect cluster pod and follow its logs:

```shell
kubectl logs debezium-connect-cluster-connect-69f54bcb6d-h5pkj --follow
```
Now update a row in the customers table in the inventory database, as that is what we configured in the KafkaConnector object; it will listen for changes on whichever databases/tables you include.
Inside the MySQL terminal:

```sql
mysql> use inventory;
mysql> select * from customers;
+------+------------+--------------+-----------------------+
| id   | first_name | last_name    | email                 |
+------+------------+--------------+-----------------------+
| 1001 | Nirav4     | Bhadradiya29 | sally.thomas@acme.com |
| 1002 | George     | Bailey       | gbailey@foobar.com    |
+------+------------+--------------+-----------------------+
mysql> update customers set first_name='Nirav12345' where id=1001;
Query OK, 1 row affected (0.01 sec)
```
This update should be detected by the Debezium MySQL plugin running as a connector, and you should see KafkaConnect post it to Kafka:

```
2023-03-09 19:43:28,511 INFO [debezium-connector-mysql|task-0] 13 records sent during previous 31:03:09.991, last recorded offset of {server=mysql} partition is {transaction_id=null, ts_sec=1678391008, file=mysql-bin.000003, pos=26703, row=1, server_id=223344, event=2} (io.debezium.connector.common.BaseSourceTask) [task-thread-debezium-connector-mysql-0]
```
and you should receive an event for the change in this row:

```json
{"before":{"id":1001,"first_name":"Nirav4","last_name":"Bhadradiya29","email":"sally.thomas@acme.com"},"after":{"id":1001,"first_name":"Nirav12345","last_name":"Bhadradiya29","email":"sally.thomas@acme.com"}}
```
Here is a sample GitHub repository for sending and receiving events from Azure Event Hubs, written in C#.
And a repo containing all the Kubernetes objects for these deployments.