Earlier this year, I set a goal of doing a deep dive into a Ruby gem. I ended up choosing Solid Queue. My goal wasn't just to understand how the gem works, but to use it as a way to become a better software engineer by reading high-quality open source code.
Main Goals
- Understand how Solid Queue works under the hood
- Learn from high-quality open source code
- Understand the architecture behind a job queuing system
- Improve my ability to write software with clarity by reading more code
- Inspire others to read open source
Plan
Read README
Get a good understanding of what the gem does and how it works.
Read Articles and Watch Videos
Look for blog posts, guides, or videos on Solid Queue. This helps build context and see how other engineers are using or explaining it.
Go commit by commit (PR by PR)
Start from the very first commit on GitHub and go through the project's history step by step. For each commit and pull request, read the message and discussions to understand what changed and why.
Run Solid Queue in an Application
Set up Solid Queue in a real Rails app to see it in action and get a feel for how it integrates with a typical project.
Debug Gem
Follow the tests or step through the code to understand the full execution flow.
What is Solid Queue
Solid Queue is a database-backed queuing backend for Active Job. It lets you run code asynchronously, which makes it a good fit for moving slow or heavy operations into the background to keep your app responsive. It is the default Active Job backend in new Rails 8 applications, so there will be a lot of eyes on the code. It is designed to be simple, reliable, and easy to deploy (no Redis required).
Killer Features
Solid Queue has many great features: delayed jobs, recurring jobs, pausing queues, prioritization, and bulk enqueuing. In my opinion, these are the killer features that make it stand out:
Heartbeat Check
Every Solid Queue process (supervisor, workers, dispatchers) updates its heartbeat periodically, every 60 seconds by default. If a process hasn't updated its heartbeat in over five minutes, it's considered dead and gets pruned. I'm sure most of you have run into a long-running job that seemed stuck, where you weren't sure whether it was still processing or silently frozen. Heartbeats give you much better visibility into what is actually running.
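To make the heartbeat idea concrete, here is a minimal in-memory sketch. `ToyProcessRegistry` is a hypothetical name; Solid Queue keeps its processes in a database table, and the 60-second interval and five-minute threshold below are just the defaults mentioned above. Time is passed in as plain seconds so the example is deterministic.

```ruby
# Hypothetical in-memory registry; Solid Queue stores processes in a database table.
class ToyProcessRegistry
  HEARTBEAT_INTERVAL = 60   # seconds between heartbeats (informational here)
  ALIVE_THRESHOLD = 5 * 60  # seconds without a heartbeat before a process counts as dead

  def initialize
    @last_heartbeats = {} # process name => time of its last heartbeat, in seconds
  end

  # Each process would call this roughly every HEARTBEAT_INTERVAL seconds.
  def heartbeat(name, now)
    @last_heartbeats[name] = now
  end

  def alive?(name, now)
    last = @last_heartbeats[name]
    !last.nil? && now - last <= ALIVE_THRESHOLD
  end

  # Forget every process whose heartbeat is too old and return their names.
  def prune(now)
    dead = @last_heartbeats.keys.reject { |name| alive?(name, now) }
    dead.each { |name| @last_heartbeats.delete(name) }
    dead
  end
end

registry = ToyProcessRegistry.new
registry.heartbeat("worker-1", 0)
registry.heartbeat("worker-2", 0)
registry.heartbeat("worker-2", 240) # worker-2 keeps beating; worker-1 has gone quiet
pruned = registry.prune(400)        # worker-1: 400s since its last beat > 300s threshold
```

A real implementation would also need to decide what happens to any jobs the dead process had claimed.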
Parallelism and Multi-thread Support
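Solid Queue lets you run multiple worker processes, each with multiple threads. Depending on whether your workload is CPU-bound or I/O-bound, you can tune the number of processes or threads to get optimal throughput. For CPU-bound work, more processes help, since in CRuby the GVL lets only one thread per process execute Ruby code at a time. For I/O-heavy workloads, increasing threads can significantly improve performance, because threads release the GVL while they wait on I/O.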
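Here is a small plain-Ruby experiment (not Solid Queue code) showing why threads help I/O-bound work: `sleep` stands in for a network or database call, and while it waits the thread releases the GVL so other threads can run. `io_job`, `run_with_threads`, and `monotonic_now` are hypothetical helpers. Replacing `sleep` with a CPU-bound loop would not show the same speedup under CRuby.

```ruby
# Plain-Ruby experiment, not Solid Queue code.
def monotonic_now
  Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

# Simulated I/O-bound job: sleeping releases the GVL, like waiting on a socket.
def io_job
  sleep 0.1
end

# Run `jobs` jobs on `threads` threads pulling from a shared queue; returns elapsed seconds.
def run_with_threads(jobs:, threads:)
  queue = Queue.new
  jobs.times { |i| queue << i }
  queue.close # pop returns nil once the queue is closed and empty

  started = monotonic_now
  Array.new(threads) { Thread.new { io_job while queue.pop } }.each(&:join)
  monotonic_now - started
end

serial = run_with_threads(jobs: 8, threads: 1)   # 8 jobs, one at a time
threaded = run_with_threads(jobs: 8, threads: 4) # 8 jobs, four at a time
```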
Concurrency Controls
Concurrency controls let you limit how many jobs of a certain type, or with certain arguments, can run at the same time. For example, this allows at most one job per account to run at once:
```ruby
limits_concurrency to: 1, key: ->(contact) { contact.account }, duration: 5.minutes
```
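Solid Queue enforces this limit with a database-backed semaphore (covered in the code walkthrough), and by default a job that can't get a slot is stored as blocked instead of making a thread wait. Purely to illustrate the per-key idea, here is an in-memory sketch that assumes a blocking wait is acceptable. `ToyConcurrencyLimiter` is a hypothetical class allowing at most `limit` blocks per key to run at once, so jobs for the same account run one after another while a different account proceeds in parallel.

```ruby
# Hypothetical in-memory limiter: at most `limit` blocks per key run at once.
class ToyConcurrencyLimiter
  def initialize(limit:)
    @limit = limit
    @running = Hash.new(0) # key => number of blocks currently running
    @mutex = Mutex.new
    @cond = ConditionVariable.new
  end

  def with_slot(key)
    acquire(key)
    begin
      yield
    ensure
      release(key)
    end
  end

  private

  def acquire(key)
    @mutex.synchronize do
      @cond.wait(@mutex) while @running[key] >= @limit
      @running[key] += 1
    end
  end

  def release(key)
    @mutex.synchronize do
      @running[key] -= 1
      @cond.broadcast # wake all waiters; each re-checks its own key
    end
  end
end

limiter = ToyConcurrencyLimiter.new(limit: 1)
stats = Mutex.new
current = Hash.new(0)
max_seen = Hash.new(0)

threads = %w[acme acme acme globex globex].map do |account|
  Thread.new do
    limiter.with_slot(account) do
      stats.synchronize do
        current[account] += 1
        max_seen[account] = [max_seen[account], current[account]].max
      end
      sleep 0.02 # simulated work
      stats.synchronize { current[account] -= 1 }
    end
  end
end
threads.each(&:join)
# max_seen now holds 1 for each account: jobs for the same account never overlapped.
```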
Diving into the Code
I recommend cloning the repo locally and following along.
Job
A SolidQueue::Job record represents a unit of background work that is stored and tracked in the database. When the record is created, an after_create callback decides whether the job is ready to run now or should be scheduled to run later.
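```ruby
module SolidQueue
  class Job < Record
    class EnqueueError < StandardError; end

    # Job Lifecycle 1: Calling MyJob.perform_later(args) enqueues the job. The Active Job
    # adapter creates a SolidQueue::Job record; nothing is executed yet.
    include Executable, Clearable, Recurrable

    serialize :arguments, coder: JSON

    # ...
  end
end
```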
```ruby
module SolidQueue
  class Job
    module Executable
      extend ActiveSupport::Concern

      included do
        # Job Lifecycle 2: After the job record is created, it is prepared for execution.
        after_create :prepare_for_execution
      end

      # Job Lifecycle 3: If the job is due, dispatch it now; otherwise schedule it for later.
      def prepare_for_execution
        if due? then dispatch
        else
          schedule
        end
      end

      def dispatch
        # Job Lifecycle 4: If we can acquire a concurrency lock, the job becomes ready for execution.
        # Otherwise, the concurrency conflict is handled (by default, the job is blocked).
        if acquire_concurrency_lock then ready
        else
          handle_concurrency_conflict
        end
      end
    end
  end
end
```
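The branching in `prepare_for_execution` and `dispatch` can be modeled in plain Ruby to see where a freshly created job ends up. This is a hypothetical sketch: `ToyJob`, `ToyEnqueuer`, and the `lock_available` lambda are made up, arrays stand in for the ready, scheduled, and blocked execution tables, and `due?` is assumed to mean "no `scheduled_at`, or `scheduled_at` is not in the future".

```ruby
# Hypothetical plain-Ruby model of Job::Executable's branching (no database).
ToyJob = Struct.new(:id, :scheduled_at, :concurrency_key, keyword_init: true)

class ToyEnqueuer
  attr_reader :ready, :scheduled, :blocked

  # lock_available decides whether a concurrency-limited job gets a slot.
  def initialize(lock_available: ->(_key) { true })
    @lock_available = lock_available
    @ready = []
    @scheduled = []
    @blocked = []
  end

  def prepare_for_execution(job, now)
    if due?(job, now) then dispatch(job)
    else
      schedule(job)
    end
  end

  private

  def due?(job, now)
    job.scheduled_at.nil? || job.scheduled_at <= now
  end

  def dispatch(job)
    if acquire_concurrency_lock(job) then mark_ready(job)
    else
      @blocked << job.id # stand-in for handle_concurrency_conflict
    end
  end

  def acquire_concurrency_lock(job)
    return true if job.concurrency_key.nil? # not concurrency-limited
    @lock_available.call(job.concurrency_key)
  end

  def mark_ready(job)
    @ready << job.id
  end

  def schedule(job)
    @scheduled << job.id
  end
end

now = Time.at(1_000)
enqueuer = ToyEnqueuer.new(lock_available: ->(key) { key != "account/1" })
enqueuer.prepare_for_execution(ToyJob.new(id: 1), now)                               # due, no limit
enqueuer.prepare_for_execution(ToyJob.new(id: 2, scheduled_at: now + 60), now)       # in the future
enqueuer.prepare_for_execution(ToyJob.new(id: 3, concurrency_key: "account/1"), now) # lock taken
```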
```ruby
module SolidQueue
  class Job
    module ConcurrencyControls
      def acquire_concurrency_lock
        # Job Lifecycle 5: If the job declares something like
        #   limits_concurrency to: 2, key: ->(contact) { contact.account }, duration: 5.minutes
        # then its concurrency key is used to acquire a lock.
        return true unless concurrency_limited?

        # Job Lifecycle 6: Delegate to SolidQueue::Semaphore.wait(self), passing the job instance.
        Semaphore.wait(self)
      end
    end
  end
end
```
```ruby
module SolidQueue
  class Semaphore < Record
    class Proxy
      def wait
        # Job Lifecycle 7: Check whether a semaphore exists for the job's concurrency key.
        if semaphore = Semaphore.find_by(key: key)
          # Job Lifecycle 8: If the semaphore's value is greater than 0, attempt to decrement it.
          semaphore.value > 0 && attempt_decrement
        else
          attempt_creation
        end
      end

      private
        def attempt_decrement
          # Job Lifecycle 9: Decrement the semaphore's value and refresh its expiration time.
          # The update only succeeds if a matching row is still available (value > 0).
          Semaphore.available.where(key: key).update_all([ "value = value - 1, expires_at = ?", expires_at ]) > 0
        end

        def attempt_increment
          # Job Lifecycle 10: When a concurrency-limited job finishes, the semaphore is signaled:
          # increment its value (only while it is below the limit) and refresh its expiration time.
          Semaphore.where(key: key, value: ...limit).update_all([ "value = value + 1, expires_at = ?", expires_at ]) > 0
        end
    end
  end
end
```
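The neat trick in `attempt_decrement` and `attempt_increment` is that each is a single conditional `UPDATE`, and checking the number of affected rows (`> 0`) tells you whether you won. Here is an in-memory sketch of the same semaphore behavior, where one mutex plays the role of the database's atomic update. `ToySemaphores` is hypothetical, and I'm assuming creation starts the value at `limit - 1`, since the creating job takes the first slot.

```ruby
# Hypothetical in-memory stand-in for the semaphores table. Each method body runs
# under one mutex, playing the role of a single atomic UPDATE ... WHERE statement.
class ToySemaphores
  Row = Struct.new(:value, :expires_at)

  def initialize(limit:, duration:)
    @limit = limit
    @duration = duration
    @rows = {} # concurrency key => Row
    @mutex = Mutex.new
  end

  # Non-blocking: true if the job got a slot, false if it would have to be blocked.
  def wait(key, now)
    @mutex.synchronize do
      row = @rows[key]
      if row.nil?
        # Like attempt_creation: the first job for this key takes one slot right away.
        @rows[key] = Row.new(@limit - 1, now + @duration)
        true
      elsif row.value > 0
        # Like attempt_decrement: SET value = value - 1 WHERE value > 0
        row.value -= 1
        row.expires_at = now + @duration
        true
      else
        false
      end
    end
  end

  # Called when a job finishes. Like attempt_increment: SET value = value + 1 WHERE value < limit
  def signal(key, now)
    @mutex.synchronize do
      row = @rows[key]
      return false unless row && row.value < @limit

      row.value += 1
      row.expires_at = now + @duration
      true
    end
  end

  def value(key)
    @rows[key]&.value
  end
end

semaphores = ToySemaphores.new(limit: 2, duration: 300)
results = []
results << semaphores.wait("account/1", 0)  # true: row created, value 1
results << semaphores.wait("account/1", 0)  # true: decremented to 0
results << semaphores.wait("account/1", 0)  # false: no slot left
semaphores.signal("account/1", 10)          # a job finished: value back to 1
results << semaphores.wait("account/1", 10) # true: decremented to 0 again
```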
```ruby
module SolidQueue
  class Job
    module Executable
      extend ActiveSupport::Concern

      def ready
        # Job Lifecycle 11: Create a ReadyExecution record for the job so a worker can claim it.
        ReadyExecution.create_or_find_by!(job_id: id)
      end
    end
  end
end
```
Supervisor
The supervisor forks and runs the workers and dispatchers, monitors their heartbeats, and starts or stops them as needed.
```ruby
module SolidQueue
  class Cli < Thor
    def start
      # Supervisor Lifecycle 1: The CLI starts SolidQueue::Supervisor with the given options.
      SolidQueue::Supervisor.start(**options.symbolize_keys)
    end
  end
end
```
```ruby
module SolidQueue
  class Supervisor < Processes::Base
    include LifecycleHooks
    include Maintenance, Signals, Pidfiled

    after_shutdown :run_exit_hooks

    class << self
      def start(**options)
        # Supervisor Lifecycle 2: Set the configuration and initialize the supervisor.
        SolidQueue.supervisor = true
        configuration = Configuration.new(**options)

        if configuration.valid?
          new(configuration).tap(&:start)
        else
          abort configuration.errors.full_messages.join("\n") + "\nExiting..."
        end
      end
    end

    def start
      # Supervisor Lifecycle 3: Boot, which runs the boot callbacks.
      boot
      # Supervisor Lifecycle 4: Run the start hooks.
      run_start_hooks
      # Supervisor Lifecycle 5: Start the processes to supervise.
      start_processes
      # Supervisor Lifecycle 6: Launch the maintenance task that prunes dead processes.
      launch_maintenance_task
      # Supervisor Lifecycle 7: Enter the main loop to supervise processes.
      supervise
    end

    private
      def start_processes
        # Supervisor Lifecycle 5.1: Start a process instance for each configured process.
        configuration.configured_processes.each { |configured_process| start_process(configured_process) }
      end

      def supervise
        loop do
          # Supervisor Lifecycle 8: Loop until stopped.
          break if stopped?

          set_procline
          # Supervisor Lifecycle 9: Handle queued signals (e.g. TERM or INT).
          process_signal_queue

          unless stopped?
            # Supervisor Lifecycle 10: Reap terminated forks and replace them.
            reap_and_replace_terminated_forks
            # Supervisor Lifecycle 11: Wait up to 1 second before the next pass. Unlike a plain
            # `sleep`, this can be interrupted, so the supervisor wakes up promptly when a
            # signal arrives instead of busy-waiting.
            interruptible_sleep(1.second)
          end
        end
      ensure
        shutdown
      end

      def start_process(configured_process)
        process_instance = configured_process.instantiate.tap do |instance|
          instance.supervised_by process
          instance.mode = :fork
        end

        # Supervisor Lifecycle 5.2: Fork a new process for the configured process.
        pid = fork do
          process_instance.start
        end

        configured_processes[pid] = configured_process
        forks[pid] = process_instance
      end
  end
end
```
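The supervise loop boils down to a common Unix pattern: fork children, check for exited ones without blocking, and start replacements. Here is a minimal sketch (Unix only) built on `Process.waitpid2` with `Process::WNOHANG`. `ToySupervisor` and its `max_starts` cap are hypothetical; the cap only exists so the demo terminates, and signals, heartbeats, and shutdown are left out.

```ruby
# Hypothetical fork-and-reap loop (Unix only, since it relies on fork).
class ToySupervisor
  attr_reader :forks, :started

  def initialize(count:, max_starts:)
    @count = count
    @max_starts = max_starts # cap on total starts so the demo terminates
    @forks = {}              # pid => process name
    @started = 0
  end

  def start
    @count.times { |i| start_process("worker-#{i}") }
    supervise
  end

  private

  def start_process(name)
    return if @started >= @max_starts

    @started += 1
    pid = fork do
      sleep 0.05 # stand-in for the child's real work
      exit!(0)   # leave without running the parent's at_exit hooks
    end
    @forks[pid] = name
  end

  def supervise
    until @forks.empty?
      reap_and_replace_terminated_forks
      sleep 0.01
    end
  end

  # Collect every child that has already exited (without blocking) and start a replacement.
  def reap_and_replace_terminated_forks
    loop do
      pid, _status = Process.waitpid2(-1, Process::WNOHANG)
      break unless pid # children exist, but none have exited yet

      name = @forks.delete(pid)
      start_process(name) if name
    end
  rescue Errno::ECHILD
    # No children left to wait for.
  end
end

supervisor = ToySupervisor.new(count: 2, max_starts: 5)
supervisor.start
```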
Worker
Workers are in charge of picking jobs ready to run from queues and processing them.
```ruby
module SolidQueue
  class Supervisor < Processes::Base
    def start_process(configured_process)
      process_instance = configured_process.instantiate.tap do |instance|
        instance.supervised_by process
        instance.mode = :fork
      end

      pid = fork do
        # Worker Lifecycle 1: For each configured worker, the supervisor forks a process and starts it.
        process_instance.start
      end

      configured_processes[pid] = configured_process
      forks[pid] = process_instance
    end
  end
end
```
```ruby
module SolidQueue
  class Worker < Processes::Poller
    def initialize(**options)
      options = options.dup.with_defaults(SolidQueue::Configuration::WORKER_DEFAULTS)

      # Ensure that the queues array is deep frozen to prevent accidental modification
      @queues = Array(options[:queues]).map(&:freeze).freeze

      # Worker Lifecycle 2: Set up a thread pool sized by the configured number of threads.
      # When a thread frees up, on_idle wakes the worker so it can poll again.
      @pool = Pool.new(options[:threads], on_idle: -> { wake_up })

      super(**options)
    end
  end
end
```
```ruby
module SolidQueue::Processes
  module Runnable
    include Supervised

    def start
      # Worker Lifecycle 3: Boot the process.
      boot

      # Worker Lifecycle 4: Start running, either in a new thread (async mode)
      # or directly in the current process (e.g. when forked by the supervisor).
      if running_async?
        @thread = create_thread { run }
      else
        run
      end
    end
  end
end
```
```ruby
module SolidQueue::Processes
  class Poller < Base
    include Runnable

    private
      def run
        # Worker Lifecycle 5: Start the loop.
        start_loop
      end

      def start_loop
        loop do
          break if shutting_down?

          delay = wrap_in_app_executor do
            # Worker Lifecycle 6: Poll. The return value is how long to wait before polling again.
            poll
          end

          interruptible_sleep(delay)
        end
      ensure
        SolidQueue.instrument(:shutdown_process, process: self) do
          run_callbacks(:shutdown) { shutdown }
        end
      end
  end
end
```
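Both `interruptible_sleep` calls in this walkthrough rely on the same idea: wait up to some delay, but return early when someone wakes you. Here is a hypothetical `ToyPoller` sketch built on Ruby's `Mutex` and `ConditionVariable`; Solid Queue's own implementation may use a different mechanism, so treat this as an illustration of the pattern only. The poll block returns the delay before the next poll, just like `poll` in the loop above.

```ruby
# Hypothetical poller loop with an interruptible sleep (Mutex + ConditionVariable).
class ToyPoller
  attr_reader :polls

  def initialize(&poll)
    @poll = poll # returns how many seconds to wait before polling again
    @polls = 0
    @stopping = false
    @woken = false
    @mutex = Mutex.new
    @cond = ConditionVariable.new
  end

  def start_loop
    loop do
      break if @stopping

      delay = @poll.call
      @polls += 1
      interruptible_sleep(delay)
    end
  end

  # Ask the loop to exit, and wake it if it is currently sleeping.
  def stop
    @stopping = true
    wake_up
  end

  # Cut the current (or next) sleep short.
  def wake_up
    @mutex.synchronize do
      @woken = true
      @cond.signal
    end
  end

  private

  # Sleep for up to `seconds`, returning early if wake_up is called.
  def interruptible_sleep(seconds)
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + seconds
    @mutex.synchronize do
      until @woken
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        break if remaining <= 0

        @cond.wait(@mutex, remaining) # may wake spuriously; the loop re-checks
      end
      @woken = false
    end
  end
end

poller = ToyPoller.new { 5 } # "nothing to do, check again in 5 seconds"
started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
thread = Thread.new { poller.start_loop }
sleep 0.1
poller.stop # the 5-second sleep is cut short
thread.join
elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
```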
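```ruby
module SolidQueue
  class Worker < Processes::Poller
    private
      def poll
        # Worker Lifecycle 7: Claim executions from the queues and post them to the thread pool.
        claim_executions.then do |executions|
          executions.each do |execution|
            pool.post(execution)
          end

          # If threads are still free, poll again after the polling interval. If the pool is full,
          # wait up to 10 minutes; the pool's on_idle callback wakes the worker as soon as a thread frees up.
          pool.idle? ? polling_interval : 10.minutes
        end
      end
  end
end
```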
```ruby
module SolidQueue
  class Pool
    def post(execution)
      available_threads.decrement

      Concurrent::Promises.future_on(executor, execution) do |thread_execution|
        wrap_in_app_executor do
          # Worker Lifecycle 8: Perform the job in the thread pool.
          thread_execution.perform
        ensure
          available_threads.increment
          mutex.synchronize { on_idle.try(:call) if idle? }
        end
      end.on_rejection! do |e|
        handle_thread_error(e)
      end
    end
  end
end
```
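Putting `Worker#poll` and `Pool#post` together: the worker only polls frequently while threads are free, and the pool's `on_idle` callback is what wakes it early when the pool was full. Here is a simplified sketch using plain Threads instead of concurrent-ruby. `ToyPool`, `next_poll_delay`, and the 0.1-second interval are hypothetical, and unlike the real pool it starts a new thread per job instead of reusing a fixed executor.

```ruby
# Hypothetical simplification of the worker's pool: plain Threads and a Mutex-guarded counter.
class ToyPool
  def initialize(size, on_idle: nil)
    @available = size
    @on_idle = on_idle
    @mutex = Mutex.new
    @threads = []
  end

  def idle_threads
    @mutex.synchronize { @available }
  end

  def idle?
    idle_threads > 0
  end

  # Run `execution` on its own thread; give the slot back and call on_idle when done.
  def post(execution)
    @mutex.synchronize { @available -= 1 }
    @threads << Thread.new do
      execution.call
    ensure
      @mutex.synchronize { @available += 1 }
      @on_idle&.call
    end
  end

  def wait_for_termination
    @threads.each(&:join)
  end
end

POLLING_INTERVAL = 0.1
BUSY_WAIT = 10 * 60 # mirrors the 10.minutes fallback in Worker#poll

def next_poll_delay(pool)
  pool.idle? ? POLLING_INTERVAL : BUSY_WAIT
end

wakeups = Queue.new
pool = ToyPool.new(2, on_idle: -> { wakeups << :wake_up })
gate = Queue.new

2.times { pool.post(-> { gate.pop }) } # both threads block until the gate opens
busy_delay = next_poll_delay(pool)     # 600: no free threads

2.times { gate << :go }
pool.wait_for_termination
idle_delay = next_poll_delay(pool)     # 0.1: threads are free again
```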
Dispatchers
Dispatchers are in charge of selecting jobs that were scheduled to run in the future, once they are due, and dispatching them so workers can pick them up.
```ruby
module SolidQueue::Processes
  module Runnable
    include Supervised

    attr_writer :mode

    def start
      # Dispatcher Lifecycle 1: The dispatcher starts by booting.
      boot
      # ...
    end
  end
end
```
```ruby
module SolidQueue::Processes
  class Poller < Base
    def run
      # Dispatcher Lifecycle 2: Start the polling loop.
      start_loop
    end

    def start_loop
      loop do
        break if shutting_down?

        delay = wrap_in_app_executor do
          # Dispatcher Lifecycle 3: Poll.
          poll
        end

        interruptible_sleep(delay)
      end
    ensure
      SolidQueue.instrument(:shutdown_process, process: self) do
        run_callbacks(:shutdown) { shutdown }
      end
    end
  end
end
```
```ruby
module SolidQueue
  class Dispatcher < Processes::Poller
    def poll
      # Dispatcher Lifecycle 4: Dispatch the next batch. If anything was dispatched, poll again
      # immediately (0 seconds); otherwise wait for the polling interval.
      batch = dispatch_next_batch
      batch.zero? ? polling_interval : 0.seconds
    end

    def dispatch_next_batch
      with_polling_volume do
        # Dispatcher Lifecycle 5: Dispatch up to batch_size due scheduled executions.
        ScheduledExecution.dispatch_next_batch(batch_size)
      end
    end
  end
end
```
```ruby
module SolidQueue
  class ScheduledExecution < Execution
    class << self
      def dispatch_next_batch(batch_size)
        transaction do
          # Dispatcher Lifecycle 6: Find the next due jobs, locking their rows without blocking
          # other dispatchers, and dispatch them.
          job_ids = next_batch(batch_size).non_blocking_lock.pluck(:job_id)
          if job_ids.empty? then 0
          else
            SolidQueue.instrument(:dispatch_scheduled, batch_size: batch_size) do |payload|
              payload[:size] = dispatch_jobs(job_ids)
            end
          end
        end
      end
    end
  end
end
```
```ruby
module SolidQueue
  class Execution
    module Dispatching
      extend ActiveSupport::Concern

      class_methods do
        def dispatch_jobs(job_ids)
          jobs = Job.where(id: job_ids)

          # Dispatcher Lifecycle 7: Dispatch the jobs, then delete their scheduled executions.
          Job.dispatch_all(jobs).map(&:id).then do |dispatched_job_ids|
            where(id: where(job_id: dispatched_job_ids).pluck(:id)).delete_all
          end
        end
      end
    end
  end
end
```
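Here is the dispatcher's job in miniature: each poll moves up to `batch_size` due entries from "scheduled" to "ready", and returns 0 seconds if it found work so it can go again immediately. This is a hypothetical in-memory sketch (`ToyDispatcher`, `ScheduledEntry`, and ordering by `scheduled_at` alone are my assumptions); the real version runs inside a transaction and locks rows with SKIP LOCKED so several dispatchers don't grab the same jobs.

```ruby
# Hypothetical in-memory dispatcher; arrays stand in for the executions tables.
ScheduledEntry = Struct.new(:job_id, :scheduled_at)

class ToyDispatcher
  attr_reader :scheduled, :ready

  def initialize(scheduled, batch_size:, polling_interval: 1)
    @scheduled = scheduled # pending ScheduledEntry rows
    @ready = []            # job ids workers could claim
    @batch_size = batch_size
    @polling_interval = polling_interval
  end

  # Mirrors Dispatcher#poll: if something was dispatched, poll again right away.
  def poll(now)
    dispatch_next_batch(now).zero? ? @polling_interval : 0
  end

  private

  # Move up to batch_size due entries (earliest first) to ready; return how many moved.
  def dispatch_next_batch(now)
    batch = @scheduled.select { |entry| entry.scheduled_at <= now }
                      .sort_by(&:scheduled_at)
                      .first(@batch_size)
    @ready.concat(batch.map(&:job_id))
    @scheduled -= batch
    batch.size
  end
end

dispatcher = ToyDispatcher.new(
  [ScheduledEntry.new(1, 10), ScheduledEntry.new(2, 20), ScheduledEntry.new(3, 99)],
  batch_size: 1
)
delays = Array.new(3) { dispatcher.poll(50) } # [0, 0, 1]: two batches, then nothing due
```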
Scheduler
The scheduler manages recurring tasks, enqueuing each one when it is due according to its schedule.
```ruby
module SolidQueue::Processes
  module Runnable
    def start
      # Scheduler Lifecycle 1: The scheduler starts by booting.
      boot
      # ...
    end
  end
end
```
```ruby
module SolidQueue
  class Scheduler < Processes::Base
    # Scheduler Lifecycle 2: After booting, schedule the recurring tasks.
    after_boot :schedule_recurring_tasks
    # Scheduler Lifecycle 4: Before shutting down, unschedule the recurring tasks.
    before_shutdown :unschedule_recurring_tasks

    private
      SLEEP_INTERVAL = 60 # Right now it doesn't matter, can be set to 1 in the future for dynamic tasks

      def run
        loop do
          # Scheduler Lifecycle 3: Sleep in a loop until shutting down.
          break if shutting_down?

          interruptible_sleep(SLEEP_INTERVAL)
        end
      ensure
        SolidQueue.instrument(:shutdown_process, process: self) do
          run_callbacks(:shutdown) { shutdown }
        end
      end
  end
end
```
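The run loop above only sleeps; the real work happens in the `schedule_recurring_tasks` callback, which isn't shown here. Whatever mechanism fires the tasks, a scheduler has to answer "when is the next run, and how long until then?" Here is a sketch of that timing math for a hypothetical fixed-interval task (`ToyRecurringTask`); Solid Queue's recurring tasks are configured with schedule expressions rather than a bare number of seconds.

```ruby
# Hypothetical fixed-interval recurring task: only the timing math, no job enqueuing.
class ToyRecurringTask
  attr_reader :key

  def initialize(key, every:)
    @key = key
    @every = every # interval in seconds
  end

  # Next run time strictly after `from`, aligned to multiples of the interval.
  def next_time(from)
    Time.at(((from.to_i / @every) + 1) * @every)
  end

  # Seconds to wait from `now` until `at`; never negative (a late run fires right away).
  def delay_until(at, now)
    [at - now, 0].max.to_f
  end

  # The next `count` run times: each run schedules the one after it.
  def upcoming(from, count)
    at = from
    Array.new(count) { at = next_time(at) }
  end
end

task = ToyRecurringTask.new("cleanup", every: 60)
first = task.next_time(Time.at(125))              # Time.at(180)
wait = task.delay_until(first, Time.at(125))      # 55.0
runs = task.upcoming(Time.at(125), 3).map(&:to_i) # [180, 240, 300]
```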
Commits/PRs that caught my eye
- Allow supervisor to recover after crash #519 - Gradually increase the time between retries when something fails, so you're not hammering the system with rapid restarts and transient issues have time to resolve.
- Wait for a random interval before starting the scheduler's tasks - Loved the clarity of the method name `initial_jitter`.
- Use SKIP LOCKED - I love how this went from having `FOR UPDATE SKIP LOCKED` in each query, to overriding the `lock` method, to renaming it to something new so we don't run into issues where someone else overrides it.
- Implement perform_all_later via enqueue_all #93 - The beauty of open source: now I've learned about `perform_all_later` and how it was implemented in Rails.
- Use Zeitwerk as autoloader instead of explicit requires - I've been updating an app from Rails 6 to Rails 7 and dealing with a lot of Zeitwerk failures. It was nice to see the benefits of introducing it here.
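Two of these ideas are easy to sketch. Below, `backoff_delay` doubles the wait after each failed attempt up to a cap (one common way to "gradually increase" the delay; PR #519 may use a different formula), and `initial_jitter` picks a random delay so processes that start together don't all fire at the same moment. Both are hypothetical helpers, not Solid Queue's code.

```ruby
# Hypothetical helpers, not Solid Queue's code.

# Doubling backoff: 1s, 2s, 4s, 8s, ... capped at `max` seconds.
def backoff_delay(attempt, base: 1.0, max: 60.0)
  [base * (2**attempt), max].min
end

# Random startup delay in [0, max_seconds), so processes booting together spread out.
def initial_jitter(max_seconds, random: Random.new)
  random.rand * max_seconds
end

delays = (0..7).map { |attempt| backoff_delay(attempt) }
# => [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]
jitter = initial_jitter(5, random: Random.new(42))
```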
If I were to do it again
Honestly, I would have skipped the commit-by-commit review. I did learn a ton, but it just took too long. After a full day of coding and reviewing at work, there's only so much your brain can handle. I'm the type of person who finishes what they start, no matter what, but this reminded me of something I once read: it's okay to stop reading a bad book. I tend to push through and painfully finish those books. I'm not saying this was a bad book; it's just that the value I got from going commit by commit probably wasn't worth the time and mental load.
New Plan
- Read README
- Read articles and watch videos on Solid Queue
- Run Solid Queue in an Application
- Debug Gem
- Read a couple of closed PRs
Thanks
I just want to thank Rosa Gutierrez for building this gem. I learned a lot from her simply by reading through the code. Hopefully, others out there will do a deep dive on a gem too.