In previous articles, I covered Brighter integration with Kafka and Brighter V10 RC1. This guide focuses on migrating to Brighter V10, emphasizing Kafka configuration changes and breaking updates.
Requirements
- .NET 8 or later
- A .NET project with these NuGet packages:
  - Paramore.Brighter.MessagingGateway.Kafka: Enables Kafka integration.
  - Paramore.Brighter.ServiceActivator.Extensions.DependencyInjection: Registers Brighter with Microsoft DI.
  - Paramore.Brighter.ServiceActivator.Extensions.Hosting: Hosts Brighter as a background service.
  - Serilog.AspNetCore: For structured logging (optional but recommended).
Brighter Recap
Before moving on to the Kafka configuration, let's recap what we already know about Brighter.
Request (Command/Event)
Define messages using IRequest:
[PublicationTopic("greeting.topic")]
public class Greeting() : Event(Guid.NewGuid())
{
public string Name { get; set; } = string.Empty;
}
- Commands: Single-recipient operations (e.g., SendEmail).
- Events: Broadcast notifications (e.g., OrderShipped).
Message Mapper (Optional)
Translates between Brighter messages and your app objects. For async workflows, mappers now need to implement IAmAMessageMapperAsync.
Request Handler
Processes incoming messages:
public class GreetingHandler(ILogger<GreetingHandler> logger) : RequestHandler<Greeting>
{
public override Greeting Handle(Greeting command)
{
logger.LogInformation("Hello {Name}", command.Name);
return base.Handle(command);
}
}
Configuring Brighter with Kafka
1. Connection Setup
Define Kafka connection details:
var connection = new KafkaMessagingGatewayConfiguration
{
Name = "sample", // Application Name
BootStrapServers = ["localhost:9092"], // Broker address
SecurityProtocol = SecurityProtocol.Plaintext, // Use SSL in prod
SaslMechanisms = SaslMechanism.Plain,
};
2. Kafka Subscription
Subscribe to a topic:
.AddServiceActivator(opt =>
{
opt.Subscriptions =
[
new KafkaSubscription<Greeting>(
new SubscriptionName("kafka.greeting.subscription"), // The subscription name; it is only used internally, so you can use whatever you want
new ChannelName("greeting.topic"), // The topic name
new RoutingKey("greeting.topic"), // The topic name
groupId: "some-consumer-group", // The Kafka consumer group ID
makeChannels: OnMissingChannel.Create, // Tells Brighter what to do when the topic does not exist
numOfPartitions: 2, // The number of topic partitions; only used when Brighter creates the Kafka topic
noOfPerformers: 2, // The number of consumers running in parallel; there is no benefit in exceeding the number of partitions
messagePumpType: MessagePumpType.Proactor
),
];
opt.DefaultChannelFactory = new ChannelFactory(new KafkaMessageConsumerFactory(connection));
})
3. Kafka Producer Configuration
Publish events to a topic:
.UseExternalBus(opt =>
{
opt.ProducerRegistry = new KafkaProducerRegistryFactory(connection,
[
new KafkaPublication
{
MakeChannels = OnMissingChannel.Create,
Source = new Uri("test-app", UriKind.RelativeOrAbsolute), // Cloud events properties
Topic = new RoutingKey("greeting.topic")
}
]).Create();
})
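To show how the three steps above fit together, here is a minimal sketch of a Program.cs that wires the connection, the subscription, and the producer into a generic host. The `AddHostedService<ServiceActivatorHostedService>()` and `AutoFromAssemblies()` calls and the list of `using` directives are my assumptions about a typical setup and are not taken from the snippets above, so verify them against the Brighter version you install.

```csharp
using System;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Paramore.Brighter;
using Paramore.Brighter.Extensions.DependencyInjection;
using Paramore.Brighter.MessagingGateway.Kafka;
using Paramore.Brighter.ServiceActivator.Extensions.DependencyInjection;
using Paramore.Brighter.ServiceActivator.Extensions.Hosting;

var builder = Host.CreateApplicationBuilder(args);

// Step 1: connection details shared by consumers and producers
var connection = new KafkaMessagingGatewayConfiguration
{
    Name = "sample",
    BootStrapServers = ["localhost:9092"],
    SecurityProtocol = SecurityProtocol.Plaintext
};

builder.Services
    // Assumption: runs the message pumps as a background service
    .AddHostedService<ServiceActivatorHostedService>()
    // Step 2: subscription
    .AddServiceActivator(opt =>
    {
        opt.Subscriptions =
        [
            new KafkaSubscription<Greeting>(
                new SubscriptionName("kafka.greeting.subscription"),
                new ChannelName("greeting.topic"),
                new RoutingKey("greeting.topic"),
                groupId: "some-consumer-group",
                makeChannels: OnMissingChannel.Create,
                numOfPartitions: 2,
                noOfPerformers: 2,
                messagePumpType: MessagePumpType.Proactor)
        ];
        opt.DefaultChannelFactory = new ChannelFactory(new KafkaMessageConsumerFactory(connection));
    })
    // Step 3: producer
    .UseExternalBus(opt =>
    {
        opt.ProducerRegistry = new KafkaProducerRegistryFactory(connection,
        [
            new KafkaPublication
            {
                MakeChannels = OnMissingChannel.Create,
                Source = new Uri("test-app", UriKind.RelativeOrAbsolute),
                Topic = new RoutingKey("greeting.topic")
            }
        ]).Create();
    })
    // Assumption: scans the assembly for handlers and mappers
    .AutoFromAssemblies();

await builder.Build().RunAsync();
```

The order matters only in that `UseExternalBus` is chained on the builder returned by `AddServiceActivator`; the connection object is created once and reused on both sides.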
New functionality
CloudEvents support
Brighter now has native support for CloudEvents, which you can configure on the Publication (via Source, Subject, Type and other properties).
By default, Brighter uses the CloudEvents binary mode. For JSON mode, you can either register CloudEventJsonMessageMapper<T> for a specific type or make it the default mapper:
// Use CloudEvent JSON as default
.MapperRegistry(registry => registry.SetCloudEventJsonAsDefaultMessageMapper())
// Or register per type
.MapperRegistry(registry => registry.Register<Greeting, CloudEventJsonMessageMapper<Greeting>>())
PublicationTopic Attribute
Because Brighter now supports a default mapper, it needs a way to find which publication to use for a request. There are multiple ways to do this: via RequestType in the Publication, via PublicationTopicAttribute, or via a custom implementation.
[PublicationTopic("greeting.topic")]
public class Greeting() : Event(Guid.NewGuid()) { ... }
// Alternative: Explicit publication
new KafkaPublication<Greeting> { ... }
Breaking Changes in Brighter V10
Brighter V10 introduces updates to the Kafka integration. Below are the key breaking changes:
Changed default values
The default values for the Brighter Kafka producer have changed because the previous values were too low (you could get a "Local queue is full" error).
- KafkaPublication.BatchNumberMessages: from 10 to 10000
- KafkaPublication.QueueBufferingMaxMessages: from 10 to 100000
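If you relied on the old values or want to pin them explicitly, both properties can be set on the publication. The sketch below reuses the publication from the producer configuration and sets the two properties to the new V10 defaults listed above; the numbers are illustrative, not tuning advice.

```csharp
using System;
using Paramore.Brighter;
using Paramore.Brighter.MessagingGateway.Kafka;

var publication = new KafkaPublication
{
    MakeChannels = OnMissingChannel.Create,
    Source = new Uri("test-app", UriKind.RelativeOrAbsolute),
    Topic = new RoutingKey("greeting.topic"),
    // Explicitly set to the V10 defaults; lower them to reproduce V9 behaviour
    BatchNumberMessages = 10000,
    QueueBufferingMaxMessages = 100000
};
```

Setting the values explicitly keeps the producer behaviour stable if the defaults change again in a later release.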
Message Mapper Overhaul
Default JSON serialization:
In V9, message mappers were mandatory. In V10, JSON serialization is built in, so a mapper is only needed when custom logic is required. You can also change Brighter's default serialization.
Subscription
Subscriptions have two main changes.
Explicit Message Pump Types
Previously, a subscription had a boolean field called runAsync or isAsync. To make the intent explicit, it has been replaced by messagePumpType, which takes a MessagePumpType value (Reactor, Proactor, or Unknown).
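As I understand it, a Proactor pump runs the asynchronous pipeline, so a subscription configured with MessagePumpType.Proactor is normally paired with an async handler, while Reactor pairs with a synchronous one such as the GreetingHandler shown earlier. Here is a minimal sketch of the async counterpart; the class name GreetingHandlerAsync is my own choice for illustration.

```csharp
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Paramore.Brighter;

// Hypothetical async counterpart of GreetingHandler, for use with a Proactor pump
public class GreetingHandlerAsync(ILogger<GreetingHandlerAsync> logger) : RequestHandlerAsync<Greeting>
{
    public override async Task<Greeting> HandleAsync(Greeting command, CancellationToken cancellationToken = default)
    {
        logger.LogInformation("Hello {Name}", command.Name);
        // Pass the request on to the rest of the async pipeline
        return await base.HandleAsync(command, cancellationToken);
    }
}
```

If you keep a synchronous handler instead, switching the subscription to MessagePumpType.Reactor is the matching choice.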
Property Renaming
In AddServiceActivator, the ChannelFactory property was renamed to DefaultChannelFactory.
Publication
Use ExternalBusConfiguration to configure producers and outbox patterns:
// V10
.UseExternalBus(opt => { ... })
// V9
.UseExternalBus(new RmqProducerRegistryFactory(...))
Kafka Tips
Prefer Post over PostAsync for Kafka producers to avoid unnecessary overhead; in some cases this improves throughput by 100x or more.
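As a small illustration of the tip, the sketch below publishes the Greeting event through the synchronous Post call on the command processor. The GreetingPublisher class is hypothetical and only exists to show where the call goes.

```csharp
using Paramore.Brighter;

// Hypothetical service that publishes greetings to Kafka via Brighter
public class GreetingPublisher(IAmACommandProcessor commandProcessor)
{
    public void Publish(string name)
    {
        // Synchronous Post: hands the message to the Kafka producer without async overhead
        commandProcessor.Post(new Greeting { Name = name });
    }
}
```

How much you gain depends on your workload, so measure both variants before committing to one.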
Conclusion
Brighter V10 simplifies Kafka integration with CloudEvents support, improved defaults, and streamlined configuration. Breaking changes focus on clarity, but require updates to subscriptions, producers, and serialization logic. For details, refer to the Brighter GitHub repository and CloudEvents specification.
Reference
Full code on GitHub: https://github.com/lillo42/brighter-sample/commits/connect-to-kafka/