<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: Brice LEPORINI</title>
    <description>The latest articles on DEV Community by Brice LEPORINI (@bleporini).</description>
    <link>https://dev.to/bleporini</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F835467%2Fa636fd36-1388-43f7-80eb-cfac9c222730.jpeg</url>
      <title>DEV Community: Brice LEPORINI</title>
      <link>https://dev.to/bleporini</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/bleporini"/>
    <language>en</language>
    <item>
      <title>Breaking Free from Chaos: Kafka's Epic Quest to Save Microservices from Circuit Breaker</title>
      <dc:creator>Brice LEPORINI</dc:creator>
      <pubDate>Mon, 29 May 2023 10:19:21 +0000</pubDate>
      <link>https://dev.to/bleporini/breaking-free-from-chaos-kafkas-epic-quest-to-save-microservices-from-circuit-breaker-4dg9</link>
      <guid>https://dev.to/bleporini/breaking-free-from-chaos-kafkas-epic-quest-to-save-microservices-from-circuit-breaker-4dg9</guid>
      <description>&lt;p&gt;In the realm of microservices architecture, developers often encounter challenges when it comes to handling the resilience and fault tolerance of distributed systems. To address these challenges, the Circuit Breaker pattern emerged as a popular solution. Originally introduced for managing faults in distributed systems, it aims to prevent cascading failures and provide a fallback mechanism.&lt;/p&gt;

&lt;p&gt;A common example of this kind of failure is an e-commerce system with three microservices: Inventory, Payment, and Order.&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;The Inventory service experiences a sudden surge in traffic during a sale event, causing it to slow down or become unresponsive.&lt;/li&gt;
&lt;li&gt;The Payment service, relying on the Inventory service to check product availability, starts experiencing delays in processing payments due to the slow response from Inventory.&lt;/li&gt;
&lt;li&gt;The Order service, depending on both Inventory and Payment services, faces issues in processing customer orders, leading to delays and potential errors in the order fulfillment process.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--mWi2enJg--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/yt1fkrfb199a7ud0ewso.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--mWi2enJg--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/yt1fkrfb199a7ud0ewso.png" alt="Image description" width="800" height="512"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;By implementing the Circuit Breaker pattern, you can mitigate the impact of cascading failures. The Circuit Breaker would detect the failures in the Inventory Service and trip, temporarily isolating it. This allows the Payment Service and the Order Service to avoid unnecessary requests and quickly fail over to alternative mechanisms, such as using cached data or providing fallback responses. By preventing the propagation of failures, the Circuit Breaker pattern helps to maintain the stability and resilience of the system.&lt;/p&gt;

&lt;h2&gt;
  
  
  Why should you limit its usage?
&lt;/h2&gt;

&lt;p&gt;First of all, a microservices architecture is designed to provide autonomy and independent scalability for individual services. However, applying the Circuit Breaker pattern de facto implies a certain level of dependency and coupling between services. This interference with service autonomy can undermine the very principles of microservices architecture. This kind of design is what I call a distributed monolith, as opposed to a microservices architecture that properly implements the autonomy principle. Therefore, systematic use of the Circuit Breaker might be a clue that your architecture should be revisited.&lt;/p&gt;

&lt;p&gt;It also adds a layer of complexity to your microservices architecture. Each service needs to include circuit breaker logic, making the code more intricate and harder to maintain. As the number of services and interdependencies grows, managing circuit breakers becomes increasingly challenging, leading to a more convoluted system. It also raises a couple of questions, such as:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;What's the expected behavior when the circuit breaker trips?&lt;/li&gt;
&lt;li&gt;Fall back to a predefined value?&lt;/li&gt;
&lt;li&gt;Return a cached response from a previous call? &lt;/li&gt;
&lt;li&gt;Raise an error code?&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;All those points need to be discussed with the business and implemented carefully. Long story short, it means that you end up implementing infrastructure management code in your application when you should be focusing on the business logic.&lt;/p&gt;

&lt;h2&gt;
  
  
  Event-Driven Architecture to the Rescue
&lt;/h2&gt;

&lt;p&gt;An event-driven architecture can foster loose coupling and reduce the reliance on the Circuit Breaker pattern. Indeed, services can communicate through events rather than direct synchronous calls. Events are produced when significant actions or changes occur, and other services consume those events to react accordingly. This asynchronous nature of communication reduces tight coupling between services as they don't have direct dependencies on each other's interfaces. By decoupling services through events, the need for direct service-to-service interactions decreases. This means there are fewer points of failure and fewer opportunities for failures to cascade through the system, so the Circuit Breaker pattern becomes less necessary for isolating failures.&lt;/p&gt;

&lt;p&gt;That being said, it's important to bear in mind that in an event-driven architecture, services maintain eventual consistency rather than immediate consistency. They react to events and update their own state asynchronously, which allows them to operate independently without relying on immediate responses from other services.&lt;/p&gt;

&lt;h2&gt;
  
  
  Kafka
&lt;/h2&gt;

&lt;p&gt;That kind of architecture is often paired with Kafka because it is built upon a distributed commit log design. It maintains an immutable, ordered sequence of records, or "log," allowing events to be written, stored, and consumed in the order they occurred. This inherent log-based architecture makes Kafka highly suitable for capturing, storing, and processing streams of events at scale.&lt;/p&gt;

&lt;p&gt;In addition, Kafka's fault tolerance, scalability, event retention, stream processing capabilities, exactly-once semantics, and ecosystem make it uniquely suited as a foundational component for building robust and scalable event-driven architectures. While other messaging systems can fulfill some aspects of event-driven architectures, Kafka's specific design and features make it a powerful and instrumental choice for event streaming, data processing, and reliable event-driven communication at scale.&lt;/p&gt;

&lt;p&gt;While the Circuit Breaker pattern can provide some benefits in terms of fault tolerance and resilience, it should be used judiciously in a microservices-oriented architecture. The increased complexity, operational overhead, delayed feedback, interference with service autonomy, and lack of granularity are important factors to consider when deciding whether to adopt this.&lt;/p&gt;

&lt;p&gt;As a conclusion, I would say that this decision is obviously not binary. The rule of thumb should be to avoid the Circuit Breaker; however, implementing an event-driven architecture might not be possible for the whole information system, as you may have to deal with external APIs outside your control or legacy systems that would be too expensive to refactor to emit events. &lt;/p&gt;

</description>
      <category>kafka</category>
      <category>microservices</category>
      <category>eventdriven</category>
      <category>circuitbreaker</category>
    </item>
    <item>
      <title>Kafka: Let's talk (again) about replication</title>
      <dc:creator>Brice LEPORINI</dc:creator>
      <pubDate>Mon, 24 Apr 2023 07:45:18 +0000</pubDate>
      <link>https://dev.to/bleporini/kafka-lets-talk-again-about-replication-2h76</link>
      <guid>https://dev.to/bleporini/kafka-lets-talk-again-about-replication-2h76</guid>
      <description>&lt;p&gt;Like many other distributed systems, Kafka leverages data replication to implement reliability, and everyone who has tapped into Kafka knows about the replication factor configuration property that is required to create any topic. So why write a paper in 2023 to talk about that? Well, in fact things have evolved over time and data replication is now a broader topic than just the partition replication factor, so let me give you an overview of the current options.&lt;/p&gt;

&lt;h2&gt;
  
  
  Replication factor
&lt;/h2&gt;

&lt;p&gt;Just a brief recap of this capability that is at the roots of Kafka itself. Data in a production Kafka cluster is typically replicated 3 times: once in the partition that is the leader of the replica set and two additional times in the followers. In fact, 3 is not enforced, but it is the commonly accepted minimum number of copies, as it permits the data to remain available despite one failure or a maintenance operation. Another instrumental configuration parameter complements this: &lt;code&gt;min.insync.replicas&lt;/code&gt;. The in-sync replica set is, as its name says, the set of replicas made up of the leader and all other replicas in sync with the leader. So, considering a producer set with &lt;code&gt;acks=all&lt;/code&gt;, a record write is considered successful if at least &lt;code&gt;min.insync.replicas&lt;/code&gt; replicas were able to acknowledge the write. So with a &lt;strong&gt;R&lt;/strong&gt;eplication &lt;strong&gt;F&lt;/strong&gt;actor of 3, the min ISR value is usually set to 2; that way, if one broker is unavailable for any reason, whether because of a failure or a planned operation like an upgrade, a partition can still accept new records, with the guarantee that they will be replicated even in this degraded configuration. This is why a minimal cluster requires 3 brokers, but 4 is recommended to keep the topic creation availability in case of a broker loss. &lt;/p&gt;

&lt;p&gt;On the producer side, in the majority of the use cases, &lt;code&gt;acks=all&lt;/code&gt; is the standard setting for all the reasons explained above. Note that even if &lt;code&gt;min.insync.replicas=2&lt;/code&gt;, during nominal operations, most of the time the ISR set counts all 3 replicas. Hence, this makes this replication process &lt;strong&gt;synchronous&lt;/strong&gt;.  &lt;/p&gt;

&lt;h2&gt;
  
  
  External replication with MirrorMaker or Confluent Replicator
&lt;/h2&gt;

&lt;p&gt;External replication is when records are replicated to another cluster. There are various reasons for doing that:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Sharing data between two locations that are too far apart to support synchronous replication: imagine applications hosted on the US East Coast producing records and consumers located on the West Coast, implying too much network latency. Another reason is when you need to share data in real time with partners, and you want to copy the records from a set of topics to a foreign cluster managed by the partner.&lt;/li&gt;
&lt;li&gt;DR scenarios: if your organization has 2 and only 2 DC, then you can't stretch the cluster, and you need to run two distinct clusters and replicate records from the primary to the DR one. But this also applies if you're hosting your cluster in the public cloud in three availability zones, and your business is so critical that you want to cover the risk of a complete region loss. &lt;/li&gt;
&lt;li&gt;When you have on-prem applications like legacy core banking systems or mainframe applications and you need to stream data to new generation applications hosted in the public cloud, one good way to implement that kind of hybrid scenario is to replicate the on-prem cluster to a fully managed one in the cloud. It also drastically helps to streamline the network round trips as there's only one kind of flow to govern, and you can read multiple times the same data and pay the network cost between your DC and the cloud only once.&lt;/li&gt;
&lt;li&gt;Data migration between two Kafka clusters&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Tools like &lt;a href="https://kafka.apache.org/documentation/#georeplication"&gt;MirrorMaker&lt;/a&gt; and &lt;a href="https://docs.confluent.io/platform/current/multi-dc-deployments/replicator/index.html"&gt;Confluent Replicator&lt;/a&gt; allow that kind of cross-cluster replication. You can see them as external applications consuming records on one side and producing them on the other, although the reality is a bit more complex as they cover a wide range of edge cases. Both are implemented as Kafka Connect connectors (MirrorMaker version 1, however, is not a connector). As they replicate beyond the ISR set, this kind of replication is asynchronous by design. You should also pay attention to the fact that you can't guarantee that all records have been replicated at the moment of a complete disaster, so the guaranteed &lt;strong&gt;RPO&lt;/strong&gt; can't be 0.&lt;/p&gt;

&lt;h2&gt;
  
  
  Asynchronous replication with Cluster Linking
&lt;/h2&gt;

&lt;p&gt;We saw that asynchronous replication makes sense in some scenarios, especially to avoid any latency impact on the producer side. External replication is one option for doing so, but it also comes with a couple of challenges to care about:  &lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;those tools are external to the broker, which implies additional resources to manage in a fault-tolerant manner and with proper monitoring.&lt;/li&gt;
&lt;li&gt;as consumer offsets are stored in a distinct topic, they can't be preserved, so offsets need to be translated from one cluster to another. This topic is extensively covered in the &lt;a href="https://docs.confluent.io/platform/current/multi-dc-deployments/replicator/replicator-failover.html#understanding-consumer-offset-translation"&gt;documentation&lt;/a&gt;. &lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;This is where &lt;a href="https://docs.confluent.io/platform/current/multi-dc-deployments/cluster-linking/index.html"&gt;Cluster Linking&lt;/a&gt; comes into play. It's a feature offered by Confluent Server, which can be seen as a broker on steroids with a wide set of added capabilities. The game changer here is that the replication is internal to the broker, so it performs a byte-for-byte replication that preserves the consumer offsets from the source cluster to the destination one. The other benefit is a reduced infrastructure footprint, as there's no need to manage external components for that matter.&lt;br&gt;
Cluster Linking is also available on &lt;a href="https://confluent.cloud"&gt;Confluent Cloud&lt;/a&gt;.&lt;/p&gt;

&lt;h2&gt;
  
  
  Asynchronous intra-cluster replication
&lt;/h2&gt;

&lt;p&gt;At this stage you may wonder how asynchronous replication is possible within a cluster, since the followers are expected to be part of the ISR set. This is the trick: Confluent introduced an additional kind of replica: &lt;a href="https://docs.confluent.io/platform/current/multi-dc-deployments/multi-region.html#observers"&gt;the observer&lt;/a&gt;. It's another feature of Confluent Server, and it differs in that it's not part of the ISR set, which allows it to replicate the leader asynchronously.&lt;/p&gt;

&lt;p&gt;Ok, so now let's talk about the use cases where this feature can fit. As formerly mentioned, if you need to share data with some applications that are hosted far away from the producer, implying a latency beyond the acceptable from a producer's perspective, then building a &lt;a href="https://docs.confluent.io/platform/current/multi-dc-deployments/multi-region.html"&gt;Multi Region Cluster&lt;/a&gt; spanned across those two places makes sense. It relies on the &lt;a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica"&gt;follower fetching feature&lt;/a&gt; that was introduced in Kafka 2.4. &lt;/p&gt;

&lt;p&gt;Another very interesting scenario is when you combine observers and &lt;a href="https://docs.confluent.io/platform/current/multi-dc-deployments/multi-region.html#automatic-observer-promotion"&gt;Automatic Observer Promotion&lt;/a&gt; because it unlocks the option to stretch the cluster across only 2 DCs for the data plane. It's quite common in many organizations to have only 2 DCs, but remember that the control plane is implemented with Zookeeper, which is quorum based, so it needs an odd number of locations in order to maintain the quorum in case of a DC loss. So, if using the public cloud to host a Zookeeper tie-breaker is an option, which is usually accepted by Info Sec teams as no business data is managed by Zookeeper, then it's possible to overcome the 2 DC limitation mentioned previously. This is what we call &lt;strong&gt;2.5 DC Architecture&lt;/strong&gt;. To learn more, see this blog post: &lt;a href="https://www.confluent.io/blog/automatic-observer-promotion-for-safe-multi-datacenter-failover-in-confluent-6-1"&gt;Automatic Observer Promotion Brings Fast and Safe Multi-Datacenter Failover with Confluent Platform 6.1&lt;/a&gt;. The main benefits of using a stretched cluster rather than replicated clusters are that you don't need to restart and reconfigure the client applications on the DR site, that you need fewer components as it's a single cluster, and, more importantly, that you can guarantee that the RPO will be 0, meaning &lt;strong&gt;no data loss&lt;/strong&gt; in the event of an unavailable DC. &lt;/p&gt;

&lt;p&gt;I hope this clarifies the available replication options. However, if you still need help to figure out the appropriate setup for your use case, let's connect on &lt;a href="https://linkedin.com/in/bleporini"&gt;LinkedIn&lt;/a&gt; and discuss it! &lt;/p&gt;

</description>
      <category>kafka</category>
      <category>replication</category>
      <category>datastreaming</category>
      <category>multidc</category>
    </item>
    <item>
      <title>Canary release with Kafka</title>
      <dc:creator>Brice LEPORINI</dc:creator>
      <pubDate>Mon, 27 Mar 2023 13:56:53 +0000</pubDate>
      <link>https://dev.to/bleporini/canary-release-with-kafka-1h89</link>
      <guid>https://dev.to/bleporini/canary-release-with-kafka-1h89</guid>
      <description>&lt;p&gt;Releasing a new version of a product has always been a critical moment for engineering teams as well as for operations teams, mainly because it's usually managed as a big bang switchover. During this operation, if something bad happens, like a migration procedure that doesn't run as expected or a system that doesn't work as it should, whether it's a functional issue or a performance one, it puts a lot of pressure on the teams because it has a direct impact on the business.&lt;/p&gt;

&lt;p&gt;Canary release, sometimes called friends and family, is an increasingly common practice when a new version of a piece of software is rolled out. It consists of keeping both versions running side by side and gradually increasing the traffic managed by the new version during the sunset phase of the old one. It then becomes much easier to check that no major bug affects the new version and to verify that it can handle real-life traffic. Long story short, whatever problem arises, the chances that it impacts the business drop drastically. &lt;/p&gt;

&lt;p&gt;The term Canary Release comes from the time when miners working underground faced the risk of carbon monoxide (CO) building up in the air: as the gas is odorless, miners died silently. To cover that risk, they used to work with a canary in a cage; if the canary died, it meant that the level of CO had increased dangerously, as this kind of bird is even more sensitive to CO than humans are.  &lt;/p&gt;

&lt;p&gt;When thinking about how to implement that for a REST API or a web application, the usual way is to use a reverse proxy to balance the traffic across the two versions. But when considering data streaming, how can it be implemented with data processors while maintaining all the guarantees that Kafka offers? This is the challenge I faced, and this paper shares my proposal to tackle it.&lt;/p&gt;

&lt;h2&gt;
  
  
  Let's put some basic requirements
&lt;/h2&gt;

&lt;p&gt;First of all, it shouldn't have any impact on the design or the code of either the producer or the consumer. Then, the producer shouldn't be aware that the traffic is split between two different processors serving the same purpose, which is a common principle for loosely coupled applications. Finally, it must guarantee that every record is processed, and processed by only one consumer.&lt;/p&gt;

&lt;h2&gt;
  
  
  Scenario
&lt;/h2&gt;

&lt;p&gt;Let's consider that you want to introduce a new version and at first it will only deal with 10% of the traffic. The usual verifications apply: no excessive consumer lag, no functional regressions or bugs. Then you plan to gradually increase the share of traffic it handles and, at the final stage, completely switch to the new version, allowing you to dispose of the old one.&lt;/p&gt;

&lt;h2&gt;
  
  
  Using ksqlDB as a router
&lt;/h2&gt;

&lt;p&gt;This is the basic idea. As the traffic needs to be split in two based on a ratio, we can leverage the &lt;a href="https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-reference/scalar-functions/#random"&gt;RANDOM&lt;/a&gt; function, which returns a value between 0.0 and 1.0.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;create&lt;/span&gt; &lt;span class="k"&gt;or&lt;/span&gt; &lt;span class="k"&gt;replace&lt;/span&gt; &lt;span class="n"&gt;stream&lt;/span&gt; &lt;span class="n"&gt;original&lt;/span&gt; 
    &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;original_key&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="k"&gt;key_type&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="k"&gt;key&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; 
    &lt;span class="k"&gt;with&lt;/span&gt; &lt;span class="p"&gt;(...);&lt;/span&gt;
&lt;span class="k"&gt;create&lt;/span&gt; &lt;span class="k"&gt;or&lt;/span&gt; &lt;span class="k"&gt;replace&lt;/span&gt; &lt;span class="n"&gt;stream&lt;/span&gt; &lt;span class="n"&gt;legacy_version&lt;/span&gt; 
    &lt;span class="k"&gt;with&lt;/span&gt;&lt;span class="p"&gt;(...)&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; 
    &lt;span class="k"&gt;select&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt; &lt;span class="k"&gt;from&lt;/span&gt; &lt;span class="n"&gt;original&lt;/span&gt; &lt;span class="k"&gt;where&lt;/span&gt; &lt;span class="n"&gt;random&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt; &lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="k"&gt;create&lt;/span&gt; &lt;span class="k"&gt;or&lt;/span&gt; &lt;span class="k"&gt;replace&lt;/span&gt; &lt;span class="n"&gt;stream&lt;/span&gt; &lt;span class="n"&gt;new_version&lt;/span&gt;
    &lt;span class="k"&gt;with&lt;/span&gt;&lt;span class="p"&gt;(...)&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; 
    &lt;span class="k"&gt;select&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt; &lt;span class="k"&gt;from&lt;/span&gt; &lt;span class="n"&gt;original&lt;/span&gt; &lt;span class="k"&gt;where&lt;/span&gt; &lt;span class="n"&gt;random&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;=&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt; &lt;span class="p"&gt;;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Note that using Schema Registry allows more concise definitions. As the outcome of a persistent query is one and only one topic, two persistent queries are needed. &lt;/p&gt;

&lt;h2&gt;
  
  
  Not that fast
&lt;/h2&gt;

&lt;p&gt;That way, the amount of traffic processed by each version complies with the defined ratio. However, because &lt;code&gt;random&lt;/code&gt; is evaluated independently in each of the two queries, there's no guarantee that every record is processed, nor that each one is processed only once. To work around that, we need to set in stone the value assigned to each record and then apply the routing:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;create&lt;/span&gt; &lt;span class="k"&gt;or&lt;/span&gt; &lt;span class="k"&gt;replace&lt;/span&gt; &lt;span class="n"&gt;stream&lt;/span&gt; &lt;span class="n"&gt;original&lt;/span&gt; 
    &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;original_key&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="k"&gt;key_type&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="k"&gt;key&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; 
    &lt;span class="k"&gt;with&lt;/span&gt; &lt;span class="p"&gt;(...);&lt;/span&gt;
&lt;span class="k"&gt;create&lt;/span&gt; &lt;span class="k"&gt;or&lt;/span&gt; &lt;span class="k"&gt;replace&lt;/span&gt; &lt;span class="n"&gt;stream&lt;/span&gt; &lt;span class="n"&gt;original_rated&lt;/span&gt; 
    &lt;span class="k"&gt;with&lt;/span&gt;&lt;span class="p"&gt;(...)&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; 
    &lt;span class="k"&gt;select&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;random&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;rate&lt;/span&gt; &lt;span class="k"&gt;from&lt;/span&gt; &lt;span class="n"&gt;original&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="k"&gt;create&lt;/span&gt; &lt;span class="k"&gt;or&lt;/span&gt; &lt;span class="k"&gt;replace&lt;/span&gt; &lt;span class="n"&gt;stream&lt;/span&gt; &lt;span class="n"&gt;legacy_version&lt;/span&gt;
    &lt;span class="k"&gt;with&lt;/span&gt;&lt;span class="p"&gt;(...)&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt;
    &lt;span class="k"&gt;select&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt; &lt;span class="k"&gt;from&lt;/span&gt; &lt;span class="n"&gt;original_rated&lt;/span&gt; &lt;span class="k"&gt;where&lt;/span&gt; &lt;span class="n"&gt;rate&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt; &lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="k"&gt;create&lt;/span&gt; &lt;span class="k"&gt;or&lt;/span&gt; &lt;span class="k"&gt;replace&lt;/span&gt; &lt;span class="n"&gt;stream&lt;/span&gt; &lt;span class="n"&gt;new_version&lt;/span&gt; 
    &lt;span class="k"&gt;with&lt;/span&gt;&lt;span class="p"&gt;(...)&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; 
    &lt;span class="k"&gt;select&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt; &lt;span class="k"&gt;from&lt;/span&gt; &lt;span class="n"&gt;original_rated&lt;/span&gt; &lt;span class="k"&gt;where&lt;/span&gt; &lt;span class="n"&gt;rate&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;=&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt; &lt;span class="p"&gt;;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;In order to start the canary release transition, the former version is stopped, then the queries above are started and the two versions of the service are started, reconfigured to consume the assigned topics.&lt;/p&gt;

&lt;h2&gt;
  
  
  Looks good, but that's not enough.
&lt;/h2&gt;

&lt;p&gt;The default ksqlDB behavior when running a query is to read the topic from the latest offset. That means that the sequence above leads to data loss, as it's almost certain that messages are pushed to the topic during the transition from the normal to the canary release. As a consequence, the rating query is required to start consuming the input topic from the last offsets committed by the legacy service before it stopped. Unfortunately, ksqlDB doesn't offer the capability to define starting offsets for a given query. The only options are the values accepted by the query configuration parameter &lt;a href="https://docs.ksqldb.io/en/latest/reference/server-configuration/#ksqlstreamsautooffsetreset"&gt;auto.offset.reset&lt;/a&gt;: &lt;code&gt;earliest&lt;/code&gt; or &lt;code&gt;latest&lt;/code&gt;. But the game is not yet over, as the language gives access to the offset and partition of each record, thanks to the &lt;a href="https://docs.ksqldb.io/en/latest/reference/sql/data-definition/#pseudocolumns"&gt;pseudo columns&lt;/a&gt;.&lt;/p&gt;
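
&lt;p&gt;To get a feel for those pseudo columns, a push query along these lines could be used to display the partition and offset of a few records. This is only a sketch: it assumes a ksqlDB version that exposes &lt;code&gt;ROWPARTITION&lt;/code&gt; and &lt;code&gt;ROWOFFSET&lt;/code&gt;, it reuses the &lt;code&gt;original&lt;/code&gt; stream and its &lt;code&gt;original_key&lt;/code&gt; column declared earlier, and the limit of 5 is arbitrary.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;-- read from the beginning of the topic instead of only new records
SET 'auto.offset.reset'='earliest';
-- show where each record sits in the input topic (illustrative only)
select ROWPARTITION, ROWOFFSET, original_key
    from original
    emit changes
    limit 5;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;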

&lt;p&gt;So the procedure requires additional steps: after the former version is stopped, for each partition, collect the committed offset for the consumer group assigned to the legacy and build the rating query accordingly in order to start consuming only unprocessed messages, as an example:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;SET&lt;/span&gt; &lt;span class="s1"&gt;'auto.offset.reset'&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s1"&gt;'earliest'&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="k"&gt;create&lt;/span&gt; &lt;span class="k"&gt;or&lt;/span&gt; &lt;span class="k"&gt;replace&lt;/span&gt; &lt;span class="n"&gt;stream&lt;/span&gt; &lt;span class="n"&gt;original&lt;/span&gt; &lt;span class="p"&gt;(...);&lt;/span&gt;
&lt;span class="k"&gt;create&lt;/span&gt; &lt;span class="k"&gt;or&lt;/span&gt; &lt;span class="k"&gt;replace&lt;/span&gt; &lt;span class="n"&gt;stream&lt;/span&gt; &lt;span class="n"&gt;original_rated&lt;/span&gt; &lt;span class="k"&gt;with&lt;/span&gt;&lt;span class="p"&gt;(...)&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; 
    &lt;span class="k"&gt;select&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;random&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;rate&lt;/span&gt; &lt;span class="k"&gt;from&lt;/span&gt; &lt;span class="n"&gt;original&lt;/span&gt; 
    &lt;span class="k"&gt;where&lt;/span&gt;  
        &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ROWPARTITION&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt; &lt;span class="k"&gt;and&lt;/span&gt; &lt;span class="n"&gt;ROWOFFSET&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="mi"&gt;165&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;  &lt;span class="k"&gt;OR&lt;/span&gt;  
        &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ROWPARTITION&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;3&lt;/span&gt; &lt;span class="k"&gt;and&lt;/span&gt; &lt;span class="n"&gt;ROWOFFSET&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="mi"&gt;176&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;  &lt;span class="k"&gt;OR&lt;/span&gt;  
        &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ROWPARTITION&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt; &lt;span class="k"&gt;and&lt;/span&gt; &lt;span class="n"&gt;ROWOFFSET&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="mi"&gt;149&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;  &lt;span class="k"&gt;OR&lt;/span&gt;  
        &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ROWPARTITION&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;5&lt;/span&gt; &lt;span class="k"&gt;and&lt;/span&gt; &lt;span class="n"&gt;ROWOFFSET&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="mi"&gt;151&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;  &lt;span class="k"&gt;OR&lt;/span&gt;  
        &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ROWPARTITION&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;2&lt;/span&gt; &lt;span class="k"&gt;and&lt;/span&gt; &lt;span class="n"&gt;ROWOFFSET&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="mi"&gt;152&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;  &lt;span class="k"&gt;OR&lt;/span&gt;  
        &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ROWPARTITION&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;4&lt;/span&gt; &lt;span class="k"&gt;and&lt;/span&gt; &lt;span class="n"&gt;ROWOFFSET&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="mi"&gt;167&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
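
&lt;p&gt;Writing that &lt;code&gt;WHERE&lt;/code&gt; clause by hand gets error-prone as the number of partitions grows. As a minimal sketch (not taken from Canary Router), the Python helper below generates the clause from a partition-to-offset mapping. The function name &lt;code&gt;build_offset_filter&lt;/code&gt; and the shape of its input are assumptions made for illustration, and collecting the offsets themselves is left out:&lt;/p&gt;

```python
def build_offset_filter(last_offsets):
    """Build the WHERE clause of the rating query.

    last_offsets is a hypothetical dict mapping a partition number to the
    offset collected for the legacy consumer group on that partition.
    """
    clauses = [
        f"(ROWPARTITION={partition} AND ROWOFFSET > {offset})"
        for partition, offset in sorted(last_offsets.items())
    ]
    # Partitions missing from the mapping are not matched by any clause,
    # so every partition of the topic should be listed.
    return " OR ".join(clauses)


print(build_offset_filter({0: 165, 1: 149, 2: 152}))
```

&lt;p&gt;The resulting string can then be appended to the &lt;code&gt;select&lt;/code&gt; statement of the rating query.&lt;/p&gt;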



&lt;p&gt;It does the trick, but it comes with a drawback: the query has to stream the entire content of the input topic. If the partitions hold a huge amount of data, this can take a non-negligible amount of time.&lt;/p&gt;

&lt;h2&gt;
  
  
  Way to go!
&lt;/h2&gt;

&lt;p&gt;Now both versions can be started. Note that none of these operations has any impact on the data producer or on the applications: the only requirement is that the consumed topic is provided as a configuration parameter.&lt;/p&gt;

&lt;p&gt;Then, over time, the routing ratio can be revised by running the following sequence:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Pause the rating query (do not drop it, otherwise its offsets will be lost)&lt;/li&gt;
&lt;li&gt;Update the ratio in the two routing queries&lt;/li&gt;
&lt;li&gt;Resume the rating query &lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  Final stage: promote the new version
&lt;/h2&gt;

&lt;p&gt;For obvious reasons, even if you assign 100% of the traffic to the new version, this setup can't stay in place forever: it's a waste of storage (each record is copied twice, so it ends up in three topics) as well as a waste of processing resources (each canary release implies three persistent queries). So a final procedure is required to safely reconfigure the new application to process messages from the original input topic:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Pause the rating query&lt;/li&gt;
&lt;li&gt;Wait until the consumer groups of the two downstream services have zero lag on all partitions&lt;/li&gt;
&lt;li&gt;Collect the offsets for the consumer groups of the two applications&lt;/li&gt;
&lt;li&gt;For each partition: compute the offset the new version should start from in the original topic by adding the last offset of the former version in the original partition, the offset of the legacy service in the filtered rated partition, and the offset of the new version in its respective partition; then reset the consumer group offset for this partition to the resulting value&lt;/li&gt;
&lt;li&gt;Start the new version configured to consume the original topic and dispose of everything else, that's it!&lt;/li&gt;
&lt;/ul&gt;
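
&lt;p&gt;The per-partition arithmetic of the fourth step can be sketched in a few lines of Python. This is only an illustration of the sum described above, not the code of the actual scripts: the function name and the three dictionaries (partition number to offset) are assumptions, and the sketch presumes the offsets were collected after the rating query was paused and both services reached zero lag.&lt;/p&gt;

```python
def compute_resume_offsets(original_offsets, legacy_routed_offsets, new_routed_offsets):
    """Return, per partition, the offset the new version should start from
    in the original topic.

    original_offsets:      offsets of the former version in the original topic
    legacy_routed_offsets: offsets of the legacy service in its routed topic
    new_routed_offsets:    offsets of the new version in its routed topic
    All three are hypothetical dicts keyed by partition number.
    """
    return {
        partition: original_offsets[partition]
        + legacy_routed_offsets.get(partition, 0)
        + new_routed_offsets.get(partition, 0)
        for partition in original_offsets
    }


print(compute_resume_offsets({0: 165, 1: 149}, {0: 40, 1: 35}, {0: 10, 1: 15}))
```

&lt;p&gt;Each resulting value is the offset to apply to the consumer group of the new version before it is restarted on the original topic.&lt;/p&gt;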

&lt;h2&gt;
  
  
  Automate it!
&lt;/h2&gt;

&lt;p&gt;Now that every step is detailed, why not build some tooling to make it automatic? This is what I did in &lt;a href="https://github.com/bleporini/canary-router"&gt;Canary Router&lt;/a&gt;. It's based on a couple of shell scripts and comes with minimal requirements. It mainly leverages &lt;a href="https://confluent.cloud"&gt;Confluent Cloud&lt;/a&gt; resources, so the only requirement is to sign up: you'll be awarded $400 of free credits to spin up streaming resources, which is more than enough for a basic Kafka cluster and a small ksqlDB cluster. It's definitely not production-ready, but it shows a path for running that kind of operation, and it could easily be reimplemented in Java or Go to make it a proper tool.&lt;/p&gt;

&lt;p&gt;The demo is based on a Datagen source connector and the downstream services are nothing more than containers running dumb &lt;code&gt;kafka-console-consumer&lt;/code&gt;. If you want to test it with real-life consumers, just implement the four &lt;code&gt;(start|stop)_(legacy|new)_service&lt;/code&gt; shell functions and pass them as a context to the scripts, like the ones implemented in &lt;a href="https://github.com/bleporini/canary-router/blob/main/services_examples/context.sh"&gt;context.sh&lt;/a&gt;.&lt;/p&gt;

&lt;h2&gt;
  
  
  Remarks and limitations
&lt;/h2&gt;

&lt;p&gt;There's a script that checks that the number of messages in the new and legacy topics is consistent with no duplicates and no data loss. However, this check is weak: the only reliable way would be to compare every message, not just count them.&lt;/p&gt;

&lt;p&gt;This design works as long as processing order is not a requirement. Even though the messages are copied to the same partition numbers in the different topics, there's no guarantee that all messages with the same key end up in the same topic. As the traffic is, by design, not evenly balanced, some records will at some point be processed out of the order in which they were produced. That being said, I think some logic could be implemented to enforce that once a key is assigned to the new service, all subsequent records with that key are sent to its topic. Feel free to comment or fork the repository and propose a pull request 😉.&lt;/p&gt;

</description>
      <category>kafka</category>
      <category>ksqldb</category>
      <category>datastreaming</category>
    </item>
    <item>
      <title>Using OpenId Connect with Confluent Cloud</title>
      <dc:creator>Brice LEPORINI</dc:creator>
      <pubDate>Mon, 27 Feb 2023 12:05:22 +0000</pubDate>
      <link>https://dev.to/bleporini/using-openid-connect-with-confluent-cloud-2a36</link>
      <guid>https://dev.to/bleporini/using-openid-connect-with-confluent-cloud-2a36</guid>
      <description>&lt;p&gt;I hope you've already read my &lt;a href="https://dev.to/bleporini/openid-connect-authentication-with-apache-kafka-31-5747"&gt;previous post&lt;/a&gt; about the capability that was added in Kafka 3.1 to authenticate applications using an external OpenId Connect identity provider. Now you also can do the same with &lt;a href="https://confluent.cloud" rel="noopener noreferrer"&gt;Confluent Cloud&lt;/a&gt;. Initially, the only way to authenticate applications was to use API keys and secret managed in Confluent Cloud, but offering the capability to manage centrally accounts, credentials and authentication flows in a single identity provider is a common expectation in many organizations.&lt;/p&gt;

&lt;p&gt;To set it up, it's quite easy and resides in two steps. First you need to declare a new identity provider for your Confluent Cloud organization. Azure and Okta are completely integrated, but let's focus on vanilla OpenId Connect. One good thing with OIDC is that this standard is completely discoverable, as an example you can freely dump the configuration for the Google OIDC service:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;curl https://accounts.google.com/.well-known/openid-configuration
&lt;span class="o"&gt;{&lt;/span&gt;
 &lt;span class="s2"&gt;"issuer"&lt;/span&gt;: &lt;span class="s2"&gt;"https://accounts.google.com"&lt;/span&gt;,
 &lt;span class="s2"&gt;"authorization_endpoint"&lt;/span&gt;: &lt;span class="s2"&gt;"https://accounts.google.com/o/oauth2/v2/auth"&lt;/span&gt;,
 &lt;span class="s2"&gt;"device_authorization_endpoint"&lt;/span&gt;: &lt;span class="s2"&gt;"https://oauth2.googleapis.com/device/code"&lt;/span&gt;,
 &lt;span class="s2"&gt;"token_endpoint"&lt;/span&gt;: &lt;span class="s2"&gt;"https://oauth2.googleapis.com/token"&lt;/span&gt;,
 &lt;span class="s2"&gt;"userinfo_endpoint"&lt;/span&gt;: &lt;span class="s2"&gt;"https://openidconnect.googleapis.com/v1/userinfo"&lt;/span&gt;,
 &lt;span class="s2"&gt;"revocation_endpoint"&lt;/span&gt;: &lt;span class="s2"&gt;"https://oauth2.googleapis.com/revoke"&lt;/span&gt;,
 &lt;span class="s2"&gt;"jwks_uri"&lt;/span&gt;: &lt;span class="s2"&gt;"https://www.googleapis.com/oauth2/v3/certs"&lt;/span&gt;,
&lt;span class="o"&gt;[&lt;/span&gt;...]
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;code&gt;.well-known/openid-configuration&lt;/code&gt; is an endpoint implemented by all providers, and it is the only thing you need to set up the identity provider in Confluent Cloud:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fqk4vf9gru6mzs5dubsfo.jpg" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fqk4vf9gru6mzs5dubsfo.jpg" alt="Image description"&gt;&lt;/a&gt;&lt;br&gt;
As a result, with the configuration URL, Confluent Cloud is able to automatically gather the issuer URI, but more importantly the JWKS, which provides the public keys to verify the JWTs.&lt;/p&gt;
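
&lt;p&gt;To make it concrete, here is a small Python sketch showing which fields of the discovery document matter for this step. It only illustrates the idea and says nothing about how Confluent Cloud implements it; the function name &lt;code&gt;extract_provider_settings&lt;/code&gt; is an assumption, and the sample document is a trimmed copy of the output shown above.&lt;/p&gt;

```python
import json


def extract_provider_settings(discovery_document):
    """Pick the issuer URI and the JWKS URI out of an OIDC discovery document
    passed as a JSON string."""
    config = json.loads(discovery_document)
    return {"issuer": config["issuer"], "jwks_uri": config["jwks_uri"]}


# Trimmed copy of the Google discovery document shown earlier.
sample_document = """
{
 "issuer": "https://accounts.google.com",
 "token_endpoint": "https://oauth2.googleapis.com/token",
 "jwks_uri": "https://www.googleapis.com/oauth2/v3/certs"
}
"""

print(extract_provider_settings(sample_document))
```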

&lt;p&gt;The second step is to declare an identity pool. In fact, it's a way to define which JWTs issued by the IDP qualify for authentication to the Confluent Cloud service:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Flwp5q36825rwy8fbem4o.jpg" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Flwp5q36825rwy8fbem4o.jpg" alt="Image description"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;For this demo, let's keep it simple. The &lt;code&gt;claims.sub&lt;/code&gt; default value for the identity claim field is perfectly fine as it's a &lt;a href="https://www.rfc-editor.org/rfc/rfc7519#section-4.1.2" rel="noopener noreferrer"&gt;registered claim&lt;/a&gt; to identify the principal. Here's an example payload of a JWT (modified, not really issued by Google 😉):&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight json"&gt;&lt;code&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"iss"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"https://accounts.google.com"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"sub"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"dZJPsd9oVtAciRY8F5lHzk4yS0hfnBiE@clients"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"aud"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"https://kafka.auth"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"iat"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="mi"&gt;1672817905&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"exp"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="mi"&gt;1672904305&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"azp"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"dZJPsd9oVtAciRY8F5lHzk4yS0hfnBiE"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"scope"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"scope"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"gty"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"client-credentials"&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Then let's declare that every JWT carrying the &lt;code&gt;https://kafka.auth&lt;/code&gt; value in the &lt;code&gt;aud&lt;/code&gt; claim is valid. Note that the audience claim can be an array of strings instead of a single-valued field. This value is set in the IDP.&lt;/p&gt;

&lt;p&gt;To finalize the creation, you need to bind roles and resources to this new identity pool, which is a usual operation for every Confluent Cloud administrator!&lt;/p&gt;
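
&lt;p&gt;Because &lt;code&gt;aud&lt;/code&gt; may be either a string or an array, an audience check has to handle both shapes. The Python sketch below shows one way to express that rule; it is an illustration only, not the filter syntax used by the identity pool, and the function name &lt;code&gt;audience_matches&lt;/code&gt; is an assumption.&lt;/p&gt;

```python
def audience_matches(claims, expected_audience):
    """Return True when the expected audience appears in the aud claim,
    whether that claim is a single string or a list of strings."""
    audience = claims.get("aud", [])
    if isinstance(audience, str):
        audience = [audience]
    return expected_audience in audience


single_valued = {"aud": "https://kafka.auth"}
multi_valued = {"aud": ["https://kafka.auth", "https://other.api"]}

print(audience_matches(single_valued, "https://kafka.auth"))
print(audience_matches(multi_valued, "https://kafka.auth"))
```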

&lt;p&gt;Now let's check that it's working with a dumb Kafka consumer. Thanks to the New Client wizard, getting a base configuration to start with is easy:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F1rc9hpludf6uwuwr16wc.jpg" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F1rc9hpludf6uwuwr16wc.jpg" alt="Image description"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;But you need to tweak it a bit to define how the Java application must request the JWT to present to Confluent Cloud. It's almost like what was &lt;a href="https://dev.to/bleporini/openid-connect-authentication-with-apache-kafka-31-5747"&gt;shown in my previous post&lt;/a&gt;, but in addition you need to set the JAAS configuration with the logical cluster id and the identity pool id:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight properties"&gt;&lt;code&gt;&lt;span class="py"&gt;sasl.mechanism&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;OAUTHBEARER&lt;/span&gt;
&lt;span class="py"&gt;sasl.login.callback.handler.class&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler&lt;/span&gt;
&lt;span class="py"&gt;sasl.login.connect.timeout.ms&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;15000&lt;/span&gt;
&lt;span class="py"&gt;sasl.oauthbearer.token.endpoint.url&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;https://oauth2.googleapis.com/token&lt;/span&gt;
&lt;span class="py"&gt;sasl.jaas.config&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required &lt;/span&gt;&lt;span class="se"&gt;\
&lt;/span&gt;&lt;span class="s"&gt;clientId="dZJPsd9oVtAciRY8F5lHzk4yS0hfnBiE" &lt;/span&gt;&lt;span class="se"&gt;\
&lt;/span&gt;&lt;span class="s"&gt;clientSecret="XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" &lt;/span&gt;&lt;span class="se"&gt;\
&lt;/span&gt;&lt;span class="s"&gt;extension_logicalCluster="lkc-000000" &lt;/span&gt;&lt;span class="se"&gt;\
&lt;/span&gt;&lt;span class="s"&gt;extension_identityPoolId="pool-XXXXX" ;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Then you can test it:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;docker run &lt;span class="nt"&gt;--rm&lt;/span&gt; &lt;span class="nt"&gt;-ti&lt;/span&gt; &lt;span class="nt"&gt;-v&lt;/span&gt; &lt;span class="nv"&gt;$PWD&lt;/span&gt;:/work &lt;span class="nt"&gt;--workdir&lt;/span&gt; /work confluentinc/cp-kafka kafka-console-consumer &lt;span class="nt"&gt;--consumer&lt;/span&gt;.config config.properties &lt;span class="nt"&gt;--topic&lt;/span&gt; &lt;span class="nb"&gt;test&lt;/span&gt; &lt;span class="nt"&gt;--bootstrap-server&lt;/span&gt; pkc-xxxxxx.europe-west1.gcp.confluent.cloud:9092 &lt;span class="nt"&gt;--from-beginning&lt;/span&gt;
&lt;span class="o"&gt;[&lt;/span&gt;2023-01-04 12:17:49,565] WARN These configurations &lt;span class="s1"&gt;'[basic.auth.credentials.source, acks, schema.registry.url, basic.auth.user.info]'&lt;/span&gt; were supplied but are not used yet. &lt;span class="o"&gt;(&lt;/span&gt;org.apache.kafka.clients.consumer.ConsumerConfig&lt;span class="o"&gt;)&lt;/span&gt;
&lt;span class="o"&gt;{&lt;/span&gt;&lt;span class="s2"&gt;"ordertime"&lt;/span&gt;:1497014222380,&lt;span class="s2"&gt;"orderid"&lt;/span&gt;:18,&lt;span class="s2"&gt;"itemid"&lt;/span&gt;:&lt;span class="s2"&gt;"Item_184"&lt;/span&gt;,&lt;span class="s2"&gt;"address"&lt;/span&gt;:&lt;span class="o"&gt;{&lt;/span&gt;&lt;span class="s2"&gt;"city"&lt;/span&gt;:&lt;span class="s2"&gt;"Mountain View"&lt;/span&gt;,&lt;span class="s2"&gt;"state"&lt;/span&gt;:&lt;span class="s2"&gt;"CA"&lt;/span&gt;,&lt;span class="s2"&gt;"zipcode"&lt;/span&gt;:94041&lt;span class="o"&gt;}}&lt;/span&gt;
&lt;span class="o"&gt;{&lt;/span&gt;&lt;span class="s2"&gt;"ordertime"&lt;/span&gt;:1497014222380,&lt;span class="s2"&gt;"orderid"&lt;/span&gt;:18,&lt;span class="s2"&gt;"itemid"&lt;/span&gt;:&lt;span class="s2"&gt;"Item_184"&lt;/span&gt;,&lt;span class="s2"&gt;"address"&lt;/span&gt;:&lt;span class="o"&gt;{&lt;/span&gt;&lt;span class="s2"&gt;"city"&lt;/span&gt;:&lt;span class="s2"&gt;"Mountain View"&lt;/span&gt;,&lt;span class="s2"&gt;"state"&lt;/span&gt;:&lt;span class="s2"&gt;"CA"&lt;/span&gt;,&lt;span class="s2"&gt;"zipcode"&lt;/span&gt;:94041&lt;span class="o"&gt;}}&lt;/span&gt;
^CProcessed a total of 2 messages
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Give it an automation flavour...
&lt;/h2&gt;

&lt;p&gt;All of that was set up manually, using graphical user interfaces and wizards in order to walk you gradually through the process. However, modern organizations require an automated way to provision resources. Guess what, you have multiple options to do that with Confluent Cloud. The low-level one is to use the &lt;a href="https://docs.confluent.io/cloud/current/api.html#tag/Identity-Providers-(iamv2)" rel="noopener noreferrer"&gt;Confluent Cloud REST API&lt;/a&gt;, but you will more likely opt for the Terraform option. That way, you have a real &lt;strong&gt;Infrastructure as Code&lt;/strong&gt; approach and it's completely embeddable in a global infrastructure definition. So feel free to read the Confluent Cloud Terraform provider documentation and especially the sections about the &lt;a href="https://registry.terraform.io/providers/confluentinc/confluent/latest/docs/resources/confluent_identity_provider" rel="noopener noreferrer"&gt;identity provider&lt;/a&gt; and about the &lt;a href="https://registry.terraform.io/providers/confluentinc/confluent/latest/docs/resources/confluent_identity_pool" rel="noopener noreferrer"&gt;identity pool&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;Obviously all of that is only an initial introduction to OIDC integration in Confluent Cloud and I recommend having a look at the &lt;a href="https://docs.confluent.io/cloud/current/access-management/authenticate/oauth/identity-providers.html" rel="noopener noreferrer"&gt;comprehensive documentation&lt;/a&gt;.&lt;/p&gt;

</description>
      <category>kafka</category>
      <category>cloud</category>
      <category>security</category>
      <category>oauth</category>
    </item>
    <item>
      <title>OpenID Connect authentication with Apache Kafka 3.1</title>
      <dc:creator>Brice LEPORINI</dc:creator>
      <pubDate>Tue, 03 Jan 2023 09:49:28 +0000</pubDate>
      <link>https://dev.to/bleporini/openid-connect-authentication-with-apache-kafka-31-5747</link>
      <guid>https://dev.to/bleporini/openid-connect-authentication-with-apache-kafka-31-5747</guid>
      <description>&lt;p&gt;Dear reader, this is not going to be fun because today we're talking about security. However, to make it less boring, this is about taking advantage of the support of OpenID Connect (OIDC) in Kafka 3.1, the foundation of &lt;a href="https://www.confluent.io/blog/introducing-confluent-platform-7-1/" rel="noopener noreferrer"&gt;Confluent Platform 7.1&lt;/a&gt;.&lt;/p&gt;

&lt;h2&gt;
  
  
  OpenID Connect
&lt;/h2&gt;

&lt;p&gt;Let's start with a few words about &lt;a href="https://openid.net/connect/" rel="noopener noreferrer"&gt;OIDC&lt;/a&gt;. It's an open standard that complements OAuth2.0. The aim of this post is not to give a proper introduction to OIDC, but let's emphasize some key differences with OAuth2.0.&lt;/p&gt;

&lt;p&gt;First of all, in OAuth2.0, the token is nothing more than an opaque string that has to be verified against the authorization server to be trusted. OIDC uses &lt;a href="https://datatracker.ietf.org/doc/html/rfc7519" rel="noopener noreferrer"&gt;JSON Web Token&lt;/a&gt;. It's a signed JSON document, base64 encoded. The cool thing is, as it's signed, applications can trust it without requiring any requests to the authorization server, so it implies only processing resources, which scales way better than point-to-point connections. The only element the application (here the application is a Kafka broker) needs is the public key for validating the token, and it's published by the authorization server, with another open specification, &lt;a href="https://datatracker.ietf.org/doc/html/rfc7517" rel="noopener noreferrer"&gt;JWKS&lt;/a&gt;, and is easily cacheable.&lt;/p&gt;
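
&lt;p&gt;To see what "a signed JSON document" means in practice, the Python sketch below reads the payload of a JWT. A JWT is made of three dot-separated, base64url-encoded segments: header, payload and signature. The sketch deliberately does not verify the signature, so it is only suitable for inspecting a token, never for trusting it. The demo token is built locally with a placeholder signature, and both helper names are assumptions made for illustration.&lt;/p&gt;

```python
import base64
import json


def decode_jwt_payload(token):
    """Return the claims of a JWT as a dict, WITHOUT verifying its signature."""
    segments = token.split(".")
    if len(segments) != 3:
        raise ValueError("a signed JWT has three dot-separated segments")
    payload_segment = segments[1]
    # base64url segments are emitted without padding, so restore it first.
    padding = "=" * (-len(payload_segment) % 4)
    return json.loads(base64.urlsafe_b64decode(payload_segment + padding))


def encode_segment(data):
    """Encode a dict as an unpadded base64url JSON segment (demo helper)."""
    raw = json.dumps(data).encode("utf-8")
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


# Locally built token with a placeholder signature, for demonstration only.
demo_token = ".".join([
    encode_segment({"alg": "none"}),
    encode_segment({"sub": "demo-client", "aud": "https://kafka.auth"}),
    "signature-placeholder",
])

print(decode_jwt_payload(demo_token)["aud"])
```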

&lt;p&gt;Obviously, this is an extremely incomplete summary of OIDC. What I like about it is that it frees the application from authentication method complexity. With OIDC, the organization can opt for simple user/password authentication, MFA, biometrics, SSO, multiple authentication flows and other options:  it has no impact on the application as long as it complies with the standard.&lt;/p&gt;

&lt;h2&gt;
  
  
  Putting it together with Kafka
&lt;/h2&gt;

&lt;p&gt;Here, we're keeping it simple as the use case is application-to-application authentication. So I'm using only a client id and a client secret. In order to make it as light as possible, the authorization server is &lt;a href="https://auth0.com/" rel="noopener noreferrer"&gt;Auth0&lt;/a&gt;, a fully managed service with a free tier. To set it up, I recommend reading the section &lt;a href="https://auth0.com/docs/quickstart/backend" rel="noopener noreferrer"&gt;Backend/API&lt;/a&gt; of the documentation. Kafka is not part of the listed backends, but the &lt;a href="https://auth0.com/docs/quickstart/backend/java-spring-security5#configure-auth0-apis" rel="noopener noreferrer"&gt;Configure Auth0 APIs&lt;/a&gt; paragraph of any kind of backend fits this PoC. Feel free to opt for any other authentication provider, as the standard is open and there are multiple implementation alternatives, self-managed as well as as-a-service.&lt;/p&gt;

&lt;p&gt;The support of OIDC in Kafka 3.1 is an extension of an existing feature and is defined in &lt;a href="https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186877575" rel="noopener noreferrer"&gt;KIP-768&lt;/a&gt;. The authentication flow is pretty simple:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fkhrrr4b5cvv8mjamohcv.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fkhrrr4b5cvv8mjamohcv.png" alt="Image description"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;During startup, the broker collects the public key set from the authorization server. The client starts by authenticating against the authorization server, then the latter issues a JWT. This token is then used for the SASL/OAUTHBEARER authentication. The broker now validates the token by verifying the signature and claims to clear the client.&lt;/p&gt;

&lt;p&gt;To make it more fun, I'm using Kafka in KRaft mode (so without Zookeeper) based on this example &lt;a href="https://github.com/confluentinc/cp-all-in-one/tree/latest/cp-all-in-one-kraft" rel="noopener noreferrer"&gt;running in Docker&lt;/a&gt; provided by &lt;a href="https://www.confluent.io/" rel="noopener noreferrer"&gt;Confluent&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;The first step is to validate the Auth0 setup, and Kafka comes with a handy command line tool:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;docker run &lt;span class="nt"&gt;-ti&lt;/span&gt; &lt;span class="nt"&gt;--rm&lt;/span&gt; confluentinc/cp-kafka:7.1.0 kafka-run-class org.apache.kafka.tools.OAuthCompatibilityTool &lt;span class="se"&gt;\&lt;/span&gt;
  &lt;span class="nt"&gt;--clientId&lt;/span&gt; XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX &lt;span class="se"&gt;\&lt;/span&gt;
  &lt;span class="nt"&gt;--clientSecret&lt;/span&gt; XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX &lt;span class="se"&gt;\&lt;/span&gt;
  &lt;span class="nt"&gt;--sasl&lt;/span&gt;.oauthbearer.jwks.endpoint.url https://xxxx-xxxxx.us.auth0.com/.well-known/jwks.json &lt;span class="se"&gt;\&lt;/span&gt;
  &lt;span class="nt"&gt;--sasl&lt;/span&gt;.oauthbearer.token.endpoint.url https://xxxx-xxxxx.us.auth0.com/oauth/token &lt;span class="se"&gt;\&lt;/span&gt;
  &lt;span class="nt"&gt;--sasl&lt;/span&gt;.oauthbearer.expected.audience https://kafka.auth

PASSED 1/5: client configuration
PASSED 2/5: client JWT retrieval
PASSED 3/5: client JWT validation
PASSED 4/5: broker configuration
PASSED 5/5: broker JWT validation
SUCCESS
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;All configuration values come from the authentication provider. Any other output means you need to review the Auth0 configuration.&lt;br&gt;
Now let's tweak the broker configuration:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight yaml"&gt;&lt;code&gt;&lt;span class="na"&gt;version&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s1"&gt;'&lt;/span&gt;&lt;span class="s"&gt;2'&lt;/span&gt;
&lt;span class="na"&gt;services&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;

  &lt;span class="na"&gt;broker&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
    &lt;span class="na"&gt;image&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;confluentinc/cp-kafka:7.1.0&lt;/span&gt;
    &lt;span class="na"&gt;hostname&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;broker&lt;/span&gt;
    &lt;span class="na"&gt;container_name&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;broker&lt;/span&gt;
    &lt;span class="na"&gt;ports&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;9092:9092"&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;9101:9101"&lt;/span&gt;
    &lt;span class="na"&gt;environment&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="na"&gt;KAFKA_BROKER_ID&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="m"&gt;1&lt;/span&gt;
      &lt;span class="na"&gt;KAFKA_LISTENER_SECURITY_PROTOCOL_MAP&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s1"&gt;'&lt;/span&gt;&lt;span class="s"&gt;CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,OIDC:SASL_PLAINTEXT'&lt;/span&gt;
      &lt;span class="na"&gt;KAFKA_ADVERTISED_LISTENERS&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s1"&gt;'&lt;/span&gt;&lt;span class="s"&gt;PLAINTEXT://broker:29092,OIDC://localhost:9092'&lt;/span&gt;
      &lt;span class="na"&gt;KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="m"&gt;1&lt;/span&gt;
      &lt;span class="na"&gt;KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="m"&gt;0&lt;/span&gt;
      &lt;span class="na"&gt;KAFKA_TRANSACTION_STATE_LOG_MIN_ISR&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="m"&gt;1&lt;/span&gt;
      &lt;span class="na"&gt;KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="m"&gt;1&lt;/span&gt;
      &lt;span class="na"&gt;KAFKA_JMX_PORT&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="m"&gt;9101&lt;/span&gt;
      &lt;span class="na"&gt;KAFKA_JMX_HOSTNAME&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;localhost&lt;/span&gt;
      &lt;span class="na"&gt;KAFKA_PROCESS_ROLES&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s1"&gt;'&lt;/span&gt;&lt;span class="s"&gt;broker,controller'&lt;/span&gt;
      &lt;span class="na"&gt;KAFKA_NODE_ID&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="m"&gt;1&lt;/span&gt;
      &lt;span class="na"&gt;KAFKA_CONTROLLER_QUORUM_VOTERS&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s1"&gt;'&lt;/span&gt;&lt;span class="s"&gt;1@broker:29093'&lt;/span&gt;
      &lt;span class="na"&gt;KAFKA_LISTENERS&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s1"&gt;'&lt;/span&gt;&lt;span class="s"&gt;PLAINTEXT://broker:29092,CONTROLLER://broker:29093,OIDC://0.0.0.0:9092'&lt;/span&gt;
      &lt;span class="na"&gt;KAFKA_INTER_BROKER_LISTENER_NAME&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s1"&gt;'&lt;/span&gt;&lt;span class="s"&gt;PLAINTEXT'&lt;/span&gt;
      &lt;span class="na"&gt;KAFKA_CONTROLLER_LISTENER_NAMES&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s1"&gt;'&lt;/span&gt;&lt;span class="s"&gt;CONTROLLER'&lt;/span&gt;
      &lt;span class="na"&gt;KAFKA_LOG_DIRS&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s1"&gt;'&lt;/span&gt;&lt;span class="s"&gt;/tmp/kraft-combined-logs'&lt;/span&gt;
      &lt;span class="na"&gt;KAFKA_SASL_ENABLED_MECHANISMS&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;OAUTHBEARER&lt;/span&gt;
      &lt;span class="na"&gt;KAFKA_SASL_OAUTHBEARER_JWKS_ENDPOINT_URL&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;$JWKS_ENDPOINT_URL&lt;/span&gt;
      &lt;span class="na"&gt;KAFKA_OPTS&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;-Djava.security.auth.login.config=/tmp/kafka_server_jaas.conf&lt;/span&gt;
      &lt;span class="na"&gt;KAFKA_SASL_OAUTHBEARER_EXPECTED_AUDIENCE&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;$OIDC_AUD&lt;/span&gt;
      &lt;span class="na"&gt;KAFKA_LISTENER_NAME_OIDC_OAUTHBEARER_SASL_SERVER_CALLBACK_HANDLER_CLASS&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerValidatorCallbackHandler&lt;/span&gt;
    &lt;span class="na"&gt;volumes&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;./update_run.sh:/tmp/update_run.sh&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;./kafka_server_jaas.conf:/tmp/kafka_server_jaas.conf&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;./client.properties:/tmp/client.properties&lt;/span&gt;
    &lt;span class="na"&gt;command&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;bash&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;-c&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;'if&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;[&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;!&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;-f&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;/tmp/update_run.sh&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;];&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;then&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;echo&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s"&gt;ERROR:&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;Did&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;you&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;forget&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;the&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;update_run.sh&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;file&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;that&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;came&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;with&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;this&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;docker-compose.yml&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;file?&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span 
class="s"&gt;&amp;amp;&amp;amp;&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;exit&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;1&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;;&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;else&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;/tmp/update_run.sh&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;&amp;amp;&amp;amp;&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;/etc/confluent/docker/run&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;;&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;fi'"&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Here are the differences with the original example:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight diff"&gt;&lt;code&gt;$ diff compose.yml compose.ori.yml
&lt;span class="p"&gt;14,15c14,15
&lt;/span&gt;&lt;span class="gd"&gt;&amp;lt;       KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,OIDC:SASL_PLAINTEXT'
&amp;lt;       KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:29092,OIDC://localhost:9092'
&lt;/span&gt;&lt;span class="p"&gt;---
&lt;/span&gt;&lt;span class="gi"&gt;&amp;gt;       KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
&amp;gt;       KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092'
&lt;/span&gt;&lt;span class="p"&gt;25c25
&lt;/span&gt;&lt;span class="gd"&gt;&amp;lt;       KAFKA_LISTENERS: 'PLAINTEXT://broker:29092,CONTROLLER://broker:29093,OIDC://0.0.0.0:9092'
&lt;/span&gt;&lt;span class="p"&gt;---
&lt;/span&gt;&lt;span class="gi"&gt;&amp;gt;       KAFKA_LISTENERS: 'PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092'
&lt;/span&gt;&lt;span class="p"&gt;29,33d28
&lt;/span&gt;&lt;span class="gd"&gt;&amp;lt;       KAFKA_SASL_ENABLED_MECHANISMS: OAUTHBEARER
&amp;lt;       KAFKA_SASL_OAUTHBEARER_JWKS_ENDPOINT_URL: $JWKS_ENDPOINT_URL
&amp;lt;       KAFKA_OPTS: -Djava.security.auth.login.config=/tmp/kafka_server_jaas.conf
&amp;lt;       KAFKA_SASL_OAUTHBEARER_EXPECTED_AUDIENCE: $OIDC_AUD
&amp;lt;       KAFKA_LISTENER_NAME_OIDC_OAUTHBEARER_SASL_SERVER_CALLBACK_HANDLER_CLASS: org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerValidatorCallbackHandler
&lt;/span&gt;&lt;span class="p"&gt;36,37d30
&lt;/span&gt;&lt;span class="gd"&gt;&amp;lt;       - ./kafka_server_jaas.conf:/tmp/kafka_server_jaas.conf
&amp;lt;       - ./client.properties:/tmp/client.properties
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Long story short, the external listener has been renamed and configured to use SASL_PLAINTEXT with the OAUTHBEARER mechanism. Notice that the coordinates of the authorization service are provided with environment variables in order to keep it generic.&lt;/p&gt;

&lt;p&gt;The JAAS configuration is pretty basic:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;KafkaServer {
    org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
};
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Now let’s start the broker:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ JWKS_ENDPOINT_URL&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;https://xxxx-xxxxx.us.auth0.com/.well-known/jwks.json &lt;span class="nv"&gt;OIDC_AUD&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;https://kafka.auth compose up
&lt;span class="o"&gt;[&lt;/span&gt;+] Running 1/1
 ⠿ Container broker  Created                       0.1s
Attaching to broker
broker  | &lt;span class="o"&gt;===&amp;gt;&lt;/span&gt; User
broker  | &lt;span class="nv"&gt;uid&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;1000&lt;span class="o"&gt;(&lt;/span&gt;appuser&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="nv"&gt;gid&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;1000&lt;span class="o"&gt;(&lt;/span&gt;appuser&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="nb"&gt;groups&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;1000&lt;span class="o"&gt;(&lt;/span&gt;appuser&lt;span class="o"&gt;)&lt;/span&gt;
broker  | &lt;span class="o"&gt;===&amp;gt;&lt;/span&gt; Configuring ...
broker  | &lt;span class="o"&gt;===&amp;gt;&lt;/span&gt; Running preflight checks ...
broker  | &lt;span class="o"&gt;===&amp;gt;&lt;/span&gt; Check &lt;span class="k"&gt;if&lt;/span&gt; /var/lib/kafka/data is writable ...
broker  | &lt;span class="o"&gt;===&amp;gt;&lt;/span&gt; Check &lt;span class="k"&gt;if &lt;/span&gt;Zookeeper is healthy ...
broker  | ignore zk-ready  40
broker  | Formatting /tmp/kraft-combined-logs
broker  | &lt;span class="o"&gt;===&amp;gt;&lt;/span&gt; Launching ...
broker  | &lt;span class="o"&gt;===&amp;gt;&lt;/span&gt; Launching kafka ...
&lt;span class="o"&gt;[&lt;/span&gt;...]
broker  | &lt;span class="o"&gt;[&lt;/span&gt;2022-04-15 06:35:13,095] INFO KafkaConfig values:
broker  |   advertised.listeners &lt;span class="o"&gt;=&lt;/span&gt; PLAINTEXT://broker:29092,OIDC://localhost:9092
&lt;span class="o"&gt;[&lt;/span&gt;...]
broker  |   listener.security.protocol.map &lt;span class="o"&gt;=&lt;/span&gt; CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,OIDC:SASL_PLAINTEXT
broker  |   listeners &lt;span class="o"&gt;=&lt;/span&gt; PLAINTEXT://broker:29092,CONTROLLER://broker:29093,OIDC://0.0.0.0:9092
&lt;span class="o"&gt;[&lt;/span&gt;...]
broker  |   sasl.enabled.mechanisms &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="o"&gt;[&lt;/span&gt;OAUTHBEARER]
&lt;span class="o"&gt;[&lt;/span&gt;...]
broker  |   sasl.oauthbearer.expected.audience &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="o"&gt;[&lt;/span&gt;https://kafka.auth]
&lt;span class="o"&gt;[&lt;/span&gt;...]
broker  |   sasl.oauthbearer.jwks.endpoint.url &lt;span class="o"&gt;=&lt;/span&gt; https://xxxx-xxxxx.us.auth0.com/.well-known/jwks.json
&lt;span class="o"&gt;[&lt;/span&gt;...]
broker  | &lt;span class="o"&gt;[&lt;/span&gt;2022-04-15 06:35:13,159] INFO &lt;span class="o"&gt;[&lt;/span&gt;BrokerLifecycleManager &lt;span class="nb"&gt;id&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;1] The broker has been unfenced. Transitioning from RECOVERY to RUNNING. &lt;span class="o"&gt;(&lt;/span&gt;kafka.server.BrokerLifecycleManager&lt;span class="o"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Good stuff! Next, let's configure the client:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight properties"&gt;&lt;code&gt;&lt;span class="py"&gt;security.protocol&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;SASL_PLAINTEXT&lt;/span&gt;
&lt;span class="py"&gt;sasl.mechanism&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;OAUTHBEARER&lt;/span&gt;
&lt;span class="py"&gt;sasl.login.callback.handler.class&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler&lt;/span&gt;
&lt;span class="py"&gt;sasl.login.connect.timeout.ms&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;15000&lt;/span&gt;
&lt;span class="py"&gt;sasl.oauthbearer.token.endpoint.url&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;https://xxxx-xxxxx.us.auth0.com/oauth/token&lt;/span&gt;
&lt;span class="py"&gt;sasl.jaas.config&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required &lt;/span&gt;&lt;span class="se"&gt;\
&lt;/span&gt;&lt;span class="s"&gt;clientId="XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" &lt;/span&gt;&lt;span class="se"&gt;\
&lt;/span&gt;&lt;span class="s"&gt;clientSecret="XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" ;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;We're now good to go with some basic produce and consume tests. You may have noticed that I also mounted the client configuration file in the broker container; that's purely for convenience, so the clients can run in the same container:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;docker &lt;span class="nb"&gt;exec&lt;/span&gt; &lt;span class="nt"&gt;-ti&lt;/span&gt; broker kafka-console-producer &lt;span class="nt"&gt;--producer&lt;/span&gt;.config /tmp/client.properties &lt;span class="nt"&gt;--bootstrap-server&lt;/span&gt; localhost:9092 &lt;span class="nt"&gt;--topic&lt;/span&gt; &lt;span class="nb"&gt;test&lt;/span&gt;
&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;Hello OIDC!
&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;%
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;And in a different terminal:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;docker &lt;span class="nb"&gt;exec&lt;/span&gt; &lt;span class="nt"&gt;-ti&lt;/span&gt; broker kafka-console-consumer &lt;span class="nt"&gt;--consumer&lt;/span&gt;.config /tmp/client.properties &lt;span class="nt"&gt;--bootstrap-server&lt;/span&gt; localhost:9092 &lt;span class="nt"&gt;--topic&lt;/span&gt; &lt;span class="nb"&gt;test
&lt;/span&gt;Hello OIDC!
^CProcessed a total of 1 messages
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;That's it!&lt;/p&gt;

&lt;p&gt;Running the client without the proper configuration raises errors on both sides, confirming that the broker rejects it as expected:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;docker &lt;span class="nb"&gt;exec&lt;/span&gt; &lt;span class="nt"&gt;-ti&lt;/span&gt; broker kafka-console-consumer &lt;span class="nt"&gt;--bootstrap-server&lt;/span&gt; localhost:9092 &lt;span class="nt"&gt;--topic&lt;/span&gt; &lt;span class="nb"&gt;test&lt;/span&gt;
&lt;span class="o"&gt;[&lt;/span&gt;2022-04-15 06:57:28,564] WARN &lt;span class="o"&gt;[&lt;/span&gt;Consumer &lt;span class="nv"&gt;clientId&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;console-consumer, &lt;span class="nv"&gt;groupId&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;console-consumer-9357] Bootstrap broker localhost:9092 &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nb"&gt;id&lt;/span&gt;: &lt;span class="nt"&gt;-1&lt;/span&gt; rack: null&lt;span class="o"&gt;)&lt;/span&gt; disconnected &lt;span class="o"&gt;(&lt;/span&gt;org.apache.kafka.clients.NetworkClient&lt;span class="o"&gt;)&lt;/span&gt;

broker  | &lt;span class="o"&gt;[&lt;/span&gt;2022-04-15 06:57:28,247] INFO &lt;span class="o"&gt;[&lt;/span&gt;SocketServer &lt;span class="nv"&gt;listenerType&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;BROKER, &lt;span class="nv"&gt;nodeId&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;1] Failed authentication with /127.0.0.1 &lt;span class="o"&gt;(&lt;/span&gt;Unexpected Kafka request of &lt;span class="nb"&gt;type &lt;/span&gt;METADATA during SASL handshake.&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;org.apache.kafka.common.network.Selector&lt;span class="o"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;If you increase the level of the &lt;code&gt;org.apache.kafka.common.security&lt;/code&gt; logger, you'll be able to see the token parsed.&lt;/p&gt;
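
&lt;p&gt;If you'd rather look at a token on the client side, its payload is just Base64URL-encoded JSON. The following is a minimal Java sketch that is not part of the original setup: the class name &lt;code&gt;JwtPayloadPeek&lt;/code&gt; and the locally assembled sample token are hypothetical, and no signature validation is performed:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class JwtPayloadPeek {

    // Returns the JSON payload (second dot-separated part) of a JWT, without verifying the signature.
    static String payloadOf(String jwt) {
        String[] parts = jwt.split("\\.");
        return new String(Base64.getUrlDecoder().decode(parts[1]), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Hypothetical token assembled locally for the demo; a real one comes from the token endpoint.
        Base64.Encoder enc = Base64.getUrlEncoder().withoutPadding();
        String header = enc.encodeToString("{\"alg\":\"none\"}".getBytes(StandardCharsets.UTF_8));
        String payload = enc.encodeToString("{\"aud\":\"https://kafka.auth\"}".getBytes(StandardCharsets.UTF_8));
        String jwt = header + "." + payload + ".signature";

        // Shows the claims, including the audience the broker compares with its expected audience.
        System.out.println(payloadOf(jwt));
    }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;The &lt;code&gt;aud&lt;/code&gt; claim is the one that has to match the value passed in &lt;code&gt;OIDC_AUD&lt;/code&gt;.&lt;/p&gt;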

&lt;p&gt;For reference, I also recommend reading the &lt;a href="https://docs.confluent.io/platform/current/kafka/authentication_sasl/authentication_sasl_oauth.html#" rel="noopener noreferrer"&gt;SASL/OAUTHBEARER documentation&lt;/a&gt;.&lt;/p&gt;

</description>
      <category>kafka</category>
      <category>oauth2</category>
      <category>oidc</category>
      <category>openidconnect</category>
    </item>
    <item>
      <title>ksqlDB : pull queries on streams</title>
      <dc:creator>Brice LEPORINI</dc:creator>
      <pubDate>Mon, 14 Nov 2022 12:54:57 +0000</pubDate>
      <link>https://dev.to/bleporini/ksqldb-pull-queries-on-streams-5828</link>
      <guid>https://dev.to/bleporini/ksqldb-pull-queries-on-streams-5828</guid>
      <description>&lt;p&gt;&lt;code&gt;ksqlDB&lt;/code&gt; (initially named &lt;code&gt;KSQL&lt;/code&gt;) is not a new product: the first preview versions were released more than four years ago. Since the beginning, there’s been a common misunderstanding: developers tend to think at first that KSQL is a language for querying topic content. That isn’t its DNA: as a streaming database, its purpose is to offer the means to process data in real time, at scale and in a resilient manner, with various high-level features such as joining streams of events, continuously aggregating data, etc.&lt;/p&gt;

&lt;p&gt;However, it’s no surprise that developers had that kind of expectation of the product; trying to do things the way you have always done them feels natural. The challenge is the paradigm shift. As a solutions engineer, my daily job is to ease that change of culture and to open eyes to the differences between data at rest and data in motion.&lt;/p&gt;

&lt;p&gt;That being said, there’s no point in pitting reacting to events against querying state: both are valid designs, to be used appropriately depending on the needs. I guess that’s why ksqlDB evolved over time and introduced pull queries in v0.6.0. At first they were only available for materialized views, then for tables, and now they’re finally available for streams.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Yes, now you have the capability to write that kind of statement&lt;/strong&gt; : &lt;code&gt;SELECT ... from &amp;lt;a topic mapped as a stream&amp;gt; WHERE ...&lt;/code&gt; &lt;/p&gt;

&lt;p&gt;As with any pull query, the statement is executed, the data is fetched and returned to the client, and then the connection is closed.&lt;/p&gt;

&lt;h2&gt;
  
  
  The right tool for the job
&lt;/h2&gt;

&lt;p&gt;Before throwing away PostgreSQL and other databases of that kind, let’s take a step back and see what will be the impact of that kind of query. I created a small test with &lt;a href="https://confluent.cloud"&gt;Confluent Cloud&lt;/a&gt; with a basic cluster and used the &lt;a href="https://www.confluent.io/hub/confluentinc/kafka-connect-datagen"&gt;Datagen Source Connector&lt;/a&gt; in order to generate some dummy data, based on the inventory model. So I let it run till I had a significant amount of data on this topic, around 1GB. After that, I mapped the topic in ksqlDB:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="n"&gt;STREAM&lt;/span&gt; &lt;span class="n"&gt;inventory&lt;/span&gt; &lt;span class="k"&gt;WITH&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;kafka_topic&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s1"&gt;'inventory'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;value_format&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s1"&gt;'AVRO'&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Note that, thanks to the Schema Registry integration, the stream is automatically created with the expected data structure:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--yPoCD8ZV--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/kbrhnml1pp8xp346t4pc.jpg" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--yPoCD8ZV--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/kbrhnml1pp8xp346t4pc.jpg" alt="Data structure" width="310" height="282"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Then I executed a useless query that returns no records at all:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;select&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt; &lt;span class="k"&gt;from&lt;/span&gt;  &lt;span class="n"&gt;INVENTORY&lt;/span&gt; &lt;span class="k"&gt;where&lt;/span&gt; &lt;span class="n"&gt;id&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt; &lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;As expected, the execution returned no records. What’s good with &lt;a href="https://confluent.cloud"&gt;Confluent Cloud&lt;/a&gt; is that, in addition to spinning up a cluster in a couple of seconds, it comes with out-of-the-box metrics in the user interface, and the impact of that query can be checked almost immediately in the consumption graph:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--X0hcl6x7--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/j3d8nzp3gg7xlly1jwzt.jpg" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--X0hcl6x7--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/j3d8nzp3gg7xlly1jwzt.jpg" alt="Metrics" width="880" height="346"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;You can clearly see that the whole content of the topic was scanned, generating a lot of outgoing traffic.&lt;/p&gt;

&lt;p&gt;Thank you Captain Obvious, you’ve just demonstrated that Kafka is not a database. So in which cases are these pull queries useful? Well, imagine a topic on which you have to do forensics: spotting a set of records matching some criteria requires building an application with a consumer that reads the whole topic content and applies the filter (in fact, this is what a pull query does). Dramatically painful for a basic need. With pull queries on streams, the longest part is the execution 😎.&lt;/p&gt;

&lt;p&gt;All of this illustrates, with tangible facts, the word of caution briefly mentioned in the &lt;a href="https://www.confluent.io/blog/ksqldb-0-23-1-features-updates/#pull-queries"&gt;ksqlDB 0.23.1 blog post&lt;/a&gt;. If you want to know more about ksqlDB, head over to &lt;a href="https://ksqldb.io"&gt;ksqldb.io&lt;/a&gt; to get started, where you can follow the quick start, read the docs, and learn more! Pro tip: I especially recommend checking out &lt;a href="https://confluent.cloud"&gt;Confluent Cloud&lt;/a&gt;, as it will give you a complete working environment in a couple of minutes 😉.&lt;/p&gt;

</description>
      <category>kafka</category>
      <category>ksqldb</category>
      <category>datastreaming</category>
      <category>streamprocessing</category>
    </item>
    <item>
      <title>Why you should not query a database in your stream processors</title>
      <dc:creator>Brice LEPORINI</dc:creator>
      <pubDate>Thu, 24 Mar 2022 06:19:56 +0000</pubDate>
      <link>https://dev.to/bleporini/why-you-should-not-query-a-database-in-your-stream-processors-4829</link>
      <guid>https://dev.to/bleporini/why-you-should-not-query-a-database-in-your-stream-processors-4829</guid>
      <description>&lt;p&gt;Enriching an event with data from another source is one of the more common use cases in event streaming. But where does the extra information for the event come from? In Kafka Streams, it could easily be written like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="nc"&gt;StreamsBuilder&lt;/span&gt; &lt;span class="n"&gt;streamsBuilder&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;StreamsBuilder&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;

&lt;span class="n"&gt;streamsBuilder&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;stream&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"my_topic"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
    &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;mapValues&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;order&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;enrichRecord&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;order&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;findCustomerById&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;order&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getCusutomerId&lt;/span&gt;&lt;span class="o"&gt;())))&lt;/span&gt;
    &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;to&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"enriched_records"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Or in ksqlDB with a User Defined Function :&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="nd"&gt;@UdfDescription&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;name&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"find_customer_by_id"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
                &lt;span class="n"&gt;author&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"Brice "&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
                &lt;span class="n"&gt;version&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"1.0.2"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
                &lt;span class="n"&gt;description&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"Finds a Customer entity based on its id."&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;FindCustomerByIdUdf&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;

    &lt;span class="nd"&gt;@Udf&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;schema&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s"&gt;"..."&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="nc"&gt;Struct&lt;/span&gt; &lt;span class="nf"&gt;findCustomerById&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nd"&gt;@UdfParameter&lt;/span&gt; &lt;span class="kt"&gt;int&lt;/span&gt; &lt;span class="n"&gt;customerId&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="o"&gt;[...]&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;customer&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;which is then used like this in &lt;code&gt;ksqlDB&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;create&lt;/span&gt; &lt;span class="n"&gt;stream&lt;/span&gt; &lt;span class="n"&gt;enriched_records&lt;/span&gt; &lt;span class="k"&gt;with&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;kafka_topic&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="nv"&gt;"enriched_records"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="p"&gt;...)&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt;
&lt;span class="k"&gt;select&lt;/span&gt;
    &lt;span class="n"&gt;order_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="p"&gt;[...]&lt;/span&gt;
    &lt;span class="n"&gt;find_customer_by_id&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;customer_id&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="k"&gt;from&lt;/span&gt; &lt;span class="n"&gt;orders&lt;/span&gt; &lt;span class="n"&gt;emit&lt;/span&gt; &lt;span class="n"&gt;changes&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;You can replace the database query with an external call of any kind: REST API request, lookup in a file, etc.&lt;br&gt;
It compiles, it works and it looks like what we've been doing forever, so what’s the problem in doing this?&lt;/p&gt;

&lt;p&gt;First of all, there’s a semantic issue: stream processing is expected to be idempotent, meaning that processing the same stream of events again and again should produce the same values (unless you change the implementation of the application, obviously). Involving a third party to provide the data that enriches your stream breaks this property, because there’s no guarantee that the external call returns the same value each time you invoke it with the same arguments.&lt;/p&gt;
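
&lt;p&gt;To make this concrete, here is a minimal plain-Java sketch with no Kafka involved. The class &lt;code&gt;ReplayDemo&lt;/code&gt;, the array standing in for the customer database and the sample values are all hypothetical; it only shows that replaying the same events gives a different result once the external data has changed in between:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;public class ReplayDemo {

    // Stands in for the external database: index = customer id, value = customer name.
    static String[] customerDb = {"Alice", "Bob"};

    // Same shape as the enrichment above: the result depends on what the remote store holds right now.
    static String enrich(int customerId) {
        return "order for " + customerDb[customerId];
    }

    // "Processes" the whole stream of customer ids and returns the enriched records.
    static String[] process(int[] customerIds) {
        String[] out = new String[customerIds.length];
        int i = 0;
        for (int id : customerIds) {
            out[i++] = enrich(id);
        }
        return out;
    }

    public static void main(String[] args) {
        int[] events = {0, 1, 0};
        String[] firstRun = process(events);

        // The remote data changes between the two runs...
        customerDb[0] = "Alicia";
        String[] replay = process(events);

        // ...so replaying the very same events no longer yields the same output: prints false.
        System.out.println(java.util.Arrays.equals(firstRun, replay));
    }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
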

&lt;p&gt;Then let’s talk about the architecture concerns. Kafka is a distributed system. Dealing with failures is part of its DNA, and there are multiple architecture patterns to cope with almost any kind of outage. This is why Kafka is a first-class choice as a central nervous system for many organizations. If you put a dependency on an external datastore that doesn’t provide the same guarantees in the middle of your pipeline, then the resilience and the performance of your application become those of this foreign system. And fetching data from a traditional RDBMS is not uncommon. Don’t get me wrong: those are really good tools providing great features, but not with the same guarantees, and when the database is not available the whole pipeline is down, ruining your efforts to provide a resilient streaming platform.&lt;/p&gt;

&lt;p&gt;My next point against this kind of design applies when the external call produces a side effect (meaning each call creates or updates foreign data). In addition to the previous point, it breaks the Exactly Once Semantics feature offered out-of-the-box by ksqlDB and Kafka Streams (and by the vanilla Kafka client at the cost of some boilerplate), because if a failure of any kind occurs during the processing of a record, there’s no means to automatically roll back changes in the remote system. Let’s illustrate it with a practical scenario: imagine the remote request increments a counter and, during operations, one of the ksqlDB workers becomes unreachable for any reason. The workload is then rebalanced to the surviving instances and the last uncommitted batch of records is processed once again, meaning there are also unexpected increments in the foreign system. Hashtag data corruption.&lt;/p&gt;
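
&lt;p&gt;The counter scenario can be simulated in a few lines of plain Java. This is only a sketch under simplifying assumptions: the class &lt;code&gt;DuplicateSideEffectDemo&lt;/code&gt;, the offsets and the batch sizes are hypothetical, and the "workers" are just successive method calls:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;public class DuplicateSideEffectDemo {

    // Stands in for the counter living in the remote system.
    static int remoteCounter = 0;

    // Offset up to which processing has been committed.
    static int committedOffset = 0;

    // Processes records from offset "from" (inclusive) to "to" (exclusive);
    // each record triggers the remote side effect.
    static void processBatch(int from, int to) {
        for (int offset = from; offset != to; offset++) {
            remoteCounter++; // nothing on the Kafka side can roll this back
        }
    }

    public static void main(String[] args) {
        // Worker A handles the first five records and commits.
        processBatch(0, 5);
        committedOffset = 5;

        // Worker A handles three more records, then dies before committing.
        processBatch(5, 8);

        // After the rebalance, worker B restarts from the last committed offset.
        processBatch(committedOffset, 8);
        committedOffset = 8;

        // 8 records were consumed, but the remote counter was incremented 11 times: prints 11.
        System.out.println(remoteCounter);
    }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
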

&lt;p&gt;This is a well-known issue of lack of distributed transaction management… but “lack” may not be the right term, because this is not something that’s expected to be implemented. Indeed, in the past there were options, like XA, to deal with distributed transactions, but they were really cumbersome to set up and raised real scalability concerns by design. So this is definitely not what you expect when building a data streaming platform able to process gigabytes of records per second!&lt;/p&gt;
&lt;h2&gt;
  
  
  So how to sort this out?
&lt;/h2&gt;

&lt;p&gt;Usually data enrichment is nothing more than data lookup and record merging, so the best way to do it is to onboard that data into Kafka topics, put a table abstraction on top of it (more details about that concept in this &lt;a href="https://www.confluent.io/blog/kafka-streams-tables-part-1-event-streaming/" rel="noopener noreferrer"&gt;blog post&lt;/a&gt;), and then join the stream of events to this table in order to merge the records. The good thing about this is that the external datastore is no longer queried, so this point of failure is removed. Even if the remote system is unavailable, it won’t have any effect on the pipeline.&lt;br&gt;
&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fx03nk6zo62373l0v55nn.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fx03nk6zo62373l0v55nn.png" alt="stream table join"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;In ksqlDB, this translates to the following (assuming the stream and the table are co-partitioned):&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;create&lt;/span&gt; &lt;span class="n"&gt;stream&lt;/span&gt; &lt;span class="n"&gt;enriched_records&lt;/span&gt; &lt;span class="k"&gt;with&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;kafka_topic&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="nv"&gt;"enriched_records_by_customer_id"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="p"&gt;...)&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt;
&lt;span class="k"&gt;select&lt;/span&gt;
    &lt;span class="n"&gt;o&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;order_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="p"&gt;[...]&lt;/span&gt;
&lt;span class="k"&gt;from&lt;/span&gt; &lt;span class="n"&gt;orders&lt;/span&gt; &lt;span class="n"&gt;o&lt;/span&gt; &lt;span class="k"&gt;join&lt;/span&gt; &lt;span class="n"&gt;customers&lt;/span&gt; &lt;span class="k"&gt;c&lt;/span&gt; &lt;span class="k"&gt;on&lt;/span&gt; &lt;span class="n"&gt;o&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;customer_id&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;c&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;customer_id&lt;/span&gt;
&lt;span class="n"&gt;emit&lt;/span&gt; &lt;span class="n"&gt;changes&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;There are several ways to make this data available in a topic: if the remote system is an application already onboarded in Kafka, it can be updated to stream changes to the destination topic. If it’s a database or a legacy system not expected to share records in Kafka, you can use source connectors such as a Change Data Capture connector or the JDBC connector.&lt;/p&gt;

&lt;h2&gt;
  
  
  What if the remote system is outside my organisation?
&lt;/h2&gt;

&lt;p&gt;This happens when you have to deal with a partner API or any kind of remote system under the control of another business unit, so it’s not possible to onboard this data in Kafka. So it looks like you’re doomed to make the call in the stream processor… Well, not so fast, because there’s another concern, a bit more technical but one that you should not overlook. To understand it, we have to go down through the layers to the Kafka client library. At the end of the day, processing a stream of records is nothing more than implementing a kind of loop:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="kd"&gt;final&lt;/span&gt; &lt;span class="nc"&gt;Consumer&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;DataRecord&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;consumer&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;KafkaConsumer&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;DataRecord&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;(&lt;/span&gt;&lt;span class="n"&gt;props&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;span class="n"&gt;consumer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;subscribe&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Arrays&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;asList&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;topic&lt;/span&gt;&lt;span class="o"&gt;));&lt;/span&gt;

 &lt;span class="k"&gt;while&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="kc"&gt;true&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="nc"&gt;ConsumerRecords&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;DataRecord&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;records&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;consumer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;poll&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;100&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
        &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ConsumerRecord&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;DataRecord&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;record&lt;/span&gt; &lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="n"&gt;records&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
            &lt;span class="c1"&gt;// Doing stuff&lt;/span&gt;
            &lt;span class="o"&gt;[...]&lt;/span&gt;
        &lt;span class="o"&gt;}&lt;/span&gt;
      &lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Whether you’re writing ksqlDB queries or Kafka Streams Java code, it will result in that kind of poll loop. The Kafka Java client library comes with the following configuration properties:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;code&gt;max.poll.interval.ms&lt;/code&gt;:
The maximum delay between invocations of poll() when using consumer group management.[…] If poll() is not called before expiration of this timeout, then the consumer is considered failed and the consumer group coordinator will trigger a rebalance in order to reassign the partitions to another member.&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;max.poll.records&lt;/code&gt;:
The maximum number of records returned in a single call to poll().&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Now let’s say that the remote system slows down for any reason and each query/request has a one second response time. The default value for &lt;code&gt;max.poll.records&lt;/code&gt; is 500, so one iteration of the poll loop can take up to &lt;strong&gt;500 seconds…&lt;/strong&gt; The default value for &lt;code&gt;max.poll.interval.ms&lt;/code&gt; is 300000 (300 seconds), so in this context the &lt;code&gt;GroupCoordinator&lt;/code&gt; will consider the client as down and trigger a rebalance. Yet your Kafka Streams application (or ksqlDB persistent query) is not actually down: its partitions have been reassigned, so the batch of records can’t be committed and, after the rebalance, the same records are processed again and again, continuously increasing the consumer lag. This can lead to a snowball effect: the root cause is a slow remote system and, because it’s slow, it’s invoked more and more without any chance to recover… &lt;br&gt;
&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fcwompdsqfbtgnwtegqpy.gif" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fcwompdsqfbtgnwtegqpy.gif" alt="Boom"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Don’t think of this as a theoretical concern: it’s something I’ve seen in the field!&lt;/p&gt;
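
&lt;p&gt;The arithmetic behind this scenario fits in a few lines. This is only a sketch: the class and method names are hypothetical, and it assumes the worst case where every record of the batch waits for the full remote latency.&lt;/p&gt;

```java
public class PollBudget {

    // Worst case: every record of the batch waits for the full remote latency.
    static long worstCaseBatchMillis(int maxPollRecords, long remoteLatencyMs) {
        return maxPollRecords * remoteLatencyMs;
    }

    // True when the batch cannot finish before the consumer is considered failed.
    static boolean triggersRebalance(int maxPollRecords, long remoteLatencyMs, long maxPollIntervalMs) {
        return worstCaseBatchMillis(maxPollRecords, remoteLatencyMs) > maxPollIntervalMs;
    }

    public static void main(String[] args) {
        int maxPollRecords = 500;          // default max.poll.records
        long maxPollIntervalMs = 300_000;  // default max.poll.interval.ms
        long remoteLatencyMs = 1_000;      // the slow remote system of the scenario

        System.out.println(worstCaseBatchMillis(maxPollRecords, remoteLatencyMs));                 // 500000
        System.out.println(triggersRebalance(maxPollRecords, remoteLatencyMs, maxPollIntervalMs)); // true
    }
}
```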

&lt;p&gt;The cheapest answer is to tune the values of &lt;code&gt;max.poll.records&lt;/code&gt; or &lt;code&gt;max.poll.interval.ms&lt;/code&gt;, which could be fine to adapt to usual latency and response time, but it can be risky to push the limits to deal with occasional spikes, because this can lead to a vicious circle.&lt;/p&gt;
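
&lt;p&gt;As an illustration of that tuning, here is a sketch that derives &lt;code&gt;max.poll.records&lt;/code&gt; from the usual latency of the remote call. The helper name is hypothetical, and the rule of keeping half of the poll interval as headroom is an arbitrary assumption for this example, not a Kafka recommendation. The latency is assumed to be strictly positive.&lt;/p&gt;

```java
import java.util.Properties;

public class TunedConsumerConfig {

    // Picks max.poll.records so that a full batch uses at most half of the poll interval.
    // The 50% headroom is an arbitrary assumption of this sketch.
    static Properties tunedProps(long expectedLatencyMs, long maxPollIntervalMs) {
        long budgetMs = maxPollIntervalMs / 2;
        long maxPollRecords = Math.max(1, budgetMs / expectedLatencyMs);

        Properties props = new Properties();
        props.setProperty("max.poll.interval.ms", Long.toString(maxPollIntervalMs));
        props.setProperty("max.poll.records", Long.toString(maxPollRecords));
        return props;
    }

    public static void main(String[] args) {
        // 200 ms per remote call with the default 5 minute poll interval.
        Properties props = tunedProps(200, 300_000);
        System.out.println("max.poll.records=" + props.getProperty("max.poll.records"));
    }
}
```

&lt;p&gt;These properties would be merged with the rest of the consumer configuration. Note that the computation only holds for the usual latency: a spike beyond it brings back the rebalance problem.&lt;/p&gt;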

&lt;p&gt;What about using an asynchronous client to avoid blocking the poll loop thread? This design doesn’t work at all because &lt;code&gt;KafkaConsumer&lt;/code&gt; is not thread safe. This is not an oversight: it’s imposed by the Kafka processing model, because otherwise you would lose the ordering guarantee.&lt;/p&gt;

&lt;h2&gt;
  
  
  So is there any viable option?
&lt;/h2&gt;

&lt;p&gt;Yes, there is, at the cost of a less straightforward design… The basic idea is to split the process in two and delegate the request processing to another component that can take advantage of an asynchronous design:&lt;br&gt;
&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F4e5ixzxhr40vojyeo1qm.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F4e5ixzxhr40vojyeo1qm.png" alt="Async processor"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Here, requests are records stored in a topic, consumed by a dedicated processor that runs the request asynchronously to avoid blocking the poll loop and that will eventually write the result in an output topic. Then the second part of the initial pipeline is able to move forward by joining results with pending jobs.&lt;/p&gt;

&lt;p&gt;Wait a minute, this sounds exactly like what was described as irrelevant in a Kafka context, doesn’t it? Not exactly, because as long as the result of the request is not required to commit the request topic’s offsets, that’s OK. On the other hand, it requires implementing, at a higher level, things like timeouts, maximum in-flight requests and crash recovery.&lt;/p&gt;
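
&lt;p&gt;To make the idea more concrete, here is a minimal in-memory sketch of such a processor. It is not the implementation linked at the end of this post: plain arrays stand in for the request and output topics, &lt;code&gt;remoteCall&lt;/code&gt; is a hypothetical remote request simulated with a sleep, and crash recovery is left out. It only shows requests being dispatched without waiting for each other, a pool size capping the in-flight requests, and a timeout producing a result instead of blocking.&lt;/p&gt;

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

public class AsyncProcessorSketch {

    // Hypothetical remote call: sleeps for the given latency, then answers.
    static String remoteCall(String request, long latencyMs) {
        try {
            Thread.sleep(latencyMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "result-for-" + request;
    }

    // Dispatches every request without waiting for the previous one and returns
    // what would be written to the output topic, one entry per request.
    static String[] run(String[] requests, long[] latenciesMs, long timeoutMs) {
        // The pool size caps the number of in-flight requests.
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, requests.length));
        String[] outputTopic = new String[requests.length];
        try {
            var pending = IntStream.range(0, requests.length)
                .mapToObj(i -> CompletableFuture
                    .supplyAsync(() -> remoteCall(requests[i], latenciesMs[i]), pool)
                    // A slow call yields a timeout result instead of blocking the pipeline.
                    .completeOnTimeout("timeout-for-" + requests[i], timeoutMs, TimeUnit.MILLISECONDS)
                    .thenAccept(result -> outputTopic[i] = result))
                .toArray(CompletableFuture[]::new);
            // Only this demo waits here; the dispatch itself never blocked on a call.
            CompletableFuture.allOf(pending).join();
        } finally {
            pool.shutdownNow(); // interrupts calls that are still sleeping
        }
        return outputTopic;
    }

    public static void main(String[] args) {
        String[] output = run(new String[] {"a", "b"}, new long[] {10, 5_000}, 500);
        for (String result : output) {
            System.out.println(result);
        }
    }
}
```

&lt;p&gt;In this run, the fast request gets its result while the slow one is reported as a timeout after 500 ms, which the downstream stage could then handle or schedule for a retry.&lt;/p&gt;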

&lt;p&gt;It obviously increases the complexity; however, it offers a real opportunity to implement rich error management scenarios like retries with various backoff policies.&lt;/p&gt;

&lt;p&gt;You can check out an implementation example of that kind of architecture &lt;a href="https://github.com/bleporini/kafka-side-effects-processor" rel="noopener noreferrer"&gt;here&lt;/a&gt;. It’s not battle tested but it can give you inspiration for your own needs.&lt;/p&gt;

</description>
      <category>kafka</category>
      <category>ksqldb</category>
      <category>kafkastreams</category>
      <category>streamprocessing</category>
    </item>
  </channel>
</rss>
