DEV Community

fr33m0nk


Alpakka Kafka and Clojure

Kafka Streams is an excellent abstraction for working with Kafka and has first-class JVM support. However, if a stream application performs I/O operations (calling HTTP services, querying databases and so on), it may not be the right fit. Alpakka Kafka is built on Akka Streams, whose back-pressure-based flow control makes it well suited to streams that perform I/O. For a detailed comparison of Kafka Streams and Alpakka Kafka, refer to this Medium story.
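Much of that I/O advantage comes from stages such as Akka Streams' `mapAsync`, which keep a bounded number of calls in flight and still emit results in the order the inputs arrived. The snippet below is a simplified, blocking, pure-Clojure sketch of that idea using futures. It is not Akka's implementation and not part of the wrapper library; `bounded-ordered-map` is a hypothetical name chosen for illustration.

```clojure
(require '[clojure.string :as str])

(defn bounded-ordered-map
  "Hypothetical helper: applies f to every element of coll on a background
  thread, keeping at most `parallelism` calls in flight, and returns the
  results in input order."
  [parallelism f coll]
  (loop [in-flight clojure.lang.PersistentQueue/EMPTY ; futures, oldest first
         remaining (seq coll)
         results   []]
    (cond
      ;; Room in the window and input left: start another call.
      (and remaining (< (count in-flight) parallelism))
      (recur (conj in-flight (future (f (first remaining))))
             (next remaining)
             results)

      ;; Window full (or input exhausted): block on the oldest call,
      ;; so results come out in the order the inputs arrived.
      (seq in-flight)
      (recur (pop in-flight) remaining (conj results @(peek in-flight)))

      :else results)))

;; Simulated I/O: each call sleeps briefly before upper-casing its input.
(bounded-ordered-map 2 (fn [s] (Thread/sleep 50) (str/upper-case s)) ["a" "b" "c"])
;; => ["A" "B" "C"]
```

The window size bounds how much work is outstanding at once, which is the property that keeps a slow downstream service from being flooded. Akka Streams achieves the same without blocking threads, by demanding new elements only when there is capacity.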

Alpakka Kafka uses Akka under the hood and offers rich Scala and Java APIs. The Clojure story, however, is pretty lackluster: using Alpakka Kafka from Clojure essentially means doing a whole lot of interop with Alpakka Kafka's Java API.

I have created a Clojure wrapper that abstracts away the interop with the Alpakka Kafka Java API and lets library consumers use Alpakka Kafka without much hassle.

Below, I demo setting up an Alpakka Kafka stream using this Clojure wrapper library:

  1. Add the required dependencies to the :deps map of the project's deps.edn.

     net.clojars.fr33m0nk/clj-alpakka-kafka {:mvn/version "0.1.6"}
     org.apache.kafka/kafka-clients {:mvn/version "3.3.2"}
    
  2. Require the needed namespaces in the demo namespace alpakka-kafka-demo.

     (ns alpakka-kafka-demo
       (:require
         [clojure.string :as str]
         [fr33m0nk.akka.actor :as actor]
         [fr33m0nk.akka.stream :as s]
         [fr33m0nk.alpakka-kafka.committer :as committer]
         [fr33m0nk.alpakka-kafka.consumer :as consumer]
         [fr33m0nk.alpakka-kafka.producer :as producer]
         [fr33m0nk.utils :as utils])
       (:import
         [java.util.concurrent CompletableFuture]
         [org.apache.kafka.common.serialization StringDeserializer StringSerializer]))
    
  3. Next, we create a new stream topology.

    1. This stream topology consumes messages from a Kafka topic, transforms them, and then publishes them to another Kafka topic.

        (defn test-stream-with-producer
          [actor-system consumer-settings committer-settings producer-settings consumer-topics producer-topic]
          ;; source of committable messages read from consumer-topics
          (-> (consumer/->committable-source consumer-settings consumer-topics)
              ;; transform up to 2 messages in parallel
              (s/map-async 2
                           (fn [message]
                             (let [_key (consumer/key message)
                                   value (consumer/value message)
                                   committable-offset (consumer/committable-offset message)
                                   ;; upper-cased value, addressed to producer-topic
                                   message-to-publish (producer/->producer-record producer-topic (str/upper-case value))]
                               ;; pair the record with the offset to commit once it is published
                               (producer/single-producer-message-envelope committable-offset message-to-publish))))
              ;; publish, commit offsets, and materialize a draining control
              (s/to-mat (producer/committable-sink producer-settings committer-settings) consumer/create-draining-control)
              (s/run actor-system)))
      
    2. s/map-async runs the mapping function on up to 2 messages in parallel (the first argument sets the parallelism).

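    3. Then we publish the messages to the output topic and commit their offsets to Kafka via s/to-mat and producer/committable-sink. consumer/create-draining-control turns the materialized values into a control handle that we later use to shut the stream down.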

    4. Finally, we run the stream on our actor-system using s/run.

  4. Let's create the objects the stream depends on: an actor system plus committer, consumer and producer settings.

     (def actor-system (actor/->actor-system "test-actor-system"))
    
     (def committer-settings (committer/committer-settings actor-system {:batch-size 1}))
    
     (def consumer-settings (consumer/consumer-settings actor-system
                                                   {:group-id "a-test-consumer"
                                                    :bootstrap-servers "localhost:9092"
                                                    :key-deserializer (StringDeserializer.)
                                                    :value-deserializer (StringDeserializer.)}))
    
     (def producer-settings (producer/producer-settings actor-system {:bootstrap-servers "localhost:9092"
                                                                 :key-serializer (StringSerializer.)
                                                                 :value-serializer (StringSerializer.)}))
    
  5. Let's run the stream and see it in action.

     (def consumer-control (test-stream-with-producer actor-system consumer-settings committer-settings producer-settings ["testing_stuff"] "output-topic"))
    


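  6. We can shut this Alpakka Kafka stream down as follows. Draining stops consuming new messages and lets in-flight messages reach the end of the stream before the consumer is shut down.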

     ;; shutdown streams using consumer-control var
     @(consumer/drain-and-shutdown consumer-control
                             (CompletableFuture/supplyAsync
                               (utils/->fn0 (fn [] ::done)))
                             (actor/get-dispatcher actor-system))
    

    utils/->fn0 wraps a zero-arity Clojure function in a reified java.util.function.Supplier, which is the type CompletableFuture/supplyAsync expects.

  7. We can shut down the Akka actor-system as well.

     @(actor/terminate actor-system)
    

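A plain Clojure fn does not implement java.util.function.Supplier, which is why step 6 adapts one with utils/->fn0 before handing it to CompletableFuture/supplyAsync. The sketch below shows one way such an adapter can be written with `reify`. `fn0->supplier` is a hypothetical stand-in; the wrapper library's actual implementation may differ.

```clojure
(import '(java.util.concurrent CompletableFuture)
        '(java.util.function Supplier))

(defn fn0->supplier
  "Hypothetical stand-in for utils/->fn0: wraps a zero-arity Clojure fn
  in a java.util.function.Supplier."
  [f]
  (reify Supplier
    (get [_] (f))))

;; CompletableFuture implements java.util.concurrent.Future, so deref blocks
;; until the supplier has run and then returns its value.
@(CompletableFuture/supplyAsync (fn0->supplier (fn [] :done)))
;; => :done
```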
I hope this story introduced Alpakka Kafka as a viable alternative to Kafka Streams for Clojure users. I have documented more examples of using Alpakka Kafka with my Clojure wrapper library here.
