Oleksandr Viktor

Build a Priority-Aware Background Job Runner

Build a Priority-Aware Background Job Runner with WJb 0.7.1‑beta1 (Console App, .NET 8/10)

Goal: A minimal console app that enqueues two jobs and prints:

Hello Oleksandr!
Hello World!

…in that exact order, thanks to priority queues—the High priority job runs before the Normal one.


Why WJb?

WJb is a lightweight background job runner for .NET that embraces familiar primitives:

  • Hosted service (BackgroundService)
  • Priority queues using Channel<T>
  • Configurable parallelism
  • DI-friendly actions via IActionFactory
  • Compact job payloads in JSON (code + more)

In 0.7.1‑beta1, actions initialize from a merged payload (more) and execute with a cancellation token, keeping things clean and testable.


Project Setup

Create a new console app (dotnet new console) and add the packages:

<!-- ConsoleApp.csproj -->
<Project Sdk="Microsoft.NET.Sdk">

    <PropertyGroup>
        <OutputType>Exe</OutputType>
        <TargetFramework>net10.0</TargetFramework>
        <ImplicitUsings>enable</ImplicitUsings>
        <Nullable>enable</Nullable>
    </PropertyGroup>

    <ItemGroup>
        <PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="10.0.0" />
        <PackageReference Include="Microsoft.Extensions.Hosting" Version="10.0.0" />
        <PackageReference Include="Microsoft.Extensions.Logging.Console" Version="10.0.0" />
        <PackageReference Include="WJb" Version="0.7.1-beta1" />
    </ItemGroup>

</Project>

💡 The project file above targets .NET 10. If you’re on .NET 8, set <TargetFramework>net8.0</TargetFramework> instead.


The Complete Program

Paste the following into your Program.cs. It wires up:

  • Action map (code → type + defaults)
  • Action factory (IActionFactory)
  • A single shared JobProcessor used both as a hosted service and an IJobProcessor (so you can enqueue before RunAsync)
  • A minimal custom action: WritelnAction

using System.Text.Json.Nodes;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using WJb;

class Program
{
    static async Task Main(string[] args)
    {
        using var host = Host.CreateDefaultBuilder(args)
            .ConfigureServices(services =>
            {
                // Processor settings
                services.Configure<Dictionary<string, object>>(cfg =>
                {
                    cfg["MaxParallelJobs"] = 2;
                    // cfg["CapacityASAP"]   = 100;
                    // cfg["CapacityHigh"]   = 100;
                    // cfg["CapacityNormal"] = 100;
                    // cfg["CapacityLow"]    = 100;
                });

                // Register your action type
                services.AddTransient<WritelnAction>();

                // Register the action map (code -> type, defaults in 'more')
                services.AddSingleton(provider =>
                    new Dictionary<string, ActionItem>
                    {
                        ["WritelnAction"] = new ActionItem(
                            Code: "WritelnAction",
                            Type: typeof(WritelnAction).AssemblyQualifiedName!,
                            More: [] // default payload; can be empty
                        )
                    });

                // Register IActionFactory
                services.AddSingleton<IActionFactory, ActionFactory>();

                // Register ONE shared JobProcessor; expose it as IJobProcessor AND hosted service.
                services.AddSingleton<JobProcessor>();
                services.AddSingleton<IJobProcessor>(sp => sp.GetRequiredService<JobProcessor>());
                services.AddHostedService(sp => sp.GetRequiredService<JobProcessor>());
            })
            .ConfigureLogging(logging =>
            {
                logging.ClearProviders();
                logging.AddConsole();
                logging.SetMinimumLevel(LogLevel.Error); // keep output clean for demo
            })
            .Build();

        // Get processor and enqueue compact jobs (camelCase: code + more)
        var processor = host.Services.GetRequiredService<IJobProcessor>();

        var more1 = new JsonObject { ["message"] = "Hello World!" };
        var more2 = new JsonObject { ["message"] = "Hello Oleksandr!" };

        var job1 = await processor.CompactAsync("WritelnAction", more1);
        var job2 = await processor.CompactAsync("WritelnAction", more2);

        await processor.EnqueueJobAsync(job1, Priority.Normal);
        await processor.EnqueueJobAsync(job2, Priority.High);

        await host.RunAsync();
    }
}

// IAction implementation: InitAsync receives ONLY the merged 'more' payload.
public class WritelnAction : IAction
{
    private readonly ILogger<WritelnAction> _logger;
    private string _message = "[missing message]";

    public WritelnAction(ILogger<WritelnAction> logger) => _logger = logger;

    public Task InitAsync(JsonObject jobMore, CancellationToken stoppingToken)
    {
        _logger.LogDebug("InitAsync more: {json}", jobMore?.ToJsonString());

        if (jobMore?["message"] is not null)
            _message = jobMore["message"]!.ToString();

        return Task.CompletedTask;
    }

    public Task ExecAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("ExecAsync: {message}", _message);

        Console.WriteLine(_message);

        return Task.CompletedTask;
    }
}

Expected Output

Both jobs are enqueued before the host starts, so the High priority job runs first even though it was enqueued last:

Hello Oleksandr!
Hello World!

  • Priority.High → runs first (strict priority draining)
  • Priority.Normal → runs right after

How It Works

1) Compact Jobs in JSON (camelCase)

We use IJobProcessor.CompactAsync(code, more) to create a payload like:

{"code":"WritelnAction","more":{"message":"Hello World!"}}

This ensures downstream code (ExpandAsync) gets the exact shape it expects.
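
To make the shape concrete, here is a minimal sketch that builds and re-reads the same JSON using only System.Text.Json.Nodes. BuildCompactJob is a hypothetical helper written for this illustration; it is not WJb's CompactAsync and does not claim to match its internals.

```csharp
using System;
using System.Text.Json.Nodes;

// Hypothetical helper (not part of WJb): produces the compact shape shown above.
static string BuildCompactJob(string code, JsonObject more) =>
    new JsonObject { ["code"] = code, ["more"] = more }.ToJsonString();

var json = BuildCompactJob(
    "WritelnAction",
    new JsonObject { ["message"] = "Hello World!" });
Console.WriteLine(json);

// Read it back the way a consumer of the payload might.
var parsed = JsonNode.Parse(json)!.AsObject();
var parsedCode = parsed["code"]!.GetValue<string>();
var parsedMessage = parsed["more"]!["message"]!.GetValue<string>();
Console.WriteLine($"{parsedCode}: {parsedMessage}");
```

Note the lowercase keys: a reader that looks up "code" and "more" would miss "Code" or "More", which is why the fixed camelCase shape matters.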

2) Resolve Action Types via the Map

We register an ActionItem map:

["WritelnAction"] = new ActionItem(
    Code: "WritelnAction",
    Type: typeof(WritelnAction).AssemblyQualifiedName!,
    More: []
)

When ExpandAsync runs, it uses the code to look up the assembly-qualified type and merges defaults with job overrides.
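
The "defaults merged with job overrides" idea can be sketched as a shallow merge over two JsonObject instances. MergeMore is a hypothetical helper for illustration only; the post does not show WJb's actual merge rules (for example, whether nested objects merge recursively), so this assumes the simplest case where a job key replaces the default key. It relies on JsonNode.DeepClone(), available from .NET 8.

```csharp
using System;
using System.Text.Json.Nodes;

// Hypothetical helper, not part of WJb: shallow merge where job values win.
static JsonObject MergeMore(JsonObject defaults, JsonObject overrides)
{
    var merged = defaults.DeepClone().AsObject();
    foreach (var (key, value) in overrides)
    {
        // Clone each value: a JsonNode can belong to only one parent at a time.
        merged[key] = value?.DeepClone();
    }
    return merged;
}

var mapDefaults = new JsonObject { ["message"] = "Default from map" };
var jobMore = new JsonObject { ["message"] = "Hello World!" };

var withOverride = MergeMore(mapDefaults, jobMore);
var withoutOverride = MergeMore(mapDefaults, new JsonObject());

Console.WriteLine(withOverride["message"]);    // job value replaces the default
Console.WriteLine(withoutOverride["message"]); // default survives
```

Cloning keeps the map's defaults untouched, so one job's payload cannot leak into the next.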

3) Priority Channels + Concurrency

JobProcessor maintains four channels (ASAP, High, Normal, Low) and drains them in strict priority order. MaxParallelJobs caps how many jobs run concurrently.
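
Strict draining is easy to picture with plain System.Threading.Channels. The sketch below is a stand-in and not WJb's JobProcessor: it uses four unbounded channels of strings, a hypothetical TryDequeue helper, and a single consumer, so it ignores MaxParallelJobs and capacity limits entirely.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;

// Index 0 is the highest priority: ASAP, High, Normal, Low.
var channels = new[]
{
    Channel.CreateUnbounded<string>(), // ASAP
    Channel.CreateUnbounded<string>(), // High
    Channel.CreateUnbounded<string>(), // Normal
    Channel.CreateUnbounded<string>(), // Low
};

// Enqueue in the "wrong" order on purpose, as the demo does.
channels[2].Writer.TryWrite("Hello World!");     // Normal
channels[1].Writer.TryWrite("Hello Oleksandr!"); // High

// Hypothetical helper: always rescan from the highest priority downward.
static bool TryDequeue(Channel<string>[] queues, out string? job)
{
    foreach (var queue in queues)
    {
        if (queue.Reader.TryRead(out job)) return true;
    }
    job = null;
    return false;
}

var order = new List<string>();
while (TryDequeue(channels, out var next))
{
    order.Add(next!);
    Console.WriteLine(next);
}
```

Because every dequeue restarts the scan at the top, a lower-priority job is only picked when all higher channels are empty, which is what lets the High job overtake the Normal one.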

4) Clean Separation of Init vs. Run

IAction.InitAsync(JsonObject more, CancellationToken) receives the merged payload; ExecAsync performs the work. This keeps your action testable and DI-friendly.


Variations You Might Want

  • Default message in the map (overridden by job payload):

    ["WritelnAction"] = new ActionItem(
        Code: "WritelnAction",
        Type: typeof(WritelnAction).AssemblyQualifiedName!,
        More: new JsonObject { ["message"] = "Default from map" }
    )
    
  • Different priorities:

    await processor.EnqueueJobAsync(job1, Priority.ASAP);  // top priority
    await processor.EnqueueJobAsync(job2, Priority.Low);
    
  • Verbosity control:

    • Global: logging.SetMinimumLevel(LogLevel.Information)
    • Category filters:

      logging.AddFilter("Microsoft", LogLevel.Warning);
      logging.AddFilter("System", LogLevel.Warning);
      logging.AddFilter("WJb", LogLevel.Information);
      

Troubleshooting

  • “No output”: Ensure you enqueue via IJobProcessor.EnqueueJobAsync and not a custom Channel<string>. JobProcessor uses internal Channel<JobItem> queues.
  • “Unable to resolve IActionFactory”: Register services.AddSingleton<IActionFactory, ActionFactory>();
  • Actions not found: Use assembly-qualified names in ActionItem.Type (via typeof(WritelnAction).AssemblyQualifiedName!).
  • Payload mismatches: The compact job must use camelCase keys: code, more.

Wrap-up

With WJb 0.7.1‑beta1, you can compose background processing that’s both predictable (priority queues, fixed payload shape) and extensible (actions via DI). The minimal pattern above gives you:

  • Clean job JSON (code + more)
  • Strict priority scheduling
  • DI-friendly action execution
  • Low ceremony, high clarity
