This is the second installment in our series, which walks you through creating durable job queues using Aide-De-Camp, Zero Assumptions' open source Rust solution.
In part one, we went over the Rust traits that we created for the job queue. In this post, we implement those traits.
Aide-De-Camp Code
Code from this series is available in our open source Aide-De-Camp GitHub repository.
The crate is published on crates.io for everyone to use.
SQLite Schema
SQLite is just the first backend we've implemented; more backend implementations are on the way.
Aide-De-Camp uses SQLite to persist the queue. Let's look at the schema.
Jobs go into the main queue, adc_queue. Dead letter jobs, or jobs that couldn't be completed, go into adc_dead_queue, where you can investigate them later.
CREATE TABLE IF NOT EXISTS adc_queue (
    jid TEXT PRIMARY KEY,
    queue TEXT NOT NULL default 'default',
    job_type TEXT not null,
    payload blob not null,
    retries int not null default 0,
    scheduled_at INTEGER not null,
    started_at INTEGER,
    enqueued_at INTEGER not null default (strftime('%s', 'now'))
);
CREATE TABLE IF NOT EXISTS adc_dead_queue (
    jid TEXT PRIMARY KEY,
    queue TEXT NOT NULL,
    job_type TEXT not null,
    payload blob not null,
    retries int not null,
    scheduled_at INTEGER not null,
    started_at INTEGER not null,
    enqueued_at INTEGER not null,
    died_at INTEGER not null default (strftime('%s', 'now'))
);
CREATE INDEX IF NOT EXISTS adc_queue_jobs ON adc_queue (
    scheduled_at asc,
    started_at asc,
    queue,
    job_type
);
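One way to read this schema: a job's state is not stored in a dedicated column; it follows from started_at and scheduled_at. The sketch below mirrors the adc_queue columns in a plain Rust struct to make that explicit. The names `QueuedJob`, `JobState`, and `state_at` are hypothetical and used for illustration only; they are not part of Aide-De-Camp, and the field types are an assumption based on the column types above.

```rust
/// Hypothetical in-memory mirror of one `adc_queue` row.
/// Timestamps are Unix seconds, matching the INTEGER columns.
#[derive(Debug, Clone, PartialEq)]
pub struct QueuedJob {
    pub jid: String,
    pub queue: String,
    pub job_type: String,
    pub payload: Vec<u8>,
    pub retries: i64,
    pub scheduled_at: i64,
    /// A NULL `started_at` in SQLite maps to `None` here.
    pub started_at: Option<i64>,
    pub enqueued_at: i64,
}

/// States that can be derived from the row without an extra column.
#[derive(Debug, PartialEq)]
pub enum JobState {
    /// Not started, and `scheduled_at` is still in the future.
    Scheduled,
    /// Not started, and due to run.
    Ready,
    /// A worker has claimed the job (`started_at` is set).
    InProgress,
}

impl QueuedJob {
    /// Derive the state of the job at the given Unix timestamp.
    pub fn state_at(&self, now: i64) -> JobState {
        match self.started_at {
            Some(_) => JobState::InProgress,
            None if self.scheduled_at <= now => JobState::Ready,
            None => JobState::Scheduled,
        }
    }
}
```

A completed job has no state in this model because its row is deleted from adc_queue, and a dead job lives in adc_dead_queue instead.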
Queue Implementation
Next, we implement the queue in Rust. We only need to implement two methods, schedule_at and poll_next_with_instant.
The following struct holds the connection pool and the bincode configuration.
#[derive(Clone)]
pub struct SqliteQueue {
    pool: SqlitePool,
    bincode_config: bincode::config::Configuration,
}
schedule_at
To schedule a job, all we need to do is serialize the payload, create a new JID (we use XID for JIDs), and insert both into the adc_queue table.
#[instrument(skip_all, err, ret, fields(job_type = J::name(), payload_size))]
async fn schedule_at<J>(
    &self,
    payload: J::Payload,
    scheduled_at: DateTime,
) -> Result<Xid, QueueError>
where
    J: JobProcessor + 'static,
    J::Payload: Encode,
{
    let payload = bincode::encode_to_vec(&payload, self.bincode_config)?;
    let jid = new_xid();
    let jid_string = jid.to_string();
    let job_type = J::name();

    tracing::Span::current().record("payload_size", &payload.len());

    sqlx::query!(
        "INSERT INTO adc_queue (jid,job_type,payload,scheduled_at) VALUES (?1,?2,?3,?4)",
        jid_string,
        job_type,
        payload,
        scheduled_at
    )
    .execute(&self.pool)
    .await
    .context("Failed to add job to the queue")?;
    Ok(jid)
}
poll_next_with_instant
Since SQLite doesn't have built-in notifications for new jobs queued, we poll the queue periodically. In our UPDATE statement, we mark the row as in progress, then use the RETURNING * clause to retrieve job metadata.
The following is an excerpt from the queue.rs source code at our 0.1.0 tag. In our next posts, we'll review the modifications we've made to poll_next_with_instant.
UPDATE adc_queue
SET started_at = ?, retries = retries + 1
WHERE jid IN (
    SELECT jid FROM adc_queue
    WHERE started_at IS NULL
      AND queue = ?
      AND scheduled_at <= ?
      AND job_type IN (?, ?)
    LIMIT 1
)
RETURNING *;
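To make the claim-and-poll logic easier to follow, here is a minimal in-memory sketch of what the statement does: pick one unstarted, due job of an accepted type, mark it as started, bump its retry counter, and return the updated row. It is not the Aide-De-Camp implementation. `Row`, `claim_next`, and `poll_next` are hypothetical names, the slice stands in for the adc_queue table, and the sleep-based loop is just one possible way to poll periodically.

```rust
use std::{thread, time::Duration};

/// Hypothetical subset of an `adc_queue` row (timestamps are Unix seconds).
#[derive(Debug, Clone, PartialEq)]
pub struct Row {
    pub jid: String,
    pub queue: String,
    pub job_type: String,
    pub retries: u32,
    pub scheduled_at: i64,
    pub started_at: Option<i64>,
}

/// Mimics `UPDATE ... WHERE jid IN (SELECT ... LIMIT 1) RETURNING *`:
/// claims at most one eligible job and returns the updated row.
pub fn claim_next(rows: &mut [Row], queue: &str, job_types: &[&str], now: i64) -> Option<Row> {
    let row = rows.iter_mut().find(|r| {
        r.started_at.is_none()                              // started_at IS NULL
            && r.queue == queue                             // queue = ?
            && r.scheduled_at <= now                        // scheduled_at <= ?
            && job_types.iter().any(|t| r.job_type == *t)   // job_type IN (...)
    })?;
    row.started_at = Some(now); // SET started_at = ?
    row.retries += 1;           // retries = retries + 1
    Some(row.clone())           // RETURNING *
}

/// Polls up to `max_polls` times, sleeping `interval` between empty polls.
/// `now` is injected so the clock can be controlled by the caller.
pub fn poll_next(
    rows: &mut [Row],
    queue: &str,
    job_types: &[&str],
    now: impl Fn() -> i64,
    interval: Duration,
    max_polls: usize,
) -> Option<Row> {
    for _ in 0..max_polls {
        if let Some(row) = claim_next(rows, queue, job_types, now()) {
            return Some(row);
        }
        thread::sleep(interval);
    }
    None
}
```

Because the SQL has no ORDER BY, SQLite may return any eligible row; the sketch simply takes the first match in slice order. Once a row is claimed, started_at is no longer NULL, so a second poll skips it.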
JobHandle Trait Implementation
Since the entire job lifecycle is within the JobHandle trait, we have to implement that trait too.
pub struct SqliteJobHandle {
    pool: SqlitePool,
    row: JobRow,
}
#[async_trait]
impl JobHandle for SqliteJobHandle {
    fn id(&self) -> Xid {
        self.row.jid
    }

    fn job_type(&self) -> &str {
        &self.row.job_type
    }

    fn payload(&self) -> Bytes {
        self.row.payload.clone()
    }

    fn retries(&self) -> u32 {
        self.row.retries
    }

    // A completed job is simply removed from the queue.
    async fn complete(mut self) -> Result<(), QueueError> {
        let jid = self.row.jid.to_string();
        sqlx::query!("DELETE FROM adc_queue where jid = ?1", jid)
            .execute(&self.pool)
            .await
            .context("Failed to mark job as completed")?;
        Ok(())
    }

    // Resetting started_at makes the job eligible for polling again.
    async fn fail(mut self) -> Result<(), QueueError> {
        let jid = self.row.jid.to_string();
        sqlx::query!("UPDATE adc_queue SET started_at=null WHERE jid = ?1", jid)
            .execute(&self.pool)
            .await
            .context("Failed to mark job as failed")?;
        Ok(())
    }

    // Move the job from adc_queue to adc_dead_queue in a single transaction.
    async fn dead_queue(mut self) -> Result<(), QueueError> {
        let jid = self.row.jid.to_string();
        let retries = self.row.retries;
        let job_type = self.row.job_type.clone();
        let payload = self.row.payload.as_ref();
        let scheduled_at = self.row.scheduled_at;
        let enqueued_at = self.row.enqueued_at;
        let mut tx = self
            .pool
            .begin()
            .await
            .context("Failed to start transaction")?;
        sqlx::query!("DELETE FROM adc_queue WHERE jid = ?1", jid)
            .execute(&mut tx)
            .await
            .context("Failed to delete job from the queue")?;

        sqlx::query!(
            "INSERT INTO adc_dead_queue (jid, job_type, payload, retries, scheduled_at, enqueued_at) VALUES (?1, ?2, ?3, ?4,?5,?6)",
            jid,
            job_type,
            payload,
            retries,
            scheduled_at,
            enqueued_at
        )
        .execute(&mut tx)
        .await
        .context("Failed to move job to dead queue")?;
        // Without an explicit commit, sqlx rolls the transaction back when tx is dropped.
        tx.commit().await.context("Failed to commit transaction")?;
        Ok(())
    }
}
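The handle exposes three ways to finish a job, but the code above does not show who picks between them. As a rough sketch, a runner could decide based on the processing result and the retry counter. `HandleAction`, `next_action`, and the `max_retries` parameter are hypothetical names for illustration; the actual runner in Aide-De-Camp may decide differently.

```rust
/// Which `JobHandle` method a runner would call next (hypothetical).
#[derive(Debug, PartialEq)]
pub enum HandleAction {
    /// Call `complete`: the row is deleted from `adc_queue`.
    Complete,
    /// Call `fail`: `started_at` is reset so the job can be polled again.
    Fail,
    /// Call `dead_queue`: the row is moved to `adc_dead_queue`.
    DeadQueue,
}

/// Decide what to do with a job after one processing attempt.
///
/// `retries` is the value from `JobHandle::retries`. Since the polling
/// UPDATE increments it when the job is claimed, it counts attempts
/// started so far, including the current one.
pub fn next_action(succeeded: bool, retries: u32, max_retries: u32) -> HandleAction {
    if succeeded {
        HandleAction::Complete
    } else if retries < max_retries {
        HandleAction::Fail
    } else {
        HandleAction::DeadQueue
    }
}
```

With `max_retries` set to 3, for example, a job that fails on its first two attempts goes back into the queue, and a failure on the third attempt sends it to the dead queue.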
Up Next
So far, we've gone over creating traits and implementing them. In parts 3 and 4, we'll review how we modified our code based on community feature requests.
Follow us here on DEV, Twitter, and LinkedIn for updates on Aide-De-Camp.
Aide-De-Camp is a product of Zero Assumptions.