Recently, I delved into Kafka Streams for my job and found it to be a very interesting topic. I am by no means an expert; on the contrary, I am just documenting my journey of apprenticeship.
So, let's do this.
Well, (Apache) Kafka was named after the famous novelist Franz Kafka simply because he is the favourite writer of Jay Kreps, the framework's creator. Just because. I find that to be very nice.
Apache Kafka is a distributed event streaming platform that is designed to be fast, scalable, and durable. It provides a publish-subscribe model, allowing multiple producers to send messages into topics, and consumers to read messages from topics. Kafka is commonly used for building real-time data pipelines and streaming applications because of its ability to handle high throughput with low latency.
Okay, so what are its main components?
Let's delve a little deeper into Kafka's main components, starting with:
Kafka Topics
A Kafka topic is a particular stream of data within your Kafka cluster, and a cluster can have many topics. A topic is kind of like a database table, but without all the constraints: you can send whatever you want to a Kafka topic, as there is no data validation involved. A topic is identified by its name.
The sequence of messages in a topic is called a data stream. Kafka is used to make data stream through topics.
Topics cannot be queried, unlike a database table. Instead, Kafka uses Producers to send data and Consumers to read the data.
Topics can be divided into Partitions, and each message within a partition is ordered and gets an incremental id, called an offset. An offset only has meaning within its own partition; offset values are not shared across partitions. Offsets are also not re-used, even when a previous message has been deleted.
Kafka topics are also immutable: once data has been written to a partition, it cannot be changed; you can only keep appending to it.
Data is assigned randomly to a partition unless a message key (more on that later on) is provided, and data is kept for a limited time (the default retention is one week).
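If you'd rather create a topic from code instead of from the command line, Kafka's admin client can do it. A minimal sketch, assuming a broker running locally on localhost:9092 (the topic name demo-topic is just an example):

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Properties;

public class CreateTopicExample {
    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed local broker

        try (AdminClient admin = AdminClient.create(properties)) {
            // demo-topic is a made-up name: 3 partitions, replication factor 1 (fine for a single local broker)
            NewTopic topic = new NewTopic("demo-topic", 3, (short) 1);
            admin.createTopics(Collections.singletonList(topic)).all().get(); // blocks until the topic exists
        }
    }
}

The third argument is the replication factor; 1 is only acceptable here because there is a single local broker (more on replication below).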
Kafka Broker
In the Kafka ecosystem, a broker acts as the workhorse by storing data and handling client requests. Multiple brokers work in unison in a Kafka cluster to provide scalability, fault tolerance, and high throughput. The presence of multiple brokers and their ability to replicate data across each other ensures that the Kafka system remains robust and available even in the face of broker failures. It's pretty awesome.
A Kafka cluster is composed of multiple brokers (or servers, for that matter). Each broker is identified by its ID (an integer). Each broker contains certain topic partitions, and after connecting to any broker (called a bootstrap broker), you will be connected to the entire cluster. Don't worry: Kafka clients are smart enough to handle this.
A good number of brokers to start with would be three or so, but there are clusters with hundreds of brokers.
Visual example (behold my drawing skillz):
Topic-A has 3 partitions and Topic-B has 2 partitions. The data is distributed but broker 3 doesn't have any Topic-B data because it has already been placed within the other 2 brokers.
Kafka Broker Discovery
Every Kafka broker is also called a bootstrap server. Meaning: you only need to connect to one broker and the client will automatically be able to connect to the entire cluster (smart clients). Each broker knows about all brokers, topics and partitions (the metadata!).
The discovery flow goes like this:
1. The Kafka client connects to a bootstrap broker (say, broker 1) and sends a metadata request.
2. The bootstrap broker responds with the list of all brokers in the cluster.
3. The Kafka client can then connect to whichever brokers it needs.
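To make that flow concrete, here's a minimal sketch (again assuming a local broker on localhost:9092) that connects to a single bootstrap broker and asks it for the list of all brokers in the cluster:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.Node;

import java.util.Collection;
import java.util.Properties;

public class ListBrokersExample {
    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        // One reachable broker is enough: it acts as the bootstrap server.
        properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(properties)) {
            Collection<Node> brokers = admin.describeCluster().nodes().get();
            brokers.forEach(node ->
                    System.out.printf("Broker id=%d at %s:%d%n", node.id(), node.host(), node.port()));
        }
    }
}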
Topic Replication Factor
Topics should have a replication factor > 1 (usually between 2 and 3). If a broker is down, another broker can then serve the data.
Partition Leader
At any time, only ONE broker can be the leader for a given partition. Producers can only send data to the broker that is the partition leader, while the other brokers replicate the data. Consumers will read, by default, from the leader broker of a partition (since Kafka 2.4, it is possible to configure consumers to read from the closest replica, improving latency). Long story short: each partition has one leader and multiple in-sync replicas (ISR).
Visual example: Topic-A with 2 partitions and a replication factor of 2, with its partition leaders:
Broker 1: leader for partition 0 of Topic-A.
Broker 2: leader for partition 1 of Topic-A, plus an in-sync replica (ISR) of partition 0.
Broker 3: an in-sync replica (ISR) of partition 1 of Topic-A.
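If you want to check leaders and ISRs yourself, the admin client can describe a topic. A small sketch, assuming a local broker and a topic named topic-a (an illustrative name):

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;

import java.util.Collections;
import java.util.Properties;

public class DescribePartitionsExample {
    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed local broker

        try (AdminClient admin = AdminClient.create(properties)) {
            TopicDescription description = admin.describeTopics(Collections.singletonList("topic-a"))
                    .all().get().get("topic-a");
            description.partitions().forEach(partition ->
                    System.out.printf("Partition %d -> leader: %s, ISR: %s%n",
                            partition.partition(), partition.leader(), partition.isr()));
        }
    }
}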
Topic Durability
With a topic replication factor of 3, topic data durability can withstand the loss of 2 brokers. As a general rule, for a replication factor of N, you can permanently lose up to N-1 brokers and still recover the data.
Kafka Producers
Producers could be described as applications that create and send messages to topics (which are made of partitions). Producers know which partition to write to and which Kafka broker has it, and they will automatically recover if a Kafka broker fails.
We mentioned message keys above, so let's define them: Kafka producers can send an optional key (a string, number, binary...) with each message.
Take, for instance, a topic with two partitions: if the message key is null, data is distributed round-robin (first partition 0, then partition 1, and so on - a form of load balancing). However, if the key isn't null, all messages with that key are sent to the same partition. Cool, huh?
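A quick way to see this behaviour is to send a few keyed messages and print which partition each one landed on. A minimal sketch, assuming a local broker and a test-topic with at least two partitions (both assumptions on my part):

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class KeyedProducerExample {
    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed local broker
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            for (int i = 0; i < 6; i++) {
                String key = "user-" + (i % 2); // only two distinct keys
                ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", key, "message " + i);
                RecordMetadata metadata = producer.send(record).get(); // blocking send, fine for a demo
                // Messages with the same key consistently land on the same partition.
                System.out.printf("key=%s -> partition %d, offset %d%n",
                        key, metadata.partition(), metadata.offset());
            }
        }
    }
}

Every user-0 message should report the same partition, and likewise for user-1 (the two keys may or may not share a partition, since the partition is chosen by hashing the key).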
Producer Acknowledgements (acks)
Producers can choose to receive acknowledgement of data writes:
- acks = 0: the producer won't wait for acknowledgements, which could end up in data loss.
- acks = 1: the producer will wait for the leader's acknowledgement, which could result in limited data loss.
- acks = all: the producer will wait for both the leader and the ISRs to acknowledge. No data loss.
A basic Producer example written in Java could be like this:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class SimpleProducer {
    public static void main(String[] args) {
        // Creating Kafka producer properties:
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("acks", "all");

        // Initializing the Kafka producer:
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        // Creating and sending the message with a key:
        ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "key", "value");
        producer.send(record);

        // Closing the producer:
        producer.close();
    }
}
Ok but what?
- bootstrap.servers: this is the address of your Kafka cluster (a group of Kafka brokers). In this case, it's pointing to a Kafka broker (a server that stores data and serves client requests from producers and consumers) running locally on port 9092.
- key.serializer and value.serializer: these specify how the producer should serialize (or convert) the keys and values to bytes before sending them to the Kafka broker. In this case, we're using Kafka's built-in StringSerializer, which means we're sending strings for both keys and values.
- acks: this setting means the producer will receive an acknowledgement only after all in-sync replicas have received the data.
First of all, we're initializing the Kafka producer with the given properties. The type parameters signify that both the key and the value are of type String.
Then, we create a ProducerRecord which contains the topic we want to send the message to (test-topic) and the key-value pair we want to send (key and value). Afterwards, we send the record using the producer's send method.
After sending the message, it's a good practice to close the producer to free up resources.
In summary, this code initializes a Kafka producer, sends a single message to the test-topic topic, and then closes the producer. This is a simple illustration; in real-world scenarios, additional error handling, callback mechanisms for acknowledgements, and other configurations ARE VERY NEEDED.
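Just to give a taste of what that means, here's a hedged sketch of the same producer with a delivery callback and some basic error handling - the values are illustrative, not a production recipe:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class ProducerWithErrorHandling {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed local broker
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.ACKS_CONFIG, "all"); // wait for the leader and all ISRs
        properties.put(ProducerConfig.RETRIES_CONFIG, 3);  // retry transient failures a few times (illustrative value)

        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        try {
            ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "key", "value");
            producer.send(record, (RecordMetadata metadata, Exception exception) -> {
                if (exception != null) {
                    // In a real application you would log this and decide whether to retry or alert.
                    System.err.println("Send failed: " + exception.getMessage());
                } else {
                    System.out.printf("Delivered to partition %d at offset %d%n",
                            metadata.partition(), metadata.offset());
                }
            });
        } finally {
            producer.close(); // also flushes any buffered records
        }
    }
}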
Now, let's talk about them Kafka Consumers.
Kafka Consumers
Consumers are applications that read messages from topics. Here's a basic example of a Kafka consumer in Java:
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class SimpleConsumer {
    public static void main(String[] args) {
        // Consumer configuration:
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        // Creating the consumer and subscribing to the topic:
        Consumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singletonList("test-topic"));

        // Polling for new records in an endless loop:
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            records.forEach(record -> {
                System.out.printf("Topic: %s, Partition: %s, Offset: %s, Key: %s, Value: %s\n",
                        record.topic(), record.partition(), record.offset(), record.key(), record.value());
            });
        }
    }
}
Yeah but?
The code above is a simple example of a Kafka consumer using the Kafka client library. It consumes messages from the test-topic topic and prints out details about each message. Let's walk through the code step by step:
- BOOTSTRAP_SERVERS_CONFIG: specifies the Kafka broker's address. In this example, the broker is running locally on port 9092.
- GROUP_ID_CONFIG: specifies the consumer group ID. Consumers can join a group to collaboratively consume topics; Kafka ensures that each message is delivered to one consumer within each consumer group.
- KEY_DESERIALIZER_CLASS_CONFIG & VALUE_DESERIALIZER_CLASS_CONFIG: define how to deserialize (convert from bytes back to objects) the keys and values from the Kafka records. In this example, we are using Kafka's built-in StringDeserializer, indicating that our keys and values are strings.
Then we're creating an instance of the Kafka consumer with the properties defined above. The type parameters indicate that both the key and the value are of type String.
The line consumer.subscribe(Collections.singletonList("test-topic")); specifies that the consumer is interested in messages from the test-topic topic. The subscribe method expects a collection of topics, so we wrap our single topic in a singletonList (fine for a demo, if not exactly what you'd see in the real world).
Then, the consumer continuously polls for new messages from the topic: consumer.poll(Duration.ofMillis(100)) retrieves any available messages. The duration (100 milliseconds in this case) specifies the maximum amount of time the poll method will block if no records are available.
For each message (or record) retrieved, we print out its details such as the topic name, partition number, offset, key, and value.
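One detail worth knowing: with the properties above, the consumer auto-commits its offsets periodically (enable.auto.commit defaults to true). If you'd rather commit only after the records have actually been processed, a hedged variation of the same consumer could look like this:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ManualCommitConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed local broker
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // we decide when offsets are committed

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
            consumer.subscribe(Collections.singletonList("test-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                records.forEach(record ->
                        System.out.printf("Processing offset %d from partition %d%n",
                                record.offset(), record.partition()));
                if (!records.isEmpty()) {
                    consumer.commitSync(); // commit only after the whole batch has been processed
                }
            }
        }
    }
}

commitSync() blocks until the broker confirms the commit; there is also a commitAsync() if you don't want to wait.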
Zookeeper
When installing Kafka, you usually install Zookeeper with it. But what is Zookeeper?
Well, Zookeeper manages brokers (it keeps a list of them). It also helps elect leaders for partitions and sends notifications to Kafka in case of changes (e.g. new topics, deleted topics, a broker dies, a broker comes up...).
- Kafka 2.x cannot work without Zookeeper.
- Kafka 3.x can work without Zookeeper (it uses Kafka Raft - KRaft - instead).
- Kafka 4.x will not have Zookeeper.
Zookeeper, by design, will work with an odd number of servers (1, 3, 5...) and Zookeeper also has leaders (which can write) and followers (which can read). Zookeeper also does not store any consumer data.
We will not cover the details of Zookeeper in this post, but should you use it?
In Kafka Brokers
Yes. Until version 4.x is out, you should use Zookeeper in production.
In Kafka Clients
As of version 0.10, Kafka has deprecated the use of Zookeeper for consumer offset storage; consumers should instead store offsets directly in Kafka. From version 2.2 onwards, the kafka-topics.sh CLI command has been updated to interact with Kafka brokers rather than Zookeeper for topic management tasks such as creation and deletion. Consequently, any APIs and commands that previously depended on Zookeeper have been transitioned to utilize Kafka. This ensures a seamless experience for clients when clusters eventually operate without Zookeeper. For enhanced security, Zookeeper ports should be restricted to accept connections solely from Kafka brokers and not from Kafka clients.
TLDR: DON'T USE ZOOKEEPER IN KAFKA CLIENTS.
Hope you enjoyed it! Any constructive feedback is more than welcome.
Next up: setting a closer-to-what-you-see-in-your-day-to-day-work-as-a-developer-in-information-technology Kafka application!