
Chad Burggraf

Originally published at assetbots.com

How to Schedule Delayed Jobs on a Specific Queue with Hangfire

We use Hangfire at Assetbots to manage and coordinate all our background processing and event handling. While Hangfire comes with a lot of great features out of the box, it lacks the ability to schedule delayed jobs on a specific queue. Luckily for us, Hangfire’s architecture is extremely simple and extensible, so with just a little bit of custom code we can implement this feature ourselves.

Jobs as State Machines

At its core, Hangfire treats jobs as individual state machines. There are a number of built-in states that the system ships with, along with their corresponding handlers. Each state’s handler is responsible for transitioning jobs to and from that state in storage.

For example, the EnqueuedState handler adds jobs to their corresponding queue in storage. Compare that to the ScheduledState handler, which sets a timestamp on a custom scheduled metadata key in storage that indicates when the job should be enqueued.
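
To make the idea of states as plain data concrete, here is a minimal sketch that constructs the two built-in states mentioned above and prints what they carry. It touches no storage. The class name StateDataDemo and the queue name "critical" are assumptions made for illustration, not part of Hangfire or of our code base.

```csharp
using System;
using Hangfire.States;

// Hypothetical demo class, for illustration only.
public static class StateDataDemo
{
    public static void Main()
    {
        // An EnqueuedState carries the name of the queue the job should be placed on.
        var enqueued = new EnqueuedState("critical");

        // A ScheduledState is built from a time only; this constructor takes no queue.
        var scheduled = new ScheduledState(TimeSpan.FromMinutes(5));

        Console.WriteLine($"{enqueued.Name}: queue = {enqueued.Queue}");
        Console.WriteLine($"{scheduled.Name}: enqueue at = {scheduled.EnqueueAt:O}");

        // SerializeData() returns the key/value pairs recorded alongside the state.
        foreach (var pair in scheduled.SerializeData())
        {
            Console.WriteLine($"  {pair.Key} = {pair.Value}");
        }
    }
}
```

The point to notice is that the scheduled state is created from nothing but a point in time, which is the gap the rest of this post works around.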

Job Filters as State Machine Middleware

Another core feature of Hangfire’s architecture is the chain-of-responsibility pipeline. This processing pipeline has a number of stages that can be intercepted using job filters. Each filter can operate on and change the job’s behavior at that point in the pipeline. There can be multiple filters applied, each operating independently, and each applied at different levels of granularity (e.g., at the job or method level, at the class level, or system-wide).

Using filters, you can extend Hangfire to implement things like logging each state transition or reporting unhandled errors to Bugsnag or Sentry.
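
As a rough illustration of what such a filter looks like, the sketch below logs every state transition, loosely following the logging example in the Hangfire documentation. The attribute name LogStateTransitionsAttribute and the console output format are assumptions made for this example.

```csharp
using System;
using Hangfire.Common;
using Hangfire.States;
using Hangfire.Storage;

// Hypothetical filter. Deriving from JobFilterAttribute lets it be applied as an
// attribute on a class or method, or registered globally.
public sealed class LogStateTransitionsAttribute : JobFilterAttribute, IApplyStateFilter
{
    public void OnStateApplied(ApplyStateContext context, IWriteOnlyTransaction transaction)
    {
        // OldStateName is null for a job that has just been created.
        string from = context.OldStateName ?? "(none)";
        string to = context.NewState.Name;

        Console.WriteLine($"Job {context.BackgroundJob.Id}: {from} -> {to}");
    }

    public void OnStateUnapplied(ApplyStateContext context, IWriteOnlyTransaction transaction)
    {
        // Nothing to log when the previous state is being removed.
    }
}
```

Registered with GlobalJobFilters.Filters.Add(new LogStateTransitionsAttribute()), the filter applies system-wide; placed as [LogStateTransitions] on a class or method, it applies only there.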

Using States + Filters to Schedule Jobs on a Specific Queue

These two simple primitives, states and filters, open up a lot of possibilities. Here, we'll combine a custom job state with a custom queue filter so that we can schedule delayed jobs on the queue of our choice.

We need to accomplish two things:

  1. Add a custom piece of metadata to the scheduled job indicating which queue it should be placed on when it transitions to the EnqueuedState.
  2. Read that queue metadata when the job transitions from the ScheduledState to the EnqueuedState, and use it to enqueue the job on the correct queue.

ScheduledQueueState

Hangfire ships with an unsealed state class, ScheduledState, representing delayed (scheduled) jobs. And since states in Hangfire are persisted as JSON using Json.NET, we can simply extend this class and add a custom property to it. This property will be serialized and deserialized by the default JSON serialization infrastructure.

using System;
using Hangfire.States;
using Newtonsoft.Json;

public sealed class ScheduledQueueState : ScheduledState
{
        public ScheduledQueueState(TimeSpan enqueueIn)
                : this(DateTime.UtcNow.Add(enqueueIn), null)
        {
        }

        public ScheduledQueueState(DateTime enqueueAt)
                : this(enqueueAt, null)
        {
        }

        [JsonConstructor]
        public ScheduledQueueState(DateTime enqueueAt, string queue)
                : base(enqueueAt)
        {
                this.Queue = queue?.Trim();
        }

        public string Queue { get; }
}

This class is extremely straightforward: extend ScheduledState and add a Queue property while maintaining JSON serialization compatibility. Next, we need to take advantage of this new property when moving into the EnqueuedState.

QueueFilter

Our QueueFilter class will do two things:

  1. Watch for jobs being created in either the EnqueuedState or the ScheduledQueueState and grab their Queue property to store as a custom job parameter, and
  2. Use our custom Queue job parameter when transitioning to (or electing) the EnqueuedState.

using System;
using Hangfire.Client;
using Hangfire.States;

public sealed class QueueFilter : IClientFilter, IElectStateFilter
{
        public const string QueueParameterName = "Queue";

        public void OnCreated(CreatedContext filterContext)
        {
        }

        public void OnCreating(CreatingContext filterContext)
        {
                string queue = null;

                switch (filterContext.InitialState)
                {
                        case EnqueuedState es:
                                queue = es.Queue;
                                break;
                        case ScheduledQueueState sqs:
                                queue = sqs.Queue;
                                break;
                        default:
                                break;
                }

                if (!string.IsNullOrWhiteSpace(queue))
                {
                        filterContext.SetJobParameter(QueueFilter.QueueParameterName, queue);
                }
        }

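        public void OnStateElection(ElectStateContext context)
        {
                // Only intervene when the job is about to be enqueued.
                if (context.CandidateState.Name == EnqueuedState.StateName)
                {
                        string queue = context.GetJobParameter<string>(QueueFilter.QueueParameterName)?.Trim();

                        // Fall back to the default queue when no queue was recorded at creation time.
                        if (string.IsNullOrWhiteSpace(queue))
                        {
                                queue = EnqueuedState.DefaultQueue;
                        }

                        // Swap in a state that targets our queue, keeping the original reason.
                        context.CandidateState = new EnqueuedState(queue)
                        {
                                Reason = context.CandidateState.Reason
                        };
                }
        }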
}

Again, this class is pretty simple. We’re using C#’s pattern matching to check if the job is being created with either of the two states we care about. If it is, we are setting a custom parameter that will travel with the job through the pipeline.

Then, when the system is transitioning the job to a new state, we check to see if the state being transitioned to is the EnqueuedState. If it is, we look for our previously set custom parameter and use it to create a new version of the EnqueuedState to transition into instead.

Putting the Pieces Together

Now that we have all the code we need in place, how do we actually wire up the filter and take advantage of our new ScheduledQueueState? First, we need to register our QueueFilter into Hangfire’s processing pipeline:

services.AddHangfire(configuration: (provider, config) =>
{
        // The first lambda argument is the IServiceProvider, not the service collection.
        config.UseFilter(new QueueFilter());
});

Finally, we can create extension methods corresponding to the Schedule overloads that we need. For example:

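using System;
using System.Linq.Expressions;
using Hangfire;
using Hangfire.Annotations;

public static class QueueBackgroundJobClientExtensions
{
        public static string Schedule(
                [NotNull] this IBackgroundJobClient client,
                [NotNull, InstantHandle] Expression<Action> methodCall,
                TimeSpan delay,
                string queue)
        {
                if (client == null)
                {
                        throw new ArgumentNullException(nameof(client));
                }

                // ScheduledQueueState has no (TimeSpan, string) constructor, so resolve the delay to a UTC timestamp first.
                return client.Create(methodCall, new ScheduledQueueState(DateTime.UtcNow.Add(delay), queue));
        }
}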

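The extension above covers method calls expressed as an Expression<Action>, that is, calls that don't return a Task and don't take the job's target instance as a lambda parameter. With it, we can schedule a delayed job on a queue named "scheduled" as follows: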

// Acquire a reference to IBackgroundJobClient via dependency injection.
private IBackgroundJobClient client;    

// When you want to schedule your job.
this.client.Schedule(
        () => Console.WriteLine("Hello, world!"),
        TimeSpan.FromDays(1),
        "scheduled");
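
One thing to keep in mind: a job enqueued on the "scheduled" queue is only picked up by a server that listens to that queue, and by default a server listens only to "default". Below is a minimal sketch of that configuration, assuming the ASP.NET Core integration. The HangfireServerSetup class and its method names are made up for this example.

```csharp
using Hangfire;
using Hangfire.States;
using Microsoft.Extensions.DependencyInjection;

// Hypothetical helper class, for illustration only.
public static class HangfireServerSetup
{
    // Makes the server listen to our custom queue in addition to the default one.
    public static void ConfigureQueues(BackgroundJobServerOptions options)
    {
        options.Queues = new[] { "scheduled", EnqueuedState.DefaultQueue };
    }

    // Registers a background job server that uses the queue list above.
    public static IServiceCollection AddQueueAwareServer(this IServiceCollection services)
    {
        return services.AddHangfireServer(options => ConfigureQueues(options));
    }
}
```

With this in place, calling services.AddQueueAwareServer() next to services.AddHangfire(...) should be enough for jobs on the "scheduled" queue to be processed.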

From here, adding any other overloads you need is straightforward.
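
As a sketch of what two such overloads might look like, the block below adds one for instance methods on a type T and one for Task-returning calls. It assumes the ScheduledQueueState class from earlier in this post is compiled alongside it. The class name QueueScheduleOverloads is an assumption made for this example.

```csharp
using System;
using System.Linq.Expressions;
using System.Threading.Tasks;
using Hangfire;

// Hypothetical extension class; relies on ScheduledQueueState defined earlier in this post.
public static class QueueScheduleOverloads
{
    // For instance methods on a type Hangfire activates for us, e.g. x => x.Send("hi").
    public static string Schedule<T>(
        this IBackgroundJobClient client,
        Expression<Action<T>> methodCall,
        TimeSpan delay,
        string queue)
    {
        if (client == null)
        {
            throw new ArgumentNullException(nameof(client));
        }

        return client.Create(methodCall, new ScheduledQueueState(DateTime.UtcNow.Add(delay), queue));
    }

    // For static or captured calls that return a Task.
    public static string Schedule(
        this IBackgroundJobClient client,
        Expression<Func<Task>> methodCall,
        TimeSpan delay,
        string queue)
    {
        if (client == null)
        {
            throw new ArgumentNullException(nameof(client));
        }

        return client.Create(methodCall, new ScheduledQueueState(DateTime.UtcNow.Add(delay), queue));
    }
}
```

Both overloads defer to the Create extension methods that Hangfire already provides for the corresponding expression types, so the only custom part remains the state we pass in.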

Wrapping Up

We have been happy with Hangfire’s combination of easy setup and rich extensibility while building Assetbots. It has become an important part of our internal infrastructure as we scale our background processing needs.
