<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: fr33m0nk</title>
    <description>The latest articles on DEV Community by fr33m0nk (@fr33m0nk).</description>
    <link>https://dev.to/fr33m0nk</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F1086619%2F9a50cd70-17f3-429a-8ed7-4adbf87d50e6.jpeg</url>
      <title>DEV Community: fr33m0nk</title>
      <link>https://dev.to/fr33m0nk</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/fr33m0nk"/>
    <language>en</language>
    <item>
      <title>Alpakka Kafka and Clojure</title>
      <dc:creator>fr33m0nk</dc:creator>
      <pubDate>Mon, 26 Jun 2023 16:58:06 +0000</pubDate>
      <link>https://dev.to/fr33m0nk/alpakka-kafka-and-clojure-1hie</link>
      <guid>https://dev.to/fr33m0nk/alpakka-kafka-and-clojure-1hie</guid>
      <description>&lt;p&gt;Kafka streams is an amazing abstraction for working with Kafka and has amazing JVM support. However, if stream application does I/O operations, it may not be right fit. Alpakka Kafka uses &lt;a href="https://www.lightbend.com/blog/alpakka-kafka-flow-control-optimizations"&gt;Flow control optimizations&lt;/a&gt;, well suited for creating streams where I/O operations are taking place. For a detailed comparison on Kafka Streams and Alpakka Kafka, refer this Medium &lt;a href="https://medium.com/geekculture/kafka-streams-vs-alpakka-8906a90ca286"&gt;story&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;Alpakka Kafka uses Akka under the hood and offers a rich Scala and Java API. However, Clojure story is pretty lackluster. Using Alpakka Kafka essentially means, doing a whole lot of interop with Alpakka Kafka’s Java API.&lt;/p&gt;

&lt;p&gt;I have created a &lt;a href="https://github.com/fr33m0nk/clj-alpakka-kafka"&gt;Clojure wrapper that abstracts the interop with Alpakka Kafka Java API&lt;/a&gt; and allows library consumer to use Alpakka Kafka without much hassle.&lt;/p&gt;

&lt;p&gt;I will demo an example of setting up a Alpakka Kafka stream using aforementioned Clojure wrapper library:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;p&gt;Import required dependencies in the (Clojure deps) project.&lt;br&gt;
&lt;/p&gt;
&lt;pre class="highlight clojure"&gt;&lt;code&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="n"&gt;net.clojars.fr33m0nk/clj-alpakka-kafka&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="no"&gt;:mvn/version&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s"&gt;"0.1.6"&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
 &lt;/span&gt;&lt;span class="n"&gt;org.apache.kafka/kafka-clients&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="no"&gt;:mvn/version&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s"&gt;"3.3.2"&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/li&gt;
&lt;li&gt;
&lt;p&gt;Import required namespaces in the demo namespace &lt;code&gt;alpakka-kafka-demo&lt;/code&gt;&lt;br&gt;
&lt;/p&gt;
&lt;pre class="highlight clojure"&gt;&lt;code&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nf"&gt;ns&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="n"&gt;alpakka-kafka-demo&lt;/span&gt;&lt;span class="w"&gt;
   &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="no"&gt;:require&lt;/span&gt;&lt;span class="w"&gt;
     &lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;fr33m0nk.akka.actor&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="no"&gt;:as&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="n"&gt;actor&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;&lt;span class="w"&gt;
     &lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;fr33m0nk.akka.stream&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="no"&gt;:as&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="n"&gt;s&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;&lt;span class="w"&gt;
     &lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;fr33m0nk.alpakka-kafka.committer&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="no"&gt;:as&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="n"&gt;committer&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;&lt;span class="w"&gt;
     &lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;fr33m0nk.alpakka-kafka.consumer&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="no"&gt;:as&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="n"&gt;consumer&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;&lt;span class="w"&gt;
     &lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;fr33m0nk.alpakka-kafka.producer&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="no"&gt;:as&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="n"&gt;producer&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;&lt;span class="w"&gt;
     &lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;fr33m0nk.utils&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="no"&gt;:as&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="n"&gt;utils&lt;/span&gt;&lt;span class="p"&gt;])&lt;/span&gt;&lt;span class="w"&gt;
 &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="no"&gt;:import&lt;/span&gt;&lt;span class="w"&gt;
   &lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;org.apache.kafka.common.serialization&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="n"&gt;StringDeserializer&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="n"&gt;StringSerializer&lt;/span&gt;&lt;span class="p"&gt;]))&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/li&gt;
