No meu artigo anterior, exploramos o Padrão Inbox como uma solução para garantir o processamento exatamente uma vez de mensagens em sistemas distribuídos. Hoje, vamos mergulhar em uma implementação prática usando o Brighter (um processador e despachante de comandos .NET) combinado com PostgreSQL como nosso repositório de inbox e Kafka como nosso broker de mensagens.
Esta configuração oferece uma base robusta para construir microsserviços orientados a eventos confiáveis que podem se recuperar graciosamente de falhas enquanto mantêm a integridade das mensagens.
Visão Geral do Projeto
Vamos construir um serviço .NET 8+ que consome/produz mensagens de um tópico Kafka, processa-as usando o Brighter e utiliza um banco de dados PostgreSQL como inbox persistente para garantir idempotência e confiabilidade.
Pacotes Necessários
Você pode escolher entre dois estilos de configuração:
- Pacotes Brighter Puros:
- Paramore.Brighter.Inbox.Postgres
- Paramore.Brighter.MessagingGateway.Kafka
- Paramore.Brighter.ServiceActivator.Extensions.Hosting
- Paramore.Brighter.Extensions.DependencyInjection
- Wrappers Fluent Brighter:
- Fluent.Brighter.Kafka
- Fluent.Brighter.Postgres
- Paramore.Brighter.ServiceActivator.Extensions.Hosting
Um Breve Resumo do Brighter
Antes de mergulharmos na configuração, vamos recapitular rapidamente os conceitos fundamentais do Brighter.
Requisição: Comando e Evento
O primeiro conceito-chave é a interface IRequest, que geralmente é implementada como um Command ou um Event. Esses objetos são usados pelo IAmACommandProcessor do Brighter para:
-
Send(espera exatamente um handler) -
Publish(espera zero ou mais handlers) -
Post(envia para um broker de mensagens externo)
Comandos representam uma ação que você deseja executar, como CreateOrder. Eventos representam um fato ou algo que já aconteceu, como OrderCreated.
// Um comando a ser executado
public class CreateOrder() : Command(Id.Random());
// Um evento que ocorreu
public class OrderCreated() : Event(Id.Random());
// Você também pode implementar IRequest diretamente
public class CustomRequest : IRequest { ... }
Mapeador de Mensagem
Quando você envia ou consome mensagens de um broker externo, precisa de um mapeador de mensagem. Este componente é responsável por mapear seu objeto IRequest para e a partir de um objeto Message do Brighter (que inclui o cabeçalho e corpo para o broker).
O Brighter v10+ usa por padrão um mapeador de mensagem JSON, mas você pode fornecer sua própria implementação personalizada implementando IAmAMessageMapper ou IAmAMessageMapperAsync.
Manipulador de Requisição
Por último, mas não menos importante, está o RequestHandler (ou RequestHandlerAsync). Esta é a classe que contém sua lógica de negócios real para processar um IRequest específico. O modelo de pipeline do Brighter permite encadear handlers e adicionar atributos para preocupações transversais como retries, logging e, é claro, o padrão inbox.
Aqui está um handler síncrono simples:
public class GreetingHandler : RequestHandler<Greeting>
{
public override Greeting Handle(Greeting @event)
{
Console.WriteLine("===== Olá, {0}", @event.Name);
return base.Handle(@event);
}
}
public class GreetingHandlerAsync : RequestHandlerAsync<GreetingAsync>
{
public override Task<Greeting> HandleAsync(GreetingAsync @event, CancellationToken ct = default)
{
Console.WriteLine("===== Olá, {0}", @event.Name);
return base.Handle(@event);
}
}
Configurando o Brighter com Kafka e Postgres
Agora para a configuração principal. Precisamos informar ao Brighter como se conectar ao Kafka (nosso broker) e ao Postgres (nosso repositório de inbox).
Configuração do Kafka
Você pode configurar o Brighter usando seus componentes "puros" ou com a API Fluent Brighter. Ambos são mostrados aqui.
Usando Fluent Brighter:
services
.AddHostedService<ServiceActivatorHostedService>()
.AddFluentBrighter(opt => opt
.UsingKafka(kf => kf
.SetConnection(c => c
.SetName("sample")
.SetBootstrapServers("localhost:9092")
.SetSecurityProtocol(SecurityProtocol.Plaintext)
.SetSaslMechanisms(SaslMechanism.Plain))
.UseSubscriptions(s => s
.AddSubscription<OrderPlaced>(sb => sb
.SetTopic("order-placed-topic")
.SetConsumerGroupId("order-placed-topic-1")
.SetRequeueCount(3)
.CreateInfrastructureIfMissing()
.UseReactorMode())
.AddSubscription<OrderPaid>(sb => sb
.SetTopic("order-paid-topic")
.SetConsumerGroupId("order-paid-topic-1")
.CreateInfrastructureIfMissing()
.UseReactorMode()))
.UsePublications(p => p
.AddPublication<OrderPaid>(kp => kp
.SetTopic("order-paid-topic")
.CreateTopicIfMissing())
.AddPublication<OrderPlaced>(kp => kp
.SetTopic("order-placed-topic")
.CreateTopicIfMissing()))));
Usando Brighter Puro:
var connection = new KafkaMessagingGatewayConfiguration
{
Name = "sample",
BootStrapServers = ["localhost:9092"],
SecurityProtocol = SecurityProtocol.Plaintext,
SaslMechanisms = SaslMechanism.Plain,
};
services
.AddHostedService<ServiceActivatorHostedService>()
.AddConsumers(opt =>
{
opt.Subscriptions =
[
new KafkaSubscription<OrderPlaced>(
new SubscriptionName("subscription-orderplaced"),
new ChannelName("order-placed-queue"),
new RoutingKey("order-placed-topic"),
makeChannels: OnMissingChannel.Create,
messagePumpType: MessagePumpType.Reactor,
groupId: "test-1"),
new KafkaSubscription<OrderPaid>(
new SubscriptionName("subscription-orderpaid"),
new ChannelName("order-paid-queue"),
new RoutingKey("order-paid-topic"),
makeChannels: OnMissingChannel.Create,
messagePumpType: MessagePumpType.Reactor,
groupId: "test-2"),
];
opt.DefaultChannelFactory = new ChannelFactory(new KafkaMessageConsumerFactory(connection));
})
.AutoFromAssemblies()
.AddProducers(opt =>
{
opt.ProducerRegistry = new KafkaProducerRegistryFactory(
connection,
[
new KafkaPublication<OrderPaid>
{
MakeChannels = OnMissingChannel.Create,
Topic = new RoutingKey("order-paid-topic"),
},
new KafkaPublication<OrderPlaced>
{
MakeChannels = OnMissingChannel.Create,
Topic = new RoutingKey("order-placed-topic"),
}
]).Create();
});
Configuração do Inbox no Postgres
Primeiro, precisamos garantir que a tabela do inbox exista em nosso banco de dados. O Brighter fornece um helper de script DDL para isso. Você pode executar isso uma vez no início da aplicação.
await using (NpgsqlConnection connection = new(connectionString))
{
await connection.OpenAsync();
await using var command = connection.CreateCommand();
command.CommandText = PostgreSqlInboxBuilder.GetDDL("inboxmessages");
_ = await command.ExecuteNonQueryAsync();
}
Configurando o Padrão Inbox
Finalmente, vinculamos o Inbox do Postgres à nossa configuração de consumidor.
Usando Fluent Brighter:
services
.AddFluentBrighter(opt => opt
.UsingPostgres(pg => pg
.SetConnection(db => db
.SetConnectionString(connectionString)
.SetDatabaseName("brightertests")
.SetInboxTableName("inboxmessages"))
.UseInbox())
// ... outras configurações (como Kafka) vão aqui
);
Usando Brighter Puro:
services
.AddHostedService<ServiceActivatorHostedService>()
.AddConsumers(opt =>
{
opt.InboxConfiguration = new InboxConfiguration(new PostgreSqlInbox(new RelationalDatabaseConfiguration(connectionString, "brightertests", inboxTableName: "inboxmessages")));
// ... configure subscriptions etc.
});
Exemplo: Mensagens e Manipuladores de Requisição
Vamos definir as mensagens que nosso serviço usará.
Mensagens
public class CreateNewOrder() : Command(Id.Random())
{
public decimal Value { get; set; }
}
public class OrderPlaced() : Event(Id.Random())
{
public string OrderId { get; set; } = string.Empty;
public decimal Value { get; set; }
}
public class OrderPaid() : Event(Id.Random())
{
public string OrderId { get; set; } = string.Empty;
}
Manipuladores de Requisição
Para estes exemplos, usarei handlers síncronos (RequestHandler).
Nota sobre Performance: Como observado na documentação do Brighter, o método síncrono Post com o produtor Kafka geralmente oferece desempenho significativamente melhor do que o PostAsync devido à implementação subjacente do cliente C# do Kafka.
CreateNewOrderHandler: Este handler recebe um comando CreateNewOrder (via Send) e publica dois novos eventos, OrderPlaced e OrderPaid, no Kafka.
public class CreateNewOrderHandler(IAmACommandProcessor commandProcessor,
ILogger<CreateNewOrderHandler> logger) : RequestHandler<CreateNewOrder>
{
public override CreateNewOrder Handle(CreateNewOrder command)
{
try
{
var id = Uuid.NewAsString();
logger.LogInformation("Criando um novo pedido: {OrderId}", id);
commandProcessor.Post(new OrderPlaced { OrderId = id, Value = command.Value });
commandProcessor.Post(new OrderPaid { OrderId = id });
return base.Handle(command);
}
catch (Exception ex)
{
logger.LogError(ex, "Dados inválidos");
throw;
}
}
}
OrderPaidHandler: Este handler consome o evento OrderPaid do Kafka e simplesmente registra no log.
public class OrderPaidHandler(ILogger<OrderPaidHandler> logger) : RequestHandler<OrderPaid>
{
public override OrderPaid Handle(OrderPaid command)
{
logger.LogInformation("{OrderId} pago", command.OrderId);
return base.Handle(command);
}
}
OrderPlaceHandler: Este handler simula uma falha potencial. Se o valor do pedido for divisível por 3, ele lança uma exceção. O atributo [UseResiliencePipeline] nos permite aplicar uma política de retry (por exemplo, "kafka-policy" configurada para tentar 3 vezes).
public class OrderPlaceHandler(ILogger<OrderPlaceHandler> logger) : RequestHandler<OrderPlaced>
{
[UseResiliencePipeline("kafka-policy", 1)]
public override OrderPlaced Handle(OrderPlaced command)
{
logger.LogInformation("{OrderId} criado com valor {OrderValue}", command.OrderId, command.Value);
if (command.Value % 3 == 0)
{
logger.LogError("Simulando erro para {OrderId} com valor {OrderValue}", command.OrderId, command.Value);
throw new InvalidOperationException("erro inválido");
}
return base.Handle(command);
}
}
Este handler demonstra dois conceitos importantes trabalhando juntos:
- Resiliência: O atributo
[UseResiliencePipeline("kafka-policy", 1)]adiciona lógica de retry dentro do handler (assumindo que você definiu uma política chamada "kafka-policy"). - Padrão Inbox: Quando este handler falha, a mensagem não será armazenada no inbox. Isso garante que, quando os offsets do Kafka forem redefinidos, apenas mensagens genuinamente não processadas serão reprocessadas, evitando duplicatas.
Conclusão
O Padrão Inbox é essencial para construir sistemas baseados em mensagens resilientes e idempotentes. O Paramore.Brighter, combinado com seu inbox PostgreSQL e integração com Kafka, fornece uma implementação limpa e robusta para aplicações .NET.
Esta configuração garante que seus serviços possam lidar com falhas graciosamente enquanto mantêm a consistência de dados em todo o seu sistema distribuído.
O código completo desta amostra pode ser encontrado no GitHub: https://github.com/lillo42/brighter-sample/tree/v10-inbox-postgres
Top comments (0)