DEV Community

Cover image for Transactional Messaging in .NET: Integrating Brighter’s Outbox Pattern with SQL Server and RabbitMQ
Rafael Andrade
Rafael Andrade

Posted on

Transactional Messaging in .NET: Integrating Brighter’s Outbox Pattern with SQL Server and RabbitMQ

Introduction

In the last article, we explored the outbox pattern and a generic way to configure it. This time, we’ll dive into implementing the Outbox Pattern with SQL Server to guarantee transactional consistency between database updates and message publishing.

Project

The main idea of this project is to send a command to create an order, when the order is create, it'll send 2 messages OrderPlaced & OrderPaid, in case we have a failure, we shouldn't send any message.

Requirement

Messages

For this project we will need these 3 message: CreateNewOrder, OrderPlaced and OrderPaid

public class CreateNewOrder() : Command(Guid.NewGuid())
{
    public decimal Value { get; set; }
}

public class OrderPlaced() : Event(Guid.NewGuid())
{
    public string OrderId { get; set; } = string.Empty;
    public decimal Value { get; set; }
}

public class OrderPaid() : Event(Guid.NewGuid())
{
    public string OrderId { get; set; } = string.Empty;
}
Enter fullscreen mode Exit fullscreen mode

Message Mappers

Since only OrderPlaced and OrderPaid events are published to RabbitMQ, we need to implement mappers for them using JSON serialization

public class OrderPlacedMapper : IAmAMessageMapper<OrderPlaced>
{
    public Message MapToMessage(OrderPlaced request)
    {
        var header = new MessageHeader();
        header.Id = request.Id;
        header.TimeStamp = DateTime.UtcNow;
        header.Topic = "order-placed";
        header.MessageType = MessageType.MT_EVENT;

        var body = new MessageBody(JsonSerializer.Serialize(request));
        return new Message(header, body);
    }

    public OrderPlaced MapToRequest(Message message)
    {
        return JsonSerializer.Deserialize<OrderPlaced>(message.Body.Bytes)!;
    }
}

public class OrderPaidMapper : IAmAMessageMapper<OrderPaid>
{
    public Message MapToMessage(OrderPaid request)
    {
        var header = new MessageHeader();
        header.Id = request.Id;
        header.TimeStamp = DateTime.UtcNow;
        header.Topic = "order-paid";
        header.MessageType = MessageType.MT_EVENT;

        var body = new MessageBody(JsonSerializer.Serialize(request));
        return new Message(header, body);
    }

    public OrderPaid MapToRequest(Message message)
    {
        return JsonSerializer.Deserialize<OrderPaid>(message.Body.Bytes)!;
    }
}
Enter fullscreen mode Exit fullscreen mode

Request Handlers

For OrderPlaced and OrderPaid we are going to log the received message.

public class OrderPlaceHandler(ILogger<OrderPlaceHandler> logger) : RequestHandlerAsync<OrderPlaced>
{
    public override Task<OrderPlaced> HandleAsync(OrderPlaced command, CancellationToken cancellationToken = default)
    {
        logger.LogInformation("{OrderId} placed with value {OrderValue}", command.OrderId, command.Value);
        return base.HandleAsync(command, cancellationToken);
    }
}

public class OrderPaidHandler(ILogger<OrderPaidHandler> logger) : RequestHandlerAsync<OrderPaid>
{
    public override Task<OrderPaid> HandleAsync(OrderPaid command, CancellationToken cancellationToken = default)
    {
        logger.LogInformation("{OrderId} paid", command.OrderId);
        return base.HandleAsync(command, cancellationToken);
    }
}
Enter fullscreen mode Exit fullscreen mode

Create New Order

The CreateNewOrder handler is going to wait for 10ms to emulate a process, then publish the OrderPlaced, if the value is mod 3 throw an exception (emulation a business error), otherwise publish OrderPaid.

