Playtomic

Concurrency in Spring's StreamListener and Kafka

sgmoratilla profile image Sergio Garcia Moratilla ・2 min read

TL;DR: go to "Option 2: use configuration".

Another too fast, too furious post. I have spent a few hours trying to make my event processor multi-threaded, and it's so damn easy that I don't want anyone to spend more than a few minutes.

We are using the Spring Cloud Stream layer to configure our Kafka consumers.

For example, the configuration for an input channel named 'reservations-input' connected to a Kafka topic 'reservations-topic' would look similar to this:

spring.cloud.stream:
  bindings:
    reservations-input:
      content-type: application/json
      destination: reservations-topic
      group: consumer-service-group

And the classes that start processing those events:

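// MessagingConfiguration.java
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

@EnableBinding({
    MessagingConfiguration.ReservationTopic.class})
public class MessagingConfiguration {
    public interface ReservationTopic {

        // Must match the binding name used under spring.cloud.stream.bindings
        String INPUT = "reservations-input";

        @Input(INPUT)
        SubscribableChannel input();
    }
}

// ReservationProcessor.java
import javax.annotation.Nonnull;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;

@Service
public class ReservationProcessor {
    // Called once per event received on the input channel
    @StreamListener(MessagingConfiguration.ReservationTopic.INPUT)
    public void handle(@Nonnull Message<ReservationEvent> reservationMessage) {
        // your stuff
    }
}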
Easy peasy. The only problem here is concurrency.

If you have used Kafka before, you know that the number of partitions in your topic limits the concurrency.
Within a consumer group, each partition is read by exactly one consumer, so any consumer beyond the number of partitions sits idle.
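To make that limit concrete, here is a minimal sketch that hands out partitions to the consumers of one group in round-robin order. It is only an illustration: the class and method names are made up for this post, and Kafka's real assignors (range, round-robin, sticky...) differ in the details. The invariant it shows is the one that matters here: one partition never goes to two consumers of the same group.

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionAssignmentSketch {

    /**
     * Distributes partitions 0..partitionCount-1 over consumerCount consumers
     * in round-robin order. Each partition goes to exactly one consumer.
     * Hypothetical helper, not Kafka's actual assignor.
     */
    public static List<List<Integer>> assign(int partitionCount, int consumerCount) {
        if (consumerCount <= 0) {
            throw new IllegalArgumentException("consumerCount must be positive");
        }
        List<List<Integer>> assignment = new ArrayList<>();
        for (int c = 0; c < consumerCount; c++) {
            assignment.add(new ArrayList<>());
        }
        for (int p = 0; p < partitionCount; p++) {
            assignment.get(p % consumerCount).add(p);
        }
        return assignment;
    }

    /** Number of consumers that end up with at least one partition. */
    public static int activeConsumers(int partitionCount, int consumerCount) {
        return Math.min(partitionCount, consumerCount);
    }

    public static void main(String[] args) {
        // 3 partitions, 5 consumers in the same group: two consumers stay idle.
        System.out.println(assign(3, 5)); // prints [[0], [1], [2], [], []]
        // 3 partitions, 1 consumer: a single consumer handles everything.
        System.out.println(assign(3, 1)); // prints [[0, 1, 2]]
    }
}
```

So the useful number of consumers (instances times threads per instance) is capped by the number of partitions.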

I don't know whether (or where) I read it, but I assumed that my application would create as many threads/consumers as my topic has partitions. I was wrong. By default, Spring creates only a single-threaded consumer.

Solutions? Run more instances of your application, or configure ConcurrentKafkaListenerContainerFactory to start more consumer threads (see https://docs.spring.io/spring-kafka/docs/2.3.x/reference/html/#container-factory).

Option 1: create your own instance of ConcurrentKafkaListenerContainerFactory.

The only hint I found in the documentation or on Stack Overflow was to instantiate a bean of type ConcurrentKafkaListenerContainerFactory:

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory(
        @Nonnull ConsumerFactory<String, Object> consumerFactory) {

        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setConcurrency(5);

        return factory;
    }

I am not keen on instantiating my own beans to configure things that seem this obvious. It is easy to accidentally override Spring defaults that I am relying on, and it is more code to maintain...

There has to be a way through configuration.

Option 2: use configuration

Getting back to configuration: whatever we write under spring.cloud.stream.bindings.<channel-name>.consumer ends up in the configuration of the Kafka consumer. So I tried setting the concurrency property there. That is:

spring.cloud.stream:
  bindings:
    reservations-input:
      content-type: application/json
      consumer.concurrency: 3
      destination: reservations-topic
      group: consumer-service-group

Starting our application, we see that we now have 3 consumer threads, each one assigned its own partition.
Profit!

    December 17th 2019, 14:22:57.274    2019-12-17 13:22:57.274  INFO [consumer-service,,,] 1 --- [container-1-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$1  : partitions assigned: [reservations-topic-1]
    December 17th 2019, 14:22:57.259    2019-12-17 13:22:57.259  INFO [consumer-service,,,] 1 --- [container-2-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$1  : partitions assigned: [reservations-topic-2]
    December 17th 2019, 14:22:57.256    2019-12-17 13:22:57.256  INFO [consumer-service,,,] 1 --- [container-3-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$1  : partitions assigned: [reservations-topic-3]
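
One thing to keep in mind once this works: with concurrency above 1, the same handler bean is called from several consumer threads at once, so any state it shares has to be thread-safe. Below is a minimal sketch in plain Java, with no Spring or Kafka involved. The class, the `handle(String)` method and the "club-1" key are hypothetical stand-ins for a real @StreamListener method, and the thread pool only simulates the three container threads.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

public class ConcurrentHandlerSketch {

    // Shared state touched by every consumer thread, hence the concurrent types.
    private final ConcurrentHashMap<String, LongAdder> countsByKey = new ConcurrentHashMap<>();

    /** Stand-in for the body of a @StreamListener method (hypothetical). */
    public void handle(String key) {
        countsByKey.computeIfAbsent(key, k -> new LongAdder()).increment();
    }

    public long count(String key) {
        LongAdder adder = countsByKey.get(key);
        return adder == null ? 0 : adder.sum();
    }

    /** Simulates `threads` consumer threads, each delivering `perThread` events. */
    public static long simulate(int threads, int perThread) {
        ConcurrentHandlerSketch handler = new ConcurrentHandlerSketch();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.submit(() -> {
                for (int i = 0; i < perThread; i++) {
                    handler.handle("club-1");
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("simulation did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return handler.count("club-1");
    }

    public static void main(String[] args) {
        System.out.println(simulate(3, 10_000)); // prints 30000
    }
}
```

With a plain HashMap and a long counter instead, the total could come out lower than 30000 because concurrent increments can get lost.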

