This is the first installment in a series that walks you through creating durable job queues with Zero Assumptions' open source solution, Aide-De-Camp, which is built in Rust. For full transparency, hi - I'm Zero Assumptions' creator, Andrey Snow. Follow along as I make it easier for you to manage job queueing in Rust.
Aide-De-Camp Code
Code from this series is available in our open source Aide-De-Camp GitHub repository.
The crate is published to crates.io for everyone to use.
Intro to Queues
From time to time, your application needs to offload some work to a different process or delay execution of a certain task. This is where job queues come in handy.
Many existing job queues are opinionated about how to store and process jobs. For example, many require Redis, some only work with PostgreSQL, and others require an external manager process. That's not a problem if you're already running Redis or PostgreSQL, but what if you want to use something lighter like SQLite and run a single process?
With very little code, SQLite and PostgreSQL can be turned into a durable job queue.
First, let's define our interfaces. Traits let us make the job queue backend agnostic: maybe today it's SQLite, tomorrow PostgreSQL, and the day after Redis.
Job
Job Lifecycle
Before we begin, let's define the expected job behavior. A job is any distinct unit of work to be done. Even the same task repeated gets a unique job ID.
We're going to visualize the job lifecycle from the runner's point of view as a finite-state machine.
This lifecycle has two kinds of errors: `JobError` and `RunnerError`.

`JobError` is an error specific to processing the job itself, for example a problem decoding the input. Errors returned by the handler are passed up to the `RunnerRouter`; handler-specific errors must implement `Into<JobError>`, and the easiest way to do that is to wrap your error in `anyhow::Error`.
`RunnerError` covers queue domain errors and indicates a runtime issue. A runner error is raised when a job fails to transition because of an underlying `QueueError`, or when a job can't be handled by the current `RunnerRouter`. These errors are bubbled up and result in a temporary suspension of the worker, in the hope that the issue will go away.
JobProcessor Trait
To define a job for the Aide-De-Camp library, implement the `JobProcessor` trait. It looks like the following:
#[async_trait]
pub trait JobProcessor: Send + Sync {
/// What is the input to this handler. If you want to use `RunnerRouter`, then this must implement `bincode::Decode` and `bincode::Encode`.
type Payload: Send;
/// Which error is returned
type Error: Send;
/// Run the job, passing the payload to it. Your payload should implement `bincode::Decode`.
async fn handle(&self, jid: Xid, payload: Self::Payload) -> Result<(), Self::Error>;
    /// How many times a job should be retried before being moved to the dead queue.
fn max_retries(&self) -> u32 {
0
}
/// Job type, used to differentiate between different jobs in the queue.
fn name() -> &'static str
where
Self: Sized;
}
A job needs the following:
- A payload type that is `Send`, e.g. `type Payload: Send`.
- An error type, e.g. `type Error: Send`. The error type can also be `Infallible`.
- A `handle` function that takes `jid` (`job_id` shortened) and the job's input (`payload`) as arguments, e.g. `async fn handle(&self, jid: Xid, payload: Self::Payload) -> Result<(), Self::Error>;`.
- A `name` function that returns the job name. This is used to distinguish jobs in the queue and is used by `RunnerRouter`, e.g. `fn name() -> &'static str`.
- To be `Send` + `Sync`.
- Optionally, how many times the job should be retried upon failure. The default is `0` retries, meaning the job gets only one chance to succeed.
Some queue implementations also pass a `Context` to the job, which acts like axum's `Extension`. In this queue implementation, if your job has external dependencies (for example, an API client for your transactional email service provider), they should be fields of the struct that implements the `JobProcessor` trait. This way, you choose how to wire the dependencies, whether by passing them manually or by using a dependency-injection framework.
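For illustration, here's a minimal sketch of such a job with an external dependency. Everything in it is made up for this example: the `Mailer` stand-in, the payload struct, and the job name; `JobProcessor` and `Xid` come from the Aide-De-Camp crate, with exact import paths omitted:
use async_trait::async_trait;
use bincode::{Decode, Encode};

// A stand-in for whatever client your email provider gives you.
pub struct Mailer;

impl Mailer {
    pub async fn send_welcome(&self, _email: &str) -> anyhow::Result<()> {
        Ok(())
    }
}

#[derive(Decode, Encode)]
pub struct WelcomeEmailPayload {
    pub user_id: i64,
    pub email: String,
}

pub struct WelcomeEmailJob {
    // External dependency, wired in by whoever constructs the handler.
    mailer: Mailer,
}

#[async_trait]
impl JobProcessor for WelcomeEmailJob {
    type Payload = WelcomeEmailPayload;
    type Error = anyhow::Error;

    async fn handle(&self, _jid: Xid, payload: Self::Payload) -> Result<(), Self::Error> {
        self.mailer.send_welcome(&payload.email).await?;
        Ok(())
    }

    fn max_retries(&self) -> u32 {
        3 // give transient email failures a few more chances
    }

    fn name() -> &'static str {
        "welcome_email"
    }
}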
Type-Erased Job Handler
Rust doesn't really let you express something like `HashMap<String, JobType1 | JobType2>`, where each value could be one of several job types.
To work around this, many Rust queues either:
- Make your job handler accept some form of `&[u8]` as input, which requires too much boilerplate code.
- Declare an enum with all possible job types, which limits flexibility: the publisher needs to be aware of every possible job, and it's hard to separate which jobs run where.
`axum` shows a way around this. After an hour of "Go to definition" across multiple tokio crates, I've come up with the following:
/// Object-safe implementation of a job that can be used in the runner
pub struct WrappedJobHandler<T: JobProcessor> {
job: T,
config: Configuration,
}
impl<J> WrappedJobHandler<J>
where
J: JobProcessor + 'static,
J::Payload: Decode + Encode,
J::Error: Into<JobError>,
{
pub fn new(job: J) -> Self {
let config = bincode::config::standard();
Self { job, config }
}
pub fn boxed(self) -> BoxedJobHandler {
Box::new(self) as BoxedJobHandler
}
}
#[async_trait::async_trait]
impl<J> JobProcessor for WrappedJobHandler<J>
where
J: JobProcessor + 'static,
J::Payload: Decode + Encode,
J::Error: Into<JobError>,
{
type Payload = Bytes;
type Error = JobError;
async fn handle(&self, jid: Xid, payload: Self::Payload) -> Result<(), Self::Error> {
let (payload, _) = bincode::decode_from_slice(payload.as_ref(), self.config)?;
self.job.handle(jid, payload).await.map_err(Into::into)
}
fn name() -> &'static str {
J::name()
}
}
Now, our `RunnerRouter` just needs to know about `Box<dyn JobProcessor<Payload = Bytes, Error = JobError>>`. `WrappedJobHandler<J>` takes care of decoding the payload and converting errors.
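For reference, `BoxedJobHandler` (returned by `boxed` above) is presumably just a type alias along these lines:
// Assumed definition, matching what the runner needs to store.
pub type BoxedJobHandler = Box<dyn JobProcessor<Payload = Bytes, Error = JobError>>;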
Handle Multiple Job Types in a Single Runner
I've already mentioned `RunnerRouter` multiple times without explaining what it is. You can think of it as an HTTP request router. I seriously considered using axum's router underneath and making everything that implements `JobProcessor` also implement Tower's `Service`, but that seemed like a little too much, so I made my own:
#[derive(Default)]
pub struct RunnerRouter {
jobs: HashMap<&'static str, BoxedJobHandler>,
}
impl RunnerRouter {
    /// Register a job handler with the router. If a handler with that name is already registered, the existing one is kept.
pub fn add_job_handler<J>(&mut self, job: J)
where
J: JobProcessor + 'static,
J::Payload: Decode + Encode,
J::Error: Into<JobError>,
{
let name = J::name();
let boxed = WrappedJobHandler::new(job).boxed();
self.jobs.entry(name).or_insert(boxed);
}
pub fn types(&self) -> Vec<&'static str> {
self.jobs.keys().copied().collect()
}
/// Process job handle. This function is responsible for job lifecycle. If you're implementing your
/// own job runner, then this is what you should use to process a job that is already pulled
/// from the queue. In all other cases, you shouldn't use this function directly.
#[instrument(skip_all, err, fields(job_type = %job_handle.job_type(), jid = %job_handle.id().to_string(), retries = job_handle.retries()))]
pub async fn process<H: JobHandle>(&self, job_handle: H) -> Result<(), RunnerError> {
if let Some(r) = self.jobs.get(job_handle.job_type()) {
match r
.handle(job_handle.id(), job_handle.payload())
.await
.map_err(JobError::from)
{
Ok(_) => {
job_handle.complete().await?;
Ok(())
}
Err(e) => {
tracing::error!("Error during job processing: {}", e);
if job_handle.retries() >= r.max_retries() {
tracing::warn!("Moving job {} to dead queue", job_handle.id().to_string());
job_handle.dead_queue().await?;
Ok(())
} else {
job_handle.fail().await?;
Ok(())
}
}
}
} else {
Err(RunnerError::UnknownJobType(
job_handle.job_type().to_string(),
))
}
}
}
This isn't the complete version; we'll expand it later, but for now it's enough. Don't worry about the `JobHandle` trait just yet, I'll get to it soon.
Registering job handlers
Handlers can only be registered while you have a mutable reference to `RunnerRouter`. I've done it that way to avoid having to lock `jobs` behind an `RwLock` or having to make a separate builder structure. Intended use:
// create your job handler here
let my_job = MyJob::default();
let router = {
let mut r = RunnerRouter::default();
r.add_job_handler(my_job);
r
};
Job processing
This is handled by the `process` function. It takes a type that implements the `JobHandle` trait as input. This method is in charge of the entire job lifecycle.
First, we check whether the current `RunnerRouter` is capable of handling the given job. Then, we attempt to process it. Finally, depending on the outcome, we do one of the following:
- Remove the job from the queue (on success)
- Re-queue the job (on failure, if retries remain)
- Move the job to the dead queue (if the retry count has reached the configured maximum)
Queue
Until now, we haven't talked about the queue itself. In the abstract, a queue works like this:
while let Some(job) = queue.pop() {
job.process();
}
This wouldn't be a very efficient queue, for many reasons; ours is going to be a bit more involved.
Queue trait
Now that we have an idea of how the queue works, we can define its trait:
#[async_trait]
pub trait Queue: Send + Sync {
type JobHandle: JobHandle;
/// Schedule a job to run at the future time.
async fn schedule_at<J>(
&self,
payload: J::Payload,
scheduled_at: DateTime,
) -> Result<Xid, QueueError>
where
J: JobProcessor + 'static,
J::Payload: Encode;
/// Schedule a job to run next. Depending on queue backlog this may start running later than you expect.
async fn schedule<J>(&self, payload: J::Payload) -> Result<Xid, QueueError>
where
J: JobProcessor + 'static,
J::Payload: Encode,
{
self.schedule_at::<J>(payload, Utc::now()).await
}
/// Schedule a job to run at the future time relative to now.
async fn schedule_in<J>(
&self,
payload: J::Payload,
scheduled_in: Duration,
) -> Result<Xid, QueueError>
where
J: JobProcessor + 'static,
J::Payload: Encode,
{
let when = Utc::now() + scheduled_in;
self.schedule_at::<J>(payload, when).await
}
    /// Poll the queue. The implementation should not wait for the next job; if there is nothing, return `Ok(None)`.
async fn poll_next_with_instant(
&self,
job_types: &[&str],
time: DateTime,
) -> Result<Option<Self::JobHandle>, QueueError>;
    /// Poll the queue. The implementation should not wait for the next job; if there is nothing, return `Ok(None)`.
async fn poll_next(&self, job_types: &[&str]) -> Result<Option<Self::JobHandle>, QueueError> {
self.poll_next_with_instant(job_types, Utc::now()).await
}
    /// Await the next job. The default implementation polls the queue at the given interval until there is something.
async fn next(
&self,
job_types: &[&str],
interval: Duration,
) -> Result<Self::JobHandle, QueueError> {
let duration = interval
.to_std()
.map_err(|_| QueueError::InvalidInterval(interval))?;
let mut interval = tokio::time::interval(duration);
loop {
interval.tick().await;
let job = self.poll_next(job_types).await?;
if let Some(job) = job {
break Ok(job);
}
}
}
}
This trait is designed to work only with jobs that can be handled by `RunnerRouter`. It provides convenient methods for scheduling jobs: `schedule` schedules a job to run right now, `schedule_in` schedules a job to run in the future (e.g., in 5 minutes), and `schedule_at` schedules a job to run at a specific time (e.g., `Mon, 16 May 2022 21:02:35 GMT`). The only methods an implementation must provide are `schedule_at` and `poll_next_with_instant`.
`schedule_at` is self-explanatory: it adds a job to the queue to run at a specific time.
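From the publisher's side, usage might look like the following sketch; `WelcomeEmailJob` is the hypothetical job from earlier, and each payload variable is a `WelcomeEmailPayload` value:
// Run as soon as a worker picks it up.
let jid = queue.schedule::<WelcomeEmailJob>(payload).await?;

// Run roughly five minutes from now.
queue
    .schedule_in::<WelcomeEmailJob>(another_payload, chrono::Duration::minutes(5))
    .await?;

// Run at a specific instant.
queue
    .schedule_at::<WelcomeEmailJob>(later_payload, Utc::now() + chrono::Duration::hours(1))
    .await?;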
`poll_next_with_instant` is a bit more involved. It takes a list of job types and a `time` as arguments. `time` determines what should be treated as "now" when the queue is polled. Taking time as an input is very helpful because it allows time travel in tests. The implementation should check the queue and return a `JobHandle` if there is a due job.
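That makes "time travel" tests like this sketch possible (again assuming the hypothetical `WelcomeEmailJob`):
// Schedule a job an hour into the future.
let jid = queue
    .schedule_at::<WelcomeEmailJob>(payload, Utc::now() + chrono::Duration::hours(1))
    .await?;

// Polling with "now" finds nothing yet...
let nothing = queue
    .poll_next_with_instant(&["welcome_email"], Utc::now())
    .await?;
assert!(nothing.is_none());

// ...but pretending it's an hour later returns the job.
let handle = queue
    .poll_next_with_instant(&["welcome_email"], Utc::now() + chrono::Duration::hours(1))
    .await?
    .expect("job should be due by then");
assert_eq!(handle.id().to_string(), jid.to_string());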
There is also the `next` function, which polls the queue in a loop: if the queue is empty, it sleeps for the specified interval and polls again until there is something. This is just a default implementation; a more specific implementation could use a combination of sleeping and external wake-ups (e.g. `NOTIFY` from PostgreSQL or `SUBSCRIBE` from Redis).
JobHandle
You might have already noticed that most of the job lifecycle is handled by some `JobHandle`. What is it? Well…it's an implementation-specific way of handling the job lifecycle. `Queue` has only two jobs: scheduling jobs and polling for jobs. `Queue` here is an interface to a queue that exists somewhere, rather than the queue itself.
The `JobHandle` trait looks like this:
#[async_trait]
pub trait JobHandle: Send + Sync {
    /// Get the job ID.
    fn id(&self) -> Xid;
    /// Get the job type.
    fn job_type(&self) -> &str;
    /// Get the job payload.
    fn payload(&self) -> Bytes;
    /// How many times this job has been retried already.
    fn retries(&self) -> u32;
    /// Mark the job as completed successfully.
    async fn complete(mut self) -> Result<(), QueueError>;
    /// Mark the job as failed.
    async fn fail(mut self) -> Result<(), QueueError>;
    /// Move the job to the dead queue.
    async fn dead_queue(mut self) -> Result<(), QueueError>;
}
Everything past "job checked out of the queue" in the job lifecycle is handled within its implementation.
Processing the Queue
Extending RunnerRouter
Now that we have an interface to interact with our queue, we can extend `RunnerRouter` to work with it:
impl RunnerRouter {
pub async fn listen<Q, QR>(&self, queue: Q, poll_interval: Duration)
where
Q: AsRef<QR>,
QR: Queue,
{
let job_types = self.types();
loop {
match queue.as_ref().next(&job_types, poll_interval).await {
Ok(handle) => match self.process(handle).await {
Ok(_) => {}
Err(RunnerError::QueueError(e)) => handle_queue_error(e).await,
Err(RunnerError::UnknownJobType(name)) => {
tracing::error!("Unknown job type: {}", name)
}
},
Err(e) => {
handle_queue_error(e).await;
}
}
}
}
}
async fn handle_queue_error(error: QueueError) {
tracing::error!("Encountered QueueError: {}", error);
tracing::warn!("Suspending worker for 5 seconds");
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
}
In a loop, we poll the queue and handle incoming jobs. If we encounter a `QueueError`, either while polling or during processing, the worker goes into a short sleep. If we encounter an unknown job type, we just log it and move on to the next job.
Putting it all together
Technically, we could just create a queue, pass it to `RunnerRouter::listen`, and call it a day. The only issue is that it processes a single job at a time. In some cases, that's enough. But we want to go fast.
Full disclaimer: the following is a naive version of a job runner. It works, but it has some limitations.
const JITTER_INTERVAL_MS: [i64; 10] = [0, 1, 1, 2, 3, 5, 8, 13, 21, 34];
pub struct JobRunner<Q>
where
Q: Queue,
{
queue: Arc<Q>,
processor: Arc<RunnerRouter>,
semaphore: Arc<Semaphore>,
}
impl<Q> JobRunner<Q>
where
Q: Queue + 'static,
{
pub fn new(queue: Q, processor: RunnerRouter, concurrency: usize) -> Self {
Self {
queue: Arc::new(queue),
processor: Arc::new(processor),
semaphore: Arc::new(Semaphore::new(concurrency)),
}
}
}
impl<Q> JobRunner<Q>
where
Q: Queue + 'static,
{
pub async fn run(&mut self, interval: chrono::Duration) -> Result<(), QueueError> {
loop {
let semaphore = self.semaphore.clone();
let permit = semaphore
.acquire_owned()
.await
.context("Semaphore closed while running")?;
let queue = self.queue.clone();
let processor = self.processor.clone();
tokio::spawn(async move {
let _permit = permit;
let queue = queue;
let processor = processor;
let interval = interval + get_random_jitter();
processor.listen(queue, interval).await;
});
}
}
}
fn get_random_jitter() -> chrono::Duration {
JITTER_INTERVAL_MS
.choose(&mut rand::thread_rng())
.map(|ms| chrono::Duration::milliseconds(*ms))
.unwrap_or_else(|| chrono::Duration::milliseconds(5))
}
First, let's talk about the `JobRunner`:
pub struct JobRunner<Q>
where
Q: Queue,
{
queue: Arc<Q>,
processor: Arc<RunnerRouter>,
semaphore: Arc<Semaphore>,
}
impl<Q> JobRunner<Q>
where
Q: Queue + 'static,
{
pub fn new(queue: Q, processor: RunnerRouter, concurrency: usize) -> Self {
Self {
queue: Arc::new(queue),
processor: Arc::new(processor),
semaphore: Arc::new(Semaphore::new(concurrency)),
}
}
}
We already know why we need the first two, but what about `Semaphore`? `Semaphore` is a synchronization primitive that limits concurrent access to a resource.
Essentially, it's a metered highway ramp for your program. We're going to use it to limit how many jobs can be in flight at any given moment.
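If you haven't used one before, here's a minimal standalone sketch showing Tokio's `Semaphore` capping concurrency at two tasks:
use std::sync::Arc;
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    // At most two tasks may hold a permit at the same time.
    let semaphore = Arc::new(Semaphore::new(2));
    let mut handles = Vec::new();
    for i in 0..5 {
        // Waits here whenever both permits are already taken.
        let permit = semaphore.clone().acquire_owned().await.unwrap();
        handles.push(tokio::spawn(async move {
            let _permit = permit; // released when the task finishes
            println!("task {i} is one of at most two running");
            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        }));
    }
    for handle in handles {
        handle.await.unwrap();
    }
}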
Generally, it's a bad idea to have N things hammer a resource at the same time, so we're going to introduce a random-ish jitter:
const JITTER_INTERVAL_MS: [i64; 10] = [0, 1, 1, 2, 3, 5, 8, 13, 21, 34];
fn get_random_jitter() -> chrono::Duration {
JITTER_INTERVAL_MS
.choose(&mut rand::thread_rng())
.map(|ms| chrono::Duration::milliseconds(*ms))
.unwrap_or_else(|| chrono::Duration::milliseconds(5))
}
It doesn't have to be a Fibonacci sequence, but this one gives a nice spread of small offsets so workers don't all poll in lockstep.
Now, on to processing jobs. First, we start a loop. Then we acquire a "permit" from our semaphore:
let semaphore = self.semaphore.clone();
let permit = semaphore
.acquire_owned()
.await
.context("Semaphore closed while running")?;
The `await` pauses the loop until a permit becomes available. After that, we clone our `Queue` and `RunnerRouter`:
let queue = self.queue.clone();
let processor = self.processor.clone();
Finally, we start polling our queue and processing incoming jobs:
tokio::spawn(async move {
let _permit = permit;
let queue = queue;
let processor = processor;
let interval = interval + get_random_jitter();
processor.listen(queue, interval).await;
});
We `spawn` a new task onto the Tokio runtime, and that's it; the rest is implemented in `RunnerRouter`. When the spawned task finishes, the permit returns to our semaphore and the cycle repeats.
Possible Improvements
Dynamic concurrency
Currently, concurrency is set in stone and can't be changed after `JobRunner` has started its work. That means with a concurrency of 50, there will be 50 tasks constantly asking the queue for new jobs. I want concurrency to shrink and grow depending on the current load.
Shrinking and growing concurrency can be done by ballooning semaphore permits: a task can check the queue size and acquire or release permits based on it. It's a bit harder to reclaim permits that are already held by a waiting worker, but it is possible, and it may require changing the interface of the `Queue` trait.
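As a rough sketch of that idea (purely hypothetical; nothing in the current `Queue` trait exposes a queue depth yet):
use std::sync::Arc;
use tokio::sync::Semaphore;

// Hypothetical controller that balloons worker concurrency up or down.
async fn adjust_concurrency(semaphore: Arc<Semaphore>, queue_depth: usize, busy_threshold: usize) {
    if queue_depth > busy_threshold {
        // Grow: hand out one extra permit so another worker can start.
        semaphore.add_permits(1);
    } else if let Ok(permit) = semaphore.clone().try_acquire_owned() {
        // Shrink: take a permit and never return it.
        permit.forget();
    }
}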
Hooks
There is very little insight into what is happening in the queue scheduler and worker process. In the future, I'll add a way to get notified when certain events happen. Hooks could also be used to drive dynamic concurrency.
Batching
Instead of polling jobs one by one, I want to poll N jobs at once. Batching is helpful in situations where many small jobs are constantly sent to the queue.
Multiple Queues
Currently, jobs are split by type, and that's the only way to have different concurrency and/or poll intervals. This is fine for many use cases, but occasionally we want "hot" and "cold" queues that can contain the same types of jobs. For example, if you're working on a SaaS product with a free tier, tasks from free-tier clients can go to a slow queue, while paid clients go to a fast queue.
Queue Insights
Not being able to see the queue state isn't a big deal if your jobs never fail (spoiler alert: jobs do fail). Our `Queue` trait can be extended to provide some insight: namely, how many jobs are in the queue, how many jobs should be executing right now, and what is going on in the dead queue.
Up Next
So far, we've only covered generic items like traits and the glue between generic components. Part 2 is going to cover the SQLite implementation of the `Queue` trait and benchmarking.
Follow us here on DEV, Twitter, and LinkedIn for updates on Aide-De-Camp.
Aide-De-Camp is a product of Zero Assumptions.