AutoMQ

How to Achieve 2x Partition Write Performance for Kafka

Introduction
Writing to a single partition holds significant value in scenarios that require globally ordered messages. In some strictly ordered environments, the partition count must be set to one and a single producer used to send data, so that consumers can read all data in its original order. In such cases, Kafka's single-partition write performance determines the throughput ceiling of the entire system. In practice, we have found that constraints in Kafka's thread model keep it from fully exploiting the potential of single-partition writes. This article examines the deficiencies of the Kafka thread model and how AutoMQ improves upon it to achieve better single-partition write performance.

Analysis of Apache Kafka®'s Serial Processing Model
The network framework behind Apache Kafka's serial processing model consists primarily of five classes:

  • SocketServer: The core class of the network framework, containing the Acceptor and Processor components:
    • Acceptor: Listens on ports, handles new connection requests, and distributes connections to the Processor;
    • Processor: A network thread, configured by the num.network.threads setting. Each TCP connection is handled exclusively by one Processor. The Processor#run method drives the subsequent lifecycle management of the connection, parsing requests from the network and writing responses back to the network;
  • KafkaChannel: An abstraction of a single TCP connection, maintains the state information of the connection, held by the Processor;
  • RequestChannel: After the Processor parses a request from the network, it places it into the RequestChannel's single queue, from which KafkaRequestHandler threads pull requests for concurrent, multi-threaded processing;
  • KafkaRequestHandler: Handles business logic and IO operations, configured through num.io.threads. After receiving requests from the RequestChannel, it invokes KafkaApis to process the business logic.
  • KafkaApis: The class that actually handles business logic, dispatching each request to a different method based on the request type.

The core classes of the network framework and their interactions correspond to Apache Kafka's thread model, as shown in the figure below:

[Figure: Apache Kafka's thread model]

As can be seen, Kafka's thread model is similar to the server programs we develop using Netty:

  • kafka-socket-listener corresponds to the Boss EventLoopGroup: responsible for accepting client connections. When a new connection arrives, the Boss EventLoopGroup accepts the connection and registers it with the Worker EventLoopGroup;
  • kafka-network-thread corresponds to the Worker EventLoopGroup: handles all I/O events of the connection, including reading data, writing data, and managing lifecycle events of the connection;
  • kafka-request-handler corresponds to the business-logic thread pool: to prevent business logic from blocking network threads, it is offloaded to a separate thread pool for asynchronous execution.
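For readers who know Netty, a minimal sketch of this boss/worker split may make the analogy concrete. It is a hypothetical illustration of the pattern above, not Kafka's actual code; the thread counts and the echo response are placeholders.

```java
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class BossWorkerSketch {
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup boss = new NioEventLoopGroup(1);      // ~ kafka-socket-listener: accepts connections
        EventLoopGroup workers = new NioEventLoopGroup(8);   // ~ kafka-network-thread: per-connection I/O
        ExecutorService handlers = Executors.newFixedThreadPool(16); // ~ kafka-request-handler pool

        new ServerBootstrap()
            .group(boss, workers)
            .channel(NioServerSocketChannel.class)
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) {
                    ch.pipeline().addLast(new SimpleChannelInboundHandler<ByteBuf>() {
                        @Override
                        protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
                            ByteBuf request = msg.retain();
                            // Offload business logic so the network thread can keep reading.
                            handlers.submit(() -> ctx.writeAndFlush(request)); // echo stands in for a real response
                        }
                    });
                }
            })
            .bind(9092).sync();
    }
}
```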

So why, then, is Apache Kafka considered to have a serial processing model? The answer lies in its KafkaChannel mute state machine, illustrated in the figure below:

[Figure: KafkaChannel mute state machine]

  • Receiving Requests: When a Processor parses a complete request from the network, it first adds the request to the RequestChannel, then calls the #mute method to change the KafkaChannel's state from NOT_MUTE to MUTE, and sends a REQUEST_RECEIVED event to transition to MUTE_AND_RESPONSE_PENDING state. Note: Until a corresponding response is received for this request, the Processor will not attempt to read more requests from connections that are not in the NOT_MUTE state (Processor#processCompletedReceives).
  • Returning Responses: Once KafkaApis has processed the request and is ready to return the response to the KafkaChannel, it first sends a RESPONSE_SENT event to change the state from MUTE_AND_RESPONSE_PENDING to MUTE, then calls the #unmute method to change the state to NOT_MUTE. At this point, the Processor will parse more requests from this connection (Processor#processNewResponses).
  • Quota Limitations: The flow-control path triggered by quota limits is beyond the scope of this article; interested readers can dig into the Processor class.

This state machine ensures that, for each connection, only one request is being processed at a time: the next request is handled only after the previous one has been processed and its response sent. This is why Apache Kafka is said to have a serial processing model. Consider a message production scenario: if a 1MB production request takes 5ms for network parsing, validation, ordering, and persistence (ISR synchronization / disk flushing), then a single connection can process at most 200 requests per second, and a single producer's per-partition throughput is likewise capped at 200MB/s. As illustrated below, even if the client sets max.in.flight.requests.per.connection = 5 and MSG1 through MSG4 "arrive" at the server "simultaneously", MSG4 cannot begin processing until the previous three requests have been processed and answered, so the total send time for MSG4 is 4T.

[Figure: serial processing of MSG1–MSG4; MSG4 completes only after 4T]
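A minimal model of the mute state machine just described may help; the state and event names follow the article, but the code is an illustration rather than Kafka's implementation:

```java
// Illustrative model of the KafkaChannel mute state machine.
enum MuteState { NOT_MUTE, MUTE, MUTE_AND_RESPONSE_PENDING }

class KafkaChannelModel {
    private MuteState state = MuteState.NOT_MUTE;

    // Processor parsed a complete request: #mute, then the REQUEST_RECEIVED event.
    void onRequestReceived() {
        state = MuteState.MUTE;
        state = MuteState.MUTE_AND_RESPONSE_PENDING;
    }

    // Response ready: the RESPONSE_SENT event, then #unmute.
    void onResponseSent() {
        state = MuteState.MUTE;
        state = MuteState.NOT_MUTE;
    }

    // Processor#processCompletedReceives only reads from NOT_MUTE channels,
    // so at most one request per connection is in flight on the server.
    boolean readable() {
        return state == MuteState.NOT_MUTE;
    }
}
```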

Given that the serial processing model is not that efficient, why does Apache Kafka choose this design?
One of the core reasons: the serial processing model lets Apache Kafka guarantee, in a simple way, that requests on a single connection are handled in order. For example, when multiple messages are sent within a transaction, each carries a sequence number denoting its order, and the Broker checks the sequence number of every request before persisting its messages. If the sequence numbers do not increase monotonically, an OUT_OF_ORDER_SEQUENCE_NUMBER error is returned. If these requests were processed in parallel after being parsed from the network, messages could be persisted out of order. A sketch of such a check follows.
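To make the ordering requirement concrete, here is a hypothetical sketch of such a sequence check; real Kafka performs it per producer ID and partition during log append:

```java
// Hypothetical sketch of the idempotent-producer sequence check.
class SequenceValidator {
    enum Errors { NONE, OUT_OF_ORDER_SEQUENCE_NUMBER }

    private int lastSequence = -1;

    // Rejects batches that arrive out of order, as could happen if
    // requests from one connection were processed in parallel.
    Errors validate(int firstSequence) {
        if (firstSequence != lastSequence + 1) {
            return Errors.OUT_OF_ORDER_SEQUENCE_NUMBER;
        }
        lastSequence = firstSequence;
        return Errors.NONE;
    }
}
```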

AutoMQ Pipeline Processing Model
Is there a method that can ensure the sequential processing of requests while also being efficient?
First, let's consider sequentiality. The sequentiality requirements of Apache Kafka® manifest in three stages:

  • Network Parsing: Since the Kafka protocol is based on the TCP protocol, network parsing is inherently sequential and serial. It requires reading the data from the previous request completely before reading the next request;
  • Validation & Ordering: Requests on a single connection must be processed sequentially for validation and ordering to avoid message disorder;
  • Persistence: The order in which messages are stored on disk must match the order in which they were sent.

To summarize, sequentiality equates to serial network parsing, serial validation & ordering, and in-order persistence. Astute readers may notice that "serial processing within each of the three stages" does not imply "serial processing across the three stages." The secret to efficiency, then, lies in parallelizing and overlapping these stages. AutoMQ therefore reshapes Kafka's processing model into a pipeline mode, borrowing from the CPU's pipeline architecture to balance orderliness and efficiency:
  • Orderliness: TCP connections are bound to threads, ensuring that for any given TCP connection, there is only one network thread parsing requests, and only one RequestHandler thread managing business logic;
  • Efficiency:
    • Pipeline processing at different stages allows the network thread to parse MSG1 and then immediately proceed to parse MSG2 without waiting for the persistence of MSG1 to complete. Similarly, once the RequestHandler has completed the verification and ordering of MSG1, it can immediately start processing MSG2;
    • To further enhance the efficiency of persistence, AutoMQ also batches data for flushing and persistent storage.

Under the same conditions in which Apache Kafka needs 4T to process four batches of messages, AutoMQ's pipeline processing model reduces the processing time to approximately 1.x T.

[Figure: pipelined processing of MSG1–MSG4, finishing in about 1.x T]
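A minimal sketch of the pipeline idea, assuming one single-threaded executor per stage so that each stage stays serial while different stages overlap across requests (the stage bodies are stand-ins):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Each stage is one thread (serial within the stage), but the parsing stage
// can start on MSG2 while validation or persistence still works on MSG1.
class PipelineSketch {
    private final ExecutorService parse = Executors.newSingleThreadExecutor();    // network parsing
    private final ExecutorService validate = Executors.newSingleThreadExecutor(); // validation & ordering
    private final ExecutorService persist = Executors.newSingleThreadExecutor();  // persistence

    CompletableFuture<Void> handle(byte[] raw) {
        return CompletableFuture
            .supplyAsync(() -> parseRequest(raw), parse)
            .thenApplyAsync(this::validateAndOrder, validate)
            .thenAcceptAsync(this::persistBatch, persist);
    }

    private Object parseRequest(byte[] raw) { return raw; }     // stand-in
    private Object validateAndOrder(Object req) { return req; } // stand-in
    private void persistBatch(Object req) { /* batched flush */ }
}
```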

Let's now explore from an implementation perspective how AutoMQ achieves a pipeline processing model.
First, the state machine of KafkaChannel's mute state has been simplified to just two states: MUTE and NOT_MUTE. Unlike before, a connection is no longer muted when a request is received, and requests are no longer processed serially along the whole chain; this fully exploits the network parsing layer, which can keep parsing new requests off each connection. Additionally, to support quota enforcement and to prevent excessive in-flight requests from exhausting memory, flags have been added to record the reasons a connection is currently MUTE; the connection returns to the readable NOT_MUTE state only once every flag has been cleared.

[Figure: simplified mute state machine with reason flags]
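A sketch of the flag-based mute state; the reason names here are hypothetical:

```java
import java.util.EnumSet;
import java.util.Set;

// The connection is readable (NOT_MUTE) only when no mute reason remains.
class MuteFlags {
    enum Reason { THROTTLED_BY_QUOTA, TOO_MANY_INFLIGHT_REQUESTS }

    private final Set<Reason> reasons = EnumSet.noneOf(Reason.class);

    synchronized void mute(Reason reason) { reasons.add(reason); }

    synchronized void unmute(Reason reason) { reasons.remove(reason); }

    synchronized boolean readable() { return reasons.isEmpty(); }
}
```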

After optimizing the network layer's processing efficiency, let's examine how the business logic layer preserves sequential processing while the three stages run in parallel.
AutoMQ has transformed RequestChannel into a multi-queue structure:

  • Each queue is directly mapped to a KafkaRequestHandler, and the number of handlers is consistent with the number of queues;
  • After the Processor parses a request, it routes the request to a specific queue based on hash(channelId) % N.

Through the multi-queue model, requests from the same connection always land in the same queue and are handled by the same dedicated KafkaRequestHandler, ensuring sequential processing within the validation and ordering stage.

[Figure: multi-queue RequestChannel dispatch]
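A minimal sketch of this routing, assuming N queues each drained by exactly one handler thread:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Requests from the same connection always land in the same queue,
// and each queue is drained by exactly one handler thread.
class MultiQueueRequestChannel {
    private final List<BlockingQueue<Runnable>> queues = new ArrayList<>();

    MultiQueueRequestChannel(int n) {
        for (int i = 0; i < n; i++) {
            BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
            queues.add(queue);
            Thread handler = new Thread(() -> {   // one KafkaRequestHandler per queue
                try {
                    while (true) queue.take().run();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "kafka-request-handler-" + i);
            handler.start();
        }
    }

    // hash(channelId) % N keeps per-connection order.
    void send(String channelId, Runnable request) throws InterruptedException {
        queues.get(Math.floorMod(channelId.hashCode(), queues.size())).put(request);
    }
}
```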

Additionally, to further enhance the efficiency of persistence, AutoMQ also batches data for disk flushing and persistence:

  • When handling message production requests, the KafkaRequestHandler, after validation and sequencing, can proceed to the next request without waiting for data persistence, enhancing the utilization of business logic processing threads;
  • In the background, AutoMQ's storage thread triggers disk flushing based on batch size and time, and once persistence is successfully completed, it asynchronously returns a response to the network layer, boosting the efficiency of data persistence;
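A sketch of this batched, asynchronous persistence; the size and time thresholds are illustrative:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// The handler appends and returns immediately; a background thread flushes
// by size or time and completes the responses asynchronously.
class BatchedFlusher {
    private static final int MAX_BATCH_BYTES = 1 << 20; // illustrative size trigger
    private final List<CompletableFuture<Void>> pending = new ArrayList<>();
    private int bufferedBytes = 0;
    private final ScheduledExecutorService flusher = Executors.newSingleThreadScheduledExecutor();

    BatchedFlusher() {
        // Time trigger: flush every few milliseconds even if the batch is small.
        flusher.scheduleAtFixedRate(this::flush, 5, 5, TimeUnit.MILLISECONDS);
    }

    // Called after validation & ordering; does not block on disk I/O.
    synchronized CompletableFuture<Void> append(byte[] records) {
        CompletableFuture<Void> response = new CompletableFuture<>();
        pending.add(response);
        bufferedBytes += records.length;
        if (bufferedBytes >= MAX_BATCH_BYTES) flush(); // size trigger
        return response;
    }

    private synchronized void flush() {
        if (pending.isEmpty()) return;
        // ... write the buffered data and fsync here ...
        pending.forEach(f -> f.complete(null)); // async responses to the network layer
        pending.clear();
        bufferedBytes = 0;
    }
}
```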

Optimization Effect Test
Test Environment Setup
To ensure the selection of appropriate ECS and EBS specifications and to guarantee that neither compute nor storage becomes a bottleneck, the following models and cloud disks were chosen for this test:

  • r6i.8xlarge: 32C256G, EBS throughput baseline 1250 MB/s;
  • System disk EBS volume: 5000 IOPS, throughput baseline 1000 MB/s.

The Broker is configured with log.flush.interval.messages=1: under identical hardware specifications, forcing a flush for every message simulates the latency of Apache Kafka's multi-AZ ISR replica synchronization, aligning the persistence level of Apache Kafka and AutoMQ.

The versions of Kafka and AutoMQ used in the test are as follows:

Stress Test Script
Kafka's built-in performance tool is used to generate the test load:

```shell
# Target throughput: 350 MB/s
bin/kafka-producer-perf-test.sh --topic perf --num-records=480000 --throughput 6000 --record-size 65536 --producer-props bootstrap.servers=localhost:9092 batch.size=1048576 linger.ms=1

# Target throughput: 150 MB/s
bin/kafka-producer-perf-test.sh --topic perf --num-records=480000 --throughput 2400 --record-size 65536 --producer-props bootstrap.servers=localhost:9092 batch.size=1048576 linger.ms=1
```

Test Result Analysis
The peak throughput comparison for a single producer writing to a single partition is shown below. From the test results, we can see:

  • AutoMQ's peak throughput is twice that of Apache Kafka, reaching 350 MB/s;
  • At peak throughput, AutoMQ's P99 latency is 1/15 that of Apache Kafka, at just 11 ms.

[Figure: single-producer, single-partition throughput and latency comparison]

Conclusion
AutoMQ has evolved Apache Kafka®'s serial processing model into a pipeline processing model through optimizations in its network handling, substantially increasing single-partition write performance and enabling globally ordered messages on a single partition to meet the performance requirements of more scenarios. Despite the significant gains in peak throughput and latency that the pipeline model brings, it is still advisable to partition data sensibly, avoiding single-producer/single-partition designs and partition hotspots wherever possible. A single partition's capability always has limits: pushing its throughput too far coarsens the granularity of cluster elasticity, reducing the cost-effectiveness of scaling, and also challenges downstream consumers whose processing capacity cannot scale out.
