In previous articles, I covered Brighter integration with Kafka and Brighter V10 RC1. This guide focuses on migrating to Brighter V10, emphasizing Kafka configuration changes and breaking updates.
Requirement
- .NET 8 or superior
- A .NET project with these NuGet packages
- Paramore.Brighter.MessagingGateway.Kafka: Enables Kafka integration.
- Paramore.Brighter.ServiceActivator.Extensions.DependencyInjection: Enable register Brighter with Microsoft DI.
- Paramore.Brighter.ServiceActivator.Extensions.Hosting: Hosts Brighter as a background service.
- Serilog.AspNetCore: For structured logging (optional but recommended).
Brighter Recap
Before continuing about Kafka configuration, let's recap what we already know about Brighter.
Request (Command/Event)
Define messages using IRequest:
[PublicationTopic("greeting.topic")
public class Greeting() : Event(Guid.NewGuid())
{
public string Name { get; set; } = string.Empty;
}
- Commands: Single-recipient operations (e.g.,
SendEmail). - Events: Broadcast notifications (e.g.,
OrderShipped).
Message Mapper (Optional)
Translates between Brighter messages and your app objects, for async workflow mappers now require IAmAMessageMapperAsync
Request Handler
Processes incoming messages:
public class GreetingHandler(ILogger<GreetingHandler> logger) : RequestHandler<Greeting>
{
public override Greeting Handle(Greeting command)
{
logger.LogInformation("Hello {Name}", command.Name);
return base.Handle(command);
}
}
Configuring Brighter with Kafka
1. Connection Setup
Define Kafka connection details:
var connection = new KafkaMessagingGatewayConfiguration
{
Name = "sample", // Application Name
BootStrapServers = ["localhost:9092"], // Broker address
SecurityProtocol = SecurityProtocol.Plaintext, // Use SSL in prod
SaslMechanisms = SaslMechanism.Plain,
};
2. Kafka Subscription
Subscribe to a topic:
.AddServiceActivator(opt =>
{
opt.Subscriptions =
[
new KafkaSubscription<Greeting>(
new SubscriptionName("kafka.greeting.subscription"), // The subscription name, it's used internal only, so you can put whatevery you want
new ChannelName("greeting.topic"), // The topic name
new RoutingKey("greeting.topic"), // The topic name
groupId: "some-consumer-group", // The Kafka Group ID
makeChannels: OnMissingChannel.Create, // Tell to Brighter what to do when the topic not exists
numOfPartitions: 2, // The number of topic partition, it's only useful when you want to create Kafka topic via code
noOfPerformers: 2, // The number of subscription running in parallel, it doesn't make sense to be bigger than the number of partition
messagePumpType: MessagePumpType.Proactor
),
];
opt.DefaultChannelFactory = new ChannelFactory(new KafkaMessageConsumerFactory(connection));
})
3. Kafka Producer Configuration
Publish events to a topic:
.UseExternalBus(opt =>
{
opt.ProducerRegistry = new KafkaProducerRegistryFactory(connection,
[
new KafkaPublication
{
MakeChannels = OnMissingChannel.Create,
Source = new Uri("test-app", UriKind.RelativeOrAbsolute), // Cloud events properties
Topic = new RoutingKey("greeting.topic")
}
]).Create();
})
New functionality
CloudEvents supports
Brighter now has native support to CloudEvents so you can configure it on Publication (via Source, Subject, Type and other properties).
By default Brighter use the binary mode, for JSON mode you can set a registry the CloudEventJsonMessageMapper<T> or change the default mapper to CloudEvent
// Use CloudEvent JSON as default
.MapperRegistry(registry => registry.SetCloudEventJsonAsDefaultMessageMapper())
// or
// Or register per-type
.MapperRegistry(registry => registry.Register<Greeting, CloudEventJsonMessageMapper<Greeting>>())
PublicationTopic Attribute
Because Brighter have a support to a default mapper, Brighter need a way to find which publication should be used, for this we have multiply ways(via RequestType in Publication, via PublicationTopicAttribute or a custom implementation)
[PublicationTopic("greeting.topic")]
public class Greeting() : Event(Guid.NewGuid()) { ... }
// Alternative: Explicit publication
new KafkaPublication<Greeting> { ... }
Breaking Changes in Brighter V10
Brighter V10 introduces updates to Kafka integration, Below are the key breaking changes:
Changing default value
The default Brighter Kafka value has been changes, because the previous values was too low(you could get Local queue is full).
-
KafkaPublication.BatchNumberMessagesfrom10to10000 -
KafkaPublication.QueueBufferingMaxMessagesfrom10to100000
Message Mapper Overhaul
Default JSON Serialization :
In V9, message mappers were mandatory. In V10, JSON serialization is built-in unless custom logic is required, also you can change the default Brighter serialization.
Subscription
We had 2 main changes on subscription.
Explicit Message Pump Types
The first one is before we had a field called runAsync or isAsync it was a boolean, to make everything clear we change it to messagePumpType and it's the MessagePumpType(Reactor, Proactor, Unknown).
Property Renaming
On the AddServiceActivator where rename the ChannelFactory property to DefaultChannelFactory
Publication
Use ExternalBusConfiguration to configure producers and outbox patterns:
// V10
.UseExternalBus(opt => { ... })
// V9
.UseExternalBus(new RmqProducerRegistryFactory(...))
Kafka Tips
Prefer Post over PostAsync for Kafka producers to avoid unnecessary overhead (improves throughput by 100x+ in some cases)
Conclusion
Brighter V10 simplifies Kafka integration with CloudEvents support, improved defaults, and streamlined configuration. Breaking changes focus on clarity, but require updates to subscriptions, producers, and serialization logic. For details, refer to the Brighter GitHub repository and CloudEvents specification.
Reference
Full Code Github - https://github.com/lillo42/brighter-sample/commits/connect-to-kafka/
Top comments (0)