DEV Community

Cover image for Usando MongoDB com Brighter V10
Rafael Andrade
Rafael Andrade

Posted on

Usando MongoDB com Brighter V10

Uma das novas implementação do Brighter V10 é o MongoDB. Neste artigo, explorarei como utilizá-lo com os padrões Inbox e Outbox.

MongoDB

O MongoDB é um popular banco de dados NoSQL de documentos que armazena dados em documentos flexíveis no formato JSON-like. Sua escalabilidade e desempenho o tornam uma excelente escolha para aplicações de alto throughput, e seu modelo de documentos mapeia naturalmente para estruturas de mensagens comumente usadas em sistemas distribuídos.

Visão Geral do Projeto

Construiremos um serviço .NET 8+ que consome/produz mensagens de um tópico Kafka, processa-as usando Brighter, e utiliza um banco de dados MongoDB como inbox e outbox persistentes para garantir que todas as mensagens sejam enviadas. Também precisamos configurar um lock distribuído para evitar a publicação de mensagens duplicadas quando esta aplicação estiver executando em um ambiente com múltiplos nós.

Pacotes Necessários

Você pode escolher entre dois estilos de configuração:

  1. Pacotes Brighter Puros:

    • Paramore.Brighter.Inbox.MongoDb
    • Paramore.Brighter.Outbox.MongoDb
    • Paramore.Brighter.Locking.MongoDb
    • Paramore.Brighter.MessagingGateway.Kafka
    • Paramore.Brighter.ServiceActivator.Extensions.Hosting
    • Paramore.Brighter.Extensions.DependencyInjection
  2. Wrappers Fluent Brighter:

    • Fluent.Brighter.Kafka
    • Fluent.Brighter.MongoDb
    • Paramore.Brighter.ServiceActivator.Extensions.Hosting

Revisão Rápida do Brighter

Antes de mergulharmos na configuração, vamos recapitular rapidamente os conceitos fundamentais do Brighter.

Request: Command e Event

O primeiro conceito-chave é a interface IRequest, normalmente implementada como um Command ou um Event. Esses objetos são utilizados pelo IAmACommandProcessor do Brighter para:

  • Send (espera exatamente um handler)
  • Publish (espera zero ou mais handlers)
  • Post (envia para um broker de mensagens externo)

Commands representam uma ação que você deseja executar, como CreateOrder. Events representam um fato ou algo que já aconteceu, como OrderCreated.

// Um comando a ser executado
public class CreateOrder() : Command(Id.Random());

// Um evento que ocorreu
public class OrderCreated() : Event(Id.Random());

// Você também pode implementar IRequest diretamente
public class CustomRequest : IRequest { ... }
Enter fullscreen mode Exit fullscreen mode

Message Mapper

Quando você realiza um Post ou consome mensagens de um broker externo, precisa de um message mapper. Este componente é responsável por mapear seu objeto IRequest para e de um objeto Message do Brighter (que inclui cabeçalho e corpo para o broker).

O Brighter v10+ usa por padrão um message mapper JSON, mas você pode fornecer sua própria implementação personalizada implementando IAmAMessageMapper ou IAmAMessageMapperAsync.

Request Handler

Por último, mas não menos importante, temos o RequestHandler (ou RequestHandlerAsync). Esta é a classe que contém sua lógica de negócios real para processar um IRequest específico. O modelo de pipeline do Brighter permite encadear handlers e adicionar atributos para preocupações transversais como retries, logging e, é claro, o padrão inbox.

Exemplo de handler síncrono simples:

public class GreetingHandler : RequestHandler<Greeting>
{
    public override Greeting Handle(Greeting @event)
    {
        Console.WriteLine("===== Hello, {0}", @event.Name);
        return base.Handle(@event);
    }
}

public class GreetingHandlerAsync : RequestHandlerAsync<GreetingAsync>
{
    public override Task<Greeting> HandleAsync(GreetingAsync @event, CancellationToken ct = default)
    {
        Console.WriteLine("===== Hello, {0}", @event.Name);
        return base.Handle(@event);
    }
}
Enter fullscreen mode Exit fullscreen mode

Configurando Brighter com Kafka

Agora para a configuração principal. Precisamos dizer ao Brighter como se conectar ao Kafka (nosso broker).

Você pode configurar o Brighter usando seus componentes "puros" ou com a API Fluent Brighter. Ambos são mostrados aqui.

Usando Fluent Brighter:

