DEV Community

Alex Reichert

Learning Elixir's GenServer with a real-world example

I started learning Elixir a few months ago, mostly through hacking on Papercups. I'm ashamed to say most of my Elixir education has been through trial and error, figuring things out as I go along. So this past week I decided to take some time off from Papercups to go a bit deeper into Elixir.

In particular, I was itching to learn more about handling concurrency in Elixir. This, of course, led me to GenServers.

Anyone who's moderately familiar with Elixir has probably at least heard of GenServer. (If you haven't, that's ok too!) Strictly speaking, it's one of those things you can get away with avoiding for a while, and still be reasonably productive while using something like the Phoenix framework.

But it's an incredibly useful feature of the language that you can add to your toolbox! There are so many nice things you can do with it. For example, with GenServers, you can:

  • create a simple cache, buffer, or rate limiter
  • set up recurring tasks to run every few hours (e.g. check out Jose's answer to "How can I schedule code to run every few hours in Elixir?" on Stack Overflow)
  • handle async processes much more easily, with built-in functionality for tracing, error reporting, and retry logic

...all without any external dependencies!

What is a GenServer, anyway?

A GenServer is a process, just like any other Elixir process. It can be used to manage state and execute code asynchronously, and includes some handy functionality around tracing and error reporting.

Another way of thinking about it — a GenServer is like an isolated little box, with a few things inside of it:

  • its internal state (which can be any Elixir data structure, from a simple number to a complex map/struct),
  • a mailbox to process incoming messages (in the order they're received),
  • and a callback module, which defines the code to run depending on the message that is received in the mailbox
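To make the "little box" idea a bit more concrete, here's a minimal sketch of a GenServer. The `Counter` module and its `:increment`/`:get` messages are made up for illustration (they're not from Plausible); the point is just to show the state, the mailbox ordering, and the callback module working together.

```elixir
defmodule Counter do
  use GenServer

  # The callback module: each clause says what to do for one kind of message.

  # The internal state here is just an integer.
  @impl true
  def init(initial_count), do: {:ok, initial_count}

  # Asynchronous message: update the state, send nothing back.
  @impl true
  def handle_cast(:increment, count), do: {:noreply, count + 1}

  # Synchronous message: reply with the current state.
  @impl true
  def handle_call(:get, _from, count), do: {:reply, count, count}
end

{:ok, pid} = GenServer.start_link(Counter, 0)

# Messages from this process queue up in the mailbox and are handled
# one at a time, in the order they were sent.
GenServer.cast(pid, :increment)
GenServer.cast(pid, :increment)
count = GenServer.call(pid, :get)
IO.inspect(count)
```

Because the two casts are sent before the call, both increments are processed before the `:get` request is answered.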

If that's confusing, hopefully a real-world example will help clarify things! 😉

Finding a good real-world example

While learning about GenServers, I found many of the examples to be a bit contrived... a lot of "key-value" stores, shopping lists, and things like that. I wanted to understand how GenServers are being used "out in the wild", so I turned to one of my favorite Elixir open-source repos: https://github.com/plausible/analytics

A bit of background: Plausible is an open-source analytics platform, with >1000 paying customers. Because they are a real-world production application serving many users, I figured I could learn a lot from dissecting one of their modules. 🤓

For the sake of this post, I'm going to go through one of the simpler GenServers in their repo: Plausible.Event.WriteBuffer, which can be found at /lib/plausible/event/write_buffer.ex

What does it do?

At a high level, this GenServer allows Plausible to insert large quantities of events into the database in batches, rather than doing an insertion for each individual event. (This seems particularly important for a product like Plausible's, where they are potentially ingesting thousands of events per second!)

At the code level, what this process is doing is essentially:

  • ingesting events via the insert/1 method
  • adding them to a buffer in the process's internal state, represented by a list (i.e. [])
  • "flushing" the buffer every 5 seconds, or if it reaches its capacity of 10,000 events — whichever comes first
    • where "flushing" means saving all the events in the buffer to a database (in this case, ClickHouse)

(For the full context, let's take a look at the codebase: we can see that the Plausible.Event.WriteBuffer GenServer is used in the PlausibleWeb.Api.ExternalController.event/2 method, which handles the logic for the POST /api/event API endpoint, which in turn gets called from their JavaScript tracking snippet.)

The code

Before we jump into it, here's the full module, with some minor stylistic tweaks and annotations I've added myself for some additional context:

defmodule Plausible.Event.WriteBuffer do
  use GenServer
  require Logger

  @flush_interval_ms 5_000
  @max_buffer_size 10_000

  # Client APIs

  def start_link(_opts) do
    GenServer.start_link(__MODULE__, [], name: __MODULE__)
  end

  def insert(event) do
    GenServer.cast(__MODULE__, {:insert, event})

    {:ok, event}
  end

  def flush() do
    GenServer.call(__MODULE__, :flush, :infinity)

    :ok
  end

  # Server (callbacks)

  @impl true
  def init(buffer) do
    Process.flag(:trap_exit, true)
    timer = Process.send_after(self(), :tick, @flush_interval_ms)

    {:ok, %{buffer: buffer, timer: timer}}
  end

  @impl true
  def handle_cast({:insert, event}, %{buffer: buffer, timer: timer} = state) do
    new_buffer = [event | buffer]

    if length(new_buffer) >= @max_buffer_size do
      Logger.info("Buffer full, flushing to disk")
      Process.cancel_timer(timer)
      do_flush(new_buffer)
      new_timer = Process.send_after(self(), :tick, @flush_interval_ms)

      {:noreply, %{buffer: [], timer: new_timer}}
    else
      {:noreply, %{state | buffer: new_buffer}}
    end
  end

  @impl true
  def handle_info(:tick, %{buffer: buffer} = _state) do
    do_flush(buffer)
    timer = Process.send_after(self(), :tick, @flush_interval_ms)

    {:noreply, %{buffer: [], timer: timer}}
  end

  @impl true
  def handle_call(:flush, _from, %{buffer: buffer, timer: timer} = _state) do
    Process.cancel_timer(timer)
    do_flush(buffer)
    new_timer = Process.send_after(self(), :tick, @flush_interval_ms)

    {:reply, nil, %{buffer: [], timer: new_timer}}
  end

  @impl true
  def terminate(_reason, %{buffer: buffer} = _state) do
    Logger.info("Flushing event buffer before shutdown...")
    do_flush(buffer)
  end

  # Private/utility methods

  defp do_flush(buffer) do
    case buffer do
      [] ->
        nil

      events ->
        Logger.info("Flushing #{length(events)} events")
        events = Enum.map(events, &(Map.from_struct(&1) |> Map.delete(:__meta__)))
        Plausible.ClickhouseRepo.insert_all(Plausible.ClickhouseEvent, events)
    end
  end
end

Note: If you're able to read the code above and know exactly what's going on at every step, you probably don't need to read any further!

Let's start off where the core of the logic actually lives: in the GenServer callbacks.

The GenServer callbacks

The callbacks implemented in this particular module are:

  • init/1 (required) - invoked when the server is started, and sets the initial state
  • handle_cast/2 - invoked when GenServer.cast/2 is called, to handle messages asynchronously
  • handle_call/3 - invoked when GenServer.call/3 is called, to handle messages synchronously
    • GenServer.call/3 will block until a reply is received (unless the call times out or nodes are disconnected)
  • handle_info/2 - invoked to handle all other messages (i.e. outside of those triggered by GenServer.call/3 and GenServer.cast/2, and "system" messages)
    • e.g. messages/events handled internally within the GenServer
  • terminate/2 - invoked when the GenServer is about to exit

Let's go through each one! 🚀

The init/1 callback

The init/1 callback is invoked when the GenServer is started, and handles setting the initial internal state of the server process.

[Figure: init callback diagram]

Here's the method definition from above:

  @impl true
  def init(buffer) do
    Process.flag(:trap_exit, true)
    timer = Process.send_after(self(), :tick, @flush_interval_ms)

    {:ok, %{buffer: buffer, timer: timer}}
  end

The first line of this method (Process.flag(:trap_exit, true)) sets us up to "trap exits". This allows us to handle any "clean-up" tasks before the process terminates, in the terminate/2 callback. (We'll discuss this below!)

Next, we set up a timer to send the :tick message to ourselves in 5 seconds (@flush_interval_ms == 5_000). As we'll see below, the :tick handler also schedules the next :tick each time it runs, so that it effectively runs every 5 seconds while the server is alive. (That explains why it's called "tick"!)

Finally, we store the initial buffer (in this case, an empty list, i.e. []) and the timer in the internal state.
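If the timer functions are new to you, here's a small standalone sketch of how `Process.send_after/3` and `Process.cancel_timer/1` behave. It runs in a plain script process rather than a GenServer, and the short delays are just chosen to keep the example quick.

```elixir
# Ask the runtime to deliver :tick to this process after 50 ms.
Process.send_after(self(), :tick, 50)

first =
  receive do
    :tick -> :got_tick
  after
    1_000 -> :no_tick
  end

# A timer that is cancelled before it fires never delivers its message.
# cancel_timer/1 returns the milliseconds that were left on the timer
# (or false if it had already fired or did not exist).
timer = Process.send_after(self(), :tick, 5_000)
remaining = Process.cancel_timer(timer)

second =
  receive do
    :tick -> :got_tick
  after
    100 -> :no_tick
  end

IO.inspect({first, second})
```

Inside a GenServer, the `:tick` message isn't picked up with `receive`; it arrives in the `handle_info/2` callback instead.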

The handle_cast/2 callback

The handle_cast/2 callback is invoked when GenServer.cast/2 is called, to handle asynchronous requests.

[Figure: handle_cast callback diagram]

Here's the method definition from above:

  @impl true
  def handle_cast({:insert, event}, %{buffer: buffer, timer: timer} = state) do
    new_buffer = [event | buffer]

    if length(new_buffer) >= @max_buffer_size do
      Logger.info("Buffer full, flushing to disk")
      Process.cancel_timer(timer)
      do_flush(new_buffer)
      new_timer = Process.send_after(self(), :tick, @flush_interval_ms)

      {:noreply, %{buffer: [], timer: new_timer}}
    else
      {:noreply, %{state | buffer: new_buffer}}
    end
  end

All this is doing is:

  • Inserting a new event into the internal state's buffer, by prepending it to the list
  • Checking if the buffer has reached capacity, and if so:
    • cancelling the current timer to prevent the next automatic flush from happening
    • performing the flush operation manually with do_flush/1
    • setting up a new timer which will trigger the next automatic flush in 5 seconds
    • resetting the internal state's buffer to an empty list, and updating the timer as well
  • If the buffer is not at capacity, simply updating the internal state's buffer

The reason this is handled in a handle_cast/2 callback rather than a handle_call/3 callback is so that the client can execute this asynchronously, without blocking on the completion of a potential do_flush/1 method invocation. (As we'll see below, the "flushing"/saving of 10,000 events is a potentially expensive operation that could take a few seconds to complete.)
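To see the difference from the caller's side, here's a rough sketch that times a call against a cast. `SlowServer` is a hypothetical module, and the 200 ms sleep just stands in for an expensive flush.

```elixir
defmodule SlowServer do
  use GenServer

  @impl true
  def init(state), do: {:ok, state}

  # Both handlers simulate 200 ms of slow work (standing in for a big flush).
  @impl true
  def handle_cast(:work, state) do
    Process.sleep(200)
    {:noreply, state}
  end

  @impl true
  def handle_call(:work, _from, state) do
    Process.sleep(200)
    {:reply, :done, state}
  end
end

{:ok, pid} = GenServer.start_link(SlowServer, nil)

# :timer.tc/1 returns {elapsed_microseconds, result}.
{call_us, :done} = :timer.tc(fn -> GenServer.call(pid, :work) end)
{cast_us, :ok} = :timer.tc(fn -> GenServer.cast(pid, :work) end)

IO.puts("call took ~#{div(call_us, 1000)} ms, cast took ~#{div(cast_us, 1000)} ms")
```

The call only returns once the handler has finished, while the cast returns `:ok` as soon as the message has been sent; the slow work still happens, just inside the server process.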

Let's take a quick look at the do_flush/1 method to see what's going on there:

  defp do_flush(buffer) do
    case buffer do
      [] ->
        nil

      events ->
        Logger.info("Flushing #{length(events)} events")
        events = Enum.map(events, &(Map.from_struct(&1) |> Map.delete(:__meta__)))
        Plausible.ClickhouseRepo.insert_all(Plausible.ClickhouseEvent, events)
    end
  end

This method is just standard Elixir. All it's doing is taking the events in the buffer, formatting them into plain maps (rather than structs), and then persisting them to the database (ClickHouse). (If we wanted a slightly more explicit name for this method, we could call it something like save_to_clickhouse/1 instead.)

Note that if we wanted to save some whitespace, this could also be written like:

  defp do_flush([]), do: nil

  defp do_flush(events) do
    Logger.info("Flushing #{length(events)} events")
    events = Enum.map(events, &(Map.from_struct(&1) |> Map.delete(:__meta__)))
    Plausible.ClickhouseRepo.insert_all(Plausible.ClickhouseEvent, events)
  end

Hooray for pattern matching!
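As a quick illustration of the struct-to-map step in do_flush/1, here's a sketch using a made-up `ExampleEvent` struct. It's only a stand-in for an Ecto schema struct (which carries a `:__meta__` field alongside the real columns), and the field names are assumptions, not Plausible's actual schema.

```elixir
defmodule ExampleEvent do
  # Hypothetical stand-in for an Ecto schema struct.
  defstruct [:name, :domain, __meta__: :ecto_metadata_placeholder]
end

events = [
  struct!(ExampleEvent, name: "pageview", domain: "example.com"),
  struct!(ExampleEvent, name: "signup", domain: "example.com")
]

# Same transformation as in do_flush/1: drop the struct tag, then the metadata.
rows = Enum.map(events, &(Map.from_struct(&1) |> Map.delete(:__meta__)))
IO.inspect(rows)
```

What's left is a list of plain maps containing only the column values, which is the shape that `insert_all` is given in the module above.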

The handle_call/3 callback

The handle_call/3 callback is invoked when GenServer.call/3 is called, to handle synchronous messages. Note that GenServer.call/3 will block until a reply is received (unless the call times out or nodes are disconnected).

[Figure: handle_call callback diagram]

Here's the method definition from above:

  @impl true
  def handle_call(:flush, _from, %{buffer: buffer, timer: timer} = _state) do
    Process.cancel_timer(timer)
    do_flush(buffer)
    new_timer = Process.send_after(self(), :tick, @flush_interval_ms)

    {:reply, nil, %{buffer: [], timer: new_timer}}
  end

This code looks quite similar to the logic above, in the handle_cast/2 callback. In this case, we're allowing the :flush event to be called manually, which takes whatever is currently in the state's buffer and saves it to the database (in the do_flush/1 method). Once again, the buffer is reset to an empty list, and the timer is reset as well.

The handle_info/2 callback

The handle_info/2 callback is invoked to handle all other messages (i.e. outside of those triggered by GenServer.call/3 and GenServer.cast/2). In our case, we use it to handle messages passed around internally within the GenServer itself.

[Figure: handle_info callback diagram]

Here's the method definition from above:

  @impl true
  def handle_info(:tick, %{buffer: buffer} = _state) do
    do_flush(buffer)
    timer = Process.send_after(self(), :tick, @flush_interval_ms)

    {:noreply, %{buffer: [], timer: timer}}
  end

The :tick handler flushes the events in our buffer at a regular interval. (In this case, every 5 seconds.)

It does this by scheduling the next :tick message to itself each time it runs, with Process.send_after(self(), :tick, @flush_interval_ms).
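Stripped of the buffer logic, the self-rescheduling pattern looks roughly like this. `Ticker` is a hypothetical module that just counts its ticks, and the 20 ms interval is only there so the example finishes quickly.

```elixir
defmodule Ticker do
  use GenServer

  # Short interval for the example (the write buffer uses 5_000).
  @interval_ms 20

  def start_link(_opts), do: GenServer.start_link(__MODULE__, 0)

  def count(pid), do: GenServer.call(pid, :count)

  @impl true
  def init(count) do
    # Schedule the very first tick.
    Process.send_after(self(), :tick, @interval_ms)
    {:ok, count}
  end

  # Each :tick does its work, then schedules the next :tick.
  @impl true
  def handle_info(:tick, count) do
    Process.send_after(self(), :tick, @interval_ms)
    {:noreply, count + 1}
  end

  @impl true
  def handle_call(:count, _from, count), do: {:reply, count, count}
end

{:ok, pid} = Ticker.start_link([])
Process.sleep(200)
ticks = Ticker.count(pid)
IO.puts("ticks so far: #{ticks}")
```

If a handler ever forgot to schedule the next message, the ticking would simply stop, which is why every code path in the write buffer that cancels the timer also creates a new one.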

The terminate/2 callback

This callback is invoked when the server is about to exit, and can handle any clean-up tasks.

Here's the method definition from above:

  @impl true
  def terminate(_reason, %{buffer: buffer} = _state) do
    Logger.info("Flushing event buffer before shutdown...")
    do_flush(buffer)
  end

By trapping exits in the init/1 callback above, we can ensure that before this GenServer process terminates, we flush anything that's left in our buffer. Otherwise, we might end up in a situation where the process is terminated before some of the events in memory have a chance to get saved to the database, and they would be lost.
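Here's a small sketch of that shutdown path, assuming the process runs under a supervisor. `DrainOnExit` is a hypothetical module: instead of writing to a database, its terminate/2 reports the leftover buffer back to the process that started it, so we can observe that the callback really ran.

```elixir
defmodule DrainOnExit do
  use GenServer

  def start_link(notify_pid) do
    GenServer.start_link(__MODULE__, notify_pid)
  end

  @impl true
  def init(notify_pid) do
    # Trap exits so a shutdown signal from the supervisor leads to terminate/2.
    Process.flag(:trap_exit, true)
    {:ok, %{notify: notify_pid, buffer: [:a, :b]}}
  end

  @impl true
  def terminate(reason, %{notify: pid, buffer: buffer}) do
    # Stand-in for do_flush/1: hand the leftover events to the observer.
    send(pid, {:flushed_on_exit, reason, buffer})
  end
end

{:ok, sup} = Supervisor.start_link([{DrainOnExit, self()}], strategy: :one_for_one)

# Stopping the supervisor shuts down its children first.
:ok = Supervisor.stop(sup)

result =
  receive do
    {:flushed_on_exit, reason, buffer} -> {reason, buffer}
  after
    1_000 -> :terminate_not_called
  end

IO.inspect(result)
```

Note that terminate/2 is a best-effort hook: it won't run if the process is killed brutally (e.g. with a `:kill` exit signal), so a few in-memory events can still be lost in that case.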

The GenServer Client APIs

The APIs exposed to the client are mostly boilerplate, and pretty straightforward:

  def start_link(_opts) do
    GenServer.start_link(__MODULE__, [], name: __MODULE__)
  end

  def insert(event) do
    GenServer.cast(__MODULE__, {:insert, event})

    {:ok, event}
  end

  def flush() do
    GenServer.call(__MODULE__, :flush, :infinity)

    :ok
  end

Let's break it down!

Plausible.Event.WriteBuffer.start_link/1

This is probably the least intuitive of the three methods above — once we understand the code here, the rest should more or less fall into place.

  def start_link(_opts) do
    GenServer.start_link(__MODULE__, [], name: __MODULE__)
  end

As you might guess, the start_link/1 method is responsible for, well, starting the GenServer process. All it's doing is calling start_link/3 on GenServer itself, with a few strange arguments:

  • __MODULE__, which is simply an alias for the module itself (in this case, Plausible.Event.WriteBuffer). This first argument passes the Plausible.Event.WriteBuffer in as the callback module, so that we can use the server callbacks we implemented above when certain messages are passed to the server process via GenServer.call and GenServer.cast.
    • Note that calling GenServer.start_link(Plausible.Event.WriteBuffer, [], name: Plausible.Event.WriteBuffer) would result in identical behavior. The main advantage of using __MODULE__ is if we decide to rename the module, we only have to update it in one place!
  • An empty list (i.e. []) is passed in as the second argument. This is passed to our init/1 callback as the initial buffer, as we saw in the init/1 section above.
  • The last argument is a keyword list of options. Here, we assign the process a name with name: __MODULE__ (which is the same as name: Plausible.Event.WriteBuffer). This allows us to pass messages to the GenServer using the module name (i.e. GenServer.call(__MODULE__, message, timeout)/GenServer.cast(__MODULE__, message)). Otherwise, after starting the GenServer, we would have to keep a reference to its process ID (or "PID", which is returned by start_link/1), and then pass it in explicitly. (Registering the process under its module name like this is a very common practice.)

(GenServer also has a method called simply start/3, which takes the same arguments as start_link/3 — the only difference is that start_link/3 "links" the GenServer process to the current process, which is useful when we want to start the GenServer as part of a "supervision tree". By starting a process with start_link under a Supervisor, it allows the application to monitor the process for any errors/crashes, and automatically restart the process if necessary.)
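To show what the name: option buys us, here's a short sketch with a hypothetical `NamedStore` module. Once the process is registered under its module name, it can be addressed either by that name or by its PID.

```elixir
defmodule NamedStore do
  use GenServer

  def start_link(_opts), do: GenServer.start_link(__MODULE__, [], name: __MODULE__)

  @impl true
  def init(items), do: {:ok, items}

  @impl true
  def handle_call({:add, item}, _from, items), do: {:reply, :ok, [item | items]}
  def handle_call(:all, _from, items), do: {:reply, Enum.reverse(items), items}
end

{:ok, pid} = NamedStore.start_link([])

# The registered name and the PID refer to the same process.
same_process? = Process.whereis(NamedStore) == pid

# Addressed by name, then by PID.
:ok = GenServer.call(NamedStore, {:add, :a})
:ok = GenServer.call(pid, {:add, :b})

items = GenServer.call(NamedStore, :all)
IO.inspect({same_process?, items})
```

One trade-off to keep in mind: a name can only be registered once, so this approach fits processes that exist as a single instance, like the write buffer.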

In practice, this is where the process is started: https://github.com/plausible/analytics/blob/master/lib/plausible/application.ex

Here's the relevant code below:

defmodule Plausible.Application do
  use Application

  def start(_type, _args) do
    children = [
      # ...
      Plausible.Event.WriteBuffer,
      # ...
    ]

    opts = [strategy: :one_for_one, name: Plausible.Supervisor]
    # ...
    Supervisor.start_link(children, opts)
  end

  # ...
end

GenServer's start_link/3 method is what enables us to start the process under this Supervisor. (When we include Plausible.Event.WriteBuffer amongst the children of the supervisor, our module's start_link/1 method, which in turn calls GenServer.start_link/3, is invoked automatically when Supervisor.start_link/2 is called.)
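Here's a rough sketch of the restart behavior we get from running under a supervisor. `Restartable` is a hypothetical module, and the 50 ms sleep is just a crude way of giving the supervisor a moment to restart the child in a script (you may also see a supervisor error report logged when the child is killed).

```elixir
defmodule Restartable do
  use GenServer

  def start_link(_opts), do: GenServer.start_link(__MODULE__, [], name: __MODULE__)

  @impl true
  def init(state), do: {:ok, state}
end

# Listing the module as a child makes the supervisor call Restartable.start_link/1.
{:ok, _sup} = Supervisor.start_link([Restartable], strategy: :one_for_one)

first_pid = Process.whereis(Restartable)

# Simulate a crash and wait until the process is really gone.
ref = Process.monitor(first_pid)
Process.exit(first_pid, :kill)

receive do
  {:DOWN, ^ref, :process, _pid, _reason} -> :ok
end

# Give the supervisor a moment to start the replacement.
Process.sleep(50)
second_pid = Process.whereis(Restartable)

IO.inspect({first_pid, second_pid})
```

The replacement is a brand new process with a fresh state, so anything the old one held only in memory is gone after a crash like this.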

Hopefully at this point, we have a pretty good understanding of what's going on in Plausible.Event.WriteBuffer.start_link/1! Now all that's left to cover are the two methods exposed in our client API: insert/1 and flush/0

Plausible.Event.WriteBuffer.insert/1

One last time, here's the insert/1 method:

  def insert(event) do
    GenServer.cast(__MODULE__, {:insert, event})

    {:ok, event}
  end

Here we see that all this is doing is taking an event and sending an asynchronous request to our GenServer via the cast/2 method. (Since we're using cast/2 here instead of call/3, we know that this is a non-blocking call which will return immediately, regardless of what happens in the callback.)

The first argument of GenServer.cast/2 takes a process ID (i.e. PID) or a registered server name. In this case, we set the name to __MODULE__ in the 3rd argument of our GenServer.start_link/3 above, which is why we're using that here. (This is a very common practice with GenServers.)

The second argument is the "message" we want to send, often in the form of a tuple or an atom. Here we're sending {:insert, event}, which will be handled in the handle_cast({:insert, event}, ...) callback discussed above.

Since we don't wait for a return value, the insert/1 method just returns an echo of the event that was passed in, in the form of {:ok, event}.

Plausible.Event.WriteBuffer.flush/0

We're almost done! Let's take a look at the flush/0 method:

  def flush() do
    GenServer.call(__MODULE__, :flush, :infinity)

    :ok
  end

This is quite similar to what we saw in insert/1. The only differences are:

  • GenServer.call/3, unlike GenServer.cast/2, blocks the calling process until the callback completes. So if the :flush event ends up taking 10 seconds to execute, the flush/0 method will not return :ok until 10 seconds have passed.
  • call/3 takes a third argument, which we see here is :infinity. This represents the maximum amount of time we allow the method to take before a timeout error occurs. By default, this is set to 5_000, or 5 seconds. By passing in :infinity, we're allowing the process to take as long as it needs to complete.
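To illustrate the timeout argument, here's a sketch with a hypothetical `SleepyServer` whose :flush handler takes about 300 ms. With a short timeout the caller exits (which we catch here), while :infinity simply waits.

```elixir
defmodule SleepyServer do
  use GenServer

  @impl true
  def init(state), do: {:ok, state}

  # Simulates a flush that takes 300 ms.
  @impl true
  def handle_call(:flush, _from, state) do
    Process.sleep(300)
    {:reply, :flushed, state}
  end
end

# GenServer.start/3 (no link), so the caller is not tied to this process.
{:ok, pid} = GenServer.start(SleepyServer, nil)

# With a 50 ms timeout, the caller gives up and exits; we catch the exit here.
short =
  try do
    GenServer.call(pid, :flush, 50)
  catch
    :exit, {:timeout, _details} -> :timed_out
  end

# With :infinity, the caller waits as long as the server needs.
long = GenServer.call(pid, :flush, :infinity)

IO.inspect({short, long})
```

A timeout only affects the caller: the server keeps working through the first :flush request even after the caller has given up on it, and then handles the second one.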

Reviewing it all

Let's take one last look at the full module definition, to double check that we understand everything that's going on. I've added some comments in the code to help out a bit. 😉

defmodule Plausible.Event.WriteBuffer do
  use GenServer
  require Logger

  # Flush/save every 5 seconds
  @flush_interval_ms 5_000

  # Allow a maximum of 10,000 events in our buffer list
  @max_buffer_size 10_000

  ############################################################################
  # Client APIs
  ############################################################################

  @doc """
  Starts a linked GenServer process, passing in the current module
  (`__MODULE__` == `Plausible.Event.WriteBuffer`) as both the callback module
  and the alias/name of the server process for future reference.

  We also pass in an empty list (`[]`) as the initial value of the internal
  state's `buffer` (handled in the `init/1` callback below).
  """
  def start_link(_opts) do
    GenServer.start_link(__MODULE__, [], name: __MODULE__)
  end

  @doc """
  Sends an `:insert` message asynchronously to the current module's GenServer
  process, which adds the provided `event` to the internal state's `buffer`.
  """
  def insert(event) do
    GenServer.cast(__MODULE__, {:insert, event})

    {:ok, event}
  end

  @doc """
  Sends a (synchronous) `:flush` message to the current module's GenServer
  process, which manually "flushes" all events in the internal state's `buffer`
  to the database.

  Sets the `timeout` option to `:infinity`, allowing this call to take as long
  as it needs to complete.
  """
  def flush() do
    GenServer.call(__MODULE__, :flush, :infinity)

    :ok
  end

  ############################################################################
  # Server callbacks
  ############################################################################

  @doc """
  Sets up the initial state of the GenServer, and traps exits so that we can
  gracefully handle a process termination in our `terminate/2` callback below.
  """
  @impl true
  def init(buffer) do
    Process.flag(:trap_exit, true)
    timer = Process.send_after(self(), :tick, @flush_interval_ms)

    {:ok, %{buffer: buffer, timer: timer}}
  end

  @doc """
  Callback that handles `:insert` messages passed to `GenServer.cast/2`.

  In this callback, we add the new event to the internal state's `buffer`.
  If we reach the maximum capacity of the `buffer`, we "flush" it to the database
  and reset our state. Otherwise, we simply update the internal state with the
  updated `buffer`.
  """
  @impl true
  def handle_cast({:insert, event}, %{buffer: buffer, timer: timer} = state) do
    new_buffer = [event | buffer]

    if length(new_buffer) >= @max_buffer_size do
      Logger.info("Buffer full, flushing to disk")
      Process.cancel_timer(timer)
      do_flush(new_buffer)
      new_timer = Process.send_after(self(), :tick, @flush_interval_ms)

      {:noreply, %{buffer: [], timer: new_timer}}
    else
      {:noreply, %{state | buffer: new_buffer}}
    end
  end

  @doc """
  Callback that handles internal `:tick` messages.

  In this callback, we automatically flush whatever is in the `buffer`
  every 5 seconds (i.e. `@flush_interval_ms`).
  """
  @impl true
  def handle_info(:tick, %{buffer: buffer} = _state) do
    do_flush(buffer)
    timer = Process.send_after(self(), :tick, @flush_interval_ms)

    {:noreply, %{buffer: [], timer: timer}}
  end

  @doc """
  Callback that handles `:flush` messages passed to `GenServer.call/3`.

  In this callback, we handle manually flushing whatever is currently in
  the internal state's `buffer` to the database.
  """
  @impl true
  def handle_call(:flush, _from, %{buffer: buffer, timer: timer} = _state) do
    Process.cancel_timer(timer)
    do_flush(buffer)
    new_timer = Process.send_after(self(), :tick, @flush_interval_ms)

    {:reply, nil, %{buffer: [], timer: new_timer}}
  end

  @doc """
  Callback that gets invoked when the process is about to shut down.

  In this callback, we make sure to flush whatever is currently in the internal
  state's `buffer` to the database before the process terminates.
  """
  @impl true
  def terminate(_reason, %{buffer: buffer} = _state) do
    Logger.info("Flushing event buffer before shutdown...")
    do_flush(buffer)
  end

  ############################################################################
  # Private/utility methods
  ############################################################################

  # Perform a "flush" of the events in the `buffer` to the database. By "flush",
  # we simply mean we do a bulk insertion of the events into ClickHouse (the db).
  # (A plain comment is used here because `@doc` is discarded for private functions.)
  defp do_flush(buffer) do
    case buffer do
      [] ->
        nil

      events ->
        Logger.info("Flushing #{length(events)} events")
        events = Enum.map(events, &(Map.from_struct(&1) |> Map.delete(:__meta__)))
        Plausible.ClickhouseRepo.insert_all(Plausible.ClickhouseEvent, events)
    end
  end
end

If you still don't understand something or think I missed something, feel free to reach out to me on our public Slack channel. 😬

That's all for now!

If you're interested in contributing to an open source Elixir project, come check out Papercups on GitHub!
