Grzegorz Piechnik

🔥 What is Apache Kafka and how to perform performance tests on it (Part 2) 👨🏻‍💻

Event streaming is a technique for processing and transferring data in real time, which entails the transmission of data streams across different network protocols. Its purpose is to facilitate swift data exchange between various components within a system or application.

Apache Kafka serves as a prominent platform for managing real-time events. In a previous article, we looked at the architecture of Apache Kafka. Now let’s examine how to run performance tests on it.

Running Apache Kafka locally

To save time writing docker-compose files, we will use a ready-made image. According to its repository, the image contains:

  1. A Kafka distribution with Apache Kafka, Kafka Connect, Zookeeper, Confluent Schema Registry and REST Proxy
  2. Lenses.io Lenses or kafka-topics-ui, schema-registry-ui, kafka-connect-ui
  3. Lenses.io Stream Reactor, 25+ Kafka Connectors to simplify ETL processes
  4. Integration tests and examples embedded into the Docker image

So let's launch the instance with the following command.

docker run --detach --rm --name lensesio -p 2181:2181 -p 3030:3030 -p 8081-8083:8081-8083 -p 9581-9585:9581-9585 -p 9092:9092  -e ADV_HOST=127.0.0.1 -e RUN_TESTS=0 lensesio/fast-data-dev:latest

After opening 127.0.0.1:3030 in the browser, we get a dashboard where we can inspect, among other things, the created topics and their partitions.

Kafka 1

Kafka 2
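
We can also inspect the broker from the command line, since the image ships with the standard Kafka CLI tools. For example, to list topics (a sketch; the exact tool names and flags depend on the Kafka version bundled in the image):

docker exec -it lensesio kafka-topics --bootstrap-server localhost:9092 --list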

Apache Kafka performance tests

Apache Kafka, like any other service, platform or server, has its performance limits. To discover them, you need to run performance tests against the Apache Kafka cluster, which will reveal its capacity thresholds. One project that can help with this is xk6-kafka.

Xk6-kafka is an extension for the k6 tool that enables performance testing of Apache Kafka through a producer and, optionally, a consumer.
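
Because k6 extensions are compiled into the k6 binary itself, the test requires a k6 build that includes xk6-kafka. One way to produce such a binary, assuming Go and the xk6 builder are installed, is:

xk6 build --with github.com/mostafa/xk6-kafka@latest

With that binary in place, we will use the following script.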

import { check } from "k6";
import {
    Writer,
    Reader,
    Connection,
    SchemaRegistry,
    SCHEMA_TYPE_STRING,
} from "k6/x/kafka";


const brokers = ["localhost:9092"];
const topic = "xk6_kafka_json_topic";

const writer = new Writer({
    brokers: brokers,
    topic: topic,
    autoCreateTopic: true,
});

const reader = new Reader({
    brokers: brokers,
    topic: topic,
});

const connection = new Connection({
    address: brokers[0],
});

const schemaRegistry = new SchemaRegistry();

// Create the topic once, from the init context (__VU is 0 there)
if (__VU == 0) {
    connection.createTopic({ topic: topic });
}

export const options = {
    // Fail the run if any producer or consumer errors occur
    thresholds: {
        kafka_writer_error_count: ["count == 0"],
        kafka_reader_error_count: ["count == 0"],
    },
};

export default function () {
    let messages = [
        {
            key: schemaRegistry.serialize({
                data: "test-key-string",
                schemaType: SCHEMA_TYPE_STRING,
            }),
            value: schemaRegistry.serialize({
                data: "test-value-string",
                schemaType: SCHEMA_TYPE_STRING,
            }),
            headers: {
                mykey: "myvalue",
            },
            partition: 0,
            time: new Date(), // Will be converted to timestamp automatically
        },
        {
            key: schemaRegistry.serialize({
                data: "test-key-string",
                schemaType: SCHEMA_TYPE_STRING,
            }),
            value: schemaRegistry.serialize({
                data: "test-value-string",
                schemaType: SCHEMA_TYPE_STRING,
            }),
            headers: {
                mykey: "myvalue",
            },
        },
    ];

    writer.produce({ messages: messages });

    // Read 2 messages only
    messages = reader.consume({ limit: 2 });

    check(messages, {
        "2 messages are received": (messages) => messages.length == 2,
    });

    check(messages[0], {
        "Topic equals to xk6_kafka_json_topic": (msg) => msg["topic"] == topic,
        "Key is a string and is correct": (msg) =>
            schemaRegistry.deserialize({
                data: msg.key,
                schemaType: SCHEMA_TYPE_STRING,
            }) == "test-key-string",
        "Value is a string and is correct": (msg) =>
            typeof schemaRegistry.deserialize({
                data: msg.value,
                schemaType: SCHEMA_TYPE_STRING,
            }) == "string" &&
            schemaRegistry.deserialize({
                data: msg.value,
                schemaType: SCHEMA_TYPE_STRING,
            }) == "test-value-string",
        "Header equals {'mykey': 'myvalue'}": (msg) =>
            "mykey" in msg.headers &&
            String.fromCharCode(...msg.headers["mykey"]) == "myvalue",
        "Time is past": (msg) => new Date(msg["time"]) < new Date(),
        "Partition is zero": (msg) => msg["partition"] == 0,
        "High watermark is gte zero": (msg) => msg["highWaterMark"] >= 0,
    });
}

// Clean up: delete the test topic and close all connections
export function teardown(data) {
    if (__VU == 0) {
        connection.deleteTopic(topic);
    }
    writer.close();
    reader.close();
    connection.close();
}

Inside the script, we create the topic (if it does not already exist) and send two messages to it in each iteration. Then we run standard checks, and in teardown we close the connections and delete the previously created topic. It is important that the number of messages we want to read is not greater than the number we sent; otherwise the tool will return an error.

After running the tool in the console, we get the following summary.

tmp> .\k6.exe run --vus 1 --duration 100s .\scenario.js

          /\      |‾‾| /‾‾/   /‾‾/
     /\  /  \     |  |/  /   /  /
    /  \/    \    |     (   /   ‾‾\
   /          \   |  |\  \ |  ()  |
  / __________ \  |__| \__\ \_____/ .io

  execution: local
     script: .\scenario.js
     output: -

  scenarios: (100.00%) 1 scenario, 1 max VUs, 2m10s max duration (incl. graceful stop):
           * default: 1 looping VUs for 1m40s (gracefulStop: 30s)


running (1m40.0s), 0/1 VUs, 32642 complete and 0 interrupted iterations
default ✓ [======================================] 1 VUs  1m40s

     ✓ 2 messages are received
     ✓ Topic equals to xk6_kafka_json_topic
     ✓ Key is a string and is correct
     ✓ Value is a string and is correct
     ✓ Header equals {'mykey': 'myvalue'}
     ✓ Time is past
     ✓ Partition is zero
     ✓ High watermark is gte zero

     █ teardown

     checks.........................: 100.00% ✓ 261136     ✗ 0
     data_received..................: 0 B     0 B/s
     data_sent......................: 0 B     0 B/s
     iteration_duration.............: avg=3.05ms  min=1.48ms  med=2.74ms max=40.27ms p(90)=3.83ms   p(95)=3.98ms
     iterations.....................: 32642   326.345726/s
     kafka_reader_dial_count........: 1       0.009998/s
     kafka_reader_dial_seconds......: avg=215ns   min=0s      med=0s     max=7.04ms  p(90)=0s       p(95)=0s
   ✓ kafka_reader_error_count.......: 0       0/s
     kafka_reader_fetch_bytes.......: 2.1 MB  21 kB/s
     kafka_reader_fetch_bytes_max...: 1000000 min=1000000  max=1000000
     kafka_reader_fetch_bytes_min...: 1       min=1        max=1
     kafka_reader_fetch_size........: 32662   326.54568/s
     kafka_reader_fetch_wait_max....: 200ms   min=200ms    max=200ms
     kafka_reader_fetches_count.....: 65265   652.501495/s
     kafka_reader_lag...............: 0       min=0        max=0
     kafka_reader_message_bytes.....: 2.1 MB  21 kB/s
     kafka_reader_message_count.....: 65284   652.691452/s
     kafka_reader_offset............: 65284   min=2        max=65284
     kafka_reader_queue_capacity....: 1       min=1        max=1
     kafka_reader_queue_length......: 0       min=0        max=0
     kafka_reader_read_seconds......: avg=16.06µs min=0s      med=0s     max=5.77ms  p(90)=0s       p(95)=260.99µs
     kafka_reader_rebalance_count...: 0       0/s
     kafka_reader_timeouts_count....: 0       0/s
     kafka_reader_wait_seconds......: avg=1.51ms  min=499.9µs med=1.37ms max=19.7ms  p(90)=1.91ms   p(95)=1.98ms
     kafka_writer_acks_required.....: 0       min=0        max=0
     kafka_writer_async.............: 0.00%   ✓ 0          ✗ 32642
     kafka_writer_attempts_max......: 0       min=0        max=0
     kafka_writer_batch_bytes.......: 1.8 MB  18 kB/s
     kafka_writer_batch_max.........: 1       min=1        max=1
     kafka_writer_batch_size........: 32642   326.345726/s
     kafka_writer_batch_timeout.....: 0s      min=0s       max=0s
   ✓ kafka_writer_error_count.......: 0       0/s
     kafka_writer_message_bytes.....: 3.5 MB  35 kB/s
     kafka_writer_message_count.....: 65284   652.691452/s
     kafka_writer_read_timeout......: 0s      min=0s       max=0s
     kafka_writer_retries_count.....: 0       0/s
     kafka_writer_wait_seconds......: avg=0s      min=0s      med=0s     max=0s      p(90)=0s       p(95)=0s
     kafka_writer_write_count.......: 65284   652.691452/s
     kafka_writer_write_seconds.....: avg=71.73µs min=0s      med=0s     max=1.86ms  p(90)=270.34µs p(95)=276.2µs
     kafka_writer_write_timeout.....: 0s      min=0s       max=0s
     vus............................: 1       min=1        max=1
     vus_max........................: 1       min=1        max=1

As you can see, in 100 seconds a single VU completed over 32,600 iterations, producing more than 65,000 messages. Pretty fast, right?

Analysis of results

A dedicated solution in the form of a k6 extension exposes a wealth of metrics that can be used for further analysis. They include, among others:

  • kafka_reader_error_count
  • kafka_reader_wait_seconds
  • kafka_reader_message_count
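
Metrics like these can also drive pass/fail criteria directly in the script. Below is a sketch of a stricter options block; the threshold values are illustrative, not recommendations:

export const options = {
    thresholds: {
        // No producer or consumer errors allowed
        kafka_writer_error_count: ["count == 0"],
        kafka_reader_error_count: ["count == 0"],
        // No consumer rebalances during the run
        kafka_reader_rebalance_count: ["count == 0"],
        // Illustrative latency budget: 95% of iterations under 10 ms
        iteration_duration: ["p(95) < 10"],
    },
};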

As you know, console-level test results can be hard to analyze. That is why we created a Grafana dashboard to visualize them. You can find it in our GitHub repository.
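
To feed a dashboard like this, the results have to land in a data source that Grafana can query. One common approach, assuming a local InfluxDB v1 instance with a database named k6, is to stream the results at run time:

k6 run --out influxdb=http://localhost:8086/k6 scenario.js

What does it look like from the user's perspective?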

Kafka k6 dashboard 1

Kafka k6 dashboard 2
