<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: RaghuNandan Neerukonda</title>
    <description>The latest articles on DEV Community by RaghuNandan Neerukonda (@nraghu).</description>
    <link>https://dev.to/nraghu</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F379907%2Ff5d1854b-9d4d-4019-949b-d7851bfabb34.jpeg</url>
      <title>DEV Community: RaghuNandan Neerukonda</title>
      <link>https://dev.to/nraghu</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/nraghu"/>
    <language>en</language>
    <item>
      <title>KAFKA - PRODUCER API WITH THIRD PARTY TOOLS</title>
      <dc:creator>RaghuNandan Neerukonda</dc:creator>
      <pubDate>Tue, 23 Jun 2020 11:35:52 +0000</pubDate>
      <link>https://dev.to/nraghu/kafka-producer-api-with-third-party-tools-1j30</link>
      <guid>https://dev.to/nraghu/kafka-producer-api-with-third-party-tools-1j30</guid>
      <description>&lt;p&gt;This is a continuation write-up to the previous post - &lt;a href="https://dev.to/nraghu/kafka-playing-with-consumer-api-using-python-library-3b50"&gt;Kafka - Playing with Consumer API&lt;/a&gt;, where I covered some peculiar use-cases of Consumer API.&lt;br&gt;
Here, I would like to play with Producer API to create a message using third-party tools like MessagePack for serialization and KafDrop for monitoring&lt;/p&gt;
&lt;h3&gt;
  
  
  &lt;a href="https://msgpack.org/index.html"&gt;MessagePack&lt;/a&gt;
&lt;/h3&gt;

&lt;p&gt;MessagePack is one of the best available schemaless serialization libraries to interchange data between heterogeneous applications. It is a binary for simple data structures and designed for efficient transmission over the wire.&lt;/p&gt;
&lt;h4&gt;
  
  
  Code Snippets
&lt;/h4&gt;

&lt;p&gt;Following examples shows how to pack and unpack the common data types, to work with custom data types like datetime.datetime, check DateTime example&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Simple Example
&lt;/li&gt;
&lt;/ul&gt;
&lt;div class="highlight"&gt;&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="nn"&gt;msgpack&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;packb&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;unpackb&lt;/span&gt;

&lt;span class="n"&gt;data&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="s"&gt;'a'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;5&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="n"&gt;msg_packet&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;packb&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;data&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="n"&gt;recreated_data&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;unpackb&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;msg_packet&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;


&lt;ul&gt;
&lt;li&gt;Using Datetime
&lt;/li&gt;
&lt;/ul&gt;
&lt;div class="highlight"&gt;&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="nn"&gt;datetime&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;datetime&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;dtm&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="nn"&gt;dateutil&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;parser&lt;/span&gt;

&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;dtm_encode&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;obj&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="nb"&gt;isinstance&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;obj&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;dtm&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="s"&gt;'__dtm__'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="bp"&gt;True&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
            &lt;span class="s"&gt;'obj_as_str'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;obj&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;isoformat&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
        &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;obj&lt;/span&gt;

&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;dtm_decode&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;obj&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="s"&gt;'__dtm__'&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;obj&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;parser&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;isoparse&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;obj&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="s"&gt;'obj_as_str'&lt;/span&gt;&lt;span class="p"&gt;])&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;obj&lt;/span&gt;

&lt;span class="n"&gt;dat&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="s"&gt;'a'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mi"&gt;5&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;'dates'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="s"&gt;'created_at'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;dtm&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;now&lt;/span&gt;&lt;span class="p"&gt;()}}&lt;/span&gt;
&lt;span class="n"&gt;serialized_dat&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;packb&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;dat&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;default&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;dtm_encode&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;deserialzed_dat&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;unpackb&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;serialized_dat&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;object_hook&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;dtm_decode&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;

&lt;h4&gt;
  
  
  Why serialize the messages?
&lt;/h4&gt;

