<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: Shikha</title>
    <description>The latest articles on DEV Community by Shikha (@shikha_d3b255ec768d55094f).</description>
    <link>https://dev.to/shikha_d3b255ec768d55094f</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F1854819%2Fe62dd539-2ad0-435e-a7dd-d8a4523ac97f.png</url>
      <title>DEV Community: Shikha</title>
      <link>https://dev.to/shikha_d3b255ec768d55094f</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/shikha_d3b255ec768d55094f"/>
    <language>en</language>
    <item>
      <title>How can i stop my kafka consumer from consuming messages ?</title>
      <dc:creator>Shikha</dc:creator>
      <pubDate>Mon, 29 Jul 2024 08:19:02 +0000</pubDate>
      <link>https://dev.to/shikha_d3b255ec768d55094f/how-can-i-stop-my-consumer-from-consuming-messages--47a0</link>
      <guid>https://dev.to/shikha_d3b255ec768d55094f/how-can-i-stop-my-consumer-from-consuming-messages--47a0</guid>
      <description>&lt;p&gt;I am using below functions but my kafkaListener is keeps on consuming messages even if my consumer is in paused state.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;import org.apache.kafka.clients.consumer.Consumer;

private Consumer&amp;lt;String, String&amp;gt; kafkaConsumer;

public void pauseKafkaConsumer() {
  if (!kafkaConsumer.paused().isEmpty()) {
     return;
  }
  // Pause all assigned partitions
  Collection&amp;lt;TopicPartition&amp;gt; assignedPartitions = 
  kafkaConsumer.assignment();
  kafkaConsumer.poll(0);
  kafkaConsumer.pause(assignedPartitions);
}

// Resume the Kafka consumer
public void resumeKafkaConsumer() {
  // Resume all paused partitions
  Collection&amp;lt;TopicPartition&amp;gt; pausedPartitions = 
  kafkaConsumer.paused();
      kafkaConsumer.resume(pausedPartitions);
}


@KafkaListener(topics = "#{'${spring.kafka.consumer.topic}'}", groupId = "#{'${spring.kafka.consumer.groupId}'}", containerFactory = "kafkaListenerContainerFactory")
public void consume(String stream, Consumer&amp;lt;?, ?&amp;gt; consumer, Acknowledgment acknowledgment) {
  this.kafkaConsumer = (Consumer&amp;lt;String, String&amp;gt;) consumer;


  if (getEventCount() &amp;gt; 10) {   
        pauseKafkaConsumer();
    return; 
  }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



</description>
      <category>kafka</category>
      <category>springboot</category>
      <category>apachekafka</category>
      <category>help</category>
    </item>
  </channel>
</rss>
