How to Build Event-driven Architecture with Kafka and Python

What is Event-Driven Architecture?

Event-driven architecture is a software design pattern in which the application reacts to events in real time or near real time. It is commonly used alongside microservices. An event is any significant incident or change in state that occurs in the application. For example, in an e-commerce platform, an event could be a user interaction such as adding an item to a cart, checking out, or making a payment.
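
In code, such an event is often represented as a small JSON-style payload. Purely as an illustration (the field names below are hypothetical, not part of any standard):

add_to_cart_event = {
    "event_type": "add_to_cart",   # hypothetical field names, for illustration only
    "user_id": "user-123",
    "item_id": "sku-456",
    "quantity": 1,
    "timestamp": "2024-01-01T12:00:00Z",
}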

An event-driven architecture has three main components: event producers, routers, and consumers. Event producers are the components of a software application that generate events. These events can be triggered by user interactions or data changes. Once an event occurs, the producer sends a message containing the event data to the router. The event router filters the events, transforms them, and sends them to the event consumers that need them.

Benefits of event-driven architecture

1) Decoupling of Producers and Consumers:

Decoupling producers and consumers in event-driven architecture lets services communicate without the producer knowing about the consumer and vice versa. Neither side has to wait for a response from the other, which improves response time.

2) Reduces the possibility of system failure:

In an event-driven architecture, if one service fails, it does not cause the failure of other services in the application. The event router, a central component of event-driven architecture, serves as a buffer and stores events. Whenever a failed service comes back online, the event router delivers the buffered events to it. Because events are routed to the correct service and services are isolated from each other, the risk of system-wide failure is reduced.

3) Flexibility:

Event-driven architecture makes it possible to easily and quickly add new microservices to the application. The new microservices can easily consume the current events. This brings about flexibility and gives room for innovation.

Understanding Kafka

Kafka is an open-source event-streaming platform originally developed at LinkedIn. Thousands of companies use it for data integration, high-performance data pipelines, and real-time data analytics. Kafka can route events in event-driven architectures, with microservices acting as producers and consumers. Popular use cases of Kafka include messaging, activity monitoring, log aggregation, and serving as a durable event store.

Core concepts of Kafka

1) Topic:

Topics in Kafka are categories used to organize messages. Each topic has a name that is unique across the Kafka cluster. Producers send events to topics, and consumers read from topics. Kafka topics make it easy for microservices to send and receive events (see the short topic-creation sketch after this list).

2) Producers:

Producers are applications that send events to the Kafka system. These events are sent to specific topics of their choice.

3) Consumers:

Consumers are applications that use the events sent to the Kafka system. They subscribe to topics of their choice and use the data in them.

4) Broker:

A broker is a single Kafka server that receives events from producers, stores them, and serves them to consumers; a Kafka cluster is made up of one or more brokers.
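
To make these concepts concrete, here is a minimal sketch, assuming a broker is already running on localhost:9092 and using the kafka-python package installed later in this article, that creates the message_test topic used in the project. (You can also create topics with the scripts that ship with Kafka, or rely on automatic topic creation if your broker enables it.)

from kafka.admin import KafkaAdminClient, NewTopic

# Connect to the local broker (assumes Kafka is running on localhost:9092).
admin = KafkaAdminClient(bootstrap_servers='localhost:9092')

# Create a topic named "message_test" with a single partition and one replica.
admin.create_topics([NewTopic(name='message_test', num_partitions=1, replication_factor=1)])
admin.close()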

Setting up the environment

First, you must have Apache Kafka and ZooKeeper installed on your local machine. If you use Ubuntu, here's a tutorial for installing Apache Kafka: https://www.digitalocean.com/community/tutorials/how-to-install-apache-kafka-on-ubuntu-20-04. Similar tutorials exist for other operating systems.

The next step is to install kafka-python using pip

pip install kafka-python

You'll have to start the ZooKeeper server and the Kafka broker before you can run the project you're about to build.

Project code

In this project, you'll create a producer that sends texts from a list of strings to a Kafka broker. You'll then create a consumer that reads those texts and saves them in a MongoDB collection.

One of the advantages of using Kafka is that when a consumer breaks down and you fix it, it continues reading from where it left off. You can also start another consumer in the same consumer group to pick up from where the earlier one stopped. This ensures that every message ends up in the database, with nothing lost.

Let's start building your project; create a new Python file called producer.py. Import the required libraries.

from time import sleep 
from kafka import KafkaProducer 
from json import dumps

Now that you've imported the required libraries, the next step is initializing the Kafka producer. Pay attention to the following parameters:

  1. bootstrap_servers=['localhost:9092']: This sets the host and port used to reach the Kafka broker the producer will connect to. Setting it is optional, since the default is localhost:9092.

  2. value_serializer=lambda x: dumps(x).encode('utf-8'): This function serializes the data before it is sent to the Kafka broker. Here the data is converted to JSON and encoded as UTF-8.

producer = KafkaProducer(bootstrap_servers=['localhost:9092'], value_serializer=lambda x:dumps(x).encode('utf-8'))

Now, you'll create a list of items you want to send to the broker.

messages = ["apple", "banana", "mango", "strawberry", "grapes", "orange", "pineapple", "peach", "kiwi"]

Using a for loop, you'll iterate through the list, sending each text to the broker as a dictionary with 'message' as the key. Note that 'message' here is just the dictionary key of your payload, not the topic and not a Kafka message key. You send data by calling the producer's send method, which takes two arguments: the topic and the data. You'll pause for 5 seconds after each iteration.

for message in messages:
    data = {'message': message}
    # Send the payload to the "message_test" topic, then pause for 5 seconds.
    producer.send('message_test', data)
    sleep(5)
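
As a side note, Kafka messages can also carry a key of their own, which the default partitioner uses to route all messages with the same key to the same partition. That key is separate from the 'message' dictionary key above. Here is an optional variation of the loop, assuming you want each fruit name to double as the message key (the key is passed as bytes because no key_serializer was configured):

for message in messages:
    data = {'message': message}
    # key= routes all messages with the same key to the same partition.
    producer.send('message_test', value=data, key=message.encode('utf-8'))
    sleep(5)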

The next step is to create the consumer. Create another file called consumer.py and import the necessary libraries: loads from json, MongoClient from pymongo, and KafkaConsumer from kafka. You don't have to use PyMongo; you can use any database you're comfortable with.

from pymongo import MongoClient
from kafka import KafkaConsumer
from json import loads

The next step is to create the KafkaConsumer. Pay attention to the following parameters:

  1. Topic: this is the first positional argument; in your case, it is message_test.

  2. bootstrap_servers=['localhost:9092']: same as in the producer.

  3. auto_offset_reset='earliest': This parameter controls where the consumer starts reading when there is no committed offset for its group (for example, the first time the group reads the topic). It can be set to 'earliest' or 'latest'. With 'latest', the consumer starts from the end of the log; with 'earliest', it starts from the beginning. If the group already has a committed offset, the consumer simply resumes from there after a restart.

  4. enable_auto_commit=True: This makes the consumer commit its read offset automatically at a regular interval.

  5. auto_commit_interval_ms=1000: This sets the interval between two commits to 1 second.

  6. group_id='my-group': This is the consumer group the consumer belongs to. A consumer must belong to a group for offsets to be committed automatically.

  7. value_deserializer=lambda x: loads(x.decode('utf-8')): This deserializes the data received from the broker from UTF-8-encoded JSON back into a Python dictionary.

consumer = KafkaConsumer(
    'message_test',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    auto_commit_interval_ms=1000,
    group_id='my-group',
    value_deserializer=lambda x: loads(x.decode('utf-8'))
)

The next step is to connect to MongoDB and get a handle to the message_test collection in the message_test database.

client = MongoClient('localhost:27017')
collection = client.message_test.message_test

The data in the Kafka topic can be read by looping over the consumer. Each message can then be inserted into the MongoDB collection inside the loop.

for message in consumer:
    # message.value is the deserialized payload, e.g. {'message': 'apple'}
    message = message.value
    collection.insert_one(message)
    print(str(message) + " added to " + collection.full_name)

To test the code, run producer.py, then open a new terminal and run consumer.py. You'll see each message in the list printed as it is consumed and stored.

Press Ctrl + C to interrupt the consumer, note the last message, and execute the consumer again. You'll notice that the consumer will pick up all the missed messages and then continue listening for new ones.

Note that if you interrupt the consumer within one second of it reading a message, that message may be delivered again when you restart. This is because auto_commit_interval_ms is set to 1 second, so the offset for that message may not have been committed yet.
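
If you want tighter control than the auto-commit timer gives you, one option (a variation on the consumer above, not part of the original setup) is to disable auto-commit and commit the offset manually, only after the document has been written to MongoDB. That way a message is never marked as read before it is safely stored. A minimal sketch:

consumer = KafkaConsumer(
    'message_test',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=False,  # no automatic commits; we commit explicitly below
    group_id='my-group',
    value_deserializer=lambda x: loads(x.decode('utf-8'))
)

for message in consumer:
    collection.insert_one(message.value)
    consumer.commit()  # mark the message as processed only after it is stored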

Building an event-driven architecture with Kafka and Python is an effective way to create a highly scalable and reliable platform for streaming and storing data. Developers can leverage Kafka and Python to build applications capable of handling large amounts of data in real time, keeping their applications scalable and efficient.