&lt;p&gt;Serialization is the process of transforming an object into a format that it can be stored, at the destination the data can be deserialized to recreate the original object. Since Kafka uses the filesystem to store the messages, we need to serialize and deserialize the data. There are two protocols to serialize a message either by using Schema-based IDL(Interface Definition Language) or using Schemaless. While you can find numerous articles touching these protocols on the web, I have constituted a one-liner differentiating them.&lt;/p&gt;
&lt;h4&gt;
  
  
  Schemaless and Schema-based IDL
&lt;/h4&gt;

&lt;p&gt;In Schema-based IDL, the primitives of the message are pre-defined where publishers and consumers can validate the message before working on it. On the other hand, schemaless can have custom primitives for every message.&lt;br&gt;
If you're having a standard and rigid database schema, you might want to use Apache Avro which is the best fit and popularly used in combination with Apache Kafka. In contrast, if you're working with schemaless or bigdata oriented systems where primitives are highly unpredictable, there is an overhead of maintaining the Avro schemas, and the need for schemaless IDL arises.&lt;br&gt;
These are some of the standard protocols most commonly chosen.&lt;/p&gt;

&lt;div class="table-wrapper-paragraph"&gt;&lt;table&gt;
&lt;thead&gt;
&lt;tr&gt;
&lt;th&gt;Protocol&lt;/th&gt;
&lt;th&gt;Type&lt;/th&gt;
&lt;/tr&gt;
&lt;/thead&gt;
&lt;tbody&gt;
&lt;tr&gt;
&lt;td&gt;&lt;a href="https://www.thrift.apache.org"&gt;Thrift&lt;/a&gt;&lt;/td&gt;
&lt;td&gt;Schema Based&lt;/td&gt;
&lt;/tr&gt;
&lt;tr&gt;
&lt;td&gt;&lt;a href="https://avro.apache.org"&gt;Avro&lt;/a&gt;&lt;/td&gt;
&lt;td&gt;Schema Based&lt;/td&gt;
&lt;/tr&gt;
&lt;tr&gt;
&lt;td&gt;&lt;a href="//www.json.org"&gt;JSON&lt;/a&gt;&lt;/td&gt;
&lt;td&gt;Schemaless&lt;/td&gt;
&lt;/tr&gt;
&lt;tr&gt;
&lt;td&gt;&lt;a href="https://msgpack.org/index.html"&gt;MessagePack&lt;/a&gt;&lt;/td&gt;
&lt;td&gt;Schemaless&lt;/td&gt;
&lt;/tr&gt;
&lt;tr&gt;
&lt;td&gt;&lt;a href="http://bsonspec.org/"&gt;BSON&lt;/a&gt;&lt;/td&gt;
&lt;td&gt;Schemaless&lt;/td&gt;
&lt;/tr&gt;
&lt;/tbody&gt;
&lt;/table&gt;&lt;/div&gt;

&lt;p&gt;I chose MessagePack among the list because it has an incredible performance, super simple to setup and start working with, and also much smaller and faster than JSON.&lt;/p&gt;
&lt;h3&gt;
  
  
  &lt;a href="https://github.com/obsidiandynamics/kafdrop"&gt;KafDrop&lt;/a&gt;
&lt;/h3&gt;

&lt;p&gt;
Kafdrop is an opensource web UI for viewing Kafka topics and browsing consumer groups. The tool displays information such as brokers, topics, partitions, consumers, and lets you view messages.
&lt;/p&gt;

&lt;h4&gt;
  
  
  Example screen how a KafDrop looks like
&lt;/h4&gt;

&lt;ul&gt;
&lt;li&gt;Home Screen
Home Screen not only covers the overview, but it also has information about the Brokers and topics available in the cluster&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--BJNtJ7g0--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/i/55m16fthktlvh8gig86g.PNG" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--BJNtJ7g0--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/i/55m16fthktlvh8gig86g.PNG" alt="Alt Text"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Topic Page
Reveals all partitions info of the topic and renders the list of consumer groups subscribed to the topic&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--zQzhRKJz--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/i/4l29zjeuf4lo3fqoopgi.PNG" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--zQzhRKJz--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/i/4l29zjeuf4lo3fqoopgi.PNG" alt="Alt Text"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h4&gt;
  
  
  Features
&lt;/h4&gt;

