☁️ Confluent Cloud is a great solution for a hosted and managed Apache Kafka service, with the additional benefits of Confluent Platform such as ksqlDB and managed Kafka Connect connectors. But as a developer, you won’t always have a reliable internet connection. Trains, planes, and automobiles, not to mention crappy hotel or conference Wi-Fi. Wouldn’t it be useful if you could have a replica of your Cloud data on your local machine, one that pulled down new data automagically without needing to be restarted each time you got back on the network?
Let me show you how to do this by replicating one (or more) topics from Confluent Cloud onto your local machine. It’s also really useful if you want to develop something locally before you’re ready to deploy it against your cloud environment.
How?
Confluent Replicator is a Kafka Connect plugin, acting as a consumer from one Kafka cluster (Confluent Cloud) and producer to another (your local Kafka cluster). I use Docker Compose to run Kafka locally, almost exclusively. It’s a piece of cake to provision, spin up, and tear down new environments, in isolation from others.
Setup
Create a .env file with your Confluent Cloud broker details and credentials in it:
CCLOUD_BROKER_HOST=foo.bar.bork.bork.bork.us-central1.gcp.confluent.cloud:9092
CCLOUD_API_KEY=yyy
CCLOUD_API_SECRET=xxx
Load these environment variables into your local shell (we’ll use them with Docker later, hence writing them to a file for re-use):
source .env
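Before going any further it can be worth confirming that all three values actually made it into your shell. Here is a minimal sketch of such a check; the function name check_ccloud_env is made up for illustration and is not part of any Confluent tooling:

```shell
# Hypothetical helper: report any of the three Confluent Cloud variables
# from .env that are unset or empty in the current shell.
check_ccloud_env() {
  local missing=0
  [ -n "${CCLOUD_BROKER_HOST:-}" ] || { echo "Missing: CCLOUD_BROKER_HOST" >&2; missing=1; }
  [ -n "${CCLOUD_API_KEY:-}" ]     || { echo "Missing: CCLOUD_API_KEY" >&2; missing=1; }
  [ -n "${CCLOUD_API_SECRET:-}" ]  || { echo "Missing: CCLOUD_API_SECRET" >&2; missing=1; }
  # Non-zero return if anything was missing, so it can be chained with &&
  return "$missing"
}
```

You could then run source .env && check_ccloud_env && echo "Confluent Cloud settings loaded" and only carry on if it succeeds.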
Create some test data on Confluent Cloud
Chuck some dummy data into a Confluent Cloud topic:
echo $(date) | \
kafkacat -b $CCLOUD_BROKER_HOST \
-X security.protocol=SASL_SSL -X sasl.mechanisms=PLAIN \
-X sasl.username="$CCLOUD_API_KEY" -X sasl.password="$CCLOUD_API_SECRET" \
-X api.version.request=true \
-t test_topic -P
Verify that it’s there:
➜ kafkacat -b $CCLOUD_BROKER_HOST \
-X security.protocol=SASL_SSL -X sasl.mechanisms=PLAIN \
-X sasl.username="$CCLOUD_API_KEY" -X sasl.password="$CCLOUD_API_SECRET" \
-X api.version.request=true \
-t test_topic -C -e
Fri 17 Apr 2020 18:03:17 BST
Create local Kafka cluster
Now spin up a local Kafka cluster using this Docker Compose file:
➜ docker-compose up -d
➜ docker-compose ps
Name Command State Ports
---------------------------------------------------------------------------------------------
kafka-1 /etc/confluent/docker/run Up 0.0.0.0:9092->9092/tcp
kafka-2 /etc/confluent/docker/run Up 0.0.0.0:19092->19092/tcp, 9092/tcp
kafka-3 /etc/confluent/docker/run Up 0.0.0.0:29092->29092/tcp, 9092/tcp
replicator /etc/confluent/docker/run Up 0.0.0.0:58083->58083/tcp, 8083/tcp, 9092/tcp
zookeeper /etc/confluent/docker/run Up 2181/tcp, 2888/tcp, 3888/tcp
Set up Replicator
First, we’ll check that the Replicator container has started and is ready. Replicator runs as a plugin to Kafka Connect, so we use its API for interacting with Replicator.
echo "Waiting for Kafka Connect to start listening on localhost:58083 ⏳"
while : ; do
  curl_status=$(curl -s -o /dev/null -w '%{http_code}' http://localhost:58083/connectors)
  echo -e $(date) " Kafka Connect listener HTTP state: " $curl_status " (waiting for 200)"
  # Note the spaces inside [ ]: without them the shell looks for a command named "[200"
  if [ "$curl_status" -eq 200 ] ; then
    break
  fi
  sleep 5
done
Should show:
Mon 20 Apr 2020 12:32:37 BST Kafka Connect listener HTTP state: 200 (waiting for 200)
We can also verify that the Replicator plugin has been loaded correctly:
# grep -q exits 0 when the Replicator class is in the plugin list, so we can test it directly
if curl -s localhost:58083/connector-plugins | jq '.[].class' | grep -q io.confluent.connect.replicator.ReplicatorSourceConnector; then
  echo "Replicator plugin is correctly loaded ✅"
else
  echo "😢 Replicator plugin is not loaded. Please check the Kafka Connect worker logs and installation steps"
fi
Should show:
Replicator plugin is correctly loaded ✅
Now send the config to the Kafka Connect worker:
source .env
epoch=$(date +%s)
curl -s -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" "http://localhost:58083/connectors/replicator-source"$epoch"/config" \
-d '
{
"connector.class" : "io.confluent.connect.replicator.ReplicatorSourceConnector",
"key.converter" : "io.confluent.connect.replicator.util.ByteArrayConverter",
"value.converter" : "io.confluent.connect.replicator.util.ByteArrayConverter",
"header.converter" : "io.confluent.connect.replicator.util.ByteArrayConverter",
"src.kafka.bootstrap.servers" : "'$CCLOUD_BROKER_HOST'",
"src.kafka.security.protocol" : "SASL_SSL",
"src.kafka.sasl.mechanism" : "PLAIN",
"src.kafka.sasl.jaas.config" : "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"'$CCLOUD_API_KEY'\" password=\"'$CCLOUD_API_SECRET'\";",
"src.consumer.group.id" : "replicator-'$epoch'",
"dest.kafka.bootstrap.servers": "kafka-1:39092,kafka-2:49092,kafka-3:59092",
"topic.whitelist" : "test_topic",
"topic.rename.format" :"${topic}",
"confluent.license" :"",
"confluent.topic.bootstrap.servers":"kafka-1:39092,kafka-2:49092,kafka-3:59092",
"confluent.topic.replication.factor":1,
"offset.start" :"consumer"
}'
Check that it’s running:
➜ curl -s "http://localhost:58083/connectors?expand=info&expand=status" | \
jq '. | to_entries[] | [.value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
column -s : -t| sed 's/\"//g'| sort
source | replicator-source1587382706 | RUNNING | RUNNING | io.confluent.connect.replicator.ReplicatorSourceConnector
Check that we’ve got data:
➜ kafkacat -b localhost:29092 -t test_topic -C -e
Fri 17 Apr 2020 18:03:17 BST
So now when data gets sent to Confluent Cloud, we get it also pushed to our local Kafka cluster.
Restarting and dealing with network glitches
The cool thing about Kafka, and Kafka Connect, is that they keep track of where a particular consumer has read up to in a topic. Replicator therefore will read from a topic whilst it’s running, and if you stop and restart it, it’ll just catch up from where it got to before it stopped.
The same principle applies if your local machine goes off the network, or just goes through some patchy connectivity. If Replicator can connect to the source cluster (Confluent Cloud), it will do so. If it can’t, it’ll keep trying and carry on again once it can.
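If you’d like to watch the catch-up behaviour without pulling the network cable, one option is to pause the connector, send a few more messages to Confluent Cloud, and then resume it. Here is a rough sketch using the pause and resume endpoints of the Kafka Connect REST API. The function names and the CONNECT_URL variable are my own for illustration, and the port is the one assumed throughout this post:

```shell
# Assumption: the Kafka Connect REST API is on localhost:58083, as in the
# Docker Compose setup used in this post. Set CONNECT_URL if yours differs.
CONNECT_URL="${CONNECT_URL:-http://localhost:58083}"

# Pause a connector by name (PUT /connectors/<name>/pause)
pause_connector() {
  curl -s -X PUT "$CONNECT_URL/connectors/$1/pause"
}

# Resume a paused connector (PUT /connectors/<name>/resume)
resume_connector() {
  curl -s -X PUT "$CONNECT_URL/connectors/$1/resume"
}
```

For example, pause_connector replicator-source1587382706, produce some data to test_topic on Confluent Cloud, then resume_connector replicator-source1587382706 and consume from the local topic again. The status check shown earlier should report the connector state in between.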
Ingesting a fresh copy of the data
So there’s restarting, and then there’s restarting. What if instead of wanting to restart the connector (we rebooted the machine, made a config change, whatever) we want to actually start afresh and start a new replication of the topic from Confluent Cloud?
Because of the clever way Kafka Connect uses the Kafka consumer group protocol to track offsets, if you were to delete the replicator configuration and create it afresh, it would still carry on from where it got to before! You can see the consumer group name (and consumption progress) in the Confluent Cloud UI.
For this reason you may have noticed in the config that we ran above the use of epoch in the configuration name and, most importantly, src.consumer.group.id. This is just one way of ensuring a unique group name tied to this particular instance of the replicator. We can then choose to provision a new one if we want to start afresh, or restart an existing one.
Whilst you’re there in the Confluent Cloud UI you can check out the detailed view of the progress of a particular consumer group.
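To make the epoch idea a bit more concrete, here is a small sketch that derives both names from a single epoch value, plus a helper to remove an old instance through the Kafka Connect REST API. The function names are invented for this example, and the port is the one assumed elsewhere in this post:

```shell
# Print the connector name and source consumer group for one epoch value,
# following the naming pattern used in the config above.
# With no argument, the current time is used, giving a brand new pair.
replicator_names() {
  local epoch="${1:-$(date +%s)}"
  echo "replicator-source${epoch} replicator-${epoch}"
}

# Remove a Replicator instance by connector name (DELETE /connectors/<name>).
# Assumption: the Connect worker listens on localhost:58083.
delete_replicator() {
  curl -s -X DELETE "http://localhost:58083/connectors/$1"
}
```

Calling replicator_names 1587382706 prints replicator-source1587382706 replicator-1587382706. Reusing an existing epoch targets the same consumer group and so carries on from its offsets, whereas a fresh epoch gives you a new group and a new copy of the data.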
Changing the target topic
There’s a bunch of parameters that you can set with Replicator. One particularly useful one is to modify the name of the target topic that Replicator writes to. Here’s an example of routing a source topic to a target one that includes the identifier (epoch) of the Replicator that wrote it:
"topic.rename.format":"${topic}-ccloud-'$epoch'",
The resulting topic name goes from test_topic on the source (Confluent Cloud) to test_topic-ccloud-1587388241 on our target local cluster:
➜ kafkacat -b localhost:29092 -t test_topic-ccloud-1587388241 -C -q -u -o end
Here's a test message, sent at Mon 20 Apr 2020 14:14:09 BST
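It’s easy to trip over the quoting here, because two different substitutions are in play: the shell fills in $epoch when you send the config, while ${topic} has to reach Replicator untouched (which is why it sits inside the single-quoted JSON). The sketch below mimics the second step locally so you can preview a target topic name. It is only an illustration, render_topic_name is a made-up helper, and Replicator does the real substitution itself:

```shell
# Illustration only: apply a rename format to a topic name locally.
# Assumes the topic name contains only characters Kafka allows
# (letters, digits, '.', '_' and '-'), none of which are special to sed.
render_topic_name() {
  # [$][{]topic[}] matches the literal placeholder ${topic}
  printf '%s\n' "$1" | sed 's/[$][{]topic[}]/'"$2"'/g'
}

epoch=1587388241   # example value taken from the topic name above
# The single quotes stop the shell from expanding ${topic} itself
render_topic_name '${topic}-ccloud-'"$epoch" test_topic
```

Running this prints test_topic-ccloud-1587388241, matching the topic that we consumed from above.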
Storing credentials safely
In the example above we passed the credentials for Confluent Cloud just as environment variables to Kafka Connect, which is not great from a security point of view. Instead we could use external secrets. Note that the Replicator docker container has the necessary config.providers settings to enable this, and that we’ve mounted our local .env file into the container.
…
CONNECT_CONFIG_PROVIDERS: 'file'
CONNECT_CONFIG_PROVIDERS_FILE_CLASS: 'org.apache.kafka.common.config.provider.FileConfigProvider'
volumes:
- ./.env:/opt/config
Now when we create the replicator we can reference the file and attributes within it:
epoch=$(date +%s)
curl -s -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" "http://localhost:58083/connectors/replicator-source"$epoch"/config" \
-d '
{
"connector.class": "io.confluent.connect.replicator.ReplicatorSourceConnector",
"key.converter": "io.confluent.connect.replicator.util.ByteArrayConverter",
"value.converter": "io.confluent.connect.replicator.util.ByteArrayConverter",
"header.converter": "io.confluent.connect.replicator.util.ByteArrayConverter",
"src.kafka.bootstrap.servers": "${file:/opt/config:CCLOUD_BROKER_HOST}",
"src.kafka.security.protocol": "SASL_SSL",
"src.kafka.sasl.mechanism": "PLAIN",
"src.kafka.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"${file:/opt/config:CCLOUD_API_KEY}\" password=\"${file:/opt/config:CCLOUD_API_SECRET}\";",
"src.consumer.group.id": "replicator-'$epoch'",
"dest.kafka.bootstrap.servers": "kafka-1:39092,kafka-2:49092,kafka-3:59092",
"topic.whitelist": "test_topic",
"topic.rename.format":"${topic}",
"confluent.license":"",
"confluent.topic.bootstrap.servers":"kafka-1:39092,kafka-2:49092,kafka-3:59092",
"confluent.topic.replication.factor":1,
"offset.start":"consumer"
}'
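A quick way to convince yourself that this helps is to read the connector config back and check that the secret itself is not in it. As far as I know, Kafka Connect returns the ${file:…} placeholders rather than the resolved values, but treat that as something to verify on your own setup. The helper below is a hypothetical sketch that just does a fixed-string search:

```shell
# Hypothetical check: succeed (return 0) only if the given config text does
# NOT contain the literal secret value.
config_hides_secret() {
  local config_text="$1" secret="$2"
  # An empty secret would match everything, so treat it as a usage error
  [ -n "$secret" ] || { echo "No secret given" >&2; return 2; }
  # -F: fixed-string match, -q: quiet, --: allow secrets starting with '-'
  if printf '%s' "$config_text" | grep -qF -- "$secret"; then
    return 1   # the literal secret appears in the config
  fi
  return 0
}
```

With the variables from .env loaded, you could call config_hides_secret "$(curl -s http://localhost:58083/connectors/replicator-source$epoch/config)" "$CCLOUD_API_SECRET" and expect it to succeed for the connector created here, and fail for the earlier one that had the credentials written straight into its config.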
It’s not just for Cloud
You can use Replicator between any two Apache Kafka clusters, and use Confluent Control Center to get the same consumer group monitoring view that I showed above.
Try it out by downloading Confluent Platform.