I recently had to build convenience wrappers around producers and consumers for RabbitMQ in a pub-sub architecture.
There are many obstacles to ensuring reliable (at-least-once) message delivery when using a distributed message broker like RabbitMQ.
Messages may be lost between the producer and the broker, inside the broker, and between the broker and consumers. And there are many subtleties.
In a nutshell, you need to enable:
- publisher confirms - so that the broker acks messages
- message persistence - so that the broker acks messages only after flushing them to disk
- durable queues (with a durable exchange) - so that queues survive broker and consumer restarts
- consumer acking - so that the queue retains messages until acked by a consumer
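To make the checklist concrete, here is a minimal sketch of where each setting lives when using the amqp library. The module name, the queue name "my_service.events" and the catch-all binding key "#" are hypothetical and chosen only for illustration; connection handling and error recovery are left out.

```elixir
defmodule ReliableSetupSketch do
  # Hypothetical names, used for illustration only.
  @exchange "pub_sub"
  @queue "my_service.events"

  # Durable exchange + durable queue, and publisher confirms on the channel.
  def declare_topology(channel) do
    :ok = AMQP.Exchange.declare(channel, @exchange, :topic, durable: true)
    {:ok, _info} = AMQP.Queue.declare(channel, @queue, durable: true)
    :ok = AMQP.Queue.bind(channel, @queue, @exchange, routing_key: "#")
    :ok = AMQP.Confirm.select(channel)
  end

  # Message persistence is requested per message at publish time.
  def publish(channel, topic, payload) do
    AMQP.Basic.publish(channel, @exchange, topic, payload, persistent: true)
  end

  # Consumer acking: subscribe with manual acks (no_ack: false) ...
  def subscribe(channel) do
    AMQP.Basic.consume(channel, @queue, nil, no_ack: false)
  end

  # ... and ack each delivery only after it has been processed.
  def ack(channel, delivery_tag) do
    AMQP.Basic.ack(channel, delivery_tag)
  end
end
```

The rest of this post only deals with the first item, publisher confirms.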
The official RabbitMQ website has some great documentation, including this Elixir tutorial, but examples of using the AMQP primitives in a robust OTP setting are sparse.
In this post I specifically want to address efficient publisher confirms.
Synchronously awaiting publisher confirmations
Here are two projects with examples of publisher modules:
Unfortunately, both make use of the AMQP.Confirm.wait_for_confirms/2 primitive inside a GenServer callback. Messages are published (with AMQP.Basic.publish/5) and then immediately awaited inside a handle_call/3 callback, which creates a bottleneck: the GenServer cannot start the next publish until the broker has confirmed the current one. This effectively prevents concurrent publishing, and throughput suffers.
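For reference, the blocking pattern looks roughly like the sketch below. It is not taken from either project; the module name and the @confirm_timeout value are assumptions, and the unit of the timeout should be checked against the version of the amqp library in use.

```elixir
defmodule BlockingPublisherSketch do
  use GenServer

  # Assumed value, for illustration only. Check the library docs for the unit.
  @confirm_timeout 5_000

  def start_link(channel) do
    GenServer.start_link(__MODULE__, channel, name: __MODULE__)
  end

  def publish(topic, payload) do
    GenServer.call(__MODULE__, {:publish, topic, payload})
  end

  @impl true
  def init(channel) do
    :ok = AMQP.Confirm.select(channel)
    {:ok, %{channel: channel}}
  end

  @impl true
  def handle_call({:publish, topic, payload}, _from, state) do
    :ok = AMQP.Basic.publish(state.channel, "pub_sub", topic, payload, persistent: true)

    # The server process blocks here until the broker confirms (or the
    # timeout elapses). Every other caller waits in the mailbox meanwhile.
    reply =
      case AMQP.Confirm.wait_for_confirms(state.channel, @confirm_timeout) do
        true -> :ok
        false -> {:error, :nack}
        :timeout -> {:error, :timeout}
      end

    {:reply, reply, state}
  end
end
```

Because each call holds the server for one full broker round trip, publishes are serialized no matter how many processes call publish/2 at the same time.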
Async handling of publisher confirmations
There is another way. AMQP.Confirm.register_handler/2 allows you to register a process that will receive broker confirmations asynchronously. Each ack (or nack) then arrives as a message and invokes handle_info/2 with one of these sets of arguments:
{:basic_ack, seqno, multiple}, state
{:basic_nack, seqno, multiple}, state
This indicates that the publish with sequence number seqno was either acked or nacked. If multiple is true, the confirmation covers every outstanding publish up to and including seqno. If we had a mapping between seqno and the original process that called our GenServer's publish callback, we would be able to reply to it with a success or error message.
Fortunately, the amqp library provides us with AMQP.Confirm.next_publish_seqno/1 so that we can get a handle on seqno at the same point in time where we have a handle on from (the process that called us in the first place), allowing us to store a mapping between them.
Enough high-level talk. Let's step through an outline of the solution. First we get the boilerplate out of the way:
defmodule MyPublisher do
  use GenServer
  ...

  def start_link(channel) do
    # Start the process
    GenServer.start_link(__MODULE__, channel, name: __MODULE__)
  end

  def init(channel) do
    # Initialize the process state and do setup. We
    # assume you have an exchange called "pub_sub"
    # already set up.

    # Enable publisher confirms
    AMQP.Confirm.select(channel)

    # Register the current process as the async handler
    # for acks and nacks
    AMQP.Confirm.register_handler(channel, self())
    ...

    # Put the channel in the state. Proper
    # connection and channel management is
    # not covered in this post.
    {:ok, %{channel: channel}}
  end
  ...
end
Now we'll add the API for publishing messages:
def publish(topic, payload) do
  # This function runs in the calling process
  GenServer.call(__MODULE__, {:publish, topic, payload})
end

def handle_call({:publish, topic, payload}, from, state) do
  # This function runs in the GenServer
  ...
  # Perform the actual publishing (ensure it is persistent)
  AMQP.Basic.publish(state.channel, "pub_sub", topic, payload, persistent: true)

  # Note that we are NOT replying here
  {:noreply, state}
end
And, having registered the GenServer as the handler of confirmation messages, we need to implement some confirmation handling callbacks:
def handle_info({:basic_ack, seqno, multiple}, state) do
  # The broker is saying that the publish with `seqno`
  # has been received. If `multiple` is true, all messages
  # up to and including `seqno` have been received.
  confirm_messages(seqno, multiple, fn from ->
    # For each of the received messages,
    # we can reply with `:ok`.
    GenServer.reply(from, :ok)
  end)

  {:noreply, state}
end

def handle_info({:basic_nack, seqno, multiple}, state) do
  # The broker is saying that it could not handle the publish
  # with `seqno`, so we treat it as lost. If `multiple` is true,
  # all messages up to and including `seqno` have been lost.
  confirm_messages(seqno, multiple, fn from ->
    # For each of the lost messages,
    # we can reply with `{:error, :nack}`.
    GenServer.reply(from, {:error, :nack})
  end)

  {:noreply, state}
end
We next need to understand the implementation of confirm_messages, but first, some updates to the boilerplate and publishing code:
def init(channel) do
  ...
  # Create an ETS table in which the elements are ordered. It
  # will be owned by the GenServer process. We make it private
  # because there is no need for other processes to read or
  # write to it.
  :ets.new(@table_name, [:ordered_set, :private, :named_table])

  {:ok, %{channel: channel}}
end
...
def handle_call({:publish, topic, payload}, from, state) do
  # Before we publish, get a hold of the next publish
  # sequence number `seqno`.
  seqno = AMQP.Confirm.next_publish_seqno(state.channel)

  # Store the caller reference (`from`) against
  # the `seqno` in ETS
  :ets.insert(@table_name, {seqno, from})

  # Perform the actual publishing
  ...
end
And now, finally, when receiving an ack or a nack, we can look up the corresponding caller reference by seqno and reply appropriately:
defp confirm_messages(seqno, _multiple = false, reply_fun) do
  # Look up and remove the entry for seqno
  case :ets.take(@table_name, seqno) do
    [{^seqno, from}] -> reply_fun.(from)
    _ -> :ok
  end
end
The case where the broker chooses to acknowledge multiple messages at once is somewhat less readable and involves a clunky ETS-specific match specification, but you can largely gloss over that:
defp confirm_messages(seqno, true = _multiple, reply_fun) do
  :ets.select(@table_name, [
    {
      # bind the stored seqno and `from` to $1 and $2
      {:"$1", :"$2"},
      # match entries where the stored seqno is =< the incoming seqno
      [{:"=<", :"$1", seqno}],
      # and return all the matching entries as tuples of $1 and $2
      [{{:"$1", :"$2"}}]
    }
  ])
  |> Enum.each(fn {key, from} ->
    reply_fun.(from)
    :ets.delete(@table_name, key)
  end)
end
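If the match specification is hard to read, it may help to run it against a throwaway table. The sketch below uses a hypothetical MatchSpecDemo module and atoms such as :caller_a in place of real from references; it is only meant to show what the select returns.

```elixir
defmodule MatchSpecDemo do
  # Returns every {seqno, from} entry with a key =< `seqno`, in key order,
  # and removes those entries from the table.
  def take_up_to(table, seqno) do
    match_spec = [
      {{:"$1", :"$2"}, [{:"=<", :"$1", seqno}], [{{:"$1", :"$2"}}]}
    ]

    entries = :ets.select(table, match_spec)
    Enum.each(entries, fn {key, _from} -> :ets.delete(table, key) end)
    entries
  end
end

# Four publishes are pending; the broker then confirms "3, multiple: true".
table = :ets.new(:match_spec_demo, [:ordered_set, :private])
:ets.insert(table, [{1, :caller_a}, {2, :caller_b}, {3, :caller_c}, {4, :caller_d}])

MatchSpecDemo.take_up_to(table, 3)
# => [{1, :caller_a}, {2, :caller_b}, {3, :caller_c}]

:ets.tab2list(table)
# => [{4, :caller_d}]
```

Because the table is an :ordered_set, the entries come back sorted by sequence number, so callers are answered in the order in which they published.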
Full outline
Here is the module in full. But keep in mind that we did not include any fault-tolerance in terms of connections or channels dying. We also do not include exchange declaration.
defmodule MyPublisher do
  use GenServer

  @table_name :pending_confirms

  def start_link(channel) do
    GenServer.start_link(__MODULE__, channel, name: __MODULE__)
  end

  def init(channel) do
    AMQP.Confirm.select(channel)
    AMQP.Confirm.register_handler(channel, self())
    :ets.new(@table_name, [:ordered_set, :private, :named_table])
    {:ok, %{channel: channel}}
  end

  def publish(topic, payload) do
    GenServer.call(__MODULE__, {:publish, topic, payload})
  end

  def handle_call({:publish, topic, payload}, from, state) do
    seqno = AMQP.Confirm.next_publish_seqno(state.channel)

    # Store the caller reference in ETS
    :ets.insert(@table_name, {seqno, from})

    # Perform the actual publishing
    AMQP.Basic.publish(state.channel, "pub_sub", topic, payload, persistent: true)

    {:noreply, state}
  end

  def handle_info({:basic_ack, seqno, multiple}, state) do
    confirm_messages(seqno, multiple, fn from ->
      GenServer.reply(from, :ok)
    end)

    {:noreply, state}
  end

  def handle_info({:basic_nack, seqno, multiple}, state) do
    confirm_messages(seqno, multiple, fn from ->
      GenServer.reply(from, {:error, :nack})
    end)

    {:noreply, state}
  end

  defp confirm_messages(seqno, true = _multiple, reply_fun) do
    :ets.select(@table_name, [
      {{:"$1", :"$2"}, [{:"=<", :"$1", seqno}], [{{:"$1", :"$2"}}]}
    ])
    |> Enum.each(fn {key, from} ->
      reply_fun.(from)
      :ets.delete(@table_name, key)
    end)
  end

  defp confirm_messages(seqno, false, reply_fun) do
    case :ets.take(@table_name, seqno) do
      [{^seqno, from}] -> reply_fun.(from)
      _ -> :ok
    end
  end
end
Conclusion
We have outlined a solution for efficient publisher confirmations. This enables the calling process (the one that invokes MyPublisher.publish/2) to know whether the broker confirmed a published message or not.
The solution is efficient because, even though the call to MyPublisher.publish/2 is synchronous, it does not block calls to the same function from other processes while waiting for confirmation from the broker. Processing of the confirmation is asynchronous.
The power of this solution lies in the perhaps underappreciated fact that synchronous GenServer callbacks (like handle_call/3) do not need to reply immediately. As long as the server keeps track of who called, it can always reply at a later stage, after some async work is complete. Keep the caller's timeout in mind, though: GenServer.call gives up after 5 seconds by default.
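The deferred-reply pattern can be tried without a broker. In this minimal sketch, a hypothetical DeferredReplyDemo server stands in for the publisher, and a message sent with Process.send_after/3 stands in for the broker's confirmation.

```elixir
defmodule DeferredReplyDemo do
  use GenServer

  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, :ok, opts)
  end

  # Blocks only the caller. The server stays free to accept other calls.
  def request(server, value, timeout \\ 5_000) do
    GenServer.call(server, {:request, value}, timeout)
  end

  @impl true
  def init(:ok), do: {:ok, %{pending: %{}}}

  @impl true
  def handle_call({:request, value}, from, state) do
    # Simulate async work that completes 50 ms later.
    ref = make_ref()
    Process.send_after(self(), {:done, ref, value * 2}, 50)

    # Remember who called and return without replying.
    {:noreply, put_in(state.pending[ref], from)}
  end

  @impl true
  def handle_info({:done, ref, result}, state) do
    {from, pending} = Map.pop(state.pending, ref)
    if from, do: GenServer.reply(from, {:ok, result})
    {:noreply, %{state | pending: pending}}
  end
end

{:ok, pid} = DeferredReplyDemo.start_link()

# Three callers wait at the same time; none of them blocks the server.
tasks = for n <- 1..3, do: Task.async(fn -> DeferredReplyDemo.request(pid, n) end)
Enum.map(tasks, &Task.await/1)
# => [{:ok, 2}, {:ok, 4}, {:ok, 6}]
```

If the simulated work took longer than the timeout passed to request/3, the caller would exit with a timeout, which is the situation to plan for when the broker is slow to confirm.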
Finally, if you don't need to support very high throughput, but still want to benefit from the async publisher confirmation handling, you can also use a plain old map instead of an ETS table to match seqno to from.
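A map-based variant could look something like the sketch below. PendingConfirms is a hypothetical helper module, not part of the amqp library; the map it manages would live in the GenServer state next to the channel.

```elixir
defmodule PendingConfirms do
  # A plain map of seqno => from.
  def new, do: %{}

  def track(pending, seqno, from), do: Map.put(pending, seqno, from)

  # Returns {callers_to_reply_to, remaining_pending}.
  # Single confirmation: only the entry for `seqno` is affected.
  def confirm(pending, seqno, false) do
    case Map.pop(pending, seqno) do
      {nil, pending} -> {[], pending}
      {from, pending} -> {[from], pending}
    end
  end

  # Multiple confirmation: every entry up to and including `seqno`.
  def confirm(pending, seqno, true) do
    {confirmed, remaining} =
      Enum.split_with(pending, fn {key, _from} -> key <= seqno end)

    callers =
      confirmed
      |> Enum.sort()
      |> Enum.map(fn {_key, from} -> from end)

    {callers, Map.new(remaining)}
  end
end
```

In handle_info/2 you would call PendingConfirms.confirm/3, reply to each returned caller with GenServer.reply/2, and store the remaining map back in the state. The multiple case scans every pending entry, which is the price for dropping the ordered ETS table.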