&lt;ul&gt;
&lt;li&gt;List all topics of the cluster&lt;/li&gt;
&lt;li&gt;Option to browse through a partition data of a topic&lt;/li&gt;
&lt;li&gt;Tracks message offsets of all consumer groups subscribed to a topic&lt;/li&gt;
&lt;li&gt;Create &amp;amp; Delete topics&lt;/li&gt;
&lt;li&gt;Search and filter messages in a partition&lt;/li&gt;
&lt;/ul&gt;

&lt;h3&gt;
  
  
  Produce a Message and see in Kafdrop
&lt;/h3&gt;



&lt;div class="highlight"&gt;&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="nn"&gt;datetime&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;datetime&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;dtm&lt;/span&gt;

&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="nn"&gt;msgpack&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;packb&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;unpackb&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="nn"&gt;confluent_kafka&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;Producer&lt;/span&gt;

&lt;span class="s"&gt;'''
Note: Import or recreate "dtm_encode" function from the Code Snippet section 
'''&lt;/span&gt;


&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;delivery_report&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;err&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;m&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="ow"&gt;is&lt;/span&gt; &lt;span class="ow"&gt;not&lt;/span&gt; &lt;span class="bp"&gt;None&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="k"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;f'Message -&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;m&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt;, delivery failed: &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;err&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;else&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="k"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;f'Message delivered to &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;m&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;topic&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt; [&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;m&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;partition&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt;]'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;


&lt;span class="n"&gt;producer_config&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="s"&gt;'bootstrap.servers'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="s"&gt;'172.16.6.9'&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="n"&gt;producer_topic&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;'raghu-producer-test'&lt;/span&gt;
&lt;span class="n"&gt;P&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;Producer&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;producer_config&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;data&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="s"&gt;'a'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mi"&gt;5&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="s"&gt;'created_at'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;dtm&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;now&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="n"&gt;data_packet&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;packb&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;data&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;default&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;dtm_encode&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;use_bin_type&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="bp"&gt;True&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="n"&gt;P&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;produce&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;producer_topic&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;data_packet&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;callback&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;delivery_report&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;P&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;poll&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mf"&gt;0.01&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;&lt;em&gt;Note: Refer code samples in my GitHub &lt;a href="https://github.com/n-raghu/journals/tree/master/kafka-producer-n-monitoring-tools/code"&gt;repo&lt;/a&gt;&lt;/em&gt;&lt;/p&gt;

&lt;h4&gt;
  
  
  Snapshot of Kafdrop
&lt;/h4&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--WBc6gnA7--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/i/hw4ly23d51hag4t9xcaa.PNG" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--WBc6gnA7--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/i/hw4ly23d51hag4t9xcaa.PNG" alt="Alt Text"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h3&gt;
  
  
  Best Practices
&lt;/h3&gt;

&lt;p&gt;Before we close, I would like to mention some of the best practices&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;Compression reduces the packet size and helps to transmit much faster over the network, however, MessagePack shrinks the packet size by 30% which acts as compression and made me stay away from conventional compression libraries. Most standard workflows are designed using a compression mechanism, &lt;strong&gt;snappy&lt;/strong&gt; &amp;amp; &lt;strong&gt;zlib&lt;/strong&gt; are most widely accepted libraries for compression.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Encryption, which I more prefer than compression, is a good practice to ensure the data is secured over the network. It is not a tedious task to encrypt messages before pushing to Producer and decrypt at the Consumer end. There are lots of key-based encryption system which helps us to encrypt and decrypt the packets.&lt;br&gt;
For my requirement, we have developed a thin wrapper on cryptography library to encrypt and decrypt messages using hybrid mechanism(Symmetric + Asymmetric), this functions like any other microservice to encrypt and decrypt the messages.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Message Headers, it is not an uncommon practice to incorporate headers in the message for several technical or architectural purposes. My team use headers to identify the version of the message, however, expect a new use-case with headers, refer the &lt;a href="https://cwiki.apache.org/confluence/display/KAFKA/A+Case+for+Kafka+Headers"&gt;case&lt;/a&gt; to learn more.&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