&lt;li&gt;
&lt;p&gt;We will create a new stream topology.&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;p&gt;This stream topology will consume message from a Kafka Topic, transform it and then publish to another Kafka topic.&lt;br&gt;
&lt;/p&gt;
&lt;pre class="highlight clojure"&gt;&lt;code&gt;&lt;span class="w"&gt;  &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;defn&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="n"&gt;test-stream-with-producer&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;actor-system&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="n"&gt;consumer-settings&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="n"&gt;committer-settings&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="n"&gt;producer-settings&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="n"&gt;consumer-topics&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="n"&gt;producer-topic&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nb"&gt;-&amp;gt;&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nf"&gt;consumer/-&amp;gt;committable-source&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="n"&gt;consumer-settings&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="n"&gt;consumer-topics&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nf"&gt;s/map-async&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="w"&gt;
                     &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;fn&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;&lt;span class="w"&gt;
                       &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;let&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;_key&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nf"&gt;consumer/key&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;&lt;span class="w"&gt;
                             &lt;/span&gt;&lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nf"&gt;consumer/value&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;&lt;span class="w"&gt;
                             &lt;/span&gt;&lt;span class="n"&gt;committable-offset&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nf"&gt;consumer/committable-offset&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;&lt;span class="w"&gt;
                             &lt;/span&gt;&lt;span class="n"&gt;message-to-publish&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nf"&gt;producer/-&amp;gt;producer-record&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="n"&gt;producer-topic&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nf"&gt;str/upper-case&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="p"&gt;))]&lt;/span&gt;&lt;span class="w"&gt;
                         &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nf"&gt;producer/single-producer-message-envelope&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="n"&gt;committable-offset&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="n"&gt;message-to-publish&lt;/span&gt;&lt;span class="p"&gt;))))&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nf"&gt;s/to-mat&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nf"&gt;producer/committable-sink&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="n"&gt;producer-settings&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="n"&gt;committer-settings&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="n"&gt;consumer/create-draining-control&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nf"&gt;s/run&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="n"&gt;actor-system&lt;/span&gt;&lt;span class="p"&gt;)))&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;code&gt;s/map-async&lt;/code&gt; executes mapping function with 2 messages being processed in parallel&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Then we will publish messages to another topic and commit offsets to Kafka via &lt;code&gt;s/to-mat&lt;/code&gt; and &lt;code&gt;producer/committable-sink&lt;/code&gt;&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Finally, we run the stream with our actor-system using &lt;code&gt;s/run&lt;/code&gt;&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;


&lt;/li&gt;
&lt;li&gt;

&lt;p&gt;Let’s create required dependencies.&lt;br&gt;
&lt;/p&gt;

