The example's here.
The relevant bit:
@Autowired
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

// Look up the container by its @KafkaListener id and start it on demand.
public String start() {
    MessageListenerContainer listenerContainer =
            kafkaListenerEndpointRegistry.getListenerContainer("fooGroup");
    listenerContainer.start();
    ...
}

// autoStartup = "false" keeps this container from starting with the application context.
@KafkaListener(id = "fooGroup", topics = "topic1", autoStartup = "false")
public void listen(Foo2 foo) {
    ...
}
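For context, wiring this into a running application might look roughly like the sketch below. This is a minimal illustration, not the original example: the controller class, the HTTP paths, and the stop() endpoint are assumptions added here, while getListenerContainer("fooGroup") and start() come from the snippet above.

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ListenerControlController {

    @Autowired
    private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    // Starts the container registered under the @KafkaListener id "fooGroup".
    @PostMapping("/listener/start")
    public String start() {
        MessageListenerContainer listenerContainer =
                kafkaListenerEndpointRegistry.getListenerContainer("fooGroup");
        listenerContainer.start();
        return "started";
    }

    // Stops the same container; records published while it is stopped stay on the
    // topic and are consumed after the next start().
    @PostMapping("/listener/stop")
    public String stop() {
        kafkaListenerEndpointRegistry.getListenerContainer("fooGroup").stop();
        return "stopped";
    }
}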
Comments (1)
Hi! Thanks for your example!
I thought it would solve my issue, but it doesn't; maybe you have run into the same thing. I'm using the same configuration (a very standard Kafka listener with Spring Boot config).
Weirdly, it seems like the messages stored in the Kafka broker while the listener was down are processed by only one listener thread, which makes them slow to work through.
However, after all the buffered messages have been slowly processed in that single thread, even though they were produced to different partitions, the listener starts behaving "normally" again, processing in parallel (from the setConcurrency(200) on the KafkaListenerContainerFactory plus having 200 partitions for the topic in Kafka; see the sketch after this comment).
It's as if the Kafka listener created itself, connected to Kafka, declared "I'm going to handle all these messages that were not consumed here!", and only once it's done are the other listener threads assigned to their respective partitions.
I tried starting it manually with your method, thinking maybe there was a hidden mechanism causing this, but with no success... same behavior.
It's the same weird behavior as here: stackoverflow.com/questions/537514... but the question unfortunately went unanswered.
If anyone has any idea how to solve this, I've been struggling... It's important because I want my consumer to be able to stay off for a while without being too slow, when it restarts, at processing the pending messages that accumulated during the downtime.
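For reference, the container factory setup described in the comment might look roughly like this. The setConcurrency(200) value comes from the comment; the bootstrap address, the String deserializers, and the bean names are assumptions for illustration only.

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // One consumer thread per partition: the comment mentions concurrency 200 and
        // 200 partitions, so each thread should end up with a single partition once
        // the group assignment settles.
        factory.setConcurrency(200);
        return factory;
    }
}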