Introduction To How Kafka Works And Implementation Using Python-client

OUTLINE

i. A brief introduction to why we might consider Kafka for our business.

ii. What Kafka is.

iii. A brief explanation of why we use Kafka.

iv. Why Kafka is so fast.

v. A brief mention of companies using Kafka.

vi. How to get started with Kafka: installation, the components of Kafka, and what they're responsible for.

vii. A walk-through tutorial on Kafka.

viii. Conclusion


Set up a Python client for Kafka with kafka-python

Real-time data usage has become the order of the day, both for businesses and their customers. One of the key factors to consider, however, is how a business use case produces and consumes its data in real time: does the use case write more data than it reads, read more than it writes, or do plenty of both, and does it need to take actionable steps in real time in an event-driven way? This is where Apache Kafka comes in. In this tutorial we will go over what Kafka is, its core concepts, who is using it, how to set it up, and how to use it with a Python client (kafka-python).

What is Apache Kafka?

Kafka is a distributed event streaming and messaging system consisting of servers and clients that communicate over a high-performance TCP network protocol.

PS: Kafka was developed at LinkedIn but is now managed under the Apache Software Foundation, hence the name Apache Kafka. I will be referring to Apache Kafka as Kafka throughout this tutorial.

Event Streaming

Event streaming is the capturing, processing and transforming of data in real time from various event sources, e.g. website clicks, databases, logging systems, IoT devices, etc., while ensuring a continuous flow and routing of the stream to the various destinations that expect the data.

Why Kafka?

Kafka is used in real-time event streaming architectures to provide real-time data analytics. Messages are stored on disk and replicated within the cluster, which makes them more durable and reliable and allows multiple subscribers to consume them.

Kafka is able to continuously stream events using the publish-subscribe (pub-sub) model: events can be read (subscribed to) as soon as they are written (published), processed, or stored for later use, and Kafka gives you the flexibility to decide how long to retain (store) the data.

Why Is Kafka so fast?

Kafka is fast for a number of reasons; we will highlight some of these reasons below.

  1. Zero-copy - It relies heavily on the zero-copy principle, i.e. it interacts directly with the OS kernel to move data.
  2. Batching - It allows batching of data in chunks, which enables efficient data compression and thereby reduces I/O latency (see the sketch after this list).
  3. Horizontal scaling - Kafka allows for horizontal scaling: a topic can have many partitions (even thousands) spread across thousands of machines, on premises or in the cloud, which makes it capable of handling very high loads.
  4. Avoidance of RAM - Kafka writes to an immutable commit log on disk sequentially, thereby avoiding slow disk seeking.
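For example, batching and compression are exposed directly as producer settings in kafka-python. Here is a minimal sketch; the broker address and tuning values are placeholder assumptions for illustration, not recommendations:

from kafka import KafkaProducer

# A producer that batches records and compresses each batch.
# linger_ms waits up to 20 ms to fill a batch; batch_size caps a batch at 32 KB.
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],  # assumed local broker
    compression_type='gzip',               # gzip-compress each batch
    linger_ms=20,
    batch_size=32 * 1024,
)

producer.send('quickstart-events', b'a small test message')
producer.flush()  # block until buffered records are actually sent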

What Problem does Kafka Solve?

With the rise of innovation in various aspects of life, from the Internet of Things (IoT), self-driving cars, artificial intelligence, blockchain solutions and robotics to many more, the rate of data generation is growing exponentially, and it's not slowing down anytime soon. For businesses to innovate, understand their customers better and provide better services, the traditional way of building software needs to be enhanced to handle the inflow of these huge and growing datasets from data sources like those mentioned above. With Kafka, all the components of a system can communicate in an event-driven way: an event in one part of the system is translated into an action in another part, and the beauty of it is that this happens in real time.

What Companies Use Kafka?

Thousands of companies use Kafka in production, including Fortune 500 companies such as Microsoft, Netflix, Goldman Sachs, Target, Cisco, Intuit, Box, Pinterest, The New York Times and many more.

Getting Started With Kafka.

Kafka involves communication between servers and clients.

Servers: Kafka runs as a cluster of one or more servers, which can be located in one or multiple data centers, on premises or in the cloud.

Clients: Kafka clients allow us to write distributed systems and applications that read, write and process streams of events in a fault-tolerant way, even in the case of network or machine failure. Clients are available as REST APIs and in various programming languages including Java, Scala, Go, Python, C/C++ and many others. In this tutorial we will focus on the Python client.

There are several clients we can use to communicate with Kafka:

  1. Command line

  2. confluent-kafka

  3. kafka-python (what we will be using)

Installation:

STEP 1:

Download Kafka from here

Run tar -xzf kafka_2.13-2.7.0.tgz

Run cd kafka_2.13-2.7.0

STEP 2:

NOTE: Your local environment must have Java 8+ installed.

Open a terminal and run this command:

Run bin/zookeeper-server-start.sh config/zookeeper.properties

Open another terminal and run this command

Run bin/kafka-server-start.sh config/server.properties

STEP 3:

Creating a topic to store events

Run this command on another terminal

bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092

Run this command to see the topic

bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092

This should return something like:

Topic:quickstart-events  PartitionCount:1    ReplicationFactor:1 Configs:

Topic: quickstart-events Partition: 0    Leader: 0   Replicas: 0 Isr: 0
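If you'd rather create a topic from Python instead of the shell scripts, kafka-python also ships an admin client. Here is a minimal sketch, assuming the local broker from the previous steps, that the topic does not already exist, and an illustrative seven-day retention:

from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers='localhost:9092')

# one partition, one replica, and an (illustrative) 7-day retention
topic = NewTopic(
    name='quickstart-events',
    num_partitions=1,
    replication_factor=1,
    topic_configs={'retention.ms': str(7 * 24 * 60 * 60 * 1000)},
)

admin.create_topics(new_topics=[topic])
admin.close()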

STEP 4:

Run this on your terminal to write an event to a topic

bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092

STEP 5:

Run this on your terminal to read events from the topic

bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092

ZooKeeper is a consistent file system for configuration information which Kafka uses to manage and coordinate clusters/brokers, including leadership election for broker topic partitions.

Kafka broker: Kafka clusters are made up of multiple brokers, each with a unique id. Each broker hosts topic log partitions, and connecting to one broker (the bootstrap broker) connects a client to the entire cluster.
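You can see this bootstrap behaviour from Python by pointing a client at a single broker and asking it what it knows about the cluster. A small sketch, assuming the local single-broker setup from the steps above:

from kafka import KafkaConsumer

# Connect via one bootstrap broker; the client discovers the rest of the cluster from it.
consumer = KafkaConsumer(bootstrap_servers='localhost:9092')

print(consumer.topics())  # set of topic names known to the cluster, e.g. {'quickstart-events'}
consumer.close()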

With the steps highlighted above, we now have a running instance of Kafka on our machine. Before we continue, let's get familiar with how Kafka works and its core components.

Kafka Concepts


Events: An event signifies that something has happened, i.e. data was generated in a part of the system that we are interested in, so a record/message is written to a designated topic. Every event is recorded with a key, a value and a timestamp.

Topics: A Kafka topic is partitioned into buckets that can be spread over a number of data centers and regions to ensure fault tolerance. Within a partition, events are stored in the order they are written, with newly arriving events appended to the existing ones, and partitions are replicated across brokers. Note that each topic is identified by a topic name.

Producers: Client applications, written with any of the available Kafka clients, whose sole job is to write (publish) events, i.e. messages/records, to their designated topic, which is identified by a topic name. They are written to be agnostic of the consumer, i.e. the producer is not aware of the consumer applications; it does one job and does it well: writing events to the topic.

Consumers: Client applications that consume events, i.e. messages/records, from a specific topic in the order they arrived.

USING KAFKA-PYTHON

For this tutorial, it's assumed that you are familiar with the Python programming language and Python virtual environments. We will be using pipenv as our virtual environment tool, and we will be using an open-source Kafka Python client called kafka-python (GitHub).

We set up our virtual environment with pipenv by running pipenv shell, and we install kafka-python with pip install kafka-python.

Before we proceed, we need to briefly look at some key terms when working with the kafka-python client.

KafkaProducer

KafkaProducer is the client responsible for publishing records to a Kafka cluster. It does this through the send method, which is asynchronous: when called, it adds the record to a buffer of pending records and returns immediately. The producer will also automatically retry failed requests unless it is configured otherwise (retries is one of the configs that can be set).

Let's create a KafkaProducer

import logging

from kafka import KafkaProducer
from kafka.errors import KafkaError

log = logging.getLogger(__name__)

producer = KafkaProducer(bootstrap_servers=['broker1:1234'], retries=5)

# send() is asynchronous: it returns a future immediately
future = producer.send('order-topic', b'item_name=Nike Air|item_id=1543|price=23000')

try:
    # block until the broker responds (or the 10 s timeout expires)
    record_metadata = future.get(timeout=10)
except KafkaError:
    # handle the exception appropriately
    log.exception("failed to send record to order-topic")

Let's do a quick walk-through of what is going on in the above code snippet.

KafkaProducer is the class used by kafka-python, the Python client, to instantiate a connection to a Kafka cluster.

bootstrap_servers is a list of host[:port] strings that the producer should contact to bootstrap initial cluster metadata.

We then send a record from the producer by calling the send method, which takes the topic name as a str (in this case order-topic), the message value, and some other optional arguments such as key, partition and timestamp_ms.

Calling future.get(timeout=10) makes the flow synchronous: it blocks until the broker acknowledges the record or the timeout expires. There could be errors, for example if the topic was not found, in which case the kafka-python client raises a KafkaError exception, which we can catch and handle appropriately.
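If you would rather keep the send fully asynchronous, the future also accepts callbacks instead of blocking on get. A small sketch, reusing the producer and log objects from the snippet above:

def on_send_success(record_metadata):
    # called with the record metadata once the broker acknowledges the write
    print(record_metadata.topic, record_metadata.partition, record_metadata.offset)

def on_send_error(exc):
    # called with the exception if the send ultimately fails
    log.error("failed to send record: %s", exc)

producer.send('order-topic', b'item_name=Nike Air|item_id=1543|price=23000') \
        .add_callback(on_send_success) \
        .add_errback(on_send_error)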

We could also send serialized records, for example msgpack-encoded records using msgpack, or JSON messages using a JSON serializer. Here is what that would look like:

import json
import msgpack

from kafka import KafkaProducer

# produce msgpack-encoded messages
producer = KafkaProducer(value_serializer=msgpack.dumps)
producer.send('order-topic', {'item_name': 'Nike Air', 'item_id': 1543, 'price': 23000})

# produce json-encoded messages
producer = KafkaProducer(value_serializer=lambda m: json.dumps(m).encode('ascii'))
producer.send('order-topic', {'item_name': 'Nike AirForce', 'item_id': 1583, 'price': 28500})

PS: There are more configs that can be set on the KafkaProducer; see the documentation for the full list.
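As one illustration, serializers and delivery guarantees are also plain constructor configs. The values below are assumptions chosen for the example, not recommendations:

import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],                     # assumed local broker
    acks='all',                                               # wait for all in-sync replicas to acknowledge
    retries=5,                                                # retry transient failures
    key_serializer=str.encode,                                # str keys -> bytes
    value_serializer=lambda v: json.dumps(v).encode('utf-8'), # dict values -> JSON bytes
)

producer.send('order-topic', key='1543', value={'item_name': 'Nike Air', 'price': 23000})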

KafkaConsumer

The consumer subscribes to (reads) records from a Kafka cluster. It transparently handles the failure of servers in the Kafka cluster and adapts as topic partitions are created or migrate between brokers.

Let's create a KafkaConsumer

from kafka import KafkaConsumer

# To consume latest messages and auto-commit offsets
consumer = KafkaConsumer('order-topic', group_id='sample-group', bootstrap_servers=['localhost:9092'])

for message in consumer:
    # message value and key are raw bytes -- decode if necessary!
    # e.g., for unicode: `message.value.decode('utf-8')`
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                         message.offset, message.key, message.value))

Let's walk through what's going on in the consumer code snippet.

The KafkaConsumer constructor takes, among others, the following arguments:

bootstrap_servers – ‘host[:port]’ string (or list of ‘host[:port]’ strings) that the consumer should contact to bootstrap initial cluster metadata.

group_id – the name of the consumer group the consumer joins dynamically (if partition assignment is enabled), which is used for fetching and committing offsets.

value_deserializer (callable) – any callable that takes a raw message value and returns a deserialized value.

Various approaches to consuming records from a topic:

# consume earliest available messages, don't commit offsets
KafkaConsumer(auto_offset_reset='earliest', enable_auto_commit=False)

# consume json messages
KafkaConsumer(value_deserializer=lambda m: json.loads(m.decode('ascii')))

# consume msgpack messages
KafkaConsumer(value_deserializer=msgpack.unpackb)

# StopIteration if no message after 1 second
KafkaConsumer(consumer_timeout_ms=1000)
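Putting the pieces together, here is a small sketch of a consumer reading the JSON orders produced earlier; the group id is just an example name, and the local broker is assumed:

import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'order-topic',
    bootstrap_servers=['localhost:9092'],
    group_id='order-processors',                                # example group name
    auto_offset_reset='earliest',                               # start from the beginning if no committed offset
    value_deserializer=lambda m: json.loads(m.decode('utf-8')), # JSON bytes -> dict
)

for message in consumer:
    order = message.value  # already a dict thanks to the deserializer
    print(order['item_name'], order.get('price'))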

Conclusion

Phew! If you've come this far, I say thank you. We've only scratched the surface of what we can do with Kafka; there are many more things that can be achieved by extending the arguments to both the KafkaProducer and the KafkaConsumer, from authentication using SSL and setting SSL certificates to adding new topics dynamically. We can explore more configs in the kafka-python documentation.
