I’ve been using Kafka for the past few years and while I managed to get it to work there was something I always wondered about:
How the hell Kafka works?
I published this article years ago describing my experience with Kafka when I first used it. It's a high-level overview, more focused on architecture decisions and the experience of migrating to it. Now I want to talk about how Kafka works, and how it stores and retrieves data.
The idea is that after you read this article, you will be able to understand the picture below a bit better.
Preparing the environment
Let's start by cloning the confluentinc/cp-all-in-one GitHub repository.
git clone git@github.com:confluentinc/cp-all-in-one.git
Once cloned, navigate to the cp-all-in-one/cp-all-in-one
directory and then run the docker; it should take a while.
docker-compose up -d
When this is done, let's check if everything is running ok by running the following command on cp-all-in-one/cp-all-in-one
as well:
docker-compose ps
The resut should be similar to this:
Name Command State Ports
---------------------------------------------------------------------------------------------------------
broker /etc/confluent/docker/run Up 0.0.0.0:9092->9092/tcp, 0.0.0.0:9101->9101/tcp
connect /etc/confluent/docker/run Up 0.0.0.0:8083->8083/tcp, 9092/tcp
control-center /etc/confluent/docker/run Up 0.0.0.0:9021->9021/tcp
ksql-datagen bash -c echo Waiting for K ... Up
ksqldb-cli /bin/sh Up
ksqldb-server /etc/confluent/docker/run Up 0.0.0.0:8088->8088/tcp
rest-proxy /etc/confluent/docker/run Up 0.0.0.0:8082->8082/tcp
schema-registry /etc/confluent/docker/run Up 0.0.0.0:8081->8081/tcp
zookeeper /etc/confluent/docker/run Up 0.0.0.0:2181->2181/tcp, 2888/tcp, 3888/tcp
Now let's start working.
First we'll need to create a topic, while we could create a topic using the UI at http://localhost:9021, but I feel it is best to use the command lines so we can learn a bit more how it works.
Run the following command to create a topic named transactions with 3 partitions.
docker exec broker kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 3 --topic transactions
You should see the following result: Created topic transactions
Now, let's check our topic!
docker exec -t broker kafka-topics --bootstrap-server localhost:9092 --list | grep transactions
You should see the transactions
topic as the response from the command.
Now, let's delve into how a topic works!
Topic
Let's understand what is a topic and how it's created. To begin with, let's check inside the broker container the transactions topic. You can either jump to the cointainer shell cp-all-in-one/cp-all-in-one
, go to /var/lib/kafka/data/
and run ls transactions-*
, or you can run:
docker exec -ti broker sh -c "(cd /var/lib/kafka/data; ls transactions-*)"
Regardless, the result should be similar to the following:
transactions-0:
00000000000000000000.index 00000000000000000000.timeindex partition.metadata
00000000000000000000.log leader-epoch-checkpoint
transactions-1:
00000000000000000000.index 00000000000000000000.timeindex partition.metadata
00000000000000000000.log leader-epoch-checkpoint
transactions-2:
00000000000000000000.index 00000000000000000000.timeindex partition.metadata
00000000000000000000.log leader-epoch-checkpoint
As we can see we have three directories named transactions-* these are the tree partitions of the transactions topic that we defined when creating it. If we check the size of the files by executing ls -lh transactions-0
we should get the following:
total 4.0K
-rw-r--r-- 1 appuser appuser 10M Oct 27 20:22 00000000000000000000.index
-rw-r--r-- 1 appuser appuser 0 Oct 27 20:22 00000000000000000000.log
-rw-r--r-- 1 appuser appuser 10M Oct 27 20:22 00000000000000000000.timeindex
-rw-r--r-- 1 appuser appuser 0 Oct 27 20:22 leader-epoch-checkpoint
-rw-r--r-- 1 appuser appuser 43 Oct 27 20:22 partition.metadata
Kafka's information are stored in log files, which are divided as:
00000000000000000000.index: This file is where we can find offsets and the position of that event on the *.log file. While yes, we could search a specific event on the *.log file, due its nature of storing events, it will get larger and larger to the point where finding an event there could take a long time; whereas the .index file contains exclusive the offset and the position of the message, meaning it is faster to find the offset we're looking for.
00000000000000000000.log: This file is where every event is located, in our case this is where we'll find our transactions
00000000000000000000.timeindex: This file is similar to *.index the difference being it's used to find events by the timestamp.
This sequence of numbers 00000000000000000000 in index, log and timeindex files, is the segment. The segment is - to put it simply - a file that we're using to store events. Kafka divides the events into multiple files, each of which is referred to as a segment. Each segment has a default maximum value of 1 GB, meaning that if we reach 1 GB of logs new files will be created for that partition.
Now, let's observe the creation of new log files. We won't generate 1GB of data worth of messages because I don't want this small project to consume too much space on your machine. Instead we shall reduce the segment size so any message will produce another log file.
The following command will reduce the segment file from 1GB to 100 bytes.
docker exec -it broker kafka-configs --bootstrap-server localhost:9092 --entity-type topics --entity-name transactions --alter --add-config segment.bytes=100
Now lets run the following commands, it will post a few messages to kafka and we will be able to see new log files being created.
docker exec -it broker kafka-console-producer --broker-list localhost:9092 --topic transactions
You will be presented with a prompt, write the following messages on it.
New message
New message 1
New message 2
New message 3
New message 4
New message 5
After that if we run the command were we check the logs, we should be presented with this:
transactions-0:
00000000000000000000.index 00000000000000000003.index
00000000000000000000.log 00000000000000000003.log
00000000000000000000.timeindex 00000000000000000003.snapshot
00000000000000000001.index 00000000000000000003.timeindex
00000000000000000001.log 00000000000000000004.index
00000000000000000001.snapshot 00000000000000000004.log
00000000000000000001.timeindex 00000000000000000004.snapshot
00000000000000000002.index 00000000000000000004.timeindex
00000000000000000002.log leader-epoch-checkpoint
00000000000000000002.snapshot partition.metadata
00000000000000000002.timeindex
transactions-1:
00000000000000000000.index 00000000000000000000.timeindex partition.metadata
00000000000000000000.log leader-epoch-checkpoint
transactions-2:
00000000000000000000.index 00000000000000000000.timeindex partition.metadata
00000000000000000000.log leader-epoch-checkpoint
Notice that when you sent the message, the broker might select another partition to write your message, meaning that you could have either of the 3 partitions updated.
Now you can toy reading the logs, you will see that each one of the logs will have a different message!
docker exec -ti broker sh -c "(cd /var/lib/kafka/data/transactions-0; cat 00000000000000000000.log)"
Segment
Segments are the log files, they are the physical file that contains the sequence of logs on each partition. Each of the 00000000000000000000.log
files are a segment, meaning that when the segment reached the 100 bytes limit, kafka created another segment file named 00000000000000000001.log
. When dealing with multiple Kafka brokers we can have the segments replicated across them to ensure fault tolerance and high availability, but you wont see the same segment file wrote twice in the same broker.
We are talking alot about broker and also, did you noticed we had to use our command to interact with the topic against the broker? So what is it?
Broker
It is the broker responsability to storage and manage the kafka topics. It receives and ensures it will be storage in the required topic, it also is responsible for producing the messages that the consumers will read.
When combined, the brokers form a cluster, and they work together in various ways. The brokers elect a leader for each partition of a topic which is responsible for handling read and write requests. This means that each broker can be the leader of different topic partitions, so you could have two brokers being leaders of the same topic, each for a different partition. As the leader of a partition it is the topic responsability to produce both write and read events, since each broker is responsible for different partitions you will have fast and reliable information provided from different brokers. The replication process is handled by the followers, then the broker leader write an event on their partition, all the followers will copy the event on their replication.
Thanks to the fact the partitions are replicated across the brokers, when the leader of a partition stops working for any reason, the brokers elect another broker to be the leader of a partition and it starts to handle all write and read events. The other brokers will keep their role as followers, but will start to replicate information from the new leader.
We can observe this process in action by listening to the broker logs docker logs -f broker
and then executing the following.
docker exec -it broker kafka-console-consumer --bootstrap-server localhost:9092 --topic transactions --from-beginning
You should see something in the likes of:
[2023-10-28 17:48:16,014] INFO [GroupCoordinator 1]: Dynamic member with unknown member id joins group console-consumer-70904 in Empty state. Created a new member id console-consumer-9928193a-cb91-4b0c-b542-1844b6b89085 and request the member to rejoin with this id. (kafka.coordinator.group.GroupCoordinator)
[2023-10-28 17:48:16,015] INFO [GroupCoordinator 1]: Preparing to rebalance group console-consumer-70904 in state PreparingRebalance with old generation 0 (__consumer_offsets-41) (reason: Adding new member console-consumer-9928193a-cb91-4b0c-b542-1844b6b89085 with group instance id None; client reason: rebalance failed due to MemberIdRequiredException) (kafka.coordinator.group.GroupCoordinator)
[2023-10-28 17:48:16,016] INFO [GroupCoordinator 1]: Stabilized group console-consumer-70904 generation 1 (__consumer_offsets-41) with 1 members (kafka.coordinator.group.GroupCoordinator)
[2023-10-28 17:48:16,026] INFO [GroupCoordinator 1]: Assignment received from leader console-consumer-9928193a-cb91-4b0c-b542-1844b6b89085 for group console-consumer-70904 for generation 1. The group has 1 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)
Notice how the leader is elected. Since we are dealing with a single broker, it becomes the sole leader.
If we create a new producer and start to write messages - as we did before - you will see the message logging on the consumer, but more importantly if you write messages enough you should see the following.
[2023-10-28 17:49:30,763] INFO [ProducerStateManager partition=transactions-1]Wrote producer snapshot at offset 1 with 1 producer ids in 1 ms. (org.apache.kafka.storage.internals.log.ProducerStateManager)
[2023-10-28 17:49:30,764] INFO [MergedLog partition=transactions-1, dir=/var/lib/kafka/data] Rolled new log segment at offset 1 in 4 ms. (kafka.log.MergedLog)
This is creating a new segment, a new segment, in our case since the maxium size of each segment was 100 bytes it quickly generate another file after a single message.
ZooKeeper
ZooKeeper is the cluster manager, responsible for managing and coordinating the cluster. This includes the Broker election for each topic partition, the health monitoring of the cluster and managing information from the whole ecosystem, such as what are the topics, number of partitions, replicas and its configurations.
Though ZooKeeper does its job, its being a while that people were working on subistitute it. Recently we have the option of using KRaft which is an approach that don't use ZooKeeper, it uses an consensus protocol. I'm not diving into it on this article, but I might do it on the next.
Schema Registry
Although not a part of Kafka, the schema registry is often used in conjunction with it and I want to give a quick overview about it.
In a few words, schema registry will ensure the compatibility of the producers and consumers of a topic. When you add a schema to a topic, the schema registry will validate that the message you are producing matches the schema and the consumers will read the messages based on the schema as well. This ensures that the producer will always create a message valdiated by the schema and the consumer will always be able to read the message.
The advantage of that is that the clients will always be able to process messages from the consumers, ensuring the contract compatibility.
You might eventually need to update the schema. I'm not saying you always need to ensure retrocompatibility, but you should do it more often than not. I am going to go as far as to say that if you want to break compability, you need a very good reason.
Now, let's experiment with the schema registry, beginning with the creation of another topic.
Lets then create another topic and create a schema for it, produce a message and then consume it. Starting with the topic, lets now create a topic named currencies.
docker exec broker kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 3 --topic currencies
Open a consumer
docker exec -it broker kafka-console-consumer --bootstrap-server localhost:9092 --topic currencies --from-beginning
Then we will create a schema registry.
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema" : "{\"type\": \"record\", \"name\": \"currency\", \"fields\":[{ \"name\":\"code\",\"type\":\"string\"},{\"name\":\"name\",\"type\":\"string\"}]}"}' \
http://localhost:8081/subjects/currencies/versions
The response should be the id of your schema, if it's the first schema it should be {"id":1}
.
After the command ran, you should be able to see the schema under the http://localhost:8081/subjects/currencies/versions/1
url.
Now let's try to send some messages to this topic.
docker exec -it schema-registry kafka-avro-console-producer \
--broker-list broker:29092 \
--topic currencies \
--property schema.registry.url=http://schema-registry:8081 \
--property value.schema.id=1
After you run this command you will be able to write the message, notice that you won't really find a proper prompt nor you will be able to break lines. If you press Enter you will send an empty message and you will find an error showing that it don't match the schema. Instead when the console stops you will can write the following {"code":"GBP","name":"Pound sterling"}
and then press Enter.
You should see the message GBPPound sterling
on the consumer. If you try to send a message that does not comply with the schema, you should get an exception stating that
We briefly discussed compatibility earlier. Now, let's delve into this. Dealing with schema registry may be a bit trick, because you need to put some thought ahead of time to ensure that the schemas are retrocompatible. But why?
Migrating services is always complicated and can open several windows to errors, the idea of making our services retrocompatible helps to mitgate possible problems as ensuring that every service will be updated simultaneously is not only really hard to accomplish it also render our deployment slow, huge and hard to test. One of the solutions is to make sure our contracts are retrocompatible, once this is done we can deploy the producers first without breaking the consumers and then deploy the consumers as we need.
Now that we went to the details as per why we want to do this, lets go to the next step and update the version of our schema.
Currently our schema is as the following:
{
"type": "record",
"name": "currency",
"fields": [
{
"name": "code",
"type": "string"
},
{
"name": "name",
"type": "string"
}
]
}
Which translated to this JSON object:
{
"code": "GBP",
"name": "Pound sterling"
}
Ok, now let's say we want to add the symbol, we also want to change the attribute name
to description
and also want to know how the currency is backed by adding its type. Our end goal would be to produce this:
{
"code": "GBP",
"description": "Pound sterling",
"symbol": "£",
"type": "FIAT"
}
Let's start with symbol
. This is an easy task, we will just add it to the next version and if the client still don't use the latest version it will simple not pick the symbol
, we won't break anything and we are happy with it.
Starting with the change from name
to description
we get something interessing. We can't update the schema and remove the name
, because this would break the compatibility. In this scenario, the producer would create events with the description
and no name
while the client would be expecting the name
. It won't break because of the new field, but the lacking of the expected field. In order to do this change and ensure the compatibility, we will need to produce both fields: name
and description
.
Let's jump to the type
and this one can be quite easy, it depends on how we will handle it. If we decide that the type
will be a plain string, we can treat it similar to the symbol
. However if we say it is an enum, things change a bit.
If only have the FIAT
type at the moment, one can say the expected avro schema would be
{
"type": {
"type": "enum",
"name": "Type",
"symbols": ["FIAT"]
}
}
While this would work, it poses a problem: When we add a new type
it will break the compatibiliy and the client will no longer be able to process the events, to remediate that we can introduce an UNKNOWN type that the client will default to, and then we can have the client handle such event as we want.
By having this UKNOWN placeholder, when a client consumes an event that was produced with a higher avro schema version and, this event has an enum type that the client's avro version do not has, it will rollback to the default
enum, in that case the UNKNOWN. With this information, the client won't break and you can process it accordingly.
It should then be like this:
{
"type": {
"type": "enum",
"name": "Type",
"symbols": [
"UNKNOWN",
"FIAT"
],
"default": "UNKNOWN"
}
}
Now that we know how our new schema should be, lets create it:
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema" : "{\"type\": \"record\", \"name\": \"currency\", \"fields\":[{ \"name\":\"code\",\"type\":\"string\"},{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"symbol\",\"type\":[\"null\", \"string\"],\"default\":null},{\"name\":\"description\",\"type\":[\"null\", \"string\"],\"default\":null},{\"name\":\"currencyType\",\"type\":{\"type\":\"enum\",\"name\":\"CurrencyType\",\"symbols\":[\"UNKNOWN\",\"FIAT\"],\"default\":\"UNKNOWN\"},\"default\":\"UNKNOWN\"}]}"}' \
http://localhost:8081/subjects/currencies/versions
Alright, so we have created the new version with default values for all new fields. Let's now post a new message and see what happens.
docker exec -it schema-registry kafka-avro-console-producer \ --broker-list broker:29092 \
--topic currencies \
--property schema.registry.url=http://schema-registry:8081 \
--property value.schema.id=2
By using the new version we can provide the new fields, our new event should look like this
{
"code": "GBP",
"name": "Pound sterling",
"description": {
"string": "Pound sterling"
},
"symbol": {
"string": "£"
},
"currencyType": "FIAT"
}
We duplicate Pound sterling
to ensure that clients using the older version won't experience issues. Also you probably noticed that description
and symbol
are slightly different than name
and code
. Why? That's because to define it as nullable we need to provide this array of types [ "null", "string" ]
and, when using Avro with a union of types we need to provide a wrapper stating what is the type.
After executing the command, the new message is sent, we should now see the new log on the consumer GBPPound sterling£Pound sterling
. And that is it!
I hope this was a cool short dive into Kafka and that I made you more interested into reading more about it.
Stay awesome :)
Top comments (2)
good stuff, well done
Cheers mate!