DEV Community

Cover image for Usando DynamoDB com Brighter V10
Rafael Andrade
Rafael Andrade

Posted on

Usando DynamoDB com Brighter V10

O Brighter possui suporte nativo para o DynamoDB por meio de provedores dedicados de Inbox, Outbox e bloqueio distribuído (distributed locking). Neste artigo, explorarei como implementar os padrões Inbox e Outbox usando o DynamoDB para criar serviços .NET resilientes e com consistência eventual.

Visão Geral do Projeto

Construiremos um serviço em .NET 8+ que consome e produz mensagens de um tópico Kafka, processa-as usando o Brighter e utiliza o DynamoDB como armazenamento persistente para o Inbox e o Outbox. Isso garante que todas as mensagens sejam processadas exatamente uma vez e que nenhuma mensagem seja perdida, mesmo diante de falhas. Também configuraremos um bloqueio distribuído para evitar a publicação de mensagens duplicadas quando a aplicação for executada em um ambiente multi-nó.

Pacotes Necessários

Escolha entre dois estilos de configuração:

Pacotes "Pure Brighter" (AWS SDK v4 recomendado)

<PackageReference Include="Paramore.Brighter.Inbox.DynamoDB.V4" Version="10.3.0" />
<PackageReference Include="Paramore.Brighter.Outbox.DynamoDB.V4" Version="10.3.0" />
<PackageReference Include="Paramore.Brighter.Locking.DynamoDB.V4" Version="10.3.0" />
<PackageReference Include="Paramore.Brighter.MessagingGateway.Kafka" Version="10.3.0" />
<PackageReference Include="Paramore.Brighter.ServiceActivator.Extensions.Hosting" Version="10.3.0" />
<PackageReference Include="Paramore.Brighter.Extensions.DependencyInjection" Version="10.3.0" />
Enter fullscreen mode Exit fullscreen mode

Wrappers "Fluent Brighter"

<PackageReference Include="Fluent.Brighter.Kafka" Version="1.8.1" />
<PackageReference Include="Fluent.Brighter.AWS.V4" Version="1.8.1" />
<PackageReference Include="Paramore.Brighter.ServiceActivator.Extensions.Hosting" Version="10.3.0" />
Enter fullscreen mode Exit fullscreen mode

Uma Breve Recapitulação do Brighter

Antes de mergulharmos na configuração, vamos revisar rapidamente os conceitos fundamentais do Brighter.

Request: Command e Event

O primeiro conceito-chave é a interface IRequest, normalmente implementada como um Command ou um Event. Esses objetos são usados pelo IAmACommandProcessor do Brighter para:

  • Send: espera exatamente um handler.
  • Publish: espera zero ou mais handlers.
  • Post: envia para um message broker externo.

Commands representam uma ação que você deseja executar, como CreateOrder. Events representam algo que já aconteceu, como OrderCreated.

public class CreateOrder() : Command(Id.Random());
public class OrderCreated() : Event(Id.Random());
Enter fullscreen mode Exit fullscreen mode

Message Mapper

Ao postar ou consumir mensagens de um broker externo, você precisa de um mapeador de mensagens. Este componente mapeia seu objeto IRequest para (e de) uma mensagem do Brighter (que inclui o cabeçalho e o corpo para o broker).

O Brighter v10+ usa por padrão um mapeador JSON, mas você pode fornecer o seu próprio implementando IAmAMessageMapper ou IAmAMessageMapperAsync.

Request Handler

O RequestHandler (ou RequestHandlerAsync) contém sua lógica de negócio para um IRequest específico. O modelo de pipeline do Brighter permite encadear handlers e adicionar atributos para preocupações transversais (cross-cutting concerns), como retentativas, logs e o padrão Inbox.

public class GreetingHandler : RequestHandler<Greeting>
{
    public override Greeting Handle(Greeting @event)
    {
        Console.WriteLine("Olá, {0}", @event.Name);
        return base.Handle(@event);
    }
}
Enter fullscreen mode Exit fullscreen mode

Configurando o Brighter com Kafka

Agora, a configuração principal. Precisamos dizer ao Brighter como se conectar ao Kafka. Abaixo, mostramos as abordagens "Fluent" e "Pure".

Usando Fluent Brighter

