Today I will cover how you can build your own job processing library and what the core aspects of such a tool are. This article builds on the previous one about the queue data structure: I want to demonstrate a real-world use of queues by building a job processing library that others can use in their projects.
The tool we are going to build
Today we are going to build a library that executes tasks from a queue in a separate process (a worker), stores these tasks persistently in Redis, and lets us check statistics on processed, failed, and scheduled tasks.
To develop this library, we need to familiarize ourselves with the terminology:
- Job
Simply put, a job is any function we put in the queue and want to execute.
- Worker
A worker is an independent process that executes queued jobs. Often several workers run in parallel to process tasks faster, but to keep the code simple we will implement only one worker, which will be more than enough for our purposes.
Prerequisites
There are several prerequisites for building such a tool, and each was chosen for a reason. Let’s look at why:
Redis is an open-source, in-memory data store that we will use as the database for persisting jobs. Its main advantage is speed: it keeps data in RAM (Random Access Memory), so reading or writing data doesn’t require a disk access every time (Redis can still save snapshots or an append-only log to disk in the background).
Elixir is a dynamic, scalable, functional programming language that is especially well suited for such applications thanks to the fault tolerance it inherits from the Erlang VM and OTP supervision.
Also, some Elixir libraries will help us on our way:
- Redix, a simple Redis client that lets us send Redis commands from Elixir code.
- Jason, a library for encoding and parsing JSON.
- UUID, a library that generates unique IDs for our tasks so that we can tell them apart by a unique identifier.
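For the curious, a version 4 UUID like the ones we will use as job IDs is just 122 random bits with the version and variant bits fixed. The library itself relies on the uuid package’s UUID.uuid4/0; the sketch below only illustrates the format, building one with :crypto.strong_rand_bytes/1. JobIdSketch is a hypothetical module, not something the library uses.

```elixir
defmodule JobIdSketch do
  # Builds a random (version 4) UUID string, e.g. "1b4e28ba-2fa1-4d3b-a3f5-ef19b5a7633b".
  def uuid4 do
    # 48 + 4 + 12 + 2 + 62 = 128 random bits; the version and variant slots are overwritten.
    <<a::48, _version::4, b::12, _variant::2, c::62>> = :crypto.strong_rand_bytes(16)

    # Force version 4 (0100) and the RFC 4122 variant (binary 10).
    <<a::48, 4::4, b::12, 2::2, c::62>>
    |> Base.encode16(case: :lower)
    |> format()
  end

  # Split the 32 hex characters into the 8-4-4-4-12 groups.
  defp format(<<p1::binary-8, p2::binary-4, p3::binary-4, p4::binary-4, p5::binary-12>>) do
    Enum.join([p1, p2, p3, p4, p5], "-")
  end
end
```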
Project structure
To build such a library, let’s figure out which basic modules the application needs. I suggest the following ones:
- Enqueuer
This module will be responsible for putting the jobs into the queue.
- Dequeuer
This module will be responsible for taking jobs from the queue.
- Job
This module is responsible for serialization, deserialization, and data preparation for the queue.
- Stats
This module will be able to show us statistics on processed, failed, and scheduled tasks.
- QueueWorker
This module is responsible for processing jobs from the queues.
Flow
Before we start implementing this in code, I would like to show how our application is supposed to work with two flow diagrams:
Here is how our library will work when the client creates a new task:
1. The client passes raw task data to the Enqueuer module.
2. The Enqueuer module asks the Job module to construct the job data and serialize it to JSON.
3. The Enqueuer module passes the generated JSON to the Queue module.
4. The Queue module saves the JSON in the database using the Database module.
5. We send an asynchronous message to the worker saying there is a task in this queue; if the worker is up and available, it processes the task.
6. The created job is returned to the client.
And the second one, from the queue worker’s perspective:
1. The queue worker checks all queues under the library namespace in Redis.
2. It takes the first queue it finds and peeks at the element at the front of the queue (FIFO principle).
3. It parses the job from JSON into a struct via the Job module.
4. It starts processing the parsed job.
5. When the job is completed, the queue worker moves it to a different queue depending on the result: failed or processed.
6. Repeat from step 2 until the queue is empty, then move on to the next queue.
After all tasks are completed, the worker process stays in standby mode and is ready to process a new task as soon as one appears. If the worker is not running when a task is enqueued, such tasks are processed when the worker starts up.
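The worker flow above can be modeled in a few lines. This is a minimal in-memory sketch, not the library’s code: queues are plain lists in a map, and each job is a zero-arity function rather than a JSON-encoded struct (both simplifying assumptions). FlowSketch is a hypothetical module name.

```elixir
defmodule FlowSketch do
  # In-memory model of the worker flow (not the library's real code).
  # `queues` maps a queue name to a list of jobs; each job is a zero-arity function.
  def run(queues) do
    Enum.reduce(queues, %{"processed" => [], "failed" => []}, fn {_name, jobs}, results ->
      drain(jobs, results)
    end)
  end

  # Take the job at the front, run it, record the outcome, then continue (FIFO).
  defp drain([], results), do: results

  defp drain([job | rest], results) do
    outcome =
      try do
        job.()
        "processed"
      rescue
        _ -> "failed"
      end

    # Move the job into the reserved list that matches its outcome.
    drain(rest, Map.update!(results, outcome, &(&1 ++ [job])))
  end
end
```

Each queue is drained completely before the next one is touched, which mirrors steps 2 to 6.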
Project initialization
Let’s start by creating the project. We will use Mix, the build tool that ships with Elixir, and its “mix new” command to initialize the project:
```shell
mix new processing_library
```
After the command runs, we get a ready-made project skeleton that includes:
- a folder structure
- dependency management
- build tooling
- a test environment
In my case, the resulting structure and file names look like this:
After successful project initiation, we can add the dependencies to the “mix.exs” file:
```elixir
defp deps do
  [
    {:redix, "~> 1.1"},
    {:jason, "~> 1.4"},
    {:uuid, "~> 1.1"}
  ]
end
```
and install them, again with Mix:
```shell
mix deps.get
```
We also need a running Redis instance. I usually start it with Docker and the docker-compose utility, but any convenient installer, such as Homebrew or asdf, works too. Here’s my docker-compose.yml file:
```yaml
services:
  redis:
    image: redis:alpine
    container_name: redis
    ports:
      - "6379:6379"
```
This configuration starts Redis inside an Alpine Linux based container on port 6379 and maps that port to port 6379 on the host. To run it, start the container with the docker-compose CLI:
```shell
docker-compose up redis
```
If everything goes well, you should see output similar to this:
Implementation
I suggest we start our journey with the module responsible for the tasks:
```elixir
defmodule ProcessingLibrary.Job do
  # Only these fields are serialized to JSON.
  @derive {Jason.Encoder, only: [:params, :worker_module, :jid, :error, :start_at, :finish_at]}
  defstruct params: [], worker_module: nil, jid: nil, error: nil, start_at: nil, finish_at: nil

  # Builds a new job with a unique ID from raw client data.
  def construct(worker_module, params) do
    %ProcessingLibrary.Job{
      params: params,
      worker_module: worker_module,
      jid: UUID.uuid4()
    }
  end

  # Struct -> JSON string (what we store in Redis).
  def encode(%ProcessingLibrary.Job{} = job), do: Jason.encode!(job)

  # JSON string -> struct. Atoms such as the worker module come back as strings.
  def decode(job_json),
    do: struct(ProcessingLibrary.Job, Jason.decode!(job_json, keys: :atoms))
end
```
Here we define the job struct and the functions responsible for job creation from raw data, serialization, and deserialization. The job struct has the following fields:
- params
These are the arguments for the job’s perform function. The queue worker passes the whole params list to perform/1 as a single argument:
```elixir
# client side
ProcessingLibrary.enqueue("queue_name", ProcessingLibrary.DummyWorker, ["param1", "param2", "param3"])

# queue worker side: the params list arrives as one argument
ProcessingLibrary.DummyWorker.perform(["param1", "param2", "param3"])
```
- worker_module
This is the module that executes a particular kind of task. Every such module adopts the ProcessingLibrary.Worker behaviour through the @behaviour attribute and must implement the perform/1 callback, which the queue worker eventually calls:
```elixir
defmodule ProcessingLibrary.Worker do
  # Job modules implement perform/1; it receives the job's params list.
  @callback perform(params :: any()) :: any()
end
```
- jid (Job ID)
For each task, we will assign a job ID to be able to distinguish them by a unique identifier.
- error
Since a job can also break during execution, saving the error in the job object would be a good idea.
- start_at, finish_at
These timestamps record when the worker started and finished the job, so we can tell how long it took.
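The sketch below shows why perform/1 takes a single argument: the queue worker calls apply/3 with [job.params], so the whole params list arrives as one value. It also mimics the JSON round trip, where the module name comes back as a string such as "Elixir.SketchWorker" and has to be turned back into a module. SketchBehaviour, SketchWorker, and SketchRunner are hypothetical names used only for this illustration.

```elixir
defmodule SketchBehaviour do
  # Same shape as ProcessingLibrary.Worker: one callback, one argument.
  @callback perform(params :: any()) :: any()
end

defmodule SketchWorker do
  @behaviour SketchBehaviour

  # Receives the whole params list as a single argument.
  @impl true
  def perform(params) when is_list(params), do: Enum.join(params, ",")
end

defmodule SketchRunner do
  # Mimics what the queue worker does after decoding a job from JSON:
  # the module name is a string, so convert it back before calling apply/3.
  def run(worker_module_name, params) do
    module = String.to_existing_atom(worker_module_name)
    apply(module, :perform, [params])
  end
end
```

This sketch uses String.to_existing_atom/1 rather than String.to_atom/1, which avoids creating new atoms from data read out of storage; atoms are never garbage-collected.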
Next, we want to create a module that is a thin command wrapper around the Redix library. Redix does not provide high-level functions, only generic ones such as Redix.command/3, which takes a command as a list of strings, much like typing it into the redis-cli client. To see why a wrapper helps, here is an example of peeking at the last item of a Redis list:
```elixir
# Somewhere inside the business logic...
{:ok, range} = Redix.command(conn, ~w(LRANGE my_queue -1 -1))

if range == [] do
  {:ok, nil}
else
  [el | _] = range
  {:ok, el}
end

# What does this code do???
# It's hard to understand at first glance,
# and the guts of Redis leak into our business logic.
```
With a wrapper, the same intent becomes:
```elixir
# We hide the implementation details behind the Redis interface
ProcessingLibrary.Redis.peek(queue, :rear)
# or
ProcessingLibrary.Redis.peek(queue, :front)
```
Here is how the Redis module starts: a GenServer whose state is the Redix connection.
```elixir
defmodule ProcessingLibrary.Redis do
  use GenServer

  def start_link(_init_arg) do
    GenServer.start_link(__MODULE__, [], name: __MODULE__)
  end

  # Open the Redix connection and keep it as the GenServer state.
  def init(_init_arg) do
    {:ok, conn} =
      Redix.start_link(
        host: ProcessingLibrary.Env.get_redis_host(),
        port: ProcessingLibrary.Env.get_redis_port(),
        database: ProcessingLibrary.Env.get_redis_database()
      )

    {:ok, conn}
  end

  # ...the helpers and public functions below also live inside this module
```
The connection settings come from the application config in config/config.exs:
```elixir
import Config

config :processing_library,
  redis_namespace: "processing_library",
  redis_host: "localhost",
  redis_port: 6379,
  redis_database: 1

import_config "#{config_env()}.exs"
```
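The Redis module reads these settings through ProcessingLibrary.Env, which the article does not show. A minimal sketch, assuming it simply wraps Application.get_env/3 with defaults matching the config above (the module body is hypothetical):

```elixir
defmodule ProcessingLibrary.Env do
  # Hypothetical sketch: the real module is not shown in the article.
  # Each getter reads the :processing_library app config and falls back to a default.
  @app :processing_library

  def get_redis_namespace,
    do: Application.get_env(@app, :redis_namespace, "processing_library")

  def get_redis_host, do: Application.get_env(@app, :redis_host, "localhost")
  def get_redis_port, do: Application.get_env(@app, :redis_port, 6379)
  def get_redis_database, do: Application.get_env(@app, :redis_database, 1)
end
```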
Inside the same module, two private helpers add the library namespace to keys and strip it off again:
```elixir
  # "processing_library:emails" -> :emails
  defp extract_key(key),
    do: key |> String.split(":", trim: true) |> List.last() |> String.to_atom()

  defp namespace_key(key) when is_atom(key), do: namespace_key(Atom.to_string(key))

  # Prefix the key with the namespace unless it already starts with it.
  defp namespace_key(key) do
    namespace = ProcessingLibrary.Env.get_redis_namespace()

    if String.starts_with?(key, namespace <> ":") do
      key
    else
      "#{namespace}:#{key}"
    end
  end
```
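Here is a standalone sketch of the namespacing rule with the namespace passed in explicitly (KeyNamespace is a hypothetical module). It shows why a prefix check is safer: a plain String.contains?/2 check would treat a queue named "my_processing_library_jobs" as already namespaced, while String.starts_with?/2 does not.

```elixir
defmodule KeyNamespace do
  # Standalone version of the namespacing rule, with the namespace passed explicitly.
  def namespace_key(key, namespace) when is_atom(key),
    do: namespace_key(Atom.to_string(key), namespace)

  def namespace_key(key, namespace) do
    if String.starts_with?(key, namespace <> ":"), do: key, else: "#{namespace}:#{key}"
  end

  # Inverse operation: "processing_library:emails" -> :emails
  def extract_key(key),
    do: key |> String.split(":", trim: true) |> List.last() |> String.to_atom()
end
```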
Then come the public functions. Each one namespaces the key and forwards a message to the GenServer that holds the connection (the matching handle_call/3 clauses that run the actual Redis commands are not shown here):
```elixir
  def flush_db() do
    GenServer.call(__MODULE__, :flush_db)
  end

  def get_keys() do
    GenServer.call(__MODULE__, :keys)
  end

  def remove(queue, value) do
    GenServer.call(__MODULE__, {:remove, namespace_key(queue), value})
  end

  def enqueue(queue, value) do
    GenServer.call(__MODULE__, {:queue, namespace_key(queue), value})
  end

  def dequeue(queue) do
    GenServer.call(__MODULE__, {:dequeue, namespace_key(queue)})
  end

  def peek(queue, :rear) do
    GenServer.call(__MODULE__, {:peek, :rear, namespace_key(queue)})
  end

  def peek(queue, :front) do
    GenServer.call(__MODULE__, {:peek, :front, namespace_key(queue)})
  end

  def get_queue(queue) do
    GenServer.call(__MODULE__, {:get_queue, namespace_key(queue)})
  end

  def set(key, value) do
    GenServer.call(__MODULE__, {:set, namespace_key(key), value})
  end

  def get_queues(opts \\ []) do
    GenServer.call(__MODULE__, {:get_queues, opts})
  end
```
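The handle_call/3 clauses behind these functions are not shown in the article. One plausible way to write them is to translate each message into a Redis command list and hand it to Redix.command/2. The sketch below covers only that translation, assuming enqueue pushes to the rear with RPUSH and dequeue pops from the front with LPOP; RedisCommandSketch is a hypothetical module.

```elixir
defmodule RedisCommandSketch do
  # Hypothetical message -> Redis command mapping (Redix takes commands as lists of strings).
  # Assumption: the front of the queue is the left end of the list, the rear is the right end.
  def build({:queue, key, value}), do: ["RPUSH", key, value]
  def build({:dequeue, key}), do: ["LPOP", key]
  def build({:peek, :front, key}), do: ["LRANGE", key, "0", "0"]
  def build({:peek, :rear, key}), do: ["LRANGE", key, "-1", "-1"]
  def build({:get_queue, key}), do: ["LRANGE", key, "0", "-1"]
  # LREM with count 0 removes every element equal to value.
  def build({:remove, key, value}), do: ["LREM", key, "0", value]
  def build({:set, key, value}), do: ["SET", key, value]
  def build(:flush_db), do: ["FLUSHDB"]
end
```

Each handle_call/3 clause could then pass the built command to Redix.command/2 on the connection kept in the state; the peek clauses would also unwrap the one-element list returned by LRANGE into {:ok, value} or {:ok, nil}, as in the earlier example.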
Next, with our Redis module, we can create an abstraction module for the database:
```elixir
defmodule ProcessingLibrary.Database do
  # child_spec/1 lets this module be listed directly as a supervisor child.
  defdelegate child_spec(init_arg), to: ProcessingLibrary.Redis
  defdelegate init(init_arg), to: ProcessingLibrary.Redis, as: :init
  defdelegate start_link(init_arg), to: ProcessingLibrary.Redis, as: :start_link
  defdelegate get_queues(opts), to: ProcessingLibrary.Redis, as: :get_queues
  defdelegate get_queues(), to: ProcessingLibrary.Redis, as: :get_queues
  defdelegate get_queue(queue), to: ProcessingLibrary.Redis, as: :get_queue
  defdelegate get_keys(), to: ProcessingLibrary.Redis, as: :get_keys
  defdelegate set(key, value), to: ProcessingLibrary.Redis, as: :set
  defdelegate flush(), to: ProcessingLibrary.Redis, as: :flush_db
end
```
We are simply delegating our method calls from the database to the Redis module, but why? Why not just call methods from the Redis module directly?
Explanation: imagine you’re writing a program that relies on the interface Redis provides. At some point you realize that you need to switch from Redis to PostgreSQL. Looking at the code, you discover that every part of your program depends not on an abstract database but on Redis, which makes the move very difficult. That’s why we hide the implementation details behind an interface: if the storage ever changes, we only have to change one module instead of the whole program.
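To illustrate that point, here is a hypothetical in-memory backend built on Agent that exposes the same enqueue/dequeue/peek/get_queue function names as the Redis module. It is only a sketch: it ignores namespacing and persistence, and its return values are simplified to the {:ok, value} shape used elsewhere in the article.

```elixir
defmodule InMemoryQueueSketch do
  # Hypothetical backend: queue name -> Erlang :queue, held in an Agent.
  use Agent

  def start_link(_opts \\ []), do: Agent.start_link(fn -> %{} end, name: __MODULE__)

  # Append to the rear of the queue, creating it if needed.
  def enqueue(queue, value) do
    Agent.update(__MODULE__, fn state ->
      Map.update(state, queue, :queue.from_list([value]), &:queue.in(value, &1))
    end)

    {:ok, value}
  end

  # Remove and return the front element, or {:ok, nil} when the queue is empty.
  def dequeue(queue) do
    Agent.get_and_update(__MODULE__, fn state ->
      case :queue.out(Map.get(state, queue, :queue.new())) do
        {{:value, value}, rest} -> {{:ok, value}, Map.put(state, queue, rest)}
        {:empty, _} -> {{:ok, nil}, state}
      end
    end)
  end

  # Look at the front or rear element without removing it.
  def peek(queue, position) do
    Agent.get(__MODULE__, fn state ->
      q = Map.get(state, queue, :queue.new())
      result = if position == :front, do: :queue.peek(q), else: :queue.peek_r(q)

      case result do
        {:value, value} -> {:ok, value}
        :empty -> {:ok, nil}
      end
    end)
  end

  def get_queue(queue) do
    jobs =
      Agent.get(__MODULE__, fn state ->
        state |> Map.get(queue, :queue.new()) |> :queue.to_list()
      end)

    {:ok, jobs}
  end
end
```

Pointing the defdelegate lines in the Database and Queue modules at a module like this would swap the storage without touching the code that calls them.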
The same goes for the queue module:
```elixir
defmodule ProcessingLibrary.Database.Queue do
  defdelegate enqueue(queue, value), to: ProcessingLibrary.Redis, as: :enqueue
  defdelegate dequeue(queue), to: ProcessingLibrary.Redis, as: :dequeue
  defdelegate remove(queue, value), to: ProcessingLibrary.Redis, as: :remove
  defdelegate peek(queue, position), to: ProcessingLibrary.Redis, as: :peek
end
```
Having an abstraction over the database and the queue, we can start working with the modules that work most closely with the queue, namely the enqueuer and dequeuer modules:
```elixir
defmodule ProcessingLibrary.Enqueuer do
  def enqueue(queue_name, %ProcessingLibrary.Job{} = job_data) do
    encoded_job_data = ProcessingLibrary.Job.encode(job_data)

    case ProcessingLibrary.Database.Queue.enqueue(queue_name, encoded_job_data) do
      {:ok, _} ->
        # Reserved queues (processed/failed) only store results, so the worker isn't notified.
        if not ProcessingLibrary.is_reserved_queue?(queue_name) do
          ProcessingLibrary.QueueWorker.publish_job(queue_name)
        end

        {:ok, job_data}

      error ->
        {:error, error}
    end
  end

  # Builds the job from raw data, then enqueues it.
  def enqueue(queue_name, worker_module, params) do
    job_data = ProcessingLibrary.Job.construct(worker_module, params)
    enqueue(queue_name, job_data)
  end
end
```
In this module:
1. We create a job from the incoming parameters.
2. We add it to the Redis queue.
3. If the target queue is not one of the queues reserved by the library (processed or failed), we send an asynchronous message to the queue worker, which processes the job if it is available.
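Step 3 is easy to get subtly wrong. A `with` chain that ends in `false <- reserved?` sends every write to a reserved queue into the `else` branch, so storing a processed job would be reported as {:error, true} even though it succeeded. This standalone sketch (PublishDecisionSketch is hypothetical, and the atoms stand in for the real return values) contrasts that shape with a `case` that only treats storage failures as errors:

```elixir
defmodule PublishDecisionSketch do
  # Same shape as a `with` chain ending in `false <- reserved?`:
  # a reserved queue makes the chain fall through to `else`.
  def with_chain(store_result, reserved?) do
    with {:ok, _} <- store_result,
         false <- reserved? do
      :published
    else
      error -> {:error, error}
    end
  end

  # Only a storage failure is an error; reserved queues are stored without notifying the worker.
  def with_case(store_result, reserved?) do
    case store_result do
      {:ok, _} -> if reserved?, do: :stored, else: :published
      error -> {:error, error}
    end
  end
end
```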
Something similar happens in the Dequeuer module, which finds, removes, and dequeues jobs:
```elixir
defmodule ProcessingLibrary.Dequeuer do
  # Looks for a job by ID in every queue, including processed and failed.
  def find_job(job_id) do
    {:ok, queues} = ProcessingLibrary.Database.get_queues(include_reserved: true)

    jobs =
      Enum.flat_map(queues, fn queue ->
        case ProcessingLibrary.Database.get_queue(queue) do
          {:ok, jobs} -> Enum.map(jobs, fn job -> {job, queue} end)
          _ -> []
        end
      end)

    job = Enum.find(jobs, fn {job, _queue} -> ProcessingLibrary.Job.decode(job).jid == job_id end)

    case job do
      nil -> {:error, "Job not found"}
      {job, queue} -> {:ok, ProcessingLibrary.Job.decode(job), queue}
    end
  end

  # Removes a job by ID from whichever queue holds it.
  def remove(job_id) do
    with {:ok, job, queue} <- find_job(job_id) do
      job_json = ProcessingLibrary.Job.encode(job)
      ProcessingLibrary.Database.Queue.remove(queue, job_json)
    else
      _ -> {:error, "Job not found"}
    end
  end

  defdelegate dequeue(queue), to: ProcessingLibrary.Database.Queue, as: :dequeue
end
```
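The search in find_job/1 boils down to scanning every queue and stopping at the first job whose ID matches. A standalone sketch of that idea, using Enum.find_value/3 over plain maps instead of JSON stored in Redis (an assumption made to keep it self-contained; FindJobSketch is hypothetical):

```elixir
defmodule FindJobSketch do
  # queues: %{queue_name => [job_map, ...]}, where each job map has a :jid key.
  # Returns {:ok, job, queue} for the first match, mirroring find_job/1's result shape.
  def find_job(queues, job_id) do
    Enum.find_value(queues, {:error, "Job not found"}, fn {queue, jobs} ->
      case Enum.find(jobs, &(&1.jid == job_id)) do
        nil -> nil
        job -> {:ok, job, queue}
      end
    end)
  end
end
```

Returning the matching job directly means each job is examined once, whereas the version above decodes the matching job a second time before returning it.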
Now we get to the fun part: the queue worker module.
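```elixir
defmodule ProcessingLibrary.QueueWorker do
  use GenServer
  require Logger

  def start_link(_) do
    GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
  end

  def init(_) do
    # Process the backlog in handle_continue/2 so that a long queue
    # doesn't block the supervisor while the worker starts.
    {:ok, %{}, {:continue, :process_backlog}}
  end

  # Jobs enqueued while the worker was down are processed here on startup.
  def handle_continue(:process_backlog, state) do
    {:ok, queues} = ProcessingLibrary.Database.get_queues()
    start_processing(queues)
    {:noreply, state}
  end
```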
```elixir
  # Process every known queue, one after another.
  def start_processing(queues) do
    Enum.each(queues, &process_job/1)
  end

  def process_job(%ProcessingLibrary.Job{} = job, queue) do
    Logger.info("#{log_context(job)} start")
    start_time = DateTime.utc_now()

    try do
      # worker_module comes back from JSON as a string like "Elixir.ProcessingLibrary.DummyWorker"
      apply(String.to_atom(job.worker_module), :perform, [job.params])
      finish_time = DateTime.utc_now()
      diff_time = DateTime.diff(finish_time, start_time, :millisecond)
      Logger.info("#{log_context(job)} finished in #{diff_time}ms")

      ProcessingLibrary.Enqueuer.enqueue(:processed, %{
        job
        | start_at: start_time,
          finish_at: finish_time
      })
    rescue
      e ->
        finish_time = DateTime.utc_now()
        Logger.error("#{log_context(job)} failed with exception:\n#{Exception.message(e)}")

        # Store the message as a string; a charlist would be encoded as a JSON array of integers.
        ProcessingLibrary.Enqueuer.enqueue(:failed, %{
          job
          | error: Exception.message(e),
            start_at: start_time,
            finish_at: finish_time
        })
    end

    # The job now lives in processed/failed, so drop it from its queue and continue.
    ProcessingLibrary.Dequeuer.dequeue(queue)
    process_job(queue)
  end

  # Peek at the front of the queue; stop when it is empty.
  def process_job(queue) do
    case ProcessingLibrary.Database.Queue.peek(queue, :front) do
      {:ok, nil} -> :ok
      {:ok, job_json} -> job_json |> ProcessingLibrary.Job.decode() |> process_job(queue)
    end
  end

  # log_context/1 is called above but not shown in the original code; this is a guess at its shape.
  defp log_context(job), do: "[#{job.worker_module}][#{job.jid}]"
```
1. We save timestamps and log the job start through Logger.
2. We execute the job through the Kernel.apply/3 function.
3. We save the result in one of the reserved queues (failed or processed).
4. We recursively move on to the next task in the queue.
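The first three steps can be isolated into a small function. This sketch runs a zero-arity function, times it, and tags the outcome with the reserved queue it would go to. It measures the duration with System.monotonic_time/1, which, unlike DateTime.utc_now/0, is not affected by wall-clock adjustments; the library still needs wall-clock DateTime values for start_at and finish_at. RunJobSketch is a hypothetical module.

```elixir
defmodule RunJobSketch do
  # Runs `fun`, times it, and returns {queue, result_or_error_message, duration_ms}.
  def run(fun) when is_function(fun, 0) do
    started = System.monotonic_time(:millisecond)

    {queue, payload} =
      try do
        {:processed, fun.()}
      rescue
        e -> {:failed, Exception.message(e)}
      end

    {queue, payload, System.monotonic_time(:millisecond) - started}
  end
end
```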
That’s all the queue worker does during processing. It also accepts messages from outside (for example, from the Enqueuer module) and handles them asynchronously thanks to GenServer.cast/2, which returns immediately without waiting for a reply:
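```elixir
  # Also part of ProcessingLibrary.QueueWorker: called by the Enqueuer...
  def publish_job(queue) do
    GenServer.cast(__MODULE__, {:publish, queue})
  end

  # ...and handled here, by draining the queue.
  def handle_cast({:publish, queue}, state) do
    process_job(queue)
    {:noreply, state}
  end
```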
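The difference between call and cast is what keeps enqueueing fast. In this standalone sketch (CastSketch is hypothetical), GenServer.cast/2 returns :ok right away even though the server takes a while to handle the message; the synchronous :sys.get_state/1 afterwards waits until the queued cast has been processed.

```elixir
defmodule CastSketch do
  use GenServer

  def start_link(_), do: GenServer.start_link(__MODULE__, [], name: __MODULE__)

  # Returns :ok immediately; the caller does not wait for processing.
  def publish_job(queue), do: GenServer.cast(__MODULE__, {:publish, queue})

  @impl true
  def init(state), do: {:ok, state}

  @impl true
  def handle_cast({:publish, queue}, handled) do
    # Stand-in for process_job/1: pretend draining the queue takes 100 ms.
    Process.sleep(100)
    {:noreply, [queue | handled]}
  end
end
```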
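Both long-running processes, the Redis connection (through the Database module) and the queue worker, are started under a supervisor:
```elixir
defmodule ProcessingLibrary.Supervisor do
  use Supervisor

  # Mix is not available at runtime in releases, so capture the environment at compile time.
  @start_processes Mix.env() != :test

  def start_link(opts) do
    Supervisor.start_link(__MODULE__, :ok, opts)
  end

  @impl true
  def init(:ok) do
    # Don't start Redis and the worker in the test environment.
    children =
      if @start_processes do
        [
          # Listed first: the queue worker reads from Redis as soon as it starts.
          ProcessingLibrary.Database,
          ProcessingLibrary.QueueWorker
        ]
      else
        []
      end

    Supervisor.init(children, strategy: :one_for_one)
  end
end
```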
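The :one_for_one strategy restarts only the child that crashed. A standalone sketch with a hypothetical Counter child shows the effect: after the process is killed, the supervisor starts a fresh one under the same registered name. RestartSketch and RestartSketch.Counter are illustrative names.

```elixir
defmodule RestartSketch.Counter do
  use Agent

  def start_link(_), do: Agent.start_link(fn -> 0 end, name: __MODULE__)
end

defmodule RestartSketch do
  # Starts a :one_for_one supervisor with a single Counter child.
  def start, do: Supervisor.start_link([RestartSketch.Counter], strategy: :one_for_one)

  # Polls until `name` is registered to a pid other than `old_pid` (or gives up and returns nil).
  def wait_for_new_pid(name, old_pid, attempts \\ 50) do
    case Process.whereis(name) do
      pid when is_pid(pid) and pid != old_pid ->
        pid

      _ when attempts > 0 ->
        Process.sleep(10)
        wait_for_new_pid(name, old_pid, attempts - 1)

      _ ->
        nil
    end
  end
end
```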
With our processes added to the supervisor, we can start the library and use it. But wait, something is missing… oh, the statistics module. Let’s implement it quickly:
```elixir
defmodule ProcessingLibrary.Stats do
  @stats_queues [:processed, :failed]

  def is_stats_queue?(queue) do
    Enum.member?(@stats_queues, queue)
  end

  def stat(queue) when queue in @stats_queues do
    {:ok, jobs} = ProcessingLibrary.Database.get_queue(queue)
    length(jobs)
  end

  def stat(:scheduled) do
    {:ok, queues} = ProcessingLibrary.Database.get_queues()

    queues
    |> Enum.reduce(0, fn queue, acc ->
      {:ok, jobs} = ProcessingLibrary.Database.get_queue(queue)
      acc + length(jobs)
    end)
  end

  def stats() do
    (@stats_queues ++ [:scheduled])
    |> Enum.reduce(%{}, fn s, acc ->
      Map.put(acc, s, stat(s))
    end)
  end
end
```
Inside this module, we go through the queues, count the jobs in each, and return a map as the result:
%{processed: 105, scheduled: 70, failed: 5}
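The counting itself is simple. A standalone sketch over a plain map of queue names to job lists (an assumption standing in for the Redis queues; StatsSketch is hypothetical) produces the same kind of summary:

```elixir
defmodule StatsSketch do
  @reserved ["processed", "failed"]

  # queues: %{queue_name => [job, ...]}; reserved queues are counted individually,
  # every other queue contributes to :scheduled.
  def stats(queues) do
    scheduled =
      queues
      |> Enum.reject(fn {name, _jobs} -> name in @reserved end)
      |> Enum.map(fn {_name, jobs} -> length(jobs) end)
      |> Enum.sum()

    %{
      processed: length(Map.get(queues, "processed", [])),
      failed: length(Map.get(queues, "failed", [])),
      scheduled: scheduled
    }
  end
end
```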
Finally, the cherry on top is a public interface that provides everything users need, so they don’t have to dig into the internal modules to use the library:
```elixir
defmodule ProcessingLibrary do
  def is_reserved_queue?(queue), do: ProcessingLibrary.Stats.is_stats_queue?(queue)

  defdelegate enqueue(queue, worker_module, params),
    to: ProcessingLibrary.Enqueuer,
    as: :enqueue

  defdelegate enqueue(queue, job_data),
    to: ProcessingLibrary.Enqueuer,
    as: :enqueue

  defdelegate remove(job_id),
    to: ProcessingLibrary.Dequeuer,
    as: :remove

  defdelegate find_job(job_id),
    to: ProcessingLibrary.Dequeuer,
    as: :find_job

  defdelegate stats(),
    to: ProcessingLibrary.Stats,
    as: :stats
end
```
Demo
I made a dashboard using Phoenix LiveView and WebSockets to demonstrate visually how our library works. For this purpose, I also added a dummy worker module to the project, which sleeps for a random 1 to 3 seconds and randomly raises an error, so that we can see failures in the UI as well:
```elixir
defmodule ProcessingLibrary.DummyWorker do
  @behaviour ProcessingLibrary.Worker

  # Sleeps for 1 to 3 seconds, then fails about half of the time.
  def perform(_args) do
    Process.sleep(Enum.random(1..3) * 1000)

    if Enum.random([true, false]) do
      raise "Oops... something went wrong"
    end
  end
end
```
In the demo itself, I enqueue 10 tasks for our dummy worker; let’s see how the queue worker handles them:
This demonstration shows how the queue worker picks up one scheduled task at a time, executes it synchronously, moves it to one of the queues depending on the result, and proceeds to the next one.
Conclusion
At the end of the story, I want to say that it was super exciting to write such a library, and I learned a lot about how job processing tools work. I’ll leave a link to the repository here in case anyone wants to check out the whole codebase. Thanks for reading, I hope you have learned something new too!