DEV Community

Shikha


How can I stop my Kafka consumer from consuming messages?

I am using the functions below, but my @KafkaListener method keeps receiving messages even when the consumer is in a paused state.

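import java.util.Collection;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;

// Consumer reference captured inside the listener method
private Consumer<String, String> kafkaConsumer;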

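// Pause the Kafka consumer
public void pauseKafkaConsumer() {
  // Nothing to do if some partitions are already paused
  if (!kafkaConsumer.paused().isEmpty()) {
    return;
  }
  // Pause all assigned partitions
  Collection<TopicPartition> assignedPartitions = kafkaConsumer.assignment();
  // Note: poll(long) is deprecated, and the records it returns are discarded here
  kafkaConsumer.poll(0);
  kafkaConsumer.pause(assignedPartitions);
}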

// Resume the Kafka consumer
public void resumeKafkaConsumer() {
  // Resume all paused partitions
  Collection<TopicPartition> pausedPartitions = kafkaConsumer.paused();
  kafkaConsumer.resume(pausedPartitions);
}


@KafkaListener(topics = "#{'${spring.kafka.consumer.topic}'}", groupId = "#{'${spring.kafka.consumer.groupId}'}", containerFactory = "kafkaListenerContainerFactory")
public void consume(String stream, Consumer<?, ?> consumer, Acknowledgment acknowledgment) {
  this.kafkaConsumer = (Consumer<String, String>) consumer;

  // Pause once more than 10 events have been counted
  if (getEventCount() > 10) {
    pauseKafkaConsumer();
    return;
  }
}
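
One possible explanation, offered as an assumption rather than a confirmed diagnosis: pause() only affects future poll() calls, so records that were already fetched in the current batch are still handed to the listener one by one. The toy model below illustrates that behaviour in plain Java. ToyConsumer, PauseDemo, and run are hypothetical names invented for this sketch; none of them is part of the Kafka or Spring Kafka API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class PauseDemo {

    /** Hypothetical stand-in for a consumer; NOT the Kafka API. */
    static class ToyConsumer {
        private final Deque<String> log = new ArrayDeque<>();
        private boolean paused = false;

        ToyConsumer(List<String> records) {
            log.addAll(records);
        }

        /** Returns up to maxRecords records, or an empty batch while paused. */
        List<String> poll(int maxRecords) {
            List<String> batch = new ArrayList<>();
            while (!paused && batch.size() < maxRecords && !log.isEmpty()) {
                batch.add(log.poll());
            }
            return batch;
        }

        void pause() {
            paused = true;
        }

        void resume() {
            paused = false;
        }
    }

    /**
     * Runs a fixed number of poll cycles. The "listener" pauses the consumer
     * when it sees the record named by pauseOn. Returns every delivered record.
     */
    static List<String> run(ToyConsumer consumer, int maxRecords, int polls, String pauseOn) {
        List<String> delivered = new ArrayList<>();
        for (int i = 0; i < polls; i++) {
            for (String record : consumer.poll(maxRecords)) {
                delivered.add(record);      // listener is invoked for this record
                if (record.equals(pauseOn)) {
                    consumer.pause();       // does not cancel the rest of this batch
                }
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        List<String> records = List.of("r1", "r2", "r3", "r4", "r5", "r6");
        // Batch of 4: r3 and r4 are still delivered after the pause at r2.
        System.out.println(run(new ToyConsumer(records), 4, 3, "r2")); // [r1, r2, r3, r4]
        // Batch of 1: delivery stops right after r2.
        System.out.println(run(new ToyConsumer(records), 1, 3, "r2")); // [r1, r2]
    }
}
```

If this is what is happening, two things may be worth trying: lowering max.poll.records so each batch is smaller, and pausing the listener container itself (for example through KafkaListenerEndpointRegistry.getListenerContainer(id).pause(), where id is the id given to the @KafkaListener) instead of calling pause() on the Consumer from inside the listener.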


