Over the past few weeks, I’ve been diving into Kafka and taking notes along the way, which I decided to organize and structure them a blog post, on it, apart from concepts and tips there is a practical example built with NestJS and KafkaJs.
What is Kafka?
Apache Kafka is a distributed event-streaming platform designed to handle real-time events. It enables storing, processing, and retrieving large-scale, high-throughput, low-latency data streams, making it suitable for building real-time data pipelines and event-driven applications.
Key Features:
- Event Streaming: Kafka organizes data into topics, which are ordered logs of events.
- Distributed Architecture: Kafka is built for scalability and fault tolerance. It operates as a cluster of nodes called brokers and can distribute data across multiple servers.
- Publish-Subscribe Model: Producers write messages to the topics, and consumers read messages from them. Kafka supports multiple consumers, allowing different applications to independently process the same data stream.
- High Performance: Kafka is optimized for high throughput, processing millions of messages per second with low latency.
- Durable Storage: Kafka stores messages on disk with configurable retention periods, ensuring data persistence and reliability.
- Partitioning and Replication: Topics are divided into partitions for scalability and replicated across brokers for fault tolerance.
- Replayability: Consumers can re-read messages by resetting their offset, enabling data reprocessing or recovery.
- Integration and Ecosystem: Kafka integrates with various systems and has tools like Kafka Connect (for data integration) and Kafka Streams (for stream processing).
Advantages
- Reliability: It ensures fault tolerance through data distribution, replication, and partitioning.
- Scalability: Kafka can process massive data volumes and scale horizontally without downtime.
- Durability: Messages are promptly stored, ensuring resilience and data persistence.
- Performance: Kafka maintains high performance under extreme data loads, handling large volumes of data without downtime or data loss.
Disadvantages
These trade-offs are intentional design choices to maximize Kafka's performance but may pose challenges for use cases requiring greater flexibility:
- Limited Flexibility: Kafka lacks support for extended queries, such as filtering specific data in reports. Consumers must handle these tasks, as Kafka retrieves messages by offsets in the order they are received.
- Not Designed for Long-Term Storage: Kafka excels in streaming data but isn't suited for storing historical data for extended periods. Data duplication can make storage costly for large datasets.
-
No Wildcard Topic Support: Kafka doesn’t allow consuming from multiple topics using wildcard patterns (e.g.,
log-2024-*
).
Use cases
- Real-Time Analytics: Process and analyze data streams as they occur.
- Event Sourcing: Record all changes to an application’s state as a sequence of events.
- Log Aggregation: Collect and manage logs from distributed systems.
- Data Pipelines: Stream data between systems reliably and efficiently.
- IoT Applications: Handle sensor data from IoT devices in real-time.
How does Kafka work?
Kafka integrates the features of both queuing and publish-subscribe messaging models, offering consumers the advantages of each approach.
- Queuing enables scalable data processing by distributing tasks across multiple consumer instances but traditional queues do not support multiple subscribers.
- The publish-subscribe model supports multiple subscribers but cannot distribute tasks among multiple worker processes as each message is sent to all subscribers.
Kafka employs a partitioned log system to combine the benefits of queuing and publish-subscribe models. Logs, which are ordered sequences of records, are divided into partitions, with each partition assigned to different subscribers (consumers). This setup enables multiple subscribers to share a topic while maintaining scalability.
Events, Topics, and Partitions
We have seen that Kafka is a platform designed to handle real-time events, before talking about how those events are handled we need to have a definition for them:
An event is an action, incident, or change that's recorded applications, for example, a payment, a website click, or a temperature reading.
Events in Kafka are modeled as key/value pairs, where both keys and values are serialized into byte sequences.
- Values often represent serialized domain objects or raw inputs, such as sensor outputs or other application data. They encapsulate the core information being transmitted in the Kafka event.
- Keys can be complex domain objects, however are often simple types like strings or integers. Instead of uniquely identifying an individual event (as a primary key does in a relational database), keys typically identify entities within the system, such as a specific user, order, or connected device.
Kafka organizes events into ordered logs called topics. When an external system writes an event to Kafka, it is appended to the end of a topic. Messages remain in topics for a configurable duration, even after being read. Unlike queues, topics are durable, replicated, and fault-tolerant, efficiently storing event records. However, logs can only be scanned sequentially, not queried.
Topics are stored as log files on disk, however, disks have limitations such as finite size and I/O. To overcome this, Kafka allows topics to be divided into partitions, breaking a single log into multiple logs that can be distributed across different servers. This partitioning enables Kafka to scale horizontally, enhancing its capacity to handle large volumes of events and high throughput.
Kafka assigns messages to partitions based on whether they have a key:
- No Key: Messages are distributed round-robin across all partitions, ensuring an even data distribution but not preserving message order.
- With Key: The partition is determined by hashing the key, ensuring that messages with the same key always go to the same partition and maintain their order.
Brokers
Kafka operates as a distributed data infrastructure using nodes called brokers, which collectively form a Kafka cluster. Brokers can run on bare metal hardware, a cloud instance, in a container managed by Kubernetes, in Docker on your laptop, or wherever JVM processes can run.
Brokers focus on:
- Writing new events to partitions.
- Serving reads from partitions.
- Replicating partitions across brokers.
They do not perform message computation or topic-to-topic routing, keeping their design simple and efficient.
Replication
Kafka ensures data durability and fault tolerance by replicating partition data across multiple brokers. The primary copy of a partition is the leader replica, while additional copies are follower replicas. Data is written to the leader, which automatically replicates updates to the followers.
This replication process ensures:
- Data safety, even in the event of broker or storage failures.
- Automatic failover, where another replica takes over as leader if the current leader fails.
Developers benefit from these guarantees without needing to manage replication directly, as Kafka handles it transparently.
Producers
A Kafka producer is a client application that sends (publishes) data to Kafka topics. It’s responsible for creating and transmitting messages (records) to the Kafka cluster. Producers determine the topic and partition where messages will be stored based on their configuration and the presence of a message key. Producers are responsible for, but not limited to:
-
Message Composition:
- Each message consists of a key (optional), a value (the actual data), and metadata.
- The key determines the partition for the message, ensuring order for messages with the same key.
-
Partition Assignment:
- If a key is provided, the producer uses a hashing algorithm to determine the partition.
- Without a key, messages are distributed across partitions in a round-robin manner for load balancing.
-
Compression:
Producers can compress messages to reduce network bandwidth and storage use.
Consumers
A Kafka consumer is a client application that reads messages from Kafka topics, it retrieves messages from Kafka partitions at their own pace, allowing for real-time or batch processing of data. Notice that Kafka does not push messages to consumers, they pull messages from Kafka partitions by requesting the data.
Consumers also keep track of the offsets they have processed. Offsets can be committed automatically or manually, ensuring data is not lost if a consumer fails. This allows for flexible consumption, including replaying messages by resetting the offset.
Consumer groups
A consumer group is a set of consumers that cooperate to consume data from some topics, which allows for distributed processing of a topic's messages.
Partitions of a topic are divided among the consumers in the group, ensuring each message is processed by only one consumer within the group. Multiple consumer groups can independently consume the same topic without interference.
When a new consumer joins a group or an existing consumer fails, Kafka reassigns partitions among the consumers in the group to ensure all partitions are covered.
Serialization and Deserialization
Serialization and deserialization in Kafka are about converting data between its original format and a byte array for transmission and storage, allowing producers and consumers to communicate efficiently.
Serialization
Is the process of converting an object or data structure into a byte stream so it can be transmitted or stored. Before a producer sends data to a Kafka topic, it serializes the data (key and value) into byte arrays.
Common Serialization Formats:
- JSON: Human-readable, widely compatible.
- Avro: Compact and efficient, schema-based.
- Protobuf: Compact, schema-based, and language-agnostic.
- String: Simple text-based serialization.
- Custom Serialization: For application-specific needs.
Deserialization
Is the reverse process, where a byte stream is converted back into its original object or data structure. When a consumer reads data from a Kafka topic, it deserializes the byte array back into a usable format for processing.
Compression
Compression is reducing the size of messages before they are stored or transmitted. It optimizes storage usage, reduces network bandwidth consumption, and improves overall performance by sending smaller payloads between producers, brokers, and consumers.
When a producer sends messages to a Kafka topic, it can compress the message before the transmission. The compressed message is stored on brokers as-is and decompressed by consumers when they read the messages.
Advantages
- Reduced Network Bandwidth: Smaller payloads mean less data is transmitted over the network.
- Lower Storage Requirements: Compressed messages take up less space on disk.
- Improved Throughput: Smaller messages allow for faster data transfer and processing.
When to use?
- Use cases with large message sizes: Compression greatly reduces data size.
- High-throughput systems: Reduces the strain on network and storage resources.
- Batching: Compression works best when producers batch multiple messages together.
While compression saves resources, it's essential to balance the trade-off between CPU usage and compression benefits, choosing the codec that suits your use case.
Supported Compression Types
- None: No compression (default).
- Gzip: High compression ratio but higher CPU usage.
- Snappy: Balanced compression speed and CPU usage, suitable for real-time use cases.
- LZ4: Faster compression and decompression, optimized for low-latency systems.
- Zstd: High compression ratio with better performance than Gzip, supported in newer Kafka versions.
Tuning
Optimizing Apache Kafka's performance involves fine-tuning various components to balance throughput and latency effectively. This article only scratches the surface of this subject, here are some aspects to consider when tuning Kafka:
-
Partition Management:
- Partition Count: Increase the number of partitions to enhance parallelism and throughput. However, avoid excessive partitions to prevent management overhead. Align the number of partitions with your consumer capabilities and desired consumption rate.
-
Producer Configuration:
-
Batching: Configure
batch.size
andlinger.ms
to enable efficient batching of messages, reducing the number of requests and improving throughput. -
Compression: Implement compression (e.g.,
compression.type=snappy
) to decrease message size, reducing network and storage usage. Be mindful of the additional CPU overhead introduced by compression.
-
Batching: Configure
-
Consumer Configuration:
-
Fetch Settings: Adjust
fetch.min.bytes
andfetch.max.wait.ms
to control how consumers retrieve messages, balancing latency and throughput according to your application's needs.
-
Fetch Settings: Adjust
Practical example
Imagine an application that records the temperature in a room and transmits this data using Kafka, where another application processes it. For simplicity, we'll focus exclusively on the Kafka aspect, with both the producer and consumer implemented within the same application. In this scenario, each recorded temperature at a specific moment represents an event:
{
temperature: 42,
timeStamp: new Date(),
};
All the code will be in this repository.
First, we need a Kafka broker, but instead of installing Kafka in our machine let’s just this Docker Kafka image.
Start by pulling that image:
docker pull apache/kafka
Then run it mapping the port that Kafka listens on the same port on our machine:
docker run -d -p 9092:9092 --name broker apache/kafka:latest
That’s it, we have a Kafka broker running, before continuing you might want to play around with it by creating topics, sending and consume messages, to do that just follow the instructions on that image page.
To build our application we’re going to use NestJS with KafkaJS, start by creating the app with Nest CLI
nest new my-nest-project
Inside the project folder install kafkajs
npm i kafkajs
And generate the following modules
nest g mo kafka
nest g mo producer
nest g mo consumer
nest g mo temperature
The Kafka module will handle all Kafka-specific operations, including managing consumer and producer classes for connecting, disconnecting, sending, and receiving messages. This will be the only module directly interacting with the kafkajs
package.
The Producer and Consumer modules will act as interfaces between the pub-sub platform (Kafka, in this case) and the rest of the application, abstracting platform-specific details.
The Temperature module will manage the events. It doesn't need to know which pub-sub platform is being used, it only requires a consumer and a producer to function.
With the modules created, let’s also create a folder src/interface
and add the following interfaces in it:
// src/interfaces/producer.interface.ts
export interface IProducer {
produce: (message: any) => Promise<void>;
connect: () => Promise<void>;
disconnect: () => Promise<void>;
isConnected: () => boolean;
}
// src/interfaces/consumer.interface.ts
export type ConsumerMessage = {
key?: string;
value: any;
};
export type OnMessage = (message: ConsumerMessage) => Promise<void>;
export interface IConsumer {
connect: () => Promise<void>;
disconnect: () => Promise<void>;
consume: (onMessage?: OnMessage) => Promise<void>;
isConnected: () => boolean;
}
In src/kafka/
folder add the producer and consumer classes that implement those interfaces:
// src/kafka/kafka.producer.ts
export class KafkaProducer implements IProducer {
private readonly logger = new Logger(KafkaProducer.name, { timestamp: true });
private readonly kafka: Kafka;
private readonly producer: Producer;
private connected: boolean = false;
constructor(
private readonly broker: string,
private readonly topic: string,
) {
// The client must be configured with at least one broker
this.kafka = new Kafka({
brokers: [this.broker],
});
this.producer = this.kafka.producer();
}
async produce(
message: Message,
compression?: CompressionTypes,
acks?: number,
timeout?: number,
) {
// To produce, at least a topic and a list of messages must be provided
await this.producer.send({
topic: this.topic,
messages: [message],
compression,
timeout,
acks,
});
}
// To produce a message, the producer must be connected
async connect() {
try {
// Just hooking up some logs in the producer events
// And storing the connection status
this.producer.on('producer.connect', () => {
this.logger.log(
`producer connected. broker: ${this.broker} topic: ${this.topic}`,
);
this.connected = true;
});
this.producer.on('producer.disconnect', () => {
this.logger.log(
`producer disconnected. broker: ${this.broker} topic: ${this.topic}`,
);
this.connected = false;
});
// Connect to Kafka
await this.producer.connect();
} catch (err) {
this.logger.error(
`failed to connect to kafka. broker: ${this.broker} topic: ${this.topic}`,
err,
);
}
}
async disconnect() {
await this.producer.disconnect();
}
isConnected(): boolean {
return this.connected;
}
}
// src/kafka/kafka.cosumer.ts
export class KafkaConsumer implements IConsumer {
private readonly logger = new Logger(KafkaConsumer.name, { timestamp: true });
private readonly kafka: Kafka;
private readonly consumer: Consumer;
private connected: boolean = false;
constructor(
private readonly broker: string,
private readonly topic: string,
private readonly groupId: string,
) {
if (this.broker && this.topic && this.groupId) {
// The client must be configured with at least one broker
this.kafka = new Kafka({
brokers: [this.broker],
});
this.consumer = this.kafka.consumer({ groupId: this.groupId });
} else {
this.logger.warn('Broker, topic and groupId must be provided');
}
}
// The onMessage function will be called when a message is received
async consume(onMessage: OnMessage) {
// Here we subscribe to the topic ...
await this.consumer.subscribe({ topic: this.topic });
// ... and handle the messages
await this.consumer.run({
eachMessage: async (payload) => {
try {
this.logger.log(
`message: ${payload.message.value.toString()} (topic: ${payload.topic}, partition: ${payload.partition})`,
);
await onMessage({
key: payload.message.key?.toString(),
value: payload.message.value.toString(),
});
} catch (err) {
this.logger.error('error on consuming message', err);
}
},
});
}
// To consume, the consumer must be connected
async connect() {
try {
// Just hooking up some logs in the consumer events
// And storing the connection status
this.consumer.on('consumer.connect', () => {
this.logger.log(
`consumer connected. broker: ${this.broker} topic: ${this.topic}`,
);
this.connected = true;
});
this.consumer.on('consumer.disconnect', () => {
this.logger.log(
`consumer disconnected. broker: ${this.broker} topic: ${this.topic}`,
);
this.connected = false;
});
await this.consumer.connect();
} catch (err) {
this.logger.error(
`failed to connect to kafka. broker: ${this.broker} topic: ${this.topic}`,
err,
);
}
}
async disconnect() {
await this.consumer.disconnect();
}
isConnected(): boolean {
return this.connected;
}
}
Don’t forget to export these classes in kafka.module.ts
// src/kafka/kafka.module.ts
@Module({
imports: [],
providers: [KafkaProducer, KafkaConsumer],
exports: [KafkaProducer, KafkaConsumer],
})
export class KafkaModule {}
As it is now we could go to the temperature module and instantiate those Kafka classes and start using them. However, it would be better if the temperature module didn’t have to worry about which pub-sub platform it was using. Instead, it should simply work with an injected producer and/or consumer, focusing solely on sending and receiving messages, regardless of the underlying platform. This way, if we decide to switch to a different pub-sub platform in the future, we won’t need to make any changes to the temperature module.
To achieve this abstraction, we can create Producer and Consumer classes that handle the specifics of Kafka’s Producer and Consumer implementations. Let’s start with the Producer:
// src/producer/producer.service.ts
@Injectable()
export class ProducerService implements OnApplicationShutdown {
// Expects any producer that implements the IProducer interface
private readonly producer: IProducer;
constructor(
@Inject('broker') broker: string,
@Inject('topic') topic: string,
) {
this.producer = new KafkaProducer(broker, topic);
}
/** The produce() and message can receive more parameters,
* refer to produce method in src/kafka/kafka.producer.ts
*/
async produce(message: { key?: string; value: string }) {
if (!this.producer.isConnected()) {
await this.producer.connect();
}
await this.producer.produce(message);
}
async onApplicationShutdown() {
await this.producer.disconnect();
}
}
// src/producer/producer.module.ts
@Module({
imports: [KafkaModule],
providers: [
ProducerService,
{
provide: 'broker',
useValue: 'default-broker-value',
},
{
provide: 'topic',
useValue: 'default-topic-value',
},
],
exports: [ProducerService],
})
export class ProducerModule {}
Now, the Consumer:
// src/consumer/consumer.service.ts
@Injectable()
export class ConsumerService implements OnApplicationShutdown {
// Expects any consumer that implements the IConsumer interface
private readonly consumer: IConsumer;
constructor(
@Inject('broker') broker: string,
@Inject('topic') topic: string,
@Inject('groupId') groupId: string,
) {
this.consumer = new KafkaConsumer(broker, topic, groupId);
}
async consume(onMessage: OnMessage) {
if (!this.consumer.isConnected()) {
await this.consumer.connect();
}
await this.consumer.consume(onMessage);
}
async onApplicationShutdown() {
await this.consumer.disconnect();
}
}
// src/consumer/consumer.module.ts
@Module({
imports: [KafkaModule],
providers: [
ConsumerService,
{
provide: 'broker',
useValue: 'default-broker-value',
},
{
provide: 'topic',
useValue: 'default-topic-value',
},
{
provide: 'groupId',
useValue: 'default-groupId-value',
},
],
exports: [ConsumerService],
})
export class ConsumerModule {}
Now, we can focus on building the temperature module. In the temperature.service.ts
file, we’ll create a method to register a temperature, which in this example will simply send the temperature data to the broker using a producer. Additionally, we’ll implement a method to handle incoming messages for demonstration purposes.
These methods can be invoked by another service or a controller. However, for simplicity, in this example, we’ll call them directly when the application starts, utilizing the onModuleInit
method.
// src/temperature/temperature.service.ts
@Injectable()
export class TemperatureService implements OnModuleInit {
private readonly logger = new Logger(TemperatureService.name, {
timestamp: true,
});
private readonly producerService: ProducerService;
private readonly consumerService: ConsumerService;
constructor(private readonly configService: ConfigService) {
this.producerService = new ProducerService(
this.configService.get('BROKER'),
this.configService.get('TOPIC'),
);
this.consumerService = new ConsumerService(
this.configService.get('BROKER'),
this.configService.get('TOPIC'),
'temperature-consumer',
);
}
async registerTemperature(temperature: number) {
const message = {
temperature,
timeStamp: new Date(),
};
await this.producerService.produce({
value: JSON.stringify(message),
});
}
async handleTemperatureReading() {
await this.consumerService.consume(async (message: ConsumerMessage) => {
this.logger.log(message);
// TODO handle consumed message
});
}
// Called when the application starts, only for testing purposes
async onModuleInit() {
await this.registerTemperature(30);
// await this.handleTemperatureReading();
}
}
// src/temperature/temperature.module.ts
@Module({
imports: [ProducerModule, ConsumerModule],
providers: [TemperatureService],
})
export class TemperatureModule {}
That's it! With the broker running in the Docker container, you can start the application to send and receive messages. Additionally, you can open a shell inside the broker container using the following command:
docker exec --workdir /opt/kafka/bin/ -it broker sh
From there, you can interact with the broker directly and send messages to the application, receive messages from it, create new topics, etc.
This is the repository with the code of this example.
Top comments (0)