Marco Imperatore

Adding support for queues to RubyJob

Over the holidays, with a bit of spare time between family gatherings, I decided to scratch an itch and create a new open source job processing framework for Ruby, called RubyJob.

I'd used various frameworks of this kind in the past, including Sidekiq, Resque, Delayed::Job and SuckerPunch, but none of them behaved or performed quite the way I wanted.

I'm not going to go over the pros & cons of all of these frameworks here, but feel free to check out the nice article the Scout APM Team put together here if you're interested in that.

To my surprise, in the past two days, after I released my gem on RubyGems.org, it has been downloaded over 200 times! I have no idea who might be trying it out, but I'm glad they are. The interest shown further increases my desire to write the next feature I had been planning: Queue Support.

Remaining in the spirit of openness, I've decided to capture the thought process and work I'm going to go through in order to take the current version of RubyJob, which has no support for multiple queues, and enhance it so that the developer can specify both the queue to use and its priority relative to other queues.

TDD: Test Driven [Design|Development]

I generally like to work using my own flavour of TDD, one that isn't exactly the often-discussed red/green/refactor method, but rather a less constraining form that sees me sometimes writing tests first and sometimes writing them after the code (but in all cases before any change is deemed complete).

I like to use Guard and its RSpec plugin to watch the filesystem for changes to my source code and automatically re-run the tests. This not only makes it easy to remember to think about the tests, but also steers me towards incremental changes that always keep the tests green while refactoring, and almost always keep them green while adding a breaking change.
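
For reference, a minimal Guardfile for that kind of setup looks something like this (the watch patterns are illustrative and assume a typical gem layout with lib/ and spec/ directories):

# Guardfile: re-run the relevant specs whenever source or spec files change
guard :rspec, cmd: 'bundle exec rspec' do
  watch(%r{^spec/.+_spec\.rb$})                              # a spec changed: re-run it
  watch(%r{^lib/(.+)\.rb$}) { |m| "spec/#{m[1]}_spec.rb" }   # a source file changed: re-run its spec
  watch('spec/spec_helper.rb') { 'spec' }                    # the helper changed: re-run everything
end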

You may have noticed that the title of this section has Design|Development as the last D in TDD. That's because I firmly believe that tests are a great tool for fleshing out emergent software design. Often, and as is the case for the feature I'm about to add, we don't know exactly what we want the API to look like in advance, and writing tests helps us better visualize what the end result may look like. That's because test code tends to look similar, or have similar properties, to production code that uses the software we're developing.

Let's dig into thinking about the design of the queue/priority feature...

Typical Queue Support In Other Gems

Most gems similar to RubyJob have some way of partitioning jobs across one or more queues, and of assigning each queue a relative priority, so that jobs get processed in an order the developer can control. For example, you may put urgent jobs on a queue called critical, and regular jobs on the default queue.

In Sidekiq, the queue is specified with the :queue option:

class MyCriticalWorker
  include Sidekiq::Worker

  sidekiq_options queue: 'critical'

  def perform
    #job code goes here
  end
end

Adding a bit of configuration in sidekiq.yml, you also establish the queue priorities. In the example below, the critical queue is weighted 10x more heavily than default.

:queues:
  - ["critical", 10]
  - ["default", 1]

Specifying Queues In RubyJob

RubyJob is a bit different from the other job processing frameworks in that it has the notion of a Job Store, which is an abstraction that enables us to choose how we want to keep track of our jobs.

For example, if our job worker looked like this:

class MyWorker
  include RubyJob::Worker

  def perform
    #job code goes here
  end
end

then we tell the framework to track MyWorker jobs like this:

MyWorker.jobstore = RubyJob::InMemoryJobStore.new

In this case, the jobs are being tracked (stored) in an InMemoryJobStore, a data structure backed by a Fibonacci heap, which is very efficient for this use case.

The server to then process the jobs is started like this:

server = RubyJob::ThreadedServer.new(num_threads: 10, jobstore: MyWorker.jobstore)
server_thread = server.start

The Job Store is explicitly passed to the constructor of ThreadedServer. The server spawns the specified number of threads, each of which repeatedly fetches the next ready job from the job store and processes it. (For additional information on the full Job Store API, check out the docs here).
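
Conceptually, each worker thread runs a loop along these lines (an illustrative sketch only, not the actual JobProcessor code; fetch and perform stand in for whatever the real Job Store and worker APIs expose):

# Illustrative sketch of a worker thread's main loop (not the real JobProcessor).
loop do
  job = jobstore.fetch   # hypothetical call: get the next job that is ready to run
  break if job.nil?      # e.g. nothing left to do and the server is stopping
  job.perform            # run the worker's perform method
end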

So, how could we envision job queues meshing into this design?

Job Stores As Queues

In some ways, the job stores themselves are already queues; they already keep track of jobs to process, and the order in which to process them.

The issue with the example above is that all threads read from the same, single job store. But what if we could specify a different number of threads for each job store, like this:

server = RubyJob::ThreadedServer.new(
  config: [
    { num_threads: 10, jobstore: MyCriticalWorker.jobstore },
    { num_threads: 1, jobstore: MyRegularWorker.jobstore }
  ])
server.start

On the surface, this seems like a solution that could work, and it would be pretty trivial to implement. The code in ThreadedServer#start, which is currently:

    def start
      Thread.new do
        @num_threads.times.map do
          Thread.new do
            JobProcessor.new(@jobstore).run(**@options)
          end
        end.each(&:join)
      end
    end

would only need a slight tweak so that we iterate over the specified config entries and create the right set of threads for each.
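
A rough sketch of that tweak, assuming the constructor stored the array of hashes in @config (this is only a sketch, not code that exists in the gem):

    # Sketch only: spawn each config entry's threads against its own job store,
    # assuming the constructor stored the array of hashes in @config.
    def start
      Thread.new do
        @config.flat_map do |entry|
          entry[:num_threads].times.map do
            Thread.new do
              JobProcessor.new(entry[:jobstore]).run(**@options)
            end
          end
        end.each(&:join)
      end
    end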

Different Server Per Queue

But what about a different approach -- one that doesn't require us to change anything at all?

We know we can create a multi-threaded server object with a specified number of threads to process a single job store. And the server itself is merely a top-level parent thread of the worker threads. What if we simply created multiple servers?

critical_server = RubyJob::ThreadedServer.new(
  num_threads: 10, jobstore: MyCriticalWorker.jobstore
)
regular_server = RubyJob::ThreadedServer.new(
  num_threads: 1, jobstore: MyRegularWorker.jobstore
)
critical_server.start
regular_server.start

To me, this option feels more robust and extensible than the first, because it doesn't couple the behaviour of the entire set of queues to a single running ThreadedServer instance. Granted, you could pass different configs using the first approach as well, but that seems counterintuitive and forced, whereas this approach fits naturally with the current API design.

So, RubyJob Already Supports Queues & Priorities?

If we want to go with the approach above, yes! And we do want to go with the approach above, for the reasons already stated.

Pretty cool! We started off thinking we didn't have queueing and priority features, but by thinking it through we discovered that the way the system had been designed already afforded us that capability. A pleasant surprise! That's not to say things won't change in the future, but for now there is no real reason to make things more complicated.

We're Not Done Though

Even though we do have the intrinsic ability to partition jobs into various queues and ensure that they run with a priority we control, doing so requires a bit more boilerplate code than is desirable.

Ideally, we wouldn't need to write all this code:

MyCriticalWorker.jobstore = RubyJob::InMemoryJobStore.new
MyRegularWorker.jobstore = RubyJob::InMemoryJobStore.new

critical_server = RubyJob::ThreadedServer.new(
  num_threads: 10, jobstore: MyCriticalWorker.jobstore
)
regular_server = RubyJob::ThreadedServer.new(
  num_threads: 1, jobstore: MyRegularWorker.jobstore
)

critical_server.start
regular_server.start

Keeping Queue and Worker Decoupled

One of the problems with the way Sidekiq and Resque specify the queue is that it is coupled to the code that defines the worker. The decision about which queue and priority a particular job should get belongs with whoever is orchestrating and managing the overall system, not with the worker class itself.

Likewise, and for similar reasons, the number of threads should not be specified in the worker either, like Sucker Punch does.

For RubyJob, it would therefore be desirable to keep the configuration completely external (e.g. in a YAML file), so that it can be read by a coordinator that configures and starts the necessary ThreadedServers. I'm envisioning a config file like this:

MyCriticalWorker:
  num_threads: 10
  jobstore: RubyJob::InMemoryJobStore
MyRegularWorker:
  num_threads: 1
  jobstore: RubyJob::InMemoryJobStore

and a way of starting the servers like this:

$ ruby_job -w MyCriticalWorker start
$ ruby_job -w MyRegularWorker start
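
For illustration only, the coordinator behind such a command might boil down to something like this (a rough sketch: the config file name, the require path and the option handling are all assumptions at this point):

require 'yaml'
require 'ruby_job'   # assumed require path for the gem

# Rough sketch of a coordinator: read the per-worker config and start
# a ThreadedServer for the requested worker class.
config      = YAML.load_file('ruby_job.yml')   # hypothetical config file name
worker_name = ARGV.first                       # e.g. "MyCriticalWorker"
settings    = config.fetch(worker_name)

worker_class          = Object.const_get(worker_name)
worker_class.jobstore = Object.const_get(settings['jobstore']).new

server = RubyJob::ThreadedServer.new(
  num_threads: settings['num_threads'],
  jobstore: worker_class.jobstore
)
server.start.join   # start returns the top-level server thread, so wait on it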

This, however, is beyond the scope of what I had intended to cover in today's post, and will be the subject of a future day's work and, probably, a future post. It's a different enough topic that it deserves its own set of design considerations. I'll be busting out thor and some interesting RSpec techniques for testing command-line interfaces!

I had hoped that I'd actually be writing some more code that I could share with you today, but hopefully sharing my thoughts about what was next on my list of things to do on RubyJob was at least a bit interesting or informative. Feel free to share any comments or questions below... thanks for reading!
