Apache Kafka with Python

In this article, we will cover the following:

  1. Introduction to Kafka and its use cases
  2. Setting up a Kafka server
    • Manual setup
    • Docker setup
  3. Installing the Python Kafka library (e.g. kafka-python)
  4. Producing messages to a Kafka topic
  5. Consuming messages from a Kafka topic
  6. Advanced Kafka features (e.g. custom serialization/deserialization, message keys, etc.)
  7. Error handling and troubleshooting
  8. Conclusion and resources for further learning

1. Introduction to Kafka and its use cases

Apache Kafka is a distributed streaming platform that allows you to publish and subscribe to streams of records, similar to a message queue or enterprise messaging system. One of its key features is its ability to handle a large number of concurrent reads and writes, making it well-suited for handling high volumes of data from multiple sources.

Some common use cases for Kafka include:

  • Real-time data pipelines: collecting and processing data from various sources in real time, such as log data, sensor data, and social media feeds.
  • Stream processing: analyzing and processing data streams as they are generated, such as detecting patterns or anomalies in data.
  • Event-driven architectures: building systems that respond to specific events, such as sending a message or triggering a workflow.



How do the Kafka brokers and clients keep track of all the brokers when there is more than one? The Kafka team chose to use ZooKeeper for this purpose.

ZooKeeper is used for metadata management in the Kafka world. For example:

  • ZooKeeper keeps track of which brokers are part of the Kafka cluster
  • ZooKeeper is used by Kafka brokers to determine which broker is the leader of a given partition and topic, and to perform leader elections
  • ZooKeeper stores configurations for topics and permissions
  • ZooKeeper sends notifications to Kafka in case of changes (e.g. a new topic, a broker dying, a broker coming up, topic deletions, etc.)

Apache Kafka Components


In this tutorial, you'll learn how to use the kafka-python library to interact with a Kafka cluster. We'll start by setting up a Kafka cluster, then move on to producing and consuming messages using Python code.


2.1. Setting up a Kafka server manually (Option 1)

Setting up a Kafka server can be a bit involved, but once set up, it can be run on any machine that has a Java Runtime Environment (JRE) installed. Here are the general steps to set up a Kafka server:

There are two main options for getting Kafka: the distribution from the Apache Software Foundation, and the Confluent Platform package from Confluent. For this tutorial, I will go with the one provided by the Apache Foundation.

  1. Download the latest version of Kafka from the Apache Kafka downloads page.
  2. Extract the downloaded file to a directory on your machine
  3. Navigate to the extracted folder directory

    cd <BASE_DIRECTORY>/kafka_<version>
    
  4. Start the ZooKeeper server by running the following command from the Kafka directory:

    bin/zookeeper-server-start.sh config/zookeeper.properties
    
  5. Start the Kafka server by running the following command from the Kafka directory:

    bin/kafka-server-start.sh config/server.properties
    

By default, the Kafka server will listen on port 9092 and the ZooKeeper server on port 2181. You can change these settings by modifying the server.properties and zookeeper.properties files, respectively.

If you want to create a multi-node Kafka cluster, you'll need to set up and configure additional Kafka brokers on separate machines. Each broker will need its own unique broker ID, and you'll need to configure the cluster so that the brokers can communicate with each other.

You can also run Kafka on cloud providers like AWS, GCP, or Azure, either through their managed Kafka services or by launching Kafka clusters on their virtual machines.

It's worth noting that running a Kafka server in production requires significant configuration and monitoring, so it's recommended to use a managed service, or the Confluent Platform, which is a more complete distribution of Apache Kafka that includes additional features and management tools.
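Once the broker is running, you can do a quick reachability check from Python with the kafka-python library (installed in section 3 below). This is a minimal sketch, assuming the default localhost:9092 listener from server.properties; the file name smoke_test.py is just illustrative:

# smoke_test.py - check that the broker answers (hypothetical helper)
from kafka import KafkaConsumer

consumer = KafkaConsumer(bootstrap_servers=["localhost:9092"])
print(consumer.topics())  # set of existing topic names; empty on a fresh broker
consumer.close()

If this prints without raising NoBrokersAvailable, the broker is up and reachable.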


2.2. Setting up a Kafka server using Docker (Option 2)

Setting up a Kafka server using Docker can be a convenient way to quickly spin up a Kafka cluster for testing or development purposes. Here are the general steps to set up a Kafka server using Docker:

  1. Install Docker on your machine if it's not already installed.
  2. Install docker-compose
  3. Create a docker-compose.yml file and add the following ZooKeeper and Kafka config

    version: '3.7'
    services:
      zookeeper:
        image: confluentinc/cp-zookeeper:latest
        container_name: zookeeper
        environment:
          ZOOKEEPER_CLIENT_PORT: 2181
          ZOOKEEPER_TICK_TIME: 2000
        ports:
          - 22181:2181
        restart: on-failure
      kafka:
        image: confluentinc/cp-kafka:latest
        container_name: kafka
        depends_on:
          - zookeeper
        ports:
          - 29092:29092
        environment:
          KAFKA_BROKER_ID: 1
          KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
          KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
        restart: on-failure
    
  4. Start the ZooKeeper and Kafka containers by running the following command:

    docker-compose up -d
    

    This command will start two containers, "zookeeper" and "kafka", and map port 29092 of the host machine to port 29092 of the Kafka container. The advertised listeners in the config expose the broker as kafka:9092 to other containers on the Docker network and as localhost:29092 to clients on the host machine, which is why the Python examples below connect to localhost:29092.

  5. Check the logs to confirm the ZooKeeper container has booted up successfully

    docker logs zookeeper
    ## Output -> 
    ## ...
    ## 2023-01-25 13:22:48 [2023-01-25 10:22:48,634] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)
    ## ...
    
  6. Check the logs to confirm the Kafka server has booted up successfully

    docker logs kafka
    ## Output -> 
    ## ...
    ## [2023-01-25 13:31:00,295] INFO [Kafka Server 1], started (kafka.server.KafkaServer)
    ## ...
    

It's worth noting that the above setup is for development and testing purposes. Running a Kafka server in production requires significant configuration and monitoring, so it's recommended to use a managed service such as AWS MSK (or the equivalent offerings on GCP and Azure), or the Confluent Platform for more advanced features and management tools.
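Before running the Python examples below, you may also want to create the "test" topic explicitly rather than relying on the broker's topic auto-creation. Here is a minimal sketch using kafka-python's admin client, assuming the localhost:29092 listener exposed by the compose file above (the file name create_topic.py is a hypothetical helper, not part of the original project):

# create_topic.py - create the "test" topic if it doesn't exist yet (sketch)
from kafka.admin import KafkaAdminClient, NewTopic
from kafka.errors import TopicAlreadyExistsError

admin = KafkaAdminClient(bootstrap_servers=["localhost:29092"])
try:
    # One partition and replication factor 1 are enough for a single-broker dev setup.
    admin.create_topics([NewTopic(name="test", num_partitions=1, replication_factor=1)])
    print("Created topic 'test'")
except TopicAlreadyExistsError:
    print("Topic 'test' already exists")
finally:
    admin.close()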


Simple Python Project


NOTE: Ensure you get the right Kafka version for the step below. You can get this by using the following command:

docker exec kafka kafka-topics --version # with docker setup
bin/kafka-topics.sh --version # If you used the manual setup

The output should be something like this:

7.3.1-ccs (Commit:8628b0341c3c46766f141043367cc0052f75b090)

3. Installing the Python Kafka library (e.g. kafka-python)

Make sure that you have the kafka-python library installed; you can install it via pip:

pip install kafka-python
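To confirm the installation, you can import the library and print its version (the exact version depends on when you install; 2.0.2 is only an example):

# Quick check that the library is importable.
import kafka
print(kafka.__version__)  # e.g. 2.0.2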

4. Producing messages to a Kafka topic

Create a new file named producer.py and add the following code:

import os
import time
import random
import json

from kafka import KafkaProducer

# Connection settings, overridable via environment variables.
KAFKA_BOOTSTRAP_SERVERS = os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "localhost:29092")
KAFKA_TOPIC_TEST = os.environ.get("KAFKA_TOPIC_TEST", "test")
KAFKA_API_VERSION = os.environ.get("KAFKA_API_VERSION", "7.3.1")

producer = KafkaProducer(
    bootstrap_servers=[KAFKA_BOOTSTRAP_SERVERS],
    # kafka-python also accepts api_version as a tuple, e.g. (7, 3, 1).
    api_version=KAFKA_API_VERSION,
)

# Send 31 JSON-encoded messages, sleeping 1-5 seconds between sends to simulate events.
i = 0
while i <= 30:
    producer.send(
        KAFKA_TOPIC_TEST,
        json.dumps({"message": f"Hello, Kafka! - test {i}"}).encode("utf-8"),
    )
    i += 1
    time.sleep(random.randint(1, 5))

# Block until all buffered messages have actually been sent to the broker.
producer.flush()

This code creates a new Kafka producer connected to the cluster specified by the bootstrap_servers parameter. It then sends a series of "Hello, Kafka!" messages to the topic "test". We simulate events being published by sleeping for a random interval of 1 to 5 seconds on every iteration.
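Note that producer.send() is asynchronous: it returns a future, and the record is transmitted in the background. If you want to confirm delivery of an individual message, you can block on that future. A small sketch, reusing the producer and KAFKA_TOPIC_TEST defined in producer.py above:

# Wait for the broker to acknowledge a single record (sketch).
future = producer.send(KAFKA_TOPIC_TEST, json.dumps({"message": "ping"}).encode("utf-8"))
metadata = future.get(timeout=10)  # raises a KafkaError subclass on failure
print(f"Delivered to {metadata.topic}[{metadata.partition}] at offset {metadata.offset}")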

5. Consuming messages from a Kafka topic

Create a new file named consumer.py and add the following code:

import os

from kafka import KafkaConsumer

# Connection settings, overridable via environment variables.
KAFKA_BOOTSTRAP_SERVERS = os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "localhost:29092")
KAFKA_TOPIC_TEST = os.environ.get("KAFKA_TOPIC_TEST", "test")
KAFKA_API_VERSION = os.environ.get("KAFKA_API_VERSION", "7.3.1")

consumer = KafkaConsumer(
    KAFKA_TOPIC_TEST,
    bootstrap_servers=[KAFKA_BOOTSTRAP_SERVERS],
    api_version=KAFKA_API_VERSION,
    auto_offset_reset="earliest",  # start from the oldest message if no offset is stored
    enable_auto_commit=True,       # commit consumed offsets in the background
)

# Iterating over the consumer blocks and yields messages as they arrive.
for message in consumer:
    print(message.value.decode("utf-8"))

This code creates a new Kafka consumer connected to the cluster specified by the bootstrap_servers parameter. It subscribes to the topic "test" and continuously polls for new messages. Each message received is printed to the console.

  1. Run the producer.py script to send messages to the Kafka topic

    python producer.py
    
  2. Run the consumer.py script to consume messages from the Kafka topic

    python consumer.py
    

This will start the consumer, which will continuously poll for new messages from the "test" topic. Each time a message is received, it will be printed to the console.
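Each item yielded by the consumer is a ConsumerRecord that carries metadata alongside the payload. If you want to see where each message came from, a small variation of the loop in consumer.py prints the topic, partition, and offset as well (a sketch, reusing the consumer defined above):

# Variation of the consumer loop that also prints record metadata.
for message in consumer:
    print(
        f"{message.topic}[{message.partition}] @ offset {message.offset}: "
        f"{message.value.decode('utf-8')}"
    )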


6. Advanced Kafka features

Kafka provides many advanced features that you can use in your application, including:

  • Compression: you can compress messages before sending them to Kafka to save bandwidth
  • Message keys: you can specify a key for each message, which is used to assign messages to partitions
  • Consumer groups: multiple consumers can read from the same topic, enabling parallel processing and load balancing
  • Fault tolerance: Kafka is designed to be fault-tolerant and can handle failures of individual nodes without losing messages or affecting performance
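As a taste of these features, here is a hedged sketch combining gzip compression, JSON value serialization, message keys, and a consumer group. The group name "demo-group" and the key "user-42" are illustrative, not part of the original project:

import json
from kafka import KafkaProducer, KafkaConsumer

# Producer with compression, a key serializer, and a JSON value serializer.
producer = KafkaProducer(
    bootstrap_servers=["localhost:29092"],
    compression_type="gzip",                     # compress message batches to save bandwidth
    key_serializer=lambda k: k.encode("utf-8"),  # same key -> same partition
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)
producer.send("test", key="user-42", value={"message": "keyed event"})
producer.flush()

# Consumers sharing a group_id split the topic's partitions between them.
consumer = KafkaConsumer(
    "test",
    bootstrap_servers=["localhost:29092"],
    group_id="demo-group",
    value_deserializer=lambda b: json.loads(b.decode("utf-8")),
    auto_offset_reset="earliest",
)

Running two copies of this consumer with the same group_id will split the topic's partitions between them; with only one partition, just one of the two will receive messages.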


7. Error handling and troubleshooting

As with any distributed system, errors can occur while working with Kafka. Some common errors you may encounter include:

  • Connection errors: these occur when the producer or consumer is unable to connect to the Kafka cluster.
  • Leader not available: this error occurs when the leader broker for a partition is not available.

To troubleshoot these errors, check the logs of the Kafka broker and the consumer for error messages. The kafka-python library also provides several exception classes that you can catch and handle in your code, such as KafkaError and KafkaTimeoutError.
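In code, the simplest pattern is to block on the send future inside a try/except; a minimal sketch using kafka-python's exception classes (the broker address is the Docker setup's localhost:29092):

from kafka import KafkaProducer
from kafka.errors import KafkaError, KafkaTimeoutError

producer = KafkaProducer(bootstrap_servers=["localhost:29092"])
try:
    metadata = producer.send("test", b"payload").get(timeout=10)
except KafkaTimeoutError:
    # Subclass of KafkaError, so it must be caught first.
    print("Timed out waiting for the broker - is the cluster reachable?")
except KafkaError as exc:
    print(f"Send failed: {exc}")
else:
    print(f"Delivered at offset {metadata.offset}")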


8. Conclusion and resources for further learning

In this tutorial, we've covered the basics of using the kafka-python library to interact with a Kafka cluster. We've shown how to set up a Kafka cluster, produce and consume messages, and use some of the advanced features provided by Kafka.

To continue learning about Kafka, you can check out the official Kafka documentation, as well as the documentation for the kafka-python library. There are also many tutorials and blog posts available online that cover different aspects of using Kafka.

I hope this tutorial has been helpful in getting you started with using Kafka and the Kafka-python library. Let me know if there is anything else that you would like me to include or explain in more detail.

PS: Here's a GitHub link to the final project

Apache Kafka with Python



How to set up the project

Features

  • Python 3.10
  • Poetry as the dependency manager

PROJECT SETUP

  • Clone the repository

    git clone https://github.com/Hesbon5600/kafka-python.git

  • cd into the directory

    cd kafka-python

Create environment variables

On Unix or macOS, run:

cp .env.example .env

You can edit whatever values you like in there.

Note: there must be no spaces around '=' in the .env file.
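For reference, here is an example .env matching the variables that producer.py and consumer.py read; the values are illustrative:

KAFKA_BOOTSTRAP_SERVERS=localhost:29092
KAFKA_TOPIC_TEST=test
KAFKA_API_VERSION=7.3.1

Depending on your shell, you may need to prefix each line with export for the variables to be visible to the Python processes.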

Then, load the variables in your terminal:

source .env

VIRTUAL ENVIRONMENT


To create:

make env

Installing dependencies:

make install
