DEV Community

Cover image for Implementando o Padrão Inbox com Brighter V10 usando Kafka e PostgreSQL
Rafael Andrade
Rafael Andrade

Posted on

Implementando o Padrão Inbox com Brighter V10 usando Kafka e PostgreSQL

No meu artigo anterior, exploramos o Padrão Inbox como uma solução para garantir o processamento exatamente uma vez de mensagens em sistemas distribuídos. Hoje, vamos mergulhar em uma implementação prática usando o Brighter (um processador e despachante de comandos .NET) combinado com PostgreSQL como nosso repositório de inbox e Kafka como nosso broker de mensagens.

Esta configuração oferece uma base robusta para construir microsserviços orientados a eventos confiáveis que podem se recuperar graciosamente de falhas enquanto mantêm a integridade das mensagens.

Visão Geral do Projeto

Vamos construir um serviço .NET 8+ que consome/produz mensagens de um tópico Kafka, processa-as usando o Brighter e utiliza um banco de dados PostgreSQL como inbox persistente para garantir idempotência e confiabilidade.

Pacotes Necessários

Você pode escolher entre dois estilos de configuração:

  1. Pacotes Brighter Puros:
  • Paramore.Brighter.Inbox.Postgres
  • Paramore.Brighter.MessagingGateway.Kafka
  • Paramore.Brighter.ServiceActivator.Extensions.Hosting
  • Paramore.Brighter.Extensions.DependencyInjection
  1. Wrappers Fluent Brighter:
  • Fluent.Brighter.Kafka
  • Fluent.Brighter.Postgres
  • Paramore.Brighter.ServiceActivator.Extensions.Hosting

Um Breve Resumo do Brighter

Antes de mergulharmos na configuração, vamos recapitular rapidamente os conceitos fundamentais do Brighter.

Requisição: Comando e Evento

O primeiro conceito-chave é a interface IRequest, que geralmente é implementada como um Command ou um Event. Esses objetos são usados pelo IAmACommandProcessor do Brighter para:

  • Send (espera exatamente um handler)
  • Publish (espera zero ou mais handlers)
  • Post (envia para um broker de mensagens externo)

Comandos representam uma ação que você deseja executar, como CreateOrder. Eventos representam um fato ou algo que já aconteceu, como OrderCreated.

// Um comando a ser executado
public class CreateOrder() : Command(Id.Random());

// Um evento que ocorreu
public class OrderCreated() : Event(Id.Random());

// Você também pode implementar IRequest diretamente
public class CustomRequest : IRequest { ... }
Enter fullscreen mode Exit fullscreen mode

Mapeador de Mensagem

Quando você envia ou consome mensagens de um broker externo, precisa de um mapeador de mensagem. Este componente é responsável por mapear seu objeto IRequest para e a partir de um objeto Message do Brighter (que inclui o cabeçalho e corpo para o broker).

O Brighter v10+ usa por padrão um mapeador de mensagem JSON, mas você pode fornecer sua própria implementação personalizada implementando IAmAMessageMapper ou IAmAMessageMapperAsync.

Manipulador de Requisição

Por último, mas não menos importante, está o RequestHandler (ou RequestHandlerAsync). Esta é a classe que contém sua lógica de negócios real para processar um IRequest específico. O modelo de pipeline do Brighter permite encadear handlers e adicionar atributos para preocupações transversais como retries, logging e, é claro, o padrão inbox.

Aqui está um handler síncrono simples:

public class GreetingHandler : RequestHandler<Greeting>
{
    public override Greeting Handle(Greeting @event)
    {
        Console.WriteLine("===== Olá, {0}", @event.Name);
        return base.Handle(@event);
    }
}

public class GreetingHandlerAsync : RequestHandlerAsync<GreetingAsync>
{
    public override Task<Greeting> HandleAsync(GreetingAsync @event, CancellationToken ct = default)
    {
        Console.WriteLine("===== Olá, {0}", @event.Name);
        return base.Handle(@event);
    }
}
Enter fullscreen mode Exit fullscreen mode

Configurando o Brighter com Kafka e Postgres

Agora para a configuração principal. Precisamos informar ao Brighter como se conectar ao Kafka (nosso broker) e ao Postgres (nosso repositório de inbox).

Configuração do Kafka

Você pode configurar o Brighter usando seus componentes "puros" ou com a API Fluent Brighter. Ambos são mostrados aqui.

Usando Fluent Brighter:

