
Falon Darville for Zero Assumptions


Build a Job Queue with Rust Using Aide-De-Camp (Part 1)

This is the first installment in a series that walks you through creating durable job queues using Zero Assumptions' open source solution, Aide-De-Camp, which is built with Rust. For full transparency, hi - I'm Zero Assumptions' creator, Andrey Snow. Follow along as I make it easier for you to manage job queueing in Rust.

Aide-De-Camp Code

Code from this series is available in our open source Aide-De-Camp GitHub repository.

The crate is published to crates.io for everyone to use.

Intro to Queues

From time to time, your application needs to offload some work to a different process or delay execution of a certain task. This is where job queues come in handy.

Many existing job queues are opinionated about how to store and process jobs. For example, many require Redis to be running, some only work with PostgreSQL, and others require an external manager process. That's not a problem if you're already using Redis or PostgreSQL, but what if you want to use something lighter like SQLite and run a single process?

With very little code, SQLite and PostgreSQL can be turned into a durable job queue.

First, let's define our interfaces. Traits allow us to make our job queue backend agnostic: maybe today we use SQLite, tomorrow PostgreSQL, and the day after Redis.

Job

Job Lifecycle

Before we begin, let's define the expected job behavior. A job is any distinct unit of work to be done. Even the same task repeated will get a unique job ID.

We're going to visualize the job lifecycle from the runner's point of view as a finite-state machine.

Finite state diagram

The above lifecycle has two kinds of errors, JobError and RunnerError.

JobError is an error specific to processing the job itself, for example when there's a problem decoding the input.

Errors from the handler are passed to the RunnerRouter. Handler-specific errors must implement Into<JobError>. The easiest way to handle errors is to wrap your error in anyhow::Error.

RunnerError represents queue-domain errors and indicates a runtime issue. A runner error is raised when a job fails to transition due to an underlying QueueError, or when a job can't be handled by the current RunnerRouter. These errors are bubbled up and result in a temporary suspension of the worker, in the hope that the issue will go away in the future.

JobProcessor Trait

Jobs are defined by implementing the JobProcessor trait from the Aide-De-Camp library. An implementation looks like the following.



#[async_trait]
pub trait JobProcessor: Send + Sync {
    /// What is the input to this handler. If you want to use `RunnerRouter`, then this must implement `bincode::Decode` and `bincode::Encode`.
    type Payload: Send;
    /// Which error is returned
    type Error: Send;
    /// Run the job, passing the payload to it. Your payload should implement `bincode::Decode`.
    async fn handle(&self, jid: Xid, payload: Self::Payload) -> Result<(), Self::Error>;

    /// How many times a job should be retried before being moved to the dead queue
    fn max_retries(&self) -> u32 {
        0
    }

    /// Job type, used to differentiate between different jobs in the queue.
    fn name() -> &'static str
    where
        Self: Sized;
}



A job needs the following:

  • A Payload type that is Send. For example, type Payload: Send.
  • An error type, such as type Error: Send. The error type can also be Infallible.
  • A handle function that takes jid (short for job ID) and the job's input (payload) as arguments. For example, async fn handle(&self, jid: Xid, payload: Self::Payload) -> Result<(), Self::Error>;.
  • A name function that returns the job name. This is used to distinguish jobs in the queue and is used by RunnerRouter. For example, fn name() -> &'static str.
  • To be Send + Sync.
  • Optionally, how many times a job should be retried upon failure. The default value is 0 retries, meaning the job only has one chance to succeed.

Some queue implementations also pass a Context to the job, which acts like Axum's Extension. In this queue implementation, if your job has external dependencies (for example, an API client for your transactional email service provider), they should be part of the entity that implements the JobProcessor trait. This way, you choose how to wire the dependencies, whether by passing them manually or by using a dependency injection framework.
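
To make this concrete, here's a minimal sketch of a handler. The SendEmail payload and SendEmailJob struct are hypothetical names invented for illustration, and the sketch assumes the JobProcessor trait, Xid, and the bincode derive macros are in scope; anyhow::Error is used as the error type on the assumption that it converts into JobError, as mentioned above.


#[derive(Debug, bincode::Encode, bincode::Decode)]
pub struct SendEmail {
    pub to: String,
    pub subject: String,
}

pub struct SendEmailJob {
    // External dependencies (an email API client, a database pool, ...)
    // live on the job struct itself.
}

#[async_trait::async_trait]
impl JobProcessor for SendEmailJob {
    type Payload = SendEmail;
    type Error = anyhow::Error;

    async fn handle(&self, jid: Xid, payload: Self::Payload) -> Result<(), Self::Error> {
        tracing::info!("job {} sending email to {}", jid, payload.to);
        // ...call your email provider here...
        Ok(())
    }

    fn max_retries(&self) -> u32 {
        3 // give transient failures a few extra chances
    }

    fn name() -> &'static str {
        "send_email"
    }
}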

Type-Erased Job Handler

Rust won't let us store different job types behind a single map; something like HashMap<String, JobType1 | JobType2> isn't valid Rust.

To work around this, many Rust queues either:

  • Make your job handler accept some form of &[u8] as input, which requires too much boilerplate code.
  • Declare an enum with all possible job types, which limits flexibility. The publisher needs to be aware of every possible job, and it's hard to separate which jobs run where.

axum shows a way around this kind of problem. After an hour of "Go to definition" across multiple tokio crates, I came up with the following.



/// Object-safe implementation of a job that can be used in the runner
pub struct WrappedJobHandler<T: JobProcessor> {
    job: T,
    config: Configuration,
}

impl<J> WrappedJobHandler<J>
where
    J: JobProcessor + 'static,
    J::Payload: Decode + Encode,
    J::Error: Into<JobError>,
{
    pub fn new(job: J) -> Self {
        let config = bincode::config::standard();
        Self { job, config }
    }

    pub fn boxed(self) -> BoxedJobHandler {
        Box::new(self) as BoxedJobHandler
    }
}

#[async_trait::async_trait]
impl<J> JobProcessor for WrappedJobHandler<J>
where
    J: JobProcessor + 'static,
    J::Payload: Decode + Encode,
    J::Error: Into<JobError>,
{
    type Payload = Bytes;
    type Error = JobError;

    async fn handle(&self, jid: Xid, payload: Self::Payload) -> Result<(), Self::Error> {
        let (payload, _) = bincode::decode_from_slice(payload.as_ref(), self.config)?;
        self.job.handle(jid, payload).await.map_err(Into::into)
    }

    fn name() -> &'static str {
        J::name()
    }
}



Now, our RunnerRouter just needs to know about Box<dyn JobProcessor<Payload = Bytes, Error = JobError>>. WrappedJobHandler<J> takes care of decoding the payload and converting errors.

Handle Multiple Job Types in a Single Runner

I've already mentioned RunnerRouter several times without explaining what it is. You can think of it as an HTTP request router. I seriously considered using the router from axum underneath and making everything that implements JobProcessor also implement Tower's Service, but it seemed like a little too much, so I made my own:



#[derive(Default)]
pub struct RunnerRouter {
    jobs: HashMap<&'static str, BoxedJobHandler>,
}

impl RunnerRouter {
    /// Register a job handler with the router. If a handler for that job name is already registered, the existing one is kept.
    pub fn add_job_handler<J>(&mut self, job: J)
    where
        J: JobProcessor + 'static,
        J::Payload: Decode + Encode,
        J::Error: Into<JobError>,
    {
        let name = J::name();
        let boxed = WrappedJobHandler::new(job).boxed();
        self.jobs.entry(name).or_insert(boxed);
    }

    pub fn types(&self) -> Vec<&'static str> {
        self.jobs.keys().copied().collect()
    }

    /// Process job handle. This function is responsible for job lifecycle. If you're implementing your
    /// own job runner, then this is what you should use to process a job that is already pulled
    /// from the queue. In all other cases, you shouldn't use this function directly.
    #[instrument(skip_all, err, fields(job_type = %job_handle.job_type(), jid = %job_handle.id().to_string(), retries = job_handle.retries()))]
    pub async fn process<H: JobHandle>(&self, job_handle: H) -> Result<(), RunnerError> {
        if let Some(r) = self.jobs.get(job_handle.job_type()) {
            match r
                .handle(job_handle.id(), job_handle.payload())
                .await
                .map_err(JobError::from)
            {
                Ok(_) => {
                    job_handle.complete().await?;
                    Ok(())
                }
                Err(e) => {
                    tracing::error!("Error during job processing: {}", e);
                    if job_handle.retries() >= r.max_retries() {
                        tracing::warn!("Moving job {} to dead queue", job_handle.id().to_string());
                        job_handle.dead_queue().await?;
                        Ok(())
                    } else {
                        job_handle.fail().await?;
                        Ok(())
                    }
                }
            }
        } else {
            Err(RunnerError::UnknownJobType(
                job_handle.job_type().to_string(),
            ))
        }
    }

}



This isn't the complete version; we'll expand it later, but for now, that's enough. Don't worry about the JobHandle trait just yet; I'll get to it soon.

Registering job handlers

Handlers can only be registered while you have a mutable reference to RunnerRouter. I did it that way to avoid having to lock the jobs map behind an RwLock or create a separate builder structure. The intended use:



// create your job handler here
let my_job = MyJob::default();
let router = {
    let mut r = RunnerRouter::default();
    r.add_job_handler(my_job);
    r
};



Job processing

This is handled by the process function. It takes a type that implements the JobHandle trait as input. This method is in charge of the entire job lifecycle.

First, we check whether the current RunnerRouter is capable of handling the given job. Then, we attempt to process it. Finally, depending on the outcome, we do one of the following:

  • Remove the job from the queue
  • Re-queue the job
  • Move the job to the dead queue if the retry count has exceeded the desired maximum

Queue

Until now, we haven't talked about the queue itself. In the abstract, a queue works like this:



while let Some(job) = queue.pop() {
    job.process();
}



This wouldn't be a very efficient queue for many reasons. Our queue is going to look more like this:

Job queue diagram

Queue trait

Now that we have an idea of how the queue works, we can define its trait:



#[async_trait]
pub trait Queue: Send + Sync {
    type JobHandle: JobHandle;
    /// Schedule a job to run at the future time.
    async fn schedule_at<J>(
        &self,
        payload: J::Payload,
        scheduled_at: DateTime,
    ) -> Result<Xid, QueueError>
    where
        J: JobProcessor + 'static,
        J::Payload: Encode;
    /// Schedule a job to run next. Depending on queue backlog this may start running later than you expect.
    async fn schedule<J>(&self, payload: J::Payload) -> Result<Xid, QueueError>
    where
        J: JobProcessor + 'static,
        J::Payload: Encode,
    {
        self.schedule_at::<J>(payload, Utc::now()).await
    }

    /// Schedule a job to run at the future time relative to now.
    async fn schedule_in<J>(
        &self,
        payload: J::Payload,
        scheduled_in: Duration,
    ) -> Result<Xid, QueueError>
    where
        J: JobProcessor + 'static,
        J::Payload: Encode,
    {
        let when = Utc::now() + scheduled_in;
        self.schedule_at::<J>(payload, when).await
    }

    /// Poll the queue. The implementation should not wait for the next job; if there is nothing, return `Ok(None)`.
    async fn poll_next_with_instant(
        &self,
        job_types: &[&str],
        time: DateTime,
    ) -> Result<Option<Self::JobHandle>, QueueError>;

    /// Poll the queue using the current time as "now". The implementation should not wait for the next job; if there is nothing, return `Ok(None)`.
    async fn poll_next(&self, job_types: &[&str]) -> Result<Option<Self::JobHandle>, QueueError> {
        self.poll_next_with_instant(job_types, Utc::now()).await
    }

    /// Await next job. Default implementation polls the queue with defined interval until there is something.
    async fn next(
        &self,
        job_types: &[&str],
        interval: Duration,
    ) -> Result<Self::JobHandle, QueueError> {
        let duration = interval
            .to_std()
            .map_err(|_| QueueError::InvalidInterval(interval))?;
        let mut interval = tokio::time::interval(duration);
        loop {
            interval.tick().await;
            let job = self.poll_next(job_types).await?;
            if let Some(job) = job {
                break Ok(job);
            }
        }
    }
}



This trait is designed to work only with jobs that can be handled by RunnerRouter. It provides convenient methods for scheduling jobs: schedule to run a job right now, schedule_in to run a job in the future (e.g., in 5 minutes), and schedule_at to run a job at a specific time (e.g., Mon, 16 May 2022 21:02:35 GMT). An implementation only needs to provide schedule_at and poll_next_with_instant.
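
From the caller's side, scheduling looks roughly like this (a sketch reusing the hypothetical SendEmailJob from earlier, and assuming queue is some implementation of the Queue trait, inside a function that can use ?):


let payload = SendEmail {
    to: "user@example.com".into(),
    subject: "Welcome!".into(),
};

// Run as soon as a worker picks it up.
let jid = queue.schedule::<SendEmailJob>(payload).await?;

// Or run it five minutes from now:
// let jid = queue.schedule_in::<SendEmailJob>(payload, chrono::Duration::minutes(5)).await?;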

schedule_at is self-explanatory: this method adds a job to the queue to run at a specific time.

poll_next_with_instant is a bit more complicated. It takes a list of job types and a time as arguments. time determines what should be treated as "now" when the queue is polled. Taking time as an input is very helpful because it allows time travel in tests. The implementation should check the queue and return a JobHandle if there is anything due.
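
For example, a test can "time travel" like this (a sketch, again using the hypothetical SendEmailJob and a payload value as before):


// Schedule a job for an hour from now.
queue
    .schedule_at::<SendEmailJob>(payload, Utc::now() + chrono::Duration::hours(1))
    .await?;

// Nothing is due yet...
assert!(queue.poll_next(&[SendEmailJob::name()]).await?.is_none());

// ...but polling "as of" two hours from now returns the job without waiting.
let handle = queue
    .poll_next_with_instant(&[SendEmailJob::name()], Utc::now() + chrono::Duration::hours(2))
    .await?;
assert!(handle.is_some());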

There is also a next function that polls the queue in a loop: if the queue is empty, it sleeps for the specified interval and polls again until there is something. This is just a default implementation; a more specific implementation can use a combination of sleeping and external wake-ups (e.g., NOTIFY from PostgreSQL or SUBSCRIBE from Redis).

JobHandle

You might have already noticed that most of the job lifecycle is handled by some JobHandle. What is it? Well… it's an implementation-specific way of handling the job lifecycle. Queue does only two things: scheduling jobs and polling for jobs. Queue here is an interface to a queue that exists somewhere, rather than the queue itself.

JobHandle trait looks like this:



#[async_trait]
pub trait JobHandle: Send + Sync {
    /// Get the job ID.
    fn id(&self) -> Xid;
    /// Get the job type.
    fn job_type(&self) -> &str;
    /// Get the job payload.
    fn payload(&self) -> Bytes;
    /// How many times this job has been retried already.
    fn retries(&self) -> u32;
    /// Mark the job as completed successfully.
    async fn complete(mut self) -> Result<(), QueueError>;
    /// Mark the job as failed.
    async fn fail(mut self) -> Result<(), QueueError>;
    /// Move the job to the dead queue.
    async fn dead_queue(mut self) -> Result<(), QueueError>;
}



Everything past "job checked out of the queue" in the job lifecycle is handled within its implementation.
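
To show how the pieces connect, here's a minimal manual loop (just a sketch, assuming it runs inside a function returning anyhow::Result<()>; the listen method in the next section automates this and adds proper error handling):


let job_types = router.types();
loop {
    // Poll for a due job; if the queue is empty, back off briefly.
    if let Some(handle) = queue.poll_next(&job_types).await? {
        router.process(handle).await?;
    } else {
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    }
}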

Processing the Queue

Extending RunnerRouter

Now that we have an interface to interact with our queue, we can extend RunnerRouter to work with it:



impl RunnerRouter {
    pub async fn listen<Q, QR>(&self, queue: Q, poll_interval: Duration)
    where
        Q: AsRef<QR>,
        QR: Queue,
    {
        let job_types = self.types();
        loop {
            match queue.as_ref().next(&job_types, poll_interval).await {
                Ok(handle) => match self.process(handle).await {
                    Ok(_) => {}
                    Err(RunnerError::QueueError(e)) => handle_queue_error(e).await,
                    Err(RunnerError::UnknownJobType(name)) => {
                        tracing::error!("Unknown job type: {}", name)
                    }
                },
                Err(e) => {
                    handle_queue_error(e).await;
                }
            }
        }
    }
}
async fn handle_queue_error(error: QueueError) {
    tracing::error!("Encountered QueueError: {}", error);
    tracing::warn!("Suspending worker for 5 seconds");
    tokio::time::sleep(std::time::Duration::from_secs(5)).await;
}



In a loop, we poll the queue and handle the incoming jobs. If we encounter a QueueError, either while polling or while processing, the worker goes into a short sleep. If we encounter an unknown job type, we just log it and move on to the next job.

Putting it all together

Technically, we could just create a queue, pass it to RunnerRouter::listen, and call it a day. The only issue is that it processes a single job at a time. In some cases, that's enough. But we want to go fast.

Full disclaimer - the following is a naive version of a job runner. It works, but has some limitations.



const JITTER_INTERVAL_MS: [i64; 10] = [0, 1, 1, 2, 3, 5, 8, 13, 21, 34];

pub struct JobRunner<Q>
where
    Q: Queue,
{
    queue: Arc<Q>,
    processor: Arc<RunnerRouter>,
    semaphore: Arc<Semaphore>,
}

impl<Q> JobRunner<Q>
where
    Q: Queue + 'static,
{
    pub fn new(queue: Q, processor: RunnerRouter, concurrency: usize) -> Self {
        Self {
            queue: Arc::new(queue),
            processor: Arc::new(processor),
            semaphore: Arc::new(Semaphore::new(concurrency)),
        }
    }
}

impl<Q> JobRunner<Q>
where
    Q: Queue + 'static,
{
    pub async fn run(&mut self, interval: chrono::Duration) -> Result<(), QueueError> {
        loop {
            let semaphore = self.semaphore.clone();
            let permit = semaphore
                .acquire_owned()
                .await
                .context("Semaphore closed while running")?;
            let queue = self.queue.clone();
            let processor = self.processor.clone();
            tokio::spawn(async move {
                let _permit = permit;
                let queue = queue;
                let processor = processor;
                let interval = interval + get_random_jitter();
                processor.listen(queue, interval).await;
            });
        }
    }
}

fn get_random_jitter() -> chrono::Duration {
    JITTER_INTERVAL_MS
        .choose(&mut rand::thread_rng())
        .map(|ms| chrono::Duration::milliseconds(*ms))
        .unwrap_or_else(|| chrono::Duration::milliseconds(5))
}




First, let's talk about the JobRunner:



pub struct JobRunner<Q>
where
    Q: Queue,
{
    queue: Arc<Q>,
    processor: Arc<RunnerRouter>,
    semaphore: Arc<Semaphore>,
}

impl<Q> JobRunner<Q>
where
    Q: Queue + 'static,
{
    pub fn new(queue: Q, processor: RunnerRouter, concurrency: usize) -> Self {
        Self {
            queue: Arc::new(queue),
            processor: Arc::new(processor),
            semaphore: Arc::new(Semaphore::new(concurrency)),
        }
    }
}



We already know why we need the first two, but what about Semaphore? Semaphore
is a synchronization primitive that limits concurrent access to a resource.

Essentially, it's a metered highway ramp for your program. We're going to use it to limit how many jobs can be in flight at any given moment.
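
If you haven't used one before, here's a tiny standalone illustration of the same pattern JobRunner::run relies on; it's plain tokio::sync::Semaphore, nothing queue-specific:


use std::sync::Arc;
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    // Allow at most 2 tasks to run at once; acquire_owned() pauses the loop
    // until a permit frees up, which is how JobRunner::run bounds the number
    // of in-flight workers.
    let semaphore = Arc::new(Semaphore::new(2));
    let mut handles = Vec::new();
    for i in 0..5 {
        let permit = semaphore.clone().acquire_owned().await.unwrap();
        handles.push(tokio::spawn(async move {
            let _permit = permit; // returned to the semaphore when the task ends
            println!("task {} running", i);
            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
        }));
    }
    for handle in handles {
        handle.await.unwrap();
    }
}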

Generally, it's a bad idea to have N things hammer a resource at the same time, so we're going to introduce a random-ish jitter:



const JITTER_INTERVAL_MS: [i64; 10] = [0, 1, 1, 2, 3, 5, 8, 13, 21, 34];

fn get_random_jitter() -> chrono::Duration {
    JITTER_INTERVAL_MS
        .choose(&mut rand::thread_rng())
        .map(|ms| chrono::Duration::milliseconds(*ms))
        .unwrap_or_else(|| chrono::Duration::milliseconds(5))
}



It doesn't have to be a Fibonacci sequence, but using one staggers the poll intervals nicely so workers don't all hit the queue in lockstep.

Now for the processing itself. First, we start a loop. Then, inside the loop, we acquire a "permit" from our semaphore:



  let semaphore = self.semaphore.clone();
  let permit = semaphore
      .acquire_owned()
      .await
      .context("Semaphore closed while running")?;




The await pauses the loop until a permit becomes available. After that, we clone the Arcs holding our Queue and RunnerRouter:



  let queue = self.queue.clone();
  let processor = self.processor.clone();



Finally, we start polling our queue and processing incoming jobs:



  tokio::spawn(async move {
      let _permit = permit;
      let queue = queue;
      let processor = processor;
      let interval = interval + get_random_jitter();
      processor.listen(queue, interval).await;
  });



We spawn a new task onto the tokio runtime, and that's it; the rest is implemented in RunnerRouter. When a spawned task finishes, its permit returns to our semaphore and the cycle repeats.
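
Putting the pieces together might look roughly like this (a sketch; SendEmailJob is the hypothetical handler from earlier, and queue stands in for the concrete Queue implementation coming in Part 2):


let mut router = RunnerRouter::default();
router.add_job_handler(SendEmailJob { /* dependencies go here */ });

// Up to 10 jobs in flight at once, polling roughly every 100ms (plus jitter).
let mut runner = JobRunner::new(queue, router, 10);
runner.run(chrono::Duration::milliseconds(100)).await?;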

Possible Improvements

Dynamic concurrency

Currently, concurrency is set in stone and can't be changed after JobRunner has started its work. That means with a concurrency of 50, there will be 50 tasks constantly asking the queue for new jobs. I want concurrency to shrink and grow depending on the current load.

Shrinking and growing concurrency can be done by ballooning semaphore permits. A task can check the queue size and acquire or release permits based on that. It's a bit harder to reclaim permits that are already taken by a waiting worker, but it is possible and may require changing the interface of the Queue trait.
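
The core of the idea could look something like this sketch (not part of the library; current and target would come from whatever bookkeeping and queue-size measurement drive the resizing):


use std::sync::Arc;
use tokio::sync::Semaphore;

// Grow or shrink the number of permits toward `target`.
async fn resize(semaphore: &Arc<Semaphore>, current: usize, target: usize) {
    if target > current {
        semaphore.add_permits(target - current);
    } else {
        for _ in 0..(current - target) {
            // Take a permit out of circulation permanently once one frees up.
            if let Ok(permit) = semaphore.acquire().await {
                permit.forget();
            }
        }
    }
}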

Hooks

There is very little insight into what is happening in the queue scheduler and worker process. In the future, I'll add a way to get notified when certain events happen. Hooks could also be used to drive dynamic concurrency.

Batching

Instead of polling jobs one by one, I want to poll N jobs at once. Batching helps when many small jobs are constantly being sent to the queue.

Multiple Queues

Currently, jobs are split by type, and that's the only way to have different concurrency and/or poll intervals. This is fine for many use cases, but occasionally we want "hot" and "cold" queues that can hold the same types of jobs. For example, if you're working on a SaaS product with a free tier, jobs from free-tier clients can go to a slow queue, while jobs from paid clients go to a fast queue.

Queue Insights

Not being able to see the queue state isn't a big deal if your jobs never fail (spoiler alert: jobs do fail). Our Queue trait can be extended to provide some insight: namely, how many jobs are in the queue, how many jobs should be executed right now, and what is going on in the dead queue.

Up Next

So far, we've only covered generic items: traits and the glue between generic components. Part 2 will cover a SQLite implementation of the Queue trait and benchmarking.

Follow us here on DEV, Twitter, and LinkedIn for updates on Aide-De-Camp.

Aide-De-Camp is a product of Zero Assumptions.