</description>
      <category>python</category>
      <category>kafka</category>
      <category>messagepack</category>
    </item>
    <item>
      <title>Kafka - Playing with Consumer API using python library</title>
      <dc:creator>RaghuNandan Neerukonda</dc:creator>
      <pubDate>Tue, 05 May 2020 16:57:21 +0000</pubDate>
      <link>https://dev.to/nraghu/kafka-playing-with-consumer-api-using-python-library-3b50</link>
      <guid>https://dev.to/nraghu/kafka-playing-with-consumer-api-using-python-library-3b50</guid>
      <description>&lt;h6&gt;
  
  
  &lt;em&gt;Prerequisite&lt;/em&gt;: Requires prior knowledge(beginner to moderate level) on Kafka ecosystem.
&lt;/h6&gt;

&lt;p&gt;
Kafka has become one of the most widely used Message Broker for Event Bus architecture and Data Streams. We have been using Apache Kafka as a Message Broker for Microservices with CQRS design to build the services on different frameworks.
&lt;/p&gt;

&lt;p&gt;
There are numerous articles available online which help developers to reuse the code snippets, however, it is mostly on Scala or Java.
With this write-up, I would like to share some of the reusable code snippets for Kafka Consumer API using Python library confluent_kafka.
confluent_kafka provides a good documentation explaining the funtionalities of all the API they support with the library. Their GitHub page also has adequate example codes.
&lt;/p&gt;

&lt;p&gt;Here, I would like to emphasize on two usecases which are rare but would definitely be used, at least a couple of times while working with message brokers.&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Rewind Topic(Partition) offsets&lt;/li&gt;
&lt;li&gt;Read from multiple partitions of different topics&lt;/li&gt;
&lt;/ul&gt;

&lt;h3&gt;
  
  
  Producer API
&lt;/h3&gt;

&lt;p&gt;Firstly, lets get started with a sample code to produce a message.
&lt;/p&gt;



&lt;div class="highlight"&gt;&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;pickle&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="nn"&gt;confluent_kafka&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;Producer&lt;/span&gt;

&lt;span class="n"&gt;my_dat&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;'data that needs to be send to consumer'&lt;/span&gt;
&lt;span class="n"&gt;P&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;produce&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="s"&gt;'my_topic'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;pickle&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;dumps&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;my_dat&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
    &lt;span class="n"&gt;callback&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;delivery_report&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;&lt;em&gt;pickle&lt;/em&gt; is used to serialize the data, this is not necessary if you working with integers and string, however, when working with timestamps and complex objects, we have to serialize the data.&lt;br&gt;
&lt;em&gt;Note&lt;/em&gt;: The best practise is to use Apache Avro, which is highly used in combination with Kafka.&lt;/p&gt;

&lt;h4&gt;
  
  
  Create a consumer and consume data
&lt;/h4&gt;

&lt;p&gt;Initialize a consumer, subscribe to topics, poll consumer until data found and consume.&lt;/p&gt;



&lt;div class="highlight"&gt;&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="nn"&gt;confluent_kafka&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;Consumer&lt;/span&gt;

&lt;span class="n"&gt;cfg&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="s"&gt;'bootstrap.servers'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="s"&gt;'&amp;lt;kafka_host_ip&amp;gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="s"&gt;'group.id'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="s"&gt;'&amp;lt;consumer_group_id&amp;gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="s"&gt;'auto.offset.reset'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="s"&gt;'earliest'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="n"&gt;C&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;Consumer&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;cfg&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;C&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;subscribe&lt;/span&gt;&lt;span class="p"&gt;([&lt;/span&gt;&lt;span class="s"&gt;'kafka-topic-1'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;'kafka-topic-2'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="p"&gt;])&lt;/span&gt;

&lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;_&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="nb"&gt;range&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="n"&gt;msg&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;C&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;poll&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mf"&gt;0.05&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;msg&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="n"&gt;dat&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="s"&gt;'msg_value'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;msg&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt; &lt;span class="c1"&gt;# This is the actual content of the message
&lt;/span&gt;            &lt;span class="s"&gt;'msg_headers'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;msg&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;headers&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt; &lt;span class="c1"&gt;# Headers of the message
&lt;/span&gt;            &lt;span class="s"&gt;'msg_key'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;msg&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt; &lt;span class="c1"&gt;# Message Key
&lt;/span&gt;            &lt;span class="s"&gt;'msg_partition'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;msg&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;partition&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt; &lt;span class="c1"&gt;# Partition id from which the message was extracted
&lt;/span&gt;            &lt;span class="s"&gt;'msg_topic'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;msg&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;topic&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt; &lt;span class="c1"&gt;# Topic in which Producer posted the message to          
&lt;/span&gt;        &lt;span class="p"&gt;}&lt;/span&gt;
        &lt;span class="k"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;dat&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;By default, consumer instances poll all the partitions of a topic, there is no need to poll each partition of topic to get the messages.&lt;br&gt;
&lt;em&gt;msg&lt;/em&gt; has a &lt;em&gt;None&lt;/em&gt; value if &lt;em&gt;poll&lt;/em&gt; method has no messages to return. Boolean check will help us to understand whether the &lt;em&gt;poll&lt;/em&gt; to broker fetched message or not.&lt;br&gt;
Valid message has not only data, it also has other functions which helps us to query or control the data.&lt;/p&gt;

&lt;h4&gt;
  
  
  Read from multiple partitions of different topics
&lt;/h4&gt;

&lt;p&gt;Scenario:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Read from partition 1 of topic 1 starting with offset value 6&lt;/li&gt;
&lt;li&gt;Read from partition 3 of topic 2 starting with offset value 5&lt;/li&gt;
&lt;li&gt;Read from partition 2 of topic 1 starting with offset value 9
&lt;/li&gt;
&lt;/ul&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="nn"&gt;confluent_kafka&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;Consumer&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;TopicPartition&lt;/span&gt;
&lt;span class="n"&gt;cfg&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="s"&gt;'bootstrap.servers'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="s"&gt;'172.16.18.187:9092'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="s"&gt;'group.id'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="s"&gt;'hb-events-1'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="s"&gt;'auto.offset.reset'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="s"&gt;'earliest'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="n"&gt;C&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;Consumer&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;cfg&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;C&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;assign&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="p"&gt;[&lt;/span&gt;
        &lt;span class="n"&gt;TopicPartition&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;topic&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s"&gt;'kafka-topic-1'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;partition&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;offset&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;36&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
        &lt;span class="n"&gt;TopicPartition&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;topic&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s"&gt;'kafka-topic-2'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;partition&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;3&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;offset&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;35&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
        &lt;span class="n"&gt;TopicPartition&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;topic&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s"&gt;'kafka-topic-1'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;partition&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;offset&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;39&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
    &lt;span class="p"&gt;]&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="n"&gt;no_msg_counter&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;
&lt;span class="k"&gt;while&lt;/span&gt; &lt;span class="bp"&gt;True&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
    &lt;span class="n"&gt;msg&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;C&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;poll&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;msg&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="n"&gt;no_msg_counter&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;
        &lt;span class="n"&gt;dat&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="s"&gt;'msg_val'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;msg&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt;
            &lt;span class="s"&gt;'msg_partition'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;msg&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;partition&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt;
            &lt;span class="s"&gt;'msg_topic'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;msg&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;topic&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
        &lt;span class="p"&gt;}&lt;/span&gt;
        &lt;span class="k"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;dat&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;elif&lt;/span&gt; &lt;span class="n"&gt;no_msg_counter&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="mi"&gt;10000&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="k"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;'No Messages Found from a long time'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;else&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="n"&gt;no_msg_counter&lt;/span&gt; &lt;span class="o"&gt;+=&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;
&lt;span class="n"&gt;C&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;close&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;When reading from a specific partition of a topic, assign is the best method to use instead of subscribe.&lt;br&gt;
&lt;em&gt;assign&lt;/em&gt; method accepts a list of TopicPartitions. &lt;em&gt;TopicPartition&lt;/em&gt; is an instance which gets enrolled with one specific partition of a topic.&lt;/p&gt;

