loading...
Cover image for Yggdrasil and RabbitMQ Subscriptions

Yggdrasil and RabbitMQ Subscriptions

alexdesousa profile image Alex de Sousa Originally published at thebroken.link ・3 min read

Yggdrasil: Easy Pub-Sub in Elixir (4 Part Series)

1) Yggdrasil: Easy Pub-Sub in Elixir 2) Yggdrasil and PostgreSQL Notifications 3) Yggdrasil and RabbitMQ Subscriptions 4) Yggdrasil as Distributed PubSub

One of the features I really like about RabbitMQ is its queue routing. Its flexibility allows you to do interesting things without much of a hassle. But before I dig deep into RabbitMQ's routing capabilities, I would like to mention some concepts.

Connections and Channels

RabbitMQ uses not only connections, but virtual connections called channels. The idea of channels is to introduce multiplexing in a single connection. A small system could establish only one connection with RabbitMQ while opening a channel for every single execution thread e.g:

RabbitMQ connection multiplexing

The rule of thumb would be to use:

  • One connection per application.
  • One channel per process in the application.

Note: Once our connection starts to be overloaded, we can start adding more connections to our connection pool.

With a normal RabbitMQ setup, we need to deal with:

  • Connection pools: avoiding over consuming resources.
  • Channel cleaning: avoiding channel memory leaks when they are not closed properly.
  • Fault-tolerant connections: supporting re-connections in case of failure or disconnection.
  • Re-connection back-off time: avoiding overloading the database on multiple re-connections.

Exchanges and Queues

An exchange is a message router. Every queue attached to it will be identified by a routing key. Typically, routing keys are words separated by dots e.g. spain.barcelona.gracia.

Additionally, routing keys support wildcards, for example: spain.barcelona.* will match messages with routing keys like spain.barcelona.gracia and spain.barcelona.raval.

It's easier to see these concepts with an image example:

RabbitMQ message routing

In the previous image:

  • Publisher X and Publisher Y are sending messages to Exchange logs.
  • Subscriber A is subscribed to logs.*.
  • Subscriber B is subscribed to logs.error.

Then:

  • Publisher X message will end up in Queue logs.info.
  • Publisher Y message will end up in Queue logs.error.
  • Subscriber A will receive Publisher X and Publisher Y's messages.
  • Subscriber B will receive Publisher Y's message.

Handling Subscriptions in Yggdrasil

Handling RabbitMQ's complexity might be intimidating. Fortunately, Yggdrasil for RabbitMQ generalizes the complexity in order to have a simpler API.

The biggest difference with previous adapters is the channel name. Instead of being a string, it's a tuple with the exchange name and the routing key e.g:

A subscriber would connect to the exchange amq.topic using the routing key logs.* as follows:

iex(subscriber)> Yggdrasil.subscribe(name: {"amq.topic", "logs.*"}, adapter: :rabbitmq)
iex(subscriber)> flush()
{:Y_CONNECTED, %Yggdrasil.Channel{...}}

Note: The exchange must exist and its type should be topic. The exchange amq.topic is created by default in RabbitMQ.

Then a publisher could send a message to the exchange amq.topic using logs.info as routing key:

iex(publisher)> Yggdrasil.publish([name: {"amq.topic", "logs.info"}, adapter: :rabbitmq], "Some message")
:ok

Finally, the subscriber would receive the message:

iex(subscriber)> flush()
{:Y_EVENT, %Yggdrasil.Channel{...}, "Some message"}

Additionally, the subscriber can be written using the Yggdrasil behaviour:

defmodule Logs.Subscriber do
  use Yggdrasil

  def start_link(options \\ []) do
    channel = [
      name: {"amq.topic", "logs.*"},
      adapter: :rabbitmq
    ]

    Yggdrasil.start_link(__MODULE__, [channel], options)
  end

  @impl true
  def handle_event(_channel, message, _state) do
    ... handle event ...
    {:ok, nil}
  end
end

Lost Messages

Yggdrasil will acknowledge the messages as soon as they arrive to the adapter, then it will broadcast them to all the subscribers. If the adapter is alive while the subscribers are restarting/failing, some messages might be lost.

Though it's possible to overcome this problem with exclusive queues, this feature is not implemented yet.

Conclusion

Yggdrasil for RabbitMQ handles RabbitMQ complexity let's you focus in what really matters: messages.

Cover image by Aswathy N

Yggdrasil: Easy Pub-Sub in Elixir (4 Part Series)

1) Yggdrasil: Easy Pub-Sub in Elixir 2) Yggdrasil and PostgreSQL Notifications 3) Yggdrasil and RabbitMQ Subscriptions 4) Yggdrasil as Distributed PubSub

Posted on by:

Discussion

markdown guide
 

It looks very good. Does Yggdrasil contain similar functions like Broadway handle_batch or handle_message?

 

Using Yggdrasil behaviour you can implement the handle_event callback and accomplish comparable results as handle_message callback.

However, when I first implemented Yggdrasil, I didn't need too worry about losing messages or handling back-pressure. In that case, Broadway is superior.

Before Broadway was released, I was planning on building an adapter using GenStage in order to having those features in the adapter as well. But I never get to do it in the end. Still, it's in my TODO list.

 

Thanks for your reply Alex. That would be great, maybe someday you'll do it :-)