<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: Dejan Maric</title>
    <description>The latest articles on DEV Community by Dejan Maric (@de_maric).</description>
    <link>https://dev.to/de_maric</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F88909%2F21c383a9-bca2-4855-917f-91efbe090df0.jpg</url>
      <title>DEV Community: Dejan Maric</title>
      <link>https://dev.to/de_maric</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/de_maric"/>
    <language>en</language>
    <item>
      <title>Kafka Consumer – Committing consumer group offset</title>
      <dc:creator>Dejan Maric</dc:creator>
      <pubDate>Sun, 26 Jan 2025 11:19:56 +0000</pubDate>
      <link>https://dev.to/de_maric/kafka-consumer-committing-consumer-group-offset-3i4</link>
      <guid>https://dev.to/de_maric/kafka-consumer-committing-consumer-group-offset-3i4</guid>
      <description>&lt;h2&gt;
  
  
  Consumer group offset – an introduction
&lt;/h2&gt;

&lt;h3&gt;
  
  
  What is a consumer group offset?
&lt;/h3&gt;

&lt;p&gt;It’s a way for a consumer to keep track of where it is in the topic. For each topic and partition the consumer group will have one number representing the latest consumed record from Kafka.&lt;/p&gt;

&lt;p&gt;When we wrote about &lt;a href="https://codingharbour.com/apache-kafka/the-introduction-to-kafka-topics-and-partitions/" rel="noopener noreferrer"&gt;Kafka topic&lt;/a&gt; we mentioned how each record in a partition has an &lt;strong&gt;offset&lt;/strong&gt; – a sequential ID that marks its position within the partition and uniquely identifies it. The same concept applies here. A consumer group will have one offset for each partition it is consuming. So if we have a consumer group that is reading from a topic with two partitions (let’s call them P1 and P2), the consumer group will have an offset for P1 – the last record read from partition P1 – and an offset for P2 – again, the last record read from partition P2.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fjdnjwc631viom2p10lp2.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fjdnjwc631viom2p10lp2.png" alt="An example of a current offset (position) for a consumer group 1" width="800" height="478"&gt;&lt;/a&gt;An example of a current offset (position) for a consumer group 1&lt;/p&gt;

&lt;p&gt;There are two ways to keep track of consumer group offsets:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Storing them in Kafka, or&lt;/li&gt;
&lt;li&gt;Saving them in external storage (e.g. in a file or database)&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;This blog post covers the first approach – storing the offset in Kafka. The blog post describing the second approach is coming soon.&lt;/p&gt;

&lt;h3&gt;
  
  
  Where is offset stored in Kafka?
&lt;/h3&gt;

&lt;p&gt;When using Kafka to store offsets, these are saved in a topic called &lt;code&gt;__consumer_offsets&lt;/code&gt; (yes, those are two underscores at the beginning). The Kafka client library provides built-in support for storing and retrieving offsets, ensuring that consumers resume from the last stored position after a restart.&lt;/p&gt;

&lt;h3&gt;
  
  
  What happens if there’s no stored offset for the consumer?
&lt;/h3&gt;

&lt;p&gt;If no offset is found, the consumer follows the &lt;code&gt;auto.offset.reset&lt;/code&gt; configuration to decide what to do next.&lt;/p&gt;

&lt;p&gt;Consumer behaviour, depending on the &lt;a href="https://kafka.apache.org/documentation/#consumerconfigs_auto.offset.reset" rel="noopener noreferrer"&gt;auto.offset.reset&lt;/a&gt;:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;code&gt;latest&lt;/code&gt; (default) – Jump to the end of the topic and wait for new messages. None of the existing messages will be read&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;earliest&lt;/code&gt; – Jump to the start of the topic (will read all existing messages)&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;none&lt;/code&gt; – Throws an exception if no existing offset is found&lt;/li&gt;
&lt;/ul&gt;
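&lt;p&gt;To make this concrete, here is a minimal sketch of the consumer properties involved. The property names come from the official consumer configuration; the broker address and group name are placeholders:&lt;/p&gt;

```properties
# Minimal consumer configuration sketch (placeholder values)
bootstrap.servers=localhost:9092
group.id=my-consumer-group
# Start from the beginning of the topic when no committed offset exists
auto.offset.reset=earliest
```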

&lt;h2&gt;
  
  
  Auto vs Manual Commit
&lt;/h2&gt;

&lt;h3&gt;
  
  
  What is an auto-commit?
&lt;/h3&gt;

&lt;p&gt;This is a feature of the Kafka client that simplifies offset management by periodically committing the consumer offset to the Kafka cluster. The client does this in a separate thread and, by default, commits the offset every 5 seconds without any explicit action from us. This means you can simply start consuming records, and when your client restarts, it will continue from the latest committed offset without you having to do anything. This convenience comes with some downsides we’ll cover shortly.&lt;/p&gt;

&lt;p&gt;The use of auto-commit is controlled by the &lt;code&gt;enable.auto.commit&lt;/code&gt; property, which defaults to &lt;code&gt;true&lt;/code&gt; when the &lt;code&gt;group.id&lt;/code&gt; property is set.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="k"&gt;while&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="kc"&gt;true&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
  &lt;span class="n"&gt;records&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;consumer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;poll&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;timeout&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
  &lt;span class="c1"&gt;// process the records&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
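&lt;p&gt;The auto-commit behaviour described above is driven by two consumer properties. A sketch, showing the default values:&lt;/p&gt;

```properties
# Auto-commit is enabled by default when group.id is set
enable.auto.commit=true
# The background thread commits the offset every 5 seconds by default
auto.commit.interval.ms=5000
```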



&lt;h3&gt;
  
  
  Downsides of using auto-commit
&lt;/h3&gt;

&lt;p&gt;Based on how the auto-commit works, there is room for losing data when using it. Here’s how that might happen.&lt;/p&gt;

&lt;p&gt;Since the client commits offsets in a separate thread, it does not track where your application is in processing the polled records. In addition, since the Kafka client often polls multiple records at a time, the following scenario can happen:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;The client polls 10 records&lt;/li&gt;
&lt;li&gt;You start processing them and, while you are on, say, the 3rd record, the background thread commits the offset. It commits the offset for the entire poll, marking all 10 records as consumed.&lt;/li&gt;
&lt;li&gt;If your application now fails before processing all 10 records, you have effectively lost data: the next time it starts, Kafka reports that the group is already at the 10th record, so consumption resumes from the 11th record and the unprocessed ones are skipped.&lt;/li&gt;
&lt;/ul&gt;
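&lt;p&gt;The arithmetic of the scenario above can be sketched as a toy, Kafka-free simulation. The class and method names below are made up for illustration; only the offset bookkeeping mirrors what the real client does:&lt;/p&gt;

```java
// Toy simulation of the auto-commit data-loss scenario (no Kafka involved):
// the client polls 10 records, the app crashes after processing 3, but the
// background thread has already committed the offset of the whole batch.
public class AutoCommitLossDemo {

    /** Offset the auto-committer stores: one past the last polled record. */
    static long committedOffset(long pollStart, int polledCount) {
        return pollStart + polledCount;
    }

    /** Records that were polled but never processed before the crash. */
    static int lostRecords(int polledCount, int processedBeforeCrash) {
        return polledCount - processedBeforeCrash;
    }

    public static void main(String[] args) {
        long pollStart = 0; // first offset of the poll
        int polled = 10;    // client polls 10 records
        int processed = 3;  // crash while on the 3rd record

        long committed = committedOffset(pollStart, polled);
        System.out.println("Committed offset after background commit: " + committed);
        System.out.println("Consumer restarts at offset " + committed
                + ", losing " + lostRecords(polled, processed) + " records");
    }
}
```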

&lt;p&gt;A simple way to avoid this data loss is to commit offsets only after you have successfully processed the records, which is why manual commit exists.&lt;/p&gt;

&lt;h3&gt;
  
  
  Manual commit
&lt;/h3&gt;

&lt;p&gt;Manual offset commit allows explicit control over when offsets are recorded. By disabling auto-commit (&lt;code&gt;enable.auto.commit=false&lt;/code&gt;), the application must explicitly commit offsets using the &lt;code&gt;commitSync()&lt;/code&gt; or &lt;code&gt;commitAsync()&lt;/code&gt; methods. This level of control allows offsets to be committed only after specific conditions are met, such as successfully processing a batch of records.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="k"&gt;while&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="kc"&gt;true&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
  &lt;span class="n"&gt;records&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;consumer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;poll&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;timeout&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
  &lt;span class="c1"&gt;// process the records&lt;/span&gt;
  &lt;span class="c1"&gt;//when all the polled records are consumed, commit the offset&lt;/span&gt;
  &lt;span class="n"&gt;consumer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;commitSync&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt; &lt;span class="c1"&gt;// or consumer.commitAsync()&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
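&lt;p&gt;For completeness, switching to manual commit is a one-line configuration change:&lt;/p&gt;

```properties
# Offsets are stored only when commitSync()/commitAsync() is called
enable.auto.commit=false
```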



&lt;h3&gt;
  
  
  When to rely on auto-commit?
&lt;/h3&gt;

&lt;p&gt;If your application can tolerate:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;reprocessing the same data from time to time (because it’s designed as idempotent) and&lt;/li&gt;
&lt;li&gt;losing a few messages here and there&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;then this is a good feature to leave enabled. It reduces the client’s complexity.&lt;/p&gt;

&lt;p&gt;Otherwise, use manual commit.&lt;/p&gt;

&lt;h2&gt;
  
  
  Synchronous vs Asynchronous Offset Commit
&lt;/h2&gt;

&lt;p&gt;After choosing manual commit, the next decision is whether to commit offsets synchronously or asynchronously.&lt;/p&gt;

&lt;p&gt;If you want to be absolutely sure the offset is committed before continuing your work, use &lt;code&gt;commitSync()&lt;/code&gt;. Be aware that this blocks your thread until the commit is confirmed by the Kafka broker or an exception is thrown.&lt;/p&gt;

&lt;p&gt;Async offset commit has two variants, both of which are non-blocking:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;&lt;code&gt;commitAsync()&lt;/code&gt; which ignores any exceptions thrown and&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;code&gt;commitAsync(OffsetCommitCallback callback)&lt;/code&gt; where you can provide a callback that handles success or exceptions (e.g. a timeout occurred, or a consumer group rebalance is in progress).&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  Conclusion
&lt;/h2&gt;

&lt;p&gt;The consumer group offset is a fundamental aspect of how Kafka ensures reliable message consumption. It determines where a consumer resumes reading after a restart and plays a critical role in data consistency.&lt;/p&gt;

&lt;p&gt;While auto-commit simplifies offset management, it comes with the risk of data loss if records are not processed before a commit occurs. Manual offset management, on the other hand, provides greater control, ensuring that offsets are only committed after records are fully processed.&lt;/p&gt;

&lt;p&gt;Choosing between synchronous and asynchronous commits depends on your application’s requirements: &lt;code&gt;commitSync()&lt;/code&gt; guarantees persistence at the cost of blocking, while &lt;code&gt;commitAsync()&lt;/code&gt; offers non-blocking flexibility with potential trade-offs.&lt;/p&gt;

&lt;p&gt;Understanding these offset management techniques allows developers to design Kafka consumers that balance performance, reliability, and fault tolerance, ensuring data integrity in real-world applications.&lt;/p&gt;

&lt;h2&gt;
  
  
  Would you like to learn more about Kafka?
&lt;/h2&gt;

&lt;p&gt;I have created a Kafka mini-course that you can get absolutely free. Sign up for it over at &lt;a href="https://codingharbour.com/kafka-mini-course/" rel="noopener noreferrer"&gt;Coding Harbour&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;Photo credit: &lt;a href="https://unsplash.com/@kencheungphoto" rel="noopener noreferrer"&gt;@kencheungphoto&lt;/a&gt;&lt;/p&gt;

</description>
      <category>kafka</category>
      <category>consumergroup</category>
      <category>java</category>
    </item>
    <item>
      <title>Unit testing Kafka producer using MockProducer</title>
      <dc:creator>Dejan Maric</dc:creator>
      <pubDate>Tue, 11 Apr 2023 10:00:00 +0000</pubDate>
      <link>https://dev.to/de_maric/unit-testing-kafka-producer-using-mockproducer-2i2g</link>
      <guid>https://dev.to/de_maric/unit-testing-kafka-producer-using-mockproducer-2i2g</guid>
      <description>&lt;p&gt;Sometimes your Kafka producer code is doing things that need to be properly validated and of course, we developers resort to writing a test. If the functionality we want to test is nicely encapsulated we can do that using a unit test. Kafka helps us with that by providing a mock implementation of Producer&amp;lt;&amp;gt; interface called, you guessed it, &lt;code&gt;MockProducer&lt;/code&gt;.&lt;/p&gt;

&lt;h2&gt;
  
  
  Preparation for test
&lt;/h2&gt;

&lt;p&gt;&lt;code&gt;TransactionProcessor&lt;/code&gt; class below is our class under test. It has a &lt;code&gt;process(Transaction)&lt;/code&gt; method that receives a &lt;code&gt;Transaction&lt;/code&gt; object, which in our example only contains &lt;code&gt;userId&lt;/code&gt; and &lt;code&gt;amount&lt;/code&gt; properties. Depending on the amount, the processor decides which topic to write the object to. If the amount is at or above 100,000 it will use the &lt;code&gt;transactions_high_prio&lt;/code&gt; topic. Otherwise, it will write the transaction to the &lt;code&gt;transactions_regular_prio&lt;/code&gt; topic.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;TransactionProcessor&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;static&lt;/span&gt; &lt;span class="kd"&gt;final&lt;/span&gt; &lt;span class="kt"&gt;double&lt;/span&gt; &lt;span class="no"&gt;HIGH_PRIORITY_THRESHOLD&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mf"&gt;100.000&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
    &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="kd"&gt;final&lt;/span&gt; &lt;span class="nc"&gt;Producer&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;kafkaProducer&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
    &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="kd"&gt;final&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;highPrioTopic&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
    &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="kd"&gt;final&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;regularPrioTopic&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

    &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="kd"&gt;final&lt;/span&gt; &lt;span class="nc"&gt;Gson&lt;/span&gt; &lt;span class="n"&gt;gson&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;Gson&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;

    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="nf"&gt;TransactionProcessor&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Producer&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;kafkaProducer&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;highPrioTopic&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;regularPrioTopic&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;kafkaProducer&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;kafkaProducer&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
        &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;highPrioTopic&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;highPrioTopic&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
        &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;regularPrioTopic&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;regularPrioTopic&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;

    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;process&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Transaction&lt;/span&gt; &lt;span class="n"&gt;transaction&lt;/span&gt;&lt;span class="o"&gt;){&lt;/span&gt;
        &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;selectedTopic&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;regularPrioTopic&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;transaction&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getAmount&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;=&lt;/span&gt; &lt;span class="no"&gt;HIGH_PRIORITY_THRESHOLD&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
            &lt;span class="n"&gt;selectedTopic&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;highPrioTopic&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
        &lt;span class="o"&gt;}&lt;/span&gt;
        &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;transactionJson&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;gson&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;toJson&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;transaction&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
        &lt;span class="nc"&gt;ProducerRecord&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;record&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; 
            &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;ProducerRecord&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&amp;gt;(&lt;/span&gt;&lt;span class="n"&gt;selectedTopic&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;transaction&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getUserId&lt;/span&gt;&lt;span class="o"&gt;(),&lt;/span&gt; &lt;span class="n"&gt;transactionJson&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
        &lt;span class="n"&gt;kafkaProducer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;send&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;And Transaction class looks like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;Transaction&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;userId&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
    &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="kt"&gt;double&lt;/span&gt; &lt;span class="n"&gt;amount&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
    &lt;span class="c1"&gt;//removed for brevity&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;An important thing to notice here is that &lt;code&gt;TransactionProcessor&lt;/code&gt; uses the &lt;code&gt;Producer&lt;/code&gt; interface, not the implementation (the &lt;code&gt;KafkaProducer&lt;/code&gt; class). This makes it possible to unit test our processor using the &lt;code&gt;MockProducer&lt;/code&gt;.&lt;/p&gt;

&lt;h2&gt;
  
  
  MockProducer in action
&lt;/h2&gt;

&lt;p&gt;Ok, now onto the test class. &lt;code&gt;TransactionProcessorTest&lt;/code&gt; creates an instance of the MockProducer that we will provide to the TransactionProcessor.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;TransactionProcessorTest&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;

    &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="kd"&gt;static&lt;/span&gt; &lt;span class="kd"&gt;final&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="no"&gt;HIGH_PRIO_TOPIC&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"transactions_high_prio"&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
    &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="kd"&gt;static&lt;/span&gt; &lt;span class="kd"&gt;final&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="no"&gt;REGULAR_PRIO_TOPIC&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"transactions_regular_prio"&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
    &lt;span class="nc"&gt;MockProducer&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;mockProducer&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; 
        &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;MockProducer&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&amp;gt;(&lt;/span&gt;&lt;span class="kc"&gt;true&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;StringSerializer&lt;/span&gt;&lt;span class="o"&gt;(),&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;StringSerializer&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The &lt;code&gt;MockProducer&lt;/code&gt; constructor takes three parameters: a boolean called &lt;code&gt;autoComplete&lt;/code&gt; and the key and value serializers, in our case &lt;code&gt;StringSerializer&lt;/code&gt;s. The &lt;code&gt;autoComplete&lt;/code&gt; flag tells &lt;code&gt;MockProducer&lt;/code&gt; to automatically complete all requests immediately. In regular testing, you want to set this parameter to &lt;code&gt;true&lt;/code&gt; so that messages are immediately ‘sent’. If you set it to &lt;code&gt;false&lt;/code&gt; you will need to explicitly call the &lt;code&gt;completeNext()&lt;/code&gt; or &lt;code&gt;errorNext(RuntimeException)&lt;/code&gt; method after calling &lt;code&gt;send()&lt;/code&gt;. You would do this to e.g. test the error handling in your producer (by providing the exception you want to handle as the parameter to &lt;code&gt;errorNext&lt;/code&gt;).&lt;/p&gt;

&lt;p&gt;After we’ve created the MockProducer, we create the instance of the class we wish to test.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="nc"&gt;TransactionProcessor&lt;/span&gt; &lt;span class="n"&gt;processor&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; 
    &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nf"&gt;TransactionProcessor&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;mockProducer&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="no"&gt;HIGH_PRIO_TOPIC&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="no"&gt;REGULAR_PRIO_TOPIC&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Now it's time to test whether the selection of topics based on amount is correct. We will create two Transaction objects, the first one with a low amount and the second one with an amount higher than our threshold (100,000).&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;    &lt;span class="nd"&gt;@Test&lt;/span&gt;
    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;testPrioritySelection&lt;/span&gt;&lt;span class="o"&gt;(){&lt;/span&gt;
        &lt;span class="nc"&gt;Double&lt;/span&gt; &lt;span class="n"&gt;lowAmount&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mf"&gt;50.2d&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
        &lt;span class="nc"&gt;Double&lt;/span&gt; &lt;span class="n"&gt;highAmount&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;250000&lt;/span&gt;&lt;span class="n"&gt;d&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
        &lt;span class="nc"&gt;Transaction&lt;/span&gt; &lt;span class="n"&gt;regularPrioTransaction&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;Transaction&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"user1"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;lowAmount&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
        &lt;span class="n"&gt;processor&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;process&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;regularPrioTransaction&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
        &lt;span class="nc"&gt;Transaction&lt;/span&gt; &lt;span class="n"&gt;highPrioTransaction&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;Transaction&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"user2"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;highAmount&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
        &lt;span class="n"&gt;processor&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;process&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;highPrioTransaction&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

        &lt;span class="n"&gt;assertThat&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;mockProducer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;history&lt;/span&gt;&lt;span class="o"&gt;()).&lt;/span&gt;&lt;span class="na"&gt;hasSize&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

        &lt;span class="nc"&gt;ProducerRecord&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;regTransactionRecord&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;mockProducer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;history&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;get&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
        &lt;span class="n"&gt;assertThat&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;regTransactionRecord&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;value&lt;/span&gt;&lt;span class="o"&gt;()).&lt;/span&gt;&lt;span class="na"&gt;contains&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;lowAmount&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;toString&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
        &lt;span class="n"&gt;assertThat&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;regTransactionRecord&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;topic&lt;/span&gt;&lt;span class="o"&gt;()).&lt;/span&gt;&lt;span class="na"&gt;isEqualTo&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="no"&gt;REGULAR_PRIO_TOPIC&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

        &lt;span class="nc"&gt;ProducerRecord&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;highTransactionRecord&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;mockProducer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;history&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;get&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
        &lt;span class="n"&gt;assertThat&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;highTransactionRecord&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;value&lt;/span&gt;&lt;span class="o"&gt;()).&lt;/span&gt;&lt;span class="na"&gt;contains&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;highAmount&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;toString&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
        &lt;span class="n"&gt;assertThat&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;highTransactionRecord&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;topic&lt;/span&gt;&lt;span class="o"&gt;()).&lt;/span&gt;&lt;span class="na"&gt;isEqualTo&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="no"&gt;HIGH_PRIO_TOPIC&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;After calling the &lt;code&gt;processor.process(…)&lt;/code&gt; method twice, we want to validate that two records were sent to Kafka. For that, we use the &lt;code&gt;MockProducer#history()&lt;/code&gt; method, which returns the list of records the producer received to send to Kafka. We fetch each record from the history to ensure it was ‘sent’ to the proper topic.&lt;/p&gt;

&lt;h2&gt;
  
  
  Code on Github
&lt;/h2&gt;

&lt;p&gt;All code examples from this blog post are available on &lt;a href="https://github.com/codingharbour/kafka-test-examples/tree/main/mock-producer"&gt;Coding Harbour’s GitHub&lt;/a&gt;.&lt;/p&gt;

&lt;h2&gt;
  
  
  Would you like to learn more about Kafka?
&lt;/h2&gt;

&lt;p&gt;I have created a Kafka mini-course that you can get absolutely free. Sign up for it over at &lt;a href="https://codingharbour.com/kafka-mini-course/"&gt;Coding Harbour&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;Photo credit: &lt;a href="https://unsplash.com/@paulschnuerle"&gt;@paulschnuerle&lt;/a&gt;&lt;/p&gt;

</description>
      <category>kafka</category>
      <category>java</category>
      <category>testing</category>
    </item>
    <item>
      <title>How to use PEM certificates with Apache Kafka</title>
      <dc:creator>Dejan Maric</dc:creator>
      <pubDate>Thu, 06 May 2021 20:07:18 +0000</pubDate>
      <link>https://dev.to/de_maric/how-to-use-pem-certificates-with-apache-kafka-3139</link>
      <guid>https://dev.to/de_maric/how-to-use-pem-certificates-with-apache-kafka-3139</guid>
      <description>&lt;p&gt;It’s been a long waiting but it’s finally here: starting with Apache Kafka 2.7 it is now possible to use TLS certificates in PEM format with brokers and java clients. &lt;/p&gt;

&lt;p&gt;You might wonder – why does it matter?&lt;/p&gt;

&lt;p&gt;PEM is a scheme for encoding X.509 certificates and private keys as Base64 ASCII strings. This makes your certificates easier to handle: you can simply provide keys and certificates to the app as string parameters (e.g. through environment variables). This is especially useful if your applications run in containers, where mounting files into containers makes the deployment pipeline a bit more complex. In this post, I want to show you two ways to use PEM certificates with Kafka.&lt;/p&gt;

&lt;h2&gt;
  
  
  Providing certificates as strings
&lt;/h2&gt;

&lt;h3&gt;
  
  
  Brokers and CLI tools
&lt;/h3&gt;

&lt;p&gt;You can add certificates directly to the configuration file of your clients or brokers. If you’re providing them as single-line strings, you must collapse the original multiline format into a single line, inserting the line feed escape sequence ( \n ) where each line break was. Here’s how the SSL section of the properties file should look:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight properties"&gt;&lt;code&gt;&lt;span class="py"&gt;security.protocol&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;SSL&lt;/span&gt;
&lt;span class="py"&gt;ssl.keystore.type&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;PEM&lt;/span&gt;
&lt;span class="py"&gt;ssl.keystore.certificate.chain&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;-----BEGIN CERTIFICATE-----&lt;/span&gt;&lt;span class="se"&gt;\n&lt;/span&gt;&lt;span class="s"&gt;MIIDZjC...&lt;/span&gt;&lt;span class="se"&gt;\n&lt;/span&gt;&lt;span class="s"&gt;-----END CERTIFICATE-----&lt;/span&gt;
&lt;span class="py"&gt;ssl.keystore.key&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;-----BEGIN ENCRYPTED PRIVATE KEY-----&lt;/span&gt;&lt;span class="se"&gt;\n&lt;/span&gt;&lt;span class="s"&gt;...&lt;/span&gt;&lt;span class="se"&gt;\n&lt;/span&gt;&lt;span class="s"&gt;-----END ENCRYPTED PRIVATE KEY-----&lt;/span&gt;
&lt;span class="py"&gt;ssl.key.password&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;&amp;lt;private_key_password&amp;gt;&lt;/span&gt;
&lt;span class="py"&gt;ssl.truststore.type&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;PEM&lt;/span&gt;
&lt;span class="py"&gt;ssl.truststore.certificates&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;-----BEGIN CERTIFICATE-----&lt;/span&gt;&lt;span class="se"&gt;\n&lt;/span&gt;&lt;span class="s"&gt;MICC...&lt;/span&gt;&lt;span class="se"&gt;\n&lt;/span&gt;&lt;span class="s"&gt;-----END CERTIFICATE-----&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Note that &lt;code&gt;ssl.keystore.certificate.chain&lt;/code&gt; needs to contain your signed certificate as well as all the intermediary CA certificates. For more details on this see the &lt;strong&gt;Common gotchas&lt;/strong&gt; section below.&lt;/p&gt;

&lt;p&gt;Your private key goes into &lt;code&gt;ssl.keystore.key&lt;/code&gt; field, while the password for the private key (if you use one) goes to &lt;code&gt;ssl.key.password&lt;/code&gt; field.&lt;/p&gt;
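&lt;p&gt;Producing those single-line values by hand is error-prone, so a small command can do the conversion for you. A minimal, self-contained sketch (the file name &lt;code&gt;server.crt&lt;/code&gt; is a placeholder; stand-in content is created here so the example runs on its own):&lt;/p&gt;

```shell
# "server.crt" is a placeholder name; create a tiny stand-in certificate for the demo
printf '%s\n' '-----BEGIN CERTIFICATE-----' 'MIIDZjC...' '-----END CERTIFICATE-----' > server.crt

# Collapse the multiline PEM into a single line with literal \n sequences,
# ready to paste into a .properties file
awk 'NF {printf "%s\\n", $0}' server.crt
```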

&lt;h3&gt;
  
  
  Java clients
&lt;/h3&gt;

&lt;p&gt;Java clients use exactly the same properties, but constants help with readability:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="nc"&gt;Properties&lt;/span&gt; &lt;span class="n"&gt;properties&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;Properties&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ProducerConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;BOOTSTRAP_SERVERS_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"localhost:9092"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;span class="o"&gt;...&lt;/span&gt;&lt;span class="na"&gt;omitted&lt;/span&gt; &lt;span class="n"&gt;other&lt;/span&gt; &lt;span class="n"&gt;producer&lt;/span&gt; &lt;span class="n"&gt;configs&lt;/span&gt;&lt;span class="o"&gt;...&lt;/span&gt;
&lt;span class="c1"&gt;//SSL configs&lt;/span&gt;
&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;CommonClientConfigs&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;SECURITY_PROTOCOL_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"SSL"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;SslConfigs&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;SSL_KEYSTORE_TYPE_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"PEM"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;SslConfigs&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"&amp;lt;certificate_chain_string&amp;gt;"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;SslConfigs&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;SSL_KEYSTORE_KEY_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"&amp;lt;private_key_string&amp;gt;"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;span class="c1"&gt;// key password is needed if the private key is encrypted&lt;/span&gt;
&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;SslConfigs&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;SSL_KEY_PASSWORD_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"&amp;lt;private_key_password&amp;gt;"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;SslConfigs&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;SSL_TRUSTSTORE_TYPE_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"PEM"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;SslConfigs&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;SSL_TRUSTSTORE_CERTIFICATES_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"&amp;lt;trusted_certificate&amp;gt;"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

&lt;span class="n"&gt;producer&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;KafkaProducer&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&amp;gt;(&lt;/span&gt;&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Providing certificates as files
&lt;/h2&gt;

&lt;p&gt;If you already use mTLS authentication towards Kafka, the easiest way to transition to PEM certificates is to keep providing them as files, replacing the Java keystore and truststore (e.g. PKCS12 files) you use today.&lt;/p&gt;

&lt;h3&gt;
  
  
  Brokers and CLI tools
&lt;/h3&gt;

&lt;p&gt;Here’s how the SSL section of the properties file should look:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight properties"&gt;&lt;code&gt;&lt;span class="py"&gt;security.protocol&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;SSL&lt;/span&gt;
&lt;span class="py"&gt;ssl.keystore.type&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;PEM&lt;/span&gt;
&lt;span class="py"&gt;ssl.keystore.location&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;/path/to/file/containing/certificate/chain&lt;/span&gt;
&lt;span class="py"&gt;ssl.key.password&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;&amp;lt;private_key_password&amp;gt;&lt;/span&gt;
&lt;span class="py"&gt;ssl.truststore.type&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;PEM&lt;/span&gt;
&lt;span class="py"&gt;ssl.truststore.location&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;/path/to/truststore/certificate&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;code&gt;ssl.keystore.type&lt;/code&gt; and &lt;code&gt;ssl.truststore.type&lt;/code&gt; properties tell Kafka in which format we are providing the certificates and the truststore.&lt;/p&gt;

&lt;p&gt;Next, &lt;code&gt;ssl.keystore.location&lt;/code&gt; points to a file that should contain the following:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;your private key&lt;/li&gt;
&lt;li&gt;your signed certificate&lt;/li&gt;
&lt;li&gt;as well as any intermediary CA certificates&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;For more details about the certificate chain see the &lt;strong&gt;Common gotchas&lt;/strong&gt; section below.&lt;/p&gt;

&lt;p&gt;You will need to set &lt;code&gt;ssl.key.password&lt;/code&gt; if your private key is encrypted (which I hope it is). Make sure not to provide &lt;code&gt;ssl.keystore.password&lt;/code&gt;, though, otherwise you’ll get an error.&lt;/p&gt;
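&lt;p&gt;Building that keystore file is just a matter of concatenating the PEM blocks in the right order. A minimal, self-contained sketch (all file names are placeholders, with stand-in content created for the demo):&lt;/p&gt;

```shell
# Placeholder stand-ins for your encrypted private key, your signed certificate
# and an intermediate CA certificate
printf '%s\n' '-----BEGIN ENCRYPTED PRIVATE KEY-----' '...' '-----END ENCRYPTED PRIVATE KEY-----' > client.key
printf '%s\n' '-----BEGIN CERTIFICATE-----' 'client...' '-----END CERTIFICATE-----' > client.crt
printf '%s\n' '-----BEGIN CERTIFICATE-----' 'intermediate...' '-----END CERTIFICATE-----' > intermediate-ca.crt

# The PEM keystore file is simply the blocks concatenated: the private key first,
# then your certificate, then any intermediate CA certificates
cat client.key client.crt intermediate-ca.crt > client-keystore.pem
```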

&lt;h3&gt;
  
  
  Java clients
&lt;/h3&gt;

&lt;p&gt;Again, Java clients use the same properties, but here we’re using the constants provided by the Kafka client library:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="nc"&gt;Properties&lt;/span&gt; &lt;span class="n"&gt;properties&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;Properties&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ProducerConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;BOOTSTRAP_SERVERS_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"localhost:9092"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;span class="o"&gt;...&lt;/span&gt;&lt;span class="na"&gt;omitted&lt;/span&gt; &lt;span class="n"&gt;other&lt;/span&gt; &lt;span class="n"&gt;producer&lt;/span&gt; &lt;span class="n"&gt;configs&lt;/span&gt;&lt;span class="o"&gt;...&lt;/span&gt;
&lt;span class="c1"&gt;//SSL configs&lt;/span&gt;
&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;CommonClientConfigs&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;SECURITY_PROTOCOL_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"SSL"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;SslConfigs&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;SSL_KEYSTORE_TYPE_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"PEM"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;SslConfigs&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;SSL_KEYSTORE_LOCATION_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"/path/to/file/containing/certificate/chain"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;span class="c1"&gt;// key password is needed if the private key is encrypted&lt;/span&gt;
&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;SslConfigs&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;SSL_KEY_PASSWORD_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"&amp;lt;private_key_password&amp;gt;"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;SslConfigs&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;SSL_TRUSTSTORE_TYPE_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"PEM"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;SslConfigs&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;SSL_TRUSTSTORE_LOCATION_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"/path/to/truststore/certificate"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

&lt;span class="n"&gt;producer&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;KafkaProducer&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&amp;gt;(&lt;/span&gt;&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Common gotchas when setting up a certificate chain
&lt;/h2&gt;

&lt;ol&gt;
&lt;li&gt;If your private key is encrypted (which it should always be), you need to convert it from PKCS#1 to PKCS#8 format for Java/Kafka to be able to read it properly&lt;/li&gt;
&lt;li&gt;If you want to provide the PEM certificate as a one-line string, make sure to add the line feed characters at the end of each line ( \n ). Otherwise, the certificate will be considered invalid.&lt;/li&gt;
&lt;li&gt;The certificate chain has to include your certificate together with all the intermediary CA certificates that signed it, in that order. So for example, if your certificate was signed by certificate A which was signed by cert B which was signed by the root certificate, your certificate chain has to include: your certificate, certificate A and certificate B, in that order. Do note that the root certificate should not be in the chain.&lt;/li&gt;
&lt;li&gt;Certificate order in your certificate chain is important (see point 3)&lt;/li&gt;
&lt;/ol&gt;
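&lt;p&gt;For the first gotcha, the &lt;code&gt;openssl pkcs8 -topk8&lt;/code&gt; command can do the conversion to PKCS#8. A minimal, self-contained sketch (file names and the password are placeholders; a throwaway RSA key is generated so the example runs on its own):&lt;/p&gt;

```shell
# Generate a throwaway RSA key to stand in for your existing private key
openssl genrsa -out example.key 2048

# Convert it to an encrypted PKCS#8 key that Java/Kafka can read
# ("changeit" is a placeholder password)
openssl pkcs8 -topk8 -in example.key -passout pass:changeit -out example-pkcs8.key
```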

&lt;h2&gt;
  
  
  Example of Kafka SSL setup with PEM certificates
&lt;/h2&gt;

&lt;p&gt;Testing the SSL setup of your clients is not simple, because standing up a Kafka cluster with SSL authentication is not a straightforward process. This is why I created a &lt;a href="https://github.com/codingharbour/kafka-docker-compose/tree/master/kafka-ssl"&gt;docker-compose project&lt;/a&gt; with a single zookeeper and broker, enabled with SSL authentication. This project borrows many ideas from the excellent &lt;a href="https://github.com/confluentinc/cp-demo"&gt;cp-demo project&lt;/a&gt; by Confluent.&lt;/p&gt;

&lt;p&gt;To use the project, clone the &lt;a href="https://github.com/codingharbour/kafka-docker-compose"&gt;docker-compose repository&lt;/a&gt;, and navigate to the &lt;code&gt;kafka-ssl&lt;/code&gt; folder.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;git clone https://github.com/codingharbour/kafka-docker-compose.git
&lt;span class="nb"&gt;cd &lt;/span&gt;kafka-ssl
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Running the &lt;code&gt;start-cluster.sh&lt;/code&gt; script will generate a self-signed root certificate and use it to sign all other certificates: the broker and zookeeper certificates, plus certificates for one producer and one consumer. After that, the script will start the cluster using docker-compose.&lt;/p&gt;

&lt;p&gt;&lt;em&gt;Don’t have docker-compose? Check: &lt;a href="https://docs.docker.com/compose/install/"&gt;how to install docker-compose&lt;/a&gt;&lt;/em&gt;&lt;/p&gt;

&lt;p&gt;In addition, the startup script will generate &lt;code&gt;producer.properties&lt;/code&gt; and &lt;code&gt;consumer.properties&lt;/code&gt; files you can use with &lt;code&gt;kafka-console-*&lt;/code&gt; tools.&lt;/p&gt;

&lt;p&gt;The consumer.properties file is an example of how to use PEM certificates as strings. The producer.properties, on the other hand, uses certificates stored in PEM files. This way you can see and test both approaches described in this blog post.&lt;/p&gt;
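&lt;p&gt;If you want to peek inside any of the generated PEM certificates, &lt;code&gt;openssl x509&lt;/code&gt; can print their details. A minimal, self-contained sketch (a throwaway self-signed certificate stands in here for the generated files, whose names will differ):&lt;/p&gt;

```shell
# Create a throwaway self-signed certificate to stand in for a generated one
openssl req -x509 -newkey rsa:2048 -nodes -keyout demo.key -out demo.crt -days 1 -subj "/CN=demo-broker"

# Print who the certificate was issued to and who signed it
openssl x509 -in demo.crt -noout -subject -issuer
```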

&lt;h2&gt;
  
  
  Would you like to learn more about Kafka?
&lt;/h2&gt;

&lt;p&gt;I have created a Kafka mini-course that you can get &lt;strong&gt;absolutely free&lt;/strong&gt;. Sign up for it over at &lt;a href="https://codingharbour.com/kafka-mini-course/"&gt;Coding Harbour&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;&lt;em&gt;Photo credit: &lt;a href="https://unsplash.com/@flyd2069"&gt;FLY:D&lt;/a&gt;&lt;/em&gt;&lt;/p&gt;

</description>
      <category>apachekafka</category>
      <category>mtls</category>
      <category>tutorial</category>
      <category>security</category>
    </item>
    <item>
      <title>How to use Protobuf with Apache Kafka and Schema Registry</title>
      <dc:creator>Dejan Maric</dc:creator>
      <pubDate>Sun, 05 Jul 2020 08:00:20 +0000</pubDate>
      <link>https://dev.to/de_maric/how-to-use-protobuf-with-apache-kafka-and-schema-registry-4iep</link>
      <guid>https://dev.to/de_maric/how-to-use-protobuf-with-apache-kafka-and-schema-registry-4iep</guid>
      <description>&lt;h2&gt;
  
  
  Full guide on working with Protobuf in Apache Kafka
&lt;/h2&gt;

&lt;p&gt;Since Confluent Platform version 5.5, Avro is no longer the only schema in town. Protobuf and JSON schemas are now supported as first-class citizens in the Confluent universe. But before I go on explaining how to use Protobuf with Kafka, let’s answer one often-asked question…&lt;/p&gt;

&lt;h2&gt;
  
  
  Why do we need schemas
&lt;/h2&gt;

&lt;p&gt;When applications communicate through a pub-sub system, they exchange messages and those messages need to be understood and agreed upon by all the participants in the communication. Additionally, you would like to detect and prevent changes to the message format that would make messages unreadable for some of the participants.&lt;/p&gt;

&lt;p&gt;That’s where a schema comes in – it represents a contract between the participants in communication, just like an API represents a contract between a service and its consumers. And just as REST APIs can be described using OpenAPI (Swagger), so the messages in Kafka can be described using Avro, Protobuf or JSON schemas.&lt;/p&gt;

&lt;p&gt;Schemas describe the structure of the data by:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;specifying which fields are in the message&lt;/li&gt;
&lt;li&gt;specifying the data type for each field and whether the field is mandatory or not&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;In addition, together with Schema Registry, schemas prevent a producer from sending poison messages – malformed data that consumers cannot interpret. Schema Registry will detect if breaking changes are about to be introduced by the producer and can be configured to reject such changes. An example of a breaking change would be deleting a mandatory field from the schema.&lt;/p&gt;

&lt;h2&gt;
  
  
  Introduction to Protobuf
&lt;/h2&gt;

&lt;p&gt;Similar to &lt;a href="https://codingharbour.com/apache-kafka/guide-to-apache-avro-and-kafka/"&gt;Apache Avro&lt;/a&gt;, Protobuf is a method of serializing structured data. A message format is defined in a &lt;strong&gt;.proto&lt;/strong&gt; file and you can generate code from it in many languages including Java, Python, C++, C#, Go and Ruby. Unlike Avro, Protobuf does not serialize schema with the message. So, in order to deserialize the message, you need the schema in the consumer.&lt;/p&gt;

&lt;p&gt;Here’s an example of a Protobuf schema containing one message type:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight protobuf"&gt;&lt;code&gt;&lt;span class="na"&gt;syntax&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"proto3"&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

&lt;span class="kn"&gt;package&lt;/span&gt; &lt;span class="nn"&gt;com&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;codingharbour.protobuf&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

&lt;span class="kd"&gt;message&lt;/span&gt; &lt;span class="nc"&gt;SimpleMessage&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="kt"&gt;string&lt;/span&gt; &lt;span class="na"&gt;content&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="kt"&gt;string&lt;/span&gt; &lt;span class="na"&gt;date_time&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;In the first line, we define that we’re using protobuf version 3. Our message type called SimpleMessage defines two string fields: content and date_time. Each field is assigned a so-called &lt;strong&gt;field number&lt;/strong&gt;, which has to be unique in a message type. These numbers identify the fields when the message is serialized to the Protobuf binary format. Google suggests using numbers 1 through 15 for most frequently used fields because it takes one byte to encode them.&lt;/p&gt;

&lt;p&gt;Protobuf supports common scalar types like string, int32, int64 (long), double, bool etc. For the full list of all scalar types in Protobuf check the &lt;a href="https://developers.google.com/protocol-buffers/docs/overview#scalar"&gt;Protobuf documentation&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;Besides scalar types, it is possible to use complex data types. Below we see two schemas, Order and Product, where Order can contain zero, one or more Products:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight protobuf"&gt;&lt;code&gt;&lt;span class="kd"&gt;message&lt;/span&gt; &lt;span class="nc"&gt;Order&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="kt"&gt;int64&lt;/span&gt; &lt;span class="na"&gt;order_id&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="kt"&gt;int64&lt;/span&gt; &lt;span class="na"&gt;date_time&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="k"&gt;repeated&lt;/span&gt; &lt;span class="n"&gt;Product&lt;/span&gt; &lt;span class="na"&gt;product&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;3&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="kd"&gt;message&lt;/span&gt; &lt;span class="nc"&gt;Product&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="kt"&gt;int32&lt;/span&gt; &lt;span class="na"&gt;product_id&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="kt"&gt;string&lt;/span&gt; &lt;span class="na"&gt;name&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="kt"&gt;string&lt;/span&gt; &lt;span class="na"&gt;description&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;3&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Now, let’s see how these schemas end up in the Schema Registry.&lt;/p&gt;

&lt;h2&gt;
  
  
  Schema Registry and Protobuf
&lt;/h2&gt;

&lt;p&gt;Schema Registry is a service for storing a versioned history of schemas used in Kafka. It also supports the evolution of schemas in a way that doesn’t break producers or consumers. Until recently Schema Registry supported only &lt;a href="https://codingharbour.com/apache-kafka/guide-to-apache-avro-and-kafka/"&gt;Avro schemas&lt;/a&gt;, but since Confluent Platform 5.5 the support has been extended to Protobuf and JSON schemas.&lt;/p&gt;

&lt;p&gt;If you worked with Avro and Kafka before, this section will not contain any surprises. Like with Avro, Schema Registry provides a serializer and deserializer for Protobuf, called KafkaProtobufSerializer and KafkaProtobufDeserializer.&lt;/p&gt;

&lt;p&gt;The job of this serializer is to convert the Java object to a protobuf binary format before the producer writes the message to Kafka.&lt;/p&gt;

&lt;p&gt;The serializer’s additional job is to check whether the protobuf schema exists in the Schema Registry. If not, it will register the schema there and write the schema id at the beginning of the message. Then, when the Kafka record reaches the consumer, the consumer will use KafkaProtobufDeserializer to fetch the schema from the Schema Registry based on the schema id from the message. Once the schema is fetched, KafkaProtobufDeserializer will use it to deserialize the message. This way the consumer doesn’t need to know the schema in advance to consume messages from Kafka.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--wJ-JBCbU--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://i1.wp.com/codingharbour.com/wp-content/uploads/2020/03/schema_registry.jpg%3Fresize%3D446%252C223%26ssl%3D1" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--wJ-JBCbU--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://i1.wp.com/codingharbour.com/wp-content/uploads/2020/03/schema_registry.jpg%3Fresize%3D446%252C223%26ssl%3D1" alt="schema registry" width="446" height="223"&gt;&lt;/a&gt;Registering and using Protobuf schema&lt;/p&gt;

&lt;p&gt;This is why, when using KafkaProtobuf(De)Serializer in a producer or a consumer, we need to provide the URL of the Schema Registry.&lt;/p&gt;
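&lt;p&gt;To give an idea of what that looks like, a producer configuration would contain something along these lines (the property and class names come from the Confluent serializer library; the URL is an assumption for a local setup):&lt;/p&gt;

```properties
value.serializer=io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer
schema.registry.url=http://localhost:8081
```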

&lt;h2&gt;
  
  
  Code generation in Java
&lt;/h2&gt;

&lt;p&gt;Ok, now we know how a protobuf schema looks and we know how it ends up in Schema Registry. Let’s see now how we use protobuf schemas from Java.&lt;/p&gt;

&lt;p&gt;The first thing that you need is a protobuf-java library. In these examples, I’m using maven, so let’s add the maven dependency:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight xml"&gt;&lt;code&gt;&lt;span class="nt"&gt;&amp;lt;dependencies&amp;gt;&lt;/span&gt;
    &lt;span class="nt"&gt;&amp;lt;dependency&amp;gt;&lt;/span&gt;
        &lt;span class="nt"&gt;&amp;lt;groupId&amp;gt;&lt;/span&gt;com.google.protobuf&lt;span class="nt"&gt;&amp;lt;/groupId&amp;gt;&lt;/span&gt;
        &lt;span class="nt"&gt;&amp;lt;artifactId&amp;gt;&lt;/span&gt;protobuf-java&lt;span class="nt"&gt;&amp;lt;/artifactId&amp;gt;&lt;/span&gt;
        &lt;span class="nt"&gt;&amp;lt;version&amp;gt;&lt;/span&gt;3.12.2&lt;span class="nt"&gt;&amp;lt;/version&amp;gt;&lt;/span&gt;
    &lt;span class="nt"&gt;&amp;lt;/dependency&amp;gt;&lt;/span&gt;
&lt;span class="nt"&gt;&amp;lt;/dependencies&amp;gt;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The next thing you want to do is use the &lt;strong&gt;protoc&lt;/strong&gt; compiler to generate Java code from &lt;strong&gt;.proto&lt;/strong&gt; files. But we’re not going to invoke the compiler manually; we’ll use a maven plugin called &lt;strong&gt;protoc-jar-maven-plugin&lt;/strong&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight xml"&gt;&lt;code&gt;&lt;span class="nt"&gt;&amp;lt;plugin&amp;gt;&lt;/span&gt;
    &lt;span class="nt"&gt;&amp;lt;groupId&amp;gt;&lt;/span&gt;com.github.os72&lt;span class="nt"&gt;&amp;lt;/groupId&amp;gt;&lt;/span&gt;
    &lt;span class="nt"&gt;&amp;lt;artifactId&amp;gt;&lt;/span&gt;protoc-jar-maven-plugin&lt;span class="nt"&gt;&amp;lt;/artifactId&amp;gt;&lt;/span&gt;
    &lt;span class="nt"&gt;&amp;lt;version&amp;gt;&lt;/span&gt;3.11.4&lt;span class="nt"&gt;&amp;lt;/version&amp;gt;&lt;/span&gt;
    &lt;span class="nt"&gt;&amp;lt;executions&amp;gt;&lt;/span&gt;
        &lt;span class="nt"&gt;&amp;lt;execution&amp;gt;&lt;/span&gt;
            &lt;span class="nt"&gt;&amp;lt;phase&amp;gt;&lt;/span&gt;generate-sources&lt;span class="nt"&gt;&amp;lt;/phase&amp;gt;&lt;/span&gt;
            &lt;span class="nt"&gt;&amp;lt;goals&amp;gt;&lt;/span&gt;
                &lt;span class="nt"&gt;&amp;lt;goal&amp;gt;&lt;/span&gt;run&lt;span class="nt"&gt;&amp;lt;/goal&amp;gt;&lt;/span&gt;
            &lt;span class="nt"&gt;&amp;lt;/goals&amp;gt;&lt;/span&gt;
            &lt;span class="nt"&gt;&amp;lt;configuration&amp;gt;&lt;/span&gt;
                &lt;span class="nt"&gt;&amp;lt;inputDirectories&amp;gt;&lt;/span&gt;
                    &lt;span class="nt"&gt;&amp;lt;include&amp;gt;&lt;/span&gt;${project.basedir}/src/main/protobuf&lt;span class="nt"&gt;&amp;lt;/include&amp;gt;&lt;/span&gt;
                &lt;span class="nt"&gt;&amp;lt;/inputDirectories&amp;gt;&lt;/span&gt;
                &lt;span class="nt"&gt;&amp;lt;outputTargets&amp;gt;&lt;/span&gt;
                    &lt;span class="nt"&gt;&amp;lt;outputTarget&amp;gt;&lt;/span&gt;
                        &lt;span class="nt"&gt;&amp;lt;type&amp;gt;&lt;/span&gt;java&lt;span class="nt"&gt;&amp;lt;/type&amp;gt;&lt;/span&gt;
                        &lt;span class="nt"&gt;&amp;lt;addSources&amp;gt;&lt;/span&gt;main&lt;span class="nt"&gt;&amp;lt;/addSources&amp;gt;&lt;/span&gt;
                        &lt;span class="nt"&gt;&amp;lt;outputDirectory&amp;gt;&lt;/span&gt;${project.basedir}/target/generated-sources/protobuf&lt;span class="nt"&gt;&amp;lt;/outputDirectory&amp;gt;&lt;/span&gt;
                    &lt;span class="nt"&gt;&amp;lt;/outputTarget&amp;gt;&lt;/span&gt;
                &lt;span class="nt"&gt;&amp;lt;/outputTargets&amp;gt;&lt;/span&gt;
            &lt;span class="nt"&gt;&amp;lt;/configuration&amp;gt;&lt;/span&gt;
        &lt;span class="nt"&gt;&amp;lt;/execution&amp;gt;&lt;/span&gt;
    &lt;span class="nt"&gt;&amp;lt;/executions&amp;gt;&lt;/span&gt;
&lt;span class="nt"&gt;&amp;lt;/plugin&amp;gt;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The protobuf classes will be generated during the generate-sources phase. The plugin will look for proto files in the &lt;strong&gt;src/main/protobuf&lt;/strong&gt; folder and the generated code will be created in the &lt;strong&gt;target/generated-sources/protobuf&lt;/strong&gt; folder.&lt;/p&gt;
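&lt;p&gt;&lt;em&gt;As a reminder, a schema compatible with the code below might look like this – a sketch only, with field names matching the getters used later in this post; check the GitHub repository for the exact file:&lt;/em&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;syntax = "proto3";

message SimpleMessage {
    string content = 1;
    string date_time = 2;
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;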

&lt;p&gt;To generate the class in the target folder, run:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;mvn clean generate-sources
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;em&gt;Note: All code examples from this blog post are available on &lt;a href="https://github.com/codingharbour/kafka-protobuf"&gt;Coding Harbour's GitHub&lt;/a&gt;.&lt;/em&gt;&lt;/p&gt;

&lt;p&gt;Ok, now that we have our class generated, let’s send it to Kafka using the new Protobuf serializer.&lt;/p&gt;

&lt;h2&gt;
  
  
  Running a local Kafka cluster
&lt;/h2&gt;

&lt;p&gt;Before we get started, let’s boot up a local Kafka cluster with the Schema Registry, so we can try out our code right away. We will run our cluster using docker-compose.&lt;/p&gt;

&lt;p&gt;&lt;em&gt;Don’t have docker-compose? Check: &lt;a href="https://docs.docker.com/compose/install/"&gt;how to install docker-compose&lt;/a&gt;&lt;/em&gt;&lt;/p&gt;

&lt;p&gt;I’ve prepared a docker-compose file with one Zookeeper, one Kafka broker and the Schema Registry. You can grab it from &lt;a href="https://github.com/codingharbour/kafka-docker-compose"&gt;https://github.com/codingharbour/kafka-docker-compose&lt;/a&gt;&lt;/p&gt;
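&lt;p&gt;&lt;em&gt;As a rough sketch of what that compose file wires together (the file in the repository is the authoritative version – image tags and environment variables are omitted here):&lt;/em&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;services:
  zookeeper:
    image: confluentinc/cp-zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: confluentinc/cp-kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
  schema-registry:
    image: confluentinc/cp-schema-registry
    depends_on:
      - kafka
    ports:
      - "8081:8081"
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;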

&lt;p&gt;Navigate to the &lt;strong&gt;single-node-avro-kafka&lt;/strong&gt; folder and run:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;docker-compose up &lt;span class="nt"&gt;-d&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The output should look similar to this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;Starting sn-zookeeper ... &lt;span class="k"&gt;done
&lt;/span&gt;Starting sn-kafka     ... &lt;span class="k"&gt;done
&lt;/span&gt;Starting sn-schema-registry ... &lt;span class="k"&gt;done&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Your local Kafka cluster is now ready to be used. By running &lt;strong&gt;docker-compose ps&lt;/strong&gt;, we can see that the Kafka broker is available on port 9092, while the Schema Registry runs on port 8081. Make a note of that, because we’ll need it soon.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;docker-compose ps
       Name                    Command            State                     Ports
&lt;span class="nt"&gt;----------------------------------------------------------------------------------------------------&lt;/span&gt;
sn-kafka             /etc/confluent/docker/run   Up      0.0.0.0:9092-&amp;gt;9092/tcp
sn-schema-registry   /etc/confluent/docker/run   Up      0.0.0.0:8081-&amp;gt;8081/tcp
sn-zookeeper         /etc/confluent/docker/run   Up      0.0.0.0:2181-&amp;gt;2181/tcp, 2888/tcp, 3888/tcp
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Writing a Protobuf Producer
&lt;/h2&gt;

&lt;p&gt;With the Kafka cluster up and running, it’s now time to create a Java producer that will send our SimpleMessage to Kafka. First, let’s prepare the configuration for the producer:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="nc"&gt;Properties&lt;/span&gt; &lt;span class="n"&gt;properties&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;Properties&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ProducerConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;BOOTSTRAP_SERVERS_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"localhost:9092"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ProducerConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;KEY_SERIALIZER_CLASS_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;StringSerializer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;class&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ProducerConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;VALUE_SERIALIZER_CLASS_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;KafkaProtobufSerializer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;class&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;KafkaProtobufSerializerConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;SCHEMA_REGISTRY_URL_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"http://localhost:8081"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

&lt;span class="nc"&gt;Producer&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;SimpleMessage&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;producer&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;KafkaProducer&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&amp;gt;(&lt;/span&gt;&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Notice that we are using &lt;strong&gt;KafkaProtobufSerializer&lt;/strong&gt; as the value serializer class. This is the new serializer available in Confluent Platform since version 5.5. It works similarly to KafkaAvroSerializer: when publishing messages it will check with Schema Registry if the schema is available there. If the schema is not yet registered, it will write it to Schema Registry and then publish the message to Kafka. For this to work, the serializer needs the URL of the Schema Registry and in our case, that’s &lt;em&gt;&lt;a href="http://localhost:8081"&gt;http://localhost:8081&lt;/a&gt;&lt;/em&gt;.&lt;/p&gt;
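&lt;p&gt;&lt;em&gt;To use the serializer, you’ll need the kafka-protobuf-serializer dependency, published in Confluent’s Maven repository. Use the Confluent Platform version you’re actually running – 5.5.0 below is only an example:&lt;/em&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;&amp;lt;dependency&amp;gt;
    &amp;lt;groupId&amp;gt;io.confluent&amp;lt;/groupId&amp;gt;
    &amp;lt;artifactId&amp;gt;kafka-protobuf-serializer&amp;lt;/artifactId&amp;gt;
    &amp;lt;version&amp;gt;5.5.0&amp;lt;/version&amp;gt;
&amp;lt;/dependency&amp;gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;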

&lt;p&gt;Next, we prepare the ProducerRecord, using the SimpleMessage class generated from the protobuf schema:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="nc"&gt;SimpleMessage&lt;/span&gt; &lt;span class="n"&gt;simpleMessage&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;SimpleMessage&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;newBuilder&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt;
        &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setContent&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Hello world"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
        &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setDateTime&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Instant&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;now&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;toString&lt;/span&gt;&lt;span class="o"&gt;())&lt;/span&gt;
        &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;build&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;

&lt;span class="nc"&gt;ProducerRecord&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;SimpleMessage&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;record&lt;/span&gt;
                &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;ProducerRecord&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&amp;gt;(&lt;/span&gt;&lt;span class="s"&gt;"protobuf-topic"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;simpleMessage&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;  
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This record will be written to the topic called &lt;strong&gt;protobuf-topic&lt;/strong&gt;. The last thing to do is to write the record to Kafka:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="n"&gt;producer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;send&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;span class="n"&gt;producer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;flush&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
&lt;span class="n"&gt;producer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;close&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Usually, you wouldn’t call the &lt;strong&gt;flush()&lt;/strong&gt; method, but since our application will be stopped after this, we need to ensure the message is written to Kafka before that happens.&lt;/p&gt;

&lt;h2&gt;
  
  
  Writing a Protobuf Consumer
&lt;/h2&gt;

&lt;p&gt;We said that the consumer doesn’t need to know the schema in advance to be able to deserialize the message, thanks to Schema Registry. But having the schema available in advance allows us to generate the Java class out of it and use the class in our code. This helps with code readability and makes the code strongly typed.&lt;/p&gt;

&lt;p&gt;Here’s how to do it. First, you generate the Java class(es) as explained in the Code generation in Java section. Next, we prepare the configuration for the Kafka consumer:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="nc"&gt;Properties&lt;/span&gt; &lt;span class="n"&gt;properties&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;Properties&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ProducerConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;BOOTSTRAP_SERVERS_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"localhost:9092"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ConsumerConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;GROUP_ID_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"protobuf-consumer-group"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;      
&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ConsumerConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;AUTO_OFFSET_RESET_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"earliest"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ConsumerConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;ENABLE_AUTO_COMMIT_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="kc"&gt;false&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Here we’re defining the broker URL and the consumer group, and telling the consumer that we’ll handle offset commits ourselves.&lt;br&gt;&lt;br&gt;
Next, we define deserializer for the messages:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ConsumerConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;KEY_DESERIALIZER_CLASS_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;StringDeserializer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;class&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ConsumerConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;VALUE_DESERIALIZER_CLASS_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;KafkaProtobufDeserializer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;class&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;KafkaProtobufDeserializerConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;SCHEMA_REGISTRY_URL_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"http://localhost:8081"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;KafkaProtobufDeserializerConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;SPECIFIC_PROTOBUF_VALUE_TYPE&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;SimpleMessage&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;class&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getName&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;We use string deserializer for the key, but for the value, we’re using the new KafkaProtobufDeserializer. For the protobuf deserializer, we need to provide the Schema Registry URL, as we did for the serializer above.&lt;/p&gt;

&lt;p&gt;The last line is the most important. It tells the deserializer which class to deserialize the record values into. In our case, it’s the SimpleMessage class (the one we generated from the protobuf schema using the protobuf maven plugin).&lt;/p&gt;

&lt;p&gt;Now we’re ready to create our consumer and subscribe it to &lt;strong&gt;protobuf-topic&lt;/strong&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="nc"&gt;KafkaConsumer&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;SimpleMessage&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;consumer&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;KafkaConsumer&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&amp;gt;(&lt;/span&gt;&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;span class="n"&gt;consumer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;subscribe&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Collections&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;singleton&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"protobuf-topic"&lt;/span&gt;&lt;span class="o"&gt;));&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;And then we poll Kafka for records and print them to the console:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="k"&gt;while&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="kc"&gt;true&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="nc"&gt;ConsumerRecords&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;SimpleMessage&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;records&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;consumer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;poll&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Duration&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;ofMillis&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;100&lt;/span&gt;&lt;span class="o"&gt;));&lt;/span&gt;
    &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ConsumerRecord&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;SimpleMessage&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;record&lt;/span&gt; &lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="n"&gt;records&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="nc"&gt;System&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;out&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;println&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Message content: "&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;value&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;getContent&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
        &lt;span class="nc"&gt;System&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;out&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;println&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Message time: "&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;value&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;getDateTime&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;
    &lt;span class="n"&gt;consumer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;commitAsync&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Here we’re consuming a batch of records and just printing the content to the console.&lt;/p&gt;

&lt;p&gt;Remember when we configured the consumer to let us handle committing offsets by setting ENABLE_AUTO_COMMIT_CONFIG to false? That’s what we’re doing in the last line: only after we’ve fully processed the current group of records will we commit the consumer offset.&lt;/p&gt;

&lt;p&gt;That’s all there is to writing a simple protobuf consumer. Let’s now check one more variant.&lt;/p&gt;

&lt;h2&gt;
  
  
  Generic Protobuf Consumer
&lt;/h2&gt;

&lt;p&gt;What if you want to handle messages in a generic way in your consumer, without generating a Java class from a protobuf schema? Well, you can use the DynamicMessage class from the protobuf library. DynamicMessage has a reflective API, so you can navigate through message fields and read their values. Here’s how you can do it…&lt;/p&gt;

&lt;p&gt;First, let’s configure the consumer. Its configuration is very similar to the previous example:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="nc"&gt;Properties&lt;/span&gt; &lt;span class="n"&gt;properties&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;Properties&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ProducerConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;BOOTSTRAP_SERVERS_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"localhost:9092"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ConsumerConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;GROUP_ID_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"generic-protobuf-consumer-group"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;      
&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ConsumerConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;AUTO_OFFSET_RESET_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"earliest"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ConsumerConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;ENABLE_AUTO_COMMIT_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="kc"&gt;false&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ConsumerConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;KEY_DESERIALIZER_CLASS_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;StringDeserializer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;class&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ConsumerConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;VALUE_DESERIALIZER_CLASS_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;KafkaProtobufDeserializer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;class&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;KafkaProtobufDeserializerConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;SCHEMA_REGISTRY_URL_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"http://localhost:8081"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The only thing missing is the &lt;strong&gt;SPECIFIC_PROTOBUF_VALUE_TYPE&lt;/strong&gt; configuration. Since we want to handle messages in a generic way, we don’t need this configuration.&lt;/p&gt;

&lt;p&gt;Now we’re ready to create our consumer and subscribe it to the &lt;strong&gt;protobuf-topic&lt;/strong&gt; topic, as in the previous example:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="nc"&gt;KafkaConsumer&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;SimpleMessage&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;consumer&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;KafkaConsumer&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&amp;gt;(&lt;/span&gt;&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;span class="n"&gt;consumer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;subscribe&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Collections&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;singleton&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"protobuf-topic"&lt;/span&gt;&lt;span class="o"&gt;));&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;And then we poll Kafka for records and print them to the console:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="k"&gt;while&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="kc"&gt;true&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="nc"&gt;ConsumerRecords&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;DynamicMessage&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;records&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;consumer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;poll&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Duration&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;ofMillis&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;100&lt;/span&gt;&lt;span class="o"&gt;));&lt;/span&gt;
    &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ConsumerRecord&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;DynamicMessage&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;record&lt;/span&gt; &lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="n"&gt;records&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;FieldDescriptor&lt;/span&gt; &lt;span class="n"&gt;field&lt;/span&gt; &lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;value&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;getAllFields&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;keySet&lt;/span&gt;&lt;span class="o"&gt;())&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
            &lt;span class="nc"&gt;System&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;out&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;println&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;field&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getName&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="s"&gt;": "&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;value&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;getField&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;field&lt;/span&gt;&lt;span class="o"&gt;));&lt;/span&gt;
        &lt;span class="o"&gt;}&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;
    &lt;span class="n"&gt;consumer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;commitAsync&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Without SPECIFIC_PROTOBUF_VALUE_TYPE configured, the consumer will always return an instance of DynamicMessage as the record’s value. We then use the &lt;strong&gt;DynamicMessage.getAllFields()&lt;/strong&gt; method to obtain the list of FieldDescriptors. Once we have all the descriptors, we can simply iterate through them and print the value of each field.&lt;/p&gt;

&lt;p&gt;Check out the JavaDoc to find out more about &lt;a href="https://developers.google.com/protocol-buffers/docs/reference/java/com/google/protobuf/DynamicMessage"&gt;DynamicMessage&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;That wraps up our Kafka Protobuf guide. Now you’re ready to start writing producers and consumers that send Protobuf messages to Apache Kafka with the help of Schema Registry.&lt;/p&gt;

&lt;p&gt;&lt;em&gt;All the code from this blog post is available on &lt;a href="https://github.com/codingharbour/kafka-protobuf"&gt;Coding Harbour's GitHub&lt;/a&gt;.&lt;/em&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Would you like to learn more about Kafka?
&lt;/h2&gt;

&lt;p&gt;I have created a Kafka mini-course that you can get &lt;strong&gt;absolutely free&lt;/strong&gt;. Sign up for it over at &lt;a href="https://codingharbour.com/"&gt;Coding Harbour&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;&lt;em&gt;Photo credit: &lt;a href="https://unsplash.com/@joshua_j_woroniecki"&gt;Joshua Woroniecki&lt;/a&gt;&lt;/em&gt;&lt;/p&gt;

</description>
      <category>apachekafka</category>
      <category>tutorial</category>
      <category>protobuf</category>
    </item>
    <item>
      <title>Learn how to use Kafkacat – the most versatile Kafka CLI client</title>
      <dc:creator>Dejan Maric</dc:creator>
      <pubDate>Mon, 04 May 2020 18:47:42 +0000</pubDate>
      <link>https://dev.to/de_maric/learn-how-to-use-kafkacat-the-most-versatile-kafka-cli-client-1kb4</link>
      <guid>https://dev.to/de_maric/learn-how-to-use-kafkacat-the-most-versatile-kafka-cli-client-1kb4</guid>
<description>&lt;p&gt;Kafkacat is an awesome tool and today I want to show you how easy it is to use and some of the cool things you can do with it.&lt;/p&gt;

&lt;p&gt;All the features explained below are available in version 1.5.0.&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;Looking for a quick Kafkacat reference? Download the &lt;strong&gt;&lt;a href="https://codingharbour.com/kafkacat-cheatsheet/"&gt;Kafkacat 1-page cheatsheet&lt;/a&gt;&lt;/strong&gt;&lt;/p&gt;
&lt;/blockquote&gt;

&lt;h2&gt;
  
  
  Installing Kafkacat
&lt;/h2&gt;

&lt;p&gt;Kafkacat is available from Homebrew (latest version) and from some Linux repositories, though the Linux repos may lag behind the latest release. If that’s the case, you can always run the latest kafkacat from &lt;a href="https://hub.docker.com/r/edenhill/kafkacat/tags"&gt;docker&lt;/a&gt;.&lt;/p&gt;
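If you’d rather not install anything, the Docker image can stand in for a local binary via a shell alias (a sketch; it assumes Docker is installed and uses the 1.5.0 image tag mentioned above):

```shell
# Run kafkacat from the official Docker image instead of a local install.
# The alias makes `kafkacat` transparently invoke the containerized binary.
alias kafkacat='docker run --rm -it edenhill/kafkacat:1.5.0'

# Verify the alias is registered (prints its definition)
alias kafkacat
```

Note that a container started this way cannot reach a broker on `localhost` directly; on Linux you’d typically add `--network=host` to the `docker run` arguments.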

&lt;h2&gt;
  
  
  The basics
&lt;/h2&gt;

&lt;p&gt;Kafkacat is a command-line tool for producing and consuming Kafka messages. In addition, you can view metadata about the cluster or topics.&lt;/p&gt;

&lt;p&gt;Kafkacat has quite a few parameters and it might look scary learning them all, yet (most of) the parameters make sense and are easy to remember. Let’s start with the most important: &lt;strong&gt;modes&lt;/strong&gt;. When making a call to Kafkacat, you’ll always use it in one of its four modes. All the modes use a capital letter:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;-P = &lt;strong&gt;P&lt;/strong&gt;roduce data&lt;/li&gt;
&lt;li&gt;-C = &lt;strong&gt;C&lt;/strong&gt;onsume data&lt;/li&gt;
&lt;li&gt;-L = &lt;strong&gt;L&lt;/strong&gt;ist metadata&lt;/li&gt;
&lt;li&gt;-Q = &lt;strong&gt;Q&lt;/strong&gt;uery offsets by timestamp&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;The next most important option is the &lt;strong&gt;b&lt;/strong&gt;roker list (-b) and after that, it’s usually the &lt;strong&gt;t&lt;/strong&gt;opic (-t).&lt;/p&gt;

&lt;p&gt;So you can almost write your command like a story. The following command:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight shell"&gt;&lt;code&gt;kafkacat &lt;span class="nt"&gt;-C&lt;/span&gt; &lt;span class="nt"&gt;-b&lt;/span&gt; localhost:9092 &lt;span class="nt"&gt;-t&lt;/span&gt; topic1 &lt;span class="nt"&gt;-o&lt;/span&gt; beginning
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;could be read as: I want to &lt;strong&gt;C&lt;/strong&gt;onsume from &lt;strong&gt;b&lt;/strong&gt;roker localhost:9092 and &lt;strong&gt;t&lt;/strong&gt;opic topic1 with &lt;strong&gt;o&lt;/strong&gt;ffset set to the beginning.&lt;/p&gt;

&lt;p&gt;Ok, now that I have hopefully convinced you that all those cryptic parameters make sense, let’s look at how to use Kafkacat to achieve some common tasks.&lt;/p&gt;

&lt;h2&gt;
  
  
  Producing data (-P)
&lt;/h2&gt;

&lt;p&gt;What do we need to produce data? At a minimum, a broker and a topic you want to write to.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Produce values&lt;/strong&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight shell"&gt;&lt;code&gt;kafkacat &lt;span class="nt"&gt;-P&lt;/span&gt; &lt;span class="nt"&gt;-b&lt;/span&gt; localhost:9092 &lt;span class="nt"&gt;-t&lt;/span&gt; topic1
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;The default message separator is a newline. Type your messages, and separate them with Enter.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Producing keys and values&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;If you want to produce messages with a key, you need to specify the &lt;strong&gt;K&lt;/strong&gt;ey delimiter (-K). Let’s use a colon to separate the key and the message in the input:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight shell"&gt;&lt;code&gt;kafkacat &lt;span class="nt"&gt;-P&lt;/span&gt; &lt;span class="nt"&gt;-b&lt;/span&gt; localhost:9092 &lt;span class="nt"&gt;-t&lt;/span&gt; topic1 &lt;span class="nt"&gt;-K&lt;/span&gt; :
key3:message3
key4:message4
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Notice that this parameter uses a capital K.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Produce messages with headers&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;If you want to add &lt;strong&gt;h&lt;/strong&gt;eaders to the messages, add them using the -H parameter, in a key=value format:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight shell"&gt;&lt;code&gt;kafkacat &lt;span class="nt"&gt;-P&lt;/span&gt; &lt;span class="nt"&gt;-b&lt;/span&gt; localhost:9092 &lt;span class="se"&gt;\&lt;/span&gt;
&lt;span class="nt"&gt;-t&lt;/span&gt; topic1 &lt;span class="se"&gt;\&lt;/span&gt;
&lt;span class="nt"&gt;-H&lt;/span&gt; &lt;span class="nv"&gt;appName&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;kafkacat &lt;span class="nt"&gt;-H&lt;/span&gt; &lt;span class="nv"&gt;appId&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;1
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;As you can see, additional headers are added by repeating the -H flag. Note that every message produced will carry the two headers specified with the -H flag.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Produce data from a file&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;If you want to produce data using a file, use the option -l (as in: fi&lt;em&gt;l&lt;/em&gt;e)… I did say that &lt;em&gt;most of the parameters&lt;/em&gt; are easy to remember :). Let’s say we have a file called data.txt containing key-value pairs, separated by a colon:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;key1:message1
key2:message2
key3:message3
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;So the command would be:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight shell"&gt;&lt;code&gt;kafkacat &lt;span class="nt"&gt;-P&lt;/span&gt; &lt;span class="nt"&gt;-b&lt;/span&gt; localhost:9092 &lt;span class="nt"&gt;-t&lt;/span&gt; topic1 &lt;span class="nt"&gt;-K&lt;/span&gt;: &lt;span class="nt"&gt;-l&lt;/span&gt; data.txt
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;
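To try this end to end, you can generate the sample file from the shell first (the file name and contents mirror the example above):

```shell
# Create data.txt with one key:value pair per line
printf '%s\n' 'key1:message1' 'key2:message2' 'key3:message3' > data.txt

# Sanity-check the contents before producing from it
cat data.txt
```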



&lt;p&gt;&lt;strong&gt;Produce messages with compression&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Using a (-z) parameter you can specify message compression:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight shell"&gt;&lt;code&gt;kafkacat &lt;span class="nt"&gt;-P&lt;/span&gt; &lt;span class="nt"&gt;-b&lt;/span&gt; localhost:9092 &lt;span class="nt"&gt;-t&lt;/span&gt; topic1 &lt;span class="nt"&gt;-z&lt;/span&gt; snappy
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Supported values are: snappy, gzip and lz4.&lt;/p&gt;

&lt;h2&gt;
  
  
  Consuming data (-C)
&lt;/h2&gt;

&lt;h3&gt;
  
  
  Simple consumer
&lt;/h3&gt;

&lt;p&gt;&lt;strong&gt;Consume all the messages from a topic&lt;/strong&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight shell"&gt;&lt;code&gt;kafkacat &lt;span class="nt"&gt;-C&lt;/span&gt; &lt;span class="nt"&gt;-b&lt;/span&gt; localhost:9092 &lt;span class="nt"&gt;-t&lt;/span&gt; topic1
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Note that, unlike kafka-console-consumer, kafkacat will consume the messages from the beginning of the topic by default. This approach makes more sense to me, but YMMV.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Consume X messages&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;You can control how many messages will be consumed using the &lt;strong&gt;c&lt;/strong&gt;ount parameter (-c, lowercase).&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight shell"&gt;&lt;code&gt;kafkacat &lt;span class="nt"&gt;-C&lt;/span&gt; &lt;span class="nt"&gt;-b&lt;/span&gt; localhost:9092 &lt;span class="nt"&gt;-t&lt;/span&gt; topic1 &lt;span class="nt"&gt;-c&lt;/span&gt; 5
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;h3&gt;
  
  
  Consuming from an offset
&lt;/h3&gt;

&lt;p&gt;If you want to read data from a particular &lt;strong&gt;o&lt;/strong&gt;ffset, you can use the -o parameter. The offset parameter is very versatile. You can:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Consume messages from the beginning or end&lt;/strong&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight shell"&gt;&lt;code&gt;kafkacat &lt;span class="nt"&gt;-C&lt;/span&gt; &lt;span class="nt"&gt;-b&lt;/span&gt; localhost:9092 &lt;span class="nt"&gt;-t&lt;/span&gt; topic1 &lt;span class="nt"&gt;-o&lt;/span&gt; beginning
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Use the constants &lt;strong&gt;beginning&lt;/strong&gt; or &lt;strong&gt;end&lt;/strong&gt; to tell kafkacat where to start consuming.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Consume from a given offset&lt;/strong&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight shell"&gt;&lt;code&gt;kafkacat &lt;span class="nt"&gt;-C&lt;/span&gt; &lt;span class="nt"&gt;-b&lt;/span&gt; localhost:9092 &lt;span class="nt"&gt;-t&lt;/span&gt; topic1 &lt;span class="nt"&gt;-o&lt;/span&gt; 123
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Use an absolute value for the offset and Kafkacat will start consuming from the given offset. If you don’t specify the partition to consume (with -p), Kafkacat will consume all the partitions from the given offset.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Consume the last X messages from a partition&lt;/strong&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight shell"&gt;&lt;code&gt;kafkacat &lt;span class="nt"&gt;-C&lt;/span&gt; &lt;span class="nt"&gt;-b&lt;/span&gt; localhost:9092 &lt;span class="nt"&gt;-t&lt;/span&gt; topic1 &lt;span class="nt"&gt;-o&lt;/span&gt; &lt;span class="nt"&gt;-10&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;We do this by using a negative offset value.&lt;/p&gt;

&lt;h3&gt;
  
  
  &lt;strong&gt;Consume based on a timestamp&lt;/strong&gt;
&lt;/h3&gt;

&lt;p&gt;It is possible to start consuming from a given timestamp in milliseconds using the format -o s@start_timestamp. Technically this is still consuming based on an offset; the difference is that kafkacat figures out the offset for you based on the provided timestamp(s).&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight shell"&gt;&lt;code&gt;kafkacat &lt;span class="nt"&gt;-C&lt;/span&gt; &lt;span class="nt"&gt;-b&lt;/span&gt; localhost:9092 &lt;span class="nt"&gt;-t&lt;/span&gt; topic1 &lt;span class="nt"&gt;-o&lt;/span&gt; s@start_timestamp
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;You can also stop consuming when a given timestamp is reached using:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight shell"&gt;&lt;code&gt;kafkacat &lt;span class="nt"&gt;-C&lt;/span&gt; &lt;span class="nt"&gt;-b&lt;/span&gt; localhost:9092 &lt;span class="nt"&gt;-t&lt;/span&gt; topic1 &lt;span class="nt"&gt;-o&lt;/span&gt; e@end_timestamp
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;This is very useful when you are debugging an error: you have the timestamp of the error and you want to check what the message looked like. By combining the start and end timestamps, you can narrow down your search:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight shell"&gt;&lt;code&gt;kafkacat &lt;span class="nt"&gt;-C&lt;/span&gt; &lt;span class="nt"&gt;-b&lt;/span&gt; localhost:9092 &lt;span class="nt"&gt;-t&lt;/span&gt; topic1 &lt;span class="nt"&gt;-o&lt;/span&gt; s@start_timestamp &lt;span class="nt"&gt;-o&lt;/span&gt; e@end_timestamp
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;
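The s@ and e@ values are epoch timestamps in milliseconds. If you start from a human-readable time instead, GNU date can do the conversion (a sketch assuming GNU coreutils; on macOS, Homebrew’s coreutils ships the same tool as gdate):

```shell
# Convert human-readable UTC times to epoch milliseconds.
# GNU date's %N is nanoseconds; %3N truncates to milliseconds.
START_TS=$(date -u -d '2020-05-04 18:00:00' +%s%3N)
END_TS=$(date -u -d '2020-05-04 19:00:00' +%s%3N)
echo "$START_TS $END_TS"

# Then plug them into kafkacat:
# kafkacat -C -b localhost:9092 -t topic1 -o "s@$START_TS" -o "e@$END_TS"
```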



&lt;h3&gt;
  
  
  Formatting the output
&lt;/h3&gt;

&lt;p&gt;By default, Kafkacat will print out only the message payload (value of the Kafka record), but you can print anything you’re interested in. To define the custom output, specify the -f flag, as in &lt;strong&gt;f&lt;/strong&gt;ormat, followed by a format string. Here’s an example that prints a string with the key and value of the message:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight shell"&gt;&lt;code&gt;kafkacat &lt;span class="nt"&gt;-C&lt;/span&gt; &lt;span class="nt"&gt;-b&lt;/span&gt; localhost:9092 &lt;span class="nt"&gt;-t&lt;/span&gt; topic1 &lt;span class="se"&gt;\&lt;/span&gt;
 &lt;span class="nt"&gt;-f&lt;/span&gt; &lt;span class="s1"&gt;'Key is %k, and message payload is: %s \n'&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;%k and %s are format string tokens. The output might be something like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight shell"&gt;&lt;code&gt;Key is key3, and message payload is: message3
Key is key4, and message payload is: message4
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;So what can you print out using format string?&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;topic (%t)&lt;/li&gt;
&lt;li&gt;partition (%p)&lt;/li&gt;
&lt;li&gt;offset (%o)&lt;/li&gt;
&lt;li&gt;timestamp (%T)&lt;/li&gt;
&lt;li&gt;message key (%k)&lt;/li&gt;
&lt;li&gt;message value (%s)&lt;/li&gt;
&lt;li&gt;message headers (%h)&lt;/li&gt;
&lt;li&gt;key length (%K)&lt;/li&gt;
&lt;li&gt;value length (%S)&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;As you’ve seen above, you can also use newline (\n, \r) or tab (\t) characters in the format string.&lt;/p&gt;

&lt;h3&gt;
  
  
  Serdes
&lt;/h3&gt;

&lt;p&gt;If messages are not written as strings, you need to configure a proper &lt;strong&gt;s&lt;/strong&gt;erde for keys and values using the -s parameter.&lt;/p&gt;

&lt;p&gt;For example, if both the key and the value are 32-bit integers, you would read them using:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight shell"&gt;&lt;code&gt;kafkacat &lt;span class="nt"&gt;-C&lt;/span&gt; &lt;span class="nt"&gt;-b&lt;/span&gt; localhost:9092 &lt;span class="nt"&gt;-t&lt;/span&gt; topic1 &lt;span class="nt"&gt;-s&lt;/span&gt; i
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;You can also specify the serde for the key and the value separately:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight shell"&gt;&lt;code&gt;kafkacat &lt;span class="nt"&gt;-C&lt;/span&gt; &lt;span class="nt"&gt;-b&lt;/span&gt; localhost:9092 &lt;span class="nt"&gt;-t&lt;/span&gt; topic1 &lt;span class="nt"&gt;-s&lt;/span&gt; &lt;span class="nv"&gt;key&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;i &lt;span class="nt"&gt;-s&lt;/span&gt; &lt;span class="nv"&gt;value&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;s
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;You will find the list of all the serdes in the kafkacat help (kafkacat -h).&lt;/p&gt;

&lt;h3&gt;
  
  
  Avro serde
&lt;/h3&gt;

&lt;p&gt;Avro messages are a bit special since they require a schema registry. But Kafkacat has you covered there as well. Use (-r) to specify the schema registry URL:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight shell"&gt;&lt;code&gt;kafkacat &lt;span class="nt"&gt;-C&lt;/span&gt; &lt;span class="nt"&gt;-b&lt;/span&gt; localhost:9092 &lt;span class="se"&gt;\&lt;/span&gt;
&lt;span class="nt"&gt;-t&lt;/span&gt; avro-topic &lt;span class="se"&gt;\&lt;/span&gt;
&lt;span class="nt"&gt;-s&lt;/span&gt; &lt;span class="nv"&gt;key&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;s &lt;span class="nt"&gt;-s&lt;/span&gt; &lt;span class="nv"&gt;value&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;avro &lt;span class="se"&gt;\&lt;/span&gt;
&lt;span class="nt"&gt;-r&lt;/span&gt; http://localhost:8081
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;In the example above, we’re reading messages from a topic where keys are strings, but values are Avro.&lt;/p&gt;

&lt;h2&gt;
  
  
  List metadata (-L)
&lt;/h2&gt;

&lt;p&gt;Listing metadata gives you info about topics: how many partitions a topic has, which broker is the leader for each partition, as well as the list of in-sync replicas (ISR).&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Metadata for all topics&lt;/strong&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight shell"&gt;&lt;code&gt;kafkacat &lt;span class="nt"&gt;-L&lt;/span&gt; &lt;span class="nt"&gt;-b&lt;/span&gt; localhost:9092 
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Simply calling -L with no other parameters will display the metadata for all the topics in the cluster.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Metadata for a given topic&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;If you want to see metadata for just one topic, specify it using (-t) parameter:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight shell"&gt;&lt;code&gt;kafkacat &lt;span class="nt"&gt;-L&lt;/span&gt; &lt;span class="nt"&gt;-b&lt;/span&gt; localhost:9092 &lt;span class="nt"&gt;-t&lt;/span&gt; topic1
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;h2&gt;
  
  
  Query mode (-Q)
&lt;/h2&gt;

&lt;p&gt;If you want to find an offset of a Kafka record based on a timestamp, Query mode can help with that. Just specify the topic, partition and a timestamp:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight shell"&gt;&lt;code&gt;kafkacat &lt;span class="nt"&gt;-b&lt;/span&gt; localhost:9092 &lt;span class="nt"&gt;-Q&lt;/span&gt; &lt;span class="nt"&gt;-t&lt;/span&gt; topic1:1:1588534509794
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;h2&gt;
  
  
  Is this all?
&lt;/h2&gt;

&lt;p&gt;I'm glad that you asked because it's not :) I have created a &lt;strong&gt;1-page Kafkacat cheatsheet&lt;/strong&gt; for you to download. Grab it &lt;a href="https://codingharbour.com/kafkacat-cheatsheet/"&gt;here&lt;/a&gt;.&lt;/p&gt;

</description>
      <category>apachekafka</category>
      <category>kafkacat</category>
      <category>tutorial</category>
    </item>
    <item>
      <title>Guide to Apache Avro and Kafka</title>
      <dc:creator>Dejan Maric</dc:creator>
      <pubDate>Tue, 14 Apr 2020 17:27:50 +0000</pubDate>
      <link>https://dev.to/de_maric/guide-to-apache-avro-and-kafka-46gk</link>
      <guid>https://dev.to/de_maric/guide-to-apache-avro-and-kafka-46gk</guid>
      <description>&lt;p&gt;&lt;em&gt;Photo credit: &lt;a href="https://unsplash.com/@bosco_shots"&gt;@bosco_shots&lt;/a&gt;&lt;/em&gt;&lt;/p&gt;

&lt;p&gt;Or, how to produce and consume Kafka records using Avro serialization in Java.&lt;/p&gt;

&lt;p&gt;So far we’ve seen how to &lt;a href="https://dev.to/de_maric/how-to-create-a-kafka-producer-in-java-ap9"&gt;produce&lt;/a&gt; and &lt;a href="https://dev.to/de_maric/how-to-create-a-kafka-consumer-in-java-1563"&gt;consume&lt;/a&gt; simple String records using Java and &lt;a href="https://dev.to/de_maric/how-to-get-started-with-apache-kafka-in-5-minutes-18k5"&gt;console tools&lt;/a&gt;. In this post, I would like to show you how to send and read Avro messages from Java using the kafka-clients library.&lt;/p&gt;

&lt;p&gt;&lt;em&gt;If you’re new to Avro, I have written a full post about &lt;a href="https://dev.to/de_maric/why-use-avro-data-format-with-apache-kafka-hc9"&gt;why you should consider Avro serialization for Kafka messages&lt;/a&gt;, so check it out to learn more.&lt;/em&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Running a Kafka cluster locally
&lt;/h2&gt;

&lt;p&gt;To test the producers and consumers, let’s run a Kafka cluster locally, consisting of one broker, one ZooKeeper node and a Schema Registry.&lt;/p&gt;

&lt;p&gt;To simplify our job, we will run these servers as Docker containers, using docker-compose.&lt;/p&gt;

&lt;p&gt;&lt;em&gt;Don’t have docker-compose? Check: &lt;a href="https://docs.docker.com/compose/install/"&gt;how to install docker-compose&lt;/a&gt;&lt;/em&gt;&lt;/p&gt;

&lt;p&gt;I’ve prepared a docker-compose file which you can grab from Coding Harbour’s GitHub:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;git clone https://github.com/codingharbour/kafka-docker-compose.git
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Once you have the project, navigate to a folder called &lt;strong&gt;single-node-avro-kafka&lt;/strong&gt; and start the Kafka cluster:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;docker-compose up &lt;span class="nt"&gt;-d&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The output should look something like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;docker-compose up &lt;span class="nt"&gt;-d&lt;/span&gt;
Starting sn-zookeeper ... &lt;span class="k"&gt;done
&lt;/span&gt;Starting sn-kafka ... &lt;span class="k"&gt;done
&lt;/span&gt;Starting sn-schema-registry ... &lt;span class="k"&gt;done&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Your local Kafka cluster is now ready to be used. By running &lt;strong&gt;docker-compose ps&lt;/strong&gt; , we can see that the Kafka broker is available on port 9092, while the Schema Registry runs on port 8081. Make a note of that, because we’ll need it soon.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;docker-compose ps
       Name                    Command            State                     Ports
&lt;span class="nt"&gt;----------------------------------------------------------------------------------------------------&lt;/span&gt;
sn-kafka             /etc/confluent/docker/run   Up      0.0.0.0:9092-&amp;gt;9092/tcp
sn-schema-registry   /etc/confluent/docker/run   Up      0.0.0.0:8081-&amp;gt;8081/tcp
sn-zookeeper         /etc/confluent/docker/run   Up      0.0.0.0:2181-&amp;gt;2181/tcp, 2888/tcp, 3888/tcp
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Defining the Avro Schema
&lt;/h2&gt;

&lt;p&gt;Let’s create a schema for the messages we’ll be sending through Kafka. We’ll call our message: SimpleMessage, and it will have two fields:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;content – a string field, holding the message we want to send, and&lt;/li&gt;
&lt;li&gt;date_time – human-readable date-time showing when the message was sent&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Avro schemas are written in a JSON format, so our SimpleMessage schema will look like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight json"&gt;&lt;code&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"type"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"record"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"name"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"SimpleMessage"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"namespace"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"com.codingharbour.avro"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"fields"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="nl"&gt;"name"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"content"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"type"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="s2"&gt;"string"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"doc"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"Message content"&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="nl"&gt;"name"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"date_time"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"type"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="s2"&gt;"string"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"doc"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"Datetime when the message was generated"&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The schema consists of four elements:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Type – Describes a data type of the entire schema. Type ‘record’ means that the schema describes a complex data type, which includes other fields.&lt;/li&gt;
&lt;li&gt;Name – The name of the schema. In our case “SimpleMessage”&lt;/li&gt;
&lt;li&gt;Namespace – Namespace of the schema that qualifies the name. In our case, the namespace is “com.codingharbour.avro”&lt;/li&gt;
&lt;li&gt;List of fields – One or more fields that are in this complex data type&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Each field in a schema is a JSON object with multiple attributes:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;name – name of the field &lt;/li&gt;
&lt;li&gt;type – data type of the field. Avro supports primitive types like int, string, bytes, etc., and complex types like record, enum, etc.&lt;/li&gt;
&lt;li&gt;doc – Documentation for the given field&lt;/li&gt;
&lt;li&gt;default – the default value for the field, used by the consumer to populate the value when the field is missing from the message. &lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;For more info on Avro data types and schema check &lt;a href="https://avro.apache.org/docs/current/spec.html"&gt;the Avro spec&lt;/a&gt;.&lt;/p&gt;

&lt;h2&gt;
  
  
  Schema Registry
&lt;/h2&gt;

&lt;p&gt;As I’ve mentioned in &lt;a href="https://codingharbour.com/apache-kafka/why-use-avro-data-format-with-apache-kafka/"&gt;the previous post&lt;/a&gt;, every Avro message contains the schema used to serialize it. But sending thousands or millions of messages per second with the same schema is a huge waste of bandwidth and storage space. That’s where the Schema Registry, KafkaAvroSerializer and KafkaAvroDeserializer come into play.&lt;/p&gt;

&lt;p&gt;Instead of writing the schema to the message, KafkaAvroSerializer will write the schema to the Schema Registry and it will only write the schema id to the message. Then, when the Kafka record reaches the consumer, the consumer will use KafkaAvroDeserializer to fetch the schema from the Schema Registry based on the schema id from the message. Once the schema is fetched, the KafkaAvroDeserializer can deserialize the message.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--wJ-JBCbU--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://i1.wp.com/codingharbour.com/wp-content/uploads/2020/03/schema_registry.jpg%3Fresize%3D446%252C223%26ssl%3D1" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--wJ-JBCbU--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://i1.wp.com/codingharbour.com/wp-content/uploads/2020/03/schema_registry.jpg%3Fresize%3D446%252C223%26ssl%3D1" alt="schema registry" width="446" height="223"&gt;&lt;/a&gt;Registering and using Avro schema&lt;/p&gt;

&lt;p&gt;This is why, when using KafkaAvro(De)Serializer in a producer or a consumer, we need to provide the URL of the schema registry. Remember that our Schema Registry runs on port 8081.&lt;/p&gt;

&lt;p&gt;Here’s a snippet from our producer:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ProducerConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;KEY_SERIALIZER_CLASS_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;StringSerializer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;class&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ProducerConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;VALUE_SERIALIZER_CLASS_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;KafkaAvroSerializer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;class&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;KafkaAvroSerializerConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;SCHEMA_REGISTRY_URL_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"http://localhost:8081"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;If we don’t specify the URL, the (de)serializer will complain when we try to send/read a message.&lt;/p&gt;

&lt;p&gt;Ok, the next thing is to see how an Avro schema gets translated into a Java object.&lt;/p&gt;

&lt;h2&gt;
  
  
  Avro record in Java
&lt;/h2&gt;

&lt;p&gt;&lt;strong&gt;Note&lt;/strong&gt;: do not confuse an Avro record with a Kafka record. Each Avro schema describes one or more Avro records. An Avro record is a complex data type in Avro, consisting of other fields with their own data types (primitive or complex). A Kafka record, on the other hand, consists of a key and a value, each of which can be serialized separately. Meaning that, e.g., the Kafka key may be one Avro record, while the Kafka value is another Avro record (if we choose to use Avro serialization for both the key and the value).&lt;/p&gt;

&lt;p&gt;When it comes to representing an Avro record in Java, the Avro library provides two interfaces: &lt;strong&gt;GenericRecord&lt;/strong&gt; and &lt;strong&gt;SpecificRecord&lt;/strong&gt;. Let’s see what the difference is and when to use which.&lt;/p&gt;

&lt;p&gt;An instance of a &lt;strong&gt;GenericRecord&lt;/strong&gt; allows us to access the schema fields either by index or by name, as seen below:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="nc"&gt;GenericRecord&lt;/span&gt; &lt;span class="n"&gt;record&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="o"&gt;...&lt;/span&gt; &lt;span class="o"&gt;;&lt;/span&gt; &lt;span class="c1"&gt;//obtain a generic record&lt;/span&gt;
&lt;span class="c1"&gt;//accessing the field by name&lt;/span&gt;
&lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"date_time"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"2020-01-01 12:45:00"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
&lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;dateTime&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;get&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"date_time"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt; 

&lt;span class="c1"&gt;//accessing the field by index&lt;/span&gt;
&lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"this is message number 1"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;content&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;get&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Using a GenericRecord is ideal when a schema is not known in advance or when you want to handle multiple schemas with the same code (e.g. in a Kafka Connector). The drawback of GenericRecord is the lack of type safety: its &lt;strong&gt;put&lt;/strong&gt; and &lt;strong&gt;get&lt;/strong&gt; methods work with plain Object values.&lt;/p&gt;
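&lt;p&gt;To make this concrete, here is a minimal sketch of what Object-based accessors mean in practice. It uses a plain Java Map as a simplified stand-in for a GenericRecord (this is deliberately not the Avro API): a value of the wrong type compiles without complaint, and the mistake only surfaces at runtime.&lt;/p&gt;

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for GenericRecord's Object-based put/get.
// NOT the Avro API; it only illustrates the type-safety gap.
class UntypedRecordDemo {

    // put accepts any Object, so a wrong type compiles without complaint
    static Map<String, Object> buggyRecord() {
        Map<String, Object> record = new HashMap<>();
        record.put("date_time", 12345L); // should have been a String
        return record;
    }

    // the mistake is only discoverable at runtime
    static boolean fieldIsString(Map<String, Object> record, String field) {
        return record.get(field) instanceof String;
    }
}
```

&lt;p&gt;With a SpecificRecord the equivalent mistake (passing a long to a String setter) would be rejected by the compiler instead.&lt;/p&gt;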

&lt;p&gt;&lt;strong&gt;SpecificRecord&lt;/strong&gt; is an interface from the Avro library that allows us to use an Avro record as a POJO. This is done by generating a Java class (or classes) from the schema, by using &lt;strong&gt;avro-maven-plugin&lt;/strong&gt;. The generated class will implement the SpecificRecord interface, as seen below.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="cm"&gt;/* Class generated by avro-maven-plugin*/&lt;/span&gt;
&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;SimpleMessage&lt;/span&gt; &lt;span class="kd"&gt;extends&lt;/span&gt; &lt;span class="n"&gt;org&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;apache&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;avro&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;specific&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;SpecificRecordBase&lt;/span&gt; 
&lt;span class="kd"&gt;implements&lt;/span&gt; &lt;span class="n"&gt;org&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;apache&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;avro&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;specific&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;SpecificRecord&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
   &lt;span class="c1"&gt;//content removed for brevity&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;

&lt;span class="c1"&gt;//using the SpecificRecord by using the actual implementation&lt;/span&gt;
&lt;span class="nc"&gt;SimpleMessage&lt;/span&gt; &lt;span class="n"&gt;simpleMessage&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;SimpleMessage&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
&lt;span class="n"&gt;simpleMessage&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setContent&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Hello world"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The drawback of SpecificRecord is that you need to generate a class for each schema you plan to use, in advance. This in turn means you need the Avro schema in advance, to be able to generate the Java class.&lt;/p&gt;

&lt;h2&gt;
  
  
  Producing Avro messages using GenericRecord
&lt;/h2&gt;

&lt;p&gt;First, we prepare the properties the producer needs. We specify our brokers, serializers for the key and the value, as well as the URL for the Schema Registry. Then we instantiate the Kafka producer:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="nc"&gt;Properties&lt;/span&gt; &lt;span class="n"&gt;properties&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;Properties&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ProducerConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;BOOTSTRAP_SERVERS_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"localhost:9092"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ProducerConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;KEY_SERIALIZER_CLASS_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;StringSerializer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;class&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ProducerConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;VALUE_SERIALIZER_CLASS_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;KafkaAvroSerializer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;class&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;KafkaAvroSerializerConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;SCHEMA_REGISTRY_URL_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"http://localhost:8081"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

&lt;span class="nc"&gt;Producer&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;GenericRecord&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;producer&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;KafkaProducer&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&amp;gt;(&lt;/span&gt;&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;As you see, we are using the String serializer for the keys and the Avro serializer for the values. Notice that the producer expects &lt;em&gt;GenericRecord&lt;/em&gt; instances as the value of the Kafka record.&lt;/p&gt;

&lt;p&gt;The next step is to create an instance of an Avro record based on our schema:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="c1"&gt;//avro schema&lt;/span&gt;
&lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;simpleMessageSchema&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt;
        &lt;span class="s"&gt;"{"&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt;
        &lt;span class="s"&gt;" \"type\": \"record\","&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt;
        &lt;span class="s"&gt;" \"name\": \"SimpleMessage\","&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt;
        &lt;span class="s"&gt;" \"namespace\": \"com.codingharbour.avro\","&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt;
        &lt;span class="s"&gt;" \"fields\": ["&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt;
        &lt;span class="s"&gt;" {\"name\": \"content\", \"type\": \"string\", \"doc\": \"Message content\"},"&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt;
        &lt;span class="s"&gt;" {\"name\": \"date_time\", \"type\": \"string\", \"doc\": \"Datetime when the message\"}"&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt;
        &lt;span class="s"&gt;" ]"&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt;
        &lt;span class="s"&gt;"}"&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

&lt;span class="c1"&gt;//parse the schema&lt;/span&gt;
&lt;span class="nc"&gt;Schema&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;Parser&lt;/span&gt; &lt;span class="n"&gt;parser&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;Schema&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;Parser&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
&lt;span class="nc"&gt;Schema&lt;/span&gt; &lt;span class="n"&gt;schema&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;parser&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;parse&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;simpleMessageSchema&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

&lt;span class="c1"&gt;//prepare the avro record&lt;/span&gt;
&lt;span class="nc"&gt;GenericRecord&lt;/span&gt; &lt;span class="n"&gt;avroRecord&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;GenericData&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;Record&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;schema&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;span class="n"&gt;avroRecord&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"content"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"Hello world"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;span class="n"&gt;avroRecord&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"date_time"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;Instant&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;now&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;toString&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Here, we specified the schema directly in the code. By parsing the schema we get a Schema object, which we use to instantiate a new GenericRecord. Finally, we set the record’s fields by name, using the put method.&lt;/p&gt;
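&lt;p&gt;The backslash-escaped quotes in the inline schema above are easy to get wrong. One common readability trick (plain string handling, not an Avro API) is to write the JSON with single quotes and swap them for double quotes afterwards:&lt;/p&gt;

```java
// Build the same schema JSON without backslash-escaped quotes:
// write it with single quotes, then replace them with double quotes.
class SchemaJson {
    static String simpleMessageSchema() {
        String json = "{"
                + " 'type': 'record',"
                + " 'name': 'SimpleMessage',"
                + " 'namespace': 'com.codingharbour.avro',"
                + " 'fields': ["
                + " {'name': 'content', 'type': 'string'},"
                + " {'name': 'date_time', 'type': 'string'}"
                + " ]"
                + "}";
        return json.replace('\'', '"');
    }
}
```

&lt;p&gt;In a real project it is usually cleaner still to keep the schema in an .avsc file and load it with parser.parse(new File(...)), so the schema is shared with the build tooling instead of being duplicated in code.&lt;/p&gt;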

&lt;p&gt;The last thing to do is create a Kafka record with null key and Avro record in the value and write it to a topic called avro-topic:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="c1"&gt;//prepare the kafka record&lt;/span&gt;
&lt;span class="nc"&gt;ProducerRecord&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;GenericRecord&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;record&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;ProducerRecord&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&amp;gt;(&lt;/span&gt;&lt;span class="s"&gt;"avro-topic"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;avroRecord&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

&lt;span class="n"&gt;producer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;send&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;span class="c1"&gt;//ensures record is sent before closing the producer&lt;/span&gt;
&lt;span class="n"&gt;producer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;flush&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt; 

&lt;span class="n"&gt;producer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;close&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Producing Avro messages using SpecificRecord
&lt;/h2&gt;

&lt;p&gt;Another way to produce the same record as above is to use the SpecificRecord interface. We will generate a Java class from the Avro schema using the avro-maven-plugin. We’ll add the plugin to our pom.xml:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight xml"&gt;&lt;code&gt;&lt;span class="nt"&gt;&amp;lt;plugin&amp;gt;&lt;/span&gt;
    &lt;span class="nt"&gt;&amp;lt;groupId&amp;gt;&lt;/span&gt;org.apache.avro&lt;span class="nt"&gt;&amp;lt;/groupId&amp;gt;&lt;/span&gt;
    &lt;span class="nt"&gt;&amp;lt;artifactId&amp;gt;&lt;/span&gt;avro-maven-plugin&lt;span class="nt"&gt;&amp;lt;/artifactId&amp;gt;&lt;/span&gt;
    &lt;span class="nt"&gt;&amp;lt;version&amp;gt;&lt;/span&gt;1.9.2&lt;span class="nt"&gt;&amp;lt;/version&amp;gt;&lt;/span&gt;
    &lt;span class="nt"&gt;&amp;lt;executions&amp;gt;&lt;/span&gt;
        &lt;span class="nt"&gt;&amp;lt;execution&amp;gt;&lt;/span&gt;
            &lt;span class="nt"&gt;&amp;lt;phase&amp;gt;&lt;/span&gt;generate-sources&lt;span class="nt"&gt;&amp;lt;/phase&amp;gt;&lt;/span&gt;
            &lt;span class="nt"&gt;&amp;lt;goals&amp;gt;&lt;/span&gt;
                &lt;span class="nt"&gt;&amp;lt;goal&amp;gt;&lt;/span&gt;schema&lt;span class="nt"&gt;&amp;lt;/goal&amp;gt;&lt;/span&gt;
            &lt;span class="nt"&gt;&amp;lt;/goals&amp;gt;&lt;/span&gt;
            &lt;span class="nt"&gt;&amp;lt;configuration&amp;gt;&lt;/span&gt;
                &lt;span class="nt"&gt;&amp;lt;sourceDirectory&amp;gt;&lt;/span&gt;${project.basedir}/src/main/avro/&lt;span class="nt"&gt;&amp;lt;/sourceDirectory&amp;gt;&lt;/span&gt;
                &lt;span class="nt"&gt;&amp;lt;outputDirectory&amp;gt;&lt;/span&gt;${project.basedir}/target/generated-sources/avro/&lt;span class="nt"&gt;&amp;lt;/outputDirectory&amp;gt;&lt;/span&gt;
                &lt;span class="nt"&gt;&amp;lt;stringType&amp;gt;&lt;/span&gt;String&lt;span class="nt"&gt;&amp;lt;/stringType&amp;gt;&lt;/span&gt;
            &lt;span class="nt"&gt;&amp;lt;/configuration&amp;gt;&lt;/span&gt;
        &lt;span class="nt"&gt;&amp;lt;/execution&amp;gt;&lt;/span&gt;
    &lt;span class="nt"&gt;&amp;lt;/executions&amp;gt;&lt;/span&gt;
&lt;span class="nt"&gt;&amp;lt;/plugin&amp;gt;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The Avro plugin is configured above to generate classes from the schemas in the &lt;strong&gt;src/main/avro&lt;/strong&gt; folder and to store the generated classes in &lt;strong&gt;target/generated-sources/avro/&lt;/strong&gt;.&lt;/p&gt;

&lt;p&gt;If you check the &lt;em&gt;src/main/avro&lt;/em&gt; folder, you will see the Avro schema for our SimpleMessage. It’s the same schema we used in the GenericRecord example above. When you execute mvn compile, the &lt;strong&gt;SimpleMessage&lt;/strong&gt; class will be generated in the target folder.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--0CGapT6G--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://i0.wp.com/codingharbour.com/wp-content/uploads/2020/04/SimpleMessage_arr2-2.png%3Fresize%3D285%252C349%26ssl%3D1" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--0CGapT6G--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://i0.wp.com/codingharbour.com/wp-content/uploads/2020/04/SimpleMessage_arr2-2.png%3Fresize%3D285%252C349%26ssl%3D1" alt="" width="285" height="349"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Then we’ll define properties for the Kafka producer, same as in the GenericRecord example:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="nc"&gt;Properties&lt;/span&gt; &lt;span class="n"&gt;properties&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;Properties&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ProducerConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;BOOTSTRAP_SERVERS_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"localhost:9092"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ProducerConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;KEY_SERIALIZER_CLASS_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;StringSerializer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;class&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ProducerConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;VALUE_SERIALIZER_CLASS_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;KafkaAvroSerializer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;class&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"schema.registry.url"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"http://localhost:8081"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

&lt;span class="nc"&gt;Producer&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;SpecificRecord&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;producer&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;KafkaProducer&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&amp;gt;(&lt;/span&gt;&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The only difference compared to the GenericRecord example is the type for the value of the Kafka record, which is now SpecificRecord.&lt;/p&gt;

&lt;p&gt;Next, we create the instance of the SimpleMessage:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="c1"&gt;//create the specific record&lt;/span&gt;
&lt;span class="nc"&gt;SimpleMessage&lt;/span&gt; &lt;span class="n"&gt;simpleMessage&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;SimpleMessage&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
&lt;span class="n"&gt;simpleMessage&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setContent&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Hello world"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;span class="n"&gt;simpleMessage&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setDateTime&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Instant&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;now&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;toString&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;And lastly, we create a Kafka record and write it to the “avro-topic” topic:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="nc"&gt;ProducerRecord&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;SpecificRecord&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;record&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;ProducerRecord&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&amp;gt;(&lt;/span&gt;&lt;span class="s"&gt;"avro-topic"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;simpleMessage&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

&lt;span class="n"&gt;producer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;send&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;span class="c1"&gt;//ensures record is sent before closing the producer&lt;/span&gt;
&lt;span class="n"&gt;producer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;flush&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;

&lt;span class="n"&gt;producer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;close&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Note that both producers above have written to a topic called ‘avro-topic’. So we now have two records to consume. Let’s see how we can create the consumers.&lt;/p&gt;

&lt;h2&gt;
  
  
  Consuming Avro messages using GenericRecord
&lt;/h2&gt;

&lt;p&gt;The consumer that uses GenericRecord needs neither the schema nor a Java class generated from the schema. All the data will be obtained by the deserializer from the Schema Registry.&lt;/p&gt;

&lt;p&gt;First, we’ll create properties for the consumer and instantiate it:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="nc"&gt;Properties&lt;/span&gt; &lt;span class="n"&gt;properties&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;Properties&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ProducerConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;BOOTSTRAP_SERVERS_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"localhost:9092"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ConsumerConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;GROUP_ID_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"generic-record-consumer-group"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ConsumerConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;AUTO_OFFSET_RESET_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"earliest"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ConsumerConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;ENABLE_AUTO_COMMIT_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="kc"&gt;false&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ConsumerConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;KEY_DESERIALIZER_CLASS_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;StringDeserializer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;class&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ConsumerConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;VALUE_DESERIALIZER_CLASS_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;KafkaAvroDeserializer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;class&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;KafkaAvroDeserializerConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;SCHEMA_REGISTRY_URL_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"http://localhost:8081"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

&lt;span class="nc"&gt;KafkaConsumer&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;GenericRecord&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;consumer&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;KafkaConsumer&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&amp;gt;(&lt;/span&gt;&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Then we’ll subscribe our consumer to the ‘avro-topic’ topic and start listening for records:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="n"&gt;consumer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;subscribe&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Collections&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;singleton&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"avro-topic"&lt;/span&gt;&lt;span class="o"&gt;));&lt;/span&gt;

&lt;span class="c1"&gt;//poll the record from the topic&lt;/span&gt;
&lt;span class="k"&gt;while&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="kc"&gt;true&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="nc"&gt;ConsumerRecords&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;GenericRecord&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;records&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;consumer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;poll&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Duration&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;ofMillis&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;100&lt;/span&gt;&lt;span class="o"&gt;));&lt;/span&gt;

    &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ConsumerRecord&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;GenericRecord&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;record&lt;/span&gt; &lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="n"&gt;records&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="nc"&gt;System&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;out&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;println&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Message content: "&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;value&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;get&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"content"&lt;/span&gt;&lt;span class="o"&gt;));&lt;/span&gt;
        &lt;span class="nc"&gt;System&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;out&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;println&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Message time: "&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;value&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;get&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"date_time"&lt;/span&gt;&lt;span class="o"&gt;));&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;
    &lt;span class="n"&gt;consumer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;commitAsync&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Here we get the field values by name, using the &lt;strong&gt;Object get(String key)&lt;/strong&gt; method of GenericRecord.&lt;/p&gt;
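&lt;p&gt;One caveat worth knowing: depending on the Avro version and schema settings, string fields read through GenericRecord may come back as org.apache.avro.util.Utf8 (a CharSequence) rather than java.lang.String, so a direct cast to String can fail. Calling toString() is safe in both cases. Below is a minimal illustration of that pattern, with StringBuilder standing in for Utf8 (the helper class here is illustrative, not part of the Avro API):&lt;/p&gt;

```java
// Safe extraction of a field value that may be any CharSequence.
// StringBuilder stands in for Avro's Utf8 here; both are CharSequences
// that are not java.lang.String, so a (String) cast would fail on them.
class FieldValues {
    static String asString(Object fieldValue) {
        // toString() works for String, Utf8, or any other CharSequence
        return fieldValue == null ? null : fieldValue.toString();
    }
}
```

&lt;p&gt;So instead of casting, prefer something like FieldValues.asString(record.value().get("content")) when you need a plain String.&lt;/p&gt;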

&lt;h2&gt;
  
  
  Consuming Avro messages using SpecificRecord
&lt;/h2&gt;

&lt;p&gt;The last thing to show is how to consume an Avro Kafka record that is automatically deserialized into the proper Java class generated from the Avro schema.&lt;/p&gt;

&lt;p&gt;As before, we’ll start with preparing properties for the consumer and instantiating it:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="nc"&gt;Properties&lt;/span&gt; &lt;span class="n"&gt;properties&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;Properties&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ProducerConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;BOOTSTRAP_SERVERS_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"localhost:9092"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ConsumerConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;GROUP_ID_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"specific-record-consumer-group"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ConsumerConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;AUTO_OFFSET_RESET_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"earliest"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ConsumerConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;ENABLE_AUTO_COMMIT_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="kc"&gt;false&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ConsumerConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;KEY_DESERIALIZER_CLASS_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;StringDeserializer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;class&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ConsumerConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;VALUE_DESERIALIZER_CLASS_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;KafkaAvroDeserializer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;class&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;KafkaAvroDeserializerConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;SCHEMA_REGISTRY_URL_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"http://localhost:8081"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;KafkaAvroDeserializerConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;SPECIFIC_AVRO_READER_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="kc"&gt;true&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt; &lt;span class="c1"&gt;//ensures records are properly converted&lt;/span&gt;

&lt;span class="nc"&gt;KafkaConsumer&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;SimpleMessage&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;consumer&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;KafkaConsumer&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&amp;gt;(&lt;/span&gt;&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Everything is the same as with the previous consumer, except for the last property we set. Let’s look at it again:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;KafkaAvroDeserializerConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;SPECIFIC_AVRO_READER_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="kc"&gt;true&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt; 
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This line is necessary if you want your Avro records to be properly converted into the expected Java class (in our case, SimpleMessage).&lt;/p&gt;

&lt;p&gt;Now, all we have to do is subscribe our consumer to the topic and start consuming:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="n"&gt;consumer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;subscribe&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Collections&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;singleton&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"avro-topic"&lt;/span&gt;&lt;span class="o"&gt;));&lt;/span&gt;

&lt;span class="c1"&gt;//poll the record from the topic&lt;/span&gt;
&lt;span class="k"&gt;while&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="kc"&gt;true&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="nc"&gt;ConsumerRecords&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;SimpleMessage&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;records&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;consumer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;poll&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Duration&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;ofMillis&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;100&lt;/span&gt;&lt;span class="o"&gt;));&lt;/span&gt;

    &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ConsumerRecord&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;SimpleMessage&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;record&lt;/span&gt; &lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="n"&gt;records&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="nc"&gt;System&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;out&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;println&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Message content: "&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;value&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;getContent&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt; &lt;span class="c1"&gt;//1&lt;/span&gt;
        &lt;span class="nc"&gt;System&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;out&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;println&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Message time: "&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;value&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;getDateTime&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt; &lt;span class="c1"&gt;//2&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;
    &lt;span class="n"&gt;consumer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;commitAsync&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;In the lines marked 1 and 2 above, you can see how the fields of SimpleMessage are accessed using proper getter methods.&lt;/p&gt;

&lt;p&gt;There you have it, two ways to produce and two ways to consume Kafka Avro records. Hope this was helpful.&lt;/p&gt;

&lt;p&gt;As always, the code from this blog post is available on &lt;a href="https://github.com/codingharbour/kafka-avro"&gt;CodingHarbour’s GitHub repo&lt;/a&gt;.&lt;/p&gt;

&lt;h2&gt;
  
  
  Would you like to learn more about Kafka?
&lt;/h2&gt;

&lt;p&gt;I have created a Kafka mini-course that you can get &lt;strong&gt;absolutely free&lt;/strong&gt;. Sign up for it over at &lt;a href="https://codingharbour.com/"&gt;Coding Harbour&lt;/a&gt;.&lt;/p&gt;

</description>
      <category>apachekafka</category>
      <category>apacheavro</category>
      <category>tutorial</category>
    </item>
    <item>
      <title>Why use Avro data format with Apache Kafka?</title>
      <dc:creator>Dejan Maric</dc:creator>
      <pubDate>Sat, 21 Mar 2020 18:27:05 +0000</pubDate>
      <link>https://dev.to/de_maric/why-use-avro-data-format-with-apache-kafka-hc9</link>
      <guid>https://dev.to/de_maric/why-use-avro-data-format-with-apache-kafka-hc9</guid>
      <description>&lt;p&gt;Avro is an open-source binary serialization format. But why use it with Kafka? Why not send JSON or XML messages? What benefits does it give us? This is what we’ll be exploring today.&lt;/p&gt;

&lt;h2&gt;
  
  
  Serialization
&lt;/h2&gt;

&lt;p&gt;It’s important to understand that records in a topic are just arrays of bytes. The Kafka broker doesn’t care about the type of data we’re sending. To make those byte arrays useful in our applications, producers and consumers must know how to interpret them. Are we sending CSV data, JSON or XML? Producers and consumers use (de)serialization to transform data to byte arrays and back. Yet, that’s only the beginning.&lt;/p&gt;
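&lt;p&gt;As a minimal sketch of that contract (plain Java, not the actual Kafka &lt;strong&gt;Serializer&lt;/strong&gt;/&lt;strong&gt;Deserializer&lt;/strong&gt; interfaces), this is the round trip a producer and a consumer have to agree on:&lt;/p&gt;

```java
import java.nio.charset.StandardCharsets;

public class SerializationSketch {
    // Mimics what StringSerializer does on the producer side:
    // turn the value into a byte array before handing it to the broker
    static byte[] serialize(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    // Mimics StringDeserializer on the consumer side: the consumer
    // must know how to interpret the bytes it fetched from the broker
    static String deserialize(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] onTheWire = serialize("{\"content\":\"hello\"}");
        System.out.println(onTheWire.length + " bytes on the wire");
        System.out.println(deserialize(onTheWire));
    }
}
```

&lt;p&gt;The broker only ever sees the byte array in the middle; everything else lives in the clients.&lt;/p&gt;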

&lt;h2&gt;
  
  
  Schema
&lt;/h2&gt;

&lt;p&gt;Whenever we use data as an integration mechanism, the schema of the data becomes very important, because it becomes the contract between producers and consumers. The teams behind producers and consumers need to agree on the following:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Which fields are in a record&lt;/li&gt;
&lt;li&gt;Which fields are optional and which are mandatory&lt;/li&gt;
&lt;li&gt;Where and how should this be documented&lt;/li&gt;
&lt;li&gt;Which default values should a consumer use for the fields that are missing from the record&lt;/li&gt;
&lt;li&gt;How should changes to the schema be handled&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  Data format
&lt;/h2&gt;

&lt;p&gt;Now, you could use JSON with a JSON schema or XML with an XSD schema to describe the message format. But these have one downside: due to their text-based nature, JSON and XML messages often use more space to convey the same information.&lt;/p&gt;
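&lt;p&gt;To make the size difference concrete, here is a small, made-up comparison: the same three fields encoded once as JSON text and once in a schema-based binary layout. The field names and values are hypothetical, and the binary layout is a simplification for illustration, not Avro’s actual encoding (which is even tighter, using variable-length integers):&lt;/p&gt;

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class PayloadSizeSketch {
    // A JSON message repeats every field name in every single record
    static byte[] asJson() {
        return "{\"userId\":42,\"amount\":1999,\"currency\":\"EUR\"}"
                .getBytes(StandardCharsets.UTF_8);
    }

    // A schema-based binary encoding ships only the values;
    // the field names live in the schema, not in the message
    static byte[] asBinary() {
        byte[] currency = "EUR".getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(8 + 8 + 1 + currency.length);
        buf.putLong(42L);                  // userId
        buf.putLong(1999L);                // amount
        buf.put((byte) currency.length);   // length-prefixed string
        buf.put(currency);
        return buf.array();
    }

    public static void main(String[] args) {
        System.out.println("JSON:   " + asJson().length + " bytes");
        System.out.println("binary: " + asBinary().length + " bytes");
    }
}
```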

&lt;h2&gt;
  
  
  So is there a better way?
&lt;/h2&gt;

&lt;p&gt;Yes. You could use Apache Avro. Avro is a data serialization format developed under the Apache umbrella and recommended for Kafka messages by the creators of Apache Kafka themselves. Why?&lt;/p&gt;

&lt;p&gt;By serializing your data in Avro format, you get the following benefits:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Avro relies on a schema. This means every field is properly described and documented&lt;/li&gt;
&lt;li&gt;Avro data format is a compact binary format, so it takes less space both on a wire and on a disk&lt;/li&gt;
&lt;li&gt;It has support for a variety of programming languages&lt;/li&gt;
&lt;li&gt;In Avro, every message contains the schema used to serialize it. That means that when you’re reading messages, you always know how to deserialize them, even if the schema has changed&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Yet, there’s one thing that makes Avro not ideal for usage in Kafka, at least not out-of-the-box, because…&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Every Avro message contains the schema used to serialize the message&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Think about this for a moment: if you plan on sending millions of messages a day to Kafka, it’s a terrible waste of bandwidth and storage space to send the same schema information over and over again. &lt;/p&gt;

&lt;p&gt;So, the way to overcome this is to…&lt;/p&gt;

&lt;h2&gt;
  
  
  Separate the schema from the message
&lt;/h2&gt;

&lt;p&gt;That’s where a Schema Registry comes into play. The Schema Registry is developed by Confluent, the company behind Apache Kafka, and it provides a RESTful interface for storing and retrieving Avro schemas.&lt;/p&gt;

&lt;p&gt;Instead of sending the schema inside a Kafka record, a producer starts by checking whether the schema already exists in the Schema Registry. If not, it will write the schema there (step 1 below). The producer then obtains the id of the schema (step 2) and sends that id inside the record (step 3), saving a lot of space this way. The consumer will read the message (step 4) and then contact the Schema Registry with the schema id from the record to get the full schema (step 5), caching it locally.&lt;/p&gt;
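&lt;p&gt;Concretely, the serializer prepends a small header to every record instead of the full schema: one magic byte and the 4-byte schema id. A simplified sketch of framing and reading that header (in practice KafkaAvroSerializer and KafkaAvroDeserializer do this for you):&lt;/p&gt;

```java
import java.nio.ByteBuffer;

public class WireFormatSketch {
    // Confluent's wire format: a magic byte (0), a 4-byte schema id,
    // then the Avro-encoded payload -- 5 bytes instead of a full schema
    static byte[] frame(int schemaId, byte[] avroPayload) {
        ByteBuffer buf = ByteBuffer.allocate(1 + 4 + avroPayload.length);
        buf.put((byte) 0);      // magic byte
        buf.putInt(schemaId);   // id obtained from the Schema Registry
        buf.put(avroPayload);
        return buf.array();
    }

    // The consumer reads the id back, asks the registry for the full
    // schema (GET /schemas/ids/{id}) and caches it locally
    static int schemaIdOf(byte[] record) {
        ByteBuffer buf = ByteBuffer.wrap(record);
        buf.get();              // skip the magic byte
        return buf.getInt();
    }

    public static void main(String[] args) {
        byte[] record = frame(7, new byte[]{1, 2, 3});
        System.out.println("schema id in record: " + schemaIdOf(record));
    }
}
```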

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--IG0iOllg--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/i/cr9m77zugwh05kir4ja2.jpg" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--IG0iOllg--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/i/cr9m77zugwh05kir4ja2.jpg" alt="Schema registry"&gt;&lt;/a&gt;Registering and using an Avro schema&lt;/p&gt;

&lt;p&gt;Ok, that seems nice. But that’s not the only place where Schema Registry helps. Let’s see what happens when you want to…&lt;/p&gt;

&lt;h2&gt;
  
  
  Evolve the schema
&lt;/h2&gt;

&lt;p&gt;Imagine yourself in a situation where, 2 years after releasing the producer application, you decide to change the format of the message in a way that, in your opinion, doesn’t break the compatibility and thus should not affect the consumers. Now you have 2 options:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;you can either be a nice person, find all the consumers, check whether the suggested changes will affect them and if so, ask them to change&lt;/li&gt;
&lt;li&gt;or you can do the change and wait for the mob with torches, shovels, and rakes to come your way&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Assuming you chose to avoid the option that would put your picture on every train station with a monetary reward under it, what is the probable outcome of the option number one?&lt;/p&gt;

&lt;p&gt;Those who worked in a corporate environment know the answer: a lot of planning, budgeting, negotiating and sometimes even postponing because there are more pressing issues. Getting 10s or 100s of consumers to perform the upgrade before you can continue with the changes in the producer is a sure way to an asylum.&lt;/p&gt;

&lt;p&gt;Apache Avro and the Schema Registry come to the rescue once again. The Schema Registry allows us to enforce rules for validating schema compatibility whenever a schema is modified. If a new schema breaks compatibility, the producer will fail to write the message.&lt;/p&gt;

&lt;p&gt;This way your consumers are protected from e.g. someone suddenly changing the data type of a field from a long to a string or removing a mandatory field.&lt;/p&gt;

&lt;p&gt;At the same time, it gives you clear guidelines about which changes to the schema are allowed. So no longer weeks or months of coordinating the change just to remove an optional field in a message.&lt;/p&gt;

&lt;h2&gt;
  
  
  Would you like to learn more about Kafka?
&lt;/h2&gt;

&lt;p&gt;I have created a Kafka mini-course that you can get &lt;strong&gt;absolutely free&lt;/strong&gt;. Sign up for it over at &lt;a href="https://codingharbour.com/"&gt;Coding Harbour&lt;/a&gt;.&lt;/p&gt;

</description>
      <category>apachekafka</category>
      <category>introduction</category>
    </item>
    <item>
      <title>What is a consumer group in Kafka?</title>
      <dc:creator>Dejan Maric</dc:creator>
      <pubDate>Wed, 04 Mar 2020 16:10:37 +0000</pubDate>
      <link>https://dev.to/de_maric/what-is-a-consumer-group-in-kafka-49il</link>
      <guid>https://dev.to/de_maric/what-is-a-consumer-group-in-kafka-49il</guid>
      <description>&lt;p&gt;&lt;em&gt;Photo credit: &lt;a href="https://unsplash.com/@vladington"&gt;Stefan Vladimirov&lt;/a&gt;&lt;/em&gt;&lt;/p&gt;

&lt;p&gt;When sending messages using a messaging system, you typically have two scenarios you want to achieve. Either you want to:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;send a message to a targeted group of consumers (which might be just one consumer) or&lt;/li&gt;
&lt;li&gt;broadcast the message to all the consumers&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Kafka allows you to achieve both of these scenarios by using &lt;strong&gt;consumer groups&lt;/strong&gt;.&lt;/p&gt;

&lt;h2&gt;
  
  
  Consumer group
&lt;/h2&gt;

&lt;p&gt;A consumer group is a group of consumers (I guess you didn’t see this coming?) that share the same group id. When a topic is consumed by consumers in the same group, every record will be delivered to only one consumer. As the official documentation states: “If all the consumer instances have the same consumer group, then the records will effectively be load-balanced over the consumer instances.”&lt;/p&gt;

&lt;p&gt;This way you can ensure parallel processing of records from a topic and be sure that your consumers won’t be stepping on each other’s toes.&lt;/p&gt;

&lt;h3&gt;
  
  
  How does Kafka achieve this?
&lt;/h3&gt;

&lt;p&gt;Each topic consists of one or more partitions. When a new consumer is started, it will join a consumer group (this happens under the hood), and Kafka will then ensure that each partition is consumed by only one consumer from that group.&lt;/p&gt;

&lt;p&gt;So, if you have a topic with two partitions and only one consumer in a group, that consumer would consume records from both partitions.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--RXM2rWG---/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/i/30y6rc0bsd337nzd4lan.jpg" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--RXM2rWG---/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/i/30y6rc0bsd337nzd4lan.jpg" alt="Single consumer"&gt;&lt;/a&gt;A single consumer in a consumer group&lt;/p&gt;

&lt;p&gt;After another consumer joins the same group, each consumer will consume only one partition.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--6akXLQu5--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/i/8mkt13tryrjv7d98fbr6.jpg" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--6akXLQu5--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/i/8mkt13tryrjv7d98fbr6.jpg" alt="Multiple consumers in a group"&gt;&lt;/a&gt;Multiple consumers in one consumer group&lt;/p&gt;

&lt;h3&gt;
  
  
  Does it mean if I want to have more than one consumer (from the same group) reading from one topic I need to have more than one partition?
&lt;/h3&gt;

&lt;p&gt;That is correct. If you have more consumers in a group than you have partitions, extra consumers will sit idle, since all the partitions are taken. If you know that you will need many consumers to parallelize the processing, then plan accordingly with the number of partitions.&lt;/p&gt;
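&lt;p&gt;The one-consumer-per-partition invariant can be illustrated with a toy assignment function. This is a deliberately simplified round-robin; Kafka’s real assignors (range, round-robin, sticky) also handle rebalancing, but they uphold the same rule, including leaving surplus consumers idle:&lt;/p&gt;

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class AssignmentSketch {
    // Assign each partition to exactly one consumer in the group.
    // With more consumers than partitions, some get an empty list: idle.
    static Map<String, List<Integer>> assign(List<String> consumers, int partitions) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String c : consumers) assignment.put(c, new ArrayList<>());
        for (int p = 0; p < partitions; p++) {
            assignment.get(consumers.get(p % consumers.size())).add(p);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // 3 consumers, 2 partitions: consumer C gets nothing and sits idle
        System.out.println(assign(List.of("A", "B", "C"), 2));
        // 2 consumers, 4 partitions: both consumers read two partitions each
        System.out.println(assign(List.of("A", "B"), 4));
    }
}
```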

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--3jS6E40Y--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/i/f0h3c07isaz6y1rhwokz.jpg" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--3jS6E40Y--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/i/f0h3c07isaz6y1rhwokz.jpg" alt="Idle consumer"&gt;&lt;/a&gt;Additional consumers in a group sit idly&lt;/p&gt;

&lt;p&gt;When we talked about &lt;a href="https://codingharbour.com/apache-kafka/the-introduction-to-kafka-topics-and-partitions/"&gt;topics and partitions&lt;/a&gt;, I mentioned that a partition is a unit of parallelism from the consumer’s perspective. Now you know the reason – there is a direct link between the number of partitions and the number of consumers from a group reading in parallel.&lt;/p&gt;

&lt;h3&gt;
  
  
  What if I want to consume the same record from multiple consumers?
&lt;/h3&gt;

&lt;p&gt;That is also possible. You can have many consumers reading the same records from the topic, as long as they all have different group ids.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--bGqBBnKI--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/i/4p6mqvqu3v75rz57qpdn.jpg" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--bGqBBnKI--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/i/4p6mqvqu3v75rz57qpdn.jpg" alt="Multiple consumer groups"&gt;&lt;/a&gt;Multiple consumers reading the same records from the topic&lt;/p&gt;

&lt;h3&gt;
  
  
  An example to recap
&lt;/h3&gt;

&lt;p&gt;Let’s illustrate what we’ve been talking about with an example.&lt;/p&gt;

&lt;p&gt;Let’s say we’re building an online store that consists of a few microservices sending events to each other: a payment service, a shipping service, and a notification service. Once the payment service processes the payment, it will send a PaymentProcessed event as a record on a Kafka topic. We then want both the shipping service and the notification service to consume this record. The shipping service needs the record in order to start the shipping process, while the notification service wants to receive it so that it can send an email to the customer saying ‘&lt;em&gt;Your payment has been received&lt;/em&gt;’. In this case, we want the PaymentProcessed record to be broadcast to all the consumers.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--1fDwI-5a--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/i/la6aqug2r1apd5iin4oy.jpg" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--1fDwI-5a--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/i/la6aqug2r1apd5iin4oy.jpg" alt="Microservice communication through Kafka"&gt;&lt;/a&gt;Microservices communicating using Kafka&lt;/p&gt;

&lt;p&gt;Yet, if we have multiple instances of the consuming services, we always want exactly one of the instances to process each record. For example, we wouldn’t want multiple instances of the notification service to process the PaymentProcessed record and send multiple ‘Your payment has been received’ emails to the customer. Nor would we want multiple instances of shipping service to receive the same PaymentProcessed record and start the shipment process multiple times, potentially losing us money.&lt;/p&gt;

&lt;p&gt;To ensure the record reaches both the shipping and the notification service, but only once per service, we would put all the shipping service instances in one consumer group and all the notification service instances in another consumer group.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--xREBnsSH--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/i/si6lmplcfnirg0akqnpu.jpg" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--xREBnsSH--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/i/si6lmplcfnirg0akqnpu.jpg" alt="Records read by consumer groups"&gt;&lt;/a&gt;Each Kafka record will reach one shipping and one notification service&lt;/p&gt;

&lt;p&gt;This ensures that all the records are always read by both &lt;em&gt;shipping_group&lt;/em&gt; and &lt;em&gt;notification_group&lt;/em&gt;, but within those groups, one record will always go to only one instance. That’s what consumer groups enable us to do.&lt;/p&gt;

&lt;h2&gt;
  
  
  A consumer group and record offset
&lt;/h2&gt;

&lt;p&gt;If you remember when we talked about &lt;a href="https://codingharbour.com/apache-kafka/the-introduction-to-kafka-topics-and-partitions/"&gt;topics&lt;/a&gt;, we said that each record is uniquely identified by an offset in the partition. These offsets are used to track which record has been consumed by which consumer group.&lt;/p&gt;

&lt;p&gt;Kafka employs an approach of ‘a dumb pipeline, smart clients’, meaning that the consumers themselves are in charge of tracking which records have been consumed. Once the consumer reads a record, it will store its offset in a special Kafka topic called &lt;em&gt;__consumer_offsets&lt;/em&gt; (yes, those are two underscores at the beginning). When a consumer stores the offset in this topic, we say that it is &lt;strong&gt;committing the offset&lt;/strong&gt;.&lt;/p&gt;

&lt;p&gt;This enables consumers to always know which record should be consumed next from a given partition. Since the consumer offset is stored in Kafka, it means that the position of the consumer group is maintained even after restarts.&lt;/p&gt;
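&lt;p&gt;Conceptually, the committed offsets behave like a map keyed by group, topic and partition. Here is a toy in-memory sketch of that idea (the real mechanism is the compacted &lt;em&gt;__consumer_offsets&lt;/em&gt; topic, not a map, and a consumer with no committed offset falls back to its auto.offset.reset setting):&lt;/p&gt;

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetCommitSketch {
    // __consumer_offsets is, conceptually, a map from
    // (group, topic, partition) to the last committed offset
    private final Map<String, Long> committed = new HashMap<>();

    void commit(String group, String topic, int partition, long offset) {
        committed.put(group + "/" + topic + "/" + partition, offset);
    }

    // After a restart the consumer resumes from the committed offset;
    // with nothing committed, the real consumer would fall back to
    // auto.offset.reset -- here we simply default to 0
    long positionAfterRestart(String group, String topic, int partition) {
        return committed.getOrDefault(group + "/" + topic + "/" + partition, 0L);
    }

    public static void main(String[] args) {
        OffsetCommitSketch offsets = new OffsetCommitSketch();
        offsets.commit("shipping_group", "payments", 0, 42L);
        // Each group tracks its own position independently
        System.out.println(offsets.positionAfterRestart("shipping_group", "payments", 0));
        System.out.println(offsets.positionAfterRestart("notification_group", "payments", 0));
    }
}
```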

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--v39uWacd--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/i/7ejvjqbjanvpn4d9ez0j.jpg" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--v39uWacd--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/i/7ejvjqbjanvpn4d9ez0j.jpg" alt="Consumer offsets"&gt;&lt;/a&gt;An example of consumer offsets&lt;/p&gt;

&lt;p&gt;In the &lt;a href="https://codingharbour.com/apache-kafka/the-introduction-to-kafka-topics-and-partitions/"&gt;topic post&lt;/a&gt;, I also mentioned that records remain in the topic even after being consumed. This allows multiple consumers to consume the same message, but it also allows one more thing: the same consumer can re-consume the records it already read, by simply rewinding its consumer offset. This is very useful when you e.g. had a bug in your consumer and want to re-read the records after fixing the bug.&lt;/p&gt;

&lt;p&gt;And there you have it, Kafka consumer groups in a nutshell.&lt;/p&gt;

&lt;h2&gt;
  
  
  Would you like to learn more about Kafka?
&lt;/h2&gt;

&lt;p&gt;I have created a Kafka mini-course that you can get &lt;strong&gt;absolutely free&lt;/strong&gt;. Sign up for it over at &lt;a href="https://codingharbour.com/"&gt;Coding Harbour&lt;/a&gt;.&lt;/p&gt;

</description>
      <category>kafka</category>
      <category>introduction</category>
    </item>
    <item>
      <title>How to delete records from a Kafka topic</title>
      <dc:creator>Dejan Maric</dc:creator>
      <pubDate>Sat, 29 Feb 2020 16:41:08 +0000</pubDate>
      <link>https://dev.to/de_maric/how-to-delete-records-from-a-kafka-topic-464g</link>
      <guid>https://dev.to/de_maric/how-to-delete-records-from-a-kafka-topic-464g</guid>
      <description>&lt;p&gt;&lt;em&gt;Photo credit: &lt;a href="https://unsplash.com/@adliwahid"&gt;Adli Wahid&lt;/a&gt;&lt;/em&gt;&lt;/p&gt;

&lt;p&gt;Every now and then I get a request from my colleagues who would like to delete some or all the records from a Kafka topic. The request usually comes after someone has produced the wrong data in a test topic while playing around or due to a bug in the producer code. Or simply because they want a clean slate.&lt;/p&gt;

&lt;p&gt;Whatever the reason, today I’ll show you a few ways to delete some or all the records from a Kafka topic.&lt;/p&gt;

&lt;p&gt;It should go without saying that you should use your best judgment and check (at least) twice before using the methods described below in a production environment.&lt;/p&gt;

&lt;h2&gt;
  
  
  Kafka-delete-records
&lt;/h2&gt;

&lt;p&gt;This command is available as part of Kafka CLI tools. It requires two parameters:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;a bootstrap server and &lt;/li&gt;
&lt;li&gt;a JSON file, describing which records should be deleted. &lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;The command allows you to delete all the records from the beginning of a partition up to, but not including, the specified offset.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;NOTE: It is not possible to delete records in the middle of the topic.&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;The JSON file specifies one or more partitions from which we want to remove the records. Let’s create delete-records.json file as below:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;{
    "partitions": [
        {
            "topic": "my-topic",
            "partition": 0,
            "offset": 3
        }
    ],
    "version": 1
}
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Here we’ve specified that, for partition 0 of the topic “my-topic”, we want to delete all the records from the beginning up to offset 3.&lt;/p&gt;

&lt;p&gt;Now we’re ready to delete records. Execute:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;kafka-delete-records --bootstrap-server localhost:9092 \
--offset-json-file delete-records.json
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;After the command finishes, the start offset for partition 0 will be 3.&lt;/p&gt;

&lt;h2&gt;
  
  
  Deleting all the records in a topic
&lt;/h2&gt;

&lt;p&gt;&lt;strong&gt;NOTE: This will not work for compacted topics&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;If you want to prune all the messages, another option is to reduce the retention of the topic to a small value (e.g. 100ms), wait for the brokers to remove all the records from the topic and then set the topic retention to its original value. Here’s how to do it.&lt;/p&gt;

&lt;p&gt;First, set the retention.ms to 100 milliseconds.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;kafka-configs --zookeeper localhost:2181 \
--entity-type topics \
--entity-name my-topic \
--alter --add-config retention.ms=100
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Then, wait for the brokers to remove the messages with expired retention (that is, all of them). To know if the process is finished, check whether the start offset and the end offset of each partition are the same; when they are, there are no more records available on the topic. Depending on your setup, it might take a few minutes for Kafka to clean up the topic, so keep checking the start offset.&lt;/p&gt;

&lt;p&gt;Use the GetOffsetShell class to check the beginning and end offsets of a topic’s partitions. To check the end offset, set the &lt;strong&gt;time&lt;/strong&gt; parameter to -1:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;kafka-run-class kafka.tools.GetOffsetShell \
--broker-list localhost:9092 \
--topic my-topic \
--time -1
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;To check the start offset, use &lt;strong&gt;--time -2&lt;/strong&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;kafka-run-class kafka.tools.GetOffsetShell \
--broker-list localhost:9092 \
--topic my-topic \
--time -2
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;
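&lt;p&gt;If you script this check, the comparison boils down to: for every partition, has the start offset caught up with the end offset? A small sketch of that check, where the two maps stand in for the output of the two GetOffsetShell calls above:&lt;/p&gt;

```java
import java.util.Map;

public class PurgeCheckSketch {
    // The topic is fully purged when, for every partition, the start
    // offset (--time -2) equals the end offset (--time -1)
    static boolean isPurged(Map<Integer, Long> startOffsets, Map<Integer, Long> endOffsets) {
        return startOffsets.entrySet().stream()
                .allMatch(e -> e.getValue().equals(endOffsets.get(e.getKey())));
    }

    public static void main(String[] args) {
        // Both partitions caught up: nothing left to read
        System.out.println(isPurged(Map.of(0, 5L, 1, 3L), Map.of(0, 5L, 1, 3L)));
        // Partition 0 still holds records 0..6: keep waiting
        System.out.println(isPurged(Map.of(0, 0L), Map.of(0, 7L)));
    }
}
```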



&lt;p&gt;Once the topic has been purged, return the retention.ms to its original value:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;kafka-configs --zookeeper localhost:2181 \
--entity-type topics \
--entity-name my-topic \
--alter --add-config retention.ms=&amp;lt;ORIGINAL VALUE&amp;gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;h2&gt;
  
  
  Delete a topic and create it again
&lt;/h2&gt;

&lt;p&gt;Not as elegant as the previous two approaches, yet it might be an easier solution in some cases (e.g. if topic creation is scripted).&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;kafka-topics --bootstrap-server localhost:9092 \
--topic my-topic \
--delete
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Then create it again:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;kafka-topics --bootstrap-server localhost:9092 \
--topic my-topic \
--create \
--partitions &amp;lt;number_of_partitions&amp;gt; \
--replication-factor &amp;lt;replication_factor&amp;gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;h3&gt;
  
  
  A few things to be aware of when using this approach
&lt;/h3&gt;

&lt;p&gt;Make sure the deletion of topics is enabled in your cluster by setting &lt;strong&gt;delete.topic.enable=true&lt;/strong&gt; on the brokers. From Kafka 1.0.0 onwards this property is true by default.&lt;/p&gt;

&lt;p&gt;Make sure all consumers have stopped consuming the data from the topic you want to delete. Otherwise, they will throw errors like:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;Received unknown topic or partition error in fetch for partition my-topic-0
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;or&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;Error while fetching metadata with correlation id 123 : {my-topic=LEADER_NOT_AVAILABLE}
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;One more thing that might happen if you have consumers up and running is that the topic will be auto-created, if the cluster-wide property &lt;strong&gt;auto.create.topics.enable&lt;/strong&gt; is true (and by default it is). That’s not bad per se, but the topic will be created with the default number of partitions (1) and replication factor (1), which might not be what you wanted.&lt;/p&gt;

&lt;p&gt;The moral of the story is: make sure to stop your consumers before using this approach 🙂&lt;/p&gt;

&lt;h2&gt;
  
  
  Would you like to learn more about Kafka?
&lt;/h2&gt;

&lt;p&gt;I have created a Kafka mini-course that you can get &lt;strong&gt;absolutely free&lt;/strong&gt;. Sign up for it over at &lt;a href="https://codingharbour.com/"&gt;Coding Harbour&lt;/a&gt;.&lt;/p&gt;

</description>
      <category>kafka</category>
      <category>tutorial</category>
    </item>
    <item>
      <title>How to create a Kafka consumer in Java</title>
      <dc:creator>Dejan Maric</dc:creator>
      <pubDate>Sun, 24 Nov 2019 00:31:57 +0000</pubDate>
      <link>https://dev.to/de_maric/how-to-create-a-kafka-consumer-in-java-1563</link>
      <guid>https://dev.to/de_maric/how-to-create-a-kafka-consumer-in-java-1563</guid>
      <description>&lt;p&gt;&lt;em&gt;Photo credit: &lt;a href="https://unsplash.com/@kazukyakayashi"&gt;Kazuky Akayashi&lt;/a&gt;&lt;/em&gt;&lt;/p&gt;

&lt;p&gt;In this post, I’ll show you how to consume Kafka records in Java. We’ll read data from a topic called &lt;strong&gt;java_topic&lt;/strong&gt;. To test how our consumer is working, we’ll produce data using the Kafka CLI tool. And all this in under 5 minutes, so let’s jump right in.&lt;/p&gt;

&lt;h2&gt;
  
  
  Running a Kafka cluster locally
&lt;/h2&gt;

&lt;p&gt;We’ll use Docker Compose to run our local Kafka cluster. It will consist of one Zookeeper instance and one Kafka broker.&lt;/p&gt;

&lt;p&gt;&lt;em&gt;Don’t have docker-compose? Check: &lt;a href="https://docs.docker.com/compose/install/"&gt;how to install docker-compose&lt;/a&gt;&lt;/em&gt;&lt;/p&gt;

&lt;p&gt;I’ve prepared a docker-compose file which you can grab from Coding Harbour’s GitHub:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight shell"&gt;&lt;code&gt;git clone https://github.com/codingharbour/kafka-docker-compose.git
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Once you have the project, navigate to a folder called &lt;em&gt;single-node-kafka&lt;/em&gt; and start the Kafka cluster:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight shell"&gt;&lt;code&gt;docker-compose up &lt;span class="nt"&gt;-d&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;The output should look something like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight shell"&gt;&lt;code&gt;Creating network &lt;span class="s2"&gt;"single-node-kafka_default"&lt;/span&gt; with the default driver
Creating sn-zookeeper ... &lt;span class="k"&gt;done
&lt;/span&gt;Creating sn-kafka     ... &lt;span class="k"&gt;done&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Your local Kafka cluster is now ready to be used. By running &lt;strong&gt;docker-compose ps&lt;/strong&gt;, we can see that the Kafka broker is available on port 9092. Make a note of that, because we’ll need it soon.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;docker-compose ps
    Name                Command            State              Ports
&lt;span class="nt"&gt;-------------------------------------------------------------------------------&lt;/span&gt;
sn-kafka       /etc/confluent/docker/run   Up      0.0.0.0:9092-&amp;gt;9092/tcp
sn-zookeeper   /etc/confluent/docker/run   Up      2181/tcp, 2888/tcp, 3888/tcp
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;h2&gt;
  
  
  Dependencies
&lt;/h2&gt;

&lt;p&gt;To be able to consume records from Kafka we need the Kafka client library. Add this dependency to your pom.xml:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight xml"&gt;&lt;code&gt;&lt;span class="nt"&gt;&amp;lt;dependency&amp;gt;&lt;/span&gt;
    &lt;span class="nt"&gt;&amp;lt;groupId&amp;gt;&lt;/span&gt;org.apache.kafka&lt;span class="nt"&gt;&amp;lt;/groupId&amp;gt;&lt;/span&gt;
    &lt;span class="nt"&gt;&amp;lt;artifactId&amp;gt;&lt;/span&gt;kafka-clients&lt;span class="nt"&gt;&amp;lt;/artifactId&amp;gt;&lt;/span&gt;
    &lt;span class="nt"&gt;&amp;lt;version&amp;gt;&lt;/span&gt;2.4.1&lt;span class="nt"&gt;&amp;lt;/version&amp;gt;&lt;/span&gt;
&lt;span class="nt"&gt;&amp;lt;/dependency&amp;gt;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Check the latest version of the kafka-clients library at the &lt;a href="https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients"&gt;Maven repository&lt;/a&gt;. At the time of writing it was 2.4.1.&lt;/p&gt;

&lt;h2&gt;
  
  
  Create a Kafka topic
&lt;/h2&gt;

&lt;p&gt;To create a topic we’ll use a Kafka CLI tool called &lt;strong&gt;kafka-topics&lt;/strong&gt;, which comes bundled with the Kafka binaries. In our case, that means the tool is available in the Docker container named &lt;em&gt;sn-kafka&lt;/em&gt;.&lt;/p&gt;

&lt;p&gt;First, open your favourite terminal and connect to the running Kafka container:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight shell"&gt;&lt;code&gt;docker &lt;span class="nb"&gt;exec&lt;/span&gt; &lt;span class="nt"&gt;-it&lt;/span&gt; sn-kafka /bin/bash
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Now that we’re inside the container where we have access to Kafka CLI tools, let’s create our topic, called &lt;em&gt;java_topic&lt;/em&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight shell"&gt;&lt;code&gt;kafka-topics &lt;span class="nt"&gt;--bootstrap-server&lt;/span&gt; localhost:9092 &lt;span class="se"&gt;\&lt;/span&gt;
&lt;span class="nt"&gt;--create&lt;/span&gt; &lt;span class="nt"&gt;--topic&lt;/span&gt; java_topic &lt;span class="se"&gt;\&lt;/span&gt;
&lt;span class="nt"&gt;--partitions&lt;/span&gt; 1 &lt;span class="se"&gt;\&lt;/span&gt;
&lt;span class="nt"&gt;--replication-factor&lt;/span&gt; 1
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;h2&gt;
  
  
  Creating a Kafka consumer
&lt;/h2&gt;

&lt;p&gt;There are a couple of properties we need to set for the Kafka consumer to work properly:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="nc"&gt;Properties&lt;/span&gt; &lt;span class="n"&gt;properties&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;Properties&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ConsumerConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;BOOTSTRAP_SERVERS_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"localhost:9092"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ConsumerConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;KEY_DESERIALIZER_CLASS_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;StringDeserializer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;class&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ConsumerConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;VALUE_DESERIALIZER_CLASS_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;StringDeserializer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;class&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ConsumerConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;GROUP_ID_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"my-first-consumer-group"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ConsumerConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;AUTO_OFFSET_RESET_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"earliest"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ConsumerConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;ENABLE_AUTO_COMMIT_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="kc"&gt;false&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

&lt;span class="nc"&gt;Consumer&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;consumer&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;KafkaConsumer&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&amp;gt;(&lt;/span&gt;&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;First, we specify the address where our Kafka broker is running by configuring the &lt;em&gt;BOOTSTRAP_SERVERS_CONFIG&lt;/em&gt; property. When you have more than one broker (which is the usual scenario), you would specify them as a comma-separated string, e.g.:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;"serverA:9092,serverB:9092,serverC:9092"
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Second, the consumer needs to know how to deserialize the records from a byte array. Kafka brokers are agnostic of the data types and treat records as byte arrays. This means that producers need to know how to serialize data into byte arrays and consumers need to know how to deserialize them back.&lt;/p&gt;

&lt;p&gt;Each Kafka record consists of a key and a value. These can potentially be of different types, so the consumer needs to know which deserializer to use for the key and which one to use for the value. That’s where &lt;em&gt;KEY_DESERIALIZER_CLASS_CONFIG&lt;/em&gt; and &lt;em&gt;VALUE_DESERIALIZER_CLASS_CONFIG&lt;/em&gt; come into play. In our case, we’re using String for both the key and the value.&lt;/p&gt;
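Conceptually, StringSerializer and StringDeserializer are little more than UTF-8 encode and decode. Here is a minimal sketch of that round trip in plain Java (no Kafka client involved; the class and method names are invented for illustration):

```java
import java.nio.charset.StandardCharsets;

public class SerdeSketch {
    // what StringSerializer does on the producer side: UTF-8 encode
    static byte[] serialize(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    // what StringDeserializer does on the consumer side: UTF-8 decode
    static String deserialize(byte[] wire) {
        return new String(wire, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String original = "hello kafka";
        byte[] onTheWire = serialize(original);      // all the broker ever sees
        System.out.println(deserialize(onTheWire));  // prints: hello kafka
    }
}
```

For anything richer than strings (JSON, Avro, Protobuf) the same contract holds: the producer's serializer and the consumer's deserializer must agree on the byte format.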

&lt;p&gt;The next property is a consumer group id. It’s a unique identifier of a group of consumers and is used, among other things, to track which records have been consumed by the consumers in this group. If you want to learn more about consumer groups, check &lt;a href="https://dev.to/apache-kafka/what-is-a-consumer-group-in-kafka?"&gt;this post&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;The property &lt;em&gt;AUTO_OFFSET_RESET_CONFIG&lt;/em&gt; tells the consumer where it should start consuming messages the first time it starts. By default, a consumer will only consume messages that arrive in the topic &lt;strong&gt;after&lt;/strong&gt; the consumer is started for the first time. By setting the value to “earliest” we tell the consumer to read all the records that already exist in the topic.&lt;/p&gt;

&lt;p&gt;By default, the Kafka consumer commits the offset periodically. The last property, &lt;em&gt;ENABLE_AUTO_COMMIT_CONFIG&lt;/em&gt;, tells the consumer that we’ll handle committing the offset in the code instead. I’ll show you how to do that soon.&lt;/p&gt;

&lt;h2&gt;
  
  
  Consuming the records
&lt;/h2&gt;

&lt;p&gt;Now that we have our consumer configured and created, it’s time to consume some data. In Kafka, producers push data to topics and consumers frequently poll the topic(s) for new records. So the usual pattern is to poll in an endless while loop and process new records as they arrive. But before we can poll the topic for records, we need to subscribe our consumer to one or more topics:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="n"&gt;consumer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;subscribe&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Collections&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;singleton&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"java_topic"&lt;/span&gt;&lt;span class="o"&gt;));&lt;/span&gt;

&lt;span class="k"&gt;while&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="kc"&gt;true&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="nc"&gt;ConsumerRecords&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;records&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;consumer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;poll&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Duration&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;ofMillis&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;100&lt;/span&gt;&lt;span class="o"&gt;));&lt;/span&gt;

    &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ConsumerRecord&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;record&lt;/span&gt; &lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="n"&gt;records&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="nc"&gt;System&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;out&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;println&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Message received: "&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;value&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;
    &lt;span class="n"&gt;consumer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;commitAsync&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;&lt;em&gt;consumer.poll(Duration)&lt;/em&gt; will return immediately if there are records available. Otherwise, it will block until either a record arrives or the timeout expires. If the timeout expires, the poll method returns an empty record set.&lt;/p&gt;

&lt;p&gt;The record processing happens in the for loop. Here we simply print the value of each record to system out.&lt;/p&gt;

&lt;p&gt;Remember how we told the consumer library that we’ll be committing the offset manually? That’s what the &lt;em&gt;consumer.commitAsync()&lt;/em&gt; line is doing. You want to commit the offset after you’ve finished processing the messages, not before.&lt;/p&gt;
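This process-first, commit-later ordering is what gives you at-least-once delivery: if the application dies between processing a record and committing its offset, the restarted consumer simply re-reads that record. A toy simulation of that failure mode (plain Java, no broker; the names are invented for illustration):

```java
public class CommitOrderSketch {
    // Simulates one consumer iteration that may crash between processing a
    // record and committing its offset. Returns {processedCount, committedOffset}.
    static int[] processThenCommit(boolean crashBeforeCommit) {
        int processed = 0, committed = 0;
        processed++;                                 // handle the record first...
        if (crashBeforeCommit) {
            return new int[] {processed, committed}; // crash: offset never committed
        }
        committed++;                                 // ...then commit its offset
        return new int[] {processed, committed};
    }

    public static void main(String[] args) {
        int[] state = processThenCommit(true);
        // committed is still 0, so a restarted consumer re-reads the record:
        System.out.println("processed=" + state[0] + " committed=" + state[1]);
    }
}
```

Duplicates are therefore possible, but no record is ever skipped; if your processing isn’t idempotent, you’ll need to handle the re-delivery yourself.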

&lt;p&gt;Now that we have our consumer in place, let’s produce some messages to test our consumer.&lt;/p&gt;

&lt;h2&gt;
  
  
  Producing messages for our consumer
&lt;/h2&gt;

&lt;p&gt;To produce messages to this topic, we’ll use &lt;em&gt;kafka-console-producer&lt;/em&gt;, a CLI tool that comes bundled with the Apache Kafka binaries (you can also download it from the official website). Since we’re using Kafka’s Docker image, though, the CLI tools are already available in the Kafka broker’s container. To use the tool, we first need to connect to the container called &lt;em&gt;sn-kafka&lt;/em&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight shell"&gt;&lt;code&gt;docker &lt;span class="nb"&gt;exec&lt;/span&gt; &lt;span class="nt"&gt;-it&lt;/span&gt; sn-kafka /bin/bash
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Now we can produce some messages. If you check the code in the previous section, you will see that our consumer is waiting for messages to arrive on a topic called &lt;strong&gt;java_topic&lt;/strong&gt;, so let’s produce to that topic:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight shell"&gt;&lt;code&gt;kafka-console-producer &lt;span class="nt"&gt;--broker-list&lt;/span&gt; localhost:9092 &lt;span class="nt"&gt;--topic&lt;/span&gt; java_topic
&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;The producer tool will display a prompt, showing us that it is waiting for a message to send. Type the message and send it by pressing Enter.&lt;/p&gt;

&lt;h2&gt;
  
  
  Run the consumer
&lt;/h2&gt;

&lt;p&gt;Now that the record is ready in the Kafka topic, we can start our consumer and check the output. The message we sent should quickly appear. You can go back to the terminal where kafka-console-producer is running, send a few more messages, and then check the consumer to see them consumed.&lt;/p&gt;

&lt;h2&gt;
  
  
  Source code
&lt;/h2&gt;

&lt;p&gt;Source code for this example is available on &lt;a href="https://github.com/codingharbour/kafka-quick-start/blob/master/src/main/java/com/codingharbour/kafka/consumer/SimpleKafkaConsumer.java"&gt;Coding Harbour’s GitHub&lt;/a&gt;.&lt;/p&gt;

&lt;h2&gt;
  
  
  Would you like to learn more about Kafka?
&lt;/h2&gt;

&lt;p&gt;I have created a Kafka mini-course that you can get &lt;strong&gt;absolutely free&lt;/strong&gt;. Sign up for it over at &lt;a href="https://codingharbour.com/"&gt;Coding Harbour&lt;/a&gt;.&lt;/p&gt;

</description>
      <category>apachekafka</category>
      <category>tutorial</category>
    </item>
    <item>
      <title>How to create a Kafka producer in Java</title>
      <dc:creator>Dejan Maric</dc:creator>
      <pubDate>Sun, 24 Nov 2019 00:28:49 +0000</pubDate>
      <link>https://dev.to/de_maric/how-to-create-a-kafka-producer-in-java-ap9</link>
      <guid>https://dev.to/de_maric/how-to-create-a-kafka-producer-in-java-ap9</guid>
      <description>&lt;p&gt;&lt;em&gt;Photo credit: &lt;a href="https://unsplash.com/@davealmine"&gt;Dawid Zawiła&lt;/a&gt;&lt;/em&gt;&lt;/p&gt;

&lt;p&gt;In this post, I will show you how to produce messages to Kafka from Java using the kafka-clients library. It takes less than 5 minutes and around 10 lines of code. Don’t believe me? Keep reading.&lt;/p&gt;

&lt;h2&gt;
  
  
  Running a Kafka cluster locally
&lt;/h2&gt;

&lt;p&gt;To be able to produce messages (known in the Kafka world as &lt;strong&gt;records&lt;/strong&gt;) to Kafka we need a Kafka cluster. At a minimum, a Kafka cluster consists of one Kafka server (called a broker) and at least one Zookeeper node.&lt;/p&gt;

&lt;p&gt;To simplify our job, we will run these two servers as Docker containers, using docker-compose.&lt;/p&gt;

&lt;p&gt;&lt;em&gt;Don’t have docker-compose? Check: &lt;a href="https://docs.docker.com/compose/install/"&gt;how to install docker-compose&lt;/a&gt;&lt;/em&gt;&lt;/p&gt;

&lt;p&gt;I’ve prepared a docker-compose file which you can grab from Coding Harbour’s GitHub:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight shell"&gt;&lt;code&gt;git clone https://github.com/codingharbour/kafka-docker-compose.git
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Once you have the project, navigate to a folder called &lt;em&gt;single-node-kafka&lt;/em&gt; and start the Kafka cluster:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight shell"&gt;&lt;code&gt;docker-compose up &lt;span class="nt"&gt;-d&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;The output should look something like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight shell"&gt;&lt;code&gt;Creating network &lt;span class="s2"&gt;"single-node-kafka_default"&lt;/span&gt; with the default driver
Creating sn-zookeeper ... &lt;span class="k"&gt;done
&lt;/span&gt;Creating sn-kafka     ... &lt;span class="k"&gt;done&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Your local Kafka cluster is now ready to be used. By running &lt;strong&gt;docker-compose ps&lt;/strong&gt;, we can see that the Kafka broker is available on port 9092. Make a note of that, because we’ll need it soon.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;docker-compose ps
    Name                Command            State              Ports
&lt;span class="nt"&gt;-------------------------------------------------------------------------------&lt;/span&gt;
sn-kafka       /etc/confluent/docker/run   Up      0.0.0.0:9092-&amp;gt;9092/tcp
sn-zookeeper   /etc/confluent/docker/run   Up      2181/tcp, 2888/tcp, 3888/tcp
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;h2&gt;
  
  
  Dependencies
&lt;/h2&gt;

&lt;p&gt;To be able to write records to Kafka we need the Kafka client library. Add the dependency to your pom.xml:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight xml"&gt;&lt;code&gt;&lt;span class="nt"&gt;&amp;lt;dependency&amp;gt;&lt;/span&gt;
    &lt;span class="nt"&gt;&amp;lt;groupId&amp;gt;&lt;/span&gt;org.apache.kafka&lt;span class="nt"&gt;&amp;lt;/groupId&amp;gt;&lt;/span&gt;
    &lt;span class="nt"&gt;&amp;lt;artifactId&amp;gt;&lt;/span&gt;kafka-clients&lt;span class="nt"&gt;&amp;lt;/artifactId&amp;gt;&lt;/span&gt;
    &lt;span class="nt"&gt;&amp;lt;version&amp;gt;&lt;/span&gt;2.4.1&lt;span class="nt"&gt;&amp;lt;/version&amp;gt;&lt;/span&gt;
&lt;span class="nt"&gt;&amp;lt;/dependency&amp;gt;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Check the latest version of the kafka-clients library at the &lt;a href="https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients"&gt;Maven repository&lt;/a&gt;. At the time of writing it was 2.4.1.&lt;/p&gt;

&lt;h2&gt;
  
  
  Creating the Kafka producer
&lt;/h2&gt;

&lt;p&gt;To create a Kafka producer we need to provide it with a few mandatory properties. I’ll explain them below:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="nc"&gt;Properties&lt;/span&gt; &lt;span class="n"&gt;properties&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;Properties&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ProducerConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;BOOTSTRAP_SERVERS_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"localhost:9092"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ProducerConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;KEY_SERIALIZER_CLASS_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;StringSerializer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;class&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ProducerConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;VALUE_SERIALIZER_CLASS_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;StringSerializer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;class&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

&lt;span class="nc"&gt;Producer&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;producer&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;KafkaProducer&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&amp;gt;(&lt;/span&gt;&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;First, the producer needs to know how to reach the Kafka broker. We specify the broker’s address using the &lt;em&gt;ProducerConfig&lt;/em&gt;.&lt;em&gt;BOOTSTRAP_SERVERS_CONFIG&lt;/em&gt; property. When you have more than one broker (which is always the case in production), you would specify them as a comma-separated string, e.g.:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;"serverA:9092,serverB:9092,serverC:9092"
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Second, the producer needs to know how to serialize the records to a byte array. Kafka brokers are agnostic of the data types we’re sending and treat every record as an array of bytes. This means that producers need to know how to serialize data into byte arrays and consumers need to know how to deserialize them back.&lt;/p&gt;

&lt;p&gt;Each Kafka record consists of a key and a value. These can potentially be of different types, so the producer needs to know which serializer to use for key and which one to use for value. That’s where &lt;em&gt;KEY_SERIALIZER_CLASS_CONFIG&lt;/em&gt; and &lt;em&gt;VALUE_SERIALIZER_CLASS_CONFIG&lt;/em&gt; come into play. In our example, we’ll be sending strings, so we’re configuring StringSerializer.&lt;/p&gt;

&lt;h2&gt;
  
  
  Producing the record
&lt;/h2&gt;

&lt;p&gt;Ok, it’s time to produce some data. Our producer code will write one record each time we run it, and the record will contain a string with the time at which it was created.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;recordValue&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"Current time is "&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="nc"&gt;Instant&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;now&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;toString&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;

&lt;span class="nc"&gt;ProducerRecord&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;record&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;ProducerRecord&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&amp;gt;(&lt;/span&gt;&lt;span class="s"&gt;"javatopic"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;recordValue&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

&lt;span class="n"&gt;producer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;send&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;span class="n"&gt;producer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;flush&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;The first parameter we need to set when creating the ProducerRecord is the topic to which we’re writing the record. Then we set the key and the value of the record. The key is optional, and in this case we won’t be setting it. The value is, as mentioned, the string with the timestamp of when the record was created. After everything is set, we call the send method of the producer.&lt;/p&gt;

&lt;p&gt;The last line forces the producer to write the record to the topic right away. You would expect a call to &lt;em&gt;producer.send(record)&lt;/em&gt; to write data to Kafka immediately, but under the hood the records are queued and sent in batches. This is how the Kafka producer optimizes for throughput and latency, since network operations are expensive. Yet, since we’re only producing a single message before shutting down our application, we want to tell the producer to send it right away.&lt;/p&gt;
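To see why the flush matters, here is a toy model of that buffering behaviour in plain Java (this is not the real KafkaProducer API, just the idea): send() only appends to an in-memory batch, and nothing reaches the "broker" until the batch is drained.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of producer batching; class and field names are invented.
public class BatchingSketch {
    final List<String> batch = new ArrayList<>();     // records queued locally
    final List<String> delivered = new ArrayList<>(); // records "on the broker"

    void send(String record) { batch.add(record); }   // queues, does not deliver
    void flush() {                                    // drains the batch
        delivered.addAll(batch);
        batch.clear();
    }

    public static void main(String[] args) {
        BatchingSketch producer = new BatchingSketch();
        producer.send("Current time is ...");
        System.out.println("delivered before flush: " + producer.delivered.size()); // 0
        producer.flush();
        System.out.println("delivered after flush: " + producer.delivered.size());  // 1
    }
}
```

In the real client, batches are also drained in the background (tuned by producer settings such as linger.ms and batch.size), which is why long-running producers rarely need to call flush() explicitly.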

&lt;p&gt;And that’s it. If you now run the application, you will produce a message to Kafka.&lt;/p&gt;

&lt;p&gt;Have you done so? Ok, then let’s read the message, to ensure it’s really there 🙂&lt;/p&gt;

&lt;h2&gt;
  
  
  Check the record is in Kafka topic
&lt;/h2&gt;

&lt;p&gt;We’ll use the &lt;em&gt;kafka-console-consumer&lt;/em&gt; utility to validate that our message was written to the topic.&lt;/p&gt;

&lt;p&gt;&lt;em&gt;kafka-console-consumer&lt;/em&gt; is a CLI tool that is part of the Apache Kafka binaries, which you can download from the official website. Since we’re using Kafka’s Docker image, though, the CLI tools are already available in the Kafka broker’s container. To use the tool, we first need to connect to the container called &lt;em&gt;sn-kafka&lt;/em&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;docker exec -it sn-kafka /bin/bash
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Now, run &lt;em&gt;kafka-console-consumer&lt;/em&gt; using the following command:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;kafka-console-consumer --bootstrap-server localhost:9092 --topic javatopic --from-beginning
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;After a few moments you should see the message.&lt;/p&gt;

&lt;p&gt;Congratulations, you have produced a message to Kafka from Java, and it only took a few lines of code 🙂&lt;/p&gt;

&lt;h2&gt;
  
  
  Source code
&lt;/h2&gt;

&lt;p&gt;The entire example is available on &lt;a href="https://github.com/codingharbour/kafka-quick-start/blob/master/src/main/java/com/codingharbour/kafka/producer/SimpleKafkaProducer.java"&gt;Coding Harbour’s github&lt;/a&gt;.&lt;/p&gt;

&lt;h2&gt;
  
  
  Would you like to learn more about Kafka?
&lt;/h2&gt;

&lt;p&gt;I have created a Kafka mini-course that you can get &lt;strong&gt;absolutely free&lt;/strong&gt;. Sign up for it over at &lt;a href="https://codingharbour.com/"&gt;Coding Harbour&lt;/a&gt;.&lt;/p&gt;

</description>
      <category>apachekafka</category>
      <category>tutorial</category>
    </item>
    <item>
      <title>How to get started with Apache Kafka in 5 minutes</title>
      <dc:creator>Dejan Maric</dc:creator>
      <pubDate>Sun, 24 Nov 2019 00:13:13 +0000</pubDate>
      <link>https://dev.to/de_maric/how-to-get-started-with-apache-kafka-in-5-minutes-18k5</link>
      <guid>https://dev.to/de_maric/how-to-get-started-with-apache-kafka-in-5-minutes-18k5</guid>
      <description>&lt;p&gt;&lt;em&gt;Photo credit:&lt;a href="https://unsplash.com/@tateisimikito"&gt;Jukan Tateisi&lt;/a&gt;&lt;/em&gt;&lt;/p&gt;

&lt;p&gt;In this post, I will show you how to run an Apache Kafka cluster on your machine and how to:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;create a topic that will keep the data&lt;/li&gt;
&lt;li&gt;produce messages to the topic, and&lt;/li&gt;
&lt;li&gt;consume those records&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;using only the CLI tools that come with Apache Kafka. All this in 5 minutes.&lt;/p&gt;

&lt;p&gt;Ok, are you ready? Let’s get started!&lt;/p&gt;

&lt;h2&gt;
  
  
  Running a Kafka cluster locally
&lt;/h2&gt;

&lt;p&gt;To be able to produce messages to and consume them from Kafka we need… uhm, a Kafka cluster. At a minimum, a Kafka cluster consists of one Kafka server (called a broker) and at least one ZooKeeper node.&lt;/p&gt;

&lt;p&gt;To simplify our job, we will run these two servers using docker-compose.&lt;/p&gt;

&lt;p&gt;Don’t have docker-compose? Check: &lt;a href="https://docs.docker.com/compose/install/"&gt;how to install docker-compose&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;I’ve prepared a docker-compose file which you can grab from Coding Harbour’s GitHub:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;git clone https://github.com/codingharbour/kafka-docker-compose.git
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Once you have the project, navigate to a folder called &lt;em&gt;single-node-kafka&lt;/em&gt; and start the Kafka cluster:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;docker-compose up -d
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;The output should look something like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;Creating network "single-node-kafka_default" with the default driver
Creating sn-zookeeper ... done
Creating sn-kafka ... done
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Your local Kafka cluster is now ready to be used. By running &lt;strong&gt;docker-compose ps&lt;/strong&gt;, as shown below, we can see that the Kafka broker is available on port 9092.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;$ docker-compose ps
    Name                Command            State              Ports
-------------------------------------------------------------------------------
sn-kafka       /etc/confluent/docker/run   Up      0.0.0.0:9092-&amp;gt;9092/tcp
sn-zookeeper   /etc/confluent/docker/run   Up      2181/tcp, 2888/tcp, 3888/tcp
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;h2&gt;
  
  
  Create a topic
&lt;/h2&gt;

&lt;p&gt;Now that we have our cluster up and running, we’ll create a topic.&lt;/p&gt;

&lt;p&gt;A topic is a way to organize messages. A producer always sends messages to a particular topic, and consumers always read messages from a particular topic.&lt;/p&gt;

&lt;p&gt;To create a topic we’ll use a Kafka CLI tool called &lt;strong&gt;kafka-topics&lt;/strong&gt;, which comes with Kafka. In our case, this means the tool is available in the Docker container named &lt;em&gt;sn-kafka&lt;/em&gt;.&lt;/p&gt;

&lt;p&gt;First, open your favourite terminal and connect to the running Kafka container:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;docker exec -it sn-kafka /bin/bash
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Now that we’re inside the container where we have access to Kafka CLI tools, let’s create our topic, called &lt;em&gt;first_topic&lt;/em&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;kafka-topics --bootstrap-server localhost:9092 \
--create --topic first_topic \
--partitions 1 \
--replication-factor 1
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Check that the topic is created by listing all the topics:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;kafka-topics --bootstrap-server localhost:9092 --list
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;The output should resemble the one below:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;__consumer_offsets
first_topic
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;h2&gt;
  
  
  Produce messages
&lt;/h2&gt;

&lt;p&gt;Next, let’s produce a message to the Kafka topic we just created. For this, we will use a Kafka command-line tool called &lt;strong&gt;kafka-console-producer&lt;/strong&gt;.&lt;/p&gt;

&lt;p&gt;In the same terminal window run &lt;em&gt;kafka-console-producer&lt;/em&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;kafka-console-producer --broker-list localhost:9092 --topic first_topic
&amp;gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;The producer tool will display a prompt, showing us that it is waiting for a message to send. Type the message and send it by pressing Enter.&lt;/p&gt;

&lt;p&gt;Congratulations, you have successfully produced the message to the topic called &lt;em&gt;first_topic&lt;/em&gt;. Now it’s time to consume that message.&lt;/p&gt;

&lt;h2&gt;
  
  
  Consuming messages
&lt;/h2&gt;

&lt;p&gt;As with producing, we’ll use a CLI tool already available in the Kafka broker’s Docker container. The tool is called &lt;strong&gt;kafka-console-consumer&lt;/strong&gt;.&lt;/p&gt;

&lt;p&gt;Let’s start by opening a new tab/window of the terminal and connecting to Kafka broker’s container:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;docker exec -it sn-kafka /bin/bash
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Now, run &lt;em&gt;kafka-console-consumer&lt;/em&gt; using the following command:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;kafka-console-consumer --bootstrap-server localhost:9092 \
--topic first_topic
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Aaaand… nothing happens. You're probably wondering where the message you just produced is. Unless told otherwise, a Kafka consumer reads only new messages (i.e. those arriving in the topic after the consumer is started).&lt;/p&gt;

&lt;p&gt;Let's fix that. Stop the consumer by pressing Ctrl+C.&lt;/p&gt;

&lt;p&gt;To read the messages that existed in the topic before we started the consumer, we must add the &lt;em&gt;--from-beginning&lt;/em&gt; parameter to the kafka-console-consumer command:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;kafka-console-consumer --bootstrap-server localhost:9092 \
--topic first_topic \
--from-beginning
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;And (after a few seconds) there’s our message. If you now go back to the producer window and send a few more messages, you will see them immediately appear in the consumer window.&lt;/p&gt;
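&lt;p&gt;As a side note, if you later consume from code instead of the CLI, the closest equivalent of &lt;em&gt;--from-beginning&lt;/em&gt; for a consumer group with no committed offsets is the consumer’s &lt;em&gt;auto.offset.reset&lt;/em&gt; property (an illustrative config fragment, not part of this tutorial’s setup):&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;# start from the earliest available offset when the group has no committed offset
auto.offset.reset=earliest
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;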

&lt;p&gt;Congratulations, you’ve now managed to send messages to a Kafka topic called &lt;em&gt;first_topic&lt;/em&gt; and to consume messages, both old and new, from the same topic using only CLI tools provided with Kafka.&lt;/p&gt;

&lt;p&gt;You can now stop the producer and the consumer by pressing &lt;strong&gt;Ctrl+C&lt;/strong&gt;.&lt;/p&gt;

&lt;h2&gt;
  
  
  Would you like to learn more about Kafka?
&lt;/h2&gt;

&lt;p&gt;I have created a Kafka mini-course that you can get &lt;strong&gt;absolutely free&lt;/strong&gt;. Sign up for it over at &lt;a href="https://codingharbour.com/"&gt;Coding Harbour&lt;/a&gt;.&lt;/p&gt;

</description>
      <category>kafka</category>
      <category>tutorial</category>
      <category>docker</category>
    </item>
  </channel>
</rss>
