DEV Community

Miroslav Malkin
Miroslav Malkin

Posted on

Injecting and Discovering Dependencies in OTP Supervisors

Our task

We need siblings of supervisors to find out about each other. Preferably we also want our supervisor tree to be "hermetic": the possibility to start multiple tree instances with different options.

Example: a pipeline for processing network data which consists of several intercommunicating processes that consume and transform data. We would like to start different pipelines for different data sources in an ad-hoc fashion so they should not interfere with each other and be "hermetic".

There are several ways to do it.

Our options

Hardcoded process name registration

The easiest and most common way is to simply register your processes using global names.

defmodule PipelineSupervisor do
  use Supervisor

  @impl Supervisor
  def init([]) do
    children = [
      {SomeConsumer, name: ConsumerGlobalName}
      {SomeWorker, %{consumer: ConsumerGlobalName}},
    ]

    Supervisor.init(children, strategy: :rest_for_one)
  end
end

defmodule SomeWorker do
  use GenServer

  @impl GenServer
  def init(%{consumer: consumer} = options) do
    {:ok, options}
  end

  @impl GenServer
  def handle_call({:process_data_chunk, data_chunk}, _from, state) do
    {:reply, :ok, do_process_data_chunk(data_chunk, state)}
  end

  defp do_process_data_chunk(data_chunk, state) do
    case parse_data_somehow(data_chunk, state) do
      {:ok, parsed_message, new_state} ->
        Process.send(state.consumer, {:message_received, parsed_message})
        new_state

      {:more_data_necessary, new_state} ->
        new_state
    end
  end
end

defmodule SomeConsumer do
  use GenServer

  def start_link(%{name: name}) do
    GenServer.start_link(__MODULE__, [], name: name)
  end

  @impl GenServer
  def init([]) do
    {:ok, :undefined_state}
  end

  @impl GenServer
  def handle_info({:message_received, parsed_message}, state) do
    # ...
    {:noreply, state}
  end
end
Enter fullscreen mode Exit fullscreen mode

Instead of implicit name registration provided by erlang, you can use some process registry that will serve basically as DI container: a way to register and lookup dependencies (process pid) by some arbitrary id.

These include:

  • Using a well-known and reliable registry, for instance, Elixir.Registry
  • Using some self-written registry that stores its state (process pid mappings) as GenServer State or in ETS

When the process starts, it registers itself. When its pid is required by other processes they ask this DI registry for necessary dependency.

But the problem is still there. It is a chicken and egg dilemma: To call registry you need in turn to know its pid or name.

This solution is easy and common, but it prevents us from running the supervision tree multiple times because of the hardcoded process name of Consumer.

Prefixed process name registration

To make our supervision tree hermetic, we can prefix all processes with some common key and make it an init parameter of our supervisor.

defmodule PipelineSupervisor do
  use Supervisor

  @impl Supervisor
  def init(%{name_prefix: prefix}) do
    consumer_name = Module.concat([__MODULE__, prefix, Consumer])

    children = [
      {SomeConsumer, name: consumer_name}
      {SomeWorker, %{consumer: consumer_name}},
    ]

    Supervisor.init(children, strategy: :rest_for_one)
  end
end
Enter fullscreen mode Exit fullscreen mode

Although it gets the job done, it clutters the process namespace and creates API that might seem awkward ("why do you need me to provide some prefix?").

Implicit registry using Supervisor.which_children function

If we pay closer attention to Supervisor we can notice that any supervisor is basically a registry itself. It naturally knows pids of all its processes and all processes have unique ids local to the supervisor.

Unfortunately (or luckily) supervisors are single-purpose building blocks and therefore lack convenient apis to lookup their children. However, we still have the way to use supervisors as registers using Supervisor.which_children function.

defmodule PipelineSupervisor do
  use Supervisor

  @impl Supervisor
  def init([]) do
    supervisor_pid = self()

    children = [
      SomeConsumer,
      {SomeWorker, %{parent_supervisor: supervisor_pid}},
    ]

    Supervisor.init(children, strategy: :rest_for_one)
  end

  def child_pid!(supervisor_pid, child_id) do
    spec = Supervisor.which_children(supervisor_pid)

    case List.keyfind(spec, child_id, 0) do
      {_id, pid, _type, _modules} when is_pid(pid) ->
        pid

      nil ->
        raise "no started child with id:#{id} in supervisor spec:#{inspect(spec)}"
    end
  end