services
    .AddHostedService<ServiceActivatorHostedService>()
    .AddFluentBrighter(opt => opt
        .UsingKafka(kf => kf
            .SetConnection(c => c
                .SetName("sample")
                .SetBootstrapServers("localhost:9092")
                .SetSecurityProtocol(SecurityProtocol.Plaintext)
                .SetSaslMechanisms(SaslMechanism.Plain))
            .UseSubscriptions(s => s
                .AddSubscription<OrderPlaced>(sb => sb
                    .SetTopic("order-placed-topic")
                    .SetConsumerGroupId("order-placed-topic-1")
                    .SetRequeueCount(3)
                    .CreateInfrastructureIfMissing()
                    .UseReactorMode())
                .AddSubscription<OrderPaid>(sb => sb
                    .SetTopic("order-paid-topic")
                    .SetConsumerGroupId("order-paid-topic-1")
                    .CreateInfrastructureIfMissing()
                    .UseReactorMode()))
              .UsePublications(p => p
                  .AddPublication<OrderPaid>(kp => kp
                      .SetTopic("order-paid-topic")
                      .CreateTopicIfMissing())
                  .AddPublication<OrderPlaced>(kp => kp
                      .SetTopic("order-placed-topic")
                      .CreateTopicIfMissing()))));
Enter fullscreen mode Exit fullscreen mode

Usando Brighter Puro:

var connection = new KafkaMessagingGatewayConfiguration
{
    Name = "sample",
    BootStrapServers = ["localhost:9092"],
    SecurityProtocol = SecurityProtocol.Plaintext,
    SaslMechanisms = SaslMechanism.Plain,
};

services
    .AddHostedService<ServiceActivatorHostedService>()
    .AddConsumers(opt =>
    {
        opt.Subscriptions =
        [
            new KafkaSubscription<OrderPlaced>(
                new SubscriptionName("subscription-orderplaced"),
                new ChannelName("order-placed-queue"),
                new RoutingKey("order-placed-topic"),
                makeChannels: OnMissingChannel.Create,
                messagePumpType: MessagePumpType.Reactor,
                groupId: "test-1"),
            new KafkaSubscription<OrderPaid>(
                new SubscriptionName("subscription-orderpaid"),
                new ChannelName("order-paid-queue"),
                new RoutingKey("order-paid-topic"),
                makeChannels: OnMissingChannel.Create,
                messagePumpType: MessagePumpType.Reactor,
                groupId: "test-2"),
        ];

        opt.DefaultChannelFactory = new ChannelFactory(new KafkaMessageConsumerFactory(connection));
    })
    .AutoFromAssemblies()
    .AddProducers(opt =>
    {
        opt.ProducerRegistry = new KafkaProducerRegistryFactory(
            connection,
            [
                new KafkaPublication<OrderPaid>
                {
                    MakeChannels = OnMissingChannel.Create,
                    Topic = new RoutingKey("order-paid-topic"),
                },
                new KafkaPublication<OrderPlaced>
                {
                    MakeChannels = OnMissingChannel.Create,
                    Topic = new RoutingKey("order-placed-topic"),
                }
                ]).Create();
    });
Enter fullscreen mode Exit fullscreen mode

Configuração do Inbox no Postgres

Primeiro, precisamos garantir que a tabela do inbox exista em nosso banco de dados. O Brighter fornece um helper de script DDL para isso. Você pode executar isso uma vez no início da aplicação.

await using (NpgsqlConnection connection = new(connectionString))
{
    await connection.OpenAsync();
    await using var command = connection.CreateCommand();

    command.CommandText = PostgreSqlInboxBuilder.GetDDL("inboxmessages");
    _ = await command.ExecuteNonQueryAsync();
}
Enter fullscreen mode Exit fullscreen mode

Configurando o Padrão Inbox

Finalmente, vinculamos o Inbox do Postgres à nossa configuração de consumidor.

Usando Fluent Brighter:

 services
    .AddFluentBrighter(opt => opt
        .UsingPostgres(pg => pg
            .SetConnection(db => db
                .SetConnectionString(connectionString)
                .SetDatabaseName("brightertests")
                .SetInboxTableName("inboxmessages"))
            .UseInbox())
 // ... outras configurações (como Kafka) vão aqui
);
Enter fullscreen mode Exit fullscreen mode

Usando Brighter Puro:

