In this tutorial, we will use docker-compose and MySQL 8 to demonstrate Kafka Connect, with MySQL serving as the data source.
This tutorial is mainly based on Kafka Connect Tutorial on Docker. However, the original tutorial is outdated and simply won't work if you follow it step by step. This is a refresh of that tutorial; to simplify things, we also drop the Avro setup, since it serves no purpose in demonstrating Kafka Connect. Enough said, let's begin.
Preparation
First, you will need to download the MySQL Connector driver, which can be found here:
MySQL Connector Driver
You will also need to download the Confluent JDBC connector plugin:
Confluent JDBC plugins
Extract both mysql-connector-java-8.0.22.tar.gz and confluentinc-kafka-connect-jdbc-10.0-2.1.zip. Create a jars directory, then move mysql-connector-java-8.0.22.jar and all the .jar files from the confluentinc-kafka-connect-jdbc-10.0-2.1/lib/ directory into it.
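If you prefer the command line, here is a minimal sketch that does the same thing, assuming both archives sit in your working directory and unpack into directories named after themselves:

mkdir -p jars
tar xzf mysql-connector-java-8.0.22.tar.gz
cp mysql-connector-java-8.0.22/mysql-connector-java-8.0.22.jar jars/
unzip confluentinc-kafka-connect-jdbc-10.0-2.1.zip
cp confluentinc-kafka-connect-jdbc-10.0-2.1/lib/*.jar jars/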
docker-compose file
Here is the docker-compose file that contains everything you need to run this tutorial:
version: '2'
services:
  mysql:
    privileged: true
    ports:
      - 3306:3306
    environment:
      MYSQL_ROOT_PASSWORD: test
    image: mysql:8.0
  zookeeper:
    image: confluentinc/cp-zookeeper:5.0.0
    privileged: true
    ports:
      - 32181:32181
    environment:
      ZOOKEEPER_CLIENT_PORT: 32181
      ZOOKEEPER_TICK_TIME: 2000
  kafka:
    image: confluentinc/cp-kafka:5.0.0
    ports:
      - 29092:29092
    links:
      - zookeeper
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:32181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  kafka-connector-mysql:
    image: confluentinc/cp-kafka-connect:latest
    ports:
      - 28083:28083
    links:
      - kafka
      - zookeeper
      - mysql
    environment:
      CONNECT_BOOTSTRAP_SERVERS: kafka:29092
      CONNECT_REST_PORT: 28083
      CONNECT_GROUP_ID: "quickstart-avro"
      CONNECT_CONFIG_STORAGE_TOPIC: "quickstart-avro-config"
      CONNECT_OFFSET_STORAGE_TOPIC: "quickstart-avro-offsets"
      CONNECT_STATUS_STORAGE_TOPIC: "quickstart-avro-status"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_REST_ADVERTISED_HOST_NAME: "localhost"
      CONNECT_LOG4J_ROOT_LOGLEVEL: DEBUG
      CONNECT_PLUGIN_PATH: "/usr/share/java,/etc/kafka-connect/jars"
    volumes:
      - $PWD/jars:/etc/kafka-connect/jars
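YAML is indentation-sensitive, so it is worth validating the file before starting anything. docker-compose can parse it and print the resolved configuration; any syntax mistake surfaces as an error:

docker-compose config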
Before you run docker-compose, make sure your file structure looks like the following.
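A rough sketch of the expected layout (exact jar names will vary with the versions you downloaded):

.
├── docker-compose.yml
└── jars
    ├── mysql-connector-java-8.0.22.jar
    └── ... (all the .jar files from the plugin's lib/ directory)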
Let the fun begin
Let the fun begin by starting the mysql service and opening a shell inside its container. The container name below is derived from the project directory name (here, kafka-connector); if the exec fails, check docker ps for the actual name.
docker-compose up -d mysql
docker exec -it kafka-connector_mysql_1 bash
Inside the container, open the MySQL CLI with mysql -uroot -ptest and execute the following queries:
CREATE DATABASE IF NOT EXISTS connect_test;
USE connect_test;
CREATE TABLE IF NOT EXISTS test (
id serial NOT NULL PRIMARY KEY,
name varchar(100),
email varchar(200),
department varchar(200),
modified timestamp default CURRENT_TIMESTAMP NOT NULL,
INDEX `modified_index` (`modified`)
);
INSERT INTO test (name, email, department) VALUES ('alice', 'alice@abc.com', 'engineering');
INSERT INTO test (name, email, department) VALUES ('bob', 'bob@abc.com', 'sales');
INSERT INTO test (name, email, department) VALUES ('bob', 'bob@abc.com', 'sales');
INSERT INTO test (name, email, department) VALUES ('bob', 'bob@abc.com', 'sales');
INSERT INTO test (name, email, department) VALUES ('bob', 'bob@abc.com', 'sales');
INSERT INTO test (name, email, department) VALUES ('bob', 'bob@abc.com', 'sales');
INSERT INTO test (name, email, department) VALUES ('bob', 'bob@abc.com', 'sales');
INSERT INTO test (name, email, department) VALUES ('bob', 'bob@abc.com', 'sales');
INSERT INTO test (name, email, department) VALUES ('bob', 'bob@abc.com', 'sales');
INSERT INTO test (name, email, department) VALUES ('bob', 'bob@abc.com', 'sales');
exit;
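As a sanity check before moving on, you can count the seed rows from outside the container; it should report 10 (again, adjust the container name if yours differs):

docker exec -it kafka-connector_mysql_1 mysql -uroot -ptest -e "SELECT COUNT(*) FROM connect_test.test;"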
Let’s start zookeeper and kafka
docker-compose up -d zookeeper kafka
Wait a few seconds, then make sure all services are up and running:
docker ps
Once zookeeper, kafka, and mysql are all up and running, let's prepare our final course: Confluent Kafka Connect.
First, let's create the three internal topics the Connect worker was configured with above (config, offset, and status storage):
docker-compose run --rm kafka kafka-topics --create --topic quickstart-avro-offsets --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:32181 --config cleanup.policy=compact
docker-compose run --rm kafka kafka-topics --create --topic quickstart-avro-config --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:32181 --config cleanup.policy=compact
docker-compose run --rm kafka kafka-topics --create --topic quickstart-avro-status --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:32181 --config cleanup.policy=compact
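To double-check that the three topics exist, you can list them (this Kafka version still addresses topics via --zookeeper):

docker-compose run --rm kafka kafka-topics --list --zookeeper zookeeper:32181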
Finally, let's start the Connect worker:
docker-compose up -d kafka-connector-mysql
Give it a few seconds, then create the JDBC source connector by making a REST API call to the Connect service:
curl -X POST \
-H "Content-Type: application/json" \
--data '{ "name": "quickstart-jdbc-source", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "tasks.max": 1, "connection.url": "jdbc:mysql://mysql:3306/connect_test", "connection.user": "root", "connection.password": "test", "mode": "incrementing", "incrementing.column.name": "id", "timestamp.column.name": "modified", "topic.prefix": "quickstart-jdbc-", "poll.interval.ms": 1000 } }' \
http://localhost:28083/connectors
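If the POST fails, a common culprit is the worker not picking up the JDBC plugin or the MySQL driver from /etc/kafka-connect/jars. You can list the plugins the worker actually loaded; io.confluent.connect.jdbc.JdbcSourceConnector should appear in the response:

curl -s http://localhost:28083/connector-plugins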
Wait a few seconds, then check the status to make sure it is RUNNING:
curl -s -X GET http://localhost:28083/connectors/quickstart-jdbc-source/status
You should see something similar to
{"name":"quickstart-jdbc-source","connector":{"state":"RUNNING","worker_id":"localhost:28083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"localhost:28083"}],"type":"source"}%
That's it! Let's verify our work by running a Kafka console consumer. Note that the topic name is the connector's topic.prefix (quickstart-jdbc-) followed by the table name (test):
docker-compose run --rm kafka kafka-console-consumer --bootstrap-server kafka:29092 --topic quickstart-jdbc-test --from-beginning
and you should see messages like the following show up, one per row:
{"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"email"},{"type":"string","optional":true,"field":"department"},{"type":"int64","optional":false,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"modified"}],"optional":false,"name":"test"},"payload":{"id":18,"name":"bob","email":"bob@abc.com","department":"sales","modified":1606418867000}}
{"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"email"},{"type":"string","optional":true,"field":"department"},{"type":"int64","optional":false,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"modified"}],"optional":false,"name":"test"},"payload":{"id":19,"name":"bob","email":"bob@abc.com","department":"sales","modified":1606418867000}}
{"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"email"},{"type":"string","optional":true,"field":"department"},{"type":"int64","optional":false,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"modified"}],"optional":false,"name":"test"},"payload":{"id":20,"name":"bob","email":"bob@abc.com","department":"sales","modified":1606418867000}}
^CProcessed a total of 10 messages
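Because the connector runs in incrementing mode on the id column, new rows are picked up automatically. Restart the consumer (or leave it running in another terminal), insert one more row, and a new message should appear within the one-second poll interval (the carol row is just an example):

docker exec -it kafka-connector_mysql_1 mysql -uroot -ptest -e "INSERT INTO connect_test.test (name, email, department) VALUES ('carol', 'carol@abc.com', 'marketing');"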
That's it. To tear it all down, simply run
docker-compose down
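If you also want to discard the data MySQL stored in its anonymous volume, add the -v flag:

docker-compose down -v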
Enjoy.