DEV Community

loading...

Kafka Connect - Crash Course

thegroo profile image Marcos Maia Updated on ・7 min read

Kafka Connect is becoming a force on the Change Data Capture field.

Kafka itself has gained a lot of momentum being more and more adopted by Companies trying to move their data workloads from batch processing to micro-batching/realtime processing of events among other practical possible solutions of using it.

Kafka Connect is a tool which helps to enable almost Real-Time synchronization of data between silos within an organization. With its clear Approach to connect systems, helping you to move data and apply simple transformations to the data.

Kafka Connect enables us to solve the "real time" data integration needed these days within a corporation with its hundreds or thousands of different systems while relying on standard Kafka features like scalability, fault tolerance and high availability.

From Kafka Documentation:

Kafka Connect is a tool for scalably and reliably streaming data between Apache Kafka and other systems. It makes it simple to quickly define connectors that move large collections of data into and out of Kafka. Kafka Connect can ingest entire databases or collect metrics from all your application servers into Kafka topics, making the data available for stream processing with low latency.

Types of connectors

Source Connectors: import data from another system into Kafka

Sink Connectors: export data from Kafka to another systems

Kafka Connectors Architectural overview

Kafka Connect Overview

Modes of execution

Kafka Connect currently supports two modes of execution: standalone (single process) and distributed.

In standalone mode, all work is performed in a single process it's simpler to get started with and may be useful in situations where only one worker makes sense (e.g. collecting log files), but it does not benefit from some of the features of Kafka Connect such as fault tolerance.

Distributed mode handles automatically the balancing of work, allows you to scale up (or down) dynamically, and offers fault tolerance both in the active tasks and for configuration and offset commit data.

In its last release, while writing this post, Kafka Connect 2.3.0 incorporated a new implementation feature which makes it even more attractive, it's now capable of only partially pausing processing messages when going through a rebalance. A feature called, Incremental Cooperative Rebalancing.
i.e - only the affected partition processed by a worker will be paused while the other workers connected to different partitions can continue processing during rebalancing.

One thing I've noticed is Kafka Connect available tutorials and instructions are everywhere but they are mostly covering the Confluent Connectors and some other existing platforms and recommending to use their CLIs to install and configure the connectors, which is good and brings many benefits in an enterprise environment to running production workloads, but those tools hide away the Connector Implementation and possibilities behind it from Developers like us, so the goal here in this article is to cover the basics and run the simplest possible Kafka Connect using the "plain" Kafka distribution and the command line, no specific vendor add ons on top of it.

Setup

  • clone the repo
git clone git@github.com:stockgeeks/docker-compose.git

To run the example from this post we will use a docker-compose file with all our dependencies to run Kafka plus an extra container with the built-in FileStream Source Connector configured.

For details on the docker-compose setup for Kafka and Zookeeper please check this previous article to run a basic local Kafka instance for local development with some useful commands and tricks.

We will be using the wurstmeister/kafka image as it's a pure build directly from the Open Source Kafka Distribution.

The full docker-compose file can be picked up cloning this repo under kafka-connect-crash-course folder. Here's a direct link to it.

The compose declared services explained

The compose file set up 3 containers, the two first containers are picked up from the docker hub and use the wurstmeister images for Kafka and Zookeeper.

  # https://github.com/wurstmeister/zookeeper-docker
  zookeeper:
    container_name: zookeeper
    image: wurstmeister/zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
    ports:
      - '2181:2181'

  # https://hub.docker.com/r/wurstmeister/kafka/
  kafka:
    container_name: kafka
    image: wurstmeister/kafka:2.12-2.3.0
    environment:
      ## the >- used below infers a value which is a string and properly
      ## ignore the multiple lines resulting in one long string:
      ## https://yaml.org/spec/1.2/spec.html

      ## You need to make sure to specify your hostname in a file in this
      ## same dir as this compose file called `.env`(uncomment the line)
      ## or to register in your `/etc/hosts` kafka as your loopback interface
      ## address together with hostname and 127.0.0.1
      KAFKA_ADVERTISED_LISTENERS: >-
        LISTENER_DOCKER_INTERNAL://kafka:19092,
        LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-kafka}:9092

      KAFKA_LISTENERS: >-
        LISTENER_DOCKER_INTERNAL://:19092,
        LISTENER_DOCKER_EXTERNAL://:9092

      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: >-
        LISTENER_DOCKER_INTERNAL:PLAINTEXT,
        LISTENER_DOCKER_EXTERNAL:PLAINTEXT

      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      # we create topic with 1 partition and 1 replica as it's for local dev and we're running a single broker instance.
      KAFKA_CREATE_TOPICS: 'simple-connect:1:1'
      KAFKA_LOG4J_LOGGERS: >-
        kafka.controller=INFO,
        kafka.producer.async.DefaultEventHandler=INFO,
        state.change.logger=INFO

    ports:
      - 9092:9092
    depends_on:
      - zookeeper
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

