Uma das novas implementação do Brighter V10 é o MongoDB. Neste artigo, explorarei como utilizá-lo com os padrões Inbox e Outbox.
MongoDB
O MongoDB é um popular banco de dados NoSQL de documentos que armazena dados em documentos flexíveis no formato JSON-like. Sua escalabilidade e desempenho o tornam uma excelente escolha para aplicações de alto throughput, e seu modelo de documentos mapeia naturalmente para estruturas de mensagens comumente usadas em sistemas distribuídos.
Visão Geral do Projeto
Construiremos um serviço .NET 8+ que consome/produz mensagens de um tópico Kafka, processa-as usando Brighter, e utiliza um banco de dados MongoDB como inbox e outbox persistentes para garantir que todas as mensagens sejam enviadas. Também precisamos configurar um lock distribuído para evitar a publicação de mensagens duplicadas quando esta aplicação estiver executando em um ambiente com múltiplos nós.
Pacotes Necessários
Você pode escolher entre dois estilos de configuração:
-
Pacotes Brighter Puros:
- Paramore.Brighter.Inbox.MongoDb
- Paramore.Brighter.Outbox.MongoDb
- Paramore.Brighter.Locking.MongoDb
- Paramore.Brighter.MessagingGateway.Kafka
- Paramore.Brighter.ServiceActivator.Extensions.Hosting
- Paramore.Brighter.Extensions.DependencyInjection
-
Wrappers Fluent Brighter:
- Fluent.Brighter.Kafka
- Fluent.Brighter.MongoDb
- Paramore.Brighter.ServiceActivator.Extensions.Hosting
Revisão Rápida do Brighter
Antes de mergulharmos na configuração, vamos recapitular rapidamente os conceitos fundamentais do Brighter.
Request: Command e Event
O primeiro conceito-chave é a interface IRequest, normalmente implementada como um Command ou um Event. Esses objetos são utilizados pelo IAmACommandProcessor do Brighter para:
-
Send(espera exatamente um handler) -
Publish(espera zero ou mais handlers) -
Post(envia para um broker de mensagens externo)
Commands representam uma ação que você deseja executar, como CreateOrder. Events representam um fato ou algo que já aconteceu, como OrderCreated.
// Um comando a ser executado
public class CreateOrder() : Command(Id.Random());
// Um evento que ocorreu
public class OrderCreated() : Event(Id.Random());
// Você também pode implementar IRequest diretamente
public class CustomRequest : IRequest { ... }
Message Mapper
Quando você realiza um Post ou consome mensagens de um broker externo, precisa de um message mapper. Este componente é responsável por mapear seu objeto IRequest para e de um objeto Message do Brighter (que inclui cabeçalho e corpo para o broker).
O Brighter v10+ usa por padrão um message mapper JSON, mas você pode fornecer sua própria implementação personalizada implementando IAmAMessageMapper ou IAmAMessageMapperAsync.
Request Handler
Por último, mas não menos importante, temos o RequestHandler (ou RequestHandlerAsync). Esta é a classe que contém sua lógica de negócios real para processar um IRequest específico. O modelo de pipeline do Brighter permite encadear handlers e adicionar atributos para preocupações transversais como retries, logging e, é claro, o padrão inbox.
Exemplo de handler síncrono simples:
public class GreetingHandler : RequestHandler<Greeting>
{
public override Greeting Handle(Greeting @event)
{
Console.WriteLine("===== Hello, {0}", @event.Name);
return base.Handle(@event);
}
}
public class GreetingHandlerAsync : RequestHandlerAsync<GreetingAsync>
{
public override Task<Greeting> HandleAsync(GreetingAsync @event, CancellationToken ct = default)
{
Console.WriteLine("===== Hello, {0}", @event.Name);
return base.Handle(@event);
}
}
Configurando Brighter com Kafka
Agora para a configuração principal. Precisamos dizer ao Brighter como se conectar ao Kafka (nosso broker).
Você pode configurar o Brighter usando seus componentes "puros" ou com a API Fluent Brighter. Ambos são mostrados aqui.
Usando Fluent Brighter:
services
.AddHostedService<ServiceActivatorHostedService>()
.AddFluentBrighter(opt => opt
.UsingKafka(kf => kf
.SetConnection(c => c
.SetName("sample")
.SetBootstrapServers("localhost:9092")
.SetSecurityProtocol(SecurityProtocol.Plaintext)
.SetSaslMechanisms(SaslMechanism.Plain))
.UseSubscriptions(s => s
.AddSubscription<OrderPlaced>(sb => sb
.SetTopic("order-placed-topic")
.SetConsumerGroupId("order-placed-topic-1")
.SetRequeueCount(3)
.CreateInfrastructureIfMissing()
.UseReactorMode())
.AddSubscription<OrderPaid>(sb => sb
.SetTopic("order-paid-topic")
.SetConsumerGroupId("order-paid-topic-1")
.CreateInfrastructureIfMissing()
.UseReactorMode()))
.UsePublications(p => p
.AddPublication<OrderPaid>(kp => kp
.SetTopic("order-paid-topic")
.CreateTopicIfMissing())
.AddPublication<OrderPlaced>(kp => kp
.SetTopic("order-placed-topic")
.CreateTopicIfMissing()))));
Usando Brighter Puro:
var connection = new KafkaMessagingGatewayConfiguration
{
Name = "sample",
BootStrapServers = ["localhost:9092"],
SecurityProtocol = SecurityProtocol.Plaintext,
SaslMechanisms = SaslMechanism.Plain,
};
services
.AddHostedService<ServiceActivatorHostedService>()
.AddConsumers(opt =>
{
opt.Subscriptions =
[
new KafkaSubscription<OrderPlaced>(
new SubscriptionName("subscription-orderplaced"),
new ChannelName("order-placed-queue"),
new RoutingKey("order-placed-topic"),
makeChannels: OnMissingChannel.Create,
messagePumpType: MessagePumpType.Reactor,
groupId: "test-1"),
new KafkaSubscription<OrderPaid>(
new SubscriptionName("subscription-orderpaid"),
new ChannelName("order-paid-queue"),
new RoutingKey("order-paid-topic"),
makeChannels: OnMissingChannel.Create,
messagePumpType: MessagePumpType.Reactor,
groupId: "test-2"),
];
opt.DefaultChannelFactory = new ChannelFactory(new KafkaMessageConsumerFactory(connection));
})
.AutoFromAssemblies()
.AddProducers(opt =>
{
opt.ProducerRegistry = new KafkaProducerRegistryFactory(
connection,
[
new KafkaPublication<OrderPaid>
{
MakeChannels = OnMissingChannel.Create,
Topic = new RoutingKey("order-paid-topic"),
},
new KafkaPublication<OrderPlaced>
{
MakeChannels = OnMissingChannel.Create,
Topic = new RoutingKey("order-placed-topic"),
}
]).Create();
});
MongoDB
Para este projeto, configurarei o inbox, outbox e lock distribuído para MongoDB.
Usando Fluent Brighter:
services
.AddFluentBrighter(opt => opt
.UsingMongoDb(pg => pg
.SetConnection(db => db
.SetConnectionString(connectionString)
.SetDatabaseName("brightertests")
.SetInbox("inbox")
.SetOutbox("outbox")
.SetLocking("locking")))
.UseDistributedLock()
.UseInbox()
.UseOutbox())
// ... outras configurações (como Kafka) ficam aqui
);
Usando Brighter Puro:
var configuration = new MongoDbConfiguration(connectionString, "brighter")
{
Inbox = new MongoDbCollectionConfiguration { Name = "inbox" },
Outbox = new MongoDbCollectionConfiguration { Name = "outbox" },
Locking = new MongoDbCollectionConfiguration { Name = "locking" },
};
services
.AddHostedService<ServiceActivatorHostedService>()
.AddConsumers(opt =>
{
opt.InboxConfiguration = new InboxConfiguration(new MongoDbInbox(configuration));
})
.AddProducers(opt =>
{
opt.Outbox = new MongoDbOutbox(configuration);
opt.DistributedLock = new MongoDbLockingProvider(configuration);
opt.ConnectionProvider = typeof(MongoDbConnectionProvider);
opt.TransactionProvider = typeof(MongoDbUnitOfWork);
});
Exemplo: Mensagens e Request Handlers
Vamos definir as mensagens que nosso serviço utilizará.
Mensagens
public class CreateNewOrder() : Command(Id.Random())
{
public decimal Value { get; set; }
}
public class OrderPlaced() : Event(Id.Random())
{
public string OrderId { get; set; } = string.Empty;
public decimal Value { get; set; }
}
public class OrderPaid() : Event(Id.Random())
{
public string OrderId { get; set; } = string.Empty;
}
Request Handlers
Para estes exemplos, usarei handlers síncronos (RequestHandler).
CreateNewOrderHandler: Este handler recebe um comando CreateNewOrder (via Send) e publica dois novos eventos, OrderPlaced e OrderPaid, no Kafka. O método DepositPost armazena as mensagens no outbox, permitindo que o processo sweeper em background as publique no broker assincronamente.
public class CreateNewOrderHandler(IAmACommandProcessor commandProcessor,
ILogger<CreateNewOrderHandler> logger) : RequestHandler<CreateNewOrder>
{
public override CreateNewOrder Handle(CreateNewOrder command)
{
try
{
var id = Uuid.NewAsString();
logger.LogInformation("Criando novo pedido: {OrderId}", id);
commandProcessor.DepositPost(new OrderPlaced { OrderId = id, Value = command.Value });
commandProcessor.DepositPost(new OrderPaid { OrderId = id });
return base.Handle(command);
}
catch (Exception ex)
{
logger.LogError(ex, "Dados inválidos");
throw;
}
}
}
OrderPaidHandler: Este handler consome o evento OrderPaid do Kafka e simplesmente registra em log.
public class OrderPaidHandler(ILogger<OrderPaidHandler> logger) : RequestHandler<OrderPaid>
{
public override OrderPaid Handle(OrderPaid command)
{
logger.LogInformation("Pedido {OrderId} pago", command.OrderId);
return base.Handle(command);
}
}
OrderPlaceHandler: Este handler simula uma falha potencial. Se o valor do pedido for divisível por 3, ele lança uma exceção. O atributo [UseResiliencePipeline] nos permite aplicar uma política de retry (ex: "kafka-policy" configurada para 3 tentativas).
public class OrderPlaceHandler(ILogger<OrderPlaceHandler> logger) : RequestHandler<OrderPlaced>
{
[UseResiliencePipeline("kafka-policy", 1)]
public override OrderPlaced Handle(OrderPlaced command)
{
logger.LogInformation("Pedido {OrderId} criado com valor {OrderValue}", command.OrderId, command.Value);
if (command.Value % 3 == 0)
{
logger.LogError("Simulando erro para pedido {OrderId} com valor {OrderValue}", command.OrderId, command.Value);
throw new InvalidOperationException("erro inválido");
}
return base.Handle(command);
}
}
Este handler demonstra dois conceitos importantes trabalhando juntos:
-
Resiliência: O atributo
[UseResiliencePipeline("kafka-policy", 1)]adiciona lógica de retry dentro do handler (assumindo que você definiu uma política chamada "kafka-policy"). - Padrão Inbox: Quando este handler falha, a mensagem não é armazenada no inbox. Isso garante que, ao redefinir offsets do Kafka, apenas mensagens genuinamente não processadas sejam reprocessadas, evitando duplicatas.
-
Padrão Outbox: Ao usar
DepositPost, as mensagens são persistidas no outbox local antes da publicação. Isso garante que as mensagens não sejam perdidas mesmo se o broker estiver temporariamente indisponível.
Conclusão
O Padrão Inbox é essencial para construir sistemas distribuídos resilientes. O Paramore.Brighter, combinado com seu inbox MongoDB e integração Kafka, oferece uma implementação limpa e robusta para aplicações .NET.
Esta configuração garante que seus serviços possam lidar com falhas com elegância enquanto mantêm a consistência de dados em todo o sistema distribuído.
O código completo desta amostra está disponível no GitHub: https://github.com/lillo42/brighter-sample/tree/v10-mongodb
Top comments (0)