Rafael Andrade
Using DynamoDB with Brighter V10

Brighter has native support for DynamoDB via dedicated inbox, outbox, and distributed locking providers. In this article, I'll explore how to implement the Inbox and Outbox patterns using DynamoDB to build resilient, eventually consistent .NET services.

Project Overview

We'll build a .NET 8+ service that consumes and produces messages on Kafka topics, processes them with Brighter, and uses DynamoDB as the persistent store for the inbox and outbox. This combination gives us at-least-once delivery with idempotent processing (effectively-once handling) and ensures no messages are lost, even in the face of failures. We also configure a distributed lock so that the outbox sweeper does not publish duplicate messages when the application runs in a multi-node environment.

Required Packages

Choose between two setup styles:

Pure Brighter Packages (AWS SDK v4 recommended)

<PackageReference Include="Paramore.Brighter.Inbox.DynamoDB.V4" Version="10.3.0" />
<PackageReference Include="Paramore.Brighter.Outbox.DynamoDB.V4" Version="10.3.0" />
<PackageReference Include="Paramore.Brighter.Locking.DynamoDB.V4" Version="10.3.0" />
<PackageReference Include="Paramore.Brighter.MessagingGateway.Kafka" Version="10.3.0" />
<PackageReference Include="Paramore.Brighter.ServiceActivator.Extensions.Hosting" Version="10.3.0" />
<PackageReference Include="Paramore.Brighter.Extensions.DependencyInjection" Version="10.3.0" />

Fluent Brighter Wrappers

<PackageReference Include="Fluent.Brighter.Kafka" Version="1.8.1" />
<PackageReference Include="Fluent.Brighter.AWS.V4" Version="1.8.1" />
<PackageReference Include="Paramore.Brighter.ServiceActivator.Extensions.Hosting" Version="10.3.0" />

A Quick Brighter Recap

Before we dive into the configuration, let's quickly recap the core Brighter concepts.

Request: Command and Event

The first key concept is the IRequest interface, typically implemented as a Command or an Event. These objects are used by Brighter’s IAmACommandProcessor to:

  • Send: expects exactly one handler.
  • Publish: expects zero or more handlers.
  • Post: sends to an external message broker.
  • DepositPost: writes the message to the outbox for later delivery to the broker.

Commands represent an action you want to execute, like CreateOrder. Events represent something that has already happened, like OrderCreated.

public class CreateOrder() : Command(Id.Random());

public class OrderCreated() : Event(Id.Random());

Message Mapper

When you post or consume messages from an external broker, you need a message mapper. This component maps your IRequest object to and from a Brighter Message (which includes the header and body for the broker).

Brighter v10+ defaults to a JSON message mapper, but you can provide your own by implementing IAmAMessageMapper or IAmAMessageMapperAsync.
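For most payloads the default JSON mapper is enough, but a custom mapper is useful when you need extra headers or a non-JSON body. Below is a hedged sketch for the OrderPlaced event used later in this article; the exact MessageHeader constructor arguments vary slightly between Brighter versions, so treat this as a shape rather than a drop-in implementation.

```csharp
using System.Text.Json;
using Paramore.Brighter;

// Sketch of a hand-rolled mapper; only needed when the default
// JSON mapper doesn't fit (custom headers, alternative encodings, etc.).
public class OrderPlacedMessageMapper : IAmAMessageMapper<OrderPlaced>
{
    public IRequestContext Context { get; set; }

    public Message MapToMessage(OrderPlaced request, Publication publication)
    {
        // Header carries routing and type metadata for the broker.
        var header = new MessageHeader(
            messageId: request.Id,
            topic: publication.Topic,
            messageType: MessageType.MT_EVENT);
        var body = new MessageBody(JsonSerializer.Serialize(request));
        return new Message(header, body);
    }

    public OrderPlaced MapToRequest(Message message)
        => JsonSerializer.Deserialize<OrderPlaced>(message.Body.Value);
}
```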

Request Handler

The RequestHandler (or RequestHandlerAsync) contains your business logic for a specific IRequest. Brighter’s pipeline model allows you to chain handlers and add attributes for cross‑cutting concerns like retries, logging, and the inbox pattern.

public class GreetingHandler : RequestHandler<Greeting>
{
    public override Greeting Handle(Greeting @event)
    {
        Console.WriteLine("Hello, {0}", @event.Name);
        return base.Handle(@event);
    }
}

Configuring Brighter with Kafka

Now for the main setup. We need to tell Brighter how to connect to Kafka.

Both the “pure” and “fluent” approaches are shown below.

Using Fluent Brighter

services
    .AddHostedService<ServiceActivatorHostedService>()
    .AddFluentBrighter(opt => opt
        .UsingKafka(kf => kf
            .SetConnection(c => c
                .SetName("sample")
                .SetBootstrapServers("localhost:9092")
                .SetSecurityProtocol(SecurityProtocol.Plaintext)
                .SetSaslMechanisms(SaslMechanism.Plain))
            .UseSubscriptions(s => s
                .AddSubscription<OrderPlaced>(sb => sb
                    .SetTopic("order-placed-topic")
                    .SetConsumerGroupId("order-placed-topic-1")
                    .SetRequeueCount(3)
                    .CreateInfrastructureIfMissing()
                    .UseReactorMode())
                .AddSubscription<OrderPaid>(sb => sb
                    .SetTopic("order-paid-topic")
                    .SetConsumerGroupId("order-paid-topic-1")
                    .CreateInfrastructureIfMissing()
                    .UseReactorMode()))
            .UsePublications(p => p
                .AddPublication<OrderPaid>(kp => kp
                    .SetTopic("order-paid-topic")
                    .CreateTopicIfMissing())
                .AddPublication<OrderPlaced>(kp => kp
                    .SetTopic("order-placed-topic")
                    .CreateTopicIfMissing()))));

Using Pure Brighter

var connection = new KafkaMessagingGatewayConfiguration
{
    Name = "sample",
    BootStrapServers = ["localhost:9092"],
    SecurityProtocol = SecurityProtocol.Plaintext,
    SaslMechanisms = SaslMechanism.Plain,
};

services
    .AddHostedService<ServiceActivatorHostedService>()
    .AddConsumers(opt =>
    {
        opt.Subscriptions =
        [
            new KafkaSubscription<OrderPlaced>(
                new SubscriptionName("subscription-orderplaced"),
                new ChannelName("order-placed-queue"),
                new RoutingKey("order-placed-topic"),
                makeChannels: OnMissingChannel.Create,
                messagePumpType: MessagePumpType.Reactor,
                groupId: "test-1"),
            new KafkaSubscription<OrderPaid>(
                new SubscriptionName("subscription-orderpaid"),
                new ChannelName("order-paid-queue"),
                new RoutingKey("order-paid-topic"),
                makeChannels: OnMissingChannel.Create,
                messagePumpType: MessagePumpType.Reactor,
                groupId: "test-2"),
        ];

        opt.DefaultChannelFactory = new ChannelFactory(new KafkaMessageConsumerFactory(connection));
    })
    .AutoFromAssemblies()
    .AddProducers(opt =>
    {
        opt.ProducerRegistry = new KafkaProducerRegistryFactory(
            connection,
            [
                new KafkaPublication<OrderPaid>
                {
                    MakeChannels = OnMissingChannel.Create,
                    Topic = new RoutingKey("order-paid-topic"),
                },
                new KafkaPublication<OrderPlaced>
                {
                    MakeChannels = OnMissingChannel.Create,
                    Topic = new RoutingKey("order-placed-topic"),
                }
            ]).Create();
    });

DynamoDB Configuration

Now we configure the inbox, outbox, and distributed lock using DynamoDB.

Using Fluent Brighter

services
    .AddFluentBrighter(opt => opt
        .UsingAws(pg => pg
            .SetConnection(conn => conn
                .SetCredentials(new BasicAWSCredentials("test", "test"))
                .SetRegion(RegionEndpoint.USEast1)
                .SetClientConfigAction(cfg => cfg.ServiceURL = "http://localhost:4566"))  // LocalStack
            .UseDynamoDbOutbox("outbox")
            .UseDynamoDbInbox("inbox")
            .UseDynamoDbDistributedLock(c => c
                .SetLeaseholderGroupId("some-group")
                .SetTableName("locking")))
        // ... Kafka configuration continues here
    );

Using Pure Brighter

For the pure approach, you need to set up the DynamoDB client and provide the outbox, inbox, and lock provider instances.

// Configure DynamoDB client (example using LocalStack)
var dynamoDbConfig = new AmazonDynamoDBConfig
{
    RegionEndpoint = RegionEndpoint.USEast1,
    ServiceURL = "http://localhost:4566"
};
var dynamoDbClient = new AmazonDynamoDBClient(new BasicAWSCredentials("test", "test"), dynamoDbConfig);

// Create the outbox, inbox, and locking providers
var outbox = new DynamoDbOutbox(dynamoDbClient, new DynamoDbConfiguration("outbox"));

var inbox = new DynamoDbInbox(dynamoDbClient, new DynamoDbInboxConfiguration
{
    TableName = "inbox"
});

var lockingProvider = new DynamoDbLockingProvider(dynamoDbClient, new DynamoDbLockingProviderOptions("locking", "some-group"));

// Register them with the Brighter infrastructure
services
    .AddHostedService<ServiceActivatorHostedService>()
    .AddConsumers(opt =>
    {
        opt.InboxConfiguration = new InboxConfiguration(inbox);
        // ... other consumer config (subscriptions, etc.)
    })
    .AddProducers(opt =>
    {
        opt.Outbox = outbox;
        opt.DistributedLock = lockingProvider;
        opt.ConnectionProvider = typeof(DynamoDbUnitOfWork);  // if using transactions
        // ... producer registry config
    });

Note: The DynamoDB tables (outbox, inbox, locking) must exist beforehand. Brighter provides helpers to create them if needed, but for production you should manage table creation separately.

DynamoDB Table Schemas

Below are the DynamoDB table schemas Brighter expects. You can create them manually via the AWS Console, CloudFormation, Terraform, or use Brighter's built-in DynamoDbTableFactory to generate the CreateTableRequest from the entity types.
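For local development, the table factory route looks roughly like the sketch below. The helper names (GenerateCreateTableRequest, DynamoDbTableBuilder, EnsureTablesReady) come from Brighter's DynamoDB table-builder API but may differ between versions, so verify them against the package you install.

```csharp
using Amazon.DynamoDBv2;
using Amazon.DynamoDBv2.Model;
using Paramore.Brighter.DynamoDb;
using Paramore.Brighter.Outbox.DynamoDB;

// Sketch: derive the outbox CreateTableRequest from Brighter's own
// MessageItem entity instead of hand-writing the schema.
var factory = new DynamoDbTableFactory();
CreateTableRequest createTableRequest = factory.GenerateCreateTableRequest<MessageItem>(
    new DynamoDbCreateProvisionedThroughput(
        new ProvisionedThroughput { ReadCapacityUnits = 10, WriteCapacityUnits = 10 },
        new Dictionary<string, ProvisionedThroughput>()));

// Create the table and block until it is ACTIVE.
var tableBuilder = new DynamoDbTableBuilder(dynamoDbClient);
await tableBuilder.Build(createTableRequest);
await tableBuilder.EnsureTablesReady(
    new[] { createTableRequest.TableName }, TableStatus.ACTIVE);
```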

Outbox Table

Setting Value
Default table name brighter_outbox
Billing mode PAY_PER_REQUEST (recommended) or PROVISIONED

Primary Key:

Key Attribute Type
Partition key (HASH) MessageId String (S)

Attributes:

Attribute Type Description
MessageId S Unique message identifier
Body B Message body (binary)
ContentType S MIME content type
CorrelationId S Correlation identifier
CreatedTime N Creation time in ticks
OutstandingCreatedTime N Creation time in ticks; set to null once dispatched
DeliveryTime N Delivery time in ticks; null until dispatched
DeliveredAt S Delivery date (yyyy-MM-dd)
ExpiresAt N TTL in ticks (optional)
TopicShard S {Topic}_{ShardNumber}
Topic S Routing key / topic name
PartitionKey S Broker partition key
ReplyTo S Reply-to channel
MessageType S MT_COMMAND, MT_EVENT, etc.
HeaderBag S JSON dictionary of extra headers
CharacterEncoding S Body encoding (e.g. UTF8)
CreatedAt S ISO 8601 timestamp
HandledCount N Retry counter
DelayedMilliseconds N Delayed delivery offset
Type S CloudEvents type
Source S CloudEvents source URI
Subject S CloudEvents subject
SpecVersion S CloudEvents spec version
DataSchema S CloudEvents data schema URI
DataRef S Claim-check data reference
TraceParent S W3C Trace Parent
TraceState S W3C Trace State
Baggage S W3C Baggage
JobId S Workflow instance identifier
WorkflowId S Workflow identity

Global Secondary Indexes (GSIs):

Index Name            Partition Key (HASH)        Sort Key (RANGE)            Purpose
Outstanding           TopicShard (S)              OutstandingCreatedTime (N)  Query outstanding messages per topic shard
OutstandingAllTopics  OutstandingCreatedTime (N)  MessageId (S)               Scan all outstanding messages across topics
Delivered             TopicShard (S)              DeliveryTime (N)            Query delivered messages per topic shard
DeliveredAllTopics    DeliveryTime (N)            MessageId (S)               Scan all delivered messages across topics

The outbox uses sharding (TopicShard = "{Topic}_{ShardNumber}") to avoid hot partitions on active topics. The default number of shards is 3 and can be configured via DynamoDbConfiguration.NumberOfShards.
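The shard suffix can be pictured with a small helper. This is illustrative only, mirroring the scheme described above rather than Brighter's internal code:

```csharp
// Mirrors the {Topic}_{ShardNumber} scheme: with numberOfShards = 3,
// "order-paid-topic" maps to one of order-paid-topic_0, _1, or _2,
// spreading writes (and the Outstanding GSI) across partitions.
static string TopicShardFor(string topic, int numberOfShards) =>
    $"{topic}_{Random.Shared.Next(numberOfShards)}";
```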

Inbox Table

Setting Value
Default table name brighter_inbox
Billing mode PAY_PER_REQUEST (recommended) or PROVISIONED

Primary Key (composite):

Key Attribute Type
Partition key (HASH) CommandId String (S)
Sort key (RANGE) ContextKey String (S)

Attributes:

Attribute Type Description
CommandId S The request/command identifier
ContextKey S Handler context key (enables the same message ID to be handled by different handlers)
CommandType S The .NET type name of the command
CommandBody S Serialized command as JSON
TimeStamp S When the command was stored
Time S Timestamp in ticks (as string)

The composite key (CommandId + ContextKey) allows the inbox to track whether a specific handler has already processed a specific message, supporting idempotent consumption.
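On the handler side, the inbox is opted into with an attribute on the Handle method. A hedged sketch follows; the attribute name and argument list may vary slightly across Brighter versions, but the shape is: a pipeline step, a context key (which becomes the ContextKey attribute in the table above), and a once-only flag.

```csharp
using Paramore.Brighter;
using Paramore.Brighter.Inbox.Attributes;

// Sketch: idempotent consumption via the inbox attribute.
// onceOnly: true rejects a message this handler has already processed.
public class OrderPaidInboxHandler : RequestHandler<OrderPaid>
{
    [UseInbox(step: 0, contextKey: typeof(OrderPaidInboxHandler), onceOnly: true)]
    public override OrderPaid Handle(OrderPaid @event)
    {
        // Business logic runs at most once per (CommandId, ContextKey) pair.
        return base.Handle(@event);
    }
}
```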

Lock Table

Setting Value
Default table name brighter_distributed_lock
Billing mode PAY_PER_REQUEST (recommended) or PROVISIONED

Primary Key:

Key Attribute Type
Partition key (HASH) ResourceId String (S)

Attributes:

Attribute Type Description
ResourceId S {LeaseholderGroupId}_{Resource} — identifies the locked resource scoped to a group
LeaseExpiry N Unix timestamp in milliseconds when the lease expires
LockId S Unique identifier of the lock holder

Locking uses a conditional PutItem with attribute_not_exists(LockId) OR LeaseExpiry <= :now to atomically acquire the lock. Expired leases are automatically superseded. The default lease validity is 1 minute, configurable via DynamoDbLockingProviderOptions.LeaseValidity.
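The conditional write can be illustrated with the raw AWS SDK. Brighter performs this for you; the sketch below (with a hypothetical "some-group_sweeper" resource id) only shows the mechanism:

```csharp
using Amazon.DynamoDBv2;
using Amazon.DynamoDBv2.Model;

// Illustrative lock acquisition mirroring the conditional PutItem above.
var now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
var request = new PutItemRequest
{
    TableName = "locking",
    Item = new Dictionary<string, AttributeValue>
    {
        ["ResourceId"]  = new AttributeValue { S = "some-group_sweeper" },
        ["LeaseExpiry"] = new AttributeValue { N = (now + 60_000).ToString() }, // 1-minute lease
        ["LockId"]      = new AttributeValue { S = Guid.NewGuid().ToString() },
    },
    // Succeeds only if no lock exists or the existing lease has expired.
    ConditionExpression = "attribute_not_exists(LockId) OR LeaseExpiry <= :now",
    ExpressionAttributeValues = new() { [":now"] = new AttributeValue { N = now.ToString() } },
};

try
{
    await dynamoDbClient.PutItemAsync(request); // lock acquired
}
catch (ConditionalCheckFailedException)
{
    // Another node holds a live lease; back off and retry later.
}
```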

Creating the Tables with AWS CLI

For quick local development (e.g. with LocalStack), you can create the tables using the AWS CLI:

# Outbox table
aws dynamodb create-table \
  --table-name outbox \
  --attribute-definitions \
    AttributeName=MessageId,AttributeType=S \
    AttributeName=TopicShard,AttributeType=S \
    AttributeName=OutstandingCreatedTime,AttributeType=N \
    AttributeName=DeliveryTime,AttributeType=N \
  --key-schema AttributeName=MessageId,KeyType=HASH \
  --global-secondary-indexes \
    '[{"IndexName":"Outstanding","KeySchema":[{"AttributeName":"TopicShard","KeyType":"HASH"},{"AttributeName":"OutstandingCreatedTime","KeyType":"RANGE"}],"Projection":{"ProjectionType":"ALL"}},
      {"IndexName":"OutstandingAllTopics","KeySchema":[{"AttributeName":"OutstandingCreatedTime","KeyType":"HASH"},{"AttributeName":"MessageId","KeyType":"RANGE"}],"Projection":{"ProjectionType":"ALL"}},
      {"IndexName":"Delivered","KeySchema":[{"AttributeName":"TopicShard","KeyType":"HASH"},{"AttributeName":"DeliveryTime","KeyType":"RANGE"}],"Projection":{"ProjectionType":"ALL"}},
      {"IndexName":"DeliveredAllTopics","KeySchema":[{"AttributeName":"DeliveryTime","KeyType":"HASH"},{"AttributeName":"MessageId","KeyType":"RANGE"}],"Projection":{"ProjectionType":"ALL"}}]' \
  --billing-mode PAY_PER_REQUEST \
  --endpoint-url http://localhost:4566

# Inbox table
aws dynamodb create-table \
  --table-name inbox \
  --attribute-definitions \
    AttributeName=CommandId,AttributeType=S \
    AttributeName=ContextKey,AttributeType=S \
  --key-schema \
    AttributeName=CommandId,KeyType=HASH \
    AttributeName=ContextKey,KeyType=RANGE \
  --billing-mode PAY_PER_REQUEST \
  --endpoint-url http://localhost:4566

# Lock table
aws dynamodb create-table \
  --table-name locking \
  --attribute-definitions \
    AttributeName=ResourceId,AttributeType=S \
  --key-schema AttributeName=ResourceId,KeyType=HASH \
  --billing-mode PAY_PER_REQUEST \
  --endpoint-url http://localhost:4566

Example: Message and Request Handlers

Let's define the messages and handlers for our order service.

Messages

public class CreateNewOrder() : Command(Id.Random())
{
    public decimal Value { get; set; }
}

public class OrderPlaced() : Event(Id.Random())
{
    public string OrderId { get; set; } = string.Empty;
    public decimal Value { get; set; }
}

public class OrderPaid() : Event(Id.Random())
{
    public string OrderId { get; set; } = string.Empty;
}

Request Handlers

CreateNewOrderHandler receives a CreateNewOrder command (via Send) and deposits two events into the outbox. The background sweeper will later publish them to Kafka.

public class CreateNewOrderHandler : RequestHandler<CreateNewOrder>
{
    private readonly IAmACommandProcessor _commandProcessor;
    private readonly ILogger<CreateNewOrderHandler> _logger;

    public CreateNewOrderHandler(IAmACommandProcessor commandProcessor, ILogger<CreateNewOrderHandler> logger)
    {
        _commandProcessor = commandProcessor;
        _logger = logger;
    }

    public override CreateNewOrder Handle(CreateNewOrder command)
    {
        var id = Uuid.NewAsString();
        _logger.LogInformation("Creating new order: {OrderId}", id);

        _commandProcessor.DepositPost(new OrderPlaced { OrderId = id, Value = command.Value });
        _commandProcessor.DepositPost(new OrderPaid { OrderId = id });

        return base.Handle(command);
    }
}
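To trigger this flow, something has to Send the command into Brighter. A minimal, hypothetical entry point (the /orders route and app variable are assumptions, not part of the sample above) could look like:

```csharp
// Hypothetical dispatch point: a minimal API endpoint resolving the
// command processor from DI and routing the command to its single
// handler (CreateNewOrderHandler) via Send.
app.MapPost("/orders", (CreateNewOrder command, IAmACommandProcessor processor) =>
{
    processor.Send(command);
    return Results.Accepted();
});
```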

OrderPaidHandler consumes the OrderPaid event from Kafka and logs it.

public class OrderPaidHandler : RequestHandler<OrderPaid>
{
    private readonly ILogger<OrderPaidHandler> _logger;

    public OrderPaidHandler(ILogger<OrderPaidHandler> logger) => _logger = logger;

    public override OrderPaid Handle(OrderPaid command)
    {
        _logger.LogInformation("{OrderId} paid", command.OrderId);
        return base.Handle(command);
    }
}

OrderPlacedHandler simulates a failure for orders whose value is divisible by 3. The [UseResiliencePipeline] attribute wraps the handler in a resilience pipeline registered under the name "kafka-policy" (for example, one configured to retry three times).

public class OrderPlacedHandler : RequestHandler<OrderPlaced>
{
    private readonly ILogger<OrderPlacedHandler> _logger;

    public OrderPlacedHandler(ILogger<OrderPlacedHandler> logger) => _logger = logger;

    [UseResiliencePipeline("kafka-policy", 1)]
    public override OrderPlaced Handle(OrderPlaced command)
    {
        _logger.LogInformation("{OrderId} placed with value {OrderValue}", command.OrderId, command.Value);
        if (command.Value % 3 == 0)
        {
            _logger.LogError("Simulating error for {OrderId} with value {OrderValue}", command.OrderId, command.Value);
            throw new InvalidOperationException("Simulated error");
        }
        return base.Handle(command);
    }
}

How the Patterns Work Together

  • Inbox Pattern – When OrderPlacedHandler fails, the message is not marked as processed in the inbox. If the message is redelivered (for example, after a requeue or an offset reset), only genuinely unprocessed messages are handled again, preventing duplicate side effects.
  • Outbox Pattern – DepositPost stores messages in the DynamoDB outbox before attempting to publish. A background sweeper process then publishes them to Kafka, ensuring no messages are lost even if Kafka is temporarily unavailable.
  • Distributed Lock – In a multi‑node environment, the sweeper uses the distributed lock to ensure that only one instance publishes messages at a time, avoiding duplicate publications.

Conclusion

The inbox and outbox patterns are essential for building resilient distributed systems. Paramore.Brighter, combined with its DynamoDB integration and Kafka messaging gateway, provides a clean, robust implementation for .NET applications.

This setup guarantees that your services can handle failures gracefully while maintaining data consistency across your distributed system.
