DEV Community

Spyros Ponaris

Priority-Based Job Processing System in C#

This tutorial explains how to build a multithreaded, priority-based job processing system using:

  • PriorityQueue<TElement,TPriority>
  • SemaphoreSlim
  • Producer / Consumer pattern
  • Retry mechanism
  • Dead Letter Queue (DLQ)
  • Multiple workers

1. What is PriorityQueue in C#?

📌 Definition

PriorityQueue<TElement,TPriority> is a built-in .NET collection that:

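Stores elements so that the one with the lowest priority value is dequeued first, regardless of insertion order. Elements with equal priority are not guaranteed to come out in the order they were enqueued.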

Official docs:
https://learn.microsoft.com/en-us/dotnet/api/system.collections.generic.priorityqueue-2?view=net-10.0


⚙️ How it works (in this system)

private readonly PriorityQueue<Job, int> _queue = new();

Jobs are added like this:

_queue.Enqueue(job, job.Priority);

Each entry has:

  • Element: Job
  • Priority: int

📊 Priority rules in .NET

👉 Lower number = higher priority

| Priority | Meaning          | Execution order |
|----------|------------------|-----------------|
| 1        | Highest priority | First           |
| 2        | Medium priority  | Next            |
| 5        | Low priority     | Last            |

🧠 Simple example

var queue = new PriorityQueue<string, int>();

queue.Enqueue("Job A", 5);
queue.Enqueue("Job B", 1);
queue.Enqueue("Job C", 3);

Console.WriteLine(queue.Dequeue()); // Job B
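
PriorityQueue does not promise first-in-first-out order for elements that share a priority. If that matters, one common workaround is to make the priority a tuple of (priority, sequence number). The sketch below is illustrative only: the Add helper and the sequence counter are hypothetical names, not part of the system in this tutorial.

```csharp
using System;
using System.Collections.Generic;

// Tuples compare element by element, so ties on Priority fall back to Sequence.
var queue = new PriorityQueue<string, (int Priority, long Sequence)>();
long sequence = 0;

// Hypothetical helper: stamps every job with an increasing sequence number.
void Add(string jobName, int priority) => queue.Enqueue(jobName, (priority, sequence++));

Add("Job A", 2);
Add("Job B", 1);
Add("Job C", 2);
Add("Job D", 1);

var order = new List<string>();
while (queue.TryDequeue(out var next, out _))
    order.Add(next);

Console.WriteLine(string.Join(", ", order)); // Job B, Job D, Job A, Job C
```

The counter is not synchronized here; in a multithreaded system it would need to be incremented under the same lock that protects the queue.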

⚠️ Important limitation

  • ❌ PriorityQueue is NOT thread-safe
  • ✔ This system works around that with a lock (guards every access to the queue) and a SemaphoreSlim (lets workers wait until a job is available)

2. Job Model

public record Job(
    string Name,
    int Priority,
    int RetryCount = 0,
    int MaxRetries = 3);

Why this works well

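  • Immutable (positional record with init-only properties)
  • Safe to share between threads, because a job cannot be modified after it is created
  • Supports retry tracking (RetryCount, MaxRetries)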
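
As a small illustration of the immutability point, a `with` expression produces a modified copy and leaves the original untouched. The job name used here is made up for the example.

```csharp
using System;

var original = new Job("Send email", 1);

// `with` copies the record and changes only the listed properties.
var retry = original with { RetryCount = original.RetryCount + 1 };

Console.WriteLine(original.RetryCount);              // 0
Console.WriteLine(retry.RetryCount);                 // 1
Console.WriteLine(ReferenceEquals(original, retry)); // False

public record Job(
    string Name,
    int Priority,
    int RetryCount = 0,
    int MaxRetries = 3);
```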

3. Blocking Priority Queue (Core Engine)

public class BlockingPriorityQueue
{
    private readonly PriorityQueue<Job, int> _queue = new();
    private readonly object _lock = new();
    private readonly SemaphoreSlim _signal = new(0);

Why these 3 components?

| Component     | Purpose                   |
|---------------|---------------------------|
| PriorityQueue | Stores jobs by priority   |
| lock          | Thread safety             |
| SemaphoreSlim | Async waiting / signaling |

➕ Enqueue (Producer side)

public void Enqueue(Job job)
{
    lock (_lock)
    {
        _queue.Enqueue(job, job.Priority);
        _signal.Release();
    }
}

Flow

Producer → lock → enqueue → signal workers


➖ Dequeue (Consumer side)

public async Task<Job> DequeueAsync(CancellationToken cancellationToken = default)
{
    await _signal.WaitAsync(cancellationToken);

    lock (_lock)
    {
        return _queue.Dequeue();
    }
}
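
For reference, the fragments above can be assembled into one class and exercised in a few lines. This is a sketch of how the pieces fit together, using the same members as the tutorial; the job names are placeholders.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

var queue = new BlockingPriorityQueue();

// A consumer that arrives before any job exists waits on the semaphore.
Task<Job> pending = queue.DequeueAsync();
Console.WriteLine(pending.IsCompleted); // False

queue.Enqueue(new Job("Low", 5));
Console.WriteLine((await pending).Name); // Low (the only job available)

// With several jobs waiting, the lowest priority number comes out first.
queue.Enqueue(new Job("C", 3));
queue.Enqueue(new Job("A", 1));
queue.Enqueue(new Job("B", 2));

for (int i = 0; i < 3; i++)
    Console.WriteLine((await queue.DequeueAsync()).Name); // A, then B, then C

public record Job(string Name, int Priority, int RetryCount = 0, int MaxRetries = 3);

public class BlockingPriorityQueue
{
    private readonly PriorityQueue<Job, int> _queue = new();
    private readonly object _lock = new();
    private readonly SemaphoreSlim _signal = new(0); // counts jobs waiting in the queue

    public void Enqueue(Job job)
    {
        lock (_lock)
        {
            _queue.Enqueue(job, job.Priority);
            _signal.Release(); // one permit per queued job
        }
    }

    public async Task<Job> DequeueAsync(CancellationToken cancellationToken = default)
    {
        // Each acquired permit corresponds to exactly one queued job,
        // so Dequeue below never runs against an empty queue.
        await _signal.WaitAsync(cancellationToken);

        lock (_lock)
        {
            return _queue.Dequeue();
        }
    }
}
```

The semaphore count always matches the number of queued jobs, which is why the consumer can call Dequeue without checking whether the queue is empty.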

Flow

Worker waits → signal → lock → dequeue highest priority job


4. Dead Letter Queue (DLQ)

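public class DeadLetterQueue
{
    private readonly List<Job> _failedJobs = new();
    private readonly object _lock = new(); // several workers may call Add concurrently

    public void Add(Job job, Exception ex)
    {
        Console.WriteLine($"DLQ: Job {job.Name} failed permanently: {ex.Message}");

        lock (_lock)
        {
            _failedJobs.Add(job);
        }
    }

    // Returns a snapshot so callers never enumerate the list while it is being modified.
    public IReadOnlyList<Job> GetAll()
    {
        lock (_lock)
        {
            return _failedJobs.ToArray();
        }
    }
}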

Purpose

DLQ stores jobs that:

  • exceeded retry limit
  • failed permanently
  • require manual inspection
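
Because several workers share one DLQ, its list can be written from multiple threads at once, and List<T> is not safe for concurrent writes. The sketch below assumes a lock-protected variant of the class (logging omitted to keep the output short) and stresses it with Parallel.For; the job names and the exception message are placeholders.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

var dlq = new DeadLetterQueue();

// Simulate many workers dead-lettering jobs at the same time.
Parallel.For(0, 1000, i =>
    dlq.Add(new Job($"Job - {i}", 1), new InvalidOperationException("simulated")));

Console.WriteLine(dlq.GetAll().Count); // 1000

public record Job(string Name, int Priority, int RetryCount = 0, int MaxRetries = 3);

public class DeadLetterQueue
{
    private readonly List<Job> _failedJobs = new();
    private readonly object _lock = new();

    public void Add(Job job, Exception ex)
    {
        // The lock serializes writers so no addition is lost.
        lock (_lock)
        {
            _failedJobs.Add(job);
        }
    }

    public IReadOnlyList<Job> GetAll()
    {
        // Copy under the lock so readers get a stable snapshot.
        lock (_lock)
        {
            return _failedJobs.ToArray();
        }
    }
}
```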

5. Job Processor (Worker System)

public class JobProcessor(BlockingPriorityQueue jobQueue, DeadLetterQueue deadLetterQueue)
{
    private readonly BlockingPriorityQueue _jobQueue = jobQueue;
    private readonly DeadLetterQueue _deadLetterQueue = deadLetterQueue;

Main processing loop

public async Task ProcessJobsAsync(CancellationToken cancellationToken = default)
{
    while (!cancellationToken.IsCancellationRequested)
    {
        try
        {
            var job = await _jobQueue.DequeueAsync(cancellationToken);

            await ProcessJob(job, cancellationToken);
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("Job processing cancelled.");
            break;
        }
    }
}
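
The loop relies on the fact that a pending SemaphoreSlim.WaitAsync call ends with an OperationCanceledException once its token is cancelled. A minimal, standalone sketch of that behavior:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

var signal = new SemaphoreSlim(0); // no permits, so WaitAsync cannot complete on its own
using var cts = new CancellationTokenSource();

Task waiting = signal.WaitAsync(cts.Token);
cts.Cancel();

bool cancelled = false;
try
{
    await waiting;
}
catch (OperationCanceledException)
{
    // Same exception type the worker loop catches before breaking out.
    cancelled = true;
}

Console.WriteLine(cancelled); // True
```

This is what lets an idle worker, parked in DequeueAsync, leave the loop promptly at shutdown.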

Job execution logic

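private async Task ProcessJob(Job job, CancellationToken ct)
{
    try
    {
        Console.WriteLine($"Processing {job.Name}");

        // Simulated work; throws OperationCanceledException if ct is cancelled.
        await Task.Delay(500, ct);

        // Simulated failure: roughly one run in three throws.
        if (Random.Shared.Next(0, 3) == 0)
            throw new Exception("Random failure");

        Console.WriteLine($"✅ Success: {job.Name}");
    }
    // Cancellation is not a job failure: let it propagate to ProcessJobsAsync
    // instead of consuming a retry.
    catch (Exception ex) when (ex is not OperationCanceledException)
    {
        await HandleFailure(job, ex);
    }
}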

6. Retry Logic

private async Task HandleFailure(Job job, Exception ex)
{
    if (job.RetryCount < job.MaxRetries)
    {
        var retryJob = job with
        {
            RetryCount = job.RetryCount + 1
        };

        Console.WriteLine($"Retrying {job.Name} ({retryJob.RetryCount})");

        await Task.Delay(1000);

        _jobQueue.Enqueue(retryJob);
    }
    else
    {
        _deadLetterQueue.Add(job, ex);
    }
}

Behavior flow

Failure → retry → requeue → success OR DLQ
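
One consequence of the `RetryCount < MaxRetries` check is worth spelling out: with the default MaxRetries of 3, a job that always fails is attempted four times (the first run plus three retries) before it reaches the DLQ. The simulation below walks through that counting without the queue or the delays; the job name is a placeholder.

```csharp
using System;

var job = new Job("Always fails", 1);
int attempts = 0;
bool deadLettered = false;

while (!deadLettered)
{
    attempts++; // every attempt fails in this simulation

    if (job.RetryCount < job.MaxRetries)
        job = job with { RetryCount = job.RetryCount + 1 }; // would be requeued
    else
        deadLettered = true; // would be added to the DLQ
}

Console.WriteLine(attempts);       // 4
Console.WriteLine(job.RetryCount); // 3

public record Job(string Name, int Priority, int RetryCount = 0, int MaxRetries = 3);
```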


7. Job Producer

public class JobProducer(BlockingPriorityQueue jobQueue, DeadLetterQueue deadLetterQueue)
{
    private readonly BlockingPriorityQueue _jobQueue = jobQueue;
    private readonly DeadLetterQueue _deadLetterQueue = deadLetterQueue;

Publish single job

public void PublishJob(string name, int priority)
{
    var job = new Job(name, priority);

    Console.WriteLine($"Publishing {job.Name} with priority {job.Priority}");

    _jobQueue.Enqueue(job);
}

Publish multiple jobs

public void PublishJobs(IEnumerable<Job> jobs)
{
    foreach (var job in jobs)
    {
        Console.WriteLine($"Publishing {job.Name} with priority {job.Priority}");
        _jobQueue.Enqueue(job);
    }
}

Manual DLQ injection

public void PublishDeadLetterJob(string name, int priority)
{
    var job = new Job(name, priority);

    Console.WriteLine($"Publishing {job.Name} to DLQ");

    _deadLetterQueue.Add(job, new Exception("Manually added"));
}

8. System Execution

Setup

var queue = new BlockingPriorityQueue();
var dlq = new DeadLetterQueue();
var jobProducer = new JobProducer(queue, dlq);
var cts = new CancellationTokenSource();

Start workers

// Two processors share the same queue and DLQ; each runs its own loop.
var worker1 = new JobProcessor(queue, dlq);
var worker2 = new JobProcessor(queue, dlq);

var workerTasks = new[]
{
    Task.Run(() => worker1.ProcessJobsAsync(cts.Token)),
    Task.Run(() => worker2.ProcessJobsAsync(cts.Token))
};

Publish jobs

for (int i = 0; i < 10; i++)
{
    // Next(1, 5) excludes the upper bound, so priorities range from 1 to 4.
    var priority = Random.Shared.Next(1, 5);

    // PublishJob logs the job name and priority itself.
    jobProducer.PublishJob($"Job - {i}", priority);
}

Shutdown system

await Task.Delay(3000);

cts.Cancel();

await Task.WhenAll(workerTasks);

Console.WriteLine("System stopped cleanly.");
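
A fixed delay can cancel the workers while jobs are still queued. As one possible alternative, and purely as a sketch, the workers can be cancelled once every published job has been accounted for. The WorkerAsync function, the remaining counter, and the 10 ms stand-in for work are assumptions made for this example; retries and failures are left out so the run is deterministic.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

const int jobCount = 10;
var queue = new BlockingPriorityQueue();
var processed = new ConcurrentQueue<string>();
using var cts = new CancellationTokenSource();
int remaining = jobCount;

// Hypothetical worker: always succeeds, then reports completion.
async Task WorkerAsync()
{
    try
    {
        while (true)
        {
            Job job = await queue.DequeueAsync(cts.Token);
            await Task.Delay(10); // stand-in for real work
            processed.Enqueue(job.Name);

            // The worker that finishes the last job triggers shutdown.
            if (Interlocked.Decrement(ref remaining) == 0)
                cts.Cancel();
        }
    }
    catch (OperationCanceledException)
    {
        // Normal shutdown path.
    }
}

var workers = new[] { Task.Run(() => WorkerAsync()), Task.Run(() => WorkerAsync()) };

for (int i = 0; i < jobCount; i++)
    queue.Enqueue(new Job($"Job - {i}", Random.Shared.Next(1, 5)));

await Task.WhenAll(workers);
Console.WriteLine(processed.Count); // 10

public record Job(string Name, int Priority, int RetryCount = 0, int MaxRetries = 3);

public class BlockingPriorityQueue
{
    private readonly PriorityQueue<Job, int> _queue = new();
    private readonly object _lock = new();
    private readonly SemaphoreSlim _signal = new(0);

    public void Enqueue(Job job)
    {
        lock (_lock)
        {
            _queue.Enqueue(job, job.Priority);
            _signal.Release();
        }
    }

    public async Task<Job> DequeueAsync(CancellationToken cancellationToken = default)
    {
        await _signal.WaitAsync(cancellationToken);
        lock (_lock)
        {
            return _queue.Dequeue();
        }
    }
}
```

With retries in play the counter would have to be decremented only on success or dead-lettering, not on every attempt.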

9. Runtime behavior

  1. Jobs are published with random priority
  2. Stored in PriorityQueue
  3. Workers wait asynchronously
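  4. Of the jobs waiting in the queue, the highest-priority one (lowest number) is dequeued first; because the workers start before publishing, early jobs may be picked up as soon as they arrive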
  5. Random failures occur
  6. Failed jobs retry if allowed
  7. Permanent failures go to DLQ
  8. After 3 seconds the workers are cancelled and the system shuts down; jobs still queued at that point are left unprocessed

10. Key Concepts Learned

✔ PriorityQueue

Built-in heap structure for priority-based scheduling

✔ Thread Safety

Handled using:

  • lock
  • SemaphoreSlim

✔ Producer / Consumer

  • Producer → JobProducer
  • Consumer → JobProcessor

✔ Retry System

Automatic retry with counter tracking

✔ Dead Letter Queue

Holds jobs that failed permanently (in memory only, for the lifetime of the process)

✔ Multithreading

Multiple workers process jobs concurrently

✔ Graceful Shutdown

Uses CancellationTokenSource


🚀 Summary

The result is a small, real-world-style system that includes:

  • Priority scheduling
  • Concurrent workers
  • Async processing
  • Retry logic
  • Failure isolation (DLQ)
  • Thread-safe queue coordination
