DEV Community

Albert Jankowski

Diving into the Solid Queue Gem

Earlier this year, I set a goal of doing a deep dive into a Ruby gem. I ended up choosing Solid Queue. My goal wasn't just to understand how the gem works, but to use it as a way to become a better Software Engineer by reading high-quality open source code.

Main Goals

  • Understand how Solid Queue works under the hood
  • Learn from high-quality open source code
  • Understand the architecture behind a job queuing system
  • Improve my ability to write software with clarity by reading more code
  • Inspire others to read open source

Plan

Read README
Get a good understanding of what the gem does and how it works.

Read Articles and Watch Videos
Look for blog posts, guides, or videos on Solid Queue. This helps build context and see how other engineers are using or explaining it.

Go commit by commit (PR by PR)
Start from the very first commit on GitHub and go through the project's history step by step. For each commit and pull request, read the message and discussions to understand what changed and why.

Run Solid Queue in an Application
Set up Solid Queue in a real Rails app to see it in action and get a feel for how it integrates with a typical project.

Debug Gem
Follow the tests or trace through the code to understand the full execution flow.

What is Solid Queue

Solid Queue is a database-backed queuing backend for Active Job. It lets you run code asynchronously, which makes it a good fit for moving slow or heavy operations into the background to keep your app responsive. It is the default Active Job backend for new apps starting in Rails 8, so there will be a lot of eyes on the code. It is designed to be simple, reliable, and easy to deploy (no Redis required).

Killer Features

Solid Queue has many great features: delayed jobs, recurring jobs, pausing queues, prioritization, and bulk enqueuing. In my opinion, these are the killer features that make it stand out:

Heartbeat Check
By default, every process sends a heartbeat every 60 seconds. If a process hasn't updated its heartbeat in over five minutes, it's considered dead. I'm sure most of you have run into situations where a long-running job seemed stuck and you weren't sure whether it was still processing or silently frozen. The heartbeat check gives you great clarity on what is actually running.
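
As a rough illustration of the rule above (not Solid Queue's actual implementation), the liveness check can be sketched in plain Ruby. The ProcessRecord struct, the dead? helper, and the ALIVE_THRESHOLD constant are hypothetical names made up for this sketch; only the five-minute threshold comes from the description above.

```ruby
# Hypothetical stand-in for a registered process row; not a Solid Queue class.
ProcessRecord = Struct.new(:name, :last_heartbeat_at)

ALIVE_THRESHOLD = 5 * 60 # seconds; mirrors the five-minute rule described above

# A process is considered dead when its last heartbeat is older than the threshold.
def dead?(process, now: Time.now)
  now - process.last_heartbeat_at > ALIVE_THRESHOLD
end

now = Time.now
processes = [
  ProcessRecord.new("worker-1", now - 30),     # sent a heartbeat 30 seconds ago
  ProcessRecord.new("worker-2", now - 6 * 60)  # silent for six minutes
]

dead_names = processes.select { |process| dead?(process, now: now) }.map(&:name)
puts dead_names.inspect
```

Only the process that has been silent for longer than the threshold ends up in dead_names.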

Parallelism and Multi-thread Support
Solid Queue workers run jobs in a thread pool, and you can also run several worker processes. Depending on whether your workload is CPU-bound or I/O-bound, you can tune the number of processes or threads to get optimal throughput. For I/O-heavy workloads, increasing threads can significantly improve performance. For CPU-heavy workloads, adding processes is usually what lets you use more cores, since threads in CRuby do not run Ruby code in parallel.
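
To see why threads help with I/O-bound work, here is a small plain-Ruby experiment that does not use Solid Queue at all. The fake_io_job method is a made-up stand-in that sleeps to simulate waiting on a network call; the exact timings will vary by machine.

```ruby
require "benchmark"

# Simulated I/O-bound job: sleeping releases the interpreter lock, much like
# waiting on a network call or a database query would.
def fake_io_job
  sleep 0.05
end

JOBS = 4

# Run the jobs one after another.
sequential = Benchmark.realtime { JOBS.times { fake_io_job } }

# Run the same jobs in one thread each and wait for all of them.
threaded = Benchmark.realtime do
  JOBS.times.map { Thread.new { fake_io_job } }.each(&:join)
end

puts format("sequential: %.2fs, threaded: %.2fs", sequential, threaded)
```

The threaded run should finish in roughly the time of a single job, because the waits overlap. A CPU-bound job would not see the same gain from threads.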

Concurrency Controls
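Concurrency controls let you limit how many jobs of a certain type, or with certain arguments, can run at the same time. In the example below, at most one job per account runs at a time, with the limit held for up to five minutes: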

  limits_concurrency to: 1, key: ->(contact) { contact.account }, duration: 5.minutes

Diving into the Code

I recommend cloning the repo locally and following along.

Job

A job represents background work to be done and is stored and tracked in the database. When a job record is created, an after_create callback checks whether the job is due: if so, it is dispatched right away; otherwise, it is scheduled to run later.

  class Job < Record
    class EnqueueError < StandardError; end

    # Job Lifecycle 1: When MyJob.perform_later(args) is called, the job is enqueued and a Job record is created.
    include Executable, Clearable, Recurrable

    serialize :arguments, coder: JSON
    # ...
  end
module SolidQueue
  class Job
    module Executable
      extend ActiveSupport::Concern

      included do
        # Job Lifecycle 2: After the job is created, it is prepared for execution.
        after_create :prepare_for_execution
      end

      # Job Lifecycle 3: If the job is due, dispatch it right away; otherwise, schedule it for later.
      def prepare_for_execution
        if due? then dispatch
        else
          schedule
        end
      end

      def dispatch
        # Job Lifecycle 4: If we are able to acquire a concurrency lock, we proceed to ready the job for execution.
        if acquire_concurrency_lock then ready
        else
          handle_concurrency_conflict
        end
      end
    end
  end
end
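
The branch in prepare_for_execution can be sketched without Rails. This is a simplified model, not the gem's code: SketchJob is a hypothetical struct, and the due? rule shown here (no scheduled time, or a scheduled time in the past) is my assumption about what "due" means.

```ruby
# Hypothetical in-memory job; Solid Queue's real Job is an Active Record model.
SketchJob = Struct.new(:name, :scheduled_at) do
  # Assumed rule: a job with no scheduled time, or one whose time has passed, is due.
  def due?(now = Time.now)
    scheduled_at.nil? || scheduled_at <= now
  end

  # Mirrors the branch in prepare_for_execution: due jobs are dispatched, others wait.
  def prepare_for_execution(now = Time.now)
    due?(now) ? :dispatched : :scheduled
  end
end

now = Time.now
immediate = SketchJob.new("send_email", nil)
delayed   = SketchJob.new("send_reminder", now + 3600)

puts immediate.prepare_for_execution(now) # due, so it is dispatched
puts delayed.prepare_for_execution(now)   # not due yet, so it is scheduled
```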
module SolidQueue
  class Job
    module ConcurrencyControls
        def acquire_concurrency_lock
          # Job Lifecycle 5 - If our job had something like:  limits_concurrency to: 2, key: ->(contact) { contact.account }, duration: 5.minutes
          # then we would use the key to acquire a lock.
          return true unless concurrency_limited?

          # Job Lifecycle 6 - The method delegates to SolidQueue::Semaphore.wait(self), passing the job instance
          Semaphore.wait(self)
        end
    end
  end
end
module SolidQueue
  class Semaphore < Record
    class Proxy
      def wait
        # Job Lifecycle 7 - Check if a semaphore exists for the job's concurrency key.
        if semaphore = Semaphore.find_by(key: key)
          # Job Lifecycle 8 - Check if the semaphore's value is greater than 0 and attempt to decrement it.
          semaphore.value > 0 && attempt_decrement
        else
          attempt_creation
        end
      end

      private
        def attempt_decrement
          # Job Lifecycle 9 - Decrement the semaphore's value and update its expiration time.
          Semaphore.available.where(key: key).update_all([ "value = value - 1, expires_at = ?", expires_at ]) > 0
        end

        def attempt_increment
          # Job Lifecycle 10 - When the lock is released, increment the semaphore's value (only while it is below the limit) and update its expiration time.
          Semaphore.where(key: key, value: ...limit).update_all([ "value = value + 1, expires_at = ?", expires_at ]) > 0
        end
    end
  end
end
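
To make the wait/decrement/increment flow easier to follow, here is an in-memory sketch of a counting semaphore keyed by a concurrency key. It is illustrative only: SketchSemaphore and its signal method are hypothetical names for this sketch, expiration is left out, and creating the entry with one slot already taken is my assumption about what attempt_creation does. Solid Queue itself persists semaphores as database rows and updates them with SQL, as shown above.

```ruby
# In-memory counting semaphore keyed by a concurrency key (sketch, no expiry).
class SketchSemaphore
  def initialize(limit)
    @limit = limit
    @values = {} # key => remaining slots
    @mutex = Mutex.new
  end

  # Try to take a slot for the key. Returns true when the job may run.
  def wait(key)
    @mutex.synchronize do
      if @values.key?(key)
        return false unless @values[key] > 0
        @values[key] -= 1
      else
        # First job for this key: create the entry with one slot already taken.
        @values[key] = @limit - 1
      end
      true
    end
  end

  # Give a slot back when a job finishes, never exceeding the limit.
  def signal(key)
    @mutex.synchronize do
      return false unless @values.key?(key) && @values[key] < @limit
      @values[key] += 1
      true
    end
  end
end

semaphore = SketchSemaphore.new(1)
first  = semaphore.wait("account/42") # takes the only slot
second = semaphore.wait("account/42") # refused while the first job holds the slot
semaphore.signal("account/42")        # first job finishes and releases the slot
third  = semaphore.wait("account/42") # the slot is free again
puts [first, second, third].inspect
```

With a limit of 1, the second job is refused until the first one signals, which is the behavior the limits_concurrency example earlier asks for.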
module SolidQueue
  class Job
    module Executable
      extend ActiveSupport::Concern
        def ready
          # Job Lifecycle 11 - Create a ReadyExecution record for the job.
          ReadyExecution.create_or_find_by!(job_id: id)
        end
    end
  end
end

Supervisor

The supervisor starts the workers and dispatchers, checks their heartbeats, and stops or restarts them when needed.

module SolidQueue
  class Cli < Thor
    def start
      # Supervisor Lifecycle - 1 - The CLI starts the SolidQueue::Supervisor with options.
      SolidQueue::Supervisor.start(**options.symbolize_keys)
    end
  end
end
module SolidQueue
  class Supervisor < Processes::Base
    include LifecycleHooks
    include Maintenance, Signals, Pidfiled

    after_shutdown :run_exit_hooks

    class << self
      def start(**options)
        # Supervisor Lifecycle - 2 - Sets configuration and initializes the supervisor:
        SolidQueue.supervisor = true
        configuration = Configuration.new(**options)

        if configuration.valid?
          new(configuration).tap(&:start)
        else
          abort configuration.errors.full_messages.join("\n") + "\nExiting..."
        end
      end
    end

    def start
      # Supervisor Lifecycle - 3 - Boots, which runs the boot callbacks
      boot
      # Supervisor Lifecycle - 4 - Runs the start hooks for logging/hooks
      run_start_hooks

      # Supervisor Lifecycle - 5 - Starts the processes to supervise
      start_processes
      # Supervisor Lifecycle - 6 - Launches the maintenance task to prune dead processes
      launch_maintenance_task

      # Supervisor Lifecycle - 7 - Enters the main loop to supervise processes
      supervise
    end

    private
      def start_processes
        # Supervisor Lifecycle - 5.2 - For each configured process, start a process instance.
        configuration.configured_processes.each { |configured_process| start_process(configured_process) }
      end

      def supervise
        loop do
          # Supervisor Lifecycle - 8 - Loop until stopped.
          break if stopped?

          set_procline
          # Supervisor Lifecycle - 9 - Process signal queue to handle signals.
          process_signal_queue

          unless stopped?
            # Supervisor Lifecycle - 10 - Reap terminated forks and replace them.
            reap_and_replace_terminated_forks
            # Supervisor Lifecycle - 11 - Sleep for up to 1 second to avoid busy waiting.
            # `interruptible_sleep` is used instead of `sleep` so the process can wake up early
            # when a signal such as TERM or INT arrives or when a supervised fork terminates.
            interruptible_sleep(1.second)
          end
        end
      ensure
        shutdown
      end

      def start_process(configured_process)
        process_instance = configured_process.instantiate.tap do |instance|
          instance.supervised_by process
          instance.mode = :fork
        end

        # Supervisor Lifecycle - 5.3 - Forks a new process for the configured process.
        pid = fork do
          process_instance.start
        end

        configured_processes[pid] = configured_process
        forks[pid] = process_instance
      end
  end
end
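
The idea behind an interruptible sleep can be sketched with the classic self-pipe technique from the Ruby standard library. This is one possible way to get that behavior, under my own assumptions, and not necessarily how Solid Queue implements interruptible_sleep. InterruptibleSleeper, sleep_for, and wake_up are hypothetical names for this sketch.

```ruby
# Sleeps with a timeout, but can be woken early from another thread.
class InterruptibleSleeper
  def initialize
    @reader, @writer = IO.pipe
  end

  # Sleep for up to `seconds`. Returns true if woken early, false on timeout.
  def sleep_for(seconds)
    ready = IO.select([@reader], nil, nil, seconds)
    return false unless ready

    # Drain the wake-up byte(s) so the next sleep blocks again.
    @reader.read_nonblock(64, exception: false)
    true
  end

  # Wake up whoever is sleeping by making the pipe readable.
  def wake_up
    @writer.write_nonblock(".", exception: false)
  end
end

sleeper = InterruptibleSleeper.new
Thread.new { sleep 0.05; sleeper.wake_up }

started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
woken_early = sleeper.sleep_for(5)
elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started

puts "woken early: #{woken_early}, after #{elapsed.round(2)}s"
```

The sleeper asks for five seconds but returns as soon as another thread calls wake_up, which is what lets a supervising loop react quickly without spinning.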

Worker

Workers are in charge of picking jobs ready to run from queues and processing them.

module SolidQueue
  class Supervisor < Processes::Base
      def start_process(configured_process)
        process_instance = configured_process.instantiate.tap do |instance|
          instance.supervised_by process
          instance.mode = :fork
        end

        pid = fork do
          # Worker Lifecycle 1 - For each process the supervisor will start the worker lifecycle
          process_instance.start
        end

        configured_processes[pid] = configured_process
        forks[pid] = process_instance
      end
  end
end
module SolidQueue
  class Worker < Processes::Poller
    def initialize(**options)
      options = options.dup.with_defaults(SolidQueue::Configuration::WORKER_DEFAULTS)

      # Ensure that the queues array is deep frozen to prevent accidental modification
      @queues = Array(options[:queues]).map(&:freeze).freeze

      # Worker Lifecycle 2 - Set up pool based on the number of threads set.
      @pool = Pool.new(options[:threads], on_idle: -> { wake_up })

      super(**options)
    end
  end
end
module SolidQueue::Processes
  module Runnable
    include Supervised

    def start
      # Worker Lifecycle 3 - We start the worker lifecycle by booting
      boot

      if running_async?
        # Worker Lifecycle 4  - Start running
        @thread = create_thread { run }
      else
        run
      end
    end
  end
end
module SolidQueue::Processes
  class Poller < Base
    include Runnable

    private
      def run
        # Worker Lifecycle 5  - Start loop
        start_loop
      end

      def start_loop
        loop do
          break if shutting_down?

          delay = wrap_in_app_executor do
            # Worker Lifecycle 6  - Start polling
            poll
          end

          interruptible_sleep(delay)
        end
      ensure
        SolidQueue.instrument(:shutdown_process, process: self) do
          run_callbacks(:shutdown) { shutdown }
        end
      end
  end
end
module SolidQueue
  class Worker < Processes::Poller
    private
      def poll
        # Worker Lifecycle 7 - Claim executions from the queues and post them to the thread pool
        claim_executions.then do |executions|
          executions.each do |execution|
            pool.post(execution)
          end

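          # If the pool has idle threads, poll again after polling_interval. Otherwise, sleep for
          # up to 10 minutes; the pool's on_idle callback wakes the worker when a thread frees up.
          pool.idle? ? polling_interval : 10.minutes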
        end
      end
  end
end
module SolidQueue
  class Pool
    def post(execution)
      available_threads.decrement

      Concurrent::Promises.future_on(executor, execution) do |thread_execution|
        wrap_in_app_executor do
          # Worker Lifecycle 8 - Perform the job in the thread pool
          thread_execution.perform
        ensure
          available_threads.increment
          mutex.synchronize { on_idle.try(:call) if idle? }
        end
      end.on_rejection! do |e|
        handle_thread_error(e)
      end
    end
  end
end
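
The pool above relies on Concurrent::Promises from the concurrent-ruby gem. To show the same shape with only the standard library, here is a simplified sketch that swaps in a plain Queue and a fixed set of threads. TinyPool and its methods are hypothetical names, and the bookkeeping (a busy counter plus an on_idle callback) is a loose imitation of the real pool, not a copy of it.

```ruby
# Minimal fixed-size thread pool with an on_idle callback (sketch only).
class TinyPool
  def initialize(size, on_idle: nil)
    @size = size
    @on_idle = on_idle
    @busy = 0
    @mutex = Mutex.new
    @queue = Queue.new
    @threads = Array.new(size) { Thread.new { work_loop } }
  end

  # Hand a task to the pool; it is counted as busy until it finishes.
  def post(&task)
    @mutex.synchronize { @busy += 1 }
    @queue << task
  end

  def idle?
    @mutex.synchronize { @busy.zero? }
  end

  # Let queued tasks finish, then stop every thread.
  def shutdown
    @size.times { @queue << :stop }
    @threads.each(&:join)
  end

  private
    def work_loop
      while (task = @queue.pop) != :stop
        begin
          task.call
        rescue StandardError => e
          warn "task failed: #{e.message}"
        ensure
          finished
        end
      end
    end

    # Mark one task as done and notify the owner if nothing is left running.
    def finished
      now_idle = @mutex.synchronize { (@busy -= 1).zero? }
      @on_idle&.call if now_idle
    end
end

results = Queue.new
idle_signals = Queue.new
pool = TinyPool.new(2, on_idle: -> { idle_signals << :idle })

3.times { |i| pool.post { results << i * 2 } }
pool.shutdown

puts "results: #{results.size}, idle? #{pool.idle?}"
```

In the worker, the on_idle callback is what wakes the polling loop so it can claim more executions as soon as capacity frees up.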

Dispatchers

Dispatchers are in charge of selecting scheduled jobs that have become due and dispatching them.

module SolidQueue::Processes
  module Runnable
    include Supervised

    attr_writer :mode

    def start
      # Dispatcher Lifecycle - 1 - Dispatcher started
      boot
      # ... then run is called, directly or in a new thread (see the Worker section)
    end
  end
end
module SolidQueue::Processes
  class Poller < Base
      def run
        # Dispatcher Lifecycle - 2 -  Polling loop started
        start_loop
      end

      def start_loop
        loop do
          break if shutting_down?

          delay = wrap_in_app_executor do
            # Dispatcher Lifecycle - 3 - Polling loop
            poll
          end

          interruptible_sleep(delay)
        end
      ensure
        SolidQueue.instrument(:shutdown_process, process: self) do
          run_callbacks(:shutdown) { shutdown }
        end
      end
  end
end
module SolidQueue
  class Dispatcher < Processes::Poller
      def poll
        # Dispatcher Lifecycle - 4 - Dispatch the next batch; poll again immediately if any jobs were dispatched
        batch = dispatch_next_batch

        batch.zero? ? polling_interval : 0.seconds
      end

      def dispatch_next_batch
        with_polling_volume do
          # Dispatcher Lifecycle - 5 - Dispatching next batch
          ScheduledExecution.dispatch_next_batch(batch_size)
        end
      end
  end
end
module SolidQueue
  class ScheduledExecution < Execution
    class << self
      def dispatch_next_batch(batch_size)
        transaction do
          # Dispatcher Lifecycle - 6 - Finds the next available jobs and dispatches them
          job_ids = next_batch(batch_size).non_blocking_lock.pluck(:job_id)
          if job_ids.empty? then 0
          else
            SolidQueue.instrument(:dispatch_scheduled, batch_size: batch_size) do |payload|
              payload[:size] = dispatch_jobs(job_ids)
            end
          end
        end
      end
    end
  end
end
module SolidQueue
  class Execution
    module Dispatching
      extend ActiveSupport::Concern

      class_methods do
        def dispatch_jobs(job_ids)
          jobs = Job.where(id: job_ids)

          # Dispatcher Lifecycle - 7 - Dispatches the jobs and then deletes their scheduled executions
          Job.dispatch_all(jobs).map(&:id).then do |dispatched_job_ids|
            where(id: where(job_id: dispatched_job_ids).pluck(:id)).delete_all
          end
        end
      end
    end
  end
end
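
The dispatcher's poll-and-batch behavior can be modeled in memory. This is a sketch under stated assumptions, not the gem's code: ScheduledRow and SketchDispatcher are hypothetical names, the one-second polling interval is an assumed value, and plain arrays stand in for the database tables and row locking.

```ruby
# Hypothetical in-memory rows; Solid Queue stores these in database tables.
ScheduledRow = Struct.new(:job_id, :scheduled_at)

class SketchDispatcher
  POLLING_INTERVAL = 1 # seconds; an assumed value for this sketch

  attr_reader :ready_job_ids

  def initialize(scheduled, batch_size:)
    @scheduled = scheduled
    @batch_size = batch_size
    @ready_job_ids = []
  end

  # Returns the delay before the next poll: none if work was found, so that a
  # backlog drains quickly; otherwise the regular polling interval.
  def poll(now = Time.now)
    dispatch_next_batch(now).zero? ? POLLING_INTERVAL : 0
  end

  private
    # Move up to batch_size due rows from "scheduled" to "ready", oldest first.
    def dispatch_next_batch(now)
      batch = @scheduled.select { |row| row.scheduled_at <= now }
                        .sort_by(&:scheduled_at)
                        .first(@batch_size)
      batch.each do |row|
        @ready_job_ids << row.job_id
        @scheduled.delete(row)
      end
      batch.size
    end
end

now = Time.now
rows = [
  ScheduledRow.new(1, now - 20),
  ScheduledRow.new(2, now - 10),
  ScheduledRow.new(3, now + 60) # not due yet
]
dispatcher = SketchDispatcher.new(rows, batch_size: 1)

delays = 3.times.map { dispatcher.poll(now) }
puts delays.inspect
puts dispatcher.ready_job_ids.inspect
```

The first two polls each find a due job and ask to be called again immediately. The third finds nothing due and falls back to the polling interval, leaving the future job in place.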

Scheduler

Manages recurring tasks.

module SolidQueue::Processes
  module Runnable
    def start
      # Scheduler Lifecycle - 1 - Scheduler started
      boot
      # ... then run is called, directly or in a new thread (see the Worker section)
    end
  end
end
module SolidQueue
  class Scheduler < Processes::Base
    # Scheduler Lifecycle - 2 - Schedule recurring tasks
    after_boot :schedule_recurring_tasks
    # Scheduler Lifecycle - 4 - Unschedule recurring tasks
    before_shutdown :unschedule_recurring_tasks

    private
      SLEEP_INTERVAL = 60 # Right now it doesn't matter, can be set to 1 in the future for dynamic tasks

      def run
        loop do
          # Scheduler Lifecycle - 3 - Schedule loop
          break if shutting_down?

          interruptible_sleep(SLEEP_INTERVAL)
        end
      ensure
        SolidQueue.instrument(:shutdown_process, process: self) do
          run_callbacks(:shutdown) { shutdown }
        end
      end
  end
end

Commits/PRs that caught my eye

If I were to do it again

Honestly, I would have skipped the commit-by-commit review. I did learn a ton, but it just took too long. After a full day of coding and reviewing at work, there's only so much your brain can handle. I'm the type of person who finishes what they start, no matter what, but this reminded me of something I once read: it is okay to stop reading a bad book. I tend to push through and painfully finish those books anyway. I'm not saying this was a bad book; it's just that the value I got from going commit by commit probably wasn't worth the time and mental load.

New Plan

  1. Read README
  2. Read articles and watch videos on Solid Queue
  3. Run Solid Queue in an Application
  4. Debug Gem
  5. Read a couple of closed PRs

Thanks

I just want to thank Rosa Gutierrez for building this gem. I learned a lot from her simply by reading through the code. Hopefully, others out there will do a deep dive into a gem too.
