One of the best ways to learn a new technology is to try it in an assisted environment that anybody can replicate and get working within a few minutes. Notebooks excel at this: they let people share and use pre-built content that combines written descriptions, media and executable code in a single page.
This blog post aims to teach you the basics of Apache Kafka Producers and Consumers by building an interactive notebook in Python. If you want to browse a full ready-made solution instead, check out our dedicated GitHub repository.
Language support and multi-window GUI: the case for JupyterLab
One of the main actors in the notebook space is the Jupyter project. With JupyterLab it provides a solid web interface in which to create and distribute notebooks written in a variety of languages (each language is supported through a kernel, in Jupyter terms).
JupyterLab can be started as a Docker container. Using Docker allows us to focus on learning without having to deal with software installation and configuration: a big win for productivity. We start by creating a folder on our computer named kafka-jupyter and navigating into it. If you prefer the terminal over a GUI, you can achieve the same by issuing the following two commands:
mkdir -p kafka-jupyter
cd kafka-jupyter
Now we can start the Docker container with the following command:
docker run \
--rm -p 8888:8888 \
-e JUPYTER_ENABLE_LAB=yes \
-v "$PWD":/home/jovyan/work \
jupyter/datascience-notebook
The above command creates a Jupyter Docker container with JupyterLab enabled and mounts the existing kafka-jupyter folder into it as /home/jovyan/work. This step is required to share files between the host computer and the container. When the command is executed, we see a message like this:
To access the server, open this file in a browser:
file:///home/jovyan/.local/share/jupyter/runtime/jpserver-9-open.html
Or copy and paste one of these URLs:
http://639a69244ab1:8888/lab?token=5031e1652236a8050ea2a9213df7c6ade24a790d3710b239
http://127.0.0.1:8888/lab?token=5031e1652236a8050ea2a9213df7c6ade24a790d3710b239
Now JupyterLab is accessible at http://127.0.0.1:8888/?token=<token>, where <token> is the one shown in the above message.
One of JupyterLab's beauties is that it provides an easy GUI-driven method to configure and arrange the user interface. This makes it a great fit not only for sequential step-by-step tutorials but also for more complex, branched examples where multiple code sections have to run in parallel... can you spot where I am going?
Why Kafka on a Notebook?
Apache Kafka is a streaming technology. This means that to understand its beauty you need to have data flowing from Point A (aka the Producer) to Point B (aka the Consumer). Kafka step-by-step tutorials can become complex to follow, since they usually require continuously switching focus between various applications or windows. JupyterLab, with its great language support and multi-window layout, is the perfect way to dig into the basics of Kafka, with the text descriptions, media and executable code available in a single web UI. This way you can focus on the technology concepts rather than on your local setup.
Now, to have a basic playground, let's create a Kafka instance with the Aiven Console. If you haven't done it already, sign up for an Aiven account and redeem the free credit to start your trial.
To create a Kafka service, select a cloud provider, the region where you want to deploy the service, and the plan, which determines the amount of resources available for your cluster. Finally, set the service name; in this example we'll refer to an instance named kafka-notebook, but you can choose any name you wish.
(Tip! A video is available allowing you to review the whole service creation process.)
While we wait for the service to be ready, let's click on it to check its details. On the Overview tab we can find the Host and Port information we'll later use to connect to the cluster. While we're here, we can download into our local kafka-jupyter folder the three SSL certificates required to authenticate to Kafka (Access Key, Access Certificate and CA Certificate).
One last change is required: we need to scroll down the Overview tab to the Advanced configuration section and enable the kafka.auto_create_topics_enable parameter, which will allow us to produce messages to Kafka without needing to create a topic beforehand.
Now, it's time for an espresso while we wait a couple of minutes until all the Nodes lights become green 🟢🟢🟢, meaning that our kafka-notebook Kafka instance is running and ready to be used.
Producing the first message
If we now check the JupyterLab Web UI at http://127.0.0.1:8888/, we should see something like this:
On the top left we can spot the work folder. By double-clicking on it we can see that it contains the three certificates (ca.pem, service.cert, service.key) downloaded earlier into our host kafka-jupyter folder. It's now time to create a Kafka producer by selecting the Python 3 icon under the Notebook section of the main page. Stay inside the work folder when doing so, since the code that follows refers to the certificates with relative paths. A notebook will open with a first empty cell that we can use to install the Python library needed to connect to Kafka. Copy the following into the cell and run it:
%%bash
pip install kafka-python
Even though we are creating a Python notebook, the %%bash prefix allows us to execute bash commands. This cell installs kafka-python, the main Python client for Apache Kafka.
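Before connecting, it can help to confirm that the three certificate files are visible from the notebook. The following is a minimal sketch using only the standard library; the helper name missing_certificates is hypothetical, and the file names are the ones used in this post.

```python
from pathlib import Path

# File names used in this post for the Aiven SSL credentials.
REQUIRED_FILES = ("ca.pem", "service.cert", "service.key")


def missing_certificates(folder="."):
    """Return the required certificate files that are absent from `folder`."""
    base = Path(folder)
    return [name for name in REQUIRED_FILES if not (base / name).is_file()]


# Check the folder the notebook is running in.
missing = missing_certificates(".")
if missing:
    print("Missing certificate files:", ", ".join(missing))
else:
    print("All three certificate files found.")
```

If any file is reported as missing, the notebook is probably not located in the work folder, or the download step was skipped.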
Now we're all set to produce our first record to Kafka.
A new empty code block should already be there; if not, let's click on the + icon at the top of our notebook. In this new code section we'll speak Pythonese and create an instance of KafkaProducer. Copy and paste the following code into the block, replacing the <host> and <port> parameters with the ones taken from Aiven's console:
from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers='<host>:<port>',
    security_protocol="SSL",
    ssl_cafile="./ca.pem",
    ssl_certfile="./service.cert",
    ssl_keyfile="./service.key",
    value_serializer=lambda v: json.dumps(v).encode('ascii')
)
The code creates a producer, pointing to Kafka via the bootstrap_servers parameter and authenticating over SSL with the three certificates. The value_serializer transforms our JSON message value into a byte array, the format Kafka expects.
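To see what the serializer does without touching Kafka, here is a small standalone sketch. The function names serialize and deserialize are introduced only for illustration; they wrap the same json and encode/decode calls used in this post. Note that json.dumps escapes non-ASCII characters by default, which is why encoding to ASCII does not fail on emoji.

```python
import json


def serialize(value):
    # Same logic as the value_serializer lambda passed to KafkaProducer.
    return json.dumps(value).encode("ascii")


def deserialize(raw):
    # Reverse transformation: bytes -> str -> Python object.
    return json.loads(raw.decode("ascii"))


booking = {"name": "Giuseppe Rossi", "details": "Best room 😀"}
payload = serialize(booking)

print(type(payload).__name__)           # bytes
print(deserialize(payload) == booking)  # True
```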
Now let's produce our first message. Since it's time to think about summer holidays, we'll create a hotel booking message by pasting the following into a new code block and executing it:
producer.send(
    'hotel-booking-request',
    value={
        "name": "Giuseppe Rossi",
        "hotel": "Luxury Hotel",
        "dateFrom": "25-06-2021",
        "dateTo": "07-07-2021",
        "details": "I want the best room 😀😀😀😀😀!!!!"
    }
)
producer.flush()
The above code adds Giuseppe's booking for Luxury Hotel to a buffer of pending records, which will be sent to a topic named hotel-booking-request. With the flush() method we make sure the record is actually sent to Kafka. Let's save our producer notebook as Producer.ipynb.
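As a side note, in kafka-python the send() call returns a future, and calling get() on it blocks until the broker acknowledges the record (or raises if the send failed). The sketch below wraps that pattern in a hypothetical helper named send_and_confirm; it assumes the object passed in behaves like the KafkaProducer created earlier.

```python
def send_and_confirm(producer, topic, value, timeout=10):
    """Send one record and block until the broker acknowledges it.

    `producer` is expected to behave like kafka-python's KafkaProducer:
    send() returns a future whose get() yields the record metadata.
    """
    future = producer.send(topic, value=value)
    metadata = future.get(timeout=timeout)  # raises if the send failed
    # Report where the record landed as "topic:partition:offset".
    return "%s:%d:%d" % (metadata.topic, metadata.partition, metadata.offset)
```

Calling send_and_confirm(producer, 'hotel-booking-request', {...}) in the Producer notebook would return the topic, partition and offset assigned to the record, which is one way to confirm delivery from the producer side.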
Happy times! Our first message has gone to Kafka. How can we be sure? Well... let's create a Consumer.
Consuming the Message(s)
We now create a new Python notebook to host our Consumer code. In general, it's a good idea to create separate notebooks for producer and consumer, since they solve two different problems and are usually placed in different sections of the containing application. It also enables us to keep the related code separate and to focus on only one block at a time.
Nevertheless, JupyterLab allows us to view the Consumer alongside the Producer. To do so, we can drag and drop the newly created notebook next to the Producer one, as shown in the image below.
It's time now to create a KafkaConsumer by pasting the following code into the first code block of our new notebook and, after amending the <host>:<port> section as done in the producer, executing it.
from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    bootstrap_servers='<host>:<port>',
    security_protocol="SSL",
    ssl_cafile="./ca.pem",
    ssl_certfile="./service.cert",
    ssl_keyfile="./service.key",
    value_deserializer=lambda v: json.loads(v.decode('ascii')),
    auto_offset_reset='earliest'
)
The consumer is ready, pointing to our Kafka cluster and using a deserialization function that takes the bytes from the message value and transforms them into a JSON structure, the opposite of the transformation done during the production phase.
By default, a consumer starts reading a Kafka topic from the point in time it attaches to the cluster; previous messages are not read. We change this behaviour with the auto_offset_reset='earliest' parameter, which allows us to read from the beginning of the topic.
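The effect of this setting can be pictured with a toy model. The function below is not kafka-python code: it is a simplified illustration, with the hypothetical name starting_offset, of where a consumer begins reading a single partition. It assumes the partition log still starts at offset 0 and that the reset policy only applies when no position has been stored for the consumer.

```python
def starting_offset(log_end_offset, committed_offset=None, auto_offset_reset="latest"):
    """Toy model of where a consumer begins reading a single partition."""
    if committed_offset is not None:
        return committed_offset      # a stored position always wins
    if auto_offset_reset == "earliest":
        return 0                     # assumes the log still starts at offset 0
    if auto_offset_reset == "latest":
        return log_end_offset        # only records produced from now on
    raise ValueError("unsupported auto_offset_reset: %r" % auto_offset_reset)


# A partition that already holds 5 records (offsets 0 to 4).
print(starting_offset(5, auto_offset_reset="latest"))    # 5
print(starting_offset(5, auto_offset_reset="earliest"))  # 0
```

With 'earliest', the message produced before the consumer existed is still delivered, which is the behaviour relied on in this post.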
We are now ready to subscribe to the hotel-booking-request topic and start reading from it with the following code:
consumer.subscribe(topics=['hotel-booking-request'])
# Iterating over the consumer blocks and yields records as they arrive.
for message in consumer:
    print("%d:%d: v=%s" % (message.partition,
                           message.offset,
                           message.value))
The consumer loop never ends: this is intentional, since we always want to consume messages as soon as they're available in the Kafka topic, and there is no "end time" in the streaming world. We should also see the first message appearing in the output of our consumer cell:
0:0: v={'name': 'Giuseppe Rossi',
'hotel': 'Luxury Hotel',
'dateFrom': '25-06-2021',
'dateTo': '07-07-2021',
'details': 'I want the best room 😀😀😀😀😀!!!!'}
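For quick experiments it can be handy to read only a handful of records instead of looping forever. The sketch below uses two hypothetical helpers, format_record and read_some, and relies only on the consumer being iterable, as it is in the loop above. With a real KafkaConsumer the call still blocks until enough messages arrive, unless the consumer was created with kafka-python's consumer_timeout_ms option, which ends the iteration after that many milliseconds without new messages.

```python
from itertools import islice


def format_record(message):
    # Same "partition:offset: v=value" layout as the consumer loop in this post.
    return "%d:%d: v=%s" % (message.partition, message.offset, message.value)


def read_some(consumer, limit):
    """Return at most `limit` formatted records from an iterable consumer."""
    return [format_record(message) for message in islice(consumer, limit)]
```

In the Consumer notebook, read_some(consumer, 1) would return a list holding the formatted first booking.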
Now let's go back to the Producer notebook and produce a holiday booking for Carlo Bianchi, this time also setting a message key, by pasting the following code into a new code block:
producer.send(
    'hotel-booking-request',
    key=b'Average Hotel',
    value={
        "name": "Carlo Bianchi",
        "hotel": "Average Hotel",
        "dateFrom": "12-07-2021",
        "dateTo": "23-07-2021",
        "details": "Room next to the highway 🚗🚗🚗🚗"
    }
)
producer.flush()
After executing it, we should immediately see the new message on the consumer side:
0:1: v={'name': 'Carlo Bianchi',
'hotel': 'Average Hotel',
'dateFrom': '12-07-2021',
'dateTo': '23-07-2021',
'details': 'Room next to the highway 🚗🚗🚗🚗'}
If you're wondering what the 0:1 prefix is, check out the consumer code. The two numbers are the topic partition and the offset, meaning that we are reading the second message (offsets start at 0) from partition 0 of the topic. Our Producer/Consumer pipeline is working. Step 1 done, congrats!
Want more?
Notebooks are an awesome way to learn new concepts and technologies, encapsulating text explanations, media and executable code in a single artefact. JupyterLab, with its huge language support and multi-window layout, is the perfect playground even for technologies like Apache Kafka that are composed of multiple pieces working simultaneously.
If this first notebook tickled your Kafka appetite then check out our pizza-based Kafka Python notebook providing examples of further Kafka concepts like Partitioning, Consumer Groups and Kafka Connect. Please try the whole set of notebooks and let us know what else you would like to be included in it.
This blog post provides a first, very basic Apache Kafka Producer/Consumer setup. If you want to understand and test more, here are a few additional resources:
- Aiven's console to create Apache Kafka and other open source data platform instances
- Project Jupyter where you can find information about JupyterLab
- kafka-python the Python library used to interact with Kafka