In this article, we will cover the following:
- Introduction to Kafka and its use cases
- Setting up a Kafka server
- Manual setup
- Docker setup
- Installing the Python Kafka library (e.g. kafka-python)
- Producing messages to a Kafka topic
- Consuming messages from a Kafka topic
- Advanced Kafka features (e.g. custom serialization/deserialization, message keys, etc.)
- Error handling and troubleshooting
- Conclusion and resources for further learning
## 1. Introduction to Kafka and its use cases
Apache Kafka is a distributed streaming platform that allows you to publish and subscribe to streams of records, similar to a message queue or enterprise messaging system. One of its key features is its ability to handle a large number of concurrent reads and writes, making it well-suited for handling high volumes of data from multiple sources.
Some common use cases for Kafka include:
- Real-time data pipelines: collecting and processing data from various sources in real time, such as log data, sensor data, and social media feeds.
- Stream processing: analyzing and processing data streams as they are generated, such as detecting patterns or anomalies in the data.
- Event-driven architectures: building systems that respond to specific events, such as sending a message or triggering a workflow.
How do Kafka brokers and clients keep track of all the brokers when a cluster has more than one? The Kafka team decided to use Zookeeper for this purpose.
Zookeeper is used for metadata management in the Kafka world. For example:
- Zookeeper keeps track of which brokers are part of the Kafka cluster
- Zookeeper is used by Kafka brokers to determine which broker is the leader of a given partition and topic and perform leader elections
- Zookeeper stores configurations for topics and permissions
- Zookeeper sends notifications to Kafka in case of changes (e.g. a new topic, a broker dying, a broker coming up, topic deletion, etc.)
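To make this concrete, you can peek at the metadata Zookeeper holds using the `zookeeper-shell` tool that ships with Kafka (a quick illustration against a local Zookeeper; the exact output depends on your cluster):
```bash
# List the IDs of the brokers currently registered in Zookeeper
bin/zookeeper-shell.sh localhost:2181 ls /brokers/ids

# Inspect the metadata a specific broker (e.g. broker 0) has registered
bin/zookeeper-shell.sh localhost:2181 get /brokers/ids/0
```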
In this tutorial, you'll learn how to use the kafka-python library to interact with a Kafka cluster. We'll start by setting up a Kafka cluster, then move on to producing and consuming messages using Python code.
## 2.0. Setting up a Kafka server (Option 1)
Setting up a Kafka server can be a bit involved, but once set up, it can be run on any machine that has a Java Runtime Environment (JRE) installed. Here are the general steps to set up a Kafka server:
There are two main distributions of Kafka: one from the Apache Software Foundation and one packaged by Confluent. For this tutorial, I will go with the one provided by the Apache Foundation.
1. Download the latest version of Kafka from the [Apache Kafka downloads page](https://kafka.apache.org/downloads).
2. Extract the downloaded file to a directory on your machine.
3. Navigate to the extracted directory:
```bash
cd /kafka_
```
4. Start the Zookeeper server by running the following command from the Kafka directory:
```bash
bin/zookeeper-server-start.sh config/zookeeper.properties
```
5. Start the Kafka server by running the following command from the Kafka directory:
```bash
bin/kafka-server-start.sh config/server.properties
```
By default, the server will listen on port `9092` and the Zookeeper server on port `2181`. You can change these settings by modifying the `server.properties` and `zookeeper.properties` files, respectively.
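Once both processes are up, a quick sanity check is to create and list a topic (a minimal check, assuming the default port of `9092`):
```bash
# Create a topic named "test" with a single partition and replica
bin/kafka-topics.sh --create --topic test --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092

# List all topics on the broker; "test" should appear
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
```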
If you want to create a multi-node Kafka cluster, you'll need to set up and configure additional Kafka brokers on separate machines. Each broker will need its own unique broker ID, and you'll need to configure the cluster so that the brokers can communicate with each other.
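As an illustration, a second broker's `config/server.properties` might differ from the first one's in just a few keys (the values below are placeholders you would adapt to your own machines):
```properties
# Unique ID for this broker within the cluster
broker.id=1
# Listener this broker binds to (each broker on the same host needs its own port)
listeners=PLAINTEXT://:9093
# Separate log directory so brokers don't overwrite each other's data
log.dirs=/tmp/kafka-logs-1
# All brokers must point at the same Zookeeper ensemble
zookeeper.connect=localhost:2181
```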
You can also run Kafka on cloud providers like AWS, GCP, or Azure. You can use their managed Kafka services or launch Kafka clusters on their virtual machines.
It's worth noting that running a Kafka server in production requires a lot of configuration and monitoring, so it's recommended to use a managed service or the Confluent Platform, a more complete distribution of Apache Kafka that includes additional features and management tools.
---
## 2.1. Setting up a Kafka server using Docker (Option 2)
Setting up a Kafka server using Docker can be a convenient way to quickly spin up a Kafka cluster for testing or development purposes. Here are the general steps to set up a Kafka server using Docker:
1. Install [Docker](https://docs.docker.com/engine/install/) on your machine if it's not already installed.
2. Install [docker-compose](https://docs.docker.com/compose/)
3. Create a `docker-compose.yml` file and add the following `zookeeper` and `kafka` config:
```yaml
version: '3.7'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 22181:2181
    restart: on-failure

  kafka:
    image: confluentinc/cp-kafka:latest
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    restart: on-failure
```
4. Start the Zookeeper and Kafka containers by running the following command:
```bash
docker-compose up -d
```
This command will start new containers named `zookeeper` and `kafka`, and map port `29092` of the host machine to port `29092` of the `kafka` container. It also links the `kafka` container to the `zookeeper` container, so that the Kafka broker can connect to Zookeeper.
5. Check the logs to see that the Zookeeper container has booted up successfully:
```bash
docker logs zookeeper
## Output ->
## ...
## [2023-01-25 10:22:48,634] INFO binding to port 0.0.0.0/0.0.0.0:32181 (org.apache.zookeeper.server.NIOServerCnxnFactory)
## ...
```
6. Check the logs to see that the Kafka server has booted up successfully:
```bash
docker logs kafka
## Output ->
## ...
## [2023-01-25 13:31:00,295] INFO [Kafka Server 1], started (kafka.server.KafkaServer)
## ...
```
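7. (Optional) The producer we'll write below relies on Kafka's topic auto-creation, but if you prefer to create the `test` topic explicitly, you can do so from inside the container (a sketch, assuming the Compose file above):
```bash
docker exec kafka kafka-topics --create --topic test --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092
```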
> It's worth noting that the above setup is for development and testing purposes. Running a Kafka server in production requires a lot of configuration and monitoring, so it's recommended to use a managed service such as AWS MSK or the equivalents on GCP and Azure, or the Confluent Platform for more advanced features and management tools.
---
## 3. Simple Python Project
---
**_NOTE:_** Ensure you get the right Kafka version for the steps below. You can get this by using the following command:
```bash
docker exec kafka kafka-topics --version  # with the Docker setup
bin/kafka-topics.sh --version  # if you used the manual setup
```
The output should be something like this:
```bash
7.3.1-ccs (Commit:8628b0341c3c46766f141043367cc0052f75b090)
```
## 3.1. Installing the Python Kafka library (e.g. kafka-python)
Make sure that you have the kafka-python library installed; you can install it via pip:
```bash
pip install kafka-python
```
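You can verify the installation and see which version was installed with a quick one-liner (the package is imported as `kafka` and exposes `kafka.__version__`):
```bash
python -c "import kafka; print(kafka.__version__)"
```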
## 3.2. Producing messages to a Kafka topic
Create a new file named `producer.py` and add the following code:
```python
import os
import time
import random
import json

from kafka import KafkaProducer

KAFKA_BOOTSTRAP_SERVERS = os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "localhost:29092")
KAFKA_TOPIC_TEST = os.environ.get("KAFKA_TOPIC_TEST", "test")
KAFKA_API_VERSION = os.environ.get("KAFKA_API_VERSION", "7.3.1")

producer = KafkaProducer(
    bootstrap_servers=[KAFKA_BOOTSTRAP_SERVERS],
    api_version=KAFKA_API_VERSION,
)

i = 0
while i <= 30:
    # Publish a JSON-encoded message to the topic
    producer.send(
        KAFKA_TOPIC_TEST,
        json.dumps({"message": f"Hello, Kafka! - test {i}"}).encode("utf-8"),
    )
    i += 1
    # Simulate events arriving at irregular intervals
    time.sleep(random.randint(1, 5))

# Block until all buffered messages have been sent to the broker
producer.flush()
```
This code creates a new Kafka producer connected to the Kafka cluster specified by the `bootstrap_servers` parameter. The code then sends a series of "Hello, Kafka!" messages to the topic `test`. We simulate events being published by taking a random break of 1 to 5 seconds on every iteration.
## 3.3. Consuming messages from a Kafka topic
Create a new file named `consumer.py` and add the following code:
```python
import os

from kafka import KafkaConsumer

KAFKA_BOOTSTRAP_SERVERS = os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "localhost:29092")
KAFKA_TOPIC_TEST = os.environ.get("KAFKA_TOPIC_TEST", "test")
KAFKA_API_VERSION = os.environ.get("KAFKA_API_VERSION", "7.3.1")

consumer = KafkaConsumer(
    KAFKA_TOPIC_TEST,
    bootstrap_servers=[KAFKA_BOOTSTRAP_SERVERS],
    api_version=KAFKA_API_VERSION,
    auto_offset_reset="earliest",  # start from the oldest available message
    enable_auto_commit=True,  # periodically commit offsets in the background
)

# Poll indefinitely; each iteration yields one record
for message in consumer:
    print(message.value.decode("utf-8"))
```
This code creates a new Kafka consumer connected to the Kafka cluster specified by the `bootstrap_servers` parameter. The code then subscribes to the topic `test` and continuously polls for new messages. Each message received is printed to the console.
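Each item yielded by the consumer is a `ConsumerRecord`, which carries metadata alongside the payload. Here's a small variation of the print loop (same consumer as above) that surfaces that metadata:
```python
for message in consumer:
    # topic, partition, and offset identify exactly where this record came from
    print(
        f"{message.topic}[{message.partition}] @ offset {message.offset}: "
        f"{message.value.decode('utf-8')}"
    )
```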
With both scripts in place, run them in separate terminals:
1. Run the `producer.py` script to send messages to the Kafka topic:
```bash
python producer.py
```
2. Run the `consumer.py` script to consume the messages from the Kafka topic:
```bash
python consumer.py
```
This will start the consumer, which will continuously poll for new messages from the "test" topic. Each time a message is received, it will be printed to the console.
## 6. Advanced Kafka features
Kafka provides many advanced features that you can use in your application, including:
- Compression: you can compress messages before sending them to Kafka to save bandwidth
- Message keys: you can specify a key for each message, which is used for partitioning the messages (see the sketch after this list)
- Consumer groups: multiple consumers can read from the same topic, enabling parallel processing and load balancing
- Fault tolerance: Kafka is designed to be fault-tolerant and can handle failures of individual nodes without losing messages or affecting performance
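As a taste of these features, here is a sketch that combines a few of them with kafka-python: a producer that gzip-compresses JSON-serialized values and partitions by a message key, plus a consumer that joins a consumer group. The topic name, key, and group ID are made up for illustration; the bootstrap server matches the Docker setup above.
```python
import json

from kafka import KafkaConsumer, KafkaProducer

# Producer: serializers turn Python objects into bytes before sending,
# and compression_type compresses message batches on the wire.
producer = KafkaProducer(
    bootstrap_servers=["localhost:29092"],
    compression_type="gzip",
    key_serializer=lambda k: k.encode("utf-8"),
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

# Messages with the same key always land on the same partition,
# which preserves per-key ordering.
producer.send("user-events", key="user-42", value={"action": "login"})
producer.flush()

# Consumer: all consumers sharing a group_id split the topic's partitions
# among themselves, enabling parallel processing and load balancing.
consumer = KafkaConsumer(
    "user-events",
    bootstrap_servers=["localhost:29092"],
    group_id="user-events-workers",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)
```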
## 7. Error handling and troubleshooting
As with any distributed system, errors can occur while working with Kafka. Some common errors you may encounter include:
- Connection errors: these occur when the producer or consumer is unable to connect to the Kafka cluster.
- Leader not available: this error occurs when the leader broker for a partition is not available.
To troubleshoot these errors, you can check the logs of the Kafka broker and consumer for error messages. The kafka-python library also provides several exception classes that you can catch and handle in your code, such as `KafkaError` and `KafkaTimeoutError`.
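For example, `producer.send()` returns a future; blocking on it with a timeout is one way to surface delivery errors in code. A minimal sketch, assuming the Docker setup above:
```python
from kafka import KafkaProducer
from kafka.errors import KafkaError, KafkaTimeoutError

producer = KafkaProducer(bootstrap_servers=["localhost:29092"])

future = producer.send("test", b"Hello, Kafka!")
try:
    # Block until the broker acknowledges the message (or we time out)
    record_metadata = future.get(timeout=10)
    print(
        f"Delivered to {record_metadata.topic}[{record_metadata.partition}] "
        f"@ offset {record_metadata.offset}"
    )
except KafkaTimeoutError:
    # The broker did not acknowledge the message within the timeout
    print("Timed out waiting for an acknowledgement")
except KafkaError as exc:
    # Any other delivery failure (e.g. leader not available)
    print(f"Failed to deliver message: {exc}")
```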
## 8. Conclusion and resources for further learning
In this tutorial, we've covered the basics of using the kafka-python library to interact with a Kafka cluster. We've shown how to set up a Kafka cluster, produce and consume messages, and use some of the advanced features provided by Kafka.
To continue learning about Kafka, you can check out the official Kafka documentation, as well as the documentation for the kafka-python library. There are also many tutorials and blog posts available online that cover different aspects of using Kafka.
I hope this tutorial has been helpful in getting you started with using Kafka and the Kafka-python library. Let me know if there is anything else that you would like me to include or explain in more detail.
PS: Here's a GitHub link to the final project: [Apache Kafka with Python](https://github.com/Hesbon5600/kafka-python)
## How to set up the project

**Features**
- Python 3.10
- Poetry as the dependency manager

**Project setup**
1. Clone the repository:
```bash
git clone https://github.com/Hesbon5600/kafka-python.git
```
2. `cd` into the directory:
```bash
cd kafka-python
```
3. Create the environment variables. On Unix or macOS, run:
```bash
cp .env.example .env
```
You can edit whatever values you like in there. Note: there is no space next to `=`.
Then, in your terminal, run:
```bash
source .env
```

**Virtual environment**
To create it:
```bash
make env
```
To install the dependencies:
```bash
make install
```