DEV Community

Felipe Mattioli


Improving Azure Service Bus Development with AsbFlow

Today, I’d like to introduce a clean and effective way to bring organization and clarity to your Azure Service Bus integrations.

If you’ve ever struggled to keep message producers and consumers organized — or found yourself repeatedly writing boilerplate code to handle message publishing, retries, or custom behaviors — AsbFlow was built to simplify that.

It provides a single, fluent, and centralized configuration for your Azure Service Bus setup, but it goes beyond that.

AsbFlow also abstracts many of the platform’s capabilities: middlewares for producers and consumers, multiple message production modes, and other extensibility features, all ready to use out of the box.

The result is clarity, organization, and quality in your message-driven development — making distributed communication as intuitive as working with MediatR.


⚙️ Message Production Modes

When producing a message with ICommandProducer, AsbFlow provides different ways to publish it, depending on your use case.

Here’s what you can do:

  • 📨 Basic message production: Produce a simple command without any additional configuration. Ideal for straightforward scenarios where no metadata or delay is needed.

await producer.ProduceCommandAsync(command, cancellationToken);

  • 🕒 Delayed message production: Define a delivery delay so the message only becomes available for processing after that time has elapsed. Useful for scheduling or deferring operations.

await producer.ProduceCommandAsync(command, TimeSpan.FromMinutes(5), cancellationToken);


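  • 🧩 Production with application properties: Attach custom key/value metadata to the message, which consumers can read alongside the payload.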

await producer.ProduceCommandAsync(command, new Dictionary<string, object>
{
    { "Source", "CRM" },
    { "Priority", "High" }
}, cancellationToken);
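
For readers who know the raw Azure.Messaging.ServiceBus SDK, the delayed and application-properties modes above correspond to two standard fields on ServiceBusMessage: ScheduledEnqueueTime and ApplicationProperties. The sketch below shows that raw-SDK equivalent. RawSdkMessageFactory is a hypothetical helper written for this post, JSON serialization is an assumption, and whether AsbFlow does exactly this internally is also an assumption.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using Azure.Messaging.ServiceBus;

// Hypothetical helper, not part of AsbFlow: builds a ServiceBusMessage with
// the raw SDK, optionally delayed and tagged with custom metadata.
public static class RawSdkMessageFactory
{
    public static ServiceBusMessage Build<T>(
        T payload,
        TimeSpan? delay = null,
        IReadOnlyDictionary<string, object>? applicationProperties = null)
    {
        // Serialize the payload as JSON (an assumption; any serializer would do).
        var message = new ServiceBusMessage(BinaryData.FromObjectAsJson(payload));

        if (delay is { } d)
        {
            // The broker holds the message back until this UTC instant.
            message.ScheduledEnqueueTime = DateTimeOffset.UtcNow.Add(d);
        }

        if (applicationProperties is not null)
        {
            // Copy each key/value pair into the message metadata.
            foreach (var (key, value) in applicationProperties)
            {
                message.ApplicationProperties[key] = value;
            }
        }

        return message;
    }
}
```

A message built this way would then be handed to ServiceBusSender.SendMessageAsync. The point of the comparison is that the producer overloads shown above save you from repeating this plumbing in every call site.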

🚀 Introducing AzureServiceBusFlow

AzureServiceBusFlow is an open-source library designed to improve the organization of producers and consumers when using Azure Service Bus.

The core idea is simple:

Have a single, fluent, and centralized place to configure all your producers and consumers — bringing outstanding clarity to your code.

You can refer to it as AzureServiceBusFlow or just AsbFlow, as you prefer.


⚙️ Getting Started

Everything starts with a basic configuration, where you set your Azure Service Bus connection string and define your producers and consumers.

builder.Services.AddAzureServiceBus(cfg => cfg
    .ConfigureAzureServiceBus(azureServiceBusConfig)
    .AddProducer<ExampleCommand1>(p => p
        .EnsureQueueExists("command-queue-one") // We support Queue and Topic
        .WithCommandProducer() // or WithEventProducer
        .ToQueue("command-queue-one")) // Or ToTopic
    .AddConsumer(c => c
        .FromQueue("command-queue-one") // Or FromTopic
        .AddHandler<ExampleCommand1, CommandExample1Handler>())
);

🧠 What’s happening here?

  • We’re registering a producer that sends messages of type ExampleCommand1.
  • Whenever you need to publish a message, just inject ICommandProducer<ExampleCommand1> and call it.
  • A corresponding consumer listens to the same queue and processes messages with the defined handler.

This example is intentionally simple, but it captures the core idea.

To dive deeper, check the official docs and GitHub repository — they include a full Samples project to explore in more detail:

🔗 GitHub Repository

📘 Documentation


🧩 A Real Example in Production

Here’s a slightly more advanced setup from a real API that produces multiple message types exchanged between different services.

⚙️ Configuration

As shown earlier, you must provide your Azure Service Bus settings, such as the receive mode (ReceiveAndDelete or PeekLock), MaxConcurrentCalls, ConnectionString, and so on.

These settings can be provided by instantiating the AzureServiceBusConfiguration class directly.

🔗 See full sample on GitHub

Alternatively, define them in your appsettings.json file and map them to a configuration class.

// appsettings.json example
"AzureServiceBusConfigurationSettings": {
  "ConnectionString": "",
  "ServiceBusReceiveMode": "ReceiveAndDelete", // You can use PeekLock or ReceiveAndDelete, depending on your needs
  "MaxAutoLockRenewalDurationInSeconds": "1800", // Used when using PeekLock mode
  "MaxConcurrentCalls": "10"
}
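
One way to do that mapping is the standard configuration binder from Microsoft.Extensions.Configuration. The sketch below binds the section above to a settings class and shows what each value means in raw SDK terms through ServiceBusProcessorOptions. AsbSettings and AsbSettingsExtensions are hypothetical names made up for this example. The library's own AzureServiceBusConfiguration class may expose different properties, and how AsbFlow applies them internally is an assumption.

```csharp
#nullable enable
using System;
using Azure.Messaging.ServiceBus;
using Microsoft.Extensions.Configuration;

// Hypothetical settings class whose property names mirror the JSON keys above.
public sealed class AsbSettings
{
    public string ConnectionString { get; set; } = string.Empty;
    public ServiceBusReceiveMode ServiceBusReceiveMode { get; set; } = ServiceBusReceiveMode.PeekLock;
    public int MaxAutoLockRenewalDurationInSeconds { get; set; }
    public int MaxConcurrentCalls { get; set; } = 1;
}

public static class AsbSettingsExtensions
{
    public const string SectionName = "AzureServiceBusConfigurationSettings";

    // Binds the section; the binder converts "ReceiveAndDelete" to the enum
    // and the quoted numbers ("1800", "10") to int.
    public static AsbSettings LoadAsbSettings(this IConfiguration configuration) =>
        configuration.GetSection(SectionName).Get<AsbSettings>()
        ?? throw new InvalidOperationException($"Missing configuration section '{SectionName}'.");

    // Raw SDK view of the same values: the options a ServiceBusProcessor accepts.
    public static ServiceBusProcessorOptions ToProcessorOptions(this AsbSettings settings) =>
        new()
        {
            // PeekLock: messages are locked, then settled after handling.
            // ReceiveAndDelete: messages are removed as soon as they are received.
            ReceiveMode = settings.ServiceBusReceiveMode,
            // Upper bound on messages handled in parallel.
            MaxConcurrentCalls = settings.MaxConcurrentCalls,
            // How long the SDK keeps renewing a message lock (relevant in PeekLock mode).
            MaxAutoLockRenewalDuration = TimeSpan.FromSeconds(settings.MaxAutoLockRenewalDurationInSeconds),
        };
}
```

In Program.cs this would be called as builder.Configuration.LoadAsbSettings(), and the bound values can then be passed on to whatever configuration object your setup expects.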

🧠 Configuring the use of AsbFlow

After that, I configure everything inside an extension method on IServiceCollection.

This method is then called in Program.cs to set up AsbFlow.

You can explore this in more detail by checking out the example available in the repository.

public static class AzureServiceBusExtensions
{
    public static IServiceCollection AddAzureServiceBusFlow(
        this IServiceCollection services,
        AzureServiceBusConfiguration azureServiceBusConfiguration) // receiving the mapped settings made above
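    {
        // Placeholder queue name assumed for this example; use your own queue name here.
        const string conversationQueueName = "conversation-queue";
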
        services.AddAzureServiceBus(cfg => cfg
            .ConfigureAzureServiceBus(azureServiceBusConfiguration)
            .AddProducer<CpfValidationMessage>(p => p
                .WithCommandProducer()
                .ToQueue(conversationQueueName))
            .AddProducer<StartSimulationConversationMessage>(p => p
                .WithCommandProducer()
                .ToQueue(conversationQueueName))
            .AddProducer<SimulateAvailableFgtsBalanceMessage>(p => p
                .WithCommandProducer()
                .ToQueue(conversationQueueName))
            .AddProducer<PixValidationMessage>(p => p
                .WithCommandProducer()
                .ToQueue(conversationQueueName))
            .AddConsumer(c => c
                .FromQueue(conversationQueueName)
                .AddHandler<CpfValidationMessage, CpfValidationMessageHandler>()
                .AddHandler<StartSimulationConversationMessage, StartSimulationConversationMessageHandler>()
                .AddHandler<PixValidationMessage, PixValidationMessageHandler>())
        );

        return services;
    }
}

Once that’s done, all I need to do is create the producer and the consumer.


💬 Producing a Message

public sealed class CpfValidationCommandHandler(ICommandProducer<CpfValidationMessage> commandProducer)
    : ICommandHandler<CpfValidationCommand, Result>
{
    public async Task<Result> HandleAsync(CpfValidationCommand message, CancellationToken cancellationToken = default)
    {       
        await commandProducer.ProduceCommandAsync(
            message.ToCpfDocumentValidationMessage(), 
            cancellationToken
        ); // The producer is typed to CpfValidationMessage, so the command is mapped to that message first

        return Result.Success();
    }
}

📥 Consuming a Message

public sealed partial class CpfValidationMessageHandler() : IMessageHandler<CpfValidationMessage>
{
    public async Task HandleAsync(
        CpfValidationMessage message, 
        ServiceBusReceivedMessage rawMessage, 
        CancellationToken cancellationToken)
    {        
        // Implement your custom logic for the consumed message here
    }
}

As APIs grow, they often produce and consume many message types — and keeping this configuration readable can be a challenge.

That’s where AsbFlow really shines.


🔧 What Else Can AsbFlow Do?

Beyond simplifying producers and consumers, AsbFlow provides several powerful features:

  • 🧭 Supports both Queues and Topics

  • ⚙️ Organizes Commands and Events

    • Commands represent an action (create, update, delete)
    • Events represent a change of state
  • 🧩 Middleware Support (Before producing or consuming messages)

    • Add global headers, execute custom logic, or extend behavior dynamically
  • 🕒 Flexible Message Production

    • Schedule delayed messages
    • Add ApplicationProperties
    • Or just send simple messages instantly
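
To make the command/event distinction from the list above concrete, here is a small sketch in plain C#. All type names are hypothetical and chosen only for illustration. Sending commands to a queue (WithCommandProducer with ToQueue) and events to a topic (WithEventProducer with ToTopic) is a common convention, not something AsbFlow requires as far as this post shows.

```csharp
using System;

// Hypothetical command: asks a service to perform an action (imperative name).
public sealed record CreateOrderCommand(Guid OrderId, decimal Total);

// Hypothetical event: reports a state change that has already happened (past-tense name).
public sealed record OrderCreatedEvent(Guid OrderId, DateTimeOffset CreatedAt);

public static class OrderMessages
{
    // After the command has been handled, the resulting state change
    // can be announced as an event carrying the same identifier.
    public static OrderCreatedEvent ToEvent(CreateOrderCommand command, DateTimeOffset createdAt) =>
        new(command.OrderId, createdAt);
}
```

Keeping the two kinds of message as separate types makes the intent visible at the registration site: a reader can tell from the type name alone whether a producer requests work or announces a result.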

🏗️ In Production

AsbFlow is currently running in production and has been performing reliably, greatly helping to streamline message processing and maintain clean, organized code.

If this project sounds useful to you, I’d love for you to check it out, or even contribute by adding features or improving what we have today :)

🔗 GitHub Repository


💡 Inspiration

This project was inspired by the idea behind KafkaFlow.

Earlier in my career I worked at Farfetch, where I had the opportunity to work with KafkaFlow and really appreciated its architecture and design principles.

Later in my career, when I started working more closely with Azure Service Bus, I wanted to replicate some of those concepts in a similar way.

Of course, KafkaFlow is a much more mature project: it has existed for some years and has a community of contributors. Still, it was the inspiration for a better way of working with Azure Service Bus, which became AzureServiceBusFlow, or simply AsbFlow.
