DEV Community

Shikha
Shikha

Posted on

How can i stop my kafka consumer from consuming messages ?

I am using below functions but my kafkaListener is keeps on consuming messages even if my consumer is in paused state.

import org.apache.kafka.clients.consumer.Consumer;

private Consumer<String, String> kafkaConsumer;

public void pauseKafkaConsumer() {
  if (!kafkaConsumer.paused().isEmpty()) {
     return;
  }
  // Pause all assigned partitions
  Collection<TopicPartition> assignedPartitions = 
  kafkaConsumer.assignment();
  kafkaConsumer.poll(0);
  kafkaConsumer.pause(assignedPartitions);
}

// Resume the Kafka consumer
public void resumeKafkaConsumer() {
  // Resume all paused partitions
  Collection<TopicPartition> pausedPartitions = 
  kafkaConsumer.paused();
      kafkaConsumer.resume(pausedPartitions);
}


@KafkaListener(topics = "#{'${spring.kafka.consumer.topic}'}", groupId = "#{'${spring.kafka.consumer.groupId}'}", containerFactory = "kafkaListenerContainerFactory")
public void consume(String stream, Consumer<?, ?> consumer, Acknowledgment acknowledgment) {
  this.kafkaConsumer = (Consumer<String, String>) consumer;


  if (getEventCount() > 10) {   
        pauseKafkaConsumer();
    return; 
  }
}
Enter fullscreen mode Exit fullscreen mode

Heroku

This site is built on Heroku

Join the ranks of developers at Salesforce, Airbase, DEV, and more who deploy their mission critical applications on Heroku. Sign up today and launch your first app!

Get Started

Top comments (0)

Billboard image

Create up to 10 Postgres Databases on Neon's free plan.

If you're starting a new project, Neon has got your databases covered. No credit cards. No trials. No getting in your way.

Try Neon for Free →