I started learning Elixir a few months ago, mostly through hacking on Papercups. I'm ashamed to say most of my Elixir education has been through trial and error, figuring things out as I go along. So this past week I decided to take some time off from Papercups to go a bit deeper into Elixir.
In particular, I was itching to learn more about handling concurrency in Elixir. This, of course, led me to GenServers.
Anyone who's moderately familiar with Elixir has probably at least heard of GenServer
. (If you haven't, that's ok too!) Strictly speaking, it's one of those things you can get away with avoiding for a while, and still be reasonably productive while using something like the Phoenix framework.
But it's an incredible useful feature of the language that you can add to your toolbox! There are so many nice things you can do with it. For example, with GenServers, you can:
- create a simple cache, buffer, or rate limiter
- set up recurring tasks to run every few hours (e.g. check out Jose's answer to "How can I schedule code to run every few hours in Elixir?" on Stack Overflow)
- handle async processes much more easily, with built-in functionality for tracing, error reporting, and retry logic
...all without any external dependencies!
What is a GenServer, anyway?
A GenServer is a process, just like any other Elixir process. It can be used to manage state and execute code asynchronously, and includes some handy functionality around tracing and error reporting.
Another way of thinking about it — a GenServer is like an isolated little box, with a few things inside of it:
- its internal state (which can be any Elixir data structure, from a simple number to a complex map/struct),
- a mailbox to process incoming messages (in the order they're received),
- and a callback module, which defines the code to run depending on the message that is received in the mailbox
If that's confusing, hopefully a real-world example will help clarify things! 😉
Finding a good real-world example
While learning about GenServers, I found many of the examples to be a bit contrived... a lot of "key-value" stores, shopping lists, and things like that. I wanted to understand how GenServers are being used "out in the wild", so I turned to one of my favorite Elixir open-source repos: https://github.com/plausible/analytics
A bit of background: Plausible is an open-source analytics platform, with >1000 paying customers. Because they are a real-world production application serving many users, I figured I could learn a lot from diagnosing one of their modules. 🤓
For the sake of this post, I'm going to go through one of the simpler GenServers in their repo: Plausible.Event.WriteBuffer, which can be found at /lib/plausible/event/write_buffer.ex
What does it do?
At a high level, this GenServer allows Plausible to insert large quantities of events into the database in batches, rather than doing an insertion for each individual event. (This seems particularly important for a product like Plausible's, where they are potentially ingesting thousands of events per second!)
At the code level, what this process is doing is essentially:
- ingesting events via the
insert/1
method - adding them to a
buffer
in the process's internal state, represented by a list (i.e.[]
) - "flushing" the buffer every 5 seconds, or if it reaches its capacity of 10,000 events — whichever comes first
- where "flushing" means saving all the events in the buffer to a database (in this case, ClickHouse)
(For the full context, let's take a look at the codebase: we can see that the Plausible.Event.WriteBuffer
GenServer is used in the PlausibleWeb.Api.ExternalController.event/2
method, which handles the logic for the POST /api/event
API endpoint, which in turn gets called from their JavaScript tracking snippet.)
The code
Before we jump into it, here's the full module, with some minor stylistic tweaks and annotations I've added myself for some additional context:
defmodule Plausible.Event.WriteBuffer do
use GenServer
require Logger
@flush_interval_ms 5_000
@max_buffer_size 10_000
# Client APIs
def start_link(_opts) do
GenServer.start_link(__MODULE__, [], name: __MODULE__)
end
def insert(event) do
GenServer.cast(__MODULE__, {:insert, event})
{:ok, event}
end
def flush() do
GenServer.call(__MODULE__, :flush, :infinity)
:ok
end
# Server (callbacks)
@impl true
def init(buffer) do
Process.flag(:trap_exit, true)
timer = Process.send_after(self(), :tick, @flush_interval_ms)
{:ok, %{buffer: buffer, timer: timer}}
end
@impl true
def handle_cast({:insert, event}, %{buffer: buffer, timer: timer} = _state) do
new_buffer = [event | buffer]
if length(new_buffer) >= @max_buffer_size do
Logger.info("Buffer full, flushing to disk")
Process.cancel_timer(timer)
do_flush(new_buffer)
new_timer = Process.send_after(self(), :tick, @flush_interval_ms)
{:noreply, %{buffer: [], timer: new_timer}}
else
{:noreply, %{state | buffer: new_buffer}}
end
end
@impl true
def handle_info(:tick, %{buffer: buffer} = _state) do
do_flush(buffer)
timer = Process.send_after(self(), :tick, @flush_interval_ms)
{:noreply, %{buffer: [], timer: timer}}
end
@impl true
def handle_call(:flush, _from, %{buffer: buffer, timer: timer} = _state) do
Process.cancel_timer(timer)
do_flush(buffer)
new_timer = Process.send_after(self(), :tick, @flush_interval_ms)
{:reply, nil, %{buffer: [], timer: new_timer}}
end
@impl true
def terminate(_reason, %{buffer: buffer} = _state) do
Logger.info("Flushing event buffer before shutdown...")
do_flush(buffer)
end
# Private/utility methods
defp do_flush(buffer) do
case buffer do
[] ->
nil
events ->
Logger.info("Flushing #{length(events)} events")
events = Enum.map(events, &(Map.from_struct(&1) |> Map.delete(:__meta__)))
Plausible.ClickhouseRepo.insert_all(Plausible.ClickhouseEvent, events)
end
end
end
Note: If you're able to read the code above and know exactly what's going on at every step, you probably don't need to read any further!
Let's start off where the core of the logic actually lives: in the GenServer callbacks.
The GenServer callbacks
The callbacks implemented in this particular module are:
-
init/1
(required) - invoked when the server is started, and sets the initial state -
handle_cast/2
- invoked whenGenServer.cast/2
is called, to handle messages asynchronously -
handle_call/3
- invoked whenGenServer.call/3
is called, to handle messages synchronously-
GenServer.call/3
will block until a reply is received (unless the call times out or nodes are disconnected)
-
-
handle_info/2
- invoked to handle all other messages (i.e. outside of those triggered byGenServer.call/3
andGenServer.cast/2
, and "system" messages)- e.g. messages/events handled internally within the GenServer
-
terminate/2
- invoked when the GenServer is about to exit
Let's go through each one! 🚀
The init/1
callback
The init/1
callback is invoked when the GenServer is started, and handles setting the initial internal state of the server process.
Here's the method definition from above:
@impl true
def init(buffer) do
Process.flag(:trap_exit, true)
timer = Process.send_after(self(), :tick, @flush_interval_ms)
{:ok, %{buffer: buffer, timer: timer}}
end
The first line of this method (Process.flag(:trap_exit, true)
) sets us up to "trap exits". This allows us to handle any "clean-up" tasks before the process terminates, in the terminate/2
callback. (We'll discuss this below!)
Next, we set up a timer to run the :tick
event in 5 seconds (@flush_interval_ms == 5_000
). As we'll see below, this :tick
event also calls itself recursively, so that it effectively runs every 5 seconds while the server is alive. (That explains why it's called "tick"!)
Finally, we store the initial buffer
(in this case, an empty list, i.e. []
) and the timer
in the internal state.
The handle_cast/2
callback
The handle_cast/2
callback is invoked when GenServer.cast/2
is called, to handle asynchronous requests.
Here's the method definition from above:
@impl true
def handle_cast({:insert, event}, %{buffer: buffer, timer: timer} = _state) do
new_buffer = [event | buffer]
if length(new_buffer) >= @max_buffer_size do
Logger.info("Buffer full, flushing to disk")
Process.cancel_timer(timer)
do_flush(new_buffer)
new_timer = Process.send_after(self(), :tick, @flush_interval_ms)
{:noreply, %{buffer: [], timer: new_timer}}
else
{:noreply, %{state | buffer: new_buffer}}
end
end
All this is doing is:
- Inserting a new event into the internal state's
buffer
, by prepending it to the list - Checking if the buffer has reached capacity, and if so,
- cancels the current timer to prevent the next automatic
flush
from happening - performs the
flush
operation manually withdo_flush/1
- sets up a new timer which will trigger the next automatic
flush
in 5 seconds - resets the internal state's
buffer
to an empty list, and updates the timer as well
- cancels the current timer to prevent the next automatic
- If the buffer is not at capacity, it simply updates the internal state's
buffer
The reason this is handled in a handle_cast/2
callback rather than a handle_call/3
callback is so that the client can execute this asynchronously, without blocking on the completion of a potential do_flush/1
method invocation. (As we'll see below, the "flushing"/saving of 10,000 events is a potentially expensive operation that could take a few seconds to complete.)
Let's take a quick look at the do_flush/1
method to see what's going on there:
defp do_flush(buffer) do
case buffer do
[] ->
nil
events ->
Logger.info("Flushing #{length(events)} events")
events = Enum.map(events, &(Map.from_struct(&1) |> Map.delete(:__meta__)))
Plausible.ClickhouseRepo.insert_all(Plausible.ClickhouseEvent, events)
end
end
This method is just standard Elixir. All it's doing is taking the events in the buffer, formatting them into plain maps (rather than structs), and then persisting them to the database (ClickHouse). (If we wanted a slightly more explicit name for this method, we could call it something like save_to_clickhouse/1
instead.)
Note that if we wanted to save some whitespace, this could also be written like:
defp do_flush([]), do: nil
defp do_flush(events) do
Logger.info("Flushing #{length(events)} events")
events = Enum.map(events, &(Map.from_struct(&1) |> Map.delete(:__meta__)))
Plausible.ClickhouseRepo.insert_all(Plausible.ClickhouseEvent, events)
end
Hooray for pattern matching!
The handle_call/3
callback
The handle_call/3
callback is invoked when GenServer.call/3
is called, to handle synchronous messages. Note that GenServer.call/3
will block until a reply is received (unless the call times out or nodes are disconnected).
Here's the method definition from above:
@impl true
def handle_call(:flush, _from, %{buffer: buffer, timer: timer} = _state) do
Process.cancel_timer(timer)
do_flush(buffer)
new_timer = Process.send_after(self(), :tick, @flush_interval_ms)
{:reply, nil, %{buffer: [], timer: new_timer}}
end
This code looks quite similar to the logic above, in the handle_cast/2
callback. In this case, we're allow the :flush
event to be called manually, which takes whatever is currently in the state's buffer
and saves it to the database (in the do_flush/1
method). Once again, the buffer
is reset to an empty list, and timer
is reset as well.
The handle_info/2
callback
The handle_info/2
callback is invoked to handle all other messages (i.e. outside of those triggered by GenServer.call/3
and GenServer.cast/2
). In our case, we use it to handle messages passed around internally within the GenServer itself.
Here's the method definition from above:
@impl true
def handle_info(:tick, %{buffer: buffer} = _state) do
do_flush(buffer)
timer = Process.send_after(self(), :tick, @flush_interval_ms)
{:noreply, %{buffer: [], timer: timer}}
end
The :tick
handles flushing the events in our buffer
at a regular interval. (In this case, every 5 seconds.)
It does this by recursively calling itself with Process.send_after(self(), :tick, @flush_interval_ms)
The terminate/2
callback
This callback is invoked when the server is about to exit, and can handle any clean-up tasks.
Here's the method definition from above:
@impl true
def terminate(_reason, %{buffer: buffer} = _state) do
Logger.info("Flushing event buffer before shutdown...")
do_flush(buffer)
end
By trapping exists in the init/1
callback above, we can ensure that before this GenServer process terminates, we flush anything that's left in our buffer
. Otherwise, we might end up in a situation where the process is terminated before some of the events in memory don't have a chance to get saved to the database, and would be lost.
The GenServer Client APIs
The APIs exposed to the client are mostly boilerplate, and pretty straightforward:
def start_link(_opts) do
GenServer.start_link(__MODULE__, [], name: __MODULE__)
end
def insert(event) do
GenServer.cast(__MODULE__, {:insert, event})
{:ok, event}
end
def flush() do
GenServer.call(__MODULE__, :flush, :infinity)
:ok
end
Let's break it down!
Plausible.Event.WriteBuffer.start_link/1
This is probably the least intuitive of the three methods above — once we understand the code here, the rest should more or less fall into place.
def start_link(_opts) do
GenServer.start_link(__MODULE__, [], name: __MODULE__)
end
As you might guess, the start_link/1
method is responsible for, well, starting the GenServer process. All it's doing is calling start_link/3
on GenServer
itself, with a few strange arguments:
-
__MODULE__
, which is simply an alias for the module itself (in this case,Plausible.Event.WriteBuffer
). This first argument passes thePlausible.Event.WriteBuffer
in as the callback module, so that we can use the server callbacks we implemented above when certain messages are passed to the server process viaGenServer.call
andGenServer.cast
.- Note that calling
GenServer.start_link(Plausible.Event.WriteBuffer, [], name: Plausible.Event.WriteBuffer)
would result in identical behavior. The main advantage of using__MODULE__
is if we decide to rename the module, we only have to update it in one place!
- Note that calling
- An empty list (i.e.
[]
) is passed in as the second argument. This is passed to ourinit/1
callback as the defaultbuffer
. We'll take a look at this in more detail below. - The last argument is a keyword list of options. Here, we assign the process a name with
name: MODULE
(which is the same asname: Plausible.Event.WriteBuffer
). This allows us to pass messages to the GenServer using the module name (i.eGenServer.call(MODULE, message, timeout)
/GenServer.cast(MODULE, message)
). Otherwise, after starting the GenServer, we would have to keep a reference to its process ID (or "PID", which is returned in thestart_link/1
method), and then pass it in explicitly. (This is a very common practice.)
(GenServer also has a method called simply start/3
, which takes the same arguments as start_link/3
— the only difference is that start_link/3
"links" the GenServer process to the current process, which is useful when we want to start the GenServer as part of a "supervision tree". By starting a process with start_link
under a Supervisor
, it allows the application to monitor the process for any errors/crashes, and automatically restart the process if necessary.)
In practice, this is where the process is started: https://github.com/plausible/analytics/blob/master/lib/plausible/application.ex
Here's the relevant code below:
defmodule Plausible.Application do
use Application
def start(_type, _args) do
children = [
# ...
Plausible.Event.WriteBuffer,
# ...
]
opts = [strategy: :one_for_one, name: Plausible.Supervisor]
# ...
Supervisor.start_link(children, opts)
end
# ...
end
GenServer's start_link/3
method is what enables us to start the process under this Supervisor
. (When we include Plausible.Event.WriteBuffer
amongst the children of the supervisor, its start_link/3
method is invoked automatically when Supervisor.start_link/2
is called.)
Hopefully at this point, we have a pretty good understanding of what's going on in Plausible.Event.WriteBuffer.start_link/1
! Now all that's left to cover are the two methods exposed in our client API: insert/1
and flush/0
Plausible.Event.WriteBuffer.insert/1
One last time, here's the insert/1
method:
def insert(event) do
GenServer.cast(__MODULE__, {:insert, event})
{:ok, event}
end
Here we see that all this is doing is taking an event
and sending an asynchronous request to our GenServer via the cast/2
method. (Since we're using cast/2
here instead of call/3
, we know that this is a non-blocking call which will return immediately, regardless of what happens in the callback.)
The first argument of GenServer.cast/2
takes a process ID (i.e. PID) or a registered server name. In this case, we set the name to __MODULE__
in the 3rd argument of our GenServer.start_link/3
above, which is why we're using that here. (This is a very common practice with GenServers.)
The second argument is the "message" we want to send, often in the form of a tuple or an atom. Here we're sending {:insert, event}
, which will be handled in the handle_cast({:insert, event}, ...)
callback discussed above.
Since we don't wait for a return value, the insert/1
method just returns an echo of the event that was passed in, in the form of {:ok, event}
.
Plausible.Event.WriteBuffer.flush/0
We're almost done! Let's take a look at the flush/0
method:
def flush() do
GenServer.call(__MODULE__, :flush, :infinity)
:ok
end
This is quite similar to what we saw in insert/1
. The only differences are:
-
GenServer.call/3
, unlikeGenServer.cast/2
, blocks the process until the callback completes. So if the:flush
event ends up taking 10 seconds to execute, theflush/0
method will not return:ok
until 10 seconds have passed. -
call/3
takes a third argument, which we see here is:infinity
. This represents the maximum amount of time we allow the method to take before a timeout error occurs. By default, this is set to5_000
, or 5 seconds. By passing in:infinity
, we're allowing the process to take as long as it needs to complete.
Reviewing it all
Let's take one last look at the full module definition, to double check that we understand everything that's going on. I've added some comments in the code to help out a bit. 😉
defmodule Plausible.Event.WriteBuffer do
use GenServer
require Logger
# Flush/save every 5 seconds
@flush_interval_ms 5_000
# Allow a maximum of 10,000 events in our buffer list
@max_buffer_size 10_000
############################################################################
# Client APIs
############################################################################
@doc """
Starts a linked GenServer process, passing in the current module
(`__MODULE__` == `Plausible.Event.WriteBuffer`) as both the callback module
and the alias/name of the server process for future reference.
We also pass in an empty list (`[]`) as the initial value of the internal
state's `buffer` (handled in the `init/1` callback below).
"""
def start_link(_opts) do
GenServer.start_link(__MODULE__, [], name: __MODULE__)
end
@doc """
Sends an `:insert` message asynchronously to the current module's GenServer
process, which adds the provided `event` to the internal state's `buffer`.
"""
def insert(event) do
GenServer.cast(__MODULE__, {:insert, event})
{:ok, event}
end
@doc """
Sends a (synchronous) `:flush` message to the current module's GenServer
process, which manually "flushes" all events in the internal state's `buffer`
to the database.
Sets the `timeout` option to `:infinity`, allowing this call to take as long
as it needs to complete.
"""
def flush() do
GenServer.call(__MODULE__, :flush, :infinity)
:ok
end
############################################################################
# Server callbacks
############################################################################
@doc """
Sets up the initial state of the GenServer, and traps exits so that we can
gracefully handle a process termination in our `terminate/2` callback below.
"""
@impl true
def init(buffer) do
Process.flag(:trap_exit, true)
timer = Process.send_after(self(), :tick, @flush_interval_ms)
{:ok, %{buffer: buffer, timer: timer}}
end
@doc """
Callback that handles `:insert` messages passed to `GenServer.cast/2`.
In this callback, we add the new event to the internal state's `buffer`.
If we reach the maximum capacity of the `buffer`, we "flush" it to the database
and reset our state. Otherwise, we simply update the internal state with the
updated `buffer`.
"""
@impl true
def handle_cast({:insert, event}, %{buffer: buffer, timer: timer} = _state) do
new_buffer = [event | buffer]
if length(new_buffer) >= @max_buffer_size do
Logger.info("Buffer full, flushing to disk")
Process.cancel_timer(timer)
do_flush(new_buffer)
new_timer = Process.send_after(self(), :tick, @flush_interval_ms)
{:noreply, %{buffer: [], timer: new_timer}}
else
{:noreply, %{state | buffer: new_buffer}}
end
end
@doc """
Callback that handles internal `:tick` messages.
In this callback, we automatically flush whatever is in the `buffer`
every 5 seconds (i.e. `@flush_interval_ms`).
"""
@impl true
def handle_info(:tick, %{buffer: buffer} = _state) do
do_flush(buffer)
timer = Process.send_after(self(), :tick, @flush_interval_ms)
{:noreply, %{buffer: [], timer: timer}}
end
@doc """
Callback that handles `:flush` messages passed to `GenServer.call/3`.
In this callback, we handle manually flushing whatever is currently in
the internal state's `buffer` to the database.
"""
@impl true
def handle_call(:flush, _from, %{buffer: buffer, timer: timer} = _state) do
Process.cancel_timer(timer)
do_flush(buffer)
new_timer = Process.send_after(self(), :tick, @flush_interval_ms)
{:reply, nil, %{buffer: [], timer: new_timer}}
end
@doc """
Callback that gets invoked when the process is about to shut down.
In this callback, we make sure to flush whatever is currently in the internal
state's `buffer` to the database before the process terminates.
"""
@impl true
def terminate(_reason, %{buffer: buffer} = _state) do
Logger.info("Flushing event buffer before shutdown...")
do_flush(buffer)
end
############################################################################
# Private/utility methods
############################################################################
@doc """
Perform a "flush" of the events in the `buffer` to the database. By "flush",
we simply mean we do a bulk insertion of the events into ClickHouse (the db).
"""
defp do_flush(buffer) do
case buffer do
[] ->
nil
events ->
Logger.info("Flushing #{length(events)} events")
events = Enum.map(events, &(Map.from_struct(&1) |> Map.delete(:__meta__)))
Plausible.ClickhouseRepo.insert_all(Plausible.ClickhouseEvent, events)
end
end
end
If you still don't understand something or think I missed something, feel free to reach out to me on our public Slack channel. 😬
That's all for now!
If you're interested in contributing to an open source Elixir project, come check out Papercups on Github!
Top comments (3)
Thank you for such a well elaborated article.
Great explanation. Thanks!
A really great article, thank you !