Fred Munjogu

Apache Kafka Deep Dive: Core Concepts, Data Engineering Applications, and Real-World Production Practices

As a data engineer, you will often need to stream data. More specifically, you will need a tool that lets you work with live data streams in whatever project you are building.

Kafka is a great tool and has a ton of functionality to help you stream data seamlessly. In this article, we will focus on the core concepts you need to know to get started with Kafka.

Kafka Architecture

Brokers

A broker is a server that stores the streamed data and handles all data streaming requests. The broker acts as a middleman between producers (clients that send messages) and consumers (clients that receive them).

Zookeeper vs. KRaft (Kafka Raft Metadata mode)

In earlier versions of Kafka (before v2.8), Kafka relied on an external coordinator called ZooKeeper to manage cluster metadata. ZooKeeper worked hand in hand with the brokers to decide which broker was the controller and to persist cluster state.

KRaft was introduced in Kafka 2.8 as an alternative, was declared production-ready in Kafka 3.3, and ZooKeeper support was removed entirely in Kafka 4.0. Instead of relying on ZooKeeper, Kafka now uses the Raft consensus algorithm to manage its metadata internally. This makes the cluster simpler to operate and more scalable, since there are fewer moving parts.

Topics, Partitions, Offsets

Topic

A topic in Kafka is a log that stores messages and events in a logical order. We can equate a topic to a folder in a filesystem, and the events to the files.

Partition

A partition is a "slice" of a topic. When you create a topic, you specify how many partitions it should have.

This is important because it allows more brokers to share the load, since each partition can be stored on a different broker. It also allows consumers in the same group to read from different partitions at the same time. For example, say you create a topic called orders with 3 partitions; it will actually look like this:

Orders partitions
orders-0: [msg1, msg4, msg7]
orders-1: [msg2, msg5, msg8]
orders-2: [msg3, msg6, msg9]


Offsets

An offset is a sequential number assigned to each message within a partition. Producers, consumers, and brokers use it to identify a message's position in that partition.

Producers

A producer is a client that writes messages to topics in the Kafka cluster. You can also influence which partition a message lands in by using key-based partitioning: messages with the same key always go to the same partition.
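
As a quick illustration, here is a minimal producer sketch using the confluent-kafka Python client (the broker address and topic name are assumptions for a local setup):

# Minimal producer sketch (pip install confluent-kafka).
# Broker address and topic name are assumptions for a local setup.
from confluent_kafka import Producer

producer = Producer({"bootstrap.servers": "localhost:9092"})

def on_delivery(err, msg):
    # Called once the broker acknowledges (or rejects) the message.
    if err is not None:
        print(f"Delivery failed: {err}")
    else:
        print(f"Delivered to {msg.topic()}[{msg.partition()}] at offset {msg.offset()}")

# Messages with the same key always land in the same partition,
# which preserves ordering per key (e.g. all events for one order).
producer.produce(
    "orders",
    key="order-42",
    value='{"item": "book", "qty": 1}',
    callback=on_delivery,
)
producer.flush()  # block until outstanding messages are delivered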

Consumers

A consumer is a client that reads messages from topics in the Kafka cluster.

Kafka also has consumer groups, which let consumers work together in parallel: the partitions of a topic are divided among the consumers in a group, so each consumer reads its own subset of partitions concurrently.
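
Here is a matching consumer sketch (again confluent-kafka, with an assumed group id). Running several copies of this script with the same group.id makes Kafka split the topic's partitions among them:

# Minimal consumer sketch. Run several copies with the same group.id and
# Kafka assigns each one a share of the topic's partitions.
from confluent_kafka import Consumer

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "orders-processors",
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["orders"])

try:
    while True:
        msg = consumer.poll(1.0)  # wait up to 1 second for a message
        if msg is None:
            continue
        if msg.error():
            print(f"Consumer error: {msg.error()}")
            continue
        # The offset identifies this message's position within its partition.
        print(f"partition={msg.partition()} offset={msg.offset()} value={msg.value()}")
finally:
    consumer.close()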

Message Delivery Semantics

There are three different types of delivery semantics:

  • at-most-once
  • at-least-once
  • exactly-once

At-Most-Once Delivery

This approach entails the consumer saving the position of an event first and then processing it. If the consumer fails in the middle of processing, there is no way to go back and re-read that event, so it is effectively lost.

This approach fits situations where some data loss is acceptable and accuracy is not a priority.

At-Least-Once Delivery

This approach entails the consumer processing the received events, saving the results, and only then saving the position of the last processed event. Unlike at-most-once delivery, a consumer that fails can go back and reprocess old events, which means the same event may occasionally be processed more than once.

Exactly-Once Delivery

This approach is similar to at-least-once delivery: the consumer receives the event, saves the results, and saves the position of the last received event. The difference is that duplicates are detected and dropped, so each event is processed exactly once.
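
To make this concrete, here is a sketch (confluent-kafka, with hypothetical topic and group names) showing how the placement of the offset commit relative to processing gives you at-most-once or at-least-once behaviour; exactly-once additionally relies on idempotent or transactional producers.

# Sketch: commit placement determines the delivery semantics.
# Auto-commit is disabled so we control when the offset is saved.
from confluent_kafka import Consumer

def process(message):
    # Placeholder for your business logic.
    print("processing", message.value())

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "payments-processors",
    "enable.auto.commit": False,
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["payments"])

while True:
    msg = consumer.poll(1.0)
    if msg is None or msg.error():
        continue

    # At-most-once: commit first, then process. If processing crashes,
    # the event is never re-read (possible data loss).
    #   consumer.commit(msg)
    #   process(msg)

    # At-least-once: process first, then commit. If the commit is lost,
    # the event is re-processed after a restart (possible duplicates).
    process(msg)
    consumer.commit(msg)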

Retention Policies

Retention is the configurable policy that determines how long, or how much, data is kept in a Kafka topic before it is deleted.

Some of the retention policies include:

  • Time-based retention
  • Size-based retention

Time-based retention

This is a policy where you can configure how long messages are retained in their topics based on their timestamps. Once that time period expires, the closed segments are deleted.

Size-based retention

This is a policy where messages expire based on the total size of the messages retained in a partition. Once the configured size is reached or exceeded, the oldest segments are deleted.
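
As a concrete illustration, here is a sketch that creates a topic with both a time-based and a size-based limit, using the confluent-kafka AdminClient; the topic name and the limits themselves are just examples.

# Sketch: topic-level retention settings via the AdminClient.
from confluent_kafka.admin import AdminClient, NewTopic

admin = AdminClient({"bootstrap.servers": "localhost:9092"})

topic = NewTopic(
    "orders",
    num_partitions=3,
    replication_factor=1,
    config={
        "retention.ms": str(7 * 24 * 60 * 60 * 1000),  # keep messages for 7 days...
        "retention.bytes": str(1024 * 1024 * 1024),    # ...or until a partition holds ~1 GB
    },
)

# create_topics() returns a dict of topic name -> future.
for name, future in admin.create_topics([topic]).items():
    try:
        future.result()  # block until the broker confirms
        print(f"Created topic {name}")
    except Exception as exc:
        print(f"Failed to create {name}: {exc}")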

Serialization & Deserialization

Serialization refers to the process of converting structured data into a byte stream. This is crucial since Kafka stores and transmits data as raw bytes. This process is done by a serializer.

Deserialization is the opposite of serialization. This is converting data from a byte stream to a structured form, e.g., an object. This process is done by a deserializer.

Common formats used with Kafka are Avro, Protobuf, and JSON Schema.
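
Here is a minimal JSON round trip as a sketch (confluent-kafka plus Python's json module; the topic and broker address are assumptions). For Avro or Protobuf you would typically plug in a schema-registry-aware serializer instead.

# Sketch: Kafka only sees bytes; serialization/deserialization is our code.
import json
from confluent_kafka import Consumer, Producer

producer = Producer({"bootstrap.servers": "localhost:9092"})

event = {"id": 1, "name": "Alice", "email": "alice@example.com"}
producer.produce("customers", value=json.dumps(event).encode("utf-8"))  # serialize
producer.flush()

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "customers-readers",
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["customers"])

msg = consumer.poll(10.0)
if msg is not None and not msg.error():
    record = json.loads(msg.value().decode("utf-8"))  # deserialize
    print(record["name"], record["email"])
consumer.close()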

Replication and Fault Tolerance

In Kafka, one can specify the replication factor when creating a topic. This allows for multiple copies to be created across different brokers, which ensures data availability.

Replication Factor

A configurable setting for each topic that determines the total number of copies for each partition.

Leader

For each partition, one replica is designated as the leader, which handles all incoming produce and consume requests.

Follower

Other replicas for a partition are followers. They continuously fetch data from the leader to stay synchronized.

ISR (In-Sync Replicas)

The set of replicas, including the leader, that are fully caught up with the leader's data and are therefore eligible to take over if the leader fails.

High Availability

The system remains functional and data is accessible, even if one or more brokers fail.
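
To tie these pieces together, here is a sketch (confluent-kafka, assuming a three-broker cluster reachable at localhost:9092 and a hypothetical payments topic) that creates a topic with a replication factor of 3 and min.insync.replicas=2, and a producer that waits for the in-sync replicas to acknowledge every write:

# Sketch: durability knobs -- 3 replicas per partition, at least 2 in sync,
# and a producer that waits for the ISR to acknowledge (acks=all).
from confluent_kafka import Producer
from confluent_kafka.admin import AdminClient, NewTopic

admin = AdminClient({"bootstrap.servers": "localhost:9092"})
futures = admin.create_topics([
    NewTopic(
        "payments",
        num_partitions=3,
        replication_factor=3,                 # leader + 2 followers per partition
        config={"min.insync.replicas": "2"},  # writes need 2 in-sync copies
    )
])
futures["payments"].result()  # raises if creation fails (e.g. topic already exists)

producer = Producer({
    "bootstrap.servers": "localhost:9092",
    "acks": "all",               # wait for all in-sync replicas, not just the leader
    "enable.idempotence": True,  # avoid duplicates when retries happen
})
producer.produce("payments", key="payment-1", value="created")
producer.flush()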

Kafka Connect

This is a tool for streaming data between Kafka and other data systems. In this article, we will focus on streaming data between two databases, namely, PostgreSQL and Cassandra.

Configuring our Postgres Connector

In our case, we will be using Debezium Connect, which is built on Kafka Connect. Each connector is configured with a JSON file, which we will write once the infrastructure is up.

Since we will run everything with Docker, we first need a Docker Compose file that has all the services we need. These include:

  • ZooKeeper - handles cluster metadata
  • Kafka - the streaming backbone
  • PostgreSQL 15 - our source database
  • Cassandra 4.1 - our sink database
  • debezium/connect - the tool that lets us stream data from PostgreSQL to Cassandra
  • kafka-ui - a graphical user interface to view our streamed data

Let's break down each service component.

Postgres

  postgres:
    image: postgres:15
    container_name: postgres
    ports:
      - "5432:5432"
    environment:
      POSTGRES_USER: test
      POSTGRES_PASSWORD: root
      POSTGRES_DB: test
    command: >
      postgres -c wal_level=logical
               -c max_wal_senders=10
               -c max_replication_slots=10
               -c max_connections=200
    volumes:
      - postgres_data:/var/lib/postgresql/data

The command property passes startup arguments to PostgreSQL. Setting wal_level to logical enables logical replication, so changes written to the write-ahead log can be captured and streamed into Kafka, which we will then use to replicate the data into Cassandra.

Cassandra

  cassandra:
    image: cassandra:4.1
    container_name: cassandra
    ports:
      - "9042:9042"
    environment:
      CASSANDRA_CLUSTER_NAME: "cdc-cluster"
      CASSANDRA_NUM_TOKENS: 16
      CASSANDRA_DC: datacenter1
      CASSANDRA_RACK: rack1
    volumes:
      - cassandra_data:/var/lib/cassandra

This is just a basic configuration to bring up our sink database.

Connect

This is where we use Debezium, and there are a few things we need to do so that it can move data between these two systems.

  connect:
    image: debezium/connect:2.7.3.Final
    container_name: debezium
    depends_on:
    - kafka
    - postgres
    ports:
    - "8083:8083"
    environment:
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: connect_configs
      OFFSET_STORAGE_TOPIC: connect_offsets
      BOOTSTRAP_SERVERS: kafka:9092
      HOST_NAME: "connect"
      ADVERTISED_HOST_NAME: "connect"
      ADVERTISED_PORT: "8083"
      KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      OFFSET_FLUSH_INTERVAL_MS: "60000"
      OFFSET_FLUSH_TIMEOUT_MS: "5000"
      SHUTDOWN_TIMEOUT: "10000"
      HEAP_OPTS: "-Xms512M -Xmx2G"
      LOG_LEVEL: "INFO"
      ENABLE_APICURIO_CONVERTERS: "false"
      ENABLE_DEBEZIUM_SCRIPTING: "false"
      KAFKA_CONNECT_PLUGINS_DIR: /kafka/connect,/kafka/connect/plugins
    volumes:
      - connect_data:/kafka/connect
      - ../plugins:/kafka/connect/plugins

The main things to focus on in this configuration are the volumes. Debezium ships with the PostgreSQL connector pre-installed under the /kafka/connect directory, so there is nothing extra to install on the source side; you only need to make sure that path is included in the KAFKA_CONNECT_PLUGINS_DIR environment variable. Cassandra is a little different, because you have to source the sink connector yourself. Once you have downloaded it, extract the archive and locate the Kafka sink connector file with the .jar extension. We place that jar in a plugins directory next to our project and mount it at /kafka/connect/plugins, which lets the container recognize the plugin so we can register our sink connector.

Similar to what we did for Postgres, we need to add this path to the environment variable KAFKA_CONNECT_PLUGINS_DIR.

After adding the ZooKeeper, Kafka, and Kafka UI services (I have not provided those snippets since they only need basic configuration and are easy to set up), we can now create matching tables in both PostgreSQL and Cassandra.

Creating Tables

For Postgres:

CREATE TABLE customers (
    id SERIAL PRIMARY KEY,
    name VARCHAR(100),
    email VARCHAR(150),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Insert some sample rows
INSERT INTO customers (name, email) VALUES
('Alice', 'alice@example.com'),
('Bob', 'bob@example.com');

For Cassandra:

CREATE KEYSPACE IF NOT EXISTS cdc_demo
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};

USE cdc_demo;

CREATE TABLE customers (
    id INT PRIMARY KEY,
    name TEXT,
    email TEXT,
    created_at TIMESTAMP
);

Registering the connectors

Postgres configuration: postgres_connector.json

{
  "name": "postgres-connector",  
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector", 
    "database.hostname": "postgres", 
    "database.port": "5432", 
    "database.user": "postgres", 
    "database.password": "postgres", 
    "database.dbname": "postgres", 
    "topic.prefix": "test", 
    "plugin.name": "pgoutput",
    "snapshot.mode": "initial"
  }
}

Save this config to a JSON file and run this command to register the connector:

curl -X POST -H "Content-Type: application/json" \
     --data @postgres_connector.json \
     http://localhost:8083/connectors


Cassandra Configuration: cassandra_connector.json

{
  "name": "stocks-sink",
  "config": {
    "connector.class": "com.datastax.kafkaconnector.DseSinkConnector",
    "tasks.max": "1",
    "topics": "test.public.customers,
    "contactPoints": "cassandra",
    "loadBalancing.localDc": "datacenter1",
    "port": 9042,
    "auth.provider": "None",
    "ssl.provider": "None",
    "topic.test.public.cdc_demo.customers.mapping": 
    “id=value.id, name=value.name, email=value.email, created_at=value.created_at”,
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
  }
}

Save this config to a JSON file and run this command to register the connector:

curl -X POST -H "Content-Type: application/json" \
     --data @cassandra_connector.json \
     http://localhost:8083/connectors

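Before looking at the data, it is worth confirming that both connectors are in the RUNNING state. One way to do that (a small sketch with Python's requests library against the Kafka Connect REST API, using the connector names from the configs above) is:

# Sketch: check connector and task state via the Kafka Connect REST API.
import requests

for name in ["postgres-connector", "stocks-sink"]:
    status = requests.get(f"http://localhost:8083/connectors/{name}/status").json()
    print(name, status["connector"]["state"], [task["state"] for task in status["tasks"]])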

If we look into our Cassandra database now, we will see the records that are in our PostgreSQL database, and any new rows we insert into Postgres will also show up in Cassandra shortly afterwards.
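
A quick way to verify this from code is a small sketch with the DataStax Python driver (pip install cassandra-driver), pointed at the keyspace and table we created above:

# Sketch: read back the replicated rows from Cassandra.
from cassandra.cluster import Cluster

cluster = Cluster(["localhost"], port=9042)
session = cluster.connect("cdc_demo")

for row in session.execute("SELECT id, name, email FROM customers"):
    print(row.id, row.name, row.email)

cluster.shutdown()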

What we have just done is referred to as CDC (Change Data Capture): changes made in a source database are captured and applied to a sink database. This improves availability, since the sink holds a copy of the data that we can fall back on if the source database fails. This is one of the applications of Kafka Connect.

How Kafka is Used in the Industry

To explain how Kafka is used in the real world, we will use the example of a popular company, Uber.

Uber operates at a massive scale:

  • Millions of ride requests, driver updates, payments, GPS events, and ETA predictions per second.

  • They need to process events in real-time for features like surge pricing, live tracking, fraud detection, and customer notifications.

Example Use Case

When a rider presses “Request Ride” in the Uber app, the action is published as an event into a Kafka topic called rides. Since Kafka topics are partitioned, this request can be processed in parallel with millions of others, which helps Uber handle high throughput at scale. At the same time, every driver’s phone sends GPS updates to another Kafka topic called locations. Kafka’s durability makes sure these events are stored safely on disk, copied across brokers, and can be replayed if needed.

Uber’s matching service consumes data from both the rides and locations topics in real time, and Kafka’s consumer groups allow multiple consumers to share the work of processing events. Kafka also guarantees ordering within partitions, so a driver’s GPS updates are processed in the correct order, which helps the system calculate accurate ETAs.

Once a rider-driver match is made, the result is published into another topic called matches, which is consumed by the notification service to update both apps instantly.
