DEV Community

Danie Palm


Doing things only once in Elixir

Listen very carefully, I shall say zis only once

It often happens that multiple processes need the result of the same, potentially expensive function. A common solution is to cache the computed value for future callers, but on a cold cache (or when a cached value expires) the function can still be executed many times at once. Such a spike in resource usage is known as the stampeding herd problem (also called a cache stampede or thundering herd). If only the animals would proceed in an orderly fashion.

Elixir's GenServer provides the handle_call callback. A GenServer processes its messages one at a time, so if several processes call the same GenServer, their requests are handled sequentially, and the value can even be cached for the benefit of the next caller. Problem solved.

However, anything that runs within handle_call or any other callback blocks the GenServer, potentially making it appear unresponsive and turning it into a bottleneck. Ever heard of Python's GIL? :)

But what if the GenServer could dish out tasks to be executed as separate processes and keep the execution time of the callback to a minimum? To make this work, we need to cover two concepts.

handle_call doesn't have to reply immediately

You are probably familiar with the anatomy of a typical handle_call implementation:

def handle_call(request, _from, state) do
  # calculate_state/2 is a placeholder that returns a {result, new_state} tuple
  {result, new_state} = calculate_state(request, state)
  {:reply, result, new_state}
end

The :reply in the tuple means that the result will be sent immediately to the caller (the from). But handle_call may also return {:noreply, new_state}, in which case the caller is left hanging until it receives a reply or times out.

To send a reply to a caller after having initially returned {:noreply, new_state}, we have to use GenServer.reply/2, which takes the from and the result to send back. It can be called from within any callback implementation (or, in fact, from any process that holds the from).
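As a minimal sketch of a deferred reply: the module name DeferredReply, the {:echo_later, value} request and the 50 ms delay below are all made up for illustration. The handle_call clause returns {:noreply, state} and the reply is only sent later, from handle_info.

```elixir
defmodule DeferredReply do
  use GenServer

  @impl true
  def init(_opts), do: {:ok, %{}}

  @impl true
  def handle_call({:echo_later, value}, from, state) do
    # Don't reply now. Schedule a message to ourselves that carries the caller.
    Process.send_after(self(), {:reply_now, from, value}, 50)
    {:noreply, state}
  end

  @impl true
  def handle_info({:reply_now, from, value}, state) do
    # The deferred reply: this is what unblocks the pending GenServer.call/3.
    GenServer.reply(from, {:ok, value})
    {:noreply, state}
  end
end

{:ok, pid} = GenServer.start_link(DeferredReply, [])

# The caller blocks for roughly 50 ms, but the server itself stays free
# to handle other messages in the meantime.
result = GenServer.call(pid, {:echo_later, :hello})
IO.inspect(result)
# => {:ok, :hello}
```

From the caller's point of view nothing changes: GenServer.call/3 still returns the value, it just arrives a little later.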

Task is built on message passing

It's good advice to only call Task.async if you are also going to await the result. Typically you would do so with Task.await or some of the other functions available in the Task module. But under the hood these functions rely on receiving the result from the task by means of message passing. That is, they make use of Kernel.SpecialForms.receive/1 to wait for the result to roll in.
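To make the message passing visible, here is a small sketch that receives a task's result by hand instead of calling Task.await. It is only meant to show the shape of the message; in ordinary code you would still use Task.await.

```elixir
task = Task.async(fn -> 21 * 2 end)

# The task's reply is tagged with the monitor reference stored in the struct.
ref = task.ref

result =
  receive do
    {^ref, value} ->
      # We have the result, so stop monitoring and drop any pending :DOWN message.
      Process.demonitor(ref, [:flush])
      value
  after
    1_000 -> :timeout
  end

IO.inspect(result)
# => 42
```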

When you call Task.async from within a GenServer callback, you could await it with Task.await as usual, but for the duration of the computation of the task, the callback would be blocking, preventing the GenServer from handling any other calls. Depending on what you want from your GenServer this may defeat the purpose altogether.

Fortunately, within a GenServer you can await the success or failure of a task as you would await any other message - with a handle_info callback implementation:

def handle_info({ref, result}, state) do
  handle_task_success(ref, result, state)
end

def handle_info({:DOWN, ref, _, _, reason}, state) do
  handle_task_failure(ref, reason, state)
end

Make sure to guard that ref is a reference (when is_reference(ref)), otherwise the first clause will also match unrelated two-element tuple messages.
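To see the two message shapes outside of a GenServer, the sketch below starts tasks with Task.Supervisor.async_nolink and waits for whichever message arrives first. The await_message helper and the :boom exit reason are hypothetical, and the failing task will also log an error.

```elixir
{:ok, sup} = Task.Supervisor.start_link()

# Hypothetical helper: run `fun` as an unlinked task and report what comes back.
await_message = fn fun ->
  task = Task.Supervisor.async_nolink(sup, fun)
  ref = task.ref

  receive do
    # Success: the task sends its result tagged with the monitor reference.
    {^ref, value} ->
      Process.demonitor(ref, [:flush])
      {:ok, value}

    # Failure: the task died before replying, so only the monitor message arrives.
    {:DOWN, ^ref, :process, _pid, reason} ->
      {:error, reason}
  after
    1_000 -> :timeout
  end
end

ok = await_message.(fn -> :fine end)
failed = await_message.(fn -> exit(:boom) end)

IO.inspect({ok, failed})
```

Because the task is not linked to the caller, a crashing task does not take the caller down with it. The caller only learns about the failure through the :DOWN message.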

A solution

So here is a broad overview of a solution to the stampeding herd problem. Processes that are interested in the result of a function call a GenServer to get it. The GenServer then does one of three things. If it already has the result, it replies immediately. If it has not seen the request before, it spawns a Task to compute the result and keeps track of the caller. If a Task for that request is already running, it doesn't spawn another one, but still keeps track of the caller. When the Task finishes, the GenServer notifies all the waiting callers using GenServer.reply.

Here is the whole thing together:

defmodule GenHerder do
  use GenServer

  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, [], Keyword.put(opts, :name, __MODULE__))
  end

  @impl true
  def init(_opts) do
    {:ok, %{}}
  end

  @impl true
  def handle_call(request, from, state) do
    case state[request] do
      nil ->
        task =
          Task.Supervisor.async_nolink(__MODULE__.TaskSupervisor, fn ->
            handle_request(request)
          end)

        {:noreply, Map.put(state, request, {:task, task, [from]})}

      {:task, task, froms} ->
        {:noreply, Map.put(state, request, {:task, task, [from | froms]})}

      {:result, result} ->
        {:reply, result, state}
    end
  end

  defp handle_request(_request) do
    :only_once_kenobi
  end

  @impl true
  def handle_info({ref, result}, state) when is_reference(ref) do
    handle_task_success(ref, result, state)
  end

  @impl true
  def handle_info({:DOWN, ref, _, _, reason}, state) do
    handle_task_failure(ref, reason, state)
  end

  defp handle_task_success(ref, result, state) do
    # The task succeeded so we can cancel the monitoring and discard the DOWN message
    Process.demonitor(ref, [:flush])

    {request, _task_and_froms} =
      Enum.find(state, fn
        {_request, {:task, task, _froms}} -> task.ref == ref
        _ -> false
      end)

    {{:task, _task, froms}, state} = Map.pop(state, request)

    state = Map.put(state, request, {:result, result})

    # Send the result to everyone that asked for it
    for from <- froms do
      GenServer.reply(from, result)
    end

    {:noreply, state}
  end

  defp handle_task_failure(ref, reason, state) do
    {request, _task_and_froms} =
      Enum.find(state, fn
        {_request, {:task, task, _froms}} -> task.ref == ref
        _ -> false
      end)

    {{:task, _task, froms}, state} = Map.pop(state, request)

    # Send the error to everyone that asked for the result
    for from <- froms do
      GenServer.reply(from, {:error, reason})
    end

    {:noreply, state}
  end
end

You have to start it with something like this in a supervision tree:

children = [
  {Task.Supervisor, name: GenHerder.TaskSupervisor},
  GenHerder
]

Supervisor.start_link(children, Keyword.put(opts, :strategy, :one_for_one))

And then you can call it with:

GenServer.call(GenHerder, request, timeout)

Obviously, you can add nice wrappers around it. But a simpler way might be to use my package on Hex. Run mix hex.info gen_herder for more info.

It implements the above with some sugar and also adds optional expiry of results.
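If you would rather roll your own wrapper around GenServer.call, it could look something like the sketch below. HerdClient, its fetch function and the SlowEcho stand-in server are hypothetical names used only for illustration; they are not part of the gen_herder package. The wrapper turns a call timeout into an {:error, :timeout} tuple instead of letting the caller exit.

```elixir
defmodule HerdClient do
  @default_timeout 5_000

  # `server` is the pid or registered name of the process to call.
  def fetch(server, request, timeout \\ @default_timeout) do
    GenServer.call(server, request, timeout)
  catch
    # GenServer.call/3 exits the caller on timeout; convert that to a value.
    :exit, {:timeout, _} -> {:error, :timeout}
  end
end

# Stand-in server for this demo only; in practice `server` would be GenHerder.
defmodule SlowEcho do
  use GenServer

  @impl true
  def init(_opts), do: {:ok, nil}

  @impl true
  def handle_call({:sleep, ms}, _from, state) do
    Process.sleep(ms)
    {:reply, :done, state}
  end

  def handle_call(request, _from, state), do: {:reply, request, state}
end

{:ok, pid} = GenServer.start_link(SlowEcho, [])

fast = HerdClient.fetch(pid, :ping)
slow = HerdClient.fetch(pid, {:sleep, 200}, 50)

IO.inspect({fast, slow})
# => {:ping, {:error, :timeout}}
```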
