DEV Community

Cover image for Migrating to Brighter V10 with Kafka
Rafael Andrade
Rafael Andrade

Posted on

Migrating to Brighter V10 with Kafka

In previous articles, I covered Brighter integration with Kafka and Brighter V10 RC1. This guide focuses on migrating to Brighter V10, emphasizing Kafka configuration changes and breaking updates.

Requirement

Brighter Recap

Before continuing about Kafka configuration, let's recap what we already know about Brighter.

Request (Command/Event)

Define messages using IRequest:

[PublicationTopic("greeting.topic")
public class Greeting() : Event(Guid.NewGuid())
{
    public string Name { get; set; } = string.Empty;
}
Enter fullscreen mode Exit fullscreen mode
  • Commands: Single-recipient operations (e.g., SendEmail).
  • Events: Broadcast notifications (e.g., OrderShipped).

Message Mapper (Optional)

Translates between Brighter messages and your app objects, for async workflow mappers now require IAmAMessageMapperAsync

Request Handler

Processes incoming messages:

public class GreetingHandler(ILogger<GreetingHandler> logger) : RequestHandler<Greeting>
{
    public override Greeting Handle(Greeting command)
    {
        logger.LogInformation("Hello {Name}", command.Name);
        return base.Handle(command);
    }
}
Enter fullscreen mode Exit fullscreen mode

Configuring Brighter with Kafka

1. Connection Setup

Define Kafka connection details:

var connection = new KafkaMessagingGatewayConfiguration
{
    Name = "sample", // Application Name
    BootStrapServers = ["localhost:9092"], // Broker address
    SecurityProtocol = SecurityProtocol.Plaintext, // Use SSL in prod
    SaslMechanisms = SaslMechanism.Plain,
};
Enter fullscreen mode Exit fullscreen mode

2. Kafka Subscription

Subscribe to a topic:

 .AddServiceActivator(opt =>
 {
     opt.Subscriptions =
     [
         new KafkaSubscription<Greeting>(
             new SubscriptionName("kafka.greeting.subscription"), // The subscription name, it's used internal only, so you can put whatevery you want
             new ChannelName("greeting.topic"), // The topic name
             new RoutingKey("greeting.topic"),  // The topic name
             groupId: "some-consumer-group", // The Kafka Group ID
             makeChannels: OnMissingChannel.Create, // Tell to Brighter what to do when the topic not exists
             numOfPartitions: 2, // The number of topic partition, it's only useful when you want to create Kafka topic via code
             noOfPerformers: 2, // The number of subscription running in parallel, it doesn't make sense to be bigger than the number of partition
              messagePumpType: MessagePumpType.Proactor
           ),
     ];

     opt.DefaultChannelFactory = new ChannelFactory(new KafkaMessageConsumerFactory(connection));
})
Enter fullscreen mode Exit fullscreen mode

3. Kafka Producer Configuration

Publish events to a topic:

.UseExternalBus(opt =>
{
    opt.ProducerRegistry = new KafkaProducerRegistryFactory(connection,
    [
        new KafkaPublication
        {
            MakeChannels = OnMissingChannel.Create,
            Source = new Uri("test-app", UriKind.RelativeOrAbsolute), // Cloud events properties
            Topic = new RoutingKey("greeting.topic")
        }
    ]).Create();
})
Enter fullscreen mode Exit fullscreen mode

New functionality

CloudEvents supports

Brighter now has native support to CloudEvents so you can configure it on Publication (via Source, Subject, Type and other properties).

By default Brighter use the binary mode, for JSON mode you can set a registry the CloudEventJsonMessageMapper<T> or change the default mapper to CloudEvent

// Use CloudEvent JSON as default  
.MapperRegistry(registry => registry.SetCloudEventJsonAsDefaultMessageMapper())

// or
// Or register per-type 
.MapperRegistry(registry => registry.Register<Greeting, CloudEventJsonMessageMapper<Greeting>>())
Enter fullscreen mode Exit fullscreen mode

PublicationTopic Attribute

Because Brighter have a support to a default mapper, Brighter need a way to find which publication should be used, for this we have multiply ways(via RequestType in Publication, via PublicationTopicAttribute or a custom implementation)

[PublicationTopic("greeting.topic")]
public class Greeting() : Event(Guid.NewGuid()) { ... }

// Alternative: Explicit publication  
new KafkaPublication<Greeting> { ... }
Enter fullscreen mode Exit fullscreen mode

Breaking Changes in Brighter V10

Brighter V10 introduces updates to Kafka integration, Below are the key breaking changes:

Changing default value

The default Brighter Kafka value has been changes, because the previous values was too low(you could get Local queue is full).

  • KafkaPublication.BatchNumberMessages from 10 to 10000
  • KafkaPublication.QueueBufferingMaxMessages from 10 to 100000

Message Mapper Overhaul

Default JSON Serialization :

In V9, message mappers were mandatory. In V10, JSON serialization is built-in unless custom logic is required, also you can change the default Brighter serialization.

Subscription

We had 2 main changes on subscription.

Explicit Message Pump Types

The first one is before we had a field called runAsync or isAsync it was a boolean, to make everything clear we change it to messagePumpType and it's the MessagePumpType(Reactor, Proactor, Unknown).

Property Renaming

On the AddServiceActivator where rename the ChannelFactory property to DefaultChannelFactory

Publication

Use ExternalBusConfiguration to configure producers and outbox patterns:

// V10
.UseExternalBus(opt => { ... })

// V9
.UseExternalBus(new RmqProducerRegistryFactory(...))
Enter fullscreen mode Exit fullscreen mode

Kafka Tips

Prefer Post over PostAsync for Kafka producers to avoid unnecessary overhead (improves throughput by 100x+ in some cases)

Conclusion

Brighter V10 simplifies Kafka integration with CloudEvents support, improved defaults, and streamlined configuration. Breaking changes focus on clarity, but require updates to subscriptions, producers, and serialization logic. For details, refer to the Brighter GitHub repository and CloudEvents specification.

Reference

Full Code Github - https://github.com/lillo42/brighter-sample/commits/connect-to-kafka/

Top comments (0)