Brighter V10 now includes support for Apache RocketMQ, providing a robust solution for distributed messaging in .NET applications. This guide will walk you through the setup and configuration process for both Vanilla and Fluent Brighter approaches.
What is RocketMQ?
Apache RocketMQ is an open-source distributed messaging and streaming platform with low latency, high performance, reliability, and trillion-level capacity. Originally developed by Alibaba, it's designed to support a wide array of use cases, including asynchronous communication, event sourcing, and stream processing, making it an excellent choice for building decoupled, scalable microservices architectures.
Requirement
.NET 8 or superior
Podman or docker
For Vanilla Brighter, we will need these NuGet packages:
- Paramore.Brighter.MessagingGateway.RocketMQ: Integrate RocketMQ.
- Paramore.Brighter.ServiceActivator.Extensions.DependencyInjection: Bridge Brighter with AWS messaging services.
- Paramore.Brighter.ServiceActivator.Extensions.Hosting: Hosts Brighter as a background service.
- Serilog.AspNetCore: For structured logging (optional but recommended).
For Fluent Brighter:
- Fluent.Brighter.RocketMQ\Fluent.Brighter.RocketMQ
- Paramore.Brighter.ServiceActivator.Extensions.Hosting
- Serilog.AspNetCore
Brighter Recap
Before diving into RocketMQ specifics, let's review the fundamental Brighter building blocks.
Request (Command/Event)
Messages are simple classes that implement IRequest
.
public class Greeting() : Event(Guid.NewGuid())
{
public string Name { get; set; } = string.Empty;
}
-
Commands: Single-recipient operations (e.g.,
SendEmail
). -
Events: Broadcast notifications (e.g.,
OrderShipped
).
Creating a Message Mapper (optional)
Mappers translate between your .NET objects and Brighter's Message format.
From Brighter V10, this step is optional
public class GreetingMapper : IAmAMessageMapper<Greeting>
{
public Message MapToMessage(Greeting request)
{
var header = new MessageHeader();
header.Id = request.Id;
header.TimeStamp = DateTime.UtcNow;
header.Topic = "greeting.topic"; // The target topic to be publish
header.MessageType = MessageType.MT_EVENT;
var body = new MessageBody(JsonSerializer.Serialize(request));
return new Message(header, body);
}
public Greeting MapToRequest(Message message)
{
return JsonSerializer.Deserialize<Greeting>(message.Body.Bytes)!;
}
}
Implementing a Request Handler
Handlers contain the business logic to process incoming messages.
public class GreetingHandler(ILogger<GreetingHandler> logger) : RequestHandler<Greeting>
{
public override Greeting Handle(Greeting command)
{
logger.LogInformation("Hello {Name}", command.Name);
return base.Handle(command);
}
}
Configuring Brighter with RocketMQ
Step 1: Configure the RocketMQ Connection
First, define the connection details for your RocketMQ cluster.
var connection =new RocketMessagingGatewayConnection(new ClientConfig()
.SetEndpoints("localhost:8081") // Proxy endpoint from docker-compose
.EnableSsl(false)
.SetRequestTimeout(TimeSpan.FromSeconds(10))
.Build());
Important for Local Development: If you are running RocketMQ locally, ensure your VPN is turned off, because the current implementation of .NET RocketMQ SDK.
Step 2: Configure the Consumer (Subscriptions)
Register your message handlers and subscriptions in the DI container.
services.AddConsumers(opt =>
{
opt.Subscriptions = [
new RocketSubscription<Greeting>(
subscriptionName: "greeting-subscription", // Any value
channelName: "greeting-channel", // Any value
routingKey: "greeting", // The topic name
consumerGroup: "greeting-consumer"
);
];
opt.DefaultChannelFactory = new RocketMqChannelFactory(new RocketMessageConsumerFactory(connection));
})
Step 3: Configure the Producer (Publications)
Set up the producer registry to allow sending messages.
services.AddProduces(opt =>
{
opt.ProducerRegistry = new ProducerRegistry(new RocketMessageProducerFactory(connection,
[
new RocketMqPublication<Greeting>
{
Topic = "greeting"
}
]).Create());
});
Note: The RocketMQ .NET client does not automatically create topics. You must create the greeting topic manually, as shown in the docker-compose setup below.
Configuring Fluent Brighter with RocketMQ
The Fluent Brighter API offers a more streamlined configuration experience
service
.AddHostedService<ServiceActivatorHostedService>()
.AddFluentBrighter(brighter => brighter
.UsingRocketMq(rocket => rocket
.SetConnection(conn => conn
.SetClient(c => c
.SetEndpoints("localhost:8081")
.EnableSsl(false)
.SetRequestTimeout(TimeSpan.FromSeconds(10))
.Build()))
.UsePublications(pub => pub
.AddPublication<GreetingEvent>(p => p
.SetTopic("greeting")))
.UseSubscriptions(sub => sub
.AddSubscription<GreetingEvent>(s => s
.SetSubscriptionName("greeting-sub-name")
.SetTopic("greeting")
.SetConsumerGroup("greeting-consumer-group")
.UseReactorMode()
))));
Local Development: Running RocketMQ with Docker
Use the following docker-compose.yml file to quickly spin up a local RocketMQ cluster, including a management dashboard.
services:
# RocketMQ Nameserver (Service Discovery)
nameserver:
image: apache/rocketmq
container_name: rocketmq-nameserver
ports:
- "9876:9876" # Nameserver port
command: sh mqnamesrv
healthcheck: # Health check to ensure Nameserver is ready
test: [ "CMD", "sh", "-c", "netstat -an | grep 9876" ]
interval: 5s
timeout: 10s
retries: 10
environment:
- JAVA_OPT_EXT=-Xms512m -Xmx512m -Xmn256m -Drocketmq.log.level=DEBUG -Drocketmq.log.root.level=DEBUG # JVM memory settings
# RocketMQ Broker (Message Storage)
broker:
image: apache/rocketmq
container_name: rocketmq-broker
ports:
- "10909:10909" # Broker HA port
- "10911:10911" # Broker port
- "10912:10912" # Broker TLS port (optional)
environment:
- NAMESRV_ADDR=nameserver:9876 # Connect to nameserver
- BROKER_CLUSTER_NAME=DefaultCluster
- BROKER_NAME=broker-a
- BROKER_ID=0
- AUTO_CREATE_TOPIC_ENABLE=true # Auto-create topics
- JAVA_OPT_EXT=-Drocketmq.log.level=DEBUG -Drocketmq.log.root.level=DEBUG
command: sh mqbroker
depends_on:
- nameserver
healthcheck:
test: [ "CMD", "sh", "-c", "netstat -an | grep 10911" ]
interval: 5s
timeout: 10s
retries: 10
proxy:
image: apache/rocketmq
container_name: rmqproxy
depends_on:
- broker
- nameserver
ports:
- "8081:8081" # HTTP endpoint (for REST API)
- "9877:9877" # gRPC endpoint (for clients)
environment:
- NAMESRV_ADDR=nameserver:9876
- JAVA_OPT_EXT=-Drocketmq.log.level=DEBUG -Drocketmq.log.root.level=DEBUG
command: sh mqproxy
healthcheck:
test: [ "CMD", "sh", "-c", "netstat -an | grep 8081" ]
interval: 5s
timeout: 10s
retries: 10
# Service to create the RocketMQ topic
create-topic:
image: apache/rocketmq # Use the same RocketMQ image to get mqadmin tool
command: > # Multi-line command to create the topic
sh -c "
echo 'Waiting for broker to be healthy...' &&
until curl -s http://broker:10911/ &>/dev/null; do
echo 'Broker not fully ready yet, retrying in 60 seconds...'
sleep 60
done;
echo 'Broker is up. Creating multiple topics...' &&
/home/rocketmq/rocketmq-5.3.3/bin/mqadmin updateTopic -n nameserver:9876 -t greeting -c DefaultCluster -r 1 -w 1 -a +message.type=NORMAL &&
echo 'done'
"
depends_on: # Ensure broker is healthy before attempting topic creation
- broker
- nameserver
restart: "no" # This service should run once and then exit
# RocketMQ Dashboard (Web UI - Optional)
dashboard:
image: apacherocketmq/rocketmq-dashboard
container_name: rocketmq-dashboard
ports:
- "8080:8080" # Dashboard UI port
environment:
- NAMESRV_ADDR=nameserver:9876 # Connect to nameserver
depends_on:
- nameserver
- broker
Access the RocketMQ Dashboard at http://localhost:8080 to monitor your topics and messages.
Current Limitations
The initial Brighter RocketMQ integration has some limitations:
- Push Mode Unsupported: Only pull-based consumption is currently implemented
- Requeue Visibility: Message visibility timeout cannot be adjusted during requeue due to a bug in the RocketMQ .NET SDK
These limitations will be addressed in future SDK updates
Conclusion
You are now ready to build powerful, decoupled .NET applications using Brighter and RocketMQ. This guide has provided you with the foundational knowledge to set up both producers and consumers, configure a local development environment, and understand how RocketMQ compares to both Kafka and RabbitMQ for different use cases. For further details, explore the official Brighter and RocketMQ documentation.
See the full code: https://github.com/lillo42/brighter-sample/tree/v10-rocketmq
Top comments (0)