Creation of a job processing library using Elixir and Redis

Volodymyr Potiichuk

Today I will cover how you can build your own job processing library and understand the core aspects of creating such a tool. This article builds on the previous one about the queue data structure: I want to demonstrate a real-world use of queues and build a job processing library that others can use in their projects.

The tool we are going to build

Today we are going to build a library that can execute tasks from a queue in a separate process (a worker), store these tasks persistently in Redis, and report statistics on processed, failed, and scheduled tasks.

To develop this library, we need to familiarize ourselves with the terminology:

  • Job

Simply put, a job is any function we put in the queue and want to execute.

  • Worker

A worker is an independent process that performs jobs from the queue. Usually there are several workers so that tasks are processed faster, but to keep the code simple we will implement functionality for a single worker, which will be more than enough.

Prerequisites

There are several prerequisites for building such a tool, and they’re chosen for a reason. Let’s look at why:

Redis is an open-source, in-memory data store that we will use as a database for persistence. The main advantage of Redis is the speed of its operations: it works directly in RAM (Random Access Memory), so you don’t need to write to and read from disk whenever you need some data.

Elixir is a dynamic, scalable, functional programming language that is especially good for such applications due to its fault-tolerant behavior.

Also, some Elixir libraries will help us on our way:

Redix is a simple client for Redis that gives us the ability to issue Redis commands from Elixir.

Jason is a library that allows you to encode and parse JSON structures.

UUID is a library that generates unique IDs for our tasks so that we can distinguish them by a unique identifier.

Project structure

To build such a library, let’s figure out which basic modules our application needs. I suggest creating the following ones:

  • Enqueuer

This module will be responsible for putting the jobs into the queue.

  • Dequeuer

This module will be responsible for taking jobs from the queue.

  • Job

This module is responsible for serialization, deserialization, and data preparation for the queue.

  • Stats

This module will be able to show us statistics on processed, failed, and scheduled tasks.

  • QueueWorker

This module is responsible for processing jobs from the queues.

Flow

Before we start implementing this via code, I would like to show how our application is supposed to work with two flow diagrams:

Client flow diagram

Here is how our library will work when the client creates a new task:

  1. The client provides raw data about the task to the Enqueuer module.

  2. The Enqueuer module asks the Job module to construct the job data and serialize it to JSON.

  3. The Enqueuer module passes the generated JSON to the Queue module.

  4. The Queue module saves the JSON in the database using the Database module.

  5. We send an asynchronous message to the worker that there is a task in this queue, and if the worker is up and available, it can process it.

  6. The created job is returned to the client.

And the second one, from the queue worker’s perspective:

Queue worker flow diagram

  1. The queue worker checks all queues under the library namespace in Redis.

  2. The queue worker takes the first queue it finds and peeks at the element at the front of the queue (FIFO principle).

  3. The queue worker parses the job from JSON into a struct via the Job module.

  4. The queue worker starts to process the parsed job.

  5. When the job is completed, the queue worker moves the job to a different queue based on the result: either failed or processed.

  6. Repeat recursively from the second step until the queue is empty; after that, we can move on to the next queue.

  7. After all tasks are completed, the worker process remains in standby mode, and as soon as a new task appears, it will be ready to process it. If the worker is down at the moment a task is enqueued, all such tasks will be processed when the worker starts up.

Project initialization

Let’s start with the creation of the project. We will use the Mix tool, which ships with Elixir, and its “mix new <project_name>” command for project initiation:

mix new processing_library

After the command executes, we should get a ready-made setup for developing our project, namely:

  • folder structure

  • dependency management

  • build tooling

  • test environment

In my case, the structure and file naming look like this:

“mix new <project_name>” command result

After successful project initiation, we can add the dependencies to the “mix.exs” file:

defp deps do
  [
    {:redix, "~> 1.1"},
    {:jason, "~> 1.4"},
    {:uuid, "~> 1.1"}
  ]
end

and install them as well with the Mix tool:

mix deps.get

Also, we need to start Redis. I usually do it through the Docker engine and the docker-compose utility, but you can use any convenient installer like Homebrew or asdf. Anyway, here’s my docker-compose.yml file:

services:
  redis:
    image: redis:alpine
    container_name: redis
    ports:
      - "6379:6379"

This configuration will start Redis on Alpine Linux inside a container on port 6379 and map it to port 6379 on our local host. To run it, start the container via the docker-compose CLI:

docker-compose up redis

And if everything goes well, you will see something like this:

Successful project bootstrap

Implementation

I suggest we start our journey with the module responsible for the tasks:

defmodule ProcessingLibrary.Job do
  @derive {Jason.Encoder, only: [:params, :worker_module, :jid, :error, :start_at, :finish_at]}
  defstruct params: [], worker_module: nil, jid: nil, error: nil, start_at: nil, finish_at: nil

  def construct(worker_module, params) do
    %ProcessingLibrary.Job{
      params: params,
      worker_module: worker_module,
      jid: UUID.uuid4()
    }
  end

  def encode(%ProcessingLibrary.Job{} = job), do: Jason.encode!(job)

  def decode(job_json),
    do: struct(ProcessingLibrary.Job, Jason.decode!(job_json, keys: :atoms))
end
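
To make the serialization round trip concrete, here is a quick IEx session (the worker module and params are illustrative):

job = ProcessingLibrary.Job.construct(ProcessingLibrary.DummyWorker, ["param1"])
json = ProcessingLibrary.Job.encode(job)
# Note: the module atom comes back as a string ("Elixir.ProcessingLibrary.DummyWorker"),
# which is why the queue worker later calls String.to_atom/1 before apply/3
decoded = ProcessingLibrary.Job.decode(json)
decoded.jid == job.jid
#=> true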

Here we define the job structure and the functions responsible for serialization, deserialization, and job creation from raw data. In our case, the job structure will have the following fields:

  • params

These are variables that will be passed to the job function as parameters:

# client side
ProcessingLibrary.enqueue("queue_name", ProcessingLibrary.DummyWorker, ["param1", "param2", "param3"])
# queue worker side (the params list is passed as a single argument)
ProcessingLibrary.DummyWorker.perform(["param1", "param2", "param3"])
  • worker_module

This is an abstract module responsible for executing a particular task; every module that adopts it through the behaviour directive must implement the “perform” function that the queue worker will eventually call:

defmodule ProcessingLibrary.Worker do
  @callback perform(any()) :: any()
end
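For illustration, a module adopting this behaviour might look like the following (MyApp.EmailWorker is a hypothetical example, not part of the library):

defmodule MyApp.EmailWorker do
  @behaviour ProcessingLibrary.Worker

  @impl true
  # Receives the job's params list and does the actual work
  def perform(params) do
    IO.puts("Sending email with #{inspect(params)}")
  end
end
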
  • jid (Job ID)

For each task, we will assign a job ID to be able to distinguish them by a unique identifier.

  • error

Since a job can also break during execution, it would be a good idea to save the error in the job object.

  • start_at, finish_at

These timestamps will indicate how long it took the worker to complete the task.

Next, we want to create a module that is a simple command wrapper around the Redix library. Redix does not provide high-level functions, only Redix.command/3, an alternative to writing commands directly in the redis-cli client. For a better understanding of what we want to achieve, here is an example of peeking at an item in a Redis queue:

# Somewhere inside the business logic...
{:ok, range} = Redix.command(conn, ~w(LRANGE my_queue -1 -1))

if range == [] do
  {:ok, nil}
else
  [el | _] = range
  {:ok, el}
end

# What does this code do???
# It's hard to understand at first glance.
# Also, the guts of Redis are turned outward into our business logic.

Instead, we will have a function in the Redis module that is responsible for peeking at elements:

# We hide the implementation details behind the Redis interface
ProcessingLibrary.Redis.peek(queue, :rear)
# or
ProcessingLibrary.Redis.peek(queue, :front)

This approach helps us divide areas of responsibility between files and increases the cohesion of our modules. Good, now let’s take a look at the elements that make up our Redis wrapper module:
defmodule ProcessingLibrary.Redis do
  use GenServer

  def start_link(_init_arg) do
    GenServer.start_link(__MODULE__, [], name: __MODULE__)
  end

  def init(_init_arg) do
    {:ok, conn} =
      Redix.start_link(
        host: ProcessingLibrary.Env.get_redis_host(),
        port: ProcessingLibrary.Env.get_redis_port(),
        database: ProcessingLibrary.Env.get_redis_database()
      )

    {:ok, conn}
  end
Here we use the GenServer behaviour to maintain a single Redis connection and to manage this process through an Elixir supervisor. To let the client customize where the library should look for the Redis process, we make this data configurable via config settings:
import Config

config :processing_library,
  redis_namespace: "processing_library",
  redis_host: "localhost",
  redis_port: 6379,
  redis_database: 1

import_config "#{config_env()}.exs"
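The ProcessingLibrary.Env helpers referenced in init/1 are not shown in the article; a minimal sketch, assuming they simply read the settings above via Application.get_env/3, could look like this:

defmodule ProcessingLibrary.Env do
  # Thin wrappers around the application config, with the defaults from above
  def get_redis_namespace, do: Application.get_env(:processing_library, :redis_namespace, "processing_library")
  def get_redis_host, do: Application.get_env(:processing_library, :redis_host, "localhost")
  def get_redis_port, do: Application.get_env(:processing_library, :redis_port, 6379)
  def get_redis_database, do: Application.get_env(:processing_library, :redis_database, 1)
end
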
In the case of our library we have several variables, and the most interesting one is “redis_namespace”: it lets us distinguish the library’s Redis data from everything else. For this purpose, we will have special functions that wrap and unwrap Redis keys in the configured library namespace:
defp extract_key(key),
  do: key |> String.split(":", trim: true) |> List.last() |> String.to_atom()

defp namespace_key(key) when is_atom(key), do: namespace_key(key |> Atom.to_string())

defp namespace_key(key) do
  namespace = ProcessingLibrary.Env.get_redis_namespace()

  if String.contains?(key, namespace) do
    key
  else
    "#{namespace}:#{key}"
  end
end
Using these helpers, we will create a set of core functions that we will use later (P.S. only the public API is shown for simplicity; if you want to see the full file, click here):
def flush_db() do
  GenServer.call(__MODULE__, :flush_db)
end

def get_keys() do
  GenServer.call(__MODULE__, :keys)
end

def remove(queue, value) do
  GenServer.call(__MODULE__, {:remove, namespace_key(queue), value})
end

def enqueue(queue, value) do
  GenServer.call(__MODULE__, {:queue, namespace_key(queue), value})
end

def dequeue(queue) do
  GenServer.call(__MODULE__, {:dequeue, namespace_key(queue)})
end

def peek(queue, :rear) do
  GenServer.call(__MODULE__, {:peek, :rear, namespace_key(queue)})
end

def peek(queue, :front) do
  GenServer.call(__MODULE__, {:peek, :front, namespace_key(queue)})
end

def get_queue(queue) do
  GenServer.call(__MODULE__, {:get_queue, namespace_key(queue)})
end

def set(key, value) do
  GenServer.call(__MODULE__, {:set, namespace_key(key), value})
end

def get_queues(opts \\ []) do
  GenServer.call(__MODULE__, {:get_queues, opts})
end
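
The matching handle_call/3 clauses live in the full file; a minimal sketch of a few of them, assuming each queue is a Redis list (RPUSH at the rear, LPOP from the front), might look like this:

def handle_call({:queue, queue, value}, _from, conn) do
  # Push the serialized job onto the rear of the list
  {:reply, Redix.command(conn, ["RPUSH", queue, value]), conn}
end

def handle_call({:dequeue, queue}, _from, conn) do
  # Pop the oldest job from the front of the list (FIFO)
  {:reply, Redix.command(conn, ["LPOP", queue]), conn}
end

def handle_call({:peek, :front, queue}, _from, conn) do
  reply =
    case Redix.command(conn, ["LRANGE", queue, "0", "0"]) do
      {:ok, []} -> {:ok, nil}
      {:ok, [el | _]} -> {:ok, el}
      error -> error
    end

  {:reply, reply, conn}
end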

Next, with our Redis module, we can create an abstraction module for the database:

defmodule ProcessingLibrary.Database do
defdelegate init(init_arg), to: ProcessingLibrary.Redis, as: :init
defdelegate start_link(init_arg), to: ProcessingLibrary.Redis, as: :start_link
defdelegate get_queues(opts), to: ProcessingLibrary.Redis, as: :get_queues
defdelegate get_queues(), to: ProcessingLibrary.Redis, as: :get_queues
defdelegate get_queue(queue), to: ProcessingLibrary.Redis, as: :get_queue
defdelegate get_keys(), to: ProcessingLibrary.Redis, as: :get_keys
defdelegate set(key, value), to: ProcessingLibrary.Redis, as: :set
defdelegate flush(), to: ProcessingLibrary.Redis, as: :flush_db
end

We are simply delegating our calls from the Database module to the Redis module, but why? Why not just call the Redis module’s functions directly?

Explanation: Imagine you’re writing a program and you’re relying on the interface Redis provides. At some moment you realize that you need to switch from Redis to PostgreSQL. You start looking at the code and realize that every part of your program depends not on an abstract database but on Redis, which makes the move very difficult. That’s why we want to hide the implementation details behind an interface, so that if this ever happens, we only have to change one module instead of the whole program.
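
For example, a move to another store would then mean changing only the delegation targets (ProcessingLibrary.Postgres here is hypothetical, purely to illustrate the point):

defmodule ProcessingLibrary.Database do
  # Only this module changes when the storage backend changes;
  # the rest of the program keeps calling ProcessingLibrary.Database.
  defdelegate set(key, value), to: ProcessingLibrary.Postgres, as: :set
  defdelegate get_keys(), to: ProcessingLibrary.Postgres, as: :get_keys
end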

The same goes for the queue module:

defmodule ProcessingLibrary.Database.Queue do
defdelegate enqueue(queue, value), to: ProcessingLibrary.Redis, as: :enqueue
defdelegate dequeue(queue), to: ProcessingLibrary.Redis, as: :dequeue
defdelegate remove(queue, value), to: ProcessingLibrary.Redis, as: :remove
defdelegate peek(queue, position), to: ProcessingLibrary.Redis, as: :peek
end
The result is a simple, clear interface to interact with.

Having an abstraction over the database and the queue, we can move on to the modules that work most closely with the queue, namely the Enqueuer and Dequeuer modules:

defmodule ProcessingLibrary.Enqueuer do
  def enqueue(queue_name, %ProcessingLibrary.Job{} = job_data) do
    encoded_job_data = ProcessingLibrary.Job.encode(job_data)

    with {:ok, _} <- ProcessingLibrary.Database.Queue.enqueue(queue_name, encoded_job_data),
         false <- ProcessingLibrary.is_reserved_queue?(queue_name) do
      ProcessingLibrary.QueueWorker.publish_job(queue_name)
      {:ok, job_data}
    else
      error ->
        {:error, error}
    end
  end

  def enqueue(queue_name, worker_module, params) do
    job_data = ProcessingLibrary.Job.construct(worker_module, params)
    enqueue(queue_name, job_data)
  end
end

In this module:

  1. We create a job from the incoming parameters.

  2. We add it to the Redis queue.

  3. If the target queue is not one reserved by our library (like the processed or failed queues), we send an asynchronous message to the queue worker, and if it is available, it will process the job.

Something similar happens in the Dequeuer module:

defmodule ProcessingLibrary.Dequeuer do
  def find_job(job_id) do
    {:ok, queues} = ProcessingLibrary.Database.get_queues(include_reserved: true)

    jobs =
      Enum.flat_map(queues, fn queue ->
        case ProcessingLibrary.Database.get_queue(queue) do
          {:ok, jobs} -> Enum.map(jobs, fn job -> {job, queue} end)
          _ -> []
        end
      end)

    job = Enum.find(jobs, fn {job, _queue} -> ProcessingLibrary.Job.decode(job).jid == job_id end)

    case job do
      nil -> {:error, "Job not found"}
      {job, queue} -> {:ok, ProcessingLibrary.Job.decode(job), queue}
    end
  end

  def remove(job_id) do
    with {:ok, job, queue} <- find_job(job_id) do
      job_json = ProcessingLibrary.Job.encode(job)
      ProcessingLibrary.Database.Queue.remove(queue, job_json)
    else
      _ -> {:error, "Job not found"}
    end
  end

  defdelegate dequeue(queue), to: ProcessingLibrary.Database.Queue, as: :dequeue
end
The remove/1 function pulls an item out of a queue regardless of its position, locating it with the find_job/1 function; this is very convenient when we want to remove a task that was scheduled some time ago. The last function, dequeue/1, is delegated directly to the Queue module, since it already does exactly what we need.
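
For example, cancelling a scheduled job by its ID could look like this (the queue name and params are illustrative):

{:ok, job} = ProcessingLibrary.enqueue("emails", ProcessingLibrary.DummyWorker, ["user@example.com"])

# Cancel it before the worker picks it up
ProcessingLibrary.Dequeuer.remove(job.jid)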

Now we get to the fun part: the queue worker module:

defmodule ProcessingLibrary.QueueWorker do
  use GenServer
  require Logger

  def init(_) do
    {:ok, queues} = ProcessingLibrary.Database.get_queues()
    start_processing(queues)
    {:ok, %{}}
  end

  def start_link(_) do
    GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
  end
At the very beginning, we request all queues available under our library’s namespace and send them for processing:
def start_processing(queues) do
  Enum.each(queues, fn queue ->
    process_job(queue)
  end)
end
After that, going through each queue, we synchronously peek at the first available task (FIFO principle) and send it for processing:
def process_job(%ProcessingLibrary.Job{} = job, queue) do
  Logger.info("#{log_context(job)} start")
  start_time = DateTime.utc_now()

  try do
    apply(job.worker_module |> String.to_atom(), :perform, [job.params])

    finish_time = DateTime.utc_now()
    diff_time = DateTime.diff(finish_time, start_time, :millisecond)

    Logger.info("#{log_context(job)} finished in #{diff_time}ms")

    ProcessingLibrary.Enqueuer.enqueue(:processed, %{
      job
      | start_at: start_time,
        finish_at: finish_time
    })
  rescue
    e ->
      finish_time = DateTime.utc_now()

      Logger.error("#{log_context(job)} failed with exception:\n#{Exception.message(e)}")

      ProcessingLibrary.Enqueuer.enqueue(:failed, %{
        job
        | error: ~c"#{Exception.message(e)}",
          start_at: start_time,
          finish_at: finish_time
      })
  end

  ProcessingLibrary.Dequeuer.dequeue(queue)
  process_job(queue)
end

def process_job(queue) do
  {:ok, job_json} = ProcessingLibrary.Database.Queue.peek(queue, :front)

  if not is_nil(job_json) do
    job_json |> ProcessingLibrary.Job.decode() |> process_job(queue)
  end
end
At the moment of job processing:

  1. We save timestamps and log the job start.

  2. We execute the job through the Kernel.apply/3 function.

  3. We save the execution result in one of the queues (either failed or processed).

  4. We start executing the next task in the queue recursively.
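
The log_context/1 helper used in the logging calls above is omitted from the article; a minimal sketch producing a worker-log prefix could be:

defp log_context(%ProcessingLibrary.Job{} = job) do
  # A hypothetical prefix: worker module plus job ID
  "[#{job.worker_module}] job=#{job.jid}"
end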

That’s all the queue worker does on its own, but it also accepts signals from outside (e.g. from the Enqueuer module) and can perform such tasks asynchronously thanks to GenServer.cast/2:

def publish_job(queue) do
  GenServer.cast(__MODULE__, {:publish, queue})
end

def handle_cast({:publish, queue}, state) do
  process_job(queue)
  {:noreply, state}
end
Great, now let’s wire up our queue worker so that it starts when the client application starts. It’s important to clarify that we disable the worker and database in the test environment so they can be started manually before running the tests:
defmodule ProcessingLibrary.Supervisor do
  use Supervisor

  def start_link(opts) do
    Supervisor.start_link(__MODULE__, :ok, opts)
  end

  @impl true
  def init(:ok) do
    children = []

    children =
      if Mix.env() != :test do
        children ++
          [
            ProcessingLibrary.Database,
            ProcessingLibrary.QueueWorker
          ]
      else
        children
      end

    Supervisor.init(children, strategy: :one_for_one)
  end
end
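
The article doesn’t show how this supervisor itself gets started; one common approach (an assumption on my part, not taken from the repo) is an Application callback registered via the mod: option in mix.exs:

defmodule ProcessingLibrary.Application do
  use Application

  @impl true
  # Runs automatically when the :processing_library application boots
  def start(_type, _args) do
    ProcessingLibrary.Supervisor.start_link(name: ProcessingLibrary.Supervisor)
  end
end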

After adding our processes to the supervisor, we can start our library and begin using it. But wait, something is missing… oh… the statistics module. Let’s implement it very quickly:

defmodule ProcessingLibrary.Stats do
  @stats_queues [:processed, :failed]

  def is_stats_queue?(queue) do
    Enum.member?(@stats_queues, queue)
  end

  def stat(queue) when queue in @stats_queues do
    {:ok, jobs} = ProcessingLibrary.Database.get_queue(queue)
    length(jobs)
  end

  def stat(:scheduled) do
    {:ok, queues} = ProcessingLibrary.Database.get_queues()

    queues
    |> Enum.reduce(0, fn queue, acc ->
      {:ok, jobs} = ProcessingLibrary.Database.get_queue(queue)
      acc + length(jobs)
    end)
  end

  def stats() do
    (@stats_queues ++ [:scheduled])
    |> Enum.reduce(%{}, fn s, acc ->
      Map.put(acc, s, stat(s))
    end)
  end
end

Inside this module we go through the queues, count the number of jobs inside them, and return a map as a result:

%{processed: 105, scheduled: 70, failed: 5}

Finally, the cherry on top is a public interface for users, which provides everything they need and removes the necessity of digging into the modules of our library:

defmodule ProcessingLibrary do
  def is_reserved_queue?(queue), do: ProcessingLibrary.Stats.is_stats_queue?(queue)

  defdelegate enqueue(queue, worker_module, params),
    to: ProcessingLibrary.Enqueuer,
    as: :enqueue

  defdelegate enqueue(queue, job_data),
    to: ProcessingLibrary.Enqueuer,
    as: :enqueue

  defdelegate remove(job_id),
    to: ProcessingLibrary.Dequeuer,
    as: :remove

  defdelegate find_job(job_id),
    to: ProcessingLibrary.Dequeuer,
    as: :find_job

  defdelegate stats(),
    to: ProcessingLibrary.Stats,
    as: :stats
end
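
With this facade in place, a typical client session might look like this (queue name, params, and counters are illustrative):

{:ok, job} = ProcessingLibrary.enqueue("default", ProcessingLibrary.DummyWorker, ["hello"])

# Look the job up by its ID
ProcessingLibrary.find_job(job.jid)

# Check the current counters
ProcessingLibrary.stats()
#=> %{processed: 1, failed: 0, scheduled: 0}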

Demo

I made a dashboard using Phoenix LiveView and WebSockets to demonstrate visually how our library works. For this purpose, I also added a dummy worker module to our project, which randomly delays each task by between 1 and 3 seconds and randomly raises an error, so that we can see such cases in the UI:

defmodule ProcessingLibrary.DummyWorker do
  @behaviour ProcessingLibrary.Worker

  def perform(_args) do
    Process.sleep(Enum.random(1..3) * 1000)

    if Enum.random([true, false]) do
      raise "Oops... something went wrong"
    end
  end
end

In the demo itself, I’ll enqueue 10 tasks with our dummy worker; let’s see how the queue worker handles them:

This demonstration shows how the queue worker pulls one scheduled task at a time, synchronously executes it, moves it to one of the queues depending on the result, and proceeds to the next one.

Conclusion

At the end of the story, I want to say that it was super exciting to write such a library; I learned and understood a lot of things related to job processing tools. I’ll leave a link to the repository here in case anyone wants to check out the whole codebase. Thanks for reading, I hope you have learned something new too!
