DEV Community

Shikha
Shikha

Posted on

How can I stop my Kafka consumer from consuming messages?

I am using the functions below, but my @KafkaListener method keeps consuming messages even when my consumer is in a paused state.

import org.apache.kafka.clients.consumer.Consumer;

private Consumer<String, String> kafkaConsumer;

/**
 * Pauses fetching on every partition currently assigned to the captured consumer.
 *
 * <p>Does nothing when no consumer has been captured yet (the field is only set
 * from inside the listener method). The whole current assignment is paused on
 * every call, so partitions that were assigned after an earlier pause are
 * covered as well.
 *
 * <p>NOTE(review): pausing only stops subsequent polls from fetching; records
 * that an earlier poll already returned are presumably still delivered to the
 * listener - confirm against the listener container in use.
 * NOTE(review): the Kafka consumer is documented as not thread-safe, so this
 * should be called on the listener thread - confirm the call sites.
 */
public void pauseKafkaConsumer() {
  if (kafkaConsumer == null) {
    // No record has been delivered yet, so there is no consumer to pause.
    return;
  }
  // Deliberately no poll() here: the previous poll(0) call used the deprecated
  // poll(long) overload and ignored its return value, so any records it
  // fetched were dropped without ever reaching the listener.
  Collection<TopicPartition> assignedPartitions = kafkaConsumer.assignment();
  kafkaConsumer.pause(assignedPartitions);
}

/**
 * Resumes fetching on every partition of the captured consumer that is
 * currently paused.
 *
 * <p>Does nothing when no consumer has been captured yet (the field is only set
 * from inside the listener method), instead of failing with a
 * {@link NullPointerException}.
 *
 * <p>NOTE(review): the Kafka consumer is documented as not thread-safe; if this
 * is triggered from a thread other than the listener thread it may fail -
 * confirm the call sites.
 */
public void resumeKafkaConsumer() {
  if (kafkaConsumer == null) {
    // Nothing has been consumed yet, so nothing can be paused.
    return;
  }
  Collection<TopicPartition> pausedPartitions = kafkaConsumer.paused();
  kafkaConsumer.resume(pausedPartitions);
}


/**
 * Listener entry point: remembers the consumer that delivered the record and
 * pauses it once the event count exceeds 10.
 *
 * <p>The {@code stream} payload and the {@code acknowledgment} are not used by
 * this method.
 *
 * <p>NOTE(review): if the container factory uses a manual ack mode, offsets
 * would never be committed because {@code acknowledgment} is never
 * acknowledged - confirm the configured ack mode.
 *
 * @param stream         the record payload (currently ignored)
 * @param consumer       the Kafka consumer that delivered this record
 * @param acknowledgment the acknowledgment handle (currently ignored)
 */
@KafkaListener(topics = "#{'${spring.kafka.consumer.topic}'}", groupId = "#{'${spring.kafka.consumer.groupId}'}", containerFactory = "kafkaListenerContainerFactory")
public void consume(String stream, Consumer<?, ?> consumer, Acknowledgment acknowledgment) {
  // Keep a handle on the delivering consumer so the pause/resume helpers can
  // operate on it later.
  @SuppressWarnings("unchecked") // wildcard consumer narrowed to the String/String field type
  Consumer<String, String> deliveringConsumer = (Consumer<String, String>) consumer;
  this.kafkaConsumer = deliveringConsumer;

  boolean overLimit = getEventCount() > 10;
  if (overLimit) {
    pauseKafkaConsumer();
  }
}
Enter fullscreen mode Exit fullscreen mode

Top comments (0)