public class CreateNewOrderHandler(IAmACommandProcessor commandProcessor,
    IUnitOfWork unitOfWork,
    ILogger<CreateNewOrderHandler> logger) : RequestHandlerAsync<CreateNewOrder>
{
    public override async Task<CreateNewOrder> HandleAsync(CreateNewOrder command, CancellationToken cancellationToken = default)
    {
        await unitOfWork.BeginTransactionAsync(cancellationToken);
        try
        {
            string id = Guid.NewGuid().ToString();
            logger.LogInformation("Creating a new order: {OrderId}", id);

            await Task.Delay(10, cancellationToken); // emulating an process

            _ = await commandProcessor.DepositPostAsync(new OrderPlaced { OrderId = id, Value = command.Value }, cancellationToken: cancellationToken);
            if (command.Value % 3 == 0)
            {
                throw new InvalidOperationException("invalid value");
            }

            _ = await commandProcessor.DepositPostAsync(new OrderPaid { OrderId = id }, cancellationToken: cancellationToken);

            await unitOfWork.CommitAsync(cancellationToken);
            return await base.HandleAsync(command, cancellationToken);
        }
        catch
        {
            logger.LogError("Invalid data");
            await unitOfWork.RollbackAsync(cancellationToken);
            throw;
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

Key Insight:

  • IUnitOfWork shares Brighter's SQL transaction to ensure atomicity (order persistence + outbox writes).
  • Events are only published if the transaction commits.

Configuring Microsoft SQL Server

To integrate the Outbox Pattern with SQL Server, first ensure the OutboxMessages table exists.

1. SQL Table Schema

IF OBJECT_ID('OutboxMessages', 'U') IS NULL
BEGIN 
  CREATE TABLE [OutboxMessages]
  (
    [Id] [BIGINT] NOT NULL IDENTITY,
    [MessageId] UNIQUEIDENTIFIER NOT NULL,
    [Topic] NVARCHAR(255) NULL,
    [MessageType] NVARCHAR(32) NULL,
    [Timestamp] DATETIME NULL,
    [CorrelationId] UNIQUEIDENTIFIER NULL,
    [ReplyTo] NVARCHAR(255) NULL,
    [ContentType] NVARCHAR(128) NULL,  
    [Dispatched] DATETIME NULL,
    [HeaderBag] NTEXT NULL ,
    [Body] NTEXT NULL,
    PRIMARY KEY ( [Id] ) 
  );
END
Enter fullscreen mode Exit fullscreen mode

2. Dependency Injection Setup

Register the outbox and transaction.

services
    .AddServiceActivator(opt => { // Subscription setup (see previous article) })
    .UseMsSqlOutbox(new MsSqlConfiguration(ConnectionString, "OutboxMessages"), typeof(SqlConnectionProvider), ServiceLifetime.Scoped)            
    .UseMsSqlTransactionConnectionProvider(typeof(SqlConnectionProvider))
    .UseOutboxSweeper(opt => opt.BatchSize = 10);
Enter fullscreen mode Exit fullscreen mode

Why This Works:

  • UseMsSqlOutbox links the outbox to SQL Server.
  • UseOutboxSweeper configures background polling for undelivered messages.

3. Transaction Management

To ensure atomicity between business logic and message publishing in Brighter, implement IMsSqlTransactionConnectionProvider and IUnitOfWork for shared transaction context. This guarantees that messages are only stored in the outbox if the database transaction commits successfully.

a. SqlConnectionProvider
public class SqlConnectionProvider(SqlUnitOfWork sqlConnection) : IMsSqlTransactionConnectionProvider
{
    private readonly SqlUnitOfWork _sqlConnection = sqlConnection;

    public SqlConnection GetConnection()
    {
        return _sqlConnection.Connection;
    }

    public Task<SqlConnection> GetConnectionAsync(CancellationToken cancellationToken = default)
    {
        return Task.FromResult(_sqlConnection.Connection);
    }

    public SqlTransaction? GetTransaction()
    {
        return _sqlConnection.Transaction;
    }

    public bool HasOpenTransaction => _sqlConnection.Transaction != null;
    public bool IsSharedConnection => true;
}
Enter fullscreen mode Exit fullscreen mode
b. Unit of work

And finally we need to create a new interface and implement an interface called IUnitOfWork

public interface IUnitOfWork
{
    Task BeginTransactionAsync(CancellationToken cancellationToken, IsolationLevel isolationLevel = IsolationLevel.Serializable);
    Task CommitAsync(CancellationToken cancellationToken);
    Task RollbackAsync(CancellationToken cancellationToken);
}
Enter fullscreen mode Exit fullscreen mode
c. SqlUnitOfWork Implementation
public class SqlUnitOfWork(MsSqlConfiguration configuration) : IUnitOfWork
{
    public SqlConnection Connection { get; } = new(configuration.ConnectionString);
    public SqlTransaction? Transaction { get; private set; }

    public async Task BeginTransactionAsync(CancellationToken cancellationToken,
        IsolationLevel isolationLevel = IsolationLevel.Serializable)
    {

        if (Transaction == null)
        {
            if (Connection.State != ConnectionState.Open)
            {
                await Connection.OpenAsync(cancellationToken);
            }

            Transaction = Connection.BeginTransaction(isolationLevel);
        }
    }

    public async Task CommitAsync(CancellationToken cancellationToken)
    {
        if (Transaction != null)
        {
            await Transaction.CommitAsync(cancellationToken);
        }
    }

    public async Task RollbackAsync(CancellationToken cancellationToken)
    {
        if (Transaction != null)
        {
            await Transaction.RollbackAsync(cancellationToken);
        }
    }

    public async Task<SqlCommand> CreateSqlCommandAsync(string sql, SqlParameter[] parameters, CancellationToken cancellationToken)
    {
        if (Connection.State != ConnectionState.Open)
        {
            await Connection.OpenAsync(cancellationToken);
        }

        SqlCommand command = Connection.CreateCommand();

        if (Transaction != null)
        {
            command.Transaction = Transaction;
        }

        command.CommandText = sql;
        if (parameters.Length > 0)
        {
            command.Parameters.AddRange(parameters);
        }

        return command;
    }
}
Enter fullscreen mode Exit fullscreen mode
d. Register Services in Dependency Injection
services
    .AddScoped<SqlUnitOfWork, SqlUnitOfWork>()
    .TryAddScoped<IUnitOfWork>(provider => provider.GetRequiredService<SqlUnitOfWork>());
Enter fullscreen mode Exit fullscreen mode

Conclusion

By implementing the Outbox Pattern with Brighter and SQL Server, we’ve demonstrated how to achieve transactional consistency between database updates and message publishing. This approach ensures that:

  1. Messages are only published if the transaction commits successfully

    • Using DepositPostAsync, messages like OrderPlaced and OrderPaid are stored in the OutboxMessages table within the same transaction as business data. If the handler fails (e.g., due to a simulated error), the transaction rolls back, and no orphaned messages are sent.
    • Brighter's IMsSqlTransactionConnectionProvider guarantees that database updates and message deposits share the same transaction.
  2. Fault Tolerance via the Outbox Sweeper

    • The UseOutboxSweeper polls for undelivered messages and retries them until acknowledged by RabbitMQ. This decouples message publishing from the handler’s execution, ensuring reliability even if the broker is temporarily unavailable.
  3. Decoupled Architecture

    • Applications focus on local transactions, while Brighter handles message delivery asynchronously. This avoids tight coupling to the messaging infrastructure and simplifies scalability.

This implementation showcases how Brighter abstracts complexity, enabling developers to focus on business logic while ensuring reliability in distributed systems. For production use, pair this pattern with monitoring tools (e.g., Prometheus), dead-letter queues (DLQs) to handle poisoned messages and add index on the outbox table on Dispatched and Timestamp columns.

Reference

Top comments (0)