<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: Thriving.dev | Software Architecture</title>
    <description>The latest articles on DEV Community by Thriving.dev | Software Architecture (@thriving-dev).</description>
    <link>https://dev.to/thriving-dev</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F1092852%2F517e5c19-f355-4a80-a2e0-1df51b79017c.png</url>
      <title>DEV Community: Thriving.dev | Software Architecture</title>
      <link>https://dev.to/thriving-dev</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/thriving-dev"/>
    <language>en</language>
    <item>
      <title>Reduce Rebalance Downtime (by x450) for Stateless Kafka Streams Apps [Simple Steps]</title>
      <dc:creator>Thriving.dev | Software Architecture</dc:creator>
      <pubDate>Sun, 18 Jun 2023 22:43:22 +0000</pubDate>
      <link>https://dev.to/thriving-dev/reduce-rebalance-downtime-by-x450-for-stateless-kafka-streams-apps-simple-steps-4mi6</link>
      <guid>https://dev.to/thriving-dev/reduce-rebalance-downtime-by-x450-for-stateless-kafka-streams-apps-simple-steps-4mi6</guid>
      <description>&lt;p&gt;In this post, we’ll &lt;strong&gt;learn how Kafka Streams Consumers behave differently from regular Kafka Consumers&lt;/strong&gt;, the consequences for the application, as well as steps to minimise downtimes in event processing when consumer group members change.&lt;/p&gt;

&lt;p&gt;With the &lt;strong&gt;default configuration&lt;/strong&gt;, a containerised &lt;em&gt;stateless&lt;/em&gt; &lt;strong&gt;Streams app pauses processing for &amp;gt;45s&lt;/strong&gt; when one app instance (group member) is removed or restarted.&lt;br&gt;
For real-time data streaming workloads with a low e2e latency as an NFR (non-functional requirement), such a long ‘rebalance downtime’ is often unacceptable.&lt;/p&gt;

&lt;p&gt;Fortunately, there’s a simple yet efficient solution to address this problem.&lt;/p&gt;

&lt;p&gt;As a bonus, we will look under the hood of Kafka Consumer Groups, the Group Coordinator &amp;amp; Rebalance Protocol, and measure, analyse and evaluate a simulation of a group member (replica) re-creation running on Kubernetes.&lt;/p&gt;

&lt;p&gt;...&lt;strong&gt;TLDR&lt;/strong&gt;? here's a &lt;strong&gt;spoiler&lt;/strong&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="n"&gt;props&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"internal.leave.group.on.close"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="kc"&gt;true&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Theory
&lt;/h2&gt;

&lt;h3&gt;
  
  
  Regular Consumer Behaviour
&lt;/h3&gt;

&lt;p&gt;Let’s briefly recap Kafka Consumers and Consumer groups.&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;An Apache Kafka® &lt;strong&gt;Consumer&lt;/strong&gt; is a client application that subscribes to (reads and processes) events.&lt;/p&gt;

&lt;p&gt;A &lt;strong&gt;consumer group&lt;/strong&gt; is a set of consumers which cooperate to consume data from some topics. The partitions of all the topics are divided among the consumers in the group. As new group members arrive and old members leave, the partitions are re-assigned so that each member receives a proportional share of the partitions. This is known as &lt;em&gt;rebalancing&lt;/em&gt; the group.&lt;/p&gt;

&lt;p&gt;(…) One of the brokers is designated as the group’s &lt;strong&gt;coordinator&lt;/strong&gt; and is responsible for managing the members of the group as well as their partition assignments.&lt;/p&gt;

&lt;p&gt;(…) When the consumer starts up, it finds the coordinator for its group and sends a request to join the group. The coordinator then begins a group rebalance so that the new member is assigned its fair share of the group’s partitions. Every rebalance results in a new &lt;strong&gt;generation&lt;/strong&gt; of the group.&lt;/p&gt;

&lt;p&gt;Each member in the group must send heartbeats to the coordinator in order to remain a member of the group. If no heartbeat is received before expiration of the configured &lt;strong&gt;session timeout&lt;/strong&gt;, then the coordinator will kick the member out of the group and reassign its partitions to another member.&lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;(Source: &lt;a href="https://docs.confluent.io/platform/current/clients/consumer.html"&gt;Kafka Consumer | Confluent Documentation&lt;/a&gt;)&lt;/p&gt;

&lt;p&gt;When a consumer leaves a group due to a controlled shutdown or a crash, its partitions are automatically reassigned to other consumers. Similarly, when a consumer (re)joins an existing group, all partitions are rebalanced among the group members. This dynamic group cooperation is facilitated by the &lt;strong&gt;Kafka Rebalance Protocol&lt;/strong&gt;.&lt;/p&gt;

&lt;p&gt;For a rebalance scenario where one instance is stopped, the Consumer sends a &lt;code&gt;LeaveGroup&lt;/code&gt; request to the coordinator before stopping (as part of a graceful shutdown, &lt;code&gt;Consumer#close()&lt;/code&gt;), which triggers a rebalance.&lt;/p&gt;

&lt;p&gt;During the entire rebalancing process, i.e. as long as the partitions are not reassigned, consumers no longer process any data. Fortunately, rebalancing is very fast, typically taking anywhere from 50ms to a few seconds. The duration varies with factors such as the load on your Kafka cluster or the complexity of your Streams topology (no. of input topics, streams tasks := partitions, state stores, … -&amp;gt; total no. of consumers).&lt;/p&gt;

&lt;h3&gt;
  
  
  != Streams Consumer Behaviour
&lt;/h3&gt;

&lt;p&gt;For Kafka Streams, some config properties are overridden via &lt;a href="https://github.com/apache/kafka/blob/3.4.1/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java#L1115"&gt;StreamsConfig.CONSUMER_DEFAULT_OVERRIDES&lt;/a&gt;. One of those properties is &lt;code&gt;"internal.leave.group.on.close"&lt;/code&gt;, which Streams sets to &lt;code&gt;false&lt;/code&gt; (for &lt;em&gt;regular&lt;/em&gt; Consumers, it defaults to &lt;code&gt;true&lt;/code&gt;).&lt;/p&gt;

&lt;p&gt;Please note it’s a &lt;em&gt;non-public&lt;/em&gt; config, which may change without prior notice with new releases.&lt;br&gt;
Reference: &lt;a href="https://github.com/apache/kafka/blob/3.4.1/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java#L300"&gt;ConsumerConfig.LEAVE_GROUP_ON_CLOSE_CONFIG&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;This means Consumers will not send &lt;code&gt;LeaveGroup&lt;/code&gt; requests when stopped but will be removed by the coordinator only when the Consumer session times out (ref. &lt;code&gt;session.timeout.ms&lt;/code&gt;).&lt;br&gt;
The &lt;strong&gt;default Consumer session timeout is 45s&lt;/strong&gt; (note: was 10s before the Kafka 3.0.0 release, ref&lt;br&gt;
&lt;a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-735%3A+Increase+default+consumer+session+timeout"&gt;KIP-735&lt;/a&gt;). Consequently, no data is processed for more than 45 seconds for tasks assigned to the Consumer that had been stopped.&lt;/p&gt;

&lt;p&gt;It gets even worse if a replacement Consumer joins the group before the stopped member’s session has timed out: a rebalance is triggered, but task assignment is blocked until the timeout is exceeded and the coordinator evicts the old, stopped Consumer from the group. Until then, processing comes to a complete halt for all tasks, also known as ‘stop-the-world’ rebalancing. While the ‘incremental cooperative rebalancing protocol’ introduced with Kafka 2.4 avoids ‘stop-the-world’ rebalancing for &lt;em&gt;regular&lt;/em&gt; Consumers, the mentioned Kafka Streams overrides nullify some of its benefits.&lt;/p&gt;
&lt;h2&gt;
  
  
  Example Scenario: Kubernetes Pod Evicted … and Replaced
&lt;/h2&gt;

&lt;p&gt;Running your apps on Kubernetes goes a long way towards a robust, highly-available deployment. Kubernetes monitors your containers’ health, allows you to scale, and ensures all desired replicas are up and running according to your spec.&lt;/p&gt;

&lt;p&gt;But still, to be truly elastic and minimise downtime of your data stream processing, your application must be able to handle Pods (/containers) being restarted, evicted, and re-created gracefully.&lt;br&gt;
There are many potential causes, e.g. application upgrades (CI/CD), k8s cluster security patching, (auto-)scaling, resource shortage, or k8s nodes running on Spot instances being interrupted.&lt;/p&gt;

&lt;p&gt;Next, we look at a simple yet common example.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Example infrastructure setup:&lt;/strong&gt; Stateless Kafka Streams app, 6 streams tasks, running on Kubernetes as Deployment, with 3 replicas.&lt;/p&gt;
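&lt;p&gt;To make ‘balanced task assignment’ concrete: 6 streams tasks spread over 3 replicas means 2 tasks per instance. Here’s a simplified, hypothetical sketch (plain round-robin; the actual Streams task assignor is more sophisticated, and the class/method names are made up for illustration):&lt;br&gt;&lt;/p&gt;

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TaskSpread {
    // Simplified sketch: spread partitions round-robin over the replicas.
    // Returns a map of replica index to its list of assigned partitions.
    static Map assign(int partitions, int replicas) {
        Map assignment = new HashMap();
        for (int p = 0; p != partitions; p++) {
            int member = p % replicas; // round-robin
            List tasks = (List) assignment.get(member);
            if (tasks == null) {
                tasks = new ArrayList();
                assignment.put(member, tasks);
            }
            tasks.add(p);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // 6 partitions over 3 replicas: 2 tasks each,
        // e.g. replica 1 is assigned partitions [1, 4]
        System.out.println(assign(6, 3));
    }
}
```

&lt;p&gt;With this sketch, replica 1 happens to end up with partitions &lt;code&gt;[1,4]&lt;/code&gt; - the same partitions that build up lag when Pod (P1.1) is terminated in the simulation.&lt;/p&gt;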

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--PPgj7Lgz--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/h1515988wn8buljegrgr.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--PPgj7Lgz--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/h1515988wn8buljegrgr.png" alt="'Software Architecture' / 'Kubernetes Deployment' diagram, showing the setup of the simulation - Step 1" width="800" height="471"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--SH4tlKH5--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/bc8aricmh2gxghuszuu3.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--SH4tlKH5--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/bc8aricmh2gxghuszuu3.png" alt="'Software Architecture' / 'Kubernetes Deployment' diagram, showing the setup of the simulation - Step 2" width="800" height="470"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--iow3SJ8W--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/tzzcjegfu4ai72deziav.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--iow3SJ8W--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/tzzcjegfu4ai72deziav.png" alt="'Software Architecture' / 'Kubernetes Deployment' diagram, showing the setup of the simulation - Step 3" width="800" height="583"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--AbmG9d1c--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/sz99n36z2zo2mihgyf6y.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--AbmG9d1c--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/sz99n36z2zo2mihgyf6y.png" alt="'Software Architecture' / 'Kubernetes Deployment' diagram, showing the setup of the simulation - Step 4" width="800" height="583"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Scenario:&lt;/strong&gt; One pod is terminated and subsequently replaced.&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Initial state, all 3 pods are running &amp;amp; healthy, the streams app is processing, balanced task assignment&lt;/li&gt;
&lt;li&gt;Pod (P1.1) terminated (deleted) by k8s, shutting down gracefully&lt;/li&gt;
&lt;li&gt;A replacement Pod (P1.2) is scheduled &amp;amp; placed&lt;/li&gt;
&lt;li&gt;Final state, the replacement Pod is running &amp;amp; healthy, the streams app is processing, balanced task assignment&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;I would like to share a screenshot depicting the consumer lag metrics, rendered in Grafana, for a simulation of our scenario.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--V1wxrqFv--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/nsy66wfzdx2z6pik0fwn.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--V1wxrqFv--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/nsy66wfzdx2z6pik0fwn.png" alt="Screenshot depicting the consumer lag metrics, rendered in Grafana - Kafka Streams default config" width="800" height="602"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Let’s walk through the results and explain the behaviour:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;18:34:00:&lt;/strong&gt; the Pod (P1.1) is terminated and stops processing. The consumer lag of partitions &lt;code&gt;[1,4]&lt;/code&gt; starts to build up&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;18:34:20:&lt;/strong&gt; the replacement Pod (P1.2) has come up; its Consumer sends a &lt;code&gt;JoinGroup&lt;/code&gt; request to the group coordinator&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;18:34:21:&lt;/strong&gt; rebalancing is triggered and assignments are revoked; the rebalance stalls because no heartbeats are received from (P1.1), whose session has not yet timed out&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;18:34:21:&lt;/strong&gt; all consumers pause processing, waiting for assignment; lag starts to build up for all partitions&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;18:34:45:&lt;/strong&gt; the session of (P1.1) has timed out; rebalancing completes with a new assignment, and processing resumes&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;18:34:48:&lt;/strong&gt; all consumers caught up; consumer lags are back to healthy jitter&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;To better illustrate everything that is happening over time, here’s a time bar diagram highlighting all important steps:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--GUnoJ8fE--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/z6mxos1y0qdgojl1yiqm.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--GUnoJ8fE--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/z6mxos1y0qdgojl1yiqm.png" alt="Diagram illustrating the rebalancing behaviour for a k8s Pod recreation - Kafka Streams default config" width="800" height="488"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Here are the corresponding application logs for the rebalancing, which occurred at &lt;code&gt;16:34:44.988&lt;/code&gt; and took &lt;strong&gt;92ms&lt;/strong&gt;.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;2023-06-17 16:34:44,988 INFO State transition from RUNNING to REBALANCING
2023-06-17 16:34:45,080 INFO State transition from REBALANCING to RUNNING
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;So we can conclude the following downtimes:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;partitions &lt;code&gt;[1,4]&lt;/code&gt; (owned by the terminated Pod): &lt;strong&gt;48s&lt;/strong&gt;
&lt;/li&gt;
&lt;li&gt;all other partitions &lt;code&gt;[0,2,3,5]&lt;/code&gt;: &lt;strong&gt;25s&lt;/strong&gt;
&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;…while the actual rebalancing took only &lt;strong&gt;92ms&lt;/strong&gt;.&lt;/p&gt;

&lt;h2&gt;
  
  
  😵 Wait, 48s? Really???
&lt;/h2&gt;

&lt;p&gt;Depending on your stream processing use case, 45s+ downtime might be no big deal, but &lt;strong&gt;for real-time low-latency data streams&lt;/strong&gt;, it’s a massive &lt;strong&gt;breach of the NFR&lt;/strong&gt;.&lt;/p&gt;

&lt;p&gt;So let’s see what options we’ve got to mitigate:&lt;/p&gt;

&lt;h3&gt;
  
  
  Option 1: Lower consumer session timeout
&lt;/h3&gt;

&lt;p&gt;Since the session timeout determines the downtime, one way to mitigate is to reduce &lt;code&gt;session.timeout.ms&lt;/code&gt;.&lt;br&gt;
Don’t forget to decrease the value of &lt;code&gt;heartbeat.interval.ms&lt;/code&gt; to ensure three heartbeats plus a buffer can fit within the timeout period.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;session.timeout.ms=6000
heartbeat.interval.ms=1500
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Read the config here: &lt;a href="https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html"&gt;Kafka Consumer Configurations&lt;/a&gt;&lt;/p&gt;
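&lt;p&gt;For a Kafka Streams app, the same settings can be expressed in Java using the &lt;code&gt;consumer.&lt;/code&gt; prefix (what &lt;code&gt;StreamsConfig.consumerPrefix(...)&lt;/code&gt; produces), which routes them to the embedded consumers. A minimal sketch; the class/method names are made up for illustration:&lt;br&gt;&lt;/p&gt;

```java
import java.util.Properties;

public class LowerSessionTimeoutConfig {
    // Option 1 sketch: shorten the session timeout from the 45s default to 6s,
    // so a crashed member is evicted (and its partitions reassigned) sooner.
    static Properties overrides() {
        Properties props = new Properties();
        props.setProperty("consumer.session.timeout.ms", "6000");
        // Keep heartbeat.interval.ms at no more than a third of the session
        // timeout, so several heartbeats fit before the member is evicted.
        props.setProperty("consumer.heartbeat.interval.ms", "1500");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(overrides());
    }
}
```

&lt;p&gt;The trade-off: a lower session timeout makes the group more sensitive to long GC pauses or transient network hiccups, which can cause spurious rebalances.&lt;/p&gt;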

&lt;h3&gt;
  
  
  Option 2: Enable ‘leaveGroupOnClose’
&lt;/h3&gt;

&lt;p&gt;…but why work with timeouts when it’s perfectly valid to have your &lt;em&gt;stateless&lt;/em&gt; Streams Consumers notify the coordinator when closing down?!?&lt;/p&gt;

&lt;p&gt;To enable ‘leaveGroupOnClose’ (overriding the &lt;a href="https://github.com/apache/kafka/blob/3.4.1/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java#L1115"&gt;override&lt;/a&gt; 😜), configure your Kafka Streams app with the following property:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;internal.leave.group.on.close=true
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Warn: Please note it’s a &lt;em&gt;non-public&lt;/em&gt; config, which may change without prior notice with new releases.&lt;br&gt;
Reference: &lt;a href="https://github.com/apache/kafka/blob/3.4.1/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java#L300"&gt;ConsumerConfig.LEAVE_GROUP_ON_CLOSE_CONFIG&lt;/a&gt;.&lt;/p&gt;
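&lt;p&gt;Putting it together, a minimal sketch of the properties for a &lt;em&gt;stateless&lt;/em&gt; Streams app (application id and bootstrap servers are placeholders; the properties would then be passed to &lt;code&gt;new KafkaStreams(topology, props)&lt;/code&gt;):&lt;br&gt;&lt;/p&gt;

```java
import java.util.Properties;

public class StatelessStreamsAppConfig {
    static Properties streamsProps() {
        Properties props = new Properties();
        props.put("application.id", "my-stateless-app");  // placeholder
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        // Override the Streams-internal default (false) so a graceful close
        // sends a LeaveGroup request and triggers an immediate rebalance.
        // Non-public config: re-check it on every Kafka upgrade!
        props.put("internal.leave.group.on.close", true);
        return props;
    }

    public static void main(String[] args) {
        System.out.println(streamsProps());
    }
}
```

&lt;p&gt;A JVM shutdown hook calling &lt;code&gt;KafkaStreams#close()&lt;/code&gt; completes the picture, since the &lt;code&gt;LeaveGroup&lt;/code&gt; request is only sent on a graceful shutdown.&lt;/p&gt;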
&lt;h2&gt;
  
  
  Re-do the Example with ‘leaveGroupOnClose’ 🚀
&lt;/h2&gt;

&lt;p&gt;Drum roll 🥁 … and here, without further ado, the results:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--wGCJEgOz--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/tno3m57196jy2h75k7lu.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--wGCJEgOz--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/tno3m57196jy2h75k7lu.png" alt="Screenshot depicting the consumer lag metrics, rendered in Grafana - Kafka Streams with 'internal.leave.group.on.close=true'" width="800" height="602"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;As we can (&lt;em&gt;not&lt;/em&gt;) see - the two rebalances complete so quickly that not even the slightest consumer lag increase is visible in the metrics.&lt;/p&gt;

&lt;p&gt;Here’s the visual explanation:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--JUCLvTnB--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/sukg3nlv036v47n6el74.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--JUCLvTnB--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/sukg3nlv036v47n6el74.png" alt="Diagram illustrating the rebalancing behaviour for a k8s Pod recreation - Kafka Streams with 'internal.leave.group.on.close=true'" width="800" height="776"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Finally, here are the application logs showing the timings of the rebalancing, which happened twice: once at &lt;code&gt;17:46:00.332&lt;/code&gt;, taking &lt;strong&gt;92ms&lt;/strong&gt;, and again at &lt;code&gt;17:46:21.361&lt;/code&gt;, taking &lt;strong&gt;97ms&lt;/strong&gt;.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;2023-06-17 17:46:00,332 INFO State transition from RUNNING to REBALANCING
2023-06-17 17:46:00,424 INFO State transition from REBALANCING to RUNNING
2023-06-17 17:46:21,361 INFO State transition from RUNNING to REBALANCING
2023-06-17 17:46:21,458 INFO State transition from REBALANCING to RUNNING
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Pro Tips
&lt;/h2&gt;

&lt;h3&gt;
  
  
  Stateless &amp;lt;&amp;gt; Stateful
&lt;/h3&gt;

&lt;p&gt;This post recommends setting &lt;code&gt;internal.leave.group.on.close=true&lt;/code&gt; for &lt;strong&gt;stateless (!)&lt;/strong&gt; Kafka Streams applications.&lt;/p&gt;

&lt;p&gt;Before implementing &lt;code&gt;internal.leave.group.on.close=true&lt;/code&gt; for stateful applications, it is crucial to understand all potential consequences.&lt;/p&gt;

&lt;p&gt;Info: Unfortunately, my evaluation of &lt;code&gt;internal.leave.group.on.close=true&lt;/code&gt; in combination with standby replicas was not very promising.&lt;br&gt;&lt;br&gt;
The expected seamless task re-assignment to a hot standby while one replica "restarts" - with subsequent re-distribution of tasks - does not work.&lt;/p&gt;

&lt;p&gt;The Kafka Streams-specific &lt;code&gt;HighAvailabilityTaskAssignor&lt;/code&gt; has known issues, such as uneven task assignment, frozen warmup tasks (‘task movement’), and failing to recognise caught-up standby tasks when the consumer group changes.&lt;br&gt;
Please note there are plans to address those issues with the next version of the Consumer Rebalance Protocol (see footnotes).&lt;/p&gt;

&lt;p&gt;Often, the best plan for keeping rebalance downtimes low for &lt;em&gt;stateful&lt;/em&gt; apps is to stick with RocksDB + StatefulSet + PersistentVolumes and to restart within (!) the session timeout&lt;br&gt;&lt;br&gt;
=&amp;gt; the member re-joins with its previous assignment, re-uses the RocksDB state, and avoids rebalancing entirely... &lt;/p&gt;
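&lt;p&gt;As a sketch of that setup, the relevant parts of such a StatefulSet could look like this (all names and sizes are placeholders):&lt;br&gt;&lt;/p&gt;

```yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: my-stateful-streams-app        # placeholder
spec:
  replicas: 3
  serviceName: my-stateful-streams-app
  selector:
    matchLabels:
      app: my-stateful-streams-app
  template:
    metadata:
      labels:
        app: my-stateful-streams-app
    spec:
      containers:
        - name: app
          image: registry.example.com/streams-app:latest   # placeholder
          volumeMounts:
            - name: state
              mountPath: /var/lib/kafka-streams   # mount for the app's state.dir
  volumeClaimTemplates:                 # one PersistentVolume per replica,
    - metadata:                         # re-attached across Pod restarts
        name: state
      spec:
        accessModes: ["ReadWriteOnce"]
        resources:
          requests:
            storage: 10Gi               # placeholder
```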

&lt;p&gt;Tip: Alternatively, take a look at &lt;a href="https://github.com/thriving-dev/kafka-streams-cassandra-state-store"&gt;kafka-streams-cassandra-state-store&lt;/a&gt;, introduced in an &lt;a href="https://dev.to/thriving-dev/introducing-kafka-streams-cassandra-state-store-159d"&gt;earlier blog post&lt;/a&gt;.&lt;/p&gt;

&lt;h3&gt;
  
  
  k8s Deployment .spec.minReadySeconds
&lt;/h3&gt;

&lt;p&gt;Frequently rebalancing within a short timeframe can cause consumer delays and strain the Kafka cluster.&lt;/p&gt;

&lt;p&gt;If your application/container has a quick restart time, such as when running as a GraalVM native executable, it’s worth setting &lt;a href="https://kubernetes.io/docs/concepts/workloads/controllers/deployment/#min-ready-seconds"&gt;.spec.minReadySeconds&lt;/a&gt; to ensure upgrades roll out in a controlled manner and to prevent several rebalances in quick succession.&lt;/p&gt;
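&lt;p&gt;A sketch of the relevant Deployment fields (names and values are placeholders to adjust to your rollout needs):&lt;br&gt;&lt;/p&gt;

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: my-streams-app        # placeholder
spec:
  replicas: 3
  minReadySeconds: 30         # a new Pod must be Ready for 30s before the
                              # rollout proceeds, spacing out the rebalances
  template:
    # ... Pod template as usual
```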

&lt;h2&gt;
  
  
  Conclusion
&lt;/h2&gt;

&lt;p&gt;By configuring your Kafka Streams app with &lt;code&gt;internal.leave.group.on.close=true&lt;/code&gt;, a graceful &lt;strong&gt;shutdown immediately triggers a rebalancing process&lt;/strong&gt; and tasks are re-assigned to other active members within the group.&lt;br&gt;
The &lt;strong&gt;processing downtime is significantly reduced&lt;/strong&gt; while &lt;strong&gt;also improving elasticity and resilience&lt;/strong&gt;. As a result, your application enables interruption-free CI/CD and can be auto-scaled.&lt;/p&gt;

&lt;p&gt;Please note that this recommendation only applies to &lt;em&gt;stateless&lt;/em&gt; streams applications! Tread carefully for stateful topologies, and do your homework!&lt;/p&gt;

&lt;p&gt;Remember that &lt;code&gt;internal.leave.group.on.close&lt;/code&gt; is a &lt;em&gt;non-public&lt;/em&gt; config, which may change without prior notice with new releases. Always check the source code for changes when upgrading the Kafka Streams dependency.&lt;/p&gt;

&lt;h2&gt;
  
  
  Footnotes
&lt;/h2&gt;

&lt;ul&gt;
&lt;li&gt;When writing this blog post, the latest version of kafka-streams was 3.4.1.&lt;/li&gt;
&lt;li&gt;There’s a ticket &lt;a href="https://issues.apache.org/jira/browse/KAFKA-6995"&gt;KAFKA-6995&lt;/a&gt; from June 2018 proposing to make the config public.
The ticket is closed as &lt;strong&gt;‘Won’t Fix’&lt;/strong&gt;. Concerns of the core developer team can be found in the discussion.&lt;/li&gt;
&lt;li&gt;Looking into the crystal ball: A Kafka Improvement Proposal (KIP) is in progress to introduce a new group membership and rebalance protocol for the Kafka Consumer and, by extension, Kafka Streams.
=&amp;gt; &lt;a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-848:+The+Next+Generation+of+the+Consumer+Rebalance+Protocol"&gt;KIP-848: The Next Generation of the Consumer Rebalance Protocol&lt;/a&gt;
&lt;/li&gt;
&lt;li&gt;It was also presented at &lt;em&gt;Current 2022&lt;/em&gt;: &lt;a href="https://www.confluent.io/en-gb/events/current-2022/the-next-generation-of-the-consumer-rebalance-protocol/"&gt;The Next Generation of the Consumer Rebalance Protocol With David Jacot | UK&lt;/a&gt;
&lt;/li&gt;
&lt;li&gt;The application + docker-compose setup that was put together for this article can be found on the thriving-dev GitHub Organisation:
&lt;a href="https://github.com/thriving-dev/kafka-streams-leave-group-on-close"&gt;https://github.com/thriving-dev/kafka-streams-leave-group-on-close&lt;/a&gt;
&lt;/li&gt;
&lt;li&gt;Many thanks to &lt;a href="https://twitter.com/MatthiasJSax"&gt;@MatthiasJSax&lt;/a&gt; for proofreading the blog post! 🙇&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  References and Further Reading
&lt;/h2&gt;

&lt;ul&gt;
&lt;li&gt;&lt;a href="https://medium.com/lydtech-consulting/kafka-consumer-group-rebalance-1-of-2-7a3e00aa3bb4"&gt;Kafka Consumer Group Rebalance (1 of 2)&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://medium.com/lydtech-consulting/kafka-consumer-group-rebalance-2-of-2-5d1d60c71e6e"&gt;Kafka Consumer Group Rebalance (2 of 2)&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://stackoverflow.com/questions/54398754/kafka-streams-delay-to-kick-rebalancing-on-consumer-graceful-shutdown"&gt;Kafka-streams delay to kick rebalancing on consumer graceful shutdown - Stack Overflow&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://www.confluent.io/blog/cooperative-rebalancing-in-kafka-streams-consumer-ksqldb/"&gt;Cooperative Rebalancing in the Kafka Consumer, Streams &amp;amp; ksqlDB&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://medium.com/streamthoughts/apache-kafka-rebalance-protocol-or-the-magic-behind-your-streams-applications-e94baf68e4f2"&gt;Apache Kafka Rebalance Protocol, or the magic behind your streams applications | by Florian Hussonnois | StreamThoughts | Medium&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-812%3A+Introduce+another+form+of+the+%60KafkaStreams.close%28%29%60+API+that+forces+the+member+to+leave+the+consumer+group"&gt;KIP-812: Introduce another form of the &lt;code&gt;KafkaStreams.close()&lt;/code&gt; API that forces the member to leave the consumer group - Apache Kafka - Apache Software Foundation&lt;/a&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;This post was originally published on &lt;a href="https://thriving.dev/blog/reduce-rebalance-downtime-for-stateless-kafka-streams-apps"&gt;Thriving.dev&lt;/a&gt;.&lt;/p&gt;

</description>
      <category>kafka</category>
      <category>kafkastreams</category>
      <category>performance</category>
      <category>streamprocessing</category>
    </item>
    <item>
      <title>Introducing 'kafka-streams-cassandra-state-store'</title>
      <dc:creator>Thriving.dev | Software Architecture</dc:creator>
      <pubDate>Fri, 02 Jun 2023 15:06:32 +0000</pubDate>
      <link>https://dev.to/thriving-dev/introducing-kafka-streams-cassandra-state-store-159d</link>
      <guid>https://dev.to/thriving-dev/introducing-kafka-streams-cassandra-state-store-159d</guid>
      <description>&lt;p&gt;The Java library to be introduced - &lt;a href="https://github.com/thriving-dev/kafka-streams-cassandra-state-store"&gt;thriving-dev/kafka-streams-cassandra-state-store&lt;/a&gt;  - is a Kafka Streams State Store implementation that persists data to Apache Cassandra. &lt;/p&gt;

&lt;p&gt;It's a 'drop-in' replacement for the official Kafka Streams state store solutions, notably &lt;em&gt;RocksDB&lt;/em&gt; (default) and &lt;em&gt;InMemory&lt;/em&gt;.&lt;/p&gt;

&lt;p&gt;By moving the state to an &lt;em&gt;external&lt;/em&gt; datastore the &lt;strong&gt;stateful streams app&lt;/strong&gt; (from a deployment point of view) &lt;strong&gt;effectively becomes &lt;em&gt;stateless&lt;/em&gt;&lt;/strong&gt; - which greatly &lt;strong&gt;improves elasticity, reduces rebalancing downtimes &amp;amp; failure recovery&lt;/strong&gt;.&lt;/p&gt;

&lt;p&gt;Cassandra/ScyllaDB is horizontally scalable and can handle huge amounts of data, giving your existing Kafka Streams application a boost with very little change to the source code.&lt;/p&gt;

&lt;p&gt;In addition to the &lt;code&gt;CassandraKeyValueStore&lt;/code&gt;, this post will also cover all out-of-the-box state store solutions, explaining individual characteristics, benefits, drawbacks, and limitations in detail.&lt;/p&gt;

&lt;p&gt;Following the introduction and getting started guide, there's also a &lt;strong&gt;demo&lt;/strong&gt; available.&lt;br&gt;&lt;br&gt;
If you don't want to wait, feel free to head over to the &lt;a href="https://www.youtube.com/@thriving_dev"&gt;Thriving.dev YouTube Channel&lt;/a&gt;.&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;The first public release was on 9 January 2023.&lt;br&gt;
When writing this blog post the latest version was: &lt;code&gt;0.4.0&lt;/code&gt; - available on &lt;a href="https://central.sonatype.com/artifact/dev.thriving.oss/kafka-streams-cassandra-state-store"&gt;Maven Central&lt;/a&gt;!&lt;/p&gt;
&lt;/blockquote&gt;
&lt;h2&gt;
  
  
  Basics Recap
&lt;/h2&gt;

&lt;p&gt;(Feel free to skip straight to the next section if you're already familiar with Kafka Streams and Apache Cassandra…)&lt;/p&gt;
&lt;h3&gt;
  
  
  Kafka Streams
&lt;/h3&gt;

&lt;p&gt;Quoting &lt;a href="https://en.wikipedia.org/wiki/Apache_Kafka#Streams_API"&gt;Apache Kafka - Wikipedia&lt;/a&gt;:&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;“Kafka Streams (or Streams API) is a stream-processing library written in Java. It was added in the Kafka 0.10.0.0 release. The library allows for the development of stateful stream-processing applications that are scalable, elastic, and fully fault-tolerant. The main API is a stream-processing  domain-specific language  (DSL) that offers high-level operators like filter,  map, grouping, windowing, aggregation, joins, and the notion of tables. Additionally, the Processor API can be used to implement custom operators for a more low-level development approach. The DSL and Processor API can be mixed, too. For stateful stream processing, Kafka Streams uses  RocksDB  to maintain local operator state. Because RocksDB can write to disk, the maintained state can be larger than available main memory. For fault-tolerance, all updates to local state stores are also written into a topic in the Kafka cluster. This allows recreating state by reading those topics and feed all data into RocksDB.”&lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;In case you are entirely new to Kafka Streams, I recommend getting started by reading some official materials provided by Confluent, e.g. &lt;a href="https://docs.confluent.io/platform/current/streams/introduction.html"&gt;Introduction Kafka Streams API&lt;/a&gt;&lt;/p&gt;
&lt;h3&gt;
  
  
  Apache Cassandra
&lt;/h3&gt;

&lt;p&gt;Quoting &lt;a href="https://en.wikipedia.org/wiki/Apache_Cassandra"&gt;Apache Cassandra - Wikipedia&lt;/a&gt;:&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;“Apache Cassandra is a free and open-source, distributed, wide-column store, NoSQL database management system designed to handle large amounts of data across many commodity servers, providing high availability with no single point of failure. Cassandra offers support for clusters spanning multiple datacenters, with asynchronous masterless replication allowing low latency operations for all clients.”&lt;/p&gt;
&lt;/blockquote&gt;
&lt;h2&gt;
  
  
  Purpose
&lt;/h2&gt;

&lt;p&gt;While Wikipedia’s summary (see above) only mentions RocksDB, Kafka Streams ships with the following &lt;code&gt;KeyValueStore&lt;/code&gt; implementations:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;code&gt;org.apache.kafka.streams.state.internals.RocksDBStore&lt;/code&gt;&lt;/li&gt;
&lt;li&gt;&lt;code&gt;org.apache.kafka.streams.state.internals.InMemoryKeyValueStore&lt;/code&gt;&lt;/li&gt;
&lt;li&gt;&lt;code&gt;org.apache.kafka.streams.state.internals.MemoryLRUCache&lt;/code&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Let’s look at the traits of each store implementation in more detail…&lt;/p&gt;
&lt;h3&gt;
  
  
  RocksDBStore
&lt;/h3&gt;

&lt;p&gt;RocksDB is the default state store for Kafka Streams.&lt;/p&gt;

&lt;p&gt;The &lt;code&gt;RocksDBStore&lt;/code&gt; is a &lt;em&gt;persistent&lt;/em&gt; key-value store based on &lt;a href="https://rocksdb.org/"&gt;RocksDB&lt;/a&gt; (surprise!). State is flushed to disk, allowing the state to exceed the size of available memory.&lt;/p&gt;

&lt;p&gt;Since the state is persisted to disk, it can be re-used and does not need to be restored (changelog topic replay) when the application instance comes up after a restart (e.g. following an upgrade, instance migration, or failure).&lt;br&gt;
The RocksDB state store provides good performance and is well configured out of the box, but might need to be tuned for certain use cases (no small feat, as it requires an understanding of RocksDB configuration). Writing to and reading from disk comes with I/O; for performance reasons, buffering and caching patterns are in place. The record cache (on heap) is particularly useful for optimising writes by reducing the number of updates to local state and changelog topics, while the RocksDB block cache (off heap) optimises reads.&lt;/p&gt;
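&lt;p&gt;As a minimal sketch of the wiring involved (not of the tuning itself): custom RocksDB tuning is hooked into Kafka Streams via the &lt;code&gt;rocksdb.config.setter&lt;/code&gt; property, which names a class implementing &lt;code&gt;RocksDBConfigSetter&lt;/code&gt;. The class name and application id below are hypothetical placeholders.&lt;/p&gt;

```java
import java.util.Properties;

public class RocksDbTuningWiring {
    public static void main(String[] args) {
        // Streams looks up this property and instantiates the named class
        // (it must implement org.apache.kafka.streams.state.RocksDBConfigSetter).
        // `com.example.CustomRocksDBConfig` is a hypothetical class name.
        Properties props = new Properties();
        props.put("application.id", "my-streams-app");   // placeholder
        props.put("rocksdb.config.setter", "com.example.CustomRocksDBConfig");
        System.out.println(props.getProperty("rocksdb.config.setter"));
    }
}
```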

&lt;blockquote&gt;
&lt;p&gt;&lt;strong&gt;Info:&lt;/strong&gt; In a typical modern setup stateful Kafka Streams applications run on Kubernetes as a &lt;em&gt;StatefulSet&lt;/em&gt; with persistent state stores (RocksDB) on &lt;em&gt;PersistentVolumes&lt;/em&gt;.&lt;/p&gt;
&lt;/blockquote&gt;
&lt;h3&gt;
  
  
  InMemoryKeyValueStore
&lt;/h3&gt;

&lt;p&gt;The &lt;code&gt;InMemoryKeyValueStore&lt;/code&gt;, as the name suggests, maintains state &lt;em&gt;in-memory&lt;/em&gt; (RAM).&lt;/p&gt;

&lt;p&gt;One obvious benefit is that a pure in-memory store comes with good performance (it operates in RAM…). Further, hosting and operating are simpler compared to RocksDB, since there is no need to provide and manage disks.&lt;/p&gt;

&lt;p&gt;Drawbacks of keeping the store in-memory are limits on store size and increased infrastructure costs (RAM is more expensive than disk storage). Further, state is always lost on application restart and therefore first needs to be restored from changelog topics (recovery takes longer).&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;&lt;strong&gt;Tip:&lt;/strong&gt; Where low rebalance downtime / quick recovery is a concern, standby replicas (&lt;code&gt;num.standby.replicas&lt;/code&gt;) help to reduce recovery time.&lt;/p&gt;
&lt;/blockquote&gt;
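&lt;p&gt;A minimal sketch of enabling standby replicas (application id and bootstrap servers are placeholder values): with &lt;code&gt;num.standby.replicas=1&lt;/code&gt;, a warm copy of each stateful task’s stores is maintained on another app instance, so a failed-over task only has to catch up on the changelog tail instead of replaying it from scratch.&lt;/p&gt;

```java
import java.util.Properties;

public class StandbyReplicasSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("application.id", "my-streams-app");      // placeholder
        props.put("bootstrap.servers", "localhost:9092");   // placeholder
        // keep one warm standby copy of each stateful task's stores
        props.put("num.standby.replicas", "1");
        System.out.println(props.getProperty("num.standby.replicas"));
    }
}
```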
&lt;h3&gt;
  
  
  MemoryLRUCache (&lt;code&gt;Stores.lruMap&lt;/code&gt;)
&lt;/h3&gt;

&lt;p&gt;The &lt;code&gt;MemoryLRUCache&lt;/code&gt; is an &lt;em&gt;in-memory&lt;/em&gt; store based on &lt;em&gt;HashMap&lt;/em&gt;. The term &lt;em&gt;cache&lt;/em&gt; comes from the &lt;em&gt;LRU&lt;/em&gt; (least recently used) behaviour combined with the &lt;em&gt;maxCacheSize&lt;/em&gt; cap (per streams task!).&lt;/p&gt;

&lt;p&gt;It’s a rather uncommon choice but can be a valid fit for certain use cases. As with the &lt;em&gt;InMemoryKeyValueStore&lt;/em&gt;, state is always lost on application restart and has to be restored from changelog topics.&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;&lt;strong&gt;Note&lt;/strong&gt;: &lt;code&gt;maxCacheSize&lt;/code&gt; applies client-side only (in-memory HashMap, per streams task state store -&amp;gt; the least recently used entry is dropped when the underlying HashMap’s capacity is exceeded) but does not ‘clean up’ the changelog topic (send &lt;em&gt;tombstones&lt;/em&gt;). The (&lt;em&gt;compacted&lt;/em&gt;) changelog topic keeps growing in size while the state available to processing is constrained by &lt;em&gt;maxCacheSize&lt;/em&gt;.&lt;/p&gt;

&lt;p&gt;Therefore, it is recommended to use it in combination with a custom changelog topic config, &lt;code&gt;cleanup.policy=[compact,delete]&lt;/code&gt; (plus &lt;code&gt;retention.ms&lt;/code&gt;), to have a time-based retention in place that satisfies your functional data requirements (if possible).&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;⚠️ Reminder:&lt;/strong&gt; The &lt;em&gt;maxCacheSize&lt;/em&gt; is applied per streams task (~input topic partitions), so take this into account when calculating total capacity, memory requirements per app instance, …&lt;/p&gt;
&lt;/blockquote&gt;
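&lt;p&gt;The recommended changelog topic config can be sketched as a plain config map (the 24h retention is an assumption; adapt it to your data requirements). Such a map can be passed to a store builder via &lt;code&gt;withLoggingEnabled(changelogConfig)&lt;/code&gt;.&lt;/p&gt;

```java
import java.util.Map;

public class ChangelogRetentionSketch {
    public static void main(String[] args) {
        // compaction plus time-based deletion for the (otherwise ever-growing)
        // changelog topic backing an lruMap store
        var changelogConfig = Map.of(
                "cleanup.policy", "compact,delete",
                "retention.ms", "86400000"   // 24 hours (assumed)
        );
        System.out.println(changelogConfig.get("cleanup.policy"));
    }
}
```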
&lt;h3&gt;
  
  
  CassandraKeyValueStore
&lt;/h3&gt;

&lt;p&gt;Now we finally get to the subject of this blog post: a custom implementation of a state store that persists data to Apache Cassandra.&lt;/p&gt;

&lt;p&gt;With &lt;code&gt;CassandraKeyValueStore&lt;/code&gt;, data is &lt;strong&gt;persistently stored&lt;/strong&gt; in an external database, Apache Cassandra, or compatible solutions (e.g. &lt;a href="https://www.scylladb.com/"&gt;ScyllaDB&lt;/a&gt;). Apache Cassandra is a distributed, clustered data store that scales horizontally up to petabytes of data; thus &lt;strong&gt;very large Kafka Streams state&lt;/strong&gt; can be accommodated.&lt;/p&gt;

&lt;p&gt;Moving the state into an &lt;em&gt;external&lt;/em&gt; data store - outside the application, so to speak - allows you to effectively run the app in a &lt;strong&gt;&lt;em&gt;stateless&lt;/em&gt;&lt;/strong&gt; fashion. Further, with &lt;a href="https://kafka.apache.org/34/javadoc/org/apache/kafka/streams/state/StoreBuilder.html#withLoggingDisabled()"&gt;logging disabled&lt;/a&gt;, there's &lt;strong&gt;no changelog topic&lt;/strong&gt; and therefore &lt;strong&gt;no state restore&lt;/strong&gt; required, which enables fluent rebalancing and helps &lt;strong&gt;reduce rebalance downtime&lt;/strong&gt; and &lt;strong&gt;recovery time&lt;/strong&gt;.&lt;/p&gt;

&lt;p&gt;This greatly &lt;strong&gt;improves the elasticity and scalability&lt;/strong&gt; of your application, which opens up further possibilities such as efficient &amp;amp; fluent autoscaling.&lt;/p&gt;

&lt;p&gt;It can also help ease or avoid known problems with the Kafka Streams-specific task assignment, such as 'uneven load distribution' and 'idle consumers' (I'm thinking about writing a separate blog post on these issues…).&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;&lt;strong&gt;Tip:&lt;/strong&gt; The Kafka Streams property &lt;code&gt;internal.leave.group.on.close=true&lt;/code&gt; helps achieve low rebalance downtimes by telling the consumers to send a &lt;code&gt;LeaveGroup&lt;/code&gt; request to the group coordinator on graceful shutdown.&lt;/p&gt;

&lt;p&gt;For more information on these Kafka internals, I recommend the following Confluent developer guide: &lt;a href="https://developer.confluent.io/learn-kafka/architecture/consumer-group-protocol/"&gt;Consumer Group Protocol&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;Note that this property is also used &amp;amp; explained in the demo.&lt;/p&gt;

&lt;p&gt;⚠ Please be aware this is an unofficial property (not part of the public API) and can thus be deprecated or removed at any time.&lt;/p&gt;
&lt;/blockquote&gt;
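&lt;p&gt;For completeness, a sketch of how the flag from the tip above is set alongside regular Streams properties (application id and bootstrap servers are placeholder values); since it is an internal flag, treat it as best-effort.&lt;/p&gt;

```java
import java.util.Properties;

public class LeaveGroupOnCloseSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("application.id", "my-streams-app");      // placeholder
        props.put("bootstrap.servers", "localhost:9092");   // placeholder
        // internal flag (not public API): send LeaveGroup on graceful shutdown
        props.put("internal.leave.group.on.close", "true");
        System.out.println(props.getProperty("internal.leave.group.on.close"));
    }
}
```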

&lt;p&gt;⚠ Adding external, 3rd-party software to the heart (or rather the stomach?) of your stream processing application adds a new, additional &lt;strong&gt;single point of failure&lt;/strong&gt; to your architecture.&lt;/p&gt;
&lt;h2&gt;
  
  
  Usage Example
&lt;/h2&gt;
&lt;h3&gt;
  
  
  Get it!
&lt;/h3&gt;

&lt;p&gt;The artifact is available on &lt;a href="https://central.sonatype.com/artifact/dev.thriving.oss/kafka-streams-cassandra-state-store/"&gt;Maven Central&lt;/a&gt;:&lt;/p&gt;
&lt;h4&gt;
  
  
  Maven
&lt;/h4&gt;


&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight xml"&gt;&lt;code&gt;&lt;span class="nt"&gt;&amp;lt;dependency&amp;gt;&lt;/span&gt;
    &lt;span class="nt"&gt;&amp;lt;groupId&amp;gt;&lt;/span&gt;dev.thriving.oss&lt;span class="nt"&gt;&amp;lt;/groupId&amp;gt;&lt;/span&gt;
    &lt;span class="nt"&gt;&amp;lt;artifactId&amp;gt;&lt;/span&gt;kafka-streams-cassandra-state-store&lt;span class="nt"&gt;&amp;lt;/artifactId&amp;gt;&lt;/span&gt;
    &lt;span class="nt"&gt;&amp;lt;version&amp;gt;&lt;/span&gt;${version}&lt;span class="nt"&gt;&amp;lt;/version&amp;gt;&lt;/span&gt;
&lt;span class="nt"&gt;&amp;lt;/dependency&amp;gt;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;h4&gt;
  
  
  Gradle (Groovy DSL)
&lt;/h4&gt;


&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight groovy"&gt;&lt;code&gt;&lt;span class="n"&gt;implementation&lt;/span&gt; &lt;span class="err"&gt;'&lt;/span&gt;&lt;span class="n"&gt;dev&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;thriving&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;oss&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;&lt;span class="n"&gt;kafka&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="n"&gt;streams&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="n"&gt;cassandra&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="n"&gt;state&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="nl"&gt;store:&lt;/span&gt;&lt;span class="n"&gt;$&lt;/span&gt;&lt;span class="o"&gt;{&lt;/span&gt;&lt;span class="n"&gt;version&lt;/span&gt;&lt;span class="o"&gt;}&lt;/span&gt;&lt;span class="err"&gt;’&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;


&lt;p&gt;Classes of this library are in the package &lt;code&gt;dev.thriving.oss.kafka.streams.cassandra.state.store&lt;/code&gt;.&lt;/p&gt;
&lt;h3&gt;
  
  
  Quick Start
&lt;/h3&gt;
&lt;h4&gt;
  
  
  High-level DSL &amp;lt;&amp;gt; StoreSupplier
&lt;/h4&gt;

&lt;p&gt;When using the high-level DSL, i.e., &lt;code&gt;StreamsBuilder&lt;/code&gt;, users create &lt;code&gt;StoreSupplier&lt;/code&gt;s that can be further customized via &lt;code&gt;Materialized&lt;/code&gt;.&lt;/p&gt;

&lt;p&gt;For example, a topic read as a &lt;code&gt;KTable&lt;/code&gt; can be materialized into a Cassandra key-value store with custom key/value serdes and with logging and caching disabled:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="nc"&gt;StreamsBuilder&lt;/span&gt; &lt;span class="n"&gt;builder&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;StreamsBuilder&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
&lt;span class="nc"&gt;KTable&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;Long&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;table&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;builder&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;table&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;
  &lt;span class="s"&gt;"topicName"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
  &lt;span class="nc"&gt;Materialized&lt;/span&gt;&lt;span class="o"&gt;.&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;Long&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;&lt;span class="n"&gt;as&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;
                 &lt;span class="nc"&gt;CassandraStores&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;builder&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;session&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"store-name"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
                         &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;keyValueStore&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt;
              &lt;span class="o"&gt;)&lt;/span&gt;
              &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;withKeySerde&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Serdes&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;Long&lt;/span&gt;&lt;span class="o"&gt;())&lt;/span&gt;
              &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;withValueSerde&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Serdes&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;String&lt;/span&gt;&lt;span class="o"&gt;())&lt;/span&gt;
              &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;withLoggingDisabled&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt;
              &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;withCachingDisabled&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h4&gt;
  
  
  Processor API &amp;lt;&amp;gt; StoreBuilder
&lt;/h4&gt;

&lt;p&gt;When using the Processor API, i.e., &lt;code&gt;Topology&lt;/code&gt;, users create &lt;code&gt;StoreBuilder&lt;/code&gt;s that can be attached to &lt;code&gt;Processor&lt;/code&gt;s.&lt;/p&gt;

&lt;p&gt;For example, you can create a Cassandra key-value store (string keys, long values) with custom key/value serdes and with logging and caching disabled like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="nc"&gt;Topology&lt;/span&gt; &lt;span class="n"&gt;topology&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;Topology&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;

&lt;span class="nc"&gt;StoreBuilder&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;KeyValueStore&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;Long&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;storeBuilder&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;Stores&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;keyValueStoreBuilder&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;
                &lt;span class="nc"&gt;CassandraStores&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;builder&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;session&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"store-name"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
                        &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;keyValueStore&lt;/span&gt;&lt;span class="o"&gt;(),&lt;/span&gt;
                &lt;span class="nc"&gt;Serdes&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;String&lt;/span&gt;&lt;span class="o"&gt;(),&lt;/span&gt;
                &lt;span class="nc"&gt;Serdes&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;Long&lt;/span&gt;&lt;span class="o"&gt;())&lt;/span&gt;
        &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;withLoggingDisabled&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt;
        &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;withCachingDisabled&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;

&lt;span class="n"&gt;topology&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;addStateStore&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;storeBuilder&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Demo
&lt;/h3&gt;

&lt;p&gt;The demo features the notorious &lt;strong&gt;'word-count example'&lt;/strong&gt;, written as a &lt;strong&gt;Quarkus application&lt;/strong&gt;, running in a fully &lt;strong&gt;clustered docker-compose localstack&lt;/strong&gt;.&lt;/p&gt;

&lt;p&gt;Source code for this demo: &lt;a href="https://github.com/thriving-dev/kafka-streams-cassandra-state-store/tree/0.4.0/examples/word-count-quarkus"&gt;kafka-streams-cassandra-state-store/examples/word-count-quarkus (at 0.4.0)&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;iframe width="710" height="399" src="https://www.youtube.com/embed/2Co9-8E-uJE"&gt;
&lt;/iframe&gt;
&lt;/p&gt;

&lt;h3&gt;
  
  
  Store Types
&lt;/h3&gt;

&lt;p&gt;kafka-streams-cassandra-state-store comes with two different store types:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;keyValueStore&lt;/li&gt;
&lt;li&gt;globalKeyValueStore&lt;/li&gt;
&lt;/ul&gt;

&lt;h4&gt;
  
  
  keyValueStore (recommended default)
&lt;/h4&gt;

&lt;p&gt;A persistent &lt;code&gt;KeyValueStore&amp;lt;Bytes, byte[]&amp;gt;&lt;/code&gt;.&lt;br&gt;
The underlying cassandra table is &lt;strong&gt;partitioned by&lt;/strong&gt; the store context &lt;strong&gt;task partition&lt;/strong&gt;.&lt;br&gt;
Therefore, all CRUD operations against this store always query by and return results for a single stream task.&lt;/p&gt;
&lt;h4&gt;
  
  
  globalKeyValueStore
&lt;/h4&gt;

&lt;p&gt;A persistent &lt;code&gt;KeyValueStore&amp;lt;Bytes, byte[]&amp;gt;&lt;/code&gt;.&lt;br&gt;
The underlying cassandra table uses the &lt;strong&gt;record key as the sole PRIMARY KEY&lt;/strong&gt;.&lt;br&gt;
Therefore, all CRUD operations against this store work from any streams task and are therefore always “global”.&lt;br&gt;
Due to the nature of cassandra tables having a single PK (no clustering key), this store supports only a limited number of operations.&lt;/p&gt;

&lt;p&gt;⚠ If you're planning to use this store type, please make sure to get a full understanding of the specifics by reading the &lt;a href="https://github.com/thriving-dev/kafka-streams-cassandra-state-store#globalkeyvaluestore"&gt;relevant docs&lt;/a&gt; to understand its behaviour.&lt;/p&gt;
&lt;h3&gt;
  
  
  Advanced
&lt;/h3&gt;

&lt;p&gt;For more detailed documentation, please visit the GitHub project…&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;a href="https://github.com/thriving-dev/kafka-streams-cassandra-state-store#store-types"&gt;Store types, supported operations&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://github.com/thriving-dev/kafka-streams-cassandra-state-store#builder"&gt;Builder usage + config options&lt;/a&gt;&lt;/li&gt;
&lt;/ul&gt;
&lt;h2&gt;
  
  
  Under the hood
&lt;/h2&gt;
&lt;h3&gt;
  
  
  Implemented/compiled with
&lt;/h3&gt;

&lt;ul&gt;
&lt;li&gt;Java 17&lt;/li&gt;
&lt;li&gt;kafka-streams 3.4&lt;/li&gt;
&lt;li&gt;datastax java-driver-core 4.15.0&lt;/li&gt;
&lt;/ul&gt;
&lt;h3&gt;
  
  
  Supported client-libs
&lt;/h3&gt;

&lt;ul&gt;
&lt;li&gt;Kafka Streams 2.7.0+ (maybe even earlier versions, but wasn’t tested further back)&lt;/li&gt;
&lt;li&gt;Datastax java client (v4) &lt;code&gt;'com.datastax.oss:java-driver-core:4.15.0'&lt;/code&gt;
&lt;/li&gt;
&lt;li&gt;ScyllaDB shard-aware datastax java client (v4) fork &lt;code&gt;'com.scylladb:java-driver-core:4.14.1.0'&lt;/code&gt;
&lt;/li&gt;
&lt;/ul&gt;
&lt;h3&gt;
  
  
  Supported databases
&lt;/h3&gt;

&lt;ul&gt;
&lt;li&gt;Apache Cassandra 3.11&lt;/li&gt;
&lt;li&gt;Apache Cassandra 4.0, 4.1&lt;/li&gt;
&lt;li&gt;ScyllaDB (should work from 4.3+)&lt;/li&gt;
&lt;/ul&gt;
&lt;h3&gt;
  
  
  Underlying CQL Schema
&lt;/h3&gt;
&lt;h5&gt;
  
  
  keyValueStore
&lt;/h5&gt;

&lt;p&gt;Using defaults, for a state store named "word-count" the following CQL schema applies:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="k"&gt;TABLE&lt;/span&gt; &lt;span class="n"&gt;IF&lt;/span&gt; &lt;span class="k"&gt;NOT&lt;/span&gt; &lt;span class="k"&gt;EXISTS&lt;/span&gt; &lt;span class="n"&gt;word_count_kstreams_store&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="k"&gt;partition&lt;/span&gt; &lt;span class="nb"&gt;int&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="k"&gt;key&lt;/span&gt; &lt;span class="nb"&gt;blob&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="nb"&gt;time&lt;/span&gt; &lt;span class="nb"&gt;timestamp&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;value&lt;/span&gt; &lt;span class="nb"&gt;blob&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="k"&gt;PRIMARY&lt;/span&gt; &lt;span class="k"&gt;KEY&lt;/span&gt; &lt;span class="p"&gt;((&lt;/span&gt;&lt;span class="k"&gt;partition&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt; &lt;span class="k"&gt;key&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;WITH&lt;/span&gt; &lt;span class="n"&gt;compaction&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="s1"&gt;'class'&lt;/span&gt; &lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="s1"&gt;'LeveledCompactionStrategy'&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h5&gt;
  
  
  globalKeyValueStore
&lt;/h5&gt;

&lt;p&gt;Using defaults, for a state store named "clicks-global" the following CQL schema applies:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="k"&gt;TABLE&lt;/span&gt; &lt;span class="n"&gt;IF&lt;/span&gt; &lt;span class="k"&gt;NOT&lt;/span&gt; &lt;span class="k"&gt;EXISTS&lt;/span&gt; &lt;span class="n"&gt;clicks_global_kstreams_store&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="k"&gt;key&lt;/span&gt; &lt;span class="nb"&gt;blob&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="nb"&gt;time&lt;/span&gt; &lt;span class="nb"&gt;timestamp&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;value&lt;/span&gt; &lt;span class="nb"&gt;blob&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="k"&gt;PRIMARY&lt;/span&gt; &lt;span class="k"&gt;KEY&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;key&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;WITH&lt;/span&gt; &lt;span class="n"&gt;compaction&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="s1"&gt;'class'&lt;/span&gt; &lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="s1"&gt;'LeveledCompactionStrategy'&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Feat: Cassandra table with default TTL
&lt;/h3&gt;

&lt;blockquote&gt;
&lt;p&gt;&lt;strong&gt;Pro Tip:&lt;/strong&gt;&lt;br&gt;
Cassandra has a table option &lt;code&gt;default_time_to_live&lt;/code&gt; (default expiration time (“TTL”) in seconds for a table) which can be useful for certain use cases where data (state) expires after a known time span.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Please note&lt;/strong&gt; writes to cassandra are made with system time. The table &lt;strong&gt;TTL is applied&lt;/strong&gt; based on the &lt;strong&gt;time of write&lt;/strong&gt;, i.e. when the current record is processed (&lt;em&gt;!= stream time&lt;/em&gt;).&lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;The &lt;code&gt;default_time_to_live&lt;/code&gt; can be defined via the builder &lt;code&gt;withTableOptions&lt;/code&gt; method, e.g.:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="nc"&gt;CassandraStores&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;builder&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;session&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"word-grouped-count"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
        &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;withTableOptions&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"""
                compaction = { 'class' : 'LeveledCompactionStrategy' }
                AND default_time_to_live = 86400
                """&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
        &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;keyValueStore&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Cassandra table partitioning (avoiding large partitions)
&lt;/h3&gt;

&lt;p&gt;Kafka persists data in segments and is built for sequential reads/writes. As long as sufficient disk storage space is available to the brokers, a high number of messages per topic partition is not a problem.&lt;/p&gt;

&lt;p&gt;Apache Cassandra, on the other hand, can become inefficient (up to severe failures such as load shedding, dropped messages, and crashed or downed nodes) when partitions grow too large.&lt;br&gt;
The reason is that searching within a large partition is slow, and it puts a lot of pressure on the (JVM) heap.&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;⚠ The community’s standard recommendation for Cassandra users is to keep partitions under 400MB, and preferably under 100MB.&lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;In the current implementation, the cassandra table created for the ‘default’ key-value store is partitioned by the kafka &lt;em&gt;partition&lt;/em&gt; (“wide partition pattern”).&lt;br&gt;
Please keep these issues in mind when working with relevant data volumes.&lt;br&gt;
In case you only need to look up by key and don’t need to query your store (‘range’, ‘prefixScan’; ref. supported operations by store type), it’s recommended to use &lt;code&gt;globalKeyValueStore&lt;/code&gt; rather than &lt;code&gt;keyValueStore&lt;/code&gt;, since it is partitioned by the &lt;em&gt;event key&lt;/em&gt; (:= primary key).&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;References:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;blog post on &lt;a href="https://thelastpickle.com/blog/2019/01/11/wide-partitions-cassandra-3-11.html"&gt;Wide Partitions in Apache Cassandra 3.11&lt;/a&gt;
&lt;strong&gt;Note:&lt;/strong&gt; in case anyone has well-founded knowledge of if/how this has changed with Cassandra 4, please share in the comments below!&lt;/li&gt;
&lt;li&gt;&lt;a href="https://stackoverflow.com/questions/68237371/wide-partition-pattern-in-cassandra"&gt;stackoverflow question&lt;/a&gt;&lt;/li&gt;
&lt;/ul&gt;
&lt;/blockquote&gt;

&lt;h2&gt;
  
  
  Known Limitations
&lt;/h2&gt;

&lt;p&gt;Adding additional infrastructure for data persistence external to Kafka comes with certain risks and constraints.&lt;/p&gt;

&lt;h3&gt;
  
  
  Consistency
&lt;/h3&gt;

&lt;p&gt;Kafka Streams supports &lt;em&gt;at-least-once&lt;/em&gt; and &lt;em&gt;exactly-once&lt;/em&gt; processing guarantees. At-least-once semantics is enabled by default.&lt;/p&gt;

&lt;p&gt;Kafka Streams’ &lt;em&gt;exactly-once&lt;/em&gt; processing guarantee uses Kafka transactions. These transactions wrap the entirety of processing a message throughout your streams topology, including messages published to outbound topic(s), changelog topic(s), and the consumer offsets topic.&lt;/p&gt;

&lt;p&gt;This is possible through transactional interaction with a single distributed system (Apache Kafka). Bringing an external system (Cassandra) into play breaks this pattern. Once data is written to the database it can’t be rolled back in the event of a subsequent error / failure to complete the current message processing.&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;⚠ =&amp;gt; If you need strong consistency, have &lt;em&gt;exactly-once&lt;/em&gt; processing enabled (streams config: &lt;code&gt;processing.guarantee="exactly_once_v2"&lt;/code&gt;), and/or your processing logic is not fully idempotent, then using &lt;strong&gt;kafka-streams-cassandra-state-store&lt;/strong&gt; is discouraged!&lt;/p&gt;

&lt;p&gt;ℹ️ Please note the same applies when using kafka-streams with the native state stores (RocksDB/InMemory) under the &lt;em&gt;at-least-once&lt;/em&gt; &lt;code&gt;processing.guarantee&lt;/code&gt; (the default).&lt;/p&gt;
&lt;/blockquote&gt;
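&lt;p&gt;To make the chosen trade-off explicit in code, the guarantee can be pinned via the &lt;code&gt;processing.guarantee&lt;/code&gt; Streams property; a minimal sketch (the application id is a placeholder) setting the default, &lt;em&gt;at-least-once&lt;/em&gt;, explicitly:&lt;/p&gt;

```java
import java.util.Properties;

public class ProcessingGuaranteeSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("application.id", "my-streams-app");      // placeholder
        // "at_least_once" is the default; "exactly_once_v2" enables EOS,
        // which is discouraged in combination with this external store
        props.put("processing.guarantee", "at_least_once");
        System.out.println(props.getProperty("processing.guarantee"));
    }
}
```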

&lt;p&gt;For more information on Kafka Streams processing guarantees, check the sources referenced below.&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;a href="https://medium.com/lydtech-consulting/kafka-streams-transactions-exactly-once-messaging-82194b50900a"&gt;https://medium.com/lydtech-consulting/kafka-streams-transactions-exactly-once-messaging-82194b50900a&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://docs.confluent.io/platform/current/streams/developer-guide/config-streams.html#processing-guarantee"&gt;https://docs.confluent.io/platform/current/streams/developer-guide/config-streams.html#processing-guarantee&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://docs.confluent.io/platform/current/streams/concepts.html#processing-guarantees"&gt;https://docs.confluent.io/platform/current/streams/concepts.html#processing-guarantees&lt;/a&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;h3&gt;
  
  
  Incomplete Implementation of the &lt;code&gt;StateStore&lt;/code&gt; Interfaces
&lt;/h3&gt;

&lt;p&gt;For now, only &lt;code&gt;KeyValueStore&lt;/code&gt; is supported (vs. e.g. &lt;code&gt;WindowStore&lt;/code&gt;/&lt;code&gt;SessionStore&lt;/code&gt;).&lt;br&gt;
Also, not all methods have been implemented. Please check the &lt;a href="https://github.com/thriving-dev/kafka-streams-cassandra-state-store#store-types"&gt;store types method support table&lt;/a&gt; for more details.&lt;/p&gt;

&lt;h2&gt;
  
  
  Next Steps
&lt;/h2&gt;

&lt;p&gt;Here are some of the tasks (high level) in the current backlog:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Features

&lt;ul&gt;
&lt;li&gt;Implement &lt;a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-889%3A+Versioned+State+Stores"&gt;KIP-889: Versioned State Stores&lt;/a&gt; (coming soon with Kafka 3.5.0 release)&lt;/li&gt;
&lt;li&gt;Add a simple (optional) InMemory read cache -&amp;gt; &lt;a href="https://github.com/ben-manes/caffeine"&gt;Caffeine&lt;/a&gt;?&lt;/li&gt;
&lt;li&gt;Support &lt;code&gt;WindowStore&lt;/code&gt; / &lt;code&gt;SessionStore&lt;/code&gt;
&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;
&lt;li&gt;Non-functional

&lt;ul&gt;
&lt;li&gt;Benchmark&lt;/li&gt;
&lt;li&gt;Add metrics&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;
&lt;li&gt;Ops

&lt;ul&gt;
&lt;li&gt;GitHub actions to release + publish to maven central (snapshot / releases)&lt;/li&gt;
&lt;li&gt;Add Renovate&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Interested to contribute? Please &lt;a href="https://dev.to/about"&gt;reach out&lt;/a&gt;!&lt;/p&gt;

&lt;h2&gt;
  
  
  Conclusion
&lt;/h2&gt;

&lt;p&gt;It's been a fun journey so far, from an initial POC to a working library published to Maven Central, though it's still to be considered 'experimental', since it has not been production-tested yet.&lt;/p&gt;

&lt;p&gt;The out-of-the-box state stores satisfy most requirements; there is no need to switch without necessity.&lt;br&gt;
Still, it's a usable piece of software that may fill a gap for specific requirements.&lt;/p&gt;

&lt;p&gt;I'm looking forward to working on the next steps, such as benchmarking / load testing.&lt;/p&gt;

&lt;p&gt;Feedback is very welcome! Also, if you are planning to use, or have decided to use, the library in a project, please leave a comment below.&lt;/p&gt;

&lt;h2&gt;
  
  
  Footnotes
&lt;/h2&gt;

&lt;p&gt;At the time of writing this blog post, the latest versions of the relevant libs were:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Kafka / Streams API: 3.4.0&lt;/li&gt;
&lt;li&gt;Cassandra java-driver-core: 4.15.0 &lt;/li&gt;
&lt;li&gt;kafka-streams-cassandra-state-store: 0.4.0 &lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  References
&lt;/h2&gt;

&lt;ul&gt;
&lt;li&gt;&lt;a href="https://www.oreilly.com/library/view/mastering-kafka-streams/9781492062486/ch04.html"&gt;4. Stateful Processing - Mastering Kafka Streams and ksqlDB Book&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://docs.confluent.io/platform/current/streams/developer-guide/memory-mgmt.html"&gt;Kafka Streams Memory Management | Confluent Documentation&lt;/a&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;This is a re-publish of &lt;a href="https://thriving.dev/blog/introducing-kafka-streams-cassandra-state-store"&gt;https://thriving.dev/blog/introducing-kafka-streams-cassandra-state-store&lt;/a&gt;.&lt;/p&gt;

</description>
      <category>kafka</category>
      <category>cassandra</category>
      <category>streamprocessing</category>
      <category>scylladb</category>
    </item>
  </channel>
</rss>
