<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: Zeliot </title>
    <description>The latest articles on DEV Community by Zeliot  (zeliotofficial).</description>
    <link>https://dev.to/zeliotofficial</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.us-east-2.amazonaws.com%2Fuploads%2Forganization%2Fprofile_image%2F2710%2Fd31008f8-2415-45c7-81bb-fbc19ad30ca7.png</url>
      <title>DEV Community: Zeliot </title>
      <link>https://dev.to/zeliotofficial</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/zeliotofficial"/>
    <language>en</language>
    <item>
      <title>Unlock Kafka Schemas with Karapace: A Hands-On Guide</title>
      <dc:creator>Sachin Kamath</dc:creator>
      <pubDate>Thu, 18 Jun 2026 11:29:21 +0000</pubDate>
      <link>https://dev.to/zeliotofficial/unlock-kafka-schemas-with-karapace-a-hands-on-guide-4kp6</link>
      <guid>https://dev.to/zeliotofficial/unlock-kafka-schemas-with-karapace-a-hands-on-guide-4kp6</guid>
      <description>&lt;h2&gt;
  
  
  TL;DR
&lt;/h2&gt;

&lt;p&gt;Kafka handles event streaming: producers write to topics and consumers read from them. JSON parsing becomes a bottleneck at scale, so this guide uses the Karapace Schema Registry (an open-source alternative to the Confluent Schema Registry) with Protobuf for compact, schema-enforced serialization. It walks through a Java Spring Boot producer and consumer setup: compiling .proto schemas, using TopicNameStrategy, and configuring SCRAM and Basic authentication, all tested on Condense's Kafka + Karapace. The approach suits structured data such as telematics; avoid it for ad-hoc data.&lt;/p&gt;

&lt;h2&gt;
  
  
  A Brief Introduction to Kafka
&lt;/h2&gt;

&lt;p&gt;Kafka is a distributed system consisting of servers and clients that communicate over TCP. It is used as an event streaming system: it records events, that is, facts about something that has happened.&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;For example "Transaction of 500 USD has occurred in Alice's account."&lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;Events are written to Kafka by client applications called Producers and read by client applications called Consumers. Kafka lets us create topics, which group events according to the user's requirements. One or more producers can write events/messages to a single topic, and one or more consumers can read from it.&lt;/p&gt;

&lt;h2&gt;
  
  
  Common Throughput Issues When Operating at Scale
&lt;/h2&gt;

&lt;p&gt;Kafka itself stores and transfers opaque bytes, and many producers and consumers simply exchange event data as UTF-8 strings. Even though Kafka can compress the data using gzip and other algorithms, this adds overhead on producers and consumers, which must decompress the data and then validate it. Most events are sent as JSON, which is helpful for human readability but slows down the whole pipeline when high throughput is needed. The effects are usually felt only on production clusters, where the incoming data rate is high and consumers start lagging due to complex JSON deserialization, especially when nested JSON is present.&lt;/p&gt;

&lt;p&gt;To overcome this issue, we need a different serialization format that reduces the time taken to construct or parse the binary data and ensures that the data follows a defined schema. We can either write our own serializer and deserializer or use the open-source libraries from Confluent. Confluent offers JSON, Avro, and Protobuf serdes (serializers and deserializers) to compact the data and enforce a schema on it.&lt;/p&gt;
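
&lt;p&gt;To make the size difference concrete, here is a minimal sketch that encodes one telematics reading as JSON text and as a simple fixed-width binary layout. The class name, field values, and the binary layout are assumptions for illustration only; this is not the Protobuf encoding, just a rough picture of why binary formats tend to be smaller and cheaper to parse than text.&lt;/p&gt;

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of Kafka or Karapace.
public class PayloadSizeSketch {

    // Size in bytes of a hand-built JSON document for one reading.
    public static int jsonSize(String carId, double speed, double latitude, double longitude) {
        String json = "{\"carId\":\"" + carId + "\",\"speed\":" + speed
                + ",\"latitude\":" + latitude + ",\"longitude\":" + longitude + "}";
        return json.getBytes(StandardCharsets.UTF_8).length;
    }

    // Size in bytes of the same reading in an assumed binary layout:
    // a length-prefixed string followed by three 8-byte doubles.
    public static int binarySize(String carId, double speed, double latitude, double longitude) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (DataOutputStream data = new DataOutputStream(out)) {
            data.writeUTF(carId);       // 2-byte length prefix + UTF-8 bytes
            data.writeDouble(speed);    // 8 bytes
            data.writeDouble(latitude); // 8 bytes
            data.writeDouble(longitude);// 8 bytes
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.size();
    }

    public static void main(String[] args) {
        System.out.println("JSON bytes:   " + jsonSize("car-1", 72.5, 12.9716, 77.5946));
        System.out.println("Binary bytes: " + binarySize("car-1", 72.5, 12.9716, 77.5946));
    }
}
```

&lt;p&gt;The JSON version repeats every field name in every message, while the binary version relies on both sides agreeing on the layout in advance. That shared agreement is the role a schema plays.&lt;/p&gt;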

&lt;h2&gt;
  
  
  Karapace Schema Registry &amp;amp; Kafka
&lt;/h2&gt;

&lt;p&gt;Karapace is an open-source, near drop-in replacement for the Confluent Schema Registry. It aims to be compatible with the Confluent Schema Registry client libraries, which are available for Java, C#, and other languages. In this blog post, we will use the Java libraries to implement a producer and a consumer.&lt;/p&gt;

&lt;p&gt;The Schema Registry is a separate component from Kafka and is not part of Apache Kafka. It is designed to run as a separate node that depends on Kafka, but Kafka itself does not need a Schema Registry to function. This can lead to confusion: Kafka does not enforce a schema for a particular topic or record, but producers and consumers can choose to do so by using the Schema Registry. A relational database table can enforce a very strict schema, down to the data types, column sizes, and other constraints, regardless of which users connect to it. In contrast, different Kafka producers can still write very different data to a topic without following any schema or format, causing downstream issues in a pipeline. To avoid this, we can use different strategies to minimize the chances of schema drift or unintended data storage in topics, which we will cover in this post.&lt;/p&gt;

&lt;h2&gt;
  
  
  DISCLAIMER
&lt;/h2&gt;

&lt;p&gt;&lt;em&gt;Setting up Kafka and Schema Registry in a local environment requires a lot of configuration steps. I will be using a platform called Condense by the company Zeliot, which provides an integrated Kubernetes + Kafka + Schema Registry (Karapace), along with the options to set ACLs for Kafka and Schema Registry from a browser UI instead of a command line. I will cover setting up a local Kafka cluster with Schema Registry in a separate blog post.&lt;/em&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  How Kafka &amp;amp; Schema Registry Work Together
&lt;/h2&gt;

&lt;p&gt;Since Kafka and the Schema Registry are separate components, it is the producers and consumers that connect the two, in a loosely coupled way. A basic flow diagram of how they work together is given below:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.us-east-2.amazonaws.com%2Fuploads%2Farticles%2Fw4q4d9qf4yvzlfpzfa2h.jpg" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.us-east-2.amazonaws.com%2Fuploads%2Farticles%2Fw4q4d9qf4yvzlfpzfa2h.jpg" alt=" " width="799" height="400"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;This architecture assumes that we are using the Confluent Schema Registry Client in both the producer and the consumer. We can use compiled classes without the Schema Registry, but that approach has disadvantages: no support for versioning without downtime, the need to maintain compiled classes, more complex logic, and the library setup needed to generate and use those classes.&lt;/p&gt;

&lt;p&gt;With Kafka 4.0 and its KRaft-only architecture, schema registry compatibility is an important consideration for teams upgrading.&lt;/p&gt;

&lt;p&gt;To produce a message in JSON, Avro, or Protobuf format, the producer specifies the subject naming strategy and the schema version (the latest version by default). The Confluent client fetches that schema from the registry, and the format-specific libraries described in the next sections serialize the message into the required format. Note that the message must match the registered schema type, or else the serializer will throw a serialization exception. After serialization, the message is sent to the specified Kafka topic.&lt;/p&gt;

&lt;p&gt;For the consumer, the Confluent deserializer first reads the first byte of the message, called the magic byte. In the Confluent wire format this byte is currently always 0, and it marks the message as one framed for the Schema Registry; any other value makes the deserializer reject the message. The next 4 bytes hold the ID of the schema in the Registry, and the client automatically fetches the matching schema for the message. The remaining bytes are then deserialized into a message object that can be read and processed normally.&lt;/p&gt;
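
&lt;p&gt;As a rough illustration of that framing, the sketch below reads the header of a record value by hand. It assumes the Confluent wire format, in which the magic byte is 0 and the schema ID is a 4-byte big-endian integer; the class name and the sample bytes are made up for this example. In a real consumer the Confluent deserializer does this for us.&lt;/p&gt;

```java
import java.nio.ByteBuffer;

// Hypothetical helper for illustration; the Confluent deserializer handles this internally.
public class WireFormatHeader {

    // Returns the schema ID stored in bytes 1..4 of a Schema Registry framed value.
    public static int schemaId(byte[] value) {
        ByteBuffer buffer = ByteBuffer.wrap(value); // ByteBuffer is big-endian by default
        byte magic = buffer.get();
        if (magic != 0) {
            throw new IllegalArgumentException("Unexpected magic byte: " + magic);
        }
        return buffer.getInt();
    }

    public static void main(String[] args) {
        // Magic byte 0, schema ID 42, followed by made-up payload bytes.
        byte[] sample = {0, 0, 0, 0, 42, 0, 10, 3};
        System.out.println(schemaId(sample)); // prints 42
    }
}
```

&lt;p&gt;Everything after the 5-byte header is the serialized payload, which is why a plain String deserializer shows unreadable leading bytes when it reads such a topic.&lt;/p&gt;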

&lt;h2&gt;
  
  
  Subject Naming Strategies
&lt;/h2&gt;

&lt;p&gt;There are 3 different naming strategies offered by the Confluent Schema Registry Client libraries to determine how the serializer libraries fetch the schema to correctly serialize the message.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;TopicNameStrategy&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;This is the default strategy. The subject name for the message value is &lt;code&gt;&amp;lt;topic-name&amp;gt;&lt;/code&gt; + &lt;code&gt;"-value"&lt;/code&gt;: if the topic name is &lt;code&gt;car-topic&lt;/code&gt;, the corresponding subject name is &lt;code&gt;car-topic-value&lt;/code&gt;. For the key, the subject name is &lt;code&gt;&amp;lt;topic-name&amp;gt;&lt;/code&gt; + &lt;code&gt;"-key"&lt;/code&gt;. A serializer or deserializer using this strategy assumes that all messages in the topic follow this schema, and messages in other formats can disrupt the consumer with errors. It is best used when the topic name itself identifies the type of data in the topic.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;RecordNameStrategy&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;This strategy names the subject after the record name of the schema (for the Car schema defined later in this post, the record is the Car message, identified by its fully qualified name). It does not tie the producer and consumer to a particular topic, but consuming messages in other formats/schemas can crash the consumer if error-handling mechanisms aren’t implemented correctly. It is best used when producers need to immediately push data into any available topic with a high number of partitions, replication factors, etc.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;TopicRecordNameStrategy&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;This strategy uses both the topic name and the record name to determine the subject name. This strategy can be used when we are sure that a topic can have multiple record types.&lt;/p&gt;
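
&lt;p&gt;The three strategies can be summarized as simple string rules. The sketch below is a plain-Java approximation of those rules, not the Confluent implementation; the class and method names are hypothetical, and the record name assumes a Car message in the package &lt;code&gt;com.example.GenericProducer.schema&lt;/code&gt;.&lt;/p&gt;

```java
// Hypothetical helper that mirrors the subject naming rules described above.
public class SubjectNames {

    // TopicNameStrategy: topic name plus "-key" or "-value".
    public static String topicNameStrategy(String topic, boolean isKey) {
        return topic + (isKey ? "-key" : "-value");
    }

    // RecordNameStrategy: the fully qualified record name on its own.
    public static String recordNameStrategy(String recordFullName) {
        return recordFullName;
    }

    // TopicRecordNameStrategy: topic name and record name joined by a hyphen.
    public static String topicRecordNameStrategy(String topic, String recordFullName) {
        return topic + "-" + recordFullName;
    }

    public static void main(String[] args) {
        String topic = "car-topic";
        String record = "com.example.GenericProducer.schema.Car"; // assumed record name
        System.out.println(topicNameStrategy(topic, false));
        System.out.println(recordNameStrategy(record));
        System.out.println(topicRecordNameStrategy(topic, record));
    }
}
```

&lt;p&gt;Seeing the resulting subject names side by side makes it easier to find the right subject in Karapace when a producer fails to register or look up a schema.&lt;/p&gt;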

&lt;h2&gt;
  
  
  Setting Up a Producer with Protobuf Schema
&lt;/h2&gt;

&lt;p&gt;First, we need to register a subject within Karapace, which defines the schema of the message containing the fields and their data types. I am defining the following proto schema:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight protobuf"&gt;&lt;code&gt;&lt;span class="na"&gt;syntax&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"proto3"&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

&lt;span class="kn"&gt;package&lt;/span&gt; &lt;span class="nn"&gt;com&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;example.GenericProducer.schema&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

&lt;span class="k"&gt;option&lt;/span&gt; &lt;span class="na"&gt;java_package&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"com.example.GenericProducer.schema"&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

&lt;span class="k"&gt;option&lt;/span&gt; &lt;span class="na"&gt;java_outer_classname&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"CarProto"&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

&lt;span class="kd"&gt;message&lt;/span&gt; &lt;span class="nc"&gt;Car&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;

&lt;span class="kt"&gt;string&lt;/span&gt; &lt;span class="na"&gt;carId&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

&lt;span class="kt"&gt;string&lt;/span&gt; &lt;span class="na"&gt;carNumber&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

&lt;span class="kt"&gt;double&lt;/span&gt; &lt;span class="na"&gt;speed&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;3&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

&lt;span class="kt"&gt;double&lt;/span&gt; &lt;span class="na"&gt;latitude&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;4&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

&lt;span class="kt"&gt;double&lt;/span&gt; &lt;span class="na"&gt;longitude&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;5&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This defines a schema in which each message contains the fields carId and carNumber of type string, and speed, latitude, and longitude of type double. To generate data faster, I am using compiled classes for Protobuf.&lt;/p&gt;

&lt;p&gt;In your Spring Java project, create a folder named schema under &lt;code&gt;src/main/java&lt;/code&gt; and create a file named Car.proto in it. Paste the above schema and change the package and java_package values to match your project.&lt;/p&gt;

&lt;p&gt;Next, install the protobuf compiler for your OS by following the instructions &lt;a href="https://protobuf.dev/installation/" rel="noopener noreferrer"&gt;here&lt;/a&gt;. Then go to the root directory of your project and run the command below to compile the proto file and generate a compiled class. Modify the paths if needed. The first parameter sets the output directory for the compiled class, and the second is the path to the proto file.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;protoc &lt;span class="nt"&gt;--java_out&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;src/main/java/ src/main/java/schema/car.proto

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Check that the generated Java file has been created under the output directory. Do not edit the file by hand, since it is overwritten every time the schema is recompiled.&lt;/p&gt;

&lt;p&gt;Next, we add the required dependencies to the pom.xml of the producer application, as listed below. We can use the latest versions of these libraries from the Maven repository and the Confluent repository.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight xml"&gt;&lt;code&gt;&lt;span class="nt"&gt;&amp;lt;dependency&amp;gt;&lt;/span&gt; 

  &lt;span class="nt"&gt;&amp;lt;groupId&amp;gt;&lt;/span&gt;org.apache.kafka&lt;span class="nt"&gt;&amp;lt;/groupId&amp;gt;&lt;/span&gt; 

  &lt;span class="nt"&gt;&amp;lt;artifactId&amp;gt;&lt;/span&gt;kafka-clients&lt;span class="nt"&gt;&amp;lt;/artifactId&amp;gt;&lt;/span&gt; 

  &lt;span class="nt"&gt;&amp;lt;version&amp;gt;&lt;/span&gt;3.9.0&lt;span class="nt"&gt;&amp;lt;/version&amp;gt;&lt;/span&gt; 

&lt;span class="nt"&gt;&amp;lt;/dependency&amp;gt;&lt;/span&gt; 

&lt;span class="nt"&gt;&amp;lt;dependency&amp;gt;&lt;/span&gt; 

  &lt;span class="nt"&gt;&amp;lt;groupId&amp;gt;&lt;/span&gt;io.confluent&lt;span class="nt"&gt;&amp;lt;/groupId&amp;gt;&lt;/span&gt; 

  &lt;span class="nt"&gt;&amp;lt;artifactId&amp;gt;&lt;/span&gt;kafka-protobuf-serializer&lt;span class="nt"&gt;&amp;lt;/artifactId&amp;gt;&lt;/span&gt; 

  &lt;span class="nt"&gt;&amp;lt;version&amp;gt;&lt;/span&gt;8.0.1&lt;span class="nt"&gt;&amp;lt;/version&amp;gt;&lt;/span&gt; 

&lt;span class="nt"&gt;&amp;lt;/dependency&amp;gt;&lt;/span&gt; 

&lt;span class="nt"&gt;&amp;lt;dependency&amp;gt;&lt;/span&gt; 

  &lt;span class="nt"&gt;&amp;lt;groupId&amp;gt;&lt;/span&gt;io.confluent&lt;span class="nt"&gt;&amp;lt;/groupId&amp;gt;&lt;/span&gt; 

  &lt;span class="nt"&gt;&amp;lt;artifactId&amp;gt;&lt;/span&gt;kafka-protobuf-provider&lt;span class="nt"&gt;&amp;lt;/artifactId&amp;gt;&lt;/span&gt; 

  &lt;span class="nt"&gt;&amp;lt;version&amp;gt;&lt;/span&gt;8.0.1&lt;span class="nt"&gt;&amp;lt;/version&amp;gt;&lt;/span&gt; 

&lt;span class="nt"&gt;&amp;lt;/dependency&amp;gt;&lt;/span&gt; 

&lt;span class="nt"&gt;&amp;lt;dependency&amp;gt;&lt;/span&gt; 

  &lt;span class="nt"&gt;&amp;lt;groupId&amp;gt;&lt;/span&gt;com.google.protobuf&lt;span class="nt"&gt;&amp;lt;/groupId&amp;gt;&lt;/span&gt; 

  &lt;span class="nt"&gt;&amp;lt;artifactId&amp;gt;&lt;/span&gt;protobuf-java&lt;span class="nt"&gt;&amp;lt;/artifactId&amp;gt;&lt;/span&gt; 

  &lt;span class="nt"&gt;&amp;lt;version&amp;gt;&lt;/span&gt;4.33.4&lt;span class="nt"&gt;&amp;lt;/version&amp;gt;&lt;/span&gt; 

  &lt;span class="nt"&gt;&amp;lt;scope&amp;gt;&lt;/span&gt;compile&lt;span class="nt"&gt;&amp;lt;/scope&amp;gt;&lt;/span&gt; 

&lt;span class="nt"&gt;&amp;lt;/dependency&amp;gt;&lt;/span&gt; 

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;To resolve the io.confluent packages, we need to add the Confluent repository, as these packages are not published to Maven Central.&lt;/p&gt;

&lt;p&gt;We can add it just before the &lt;code&gt;&amp;lt;/project&amp;gt;&lt;/code&gt; tag in the &lt;code&gt;pom.xml&lt;/code&gt; like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight xml"&gt;&lt;code&gt;...

&lt;span class="nt"&gt;&amp;lt;/plugin&amp;gt;&lt;/span&gt; 

&lt;span class="nt"&gt;&amp;lt;/plugins&amp;gt;&lt;/span&gt; 

&lt;span class="nt"&gt;&amp;lt;/build&amp;gt;&lt;/span&gt; 

&lt;span class="nt"&gt;&amp;lt;repositories&amp;gt;&lt;/span&gt; 

&lt;span class="nt"&gt;&amp;lt;repository&amp;gt;&lt;/span&gt; 

&lt;span class="nt"&gt;&amp;lt;id&amp;gt;&lt;/span&gt;confluent&lt;span class="nt"&gt;&amp;lt;/id&amp;gt;&lt;/span&gt; 

&lt;span class="nt"&gt;&amp;lt;name&amp;gt;&lt;/span&gt;Confluent Maven Repository&lt;span class="nt"&gt;&amp;lt;/name&amp;gt;&lt;/span&gt; 

&lt;span class="nt"&gt;&amp;lt;url&amp;gt;&lt;/span&gt;https://packages.confluent.io/maven/&lt;span class="nt"&gt;&amp;lt;/url&amp;gt;&lt;/span&gt; 

&lt;span class="nt"&gt;&amp;lt;/repository&amp;gt;&lt;/span&gt; 

&lt;span class="nt"&gt;&amp;lt;/repositories&amp;gt;&lt;/span&gt; 

&lt;span class="nt"&gt;&amp;lt;/project&amp;gt;&lt;/span&gt; 

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Since this is a sample producer, we shall generate random data every 5 seconds and send it to Kafka to simulate telematics data. We start by defining a plain Java class that mirrors our schema.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;lombok.AllArgsConstructor&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt; 

&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;lombok.Data&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt; 

&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;lombok.NoArgsConstructor&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt; 


&lt;span class="nd"&gt;@Data&lt;/span&gt; 

&lt;span class="nd"&gt;@AllArgsConstructor&lt;/span&gt; 

&lt;span class="nd"&gt;@NoArgsConstructor&lt;/span&gt; 

&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;Car&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt; 

&lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;carId&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt; 

&lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;carNumber&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt; 

&lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="kt"&gt;double&lt;/span&gt; &lt;span class="n"&gt;speed&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt; 

&lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="kt"&gt;double&lt;/span&gt; &lt;span class="n"&gt;latitude&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt; 

&lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="kt"&gt;double&lt;/span&gt; &lt;span class="n"&gt;longitude&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt; 

&lt;span class="o"&gt;}&lt;/span&gt; 
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Next, we shall define a Spring component that builds a KafkaProducer, so that a service can create the producer once and reuse it.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="nd"&gt;@Component&lt;/span&gt; 

&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;KafkaProducerClient&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt; 


&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;static&lt;/span&gt; &lt;span class="kd"&gt;final&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="no"&gt;FALSE&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"false"&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt; 

&lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="kd"&gt;static&lt;/span&gt; &lt;span class="kd"&gt;final&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="no"&gt;CLIENT_ID&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"car-producer"&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt; 

&lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="kd"&gt;static&lt;/span&gt; &lt;span class="kd"&gt;final&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="no"&gt;SECURITY_PROTOCOL&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"SASL_PLAINTEXT"&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt; 

&lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="kd"&gt;static&lt;/span&gt; &lt;span class="kd"&gt;final&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="no"&gt;SASL_MECHANISM&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"SCRAM-SHA-512"&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt; 

&lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="kd"&gt;static&lt;/span&gt; &lt;span class="kd"&gt;final&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="no"&gt;SASL_JAAS_CONFIG&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";"&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt; 

&lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="kd"&gt;final&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;bootstrapServerUrl&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt; 


&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="nf"&gt;KafkaProducerClient&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nd"&gt;@Value&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"${kafka.bootstrap.server.url}"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;bootstrapServerUrl&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt; 

&lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;bootstrapServerUrl&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;bootstrapServerUrl&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt; 

&lt;span class="o"&gt;}&lt;/span&gt; 

&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="no"&gt;K&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="no"&gt;V&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="nc"&gt;KafkaProducer&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="no"&gt;K&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="no"&gt;V&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="nf"&gt;getDefaultProducerClientWithoutPartitioner&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;username&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;password&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nd"&gt;@Value&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"${schema.registry.url}"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;schemaRegistryUrl&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt; 

&lt;span class="nc"&gt;Properties&lt;/span&gt; &lt;span class="n"&gt;props&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;Properties&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt; 

&lt;span class="n"&gt;props&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setProperty&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ProducerConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;KEY_SERIALIZER_CLASS_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="err"&gt;“&lt;/span&gt;&lt;span class="n"&gt;org&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;apache&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;kafka&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;common&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;serialization&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;StringSerializer&lt;/span&gt;&lt;span class="err"&gt;”&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt; 


&lt;span class="n"&gt;props&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setProperty&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ProducerConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;VALUE_SERIALIZER_CLASS_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt; 


&lt;span class="n"&gt;props&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setProperty&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ProducerConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;BOOTSTRAP_SERVERS_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;bootstrapServerUrl&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt; 

&lt;span class="n"&gt;props&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setProperty&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ProducerConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;CLIENT_ID_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="no"&gt;CLIENT_ID&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt; 

&lt;span class="n"&gt;props&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setProperty&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ProducerConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;RETRIES_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"0"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt; 

&lt;span class="n"&gt;props&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setProperty&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ProducerConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;MAX_BLOCK_MS_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"3000"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt; 

&lt;span class="n"&gt;props&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setProperty&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;AdminClientConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;SECURITY_PROTOCOL_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="no"&gt;SECURITY_PROTOCOL&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt; 

&lt;span class="n"&gt;props&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setProperty&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;SaslConfigs&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;SASL_MECHANISM&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="no"&gt;SASL_MECHANISM&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt; 

&lt;span class="n"&gt;props&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setProperty&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;SaslConfigs&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;SASL_JAAS_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;format&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="no"&gt;SASL_JAAS_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;username&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;password&lt;/span&gt;&lt;span class="o"&gt;));&lt;/span&gt; 

&lt;span class="n"&gt;props&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setProperty&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;AbstractKafkaSchemaSerDeConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;SCHEMA_REGISTRY_URL_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;schemaRegistryUrl&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt; 

&lt;span class="n"&gt;props&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setProperty&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"basic.auth.credentials.source"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"USER_INFO"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt; 

&lt;span class="n"&gt;props&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setProperty&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"basic.auth.user.info"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;username&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="s"&gt;":"&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="n"&gt;password&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt; 

&lt;span class="n"&gt;props&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setProperty&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;AbstractKafkaSchemaSerDeConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;AUTO_REGISTER_SCHEMAS&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="err"&gt;“&lt;/span&gt;&lt;span class="kc"&gt;true&lt;/span&gt;&lt;span class="err"&gt;”&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt; 

&lt;span class="n"&gt;props&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setProperty&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;AbstractKafkaSchemaSerDeConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;LATEST_COMPATIBILITY_STRICT&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"false"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt; 

&lt;span class="n"&gt;props&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setProperty&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;AbstractKafkaSchemaSerDeConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;VALUE_SUBJECT_NAME_STRATEGY&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;  

&lt;span class="s"&gt;"io.confluent.kafka.serializers.subject.TopicNameStrategy"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt; 

&lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;KafkaProducer&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&amp;gt;(&lt;/span&gt;&lt;span class="n"&gt;props&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt; 

&lt;span class="o"&gt;}&lt;/span&gt; 

&lt;span class="o"&gt;}&lt;/span&gt; 
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Here, we set the properties of the key and value serializers to String and Protobuf, respectively. We then set the authentication mechanism for the Kafka cluster (SCRAM-SHA-512 in my case) with the username and password.&lt;/p&gt;

&lt;p&gt;I am using basic authentication in Karapace, so we need to set &lt;code&gt;"basic.auth.credentials.source"&lt;/code&gt; to &lt;code&gt;"USER_INFO"&lt;/code&gt;, which tells the Schema Registry client inside the KafkaProducer to authenticate using Basic Authentication headers. We then need to set &lt;code&gt;"basic.auth.user.info"&lt;/code&gt; to the user’s credentials in the &lt;code&gt;username:password&lt;/code&gt; format shown in the code. If we are not using any authentication for the Schema Registry, we can omit these properties.&lt;/p&gt;
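
&lt;p&gt;To make the &lt;code&gt;username:password&lt;/code&gt; format concrete, here is a minimal sketch of how standard HTTP Basic Authentication turns such a string into an &lt;code&gt;Authorization&lt;/code&gt; header value. It only illustrates the general scheme and is not the registry client's actual code; the class name, method name, and credentials are made up for illustration.&lt;/p&gt;

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class BasicAuthHeaderSketch {

    // Builds an HTTP Basic "Authorization" header value from a
    // "username:password" string (the same shape as basic.auth.user.info).
    // Hypothetical helper, for illustration only.
    static String basicAuthHeader(String userInfo) {
        String encoded = Base64.getEncoder()
                .encodeToString(userInfo.getBytes(StandardCharsets.UTF_8));
        return "Basic " + encoded;
    }

    public static void main(String[] args) {
        // Placeholder credentials; never hard-code real ones.
        System.out.println(basicAuthHeader("demo-user:demo-pass"));
    }
}
```

&lt;p&gt;In the producer itself there is no need to build this header by hand; setting the two properties is enough for the client library to handle it.&lt;/p&gt;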

&lt;p&gt;For simplicity and demo purposes, I am using the Topic Name Strategy for subjects, set through the &lt;code&gt;AbstractKafkaSchemaSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY&lt;/code&gt; property. With &lt;code&gt;AUTO_REGISTER_SCHEMAS&lt;/code&gt; set to true, the serializer registers the schema if it doesn’t exist in the registry, or creates a new version of the schema if the underlying compiled class schema has changed. The created subject follows the naming strategy defined in the above property. To allow compatibility between various versions and generate messages, we have set &lt;code&gt;LATEST_COMPATIBILITY_STRICT&lt;/code&gt; to false.&lt;/p&gt;
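
&lt;p&gt;As a rough illustration of what the Topic Name Strategy means in practice: the subject name is derived from the topic name with a &lt;code&gt;-key&lt;/code&gt; or &lt;code&gt;-value&lt;/code&gt; suffix. The sketch below mimics that convention in plain Java; the class and method are hypothetical helpers and not part of the serializer library.&lt;/p&gt;

```java
public class TopicNameStrategySketch {

    // Mimics the TopicNameStrategy naming convention:
    // subject = topic name + "-key" or "-value".
    // Hypothetical helper, for illustration only.
    static String subjectName(String topic, boolean isKey) {
        return topic + (isKey ? "-key" : "-value");
    }

    public static void main(String[] args) {
        // Topic name taken from the producer class in this guide.
        System.out.println(subjectName("test-schema-car-protobuf", false));
    }
}
```

&lt;p&gt;With the topic used later in this guide, the value schema would therefore be expected under the subject &lt;code&gt;test-schema-car-protobuf-value&lt;/code&gt;.&lt;/p&gt;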

&lt;p&gt;Now, we shall set up a random data generator class to generate some sample data.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="nd"&gt;@Component&lt;/span&gt; 

&lt;span class="nd"&gt;@NoArgsConstructor&lt;/span&gt; 

&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;RandomCarDataGenerator&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt; 


&lt;span class="nd"&gt;@Value&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"${CAR_ID}"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; 

&lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;carIDString&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt; 


&lt;span class="nd"&gt;@Value&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"${CAR_NUMBER}"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; 

&lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;carNumberString&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt; 


&lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="kd"&gt;static&lt;/span&gt; &lt;span class="kd"&gt;final&lt;/span&gt; &lt;span class="nc"&gt;Random&lt;/span&gt; &lt;span class="n"&gt;random&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;Random&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt; 


&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="nc"&gt;Car&lt;/span&gt; &lt;span class="nf"&gt;generateRandomCar&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt; 

&lt;span class="nc"&gt;Car&lt;/span&gt; &lt;span class="n"&gt;car&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;Car&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt; 

&lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;carId&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;carIDString&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt; 

&lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;carNumber&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;carNumberString&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt; 



&lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;carId&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="kc"&gt;null&lt;/span&gt; &lt;span class="o"&gt;||&lt;/span&gt; &lt;span class="n"&gt;carId&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;isEmpty&lt;/span&gt;&lt;span class="o"&gt;())&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt; 

&lt;span class="n"&gt;carId&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"default-car-id"&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt; 

&lt;span class="o"&gt;}&lt;/span&gt; 

&lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;carNumber&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="kc"&gt;null&lt;/span&gt; &lt;span class="o"&gt;||&lt;/span&gt; &lt;span class="n"&gt;carNumber&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;isEmpty&lt;/span&gt;&lt;span class="o"&gt;())&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt; 

&lt;span class="n"&gt;carNumber&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"default-car-number"&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt; 

&lt;span class="o"&gt;}&lt;/span&gt; 

&lt;span class="n"&gt;car&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setCarId&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;carId&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt; 

&lt;span class="n"&gt;car&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setCarNumber&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;carNumber&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt; 

&lt;span class="n"&gt;car&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setSpeed&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;generateRandomSpeed&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt; 

&lt;span class="n"&gt;car&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setLatitude&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;generateRandomLatitude&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt; 

&lt;span class="n"&gt;car&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setLongitude&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;generateRandomLongitude&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt; 

&lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;car&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt; 

&lt;span class="o"&gt;}&lt;/span&gt; 


&lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="kd"&gt;static&lt;/span&gt; &lt;span class="kt"&gt;double&lt;/span&gt; &lt;span class="nf"&gt;generateRandomLatitude&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt; 

&lt;span class="c1"&gt;// Range: -90 to 90 &lt;/span&gt;

&lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="mi"&gt;90&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;180&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt; &lt;span class="n"&gt;random&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;nextDouble&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt; 

&lt;span class="o"&gt;}&lt;/span&gt; 


&lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="kd"&gt;static&lt;/span&gt; &lt;span class="kt"&gt;double&lt;/span&gt; &lt;span class="nf"&gt;generateRandomLongitude&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt; 

&lt;span class="c1"&gt;// Range: -180 to 180 &lt;/span&gt;

&lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="mi"&gt;180&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;360&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt; &lt;span class="n"&gt;random&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;nextDouble&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt; 

&lt;span class="o"&gt;}&lt;/span&gt; 


&lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="kd"&gt;static&lt;/span&gt; &lt;span class="kt"&gt;double&lt;/span&gt; &lt;span class="nf"&gt;generateRandomSpeed&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt; 

&lt;span class="c1"&gt;// Random speed between 0 and 180 km/h &lt;/span&gt;

&lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;random&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;nextDouble&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt; &lt;span class="mi"&gt;180&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt; 

&lt;span class="o"&gt;}&lt;/span&gt; 

&lt;span class="o"&gt;}&lt;/span&gt; 


&lt;span class="nc"&gt;Next&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;we&lt;/span&gt; &lt;span class="n"&gt;shall&lt;/span&gt; &lt;span class="n"&gt;set&lt;/span&gt; &lt;span class="n"&gt;up&lt;/span&gt; &lt;span class="n"&gt;the&lt;/span&gt; &lt;span class="n"&gt;producer&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt;&lt;span class="err"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;which&lt;/span&gt; &lt;span class="n"&gt;initializes&lt;/span&gt; &lt;span class="n"&gt;a&lt;/span&gt; &lt;span class="nc"&gt;KafkaProducer&lt;/span&gt; &lt;span class="n"&gt;and&lt;/span&gt; &lt;span class="n"&gt;has&lt;/span&gt; &lt;span class="n"&gt;a&lt;/span&gt; &lt;span class="n"&gt;lightweight&lt;/span&gt; &lt;span class="n"&gt;function&lt;/span&gt; &lt;span class="n"&gt;to&lt;/span&gt; &lt;span class="n"&gt;convert&lt;/span&gt; &lt;span class="n"&gt;the&lt;/span&gt; &lt;span class="n"&gt;generated&lt;/span&gt; &lt;span class="n"&gt;data&lt;/span&gt; &lt;span class="n"&gt;into&lt;/span&gt; &lt;span class="n"&gt;a&lt;/span&gt; &lt;span class="nc"&gt;Protobuf&lt;/span&gt; &lt;span class="n"&gt;message&lt;/span&gt; &lt;span class="n"&gt;using&lt;/span&gt; &lt;span class="n"&gt;the&lt;/span&gt; &lt;span class="n"&gt;earlier&lt;/span&gt; &lt;span class="n"&gt;generated&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="err"&gt;“&lt;/span&gt;&lt;span class="nc"&gt;CarProto&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;Car&lt;/span&gt;&lt;span class="err"&gt;”&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt; &lt;span class="nc"&gt;Ensure&lt;/span&gt; &lt;span class="n"&gt;the&lt;/span&gt; &lt;span class="n"&gt;environment&lt;/span&gt; &lt;span class="n"&gt;variables&lt;/span&gt; &lt;span class="n"&gt;are&lt;/span&gt; &lt;span class="n"&gt;configured&lt;/span&gt; &lt;span class="n"&gt;properly&lt;/span&gt; &lt;span class="n"&gt;to&lt;/span&gt; &lt;span class="n"&gt;avoid&lt;/span&gt; &lt;span 
class="n"&gt;errors&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;


&lt;span class="nd"&gt;@Service&lt;/span&gt; 

&lt;span class="nd"&gt;@RequiredArgsConstructor&lt;/span&gt; 

&lt;span class="nd"&gt;@Slf4j&lt;/span&gt; 

&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;ProtobufProducer&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt; 



&lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="kd"&gt;static&lt;/span&gt; &lt;span class="kd"&gt;final&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="no"&gt;PROTO_TOPIC&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"test-schema-car-protobuf"&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt; 



&lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="kd"&gt;final&lt;/span&gt; &lt;span class="nc"&gt;KafkaProducerClient&lt;/span&gt; &lt;span class="n"&gt;kafkaProducerClient&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt; 

&lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="nc"&gt;KafkaProducer&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;CarProto&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;Car&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;protoProducer&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt; 



&lt;span class="nd"&gt;@Value&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"${schema.registry.url}"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; 

&lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;schemaRegistryUrl&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt; 



&lt;span class="nd"&gt;@Value&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"${kafka.username}"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; 

&lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;username&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt; 



&lt;span class="nd"&gt;@Value&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"${kafka.password}"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; 

&lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;password&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt; 



&lt;span class="nd"&gt;@PostConstruct&lt;/span&gt; 

&lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;initProtoProducer&lt;/span&gt;&lt;span class="o"&gt;(){&lt;/span&gt; 

&lt;span class="n"&gt;protoProducer&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;kafkaProducerClient&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getDefaultProducerClientWithoutPartitioner&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt; 

&lt;span class="n"&gt;username&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;password&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;schemaRegistryUrl&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt; 

&lt;span class="o"&gt;}&lt;/span&gt; 



&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;produceCarProto&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Car&lt;/span&gt; &lt;span class="n"&gt;car&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt; 

&lt;span class="k"&gt;try&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt; 

&lt;span class="c1"&gt;// Direct conversion from POJO to Protobuf - NO JSON overhead! &lt;/span&gt;

&lt;span class="nc"&gt;CarProto&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;Car&lt;/span&gt; &lt;span class="n"&gt;protoCar&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;CarProto&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;Car&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;newBuilder&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; 

&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setCarId&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;car&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getCarId&lt;/span&gt;&lt;span class="o"&gt;())&lt;/span&gt; 

&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setCarNumber&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;car&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getCarNumber&lt;/span&gt;&lt;span class="o"&gt;())&lt;/span&gt; 

&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setSpeed&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;car&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getSpeed&lt;/span&gt;&lt;span class="o"&gt;())&lt;/span&gt; 

&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setLatitude&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;car&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getLatitude&lt;/span&gt;&lt;span class="o"&gt;())&lt;/span&gt; 

&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setLongitude&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;car&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getLongitude&lt;/span&gt;&lt;span class="o"&gt;())&lt;/span&gt; 

&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;build&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt; 



&lt;span class="nc"&gt;ProducerRecord&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;CarProto&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;Car&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;producerRecord&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt;  

&lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;ProducerRecord&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&amp;gt;(&lt;/span&gt;&lt;span class="no"&gt;PROTO_TOPIC&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;car&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getCarId&lt;/span&gt;&lt;span class="o"&gt;(),&lt;/span&gt; &lt;span class="n"&gt;protoCar&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt; 

&lt;span class="n"&gt;protoProducer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;send&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;producerRecord&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;metadata&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;exception&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt; 

&lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;exception&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt; 

&lt;span class="n"&gt;log&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;info&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Produced Proto message topic={} partition={} offset={}"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; 

&lt;span class="n"&gt;metadata&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;topic&lt;/span&gt;&lt;span class="o"&gt;(),&lt;/span&gt; &lt;span class="n"&gt;metadata&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;partition&lt;/span&gt;&lt;span class="o"&gt;(),&lt;/span&gt; &lt;span class="n"&gt;metadata&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;offset&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt; 

&lt;span class="o"&gt;}&lt;/span&gt; &lt;span class="k"&gt;else&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt; 

&lt;span class="n"&gt;log&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;error&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Error producing Proto message"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;exception&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt; 

&lt;span class="o"&gt;}&lt;/span&gt; 

&lt;span class="o"&gt;});&lt;/span&gt; 

&lt;span class="o"&gt;}&lt;/span&gt; &lt;span class="k"&gt;catch&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Exception&lt;/span&gt; &lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt; 

&lt;span class="n"&gt;log&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;error&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Error producing Proto message"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt; 

&lt;span class="o"&gt;}&lt;/span&gt; 

&lt;span class="o"&gt;}&lt;/span&gt; 

&lt;span class="o"&gt;}&lt;/span&gt; 


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The above code initializes a KafkaProducer at startup. Whenever &lt;code&gt;produceCarProto&lt;/code&gt; is called, a &lt;code&gt;CarProto.Car&lt;/code&gt; message is built from the car object through its builder, and a ProducerRecord of the &lt;code&gt;CarProto.Car&lt;/code&gt; type is created with the car ID as its key. The Protobuf message is then sent to the Kafka topic &lt;code&gt;test-schema-car-protobuf&lt;/code&gt;.&lt;/p&gt;

&lt;p&gt;Now, we shall create a scheduled service that sends a message every 5 seconds.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="nd"&gt;@Service&lt;/span&gt; 

&lt;span class="nd"&gt;@RequiredArgsConstructor&lt;/span&gt; 

&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;ProducerScheduleService&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt; 



&lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="kd"&gt;final&lt;/span&gt; &lt;span class="nc"&gt;RandomCarDataGenerator&lt;/span&gt; &lt;span class="n"&gt;carDataGenerator&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt; 

&lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="kd"&gt;final&lt;/span&gt; &lt;span class="nc"&gt;ProtobufProducer&lt;/span&gt; &lt;span class="n"&gt;protobufProducer&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt; 

&lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="kd"&gt;final&lt;/span&gt; &lt;span class="nc"&gt;AvroProducer&lt;/span&gt; &lt;span class="n"&gt;avroProducer&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt; 

&lt;span class="nd"&gt;@Scheduled&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;fixedRate&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;5000&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; 

&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;produceCarToBothFormats&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt; 

&lt;span class="nc"&gt;Car&lt;/span&gt; &lt;span class="n"&gt;car&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;carDataGenerator&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;generateRandomCar&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt; &lt;span class="c1"&gt;// generate once &lt;/span&gt;

&lt;span class="nd"&gt;@Cleanup&lt;/span&gt; 

&lt;span class="nc"&gt;ExecutorService&lt;/span&gt; &lt;span class="n"&gt;executorService&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;Executors&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;newVirtualThreadPerTaskExecutor&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt; 

&lt;span class="n"&gt;executorService&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;submit&lt;/span&gt;&lt;span class="o"&gt;(()-&amp;gt;&lt;/span&gt;&lt;span class="n"&gt;protobufProducer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;produceCarProto&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;car&lt;/span&gt;&lt;span class="o"&gt;));&lt;/span&gt; 

&lt;span class="o"&gt;}&lt;/span&gt; 

&lt;span class="o"&gt;}&lt;/span&gt; 
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This will produce a Protobuf message to the Kafka topic every 5 seconds with random data.&lt;/p&gt;
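
&lt;p&gt;It can help to know how a consumer later finds the right schema for each record. Serializers that follow the Confluent wire format prefix every value with a magic byte (0) and a 4-byte big-endian schema ID; for Protobuf, message-index information follows before the actual payload. The sketch below reads only those first five bytes from a hand-built sample array. The class name, method name, and sample bytes are assumptions for illustration, not output captured from the producer above.&lt;/p&gt;

```java
import java.nio.ByteBuffer;

public class WireFormatHeaderSketch {

    // Reads the schema ID from the 5-byte prefix:
    // 1 magic byte (expected to be 0) followed by a 4-byte big-endian int.
    // Hypothetical helper, for illustration only.
    static int readSchemaId(byte[] recordValue) {
        ByteBuffer buffer = ByteBuffer.wrap(recordValue); // big-endian by default
        byte magic = buffer.get();
        if (magic != 0) {
            throw new IllegalArgumentException("Unexpected magic byte: " + magic);
        }
        return buffer.getInt();
    }

    public static void main(String[] args) {
        // Hand-built sample: magic byte 0, schema ID 7, then placeholder bytes.
        byte[] sample = {0, 0, 0, 0, 7, 1, 2};
        System.out.println("schema id = " + readSchemaId(sample));
    }
}
```

&lt;p&gt;The deserializer does this lookup for us; the sketch is only meant to show why the consumer needs access to the same registry as the producer.&lt;/p&gt;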

&lt;p&gt;Similarly, we shall have a separate consumer that reads messages from this topic and deserializes them faster than it could parse the normal String JSON format. Let us create a new Spring Boot service with a Kafka Consumer that is set up with a Protobuf deserializer and specifically requests this schema to deserialize the messages.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="nd"&gt;@Component&lt;/span&gt; 

&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;KafkaConsumerClient&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt; 



&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;static&lt;/span&gt; &lt;span class="kd"&gt;final&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="no"&gt;FALSE&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"false"&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt; 

&lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="kd"&gt;static&lt;/span&gt; &lt;span class="kd"&gt;final&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="no"&gt;GROUP_ID&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"car-consumer"&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt; 

&lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="kd"&gt;static&lt;/span&gt; &lt;span class="kd"&gt;final&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="no"&gt;CLIENT_ID&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"car-consumer1"&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt; 

&lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="kd"&gt;static&lt;/span&gt; &lt;span class="kd"&gt;final&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="no"&gt;SECURITY_PROTOCOL&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"SASL_PLAINTEXT"&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt; 

&lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="kd"&gt;static&lt;/span&gt; &lt;span class="kd"&gt;final&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="no"&gt;SASL_MECHANISM&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"SCRAM-SHA-512"&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt; 

&lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="kd"&gt;static&lt;/span&gt; &lt;span class="kd"&gt;final&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="no"&gt;SASL_JAAS_CONFIG&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";"&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt; 

&lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="kd"&gt;final&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;bootstrapServerUrl&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt; 



&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="nf"&gt;KafkaConsumerClient&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nd"&gt;@Value&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"${kafka.bootstrap.server.url}"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;bootstrapServerUrl&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt; 

&lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;bootstrapServerUrl&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;bootstrapServerUrl&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt; 

&lt;span class="o"&gt;}&lt;/span&gt; 



&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="no"&gt;K&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="no"&gt;V&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="nc"&gt;KafkaConsumer&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="no"&gt;K&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="no"&gt;V&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="nf"&gt;getDefaultConsumer&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;username&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;password&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nd"&gt;@Value&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"${schema.registry.url}"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;schemaRegistryUrl&lt;/span&gt;&lt;span class="o"&gt;){&lt;/span&gt; 

&lt;span class="nc"&gt;Properties&lt;/span&gt; &lt;span class="n"&gt;props&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;Properties&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt; 

&lt;span class="n"&gt;props&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ConsumerConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;BOOTSTRAP_SERVERS_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;bootstrapServerUrl&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt; 

&lt;span class="n"&gt;props&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ConsumerConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;KEY_DESERIALIZER_CLASS_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"org.apache.kafka.common.serialization.StringDeserializer"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt; 

&lt;span class="n"&gt;props&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ConsumerConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;VALUE_DESERIALIZER_CLASS_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt; 

&lt;span class="n"&gt;props&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ConsumerConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;GROUP_ID_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="no"&gt;GROUP_ID&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt; 

&lt;span class="n"&gt;props&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ConsumerConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;CLIENT_ID_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="no"&gt;CLIENT_ID&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt; 

&lt;span class="n"&gt;props&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ConsumerConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;AUTO_OFFSET_RESET_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"earliest"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt; 

&lt;span class="n"&gt;props&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ConsumerConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;ENABLE_AUTO_COMMIT_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"true"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt; 

&lt;span class="n"&gt;props&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ConsumerConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;MAX_POLL_RECORDS_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;100&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt; 

&lt;span class="n"&gt;props&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ConsumerConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;MAX_POLL_INTERVAL_MS_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;30000&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt; 

&lt;span class="n"&gt;props&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setProperty&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;AdminClientConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;SECURITY_PROTOCOL_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="no"&gt;SECURITY_PROTOCOL&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt; 

&lt;span class="n"&gt;props&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setProperty&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;SaslConfigs&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;SASL_MECHANISM&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="no"&gt;SASL_MECHANISM&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt; 

&lt;span class="n"&gt;props&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setProperty&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;SaslConfigs&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;SASL_JAAS_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;format&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="no"&gt;SASL_JAAS_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;username&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;password&lt;/span&gt;&lt;span class="o"&gt;));&lt;/span&gt; 

&lt;span class="n"&gt;props&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setProperty&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;AbstractKafkaSchemaSerDeConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;SCHEMA_REGISTRY_URL_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;schemaRegistryUrl&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt; 

&lt;span class="n"&gt;props&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setProperty&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"basic.auth.credentials.source"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"USER_INFO"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt; 

&lt;span class="n"&gt;props&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setProperty&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"basic.auth.user.info"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;username&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="s"&gt;":"&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="n"&gt;password&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt; 

&lt;span class="n"&gt;props&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"specific.protobuf.value.type"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"com.example.schema.CarProto$Car"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt; 

&lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;KafkaConsumer&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&amp;gt;(&lt;/span&gt;&lt;span class="n"&gt;props&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt; 

&lt;span class="o"&gt;}&lt;/span&gt; 

&lt;span class="o"&gt;}&lt;/span&gt; 

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;ul&gt;
&lt;li&gt;&lt;p&gt;Here, we initialize a KafkaConsumer with a String key deserializer and a Protobuf value deserializer. The consumer group ID and client ID are set through the &lt;code&gt;ConsumerConfig.GROUP_ID_CONFIG&lt;/code&gt; and &lt;code&gt;ConsumerConfig.CLIENT_ID_CONFIG&lt;/code&gt; properties.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Properties such as the auto offset reset policy, auto commit, the maximum records per poll, and the maximum poll interval can be tuned to the requirements and hardware capabilities.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;We need to set the Schema Registry URL in the properties and the authentication properties, if needed, just like the producer.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Because we use the Topic Name Strategy for the subject, every message in this topic follows the CarProto.Car schema, for which we generated the compiled class in the producer. Setting &lt;code&gt;specific.protobuf.value.type&lt;/code&gt; to that class tells the deserializer to return &lt;code&gt;Car&lt;/code&gt; objects. The property is optional and can be omitted when not all messages share one schema, for example when a topic carries several message types under the Record Name Strategy or the Topic Record Name Strategy. Without it, the deserializer returns a generic &lt;code&gt;DynamicMessage&lt;/code&gt; that is inspected at runtime.&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;
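
&lt;p&gt;To make the subject naming strategies mentioned above concrete, here is a minimal, dependency-free sketch of how each one derives a subject name. The class &lt;code&gt;SubjectNames&lt;/code&gt; and the record name &lt;code&gt;com.example.schema.Car&lt;/code&gt; are hypothetical and only illustrate the naming rules; the real strategies ship with the Confluent serializer library.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;// Illustrative only: mirrors the naming rules, not the library classes.
final class SubjectNames {

    // Topic Name Strategy: one subject per topic, suffixed with -key or -value.
    static String topicNameStrategy(String topic, boolean isKey) {
        return topic + (isKey ? "-key" : "-value");
    }

    // Record Name Strategy: the subject is the fully qualified record name.
    static String recordNameStrategy(String recordFullName) {
        return recordFullName;
    }

    // Topic Record Name Strategy: topic and record name joined by a hyphen.
    static String topicRecordNameStrategy(String topic, String recordFullName) {
        return topic + "-" + recordFullName;
    }

    public static void main(String[] args) {
        String topic = "test-schema-car-protobuf";
        String record = "com.example.schema.Car"; // assumed record name

        System.out.println(topicNameStrategy(topic, false));
        System.out.println(recordNameStrategy(record));
        System.out.println(topicRecordNameStrategy(topic, record));
    }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;With the Topic Name Strategy, the value subject is tied to the topic alone, which is why a single compiled class can be assumed for every message in the topic.&lt;/p&gt;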

&lt;p&gt;We shall now create a consumer class that keeps running once the application is started.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="kn"&gt;package&lt;/span&gt; &lt;span class="nn"&gt;com.example.GenericConsumer.services&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt; 

&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;java.time.Duration&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt; 

&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;java.util.Collections&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt; 

&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;java.util.concurrent.ExecutorService&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt; 

&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;java.util.concurrent.Executors&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt; 

&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;java.util.concurrent.atomic.AtomicBoolean&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt; 



&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.apache.kafka.clients.consumer.ConsumerRecord&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt; 

&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.apache.kafka.clients.consumer.ConsumerRecords&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt; 

&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.apache.kafka.clients.consumer.KafkaConsumer&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt; 

&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.apache.kafka.common.errors.WakeupException&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt; 

&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.springframework.beans.factory.annotation.Value&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt; 

&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.springframework.context.SmartLifecycle&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt; 

&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.springframework.stereotype.Service&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt; 



&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.example.GenericConsumer.clients.KafkaConsumerClient&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt; 

&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.example.GenericConsumer.enums.KafkaDeserializerTypes&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt; 

&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.example.schema.CarProto.Car&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt; 



&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;lombok.RequiredArgsConstructor&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt; 

&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;lombok.extern.slf4j.Slf4j&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt; 



&lt;span class="nd"&gt;@Service&lt;/span&gt; 

&lt;span class="nd"&gt;@RequiredArgsConstructor&lt;/span&gt; 

&lt;span class="nd"&gt;@Slf4j&lt;/span&gt; 

&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;CarConsumerService&lt;/span&gt; &lt;span class="kd"&gt;implements&lt;/span&gt; &lt;span class="nc"&gt;SmartLifecycle&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt; 



&lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="kd"&gt;final&lt;/span&gt; &lt;span class="nc"&gt;KafkaConsumerClient&lt;/span&gt; &lt;span class="n"&gt;kafkaConsumerClient&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt; 



&lt;span class="nd"&gt;@Value&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"${kafka.username}"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; 

&lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;username&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt; 



&lt;span class="nd"&gt;@Value&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"${kafka.password}"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; 

&lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;password&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt; 



&lt;span class="nd"&gt;@Value&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"${schema.registry.url}"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; 

&lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;schemaRegistryUrl&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt; 



&lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="kd"&gt;static&lt;/span&gt; &lt;span class="kd"&gt;final&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="no"&gt;PROTO_TOPIC&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"test-schema-car-protobuf"&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt; 

&lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="nc"&gt;ExecutorService&lt;/span&gt; &lt;span class="n"&gt;executorService&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt; 

&lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="nc"&gt;KafkaConsumer&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;Car&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;consumer&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt; 

&lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="kd"&gt;final&lt;/span&gt; &lt;span class="nc"&gt;AtomicBoolean&lt;/span&gt; &lt;span class="n"&gt;running&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;AtomicBoolean&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="kc"&gt;false&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt; 

&lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="kd"&gt;volatile&lt;/span&gt; &lt;span class="kt"&gt;boolean&lt;/span&gt; &lt;span class="n"&gt;started&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="kc"&gt;false&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt; 



&lt;span class="nd"&gt;@Override&lt;/span&gt; 

&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;start&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt; 

&lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;started&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt; 

&lt;span class="k"&gt;return&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt; 

&lt;span class="o"&gt;}&lt;/span&gt; 

&lt;span class="n"&gt;log&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;info&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Starting Kafka consumer in virtual thread..."&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt; 

&lt;span class="c1"&gt;// Create thread executor &lt;/span&gt;

&lt;span class="n"&gt;executorService&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;Executors&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;newSingleThreadExecutor&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;r&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt; 

&lt;span class="nc"&gt;Thread&lt;/span&gt; &lt;span class="n"&gt;thread&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;Thread&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;r&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt; 

&lt;span class="n"&gt;thread&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setName&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"kafka-consumer-thread"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt; 

&lt;span class="n"&gt;thread&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setDaemon&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="kc"&gt;false&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt; 

&lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;thread&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt; 

&lt;span class="o"&gt;});&lt;/span&gt; 

&lt;span class="n"&gt;running&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;set&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="kc"&gt;true&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt; 

&lt;span class="n"&gt;started&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="kc"&gt;true&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt; 

&lt;span class="c1"&gt;// Submit consumer task to executor &lt;/span&gt;

&lt;span class="n"&gt;executorService&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;submit&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="o"&gt;::&lt;/span&gt;&lt;span class="n"&gt;consumeCar&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt; 

&lt;span class="n"&gt;log&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;info&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Kafka consumer started successfully"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt; 

&lt;span class="o"&gt;}&lt;/span&gt; 



&lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;consumeCar&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt; 

&lt;span class="k"&gt;try&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt; 

&lt;span class="n"&gt;consumer&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;kafkaConsumerClient&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getDefaultConsumer&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt; 

&lt;span class="n"&gt;username&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;  

&lt;span class="n"&gt;password&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;  

&lt;span class="n"&gt;schemaRegistryUrl&lt;/span&gt; 

&lt;span class="o"&gt;);&lt;/span&gt; 

&lt;span class="n"&gt;consumer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;subscribe&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Collections&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;singleton&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="no"&gt;PROTO_TOPIC&lt;/span&gt;&lt;span class="o"&gt;));&lt;/span&gt; 

&lt;span class="n"&gt;log&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;info&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Subscribed to topic: {}"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="no"&gt;PROTO_TOPIC&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt; 

&lt;span class="k"&gt;while&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;running&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;get&lt;/span&gt;&lt;span class="o"&gt;())&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt; 

&lt;span class="k"&gt;try&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt; 

&lt;span class="nc"&gt;ConsumerRecords&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;Car&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;records&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;consumer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;poll&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Duration&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;ofMillis&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;1000&lt;/span&gt;&lt;span class="o"&gt;));&lt;/span&gt; 

&lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ConsumerRecord&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;Car&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;carRecord&lt;/span&gt; &lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="n"&gt;records&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt; 

&lt;span class="n"&gt;log&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;info&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Consumed Car Record - Key: {}, Value: {}, Partition: {}, Offset: {}"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;  

&lt;span class="n"&gt;carRecord&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;key&lt;/span&gt;&lt;span class="o"&gt;(),&lt;/span&gt; 

&lt;span class="n"&gt;carRecord&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;value&lt;/span&gt;&lt;span class="o"&gt;(),&lt;/span&gt;  

&lt;span class="n"&gt;carRecord&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;partition&lt;/span&gt;&lt;span class="o"&gt;(),&lt;/span&gt; 

&lt;span class="n"&gt;carRecord&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;offset&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; 

&lt;span class="o"&gt;);&lt;/span&gt; 

&lt;span class="o"&gt;}&lt;/span&gt; 

&lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="o"&gt;(!&lt;/span&gt;&lt;span class="n"&gt;records&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;isEmpty&lt;/span&gt;&lt;span class="o"&gt;())&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt; 

&lt;span class="n"&gt;consumer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;commitAsync&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt; 

&lt;span class="o"&gt;}&lt;/span&gt; 

&lt;span class="o"&gt;}&lt;/span&gt; &lt;span class="k"&gt;catch&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;WakeupException&lt;/span&gt; &lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt; 

&lt;span class="n"&gt;log&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;info&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Consumer wakeup called, shutting down..."&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt; 

&lt;span class="k"&gt;break&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt; 

&lt;span class="o"&gt;}&lt;/span&gt; 

&lt;span class="o"&gt;}&lt;/span&gt; 

&lt;span class="o"&gt;}&lt;/span&gt; &lt;span class="k"&gt;catch&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Exception&lt;/span&gt; &lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt; 

&lt;span class="n"&gt;log&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;error&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Error in consumer loop"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt; 

&lt;span class="o"&gt;}&lt;/span&gt; &lt;span class="k"&gt;finally&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt; 

&lt;span class="n"&gt;log&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;info&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Closing Kafka consumer..."&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt; 

&lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;consumer&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt; 

&lt;span class="k"&gt;try&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt; 

&lt;span class="n"&gt;consumer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;close&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Duration&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;ofSeconds&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;5&lt;/span&gt;&lt;span class="o"&gt;));&lt;/span&gt; 

&lt;span class="o"&gt;}&lt;/span&gt; &lt;span class="k"&gt;catch&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Exception&lt;/span&gt; &lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt; 

&lt;span class="n"&gt;log&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;error&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Error closing consumer"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt; 

&lt;span class="o"&gt;}&lt;/span&gt; 

&lt;span class="o"&gt;}&lt;/span&gt; 

&lt;span class="n"&gt;log&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;info&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Kafka consumer closed"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt; 

&lt;span class="o"&gt;}&lt;/span&gt; 

&lt;span class="o"&gt;}&lt;/span&gt; 



&lt;span class="nd"&gt;@Override&lt;/span&gt; 

&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;stop&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt; 

&lt;span class="n"&gt;log&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;info&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Shutting down Kafka consumer service..."&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt; 

&lt;span class="n"&gt;running&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;set&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="kc"&gt;false&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt; 

&lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;consumer&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt; 

&lt;span class="n"&gt;consumer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;wakeup&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt; 

&lt;span class="o"&gt;}&lt;/span&gt; 

&lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;executorService&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt; 

&lt;span class="n"&gt;executorService&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;shutdown&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt; 

&lt;span class="k"&gt;try&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt; 

&lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="o"&gt;(!&lt;/span&gt;&lt;span class="n"&gt;executorService&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;awaitTermination&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;java&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;util&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;concurrent&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;TimeUnit&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;SECONDS&lt;/span&gt;&lt;span class="o"&gt;))&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt; 

&lt;span class="n"&gt;log&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;warn&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Executor did not terminate in time, forcing shutdown"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt; 

&lt;span class="n"&gt;executorService&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;shutdownNow&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt; 

&lt;span class="o"&gt;}&lt;/span&gt; 

&lt;span class="o"&gt;}&lt;/span&gt; &lt;span class="k"&gt;catch&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;InterruptedException&lt;/span&gt; &lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt; 

&lt;span class="n"&gt;log&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;error&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Shutdown interrupted"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt; 

&lt;span class="n"&gt;executorService&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;shutdownNow&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt; 

&lt;span class="nc"&gt;Thread&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;currentThread&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;interrupt&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt; 

&lt;span class="o"&gt;}&lt;/span&gt; 

&lt;span class="o"&gt;}&lt;/span&gt; 

&lt;span class="n"&gt;started&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="kc"&gt;false&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt; 

&lt;span class="n"&gt;log&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;info&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Kafka consumer service shutdown complete"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt; 

&lt;span class="o"&gt;}&lt;/span&gt; 



&lt;span class="nd"&gt;@Override&lt;/span&gt; 

&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kt"&gt;boolean&lt;/span&gt; &lt;span class="nf"&gt;isRunning&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt; 

&lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;started&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt; 

&lt;span class="o"&gt;}&lt;/span&gt; 



&lt;span class="nd"&gt;@Override&lt;/span&gt; 

&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kt"&gt;int&lt;/span&gt; &lt;span class="nf"&gt;getPhase&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt; 

&lt;span class="c1"&gt;// Return a phase value to control startup order &lt;/span&gt;

&lt;span class="c1"&gt;// Higher values start later and stop earlier &lt;/span&gt;

&lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nc"&gt;Integer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;MAX_VALUE&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt; 

&lt;span class="o"&gt;}&lt;/span&gt; 



&lt;span class="nd"&gt;@Override&lt;/span&gt; 

&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kt"&gt;boolean&lt;/span&gt; &lt;span class="nf"&gt;isAutoStartup&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt; 

&lt;span class="c1"&gt;// Return true to start automatically when Spring context is ready &lt;/span&gt;

&lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="kc"&gt;true&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt; 

&lt;span class="o"&gt;}&lt;/span&gt; 



&lt;span class="nd"&gt;@Override&lt;/span&gt; 

&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;stop&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Runnable&lt;/span&gt; &lt;span class="n"&gt;callback&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt; 

&lt;span class="n"&gt;stop&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt; 

&lt;span class="n"&gt;callback&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;run&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt; 

&lt;span class="o"&gt;}&lt;/span&gt; 

&lt;span class="o"&gt;}&lt;/span&gt; 
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;When the application starts, this service initializes a KafkaConsumer that reads and deserializes messages from the &lt;code&gt;test-schema-car-protobuf&lt;/code&gt; topic and writes them to the logs.&lt;/p&gt;
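
&lt;p&gt;The start and stop handling in the service can be hard to follow among the Kafka details, so the sketch below isolates the pattern using only the JDK. It is an assumption-laden simplification: the hypothetical class &lt;code&gt;PollLoopSketch&lt;/code&gt; uses an in-memory queue in place of the topic and a counter in place of real processing.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the consumer service: a queue plays the role of the topic.
final class PollLoopSketch {
    private final BlockingQueue&amp;lt;String&amp;gt; topic = new LinkedBlockingQueue&amp;lt;&amp;gt;();
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final AtomicInteger processed = new AtomicInteger();
    private ExecutorService executor;

    void start() {
        if (!running.compareAndSet(false, true)) {
            return; // already started
        }
        executor = Executors.newSingleThreadExecutor();
        executor.submit(this::pollLoop);
    }

    private void pollLoop() {
        while (running.get()) {
            try {
                // Bounded wait, like consumer.poll(Duration), so the flag is re-checked regularly.
                String record = topic.poll(100, TimeUnit.MILLISECONDS);
                if (record != null) {
                    processed.incrementAndGet();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
    }

    void publish(String record) {
        topic.add(record);
    }

    int processedCount() {
        return processed.get();
    }

    // Returns true when the worker thread finished within the timeout.
    boolean stop() throws InterruptedException {
        running.set(false);
        if (executor == null) {
            return true;
        }
        executor.shutdown();
        if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
            executor.shutdownNow();
            return false;
        }
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        PollLoopSketch sketch = new PollLoopSketch();
        sketch.start();
        sketch.publish("car-1");
        sketch.publish("car-2");
        Thread.sleep(500); // give the worker time to drain the queue
        boolean clean = sketch.stop();
        System.out.println("processed=" + sketch.processedCount() + ", cleanShutdown=" + clean);
    }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;The real service additionally calls &lt;code&gt;consumer.wakeup()&lt;/code&gt; so that a blocked &lt;code&gt;poll&lt;/code&gt; returns immediately instead of waiting for its timeout.&lt;/p&gt;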

&lt;h2&gt;
  
  
  When to Use Protobuf in Your Pipeline
&lt;/h2&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;Protobuf is efficient for high-volume, high-throughput scenarios where messages follow a schema with few optional attributes and little variance in the fields.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;It suits machine-to-machine messaging where every component in the pipeline can serialize or deserialize the messages.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;It fits data with an enforced schema that evolves under clear forward/backward compatibility rules.&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;
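
&lt;p&gt;The efficiency argument can be illustrated by hand-encoding a tiny message in the Protobuf wire format and comparing it with the equivalent JSON. This is a rough sketch under stated assumptions: the two-field message (field 1 as an integer &lt;code&gt;id&lt;/code&gt;, field 2 as a string &lt;code&gt;model&lt;/code&gt;) is hypothetical and is not the Car schema from this guide, and real code would use the generated classes rather than manual encoding. The registry serializer also prepends a small header carrying the schema ID, which the sketch leaves out.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Illustrative only: manual encoding of a hypothetical two-field message.
final class WireSizeSketch {

    // Base-128 varint: 7 payload bits per byte, high bit set on all but the last byte.
    static void writeVarint(ByteArrayOutputStream out, long value) {
        while ((value &amp;amp; ~0x7FL) != 0) {
            out.write((int) ((value &amp;amp; 0x7F) | 0x80));
            value &amp;gt;&amp;gt;&amp;gt;= 7;
        }
        out.write((int) value);
    }

    // Field 1 = id (varint), field 2 = model (length-delimited UTF-8 string).
    static byte[] encodeCar(long id, String model) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] modelBytes = model.getBytes(StandardCharsets.UTF_8);
        writeVarint(out, (1 &amp;lt;&amp;lt; 3) | 0); // tag: field 1, wire type 0
        writeVarint(out, id);
        writeVarint(out, (2 &amp;lt;&amp;lt; 3) | 2); // tag: field 2, wire type 2
        writeVarint(out, modelBytes.length);
        out.write(modelBytes, 0, modelBytes.length);
        return out.toByteArray();
    }

    // The same data as a compact JSON document.
    static byte[] encodeJson(long id, String model) {
        String json = "{\"id\":" + id + ",\"model\":\"" + model + "\"}";
        return json.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println("protobuf bytes: " + encodeCar(150, "Civic").length);
        System.out.println("json bytes: " + encodeJson(150, "Civic").length);
    }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;For this input the sketch prints 10 bytes for Protobuf and 26 bytes for JSON, because field names are replaced by one-byte tags and the number is stored in binary.&lt;/p&gt;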

&lt;h2&gt;
  
  
  When Not to Use Protobuf in Your Pipeline
&lt;/h2&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;If the data needs to be read by humans immediately, with no extra computing overhead, use a plain String format instead.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;If the data doesn’t follow a schema and can change on an ad-hoc basis, the advantages of Protobuf are negated, and complexity increases unnecessarily.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Protobuf isn’t suitable for continuous byte streams such as file data or video, because each message is meant to be processed as a whole record.&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  GitHub Links
&lt;/h2&gt;

&lt;ul&gt;
&lt;li&gt;&lt;a href="https://github.com/harishva23/GenericProducer" rel="noopener noreferrer"&gt;Generic Producer&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://github.com/harishva23/GenericConsumer" rel="noopener noreferrer"&gt;Generic Consumer&lt;/a&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  Resources
&lt;/h2&gt;

&lt;ol&gt;
&lt;li&gt;&lt;a href="https://docs.confluent.io/kafka/overview.html" rel="noopener noreferrer"&gt;Confluent Kafka Docs&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://docs.confluent.io/platform/current/schema-registry/index.html" rel="noopener noreferrer"&gt;Confluent Schema Docs&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://github.com/Aiven-Open/karapace" rel="noopener noreferrer"&gt;Aiven Karapace&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://kafka.apache.org/41/getting-started/introduction/" rel="noopener noreferrer"&gt;Apache Kafka Docs&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://javadoc.io/doc/org.apache.kafka/kafka-clients/latest/index.html" rel="noopener noreferrer"&gt;Apache Kafka Clients Docs&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://protobuf.dev/overview/" rel="noopener noreferrer"&gt;Protobuf Docs&lt;/a&gt;&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;For production teams, managing Karapace alongside Kafka brokers, connectors, and stream processors adds significant operational overhead, one of the core reasons teams move to fully managed streaming platforms.&lt;/p&gt;

&lt;h2&gt;
  
  
  Frequently Asked Questions (FAQs)
&lt;/h2&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;Why does adding Schema Registry make Kafka harder to operate? &lt;br&gt;
Schema Registry is a separate component that producers and consumers must talk to for serialization and compatibility. You need to set up, secure, monitor, and upgrade it alongside Kafka brokers, which adds operational overhead and complexity.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;How does Condense simplify Kafka + Schema Registry? &lt;br&gt;
Condense bundles Kafka-native streaming with integrated Schema Registry (Karapace-compatible), ACLs, and observability in one managed platform. You deploy from cloud marketplaces and get schema governance without running a separate registry node.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;What subject naming strategies does Condense support? &lt;br&gt;
Condense supports the standard Confluent naming strategies: TopicNameStrategy, RecordNameStrategy, and TopicRecordNameStrategy. These determine how subjects are named and how schema versions are managed for keys and values.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Why is Protobuf better than JSON for high-throughput Kafka pipelines? &lt;br&gt;
Protobuf uses compiled classes and binary serialization, which is faster and smaller than JSON's text-based format. This reduces deserialization overhead and consumer lag, especially with nested structures or high data rates.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;How does Condense help teams ship Protobuf-based streaming pipelines faster? &lt;br&gt;
Condense provides prebuilt connectors, a Git-enabled IDE, and no-code/low-code builders for stream logic, so you can focus on Protobuf message design instead of cluster ops. You get Kafka-native performance with reduced setup time and operational burden.&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;
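
&lt;p&gt;As a rough illustration of how the three subject naming strategies mentioned in the FAQ derive subject names under the Confluent conventions, the sketch below builds them with plain string handling. It does not call any Schema Registry or Karapace API; the class name, the method names, and the record name &lt;code&gt;com.example.Car&lt;/code&gt; are assumptions made up for this example.&lt;/p&gt;

```java
// Illustrative only: mirrors the Confluent subject naming conventions with plain strings.
class SubjectNameSketch {

    // TopicNameStrategy: the topic name plus "-key" or "-value".
    static String topicNameStrategy(String topic, boolean isKey) {
        return topic + (isKey ? "-key" : "-value");
    }

    // RecordNameStrategy: the fully qualified record name, independent of the topic.
    static String recordNameStrategy(String recordName) {
        return recordName;
    }

    // TopicRecordNameStrategy: the topic name, a hyphen, then the fully qualified record name.
    static String topicRecordNameStrategy(String topic, String recordName) {
        return topic + "-" + recordName;
    }

    public static void main(String[] args) {
        String topic = "test-schema-car-protobuf"; // topic used earlier in this guide
        String record = "com.example.Car";         // hypothetical Protobuf message name
        System.out.println(topicNameStrategy(topic, false));
        System.out.println(recordNameStrategy(record));
        System.out.println(topicRecordNameStrategy(topic, record));
    }
}
```

&lt;p&gt;With TopicNameStrategy, the value schema for the topic in this guide would be registered under the subject &lt;code&gt;test-schema-car-protobuf-value&lt;/code&gt;.&lt;/p&gt;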

</description>
      <category>dataengineering</category>
      <category>apachekafkaschema</category>
      <category>karapace</category>
      <category>protobuf</category>
    </item>
    <item>
      <title>Kafka Streams 101: A Developer’s Guide to Real-Time Application Logic</title>
      <dc:creator>Sachin Kamath</dc:creator>
      <pubDate>Tue, 16 Jun 2026 10:06:59 +0000</pubDate>
      <link>https://dev.to/zeliotofficial/kafka-streams-101-a-developers-guide-to-real-time-application-logic-2knc</link>
      <guid>https://dev.to/zeliotofficial/kafka-streams-101-a-developers-guide-to-real-time-application-logic-2knc</guid>
      <description>&lt;h2&gt;
  
  
  TL;DR
&lt;/h2&gt;

&lt;p&gt;Kafka Streams enables real-time stream processing inside applications using local state backed by Kafka logs. However, deploying and managing multiple Kafka Streams microservices at scale is complex, requiring custom CI/CD, state recovery, and observability tooling.&lt;/p&gt;

&lt;p&gt;Condense simplifies this by providing a fully managed, unified streaming platform inside your cloud (BYOC). It integrates Kafka Streams with a built-in IDE, Git versioning, prebuilt domain logic, and native observability, eliminating operational overhead while accelerating development and scaling real-time apps reliably.&lt;/p&gt;

&lt;h2&gt;
  
  
  Introduction
&lt;/h2&gt;

&lt;p&gt;Apache Kafka has long been a cornerstone of modern data infrastructure, providing a distributed, fault-tolerant backbone for event ingestion at scale. But ingestion is only half the equation. Business value lies in what happens after events are received: how raw data is filtered, joined, aggregated, enriched, and ultimately transformed into decisions.&lt;/p&gt;

&lt;p&gt;This is where Kafka Streams comes in. As a native stream processing library built on Kafka itself, Kafka Streams enables developers to write real-time logic using a simple yet powerful programming model. This blog walks through the foundations of Kafka Streams, explores how it powers real-world applications, and examines the architectural implications for engineering teams. At the end, we’ll also see how modern platforms are simplifying this journey further by eliminating unnecessary complexity from the development lifecycle.&amp;nbsp;&lt;/p&gt;

&lt;h2&gt;
  
  
  Understanding the Kafka Streams Programming Model&amp;nbsp;
&lt;/h2&gt;

&lt;p&gt;Kafka Streams is fundamentally a Java library that allows developers to treat Kafka topics not just as message queues, but as unbounded streams of records or continuously updating tables. Its core abstractions include:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;KStream: A continuous stream of records. Think of this as the raw event log.&amp;nbsp;&lt;/li&gt;
&lt;li&gt;KTable: A changelog stream that represents the latest value for each key, essentially a materialized view.&amp;nbsp;&lt;/li&gt;
&lt;li&gt;GlobalKTable: A read-only table replicated on each instance, often used for joining reference data.&amp;nbsp;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Stream logic is expressed using the Streams DSL or the Processor API. Most applications use the DSL to define transformations like &lt;code&gt;map()&lt;/code&gt;, &lt;code&gt;filter()&lt;/code&gt;, &lt;code&gt;join()&lt;/code&gt;, and &lt;code&gt;aggregate()&lt;/code&gt;, while the Processor API gives lower-level control over state and custom operators.&lt;/p&gt;

&lt;h2&gt;
  
  
  Stateful Processing and Local Stores&amp;nbsp;
&lt;/h2&gt;

&lt;p&gt;One of Kafka Streams’ defining features is its local state management. Stateful operations, like &lt;code&gt;groupByKey().windowedBy().aggregate()&lt;/code&gt;, require storing intermediate state. Instead of centralizing this in a database, Kafka Streams maintains RocksDB-based state stores on the local disk of each processing instance.&lt;/p&gt;
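
&lt;p&gt;To make the windowed aggregation a little more concrete: a tumbling window of fixed size assigns each record to the window whose start is the record timestamp rounded down to a multiple of the window size, which matches how the Kafka Streams documentation describes epoch-aligned tumbling windows. The sketch below shows only that arithmetic in plain Java. It is not the Kafka Streams API, and the class and method names are assumptions for illustration.&lt;/p&gt;

```java
// Illustrative only: epoch-aligned tumbling window arithmetic in plain Java.
class TumblingWindowSketch {

    // Start (inclusive) of the tumbling window that contains the timestamp.
    // Assumes non-negative timestamps in milliseconds.
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    // End (exclusive) of that same window.
    static long windowEnd(long timestampMs, long windowSizeMs) {
        return windowStart(timestampMs, windowSizeMs) + windowSizeMs;
    }

    public static void main(String[] args) {
        long oneMinute = 60_000L;
        // 125000 ms falls in the window [120000, 180000).
        System.out.println(windowStart(125_000L, oneMinute)); // 120000
        System.out.println(windowEnd(125_000L, oneMinute));   // 180000
    }
}
```

&lt;p&gt;Every record that maps to the same key and the same window start contributes to one aggregate, and that per-window aggregate is the intermediate state the store has to hold.&lt;/p&gt;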

&lt;p&gt;This state is backed by a changelog topic in Kafka. If a failure occurs, the processor recovers by replaying the changelog. This design allows for scalable, distributed stream processing, but it also introduces critical operational requirements:&amp;nbsp;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Persistent disk access for RocksDB.&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Monitoring of state restoration and checkpointing.&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Partitioned processing tied to Kafka topic partitioning.&amp;nbsp;&lt;/li&gt;
&lt;/ul&gt;
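
&lt;p&gt;The recovery idea described above, rebuilding local state by replaying a changelog, can be sketched in a few lines of plain Java. This is a simplified model, not the Kafka Streams or RocksDB API: the class, the record type, and the use of small integer keys are all assumptions chosen to keep the example short.&lt;/p&gt;

```java
import java.util.Arrays;

// Illustrative only: models changelog replay with arrays, not the Kafka Streams API.
class ChangelogRestoreSketch {

    // One changelog entry: the latest value written for a key.
    // Keys are small integers here purely to keep the sketch short.
    record Change(int key, long value) {}

    // Rebuilds a store by applying every entry in order; later entries overwrite earlier ones.
    static long[] restore(Change[] changelog, int keyCount) {
        long[] store = new long[keyCount];
        for (Change change : changelog) {
            store[change.key()] = change.value();
        }
        return store;
    }

    public static void main(String[] args) {
        Change[] changelog = {
            new Change(0, 5L),
            new Change(1, 7L),
            new Change(0, 9L) // supersedes the first entry for key 0
        };
        System.out.println(Arrays.toString(restore(changelog, 2))); // [9, 7]
    }
}
```

&lt;p&gt;Because only the latest value per key matters, replaying the full changelog in order always reproduces the same store, which is why a restarted instance can recover its state from Kafka alone.&lt;/p&gt;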

&lt;h2&gt;
  
  
  Real-World Application Deployment: Microservices and Beyond&amp;nbsp;
&lt;/h2&gt;

&lt;p&gt;In most enterprises, Kafka Streams applications are deployed as microservices. Each stream processing unit (fraud detection, ETA computation, SLA tracking) is packaged as a Spring Boot or Quarkus application, then deployed into Kubernetes or another container orchestrator.&lt;/p&gt;

&lt;p&gt;This approach introduces certain responsibilities per service:&amp;nbsp;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Maintain a complete lifecycle (build, deploy, monitor, patch).&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Handle schema compatibility between topics and application code.&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Implement backpressure handling, logging, and metrics.&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Define partitioning logic that matches Kafka topic partitioning.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;This model is manageable at small scale, but quickly becomes burdensome as the number of real-time applications grows. Teams often end up building:&amp;nbsp;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Custom CI/CD tooling for streaming services.&amp;nbsp;&lt;/li&gt;
&lt;li&gt;State migration routines for schema evolution.&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Monitoring layers to track per-operator lag, backpressure, and failures.&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Homegrown governance to version and deploy transforms safely.&amp;nbsp;&lt;/li&gt;
&lt;/ul&gt;

&lt;blockquote&gt;
&lt;p&gt;The reality is that while Kafka Streams simplifies the programming model, it does not eliminate operational complexity. Most Kafka Streams microservices still need to be treated like full-fledged backend services, each with infrastructure, observability, and deployment overhead.&amp;nbsp;&lt;/p&gt;
&lt;/blockquote&gt;

&lt;h2&gt;
  
  
  Managing Failures and Stateful Recovery&amp;nbsp;
&lt;/h2&gt;

&lt;p&gt;Stateful stream processing introduces unique challenges not seen in stateless services:&amp;nbsp;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Processor crashes require replaying changelogs to restore state.&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Version upgrades must avoid state corruption or key mismatch.&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Hot deployments risk double processing or record duplication if not orchestrated carefully.&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Event-time processing with out-of-order data requires carefully chosen window sizes and grace periods for late records.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Kafka Streams supports exactly-once semantics (EOS) with idempotent producers and transactional writes, but this adds configuration burden and requires careful coordination between input/output topics and processing guarantees.&lt;/p&gt;
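
&lt;p&gt;The configuration side of EOS can be sketched with plain properties. The keys &lt;code&gt;application.id&lt;/code&gt;, &lt;code&gt;bootstrap.servers&lt;/code&gt;, and &lt;code&gt;processing.guarantee&lt;/code&gt;, and the value &lt;code&gt;exactly_once_v2&lt;/code&gt; (which needs brokers on version 2.5 or newer), are standard Kafka Streams settings. The application id, broker address, and class name below are placeholders, and the sketch only builds the properties rather than starting a Streams application.&lt;/p&gt;

```java
import java.util.Properties;

// Illustrative only: builds the settings a Kafka Streams app would be started with.
class EosConfigSketch {

    // Returns Streams properties with exactly-once processing switched on.
    static Properties eosProperties(String applicationId, String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("application.id", applicationId);
        props.setProperty("bootstrap.servers", bootstrapServers);
        // The default is "at_least_once"; "exactly_once_v2" enables transactional processing.
        props.setProperty("processing.guarantee", "exactly_once_v2");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values: replace with a real application id and broker list.
        Properties props = eosProperties("fraud-detector", "localhost:9092");
        System.out.println(props.getProperty("processing.guarantee")); // exactly_once_v2
    }
}
```

&lt;p&gt;The single property is the easy part. The coordination burden comes from making sure every input and output topic, and every downstream consumer, is operated in a way that is consistent with that guarantee.&lt;/p&gt;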

&lt;p&gt;In practice, engineering teams often need to build custom scaffolding to make these patterns reliable: transform state inspection, window replay, timestamp alignment, and state migration versioning.&lt;/p&gt;

&lt;h2&gt;
  
  
  Why Observability Remains an Under-Addressed Challenge&amp;nbsp;
&lt;/h2&gt;

&lt;p&gt;While Kafka itself provides metrics on broker health and topic lag, Kafka Streams applications demand pipeline-aware observability:&amp;nbsp;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Is a specific stream join introducing backpressure?&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Are certain partitions processing slower due to skewed keys?&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Is the state store nearing disk exhaustion?&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Which application version is currently deployed and processing which partitions?&amp;nbsp;&lt;/li&gt;
&lt;/ul&gt;

&lt;blockquote&gt;
&lt;p&gt;These questions often require setting up Prometheus exporters, embedding Micrometer, and integrating with tools like Grafana, Jaeger, or OpenTelemetry. In many cases, visibility across multi-stage pipelines (e.g., “raw event → session builder → score assigner → alert emitter”) is fragmented and hard to debug during incident response.&amp;nbsp;&lt;/p&gt;
&lt;/blockquote&gt;

&lt;h2&gt;
  
  
  CI/CD and Versioned Transform Pipelines&amp;nbsp;
&lt;/h2&gt;

&lt;p&gt;Deploying changes to streaming logic requires particular discipline:&amp;nbsp;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Stateful operators must be deployed carefully to avoid dropping or reprocessing records.&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Version control is critical, not just for source code, but for schemas and processing topology.&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Teams must implement rollback strategies for failed deployments without corrupting stream state.&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Developers often struggle to test stream topologies locally, especially when logic is embedded deep inside a containerized microservice.&amp;nbsp;&lt;/li&gt;
&lt;/ul&gt;

&lt;blockquote&gt;
&lt;p&gt;While Kafka Streams supports unit-testing topologies with &lt;code&gt;TopologyTestDriver&lt;/code&gt;, it has no built-in support for seamless, multi-version CI/CD integration.&lt;/p&gt;
&lt;/blockquote&gt;

&lt;h2&gt;
  
  
  What This Means for Real-Time Engineering Teams&amp;nbsp;
&lt;/h2&gt;

&lt;p&gt;By now, the picture is clear: Kafka Streams provides the primitives, but not the platform. To make real-time work in production, teams must shoulder:&amp;nbsp;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Lifecycle management of dozens of services.&amp;nbsp;&lt;/li&gt;
&lt;li&gt;CI/CD pipelines that are stream-aware.&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Governance across schemas, state, and partitioning.&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Ops playbooks for fault tolerance, state recovery, and lag monitoring.&amp;nbsp;&lt;/li&gt;
&lt;li&gt;A documentation trail so that new engineers can maintain existing stream logic safely.&amp;nbsp;&lt;/li&gt;
&lt;/ul&gt;

&lt;blockquote&gt;
&lt;p&gt;This fragmentation can be a major blocker, not because the underlying code is difficult, but because the integration burden scales with every new pipeline.&amp;nbsp;&lt;/p&gt;
&lt;/blockquote&gt;

&lt;h2&gt;
  
  
  How Platforms Like Condense Change the Equation&amp;nbsp;
&lt;/h2&gt;

&lt;p&gt;Modern real-time platforms are increasingly collapsing this complexity.&amp;nbsp;&lt;/p&gt;

&lt;p&gt;Condense, for example, retains Kafka Streams’ power while eliminating the need for separate microservices per application. Instead of building, deploying, and observing independent logic units:&amp;nbsp;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Developers write Kafka Streams-style logic inside an integrated IDE, with support for no-code and low-code operators (merge, delay, alert, window).&amp;nbsp;&lt;/li&gt;
&lt;li&gt;All transforms are version-controlled and Git-integrated, enabling safe rollouts, rollbacks, and collaborative development.&amp;nbsp;&lt;/li&gt;
&lt;li&gt;The platform handles orchestration, state recovery, partition scaling, and observability as first-class features.&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Prebuilt domain-specific operators (e.g., CAN decoder, trip builder, geofence engine) reduce redundant engineering effort.&amp;nbsp;&lt;/li&gt;
&lt;li&gt;All Kafka brokers and processors run inside the customer’s cloud account via BYOC, ensuring data sovereignty without operational burden.&amp;nbsp;&lt;/li&gt;
&lt;/ul&gt;

&lt;blockquote&gt;
&lt;p&gt;By removing the need to wrap each stream job in its own microservice, Condense makes it feasible to scale from 5 to 50+ real-time workflows without growing operational debt linearly.&amp;nbsp;&lt;/p&gt;
&lt;/blockquote&gt;

&lt;h2&gt;
  
  
  Closing Thoughts&amp;nbsp;
&lt;/h2&gt;

&lt;p&gt;Kafka Streams remains a powerful tool in the real-time developer’s toolkit. But making it work at scale involves far more than just calling &lt;code&gt;stream.map().filter().join()&lt;/code&gt;; it demands operational rigor, architectural forethought, and careful coordination across the development lifecycle.&lt;/p&gt;

&lt;p&gt;For organizations moving from raw events to real-time decisions, the choice is not just about code; it’s about platform strategy.&lt;/p&gt;

&lt;p&gt;As real-time becomes core infrastructure, platforms like Condense that provide an integrated, streaming-native runtime, from ingestion to logic to deployment, are proving to be not just convenient, but essential.&lt;/p&gt;

</description>
      <category>kafkastreams</category>
      <category>kafka</category>
      <category>eventdriven</category>
      <category>datastreaming</category>
    </item>
    <item>
      <title>Challenges in Updating Managed Kafka Platforms to Kafka 4.3.0</title>
      <dc:creator>Sachin Kamath</dc:creator>
      <pubDate>Wed, 03 Jun 2026 17:28:05 +0000</pubDate>
      <link>https://dev.to/zeliotofficial/challenges-in-updating-managed-kafka-platforms-to-kafka-430-g0i</link>
      <guid>https://dev.to/zeliotofficial/challenges-in-updating-managed-kafka-platforms-to-kafka-430-g0i</guid>
      <description>&lt;h2&gt;
  
  
  TL;DR
&lt;/h2&gt;

&lt;p&gt;Updating managed Kafka platforms to Kafka 4.3.0 is not a simple version upgrade. The removal of ZooKeeper, KRaft migration requirements, infrastructure validation, compatibility testing, recovery optimization, and operational changes introduce significant engineering effort for managed Kafka providers. Condense simplifies this complexity by handling Kafka upgrades, infrastructure management, monitoring, scaling, and operational workflows centrally.&lt;/p&gt;

&lt;p&gt;Apache Kafka 4.3.0 introduces major architectural and operational changes across KRaft, storage recovery, consumer coordination, security, and observability. While these improvements strengthen Kafka for production-scale environments, upgrading managed Kafka platforms to Kafka 4.3.0 requires significant engineering effort.&amp;nbsp;&lt;/p&gt;

&lt;p&gt;For managed Kafka providers, upgrades are not limited to changing broker versions. Every infrastructure layer, operational workflow, monitoring pipeline, client compatibility model, and recovery mechanism must be validated carefully before production rollout.&amp;nbsp;&lt;/p&gt;

&lt;p&gt;The KRaft-only architecture of the Kafka 4.x line, which 4.3.0 belongs to, increases this complexity further because ZooKeeper support is completely removed.&lt;/p&gt;

&lt;h2&gt;
  
  
  Managed Kafka providers must ensure:&amp;nbsp;
&lt;/h2&gt;

&lt;ul&gt;
&lt;li&gt;Cluster stability&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Data safety&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Upgrade compatibility&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Operational continuity&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Multi-tenant reliability&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Security consistency&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Zero or minimal downtime&amp;nbsp;&lt;/li&gt;
&lt;/ul&gt;

&lt;blockquote&gt;
&lt;p&gt;These requirements make Kafka version upgrades operationally intensive.&amp;nbsp;&lt;/p&gt;
&lt;/blockquote&gt;

&lt;h2&gt;
  
  
  KRaft Migration Complexity&amp;nbsp;
&lt;/h2&gt;

&lt;p&gt;For clusters still on ZooKeeper, one of the biggest changes in moving to Kafka 4.3.0 is that ZooKeeper support is gone entirely: it was removed in Kafka 4.0, and Kafka 4.x clusters operate only in KRaft mode. For managed Kafka providers, this is not simply a configuration update.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Major Efforts Involved:&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Migrating existing ZooKeeper-based clusters&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Validating metadata consistency&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Updating controller management workflows&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Reworking infrastructure automation&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Rebuilding deployment pipelines&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Updating monitoring systems for KRaft&amp;nbsp;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;strong&gt;Providers must validate that KRaft behaves consistently across:&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Small clusters&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Large multi-tenant environments&amp;nbsp;&lt;/li&gt;
&lt;li&gt;High-throughput workloads&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Disaster recovery scenarios&amp;nbsp;&lt;/li&gt;
&lt;/ul&gt;

&lt;blockquote&gt;
&lt;p&gt;Migration errors at the metadata layer can directly impact cluster availability and operational stability.&amp;nbsp;&lt;/p&gt;
&lt;/blockquote&gt;

&lt;h2&gt;
  
  
  Infrastructure Validation and Compatibility Testing&amp;nbsp;
&lt;/h2&gt;

&lt;p&gt;Managed Kafka environments support multiple customer workloads with different:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Kafka clients&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Consumer patterns&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Security configurations&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Connector ecosystems&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Streaming applications&amp;nbsp;&lt;/li&gt;
&lt;/ul&gt;

&lt;blockquote&gt;
&lt;p&gt;Upgrading Kafka versions requires extensive compatibility validation.&lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;&lt;strong&gt;Major Efforts Involved&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Client compatibility testing&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Connector validation&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Schema registry testing&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Security integration validation&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Consumer group behavior testing&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Kafka Streams compatibility verification&amp;nbsp;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Providers cannot assume every customer application will behave identically after upgrades.&lt;/p&gt;

&lt;p&gt;Even small protocol-level changes can impact:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Rebalance behavior&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Throughput patterns&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Latency&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Connector operations&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Stream processing workflows&amp;nbsp;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;This makes pre-production validation extremely important.&amp;nbsp;&lt;/p&gt;

&lt;h2&gt;
  
  
  Operational Risk During Upgrades&amp;nbsp;
&lt;/h2&gt;

&lt;p&gt;Managed Kafka providers operate production-critical environments where downtime risks must remain minimal.&amp;nbsp;&lt;/p&gt;

&lt;p&gt;Kafka upgrades require careful operational planning.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Major Efforts Involved&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Rolling upgrade orchestration&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Replica synchronization validation&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Partition reassignment handling&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Traffic balancing&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Recovery workflow testing&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Rollback strategy preparation&amp;nbsp;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Upgrades become even more sensitive in:&amp;nbsp;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;High-throughput environments&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Multi-region clusters&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Tiered storage deployments&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Mission-critical systems&amp;nbsp;&lt;/li&gt;
&lt;/ul&gt;

&lt;blockquote&gt;
&lt;p&gt;Any instability during upgrades can impact production data pipelines directly.&lt;/p&gt;
&lt;/blockquote&gt;

&lt;h2&gt;
  
  
  Tiered Storage Recovery Validation&amp;nbsp;
&lt;/h2&gt;

&lt;p&gt;Kafka 4.3.0 introduces improvements for tiered storage replica recovery.&amp;nbsp;&lt;/p&gt;

&lt;p&gt;While these improvements provide operational advantages, managed Kafka providers must validate recovery behavior thoroughly before enabling them at scale.&amp;nbsp;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Major Efforts Involved&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Recovery testing across large datasets&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Remote storage validation&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Replica synchronization benchmarking&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Failure scenario simulation&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Recovery performance tuning&amp;nbsp;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Tiered storage environments usually operate with massive historical data volumes. Recovery inefficiencies can increase operational overhead significantly if not validated properly.&lt;/p&gt;

&lt;h2&gt;
  
  
  Consumer Group Coordination Changes&amp;nbsp;
&lt;/h2&gt;

&lt;p&gt;Kafka 4.3.0 improves consumer group assignment handling through assignment batching and configurable assignment intervals.&lt;/p&gt;

&lt;p&gt;For managed Kafka providers, consumer group behavior is extremely sensitive because customers operate different scaling models and workload patterns.&amp;nbsp;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Major Efforts Involved&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Rebalance behavior validation&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Autoscaling compatibility testing&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Coordinator load benchmarking&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Consumer lag analysis&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Throughput stability testing&amp;nbsp;&lt;/li&gt;
&lt;/ul&gt;

&lt;blockquote&gt;
&lt;p&gt;Even improvements intended to optimize coordination must be validated carefully across different workload patterns before broad rollout.&amp;nbsp;&lt;/p&gt;
&lt;/blockquote&gt;

&lt;h2&gt;
  
  
  Monitoring and Observability Updates&amp;nbsp;
&lt;/h2&gt;

&lt;p&gt;Kafka 4.3.0 introduces new operational metrics and observability improvements, including retention headroom metrics.&amp;nbsp;&lt;/p&gt;

&lt;p&gt;Managed Kafka platforms usually maintain centralized observability systems for:&amp;nbsp;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Metrics&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Alerts&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Dashboards&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Capacity planning&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Operational analytics&amp;nbsp;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Every Kafka release requires updates to these monitoring systems.&amp;nbsp;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Major Efforts Involved&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Updating monitoring pipelines&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Creating new dashboards&lt;/li&gt;
&lt;li&gt;Alert validation&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Storage visibility integration&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Operational analytics updates&amp;nbsp;&lt;/li&gt;
&lt;/ul&gt;

&lt;blockquote&gt;
&lt;p&gt;Without proper monitoring updates, new Kafka capabilities cannot be utilized effectively.&amp;nbsp;&lt;/p&gt;
&lt;/blockquote&gt;

&lt;h2&gt;
  
  
  Security and IAM Integration Validation&amp;nbsp;
&lt;/h2&gt;

&lt;p&gt;Kafka 4.3.0 introduces OAuth client assertion support for enterprise authentication workflows.&amp;nbsp;&lt;/p&gt;

&lt;p&gt;Managed Kafka providers supporting enterprise customers must validate:&amp;nbsp;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;IAM integrations&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Token-based authentication flows&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Access control behavior&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Security policy compatibility&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Authentication performance&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;strong&gt;Major Efforts Involved&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Identity provider testing&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Security workflow validation&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Multi-tenant access verification&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Compliance testing&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Zero-trust architecture validation&amp;nbsp;&lt;/li&gt;
&lt;/ul&gt;

&lt;blockquote&gt;
&lt;p&gt;Security upgrades require careful validation because authentication inconsistencies directly affect customer workloads.&lt;/p&gt;
&lt;/blockquote&gt;

&lt;h2&gt;
  
  
  Upgrade Coordination Across Multi-Tenant Environments&amp;nbsp;
&lt;/h2&gt;

&lt;p&gt;Managed Kafka platforms usually host multiple customer environments on shared infrastructure layers.&amp;nbsp;&lt;/p&gt;

&lt;p&gt;This creates additional operational complexity during upgrades.&amp;nbsp;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Major Efforts Involved&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Tenant-aware rollout planning&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Cluster isolation validation&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Workload impact analysis&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Upgrade scheduling coordination&amp;nbsp;&lt;/li&gt;
&lt;li&gt;SLA management&amp;nbsp;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Providers must ensure upgrades do not create cascading impact across customer environments.&amp;nbsp;&lt;/p&gt;

&lt;p&gt;This becomes significantly more complex at scale.&amp;nbsp;&lt;/p&gt;

&lt;h2&gt;
  
  
  Engineering Effort Behind Kafka Upgrades&amp;nbsp;
&lt;/h2&gt;

&lt;p&gt;From the outside, Kafka upgrades may appear straightforward.&lt;/p&gt;

&lt;p&gt;Internally, managed Kafka providers must coordinate across:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Platform engineering teams&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Infrastructure teams&amp;nbsp;&lt;/li&gt;
&lt;li&gt;SRE teams&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Security teams&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Support teams&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Customer operations teams&amp;nbsp;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;strong&gt;Kafka Upgrades Involve:&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Infrastructure automation updates&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Recovery validation&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Observability changes&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Operational testing&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Security integration updates&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Documentation and support readiness&amp;nbsp;&lt;/li&gt;
&lt;/ul&gt;

&lt;blockquote&gt;
&lt;p&gt;The engineering effort behind production-grade Kafka upgrades is substantial.&amp;nbsp;&lt;/p&gt;
&lt;/blockquote&gt;

&lt;h2&gt;
  
  
  How Condense Simplifies Kafka Upgrades&amp;nbsp;
&lt;/h2&gt;

&lt;p&gt;At Condense, Kafka infrastructure management, upgrades, scaling, observability, and operational workflows are centrally managed as part of the platform.&amp;nbsp;&lt;/p&gt;

&lt;p&gt;Condense simplifies Kafka version adoption by handling:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Kafka cluster management&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Upgrade orchestration&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Infrastructure automation&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Monitoring and observability&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Security integration&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Scaling workflows&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Recovery operations&amp;nbsp;&lt;/li&gt;
&lt;li&gt;Operational maintenance&amp;nbsp;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;This allows organizations to adopt newer Kafka versions such as Kafka 4.3.0 without managing the operational complexity internally.&amp;nbsp;&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;As Kafka evolves with architectural changes like KRaft, tiered storage optimization, and operational improvements, Condense ensures these capabilities are integrated and operationalized efficiently within production environments.&amp;nbsp;&lt;/p&gt;
&lt;/blockquote&gt;

&lt;h2&gt;
  
  
  Frequently Asked Questions (FAQs)
&lt;/h2&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;Why is KRaft migration a major challenge?&lt;br&gt;
KRaft completely removes the ZooKeeper dependency, which requires metadata migration, infrastructure changes, monitoring updates, and operational workflow redesign.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Does Kafka 4.3.0 improve operational efficiency?&lt;br&gt;
Yes. Kafka 4.3.0 improves recovery behavior, consumer coordination, observability, and security integration, and the KRaft architecture simplifies the underlying infrastructure.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;How does Condense simplify Kafka 4.3.0 adoption?&lt;br&gt;
Condense is already built on KRaft-based Kafka architecture, enabling organizations to adopt Kafka 4.3.0 capabilities without handling underlying infrastructure transitions, upgrade orchestration, monitoring, scaling, or operational maintenance internally.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Can Condense help organizations adopt Kafka 4.3.0 faster?&lt;br&gt;
Yes. Since Condense already operates on modern Kafka architecture principles including KRaft, organizations can leverage Kafka 4.3.0 operational improvements such as simplified metadata management, recovery optimizations, and enhanced observability without redesigning their Kafka infrastructure stack.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Why is KRaft adoption easier with Condense?&lt;br&gt;
KRaft migration usually requires infrastructure redesign, operational workflow changes, metadata management updates, and monitoring modifications. With Condense, these infrastructure complexities are abstracted through a managed streaming platform already aligned with modern Kafka operational architecture.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;What operational improvements does Kafka 4.3.0 bring to real-time streaming platforms?&lt;br&gt;
Kafka 4.3.0 improves operational efficiency through faster tiered storage recovery, broker cordoning, retention headroom metrics, improved consumer group coordination, and better enterprise security integration. These improvements help organizations run large-scale streaming environments more efficiently.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;What should organizations consider before upgrading to Kafka 4.3.0?&lt;br&gt;
Organizations should evaluate client compatibility, recovery workflows, monitoring updates, consumer group behavior, connector validation, and operational readiness before upgrading to Kafka 4.3.0. Production rollout should always include staging validation and rollback planning.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Does Kafka 4.3.0 require infrastructure changes?&lt;br&gt;
Yes. Kafka 4.3.0 introduces architectural and operational changes that may require updates to deployment workflows, monitoring systems, maintenance processes, and infrastructure automation depending on the existing Kafka environment.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;How does Kafka 4.3.0 improve Kafka maintenance workflows?&lt;br&gt;
Kafka 4.3.0 introduces broker and log directory cordoning, allowing operators to stop new partition assignments during maintenance operations. This simplifies hardware replacement, broker migration, and infrastructure maintenance workflows.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Is Kafka 4.3.0 better for large-scale streaming workloads?&lt;br&gt;
Yes. Kafka 4.3.0 improves recovery efficiency, consumer coordination, observability, and operational stability, making it more suitable for large-scale real-time streaming environments with high throughput and large storage volumes.&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;

</description>
      <category>architecture</category>
      <category>devops</category>
      <category>distributedsystems</category>
      <category>infrastructure</category>
    </item>
    <item>
      <title>Real-Time Data Streaming vs Batch Data ETL: Why Timing Matters</title>
      <dc:creator>Sachin Kamath</dc:creator>
      <pubDate>Tue, 02 Jun 2026 02:26:41 +0000</pubDate>
      <link>https://dev.to/zeliotofficial/real-time-data-streaming-vs-batch-data-etl-why-timing-matters-1d4m</link>
      <guid>https://dev.to/zeliotofficial/real-time-data-streaming-vs-batch-data-etl-why-timing-matters-1d4m</guid>
      <description>&lt;h2&gt;
  
  
  TL;DR
&lt;/h2&gt;

&lt;p&gt;Batch ETL moves and processes data on a schedule, delivering insights with built-in latency, ideal for historical analysis and compliance, but ineffective for urgent, real-time business actions. Real-Time Streaming pipelines process each event instantly, enabling on-the-fly fraud detection, predictive maintenance, and hyper-personalized engagement. Timing isn’t just a throughput metric; it determines whether data delivers competitive value or is just hindsight. Condense makes real-time streaming practical and production-ready, letting enterprises turn events into actions within their own cloud, while traditional batch workflows remain valuable for long-term reporting and analytics.&lt;/p&gt;

&lt;p&gt;For decades, batch ETL defined how enterprises integrated and analyzed data. Jobs were scheduled, data was extracted from sources, transformed into a unified schema, and loaded into warehouses or lakes for reporting. &lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;This was enough when businesses primarily asked: what happened yesterday? &lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;But the operational environment has changed. Industries now compete on the ability to respond instantly, whether blocking fraud at the moment of authorization, detecting anomalies in connected fleets, or personalizing customer engagement as interactions unfold. In this landscape, Real-Time Data Streaming and modern streaming pipelines are not optimizations. They are requirements. &lt;/p&gt;

&lt;p&gt;This blog examines the technical differences between batch ETL and Real-Time streaming, explains why timing is more than a performance metric, and explores how streaming pipelines are reshaping enterprise architectures. &lt;/p&gt;

&lt;h2&gt;
  
  
  Batch ETL: Strengths and Boundaries
&lt;/h2&gt;

&lt;p&gt;Batch ETL (Extract, Transform, Load) pipelines move data in discrete intervals. They typically operate as follows: &lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;Extract&lt;br&gt;
Pull records from transactional systems, APIs, or files. &lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Transform&lt;br&gt;
Apply schema normalization, deduplication, or business logic in staging. &lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Load&lt;br&gt;
Insert processed batches into a target system (warehouse or data lake). &lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;
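
&lt;p&gt;The three stages above can be sketched in plain Java. This is a minimal illustration under stated assumptions, not a production job: the &lt;code&gt;Order&lt;/code&gt; record, the CSV-style rows, and the &lt;code&gt;load&lt;/code&gt; stand-in that only prints are all hypothetical, and a real pipeline would read from and write to actual systems.&lt;/p&gt;

```java
import java.util.Arrays;
import java.util.Locale;

public class BatchEtlSketch {

    // Hypothetical record shape for illustration: a customer email and an order amount in cents.
    record Order(String email, long amountCents) {}

    // Extract: a real job would pull these rows from a database, API, or file.
    static String[] extract() {
        return new String[] {
            "Alice@Example.com,1200",
            "bob@example.com,500",
            "alice@example.com,1200"   // becomes a duplicate once the email is normalized
        };
    }

    // Transform: parse each row, normalize the email, and drop exact duplicates.
    static Order[] transform(String[] rows) {
        return Arrays.stream(rows)
                .map(row -> row.split(","))
                .map(cols -> new Order(
                        cols[0].trim().toLowerCase(Locale.ROOT),
                        Long.parseLong(cols[1].trim())))
                .distinct()
                .toArray(Order[]::new);
    }

    // Load: stand-in for a bulk insert into a warehouse; returns the number of rows written.
    static int load(Order[] batch) {
        for (Order o : batch) {
            System.out.println("INSERT " + o.email() + " " + o.amountCents());
        }
        return batch.length;
    }

    public static void main(String[] args) {
        int written = load(transform(extract()));
        System.out.println(written + " rows loaded");
    }
}
```

&lt;p&gt;Nothing reaches the target until the whole batch has passed through all three stages, which is the source of the latency discussed in this section.&lt;/p&gt;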

&lt;p&gt;&lt;strong&gt;Technical strengths&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Throughput: Bulk processing of millions of records is efficient on modern compute clusters. &lt;/li&gt;
&lt;li&gt;Determinism: Fixed jobs are easier to validate and audit, making them suitable for compliance. &lt;/li&gt;
&lt;li&gt;Maturity: Tooling (Informatica, Talend, dbt, Airflow) is well established and battle-tested. &lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;strong&gt;Limitations inherent to the design&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Latency: Data becomes available only after the next run completes, so staleness can reach the full batch interval (minutes, hours, or days) plus processing time. &lt;/li&gt;
&lt;li&gt;Operational blind spots: Events between runs remain invisible. Failures may not be discovered until the next batch completes. &lt;/li&gt;
&lt;li&gt;Rigid scheduling: Workflows are brittle under changing workloads. Rescheduling impacts dependencies downstream. &lt;/li&gt;
&lt;li&gt;Resource spikes: Large jobs create uneven load, with clusters often over-provisioned to handle peak windows. &lt;/li&gt;
&lt;/ul&gt;

&lt;blockquote&gt;
&lt;p&gt;Batch ETL is indispensable for historical analysis and compliance reporting, but unsuitable when insights must drive immediate operational action. &lt;/p&gt;
&lt;/blockquote&gt;

&lt;h2&gt;
  
  
  Real-Time Data Streaming: A Continuous Model
&lt;/h2&gt;

&lt;p&gt;Real-Time Data Streaming inverts this paradigm. Instead of moving data in scheduled intervals, every event is treated as a discrete, time-ordered signal that can be processed immediately. Kafka and similar log-based systems provide the backbone for this architecture. &lt;/p&gt;
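
&lt;p&gt;To make the log-based model concrete, here is a minimal sketch of a single partition as an append-only array with offset-based reads. It is an in-memory analogy rather than Kafka's implementation: the class name &lt;code&gt;AppendOnlyLogSketch&lt;/code&gt; and its methods are hypothetical, and there is no persistence, replication, or retention.&lt;/p&gt;

```java
import java.util.Arrays;

public class AppendOnlyLogSketch {

    // A single partition modeled as a growable array; entries are never modified after append.
    private String[] entries = new String[4];
    private int size = 0;

    // Appends an event and returns the offset it was written at.
    public int append(String event) {
        if (size == entries.length) {
            entries = Arrays.copyOf(entries, size * 2);
        }
        entries[size] = event;
        return size++;
    }

    // Returns every event from the given offset to the end of the log, in append order.
    public String[] readFrom(int offset) {
        return Arrays.copyOfRange(entries, offset, size);
    }

    public static void main(String[] args) {
        AppendOnlyLogSketch log = new AppendOnlyLogSketch();
        log.append("ignition_on");
        log.append("speed=42");
        log.append("speed=57");

        // A consumer that has already processed offsets 0 and 1 resumes at offset 2.
        System.out.println(Arrays.toString(log.readFrom(2)));
        // Reading from offset 0 yields the full history again, for example for a backfill.
        System.out.println(Arrays.toString(log.readFrom(0)));
    }
}
```

&lt;p&gt;Because readers only track an offset and never mutate the log, several consumers can read the same events independently and at their own pace.&lt;/p&gt;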

&lt;p&gt;&lt;strong&gt;Core mechanics of streaming pipelines:&lt;/strong&gt; &lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Immutable logs: Events are appended to partitions, which preserves order within each partition and, with replication, provides durability. &lt;/li&gt;
&lt;li&gt;Replayability: Consumers can reprocess events from any retained offset, enabling recovery and backfills. &lt;/li&gt;
&lt;li&gt;Stateful stream processing: Operators maintain state across windows, joins, and aggregations (e.g., “total purchases by customer in the last 5 minutes”). &lt;/li&gt;
&lt;li&gt;Continuous enrichment: Streams are augmented with contextual data (e.g., geolocation, device metadata) in motion. &lt;/li&gt;
&lt;li&gt;Low-latency sinks: Events are delivered to APIs, dashboards, or control systems within milliseconds to seconds. &lt;/li&gt;
&lt;/ul&gt;
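
&lt;p&gt;The windowed aggregation mentioned above ("total purchases by customer in the last 5 minutes") can be sketched as follows. This is a simplified illustration that recomputes the sum over a buffered array of events; a real stream processor would typically maintain this state incrementally as events arrive. The &lt;code&gt;Purchase&lt;/code&gt; record and &lt;code&gt;totalInWindow&lt;/code&gt; method are hypothetical names chosen for this example.&lt;/p&gt;

```java
import java.util.Arrays;

public class WindowedTotalSketch {

    // Hypothetical event shape: who bought, how much (in cents), and when (epoch milliseconds).
    record Purchase(String customer, long amountCents, long timestampMs) {}

    static final long FIVE_MINUTES_MS = 5 * 60 * 1000L;

    // Sums one customer's purchases with timestamps in the window (nowMs - windowMs, nowMs].
    static long totalInWindow(Purchase[] events, String customer, long nowMs, long windowMs) {
        long cutoff = nowMs - windowMs;
        return Arrays.stream(events)
                .filter(e -> e.customer().equals(customer))
                .filter(e -> e.timestampMs() > cutoff)
                .filter(e -> nowMs >= e.timestampMs())
                .mapToLong(Purchase::amountCents)
                .sum();
    }

    public static void main(String[] args) {
        long now = 1_000_000L;
        Purchase[] events = {
            new Purchase("c1", 500, now - 6 * 60 * 1000L),  // older than five minutes, excluded
            new Purchase("c1", 700, now - 4 * 60 * 1000L),
            new Purchase("c2", 900, now - 60 * 1000L),
            new Purchase("c1", 300, now - 1000L)
        };
        System.out.println(totalInWindow(events, "c1", now, FIVE_MINUTES_MS)); // prints 1000
    }
}
```

&lt;p&gt;The window here is keyed on the event's own timestamp rather than its arrival time, so late or replayed events still land in the window they belong to.&lt;/p&gt;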

&lt;blockquote&gt;
&lt;p&gt;This model does not merely accelerate batch. It enables workflows that batch cannot support because the business outcome depends on acting while the event is still unfolding. &lt;/p&gt;
&lt;/blockquote&gt;

&lt;h2&gt;
  
  
  Why Timing Is Strategic
&lt;/h2&gt;

&lt;p&gt;Timing is not a secondary concern; it directly determines the value of data. &lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Fraud detection: A fraudulent card transaction must be flagged before the authorization completes. A nightly batch report identifies fraud after the funds are gone. &lt;/li&gt;
&lt;li&gt;Predictive maintenance: An abnormal vibration detected mid-route can prevent a breakdown. Batch ETL will surface it only after the vehicle is already sidelined. &lt;/li&gt;
&lt;li&gt;Customer personalization: Recommending a product while a customer is browsing drives conversion. A next-day email is often irrelevant. &lt;/li&gt;
&lt;li&gt;Logistics visibility: A delayed shipment must trigger rerouting in the moment. Reporting it after delivery deadlines have passed is operationally useless. &lt;/li&gt;
&lt;li&gt;Cybersecurity: Intrusion attempts must be analyzed in flight to prevent compromise. Batch ETL provides forensic evidence, not active defense. &lt;/li&gt;
&lt;/ul&gt;

&lt;blockquote&gt;
&lt;p&gt;In each case, the same data is processed. The difference is timing. Batch delivers hindsight. Streaming delivers foresight. &lt;/p&gt;
&lt;/blockquote&gt;

&lt;h2&gt;
  
  
  Demand for Streaming Pipelines
&lt;/h2&gt;

&lt;p&gt;Enterprises are increasingly building streaming pipelines because the nature of their industries leaves no tolerance for latency. &lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Financial services: Real-Time AML checks, fraud detection, and instant payment processing are both competitive and regulatory mandates. &lt;/li&gt;
&lt;li&gt;Mobility and automotive: Vehicles generate telemetry that must be analyzed continuously for safety and efficiency. &lt;/li&gt;
&lt;li&gt;Telecom and IoT: Billions of device signals require filtering, aggregation, and anomaly detection at scale. &lt;/li&gt;
&lt;li&gt;Retail and digital platforms: Context-aware personalization drives customer engagement. Delayed data undermines the business model. &lt;/li&gt;
&lt;/ul&gt;

&lt;blockquote&gt;
&lt;p&gt;The demand side is clear: data is only valuable if it can be acted upon within the time window that matters. &lt;/p&gt;
&lt;/blockquote&gt;

&lt;h2&gt;
  
  
  Coexistence: Batch and Streaming Together
&lt;/h2&gt;

&lt;p&gt;This is not a zero-sum choice. Batch ETL and streaming coexist in most enterprises: &lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Batch ETL: Best for historical analytics, compliance archiving, financial reporting, and periodic aggregations. &lt;/li&gt;
&lt;li&gt;Real-Time Data Streaming: Best for operational intelligence, anomaly detection, personalization, SLA monitoring, and IoT telemetry. &lt;/li&gt;
&lt;/ul&gt;

&lt;blockquote&gt;
&lt;p&gt;The shift is not about replacement, but about recognizing that streaming pipelines increasingly occupy the critical front line of enterprise decision-making. &lt;/p&gt;
&lt;/blockquote&gt;

&lt;h2&gt;
  
  
  Why Real-Time Data Streaming Platforms like Condense Matter Here
&lt;/h2&gt;

&lt;p&gt;This is where &lt;a href="https://www.zeliot.in/condense" rel="noopener noreferrer"&gt;Condense&lt;/a&gt; makes a difference. It is a Kafka Native platform designed to deliver production-ready streaming pipelines inside the enterprise’s own cloud environment (BYOC). With &lt;a href="https://www.zeliot.in/condense" rel="noopener noreferrer"&gt;Condense&lt;/a&gt;, organizations don’t just get Managed Kafka brokers; they get a complete runtime that manages ingestion, stream processing, stateful recovery, observability, and domain-specific transforms. &lt;/p&gt;

&lt;p&gt;That means enterprises can move from raw events to actionable insights in minutes, without taking on the operational weight of building pipelines from scratch. &lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;Batch ETL will remain valuable, but the competitive edge lies in Real-Time. Condense enables enterprises to capture that edge by making Real-Time Data Streaming both practical and production-ready. &lt;/p&gt;
&lt;/blockquote&gt;

&lt;h2&gt;
  
  
  Frequently Asked Questions (FAQs)
&lt;/h2&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;What is the main difference between batch ETL and Real-Time Data Streaming?&amp;nbsp;&lt;br&gt;
Batch ETL processes data in scheduled intervals, while Real-Time Data Streaming processes each event as it happens.&amp;nbsp;&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Why are streaming pipelines faster than batch ETL?&amp;nbsp;&lt;br&gt;
Streaming pipelines handle events continuously with low latency, unlike batch jobs that wait for scheduled runs.&amp;nbsp;&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;When should enterprises use batch ETL instead of streaming?&amp;nbsp;&lt;br&gt;
Batch ETL is best for historical reporting, compliance archives, and workloads where timing is not critical.&amp;nbsp;&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Why is timing important in Real-Time Data Streaming?&amp;nbsp;&lt;br&gt;
Timing ensures events drive immediate actions, such as fraud blocking, predictive maintenance, or real-time personalization.&amp;nbsp;&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Can batch ETL and streaming pipelines coexist?&amp;nbsp;&lt;br&gt;
Yes, most enterprises use streaming pipelines for live operations and batch ETL for long-term analytics.&amp;nbsp;&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;What industries benefit most from Real-Time Data Streaming?&amp;nbsp;&lt;br&gt;
Finance, mobility, logistics, IoT, and retail depend on Real-Time Data Streaming for mission-critical decisions.&amp;nbsp;&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;How does Condense improve the adoption of streaming pipelines?&amp;nbsp;&lt;br&gt;
&lt;a href="https://www.zeliot.in/condense" rel="noopener noreferrer"&gt;Condense&lt;/a&gt; is a Kafka Native platform that lets enterprises build production-ready streaming pipelines in minutes inside their own cloud.&amp;nbsp;&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;

</description>
      <category>etl</category>
      <category>eventdriven</category>
      <category>kafka</category>
      <category>dataplatform</category>
    </item>
    <item>
      <title>AIDL deep dive post does not exist! Part - I</title>
      <dc:creator>Prasanna</dc:creator>
      <pubDate>Sat, 18 Jul 2020 06:33:30 +0000</pubDate>
      <link>https://dev.to/zeliotofficial/aidl-deep-dive-post-does-not-exist-part-i-4po8</link>
      <guid>https://dev.to/zeliotofficial/aidl-deep-dive-post-does-not-exist-part-i-4po8</guid>
      <description>&lt;h3&gt;
  
  
  &lt;strong&gt;Introduction&lt;/strong&gt;
&lt;/h3&gt;

&lt;p&gt;Hello there! I am going to talk about Android's parliamentary conversations. What do I mean by that? I mean the IPC (Inter-Process Communication) that happens inside the Android system. We are going to use AIDL for this so-called "conversation". I will explain it in more depth in this article.&lt;/p&gt;

&lt;h3&gt;
  
  
  &lt;strong&gt;We all know it!&lt;/strong&gt;
&lt;/h3&gt;

&lt;p&gt;Why is this a brand new post? Because it's not about IPC between components inside one Android application. It's about communication between two different applications. Most people agree to disagree that AIDL is pretty easy to understand. But there is a saying (of mine): &lt;code&gt;"If you understand interfaces in OOP, you shall understand AIDL!"&lt;/code&gt; Let's see what that saying really means, in detail.&lt;/p&gt;

&lt;h3&gt;
  
  
  &lt;strong&gt;The AIDL unbolted.&lt;/strong&gt;
&lt;/h3&gt;

&lt;p&gt;"Android Interface Definition Language (AIDL)" is what you get every time you Google AIDL. This doesn't explain what it really is. Let me explain! AIDL is Android's mechanism for achieving Inter-Process Communication (IPC) between Android components. Diving deeper, there is a tool called &lt;code&gt;aidl&lt;/code&gt;, which compiles the AIDL source code and generates a Java interface containing the client-side (Proxy) and server-side (Stub) classes.&lt;/p&gt;
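
&lt;p&gt;The Proxy and Stub roles can be illustrated with a plain-Java analogy. This is a sketch of the pattern only, not the code that &lt;code&gt;aidl&lt;/code&gt; actually generates: on Android the real Stub extends &lt;code&gt;android.os.Binder&lt;/code&gt; and arguments travel in a &lt;code&gt;Parcel&lt;/code&gt;, whereas here a hypothetical &lt;code&gt;Transport&lt;/code&gt; interface and a numeric call code (&lt;code&gt;CALL_GET_MESSAGE&lt;/code&gt;) stand in for the process boundary.&lt;/p&gt;

```java
public class ProxyStubSketch {

    // The contract both sides agree on; plays the role of the interface generated from the .aidl file.
    interface IMessage {
        String getMessage();
    }

    // Stand-in for the Binder transport: carries a numeric call code across the "process boundary".
    interface Transport {
        String transact(int code);
    }

    static final int CALL_GET_MESSAGE = 1;

    // Server side: receives a call code and dispatches it to the real implementation.
    abstract static class Stub implements IMessage, Transport {
        @Override
        public String transact(int code) {
            if (code == CALL_GET_MESSAGE) {
                return getMessage();
            }
            throw new IllegalArgumentException("Unknown call code: " + code);
        }
    }

    // Client side: implements the same interface but forwards every call over the transport.
    static final class Proxy implements IMessage {
        private final Transport remote;

        Proxy(Transport remote) {
            this.remote = remote;
        }

        @Override
        public String getMessage() {
            return remote.transact(CALL_GET_MESSAGE);
        }
    }

    public static void main(String[] args) {
        // The service author only fills in the business logic by extending the Stub.
        Stub service = new Stub() {
            @Override
            public String getMessage() {
                return "Hello from the stub!";
            }
        };
        // The client only sees the interface; the Proxy hides the transport.
        IMessage client = new Proxy(service);
        System.out.println(client.getMessage());
    }
}
```

&lt;p&gt;Both sides program against the same interface, which is why understanding interfaces in OOP carries over so directly to AIDL.&lt;/p&gt;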

&lt;p&gt;&amp;nbsp;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fi%2Fprgype5k7uh08zi40kly.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fi%2Fprgype5k7uh08zi40kly.png" alt="Alt Text" width="675" height="215"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&amp;nbsp;&lt;/p&gt;

&lt;h3&gt;
  
  
  &lt;strong&gt;Initial building blocks.&lt;/strong&gt;
&lt;/h3&gt;

&lt;p&gt;Let us start with the basics. To create an AIDL file, open an Android project and click &lt;code&gt;File -&amp;gt; New -&amp;gt; AIDL -&amp;gt; AIDL File&lt;/code&gt;. Write your first AIDL file.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;interface IMyAidlInterface {
    String getMessage();
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Then, do not forget to &lt;code&gt;Build -&amp;gt; Rebuild&lt;/code&gt;. The &lt;code&gt;IMyAidlInterface&lt;/code&gt; interface will be generated. Create a class that extends &lt;code&gt;IMyAidlInterface.Stub&lt;/code&gt;, and remember to override the &lt;code&gt;getMessage()&lt;/code&gt; method in it.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;public class MyImplementor extends IMyAidlInterface.Stub {
    @Override
    public String getMessage() {
        return "Hello from AIDL Stub!";
    }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Once we have extended the Stub class and implemented (overridden) the method, we must create a Service that exposes the API to the applications that need to bind to the AIDL functionality.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;public class MyService extends Service {
    @Nullable
    @Override
    public IBinder onBind(Intent intent) {
        return new MyImplementor();
    }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Also, add the &lt;code&gt;&amp;lt;service&amp;gt;&lt;/code&gt; tag in &lt;code&gt;AndroidManifest.xml&lt;/code&gt; inside the &lt;code&gt;&amp;lt;application&amp;gt;&lt;/code&gt; tag.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;&amp;lt;service
    android:name=".MyService"
    android:enabled="true"
    android:exported="true"
    android:process=":remote"&amp;gt;
    &amp;lt;intent-filter&amp;gt;
        &amp;lt;action android:name="MyService" /&amp;gt;
    &amp;lt;/intent-filter&amp;gt;
&amp;lt;/service&amp;gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&amp;nbsp;&lt;/p&gt;

&lt;p&gt;Now that we have implemented the Service part, we should go ahead and create another Android project for the client part. Please note that AIDL files are &lt;strong&gt;case-sensitive&lt;/strong&gt;. Unfortunately, Android Studio is not mature enough to detect syntax errors in an AIDL file. Any change to an AIDL file requires a rebuild of the project.&lt;br&gt;
&amp;nbsp;&lt;/p&gt;
&lt;h2&gt;
  
  
  &lt;strong&gt;The Real River to cross!&lt;/strong&gt;
&lt;/h2&gt;

&lt;p&gt;We are done with the server(Service) part. Now, we should create another application which will be our client part. So, create a new project in Android Studio. After creating the project, we have to do one of the most important things.&lt;br&gt;
&amp;nbsp;&lt;/p&gt;

&lt;p&gt;In Android Studio, select the Project view in the top-left menu. Inside &lt;code&gt;app/src/main/&lt;/code&gt;, create a folder called &lt;code&gt;aidl&lt;/code&gt;.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fi%2Fcpc4t7zilznasmno0eeh.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fi%2Fcpc4t7zilznasmno0eeh.png" alt="Alt Text" width="800" height="321"&gt;&lt;/a&gt;&lt;br&gt;
&amp;nbsp;&lt;/p&gt;

&lt;p&gt;After this, go to the AIDL service application and copy the contents of its &lt;code&gt;aidl&lt;/code&gt; folder.&lt;br&gt;
&amp;nbsp;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fi%2Fht0bxlgcj9a6o834a47y.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fi%2Fht0bxlgcj9a6o834a47y.png" alt="aidl2" width="397" height="320"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Switch back to the client application and paste the contents into the &lt;code&gt;aidl&lt;/code&gt; folder. Then run &lt;code&gt;Build -&amp;gt; Rebuild&lt;/code&gt;. The project should build without any errors.&lt;br&gt;
&amp;nbsp;&lt;/p&gt;

&lt;p&gt;Go to &lt;code&gt;MainActivity.java&lt;/code&gt; and create a &lt;code&gt;ServiceConnection&lt;/code&gt; object and an &lt;code&gt;IMyAidlInterface&lt;/code&gt; instance.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;private IMyAidlInterface iMyAidlInterface;
private final ServiceConnection mServiceConnection =
    new ServiceConnection() {
        @Override
        public void onServiceConnected(
                    ComponentName name, IBinder service) {

            iMyAidlInterface =
                    IMyAidlInterface.Stub.asInterface(service);

            Log.d(TAG, "Service Connected.");
            Toast.makeText(MainActivity.this, "Service Connected.", Toast.LENGTH_SHORT).show();
        }

        @Override
        public void onServiceDisconnected(ComponentName name) {
            iMyAidlInterface = null;
            Toast.makeText(MainActivity.this, "Service Disconnected.", Toast.LENGTH_SHORT).show();
        }
    };
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&amp;nbsp;&lt;/p&gt;

&lt;p&gt;In &lt;code&gt;onCreate()&lt;/code&gt; of &lt;code&gt;MainActivity.java&lt;/code&gt;, bind the service. Note that we need the action and the package name to successfully bind to the service.&lt;/p&gt;

&lt;p&gt;&amp;nbsp;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_main);

    Intent intent = new Intent();
    intent.setPackage("com.zeliot.aidldemo");
    intent.setAction("MyService");
    bindService(intent, mServiceConnection, BIND_AUTO_CREATE);
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



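&lt;p&gt;Now, create a button and, in its click handler, call &lt;code&gt;iMyAidlInterface.getMessage()&lt;/code&gt; to receive the String from the AIDL service. The generated method declares &lt;code&gt;RemoteException&lt;/code&gt;, so wrap the call in a try/catch, and check that &lt;code&gt;iMyAidlInterface&lt;/code&gt; is not null, since the service may not be connected yet.&lt;br&gt;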
&amp;nbsp;&lt;/p&gt;

&lt;p&gt;We are done with the client and server parts. The Service is now connected from another application, and we can perform any type of operation over this connection: data transfer, asynchronous calls, and callbacks. A more detailed implementation will be covered in the second part of this series.&lt;br&gt;
&amp;nbsp;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;&lt;a href="https://github.com/prasan29/aidl-blog" rel="noopener noreferrer"&gt;AIDL source code&lt;/a&gt;&lt;/strong&gt;&lt;br&gt;
&amp;nbsp;&lt;/p&gt;

&lt;h2&gt;
  
  
  Thank you, for reading. Happy interfacing!! 😀 🙂
&lt;/h2&gt;

&lt;p&gt;&amp;nbsp;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fprofile-counter.glitch.me%2Fprasan29%2Fcount.svg" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fprofile-counter.glitch.me%2Fprasan29%2Fcount.svg" alt="Visits" width="800" height="400"&gt;&lt;/a&gt;&lt;/p&gt;

</description>
      <category>android</category>
      <category>aidl</category>
      <category>java</category>
      <category>kotlin</category>
    </item>
  </channel>
</rss>