services
    .AddHostedService<ServiceActivatorHostedService>()
    .AddFluentBrighter(opt => opt
        .UsingKafka(kf => kf
            .SetConnection(c => c
                .SetName("sample")
                .SetBootstrapServers("localhost:9092")
                .SetSecurityProtocol(SecurityProtocol.Plaintext)
                .SetSaslMechanisms(SaslMechanism.Plain))
            .UseSubscriptions(s => s
                .AddSubscription<OrderPlaced>(sb => sb
                    .SetTopic("order-placed-topic")
                    .SetConsumerGroupId("order-placed-topic-1")
                    .SetRequeueCount(3)
                    .CreateInfrastructureIfMissing()
                    .UseReactorMode())
                .AddSubscription<OrderPaid>(sb => sb
                    .SetTopic("order-paid-topic")
                    .SetConsumerGroupId("order-paid-topic-1")
                    .CreateInfrastructureIfMissing()
                    .UseReactorMode()))
              .UsePublications(p => p
                  .AddPublication<OrderPaid>(kp => kp
                      .SetTopic("order-paid-topic")
                      .CreateTopicIfMissing())
                  .AddPublication<OrderPlaced>(kp => kp
                      .SetTopic("order-placed-topic")
                      .CreateTopicIfMissing()))));
Enter fullscreen mode Exit fullscreen mode

Usando Brighter Puro:

var connection = new KafkaMessagingGatewayConfiguration
{
    Name = "sample",
    BootStrapServers = ["localhost:9092"],
    SecurityProtocol = SecurityProtocol.Plaintext,
    SaslMechanisms = SaslMechanism.Plain,
};

services
    .AddHostedService<ServiceActivatorHostedService>()
    .AddConsumers(opt =>
    {
        opt.Subscriptions =
        [
            new KafkaSubscription<OrderPlaced>(
                new SubscriptionName("subscription-orderplaced"),
                new ChannelName("order-placed-queue"),
                new RoutingKey("order-placed-topic"),
                makeChannels: OnMissingChannel.Create,
                messagePumpType: MessagePumpType.Reactor,
                groupId: "test-1"),
            new KafkaSubscription<OrderPaid>(
                new SubscriptionName("subscription-orderpaid"),
                new ChannelName("order-paid-queue"),
                new RoutingKey("order-paid-topic"),
                makeChannels: OnMissingChannel.Create,
                messagePumpType: MessagePumpType.Reactor,
                groupId: "test-2"),
        ];

        opt.DefaultChannelFactory = new ChannelFactory(new KafkaMessageConsumerFactory(connection));
    })
    .AutoFromAssemblies()
    .AddProducers(opt =>
    {
        opt.ProducerRegistry = new KafkaProducerRegistryFactory(
            connection,
            [
                new KafkaPublication<OrderPaid>
                {
                    MakeChannels = OnMissingChannel.Create,
                    Topic = new RoutingKey("order-paid-topic"),
                },
                new KafkaPublication<OrderPlaced>
                {
                    MakeChannels = OnMissingChannel.Create,
                    Topic = new RoutingKey("order-placed-topic"),
                }
                ]).Create();
    });
Enter fullscreen mode Exit fullscreen mode

MongoDB

Para este projeto, configurarei o inbox, outbox e lock distribuído para MongoDB.

Usando Fluent Brighter:

services
    .AddFluentBrighter(opt => opt
        .UsingMongoDb(pg => pg
            .SetConnection(db => db
                .SetConnectionString(connectionString)
                .SetDatabaseName("brightertests")
                .SetInbox("inbox")
                .SetOutbox("outbox")
                .SetLocking("locking")))
            .UseDistributedLock()
            .UseInbox()
            .UseOutbox())
 // ... outras configurações (como Kafka) ficam aqui
);
Enter fullscreen mode Exit fullscreen mode

Usando Brighter Puro:

var configuration = new MongoDbConfiguration(connectionString, "brighter")
{
    Inbox = new MongoDbCollectionConfiguration { Name = "inbox" },
    Outbox = new MongoDbCollectionConfiguration { Name = "outbox" },
    Locking = new MongoDbCollectionConfiguration { Name = "locking" },
};

services
    .AddHostedService<ServiceActivatorHostedService>()
    .AddConsumers(opt =>
    {
        opt.InboxConfiguration = new InboxConfiguration(new MongoDbInbox(configuration));
    })
   .AddProducers(opt => 
   {
       opt.Outbox = new MongoDbOutbox(configuration);
       opt.DistributedLock = new MongoDbLockingProvider(configuration);
       opt.ConnectionProvider = typeof(MongoDbConnectionProvider);
       opt.TransactionProvider = typeof(MongoDbUnitOfWork);
   });