&lt;h4&gt;
  
  
  Rewind Topic(partition) offsets
&lt;/h4&gt;

&lt;p&gt;Scenario:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Rewind the Partition 1 of topic-1 to offset 5&lt;/li&gt;
&lt;li&gt;Reset the Partition 3 of topic-2
&lt;/li&gt;
&lt;/ul&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="n"&gt;C&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;Consumer&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;cfg&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;partition_topic_offsets&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;
    &lt;span class="n"&gt;TopicPartition&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;'kafka-topic-1'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;partition&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;offset&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;5&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
    &lt;span class="n"&gt;TopicPartition&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;'kafka-topic-2'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;partition&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;3&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;offset&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
    &lt;span class="p"&gt;]&lt;/span&gt;
&lt;span class="n"&gt;C&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;commit&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;offsets&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;partition_topic_offsets&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="k"&gt;async&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="bp"&gt;False&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;C&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;close&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

&lt;span class="n"&gt;C&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;Consumer&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;cfg&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;C&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;subscribe&lt;/span&gt;&lt;span class="p"&gt;([&lt;/span&gt;&lt;span class="s"&gt;'kafka-topic-1'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;'kafka-topic-2'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="p"&gt;])&lt;/span&gt;

&lt;span class="n"&gt;no_msg_counter&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;
&lt;span class="k"&gt;while&lt;/span&gt; &lt;span class="bp"&gt;True&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
    &lt;span class="n"&gt;msg&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;C&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;poll&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mf"&gt;0.05&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;msg&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="n"&gt;no_msg_counter&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;
        &lt;span class="k"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
            &lt;span class="p"&gt;{&lt;/span&gt;
                &lt;span class="s"&gt;'msg_value'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;msg&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt;
                &lt;span class="s"&gt;'partition'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;msg&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;partition&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt;
                &lt;span class="s"&gt;'headers'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;msg&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;headers&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt;
                &lt;span class="s"&gt;'key'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;msg&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt;
                &lt;span class="s"&gt;'offset'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;msg&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;offset&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt;
            &lt;span class="p"&gt;}&lt;/span&gt;
        &lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;elif&lt;/span&gt; &lt;span class="n"&gt;no_msg_counter&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="mi"&gt;10000&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="k"&gt;break&lt;/span&gt;
    &lt;span class="k"&gt;else&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="n"&gt;no_msg_counter&lt;/span&gt; &lt;span class="o"&gt;+=&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;
&lt;span class="n"&gt;C&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;close&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;&lt;em&gt;Irrespective of the current offset for the partition, we can rewind or reset the offset.&lt;/em&gt;&lt;br&gt;
&lt;strong&gt;Reset or rewind offset values are set for a specific consumer groupid which was used to commit the offset, offsets of other consumer groups are unaffected&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;How we achieved this?&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Create a list of TopicPartitions with the respective offset to reset&lt;/li&gt;
&lt;li&gt;Commit these offsets to broker&lt;/li&gt;
&lt;li&gt;When consumer subscribed to these topics poll, they get data from the recently set offset&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;strong&gt;Only retained messages are retrieved&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Only message within the retention period are retrieved when you reset or rewind the offset.&lt;/li&gt;
&lt;li&gt;If you lose or do not have a record of last successful offset, use &lt;strong&gt;OFFSET_BEGINNING&lt;/strong&gt;, this will fetch data from the current beginning of the partition.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;strong&gt;Use REST API&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;If you're frequently running out of issues and want to rewind, it is advised to &lt;em&gt;periodically&lt;/em&gt; record/fetch the last successful offset to a table which look similar to events.&lt;/li&gt;
&lt;li&gt;How frequent should we record?, depends on the business case. Recording every offset involves DB call which may slow down the service.&lt;/li&gt;
&lt;li&gt;Create a wrapper REST-API which can update the table values.&lt;/li&gt;
&lt;li&gt;Modify consumer groups to get last offset from table.&lt;/li&gt;
&lt;/ul&gt;

</description>
      <category>python</category>
      <category>kafka</category>
    </item>
  </channel>
</rss>