services
    .AddHostedService<ServiceActivatorHostedService>()
    .AddFluentBrighter(opt => opt
        .UsingKafka(kf => kf
            .SetConnection(c => c
                .SetName("sample")
                .SetBootstrapServers("localhost:9092")
                .SetSecurityProtocol(SecurityProtocol.Plaintext)
                .SetSaslMechanisms(SaslMechanism.Plain))
            .UseSubscriptions(s => s
                .AddSubscription<OrderPlaced>(sb => sb
                    .SetTopic("order-placed-topic")
                    .SetConsumerGroupId("order-placed-topic-1")
                    .SetRequeueCount(3)
                    .CreateInfrastructureIfMissing()
                    .UseReactorMode())
                .AddSubscription<OrderPaid>(sb => sb
                    .SetTopic("order-paid-topic")
                    .SetConsumerGroupId("order-paid-topic-1")
                    .CreateInfrastructureIfMissing()
                    .UseReactorMode()))
            .UsePublications(p => p
                .AddPublication<OrderPaid>(kp => kp
                    .SetTopic("order-paid-topic")
                    .CreateTopicIfMissing())
                .AddPublication<OrderPlaced>(kp => kp
                    .SetTopic("order-placed-topic")
                    .CreateTopicIfMissing()))));
Enter fullscreen mode Exit fullscreen mode

Usando Pure Brighter

var connection = new KafkaMessagingGatewayConfiguration
{
    Name = "sample",
    BootStrapServers = ["localhost:9092"],
    SecurityProtocol = SecurityProtocol.Plaintext,
    SaslMechanisms = SaslMechanism.Plain,
};

services
    .AddHostedService<ServiceActivatorHostedService>()
    .AddConsumers(opt =>
    {
        opt.Subscriptions =
        [
            new KafkaSubscription<OrderPlaced>(
                new SubscriptionName("subscription-orderplaced"),
                new ChannelName("order-placed-queue"),
                new RoutingKey("order-placed-topic"),
                makeChannels: OnMissingChannel.Create,
                messagePumpType: MessagePumpType.Reactor,
                groupId: "test-1"),
            new KafkaSubscription<OrderPaid>(
                new SubscriptionName("subscription-orderpaid"),
                new ChannelName("order-paid-queue"),
                new RoutingKey("order-paid-topic"),
                makeChannels: OnMissingChannel.Create,
                messagePumpType: MessagePumpType.Reactor,
                groupId: "test-2"),
        ];

        opt.DefaultChannelFactory = new ChannelFactory(new KafkaMessageConsumerFactory(connection));
    })
    .AutoFromAssemblies()
    .AddProducers(opt =>
    {
        opt.ProducerRegistry = new KafkaProducerRegistryFactory(
            connection,
            [
                new KafkaPublication<OrderPaid>
                {
                    MakeChannels = OnMissingChannel.Create,
                    Topic = new RoutingKey("order-paid-topic"),
                },
                new KafkaPublication<OrderPlaced>
                {
                    MakeChannels = OnMissingChannel.Create,
                    Topic = new RoutingKey("order-placed-topic"),
                }
            ]).Create();
    });
Enter fullscreen mode Exit fullscreen mode

Configuração do DynamoDB

Agora configuramos o Inbox, Outbox e o bloqueio distribuído usando o DynamoDB.

Usando Fluent Brighter

services
    .AddFluentBrighter(opt => opt
        .UsingAws(pg => pg
            .SetConnection(conn => conn
                .SetCredentials(new BasicAWSCredentials("test", "test"))
                .SetRegion(RegionEndpoint.USEast1)
                .SetClientConfigAction(cfg => cfg.ServiceURL = "http://localhost:4566"))  // LocalStack
            .UseDynamoDbOutbox("outbox")
            .UseDynamoDbInbox("inbox")
            .UseDynamoDbDistributedLock(c => c
                .SetLeaseholderGroupId("some-group")
                .SetTableName("locking")))
        // ... a configuração do Kafka continua aqui
    );
Enter fullscreen mode Exit fullscreen mode

Usando Pure Brighter

Para a abordagem pura, você precisa configurar o cliente DynamoDB e fornecer as instâncias dos provedores.

// Configurar cliente DynamoDB (exemplo usando LocalStack)
var dynamoDbConfig = new AmazonDynamoDBConfig
{
    RegionEndpoint = RegionEndpoint.USEast1,
    ServiceURL = "http://localhost:4566"
};
var dynamoDbClient = new AmazonDynamoDBClient(new BasicAWSCredentials("test", "test"), dynamoDbConfig);

// Criar os provedores de outbox, inbox e locking
var outbox = new DynamoDbOutbox(dynamoDbClient, new DynamoDbConfiguration("outbox"));