the third one, which is the connector, is built in from the project to show how Kafka Connect can be configured and set up for local development. It relies on docker-compose ability to build docker images from a Dockerfile.

connect-standalone:
    build:
      context: .
      dockerfile: Dockerfile
    container_name: connect-standalone
    ports:
      - 8083:8083
    depends_on:
      - kafka
    volumes:
      - /tmp:/tmp

So we're building the image from a Dockerfile, let's take a look at it:

# we start from a Kafka image as the connector is in Kafka distribution
FROM wurstmeister/kafka:2.12-2.3.0

# we replace the default connect-standalone.properties so we can properly resolve to our local Kafka docker development
COPY connect-standalone.properties /opt/kafka/config/

COPY connect-file-source.properties /opt/kafka/config/

# we replace the start command creating a connector instead of starting a kafka broker.
COPY start-kafka.sh /usr/bin/

# permissions
RUN chmod a+x /usr/bin/start-kafka.sh

The start-kafka.sh script is a very simple script that uses the provided connector script to initialize a standalone connector passing in it's general properties and specific connector properties files as parameter:

# connector start command here.
exec "/opt/kafka/bin/connect-standalone.sh" "/opt/kafka/config/connect-standalone.properties" "/opt/kafka/config/connect-file-source.properties"

We override the general connect-standalone.properties to change the resolution name for the kafka broker running in our docker-compose setup so we have changed the line of the bootstrap-servers to the resolvable name kafka within our docker network:

bootstrap.servers=kafka:9092

And the connect-file-source.properties is the local configuration of the built-in FileStreamSource Connector. This connector is provided as part of the Apache Kafka Distribution the contents of the file are commented when not self explainable next.

name=file-source-connector
connector.class=FileStreamSource
tasks.max=1

# the file from where the connector should read lines and publish to kafka, this is inside the docker container so we have this
# mount in the compose file mapping this to an external file where we have rights to read and write and use that as input.
file=/tmp/my-source-file.txt

# data read from the file will be published to the specified topic
topic=simple-connect

# We override the defaults for this connector example as we want to quickly just validate it for now.
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

# We don't need converters for the simple example
key.converter.schemas.enable=false
value.converter.schemas.enable=false

For simplicity, we override the key and value converters and schemas and we configure the file to be used as input to be /tmp/my-source-file.txt this is an internal container path that we then map to an external volume in our workspace where we know we have read and write access, fromi docker-compose.yml

connect-standalone:
    build:
      context: .
      dockerfile: Dockerfile
    container_name: connect-standalone
    ports:
      - 8083:8083
    depends_on:
      - kafka
    volumes:
      - /tmp:/tmp

Running it

  • Build the custom docker image locally, navigate to the directory where the docker-compose is and run: docker-compose build. This will download the required images and build the connector image.

  • Run it: docker-compose up -d will run the containers in background, you can then check the running containers with docker ps or docker-compose ps, there should be 3 containers running:

connect-setup-running

  • Let's now attach a console consumer to the topic so we can consume when new messages are published to it, enter the Kafka running container:
docker exec -it kafka /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic simple-connect --from-beginning

There are some nice tools that enable you to run Kafka commands in simpler ways(see references by the end of this article) but they usually wrap around the basic commands and scripts and I find it important that you be aware of the core available commands for learning purposes, that's why I don't recommend you to use those tools initially.

Some useful REST calls

The Kafka Connectors Framework exposes by default a REST port so we can get some useful information and issue some commands to manage it. You can use your favorite tool to interact with it, I mostly use curl, Insomnia and Postman.

See some examples of querying for info:

# returns a list of active connectors
GET /connectors

# information about the specified connector
GET /connectors/{name}

# configuration for specified connector
GET /connectors/{name}/config

# status of specified connector
GET /connectors/{name}/status

# connector tasks
GET /connectors/{name}/tasks

