<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: Cauê Ferreira</title>
    <description>The latest articles on DEV Community by Cauê Ferreira (@caueferreira).</description>
    <link>https://dev.to/caueferreira</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F240737%2F8aa9bdcd-9ee2-4530-b7b2-4cc4cf23c1d5.jpg</url>
      <title>DEV Community: Cauê Ferreira</title>
      <link>https://dev.to/caueferreira</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/caueferreira"/>
    <language>en</language>
    <item>
      <title>Kafka Internally - A brief story of how Kafka works</title>
      <dc:creator>Cauê Ferreira</dc:creator>
      <pubDate>Mon, 13 Nov 2023 14:06:32 +0000</pubDate>
      <link>https://dev.to/caueferreira/kafka-internally-a-brief-story-of-how-kafka-works-17lb</link>
      <guid>https://dev.to/caueferreira/kafka-internally-a-brief-story-of-how-kafka-works-17lb</guid>
<description>&lt;p&gt;I’ve been using Kafka for the past few years, and while I managed to get it to work, there was something I always wondered about:&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;How the hell does Kafka work?&lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;I published &lt;a href="https://medium.com/swlh/a-real-showcase-of-kafka-at-wirecard-brazil-9b9c2055fcce" rel="noopener noreferrer"&gt;this article&lt;/a&gt; years ago describing my experience with Kafka when I first used it. It's a high-level overview, more focused on architecture decisions and the experience of migrating to it. Now I want to talk about how Kafka works, and how it stores and retrieves data.&lt;/p&gt;

&lt;p&gt;The idea is that after you read this article, you will be able to understand the picture below a bit better.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fsh3j8l8a7riv1z78invc.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fsh3j8l8a7riv1z78invc.png" alt="Image description"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Preparing the environment
&lt;/h2&gt;

&lt;p&gt;Let's start by cloning the confluentinc/cp-all-in-one GitHub repository.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;git clone git@github.com:confluentinc/cp-all-in-one.git
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Once cloned, navigate to the &lt;code&gt;cp-all-in-one/cp-all-in-one&lt;/code&gt; directory and then start the containers; it should take a while.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;docker-compose up -d
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;When this is done, let's check that everything is running correctly by executing the following command, also from &lt;code&gt;cp-all-in-one/cp-all-in-one&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;docker-compose ps
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The result should be similar to this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;     Name                    Command               State                       Ports
---------------------------------------------------------------------------------------------------------
broker            /etc/confluent/docker/run        Up      0.0.0.0:9092-&amp;gt;9092/tcp, 0.0.0.0:9101-&amp;gt;9101/tcp
connect           /etc/confluent/docker/run        Up      0.0.0.0:8083-&amp;gt;8083/tcp, 9092/tcp
control-center    /etc/confluent/docker/run        Up      0.0.0.0:9021-&amp;gt;9021/tcp
ksql-datagen      bash -c echo Waiting for K ...   Up
ksqldb-cli        /bin/sh                          Up
ksqldb-server     /etc/confluent/docker/run        Up      0.0.0.0:8088-&amp;gt;8088/tcp
rest-proxy        /etc/confluent/docker/run        Up      0.0.0.0:8082-&amp;gt;8082/tcp
schema-registry   /etc/confluent/docker/run        Up      0.0.0.0:8081-&amp;gt;8081/tcp
zookeeper         /etc/confluent/docker/run        Up      0.0.0.0:2181-&amp;gt;2181/tcp, 2888/tcp, 3888/tcp
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Now let's start working.&lt;/p&gt;

&lt;p&gt;First we'll need to create a topic. While we could create one using the UI at &lt;a href="http://localhost:9021" rel="noopener noreferrer"&gt;http://localhost:9021&lt;/a&gt;, I feel it is best to use the command line so we can learn a bit more about how things work.&lt;/p&gt;

&lt;p&gt;Run the following command to create a topic named &lt;strong&gt;transactions&lt;/strong&gt; with 3 &lt;strong&gt;partitions&lt;/strong&gt;.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;docker exec broker kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 3 --topic transactions
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;You should see the following result: &lt;code&gt;Created topic transactions&lt;/code&gt;&lt;/p&gt;

&lt;p&gt;Now, let's check our topic!&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;docker exec -t broker kafka-topics --bootstrap-server localhost:9092 --list | grep transactions
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;You should see the &lt;code&gt;transactions&lt;/code&gt; topic as the response from the command.&lt;/p&gt;

&lt;p&gt;Now, let's delve into how a topic works!&lt;/p&gt;

&lt;h2&gt;
  
  
   Topic
&lt;/h2&gt;

&lt;p&gt;Let's understand what a topic is and how it's created. To begin with, let's inspect the transactions topic inside the broker container. You can either open a shell in the container, go to &lt;code&gt;/var/lib/kafka/data/&lt;/code&gt; and run &lt;code&gt;ls transactions-*&lt;/code&gt;, or you can run:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;docker exec -ti broker sh -c "(cd /var/lib/kafka/data; ls transactions-*)"
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Regardless, the result should be similar to the following:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;transactions-0:
00000000000000000000.index  00000000000000000000.timeindex  partition.metadata
00000000000000000000.log    leader-epoch-checkpoint

transactions-1:
00000000000000000000.index  00000000000000000000.timeindex  partition.metadata
00000000000000000000.log    leader-epoch-checkpoint

transactions-2:
00000000000000000000.index  00000000000000000000.timeindex  partition.metadata
00000000000000000000.log    leader-epoch-checkpoint
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;As we can see, we have three directories named &lt;em&gt;transactions-*&lt;/em&gt;; these are the three partitions of the transactions &lt;em&gt;topic&lt;/em&gt; that we defined when creating it. If we check the size of the files by executing &lt;code&gt;ls -lh transactions-0&lt;/code&gt;, we should get the following:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;total 4.0K
-rw-r--r-- 1 appuser appuser 10M Oct 27 20:22 00000000000000000000.index
-rw-r--r-- 1 appuser appuser   0 Oct 27 20:22 00000000000000000000.log
-rw-r--r-- 1 appuser appuser 10M Oct 27 20:22 00000000000000000000.timeindex
-rw-r--r-- 1 appuser appuser   0 Oct 27 20:22 leader-epoch-checkpoint
-rw-r--r-- 1 appuser appuser  43 Oct 27 20:22 partition.metadata
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Kafka's data is stored in log files, which are divided as follows:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;00000000000000000000.index&lt;/strong&gt;: This file maps offsets to the position of the corresponding event in the &lt;em&gt;*.log&lt;/em&gt; file. While we could search for a specific event in the &lt;em&gt;*.log&lt;/em&gt; file directly, that file keeps growing as events are stored, to the point where finding an event there could take a long time; the &lt;em&gt;*.index&lt;/em&gt; file contains only offsets and positions, so locating the offset we're looking for is much faster.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;00000000000000000000.log&lt;/strong&gt;: This file is where every event is located; in our case, this is where we'll find our transactions.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;00000000000000000000.timeindex&lt;/strong&gt;: This file is similar to &lt;em&gt;*.index&lt;/em&gt;, the difference being that it's used to find events by timestamp.&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;
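&lt;p&gt;To make the role of the &lt;em&gt;*.index&lt;/em&gt; file concrete, here is a minimal sketch in plain Python - an illustration only, not Kafka's actual file format or lookup code - of how a sorted offset index lets us binary-search for a byte position instead of scanning the whole log:&lt;/p&gt;

```python
import bisect

# Hypothetical sparse index: sorted (offset, byte position in the .log file)
# pairs, mimicking what a *.index file stores for its segment.
index = [(0, 0), (2, 150), (4, 310)]

def position_for(offset):
    """Binary-search for the latest index entry at or before `offset`."""
    offsets = [entry[0] for entry in index]
    i = bisect.bisect_right(offsets, offset) - 1
    if i >= 0:
        return index[i][1]
    raise KeyError(offset)

print(position_for(2))  # 150: exact index hit
print(position_for(3))  # 150: start at offset 2's position, then scan the log forward
```

Because the real index is sparse, a lookup lands on the nearest preceding entry and the broker scans forward in the log from there.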

&lt;p&gt;The sequence of numbers &lt;strong&gt;00000000000000000000&lt;/strong&gt; in the &lt;em&gt;index&lt;/em&gt;, log and &lt;em&gt;timeindex&lt;/em&gt; files identifies the segment. A segment is - to put it simply - a file that we're using to store events. Kafka divides the events of a partition into multiple files, each of which is referred to as a segment. Each segment has a default maximum size of 1 GB, meaning that once we reach 1 GB of logs, a new file is created for that partition.&lt;/p&gt;
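&lt;p&gt;As a rough illustration (simplified, assumed logic rather than Kafka's source), the zero-padded file name is the first offset stored in that segment, so finding the segment that holds a given offset means picking the largest base offset not above it:&lt;/p&gt;

```python
import bisect

# Base offsets of the segments in a partition directory, e.g. after rolling.
base_offsets = [0, 3, 4]

def segment_for(offset):
    """Pick the segment whose base offset is the largest one not above `offset`."""
    i = bisect.bisect_right(base_offsets, offset) - 1
    return "%020d.log" % base_offsets[i]  # 20-digit zero-padded name, like on disk

print(segment_for(0))  # 00000000000000000000.log
print(segment_for(3))  # 00000000000000000003.log
print(segment_for(5))  # 00000000000000000004.log
```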

&lt;p&gt;Now, let's observe the creation of new log files. We won't generate 1 GB worth of messages, because I don't want this small project to consume too much space on your machine. Instead, we shall reduce the segment size so that any message will produce another log file.&lt;/p&gt;

&lt;p&gt;The following command will reduce the maximum segment size from 1 GB to 100 bytes.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;docker exec -it broker kafka-configs --bootstrap-server localhost:9092 --entity-type topics --entity-name transactions --alter --add-config segment.bytes=100
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Now let's run the following command; it will let us post a few messages to Kafka, and we will be able to see new log files being created.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;docker exec -it broker kafka-console-producer --broker-list localhost:9092 --topic transactions
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;You will be presented with a prompt; write the following messages in it.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;New message
New message 1
New message 2
New message 3
New message 4
New message 5
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;After that, if we rerun the command where we checked the logs, we should be presented with this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;transactions-0:
00000000000000000000.index  00000000000000000003.index
00000000000000000000.log    00000000000000000003.log
00000000000000000000.timeindex  00000000000000000003.snapshot
00000000000000000001.index  00000000000000000003.timeindex
00000000000000000001.log    00000000000000000004.index
00000000000000000001.snapshot   00000000000000000004.log
00000000000000000001.timeindex  00000000000000000004.snapshot
00000000000000000002.index  00000000000000000004.timeindex
00000000000000000002.log    leader-epoch-checkpoint
00000000000000000002.snapshot   partition.metadata
00000000000000000002.timeindex

transactions-1:
00000000000000000000.index  00000000000000000000.timeindex  partition.metadata
00000000000000000000.log    leader-epoch-checkpoint

transactions-2:
00000000000000000000.index  00000000000000000000.timeindex  partition.metadata
00000000000000000000.log    leader-epoch-checkpoint
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Notice that when you sent each message, the broker might have selected a different partition to write it to, meaning that any of the 3 partitions could have been updated.&lt;/p&gt;
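&lt;p&gt;That partition choice can be sketched like this (a simplified stand-in: Kafka's real default partitioner uses a murmur2 hash for keyed messages and a "sticky" strategy, not plain round robin, for unkeyed ones):&lt;/p&gt;

```python
import itertools
import zlib

NUM_PARTITIONS = 3  # our transactions topic has 3 partitions
_round_robin = itertools.cycle(range(NUM_PARTITIONS))

def pick_partition(key):
    """Keyed messages hash to a stable partition; unkeyed ones rotate."""
    if key is None:
        # Simplified: real Kafka batches unkeyed messages per "sticky" partition.
        return next(_round_robin)
    # Simplified: real Kafka uses murmur2, not CRC32.
    return zlib.crc32(key.encode()) % NUM_PARTITIONS

# The same key always lands on the same partition:
print(pick_partition("account-42") == pick_partition("account-42"))  # True
```

This is why our console messages, which have no key, ended up spread across the partitions.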

&lt;p&gt;Now you can play around reading the logs; you will see that each one of them holds different messages!&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;docker exec -ti broker sh -c "(cd /var/lib/kafka/data/transactions-0; cat 00000000000000000000.log)"
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Segment
&lt;/h3&gt;

&lt;p&gt;Segments are the log files; they are the physical files that contain the sequence of logs on each partition. Each of the &lt;code&gt;00000000000000000000.log&lt;/code&gt; files is a segment, meaning that when the segment reached the 100-byte limit, Kafka created another segment file named &lt;code&gt;00000000000000000001.log&lt;/code&gt;. When dealing with multiple Kafka brokers, segments can be replicated across them to ensure fault tolerance and high availability, but you won't see the same segment file written twice in the same broker.&lt;/p&gt;

&lt;p&gt;We've been talking a lot about the broker, and did you notice that we had to run our topic commands against it? So what is it?&lt;/p&gt;

&lt;h2&gt;
  
  
  Broker
&lt;/h2&gt;

&lt;p&gt;It is the broker's responsibility to store and manage Kafka's topics. It receives messages and ensures they are stored in the required topic; it is also responsible for serving the messages that consumers will read.&lt;/p&gt;

&lt;p&gt;When combined, the brokers form a cluster, and they work together in various ways. The brokers elect a leader for each partition of a topic, which is responsible for handling read and write requests. This means that each broker can be the leader of different topic partitions, so you could have two brokers being leaders for the same topic, each for a different partition. As the leader of a partition, a broker handles both write and read events for it; since each broker is responsible for different partitions, you get fast and reliable information served from different brokers. The replication process is handled by the followers: when the broker leader writes an event to its partition, all the followers copy the event to their replicas.&lt;/p&gt;

&lt;p&gt;Thanks to the fact that partitions are replicated across brokers, when the leader of a partition stops working for any reason, the brokers elect another broker as the leader of that partition, and it starts to handle all write and read events. The other brokers keep their role as followers, but start to replicate information from the new leader.&lt;/p&gt;
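&lt;p&gt;A toy model of that failover (illustrative only; real Kafka elects the new leader from the in-sync replica set via the cluster's controller, not alphabetically) could look like this:&lt;/p&gt;

```python
# Toy model: three brokers replicating one partition, with leader failover.
replicas = {"broker-1": ["event-a", "event-b"],
            "broker-2": ["event-a", "event-b"],
            "broker-3": ["event-a", "event-b"]}
leader = "broker-1"

def fail(broker):
    """Drop a broker; if it led the partition, promote a follower."""
    global leader
    del replicas[broker]
    if broker == leader:
        # Simplified election: real Kafka picks from the in-sync replicas (ISR).
        leader = sorted(replicas)[0]
    return leader

print(fail("broker-1"))  # broker-2: it takes over reads and writes
```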

&lt;p&gt;We can observe this process in action by listening to the broker logs &lt;code&gt;docker logs -f broker&lt;/code&gt; and then executing the following.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;docker exec -it broker kafka-console-consumer --bootstrap-server localhost:9092 --topic transactions --from-beginning
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;You should see something in the likes of:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;[2023-10-28 17:48:16,014] INFO [GroupCoordinator 1]: Dynamic member with unknown member id joins group console-consumer-70904 in Empty state. Created a new member id console-consumer-9928193a-cb91-4b0c-b542-1844b6b89085 and request the member to rejoin with this id. (kafka.coordinator.group.GroupCoordinator)
[2023-10-28 17:48:16,015] INFO [GroupCoordinator 1]: Preparing to rebalance group console-consumer-70904 in state PreparingRebalance with old generation 0 (__consumer_offsets-41) (reason: Adding new member console-consumer-9928193a-cb91-4b0c-b542-1844b6b89085 with group instance id None; client reason: rebalance failed due to MemberIdRequiredException) (kafka.coordinator.group.GroupCoordinator)
[2023-10-28 17:48:16,016] INFO [GroupCoordinator 1]: Stabilized group console-consumer-70904 generation 1 (__consumer_offsets-41) with 1 members (kafka.coordinator.group.GroupCoordinator)
[2023-10-28 17:48:16,026] INFO [GroupCoordinator 1]: Assignment received from leader console-consumer-9928193a-cb91-4b0c-b542-1844b6b89085 for group console-consumer-70904 for generation 1. The group has 1 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Notice how the consumer group is formed and a leader is assigned for it. Since we are dealing with a single broker, it handles all of this coordination alone.&lt;/p&gt;

&lt;p&gt;If we create a new producer and start to write messages - as we did before - you will see the messages logged on the consumer; but more importantly, if you write enough messages you should see the following.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;[2023-10-28 17:49:30,763] INFO [ProducerStateManager partition=transactions-1]Wrote producer snapshot at offset 1 with 1 producer ids in 1 ms. (org.apache.kafka.storage.internals.log.ProducerStateManager)
[2023-10-28 17:49:30,764] INFO [MergedLog partition=transactions-1, dir=/var/lib/kafka/data] Rolled new log segment at offset 1 in 4 ms. (kafka.log.MergedLog)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This is Kafka rolling a new segment; in our case, since the maximum size of each segment is 100 bytes, it quickly generates another file after a single message.&lt;/p&gt;
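&lt;p&gt;The rolling behaviour we just triggered can be sketched as follows (an assumed simplification: real Kafka also rolls segments based on time and index size, and counts record batch overhead, not just payload bytes):&lt;/p&gt;

```python
SEGMENT_BYTES = 100  # the limit we set with segment.bytes=100

segments = [{"base": 0, "bytes": 0}]  # start with segment 00000000000000000000
next_offset = 0

def append(message):
    """Append a message, rolling a new segment when the active one is full."""
    global next_offset
    active = segments[-1]
    if active["bytes"] + len(message) > SEGMENT_BYTES and active["bytes"] > 0:
        # Roll: the new segment is named after the next offset to be written.
        segments.append({"base": next_offset, "bytes": 0})
        active = segments[-1]
    active["bytes"] += len(message)
    next_offset += 1

for msg in [b"x" * 80, b"y" * 80, b"z" * 80]:
    append(msg)

# Each 80-byte message overflows the 100-byte limit, so each gets its own segment.
print(["%020d.log" % s["base"] for s in segments])
```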

&lt;h2&gt;
  
  
  ZooKeeper
&lt;/h2&gt;

&lt;p&gt;ZooKeeper is the cluster manager, responsible for managing and coordinating the cluster. This includes the leader election for each topic partition, health monitoring of the brokers, and managing information about the whole ecosystem, such as which topics exist, their number of partitions, their replicas and their configurations.&lt;/p&gt;

&lt;p&gt;Though ZooKeeper does its job, people have been working on a substitute for a while. Recently we gained the option of using &lt;a href="https://developer.confluent.io/learn/kraft/" rel="noopener noreferrer"&gt;KRaft&lt;/a&gt;, an approach that doesn't use ZooKeeper; instead, it relies on a consensus protocol built into Kafka itself. I'm not diving into it in this article, but I might do so in the next.&lt;/p&gt;

&lt;h2&gt;
  
  
  Schema Registry
&lt;/h2&gt;

&lt;p&gt;Although not a part of Kafka, the schema registry is often used in conjunction with it, and I want to give a quick overview of it.&lt;/p&gt;

&lt;p&gt;In a few words, the schema registry ensures compatibility between the producers and consumers of a topic. When you add a schema to a topic, the schema registry will validate that the messages you produce match the schema, and the consumers will read the messages based on that schema as well. This ensures that the producer will always create a message validated by the schema and the consumer will always be able to read it.&lt;/p&gt;
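&lt;p&gt;In spirit, the check performed on the producer side looks like this (a minimal sketch of the idea, not the Avro library's actual validation logic):&lt;/p&gt;

```python
# Simplified schema check: every declared field must be present with the right type.
SCHEMA = {"code": str, "name": str}  # mirrors the currency record used below

def validate(record):
    """Raise if the record doesn't match the schema; return True otherwise."""
    for field, field_type in SCHEMA.items():
        if field not in record:
            raise ValueError("missing field: " + field)
        if not isinstance(record[field], field_type):
            raise ValueError("bad type for field: " + field)
    return True

print(validate({"code": "GBP", "name": "Pound sterling"}))  # True
```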

&lt;p&gt;The advantage is that consumers will always be able to process the messages coming from the producers, ensuring contract compatibility.&lt;/p&gt;

&lt;p&gt;You might eventually need to update the schema. I'm not saying you &lt;em&gt;always&lt;/em&gt; need to ensure backward compatibility, but you should do it more often than not. I will go as far as to say that if you want to break compatibility, you need a &lt;em&gt;very good reason&lt;/em&gt;.&lt;/p&gt;

&lt;p&gt;Now, let's experiment with the schema registry, beginning with the creation of another topic.&lt;/p&gt;

&lt;p&gt;We will create a schema for it, produce a message, and then consume it. Starting with the topic, let's create one named currencies.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;docker exec broker kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 3 --topic currencies
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Open a consumer&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;docker exec -it broker kafka-console-consumer --bootstrap-server localhost:9092 --topic currencies --from-beginning
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Then we will register a schema.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"schema" : "{\"type\": \"record\", \"name\": \"currency\", \"fields\":[{ \"name\":\"code\",\"type\":\"string\"},{\"name\":\"name\",\"type\":\"string\"}]}"}' \
http://localhost:8081/subjects/currencies/versions
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The response should be the id of your schema; if it's the first schema, it should be &lt;code&gt;{"id":1}&lt;/code&gt;.&lt;br&gt;
After the command runs, you should be able to see the schema at the &lt;code&gt;http://localhost:8081/subjects/currencies/versions/1&lt;/code&gt; URL.&lt;/p&gt;

&lt;p&gt;Now let's try to send some messages to this topic.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;docker exec -it schema-registry kafka-avro-console-producer \
  --broker-list broker:29092 \
  --topic currencies \
  --property schema.registry.url=http://schema-registry:8081 \
  --property value.schema.id=1
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;After you run this command you will be able to write the message. Notice that you won't really find a proper prompt, nor will you be able to break lines. If you press Enter you will send an empty message and get an error showing that it doesn't match the schema. Instead, when the console stops, write the following: &lt;code&gt;{"code":"GBP","name":"Pound sterling"}&lt;/code&gt; and then press Enter.&lt;/p&gt;

&lt;p&gt;You should see the message &lt;code&gt;GBPPound sterling&lt;/code&gt; on the consumer. If you try to send a message that does not comply with the schema, you should get an exception stating exactly that.&lt;/p&gt;

&lt;p&gt;We briefly discussed compatibility earlier; now let's delve into it. Dealing with a schema registry may be a bit tricky, because you need to put some thought in ahead of time to ensure that the schemas are backward compatible. But why?&lt;/p&gt;

&lt;p&gt;Migrating services is always complicated and can open several windows for errors. Making our services backward compatible helps to mitigate possible problems: ensuring that every service is updated simultaneously is not only really hard to accomplish, it also renders our deployments slow, huge and hard to test. One of the solutions is to make sure our contracts are backward compatible; once this is done, we can deploy the producers first without breaking the consumers, and then deploy the consumers as we need.&lt;/p&gt;
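&lt;p&gt;The effect on the consumer side can be sketched like this (an illustration of the idea, not Avro's actual schema-resolution rules): a reader takes the fields it knows from the record, and anything missing falls back to the default declared in its schema.&lt;/p&gt;

```python
# Backward-compatible decode: known fields come from the record,
# missing fields fall back to the reader schema's defaults.
# READER_SCHEMA is a hypothetical stand-in for the consumer's schema version.
READER_SCHEMA = {"code": None, "name": None, "symbol": None}

def decode(record):
    return {field: record.get(field, default)
            for field, default in READER_SCHEMA.items()}

old_event = {"code": "GBP", "name": "Pound sterling"}  # written with schema v1
print(decode(old_event))  # symbol is filled with its default: None
```

This is exactly what lets us deploy the producer with a new schema while old consumers keep working.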

&lt;p&gt;Now that we have gone over why we want to do this, let's go to the next step and update the version of our schema.&lt;/p&gt;

&lt;p&gt;Currently our schema is the following:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;{
  "type": "record",
  "name": "currency",
  "fields": [
    {
      "name": "code",
      "type": "string"
    },
    {
      "name": "name",
      "type": "string"
    }
  ]
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Which translates to this JSON object:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;{
  "code": "GBP",
  "name": "Pound sterling"
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Ok, now let's say we want to add the symbol; we also want to change the attribute &lt;code&gt;name&lt;/code&gt; to &lt;code&gt;description&lt;/code&gt;, and to record how the currency is backed by adding its type. Our end goal would be to produce this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;{
  "code": "GBP",
  "description": "Pound sterling",
  "symbol": "£",
  "type": "FIAT"
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Let's start with &lt;code&gt;symbol&lt;/code&gt;. This is an easy task: we will just add it in the next version, and if a client doesn't use the latest version yet, it will simply not pick up the &lt;code&gt;symbol&lt;/code&gt;. We won't break anything, and we are happy with it.&lt;/p&gt;

&lt;p&gt;Moving on to the change from &lt;code&gt;name&lt;/code&gt; to &lt;code&gt;description&lt;/code&gt;, we get something interesting. We can't update the schema and remove &lt;code&gt;name&lt;/code&gt;, because this would break compatibility. In that scenario, the producer would create events with the &lt;code&gt;description&lt;/code&gt; and no &lt;code&gt;name&lt;/code&gt;, while the client would be expecting the &lt;code&gt;name&lt;/code&gt;. It wouldn't break because of the new field, but because of the absence of the expected one. In order to make this change and preserve compatibility, we will need to produce both fields: &lt;code&gt;name&lt;/code&gt; and &lt;code&gt;description&lt;/code&gt;.&lt;/p&gt;

&lt;p&gt;Let's jump to the &lt;code&gt;type&lt;/code&gt;; this one can be quite easy, depending on how we handle it. If we decide that the &lt;code&gt;type&lt;/code&gt; will be a plain string, we can treat it like the &lt;code&gt;symbol&lt;/code&gt;. However, if we say it is an enum, things change a bit.&lt;/p&gt;

&lt;p&gt;If we only have the &lt;code&gt;FIAT&lt;/code&gt; type at the moment, one could say the expected Avro schema would be:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;{
  "type": {
      "type": "enum",
      "name": "Type", 
      "symbols": ["FIAT"]
  }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;While this would work, it poses a problem: when we add a new &lt;code&gt;type&lt;/code&gt;, it will break compatibility and the client will no longer be able to process the events. To remediate that, we can introduce an &lt;em&gt;UNKNOWN&lt;/em&gt; type that the client will default to, and then we can have the client handle such an event as we want.&lt;br&gt;
By having this &lt;em&gt;UNKNOWN&lt;/em&gt; placeholder, when a client consumes an event that was produced with a newer Avro schema version, and this event has an enum value that the client's Avro version does not have, it will fall back to the &lt;code&gt;default&lt;/code&gt; enum value, in this case &lt;em&gt;UNKNOWN&lt;/em&gt;. With this, the client won't break, and you can process the event accordingly.&lt;/p&gt;

&lt;p&gt;It should then be like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;{
  "type": {
    "type": "enum",
    "name": "Type",
    "symbols": [
      "UNKNOWN",
      "FIAT"
    ],
    "default": "UNKNOWN"
  }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
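&lt;p&gt;The fallback behaviour can be sketched as follows (a simplified illustration of Avro's enum default, not the library's implementation):&lt;/p&gt;

```python
# The reader's version of the enum, with the safety-net default.
KNOWN_TYPES = ["UNKNOWN", "FIAT"]
DEFAULT_TYPE = "UNKNOWN"

def resolve_type(value):
    """Map enum values the reader's schema doesn't know back to the default."""
    return value if value in KNOWN_TYPES else DEFAULT_TYPE

print(resolve_type("FIAT"))    # FIAT
print(resolve_type("CRYPTO"))  # UNKNOWN: written by a newer schema version
```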



&lt;p&gt;Now that we know what our new schema should look like, let's create it:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"schema" : "{\"type\": \"record\", \"name\": \"currency\", \"fields\":[{ \"name\":\"code\",\"type\":\"string\"},{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"symbol\",\"type\":[\"null\", \"string\"],\"default\":null},{\"name\":\"description\",\"type\":[\"null\", \"string\"],\"default\":null},{\"name\":\"currencyType\",\"type\":{\"type\":\"enum\",\"name\":\"CurrencyType\",\"symbols\":[\"UNKNOWN\",\"FIAT\"],\"default\":\"UNKNOWN\"},\"default\":\"UNKNOWN\"}]}"}' \
http://localhost:8081/subjects/currencies/versions
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Alright, so we have created the new version with default values for all new fields. Let's now post a new message and see what happens.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;docker exec -it schema-registry kafka-avro-console-producer \      --broker-list broker:29092 \
  --topic currencies \
  --property schema.registry.url=http://schema-registry:8081 \
  --property value.schema.id=2
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;By using the new version we can provide the new fields; our new event should look like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;{
  "code": "GBP",
  "name": "Pound sterling",
  "description": {
    "string": "Pound sterling"
  },
  "symbol": {
    "string": "£"
  },
  "currencyType": "FIAT"
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;We duplicate &lt;code&gt;Pound sterling&lt;/code&gt; to ensure that clients using the older version won't experience issues. You probably also noticed that &lt;code&gt;description&lt;/code&gt; and &lt;code&gt;symbol&lt;/code&gt; look slightly different from &lt;code&gt;name&lt;/code&gt; and &lt;code&gt;code&lt;/code&gt;. Why? Because to define a field as nullable we need to provide the array of types &lt;code&gt;[ "null", "string" ]&lt;/code&gt;, and when using Avro with a union of types, the JSON encoding needs a wrapper stating which type the value is.&lt;/p&gt;
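&lt;p&gt;The wrapping and unwrapping can be sketched with a pair of helpers (illustrative only; these are not part of the Avro tooling):&lt;/p&gt;

```python
def wrap_nullable(value):
    """JSON encoding of an Avro ["null", "string"] union: wrap non-null values."""
    return None if value is None else {"string": value}

def unwrap_nullable(encoded):
    """Recover the plain value from the union's JSON encoding."""
    return None if encoded is None else encoded["string"]

print(wrap_nullable("£"))                # {'string': '£'}
print(unwrap_nullable({"string": "£"}))  # £
```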

&lt;p&gt;After executing the command, the new message is sent, and we should now see the new log on the consumer: &lt;code&gt;GBPPound sterling£Pound sterling&lt;/code&gt;. And that is it!&lt;/p&gt;

&lt;p&gt;I hope this was a cool short dive into Kafka and that it made you more interested in reading more about it.&lt;/p&gt;

&lt;p&gt;Stay awesome :)&lt;/p&gt;

</description>
      <category>kafka</category>
      <category>architecture</category>
      <category>devops</category>
      <category>distributedsystems</category>
    </item>
  </channel>
</rss>
