Using MongoDB with Brighter V10

One of the new providers that Brighter V10 supports is MongoDB. In this article, I'll explore how to use it with the Inbox and Outbox patterns.

MongoDB

MongoDB is a popular NoSQL document database that stores data in flexible, JSON-like documents. Its scalability and performance make it an excellent choice for high-throughput applications, and its document model maps naturally to the message structures often used in distributed systems.

Project Overview

We will build a .NET 8+ service that produces and consumes messages on Kafka topics, processes them using Brighter, and uses a MongoDB database as the persistent inbox and outbox so that messages are neither lost nor processed twice. We also need to configure a distributed lock to avoid publishing duplicate messages when running this application in a multi-node environment.

Required Packages

You can choose between two setup styles:

  1. Pure Brighter Packages:
  • Paramore.Brighter.Inbox.MongoDb
  • Paramore.Brighter.Outbox.MongoDb
  • Paramore.Brighter.Locking.MongoDb
  • Paramore.Brighter.MessagingGateway.Kafka
  • Paramore.Brighter.ServiceActivator.Extensions.Hosting
  • Paramore.Brighter.Extensions.DependencyInjection
  2. Fluent Brighter wrappers:
  • Fluent.Brighter.Kafka
  • Fluent.Brighter.MongoDb
  • Paramore.Brighter.ServiceActivator.Extensions.Hosting

A Quick Brighter Recap

Before we dive into the configuration, let's quickly recap the core Brighter concepts.

Request: Command and Event

The first key concept is the IRequest interface, which is typically implemented as a Command or an Event. These objects are used by Brighter's IAmACommandProcessor to:

  • Send (expects exactly one handler)
  • Publish (expects zero or more handlers)
  • Post (sends to an external message broker)

Commands represent an action you want to execute, like CreateOrder. Events represent a fact or something that has already happened, like OrderCreated.

// A command to be executed
public class CreateOrder() : Command(Id.Random());

// An event that has occurred
public class OrderCreated() : Event(Id.Random());

// You can also implement IRequest directly
public class CustomRequest : IRequest { ... }
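In practice, these requests are dispatched through an injected IAmACommandProcessor. As a quick illustration (the commandProcessor variable here is assumed to come from dependency injection):

commandProcessor.Send(new CreateOrder());      // exactly one handler runs
commandProcessor.Publish(new OrderCreated());  // zero or more handlers run
commandProcessor.Post(new OrderCreated());     // mapped to a Message and sent to the broker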

Message Mapper

When you Post or consume messages from an external broker, you need a message mapper. This component is responsible for mapping your IRequest object to and from a Brighter Message object (which includes the header and body for the broker).

Brighter v10+ defaults to a JSON message mapper, but you can provide your own custom implementation by implementing IAmAMessageMapper or IAmAMessageMapperAsync.
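
If you do need a custom mapper, a rough sketch might look like the following. The exact signatures used here (the Publication parameter on MapToMessage, the Context property, and the MessageHeader constructor arguments) are assumptions, so verify them against the IAmAMessageMapper<T> interface in your Brighter version:

// Sketch only: the signatures below are assumed; check your Brighter version
// using System.Text.Json;
public class OrderPlacedMessageMapper : IAmAMessageMapper<OrderPlaced>
{
    public IRequestContext? Context { get; set; }

    public Message MapToMessage(OrderPlaced request, Publication publication)
        => new Message(
            new MessageHeader(request.Id, publication.Topic!, MessageType.MT_EVENT),
            new MessageBody(JsonSerializer.Serialize(request)));

    public OrderPlaced MapToRequest(Message message)
        => JsonSerializer.Deserialize<OrderPlaced>(message.Body.Value)!;
}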

Request Handler

Last but not least is the RequestHandler (or RequestHandlerAsync). This is the class that contains your actual business logic for processing a specific IRequest. Brighter's pipeline model allows you to chain handlers and add attributes for cross-cutting concerns like retries, logging, and, of course, the inbox pattern.

Here's a simple synchronous handler:

public class GreetingHandler : RequestHandler<Greeting>
{
    public override Greeting Handle(Greeting @event)
    {
        Console.WriteLine("===== Hello, {0}", @event.Name);
        return base.Handle(@event);
    }
}

public class GreetingHandlerAsync : RequestHandlerAsync<GreetingAsync>
{
    public override Task<GreetingAsync> HandleAsync(GreetingAsync @event, CancellationToken ct = default)
    {
        Console.WriteLine("===== Hello, {0}", @event.Name);
        return base.HandleAsync(@event, ct);
    }
}

Configuring Brighter with Kafka

Now for the main setup. We need to tell Brighter how to connect to Kafka (our broker).

You can configure Brighter using its core "pure" components or with the Fluent Brighter API. Both are shown here.

Using Fluent Brighter:

services
    .AddHostedService<ServiceActivatorHostedService>()
    .AddFluentBrighter(opt => opt
        .UsingKafka(kf => kf
            .SetConnection(c => c
                .SetName("sample")
                .SetBootstrapServers("localhost:9092")
                .SetSecurityProtocol(SecurityProtocol.Plaintext)
                .SetSaslMechanisms(SaslMechanism.Plain))
            .UseSubscriptions(s => s
                .AddSubscription<OrderPlaced>(sb => sb
                    .SetTopic("order-placed-topic")
                    .SetConsumerGroupId("order-placed-topic-1")
                    .SetRequeueCount(3)
                    .CreateInfrastructureIfMissing()
                    .UseReactorMode())
                .AddSubscription<OrderPaid>(sb => sb
                    .SetTopic("order-paid-topic")
                    .SetConsumerGroupId("order-paid-topic-1")
                    .CreateInfrastructureIfMissing()
                    .UseReactorMode()))
            .UsePublications(p => p
                .AddPublication<OrderPaid>(kp => kp
                    .SetTopic("order-paid-topic")
                    .CreateTopicIfMissing())
                .AddPublication<OrderPlaced>(kp => kp
                    .SetTopic("order-placed-topic")
                    .CreateTopicIfMissing()))));

Using Pure Brighter:

var connection = new KafkaMessagingGatewayConfiguration
{
    Name = "sample",
    BootStrapServers = ["localhost:9092"],
    SecurityProtocol = SecurityProtocol.Plaintext,
    SaslMechanisms = SaslMechanism.Plain,
};

services
    .AddHostedService<ServiceActivatorHostedService>()
    .AddConsumers(opt =>
    {
        opt.Subscriptions =
        [
            new KafkaSubscription<OrderPlaced>(
                new SubscriptionName("subscription-orderplaced"),
                new ChannelName("order-placed-queue"),
                new RoutingKey("order-placed-topic"),
                makeChannels: OnMissingChannel.Create,
                messagePumpType: MessagePumpType.Reactor,
                groupId: "test-1"),
            new KafkaSubscription<OrderPaid>(
                new SubscriptionName("subscription-orderpaid"),
                new ChannelName("order-paid-queue"),
                new RoutingKey("order-paid-topic"),
                makeChannels: OnMissingChannel.Create,
                messagePumpType: MessagePumpType.Reactor,
                groupId: "test-2"),
        ];

        opt.DefaultChannelFactory = new ChannelFactory(new KafkaMessageConsumerFactory(connection));
    })
    .AutoFromAssemblies()
    .AddProducers(opt =>
    {
        opt.ProducerRegistry = new KafkaProducerRegistryFactory(
            connection,
            [
                new KafkaPublication<OrderPaid>
                {
                    MakeChannels = OnMissingChannel.Create,
                    Topic = new RoutingKey("order-paid-topic"),
                },
                new KafkaPublication<OrderPlaced>
                {
                    MakeChannels = OnMissingChannel.Create,
                    Topic = new RoutingKey("order-placed-topic"),
                }
            ]).Create();
    });

Configuring Brighter with MongoDB

For this project, I'll configure the inbox, outbox, and distributed lock to use MongoDB.
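
Both snippets below assume a connectionString variable pointing at your MongoDB instance, for example:

// Adjust for your environment
var connectionString = "mongodb://localhost:27017";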

Using Fluent Brighter:

services
    .AddFluentBrighter(opt => opt
        .UsingMongoDb(mongo => mongo
            .SetConnection(db => db
                .SetConnectionString(connectionString)
                .SetDatabaseName("brightertests")
                .SetInbox("inbox")
                .SetOutbox("outbox")
                .SetLocking("locking"))
            .UseDistributedLock()
            .UseInbox()
            .UseOutbox())
        // ... other configuration (like Kafka) goes here
    );

Using Pure Brighter:

var configuration = new MongoDbConfiguration(connectionString, "brightertests")
{
    Inbox = new MongoDbCollectionConfiguration { Name = "inbox" },
    Outbox = new MongoDbCollectionConfiguration { Name = "outbox" },
    Locking = new MongoDbCollectionConfiguration { Name = "locking" },
};

services
    .AddHostedService<ServiceActivatorHostedService>()
    .AddConsumers(opt =>
    {
        opt.InboxConfiguration = new InboxConfiguration(new MongoDbInbox(configuration));
    })
    .AddProducers(opt =>
    {
        opt.Outbox = new MongoDbOutbox(configuration);
        opt.DistributedLock = new MongoDbLockingProvider(configuration);
        opt.ConnectionProvider = typeof(MongoDbConnectionProvider);
        opt.TransactionProvider = typeof(MongoDbUnitOfWork);
    });

Example: Message and Request Handlers

Let's define the messages our service will use.

Messages

public class CreateNewOrder() : Command(Id.Random())
{
    public decimal Value { get; set; }
}

public class OrderPlaced() : Event(Id.Random())
{
    public string OrderId { get; set; } = string.Empty;

    public decimal Value { get; set; }
}

public class OrderPaid() : Event(Id.Random())
{
    public string OrderId { get; set; } = string.Empty;
}

Request Handlers

For these examples, I'll use synchronous handlers (RequestHandler).

CreateNewOrderHandler: This handler receives a CreateNewOrder command (via Send) and posts two new events, OrderPlaced and OrderPaid, to Kafka. The DepositPost method stores the messages in the outbox, allowing the background sweeper process to publish them to the broker asynchronously.

public class CreateNewOrderHandler(IAmACommandProcessor commandProcessor,
    ILogger<CreateNewOrderHandler> logger) : RequestHandler<CreateNewOrder>
{
    public override CreateNewOrder Handle(CreateNewOrder command)
    {
        try
        {
            var id = Uuid.NewAsString();
            logger.LogInformation("Creating a new order: {OrderId}", id);

            commandProcessor.DepositPost(new OrderPlaced { OrderId = id, Value = command.Value });
            commandProcessor.DepositPost(new OrderPaid { OrderId = id });
            return base.Handle(command);
        }
        catch (Exception ex)
        {
            logger.LogError(ex, "Invalid data");
            throw;
        }
    }
}
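For context, something has to Send the CreateNewOrder command in the first place. The entry point isn't shown here, but a hypothetical minimal API endpoint (not part of the sample) could trigger the flow like this:

// Hypothetical endpoint that kicks off the flow in-process
app.MapPost("/orders", (CreateNewOrder command, IAmACommandProcessor commandProcessor) =>
{
    commandProcessor.Send(command);
    return Results.Accepted();
});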

OrderPaidHandler: This handler consumes the OrderPaid event from Kafka and simply logs it.

public class OrderPaidHandler(ILogger<OrderPaidHandler> logger) : RequestHandler<OrderPaid>
{
    public override OrderPaid Handle(OrderPaid command)
    {
        logger.LogInformation("{OrderId} paid", command.OrderId);
        return base.Handle(command);
    }
}

OrderPlaceHandler: This handler simulates a potential failure. If the order value is divisible by 3, it throws an exception. The [UseResiliencePipeline] attribute allows us to apply a retry policy (e.g., "kafka-policy" configured to retry 3 times).

public class OrderPlaceHandler(ILogger<OrderPlaceHandler> logger) : RequestHandler<OrderPlaced>
{
    [UseResiliencePipeline("kafka-policy", 1)]
    public override OrderPlaced Handle(OrderPlaced command)
    {
        logger.LogInformation("{OrderId} placed with value {OrderValue}", command.OrderId, command.Value);
        if (command.Value % 3 == 0)
        {
            logger.LogError("Simulate an error for {OrderId} with value {OrderValue}", command.OrderId, command.Value);
            throw new InvalidOperationException("invalid error");
        }

        return base.Handle(command);
    }
}

This handler demonstrates three important concepts working together:

  • Resilience: The [UseResiliencePipeline("kafka-policy", 1)] attribute adds in-handler retry logic (assuming you've defined a policy named "kafka-policy"; a registration sketch follows this list).
  • Inbox Pattern: When this handler fails, the message won't be stored in the inbox. This ensures that when Kafka offsets are reset, only genuinely unprocessed messages will be reprocessed, preventing duplicates.
  • Outbox Pattern: By using DepositPost, messages are persisted to the local outbox store before being published. This guarantees that messages are not lost even if the broker is temporarily unavailable.
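
The "kafka-policy" pipeline itself has to be registered somewhere. As a minimal sketch using Polly v8, and assuming Brighter resolves named pipelines from the registry populated by AddResiliencePipeline, it could look like this:

// Assumption: Brighter looks up "kafka-policy" in Polly's resilience pipeline registry
services.AddResiliencePipeline("kafka-policy", builder => builder
    .AddRetry(new RetryStrategyOptions
    {
        MaxRetryAttempts = 3,
        Delay = TimeSpan.FromSeconds(1),
        BackoffType = DelayBackoffType.Exponential
    }));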

Conclusion

The Inbox and Outbox patterns are essential for building resilient distributed systems. Paramore.Brighter, combined with its MongoDB inbox/outbox and Kafka integration, provides a clean, robust implementation for .NET applications.

This setup guarantees that your services can handle failures gracefully while maintaining data consistency across your distributed system.

The full code for this sample can be found on GitHub: https://github.com/lillo42/brighter-sample/tree/v10-mongodb
