Problem Statement
When designing a high-throughput data pipeline using Apache Kafka, one of the first challenges developers face is consumer scalability—especially when constrained by partition limits.
In Kafka, a partition is the unit of parallelism. Within a consumer group, each partition is assigned to at most one consumer, so a group can have no more active consumers than there are partitions. This means that, regardless of the processing power available, parallelism is bounded by the number of partitions.
But what happens when your application needs to process thousands of messages per second, and scaling by partition count is not an option?
The Challenge
Suppose an application is required to handle extremely high message throughput. However, due to architectural limitations, the number of Kafka partitions per topic—and by extension, per consumer group—is fixed. This constraint imposes a hard limit on the throughput achievable by each consumer group, regardless of the available hardware resources.
Adding more consumers to the group doesn't solve the problem, because Kafka assigns each partition to exactly one consumer within a group. Any additional consumers remain idle, unable to process messages, since there are no extra partitions to assign to them.
The Solution: Spring Kafka Batch Listener + Async Executor
To overcome this limitation, the application leveraged Spring Kafka's Batch Listener functionality in combination with asynchronous processing on a dedicated executor (an AsyncTaskExecutor or, as in the examples below, a plain ExecutorService). This approach enabled parallel processing of multiple messages from the same partition under a single consumer thread. As a result, it provided fine-grained control over throughput without the need to increase the number of Kafka partitions.
Step 1: Enable Batch Processing with Manual Offset Commit and Controlled Throughput
To enable high-throughput message processing while working around Kafka's partition-per-consumer constraint, batch processing must be enabled in the Kafka consumer configuration. Additionally, manual offset commits should be used to ensure messages are only acknowledged after successful processing, especially when processing asynchronously.
This step involves:
- Enabling batch mode.
- Setting a controlled batch size using max.poll.records.
- Disabling auto-commit.
- Enabling manual acknowledgment mode.
- Registering a custom error handler (in later steps).
Kafka Consumer Configuration
@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "high-throughput-consumer-group");

    // Disable auto-commit to allow manual offset control
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

    // Set the maximum number of records to be returned in a single poll
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500); // Tune based on throughput needs

    return props;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
Kafka Listener Container Factory with Batch + Manual Acknowledgment
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(CommonErrorHandler errorHandler) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());

    // Enable batch listener mode
    factory.setBatchListener(true);

    // Set concurrency (number of consumer threads); 1 per partition
    factory.setConcurrency(1); // Increase only if the topic has multiple partitions

    // Set acknowledgment mode to manual
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);

    // Register the common error handler
    factory.setCommonErrorHandler(errorHandler);

    return factory;
}
Explanation
- batchListener = true: Enables delivery of messages as a list instead of a single record.
- max.poll.records = 500: Controls how many records are returned in a single poll (the batch size).
- enable.auto.commit = false: Prevents Kafka from committing offsets automatically.
- AckMode.MANUAL: Gives the application explicit control over when offsets are committed.
- concurrency = 1: Keeps one consumer thread; additional threads only help when the topic has more than one partition.
With this configuration, Kafka will deliver up to 500 messages at once to the listener, and the application will be responsible for acknowledging them only after they have been successfully processed—which will be handled in the next steps using an async executor and a common error handler.
Step 2: Configure a Common Error Handler
When processing messages in batch mode, especially with asynchronous execution, robust error handling is essential to avoid repeated failures or data loss. Spring Kafka provides the CommonErrorHandler interface to manage such scenarios.
Why Use It
- It defines how to handle exceptions during batch processing.
- You can configure retry attempts and backoff timing.
- It allows you to skip failed messages or forward them to a Dead Letter Topic (DLT).
Default Error Handler with Retry Logic
@Bean
public CommonErrorHandler errorHandler() {
    // Retry the batch up to 3 times with a 1-second interval
    FixedBackOff backOff = new FixedBackOff(1000L, 3);
    DefaultErrorHandler errorHandler = new DefaultErrorHandler(backOff);

    // Optional: Log retries for visibility
    errorHandler.setRetryListeners((record, ex, deliveryAttempt) -> {
        System.err.printf("Retry attempt %d for record %s due to %s%n",
                deliveryAttempt, record, ex.getMessage());
    });

    return errorHandler;
}
This error handler can be customized further to stop retries, skip certain exceptions, or route failed records to a DLT.
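As one possible extension (not shown in the original configuration), the DefaultErrorHandler can be combined with a DeadLetterPublishingRecoverer so that records which exhaust their retries are published to a dead-letter topic (named "<topic>.DLT" by default), while exceptions that can never succeed are excluded from retries. The bean name and the choice of IllegalArgumentException below are illustrative assumptions; if used, this handler would replace the one above or need to be wired explicitly.

import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Bean
public CommonErrorHandler dltErrorHandler(KafkaTemplate<String, String> kafkaTemplate) {
    // Publish records that exhaust their retries to the dead-letter topic
    DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(kafkaTemplate);

    // Retry up to 3 times with a 1-second interval before invoking the recoverer
    DefaultErrorHandler errorHandler = new DefaultErrorHandler(recoverer, new FixedBackOff(1000L, 3));

    // Skip retries for exceptions that will never succeed (illustrative choice)
    errorHandler.addNotRetryableExceptions(IllegalArgumentException.class);

    return errorHandler;
}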
Step 3: Process Batch Asynchronously Using CompletableFuture with Exception Capture
This approach uses CompletableFuture.runAsync to process each record asynchronously. The handle method is used to capture exceptions without immediately failing, allowing you to collect the processing results for all records first. After all futures complete, the listener inspects the results and throws a BatchListenerFailedException if any record failed, including the index of the first failed record.
Listener Code Using CompletableFuture
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.BatchListenerFailedException;
import org.springframework.kafka.support.Acknowledgment;

@Autowired
private ExecutorService executorService;

@KafkaListener(topics = "high-throughput-topic", containerFactory = "kafkaListenerContainerFactory")
public void listen(List<ConsumerRecord<String, String>> records, Acknowledgment ack) throws Exception {
    List<CompletableFuture<RecordProcessingResult>> futures = new ArrayList<>();

    for (int i = 0; i < records.size(); i++) {
        final int index = i;
        final ConsumerRecord<String, String> record = records.get(i);

        CompletableFuture<RecordProcessingResult> future = CompletableFuture.runAsync(() -> {
            processRecord(record); // your business logic here
        }, executorService).handle((res, ex) -> {
            if (ex != null) {
                // Capture the failure; unwrap the CompletionException so the original cause is preserved
                Throwable cause = (ex.getCause() != null) ? ex.getCause() : ex;
                return new RecordProcessingResult(false, index, cause);
            } else {
                return new RecordProcessingResult(true, -1, null);
            }
        });
        futures.add(future);
    }

    // Wait for all to complete
    CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();

    // Check results for failure
    for (CompletableFuture<RecordProcessingResult> future : futures) {
        RecordProcessingResult result = future.get();
        if (!result.isSuccess()) {
            // Throw with failed record index and cause
            throw new BatchListenerFailedException(
                    "Processing failed at record index " + result.getFailedIndex(),
                    result.getException(),
                    result.getFailedIndex());
        }
    }

    // If all succeed, acknowledge offsets
    ack.acknowledge();
}
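The listener references a RecordProcessingResult helper that is not part of Spring Kafka and is not shown in the original listing. A minimal holder consistent with the constructor and getters used above might look like this:

// Minimal immutable holder for the outcome of processing one record.
// Not provided by Spring Kafka; the fields and accessors are assumed from the calls above.
public class RecordProcessingResult {

    private final boolean success;
    private final int failedIndex;     // -1 when processing succeeded
    private final Throwable exception; // null when processing succeeded

    public RecordProcessingResult(boolean success, int failedIndex, Throwable exception) {
        this.success = success;
        this.failedIndex = failedIndex;
        this.exception = exception;
    }

    public boolean isSuccess() {
        return success;
    }

    public int getFailedIndex() {
        return failedIndex;
    }

    public Throwable getException() {
        return exception;
    }
}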
Explanation
- Each record is processed asynchronously via CompletableFuture.runAsync.
- The handle method captures exceptions without failing the entire future immediately.
- The results from all futures are collected and inspected after completion.
- If any record fails, a BatchListenerFailedException is thrown with the failed record's index, allowing Spring Kafka's error handler to react accordingly.
- If all succeed, the offset is acknowledged manually.
How Spring Kafka Reacts to BatchListenerFailedException
When a BatchListenerFailedException is thrown by a batch listener, Spring Kafka uses this exception to determine which record in the batch caused the failure (via the failed index provided) and manages offset commits and retries accordingly.
Offset Commit Behavior
Offsets up to (but not including) the failed record’s offset are considered successfully processed and will be committed if manual commit is enabled and configured properly.
Offsets starting from the failed record's offset onward are not committed. These messages remain uncommitted in Kafka.
What This Means in Practice
- Suppose a batch contains 100 messages (index 0 to 99).
- If the record at index 30 fails, Spring Kafka:
  - Commits offsets for records 0 to 29 (the successfully processed records).
  - Does not commit offsets for records 30 to 99.
  - Keeps the uncommitted messages available for retry on the next poll.
Retry Behavior
- On the next poll, Kafka will redeliver the batch starting from the failed offset (index 30 in the example).
- Depending on the configured error handler (e.g., DefaultErrorHandler), the batch will be retried according to retry/backoff policy.
- This ensures no messages are lost or skipped, and processing resumes from the point of failure.
Benefits
- Exactly-once behavior is easier to approximate (delivery remains at-least-once): offsets for successfully processed messages are committed, so only the failed portion of the batch is redelivered.
- Fine-grained control: the consumer can handle partial batch failures without reprocessing the entire batch.
- Helps avoid the "poison pill" problem, especially when combined with dead-letter topics (DLTs) for permanently failing records.
Important Considerations
To enable this behavior:
- The consumer must use manual acknowledgment mode (AckMode.MANUAL).
- The listener must throw BatchListenerFailedException with the failed record's index.
- The CommonErrorHandler (e.g., DefaultErrorHandler) must recognize this exception and manage retries accordingly.
- If a batch contains multiple failures, the index of the first failure determines the commit boundary.
Step 4: Tune Thread Pool and Batch Size for Optimal Throughput
- Balancing throughput and resource usage is key.
- Increase the max.poll.records value (e.g., 500 or 1000) to pull larger batches.
- Adjust your ExecutorService thread pool size based on available CPU cores and expected processing complexity.
- Monitor your application and Kafka broker performance metrics to tune these parameters.
Executor tuning:
@Bean
public ExecutorService executorService() {
    // Size the pool relative to the available cores; a multiplier greater than 1 helps
    // when record processing is I/O-bound (e.g., database or external service calls)
    int coreCount = Runtime.getRuntime().availableProcessors();
    return Executors.newFixedThreadPool(coreCount * 2);
}
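If stricter control is needed, a bounded ThreadPoolExecutor with a caller-runs rejection policy is one possible refinement (an assumption, not part of the original setup): it adds back-pressure so a single large batch cannot queue an unbounded amount of work. The queue capacity of 1,000 is illustrative; note that Spring infers shutdown() as the destroy method for an ExecutorService bean, so the pool is closed when the context shuts down.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

@Bean
public ExecutorService executorService() {
    int coreCount = Runtime.getRuntime().availableProcessors();

    // When the queue is full and all threads are busy, CallerRunsPolicy makes the
    // submitting (listener) thread run the task itself, which naturally slows down
    // submission and provides back-pressure.
    return new ThreadPoolExecutor(
            coreCount,                          // core pool size
            coreCount * 2,                      // maximum pool size
            60L, TimeUnit.SECONDS,              // keep-alive for idle non-core threads
            new ArrayBlockingQueue<>(1_000),    // bounded work queue (tune to batch size)
            new ThreadPoolExecutor.CallerRunsPolicy());
}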
Conclusion
Handling very high message throughput in Kafka while facing partition limitations requires thoughtful architectural and coding strategies. Leveraging Spring Kafka’s batch listener with manual offset commits, combined with asynchronous processing using CompletableFuture or an executor service, enables efficient parallel processing within a single consumer thread. This approach provides fine-grained control over throughput without the need to increase Kafka partitions.
By integrating robust error handling with CommonErrorHandler and throwing BatchListenerFailedException with the failed record’s index, the system ensures reliable processing, precise offset management, and seamless retries. Further enhancements such as Dead Letter Topics and careful tuning of batch sizes and thread pools help to build a resilient and scalable Kafka consumer application.
Monitoring and graceful shutdown complete the setup, providing operational visibility and robustness. Overall, this design decouples processing throughput from Kafka’s partitioning constraints, unlocking high-performance stream processing capabilities in a controlled and maintainable way.