DEV Community

Cover image for Concurrency in Spring's StreamListener and Kafka
Sergio Garcia Moratilla for Playtomic

Posted on

Concurrency in Spring's StreamListener and Kafka

TL;DR: go to Use Configuration

Another too fast, too furious post. I have spent a few hours trying to make my event processor multi-threaded, and it's so damn easy that I don't want anyone to spend more than a few minutes.

We are using the Spring Cloud Stream layer to configure our Kafka consumers.

For example, a configuration for a processor named 'reservations-input' connected to a Kafka topic 'reservations-topic' would be similar to this:

spring.cloud.stream:
  bindings:
    reservations-input:
      content-type: application/json
      destination: reservations-topic
      group: consumer-service-group
Enter fullscreen mode Exit fullscreen mode

And your class to start processing those events:

@EnableBinding({
    MessagingConfiguration.ReservationTopic.class})
public class MessagingConfiguration {
    public interface ReservationTopic {

        String INPUT = "reservations-channel";

        @Input(INPUT)
        SubscribableChannel input();
    }
}

@Service
public class ReservationProcessor {
    @StreamListener(MessagingConfiguration.ReservationTopic.INPUT)
    public void handle(@Nonnull Message<ReservationEvent> reservationMessage) {
        // your stuff
    }
Enter fullscreen mode Exit fullscreen mode

Easy peasy. Only problem here is concurrency.

If you have used Kafka before, you would know that the number of partitions in your topic limits the concurrency.
Each partition have 1 single consumer.

I don't know whether (or where) I read that, but I assumed that my application would generate as many threads/consumers as partitions my topic has. But I was wrong. By default, Spring's only generates 1-threaded processor.

Solutions? Get more instances of your application or configure ConcurrentKafkaListenerContainerFactory to be able to throw more threads (see https://docs.spring.io/spring-kafka/docs/2.3.x/reference/html/#container-factory).

Option 1: create your own instance of ConcurrentKafkaListenerContainerFactory.

The only hint I found in the documentation or stackoverflow but to instance a bean of type ConcurrentKafkaListenerContainerFactory

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory(
        @Nonnull ConsumerFactory<String, Object> consumerFactory) {

        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setConcurrency(5);

        return factory;
    }
Enter fullscreen mode Exit fullscreen mode

I am not very prone to instance my own beans to configure things that seems too obvious. It is easy to overwrite some Spring default values that I am already expecting to use, it is more code to maintain...

There has to be a way through configuration.

Option 2: use configuration

Getting back to configuration, what we write under spring.cloud.stream.bindings.channel-name.consumer ends in the configuration of Kafka. So that I tried to configure the property concurrency. That is:

spring.cloud.stream:
  bindings:
    reservations-input:
      content-type: application/json
      consumer.concurrency: 3
      destination: reservations-topic
      group: consumer-service-group
Enter fullscreen mode Exit fullscreen mode

Starting our application, we see that we have 3 binders.
Profit!

    December 17th 2019, 14:22:57.274    2019-12-17 13:22:57.274  INFO [consumer-service,,,] 1 --- [container-1-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$1  : partitions assigned: [reservations-topic-1]
    December 17th 2019, 14:22:57.259    2019-12-17 13:22:57.259  INFO [consumer-service,,,] 1 --- [container-2-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$1  : partitions assigned: [reservations-topic-2]
    December 17th 2019, 14:22:57.256    2019-12-17 13:22:57.256  INFO [consumer-service,,,] 1 --- [container-3-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$1  : partitions assigned: [reservations-topic-3]
Enter fullscreen mode Exit fullscreen mode

Discussion (0)