DEV Community

loading...

Starting and stopping a Kafka listener after Spring Boot Startup

arsenalist profile image Zarar Siddiqi ・1 min read

The example's here.

The relevant bit:


    @Autowired
    private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    public String start() {
        MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer("fooGroup");
        listenerContainer.start();
        ...
    }

    @KafkaListener(id = "fooGroup", topics = "topic1", autoStartup = "false")
    public void listen(Foo2 foo) {
        ...
    }

Discussion (1)

pic
Editor guide
Collapse
cerrebos profile image
Cerrebos • Edited

Hi ! Thanks for your example!

I thought it would solve my issue but it doesnt though, maybe you have had the same : I'm using the same configuration (very standard kafka listener with spring boot config).

Weirdly, it seems like the messages "stored in the kafka broker while the listener was down" are treated in only one listener thread, making is slow to process.
However, after all the buffered messages are threated slowly in one thread, even though they were produced in different partitions, the listener start behaving "normally" again, processing in parallel (from the "setConcurrency(200)" in KafkaListeneContainerFactory + having 200 partitions for the topic in kafka).

It's as if the kafka listener was creating itself, then connecting to kafka and stating "I'm going to handle all these messages who were not consumed here!", and when it's done, the other "listener thread" are assigned to their respective partitions.

I tried starting it manually with your method, thinking maybe there was a hidden mechanism making that but with no success ...same behavior

it's the same weird behavior than here : stackoverflow.com/questions/537514... but the question was unfortunately unanswered ....

If anyone has any idea how to solve that, I've been struggling...It's important as I want my consumer to be able to be off for a while without being too slow when it restarts to treat pending messages stored by the consumer during any downtime.