services
    .AddHostedService<ServiceActivatorHostedService>()
    .AddConsumers(opt =>
    {
        opt.InboxConfiguration = new InboxConfiguration(new PostgreSqlInbox(new RelationalDatabaseConfiguration(connectionString, "brightertests", inboxTableName: "inboxmessages")));
 // ... configure subscriptions etc.
    });
Enter fullscreen mode Exit fullscreen mode

Exemplo: Mensagens e Manipuladores de Requisição

Vamos definir as mensagens que nosso serviço usará.

Mensagens

public class CreateNewOrder() : Command(Id.Random())
{
    public decimal Value { get; set; }
}

public class OrderPlaced() : Event(Id.Random())
{
    public string OrderId { get; set; } = string.Empty;

    public decimal Value { get; set; }
}

public class OrderPaid() : Event(Id.Random())
{
    public string OrderId { get; set; } = string.Empty;
}
Enter fullscreen mode Exit fullscreen mode

Manipuladores de Requisição

Para estes exemplos, usarei handlers síncronos (RequestHandler).

Nota sobre Performance: Como observado na documentação do Brighter, o método síncrono Post com o produtor Kafka geralmente oferece desempenho significativamente melhor do que o PostAsync devido à implementação subjacente do cliente C# do Kafka.

CreateNewOrderHandler: Este handler recebe um comando CreateNewOrder (via Send) e publica dois novos eventos, OrderPlaced e OrderPaid, no Kafka.

public class CreateNewOrderHandler(IAmACommandProcessor commandProcessor,
    ILogger<CreateNewOrderHandler> logger) : RequestHandler<CreateNewOrder>
{
    public override CreateNewOrder Handle(CreateNewOrder command)
    {
        try
        {
            var id = Uuid.NewAsString();
            logger.LogInformation("Criando um novo pedido: {OrderId}", id);

            commandProcessor.Post(new OrderPlaced { OrderId = id, Value = command.Value });
            commandProcessor.Post(new OrderPaid { OrderId = id });
            return base.Handle(command);
        }
        catch (Exception ex)
        {
            logger.LogError(ex, "Dados inválidos");
            throw;
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

OrderPaidHandler: Este handler consome o evento OrderPaid do Kafka e simplesmente registra no log.

public class OrderPaidHandler(ILogger<OrderPaidHandler> logger) : RequestHandler<OrderPaid>
{
    public override OrderPaid Handle(OrderPaid command)
    {
        logger.LogInformation("{OrderId} pago", command.OrderId);
        return base.Handle(command);
    }
}
Enter fullscreen mode Exit fullscreen mode

OrderPlaceHandler: Este handler simula uma falha potencial. Se o valor do pedido for divisível por 3, ele lança uma exceção. O atributo [UseResiliencePipeline] nos permite aplicar uma política de retry (por exemplo, "kafka-policy" configurada para tentar 3 vezes).

public class OrderPlaceHandler(ILogger<OrderPlaceHandler> logger) : RequestHandler<OrderPlaced>
{
    [UseResiliencePipeline("kafka-policy", 1)]
    public override OrderPlaced Handle(OrderPlaced command)
    {
        logger.LogInformation("{OrderId} criado com valor {OrderValue}", command.OrderId, command.Value);
        if (command.Value % 3 == 0)
        {
            logger.LogError("Simulando erro para {OrderId} com valor {OrderValue}", command.OrderId, command.Value);
            throw new InvalidOperationException("erro inválido");
        }

        return base.Handle(command);
    }
}
Enter fullscreen mode Exit fullscreen mode

Este handler demonstra dois conceitos importantes trabalhando juntos:

  • Resiliência: O atributo [UseResiliencePipeline("kafka-policy", 1)] adiciona lógica de retry dentro do handler (assumindo que você definiu uma política chamada "kafka-policy").
  • Padrão Inbox: Quando este handler falha, a mensagem não será armazenada no inbox. Isso garante que, quando os offsets do Kafka forem redefinidos, apenas mensagens genuinamente não processadas serão reprocessadas, evitando duplicatas.

Conclusão

O Padrão Inbox é essencial para construir sistemas baseados em mensagens resilientes e idempotentes. O Paramore.Brighter, combinado com seu inbox PostgreSQL e integração com Kafka, fornece uma implementação limpa e robusta para aplicações .NET.

Esta configuração garante que seus serviços possam lidar com falhas graciosamente enquanto mantêm a consistência de dados em todo o seu sistema distribuído.

O código completo desta amostra pode ser encontrado no GitHub: https://github.com/lillo42/brighter-sample/tree/v10-inbox-postgres

Top comments (0)