Ahsan Nabi Dar
Brod: boss Kafka in Elixir

Elixir has rapidly gained recognition as a powerful language for developing distributed systems. Its concurrency model, based on the Erlang VM (BEAM), allows developers to build highly scalable and fault-tolerant applications. One of the areas where Elixir shines is in reading messages from Kafka using Broadway to build concurrent and multi-stage data ingestion and processing pipelines. However, when it comes to producing messages to Kafka, Elixir's ecosystem seems to lack a unified focus, leading to some confusion.

Elixir's inherent support for concurrency and fault tolerance makes it an ideal choice for distributed systems. The language's lightweight process model, along with features like supervisors and the actor model, enables developers to create systems that can handle massive loads and recover gracefully from failures. This makes Elixir particularly well-suited for distributed systems, where reliability and performance are crucial.

In the Elixir ecosystem, there is a clear focus on consuming messages from Kafka. Libraries like Broadway make it easy to build sophisticated data ingestion pipelines. Broadway allows developers to define multi-stage pipelines that can process large volumes of data concurrently, leveraging Elixir's strengths in concurrency and fault tolerance.

While Broadway excels at consuming messages, there's a slight terminology hiccup: the stage that reads from Kafka is called a producer, because it produces messages into the Broadway pipeline, which can be confusing when you are thinking in Kafka terms. It's important to remember that Broadway focuses on the consumer side of the Kafka equation.
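
To make that concrete, here is a minimal sketch of such a consumer pipeline using the BroadwayKafka producer (module name, hosts, group and topic below are illustrative):

defmodule MyApp.KafkaPipeline do
  use Broadway

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        # a "producer" in Broadway terms, a consumer in Kafka terms
        module:
          {BroadwayKafka.Producer,
           hosts: [localhost: 9092], group_id: "my_group", topics: ["my_topic"]},
        concurrency: 1
      ],
      processors: [default: [concurrency: 10]]
    )
  end

  @impl true
  def handle_message(_processor, message, _context) do
    # message.data holds the Kafka record value
    message
  end
end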

However, when it comes to producing messages to Kafka, Elixir's libraries present a more fragmented landscape.

There are three primary Kafka libraries available for Elixir developers: brod, kaffe, and kafka_ex. Each of these libraries has its own strengths and use cases.

brod: An Erlang client for Kafka, brod is known for its robustness and performance. It operates seamlessly within the BEAM ecosystem, taking advantage of Erlang's mature infrastructure for building distributed systems. However, working with brod can be cumbersome and requires a fair amount of setup. Despite this, it remains a reliable and performant choice for Kafka integration.

kaffe: A wrapper around brod, kaffe simplifies the process of interacting with Kafka, particularly for those using Heroku Kafka clusters. By abstracting away some of the complexities of brod, kaffe makes it easier for developers to get started with Kafka in Elixir. It focuses on providing a more user-friendly experience while still leveraging the underlying power of brod.

kafka_ex: Currently in a state of transition, kafka_ex is undergoing significant changes that are not backward compatible. This situation can be likened to Python's shift from version 2 to version 3, where developers faced considerable breaking changes. While kafka_ex has been a popular choice, the ongoing transition means developers need to be cautious about using it in production until the changes stabilize.

Keeping all of the above in mind and scouring elixirforum.com, the most commonly shared opinion was to go with brod and write a wrapper around it.

What is lacking, however, is guidance on how to set it up in Elixir, and in particular Phoenix, since brod is an Erlang library.

To set up brod in Phoenix you need to define a supervisor process that can then be added to your Phoenix application's supervision tree like any other child, as shown in the sketch below.
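
A minimal sketch of that one-line change, assuming a standard generated lib/maverick/application.ex (module and child names here are illustrative):

# lib/maverick/application.ex
def start(_type, _args) do
  children = [
    MaverickWeb.Endpoint,
    # the brod client supervisor defined in the next snippet
    Maverick.Kafka.BrodSupervisor
  ]

  opts = [strategy: :one_for_one, name: Maverick.Supervisor]
  Supervisor.start_link(children, opts)
end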

Setting up the brod client with SASL and SSL as a supervisor can be done as below.

defmodule Maverick.Kafka.BrodSupervisor do
  @moduledoc """
  Maverick.Kafka.BrodSupervisor
  """
  use Supervisor

  alias Maverick.Kafka, as: Kafka

  def start_link(_config) do
    Supervisor.start_link(__MODULE__, [], name: __MODULE__)
  end

  def init(_config) do
    # register a named brod client; producers and callers reference it by this id
    :ok =
      :brod.start_client(
        Kafka.hosts(),
        Kafka.brod_client(),
        # brod's ssl option accepts `true` or a list of :ssl options; pass the
        # options directly so certificate and hostname verification actually apply
        ssl: [
          # CA bundle from the CAStore package
          cacertfile: CAStore.file_path(),
          verify: :verify_peer,
          customize_hostname_check: [
            match_fun: :public_key.pkix_verify_hostname_match_fun(:https)
          ]
        ],
        # SASL credentials as {mechanism, username, password}
        sasl: Kafka.authentication(),
        # start producers lazily on the first produce call per topic
        auto_start_producers: true,
        reconnect_cool_down_seconds: 10,
        default_producer_config: [
          # wait for all in-sync replicas to acknowledge (see notes below)
          required_acks: -1,
          partition_buffer_limit: 1024
        ]
      )

    children = []
    Supervisor.init(children, strategy: :one_for_one)
  end
end
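The Kafka.hosts/0, Kafka.brod_client/0 and Kafka.authentication/0 calls above belong to the application's own config module; something along these lines is assumed (module name and config keys are illustrative):

defmodule Maverick.Kafka do
  @moduledoc """
  Illustrative config helpers assumed by the supervisor above.
  """

  # bootstrap endpoints as {host, port} tuples, e.g. [{"broker-1.example.com", 9092}]
  def hosts, do: Application.fetch_env!(:maverick, :kafka_hosts)

  # the atom brod uses to register the client process
  def brod_client, do: :kafka_client

  # SASL credentials in the {mechanism, username, password} shape brod expects
  def authentication do
    {:plain,
     Application.fetch_env!(:maverick, :kafka_username),
     Application.fetch_env!(:maverick, :kafka_password)}
  end
end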

After this, you can wrap the message producer for convenience:

defmodule Maverick.Kafka.Brod do
  @moduledoc """
  Maverick.Kafka.Brod
  """

  use Retry.Annotation

  # retry with jittered exponential backoff, giving up after 10 seconds
  @retry with: exponential_backoff() |> randomize() |> expiry(10_000)
  def produce(client, topic, partition, key, msg) do
    # blocks until the broker acknowledges according to required_acks
    :brod.produce_sync(client, topic, partition, key, msg)
  end
end
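With auto_start_producers enabled on the client, calling the wrapper is enough for brod to spin up a producer for the topic on first use. A usage sketch (topic, key and payload are illustrative, and Jason is assumed for encoding):

Maverick.Kafka.Brod.produce(
  Maverick.Kafka.brod_client(),
  "user_events",
  # fixed partition for simplicity
  0,
  "user-123",
  Jason.encode!(%{event: "signup"})
)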

I highly recommend checking out the retry hex package; it will save you from all those network issues, as the network is always dubious.

Before I close this post, the thing I would like to highlight is the required_acks setting and what it means. The required_acks setting (also known as acks) determines how many acknowledgements the producer requires the leader to have received before considering a request complete. This setting has a significant impact on the durability and consistency guarantees of your messages. The required_acks can be set to different values:

0: The producer does not wait for any acknowledgement from the server at all. This means that the producer will not receive any acknowledgement for the messages sent, and message loss can occur if the server fails before the message is written to disk.
1: The leader writes the record to its local log but responds without waiting for full acknowledgement from all followers. This means that the message is acknowledged as soon as the leader writes it, but before all replicas have received it.
-1 (or all): The leader waits for the full set of in-sync replicas to acknowledge the record. This is the strongest guarantee and means that the message is considered committed only when all in-sync replicas have acknowledged it.

In the context of the brod library, setting required_acks: -1 ensures that:

The producer waits for acknowledgements from all in-sync replicas before considering the message successfully sent.
This provides the highest level of durability since the message will be available even if the leader broker fails after the message is acknowledged.
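
If durability matters less for a particular topic, brod also lets you override the client-wide default when starting a producer for that topic explicitly; a small sketch (topic name is illustrative):

# start a producer for one topic with weaker acks than the client default
:ok =
  :brod.start_producer(
    Maverick.Kafka.brod_client(),
    "low_priority_metrics",
    required_acks: 1
  )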

I hope this makes it simple for people looking to work with Kafka using Elixir.
