In my previous article, we explored the Inbox Pattern as a solution for ensuring exactly-once message processing in distributed systems. Today, we'll dive into a practical implementation using Brighter (a .NET command processor and dispatcher) combined with PostgreSQL as our inbox store and Kafka as our message broker.
This setup provides a robust foundation for building reliable event-driven microservices that can recover gracefully from failures while maintaining message integrity.
Project Overview
We will build a .NET 8+ service that consumes/produces messages from a Kafka topic, processes them using Brighter's, and uses a PostgreSQL database as the persistent inbox to ensure idempotency and reliability.
Required Packages
You can choose between two setup styles:
- Pure Brighter Packages:
- Paramore.Brighter.Inbox.Postgres
- Paramore.Brighter.MessagingGateway.Kafka
- Paramore.Brighter.ServiceActivator.Extensions.Hosting
- Paramore.Brighter.Extensions.DependencyInjection
- Fluent Brighter wrappers:
- Fluent.Brighter.Kafka
- Fluent.Brighter.Postgres
- Paramore.Brighter.ServiceActivator.Extensions.Hosting
A Quick Brighter Recap
Before we dive into the configuration, let's quickly recap the core Brighter concepts.
Request: Command and Event
The first key concept is the IRequest interface, which is typically implemented as a Command or an Event. These objects are used by Brighter's IAmACommandProcessor to:
-
Send(expects exactly one handler) -
Publish(expects zero or more handlers) -
Post(sends to an external message broker)
Commands represent an action you want to execute, like CreateOrder. Events represent a fact or something that has already happened, like OrderCreated.
// A command to be executed
public class CreateOrder() : Command(Id.Random());
// An event that has occurred
public class OrderCreated() : Event(Id.Random());
// You can also implement IRequest directly
public class CustomRequest : IRequest { ... }
Message Mapper
When you Post or consume messages from an external broker, you need a message mapper. This component is responsible for mapping your IRequest object to and from a Brighter Message object (which includes the header and body for the broker).
Brighter v10+ defaults to a JSON message mapper, but you can provide your own custom implementation by implementing IAmAMessageMapper or IAmAMessageMapperAsync.
Request Handler
Last but not least is the RequestHandler (or RequestHandlerAsync). This is the class that contains your actual business logic for processing a specific IRequest. Brighter's pipeline model allows you to chain handlers and add attributes for cross-cutting concerns like retries, logging, and, of course, the inbox pattern.
Here's a simple synchronous handler:
public class GreetingHandler : RequestHandler<Greeting>
{
public override Greeting Handle(Greeting @event)
{
Console.WriteLine("===== Hello, {0}", @event.Name);
return base.Handle(@event);
}
}
public class GreetingHandlerAsync : RequestHandlerAsync<GreetingAsync>
{
public override Task<Greeting> HandleAsync(GreetingAsync @event, CancellationToken ct = default)
{
Console.WriteLine("===== Hello, {0}", @event.Name);
return base.Handle(@event);
}
}
Configuring Brighter with Kafka and Postgres
Now for the main setup. We need to tell Brighter how to connect to Kafka (our broker) and Postgres (our inbox store).
Kafka Configuration
You can configure Brighter using its core "pure" components or with the Fluent Brighter API. Both are shown here.
Using Fluent Brighter:
services
.AddHostedService<ServiceActivatorHostedService>()
.AddFluentBrighter(opt => opt
.UsingKafka(kf => kf
.SetConnection(c => c
.SetName("sample")
.SetBootstrapServers("localhost:9092")
.SetSecurityProtocol(SecurityProtocol.Plaintext)
.SetSaslMechanisms(SaslMechanism.Plain))
.UseSubscriptions(s => s
.AddSubscription<OrderPlaced>(sb => sb
.SetTopic("order-placed-topic")
.SetConsumerGroupId("order-placed-topic-1")
.SetRequeueCount(3)
.CreateInfrastructureIfMissing()
.UseReactorMode())
.AddSubscription<OrderPaid>(sb => sb
.SetTopic("order-paid-topic")
.SetConsumerGroupId("order-paid-topic-1")
.CreateInfrastructureIfMissing()
.UseReactorMode()))
.UsePublications(p => p
.AddPublication<OrderPaid>(kp => kp
.SetTopic("order-paid-topic")
.CreateTopicIfMissing())
.AddPublication<OrderPlaced>(kp => kp
.SetTopic("order-placed-topic")
.CreateTopicIfMissing()))));
Using Pure Brighter:
var connection = new KafkaMessagingGatewayConfiguration
{
Name = "sample",
BootStrapServers = ["localhost:9092"],
SecurityProtocol = SecurityProtocol.Plaintext,
SaslMechanisms = SaslMechanism.Plain,
};
services
.AddHostedService<ServiceActivatorHostedService>()
.AddConsumers(opt =>
{
opt.Subscriptions =
[
new KafkaSubscription<OrderPlaced>(
new SubscriptionName("subscription-orderplaced"),
new ChannelName("order-placed-queue"),
new RoutingKey("order-placed-topic"),
makeChannels: OnMissingChannel.Create,
messagePumpType: MessagePumpType.Reactor,
groupId: "test-1"),
new KafkaSubscription<OrderPaid>(
new SubscriptionName("subscription-orderpaid"),
new ChannelName("order-paid-queue"),
new RoutingKey("order-paid-topic"),
makeChannels: OnMissingChannel.Create,
messagePumpType: MessagePumpType.Reactor,
groupId: "test-2"),
];
opt.DefaultChannelFactory = new ChannelFactory(new KafkaMessageConsumerFactory(connection));
})
.AutoFromAssemblies()
.AddProducers(opt =>
{
opt.ProducerRegistry = new KafkaProducerRegistryFactory(
connection,
[
new KafkaPublication<OrderPaid>
{
MakeChannels = OnMissingChannel.Create,
Topic = new RoutingKey("order-paid-topic"),
},
new KafkaPublication<OrderPlaced>
{
MakeChannels = OnMissingChannel.Create,
Topic = new RoutingKey("order-placed-topic"),
}
]).Create();
});
Postgres Inbox Setup
First, we need to ensure the inbox table exists in our database. Brighter provides a DDL script helper for this. You can run this once on application startup.
await using (NpgsqlConnection connection = new(connectionString))
{
await connection.OpenAsync();
await using var command = connection.CreateCommand();
command.CommandText = PostgreSqlInboxBuilder.GetDDL("inboxmessages");
_ = await command.ExecuteNonQueryAsync();
}
Configuring the Inbox Pattern
Finally, we link the Postgres Inbox to our consumer configuration.
Using Fluent Brighter:
services
.AddFluentBrighter(opt => opt
.UsingPostgres(pg => pg
.SetConnection(db => db
.SetConnectionString(connectionString)
.SetDatabaseName("brightertests")
.SetInboxTableName("inboxmessages"))
.UseInbox())
// ... other configuration (like Kafka) goes here
);
Using Pure Brighter:
services
.AddHostedService<ServiceActivatorHostedService>()
.AddConsumers(opt =>
{
opt.InboxConfiguration = new InboxConfiguration(new PostgreSqlInbox(new RelationalDatabaseConfiguration(connectionString, "brightertests", inboxTableName: "inboxmessages")));
// ... configure subscriptions etc.
});
Example: Message and Request Handlers
Let's define the messages our service will use.
Messages
public class CreateNewOrder() : Command(Id.Random())
{
public decimal Value { get; set; }
}
public class OrderPlaced() : Event(Id.Random())
{
public string OrderId { get; set; } = string.Empty;
public decimal Value { get; set; }
}
public class OrderPaid() : Event(Id.Random())
{
public string OrderId { get; set; } = string.Empty;
}
Request Handlers
For these examples, I'll use synchronous handlers (RequestHandler).
Note on Performance: As noted in the Brighter documentation, the synchronous Post method with the Kafka producer often yields significantly better performance than PostAsync due to the underlying implementation of the C# Kafka client.
CreateNewOrderHandler: This handler receives a CreateNewOrder command (via Send) and posts two new events, OrderPlaced and OrderPaid, to Kafka.
public class CreateNewOrderHandler(IAmACommandProcessor commandProcessor,
ILogger<CreateNewOrderHandler> logger) : RequestHandler<CreateNewOrder>
{
public override CreateNewOrder Handle(CreateNewOrder command)
{
try
{
var id = Uuid.NewAsString();
logger.LogInformation("Creating a new order: {OrderId}", id);
commandProcessor.Post(new OrderPlaced { OrderId = id, Value = command.Value });
commandProcessor.Post(new OrderPaid { OrderId = id });
return base.Handle(command);
}
catch (Exception ex)
{
logger.LogError(ex, "Invalid data");
throw;
}
}
}
OrderPaidHandler: This handler consumes the OrderPaid event from Kafka and simply logs it.
public class OrderPaidHandler(ILogger<OrderPaidHandler> logger) : RequestHandler<OrderPaid>
{
public override OrderPaid Handle(OrderPaid command)
{
logger.LogInformation("{OrderId} paid", command.OrderId);
return base.Handle(command);
}
}
OrderPlaceHandler: This handler simulates a potential failure. If the order value is divisible by 3, it throws an exception. The [UseResiliencePipeline] attribute allows us to apply a retry policy (e.g., "kafka-policy" configured to retry 3 times).
public class OrderPlaceHandler(ILogger<OrderPlaceHandler> logger) : RequestHandler<OrderPlaced>
{
[UseResiliencePipeline("kafka-policy", 1)]
public override OrderPlaced Handle(OrderPlaced command)
{
logger.LogInformation("{OrderId} placed with value {OrderValue}", command.OrderId, command.Value);
if (command.Value % 3 == 0)
{
logger.LogError("Simulate an error for {OrderId} with value {OrderValue}", command.OrderId, command.Value);
throw new InvalidOperationException("invalid error");
}
return base.Handle(command);
}
}
This handler demonstrates two important concepts working together:
- Resilience: The
[UseResiliencePipeline("kafka-policy", 1)]attribute adds in-handler retry logic (assuming you've defined a policy named "kafka-policy"). - Inbox Pattern: When this handler fails, the message won't be stored in the inbox. This ensures that when Kafka offsets are reset, only genuinely unprocessed messages will be reprocessed, preventing duplicates.
Conclusion
The Inbox Pattern is essential for building resilient, idempotent message-based systems. Paramore.Brighter, combined with its PostgreSQL inbox and Kafka integration, provides a clean, robust implementation for .NET applications.
This setup guarantees that your services can handle failures gracefully while maintaining data consistency across your distributed system.
The full code for this sample can be found on GitHub: https://github.com/lillo42/brighter-sample/tree/v10-inbox-postgres
The full code: https://github.com/lillo42/brighter-sample/tree/v10-inbox-postgres
Top comments (0)