DEV Community

Mustafa Turan
Mustafa Turan

Posted on

Decoupled Modules with Elixir EventBus

Elixir EventBus is a library that allows different modules to communicate with each other without knowing about each other. A module/function can create an Event struct, and deliver to the EventBus without knowing which modules will consume.

Modules can also listen to events on the EventBus, without knowing who sent the events. Thus, modules can communicate without depending on each other. Moreover, it is very easy to substitute a consumer module. As long as the new module understands the Event struct that is being sent and received, the other modules will never know.


Get started with decoupled modules in 4 steps

Decoupled modules help us to write clear code, focus on only one thing at a time. With EventBus library, you can get benefit of real event bus system and write decoupled modules easily.

Registering event topics

On your app init, or anytime before delivering the events we need to register our topics. For example, when we receive a payment, we will create checkout_completed event.

EventBus.register_topic(:checkout_completed)
EventBus.register_topic(:checkout_failed)
Enter fullscreen mode Exit fullscreen mode

Delivering events

EventBus.EventSource module provides notify helper to deliver events without modifying your current code.

defmodule Order do
  ...
  use EventBus.EventSource
  ...
  def checkout(params) do
    event_params = %{topic: :checkout_completed, error_topic: :checkout_failed}
    EventSource.notify(event_params) do
      ... # process the payment as usual in here and if errors then return {:error, _} tuple
    end
  end
end
Enter fullscreen mode Exit fullscreen mode

Subscribing consumers to topics

Assume that we have several operations waiting on success and failure conditions. Let's subscribe the consumers to the event topics.

EventBus.subscribe({StockUpdateService, ["^checkout_completed$", "^new_stock_arrived$", ...]})
EventBus.subscribe({CargoService, ["^checkout_completed$, "^cargo_dispatched$", "^new_stock_arrived$", ...]})
EventBus.subscribe({EmailService, ["^user_registered$", "^checkout_completed$", ...]})
EventBus.subscribe({EventBus.Logger, [".*"]})
Enter fullscreen mode Exit fullscreen mode

Consuming events

EventBus consumers are just modules so they can be implemented by any kind of consumer strategy including GenStage(event_bus_postgres as sample), pooling, DynamicSupervisors, global consumers, spawn et al. So, depending on your use case, implement your consumers with the best suitable strategy. (But any kind of blocker strategies are not recommended!)

defmodule CargoService do
  use GenServer
  ...
  def process({topic, id}) do
    GenServer.cast(__MODULE__, event_shadow)
  end

  def handle_cast({topic, id}, state) do
    payment_data = EventBus.fetch_event_data({topic, id})
    # do sth with payment_data
    ...
    # mark event as completed for this consumer
    EventBus.mark_as_completed({CargoService, topic, id})
    {:noreply, state}
  end
end
Enter fullscreen mode Exit fullscreen mode

Here is another consumer that needs all event structure and consumes with spawning:

defmodule StockUpdateService do
  ...
  def process({topic, id}) do
    spawn(fn ->     
      event = EventBus.fetch_event({topic, id})
      # do sth with event.data and/or any other event attributes
      ...
      # mark event as completed for this consumer
      EventBus.mark_as_completed({StockUpdateService, topic, id})
    end)
  end
end
Enter fullscreen mode Exit fullscreen mode

About the EventBus library

EventBus library is a traceable, extendible, fast, and memory friendly pure Elixir implementation without any external dependency in the pocket.

Traceable

EventBus library comes with good optional attributes to provide traceability. When a consumer receive the topic and event_id, it can fetch structure with fetch_event/1 function. And if you use EventBus.EventSource helper, the optional fields are set automatically for you. Here is the structure of an Event model:

%EventBus.Model.Event{
  id: String.t | integer(), # required
  transaction_id: String.t | integer(), # optional
  topic: atom(), # required
  data: any() # required,
  initialized_at: integer(), # optional
  occurred_at: integer(), # optional
  source: String.t(), # optional
  ttl: integer() # optional
}
Enter fullscreen mode Exit fullscreen mode

Extendible

EventBus library allows subscribing events with regex patterns, which allows consumers to subscribe new topics automatically. With this feature, it allows extending the system asynchronously with generic consumers like event_bus_logger, generic Postgresql event store, event_bus_metrics UI, and so on.

Fast by design

EventBus library uses builtin memory store ETS effectively to get benefits of concurrent reads and writes. It doesn't block majority of the read operations, and allows concurrent reads to fetch Event data.

It applies queueing theory to handle inputs. And almost all implementation data accesses have O(1) complexity.

Memory friendly

EventBus library delivers only topic and event_id to the consumers and keeps the original event data in the topic's Event table. So, only when consumers are able to process the event, then they fetch the event data.


Distributed?

Even though the main intention to create this library is to use for internal module communications, it is possible to deliver events across nodes. One of the ways to delivery is sending message to connected other nodes as usual Elixir/Erlang way:

for node <- Node.list() do
  Node.spawn(node, EventBus, :register_topic, [:checkout_completed])
  Node.spawn(node, EventBus, :notify, [%{EventBus.Model.Event{..}}])
end
Enter fullscreen mode Exit fullscreen mode

Note: EventBus library is not in the same class distributed solutions like Apache Kafka, RabbitMQ, or Redis Streams.

Original story on Medium: https://medium.com/elixirlabs/decoupled-modules-with-elixir-eventbus-a709b1479411

The source code of EventBus library: https://github.com/otobus/event_bus

Top comments (0)