How to scale Hangfire with docker

Mohsen Esmailpour
I'm a software developer with several years of experience in developing software mainly working on web platforms with Microsoft .NET stack.
Updated on ・6 min read

Hangfire is an open-source framework that helps you create, process, and manage background jobs. It offers an easy way to perform fire-and-forget, delayed, and recurring jobs inside .NET applications. This post does not cover the basics of Hangfire, so read this article first if you need an introduction.

This post covers:

  • How to configure Hangfire to have multiple queues
  • How to configure the worker count
  • How to scale the Hangfire worker service

I'm going to implement a sample project with three parts: a producer service that produces messages and exposes them via a web API; a consumer service that periodically fetches messages from the producer service and enqueues them into Hangfire; and a worker service that processes the messages and can be scaled out. Each message has a different processing priority, so we enqueue each message into a different queue based on its priority. You can skip steps 1-10 if you already have experience with Hangfire.

Producer Service

Imagine you need to call an external service, fetch a substantial number of messages from it, and queue those messages for processing. Our producer service mimics that external service by providing the messages.

  • Step 1 - Create a class library project and name it JobQueue.Shared
  • Step 2 - Add Message model class to the class library project
public class MessageModel
{
    public Guid MessageId { get; set; }

    public DateTime CreateDate { get; set; }

    public string Category { get; set; }

    public object Payload { get; set; }
}
  • Step 3 - Create ASP.NET Core Web API project and name it JobQueue.ProducerService
  • Step 4 - Reference shared class library project to producer service project

  • Step 5 - Create message producer

Let's create a message producer class that generates random messages. I used Bogus, an excellent library for generating fake data for testing purposes.

internal class MessageGenerator
{
    private static readonly string[] Categories = { "express", "normal" };
    private static readonly Faker<MessageModel> Faker;

    static MessageGenerator()
    {
        var random = new Random();

        Faker = new Faker<MessageModel>()
            .StrictMode(false)
            .RuleFor(p => p.Category, f => f.PickRandom(Categories))
            .RuleFor(p => p.MessageId, f => f.Random.Guid())
            .RuleFor(p => p.CreateDate, f => f.Date.Between(DateTime.Now.AddSeconds(-random.Next(1, 5)), DateTime.Now));
    }

    public static IEnumerable<MessageModel> GenerateMessages()
    {
        return Faker.Generate(100);
    }
}

Let's save the generated messages in a message store.

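internal class MessageStore
{
    private readonly List<MessageModel> _store = new();
    private readonly object _lock = new();
    private static readonly MessageStore _instance = new();

    private MessageStore()
    {
    }

    public static MessageStore Instance => _instance;

    public int Count
    {
        get { lock (_lock) { return _store.Count; } }
    }

    // The timer thread adds messages while request threads remove them,
    // and List<T> is not thread-safe, so every access takes the same lock.
    public void AddMessages(IEnumerable<MessageModel> messages)
    {
        lock (_lock)
        {
            _store.AddRange(messages);
        }
    }

    public IEnumerable<MessageModel> GetMessages(int count)
    {
        lock (_lock)
        {
            var messages = _store.Take(count).ToList();
            _store.RemoveRange(0, messages.Count);

            return messages;
        }
    }
}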

Let's create a background service that periodically generates messages and saves them into the message store. I used a hosted service in ASP.NET Core for this.

internal class MessageProducerHostedService : IHostedService, IDisposable
{
    private Timer _timer;

    public Task StartAsync(CancellationToken cancellationToken)
    {
        _timer = new Timer(SeedData, null, TimeSpan.Zero, TimeSpan.FromSeconds(1));

        return Task.CompletedTask;
    }

    public Task StopAsync(CancellationToken cancellationToken)
    {
        _timer?.Change(Timeout.Infinite, 0);

        return Task.CompletedTask;
    }

    public void Dispose()
    {
        _timer?.Dispose();
    }

    private void SeedData(object state)
    {
        if (MessageStore.Instance.Count > 2000)
            return;

        var messages = MessageGenerator.GenerateMessages();
        MessageStore.Instance.AddMessages(messages);
    }
}
  • Step 6 - Create an API to expose messages
[ApiController]
[Route("api/v1/[controller]")]
public class MessagesController : ControllerBase
{
    [HttpGet]
    public IEnumerable<MessageModel> Get()
    {
        return MessageStore.Instance.GetMessages(new Random().Next(50, 200));
    }
}

Consumer Service

  • Step 7 - Create ASP.NET Core Web API project and name it JobQueue.ConsumerService

  • Step 8 - Install the Hangfire.AspNetCore and HangFire.Redis.StackExchange NuGet packages

  • Step 9 - Implement a background service that periodically fetches messages from the producer service and enqueues them into a queue

public class MessageReceiverHostedService : IHostedService
{
    private readonly CancellationTokenSource _cts;
    private readonly IServiceProvider _serviceProvider;

    public MessageReceiverHostedService(IServiceProvider serviceProvider, ILogger<MessageReceiverHostedService> logger)
    {
        _serviceProvider = serviceProvider;
        _cts = new CancellationTokenSource();
    }

    public async Task StartAsync(CancellationToken cancellationToken)
    {
        await Task.Factory.StartNew(() => FetchMessagesAsync(_cts.Token), cancellationToken);
    }

    public Task StopAsync(CancellationToken cancellationToken)
    {
        _cts.Cancel();

        return Task.CompletedTask;
    }

    private async Task FetchMessagesAsync(CancellationToken cancellationToken)
    {
        try
        {
            while (!cancellationToken.IsCancellationRequested)
            {
                using var scope = _serviceProvider.CreateScope();
                var httpClient = scope.ServiceProvider.GetRequiredService<JobHttpClient>();
                var messages = await httpClient.GetJobMessagesAsync(cancellationToken);

                // Each category maps to its own Hangfire queue.
                var categories = messages.GroupBy(m => m.Category).ToList();

                Parallel.ForEach(categories, category =>
                {
                    Enqueue(category.Key, category.ToList());
                });

                // Always wait before the next poll, even when nothing came back,
                // so an empty producer does not cause a tight request loop.
                await Task.Delay(TimeSpan.FromSeconds(5), cancellationToken);
            }
        }
        catch (OperationCanceledException)
        {
            // Expected when the host is shutting down.
        }
    }

    private void Enqueue(string queueName, List<MessageModel> messages)
    {
        var client = new BackgroundJobClient();
        var state = new EnqueuedState(queueName);

        foreach (var message in messages.OrderBy(o => o.CreateDate))
        {
            Expression<Action> action = queueName == "express"
                ? () => MessageProcessor.ProcessExpressMessageAsync(message, message.MessageId)
                : () => MessageProcessor.ProcessNormalMessageAsync(message, message.MessageId);
            client.Create(action, state);
        }
    }
}

Each message has a Category property that identifies its priority. We have two categories, express and normal, and express has the higher priority. We need two queues, one for each category.
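The routing step can be looked at in isolation, without Hangfire. The following is a minimal sketch of the grouping and ordering that the hosted service and its Enqueue method perform; RoutedMessage and MessageRouting are hypothetical names introduced only for this illustration, and RoutedMessage stands in for MessageModel.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for MessageModel with only the fields used for routing.
public record RoutedMessage(Guid MessageId, DateTime CreateDate, string Category);

public static class MessageRouting
{
    // Groups messages by category (which doubles as the queue name) and orders
    // each group oldest first, the order in which the consumer enqueues them.
    public static Dictionary<string, List<RoutedMessage>> GroupByQueue(IEnumerable<RoutedMessage> messages)
    {
        return messages
            .GroupBy(m => m.Category)
            .ToDictionary(g => g.Key, g => g.OrderBy(m => m.CreateDate).ToList());
    }
}
```

Each key of the returned dictionary corresponds to one queue, and each list is the batch that would be enqueued into it.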

One more thing I want to mention is that I didn't use a timer in this background service. Read this article to find out more about this choice.
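The hosted service resolves a JobHttpClient that is not listed in this post. Below is a rough sketch of what such a typed client might look like; the class body is an assumption, not the author's implementation. The request path is inferred from the MessagesController route, and MessageModel is repeated here only so that the sketch compiles on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Net.Http.Json;
using System.Threading;
using System.Threading.Tasks;

// Minimal copy of the shared model so the sketch is self-contained.
public class MessageModel
{
    public Guid MessageId { get; set; }
    public DateTime CreateDate { get; set; }
    public string Category { get; set; }
    public object Payload { get; set; }
}

// Hypothetical typed client; the real one may differ.
public class JobHttpClient
{
    private readonly HttpClient _httpClient;

    public JobHttpClient(HttpClient httpClient)
    {
        _httpClient = httpClient;
    }

    public async Task<List<MessageModel>> GetJobMessagesAsync(CancellationToken cancellationToken)
    {
        // Path assumed from [Route("api/v1/[controller]")] on MessagesController.
        var messages = await _httpClient.GetFromJsonAsync<List<MessageModel>>(
            "api/v1/messages", cancellationToken);

        // Return an empty list instead of null so callers can enumerate safely.
        return messages ?? new List<MessageModel>();
    }
}
```

A client like this would typically be registered with services.AddHttpClient<JobHttpClient>(...), with the base address read from the JobApi:BaseAddress configuration key that the docker-compose file sets through JobApi__BaseAddress.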

  • Step 8 - Add new class MessageProcessor to JobQueue.Shared project
public class MessageProcessor
{
    [Queue("express")]
    [DisplayName("JobId: {1}")]
    [AutomaticRetry(Attempts = 3)]
    public static async Task ProcessExpressMessageAsync(MessageModel message, Guid messageId)
    {
        await Task.Delay(TimeSpan.FromSeconds(new Random().Next(1, 4)));
    }

    [Queue("normal")]
    [DisplayName("JobId: {1}")]
    [AutomaticRetry(Attempts = 3)]
    public static async Task ProcessNormalMessageAsync(MessageModel message, Guid messageId)
    {
        await Task.Delay(TimeSpan.FromSeconds(new Random().Next(1, 4)));
    }
}

We have two methods, one for processing messages from each queue. One limitation of Hangfire is that a single method cannot process jobs from multiple queues (at least, I'm not aware of a built-in way, although it can be done by implementing a custom Queue attribute).

  • Step 9 - Reference shared class library project to consumer service project

  • Step 10 - Add Hangfire dashboard to consumer service
    Hangfire has a dashboard that enables you to monitor the jobs and their statuses. It also allows you to manually trigger available jobs.
    Open Startup.cs class and add Hangfire dependencies to ConfigureServices method:

public void ConfigureServices(IServiceCollection services)
{
    services.AddControllers();
    services.AddHostedService<MessageReceiverHostedService>();

    services.AddHangfire(configuration => configuration
        .SetDataCompatibilityLevel(CompatibilityLevel.Version_170)
        .UseSimpleAssemblyNameTypeSerializer()
        .UseRecommendedSerializerSettings()
        .UseRedisStorage(_redis));
}

And in the Configure method:

public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
{
    ...

    app.UseEndpoints(endpoints =>
    {
        endpoints.MapDefaultControllerRoute();
        endpoints.MapHangfireDashboard();
    });
}

Run the consumer project, which hosts the dashboard, and navigate to <localhost>/hangfire to view it.

Worker Service

  • Step 11 - Create Worker Service project and name it JobQueue.WorkerService
  • Step 12 - Open the appsettings.json file and add the following configuration:
"Hangfire": [
  {
    "QueueName": "express",
    "WorkerCount": 5
  },
  {
    "QueueName": "normal",
    "WorkerCount": 2
  }
],

Background jobs are processed by a dedicated pool of worker threads that run inside the Hangfire server subsystem. For each queue, we can configure the number of workers.

  • Step 13 - Add new class HangfireQueueSetting to bind configuration:
internal class HangfireQueueSetting
{
    public string QueueName { get; set; }

    public int WorkerCount { get; set; }
}
  • Step 14 - Install the Hangfire.AspNetCore and HangFire.Redis.StackExchange NuGet packages
  • Step 15 - Open Program.cs class and add Hangfire dependencies
public class Program
{
    private static ConnectionMultiplexer _redis;

    public static void Main(string[] args)
    {
        CreateHostBuilder(args).Build().Run();
    }

    public static IHostBuilder CreateHostBuilder(string[] args) =>
        Host.CreateDefaultBuilder(args)
            .ConfigureServices((hostContext, services) =>
            {
                _redis = ConnectionMultiplexer.Connect(hostContext.Configuration.GetConnectionString("RedisConnection"));

                services.AddHangfire(configuration => configuration
                    .SetDataCompatibilityLevel(CompatibilityLevel.Version_170)
                    .UseSimpleAssemblyNameTypeSerializer()
                    .UseRecommendedSerializerSettings()
                    .UseRedisStorage(_redis));

                var queueSettings = hostContext.Configuration.GetSection("Hangfire").Get<List<HangfireQueueSetting>>();
                foreach (var setting in queueSettings)
                {
                    services.AddHangfireServer(options =>
                    {
                        options.ServerName = $"{Environment.MachineName}:{setting.QueueName}";
                        options.Queues = new[] { setting.QueueName };
                        options.WorkerCount = setting.WorkerCount;
                    });
                }
            });
}
  • Step 16 - Reference the shared class library project from the worker service project

Now run all projects and navigate to the Hangfire dashboard, where you can see the jobs being processed.

Click on the Servers tab and you can see that we have two queues with different worker counts.

Dockerizing Services

version: '3.4'

networks:
  service_network:

services:
  redis:
    image: "redis"
    ports:
      - 6379:6379
    networks:
      - service_network

  consumerservice:
    image: ${DOCKER_REGISTRY-}jobqueueconsumerservice
    container_name: consumerservice
    ports:
      - 9000:80
    networks:
      - service_network
    build:
      context: .
      dockerfile: JobQueue.ConsumerService/Dockerfile
    environment:
      - ConnectionStrings__RedisConnection=redis:6379
      - JobApi__BaseAddress=http://producerservice

  producerservice:
    image: ${DOCKER_REGISTRY-}jobqueueproducerservice
    container_name: producerservice
    build:
      context: .
      dockerfile: JobQueue.ProducerService/Dockerfile
    networks:
      - service_network

  workerservice:
    image: ${DOCKER_REGISTRY-}workerservice
    networks:
      - service_network
    build:
      context: .
      dockerfile: JobQueue.WorkerService/Dockerfile
    environment:
      - ConnectionStrings__RedisConnection=redis:6379
      - Hangfire__0__WorkerCount=10
      - Hangfire__1__WorkerCount=5

We can configure the worker count via the docker-compose file by passing values through environment variables:

Hangfire__0__WorkerCount=10
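The double underscore in the variable name is how .NET configuration maps environment variables onto hierarchical keys: Hangfire__0__WorkerCount becomes Hangfire:0:WorkerCount, the WorkerCount of the first entry in the Hangfire array. Providers added later override earlier ones, so the environment value replaces the one in appsettings.json. The sketch below illustrates this layering under a simplifying assumption: two in-memory collections stand in for the JSON file and the environment variables, and QueueSettingsDemo is a hypothetical name.

```csharp
using System;
using System.Collections.Generic;
using Microsoft.Extensions.Configuration;

public class HangfireQueueSetting
{
    public string QueueName { get; set; }
    public int WorkerCount { get; set; }
}

public static class QueueSettingsDemo
{
    public static List<HangfireQueueSetting> Load()
    {
        // Stand-in for the values in appsettings.json.
        var fileValues = new Dictionary<string, string>
        {
            ["Hangfire:0:QueueName"] = "express",
            ["Hangfire:0:WorkerCount"] = "5",
            ["Hangfire:1:QueueName"] = "normal",
            ["Hangfire:1:WorkerCount"] = "2",
        };

        // Stand-in for the docker-compose environment variables;
        // Hangfire__0__WorkerCount is read as the key Hangfire:0:WorkerCount.
        var environmentValues = new Dictionary<string, string>
        {
            ["Hangfire:0:WorkerCount"] = "10",
            ["Hangfire:1:WorkerCount"] = "5",
        };

        var configuration = new ConfigurationBuilder()
            .AddInMemoryCollection(fileValues)
            .AddInMemoryCollection(environmentValues) // added last, so it wins
            .Build();

        return configuration.GetSection("Hangfire").Get<List<HangfireQueueSetting>>();
    }
}
```

The queue names still come from the first layer, while the worker counts come from the second, which mirrors what happens when the worker container starts with the environment section shown above.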

Let's run projects via docker-compose:

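  • run docker-compose build
  • run docker-compose up -d
  • run docker-compose scale workerservice=2 (on newer Compose versions, where the scale command is deprecated, use docker-compose up -d --scale workerservice=2 instead)
  • To access the job dashboard, enter http://localhost:9000 address in the browser (the dashboard itself is served under /hangfire)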
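As a rough capacity check: each replica of workerservice starts one Hangfire server per entry in the Hangfire configuration section, so the total number of concurrent workers for a queue is the per-container worker count multiplied by the number of replicas. A small sketch of that arithmetic follows; WorkerCapacity is a hypothetical helper, not part of the sample project.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class WorkerCapacity
{
    // Returns, per queue, the number of concurrent workers across all replicas.
    public static Dictionary<string, int> TotalWorkers(
        IReadOnlyDictionary<string, int> workersPerContainer, int replicas)
    {
        if (replicas < 0)
            throw new ArgumentOutOfRangeException(nameof(replicas));

        return workersPerContainer.ToDictionary(kv => kv.Key, kv => kv.Value * replicas);
    }
}
```

With the compose values above (10 workers for express, 5 for normal) and two replicas, this gives 20 express workers and 10 normal workers in total.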

You can find the source code for this walkthrough on Github.

Discussion (2)

Jeb

Thanks for sharing.
there is a typo "Hangfir" => "Hangfire"

Mohsen Esmailpour Author

Thank you, I fixed the typo.