DEV Community

Cover image for Brighter V10: Configure RocketMQ
Rafael Andrade
Rafael Andrade

Posted on

Brighter V10: Configure RocketMQ

Brighter V10 now includes support for Apache RocketMQ, providing a robust solution for distributed messaging in .NET applications. This guide will walk you through the setup and configuration process for both Vanilla and Fluent Brighter approaches.

What is RocketMQ?

Apache RocketMQ is an open-source distributed messaging and streaming platform with low latency, high performance, reliability, and trillion-level capacity. Originally developed by Alibaba, it's designed to support a wide array of use cases, including asynchronous communication, event sourcing, and stream processing, making it an excellent choice for building decoupled, scalable microservices architectures.

Requirement

.NET 8 or superior
Podman or docker

For Vanilla Brighter, we will need these NuGet packages:

For Fluent Brighter:

Brighter Recap

Before diving into RocketMQ specifics, let's review the fundamental Brighter building blocks.

Request (Command/Event)

Messages are simple classes that implement IRequest.

public class Greeting() : Event(Guid.NewGuid())
{
    public string Name { get; set; } = string.Empty;
}
Enter fullscreen mode Exit fullscreen mode
  • Commands: Single-recipient operations (e.g., SendEmail).
  • Events: Broadcast notifications (e.g., OrderShipped).

Creating a Message Mapper (optional)

Mappers translate between your .NET objects and Brighter's Message format.

From Brighter V10, this step is optional

public class GreetingMapper : IAmAMessageMapper<Greeting>
{
    public Message MapToMessage(Greeting request)
    {
        var header = new MessageHeader();
        header.Id = request.Id; 
        header.TimeStamp = DateTime.UtcNow;
        header.Topic = "greeting.topic"; // The target topic to be publish
        header.MessageType = MessageType.MT_EVENT;

        var body = new MessageBody(JsonSerializer.Serialize(request));
        return new Message(header, body);
    }

    public Greeting MapToRequest(Message message)
    {
        return JsonSerializer.Deserialize<Greeting>(message.Body.Bytes)!;
    }
}
Enter fullscreen mode Exit fullscreen mode

Implementing a Request Handler

Handlers contain the business logic to process incoming messages.

public class GreetingHandler(ILogger<GreetingHandler> logger) : RequestHandler<Greeting>
{
    public override Greeting Handle(Greeting command)
    {
        logger.LogInformation("Hello {Name}", command.Name);
        return base.Handle(command);
    }
}
Enter fullscreen mode Exit fullscreen mode

Configuring Brighter with RocketMQ

Step 1: Configure the RocketMQ Connection

First, define the connection details for your RocketMQ cluster.

var connection =new RocketMessagingGatewayConnection(new ClientConfig()
    .SetEndpoints("localhost:8081")  // Proxy endpoint from docker-compose
    .EnableSsl(false)
    .SetRequestTimeout(TimeSpan.FromSeconds(10))
    .Build());
Enter fullscreen mode Exit fullscreen mode

Important for Local Development: If you are running RocketMQ locally, ensure your VPN is turned off, because the current implementation of .NET RocketMQ SDK.

Step 2: Configure the Consumer (Subscriptions)

Register your message handlers and subscriptions in the DI container.

services.AddConsumers(opt =>
{
    opt.Subscriptions = [
        new RocketSubscription<Greeting>(
           subscriptionName: "greeting-subscription", // Any value
           channelName: "greeting-channel", // Any value
           routingKey: "greeting", // The topic name
           consumerGroup: "greeting-consumer"
        );
    ];

    opt.DefaultChannelFactory = new RocketMqChannelFactory(new RocketMessageConsumerFactory(connection));
})
Enter fullscreen mode Exit fullscreen mode

Step 3: Configure the Producer (Publications)

Set up the producer registry to allow sending messages.

services.AddProduces(opt =>
{
    opt.ProducerRegistry = new ProducerRegistry(new RocketMessageProducerFactory(connection,
        [
            new RocketMqPublication<Greeting>
            {
                Topic = "greeting"
            }
        ]).Create());
});
Enter fullscreen mode Exit fullscreen mode

Note: The RocketMQ .NET client does not automatically create topics. You must create the greeting topic manually, as shown in the docker-compose setup below.

Configuring Fluent Brighter with RocketMQ

The Fluent Brighter API offers a more streamlined configuration experience

service
    .AddHostedService<ServiceActivatorHostedService>()
    .AddFluentBrighter(brighter => brighter
        .UsingRocketMq(rocket => rocket
            .SetConnection(conn => conn
                .SetClient(c => c
                .SetEndpoints("localhost:8081")
                .EnableSsl(false)
                .SetRequestTimeout(TimeSpan.FromSeconds(10))
                .Build()))
            .UsePublications(pub => pub
                .AddPublication<GreetingEvent>(p => p
                    .SetTopic("greeting")))
                    .UseSubscriptions(sub => sub
                .AddSubscription<GreetingEvent>(s => s
                    .SetSubscriptionName("greeting-sub-name")
                    .SetTopic("greeting")
                    .SetConsumerGroup("greeting-consumer-group")
                    .UseReactorMode()
))));
Enter fullscreen mode Exit fullscreen mode