end

defmodule SomeWorker do
  use GenServer

  @impl GenServer
  def init(%{supervisor_pid: supervisor_pid} = options) do
    {:ok, options}
  end

  defp do_process_data_chunk(data_chunk, state) do
    # ...
    consumer_pid = PipelineSupervisor.child_pid!(state.supervisor_pid, SomeConsumer)
    Process.send(consumer_pid, {:message_received, parsed_message})
    # ...
    end
  end
end

defmodule SomeConsumer do
  use GenServer

  def start_link([]) do
    GenServer.start_link(__MODULE__, [])
  end

end
Enter fullscreen mode Exit fullscreen mode

In some cases calling a supervisor on each message might create a performance bottleneck. You can fix this by calling child_pid! once in Worker.init function and saving Consumer pid in GenServer state.

defmodule SomeWorker do
  use GenServer

  @impl GenServer
  def init(%{supervisor_pid: supervisor_pid} = options) do
    consumer_pid = PipelineSupervisor.child_pid!(state.supervisor_pid, SomeConsumer)
    {:ok, %{consumer_pid: consumer_pid}}
  end

  defp do_process_data_chunk(data_chunk, state) do
    # ...
    Process.send(state.consumer_pid, {:message_received, parsed_message})
    # ...
    end
  end
end
Enter fullscreen mode Exit fullscreen mode

But when we save process pids, we should always be careful, and in our case, think about what would happen if Consumer crashes. In our case, everything will be fine because of the rest_for_one supervisor strategy. If Consumer crashes, its pid saved in Worker will become invalid. But our supervisor is configured with rest_for_one strategy, and Consumer is higher in its children list than Worker. Therefore when Consumer crashes, Worker will also be restarted and will discover pid of new Consumer.

Starting dependencies ad-hoc

Also, we can exploit the fact that Supervisor returns pid of started process in Supervisor.start_child function.

defmodule PipelineSupervisor do
  use Supervisor

  @impl Supervisor
  def init([]) do
    supervisor_pid = self()

    children = [
      {SomeWorker, %{parent_supervisor: supervisor_pid}},
    ]

    Supervisor.init(children, strategy: :rest_for_one)
  end

  def start_consumer do
    Supervisor.start_child(SomeConsumer)
  end
end

defmodule SomeWorker do
  use GenServer

  @impl GenServer
  def init(%{supervisor_pid: supervisor_pid} = options) do
    {:ok, options, {:continue, :start_worker}}
  end

  def handle_continue(:start_worker, state) do
    {:ok, consumer_pid} = PipelineSupervisor.start_consumer()
    {:noreply, Map.put(state, :consumer_pid, consumer_pid)}
  end

  defp do_process_data_chunk(data_chunk, state) do
    # ...
    Process.send(state.consumer_pid, {:message_received, parsed_message})
    # ...
    end
  end
end

defmodule SomeConsumer do
  use GenServer

  def start_link([]) do
    GenServer.start_link(__MODULE__, [])
  end

end
Enter fullscreen mode Exit fullscreen mode

Note that we have to use :handle_continue callback to avoid deadlocking the supervisor. Otherwise, it would not be able to reply to our start_child call because it would be waiting init function to return.

This method of DI is convoluted and probably not worth it just for the sake of itself. But it can make more sense in more complicated scenarios where Worker instead of just sending messages to Consumer also needs to manage its lifetime (kill or restart).

Conclusion

Some of the options that I showed may seem complex and unnecessary. And in many cases that is entirely true. But my point is that using OTP is much more than just writing GenServers and occasionally some custom Supervisor or two. OTP gives us a pretty basic but very smart combination of base elements that we can use to form some higher-level usage patterns. Our task as a community is to discover those patterns and make them simple to use.

To find out more about supervisors and their usage patterns look at this great blogpost Using Supervisors to Organize Your Elixir Application

About the author

I am Miroslav Malkin, Erlang/Elixir expert at FunBox. I am also co-founder of CogitoΣ, where we investigate code and process quality and measure their impact on product success.

Discussion (1)

Collapse
ewildgoose profile image
Ed Wildgoose

Fantastic article! Thanks!

I also think the "Parent" library has some nice abstractions for the kinds of challenges you address here?