&lt;pre class="highlight clojure"&gt;&lt;code&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;def&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="n"&gt;actor-system&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nf"&gt;actor/-&amp;gt;actor-system&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s"&gt;"test-actor-system"&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;&lt;span class="w"&gt;

 &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;def&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="n"&gt;committer-settings&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nf"&gt;committer/committer-settings&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="n"&gt;actor-system&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="no"&gt;:batch-size&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;}))&lt;/span&gt;&lt;span class="w"&gt;

 &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;def&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="n"&gt;consumer-settings&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nf"&gt;consumer/consumer-settings&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="n"&gt;actor-system&lt;/span&gt;&lt;span class="w"&gt;
                                               &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="no"&gt;:group-id&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s"&gt;"a-test-consumer"&lt;/span&gt;&lt;span class="w"&gt;
                                                &lt;/span&gt;&lt;span class="no"&gt;:bootstrap-servers&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s"&gt;"localhost:9092"&lt;/span&gt;&lt;span class="w"&gt;
                                                &lt;/span&gt;&lt;span class="no"&gt;:key-deserializer&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nf"&gt;StringDeserializer.&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;&lt;span class="w"&gt;
                                                &lt;/span&gt;&lt;span class="no"&gt;:value-deserializer&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nf"&gt;StringDeserializer.&lt;/span&gt;&lt;span class="p"&gt;)}))&lt;/span&gt;&lt;span class="w"&gt;

 &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;def&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="n"&gt;producer-settings&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nf"&gt;producer/producer-settings&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="n"&gt;actor-system&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="no"&gt;:bootstrap-servers&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s"&gt;"localhost:9092"&lt;/span&gt;&lt;span class="w"&gt;
                                                             &lt;/span&gt;&lt;span class="no"&gt;:key-serializer&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nf"&gt;StringSerializer.&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;&lt;span class="w"&gt;
                                                             &lt;/span&gt;&lt;span class="no"&gt;:value-serializer&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nf"&gt;StringSerializer.&lt;/span&gt;&lt;span class="p"&gt;)}))&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;




&lt;/li&gt;
&lt;li&gt;

&lt;p&gt;Let’s run the stream and see it in action&lt;br&gt;
&lt;/p&gt;

&lt;pre class="highlight clojure"&gt;&lt;code&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;def&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="n"&gt;consumer-control&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nf"&gt;test-stream-with-producer&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="n"&gt;actor-system&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="n"&gt;consumer-settings&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="n"&gt;committer-settings&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="n"&gt;producer-settings&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="s"&gt;"testing_stuff"&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s"&gt;"output-topic"&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;



&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--cp0XNozd--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/0j3ibeucarznie30c69f.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--cp0XNozd--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/0j3ibeucarznie30c69f.png" alt="Streams in action :D" width="800" height="178"&gt;&lt;/a&gt;&lt;/p&gt;


&lt;/li&gt;
&lt;li&gt;

&lt;p&gt;We can shut this Alpakka Kafka stream down via following.&lt;br&gt;
&lt;/p&gt;

&lt;pre class="highlight clojure"&gt;&lt;code&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="c1"&gt;;; shutdown streams using consumer-control var&lt;/span&gt;&lt;span class="w"&gt;
 &lt;/span&gt;&lt;span class="o"&gt;@&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nf"&gt;consumer/drain-and-shutdown&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="n"&gt;consumer-control&lt;/span&gt;&lt;span class="w"&gt;
                         &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nf"&gt;CompletableFuture/supplyAsync&lt;/span&gt;&lt;span class="w"&gt;
                           &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nf"&gt;utils/-&amp;gt;fn0&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;fn&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;[]&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="no"&gt;::done&lt;/span&gt;&lt;span class="p"&gt;)))&lt;/span&gt;&lt;span class="w"&gt;
                         &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nf"&gt;actor/get-dispatcher&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="n"&gt;actor-system&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;



&lt;p&gt;&lt;code&gt;utils/-&amp;gt;fn0&lt;/code&gt; reifies &lt;code&gt;java.util.function.Supplier&lt;/code&gt; interface to a Clojure function with 0 arity.&lt;/p&gt;


&lt;/li&gt;
&lt;li&gt;

&lt;p&gt;We can shutdown Akka actor-system as well.&lt;br&gt;
&lt;/p&gt;

&lt;pre class="highlight clojure"&gt;&lt;code&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="o"&gt;@&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nf"&gt;actor/terminate&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="n"&gt;actor-system&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;




&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;I hope this story introduced Alpakka Kafka as a viable alternate to Kafka streams using Clojure. I have documented more examples on using Alpakka Kafka with my Clojure wrapper library &lt;a href="https://github.com/fr33m0nk/clj-alpakka-kafka/blob/main/doc/examples.md"&gt;here&lt;/a&gt;.&lt;/p&gt;

</description>
      <category>clojure</category>
      <category>kafka</category>
      <category>akka</category>
      <category>alpakka</category>
    </item>
  </channel>
</rss>