Local Development: Running RocketMQ with Docker

Use the following docker-compose.yml file to quickly spin up a local RocketMQ cluster, including a management dashboard.

services:
  # RocketMQ Nameserver (Service Discovery)
  nameserver:
    image: apache/rocketmq
    container_name: rocketmq-nameserver
    ports:
      - "9876:9876"  # Nameserver port
    command: sh mqnamesrv
    healthcheck: # Health check to ensure Nameserver is ready
      test: [ "CMD", "sh", "-c", "netstat -an | grep 9876" ]
      interval: 5s
      timeout: 10s
      retries: 10
    environment:
      - JAVA_OPT_EXT=-Xms512m -Xmx512m -Xmn256m -Drocketmq.log.level=DEBUG -Drocketmq.log.root.level=DEBUG # JVM memory settings

  # RocketMQ Broker (Message Storage)
  broker:
    image: apache/rocketmq
    container_name: rocketmq-broker
    ports:
      - "10909:10909"  # Broker HA port
      - "10911:10911"  # Broker port
      - "10912:10912"  # Broker TLS port (optional)
    environment:
      - NAMESRV_ADDR=nameserver:9876  # Connect to nameserver
      - BROKER_CLUSTER_NAME=DefaultCluster
      - BROKER_NAME=broker-a
      - BROKER_ID=0
      - AUTO_CREATE_TOPIC_ENABLE=true  # Auto-create topics
      - JAVA_OPT_EXT=-Drocketmq.log.level=DEBUG -Drocketmq.log.root.level=DEBUG
    command: sh mqbroker
    depends_on:
      - nameserver
    healthcheck:
      test: [ "CMD", "sh", "-c", "netstat -an | grep 10911" ]
      interval: 5s
      timeout: 10s
      retries: 10

  proxy:
    image: apache/rocketmq
    container_name: rmqproxy
    depends_on:
      - broker
      - nameserver
    ports:
      - "8081:8081"   # HTTP endpoint (for REST API)
      - "9877:9877"   # gRPC endpoint (for clients)
    environment:
      - NAMESRV_ADDR=nameserver:9876
      - JAVA_OPT_EXT=-Drocketmq.log.level=DEBUG -Drocketmq.log.root.level=DEBUG
    command: sh mqproxy
    healthcheck:
      test: [ "CMD", "sh", "-c", "netstat -an | grep 8081" ]
      interval: 5s
      timeout: 10s
      retries: 10

  # Service to create the RocketMQ topic
  create-topic:
    image: apache/rocketmq # Use the same RocketMQ image to get mqadmin tool
    command: > # Multi-line command to create the topic
      sh -c "
            echo 'Waiting for broker to be healthy...' &&
            until curl -s http://broker:10911/ &>/dev/null; do
              echo 'Broker not fully ready yet, retrying in 60 seconds...'
                sleep 60
            done;
            echo 'Broker is up. Creating multiple topics...' &&

            /home/rocketmq/rocketmq-5.3.3/bin/mqadmin updateTopic -n nameserver:9876 -t greeting -c DefaultCluster -r 1 -w 1 -a +message.type=NORMAL && 
            echo 'done'
        "
    depends_on: # Ensure broker is healthy before attempting topic creation
      - broker
      - nameserver
    restart: "no" # This service should run once and then exit

  # RocketMQ Dashboard (Web UI - Optional)
  dashboard:
    image: apacherocketmq/rocketmq-dashboard
    container_name: rocketmq-dashboard
    ports:
      - "8080:8080"  # Dashboard UI port
    environment:
      - NAMESRV_ADDR=nameserver:9876  # Connect to nameserver
    depends_on:
      - nameserver
      - broker
Enter fullscreen mode Exit fullscreen mode

Access the RocketMQ Dashboard at http://localhost:8080 to monitor your topics and messages.

Current Limitations

The initial Brighter RocketMQ integration has some limitations:

  • Push Mode Unsupported: Only pull-based consumption is currently implemented
  • Requeue Visibility: Message visibility timeout cannot be adjusted during requeue due to a bug in the RocketMQ .NET SDK

These limitations will be addressed in future SDK updates

Conclusion

You are now ready to build powerful, decoupled .NET applications using Brighter and RocketMQ. This guide has provided you with the foundational knowledge to set up both producers and consumers, configure a local development environment, and understand how RocketMQ compares to both Kafka and RabbitMQ for different use cases. For further details, explore the official Brighter and RocketMQ documentation.

See the full code: https://github.com/lillo42/brighter-sample/tree/v10-rocketmq

Top comments (0)