DEV Community

Shikha

How can I stop my Kafka consumer from consuming messages?

I am using the functions below, but my @KafkaListener method keeps consuming messages even when my consumer is in a paused state.

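import java.util.Collection;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;

// Consumer instance captured from the listener method below
private Consumer<String, String> kafkaConsumer;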

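// Pause the Kafka consumer
public void pauseKafkaConsumer() {
  // Nothing to do if some partitions are already paused
  if (!kafkaConsumer.paused().isEmpty()) {
    return;
  }
  // Pause all assigned partitions
  Collection<TopicPartition> assignedPartitions = kafkaConsumer.assignment();
  kafkaConsumer.poll(0); // the records returned by this poll are ignored
  kafkaConsumer.pause(assignedPartitions);
}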

// Resume the Kafka consumer
public void resumeKafkaConsumer() {
  // Resume all paused partitions
  Collection<TopicPartition> pausedPartitions = kafkaConsumer.paused();
  kafkaConsumer.resume(pausedPartitions);
}


@KafkaListener(topics = "#{'${spring.kafka.consumer.topic}'}", groupId = "#{'${spring.kafka.consumer.groupId}'}", containerFactory = "kafkaListenerContainerFactory")
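public void consume(String stream, Consumer<?, ?> consumer, Acknowledgment acknowledgment) {
  // Keep a reference to the consumer that the listener container passes in
  this.kafkaConsumer = (Consumer<String, String>) consumer;

  // Pause once more than 10 events have been counted
  if (getEventCount() > 10) {
    pauseKafkaConsumer();
    return;
  }
}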
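
One possible contributor to this behaviour, offered as a guess rather than a confirmed diagnosis: `pause()` only stops future `poll()` calls from returning records. Records that the listener container already fetched in the current batch are still handed to the listener one by one. The sketch below is a toy, library-free model of that loop. `ToyConsumer`, `deliveredBeforePauseTakesEffect`, and the batch sizes are hypothetical names and values chosen for illustration; none of this is the Kafka or Spring API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/**
 * Toy model of a container-style poll loop. NOT the Kafka or Spring API:
 * every type and method here is a hypothetical stand-in for illustration.
 */
public class PauseBatchDemo {

    /** Hypothetical stand-in for a consumer that hands out records in batches. */
    static final class ToyConsumer {
        private final Deque<String> log = new ArrayDeque<>();
        private final int maxPollRecords;
        private boolean paused;

        ToyConsumer(List<String> records, int maxPollRecords) {
            log.addAll(records);
            this.maxPollRecords = maxPollRecords;
        }

        /** Returns up to maxPollRecords records, or an empty batch while paused. */
        List<String> poll() {
            List<String> batch = new ArrayList<>();
            while (!paused && batch.size() < maxPollRecords && !log.isEmpty()) {
                batch.add(log.poll());
            }
            return batch;
        }

        void pause() {
            paused = true;
        }
    }

    /**
     * Polls a batch, then calls the "listener" once per record. The listener
     * pauses the consumer once it has seen more than pauseAfter records.
     * Returns how many records the listener saw in total.
     */
    static int deliveredBeforePauseTakesEffect(int totalRecords, int maxPollRecords, int pauseAfter) {
        List<String> records = new ArrayList<>();
        for (int i = 0; i < totalRecords; i++) {
            records.add("event-" + i);
        }
        ToyConsumer consumer = new ToyConsumer(records, maxPollRecords);

        int delivered = 0;
        List<String> batch;
        while (!(batch = consumer.poll()).isEmpty()) {
            for (String record : batch) {
                delivered++; // the listener is invoked for this record
                if (delivered > pauseAfter) {
                    consumer.pause(); // only affects the NEXT poll()
                }
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        // Batch of 500: the rest of the fetched batch is still delivered after pause()
        System.out.println(deliveredBeforePauseTakesEffect(1000, 500, 10)); // prints 500
        // Batch of 1: delivery stops right after the record that triggered pause()
        System.out.println(deliveredBeforePauseTakesEffect(1000, 1, 10));   // prints 11
    }
}
```

If this is what is happening, a smaller `max.poll.records` value would shrink the number of records delivered after the pause. Spring Kafka also offers a container-managed pause through `KafkaListenerEndpointRegistry` (`getListenerContainer(id).pause()` and `resume()`), which may be a better fit than calling `pause()` on the consumer directly. Whether either applies depends on the Spring Kafka version and listener configuration in use.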