<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: Falon Darville</title>
    <description>The latest articles on DEV Community by Falon Darville (@falondarville).</description>
    <link>https://dev.to/falondarville</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F945086%2F105da3db-c82a-4c8d-8e59-86c93cb07839.png</url>
      <title>DEV Community: Falon Darville</title>
      <link>https://dev.to/falondarville</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/falondarville"/>
    <language>en</language>
    <item>
      <title>Simple IT: Set Up and Ongoing Maintenance of Zero Assumptions</title>
      <dc:creator>Falon Darville</dc:creator>
      <pubDate>Sat, 29 Apr 2023 20:35:47 +0000</pubDate>
      <link>https://dev.to/zeroassumptions/simple-it-set-up-and-ongoing-maintenance-of-zero-assumptions-5381</link>
      <guid>https://dev.to/zeroassumptions/simple-it-set-up-and-ongoing-maintenance-of-zero-assumptions-5381</guid>
      <description>&lt;p&gt;Zero Assumptions is easy to set up and easy to maintain. A very small team, or even a single person, is enough to get Zero Assumptions up and running. The benefit of our solution is that the total cost is very low, and that includes in-house costs beyond the payment for our product: it won't take a whole team to maintain.&lt;/p&gt;

&lt;p&gt;Think of Zero Assumptions as barely perceptible and immensely impactful. You likely already have someone on staff who can set up and maintain it. This minimally invasive monitoring tool can be launched in a day, provided that servers are ready to funnel information through it.&lt;/p&gt;

&lt;h2&gt;
  
  
  Quick Set Up
&lt;/h2&gt;

&lt;p&gt;Zero Assumptions can be installed onto your own company server in less than 5 minutes using our &lt;a href="https://docs.zeroassumptions.dev/docs/installation"&gt;installation instructions&lt;/a&gt;. &lt;/p&gt;

&lt;p&gt;After installation, the &lt;a href="https://docs.zeroassumptions.dev/docs/configuration"&gt;configuration file&lt;/a&gt; is created with all of the settings that the company will use. Alternatively, many settings can be modified via the UI. &lt;/p&gt;

&lt;p&gt;Configuration enables the control plane to start, and sets the following:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Server locations&lt;/li&gt;
&lt;li&gt;User authentication&lt;/li&gt;
&lt;li&gt;Email settings for password recovery and user invitations&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  Easy Ongoing Maintenance
&lt;/h2&gt;

&lt;p&gt;Zero Assumptions &lt;a href="https://docs.zeroassumptions.dev/docs/ongoing-management"&gt;ongoing maintenance&lt;/a&gt; is minimal. &lt;/p&gt;

&lt;p&gt;Maintainers control the following: users, devices, groups, resources, nodes, and audit log access.&lt;/p&gt;

&lt;p&gt;If maintainers want to go a step further, they can also automate the above controls. &lt;/p&gt;

&lt;h2&gt;
  
  
  Stay Connected to Zero Assumptions
&lt;/h2&gt;

&lt;p&gt;Follow us here on DEV, &lt;a href="https://twitter.com/ZeroAssumptions"&gt;Twitter&lt;/a&gt;, and &lt;a href="https://www.linkedin.com/company/zero-assumptions"&gt;LinkedIn&lt;/a&gt;. &lt;/p&gt;

&lt;p&gt;Read more about &lt;a href="https://zeroassumptions.dev/"&gt;Zero Assumptions&lt;/a&gt;.&lt;/p&gt;

</description>
      <category>rust</category>
      <category>zeroassumptions</category>
      <category>zerotrust</category>
      <category>networking</category>
    </item>
    <item>
      <title>Why Companies Need Audit Logs Now</title>
      <dc:creator>Falon Darville</dc:creator>
      <pubDate>Mon, 10 Apr 2023 22:39:18 +0000</pubDate>
      <link>https://dev.to/zeroassumptions/why-companies-need-audit-logs-now-4c6i</link>
      <guid>https://dev.to/zeroassumptions/why-companies-need-audit-logs-now-4c6i</guid>
      <description>&lt;p&gt;As we're doing more and more engineering work from different places, "the office" is no longer a static physical location. It could be home at the kitchen counter, at the airport, at a coffee shop, or even back in your childhood home. With this distribution of the workforce, companies have a duty to their stakeholders and to compliance protocols to keep audit logs for engineer activities. &lt;/p&gt;

&lt;p&gt;In this article, we'll go over the ways in which Zero Assumptions provides security and data for unexpected activities performed by engineers.&lt;/p&gt;

&lt;h2&gt;
  
  
  Audit logs as insurance and assurance
&lt;/h2&gt;

&lt;p&gt;Audit logs provide insurance for tracking down errors and unexpected activities performed on company databases and servers. When something unexpected happens, designated operators only need to log in to view and interpret the audit logs. From there, they can report out to stakeholders using real data rather than perceptions and assumptions. &lt;/p&gt;

&lt;p&gt;Because Zero Assumptions is an on-prem solution, companies determine where to store logs and for how long. This means that companies have a high level of ownership over the compliance standards that they must uphold. Zero Assumptions gives that freedom back to companies, rather than dictating how data is stored. &lt;/p&gt;

&lt;p&gt;With audit logs and control over data storage, companies can be assured that they have a solution in place in case of bad actors, breaches, and human error. &lt;/p&gt;

&lt;h2&gt;
  
  
  Bonus use cases for audit logs
&lt;/h2&gt;

&lt;p&gt;Audit logs can do a lot more than provide a written record for investigation of unexpected activities. Here are some additional ideas for how to use audit log data:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Reveal inefficient database requests&lt;/li&gt;
&lt;li&gt;Provide aggregation of activity for database and server cost analysis&lt;/li&gt;
&lt;li&gt;Compile work hours to determine engineers' work habits (can help with exploring experimental programs like the 4-day work week, etc.)&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  Stay Connected to Zero Assumptions
&lt;/h2&gt;

&lt;p&gt;Follow us here on DEV, &lt;a href="https://twitter.com/ZeroAssumptions"&gt;Twitter&lt;/a&gt;, and &lt;a href="https://www.linkedin.com/company/zero-assumptions"&gt;LinkedIn&lt;/a&gt;. &lt;/p&gt;

&lt;p&gt;Read more about &lt;a href="https://zeroassumptions.dev/"&gt;Zero Assumptions&lt;/a&gt;.&lt;/p&gt;

</description>
      <category>rust</category>
      <category>zeroassumptions</category>
      <category>zerotrust</category>
      <category>networking</category>
    </item>
    <item>
      <title>Secure Data with On-Prem Storage of Engineer Activity</title>
      <dc:creator>Falon Darville</dc:creator>
      <pubDate>Sun, 26 Mar 2023 00:06:09 +0000</pubDate>
      <link>https://dev.to/zeroassumptions/secure-data-with-on-prem-storage-of-engineer-activity-58ef</link>
      <guid>https://dev.to/zeroassumptions/secure-data-with-on-prem-storage-of-engineer-activity-58ef</guid>
      <description>&lt;p&gt;Zero Assumptions offers companies a secure way to get and store their engineers’ activities on databases and servers. For companies, this means peace of mind over all actions performed on company information because there’s traceability. &lt;/p&gt;

&lt;p&gt;Each action is attributed to a specific user device and location. In addition, we link time of action, access location, machine signature, and other defining metadata. &lt;/p&gt;

&lt;h2&gt;
  
  
  Zero Assumptions is low maintenance
&lt;/h2&gt;

&lt;p&gt;Zero Assumptions doesn’t require significant staffing to roll out or maintain. Someone in an IT role fills out the configuration file and deploys the configuration onto an on-prem server. &lt;/p&gt;

&lt;p&gt;Our product is easy to manage and maintain once it’s deployed. Ongoing management tasks include user management, device management, group management, resource management, and node management. &lt;/p&gt;

&lt;h2&gt;
  
  
  Zero Assumptions leverages audit logs
&lt;/h2&gt;

&lt;p&gt;Activity is printed to an audit log that’s available at any time. Audit logs can also be exported to another location for storage purposes. &lt;/p&gt;

&lt;p&gt;Audit logs can be used for internal and external auditing to verify engineer actions, and trace back root causes for issues that arise in a company's system.&lt;/p&gt;

&lt;h2&gt;
  
  
  Zero Assumptions helps companies be compliant
&lt;/h2&gt;

&lt;p&gt;Companies have full control over their data, since Zero Assumptions doesn’t require cloud storage. On-prem works especially well if your company deals with regulated and sensitive data that you need to retain sole access to. &lt;/p&gt;

&lt;p&gt;Zero Assumptions helps companies to be SOC2 and ISO 27001 compliant by giving them flexibility over where their audit data is stored and for how long. &lt;/p&gt;

&lt;h2&gt;
  
  
  Stay Connected to Zero Assumptions
&lt;/h2&gt;

&lt;p&gt;Follow us here on DEV, &lt;a href="https://twitter.com/ZeroAssumptions"&gt;Twitter&lt;/a&gt;, and &lt;a href="https://www.linkedin.com/company/zero-assumptions"&gt;LinkedIn&lt;/a&gt;. &lt;/p&gt;

&lt;p&gt;Read more about &lt;a href="https://zeroassumptions.dev/"&gt;Zero Assumptions&lt;/a&gt;.&lt;/p&gt;

</description>
      <category>zeroassumptions</category>
      <category>rust</category>
      <category>zerotrust</category>
      <category>networking</category>
    </item>
    <item>
      <title>Zero Assumption's Proxy Architecture</title>
      <dc:creator>Falon Darville</dc:creator>
      <pubDate>Tue, 07 Feb 2023 02:02:25 +0000</pubDate>
      <link>https://dev.to/zeroassumptions/zero-assumptions-proxy-architecture-8ok</link>
      <guid>https://dev.to/zeroassumptions/zero-assumptions-proxy-architecture-8ok</guid>
      <description>&lt;p&gt;Zero Assumptions (ZA) is a simple-to-configure proxy that's minimally invasive to engineers and maximally beneficial to businesses. After on-prem installation, Zero Assumptions requires engineers to log in, and it monitors their activity within a private network, all in the background while they work. For businesses, there's no need to set up complex logins as ZA can leverage existing identity providers. &lt;/p&gt;

&lt;p&gt;In this article, we'll go into Zero Assumptions' logical proxy architecture. This architecture won't impact configuration, though it'll help businesses understand what's happening behind the scenes. We separate the control plane from the data plane, though ZA can be run from a single virtual machine. The two planes are kept separate mainly for ease of maintenance and building. &lt;/p&gt;

&lt;p&gt;Access our &lt;a href="https://docs.zeroassumptions.dev/docs/design-principles#architecture" rel="noopener noreferrer"&gt;Design Principles documentation&lt;/a&gt; for more information.&lt;/p&gt;

&lt;h2&gt;
  
  
  Control Plane
&lt;/h2&gt;

&lt;p&gt;The Zero Assumptions control plane can be considered the brain of the product. Here, information is inputted, retained, and used. The control plane holds definitions for identity management, including the business's preferred, ZA-compatible identity provider. &lt;/p&gt;

&lt;p&gt;The control plane also holds ACLs (Access Control Lists), which define how engineers can interact with the software within scope of Zero Assumptions monitoring. Admins update ACLs through the configuration file or admin UI.&lt;/p&gt;

&lt;p&gt;Now that we've mentioned configuration, there's a distinction between the &lt;a href="https://docs.zeroassumptions.dev/docs/configuration" rel="noopener noreferrer"&gt;configuration file&lt;/a&gt; written in UCL or JSON and data plane node configuration. The data plane node configuration defines how network traffic is routed, while the admin configuration defines how the product is configured for the engineer end users.&lt;/p&gt;

&lt;h2&gt;
  
  
  Data Plane
&lt;/h2&gt;

&lt;p&gt;While the control plane handles the logic of the application, the data plane runs the overlay network's traffic. The data plane is low maintenance. Zero Assumptions has opted to keep it simple so businesses don't have to tend to it continuously. Businesses using ZA are responsible for connecting the control plane to the data plane, and periodically installing updates.&lt;/p&gt;

&lt;p&gt;Altogether, Zero Assumptions' proxy architecture is separated for the sake of keeping the codebase clean and easy to maintain on our end. Businesses benefit from this simplicity because they don't have to spend significant time learning how to use our product to get it up and running. &lt;/p&gt;

&lt;h2&gt;
  
  
  Stay Connected to Zero Assumptions
&lt;/h2&gt;

&lt;p&gt;Follow us here on DEV, &lt;a href="https://twitter.com/ZeroAssumptions" rel="noopener noreferrer"&gt;Twitter&lt;/a&gt;, and &lt;a href="https://www.linkedin.com/company/zero-assumptions" rel="noopener noreferrer"&gt;LinkedIn&lt;/a&gt;. &lt;/p&gt;

&lt;p&gt;Read more about &lt;a href="https://zeroassumptions.dev/" rel="noopener noreferrer"&gt;Zero Assumptions&lt;/a&gt;.&lt;/p&gt;

</description>
      <category>welcome</category>
    </item>
    <item>
      <title>Zero Assumption’s Backbone of Rust</title>
      <dc:creator>Falon Darville</dc:creator>
      <pubDate>Tue, 10 Jan 2023 00:57:38 +0000</pubDate>
      <link>https://dev.to/zeroassumptions/zero-assumptions-backbone-of-rust-369c</link>
      <guid>https://dev.to/zeroassumptions/zero-assumptions-backbone-of-rust-369c</guid>
      <description>&lt;p&gt;Zero Assumptions is a zero trust networking solution you can use with your existing Identity Provider. All of your data is stored on-prem, so you control it all. &lt;/p&gt;

&lt;p&gt;While you won't have to write any foundational code to drive the functionality of Zero Assumptions for your organization, this article explores our usage of the &lt;a href="https://www.rust-lang.org/" rel="noopener noreferrer"&gt;Rust&lt;/a&gt; language. The goal is to give you a stronger understanding of the mechanisms that drive Zero Assumptions, and why our product is secure, safe, and performant. &lt;/p&gt;

&lt;p&gt;We've built our solution on Rust for a number of reasons including its strong community support, packages on crates.io, and future-proofing. &lt;/p&gt;

&lt;h2&gt;
  
  
  Decrease Runtime Errors
&lt;/h2&gt;

&lt;p&gt;Zero Assumptions is on-prem first. Given the nature of our product, if the software goes down, then people can’t access their private resources. We have to ship a product that is as solid as possible at runtime. &lt;/p&gt;

&lt;p&gt;Rust is a memory-safe language that eliminates an entire class of errors without sacrificing performance. Specifically, safe Rust has no null references, which prevents another class of bugs from occurring at runtime.&lt;/p&gt;

&lt;h2&gt;
  
  
  Consistent Quality and Longevity
&lt;/h2&gt;

&lt;p&gt;Rust contributors intentionally keep the language backwards compatible. This means that when Rust changes, Zero Assumptions won't introduce breaking changes due specifically to using a newer version of Rust. &lt;/p&gt;

&lt;p&gt;Rust's abstractions have no additional runtime "cost". This will allow us to continue to iterate quickly without slowing down the end product.&lt;/p&gt;

&lt;p&gt;It's also worth mentioning that Rust has robust and clear documentation, which greatly adds to the overall quality of the language. &lt;/p&gt;

&lt;h2&gt;
  
  
  Security
&lt;/h2&gt;

&lt;p&gt;At compile time, Rust checks how memory is used, which can prevent memory management problems. Rust also allows for more granular control over memory, so we can customize where we need to.&lt;/p&gt;

&lt;p&gt;Zero Assumptions benefits from its Rust foundation on many levels. And while our users don't need to know any Rust, they'll reap all the rewards of Zero Assumptions being built on Rust. &lt;/p&gt;

&lt;p&gt;Follow Zero Assumptions here on &lt;a href="https://dev.to/zeroassumptions"&gt;DEV&lt;/a&gt;, &lt;a href="https://www.linkedin.com/company/zero-assumptions/" rel="noopener noreferrer"&gt;LinkedIn&lt;/a&gt;, and &lt;a href="https://twitter.com/zeroassumptions" rel="noopener noreferrer"&gt;Twitter&lt;/a&gt; to stay up-to-date on our product. We're launching this year, 2023, and can't wait for you to use Zero Assumptions.&lt;/p&gt;

</description>
      <category>discuss</category>
      <category>ai</category>
    </item>
    <item>
      <title>Build a Job Queue with Rust Using Aide-De-Camp (Part 4)</title>
      <dc:creator>Falon Darville</dc:creator>
      <pubDate>Tue, 13 Dec 2022 02:02:01 +0000</pubDate>
      <link>https://dev.to/zeroassumptions/build-a-job-queue-with-rust-using-aide-de-camp-part-4-415p</link>
      <guid>https://dev.to/zeroassumptions/build-a-job-queue-with-rust-using-aide-de-camp-part-4-415p</guid>
      <description>&lt;p&gt;This is the fourth and final installment in our series. We continue to run you through creating durable job queues using &lt;a href="https://zeroassumptions.dev/"&gt;Zero Assumptions&lt;/a&gt;' open source solution, &lt;a href="https://github.com/ZeroAssumptions/aide-de-camp"&gt;Aide-De-Camp&lt;/a&gt;, which is built using Rust. &lt;/p&gt;

&lt;p&gt;In this post, we'll build cron job support into Aide-De-Camp.&lt;/p&gt;

&lt;h2&gt;
  
  
  Aide-De-Camp Code
&lt;/h2&gt;

&lt;p&gt;Code from this series is available in &lt;a href="https://github.com/ZeroAssumptions/aide-de-camp"&gt;our open source Aide-De-Camp GitHub repository&lt;/a&gt;. &lt;/p&gt;

&lt;p&gt;The Crate is published to &lt;a href="https://crates.io/crates/aide-de-camp"&gt;crates.io&lt;/a&gt; for everyone to use.&lt;/p&gt;

&lt;h2&gt;
  
  
  Challenges of Making a Cron Job Scheduler
&lt;/h2&gt;

&lt;p&gt;Making a persistent and distributed cron job scheduler is easier said than done. There are many cases to account for that aren't immediately apparent. For example, in a distributed environment, there can only be one scheduler that keeps track of what has and hasn't been scheduled. Another example is smart decision making: if you want to run a job at midnight and the scheduler is off at midnight, the scheduler has to decide what to do about the missed run once it's back.&lt;/p&gt;

&lt;p&gt;It's straightforward when you have a &lt;a href="https://geko.cloud/en/kubernetes-cronjobs-a-deeper-look/"&gt;cluster with a leader with distributed storage&lt;/a&gt;. It's different when you're working with just a job queue and job processor.&lt;/p&gt;

&lt;p&gt;Let's break down how a cron job scheduler functions.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--KRuBkbho--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/t8mk4j6lzf3x6r2helep.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--KRuBkbho--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/t8mk4j6lzf3x6r2helep.png" alt="Image description" width="760" height="550"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;From a high level, a cron job scheduler performs the following in a continuous loop.&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Check if there is a job to run&lt;/li&gt;
&lt;li&gt;If there is a job to run, run that job&lt;/li&gt;
&lt;li&gt;Sleep for a designated period of time&lt;/li&gt;
&lt;/ol&gt;
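
&lt;p&gt;The three steps above can be sketched roughly as follows. This is a minimal illustration, not Aide-De-Camp's actual scheduler: the names &lt;code&gt;tick&lt;/code&gt; and &lt;code&gt;run_scheduler&lt;/code&gt;, the simulated clock, and the fixed interval are all assumptions made for the example.&lt;/p&gt;

```rust
use std::thread;
use std::time::Duration;

/// Hypothetical outcome of one pass through the loop.
#[derive(Debug, PartialEq)]
enum Tick {
    Ran,
    Idle,
}

/// Steps 1 and 2: check whether a job is due and, if so, run it.
/// `now` and `next_run_at` are seconds on the same (assumed) clock.
fn tick(now: u64, next_run_at: u64, run_job: fn()) -> Tick {
    if now >= next_run_at {
        run_job();
        Tick::Ran
    } else {
        Tick::Idle
    }
}

/// Drives the loop for a fixed number of passes on a simulated clock that
/// advances one second per pass. Returns how many times the job ran.
fn run_scheduler(passes: u32, interval: u64, pause_ms: u64, run_job: fn()) -> u32 {
    let mut now = 0;
    let mut next_run_at = interval;
    let mut runs = 0;
    for _ in 0..passes {
        if tick(now, next_run_at, run_job) == Tick::Ran {
            runs += 1;
            // Plan the following run one interval later.
            next_run_at += interval;
        }
        // Step 3: sleep for a designated period of time.
        thread::sleep(Duration::from_millis(pause_ms));
        now += 1;
    }
    runs
}
```

&lt;p&gt;A real scheduler would loop indefinitely and read the wall clock. The bounded loop and simulated clock here only keep the sketch easy to follow and test.&lt;/p&gt;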

&lt;h2&gt;
  
  
  Leader Election
&lt;/h2&gt;

&lt;p&gt;If you have a single scheduler instance, that instance is always the leader because that's the only possible outcome of any election. What if you have two or more? If the instances don't coordinate with each other, your jobs will get scheduled N times every time, where N is the number of schedulers.&lt;/p&gt;

&lt;p&gt;One solution can be to have a single scheduler. However, the goal of Aide-De-Camp is to be "serverless" in that there isn't a special server running somewhere. &lt;/p&gt;

&lt;p&gt;Aide-De-Camp won't handle the leader election process. This will be done by the crate consumer.&lt;/p&gt;

&lt;p&gt;In Aide-De-Camp, the scheduler takes a &lt;a href="https://docs.rs/tokio/latest/tokio/sync/watch/index.html"&gt;&lt;code&gt;watch::Receiver&amp;lt;IsLeader&amp;gt;&lt;/code&gt;&lt;/a&gt; and checks whether the current instance is the leader before proceeding. &lt;/p&gt;

&lt;h2&gt;
  
  
  Cron Job Storage for Scheduling
&lt;/h2&gt;

&lt;p&gt;We use cron job storage to keep track of what jobs we've scheduled and hold the following.&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;When a new job should be scheduled&lt;/li&gt;
&lt;li&gt;When the last job was scheduled&lt;/li&gt;
&lt;li&gt;Whether or not a job is already scheduled during the requested job time&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Storage enables us to time our cron jobs in a more sophisticated way than checking the schedule on a +/- 1 second cadence, or something similar. Checking at such a short cadence would saturate the CPU if many tasks were running.&lt;/p&gt;

&lt;p&gt;Let's look at in-memory storage for scheduling, which has some downsides. &lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;With more than one scheduler, all schedulers must agree on what schedules exist &lt;/li&gt;
&lt;li&gt;We can lose some ticks:

&lt;ul&gt;
&lt;li&gt;Imagine there is a single scheduler&lt;/li&gt;
&lt;li&gt;There is a schedule that should fire at T&lt;/li&gt;
&lt;li&gt;Instance goes offline at &lt;code&gt;T - x&lt;/code&gt;
&lt;/li&gt;
&lt;li&gt;Instance comes back at &lt;code&gt;T + y&lt;/code&gt;
&lt;/li&gt;
&lt;li&gt;Scheduler calculates a new &lt;code&gt;T&lt;/code&gt; that is the next tick, so the tick at the original &lt;code&gt;T&lt;/code&gt; never fires&lt;/li&gt;
&lt;li&gt;This is fine if the task is scheduled often, but for a task that runs rarely, this will create issues&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;
&lt;/ul&gt;
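
&lt;p&gt;To make the lost-tick scenario concrete, here is a small sketch. It assumes a simplified schedule that fires at every multiple of &lt;code&gt;period&lt;/code&gt; seconds; the function names are hypothetical and not part of Aide-De-Camp.&lt;/p&gt;

```rust
/// First tick strictly after `now`, for a schedule that fires at every
/// multiple of `period` seconds (a simplifying assumption for this sketch).
/// `period` must be greater than zero.
fn next_tick_after(now: u64, period: u64) -> u64 {
    (now / period + 1) * period
}

/// Number of ticks that fell inside the outage (went_offline, came_back].
/// Expects `came_back >= went_offline` and `period` greater than zero.
/// Computing this requires remembering when the instance last ran, which is
/// exactly what a purely in-memory scheduler forgets on restart.
fn missed_ticks(went_offline: u64, came_back: u64, period: u64) -> u64 {
    came_back / period - went_offline / period
}
```

&lt;p&gt;With an hourly schedule (&lt;code&gt;period = 3600&lt;/code&gt;), an instance that goes offline at second 3500 and returns at second 3700 computes 7200 as its next tick, while the tick at 3600 was missed. Persisted storage is what lets a scheduler notice that gap.&lt;/p&gt;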

&lt;p&gt;For Aide-De-Camp, we're going to implement an in-memory store and a SQLite store for now.&lt;/p&gt;

&lt;p&gt;Another reason to have storage is to keep track of what we've already scheduled. This is a bit trickier because we want to support multiple strategies to handle overlapping jobs:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Schedule anyway&lt;/li&gt;
&lt;li&gt;Wait for current job to finish&lt;/li&gt;
&lt;li&gt;Skip&lt;/li&gt;
&lt;/ul&gt;
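
&lt;p&gt;One way to picture these three strategies is as an enum that the scheduler consults when a new run comes due. The type and variant names below are hypothetical, chosen for illustration only, and may not match what Aide-De-Camp ends up using.&lt;/p&gt;

```rust
/// The three overlap strategies from the list above (names are assumptions).
#[derive(Clone, Copy, Debug, PartialEq)]
enum OverlapPolicy {
    ScheduleAnyway,
    WaitForCurrent,
    Skip,
}

/// What the scheduler does with a run that has just come due.
#[derive(Debug, PartialEq)]
enum Decision {
    EnqueueNow,
    EnqueueAfterCurrent,
    Drop,
}

/// Decide how to handle a due run, given whether the previous run of the
/// same job is still in progress.
fn decide(policy: OverlapPolicy, previous_still_running: bool) -> Decision {
    if !previous_still_running {
        // No overlap, so every policy behaves the same way.
        return Decision::EnqueueNow;
    }
    match policy {
        OverlapPolicy::ScheduleAnyway => Decision::EnqueueNow,
        OverlapPolicy::WaitForCurrent => Decision::EnqueueAfterCurrent,
        OverlapPolicy::Skip => Decision::Drop,
    }
}
```

&lt;p&gt;Answering &lt;code&gt;previous_still_running&lt;/code&gt; is the part that needs storage: the scheduler has to look up what it already scheduled and whether that job has finished.&lt;/p&gt;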

&lt;h2&gt;
  
  
  Up Next
&lt;/h2&gt;

&lt;p&gt;This post wraps up our first Aide-De-Camp series! &lt;/p&gt;

&lt;p&gt;Follow us here on DEV, &lt;a href="https://twitter.com/ZeroAssumptions"&gt;Twitter&lt;/a&gt;, and &lt;a href="https://www.linkedin.com/company/zero-assumptions"&gt;LinkedIn&lt;/a&gt; for updates on &lt;a href="https://github.com/ZeroAssumptions/aide-de-camp"&gt;Aide-De-Camp&lt;/a&gt;. &lt;/p&gt;

&lt;p&gt;Aide-De-Camp is a product of &lt;a href="https://zeroassumptions.dev/"&gt;Zero Assumptions&lt;/a&gt;.&lt;/p&gt;

</description>
      <category>zeroassumptions</category>
      <category>rust</category>
      <category>zerotrust</category>
      <category>aidedecamp</category>
    </item>
    <item>
      <title>Build a Job Queue with Rust Using Aide-De-Camp (Part 3)</title>
      <dc:creator>Falon Darville</dc:creator>
      <pubDate>Tue, 06 Dec 2022 00:23:34 +0000</pubDate>
      <link>https://dev.to/zeroassumptions/build-a-job-queue-with-rust-using-aide-de-camp-part-3-2832</link>
      <guid>https://dev.to/zeroassumptions/build-a-job-queue-with-rust-using-aide-de-camp-part-3-2832</guid>
      <description>&lt;p&gt;This is the third installment in our series, where we continue to run you through creating durable job queues using &lt;a href="https://zeroassumptions.dev/"&gt;Zero Assumptions&lt;/a&gt;' open source solution, &lt;a href="https://github.com/ZeroAssumptions/aide-de-camp"&gt;Aide-De-Camp&lt;/a&gt;, which is built using Rust. &lt;/p&gt;

&lt;p&gt;In this post, we'll go over feature requests we've received and how we created those features. &lt;/p&gt;

&lt;h2&gt;
  
  
  Aide-De-Camp Code
&lt;/h2&gt;

&lt;p&gt;Code from this series is available in &lt;a href="https://github.com/ZeroAssumptions/aide-de-camp"&gt;our open source Aide-De-Camp GitHub repository&lt;/a&gt;. &lt;/p&gt;

&lt;p&gt;The Crate is published to &lt;a href="https://crates.io/crates/aide-de-camp"&gt;crates.io&lt;/a&gt; for everyone to use.&lt;/p&gt;

&lt;h2&gt;
  
  
  Feature Requests
&lt;/h2&gt;

&lt;p&gt;We'll address two feature requests we've implemented:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;a href="https://github.com/ZeroAssumptions/aide-de-camp/issues/6"&gt;Cancel job by ID&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://github.com/ZeroAssumptions/aide-de-camp/issues/7"&gt;Add job priority&lt;/a&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;These two requests required changes to our public API, which created breaking changes. &lt;/p&gt;

&lt;p&gt;To submit your own feature request for Aide-De-Camp, open an Issue on our &lt;a href="https://github.com/ZeroAssumptions/aide-de-camp/issues"&gt;open source GitHub repository&lt;/a&gt;.&lt;/p&gt;

&lt;h2&gt;
  
  
  Cancel Job By ID Feature
&lt;/h2&gt;

&lt;p&gt;In this Issue, we were asked to implement cancelling a job by providing the job ID. See the &lt;a href="https://github.com/ZeroAssumptions/aide-de-camp/issues/6"&gt;Issue submitted on GitHub&lt;/a&gt;.&lt;/p&gt;

&lt;h3&gt;
  
  
  Cancel a Job That Hasn't Started
&lt;/h3&gt;

&lt;p&gt;In our implementation, you can only cancel a job when it has not yet started.&lt;/p&gt;

&lt;p&gt;A job ID is available once the job is submitted. We use this job ID value in a new &lt;code&gt;Queue&lt;/code&gt; trait. In a method of the &lt;code&gt;Queue&lt;/code&gt; trait, we do a couple things:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Check if the job has already started. If so, ignore the cancellation request.&lt;/li&gt;
&lt;li&gt;Check if the job ID does not exist. If so, return an error indicating that the job can't be found. &lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;If the job ID exists and the job has not started, the cancellation goes through. A payload for the cancelled job is returned when the job type is specified. If no job type is specified, then no payload returns but the job is still cancelled. You won't always be able to provide the job type as it's not always available.&lt;/p&gt;

&lt;p&gt;In the following code, you'll see two cancel job functions:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Cancel scheduled job using job id, and return no payload upon success&lt;/li&gt;
&lt;li&gt;Cancel scheduled job using job id and job type, and return payload upon success
&lt;/li&gt;
&lt;/ul&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight rust"&gt;&lt;code&gt;&lt;span class="cd"&gt;/// Cancel job that has been scheduled. Right now this will only cancel if the job hasn't started yet.&lt;/span&gt;
&lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="k"&gt;fn&lt;/span&gt; &lt;span class="nf"&gt;cancel_job&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="k"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;job_id&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;Xid&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="nb"&gt;Result&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt; &lt;span class="n"&gt;QueueError&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

&lt;span class="cd"&gt;/// The same as [`cancel_job`](struct.cancel_job.html), but returns payload of canceled job.&lt;/span&gt;
&lt;span class="cd"&gt;/// If deserialization fails, then job won't be cancelled.&lt;/span&gt;
&lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="k"&gt;fn&lt;/span&gt; &lt;span class="n"&gt;unschedule_job&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;J&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="k"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;job_id&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;Xid&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="nb"&gt;Result&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nn"&gt;J&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="n"&gt;Payload&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;QueueError&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;
&lt;span class="k"&gt;where&lt;/span&gt;
    &lt;span class="n"&gt;J&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;JobProcessor&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="k"&gt;'static&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="nn"&gt;J&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="n"&gt;Payload&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;Decode&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;To review the full job cancellation feature implementation, see &lt;a href="https://github.com/ZeroAssumptions/aide-de-camp/pull/9/files"&gt;Aide-De-Camp feature #9&lt;/a&gt;.&lt;/p&gt;
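
&lt;p&gt;The cancellation rules described in this section can be summarized as a small decision table. The sketch below models only the outcomes. &lt;code&gt;JobState&lt;/code&gt;, &lt;code&gt;CancelOutcome&lt;/code&gt;, and &lt;code&gt;cancel_outcome&lt;/code&gt; are hypothetical names, and the real crate reports results through &lt;code&gt;Result&lt;/code&gt; and &lt;code&gt;QueueError&lt;/code&gt; rather than these types.&lt;/p&gt;

```rust
/// What the queue knows about a job ID when cancellation is requested.
#[derive(Clone, Copy, Debug, PartialEq)]
enum JobState {
    Missing,
    Pending,
    Started,
}

/// Possible results of a cancellation request (illustrative only).
#[derive(Debug, PartialEq)]
enum CancelOutcome {
    /// The job was removed; the payload comes back only if the caller
    /// supplied the job type.
    Cancelled { payload_returned: bool },
    /// The job had already started, so the request is ignored.
    Ignored,
    /// No job with that ID exists, which is reported as an error.
    NotFound,
}

/// Maps the job's state and the presence of a job type to an outcome.
fn cancel_outcome(state: JobState, job_type_given: bool) -> CancelOutcome {
    match state {
        JobState::Missing => CancelOutcome::NotFound,
        JobState::Started => CancelOutcome::Ignored,
        JobState::Pending => CancelOutcome::Cancelled {
            payload_returned: job_type_given,
        },
    }
}
```

&lt;p&gt;This sketch leaves out the case where the payload can't be deserialized; per the doc comment on &lt;code&gt;unschedule_job&lt;/code&gt;, the job is not cancelled in that case.&lt;/p&gt;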

&lt;h4&gt;
  
  
  Unit Testing for Solution
&lt;/h4&gt;

&lt;p&gt;With the above functionality, we tested against the following cases:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Job ID cannot be found&lt;/li&gt;
&lt;li&gt;Job was cancelled successfully with payload returned since the job type was included in the cancellation request&lt;/li&gt;
&lt;li&gt;Job was cancelled successfully without payload returned&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;job_type&lt;/code&gt; that does NOT match &lt;code&gt;jid&lt;/code&gt; (for method that returns payload)&lt;/li&gt;
&lt;li&gt;Cancelling the job with payload, but the payload couldn't be deserialized&lt;/li&gt;
&lt;li&gt;Cannot cancel a job that has already started&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  Job Priority Feature
&lt;/h2&gt;

&lt;p&gt;In this Issue, we were asked to provide a job prioritization ability. See the &lt;a href="https://github.com/ZeroAssumptions/aide-de-camp/issues/7"&gt;Issue submitted on GitHub&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;Priority becomes important when there is worker exhaustion, that is, when more jobs are ready to be worked on than there are job processors. In that case, it's beneficial to run jobs with higher priority first even if they were submitted later than lower priority jobs.&lt;/p&gt;

&lt;p&gt;We chose to implement priority as an 8-bit signed integer (&lt;code&gt;i8&lt;/code&gt;), which covers the range -128 to 127. The higher the number, the higher the priority.&lt;/p&gt;

&lt;p&gt;Given this, the queue interface needed to be changed. We added a required argument &lt;code&gt;priority: &amp;lt;i8 value&amp;gt;&lt;/code&gt;. We recommend setting low priority jobs to &lt;code&gt;priority: 0&lt;/code&gt;.&lt;/p&gt;

&lt;p&gt;A SQL query looks at jobs in the queue and orders them by their &lt;code&gt;priority&lt;/code&gt; values, from highest number to lowest number.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;SELECT&lt;/span&gt; &lt;span class="n"&gt;jid&lt;/span&gt; &lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="n"&gt;adc_queue&lt;/span&gt; &lt;span class="k"&gt;WHERE&lt;/span&gt; &lt;span class="n"&gt;started_at&lt;/span&gt; &lt;span class="k"&gt;IS&lt;/span&gt; &lt;span class="k"&gt;NULL&lt;/span&gt; &lt;span class="k"&gt;AND&lt;/span&gt; &lt;span class="n"&gt;scheduled_at&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;=&lt;/span&gt; &lt;span class="n"&gt;now&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="k"&gt;AND&lt;/span&gt; &lt;span class="n"&gt;job_type&lt;/span&gt; &lt;span class="k"&gt;IN&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'my_job'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;ORDER&lt;/span&gt; &lt;span class="k"&gt;BY&lt;/span&gt; &lt;span class="n"&gt;priority&lt;/span&gt; &lt;span class="k"&gt;DESC&lt;/span&gt; &lt;span class="k"&gt;LIMIT&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
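
&lt;p&gt;To make the ordering rule concrete, here is a minimal in-memory sketch of what &lt;code&gt;ORDER BY priority DESC LIMIT 1&lt;/code&gt; selects. It is not Aide-De-Camp code: the &lt;code&gt;Job&lt;/code&gt; struct and the &lt;code&gt;higher_priority&lt;/code&gt; helper are hypothetical names used only for illustration, and keeping the earlier job on a tie is an assumption, since the query above does not define a tie-breaker.&lt;/p&gt;

```rust
// Hypothetical stand-in for a row in the queue; not the Aide-De-Camp type.
#[derive(Clone, Copy, Debug, PartialEq)]
struct Job {
    id: u32,
    priority: i8, // -128..=127, higher runs first
}

// Keeps the current pick unless the candidate has a strictly higher priority.
// Assumption: on a tie, the job seen first wins.
fn higher_priority(current: Job, candidate: Job) -> Job {
    if candidate.priority > current.priority {
        candidate
    } else {
        current
    }
}

fn main() {
    // Jobs that are ready to run, in submission order.
    let ready = [
        Job { id: 1, priority: 0 },
        Job { id: 2, priority: 127 },
        Job { id: 3, priority: -128 },
        Job { id: 4, priority: 127 },
    ];

    // Reduce the ready list to the single job a worker should pick up next.
    let next = ready.iter().copied().reduce(higher_priority);
    if let Some(job) = next {
        println!("run job {} first (priority {})", job.id, job.priority);
    }
}
```

&lt;p&gt;In this sketch, a job submitted later still runs first as long as its priority is higher, which is the behavior that matters when workers are exhausted.&lt;/p&gt;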



&lt;p&gt;To review the full job prioritization feature implementation, see &lt;a href="https://github.com/ZeroAssumptions/aide-de-camp/pull/10/files"&gt;Aide-De-Camp feature #10&lt;/a&gt;.&lt;/p&gt;

&lt;h2&gt;
  
  
  Up Next
&lt;/h2&gt;

&lt;p&gt;In this post, we explained two feature requests, job cancellation and job prioritization, and how we implemented them. In part 4, we'll go over building cron job support.&lt;/p&gt;

&lt;p&gt;Follow us here on DEV, &lt;a href="https://twitter.com/ZeroAssumptions"&gt;Twitter&lt;/a&gt;, and &lt;a href="https://www.linkedin.com/company/zero-assumptions"&gt;LinkedIn&lt;/a&gt; for updates on &lt;a href="https://github.com/ZeroAssumptions/aide-de-camp"&gt;Aide-De-Camp&lt;/a&gt;. &lt;/p&gt;

&lt;p&gt;Aide-De-Camp is a product of &lt;a href="https://zeroassumptions.dev/"&gt;Zero Assumptions&lt;/a&gt;.&lt;/p&gt;

</description>
    </item>
    <item>
      <title>Simplify Auditing with Zero Assumptions</title>
      <dc:creator>Falon Darville</dc:creator>
      <pubDate>Tue, 29 Nov 2022 03:33:14 +0000</pubDate>
      <link>https://dev.to/zeroassumptions/simplify-auditing-with-zero-assumptions-4g5k</link>
      <guid>https://dev.to/zeroassumptions/simplify-auditing-with-zero-assumptions-4g5k</guid>
      <description>&lt;p&gt;&lt;a href="https://zeroassumptions.dev/"&gt;Zero Assumptions&lt;/a&gt; is a simple-to-configure access control that monitors engineers' work. Use your own Identity Provider (IDP) and build out your highly customizable configuration using our uncomplicated parameters.&lt;/p&gt;

&lt;p&gt;For engineers, Zero Assumptions is easy to use and low effort. They log into the IDP and connect to the desired resource. No slowdown for them, and no assumptions for you.&lt;/p&gt;

&lt;p&gt;Zero Assumptions purposefully enables organizations to create audit trails. It covers the missing authentication, authorization, and audit pieces. We provide the information that administrators need to hold their users accountable for their actions. &lt;/p&gt;

&lt;h2&gt;
  
  
  Password Deep Dive
&lt;/h2&gt;

&lt;p&gt;Password sharing means that multiple individuals use the same password to authenticate and perform tasks. It's bad practice. Let's get into some specific reasons why.&lt;/p&gt;

&lt;p&gt;The practice of sharing passwords poses many issues and complications, including the following:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Rotating shared passwords is a nightmare. There's no certainty around who is using the password, so some individuals may need to be tracked down to receive the new one. Some individuals may get locked out, and passwords may be shared over unreliable networks, exposing them to unintended individuals.&lt;/li&gt;
&lt;li&gt;Every time an individual off-boards or needs to lose access to an account, someone has to remember to change the password. It's easy to forget and leave the account open to unwanted access.&lt;/li&gt;
&lt;li&gt;It's easy to use a password manager to store and populate website passwords. It's more convoluted to use a password manager for databases and servers.&lt;/li&gt;
&lt;li&gt;Auditing actions on a shared password account is hard, and sometimes nearly impossible.&lt;/li&gt;
&lt;li&gt;If a password is leaked and precautions are taken to regain the account, then all users will be locked out. &lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;To prevent getting into the password trap, we recommend: &lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Don't wait until password sharing becomes a problem. Assess your situation early to come up with an alternative.&lt;/li&gt;
&lt;li&gt;When determining if a solution is right for you, imagine it at scale. What will it be like to manage the solution for many times more resources and people?&lt;/li&gt;
&lt;li&gt;Understand that more passwords are not the solution.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;At the end of the day, the purpose of a password is to provide access and associate that access with an identity. With Zero Assumptions, we're particularly interested in the auditing aspect, which is why shared passwords are never in the equation. Let's look at how we associate individuals to their actions.&lt;/p&gt;

&lt;h2&gt;
  
  
  Auditing and Individual Identities
&lt;/h2&gt;

&lt;p&gt;With shared passwords, audit trails cannot be trusted. Multiple people use the same credentials, so it's hard to tell who is doing what. Who's running a very long query in production? Who downloaded all of your customers' social security numbers on a Friday night? &lt;/p&gt;

&lt;p&gt;You need to be able to authenticate for accountability, and you can't do this without uniquely authenticating all users and/or all user devices. &lt;/p&gt;

&lt;p&gt;If you can't identify who's performing certain actions, you also won't pass certifications like SOC 2 Type 2 or PCI DSS.&lt;/p&gt;

&lt;p&gt;With Zero Assumptions, you're part way to fulfilling certification criteria since you're accounting for self-managed off-boarding. &lt;/p&gt;

&lt;h2&gt;
  
  
  Reduce Password Usage
&lt;/h2&gt;

&lt;p&gt;Zero Assumptions lets you bring your own IDP: Google Workspace, Okta, or generic OIDC. With an IDP, you don't have to manage many users across many services. &lt;/p&gt;

&lt;p&gt;Your users won't have to remember yet another password, and they'll be individually identified. All their actions will be directly associated with them.&lt;/p&gt;

&lt;h2&gt;
  
  
  Stay Connected to Zero Assumptions
&lt;/h2&gt;

&lt;p&gt;Follow us here on DEV, &lt;a href="https://twitter.com/ZeroAssumptions"&gt;Twitter&lt;/a&gt;, and &lt;a href="https://www.linkedin.com/company/zero-assumptions"&gt;LinkedIn&lt;/a&gt;. &lt;/p&gt;

&lt;p&gt;Read more about &lt;a href="https://zeroassumptions.dev/"&gt;Zero Assumptions&lt;/a&gt;.&lt;/p&gt;

</description>
      <category>zeroassumptions</category>
      <category>rust</category>
      <category>zerotrust</category>
      <category>password</category>
    </item>
    <item>
      <title>Build a Job Queue with Rust Using Aide-De-Camp (Part 2)</title>
      <dc:creator>Falon Darville</dc:creator>
      <pubDate>Thu, 17 Nov 2022 03:00:06 +0000</pubDate>
      <link>https://dev.to/zeroassumptions/build-a-job-queue-with-rust-using-aide-de-camp-part-2-993</link>
      <guid>https://dev.to/zeroassumptions/build-a-job-queue-with-rust-using-aide-de-camp-part-2-993</guid>
      <description>&lt;p&gt;This is the second installment in our series, which runs you through creating durable job queues using &lt;a href="https://zeroassumptions.dev/"&gt;Zero Assumption&lt;/a&gt;'s open source Rust solution, &lt;a href="https://github.com/ZeroAssumptions/aide-de-camp"&gt;Aide-De-Camp&lt;/a&gt;. &lt;/p&gt;

&lt;p&gt;In &lt;a href="https://dev.to/zeroassumptions/build-a-job-queue-with-rust-using-aide-de-camp-part-1-4g5m"&gt;part one of our series&lt;/a&gt;, we went over the Rust traits that we created for the job queue. In this post, we're implementing the traits we talked about in part one.&lt;/p&gt;

&lt;h2&gt;
  
  
  Aide-De-Camp Code
&lt;/h2&gt;

&lt;p&gt;Code from this series is available in &lt;a href="https://github.com/ZeroAssumptions/aide-de-camp"&gt;our open source Aide-De-Camp GitHub repository&lt;/a&gt;. &lt;/p&gt;

&lt;p&gt;The crate is published to &lt;a href="https://crates.io/crates/aide-de-camp"&gt;crates.io&lt;/a&gt; for everyone to use.&lt;/p&gt;

&lt;h2&gt;
  
  
  SQLite Schema
&lt;/h2&gt;

&lt;blockquote&gt;
&lt;p&gt;SQLite is just the first backend we've implemented. More backend implementations to come.&lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;Aide-De-Camp uses SQLite to persist the queue. Let's look at the schema.&lt;/p&gt;

&lt;p&gt;Jobs go into the main queue, &lt;code&gt;adc_queue&lt;/code&gt;. Dead letter jobs, or jobs that couldn't be completed, go into the &lt;code&gt;adc_dead_queue&lt;/code&gt;, where you can investigate them later.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="k"&gt;TABLE&lt;/span&gt; &lt;span class="n"&gt;IF&lt;/span&gt; &lt;span class="k"&gt;NOT&lt;/span&gt; &lt;span class="k"&gt;EXISTS&lt;/span&gt; &lt;span class="n"&gt;adc_queue&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
 &lt;span class="n"&gt;jid&lt;/span&gt; &lt;span class="nb"&gt;TEXT&lt;/span&gt; &lt;span class="k"&gt;PRIMARY&lt;/span&gt; &lt;span class="k"&gt;KEY&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
 &lt;span class="n"&gt;queue&lt;/span&gt; &lt;span class="nb"&gt;TEXT&lt;/span&gt; &lt;span class="k"&gt;NOT&lt;/span&gt; &lt;span class="k"&gt;NULL&lt;/span&gt; &lt;span class="k"&gt;default&lt;/span&gt; &lt;span class="s1"&gt;'default'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
 &lt;span class="n"&gt;job_type&lt;/span&gt; &lt;span class="nb"&gt;TEXT&lt;/span&gt; &lt;span class="k"&gt;not&lt;/span&gt; &lt;span class="k"&gt;null&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
 &lt;span class="n"&gt;payload&lt;/span&gt; &lt;span class="nb"&gt;blob&lt;/span&gt; &lt;span class="k"&gt;not&lt;/span&gt; &lt;span class="k"&gt;null&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
 &lt;span class="n"&gt;retries&lt;/span&gt; &lt;span class="nb"&gt;int&lt;/span&gt; &lt;span class="k"&gt;not&lt;/span&gt; &lt;span class="k"&gt;null&lt;/span&gt; &lt;span class="k"&gt;default&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
 &lt;span class="n"&gt;scheduled_at&lt;/span&gt; &lt;span class="nb"&gt;INTEGER&lt;/span&gt; &lt;span class="k"&gt;not&lt;/span&gt; &lt;span class="k"&gt;null&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
 &lt;span class="n"&gt;started_at&lt;/span&gt; &lt;span class="nb"&gt;INTEGER&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
 &lt;span class="n"&gt;enqueued_at&lt;/span&gt; &lt;span class="nb"&gt;INTEGER&lt;/span&gt; &lt;span class="k"&gt;not&lt;/span&gt; &lt;span class="k"&gt;null&lt;/span&gt; &lt;span class="k"&gt;default&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;strftime&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'%s'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s1"&gt;'now'&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;
&lt;span class="p"&gt;);&lt;/span&gt;

&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="k"&gt;TABLE&lt;/span&gt; &lt;span class="n"&gt;IF&lt;/span&gt; &lt;span class="k"&gt;NOT&lt;/span&gt; &lt;span class="k"&gt;EXISTS&lt;/span&gt; &lt;span class="n"&gt;adc_dead_queue&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
 &lt;span class="n"&gt;jid&lt;/span&gt; &lt;span class="nb"&gt;TEXT&lt;/span&gt; &lt;span class="k"&gt;PRIMARY&lt;/span&gt; &lt;span class="k"&gt;KEY&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
 &lt;span class="n"&gt;queue&lt;/span&gt; &lt;span class="nb"&gt;TEXT&lt;/span&gt; &lt;span class="k"&gt;NOT&lt;/span&gt; &lt;span class="k"&gt;NULL&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
 &lt;span class="n"&gt;job_type&lt;/span&gt; &lt;span class="nb"&gt;TEXT&lt;/span&gt; &lt;span class="k"&gt;not&lt;/span&gt; &lt;span class="k"&gt;null&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
 &lt;span class="n"&gt;payload&lt;/span&gt; &lt;span class="nb"&gt;blob&lt;/span&gt; &lt;span class="k"&gt;not&lt;/span&gt; &lt;span class="k"&gt;null&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
 &lt;span class="n"&gt;retries&lt;/span&gt; &lt;span class="nb"&gt;int&lt;/span&gt; &lt;span class="k"&gt;not&lt;/span&gt; &lt;span class="k"&gt;null&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
 &lt;span class="n"&gt;scheduled_at&lt;/span&gt; &lt;span class="nb"&gt;INTEGER&lt;/span&gt; &lt;span class="k"&gt;not&lt;/span&gt; &lt;span class="k"&gt;null&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
 &lt;span class="n"&gt;started_at&lt;/span&gt; &lt;span class="nb"&gt;INTEGER&lt;/span&gt; &lt;span class="k"&gt;not&lt;/span&gt; &lt;span class="k"&gt;null&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
 &lt;span class="n"&gt;enqueued_at&lt;/span&gt; &lt;span class="nb"&gt;INTEGER&lt;/span&gt; &lt;span class="k"&gt;not&lt;/span&gt; &lt;span class="k"&gt;null&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
 &lt;span class="n"&gt;died_at&lt;/span&gt; &lt;span class="nb"&gt;INTEGER&lt;/span&gt; &lt;span class="k"&gt;not&lt;/span&gt; &lt;span class="k"&gt;null&lt;/span&gt; &lt;span class="k"&gt;default&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;strftime&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'%s'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s1"&gt;'now'&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;
&lt;span class="p"&gt;);&lt;/span&gt;

&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="k"&gt;INDEX&lt;/span&gt; &lt;span class="n"&gt;IF&lt;/span&gt; &lt;span class="k"&gt;NOT&lt;/span&gt; &lt;span class="k"&gt;EXISTS&lt;/span&gt; &lt;span class="n"&gt;adc_queue_jobs&lt;/span&gt; &lt;span class="k"&gt;ON&lt;/span&gt; &lt;span class="n"&gt;adc_queue&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;scheduled_at&lt;/span&gt; &lt;span class="k"&gt;asc&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;started_at&lt;/span&gt; &lt;span class="k"&gt;asc&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;queue&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;job_type&lt;/span&gt;
&lt;span class="p"&gt;);&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Queue Implementation
&lt;/h3&gt;

&lt;p&gt;Next, we implement the queue using Rust. The following code defines the connection pool and bincode configuration.&lt;/p&gt;

&lt;p&gt;We only need to implement two methods, &lt;code&gt;schedule_at&lt;/code&gt; and &lt;code&gt;poll_next_with_instant&lt;/code&gt;.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight rust"&gt;&lt;code&gt;&lt;span class="nd"&gt;#[derive(Clone)]&lt;/span&gt;
&lt;span class="k"&gt;pub&lt;/span&gt; &lt;span class="k"&gt;struct&lt;/span&gt; &lt;span class="n"&gt;SqliteQueue&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;pool&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;SqlitePool&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;bincode_config&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nn"&gt;bincode&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nn"&gt;config&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="n"&gt;Configuration&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h4&gt;
  
  
  schedule_at
&lt;/h4&gt;

&lt;p&gt;To schedule a job, all we need to do is serialize the payload, create a new JID (we use &lt;a href="https://github.com/rs/xid"&gt;XID&lt;/a&gt; for JIDs), and insert both into the &lt;code&gt;adc_queue&lt;/code&gt; table.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight rust"&gt;&lt;code&gt;    &lt;span class="nd"&gt;#[instrument(skip_all,&lt;/span&gt; &lt;span class="nd"&gt;err,&lt;/span&gt; &lt;span class="nd"&gt;ret,&lt;/span&gt; &lt;span class="nd"&gt;fields(job_type&lt;/span&gt; &lt;span class="nd"&gt;=&lt;/span&gt; &lt;span class="nd"&gt;J::name(),&lt;/span&gt; &lt;span class="nd"&gt;payload_size))]&lt;/span&gt;
    &lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="k"&gt;fn&lt;/span&gt; &lt;span class="n"&gt;schedule_at&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;J&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="k"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;payload&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nn"&gt;J&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="n"&gt;Payload&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;scheduled_at&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;DateTime&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="nb"&gt;Result&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;Xid&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;QueueError&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;
    &lt;span class="k"&gt;where&lt;/span&gt;
        &lt;span class="n"&gt;J&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;JobProcessor&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="k"&gt;'static&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="nn"&gt;J&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="n"&gt;Payload&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;Encode&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;payload&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nn"&gt;bincode&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nf"&gt;encode_to_vec&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;payload&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="k"&gt;self&lt;/span&gt;&lt;span class="py"&gt;.bincode_config&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;&lt;span class="o"&gt;?&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
        &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;jid&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;new_xid&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;
        &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;jid_string&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;jid&lt;/span&gt;&lt;span class="nf"&gt;.to_string&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;
        &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;job_type&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nn"&gt;J&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nf"&gt;name&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;

        &lt;span class="nn"&gt;tracing&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nn"&gt;Span&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nf"&gt;current&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;&lt;span class="nf"&gt;.record&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"payload_size"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;payload&lt;/span&gt;&lt;span class="nf"&gt;.len&lt;/span&gt;&lt;span class="p"&gt;());&lt;/span&gt;

        &lt;span class="nn"&gt;sqlx&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nd"&gt;query!&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
            &lt;span class="s"&gt;"INSERT INTO adc_queue (jid,job_type,payload,scheduled_at) VALUES (?1,?2,?3,?4)"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
            &lt;span class="n"&gt;jid_string&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
            &lt;span class="n"&gt;job_type&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
            &lt;span class="n"&gt;payload&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
            &lt;span class="n"&gt;scheduled_at&lt;/span&gt;
        &lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="nf"&gt;.execute&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="k"&gt;self&lt;/span&gt;&lt;span class="py"&gt;.pool&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="k"&gt;.await&lt;/span&gt;
        &lt;span class="nf"&gt;.context&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Failed to add job to the queue"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;&lt;span class="o"&gt;?&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
        &lt;span class="nf"&gt;Ok&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;jid&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h4&gt;
  
  
  poll_next_with_instant
&lt;/h4&gt;

&lt;p&gt;Since SQLite doesn't have built-in notifications for new jobs queued, we poll the queue periodically. In our &lt;code&gt;UPDATE&lt;/code&gt; statement, we mark the row as in progress, then use the &lt;a href="https://www.sqlite.org/lang_returning.html"&gt;&lt;code&gt;RETURNING *&lt;/code&gt;&lt;/a&gt; clause to retrieve job metadata.&lt;/p&gt;

&lt;p&gt;The following is an excerpt from our &lt;a href="https://github.com/ZeroAssumptions/aide-de-camp/blob/0.1.0/aide-de-camp-sqlite/src/job_handle.rs"&gt;&lt;code&gt;0.1.0&lt;/code&gt; tag &lt;code&gt;queue.rs&lt;/code&gt; source code&lt;/a&gt;. In our next posts, we'll review the modifications we've made to &lt;code&gt;poll_next_with_instant&lt;/code&gt;.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;UPDATE&lt;/span&gt; &lt;span class="n"&gt;adc_queue&lt;/span&gt; &lt;span class="k"&gt;SET&lt;/span&gt; &lt;span class="n"&gt;started_at&lt;/span&gt;&lt;span class="o"&gt;=?&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;retries&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;retries&lt;/span&gt;&lt;span class="o"&gt;+&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt; &lt;span class="k"&gt;WHERE&lt;/span&gt; &lt;span class="n"&gt;jid&lt;/span&gt; &lt;span class="k"&gt;IN&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;SELECT&lt;/span&gt; &lt;span class="n"&gt;job&lt;/span&gt; &lt;span class="k"&gt;from&lt;/span&gt; &lt;span class="n"&gt;adc_queue&lt;/span&gt; &lt;span class="k"&gt;WHERE&lt;/span&gt; &lt;span class="n"&gt;started_at&lt;/span&gt; &lt;span class="k"&gt;IS&lt;/span&gt; &lt;span class="k"&gt;NULL&lt;/span&gt; &lt;span class="k"&gt;AND&lt;/span&gt; &lt;span class="n"&gt;queue&lt;/span&gt;&lt;span class="o"&gt;=?&lt;/span&gt; &lt;span class="k"&gt;AND&lt;/span&gt; &lt;span class="n"&gt;scheduled_at&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;=&lt;/span&gt; &lt;span class="o"&gt;?&lt;/span&gt; &lt;span class="k"&gt;AND&lt;/span&gt; &lt;span class="n"&gt;job_type&lt;/span&gt; &lt;span class="k"&gt;IN&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;?&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="o"&gt;?&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;LIMIT&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="n"&gt;RETURNING&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
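
&lt;p&gt;The claim step in that statement can be sketched in memory as follows. This is only an illustration under stated assumptions, not the Aide-De-Camp implementation: &lt;code&gt;Job&lt;/code&gt;, &lt;code&gt;State&lt;/code&gt;, &lt;code&gt;is_claimable&lt;/code&gt;, and &lt;code&gt;claim&lt;/code&gt; are hypothetical names, the &lt;code&gt;queue&lt;/code&gt; and &lt;code&gt;job_type&lt;/code&gt; filters are left out, and timestamps are plain Unix seconds.&lt;/p&gt;

```rust
// Hypothetical model of a queue row; `State` stands in for the nullable started_at column.
#[derive(Clone, Copy, Debug, PartialEq)]
enum State {
    Waiting,      // started_at IS NULL
    Started(i64), // started_at holds the claim time
}

#[derive(Clone, Copy, Debug, PartialEq)]
struct Job {
    id: u32,
    scheduled_at: i64,
    state: State,
    retries: u32,
}

// Mirrors the inner WHERE clause: the job has not started and is already due.
fn is_claimable(job: Job, now: i64) -> bool {
    if job.state != State::Waiting {
        return false;
    }
    now >= job.scheduled_at
}

// Mirrors the SET clause: record the start time and count the attempt.
fn claim(job: Job, now: i64) -> Job {
    Job {
        state: State::Started(now),
        retries: job.retries + 1,
        ..job
    }
}

fn main() {
    let now = 100;
    let mut queue = [
        Job { id: 1, scheduled_at: 500, state: State::Waiting, retries: 0 },
        Job { id: 2, scheduled_at: 50, state: State::Started(60), retries: 1 },
        Job { id: 3, scheduled_at: 90, state: State::Waiting, retries: 0 },
    ];

    // LIMIT 1: stop at the first claimable job and hand it to the worker.
    for slot in queue.iter_mut() {
        if is_claimable(*slot, now) {
            *slot = claim(*slot, now);
            println!("claimed job {} (attempt {})", slot.id, slot.retries);
            break;
        }
    }
}
```

&lt;p&gt;Once a job is claimed, it no longer matches the filter, so a second poll skips it. In the SQL version, doing the selection and the update in one statement is what keeps two workers from claiming the same row.&lt;/p&gt;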



&lt;h3&gt;
  
  
  JobHandle Trait Implementation
&lt;/h3&gt;

&lt;p&gt;Since the entire job lifecycle is within the JobHandle trait, we have to implement that trait too.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight rust"&gt;&lt;code&gt;&lt;span class="k"&gt;pub&lt;/span&gt; &lt;span class="k"&gt;struct&lt;/span&gt; &lt;span class="n"&gt;SqliteJobHandle&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;pool&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;SqlitePool&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;row&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;JobRow&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="nd"&gt;#[async_trait]&lt;/span&gt;
&lt;span class="k"&gt;impl&lt;/span&gt; &lt;span class="n"&gt;JobHandle&lt;/span&gt; &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;SqliteJobHandle&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;fn&lt;/span&gt; &lt;span class="nf"&gt;id&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="k"&gt;self&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;Xid&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;self&lt;/span&gt;&lt;span class="py"&gt;.row.jid&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;

    &lt;span class="k"&gt;fn&lt;/span&gt; &lt;span class="nf"&gt;job_type&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="k"&gt;self&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="nb"&gt;str&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="k"&gt;self&lt;/span&gt;&lt;span class="py"&gt;.row.job_type&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;

    &lt;span class="k"&gt;fn&lt;/span&gt; &lt;span class="nf"&gt;payload&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="k"&gt;self&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;Bytes&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;self&lt;/span&gt;&lt;span class="py"&gt;.row.payload&lt;/span&gt;&lt;span class="nf"&gt;.clone&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;

    &lt;span class="k"&gt;fn&lt;/span&gt; &lt;span class="nf"&gt;retries&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="k"&gt;self&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="nb"&gt;u32&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;self&lt;/span&gt;&lt;span class="py"&gt;.row.retries&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;

    &lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="k"&gt;fn&lt;/span&gt; &lt;span class="nf"&gt;complete&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;mut&lt;/span&gt; &lt;span class="k"&gt;self&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="nb"&gt;Result&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt; &lt;span class="n"&gt;QueueError&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;jid&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;self&lt;/span&gt;&lt;span class="py"&gt;.row.jid&lt;/span&gt;&lt;span class="nf"&gt;.to_string&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;
        &lt;span class="nn"&gt;sqlx&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nd"&gt;query!&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"DELETE FROM adc_queue where jid = ?1"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;jid&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
            &lt;span class="nf"&gt;.execute&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="k"&gt;self&lt;/span&gt;&lt;span class="py"&gt;.pool&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
            &lt;span class="k"&gt;.await&lt;/span&gt;
            &lt;span class="nf"&gt;.context&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Failed to mark job as completed"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;&lt;span class="o"&gt;?&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
        &lt;span class="nf"&gt;Ok&lt;/span&gt;&lt;span class="p"&gt;(())&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;

    &lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="k"&gt;fn&lt;/span&gt; &lt;span class="nf"&gt;fail&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;mut&lt;/span&gt; &lt;span class="k"&gt;self&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="nb"&gt;Result&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt; &lt;span class="n"&gt;QueueError&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;jid&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;self&lt;/span&gt;&lt;span class="py"&gt;.row.jid&lt;/span&gt;&lt;span class="nf"&gt;.to_string&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;
        &lt;span class="nn"&gt;sqlx&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nd"&gt;query!&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"UPDATE adc_queue SET started_at=null WHERE jid = ?1"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;jid&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
            &lt;span class="nf"&gt;.execute&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="k"&gt;self&lt;/span&gt;&lt;span class="py"&gt;.pool&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
            &lt;span class="k"&gt;.await&lt;/span&gt;
            &lt;span class="nf"&gt;.context&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Failed to mark job as failed"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;&lt;span class="o"&gt;?&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
        &lt;span class="nf"&gt;Ok&lt;/span&gt;&lt;span class="p"&gt;(())&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;

    &lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="k"&gt;fn&lt;/span&gt; &lt;span class="nf"&gt;dead_queue&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;mut&lt;/span&gt; &lt;span class="k"&gt;self&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="nb"&gt;Result&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt; &lt;span class="n"&gt;QueueError&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;jid&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;self&lt;/span&gt;&lt;span class="py"&gt;.row.jid&lt;/span&gt;&lt;span class="nf"&gt;.to_string&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;
        &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;retries&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;self&lt;/span&gt;&lt;span class="py"&gt;.row.retries&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
        &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;job_type&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;self&lt;/span&gt;&lt;span class="py"&gt;.row.job_type&lt;/span&gt;&lt;span class="nf"&gt;.clone&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;
        &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;payload&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;self&lt;/span&gt;&lt;span class="py"&gt;.row.payload&lt;/span&gt;&lt;span class="nf"&gt;.as_ref&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;
        &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;scheduled_at&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;self&lt;/span&gt;&lt;span class="py"&gt;.row.scheduled_at&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
        &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;enqueued_at&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;self&lt;/span&gt;&lt;span class="py"&gt;.row.enqueued_at&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

        &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="k"&gt;mut&lt;/span&gt; &lt;span class="n"&gt;tx&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;self&lt;/span&gt;
            &lt;span class="py"&gt;.pool&lt;/span&gt;
            &lt;span class="nf"&gt;.begin&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
            &lt;span class="k"&gt;.await&lt;/span&gt;
            &lt;span class="nf"&gt;.context&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Failed to start transaction"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;&lt;span class="o"&gt;?&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
        &lt;span class="nn"&gt;sqlx&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nd"&gt;query!&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"DELETE FROM adc_queue WHERE jid = ?1"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;jid&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
            &lt;span class="nf"&gt;.execute&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="k"&gt;mut&lt;/span&gt; &lt;span class="n"&gt;tx&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
            &lt;span class="k"&gt;.await&lt;/span&gt;
            &lt;span class="nf"&gt;.context&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Failed to delete job from the queue"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;&lt;span class="o"&gt;?&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

        &lt;span class="nn"&gt;sqlx&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nd"&gt;query!&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"INSERT INTO adc_dead_queue (jid, job_type, payload, retries, scheduled_at, enqueued_at) VALUES (?1, ?2, ?3, ?4,?5,?6)"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
            &lt;span class="n"&gt;jid&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
            &lt;span class="n"&gt;job_type&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
            &lt;span class="n"&gt;payload&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
            &lt;span class="n"&gt;retries&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
            &lt;span class="n"&gt;scheduled_at&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
            &lt;span class="n"&gt;enqueued_at&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
            &lt;span class="nf"&gt;.execute&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="k"&gt;mut&lt;/span&gt; &lt;span class="n"&gt;tx&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;&lt;span class="k"&gt;.await&lt;/span&gt;
            &lt;span class="nf"&gt;.context&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Failed to move job to dead queue"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;&lt;span class="o"&gt;?&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
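        &lt;span class="c1"&gt;// Commit explicitly: sqlx rolls back a transaction that is dropped without a commit.&lt;/span&gt;
        &lt;span class="n"&gt;tx&lt;/span&gt;&lt;span class="nf"&gt;.commit&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
            &lt;span class="k"&gt;.await&lt;/span&gt;
            &lt;span class="nf"&gt;.context&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Failed to commit transaction"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;&lt;span class="o"&gt;?&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
        &lt;span class="nf"&gt;Ok&lt;/span&gt;&lt;span class="p"&gt;(())&lt;/span&gt;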
    &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Up Next
&lt;/h2&gt;

&lt;p&gt;So far, we've gone over creating traits and implementing them. In parts 3 and 4, we'll review how we modified our code based on community feature requests.&lt;/p&gt;

&lt;p&gt;Follow us here on DEV, &lt;a href="https://twitter.com/ZeroAssumptions"&gt;Twitter&lt;/a&gt;, and &lt;a href="https://www.linkedin.com/company/zero-assumptions"&gt;LinkedIn&lt;/a&gt; for updates on &lt;a href="https://github.com/ZeroAssumptions/aide-de-camp"&gt;Aide-De-Camp&lt;/a&gt;. &lt;/p&gt;

&lt;p&gt;Aide-De-Camp is a product of &lt;a href="https://zeroassumptions.dev/"&gt;Zero Assumptions&lt;/a&gt;.&lt;/p&gt;

</description>
      <category>aidedecamp</category>
      <category>rust</category>
      <category>zerotrust</category>
      <category>zeroassumptions</category>
    </item>
    <item>
      <title>Build a Job Queue with Rust Using Aide-De-Camp (Part 1)</title>
      <dc:creator>Falon Darville</dc:creator>
      <pubDate>Sun, 30 Oct 2022 01:19:39 +0000</pubDate>
      <link>https://dev.to/zeroassumptions/build-a-job-queue-with-rust-using-aide-de-camp-part-1-4g5m</link>
      <guid>https://dev.to/zeroassumptions/build-a-job-queue-with-rust-using-aide-de-camp-part-1-4g5m</guid>
      <description>&lt;p&gt;This is the first installment in a series that walks you through creating durable job queues using &lt;a href="https://zeroassumptions.dev/" rel="noopener noreferrer"&gt;Zero Assumptions&lt;/a&gt;' open source solution, &lt;a href="https://github.com/ZeroAssumptions/aide-de-camp" rel="noopener noreferrer"&gt;Aide-De-Camp&lt;/a&gt;, which is built using Rust. For full transparency: hi, I'm Zero Assumptions' creator, Andrey Snow. Follow along as I make it easier for you to manage job queueing using Rust.&lt;/p&gt;

&lt;h2&gt;
  
  
  Aide-De-Camp Code
&lt;/h2&gt;

&lt;p&gt;Code from this series is available in &lt;a href="https://github.com/ZeroAssumptions/aide-de-camp" rel="noopener noreferrer"&gt;our open source Aide-De-Camp GitHub repository&lt;/a&gt;. &lt;/p&gt;

&lt;p&gt;The crate is published to &lt;a href="https://crates.io/crates/aide-de-camp" rel="noopener noreferrer"&gt;crates.io&lt;/a&gt; for everyone to use.&lt;/p&gt;

&lt;h2&gt;
  
  
  Intro to Queues
&lt;/h2&gt;

&lt;p&gt;From time to time, your application needs to offload some work to a different process or delay execution of a certain task. This is where job queues come in handy.&lt;/p&gt;

&lt;p&gt;Many existing job queues are opinionated about how jobs are stored and processed. For example, many require a running Redis instance, some only work with PostgreSQL, and others require an external manager process. That's not a problem if you're already using Redis or PostgreSQL, but what if you want to use something lighter like SQLite and run a single process?&lt;/p&gt;

&lt;p&gt;With very little code, SQLite and PostgreSQL can be turned into a durable job queue. &lt;/p&gt;

&lt;p&gt;First, let's define our interfaces. &lt;a href="https://doc.rust-lang.org/book/ch10-02-traits.html" rel="noopener noreferrer"&gt;Traits&lt;/a&gt; allow us to make our job queue backend agnostic: we might use SQLite today, PostgreSQL tomorrow, and Redis the day after.&lt;/p&gt;
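
&lt;p&gt;As a rough illustration of what "backend agnostic" means here, the sketch below defines a small queue trait and one in-memory backend. The names &lt;code&gt;JobQueue&lt;/code&gt;, &lt;code&gt;InMemoryQueue&lt;/code&gt;, and &lt;code&gt;drain&lt;/code&gt; are hypothetical, and the trait is a simplified synchronous stand-in, not the interface Aide-De-Camp actually exposes.&lt;/p&gt;

```rust
use std::collections::VecDeque;

/// Hypothetical, simplified queue interface (not the crate's actual trait).
trait JobQueue {
    /// Store a job and return its ID.
    fn enqueue(&mut self, job_type: &str, payload: Vec<u8>) -> u64;
    /// Take the oldest pending job, if any.
    fn poll_next(&mut self) -> Option<(u64, String, Vec<u8>)>;
}

/// One possible backend: jobs live in memory. A SQLite or PostgreSQL
/// backend would implement the same trait.
#[derive(Default)]
struct InMemoryQueue {
    next_id: u64,
    jobs: VecDeque<(u64, String, Vec<u8>)>,
}

impl JobQueue for InMemoryQueue {
    fn enqueue(&mut self, job_type: &str, payload: Vec<u8>) -> u64 {
        self.next_id += 1;
        self.jobs
            .push_back((self.next_id, job_type.to_string(), payload));
        self.next_id
    }

    fn poll_next(&mut self) -> Option<(u64, String, Vec<u8>)> {
        self.jobs.pop_front()
    }
}

/// Written against the trait only, so it works with any backend.
/// Returns the job types in the order they were pulled from the queue.
fn drain<Q: JobQueue>(queue: &mut Q) -> Vec<String> {
    let mut seen = Vec::new();
    while let Some((_, job_type, _)) = queue.poll_next() {
        seen.push(job_type);
    }
    seen
}
```

&lt;p&gt;Because &lt;code&gt;drain&lt;/code&gt; only depends on the trait, swapping the storage backend would not require changing it.&lt;/p&gt;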

&lt;h2&gt;
  
  
  Job
&lt;/h2&gt;

&lt;h3&gt;
  
  
  Job Lifecycle
&lt;/h3&gt;

&lt;p&gt;Before we begin, let's define the expected job behavior. A job is any distinct unit of work to be done. Even when the same task is repeated, each run is a separate job with its own unique job ID.&lt;/p&gt;

&lt;p&gt;We're going to visualize the job lifecycle from the runner's point of view as a &lt;a href="https://en.wikipedia.org/wiki/Finite-state_machine" rel="noopener noreferrer"&gt;finite-state machine&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fzzlmuoffgdgk3r1pnspq.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fzzlmuoffgdgk3r1pnspq.png" alt="Finite state diagram"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;The above lifecycle has two kinds of errors, &lt;code&gt;JobError&lt;/code&gt; and &lt;code&gt;RunnerError&lt;/code&gt;.&lt;/p&gt;

&lt;p&gt;&lt;code&gt;JobError&lt;/code&gt; is an error specific to processing the job itself, such as a problem decoding the input.&lt;/p&gt;

&lt;p&gt;Errors with the handler are passed to the &lt;code&gt;RunnerRouter&lt;/code&gt;. Handler-specific errors must implement the &lt;a href="https://doc.rust-lang.org/std/convert/trait.Into.html" rel="noopener noreferrer"&gt;&lt;code&gt;Into&amp;lt;JobError&amp;gt;&lt;/code&gt;&lt;/a&gt; trait. The easiest way to handle errors is to wrap your error in &lt;a href="https://docs.rs/anyhow/latest/anyhow/struct.Error.html" rel="noopener noreferrer"&gt;&lt;code&gt;anyhow::Error&lt;/code&gt;&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;&lt;code&gt;RunnerError&lt;/code&gt;s are queue domain errors and indicate a runtime issue. A runner error is returned when a job fails to transition due to an underlying &lt;code&gt;QueueError&lt;/code&gt;, or when the runner is given a job that can't be handled by the current &lt;code&gt;RunnerRouter&lt;/code&gt;. These errors are bubbled up and result in temporary suspension of a worker, in the hope that the issue will go away in the future.&lt;/p&gt;
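
&lt;p&gt;To make the lifecycle concrete, here is a minimal sketch of it as a state machine. The names &lt;code&gt;JobState&lt;/code&gt;, &lt;code&gt;Event&lt;/code&gt;, and &lt;code&gt;transition&lt;/code&gt; are hypothetical and not part of Aide-De-Camp, and the retry rule (retry while the number of retries so far is below the maximum) is an assumption rather than the crate's exact logic.&lt;/p&gt;

```rust
/// Hypothetical job states, as seen by the runner.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum JobState {
    Queued,
    Running,
    Completed,
    Dead,
}

/// Hypothetical events that move a job between states.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Event {
    PickedUp,
    Succeeded,
    /// The handler returned an error; `retries` is how many retries
    /// have already been made for this job.
    Failed { retries: u32, max_retries: u32 },
}

/// Returns the next state, or `None` if the event is not valid
/// in the current state.
fn transition(state: JobState, event: Event) -> Option<JobState> {
    match (state, event) {
        (JobState::Queued, Event::PickedUp) => Some(JobState::Running),
        (JobState::Running, Event::Succeeded) => Some(JobState::Completed),
        (JobState::Running, Event::Failed { retries, max_retries }) => {
            if retries < max_retries {
                // Assumed rule: put the job back in the queue for another attempt.
                Some(JobState::Queued)
            } else {
                // Out of retries: move the job to the dead queue.
                Some(JobState::Dead)
            }
        }
        _ => None,
    }
}
```

&lt;p&gt;Under these assumptions, a failure with a maximum of zero retries sends the job straight to the dead queue, and any transition not listed in the &lt;code&gt;match&lt;/code&gt; is rejected.&lt;/p&gt;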

&lt;h3&gt;
  
  
  &lt;code&gt;JobProcessor&lt;/code&gt; Trait
&lt;/h3&gt;

&lt;p&gt;To define a job for Aide-De-Camp, implement the &lt;code&gt;JobProcessor&lt;/code&gt; trait. The trait definition looks like the following.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight rust"&gt;&lt;code&gt;

&lt;span class="nd"&gt;#[async_trait]&lt;/span&gt;
&lt;span class="k"&gt;pub&lt;/span&gt; &lt;span class="k"&gt;trait&lt;/span&gt; &lt;span class="n"&gt;JobProcessor&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;Send&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="nb"&gt;Sync&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="cd"&gt;/// What is the input to this handler. If you want to use `RunnerRouter`, then this must implement `bincode::Decode` and `bincode::Encode`.&lt;/span&gt;
    &lt;span class="k"&gt;type&lt;/span&gt; &lt;span class="n"&gt;Payload&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;Send&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="cd"&gt;/// Which error is returned&lt;/span&gt;
    &lt;span class="k"&gt;type&lt;/span&gt; &lt;span class="n"&gt;Error&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;Send&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="cd"&gt;/// Run the job, passing the payload to it. Your payload should implement `bincode::Decode`.&lt;/span&gt;
    &lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="k"&gt;fn&lt;/span&gt; &lt;span class="nf"&gt;handle&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="k"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;jid&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;Xid&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;payload&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="k"&gt;Self&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="n"&gt;Payload&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="nb"&gt;Result&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt; &lt;span class="k"&gt;Self&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="n"&gt;Error&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

    &lt;span class="cd"&gt;/// How many times job should be re-tried before being moved to dead queue&lt;/span&gt;
    &lt;span class="k"&gt;fn&lt;/span&gt; &lt;span class="nf"&gt;max_retries&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="k"&gt;self&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="nb"&gt;u32&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="mi"&gt;0&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;

    &lt;span class="cd"&gt;/// Job type, used to differentiate between different jobs in the queue.&lt;/span&gt;
    &lt;span class="k"&gt;fn&lt;/span&gt; &lt;span class="nf"&gt;name&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="k"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="k"&gt;'static&lt;/span&gt; &lt;span class="nb"&gt;str&lt;/span&gt;
    &lt;span class="k"&gt;where&lt;/span&gt;
        &lt;span class="k"&gt;Self&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;Sized&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;A job needs the following.&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Declare a &lt;code&gt;Payload&lt;/code&gt; type that is &lt;code&gt;Send&lt;/code&gt;. For example, &lt;code&gt;type Payload: Send&lt;/code&gt;.&lt;/li&gt;
&lt;li&gt;Declare an error type, such as &lt;code&gt;type Error: Send&lt;/code&gt;. The error type can also be &lt;a href="https://doc.rust-lang.org/std/convert/enum.Infallible.html" rel="noopener noreferrer"&gt;&lt;code&gt;Infallible&lt;/code&gt;&lt;/a&gt;.&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;handle&lt;/code&gt; function that takes &lt;code&gt;jid&lt;/code&gt; (short for job ID) and its input (&lt;code&gt;payload&lt;/code&gt;) as arguments. For example, &lt;code&gt;async fn handle(&amp;amp;self, jid: Xid, payload: Self::Payload) -&amp;gt; Result&amp;lt;(), Self::Error&amp;gt;;&lt;/code&gt;.&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;name&lt;/code&gt; function that returns the job name. This is used to distinguish jobs in the queue and used in &lt;code&gt;RunnerRouter&lt;/code&gt;. For example, &lt;code&gt;fn name() -&amp;gt; &amp;amp;'static str&lt;/code&gt;.&lt;/li&gt;
&lt;li&gt;Be &lt;a href="https://doc.rust-lang.org/nomicon/send-and-sync.html" rel="noopener noreferrer"&gt;&lt;code&gt;Send&lt;/code&gt; + &lt;code&gt;Sync&lt;/code&gt;&lt;/a&gt;.&lt;/li&gt;
&lt;li&gt;Optionally, override &lt;code&gt;max_retries&lt;/code&gt; to specify how many times a job should be retried upon failure before it is moved to the dead queue. The default value is &lt;code&gt;0&lt;/code&gt;, meaning the job is never retried and has only one chance to succeed.&lt;/li&gt;

&lt;/ul&gt;

&lt;p&gt;Some queue implementations also pass &lt;code&gt;Context&lt;/code&gt; to the job, where &lt;code&gt;Context&lt;/code&gt; acts like axum's &lt;a href="https://docs.rs/axum/latest/axum/struct.Extension.html" rel="noopener noreferrer"&gt;&lt;code&gt;Extension&lt;/code&gt;&lt;/a&gt;. In this queue implementation, if your job has external dependencies (for example, an API client for your transactional email service provider), then they should be part of the entity that implements the &lt;code&gt;JobProcessor&lt;/code&gt; trait. This way, you choose how to wire the dependencies, whether by passing them manually or by using a dependency injection framework.&lt;/p&gt;
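
&lt;p&gt;The requirements above can be sketched with a small example job. To keep the sketch free of external crates, it uses a synchronous stand-in trait, &lt;code&gt;SyncJobProcessor&lt;/code&gt;, with a plain &lt;code&gt;u64&lt;/code&gt; in place of &lt;code&gt;Xid&lt;/code&gt;; both are assumptions made for illustration, as are the &lt;code&gt;WelcomeEmail&lt;/code&gt; job and its &lt;code&gt;sender&lt;/code&gt; field. The real trait is async and is the one shown earlier.&lt;/p&gt;

```rust
use std::convert::Infallible;

/// Synchronous stand-in that mirrors the shape of `JobProcessor`.
/// This is an illustration only, not the crate's trait.
trait SyncJobProcessor: Send + Sync {
    type Payload: Send;
    type Error: Send;

    fn handle(&self, jid: u64, payload: Self::Payload) -> Result<(), Self::Error>;

    /// Same default as in the trait above: no retries.
    fn max_retries(&self) -> u32 {
        0
    }

    fn name() -> &'static str
    where
        Self: Sized;
}

/// Hypothetical job. Its dependency (here just a sender address)
/// lives on the struct instead of being passed in through a context.
struct WelcomeEmail {
    sender: String,
}

impl SyncJobProcessor for WelcomeEmail {
    type Payload = String; // recipient address
    type Error = Infallible; // this job cannot fail

    fn handle(&self, jid: u64, payload: Self::Payload) -> Result<(), Self::Error> {
        println!("[{}] {} sends a welcome email to {}", jid, self.sender, payload);
        Ok(())
    }

    /// Override the default so the job is retried up to three times.
    fn max_retries(&self) -> u32 {
        3
    }

    fn name() -> &'static str {
        "welcome_email"
    }
}
```

&lt;p&gt;Keeping the dependency on the struct means the code that constructs &lt;code&gt;WelcomeEmail&lt;/code&gt; decides how it is wired.&lt;/p&gt;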

&lt;h4&gt;
  
  
  Type-Erased Job Handler
&lt;/h4&gt;

&lt;p&gt;Rust has no ad hoc union types, so a single map can't hold handlers of different concrete types the way something like &lt;code&gt;HashMap&amp;lt;String, JobType1|JobType2&amp;gt;&lt;/code&gt; would suggest.&lt;/p&gt;

&lt;p&gt;To work around this, many Rust queues either:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Make your job handler accept some form of &lt;code&gt;&amp;amp;[u8]&lt;/code&gt; as input, which requires too much boilerplate code.&lt;/li&gt;
&lt;li&gt;Declare an enum with all possible job types, which limits flexibility. The publisher needs to be aware of every possible job, and it's hard to separate which jobs run where.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;a href="https://docs.rs/axum/0.5.5/axum/#routing" rel="noopener noreferrer"&gt;&lt;code&gt;axum&lt;/code&gt;&lt;/a&gt; provides a workaround. After an hour of "Go-to-definition" between multiple tokio crates, I've come up with the following.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight rust"&gt;&lt;code&gt;

&lt;span class="cd"&gt;/// Object-safe implementation of a job that can be used in the runner&lt;/span&gt;
&lt;span class="k"&gt;pub&lt;/span&gt; &lt;span class="k"&gt;struct&lt;/span&gt; &lt;span class="n"&gt;WrappedJobHandler&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;T&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;JobProcessor&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;job&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;T&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;config&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;Configuration&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="k"&gt;impl&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;J&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;WrappedJobHandler&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;J&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;
&lt;span class="k"&gt;where&lt;/span&gt;
    &lt;span class="n"&gt;J&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;JobProcessor&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="k"&gt;'static&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="nn"&gt;J&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="n"&gt;Payload&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;Decode&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="n"&gt;Encode&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="nn"&gt;J&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="n"&gt;Error&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;Into&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;JobError&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;pub&lt;/span&gt; &lt;span class="k"&gt;fn&lt;/span&gt; &lt;span class="nf"&gt;new&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;job&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;J&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="k"&gt;Self&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;config&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nn"&gt;bincode&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nn"&gt;config&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nf"&gt;standard&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;
        &lt;span class="k"&gt;Self&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="n"&gt;job&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;config&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;

    &lt;span class="k"&gt;pub&lt;/span&gt; &lt;span class="k"&gt;fn&lt;/span&gt; &lt;span class="nf"&gt;boxed&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;self&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;BoxedJobHandler&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="nn"&gt;Box&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nf"&gt;new&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;self&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;BoxedJobHandler&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="nd"&gt;#[async_trait::async_trait]&lt;/span&gt;
&lt;span class="k"&gt;impl&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;J&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;JobProcessor&lt;/span&gt; &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;WrappedJobHandler&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;J&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;
&lt;span class="k"&gt;where&lt;/span&gt;
    &lt;span class="n"&gt;J&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;JobProcessor&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="k"&gt;'static&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="nn"&gt;J&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="n"&gt;Payload&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;Decode&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="n"&gt;Encode&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="nn"&gt;J&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="n"&gt;Error&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;Into&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;JobError&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;type&lt;/span&gt; &lt;span class="n"&gt;Payload&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;Bytes&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="k"&gt;type&lt;/span&gt; &lt;span class="n"&gt;Error&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;JobError&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

    &lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="k"&gt;fn&lt;/span&gt; &lt;span class="nf"&gt;handle&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="k"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;jid&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;Xid&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;payload&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="k"&gt;Self&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="n"&gt;Payload&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="nb"&gt;Result&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt; &lt;span class="k"&gt;Self&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="n"&gt;Error&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;payload&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;_&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nn"&gt;bincode&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nf"&gt;decode_from_slice&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;payload&lt;/span&gt;&lt;span class="nf"&gt;.as_ref&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt; &lt;span class="k"&gt;self&lt;/span&gt;&lt;span class="py"&gt;.config&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;&lt;span class="o"&gt;?&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
        &lt;span class="k"&gt;self&lt;/span&gt;&lt;span class="py"&gt;.job&lt;/span&gt;&lt;span class="nf"&gt;.handle&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;jid&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;payload&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;&lt;span class="k"&gt;.await&lt;/span&gt;&lt;span class="nf"&gt;.map_err&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nn"&gt;Into&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="n"&gt;into&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;

    &lt;span class="k"&gt;fn&lt;/span&gt; &lt;span class="nf"&gt;name&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="k"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="k"&gt;'static&lt;/span&gt; &lt;span class="nb"&gt;str&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="nn"&gt;J&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nf"&gt;name&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Now, our &lt;code&gt;RunnerRouter&lt;/code&gt; just needs to know about &lt;code&gt;Box&amp;lt;dyn JobProcessor&amp;lt;Payload = Bytes, Error = JobError&amp;gt;&amp;gt;&lt;/code&gt;. &lt;code&gt;WrappedJobHandler&amp;lt;J&amp;gt;&lt;/code&gt; will handle decoding the payload and converting errors.&lt;/p&gt;
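
&lt;p&gt;The same type-erasure idea can be shown in a std-only sketch. Everything here is hypothetical and simplified: &lt;code&gt;FromBytes&lt;/code&gt; stands in for &lt;code&gt;bincode::Decode&lt;/code&gt;, &lt;code&gt;TypedJob&lt;/code&gt; and &lt;code&gt;ErasedJob&lt;/code&gt; are two separate traits (the code above reuses one trait with &lt;code&gt;Payload = Bytes&lt;/code&gt; instead), errors are plain strings, and handlers return a &lt;code&gt;String&lt;/code&gt; only so the result is easy to inspect.&lt;/p&gt;

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for `bincode::Decode`.
trait FromBytes: Sized {
    fn from_bytes(bytes: &[u8]) -> Result<Self, String>;
}

impl FromBytes for String {
    fn from_bytes(bytes: &[u8]) -> Result<Self, String> {
        String::from_utf8(bytes.to_vec()).map_err(|e| e.to_string())
    }
}

impl FromBytes for u32 {
    fn from_bytes(bytes: &[u8]) -> Result<Self, String> {
        if bytes.len() != 4 {
            return Err("expected exactly 4 bytes".to_string());
        }
        Ok(u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]))
    }
}

/// What job authors implement: a handler with a concrete payload type.
trait TypedJob {
    type Payload: FromBytes;
    fn handle(&self, payload: Self::Payload) -> Result<String, String>;
    fn name() -> &'static str
    where
        Self: Sized;
}

/// Object-safe form the router stores: raw bytes in, no associated types.
trait ErasedJob {
    fn handle_bytes(&self, bytes: &[u8]) -> Result<String, String>;
}

/// Wrapper that decodes the bytes, then calls the typed handler.
struct Wrapped<J>(J);

impl<J: TypedJob> ErasedJob for Wrapped<J> {
    fn handle_bytes(&self, bytes: &[u8]) -> Result<String, String> {
        let payload = <J::Payload as FromBytes>::from_bytes(bytes)?;
        self.0.handle(payload)
    }
}

#[derive(Default)]
struct Router {
    jobs: HashMap<&'static str, Box<dyn ErasedJob>>,
}

impl Router {
    /// Register a handler; a handler with the same name is replaced.
    fn add<J: TypedJob + 'static>(&mut self, job: J) {
        self.jobs.insert(J::name(), Box::new(Wrapped(job)));
    }

    /// Look up the handler by job type and pass it the raw payload.
    fn dispatch(&self, job_type: &str, bytes: &[u8]) -> Result<String, String> {
        match self.jobs.get(job_type) {
            Some(handler) => handler.handle_bytes(bytes),
            None => Err(format!("unknown job type: {}", job_type)),
        }
    }
}

struct Greet;
impl TypedJob for Greet {
    type Payload = String;
    fn handle(&self, payload: String) -> Result<String, String> {
        Ok(format!("hello, {}", payload))
    }
    fn name() -> &'static str {
        "greet"
    }
}

struct Double;
impl TypedJob for Double {
    type Payload = u32;
    fn handle(&self, payload: u32) -> Result<String, String> {
        Ok((payload * 2).to_string())
    }
    fn name() -> &'static str {
        "double"
    }
}
```

&lt;p&gt;The map only ever sees &lt;code&gt;Box&amp;lt;dyn ErasedJob&amp;gt;&lt;/code&gt;, so jobs with different payload types can sit side by side, and the publisher never needs an enum listing every job.&lt;/p&gt;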

&lt;h2&gt;
  
  
  Handle Multiple Job Types in a Single Runner
&lt;/h2&gt;

&lt;p&gt;I've already mentioned &lt;code&gt;RunnerRouter&lt;/code&gt; multiple times without explaining what it is. You can think of it as &lt;a href="https://docs.rs/axum/latest/axum/struct.Router.html" rel="noopener noreferrer"&gt;an HTTP request router&lt;/a&gt;, except that it routes jobs to handlers by job type. I seriously considered using the router from axum underneath and making everything that implements &lt;code&gt;JobProcessor&lt;/code&gt; also implement Tower's &lt;a href="https://docs.rs/tower-service/0.3.1/tower_service/trait.Service.html" rel="noopener noreferrer"&gt;&lt;code&gt;Service&lt;/code&gt;&lt;/a&gt;, but that seemed like too much, so I made my own:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight rust"&gt;&lt;code&gt;

&lt;span class="nd"&gt;#[derive(Default)]&lt;/span&gt;
&lt;span class="k"&gt;pub&lt;/span&gt; &lt;span class="k"&gt;struct&lt;/span&gt; &lt;span class="n"&gt;RunnerRouter&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;jobs&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;HashMap&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&amp;amp;&lt;/span&gt;&lt;span class="k"&gt;'static&lt;/span&gt; &lt;span class="nb"&gt;str&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;BoxedJobHandler&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="k"&gt;impl&lt;/span&gt; &lt;span class="n"&gt;RunnerRouter&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="cd"&gt;/// Register a job handler with the router. If a job by that name is already present, it will be replaced.&lt;/span&gt;
    &lt;span class="k"&gt;pub&lt;/span&gt; &lt;span class="k"&gt;fn&lt;/span&gt; &lt;span class="n"&gt;add_job_handler&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;J&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="k"&gt;mut&lt;/span&gt; &lt;span class="k"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;job&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;J&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;where&lt;/span&gt;
        &lt;span class="n"&gt;J&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;JobProcessor&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="k"&gt;'static&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="nn"&gt;J&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="n"&gt;Payload&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;Decode&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="n"&gt;Encode&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="nn"&gt;J&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="n"&gt;Error&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;Into&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;JobError&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;name&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nn"&gt;J&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nf"&gt;name&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;
        &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;boxed&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nn"&gt;WrappedJobHandler&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nf"&gt;new&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;job&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;&lt;span class="nf"&gt;.boxed&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;
        &lt;span class="c1"&gt;// `insert` overwrites any handler already registered under this name; `entry(..).or_insert(..)` would keep the old one.&lt;/span&gt;
        &lt;span class="k"&gt;self&lt;/span&gt;&lt;span class="py"&gt;.jobs&lt;/span&gt;&lt;span class="nf"&gt;.insert&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;name&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;boxed&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;

    &lt;span class="k"&gt;pub&lt;/span&gt; &lt;span class="k"&gt;fn&lt;/span&gt; &lt;span class="nf"&gt;types&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="k"&gt;self&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="nb"&gt;Vec&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&amp;amp;&lt;/span&gt;&lt;span class="k"&gt;'static&lt;/span&gt; &lt;span class="nb"&gt;str&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;self&lt;/span&gt;&lt;span class="py"&gt;.jobs&lt;/span&gt;&lt;span class="nf"&gt;.keys&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;&lt;span class="nf"&gt;.copied&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;&lt;span class="nf"&gt;.collect&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;

    &lt;span class="cd"&gt;/// Process job handle. This function is responsible for job lifecycle. If you're implementing your&lt;/span&gt;
    &lt;span class="cd"&gt;/// own job runner, then this is what you should use to process a job that is already pulled&lt;/span&gt;
    &lt;span class="cd"&gt;/// from the queue. In all other cases, you shouldn't use this function directly.&lt;/span&gt;
    &lt;span class="nd"&gt;#[instrument(skip_all,&lt;/span&gt; &lt;span class="nd"&gt;err,&lt;/span&gt; &lt;span class="nd"&gt;fields(job_type&lt;/span&gt; &lt;span class="nd"&gt;=&lt;/span&gt; &lt;span class="err"&gt;%&lt;/span&gt;&lt;span class="nd"&gt;job_handle&lt;/span&gt;&lt;span class="err"&gt;.&lt;/span&gt;&lt;span class="nd"&gt;job_type(),&lt;/span&gt; &lt;span class="nd"&gt;jid&lt;/span&gt; &lt;span class="nd"&gt;=&lt;/span&gt; &lt;span class="err"&gt;%&lt;/span&gt;&lt;span class="nd"&gt;job_handle&lt;/span&gt;&lt;span class="err"&gt;.&lt;/span&gt;&lt;span class="nd"&gt;id()&lt;/span&gt;&lt;span class="err"&gt;.&lt;/span&gt;&lt;span class="nd"&gt;to_string(),&lt;/span&gt; &lt;span class="nd"&gt;retries&lt;/span&gt; &lt;span class="nd"&gt;=&lt;/span&gt; &lt;span class="nd"&gt;job_handle&lt;/span&gt;&lt;span class="err"&gt;.&lt;/span&gt;&lt;span class="nd"&gt;retries()))]&lt;/span&gt;
    &lt;span class="k"&gt;pub&lt;/span&gt; &lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="k"&gt;fn&lt;/span&gt; &lt;span class="n"&gt;process&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;H&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;JobHandle&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="k"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;job_handle&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;H&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="nb"&gt;Result&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt; &lt;span class="n"&gt;RunnerError&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="nf"&gt;Some&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;r&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;self&lt;/span&gt;&lt;span class="py"&gt;.jobs&lt;/span&gt;&lt;span class="nf"&gt;.get&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;job_handle&lt;/span&gt;&lt;span class="nf"&gt;.job_type&lt;/span&gt;&lt;span class="p"&gt;())&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="k"&gt;match&lt;/span&gt; &lt;span class="n"&gt;r&lt;/span&gt;
                &lt;span class="nf"&gt;.handle&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;job_handle&lt;/span&gt;&lt;span class="nf"&gt;.id&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt; &lt;span class="n"&gt;job_handle&lt;/span&gt;&lt;span class="nf"&gt;.payload&lt;/span&gt;&lt;span class="p"&gt;())&lt;/span&gt;
                &lt;span class="k"&gt;.await&lt;/span&gt;
                &lt;span class="nf"&gt;.map_err&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nn"&gt;JobError&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="n"&gt;from&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
            &lt;span class="p"&gt;{&lt;/span&gt;
                &lt;span class="nf"&gt;Ok&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;_&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
                    &lt;span class="n"&gt;job_handle&lt;/span&gt;&lt;span class="nf"&gt;.complete&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;&lt;span class="k"&gt;.await&lt;/span&gt;&lt;span class="o"&gt;?&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
                    &lt;span class="nf"&gt;Ok&lt;/span&gt;&lt;span class="p"&gt;(())&lt;/span&gt;
                &lt;span class="p"&gt;}&lt;/span&gt;
                &lt;span class="nf"&gt;Err&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
                    &lt;span class="nn"&gt;tracing&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nd"&gt;error!&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Error during job processing: {}"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
                    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;job_handle&lt;/span&gt;&lt;span class="nf"&gt;.retries&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;=&lt;/span&gt; &lt;span class="n"&gt;r&lt;/span&gt;&lt;span class="nf"&gt;.max_retries&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
                        &lt;span class="nn"&gt;tracing&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nd"&gt;warn!&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Moving job {} to dead queue"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;job_handle&lt;/span&gt;&lt;span class="nf"&gt;.id&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;&lt;span class="nf"&gt;.to_string&lt;/span&gt;&lt;span class="p"&gt;());&lt;/span&gt;
                        &lt;span class="n"&gt;job_handle&lt;/span&gt;&lt;span class="nf"&gt;.dead_queue&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;&lt;span class="k"&gt;.await&lt;/span&gt;&lt;span class="o"&gt;?&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
                        &lt;span class="nf"&gt;Ok&lt;/span&gt;&lt;span class="p"&gt;(())&lt;/span&gt;
                    &lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="k"&gt;else&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
                        &lt;span class="n"&gt;job_handle&lt;/span&gt;&lt;span class="nf"&gt;.fail&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;&lt;span class="k"&gt;.await&lt;/span&gt;&lt;span class="o"&gt;?&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
                        &lt;span class="nf"&gt;Ok&lt;/span&gt;&lt;span class="p"&gt;(())&lt;/span&gt;
                    &lt;span class="p"&gt;}&lt;/span&gt;
                &lt;span class="p"&gt;}&lt;/span&gt;
            &lt;span class="p"&gt;}&lt;/span&gt;
        &lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="k"&gt;else&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="nf"&gt;Err&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nn"&gt;RunnerError&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nf"&gt;UnknownJobType&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
                &lt;span class="n"&gt;job_handle&lt;/span&gt;&lt;span class="nf"&gt;.job_type&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;&lt;span class="nf"&gt;.to_string&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt;
            &lt;span class="p"&gt;))&lt;/span&gt;
        &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="p"&gt;}&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;This isn't the complete version; we will expand it later, but for now it's enough. Don't worry about the &lt;code&gt;JobHandle&lt;/code&gt; trait just yet; I will get to it soon.&lt;/p&gt;

&lt;h3&gt;
  
  
  Registering job handlers
&lt;/h3&gt;

&lt;p&gt;Handlers can only be registered while you have a mutable reference to &lt;code&gt;RunnerRouter&lt;/code&gt;. I've done it this way to avoid putting &lt;code&gt;jobs&lt;/code&gt; behind an &lt;code&gt;RwLock&lt;/code&gt; or creating a separate builder structure. Intended use:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight rust"&gt;&lt;code&gt;

&lt;span class="c1"&gt;// create your job handler here&lt;/span&gt;
&lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;my_job&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nn"&gt;MyJob&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nf"&gt;default&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;
&lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;router&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="k"&gt;mut&lt;/span&gt; &lt;span class="n"&gt;r&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nn"&gt;RunnerRouter&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nf"&gt;default&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;
    &lt;span class="n"&gt;r&lt;/span&gt;&lt;span class="nf"&gt;.add_job_handler&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;my_job&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
    &lt;span class="n"&gt;r&lt;/span&gt;
&lt;span class="p"&gt;};&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
&lt;h3&gt;
  
  
  Job processing
&lt;/h3&gt;

&lt;p&gt;This is handled by the &lt;code&gt;process&lt;/code&gt; function. It takes a type that implements the &lt;code&gt;JobHandle&lt;/code&gt; trait as input. This method is in charge of handling the entire job lifecycle.&lt;/p&gt;

&lt;p&gt;First, we check whether the current &lt;code&gt;RunnerRouter&lt;/code&gt; has a handler registered for the job's type; if not, we return an &lt;code&gt;UnknownJobType&lt;/code&gt; error. Then, we attempt to process the job. Finally, depending on the outcome, we do one of the following:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Remove the job from the queue if it completed successfully&lt;/li&gt;
&lt;li&gt;Re-queue the job if it failed and still has retries left&lt;/li&gt;
&lt;li&gt;Move the job to the dead queue if it failed and its retry count has reached the maximum&lt;/li&gt;
&lt;/ul&gt;
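&lt;p&gt;The three outcomes above can be sketched as a pure decision function. This is only an illustration: &lt;code&gt;Outcome&lt;/code&gt; and &lt;code&gt;decide&lt;/code&gt; are hypothetical names that do not exist in the real code, where the same branches call &lt;code&gt;complete&lt;/code&gt;, &lt;code&gt;fail&lt;/code&gt;, or &lt;code&gt;dead_queue&lt;/code&gt; on the job handle.&lt;/p&gt;

```rust
// Hypothetical names throughout: `Outcome` and `decide` are illustrative only
// and are not part of the article's `RunnerRouter`.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Outcome {
    Complete,  // handler succeeded: remove the job from the queue
    Requeue,   // handler failed, retries remain: put the job back
    DeadQueue, // handler failed, retry budget spent: park the job
}

// Mirrors the branch structure of `process`: success first, then the
// `retries >= max_retries` check on failure.
fn decide(handler_succeeded: bool, retries: u32, max_retries: u32) -> Outcome {
    if handler_succeeded {
        Outcome::Complete
    } else if retries >= max_retries {
        Outcome::DeadQueue
    } else {
        Outcome::Requeue
    }
}

fn main() {
    println!("{:?}", decide(true, 0, 3));
    println!("{:?}", decide(false, 1, 3));
    println!("{:?}", decide(false, 3, 3));
}
```

&lt;p&gt;Note that the comparison is &lt;code&gt;&amp;gt;=&lt;/code&gt;, so a job whose retry count equals the maximum goes to the dead queue rather than being retried once more.&lt;/p&gt;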
&lt;h2&gt;
  
  
  Queue
&lt;/h2&gt;

&lt;p&gt;So far, we haven't talked about the queue itself. In the abstract, a queue works like this:&lt;/p&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight rust"&gt;&lt;code&gt;

&lt;span class="k"&gt;while&lt;/span&gt; &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="nf"&gt;Some&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;job&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;queue&lt;/span&gt;&lt;span class="nf"&gt;.pop&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;job&lt;/span&gt;&lt;span class="nf"&gt;.process&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;This wouldn't be a very efficient queue for many reasons. Our queue is going to look more like this:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F4zrypx0w18q07hkrfal2.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F4zrypx0w18q07hkrfal2.png" alt="Job queue diagram"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h3&gt;
  
  
  Queue trait
&lt;/h3&gt;

&lt;p&gt;Now that we have an idea of how the queue works, we can define its trait:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight rust"&gt;&lt;code&gt;

&lt;span class="nd"&gt;#[async_trait]&lt;/span&gt;
&lt;span class="k"&gt;pub&lt;/span&gt; &lt;span class="k"&gt;trait&lt;/span&gt; &lt;span class="n"&gt;Queue&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;Send&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="nb"&gt;Sync&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;type&lt;/span&gt; &lt;span class="n"&gt;JobHandle&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;JobHandle&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="cd"&gt;/// Schedule a job to run at a specific time in the future.&lt;/span&gt;
    &lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="k"&gt;fn&lt;/span&gt; &lt;span class="n"&gt;schedule_at&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;J&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="k"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;payload&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nn"&gt;J&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="n"&gt;Payload&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;scheduled_at&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;DateTime&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="nb"&gt;Result&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;Xid&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;QueueError&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;
    &lt;span class="k"&gt;where&lt;/span&gt;
        &lt;span class="n"&gt;J&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;JobProcessor&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="k"&gt;'static&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="nn"&gt;J&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="n"&gt;Payload&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;Encode&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="cd"&gt;/// Schedule a job to run next. Depending on queue backlog this may start running later than you expect.&lt;/span&gt;
    &lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="k"&gt;fn&lt;/span&gt; &lt;span class="n"&gt;schedule&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;J&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="k"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;payload&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nn"&gt;J&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="n"&gt;Payload&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="nb"&gt;Result&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;Xid&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;QueueError&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;
    &lt;span class="k"&gt;where&lt;/span&gt;
        &lt;span class="n"&gt;J&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;JobProcessor&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="k"&gt;'static&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="nn"&gt;J&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="n"&gt;Payload&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;Encode&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;self&lt;/span&gt;&lt;span class="py"&gt;.schedule_at&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;J&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;payload&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nn"&gt;Utc&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nf"&gt;now&lt;/span&gt;&lt;span class="p"&gt;())&lt;/span&gt;&lt;span class="k"&gt;.await&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;

    &lt;span class="cd"&gt;/// Schedule a job to run at a future time relative to now.&lt;/span&gt;
    &lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="k"&gt;fn&lt;/span&gt; &lt;span class="n"&gt;schedule_in&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;J&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="k"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;payload&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nn"&gt;J&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="n"&gt;Payload&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;scheduled_in&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;Duration&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="nb"&gt;Result&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;Xid&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;QueueError&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;
    &lt;span class="k"&gt;where&lt;/span&gt;
        &lt;span class="n"&gt;J&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;JobProcessor&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="k"&gt;'static&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="nn"&gt;J&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="n"&gt;Payload&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;Encode&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;when&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nn"&gt;Utc&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nf"&gt;now&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="n"&gt;scheduled_in&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
        &lt;span class="k"&gt;self&lt;/span&gt;&lt;span class="py"&gt;.schedule_at&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;J&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;payload&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;when&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;&lt;span class="k"&gt;.await&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;

    &lt;span class="cd"&gt;/// Poll the queue, treating `time` as "now". The implementation should not wait for the next job; if there is nothing, return `Ok(None)`.&lt;/span&gt;
    &lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="k"&gt;fn&lt;/span&gt; &lt;span class="nf"&gt;poll_next_with_instant&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="k"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;job_types&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="nb"&gt;str&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt;
        &lt;span class="n"&gt;time&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;DateTime&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="nb"&gt;Result&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nb"&gt;Option&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="k"&gt;Self&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="n"&gt;JobHandle&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;QueueError&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

    &lt;span class="cd"&gt;/// Poll the queue. The implementation should not wait for the next job; if there is nothing, return `Ok(None)`.&lt;/span&gt;
    &lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="k"&gt;fn&lt;/span&gt; &lt;span class="nf"&gt;poll_next&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="k"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;job_types&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="nb"&gt;str&lt;/span&gt;&lt;span class="p"&gt;])&lt;/span&gt; &lt;span class="k"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="nb"&gt;Result&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nb"&gt;Option&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="k"&gt;Self&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="n"&gt;JobHandle&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;QueueError&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;self&lt;/span&gt;&lt;span class="nf"&gt;.poll_next_with_instant&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;job_types&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nn"&gt;Utc&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nf"&gt;now&lt;/span&gt;&lt;span class="p"&gt;())&lt;/span&gt;&lt;span class="k"&gt;.await&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;

    &lt;span class="cd"&gt;/// Await next job. Default implementation polls the queue with defined interval until there is something.&lt;/span&gt;
    &lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="k"&gt;fn&lt;/span&gt; &lt;span class="nf"&gt;next&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="k"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;job_types&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="nb"&gt;str&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt;
        &lt;span class="n"&gt;interval&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;Duration&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="nb"&gt;Result&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="k"&gt;Self&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="n"&gt;JobHandle&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;QueueError&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;duration&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;interval&lt;/span&gt;
            &lt;span class="nf"&gt;.to_std&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
            &lt;span class="nf"&gt;.map_err&lt;/span&gt;&lt;span class="p"&gt;(|&lt;/span&gt;&lt;span class="n"&gt;_&lt;/span&gt;&lt;span class="p"&gt;|&lt;/span&gt; &lt;span class="nn"&gt;QueueError&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nf"&gt;InvalidInterval&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;interval&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;&lt;span class="o"&gt;?&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
        &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="k"&gt;mut&lt;/span&gt; &lt;span class="n"&gt;interval&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nn"&gt;tokio&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nn"&gt;time&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nf"&gt;interval&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;duration&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
        &lt;span class="k"&gt;loop&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="n"&gt;interval&lt;/span&gt;&lt;span class="nf"&gt;.tick&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;&lt;span class="k"&gt;.await&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
            &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;job&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;self&lt;/span&gt;&lt;span class="nf"&gt;.poll_next&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;job_types&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;&lt;span class="k"&gt;.await&lt;/span&gt;&lt;span class="o"&gt;?&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
            &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="nf"&gt;Some&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;job&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;job&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
                &lt;span class="k"&gt;break&lt;/span&gt; &lt;span class="nf"&gt;Ok&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;job&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
            &lt;span class="p"&gt;}&lt;/span&gt;
        &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;This trait is designed to work only with jobs that can be handled by &lt;code&gt;RunnerRouter&lt;/code&gt;. It provides convenient methods for scheduling jobs: &lt;code&gt;schedule&lt;/code&gt; to run a job as soon as possible, &lt;code&gt;schedule_in&lt;/code&gt; to run it after a delay (e.g., in 5 minutes), and &lt;code&gt;schedule_at&lt;/code&gt; to run it at a specific time (e.g., &lt;code&gt;Mon, 16 May 2022 21:02:35 GMT&lt;/code&gt;). An implementation only needs to provide &lt;code&gt;schedule_at&lt;/code&gt; and &lt;code&gt;poll_next_with_instant&lt;/code&gt;; the other methods have default implementations built on top of these two.&lt;/p&gt;

&lt;p&gt;&lt;code&gt;schedule_at&lt;/code&gt; is self-explanatory: it adds a job to the queue to run at a specific time.&lt;/p&gt;

&lt;p&gt;&lt;code&gt;poll_next_with_instant&lt;/code&gt; is a bit more complicated. It takes a list of job types and &lt;code&gt;time&lt;/code&gt; as arguments. &lt;code&gt;time&lt;/code&gt; determines what should be treated as "now" when the queue is polled. Taking time as an input is very helpful because it allows time travel in tests. The implementation should check the queue and return a &lt;code&gt;JobHandle&lt;/code&gt; for a job if one is available, or &lt;code&gt;Ok(None)&lt;/code&gt; otherwise.&lt;/p&gt;
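&lt;p&gt;To make the "time travel" point concrete, here is a minimal sketch under heavy simplifications. &lt;code&gt;TinyQueue&lt;/code&gt;, &lt;code&gt;Scheduled&lt;/code&gt;, and &lt;code&gt;Polled&lt;/code&gt; are hypothetical types invented for this example, timestamps are plain seconds rather than &lt;code&gt;DateTime&lt;/code&gt;, and nothing is async or removed from the queue. The only idea it keeps from the real trait is that the caller supplies "now".&lt;/p&gt;

```rust
// Illustrative only: `Polled` stands in for the real return type, and
// timestamps are plain seconds instead of `DateTime`.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Polled {
    Ready(u32), // id of a job whose scheduled time has arrived
    Empty,      // nothing is due yet
}

#[derive(Clone, Copy)]
struct Scheduled {
    id: u32,
    run_at: i64,
}

#[derive(Clone, Copy)]
struct TinyQueue {
    jobs: [Scheduled; 2],
}

impl TinyQueue {
    // The caller decides what "now" is, so a test can jump forward in time
    // without sleeping. Returns the first job in the array that is due.
    fn poll_next_with_instant(self, now: i64) -> Polled {
        for job in self.jobs {
            if now >= job.run_at {
                return Polled::Ready(job.id);
            }
        }
        Polled::Empty
    }
}

fn main() {
    let queue = TinyQueue {
        jobs: [
            Scheduled { id: 1, run_at: 100 },
            Scheduled { id: 2, run_at: 200 },
        ],
    };
    // At t = 50 nothing is due; at t = 150 the first job is.
    println!("{:?}", queue.poll_next_with_instant(50));
    println!("{:?}", queue.poll_next_with_instant(150));
}
```

&lt;p&gt;A test for a real implementation can follow the same pattern: schedule a job for some future time, poll with an earlier instant and expect nothing, then poll with a later instant and expect the job.&lt;/p&gt;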

&lt;p&gt;There is also a &lt;code&gt;next&lt;/code&gt; function that polls the queue in a loop: if the queue is empty, it sleeps for the specified interval and polls again until a job appears. This is just the default implementation; a more specific implementation can combine sleeping with external wake-ups (e.g. &lt;a href="https://www.postgresql.org/docs/current/sql-notify.html" rel="noopener noreferrer"&gt;&lt;code&gt;NOTIFY&lt;/code&gt;&lt;/a&gt; from PostgreSQL or &lt;a href="https://redis.io/commands/subscribe/" rel="noopener noreferrer"&gt;&lt;code&gt;SUBSCRIBE&lt;/code&gt;&lt;/a&gt; from Redis).&lt;/p&gt;
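&lt;p&gt;The poll-and-sleep loop can be shown in isolation with a blocking stand-in. This sketch deliberately swaps the async &lt;code&gt;tokio&lt;/code&gt; interval for &lt;code&gt;std::thread::sleep&lt;/code&gt; so that it runs with the standard library alone; &lt;code&gt;next_blocking&lt;/code&gt; and &lt;code&gt;Polled&lt;/code&gt; are hypothetical names, and the closure plays the role of &lt;code&gt;poll_next&lt;/code&gt;.&lt;/p&gt;

```rust
use std::thread::sleep;
use std::time::Duration;

// Hypothetical stand-in for the result of a single poll.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Polled {
    Ready(u32),
    Empty,
}

// Blocking stand-in for the async default `next`: poll once, and if nothing
// is there, sleep for `interval` and poll again until a job id comes back.
fn next_blocking(mut poll: impl FnMut() -> Polled, interval: Duration) -> u32 {
    loop {
        if let Polled::Ready(id) = poll() {
            return id;
        }
        sleep(interval);
    }
}

fn main() {
    let mut attempts = 0;
    // Pretend the queue is empty for the first two polls.
    let id = next_blocking(
        || {
            attempts += 1;
            if attempts >= 3 {
                Polled::Ready(7)
            } else {
                Polled::Empty
            }
        },
        Duration::from_millis(1),
    );
    println!("job {} after {} polls", id, attempts);
}
```

&lt;p&gt;The cost of this design is latency: a job that arrives just after a poll waits up to one full interval, which is why external wake-ups are attractive for a production queue.&lt;/p&gt;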

&lt;h3&gt;
  
  
  &lt;code&gt;JobHandle&lt;/code&gt;
&lt;/h3&gt;

&lt;p&gt;You might have already noticed that most of the job lifecycle is handled by some &lt;code&gt;JobHandle&lt;/code&gt;. What is it? Well… it's an implementation-specific way of handling the job lifecycle. &lt;code&gt;Queue&lt;/code&gt; has only two responsibilities: scheduling jobs and polling for them. &lt;code&gt;Queue&lt;/code&gt; here is an interface to a queue that exists somewhere, rather than the queue itself.&lt;/p&gt;

&lt;p&gt;The &lt;code&gt;JobHandle&lt;/code&gt; trait looks like this:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight rust"&gt;&lt;code&gt;

&lt;span class="nd"&gt;#[async_trait]&lt;/span&gt;
&lt;span class="k"&gt;pub&lt;/span&gt; &lt;span class="k"&gt;trait&lt;/span&gt; &lt;span class="n"&gt;JobHandle&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;Send&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="nb"&gt;Sync&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="c1"&gt;// Get job id&lt;/span&gt;
    &lt;span class="k"&gt;fn&lt;/span&gt; &lt;span class="nf"&gt;id&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="k"&gt;self&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;Xid&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="c1"&gt;// Get Job type&lt;/span&gt;
    &lt;span class="k"&gt;fn&lt;/span&gt; &lt;span class="nf"&gt;job_type&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="k"&gt;self&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="nb"&gt;str&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="c1"&gt;// Get job payload.&lt;/span&gt;
    &lt;span class="k"&gt;fn&lt;/span&gt; &lt;span class="nf"&gt;payload&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="k"&gt;self&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;Bytes&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="c1"&gt;// How many times this job has been retried already.&lt;/span&gt;
    &lt;span class="k"&gt;fn&lt;/span&gt; &lt;span class="nf"&gt;retries&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="k"&gt;self&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="nb"&gt;u32&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="c1"&gt;// Mark the job as completed successfully.&lt;/span&gt;
    &lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="k"&gt;fn&lt;/span&gt; &lt;span class="nf"&gt;complete&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;mut&lt;/span&gt; &lt;span class="k"&gt;self&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="nb"&gt;Result&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt; &lt;span class="n"&gt;QueueError&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="c1"&gt;// Mark the job as failed&lt;/span&gt;
    &lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="k"&gt;fn&lt;/span&gt; &lt;span class="nf"&gt;fail&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;mut&lt;/span&gt; &lt;span class="k"&gt;self&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="nb"&gt;Result&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt; &lt;span class="n"&gt;QueueError&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="c1"&gt;// Move the job to dead queue.&lt;/span&gt;
    &lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="k"&gt;fn&lt;/span&gt; &lt;span class="nf"&gt;dead_queue&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;mut&lt;/span&gt; &lt;span class="k"&gt;self&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="nb"&gt;Result&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt; &lt;span class="n"&gt;QueueError&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Everything past "job checked out of the queue" in the job lifecycle is handled by the &lt;code&gt;JobHandle&lt;/code&gt; implementation.&lt;/p&gt;

&lt;h2&gt;
  
  
  Processing the Queue
&lt;/h2&gt;

&lt;h3&gt;
  
  
  Extending &lt;code&gt;RunnerRouter&lt;/code&gt;
&lt;/h3&gt;

&lt;p&gt;Now that we have an interface to interact with our queue, we can extend &lt;code&gt;RunnerRouter&lt;/code&gt; to work with it:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight rust"&gt;&lt;code&gt;

&lt;span class="k"&gt;impl&lt;/span&gt; &lt;span class="n"&gt;RunnerRouter&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;pub&lt;/span&gt; &lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="k"&gt;fn&lt;/span&gt; &lt;span class="n"&gt;listen&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;Q&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;QR&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="k"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;queue&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;Q&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;poll_interval&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;Duration&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;where&lt;/span&gt;
        &lt;span class="n"&gt;Q&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;AsRef&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;QR&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;QR&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;Queue&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;job_types&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;self&lt;/span&gt;&lt;span class="nf"&gt;.types&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;
        &lt;span class="k"&gt;loop&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="k"&gt;match&lt;/span&gt; &lt;span class="n"&gt;queue&lt;/span&gt;&lt;span class="nf"&gt;.as_ref&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;&lt;span class="nf"&gt;.next&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;job_types&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;poll_interval&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;&lt;span class="k"&gt;.await&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
                &lt;span class="nf"&gt;Ok&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;handle&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="k"&gt;match&lt;/span&gt; &lt;span class="k"&gt;self&lt;/span&gt;&lt;span class="nf"&gt;.process&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;handle&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;&lt;span class="k"&gt;.await&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
                    &lt;span class="nf"&gt;Ok&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;_&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="p"&gt;{}&lt;/span&gt;
                    &lt;span class="nf"&gt;Err&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nn"&gt;RunnerError&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nf"&gt;QueueError&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt; &lt;span class="k"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="nf"&gt;handle_queue_error&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;&lt;span class="k"&gt;.await&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                    &lt;span class="nf"&gt;Err&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nn"&gt;RunnerError&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nf"&gt;UnknownJobType&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;name&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt; &lt;span class="k"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
                        &lt;span class="nn"&gt;tracing&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nd"&gt;error!&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Unknown job type: {}"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;name&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
                    &lt;span class="p"&gt;}&lt;/span&gt;
                &lt;span class="p"&gt;},&lt;/span&gt;
                &lt;span class="nf"&gt;Err&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
                    &lt;span class="nf"&gt;handle_queue_error&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;&lt;span class="k"&gt;.await&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
                &lt;span class="p"&gt;}&lt;/span&gt;
            &lt;span class="p"&gt;}&lt;/span&gt;
        &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="k"&gt;fn&lt;/span&gt; &lt;span class="nf"&gt;handle_queue_error&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;error&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;QueueError&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="nn"&gt;tracing&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nd"&gt;error!&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Encountered QueueError: {}"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;error&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
    &lt;span class="nn"&gt;tracing&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nd"&gt;warn!&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Suspending worker for 5 seconds"&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
    &lt;span class="nn"&gt;tokio&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nn"&gt;time&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nf"&gt;sleep&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nn"&gt;std&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nn"&gt;time&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nn"&gt;Duration&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nf"&gt;from_secs&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;5&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;&lt;span class="k"&gt;.await&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;In a loop, we poll the queue and handle incoming jobs. If we encounter a &lt;code&gt;QueueError&lt;/code&gt;, whether while polling or while processing a job, the worker goes into a short sleep. Any other &lt;code&gt;RunnerError&lt;/code&gt;, such as an unknown job type, is just logged, and we move on to the next job.&lt;/p&gt;
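
&lt;p&gt;To make the branching explicit, here is a minimal sketch of the same decision table as a pure function. The &lt;code&gt;Outcome&lt;/code&gt; and &lt;code&gt;Action&lt;/code&gt; enums and &lt;code&gt;next_action&lt;/code&gt; are hypothetical names used only for illustration, not part of the runner; the 5 second pause mirrors &lt;code&gt;handle_queue_error&lt;/code&gt;.&lt;/p&gt;

```rust
use std::time::Duration;

// Hypothetical summary of what one pass through the `listen` loop can produce.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Outcome {
    Processed,      // `process` returned Ok
    QueueError,     // polling the queue or processing the job hit a queue error
    UnknownJobType, // no runner is registered for the job's type
}

// Hypothetical description of what the worker does next.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Action {
    PollAgain,
    LogAndPollAgain,
    SleepThenPoll(Duration),
}

fn next_action(outcome: Outcome) -> Action {
    match outcome {
        Outcome::Processed => Action::PollAgain,
        Outcome::UnknownJobType => Action::LogAndPollAgain,
        // Queue trouble is the only case that pauses the worker.
        Outcome::QueueError => Action::SleepThenPoll(Duration::from_secs(5)),
    }
}
```

&lt;p&gt;Keeping the pause limited to queue errors means one bad job type can't stall a worker, while a struggling queue gets a little breathing room.&lt;/p&gt;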

&lt;h2&gt;
  
  
  Putting it all together
&lt;/h2&gt;

&lt;p&gt;Technically, we can just create a &lt;code&gt;queue&lt;/code&gt;, pass it to &lt;code&gt;RunnerRouter::listen&lt;/code&gt;, and call it a day. The only issue is that it's processing a single job at a time. In some cases, that's enough. But we want to go fast.&lt;/p&gt;

&lt;p&gt;Full disclaimer - the following is a naive version of a job runner. It works, but has some limitations.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight rust"&gt;&lt;code&gt;

&lt;span class="k"&gt;const&lt;/span&gt; &lt;span class="n"&gt;JITTER_INTERVAL_MS&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="nb"&gt;i64&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt; &lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;3&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;5&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;8&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;13&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;21&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;34&lt;/span&gt;&lt;span class="p"&gt;];&lt;/span&gt;

&lt;span class="k"&gt;pub&lt;/span&gt; &lt;span class="k"&gt;struct&lt;/span&gt; &lt;span class="n"&gt;JobRunner&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;Q&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;
&lt;span class="k"&gt;where&lt;/span&gt;
    &lt;span class="n"&gt;Q&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;Queue&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;queue&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;Arc&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;Q&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;processor&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;Arc&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;RunnerRouter&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;semaphore&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;Arc&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;Semaphore&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="k"&gt;impl&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;Q&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;JobRunner&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;Q&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;
&lt;span class="k"&gt;where&lt;/span&gt;
    &lt;span class="n"&gt;Q&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;Queue&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="k"&gt;'static&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;pub&lt;/span&gt; &lt;span class="k"&gt;fn&lt;/span&gt; &lt;span class="nf"&gt;new&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;queue&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;Q&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;processor&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;RunnerRouter&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;concurrency&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;usize&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="k"&gt;Self&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;Self&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="n"&gt;queue&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nn"&gt;Arc&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nf"&gt;new&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;queue&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
            &lt;span class="n"&gt;processor&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nn"&gt;Arc&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nf"&gt;new&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;processor&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
            &lt;span class="n"&gt;semaphore&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nn"&gt;Arc&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nf"&gt;new&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nn"&gt;Semaphore&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nf"&gt;new&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;concurrency&lt;/span&gt;&lt;span class="p"&gt;)),&lt;/span&gt;
        &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="k"&gt;impl&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;Q&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;JobRunner&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;Q&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;
&lt;span class="k"&gt;where&lt;/span&gt;
    &lt;span class="n"&gt;Q&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;Queue&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="k"&gt;'static&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;pub&lt;/span&gt; &lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="k"&gt;fn&lt;/span&gt; &lt;span class="nf"&gt;run&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="k"&gt;mut&lt;/span&gt; &lt;span class="k"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;interval&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nn"&gt;chrono&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="n"&gt;Duration&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="nb"&gt;Result&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt; &lt;span class="n"&gt;QueueError&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;loop&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;semaphore&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;self&lt;/span&gt;&lt;span class="py"&gt;.semaphore&lt;/span&gt;&lt;span class="nf"&gt;.clone&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;
            &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;permit&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;semaphore&lt;/span&gt;
                &lt;span class="nf"&gt;.acquire_owned&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
                &lt;span class="k"&gt;.await&lt;/span&gt;
                &lt;span class="nf"&gt;.context&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Semaphore closed while running"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;&lt;span class="o"&gt;?&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
            &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;queue&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;self&lt;/span&gt;&lt;span class="py"&gt;.queue&lt;/span&gt;&lt;span class="nf"&gt;.clone&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;
            &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;processor&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;self&lt;/span&gt;&lt;span class="py"&gt;.processor&lt;/span&gt;&lt;span class="nf"&gt;.clone&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;
            &lt;span class="nn"&gt;tokio&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nf"&gt;spawn&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="k"&gt;move&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
                &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;_permit&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;permit&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
                &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;queue&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;queue&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
                &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;processor&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;processor&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
                &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;interval&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;interval&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="nf"&gt;get_random_jitter&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;
                &lt;span class="n"&gt;processor&lt;/span&gt;&lt;span class="nf"&gt;.listen&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;queue&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;interval&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;&lt;span class="k"&gt;.await&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
            &lt;span class="p"&gt;});&lt;/span&gt;
        &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="k"&gt;fn&lt;/span&gt; &lt;span class="nf"&gt;get_random_jitter&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="k"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="nn"&gt;chrono&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="n"&gt;Duration&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;JITTER_INTERVAL_MS&lt;/span&gt;
        &lt;span class="nf"&gt;.choose&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="k"&gt;mut&lt;/span&gt; &lt;span class="nn"&gt;rand&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nf"&gt;thread_rng&lt;/span&gt;&lt;span class="p"&gt;())&lt;/span&gt;
        &lt;span class="nf"&gt;.map&lt;/span&gt;&lt;span class="p"&gt;(|&lt;/span&gt;&lt;span class="n"&gt;ms&lt;/span&gt;&lt;span class="p"&gt;|&lt;/span&gt; &lt;span class="nn"&gt;chrono&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nn"&gt;Duration&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nf"&gt;milliseconds&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;ms&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;
        &lt;span class="nf"&gt;.unwrap_or_else&lt;/span&gt;&lt;span class="p"&gt;(||&lt;/span&gt; &lt;span class="nn"&gt;chrono&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nn"&gt;Duration&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nf"&gt;milliseconds&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;5&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;



&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;First, let's talk about the &lt;code&gt;JobRunner&lt;/code&gt;:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight rust"&gt;&lt;code&gt;

&lt;span class="k"&gt;pub&lt;/span&gt; &lt;span class="k"&gt;struct&lt;/span&gt; &lt;span class="n"&gt;JobRunner&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;Q&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;
&lt;span class="k"&gt;where&lt;/span&gt;
    &lt;span class="n"&gt;Q&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;Queue&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;queue&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;Arc&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;Q&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;processor&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;Arc&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;RunnerRouter&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;semaphore&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;Arc&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;Semaphore&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="k"&gt;impl&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;Q&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;JobRunner&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;Q&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;
&lt;span class="k"&gt;where&lt;/span&gt;
    &lt;span class="n"&gt;Q&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;Queue&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="k"&gt;'static&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;pub&lt;/span&gt; &lt;span class="k"&gt;fn&lt;/span&gt; &lt;span class="nf"&gt;new&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;queue&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;Q&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;processor&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;RunnerRouter&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;concurrency&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;usize&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="k"&gt;Self&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;Self&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="n"&gt;queue&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nn"&gt;Arc&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nf"&gt;new&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;queue&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
            &lt;span class="n"&gt;processor&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nn"&gt;Arc&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nf"&gt;new&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;processor&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
            &lt;span class="n"&gt;semaphore&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nn"&gt;Arc&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nf"&gt;new&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nn"&gt;Semaphore&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nf"&gt;new&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;concurrency&lt;/span&gt;&lt;span class="p"&gt;)),&lt;/span&gt;
        &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;We already know why we need the first two, but what about &lt;a href="https://docs.rs/tokio/latest/tokio/sync/struct.Semaphore.html" rel="noopener noreferrer"&gt;&lt;code&gt;Semaphore&lt;/code&gt;&lt;/a&gt;? A semaphore is &lt;a href="https://en.wikipedia.org/wiki/Semaphore_(programming)" rel="noopener noreferrer"&gt;a synchronization primitive&lt;/a&gt; that limits concurrent access to a resource.&lt;/p&gt;

&lt;p&gt;Essentially, it's a metered highway ramp for your program. We're going to use it to limit how many jobs can be in flight at any given moment.&lt;/p&gt;
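
&lt;p&gt;If the idea is new to you, here is a rough standard-library sketch of a counting semaphore built from a &lt;code&gt;Mutex&lt;/code&gt; and a &lt;code&gt;Condvar&lt;/code&gt;. It is only a stand-in to show the behaviour, not how tokio implements its &lt;code&gt;Semaphore&lt;/code&gt;, and &lt;code&gt;peak_in_flight&lt;/code&gt; is a hypothetical helper: it runs a batch of fake jobs on threads and reports the highest number that were ever running at once.&lt;/p&gt;

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

// Runs `jobs` fake jobs on threads while allowing at most `limit` of them to
// run at once, and returns the highest concurrency observed.
// Assumes `limit` is at least 1; with 0 permits every job would wait forever.
fn peak_in_flight(limit: usize, jobs: usize) -> usize {
    // The gate is our semaphore: a permit counter plus a condition variable.
    let gate = Arc::new((Mutex::new(limit), Condvar::new()));
    let in_flight = Arc::new(AtomicUsize::new(0));
    let peak = Arc::new(AtomicUsize::new(0));

    let mut workers = Vec::new();
    for _ in 0..jobs {
        let gate = gate.clone();
        let in_flight = in_flight.clone();
        let peak = peak.clone();
        workers.push(thread::spawn(move || {
            // Acquire: wait until a permit is free, then take it.
            let mut permits = gate.0.lock().unwrap();
            while *permits == 0 {
                permits = gate.1.wait(permits).unwrap();
            }
            *permits -= 1;
            drop(permits);

            // The "job": record how many jobs are running alongside us.
            let now = in_flight.fetch_add(1, Ordering::SeqCst) + 1;
            peak.fetch_max(now, Ordering::SeqCst);
            thread::sleep(Duration::from_millis(5));
            in_flight.fetch_sub(1, Ordering::SeqCst);

            // Release: hand the permit back and wake one waiting job.
            *gate.0.lock().unwrap() += 1;
            gate.1.notify_one();
        }));
    }
    for worker in workers {
        worker.join().unwrap();
    }
    peak.load(Ordering::SeqCst)
}
```

&lt;p&gt;Calling &lt;code&gt;peak_in_flight(3, 12)&lt;/code&gt; starts twelve jobs, but the returned peak never exceeds three. That cap is exactly what we want from the semaphore in &lt;code&gt;JobRunner&lt;/code&gt;.&lt;/p&gt;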

&lt;p&gt;Generally, it's a bad idea to have N things hammer a resource at the same time, so we're going to introduce a random-ish jitter:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight rust"&gt;&lt;code&gt;

&lt;span class="k"&gt;const&lt;/span&gt; &lt;span class="n"&gt;JITTER_INTERVAL_MS&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="nb"&gt;i64&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt; &lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;3&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;5&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;8&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;13&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;21&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;34&lt;/span&gt;&lt;span class="p"&gt;];&lt;/span&gt;

&lt;span class="k"&gt;fn&lt;/span&gt; &lt;span class="nf"&gt;get_random_jitter&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="k"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="nn"&gt;chrono&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="n"&gt;Duration&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;JITTER_INTERVAL_MS&lt;/span&gt;
        &lt;span class="nf"&gt;.choose&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="k"&gt;mut&lt;/span&gt; &lt;span class="nn"&gt;rand&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nf"&gt;thread_rng&lt;/span&gt;&lt;span class="p"&gt;())&lt;/span&gt;
        &lt;span class="nf"&gt;.map&lt;/span&gt;&lt;span class="p"&gt;(|&lt;/span&gt;&lt;span class="n"&gt;ms&lt;/span&gt;&lt;span class="p"&gt;|&lt;/span&gt; &lt;span class="nn"&gt;chrono&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nn"&gt;Duration&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nf"&gt;milliseconds&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;ms&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;
        &lt;span class="nf"&gt;.unwrap_or_else&lt;/span&gt;&lt;span class="p"&gt;(||&lt;/span&gt; &lt;span class="nn"&gt;chrono&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nn"&gt;Duration&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nf"&gt;milliseconds&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;5&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;It doesn't have to be a Fibonacci series, but using this series allows for some pretty unique patterns.&lt;/p&gt;
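
&lt;p&gt;To see what the jitter does to a poll interval, here is a small deterministic sketch. It makes a few assumptions that differ from the code above: it uses &lt;code&gt;std::time::Duration&lt;/code&gt; and &lt;code&gt;u64&lt;/code&gt; instead of &lt;code&gt;chrono&lt;/code&gt;, and the hypothetical &lt;code&gt;pick&lt;/code&gt; argument stands in for the random choice so the result is reproducible.&lt;/p&gt;

```rust
use std::time::Duration;

const JITTER_INTERVAL_MS: [u64; 10] = [0, 1, 1, 2, 3, 5, 8, 13, 21, 34];

// `pick` stands in for a random index; any value is wrapped into range.
fn jittered_interval(base: Duration, pick: usize) -> Duration {
    let jitter_ms = JITTER_INTERVAL_MS[pick % JITTER_INTERVAL_MS.len()];
    base + Duration::from_millis(jitter_ms)
}
```

&lt;p&gt;With a 100 ms base interval, workers end up polling every 100 to 134 ms, so they drift apart instead of hitting the queue in lockstep.&lt;/p&gt;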

&lt;p&gt;Now let's walk through &lt;code&gt;run&lt;/code&gt;. First, we start a loop. Then we acquire a "permit" from our semaphore:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight rust"&gt;&lt;code&gt;

  &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;semaphore&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;self&lt;/span&gt;&lt;span class="py"&gt;.semaphore&lt;/span&gt;&lt;span class="nf"&gt;.clone&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;
  &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;permit&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;semaphore&lt;/span&gt;
      &lt;span class="nf"&gt;.acquire_owned&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
      &lt;span class="k"&gt;.await&lt;/span&gt;
      &lt;span class="nf"&gt;.context&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Semaphore closed while running"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;&lt;span class="o"&gt;?&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;



&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;The &lt;code&gt;await&lt;/code&gt; will pause the loop until a permit becomes available. After that, we clone the &lt;code&gt;Arc&lt;/code&gt; handles to our &lt;code&gt;Queue&lt;/code&gt; and &lt;code&gt;RunnerRouter&lt;/code&gt;:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight rust"&gt;&lt;code&gt;

  &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;queue&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;self&lt;/span&gt;&lt;span class="py"&gt;.queue&lt;/span&gt;&lt;span class="nf"&gt;.clone&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;
  &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;processor&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;self&lt;/span&gt;&lt;span class="py"&gt;.processor&lt;/span&gt;&lt;span class="nf"&gt;.clone&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Finally, we start polling our queue and processing incoming jobs:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight rust"&gt;&lt;code&gt;

  &lt;span class="nn"&gt;tokio&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nf"&gt;spawn&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="k"&gt;move&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
      &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;_permit&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;permit&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
      &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;queue&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;queue&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
      &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;processor&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;processor&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
      &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;interval&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;interval&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="nf"&gt;get_random_jitter&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;
      &lt;span class="n"&gt;processor&lt;/span&gt;&lt;span class="nf"&gt;.listen&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;queue&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;interval&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;&lt;span class="k"&gt;.await&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
  &lt;span class="p"&gt;});&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;We &lt;a href="https://docs.rs/tokio/latest/tokio/fn.spawn.html" rel="noopener noreferrer"&gt;&lt;code&gt;spawn&lt;/code&gt;&lt;/a&gt; a new task onto the tokio runtime and that's it. The rest is implemented in &lt;code&gt;RunnerRouter&lt;/code&gt;. When the job completes, the permit returns to our semaphore and the cycle repeats.&lt;/p&gt;

&lt;h3&gt;
  
  
  Possible Improvements
&lt;/h3&gt;

&lt;h4&gt;
  
  
  Dynamic concurrency
&lt;/h4&gt;

&lt;p&gt;Currently, concurrency is set in stone and can't be changed after &lt;code&gt;JobRunner&lt;/code&gt; has started its work. That means that with a concurrency of 50, there will be 50 tasks constantly polling the queue for new jobs. I want concurrency to shrink and grow depending on the current load.&lt;/p&gt;

&lt;p&gt;Shrinking and growing concurrency can be done by &lt;a href="https://en.wikipedia.org/wiki/Memory_ballooning" rel="noopener noreferrer"&gt;ballooning&lt;/a&gt; semaphore permits. A task can check the queue size and acquire or release permits based on that. It's a bit harder to reclaim permits that are already taken by a waiting worker, but it is possible and may require changing the interface of the &lt;code&gt;Queue&lt;/code&gt; trait.&lt;/p&gt;
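
&lt;p&gt;A minimal sketch of the sizing decision behind ballooning, assuming two hypothetical helpers (&lt;code&gt;target_concurrency&lt;/code&gt; and &lt;code&gt;permits_to_hold_back&lt;/code&gt;, neither of which exists in Aide-De-Camp): clamp the queue size into a min/max range, then let a balloon task hold every permit above that target so workers can't acquire it.&lt;/p&gt;

```rust
// Hypothetical helpers for illustration; not part of Aide-De-Camp's API.

/// How many workers should be polling, given the current queue size.
/// The result always stays within `min..=max`.
fn target_concurrency(queue_len: usize, min: usize, max: usize) -> usize {
    // `clamp` panics if min > max, so callers must pass a valid range.
    queue_len.clamp(min, max)
}

/// How many permits a balloon task should hold so that only
/// `target_concurrency` permits remain available to workers.
fn permits_to_hold_back(queue_len: usize, min: usize, max: usize) -> usize {
    max - target_concurrency(queue_len, min, max)
}
```

&lt;p&gt;With a minimum of 1 and a maximum of 50, an empty queue leaves one worker polling while the balloon holds the other 49 permits. A backlog of 500 jobs releases all of them.&lt;/p&gt;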

&lt;h4&gt;
  
  
  Hooks
&lt;/h4&gt;

&lt;p&gt;There is very little insight into what is happening in the queue scheduler and worker process. In the future, I'll add a way to get notified when certain events happen. Hooks could also be used to drive dynamic concurrency.&lt;/p&gt;

&lt;h4&gt;
  
  
  Batching
&lt;/h4&gt;

&lt;p&gt;Instead of polling jobs one by one, I want to poll N jobs at once. Batching is helpful in situations where &lt;em&gt;many small&lt;/em&gt; jobs are constantly sent to the queue.&lt;/p&gt;
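
&lt;p&gt;To get a feel for why batching helps with many small jobs, here is a rough sketch that counts queue round trips. &lt;code&gt;polls_needed&lt;/code&gt; is a hypothetical helper, and it assumes every poll returns a full batch while jobs remain.&lt;/p&gt;

```rust
/// Hypothetical helper: number of polls needed to drain `queue_len` jobs
/// when each poll fetches up to `batch_size` jobs.
/// Assumes `batch_size` is greater than zero (division by zero panics).
fn polls_needed(queue_len: usize, batch_size: usize) -> usize {
    // Ceiling division: a partially filled last batch still costs one poll.
    (queue_len + batch_size - 1) / batch_size
}
```

&lt;p&gt;Under these assumptions, draining 1000 jobs one at a time takes 1000 polls, while a batch size of 50 takes 20.&lt;/p&gt;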

&lt;h4&gt;
  
  
  Multiple Queues
&lt;/h4&gt;

&lt;p&gt;Currently, jobs are split by type and that's the only way to have a different concurrency and/or poll interval. This is fine for many use cases, but occasionally we want "hot" and "cold" queues that can hold the same types of jobs. For example, say you're working on a SaaS product with a free tier: tasks from free-tier clients can go to a slow queue, while tasks from paid clients go to a fast queue.&lt;/p&gt;
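
&lt;p&gt;The routing decision in that example could look roughly like this. &lt;code&gt;Tier&lt;/code&gt;, &lt;code&gt;QueueKind&lt;/code&gt;, and &lt;code&gt;queue_for&lt;/code&gt; are hypothetical names used only for illustration. The point is that the queue is chosen from the client, not from the job type.&lt;/p&gt;

```rust
// Hypothetical types for illustration; Aide-De-Camp does not define them.

#[derive(Debug, Clone, Copy, PartialEq)]
enum Tier {
    Free,
    Paid,
}

#[derive(Debug, Clone, Copy, PartialEq)]
enum QueueKind {
    /// Low concurrency, long poll interval.
    Cold,
    /// High concurrency, short poll interval.
    Hot,
}

/// Picks a queue from the client's tier; the job type plays no role.
fn queue_for(tier: Tier) -> QueueKind {
    match tier {
        Tier::Free => QueueKind::Cold,
        Tier::Paid => QueueKind::Hot,
    }
}
```

&lt;p&gt;Both queues would accept the same job types and differ only in the concurrency and poll interval of the runner attached to them.&lt;/p&gt;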

&lt;h4&gt;
  
  
  Queue Insights
&lt;/h4&gt;

&lt;p&gt;Not being able to see queue state isn't a big deal if your jobs never fail (spoiler alert: jobs do fail). Our &lt;code&gt;Queue&lt;/code&gt; trait can be extended to provide some insight: how many jobs are in the queue, how many jobs should be executed right now, and what is going on in the dead queue.&lt;/p&gt;

&lt;h2&gt;
  
  
  Up Next
&lt;/h2&gt;

&lt;p&gt;So far, we have only covered generic items like traits and the glue between generic components. Part 2 is going to cover the &lt;code&gt;SQLite&lt;/code&gt; implementation of the &lt;code&gt;Queue&lt;/code&gt; trait and benchmarking.&lt;/p&gt;

&lt;p&gt;Follow us here on DEV, &lt;a href="https://twitter.com/ZeroAssumptions" rel="noopener noreferrer"&gt;Twitter&lt;/a&gt;, and &lt;a href="https://www.linkedin.com/company/zero-assumptions" rel="noopener noreferrer"&gt;LinkedIn&lt;/a&gt; for updates on &lt;a href="https://github.com/ZeroAssumptions/aide-de-camp" rel="noopener noreferrer"&gt;Aide-De-Camp&lt;/a&gt;. &lt;/p&gt;

&lt;p&gt;Aide-De-Camp is a product of &lt;a href="https://zeroassumptions.dev/" rel="noopener noreferrer"&gt;Zero Assumptions&lt;/a&gt;.&lt;/p&gt;

</description>
      <category>zerotrust</category>
      <category>zeroassumptions</category>
      <category>aidedecamp</category>
      <category>rust</category>
    </item>
  </channel>
</rss>