var inbox = new DynamoDbInbox(dynamoDbClient, new DynamoDbInboxConfiguration
{
    TableName = "inbox"
});

var lockingProvider = new DynamoDbLockingProvider(dynamoDbClient, new DynamoDbLockingProviderOptions("locking", "some-group"));

// Registrar na infraestrutura do Brighter
services
    .AddHostedService<ServiceActivatorHostedService>()
    .AddConsumers(opt =>
    {
        opt.InboxConfiguration = new InboxConfiguration(inbox);
        // ... outras configs de consumidor (subscriptions, etc.)
    })
    .AddProducers(opt =>
    {
        opt.Outbox = outbox;
        opt.DistributedLock = lockingProvider;
        opt.ConnectionProvider = typeof(DynamoDbUnitOfWork);  // Se estiver usando transações
        // ... config de registro de produtores
    });
Enter fullscreen mode Exit fullscreen mode

Nota: As tabelas do DynamoDB (outbox, inbox, locking) devem existir previamente. O Brighter fornece helpers para criá-las se necessário, mas em produção você deve gerenciar a criação das tabelas separadamente.


Esquemas de Tabela do DynamoDB

Abaixo estão os esquemas que o Brighter espera. Você pode criá-los manualmente via Console AWS, CloudFormation, Terraform, ou usar o DynamoDbTableFactory do Brighter.

Tabela Outbox

Configuração Valor
Nome padrão da tabela brighter_outbox
Modo de faturamento PAY_PER_REQUEST (recomendado) ou PROVISIONED

Chave Primária:

Chave Atributo Tipo
Chave de Partição (HASH) MessageId String (S)

Tabela Inbox

Configuração Valor
Nome padrão da tabela brighter_inbox
Modo de faturamento PAY_PER_REQUEST

Chave Primária (composta):

Chave Atributo Tipo
Chave de Partição (HASH) CommandId String (S)
Chave de Classificação (RANGE) ContextKey String (S)

A chave composta (CommandId + ContextKey) permite que o Inbox rastreie se um handler específico já processou uma mensagem específica, suportando o consumo idempotente.


Exemplo: Mensagens e Request Handlers

Request Handlers

O CreateNewOrderHandler recebe um comando CreateNewOrder (via Send) e deposita dois eventos no Outbox. O processo de varredura (sweeper) em segundo plano os publicará posteriormente no Kafka.

public class CreateNewOrderHandler : RequestHandler<CreateNewOrder>
{
    private readonly IAmACommandProcessor _commandProcessor;
    private readonly ILogger<CreateNewOrderHandler> _logger;

    public CreateNewOrderHandler(IAmACommandProcessor commandProcessor, ILogger<CreateNewOrderHandler> logger)
    {
        _commandProcessor = commandProcessor;
        _logger = logger;
    }

    public override CreateNewOrder Handle(CreateNewOrder command)
    {
        var id = Uuid.NewAsString();
        _logger.LogInformation("Criando novo pedido: {OrderId}", id);

        _commandProcessor.DepositPost(new OrderPlaced { OrderId = id, Value = command.Value });
        _commandProcessor.DepositPost(new OrderPaid { OrderId = id });

        return base.Handle(command);
    }
}
Enter fullscreen mode Exit fullscreen mode

Como os padrões funcionam juntos

  • Padrão Inbox – Quando o OrderPlacedHandler falha, a mensagem não é marcada como processada no Inbox. Quando os offsets do Kafka são resetados, apenas mensagens genuinamente não processadas são reexecutadas, evitando duplicatas.
  • Padrão Outbox – O DepositPost armazena as mensagens no Outbox do DynamoDB antes de tentar publicar. Um processo "sweeper" publica-as no Kafka, garantindo que nenhuma mensagem seja perdida mesmo se o Kafka estiver temporariamente indisponível.
  • Bloqueio Distribuído – Em ambientes multi-nó, o sweeper usa o bloqueio distribuído para garantir que apenas uma instância publique as mensagens por vez, evitando publicações duplicadas.

Conclusão

Os padrões Inbox e Outbox são essenciais para construir sistemas distribuídos resilientes. O Paramore.Brighter, combinado com sua integração com DynamoDB e o gateway de mensagens Kafka, oferece uma implementação limpa e robusta para aplicações .NET.

Essa configuração garante que seus serviços lidem com falhas graciosamente, mantendo a consistência de dados em todo o seu sistema distribuído.

Top comments (0)