# status of tasks for specified connector
GET /connectors/{name}/tasks/{taskid}/status

# list of connector installed plugins
GET /connector-plugins

References

Apache Kafka 2.3.0 Release Notes

Apache Kafka Connect Documentation

Confluent Connectors Developer Documentation

Confluent Connectors Hub

Other connectors - Debezium, Landoop, and many others available at a single search distance with your favorite search engine.

Some Kafka Tools - Confluent CLI, KafkaCat

Discussion (6)

Collapse
emperoar profile image
Julius Bacosa

do you have any idea why my dockerized asp.net core app can't connect to kafka's localhost:9092, but if not dockerized it can successfully connect?

e.g.
docker run -p 5001:80 --rm my-api
my bootstrap server is: BootstrapServers = "127.0.0.1:9092"

Collapse
thegroo profile image
Marcos Maia Author • Edited

Hi, most likely related to the KAFKA_ADVERTISED_LISTENERS when using a docker container which enables you to pass in the advertised.listeners values to kafka, notice that this is a comma-separated list of listeners with their the host/ip and port and it's metadata that’s passed back to clients, so check your docker image settings, which docker image are you using?

kafka.apache.org/documentation/#br...

Also make sure to be exposing the internal kafka docker port.

Collapse
emperoar profile image
Julius Bacosa

Sorry for the late reply, got this working already Thanks!

Collapse
ververo3 profile image
Tiurina Veronika

I`ve followed that instraction, but have got the error:
PS D:\9_soft\docker\streaming_kafka_with_docker\win_kafka_manager\kafka_connect_docker> docker-compose build
zookeeper uses an image, skipping
kafka uses an image, skipping
Building connect-standalone
Step 1/5 : FROM wurstmeister/kafka:2.12-2.3.0
---> 48591fe02a72
Step 2/5 : COPY connect-standalone.properties /opt/kafka/config/
---> Using cache
---> 5c91bc255d31
Step 3/5 : COPY connect-file-sink.properties /opt/kafka/config/
---> Using cache
---> 32044e2edb55
Step 4/5 : COPY start-kafka.sh /opt/kafka/bin/
---> Using cache
---> 6f1c44233efc
Step 5/5 : RUN chmod a+x /usr/bin/start-kafka.sh
---> Using cache
---> 45a188c5f8bc

Successfully built 45a188c5f8bc
Successfully tagged kafka_connect_docker_connect-standalone:latest
PS D:\9_soft\docker\streaming_kafka_with_docker\win_kafka_manager\kafka_connect_docker> docker-compose up -d
Creating network "kafka_connect_docker_default" with the default driver
Creating zookeeper ... done Creating kafka ... done Creating connect-standalone ... done PS D:\9_soft\docker\streaming_kafka_with_docker\win_kafka_manager\kafka_connect_docker> docker-compose ps

Name Command State Ports

connect-standalone start-kafka.sh Exit 1
kafka start-kafka.sh Up 0.0.0.0:9092->9092/tcp
zookeeper /bin/sh -c /usr/sbin/sshd ... Up 0.0.0.0:2181->2181/tcp, 22/tcp, 2888/tcp, 3888/tcp
PS D:\9_soft\docker\streaming_kafka_with_docker\win_kafka_manager\kafka_connect_docker> docker-compose logs connect-standalone
Attaching to connect-standalone
connect-standalone | ERROR: missing mandatory config: KAFKA_ZOOKEEPER_CONNECT

Collapse
thegroo profile image
Marcos Maia Author

Hum.. That's unfortunate, I will run this again and double check but definitely worked before? What O.S are you running? Did you check the ports if they're all available? Did you update any versions of the images in the compose-file before running? Because KAFKA_ZOOKEEPER_CONNECT is actually defined.

Also I would check your yml file for proper identation and formatting, yml can be very tricky for those. Make sure it's compliant. I've never used but you could try this one: yamllint.com/ or add a yml lint to your IDE and make sure it's properly formatted.

Collapse
arabindabehera profile image
Arabinda Behera

If will set bootstrap.servers=kafka:9092 in my connect-standalone.properties file and copy it to the connect container how is going to resolve the kafka broker ip ? I am getting following error as it is not able to resolve.

WARN Couldn't resolve server kafka:9092 from bootstrap.servers as DNS resolution failed for kafka (org.apache.kafka.clients.ClientUtils:74)

Forem Open with the Forem app