Enter fullscreen mode Exit fullscreen mode

Exemplo: Mensagens e Request Handlers

Vamos definir as mensagens que nosso serviço utilizará.

Mensagens

public class CreateNewOrder() : Command(Id.Random())
{
    public decimal Value { get; set; }
}

public class OrderPlaced() : Event(Id.Random())
{
    public string OrderId { get; set; } = string.Empty;
    public decimal Value { get; set; }
}

public class OrderPaid() : Event(Id.Random())
{
    public string OrderId { get; set; } = string.Empty;
}
Enter fullscreen mode Exit fullscreen mode

Request Handlers

Para estes exemplos, usarei handlers síncronos (RequestHandler).

CreateNewOrderHandler: Este handler recebe um comando CreateNewOrder (via Send) e publica dois novos eventos, OrderPlaced e OrderPaid, no Kafka. O método DepositPost armazena as mensagens no outbox, permitindo que o processo sweeper em background as publique no broker assincronamente.

public class CreateNewOrderHandler(IAmACommandProcessor commandProcessor,
    ILogger<CreateNewOrderHandler> logger) : RequestHandler<CreateNewOrder>
{
    public override CreateNewOrder Handle(CreateNewOrder command)
    {
        try
        {
            var id = Uuid.NewAsString();
            logger.LogInformation("Criando novo pedido: {OrderId}", id);

            commandProcessor.DepositPost(new OrderPlaced { OrderId = id, Value = command.Value });
            commandProcessor.DepositPost(new OrderPaid { OrderId = id });
            return base.Handle(command);
        }
        catch (Exception ex)
        {
            logger.LogError(ex, "Dados inválidos");
            throw;
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

OrderPaidHandler: Este handler consome o evento OrderPaid do Kafka e simplesmente registra em log.

public class OrderPaidHandler(ILogger<OrderPaidHandler> logger) : RequestHandler<OrderPaid>
{
    public override OrderPaid Handle(OrderPaid command)
    {
        logger.LogInformation("Pedido {OrderId} pago", command.OrderId);
        return base.Handle(command);
    }
}
Enter fullscreen mode Exit fullscreen mode

OrderPlaceHandler: Este handler simula uma falha potencial. Se o valor do pedido for divisível por 3, ele lança uma exceção. O atributo [UseResiliencePipeline] nos permite aplicar uma política de retry (ex: "kafka-policy" configurada para 3 tentativas).

public class OrderPlaceHandler(ILogger<OrderPlaceHandler> logger) : RequestHandler<OrderPlaced>
{
    [UseResiliencePipeline("kafka-policy", 1)]
    public override OrderPlaced Handle(OrderPlaced command)
    {
        logger.LogInformation("Pedido {OrderId} criado com valor {OrderValue}", command.OrderId, command.Value);
        if (command.Value % 3 == 0)
        {
            logger.LogError("Simulando erro para pedido {OrderId} com valor {OrderValue}", command.OrderId, command.Value);
            throw new InvalidOperationException("erro inválido");
        }

        return base.Handle(command);
    }
}
Enter fullscreen mode Exit fullscreen mode

Este handler demonstra dois conceitos importantes trabalhando juntos:

  • Resiliência: O atributo [UseResiliencePipeline("kafka-policy", 1)] adiciona lógica de retry dentro do handler (assumindo que você definiu uma política chamada "kafka-policy").
  • Padrão Inbox: Quando este handler falha, a mensagem não é armazenada no inbox. Isso garante que, ao redefinir offsets do Kafka, apenas mensagens genuinamente não processadas sejam reprocessadas, evitando duplicatas.
  • Padrão Outbox: Ao usar DepositPost, as mensagens são persistidas no outbox local antes da publicação. Isso garante que as mensagens não sejam perdidas mesmo se o broker estiver temporariamente indisponível.

Conclusão

O Padrão Inbox é essencial para construir sistemas distribuídos resilientes. O Paramore.Brighter, combinado com seu inbox MongoDB e integração Kafka, oferece uma implementação limpa e robusta para aplicações .NET.

Esta configuração garante que seus serviços possam lidar com falhas com elegância enquanto mantêm a consistência de dados em todo o sistema distribuído.

O código completo desta amostra está disponível no GitHub: https://github.com/lillo42/brighter-sample/tree/v10-mongodb

Top comments (0)