Brighter has native support for DynamoDB via dedicated inbox, outbox, and distributed locking providers. In this article, I'll explore how to implement the Inbox and Outbox patterns using DynamoDB to build resilient, eventually consistent .NET services.
Project Overview
We'll build a .NET 8+ service that consumes messages from and produces messages to Kafka topics, processes them using Brighter, and uses DynamoDB as the persistent store for the inbox and outbox. The outbox ensures that no messages are lost even if the broker is unavailable or the service fails, and the inbox lets handlers skip messages they have already processed, so at-least-once delivery behaves like exactly-once processing. We also need to configure a distributed lock to avoid publishing duplicate messages when the application runs in a multi‑node environment.
Required Packages
Choose between two setup styles:
Pure Brighter Packages (AWS SDK v4 recommended)
<PackageReference Include="Paramore.Brighter.Inbox.DynamoDB.V4" Version="10.3.0" />
<PackageReference Include="Paramore.Brighter.Outbox.DynamoDB.V4" Version="10.3.0" />
<PackageReference Include="Paramore.Brighter.Locking.DynamoDB.V4" Version="10.3.0" />
<PackageReference Include="Paramore.Brighter.MessagingGateway.Kafka" Version="10.3.0" />
<PackageReference Include="Paramore.Brighter.ServiceActivator.Extensions.Hosting" Version="10.3.0" />
<PackageReference Include="Paramore.Brighter.Extensions.DependencyInjection" Version="10.3.0" />
Fluent Brighter Wrappers
<PackageReference Include="Fluent.Brighter.Kafka" Version="1.8.1" />
<PackageReference Include="Fluent.Brighter.AWS.V4" Version="1.8.1" />
<PackageReference Include="Paramore.Brighter.ServiceActivator.Extensions.Hosting" Version="10.3.0" />
A Quick Brighter Recap
Before we dive into the configuration, let's quickly recap the core Brighter concepts.
Request: Command and Event
The first key concept is the IRequest interface, typically implemented as a Command or an Event. These objects are used by Brighter’s IAmACommandProcessor to:
- Send: expects exactly one handler.
- Publish: expects zero or more handlers.
- Post: sends to an external message broker.
Commands represent an action you want to execute, like CreateOrder. Events represent something that has already happened, like OrderCreated.
public class CreateOrder() : Command(Id.Random());
public class OrderCreated() : Event(Id.Random());
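To illustrate the difference between Send and Publish, here is a minimal in-memory sketch of the handler-count rules described above. The Register, Send, and Publish functions and the string-keyed registry are hypothetical stand-ins written for this example; they are not Brighter's API, which dispatches on request types through IAmACommandProcessor.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical handler registry: request type name -> handlers. Not Brighter's API.
var handlers = new Dictionary<string, List<Action>>();
var log = new List<string>();

void Register(string requestType, Action handler)
{
    if (!handlers.TryGetValue(requestType, out var list))
        handlers[requestType] = list = new List<Action>();
    list.Add(handler);
}

// Send: a command must have exactly one handler.
void Send(string requestType)
{
    if (!handlers.TryGetValue(requestType, out var list) || list.Count != 1)
        throw new InvalidOperationException($"Expected exactly one handler for {requestType}");
    list[0]();
}

// Publish: an event may have zero or more handlers; returns how many ran.
int Publish(string requestType)
{
    if (!handlers.TryGetValue(requestType, out var list)) return 0;
    foreach (var handler in list) handler();
    return list.Count;
}

Register("CreateOrder", () => log.Add("order created"));
Send("CreateOrder");                     // exactly one handler: runs it
var notified = Publish("OrderCreated");  // no handlers registered: allowed

Console.WriteLine($"handled={log.Count}, notified={notified}");
```

Calling Send for a request type with no registered handler throws in this sketch, while Publish simply does nothing.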
Message Mapper
When you post or consume messages from an external broker, you need a message mapper. This component maps your IRequest object to and from a Brighter Message (which includes the header and body for the broker).
Brighter v10+ defaults to a JSON message mapper, but you can provide your own by implementing IAmAMessageMapper or IAmAMessageMapperAsync.
Request Handler
The RequestHandler (or RequestHandlerAsync) contains your business logic for a specific IRequest. Brighter’s pipeline model allows you to chain handlers and add attributes for cross‑cutting concerns like retries, logging, and the inbox pattern.
public class GreetingHandler : RequestHandler<Greeting>
{
public override Greeting Handle(Greeting @event)
{
Console.WriteLine("Hello, {0}", @event.Name);
return base.Handle(@event);
}
}
Configuring Brighter with Kafka
Now for the main setup. We need to tell Brighter how to connect to Kafka.
Both the “pure” and “fluent” approaches are shown below.
Using Fluent Brighter
services
.AddHostedService<ServiceActivatorHostedService>()
.AddFluentBrighter(opt => opt
.UsingKafka(kf => kf
.SetConnection(c => c
.SetName("sample")
.SetBootstrapServers("localhost:9092")
.SetSecurityProtocol(SecurityProtocol.Plaintext)
.SetSaslMechanisms(SaslMechanism.Plain))
.UseSubscriptions(s => s
.AddSubscription<OrderPlaced>(sb => sb
.SetTopic("order-placed-topic")
.SetConsumerGroupId("order-placed-topic-1")
.SetRequeueCount(3)
.CreateInfrastructureIfMissing()
.UseReactorMode())
.AddSubscription<OrderPaid>(sb => sb
.SetTopic("order-paid-topic")
.SetConsumerGroupId("order-paid-topic-1")
.CreateInfrastructureIfMissing()
.UseReactorMode()))
.UsePublications(p => p
.AddPublication<OrderPaid>(kp => kp
.SetTopic("order-paid-topic")
.CreateTopicIfMissing())
.AddPublication<OrderPlaced>(kp => kp
.SetTopic("order-placed-topic")
.CreateTopicIfMissing()))));
Using Pure Brighter
var connection = new KafkaMessagingGatewayConfiguration
{
Name = "sample",
BootStrapServers = ["localhost:9092"],
SecurityProtocol = SecurityProtocol.Plaintext,
SaslMechanisms = SaslMechanism.Plain,
};
services
.AddHostedService<ServiceActivatorHostedService>()
.AddConsumers(opt =>
{
opt.Subscriptions =
[
new KafkaSubscription<OrderPlaced>(
new SubscriptionName("subscription-orderplaced"),
new ChannelName("order-placed-queue"),
new RoutingKey("order-placed-topic"),
makeChannels: OnMissingChannel.Create,
messagePumpType: MessagePumpType.Reactor,
groupId: "test-1"),
new KafkaSubscription<OrderPaid>(
new SubscriptionName("subscription-orderpaid"),
new ChannelName("order-paid-queue"),
new RoutingKey("order-paid-topic"),
makeChannels: OnMissingChannel.Create,
messagePumpType: MessagePumpType.Reactor,
groupId: "test-2"),
];
opt.DefaultChannelFactory = new ChannelFactory(new KafkaMessageConsumerFactory(connection));
})
.AutoFromAssemblies()
.AddProducers(opt =>
{
opt.ProducerRegistry = new KafkaProducerRegistryFactory(
connection,
[
new KafkaPublication<OrderPaid>
{
MakeChannels = OnMissingChannel.Create,
Topic = new RoutingKey("order-paid-topic"),
},
new KafkaPublication<OrderPlaced>
{
MakeChannels = OnMissingChannel.Create,
Topic = new RoutingKey("order-placed-topic"),
}
]).Create();
});
DynamoDB Configuration
Now we configure the inbox, outbox, and distributed lock using DynamoDB.
Using Fluent Brighter
services
.AddFluentBrighter(opt => opt
.UsingAws(pg => pg
.SetConnection(conn => conn
.SetCredentials(new BasicAWSCredentials("test", "test"))
.SetRegion(RegionEndpoint.USEast1)
.SetClientConfigAction(cfg => cfg.ServiceURL = "http://localhost:4566")) // LocalStack
.UseDynamoDbOutbox("outbox")
.UseDynamoDbInbox("inbox")
.UseDynamoDbDistributedLock(c => c
.SetLeaseholderGroupId("some-group")
.SetTableName("locking")))
// ... Kafka configuration continues here
);
Using Pure Brighter
For the pure approach, you need to set up the DynamoDB client and provide the outbox, inbox, and lock provider instances.
// Configure DynamoDB client (example using LocalStack)
var dynamoDbConfig = new AmazonDynamoDBConfig
{
RegionEndpoint = RegionEndpoint.USEast1,
ServiceURL = "http://localhost:4566"
};
var dynamoDbClient = new AmazonDynamoDBClient(new BasicAWSCredentials("test", "test"), dynamoDbConfig);
// Create the outbox, inbox, and locking providers
var outbox = new DynamoDbOutbox(dynamoDbClient, new DynamoDbConfiguration("outbox"));
var inbox = new DynamoDbInbox(dynamoDbClient, new DynamoDbInboxConfiguration
{
TableName = "inbox"
});
var lockingProvider = new DynamoDbLockingProvider(dynamoDbClient, new DynamoDbLockingProviderOptions("locking", "some-group"));
// Register them with the Brighter infrastructure
services
.AddHostedService<ServiceActivatorHostedService>()
.AddConsumers(opt =>
{
opt.InboxConfiguration = new InboxConfiguration(inbox);
// ... other consumer config (subscriptions, etc.)
})
.AddProducers(opt =>
{
opt.Outbox = outbox;
opt.DistributedLock = lockingProvider;
opt.ConnectionProvider = typeof(DynamoDbUnitOfWork); // if using transactions
// ... producer registry config
});
Note: The DynamoDB tables (outbox, inbox, locking) must exist beforehand. Brighter provides helpers to create them if needed, but for production you should manage table creation separately.
DynamoDB Table Schemas
Below are the DynamoDB table schemas Brighter expects. You can create them manually via the AWS Console, CloudFormation, Terraform, or use Brighter's built-in DynamoDbTableFactory to generate the CreateTableRequest from the entity types.
Outbox Table
| Setting | Value |
|---|---|
| Default table name | brighter_outbox |
| Billing mode | PAY_PER_REQUEST (recommended) or PROVISIONED |
Primary Key:
| Key | Attribute | Type |
|---|---|---|
| Partition key (HASH) | MessageId | String (S) |
Attributes:
| Attribute | Type | Description |
|---|---|---|
| MessageId | S | Unique message identifier |
| Body | B | Message body (binary) |
| ContentType | S | MIME content type |
| CorrelationId | S | Correlation identifier |
| CreatedTime | N | Creation time in ticks |
| OutstandingCreatedTime | N | Creation time in ticks; set to null once dispatched |
| DeliveryTime | N | Delivery time in ticks; null until dispatched |
| DeliveredAt | S | Delivery date (yyyy-MM-dd) |
| ExpiresAt | N | TTL in ticks (optional) |
| TopicShard | S | {Topic}_{ShardNumber} |
| Topic | S | Routing key / topic name |
| PartitionKey | S | Broker partition key |
| ReplyTo | S | Reply-to channel |
| MessageType | S | MT_COMMAND, MT_EVENT, etc. |
| HeaderBag | S | JSON dictionary of extra headers |
| CharacterEncoding | S | Body encoding (e.g. UTF8) |
| CreatedAt | S | ISO 8601 timestamp |
| HandledCount | N | Retry counter |
| DelayedMilliseconds | N | Delayed delivery offset |
| Type | S | CloudEvents type |
| Source | S | CloudEvents source URI |
| Subject | S | CloudEvents subject |
| SpecVersion | S | CloudEvents spec version |
| DataSchema | S | CloudEvents data schema URI |
| DataRef | S | Claim-check data reference |
| TraceParent | S | W3C Trace Parent |
| TraceState | S | W3C Trace State |
| Baggage | S | W3C Baggage |
| JobId | S | Workflow instance identifier |
| WorkflowId | S | Workflow identity |
Global Secondary Indexes (GSIs):
| Index Name | Partition Key (HASH) | Sort Key (RANGE) | Purpose |
|---|---|---|---|
| Outstanding | TopicShard (S) | OutstandingCreatedTime (N) | Query outstanding messages per topic shard |
| OutstandingAllTopics | OutstandingCreatedTime (N) | MessageId (S) | Scan all outstanding messages across topics |
| Delivered | TopicShard (S) | DeliveryTime (N) | Query delivered messages per topic shard |
| DeliveredAllTopics | DeliveryTime (N) | MessageId (S) | Scan all delivered messages across topics |
The outbox uses sharding (TopicShard = "{Topic}_{ShardNumber}") to avoid hot partitions on active topics. The default number of shards is 3 and can be configured via DynamoDbConfiguration.NumberOfShards.
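To make the sharding scheme concrete, here is a minimal sketch of how a TopicShard value could be built, and why a reader has to visit every shard of a topic when querying the Outstanding index. The helper names (TopicShardKey, PickShardForWrite, AllShardsFor), the random shard choice, and shard numbering starting at 0 are assumptions for illustration; this is not Brighter's internal code.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

const int numberOfShards = 3; // mirrors the default described above

// Hypothetical helper: builds the "{Topic}_{ShardNumber}" partition key value.
static string TopicShardKey(string topic, int shard) => $"{topic}_{shard}";

// Hypothetical helper: a writer picks one shard at random to spread writes.
static string PickShardForWrite(string topic, int shards, Random random) =>
    TopicShardKey(topic, random.Next(shards));

// Hypothetical helper: a reader must query every shard to see all messages for a topic.
static IReadOnlyList<string> AllShardsFor(string topic, int shards) =>
    Enumerable.Range(0, shards).Select(s => TopicShardKey(topic, s)).ToList();

var writeKey = PickShardForWrite("order-placed-topic", numberOfShards, new Random());
var readKeys = AllShardsFor("order-placed-topic", numberOfShards);

Console.WriteLine($"Write goes to: {writeKey}");
Console.WriteLine($"Reads fan out over: {string.Join(", ", readKeys)}");
```

The trade-off is that more shards spread write load across more partitions, but each sweep over a topic then needs one query per shard.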
Inbox Table
| Setting | Value |
|---|---|
| Default table name | brighter_inbox |
| Billing mode | PAY_PER_REQUEST (recommended) or PROVISIONED |
Primary Key (composite):
| Key | Attribute | Type |
|---|---|---|
| Partition key (HASH) | CommandId | String (S) |
| Sort key (RANGE) | ContextKey | String (S) |
Attributes:
| Attribute | Type | Description |
|---|---|---|
| CommandId | S | The request/command identifier |
| ContextKey | S | Handler context key (enables the same message ID to be handled by different handlers) |
| CommandType | S | The .NET type name of the command |
| CommandBody | S | Serialized command as JSON |
| TimeStamp | S | When the command was stored |
| Time | S | Timestamp in ticks (as string) |
The composite key (CommandId + ContextKey) allows the inbox to track whether a specific handler has already processed a specific message, supporting idempotent consumption.
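The effect of the composite key can be sketched with an in-memory set standing in for the inbox table. The TryRecord function, the message ID, and the handler names used as context keys are hypothetical; the real inbox persists these rows in DynamoDB and is driven by Brighter's pipeline rather than called directly like this.

```csharp
using System;
using System.Collections.Generic;

// In-memory stand-in for the inbox table, keyed by (CommandId, ContextKey).
var inbox = new HashSet<(string CommandId, string ContextKey)>();

// Returns true the first time a handler sees a message, false on a repeat.
bool TryRecord(string commandId, string contextKey) => inbox.Add((commandId, contextKey));

var messageId = "8f1c2d"; // hypothetical message ID

var firstDelivery = TryRecord(messageId, "OrderPlacedHandler"); // new row
var redelivery = TryRecord(messageId, "OrderPlacedHandler");    // same key: already recorded
var otherHandler = TryRecord(messageId, "AuditHandler");        // same ID, different handler

Console.WriteLine($"{firstDelivery} {redelivery} {otherHandler}"); // prints "True False True"
```

Because the context key is part of the primary key, a redelivered message is rejected only for the handler that already processed it.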
Lock Table
| Setting | Value |
|---|---|
| Default table name | brighter_distributed_lock |
| Billing mode | PAY_PER_REQUEST (recommended) or PROVISIONED |
Primary Key:
| Key | Attribute | Type |
|---|---|---|
| Partition key (HASH) | ResourceId | String (S) |
Attributes:
| Attribute | Type | Description |
|---|---|---|
| ResourceId | S | {LeaseholderGroupId}_{Resource}; identifies the locked resource scoped to a group |
| LeaseExpiry | N | Unix timestamp in milliseconds when the lease expires |
| LockId | S | Unique identifier of the lock holder |
Locking uses a conditional PutItem with attribute_not_exists(LockId) OR LeaseExpiry <= :now to atomically acquire the lock. Expired leases are automatically superseded. The default lease validity is 1 minute, configurable via DynamoDbLockingProviderOptions.LeaseValidity.
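The acquire condition can be traced with a small in-memory simulation. This is a sketch only: the dictionary stands in for the lock table, TryAcquire and the resource name are hypothetical, and the check-then-write here is not atomic the way a server-side DynamoDB conditional write is.

```csharp
using System;
using System.Collections.Generic;

// In-memory stand-in for the lock table: ResourceId -> (LockId, LeaseExpiry in Unix ms).
var table = new Dictionary<string, (string LockId, long LeaseExpiry)>();
var leaseValidity = TimeSpan.FromMinutes(1); // mirrors the default described above

// Mimics the condition "attribute_not_exists(LockId) OR LeaseExpiry <= :now".
bool TryAcquire(string resourceId, long nowMs)
{
    var free = !table.TryGetValue(resourceId, out var current) || current.LeaseExpiry <= nowMs;
    if (!free) return false;

    // Write a new lock row with a fresh holder ID and a new expiry.
    table[resourceId] = (Guid.NewGuid().ToString(), nowMs + (long)leaseValidity.TotalMilliseconds);
    return true;
}

var resource = "some-group_outbox-sweeper"; // hypothetical "{LeaseholderGroupId}_{Resource}" value
long t0 = 1_000_000;

var first = TryAcquire(resource, t0);           // no item yet: acquired
var second = TryAcquire(resource, t0 + 30_000); // lease still valid: rejected
var third = TryAcquire(resource, t0 + 60_000);  // LeaseExpiry <= now: superseded

Console.WriteLine($"first={first}, second={second}, third={third}");
// prints "first=True, second=False, third=True"
```

A holder that crashes without releasing the lock therefore blocks other nodes only until its lease expires.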
Creating the Tables with AWS CLI
For quick local development (e.g. with LocalStack), you can create the tables using the AWS CLI:
# Outbox table
aws dynamodb create-table \
  --table-name outbox \
  --attribute-definitions \
    AttributeName=MessageId,AttributeType=S \
    AttributeName=TopicShard,AttributeType=S \
    AttributeName=OutstandingCreatedTime,AttributeType=N \
    AttributeName=DeliveryTime,AttributeType=N \
  --key-schema AttributeName=MessageId,KeyType=HASH \
  --global-secondary-indexes \
    '[{"IndexName":"Outstanding","KeySchema":[{"AttributeName":"TopicShard","KeyType":"HASH"},{"AttributeName":"OutstandingCreatedTime","KeyType":"RANGE"}],"Projection":{"ProjectionType":"ALL"}},
      {"IndexName":"OutstandingAllTopics","KeySchema":[{"AttributeName":"OutstandingCreatedTime","KeyType":"HASH"},{"AttributeName":"MessageId","KeyType":"RANGE"}],"Projection":{"ProjectionType":"ALL"}},
      {"IndexName":"Delivered","KeySchema":[{"AttributeName":"TopicShard","KeyType":"HASH"},{"AttributeName":"DeliveryTime","KeyType":"RANGE"}],"Projection":{"ProjectionType":"ALL"}},
      {"IndexName":"DeliveredAllTopics","KeySchema":[{"AttributeName":"DeliveryTime","KeyType":"HASH"},{"AttributeName":"MessageId","KeyType":"RANGE"}],"Projection":{"ProjectionType":"ALL"}}]' \
  --billing-mode PAY_PER_REQUEST \
  --endpoint-url http://localhost:4566
# Inbox table
aws dynamodb create-table \
  --table-name inbox \
  --attribute-definitions \
    AttributeName=CommandId,AttributeType=S \
    AttributeName=ContextKey,AttributeType=S \
  --key-schema \
    AttributeName=CommandId,KeyType=HASH \
    AttributeName=ContextKey,KeyType=RANGE \
  --billing-mode PAY_PER_REQUEST \
  --endpoint-url http://localhost:4566
# Lock table
aws dynamodb create-table \
  --table-name locking \
  --attribute-definitions \
    AttributeName=ResourceId,AttributeType=S \
  --key-schema AttributeName=ResourceId,KeyType=HASH \
  --billing-mode PAY_PER_REQUEST \
  --endpoint-url http://localhost:4566
Example: Message and Request Handlers
Let's define the messages and handlers for our order service.
Messages
public class CreateNewOrder() : Command(Id.Random())
{
public decimal Value { get; set; }
}
public class OrderPlaced() : Event(Id.Random())
{
public string OrderId { get; set; } = string.Empty;
public decimal Value { get; set; }
}
public class OrderPaid() : Event(Id.Random())
{
public string OrderId { get; set; } = string.Empty;
}
Request Handlers
CreateNewOrderHandler receives a CreateNewOrder command (via Send) and deposits two events into the outbox. The background sweeper will later publish them to Kafka.
public class CreateNewOrderHandler : RequestHandler<CreateNewOrder>
{
private readonly IAmACommandProcessor _commandProcessor;
private readonly ILogger<CreateNewOrderHandler> _logger;
public CreateNewOrderHandler(IAmACommandProcessor commandProcessor, ILogger<CreateNewOrderHandler> logger)
{
_commandProcessor = commandProcessor;
_logger = logger;
}
public override CreateNewOrder Handle(CreateNewOrder command)
{
var id = Uuid.NewAsString();
_logger.LogInformation("Creating new order: {OrderId}", id);
_commandProcessor.DepositPost(new OrderPlaced { OrderId = id, Value = command.Value });
_commandProcessor.DepositPost(new OrderPaid { OrderId = id });
return base.Handle(command);
}
}
OrderPaidHandler consumes the OrderPaid event from Kafka and logs it.
public class OrderPaidHandler : RequestHandler<OrderPaid>
{
private readonly ILogger<OrderPaidHandler> _logger;
public OrderPaidHandler(ILogger<OrderPaidHandler> logger) => _logger = logger;
public override OrderPaid Handle(OrderPaid command)
{
_logger.LogInformation("{OrderId} paid", command.OrderId);
return base.Handle(command);
}
}
OrderPlacedHandler simulates a failure for orders with a value divisible by 3. The [UseResiliencePipeline] attribute adds retry logic (e.g., 3 retries using a policy named "kafka-policy").
public class OrderPlacedHandler : RequestHandler<OrderPlaced>
{
private readonly ILogger<OrderPlacedHandler> _logger;
public OrderPlacedHandler(ILogger<OrderPlacedHandler> logger) => _logger = logger;
[UseResiliencePipeline("kafka-policy", 1)]
public override OrderPlaced Handle(OrderPlaced command)
{
_logger.LogInformation("{OrderId} placed with value {OrderValue}", command.OrderId, command.Value);
if (command.Value % 3 == 0)
{
_logger.LogError("Simulating error for {OrderId} with value {OrderValue}", command.OrderId, command.Value);
throw new InvalidOperationException("Simulated error");
}
return base.Handle(command);
}
}
How the Patterns Work Together
- Inbox Pattern – When OrderPlacedHandler fails, the message is not marked as processed in the inbox. When Kafka offsets are reset, only genuinely unprocessed messages are reprocessed, preventing duplicates.
- Outbox Pattern – DepositPost stores messages in the DynamoDB outbox before attempting to publish. A background sweeper process then publishes them to Kafka, ensuring no messages are lost even if Kafka is temporarily unavailable.
- Distributed Lock – In a multi‑node environment, the sweeper uses the distributed lock to ensure that only one instance publishes messages at a time, avoiding duplicate publications.
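The deposit-then-sweep flow can be sketched in memory as follows. The Deposit and Sweep functions, the row shape, and the trySend callback are hypothetical stand-ins for this example; in the real service DepositPost writes to DynamoDB and Brighter's sweeper handles publication.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// In-memory stand-in for the outbox table: one row per deposited message.
var outbox = new List<(string MessageId, string Topic, bool Dispatched)>();
var published = new List<string>(); // what the broker would have received

// Step 1: the handler deposits messages; nothing is sent to the broker yet.
void Deposit(string messageId, string topic) => outbox.Add((messageId, topic, false));

// Step 2: the sweeper publishes outstanding rows and marks each one as
// dispatched only after the send succeeds.
void Sweep(Func<string, bool> trySend)
{
    for (var i = 0; i < outbox.Count; i++)
    {
        var row = outbox[i];
        if (row.Dispatched || !trySend(row.MessageId)) continue;
        published.Add(row.MessageId);
        outbox[i] = (row.MessageId, row.Topic, true);
    }
}

Deposit("m1", "order-placed-topic");
Deposit("m2", "order-paid-topic");

Sweep(_ => false); // broker unavailable: rows stay outstanding
Sweep(_ => true);  // broker back: both rows are published
Sweep(_ => true);  // nothing left to publish

var outstanding = outbox.Count(r => !r.Dispatched);
Console.WriteLine($"published={published.Count}, outstanding={outstanding}");
// prints "published=2, outstanding=0"
```

If two sweepers ran this loop concurrently against the same rows, both could publish m1 before either marked it as dispatched, which is the duplicate the distributed lock is there to prevent.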
Conclusion
The inbox and outbox patterns are essential for building resilient distributed systems. Paramore.Brighter, combined with its DynamoDB integration and Kafka messaging gateway, provides a clean, robust implementation for .NET applications.
With this setup, your services can handle failures gracefully while maintaining data consistency across your distributed system.