DEV Community

Spyros Ponaris

Tutorial: Building a .NET 9 Console App with Hangfire and Channels

Overview

In this tutorial, you will build a .NET 9 console application that:

  • Polls data from an API (simulated here)
  • Uses System.Threading.Channels for producer-consumer messaging
  • Runs background tasks using hosted services
  • Enqueues jobs in Hangfire using in-memory storage

Why Use System.Threading.Channels

Traditional producer-consumer patterns often rely on BlockingCollection or custom queues, which can introduce:

  • Thread contention
  • Blocking calls
  • Complex synchronization

System.Threading.Channels provides:

  • Asynchronous, non-blocking communication between producers and consumers
  • Built-in backpressure when using bounded channels
  • High performance with low allocation

In this app:

  • DataPollingService is the producer: it writes work items into the channel
  • ChannelConsumerService is the consumer: it reads items and enqueues Hangfire jobs

Why Hosted Services

Hosted services (typically classes derived from BackgroundService) integrate with the .NET Generic Host. You get:

  • Graceful lifecycle management (coordinated start and stop)
  • Dependency injection
  • Logging and configuration
  • Non-blocking background loops

This app uses two hosted services:

  • DataPollingService simulates fetching data and writes work items to the channel
  • ChannelConsumerService reads from the channel and enqueues Hangfire jobs

What is BackgroundService

BackgroundService is an abstract base class that implements IHostedService and exposes ExecuteAsync, where you put your long-running loop. It plugs into the Generic Host, so you get DI, logging, and coordinated shutdown.

Use cases

  • Poll APIs or message brokers
  • Drain a queue or channel
  • Run internal maintenance tasks

Key points

  • Use ExecuteAsync(CancellationToken) as your main loop
  • Respect the cancellation token for fast shutdown
  • Prefer PeriodicTimer over Task.Delay in recurring loops
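
The loop shape these points describe can be sketched with only the base class library, so it runs without the Generic Host. RunLoopAsync and the 50 ms interval are illustrative assumptions, not part of the tutorial; in a real service the loop body would live in ExecuteAsync and the token would be stoppingToken.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: runs `work` once per tick until the token is cancelled
// and returns the number of completed iterations.
static async Task<int> RunLoopAsync(
    TimeSpan interval, Func<CancellationToken, Task> work, CancellationToken ct)
{
    var iterations = 0;
    using var timer = new PeriodicTimer(interval); // disposed when the loop exits
    try
    {
        // The wait observes the token, so shutdown does not have to wait for a tick.
        while (await timer.WaitForNextTickAsync(ct))
        {
            await work(ct);
            iterations++;
        }
    }
    catch (OperationCanceledException)
    {
        // Expected when the token is cancelled during shutdown.
    }
    return iterations;
}

using var cts = new CancellationTokenSource();
var ticks = 0;
var completed = await RunLoopAsync(TimeSpan.FromMilliseconds(50), _ =>
{
    if (++ticks == 3) cts.Cancel(); // simulate a shutdown request on the third tick
    return Task.CompletedTask;
}, cts.Token);

Console.WriteLine($"Loop ran {completed} times before cancellation.");
```

Unlike a Task.Delay at the end of each iteration, PeriodicTimer keeps the tick cadence independent of how long the work takes, which is the usual reason to prefer it for polling.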

What is Hangfire

Hangfire is a background job processor for .NET. You enqueue a method call; Hangfire persists it and executes it on worker threads. It supports fire-and-forget, delayed, recurring, and continuation jobs, plus batches in Hangfire Pro.

Why it helps here

  • The consumer stays lightweight: it only enqueues work
  • Failed jobs are retried automatically, and jobs can be inspected in the Hangfire dashboard if you host one (this console app does not)
  • You can swap storage later for durability

Common job types

  • Fire and forget: BackgroundJob.Enqueue(() => DoWork())
  • Delayed: BackgroundJob.Schedule(() => DoWork(), TimeSpan.FromMinutes(5))
  • Recurring: RecurringJob.AddOrUpdate("nightly", () => DoWork(), "0 2 * * *")

Note: in-memory storage is fine for demos but not for production, because queued jobs are lost when the process exits. Jobs should be idempotent, since Hangfire can retry them.
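
One way to picture idempotency is a guard keyed on the payload ID, so a retried job becomes a no-op. This is a minimal sketch under stated assumptions: ProcessOnce, the in-memory ledger, and the payload IDs are hypothetical, and a real job would keep the ledger in durable storage (for example behind a unique key in a database).

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical in-memory ledger of payload IDs that were already handled.
var processed = new ConcurrentDictionary<string, DateTimeOffset>();
var sideEffects = 0;

// Returns true when the work ran, false when the call was a duplicate.
bool ProcessOnce(string payloadId)
{
    // TryAdd is atomic: only the first caller for a given ID gets through.
    if (!processed.TryAdd(payloadId, DateTimeOffset.UtcNow))
        return false;

    sideEffects++; // stands in for the real work, such as calling an API
    return true;
}

var first = ProcessOnce("payload-1");
var retry = ProcessOnce("payload-1"); // simulates a retry of the same job
var other = ProcessOnce("payload-2");

Console.WriteLine($"first={first}, retry={retry}, other={other}, sideEffects={sideEffects}");
// prints "first=True, retry=False, other=True, sideEffects=2"
```

The sketch marks the ID before doing the work, so a failure halfway through would leave the payload marked as done. A production version would typically record completion in the same transaction as the work itself.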


What is System.Threading.Channels

Channels are high performance, async friendly, in process queues. You get a ChannelWriter<T> for producers and a ChannelReader<T> for consumers. Channels work with async and IAsyncEnumerable<T>, and avoid blocking threads.

Why they shine

  • Async reads and writes
  • Bounded channels provide backpressure
  • Options like SingleReader and SingleWriter reduce synchronization overhead, provided the guarantee actually holds
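
To make the backpressure point concrete, here is a small sketch using a bounded channel with the synchronous TryWrite and TryRead calls. The capacity of 2 is an illustrative value chosen so the channel fills quickly; it is not taken from the tutorial.

```csharp
using System;
using System.Threading.Channels;

// Capacity 2 is an assumption for the demo, small enough to fill immediately.
var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(2)
{
    FullMode = BoundedChannelFullMode.Wait,
    SingleReader = true, // only one consumer in this sketch
    SingleWriter = true  // only one producer in this sketch
});

var wrote1 = channel.Writer.TryWrite(1); // true: one of two slots used
var wrote2 = channel.Writer.TryWrite(2); // true: the channel is now full
var wrote3 = channel.Writer.TryWrite(3); // false: no room, the producer must back off

var gotItem = channel.Reader.TryRead(out var item); // consumer frees one slot
var wrote4 = channel.Writer.TryWrite(4);            // true: there is room again

Console.WriteLine($"{wrote1} {wrote2} {wrote3} | read {item} | {wrote4}");
// prints "True True False | read 1 | True"
```

With an unbounded channel the third write would succeed and memory would simply grow, which is the trade-off bounded channels are designed to avoid.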

Fast read loop

while (await reader.WaitToReadAsync(ct))
    while (reader.TryRead(out var item))
        await HandleAsync(item, ct);

Steps to Build

1. Create the Project

dotnet new console -n CustomerSyncConsole
cd CustomerSyncConsole

2. Add NuGet Packages

dotnet add package Hangfire
dotnet add package Hangfire.MemoryStorage
dotnet add package Microsoft.Extensions.Hosting
dotnet add package Microsoft.Extensions.Logging.Console

3. Implement the Code

Below is a complete Program.cs with comments. It uses an unbounded channel for simplicity. See the optional bounded channel section that follows if you want backpressure.

using System.Threading.Channels;
using Hangfire;
using Hangfire.MemoryStorage;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

namespace CustomerSyncConsole;

public sealed record CustomerWorkItem(int CustomerId, string PayloadId, DateTimeOffset ReceivedAt, string RawJson);

public interface ICustomerJobs
{
    Task ProcessCustomer(int customerId, string payloadId, CancellationToken ct = default);
}

public sealed class CustomerJobs(ILogger<CustomerJobs> logger) : ICustomerJobs
{
    private readonly ILogger<CustomerJobs> _logger = logger;

    public async Task ProcessCustomer(int customerId, string payloadId, CancellationToken ct = default)
    {
        _logger.LogInformation("Hangfire job started for Customer {CustomerId}, Payload {PayloadId}", customerId, payloadId);
        await Task.Delay(TimeSpan.FromSeconds(2), ct);
        _logger.LogInformation("Hangfire job completed for Customer {CustomerId}, Payload {PayloadId}", customerId, payloadId);
    }
}

public interface IStorage
{
    Task SaveAsync(string payloadId, string rawJson, CancellationToken ct);
}

public sealed class InMemoryStorage(ILogger<InMemoryStorage> logger) : IStorage
{
    private readonly ILogger<InMemoryStorage> _logger = logger;
    private readonly Dictionary<string, string> _store = new();

    public Task SaveAsync(string payloadId, string rawJson, CancellationToken ct)
    {
        _store[payloadId] = rawJson;
        _logger.LogInformation("Saved payload {PayloadId} in-memory (size={Size}).", payloadId, _store.Count);
        return Task.CompletedTask;
    }
}

public sealed class DataPollingService(
    ILogger<DataPollingService> logger,
    Channel<CustomerWorkItem> channel,
    IConfiguration config,
    IStorage storage) : BackgroundService
{
    private readonly ILogger<DataPollingService> _logger = logger;
    private readonly ChannelWriter<CustomerWorkItem> _writer = channel.Writer;
    private readonly IConfiguration _config = config;
    private readonly IStorage _storage = storage;

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var intervalSeconds = _config.GetValue("Polling:IntervalSeconds", 10);
        using var timer = new PeriodicTimer(TimeSpan.FromSeconds(intervalSeconds));

        _logger.LogInformation("Polling started. Interval={Interval}s", intervalSeconds);

        try
        {
            while (await timer.WaitForNextTickAsync(stoppingToken))
            {
                var simulated = new[]
                {
                    new IncomingDto { CustomerId = 1, PayloadId = Guid.NewGuid().ToString("N") },
                    new IncomingDto { CustomerId = 2, PayloadId = Guid.NewGuid().ToString("N") },
                };

                foreach (var dto in simulated)
                {
                    var raw = System.Text.Json.JsonSerializer.Serialize(dto);
                    await _storage.SaveAsync(dto.PayloadId, raw, stoppingToken);

                    var item = new CustomerWorkItem(dto.CustomerId, dto.PayloadId, DateTimeOffset.UtcNow, raw);
                    await _writer.WriteAsync(item, stoppingToken);
                    _logger.LogInformation("Queued work item for Customer {CustomerId}, Payload {PayloadId}", dto.CustomerId, dto.PayloadId);
                }
            }
        }
        finally
        {
            // Signal the consumer that no more items will arrive.
            _writer.TryComplete();
        }
    }

    private sealed class IncomingDto
    {
        public int CustomerId { get; set; }
        public string PayloadId { get; set; } = Guid.NewGuid().ToString("N");
    }
}

public sealed class ChannelConsumerService(
    Channel<CustomerWorkItem> channel,
    IBackgroundJobClient backgroundJobClient,
    ILogger<ChannelConsumerService> logger) : BackgroundService
{
    private readonly ChannelReader<CustomerWorkItem> _reader = channel.Reader;
    private readonly IBackgroundJobClient _backgroundJobClient = backgroundJobClient;
    private readonly ILogger<ChannelConsumerService> _logger = logger;

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("Channel consumer started.");
        await foreach (var item in _reader.ReadAllAsync(stoppingToken))
        {
            // Demo filter: only work items for customer 1 are forwarded to Hangfire.
            if (item.CustomerId == 1)
            {
                var jobId = _backgroundJobClient.Enqueue<ICustomerJobs>(job => job.ProcessCustomer(item.CustomerId, item.PayloadId, CancellationToken.None));
                _logger.LogInformation("Enqueued Hangfire job {JobId} for Customer {CustomerId}, Payload {PayloadId}", jobId, item.CustomerId, item.PayloadId);
            }
        }
        _logger.LogInformation("Channel consumer stopping.");
    }
}

public static class Program
{
    public static async Task Main(string[] args)
    {
        using var host = Host.CreateDefaultBuilder(args)
            .ConfigureAppConfiguration(cfg =>
            {
                cfg.AddInMemoryCollection(new Dictionary<string, string?>
                {
                    ["Polling:IntervalSeconds"] = "10"
                });
            })
            .ConfigureLogging(logging =>
            {
                logging.ClearProviders();
                logging.AddSimpleConsole(o =>
                {
                    o.TimestampFormat = "HH:mm:ss ";
                    o.SingleLine = true;
                });
                logging.SetMinimumLevel(LogLevel.Information);
            })
            .ConfigureServices((ctx, services) =>
            {
                // Unbounded for simplicity in this tutorial
                services.AddSingleton(Channel.CreateUnbounded<CustomerWorkItem>());

                // Hangfire in memory
                services.AddHangfire(config => config.UseMemoryStorage());
                services.AddHangfireServer();

                services.AddSingleton<ICustomerJobs, CustomerJobs>();
                services.AddSingleton<IStorage, InMemoryStorage>();
                services.AddHostedService<DataPollingService>();
                services.AddHostedService<ChannelConsumerService>();
            })
            .Build();

        await host.RunAsync();
    }
}

Optional: switch to a bounded channel

For services that can receive spikes, prefer bounded channels to get backpressure.

services.AddSingleton(_ =>
{
    var opts = new BoundedChannelOptions(capacity: 512)
    {
        FullMode = BoundedChannelFullMode.Wait,
        SingleReader = true,   // one consumer service
        SingleWriter = true    // one producer service
    };
    return Channel.CreateBounded<CustomerWorkItem>(opts);
});

Replace the unbounded registration with the code above.
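
To see what FullMode.Wait means for an async producer, the following sketch uses a capacity of 1 (an illustrative value, far smaller than the 512 above) and string items in place of CustomerWorkItem. The second WriteAsync stays pending until the consumer reads.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// Capacity 1 is an assumption for the demo so the channel is full after one write.
var channel = Channel.CreateBounded<string>(new BoundedChannelOptions(1)
{
    FullMode = BoundedChannelFullMode.Wait
});

await channel.Writer.WriteAsync("first");                      // fits immediately
ValueTask pendingWrite = channel.Writer.WriteAsync("second");  // channel is full
var waitingBeforeRead = !pendingWrite.IsCompleted;             // producer is held back

var read = await channel.Reader.ReadAsync(); // consumer frees the slot
await pendingWrite;                          // the pending write now completes
var next = await channel.Reader.ReadAsync();

Console.WriteLine($"waiting={waitingBeforeRead}, read={read}, next={next}");
// prints "waiting=True, read=first, next=second"
```

The producer is never blocked on a thread; it is simply not resumed until there is room, which is how the polling service would slow down if the consumer fell behind.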


Best Practices

  • Pass CancellationToken to every WriteAsync and ReadAllAsync
  • Complete the writer during shutdown, then await reader completion
  • Make Hangfire jobs idempotent, because retries can occur
  • Consider persistent Hangfire storage for real workloads, for example SQL Server or SQLite
  • Expose channel depth as a metric if you need visibility (current size versus capacity)
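
Two of these practices, the depth metric and the shutdown order, can be sketched together. The capacity of 4 and the integer items are illustrative assumptions; the Count property is guarded with CanCount because not every channel implementation supports it.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

const int capacity = 4; // illustrative value
var channel = Channel.CreateBounded<int>(capacity);

for (var i = 1; i <= 3; i++)
    await channel.Writer.WriteAsync(i);

// Depth metric: current size versus capacity (-1 if the reader cannot count).
var depth = channel.Reader.CanCount ? channel.Reader.Count : -1;
Console.WriteLine($"Channel depth: {depth}/{capacity}");

// Shutdown order: complete the writer first so the reader knows no more items are coming...
channel.Writer.Complete();

// ...then drain what is left; ReadAllAsync ends once the channel is empty and completed.
var drained = new List<int>();
await foreach (var item in channel.Reader.ReadAllAsync())
    drained.Add(item);

// Completion finishes only after the writer completed and every item was read.
await channel.Reader.Completion;
Console.WriteLine($"Drained {drained.Count} items after completion.");
```

In a service, the depth value would typically be published through whatever metrics library the application already uses rather than written to the console.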

Run it

dotnet run

You should see logs from both services. The producer writes items into the channel. The consumer enqueues Hangfire jobs for customer 1. The Hangfire server executes those jobs.

