O Brighter possui suporte nativo para o DynamoDB por meio de provedores dedicados de Inbox, Outbox e bloqueio distribuído (distributed locking). Neste artigo, explorarei como implementar os padrões Inbox e Outbox usando o DynamoDB para criar serviços .NET resilientes e com consistência eventual.
Visão Geral do Projeto
Construiremos um serviço em .NET 8+ que consome e produz mensagens de um tópico Kafka, processa-as usando o Brighter e utiliza o DynamoDB como armazenamento persistente para o Inbox e o Outbox. Isso garante que todas as mensagens sejam processadas exatamente uma vez e que nenhuma mensagem seja perdida, mesmo diante de falhas. Também configuraremos um bloqueio distribuído para evitar a publicação de mensagens duplicadas quando a aplicação for executada em um ambiente multi-nó.
Pacotes Necessários
Escolha entre dois estilos de configuração:
Pacotes "Pure Brighter" (AWS SDK v4 recomendado)
<PackageReference Include="Paramore.Brighter.Inbox.DynamoDB.V4" Version="10.3.0" />
<PackageReference Include="Paramore.Brighter.Outbox.DynamoDB.V4" Version="10.3.0" />
<PackageReference Include="Paramore.Brighter.Locking.DynamoDB.V4" Version="10.3.0" />
<PackageReference Include="Paramore.Brighter.MessagingGateway.Kafka" Version="10.3.0" />
<PackageReference Include="Paramore.Brighter.ServiceActivator.Extensions.Hosting" Version="10.3.0" />
<PackageReference Include="Paramore.Brighter.Extensions.DependencyInjection" Version="10.3.0" />
Wrappers "Fluent Brighter"
<PackageReference Include="Fluent.Brighter.Kafka" Version="1.8.1" />
<PackageReference Include="Fluent.Brighter.AWS.V4" Version="1.8.1" />
<PackageReference Include="Paramore.Brighter.ServiceActivator.Extensions.Hosting" Version="10.3.0" />
Uma Breve Recapitulação do Brighter
Antes de mergulharmos na configuração, vamos revisar rapidamente os conceitos fundamentais do Brighter.
Request: Command e Event
O primeiro conceito-chave é a interface IRequest, normalmente implementada como um Command ou um Event. Esses objetos são usados pelo IAmACommandProcessor do Brighter para:
- Send: espera exatamente um handler.
- Publish: espera zero ou mais handlers.
- Post: envia para um message broker externo.
Commands representam uma ação que você deseja executar, como CreateOrder. Events representam algo que já aconteceu, como OrderCreated.
public class CreateOrder() : Command(Id.Random());
public class OrderCreated() : Event(Id.Random());
Message Mapper
Ao postar ou consumir mensagens de um broker externo, você precisa de um mapeador de mensagens. Este componente mapeia seu objeto IRequest para (e de) uma mensagem do Brighter (que inclui o cabeçalho e o corpo para o broker).
O Brighter v10+ usa por padrão um mapeador JSON, mas você pode fornecer o seu próprio implementando IAmAMessageMapper ou IAmAMessageMapperAsync.
Request Handler
O RequestHandler (ou RequestHandlerAsync) contém sua lógica de negócio para um IRequest específico. O modelo de pipeline do Brighter permite encadear handlers e adicionar atributos para preocupações transversais (cross-cutting concerns), como retentativas, logs e o padrão Inbox.
public class GreetingHandler : RequestHandler<Greeting>
{
public override Greeting Handle(Greeting @event)
{
Console.WriteLine("Olá, {0}", @event.Name);
return base.Handle(@event);
}
}
Configurando o Brighter com Kafka
Agora, a configuração principal. Precisamos dizer ao Brighter como se conectar ao Kafka. Abaixo, mostramos as abordagens "Fluent" e "Pure".
Usando Fluent Brighter
services
.AddHostedService<ServiceActivatorHostedService>()
.AddFluentBrighter(opt => opt
.UsingKafka(kf => kf
.SetConnection(c => c
.SetName("sample")
.SetBootstrapServers("localhost:9092")
.SetSecurityProtocol(SecurityProtocol.Plaintext)
.SetSaslMechanisms(SaslMechanism.Plain))
.UseSubscriptions(s => s
.AddSubscription<OrderPlaced>(sb => sb
.SetTopic("order-placed-topic")
.SetConsumerGroupId("order-placed-topic-1")
.SetRequeueCount(3)
.CreateInfrastructureIfMissing()
.UseReactorMode())
.AddSubscription<OrderPaid>(sb => sb
.SetTopic("order-paid-topic")
.SetConsumerGroupId("order-paid-topic-1")
.CreateInfrastructureIfMissing()
.UseReactorMode()))
.UsePublications(p => p
.AddPublication<OrderPaid>(kp => kp
.SetTopic("order-paid-topic")
.CreateTopicIfMissing())
.AddPublication<OrderPlaced>(kp => kp
.SetTopic("order-placed-topic")
.CreateTopicIfMissing()))));
Usando Pure Brighter
var connection = new KafkaMessagingGatewayConfiguration
{
Name = "sample",
BootStrapServers = ["localhost:9092"],
SecurityProtocol = SecurityProtocol.Plaintext,
SaslMechanisms = SaslMechanism.Plain,
};
services
.AddHostedService<ServiceActivatorHostedService>()
.AddConsumers(opt =>
{
opt.Subscriptions =
[
new KafkaSubscription<OrderPlaced>(
new SubscriptionName("subscription-orderplaced"),
new ChannelName("order-placed-queue"),
new RoutingKey("order-placed-topic"),
makeChannels: OnMissingChannel.Create,
messagePumpType: MessagePumpType.Reactor,
groupId: "test-1"),
new KafkaSubscription<OrderPaid>(
new SubscriptionName("subscription-orderpaid"),
new ChannelName("order-paid-queue"),
new RoutingKey("order-paid-topic"),
makeChannels: OnMissingChannel.Create,
messagePumpType: MessagePumpType.Reactor,
groupId: "test-2"),
];
opt.DefaultChannelFactory = new ChannelFactory(new KafkaMessageConsumerFactory(connection));
})
.AutoFromAssemblies()
.AddProducers(opt =>
{
opt.ProducerRegistry = new KafkaProducerRegistryFactory(
connection,
[
new KafkaPublication<OrderPaid>
{
MakeChannels = OnMissingChannel.Create,
Topic = new RoutingKey("order-paid-topic"),
},
new KafkaPublication<OrderPlaced>
{
MakeChannels = OnMissingChannel.Create,
Topic = new RoutingKey("order-placed-topic"),
}
]).Create();
});
Configuração do DynamoDB
Agora configuramos o Inbox, Outbox e o bloqueio distribuído usando o DynamoDB.
Usando Fluent Brighter
services
.AddFluentBrighter(opt => opt
.UsingAws(pg => pg
.SetConnection(conn => conn
.SetCredentials(new BasicAWSCredentials("test", "test"))
.SetRegion(RegionEndpoint.USEast1)
.SetClientConfigAction(cfg => cfg.ServiceURL = "http://localhost:4566")) // LocalStack
.UseDynamoDbOutbox("outbox")
.UseDynamoDbInbox("inbox")
.UseDynamoDbDistributedLock(c => c
.SetLeaseholderGroupId("some-group")
.SetTableName("locking")))
// ... a configuração do Kafka continua aqui
);
Usando Pure Brighter
Para a abordagem pura, você precisa configurar o cliente DynamoDB e fornecer as instâncias dos provedores.
// Configurar cliente DynamoDB (exemplo usando LocalStack)
var dynamoDbConfig = new AmazonDynamoDBConfig
{
RegionEndpoint = RegionEndpoint.USEast1,
ServiceURL = "http://localhost:4566"
};
var dynamoDbClient = new AmazonDynamoDBClient(new BasicAWSCredentials("test", "test"), dynamoDbConfig);
// Criar os provedores de outbox, inbox e locking
var outbox = new DynamoDbOutbox(dynamoDbClient, new DynamoDbConfiguration("outbox"));
var inbox = new DynamoDbInbox(dynamoDbClient, new DynamoDbInboxConfiguration
{
TableName = "inbox"
});
var lockingProvider = new DynamoDbLockingProvider(dynamoDbClient, new DynamoDbLockingProviderOptions("locking", "some-group"));
// Registrar na infraestrutura do Brighter
services
.AddHostedService<ServiceActivatorHostedService>()
.AddConsumers(opt =>
{
opt.InboxConfiguration = new InboxConfiguration(inbox);
// ... outras configs de consumidor (subscriptions, etc.)
})
.AddProducers(opt =>
{
opt.Outbox = outbox;
opt.DistributedLock = lockingProvider;
opt.ConnectionProvider = typeof(DynamoDbUnitOfWork); // Se estiver usando transações
// ... config de registro de produtores
});
Nota: As tabelas do DynamoDB (outbox, inbox, locking) devem existir previamente. O Brighter fornece helpers para criá-las se necessário, mas em produção você deve gerenciar a criação das tabelas separadamente.
Esquemas de Tabela do DynamoDB
Abaixo estão os esquemas que o Brighter espera. Você pode criá-los manualmente via Console AWS, CloudFormation, Terraform, ou usar o DynamoDbTableFactory do Brighter.
Tabela Outbox
| Configuração | Valor |
|---|---|
| Nome padrão da tabela | brighter_outbox |
| Modo de faturamento | PAY_PER_REQUEST (recomendado) ou PROVISIONED |
Chave Primária:
| Chave | Atributo | Tipo |
|---|---|---|
| Chave de Partição (HASH) | MessageId |
String (S) |
Tabela Inbox
| Configuração | Valor |
|---|---|
| Nome padrão da tabela | brighter_inbox |
| Modo de faturamento | PAY_PER_REQUEST |
Chave Primária (composta):
| Chave | Atributo | Tipo |
|---|---|---|
| Chave de Partição (HASH) | CommandId |
String (S) |
| Chave de Classificação (RANGE) | ContextKey |
String (S) |
A chave composta (
CommandId+ContextKey) permite que o Inbox rastreie se um handler específico já processou uma mensagem específica, suportando o consumo idempotente.
Exemplo: Mensagens e Request Handlers
Request Handlers
O CreateNewOrderHandler recebe um comando CreateNewOrder (via Send) e deposita dois eventos no Outbox. O processo de varredura (sweeper) em segundo plano os publicará posteriormente no Kafka.
public class CreateNewOrderHandler : RequestHandler<CreateNewOrder>
{
private readonly IAmACommandProcessor _commandProcessor;
private readonly ILogger<CreateNewOrderHandler> _logger;
public CreateNewOrderHandler(IAmACommandProcessor commandProcessor, ILogger<CreateNewOrderHandler> logger)
{
_commandProcessor = commandProcessor;
_logger = logger;
}
public override CreateNewOrder Handle(CreateNewOrder command)
{
var id = Uuid.NewAsString();
_logger.LogInformation("Criando novo pedido: {OrderId}", id);
_commandProcessor.DepositPost(new OrderPlaced { OrderId = id, Value = command.Value });
_commandProcessor.DepositPost(new OrderPaid { OrderId = id });
return base.Handle(command);
}
}
Como os padrões funcionam juntos
-
Padrão Inbox – Quando o
OrderPlacedHandlerfalha, a mensagem não é marcada como processada no Inbox. Quando os offsets do Kafka são resetados, apenas mensagens genuinamente não processadas são reexecutadas, evitando duplicatas. -
Padrão Outbox – O
DepositPostarmazena as mensagens no Outbox do DynamoDB antes de tentar publicar. Um processo "sweeper" publica-as no Kafka, garantindo que nenhuma mensagem seja perdida mesmo se o Kafka estiver temporariamente indisponível. - Bloqueio Distribuído – Em ambientes multi-nó, o sweeper usa o bloqueio distribuído para garantir que apenas uma instância publique as mensagens por vez, evitando publicações duplicadas.
Conclusão
Os padrões Inbox e Outbox são essenciais para construir sistemas distribuídos resilientes. O Paramore.Brighter, combinado com sua integração com DynamoDB e o gateway de mensagens Kafka, oferece uma implementação limpa e robusta para aplicações .NET.
Essa configuração garante que seus serviços lidem com falhas graciosamente, mantendo a consistência de dados em todo o seu sistema distribuído.
Top comments (0)