Rafael Andrade

Implementing the Inbox Pattern with Brighter V10 using Kafka and PostgreSQL

In my previous article, we explored the Inbox Pattern as a solution for ensuring exactly-once message processing in distributed systems. Today, we'll dive into a practical implementation using Brighter (a .NET command processor and dispatcher) combined with PostgreSQL as our inbox store and Kafka as our message broker.

This setup provides a robust foundation for building reliable event-driven microservices that can recover gracefully from failures while maintaining message integrity.

Project Overview

We will build a .NET 8+ service that consumes messages from and produces messages to Kafka topics, processes them with Brighter's request handlers, and uses a PostgreSQL database as the persistent inbox to ensure idempotency and reliability.

Required Packages

You can choose between two setup styles:

  1. Pure Brighter Packages:
  • Paramore.Brighter.Inbox.Postgres
  • Paramore.Brighter.MessagingGateway.Kafka
  • Paramore.Brighter.ServiceActivator.Extensions.Hosting
  • Paramore.Brighter.Extensions.DependencyInjection
  2. Fluent Brighter wrappers:
  • Fluent.Brighter.Kafka
  • Fluent.Brighter.Postgres
  • Paramore.Brighter.ServiceActivator.Extensions.Hosting

A Quick Brighter Recap

Before we dive into the configuration, let's quickly recap the core Brighter concepts.

Request: Command and Event

The first key concept is the IRequest interface, which is typically implemented as a Command or an Event. These objects are used by Brighter's IAmACommandProcessor to:

  • Send (expects exactly one handler)
  • Publish (expects zero or more handlers)
  • Post (sends to an external message broker)

Commands represent an action you want to execute, like CreateOrder. Events represent a fact or something that has already happened, like OrderCreated.
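
To make the handler-count rules concrete, here is a minimal sketch of a toy in-process dispatcher. It is not Brighter's implementation: ToyDispatcher, CreateOrderCommand, and OrderCreatedEvent are hypothetical names invented for this illustration, and Post is left out because it involves an external broker.

```csharp
using System;
using System.Collections.Generic;

// Toy dispatcher, NOT Brighter's implementation: it only mirrors the
// handler-count rules described above. All names here are hypothetical.
public sealed class ToyDispatcher
{
    private readonly Dictionary<Type, List<Action<object>>> _handlers = new();

    public void Register<T>(Action<T> handler)
    {
        if (!_handlers.TryGetValue(typeof(T), out var list))
        {
            list = new List<Action<object>>();
            _handlers[typeof(T)] = list;
        }
        list.Add(message => handler((T)message));
    }

    // Send: a command must have exactly one handler.
    public void Send<T>(T command) where T : notnull
    {
        var handlers = Find(typeof(T));
        if (handlers.Count != 1)
        {
            throw new InvalidOperationException(
                $"Expected exactly one handler for {typeof(T).Name}, found {handlers.Count}.");
        }
        handlers[0](command);
    }

    // Publish: an event may have zero, one, or many handlers.
    // Returns how many handlers were invoked.
    public int Publish<T>(T @event) where T : notnull
    {
        var handlers = Find(typeof(T));
        foreach (var handler in handlers)
        {
            handler(@event);
        }
        return handlers.Count;
    }

    private List<Action<object>> Find(Type type) =>
        _handlers.TryGetValue(type, out var list) ? list : new List<Action<object>>();
}

public sealed record CreateOrderCommand(decimal Value);
public sealed record OrderCreatedEvent(string OrderId);
```

Publishing an OrderCreatedEvent with no registered handlers is a no-op, whereas sending a CreateOrderCommand with zero or several handlers throws, which is the practical difference between the two verbs.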

// A command to be executed
public class CreateOrder() : Command(Id.Random());

// An event that has occurred
public class OrderCreated() : Event(Id.Random());

// You can also implement IRequest directly
public class CustomRequest : IRequest { ... }

Message Mapper

When you Post or consume messages from an external broker, you need a message mapper. This component is responsible for mapping your IRequest object to and from a Brighter Message object (which includes the header and body for the broker).

Brighter v10+ defaults to a JSON message mapper, but you can provide your own custom implementation by implementing IAmAMessageMapper or IAmAMessageMapperAsync.
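
Conceptually, a JSON mapper does little more than the round trip sketched below. This is an illustration only: ToyMessage, GreetingMade, and ToyGreetingMapper are hypothetical stand-ins, not Brighter's Message type or its IAmAMessageMapper interface, which carry more header metadata.

```csharp
using System;
using System.Text.Json;

// Hypothetical stand-in for a broker message: a tiny header plus a body.
public sealed record ToyMessage(string Topic, string MessageType, byte[] Body);

// Hypothetical request type used only for this illustration.
public sealed class GreetingMade
{
    public string Name { get; set; } = string.Empty;
}

public static class ToyGreetingMapper
{
    // Request -> broker message: header fields plus a JSON body.
    public static ToyMessage MapToMessage(GreetingMade request, string topic) =>
        new(topic, nameof(GreetingMade), JsonSerializer.SerializeToUtf8Bytes(request));

    // Broker message -> request: deserialize the JSON body back into the type.
    public static GreetingMade MapToRequest(ToyMessage message) =>
        JsonSerializer.Deserialize<GreetingMade>(message.Body)
        ?? throw new InvalidOperationException("Message body was empty or null.");
}
```

A custom mapper becomes worthwhile when the wire format has to differ from this default, for example when another team owns the schema of the topic.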

Request Handler

Last but not least is the RequestHandler (or RequestHandlerAsync). This is the class that contains your actual business logic for processing a specific IRequest. Brighter's pipeline model allows you to chain handlers and add attributes for cross-cutting concerns like retries, logging, and, of course, the inbox pattern.

Here are a simple synchronous handler and its asynchronous counterpart:

public class GreetingHandler : RequestHandler<Greeting>
{
    public override Greeting Handle(Greeting @event)
    {
        Console.WriteLine("===== Hello, {0}", @event.Name);
        return base.Handle(@event);
    }
}

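public class GreetingHandlerAsync : RequestHandlerAsync<GreetingAsync>
{
    // The return type must match the handled request type (GreetingAsync),
    // and the async pipeline is continued with base.HandleAsync.
    public override async Task<GreetingAsync> HandleAsync(GreetingAsync @event, CancellationToken ct = default)
    {
        Console.WriteLine("===== Hello, {0}", @event.Name);
        return await base.HandleAsync(@event, ct);
    }
}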

Configuring Brighter with Kafka and Postgres

Now for the main setup. We need to tell Brighter how to connect to Kafka (our broker) and Postgres (our inbox store).

Kafka Configuration

You can configure Brighter using its core "pure" components or with the Fluent Brighter API. Both are shown here.

Using Fluent Brighter:

services
    .AddHostedService<ServiceActivatorHostedService>()
    .AddFluentBrighter(opt => opt
        .UsingKafka(kf => kf
            .SetConnection(c => c
                .SetName("sample")
                .SetBootstrapServers("localhost:9092")
                .SetSecurityProtocol(SecurityProtocol.Plaintext)
                .SetSaslMechanisms(SaslMechanism.Plain))
            .UseSubscriptions(s => s
                .AddSubscription<OrderPlaced>(sb => sb
                    .SetTopic("order-placed-topic")
                    .SetConsumerGroupId("order-placed-topic-1")
                    .SetRequeueCount(3)
                    .CreateInfrastructureIfMissing()
                    .UseReactorMode())
                .AddSubscription<OrderPaid>(sb => sb
                    .SetTopic("order-paid-topic")
                    .SetConsumerGroupId("order-paid-topic-1")
                    .CreateInfrastructureIfMissing()
                    .UseReactorMode()))
              .UsePublications(p => p
                  .AddPublication<OrderPaid>(kp => kp
                      .SetTopic("order-paid-topic")
                      .CreateTopicIfMissing())
                  .AddPublication<OrderPlaced>(kp => kp
                      .SetTopic("order-placed-topic")
                      .CreateTopicIfMissing()))));

Using Pure Brighter:

var connection = new KafkaMessagingGatewayConfiguration
{
    Name = "sample",
    BootStrapServers = ["localhost:9092"],
    SecurityProtocol = SecurityProtocol.Plaintext,
    SaslMechanisms = SaslMechanism.Plain,
};

services
    .AddHostedService<ServiceActivatorHostedService>()
    .AddConsumers(opt =>
    {
        opt.Subscriptions =
        [
            new KafkaSubscription<OrderPlaced>(
                new SubscriptionName("subscription-orderplaced"),
                new ChannelName("order-placed-queue"),
                new RoutingKey("order-placed-topic"),
                makeChannels: OnMissingChannel.Create,
                messagePumpType: MessagePumpType.Reactor,
                groupId: "test-1"),
            new KafkaSubscription<OrderPaid>(
                new SubscriptionName("subscription-orderpaid"),
                new ChannelName("order-paid-queue"),
                new RoutingKey("order-paid-topic"),
                makeChannels: OnMissingChannel.Create,
                messagePumpType: MessagePumpType.Reactor,
                groupId: "test-2"),
        ];

        opt.DefaultChannelFactory = new ChannelFactory(new KafkaMessageConsumerFactory(connection));
    })
    .AutoFromAssemblies()
    .AddProducers(opt =>
    {
        opt.ProducerRegistry = new KafkaProducerRegistryFactory(
            connection,
            [
                new KafkaPublication<OrderPaid>
                {
                    MakeChannels = OnMissingChannel.Create,
                    Topic = new RoutingKey("order-paid-topic"),
                },
                new KafkaPublication<OrderPlaced>
                {
                    MakeChannels = OnMissingChannel.Create,
                    Topic = new RoutingKey("order-placed-topic"),
                }
            ]).Create();
    });

Postgres Inbox Setup

First, we need to ensure the inbox table exists in our database. Brighter provides a DDL script helper for this. You can run this once on application startup.

await using (NpgsqlConnection connection = new(connectionString))
{
    await connection.OpenAsync();
    await using var command = connection.CreateCommand();

    command.CommandText = PostgreSqlInboxBuilder.GetDDL("inboxmessages");
    _ = await command.ExecuteNonQueryAsync();
}

Configuring the Inbox Pattern

Finally, we link the Postgres Inbox to our consumer configuration.

Using Fluent Brighter:

services
    .AddFluentBrighter(opt => opt
        .UsingPostgres(pg => pg
            .SetConnection(db => db
                .SetConnectionString(connectionString)
                .SetDatabaseName("brightertests")
                .SetInboxTableName("inboxmessages"))
            .UseInbox())
        // ... other configuration (like Kafka) goes here
    );

Using Pure Brighter:

services
    .AddHostedService<ServiceActivatorHostedService>()
    .AddConsumers(opt =>
    {
        opt.InboxConfiguration = new InboxConfiguration(
            new PostgreSqlInbox(
                new RelationalDatabaseConfiguration(
                    connectionString,
                    "brightertests",
                    inboxTableName: "inboxmessages")));
        // ... configure subscriptions etc.
    });

Example: Message and Request Handlers

Let's define the messages our service will use.

Messages

public class CreateNewOrder() : Command(Id.Random())
{
    public decimal Value { get; set; }
}

public class OrderPlaced() : Event(Id.Random())
{
    public string OrderId { get; set; } = string.Empty;

    public decimal Value { get; set; }
}

public class OrderPaid() : Event(Id.Random())
{
    public string OrderId { get; set; } = string.Empty;
}

Request Handlers

For these examples, I'll use synchronous handlers (RequestHandler).

Note on Performance: As noted in the Brighter documentation, the synchronous Post method with the Kafka producer often yields significantly better performance than PostAsync due to the underlying implementation of the C# Kafka client.

CreateNewOrderHandler: This handler receives a CreateNewOrder command (via Send) and posts two new events, OrderPlaced and OrderPaid, to Kafka.

public class CreateNewOrderHandler(IAmACommandProcessor commandProcessor,
    ILogger<CreateNewOrderHandler> logger) : RequestHandler<CreateNewOrder>
{
    public override CreateNewOrder Handle(CreateNewOrder command)
    {
        try
        {
            var id = Uuid.NewAsString();
            logger.LogInformation("Creating a new order: {OrderId}", id);

            commandProcessor.Post(new OrderPlaced { OrderId = id, Value = command.Value });
            commandProcessor.Post(new OrderPaid { OrderId = id });
            return base.Handle(command);
        }
        catch (Exception ex)
        {
            logger.LogError(ex, "Invalid data");
            throw;
        }
    }
}

OrderPaidHandler: This handler consumes the OrderPaid event from Kafka and simply logs it.

public class OrderPaidHandler(ILogger<OrderPaidHandler> logger) : RequestHandler<OrderPaid>
{
    public override OrderPaid Handle(OrderPaid command)
    {
        logger.LogInformation("{OrderId} paid", command.OrderId);
        return base.Handle(command);
    }
}

OrderPlaceHandler: This handler simulates a potential failure. If the order value is divisible by 3, it throws an exception. The [UseResiliencePipeline] attribute allows us to apply a retry policy (e.g., "kafka-policy" configured to retry 3 times).

public class OrderPlaceHandler(ILogger<OrderPlaceHandler> logger) : RequestHandler<OrderPlaced>
{
    [UseResiliencePipeline("kafka-policy", 1)]
    public override OrderPlaced Handle(OrderPlaced command)
    {
        logger.LogInformation("{OrderId} placed with value {OrderValue}", command.OrderId, command.Value);
        if (command.Value % 3 == 0)
        {
            logger.LogError("Simulate an error for {OrderId} with value {OrderValue}", command.OrderId, command.Value);
            throw new InvalidOperationException("invalid error");
        }

        return base.Handle(command);
    }
}
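
The "kafka-policy" name referenced by the attribute has to exist as a named pipeline. As a sketch, assuming the pipeline is built with Polly v8's ResiliencePipelineRegistry<string> and that three retries with exponential backoff is the behavior you want, the definition could look like the following. The KafkaPolicy class, the delay value, and the choice to retry only InvalidOperationException are illustrative assumptions; how the registry is handed to Brighter depends on your setup style and is not shown here.

```csharp
using System;
using Polly;
using Polly.Registry;
using Polly.Retry;

// Hypothetical helper class; only the Polly calls are real API.
public static class KafkaPolicy
{
    public const string Name = "kafka-policy";

    public static ResiliencePipelineRegistry<string> BuildRegistry()
    {
        var registry = new ResiliencePipelineRegistry<string>();

        // One initial attempt plus up to three retries.
        registry.TryAddBuilder(Name, (builder, _) => builder.AddRetry(new RetryStrategyOptions
        {
            MaxRetryAttempts = 3,
            Delay = TimeSpan.FromMilliseconds(200),      // assumed value
            BackoffType = DelayBackoffType.Exponential,
            // Assumption: retry only the exception type thrown by the handler above.
            ShouldHandle = new PredicateBuilder().Handle<InvalidOperationException>()
        }));

        return registry;
    }
}
```

With this pipeline, an order whose value is divisible by 3 is attempted four times in total before the exception leaves the handler.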

This handler demonstrates two important concepts working together:

  • Resilience: The [UseResiliencePipeline("kafka-policy", 1)] attribute adds in-handler retry logic (assuming you've defined a policy named "kafka-policy").
  • Inbox Pattern: When this handler fails, the message is not stored in the inbox, so it remains eligible for reprocessing. If messages are redelivered (for example, after Kafka offsets are reset), only those not already recorded in the inbox are handled again, which prevents duplicate processing.
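
The record-only-on-success behavior can be sketched with an in-memory stand-in. This is a conceptual illustration, not Brighter's PostgreSqlInbox: ToyInbox and HandleOnce are hypothetical names, and a real inbox persists the processed ids in the database instead of a HashSet.

```csharp
using System;
using System.Collections.Generic;

// Conceptual in-memory sketch, NOT Brighter's inbox. All names are hypothetical.
public sealed class ToyInbox
{
    private readonly HashSet<string> _processed = new();

    // Runs the handler unless this message id was already handled successfully.
    // Returns true when the handler ran, false when the message was skipped.
    public bool HandleOnce(string messageId, Action handler)
    {
        if (_processed.Contains(messageId))
        {
            return false;            // duplicate delivery: skip the business logic
        }

        handler();                   // an exception here propagates to the caller...
        _processed.Add(messageId);   // ...so a failed message is never recorded
        return true;
    }
}
```

A message whose handler throws can be delivered again and will run, while a message that already succeeded is skipped on redelivery.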

Conclusion

The Inbox Pattern is essential for building resilient, idempotent message-based systems. Paramore.Brighter, combined with its PostgreSQL inbox and Kafka integration, provides a clean, robust implementation for .NET applications.

This setup guarantees that your services can handle failures gracefully while maintaining data consistency across your distributed system.

The full code for this sample can be found on GitHub: https://github.com/lillo42/brighter-sample/tree/v10-inbox-postgres