Tutorial: Building a .NET 9 Console App with Hangfire and Channels
Overview
In this tutorial, you will build a .NET 8 console application that:
- Polls data from an API, or simulates it
- Uses System.Threading.Channels for producer consumer messaging
- Runs background tasks using Hosted Services
- Enqueues jobs in Hangfire using in memory storage
Why Use System.Threading.Channels
Traditional producer consumer patterns often rely on BlockingCollection or custom queues, which can introduce:
- Thread contention
- Blocking calls
- Complex synchronization
System.Threading.Channels provides:
- Asynchronous, non blocking communication between producers and consumers
- Built in backpressure when using bounded channels
- High performance with low allocation
In this app:
- DataPollingService is the producer, it writes work items into the channel
- ChannelConsumerService is the consumer, it reads items and enqueues Hangfire jobs
Why Hosted Services
Hosted services, BackgroundService, integrate with the .NET Generic Host. You get:
- Graceful lifecycle management, start and stop
- Dependency injection
- Logging and configuration
- Non blocking background loops
This app uses two hosted services:
-
DataPollingServicesimulates fetching data and writes work items to the channel -
ChannelConsumerServicereads from the channel and enqueues Hangfire jobs
What is BackgroundService
BackgroundService is a base class that implements IHostedService and gives you a long running ExecuteAsync loop. It plugs into the Generic Host, so you get DI, logging, and coordinated shutdown.
Use cases
- Poll APIs or message brokers
- Drain a queue or channel
- Run internal maintenance tasks
Key points
- Use
ExecuteAsync(CancellationToken)as your main loop - Respect the cancellation token for fast shutdown
- Prefer
PeriodicTimeroverTask.Delayin recurring loops
What is Hangfire
Hangfire is a background job processor for .NET. You enqueue a method call, Hangfire persists and executes it with workers. It supports fire and forget, delayed, recurring, continuations, and batches.
Why it helps here
- The consumer stays lightweight, it only enqueues work
- Processing is retried and observable with a dashboard
- You can swap storage later for durability
Common job types
- Fire and forget,
BackgroundJob.Enqueue(() => DoWork()) - Delayed,
BackgroundJob.Schedule(() => DoWork(), TimeSpan.FromMinutes(5)) - Recurring,
RecurringJob.AddOrUpdate("nightly", () => DoWork(), "0 2 * * *")
Notes, in memory storage is fine for demos, not for production. Jobs should be idempotent.
What is System.Threading.Channels
Channels are high performance, async friendly, in process queues. You get a ChannelWriter<T> for producers and a ChannelReader<T> for consumers. Channels work with async and IAsyncEnumerable<T>, and avoid blocking threads.
Why they shine
- Async reads and writes
- Bounded channels provide backpressure
- Options like
SingleReaderandSingleWriterreduce overhead when true
Fast read loop
while (await reader.WaitToReadAsync(ct))
while (reader.TryRead(out var item))
await HandleAsync(item, ct);
Steps to Build
1. Create the Project
dotnet new console -n CustomerSyncConsole
cd CustomerSyncConsole
2. Add NuGet Packages
dotnet add package Hangfire
dotnet add package Hangfire.MemoryStorage
dotnet add package Microsoft.Extensions.Hosting
dotnet add package Microsoft.Extensions.Logging.Console
3. Implement the Code
Below is a complete Program.cs with comments. It uses an unbounded channel for simplicity. See the optional bounded channel section that follows if you want backpressure.
using System.Threading.Channels;
using Hangfire;
using Hangfire.MemoryStorage;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
namespace CustomerSyncConsole;
public sealed record CustomerWorkItem(int CustomerId, string PayloadId, DateTimeOffset ReceivedAt, string RawJson);
public interface ICustomerJobs
{
Task ProcessCustomer(int customerId, string payloadId, CancellationToken ct = default);
}
public sealed class CustomerJobs(ILogger<CustomerJobs> logger) : ICustomerJobs
{
private readonly ILogger<CustomerJobs> _logger = logger;
public async Task ProcessCustomer(int customerId, string payloadId, CancellationToken ct = default)
{
_logger.LogInformation("Hangfire job started for Customer {CustomerId}, Payload {PayloadId}", customerId, payloadId);
await Task.Delay(TimeSpan.FromSeconds(2), ct);
_logger.LogInformation("Hangfire job completed for Customer {CustomerId}, Payload {PayloadId}", customerId, payloadId);
}
}
public interface IStorage
{
Task SaveAsync(string payloadId, string rawJson, CancellationToken ct);
}
public sealed class InMemoryStorage(ILogger<InMemoryStorage> logger) : IStorage
{
private readonly ILogger<InMemoryStorage> _logger = logger;
private readonly Dictionary<string, string> _store = new();
public Task SaveAsync(string payloadId, string rawJson, CancellationToken ct)
{
_store[payloadId] = rawJson;
_logger.LogInformation("Saved payload {PayloadId} in-memory (size={Size}).", payloadId, _store.Count);
return Task.CompletedTask;
}
}
public sealed class DataPollingService(
ILogger<DataPollingService> logger,
Channel<CustomerWorkItem> channel,
IConfiguration config,
IStorage storage) : BackgroundService
{
private readonly ILogger<DataPollingService> _logger = logger;
private readonly ChannelWriter<CustomerWorkItem> _writer = channel.Writer;
private readonly IConfiguration _config = config;
private readonly IStorage _storage = storage;
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var intervalSeconds = _config.GetValue("Polling:IntervalSeconds", 10);
var timer = new PeriodicTimer(TimeSpan.FromSeconds(intervalSeconds));
_logger.LogInformation("Polling started. Interval={Interval}s", intervalSeconds);
try
{
while (await timer.WaitForNextTickAsync(stoppingToken))
{
var simulated = new[]
{
new IncomingDto { CustomerId = 1, PayloadId = Guid.NewGuid().ToString("N") },
new IncomingDto { CustomerId = 2, PayloadId = Guid.NewGuid().ToString("N") },
};
foreach (var dto in simulated)
{
var raw = System.Text.Json.JsonSerializer.Serialize(dto);
await _storage.SaveAsync(dto.PayloadId, raw, stoppingToken);
var item = new CustomerWorkItem(dto.CustomerId, dto.PayloadId, DateTimeOffset.UtcNow, raw);
await _writer.WriteAsync(item, stoppingToken);
_logger.LogInformation("Queued work item for Customer {CustomerId}, Payload {PayloadId}", dto.CustomerId, dto.PayloadId);
}
}
}
finally
{
_writer.TryComplete();
}
}
private sealed class IncomingDto
{
public int CustomerId { get; set; }
public string PayloadId { get; set; } = Guid.NewGuid().ToString("N");
}
}
public sealed class ChannelConsumerService(
Channel<CustomerWorkItem> channel,
IBackgroundJobClient backgroundJobClient,
ILogger<ChannelConsumerService> logger) : BackgroundService
{
private readonly ChannelReader<CustomerWorkItem> _reader = channel.Reader;
private readonly IBackgroundJobClient _backgroundJobClient = backgroundJobClient;
private readonly ILogger<ChannelConsumerService> _logger = logger;
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("Channel consumer started.");
await foreach (var item in _reader.ReadAllAsync(stoppingToken))
{
if (item.CustomerId == 1)
{
var jobId = _backgroundJobClient.Enqueue<ICustomerJobs>(job => job.ProcessCustomer(item.CustomerId, item.PayloadId, CancellationToken.None));
_logger.LogInformation("Enqueued Hangfire job {JobId} for Customer {CustomerId}, Payload {PayloadId}", jobId, item.CustomerId, item.PayloadId);
}
}
_logger.LogInformation("Channel consumer stopping.");
}
}
public static class Program
{
public static async Task Main(string[] args)
{
using var host = Host.CreateDefaultBuilder(args)
.ConfigureAppConfiguration(cfg =>
{
cfg.AddInMemoryCollection(new Dictionary<string, string?>
{
["Polling:IntervalSeconds"] = "10"
});
})
.ConfigureLogging(logging =>
{
logging.ClearProviders();
logging.AddSimpleConsole(o =>
{
o.TimestampFormat = "HH:mm:ss ";
o.SingleLine = true;
});
logging.SetMinimumLevel(LogLevel.Information);
})
.ConfigureServices((ctx, services) =>
{
// Unbounded for simplicity in this tutorial
services.AddSingleton(Channel.CreateUnbounded<CustomerWorkItem>());
// Hangfire in memory
services.AddHangfire(config => config.UseMemoryStorage());
services.AddHangfireServer();
services.AddSingleton<ICustomerJobs, CustomerJobs>();
services.AddSingleton<IStorage, InMemoryStorage>();
services.AddHostedService<DataPollingService>();
services.AddHostedService<ChannelConsumerService>();
})
.Build();
await host.RunAsync();
}
}
Optional, switch to a bounded channel
For services that can receive spikes, prefer bounded channels to get backpressure.
services.AddSingleton(_ =>
{
var opts = new BoundedChannelOptions(capacity: 512)
{
FullMode = BoundedChannelFullMode.Wait,
SingleReader = true, // one consumer service
SingleWriter = true // one producer service
};
return Channel.CreateBounded<CustomerWorkItem>(opts);
});
Replace the unbounded registration with the code above.
Best Practices
- Pass
CancellationTokento everyWriteAsyncandReadAllAsync - Complete the writer during shutdown, then await reader completion
- Make Hangfire jobs idempotent, retries can occur
- Consider persistent Hangfire storage for real workloads, for example SQL Server or SQLite
- Expose channel depth as a metric if you need visibility, current size versus capacity
Run it
dotnet run
You should see logs from both services. The producer writes items into the channel. The consumer enqueues Hangfire jobs for customer 1. The Hangfire server executes those jobs.
Top comments (2)
That's exactly what I need, a Hangfire.
That's great. Hangfire is a great tool 🔥🔥