DEV Community

Cover image for How to Implement Batch Message Consumption with RocketMQ in Spring Boot
EVan Wilson
EVan Wilson

Posted on

How to Implement Batch Message Consumption with RocketMQ in Spring Boot

1. Adding Dependencies

First, add the necessary dependencies to your pom.xml file:

<!-- RocketMQ Spring Boot dependency for Spring Boot 3 -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.3.1</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
        </exclusion>
    </exclusions>
</dependency>

<!-- Dependency compatible with MQ cluster version 5.3.0 -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>5.3.0</version>
</dependency>
Enter fullscreen mode Exit fullscreen mode

2. Configuration File bootstrap.yaml

Configure your RocketMQ settings in the bootstrap.yaml file:

rocketmq:
  name-server: 192.168.1.1:9876;192.168.1.2:9876;192.168.1.3:9876 # Replace with actual NameServer addresses
  consumer:
    group: consume-group-test
    access-key: access # Configure if ACL is used
    secret-key: secret
    consume-message-batch-max-size: 50  # Max messages per batch
    pull-batch-size: 100  # Max messages pulled from Broker
  topics:
    project: "group-topic-1"
  groups:
    project: "consume-group-1"  # Use different groups for different business processes
Enter fullscreen mode Exit fullscreen mode

3. Configuration Class MqConfigProperties

Create the configuration class MqConfigProperties:

import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import lombok.Data;

import java.io.Serializable;

/**
 * RocketMQ Configuration Class
 */
@Data
@Component
@ConfigurationProperties(prefix = "rocketmq")
public class MqConfigProperties implements Serializable {

    private static final long serialVersionUID = 1L;

    @Autowired
    private RocketMQProperties rocketMQProperties;

    private TopicProperties topics;
    private GroupProperties groups;

    /**
     * Topic Configuration Class
     */
    @Data
    public static class TopicProperties implements Serializable {
        private static final long serialVersionUID = 1L;
        private String project;
    }

    /**
     * Consumer Group Configuration Class
     */
    @Data
    public static class GroupProperties implements Serializable {
        private static final long serialVersionUID = 1L;
        private String project;
    }
}
Enter fullscreen mode Exit fullscreen mode

4. Implementing the Consumer Code

Create the consumer class UserConsumer:

import com.alibaba.fastjson2.JSONObject;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.spring.support.RocketMQUtil;
import org.springframework.context.ApplicationContext;
import org.springframework.context.SmartLifecycle;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;

import javax.annotation.Resource;
import java.util.List;

/**
 * Batch Consumer Implementation
 */
@Component
@Slf4j
public class UserConsumer implements SmartLifecycle {

    @Resource
    private MqConfigProperties mqConfigProperties;

    @Resource
    private ApplicationContext applicationContext;

    private volatile boolean running;
    private DefaultMQPushConsumer consumer;

    @Override
    public void start() {
        if (isRunning()) {
            throw new IllegalStateException("Consumer is already running");
        }
        initConsumer();
        setRunning(true);
        log.info("UserConsumer started successfully.");
    }

    @Override
    public void stop() {
        if (isRunning() && consumer != null) {
            consumer.shutdown();
            setRunning(false);
            log.info("UserConsumer stopped.");
        }
    }

    @Override
    public boolean isRunning() {
        return running;
    }

    private void setRunning(boolean running) {
        this.running = running;
    }

    private void initConsumer() {
        String topic = mqConfigProperties.getTopics().getProject();
        String group = mqConfigProperties.getGroups().getProject();
        String nameServer = mqConfigProperties.getRocketMQProperties().getNameServer();
        String accessKey = mqConfigProperties.getRocketMQProperties().getConsumer().getAccessKey();
        String secretKey = mqConfigProperties.getRocketMQProperties().getConsumer().getSecretKey();

        RPCHook rpcHook = RocketMQUtil.getRPCHookByAkSk(applicationContext.getEnvironment(), accessKey, secretKey);
        consumer = rpcHook != null
                ? new DefaultMQPushConsumer(group, rpcHook, new AllocateMessageQueueAveragely())
                : new DefaultMQPushConsumer(group);

        consumer.setNamesrvAddr(nameServer);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        consumer.setConsumeMessageBatchMaxSize(100);  // Set the batch size for consumption
        consumer.subscribe(topic, "*");
        consumer.setMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                log.info("Received {} messages", msgs.size());
                for (MessageExt message : msgs) {
                    String body = new String(message.getBody());
                    log.info("Processing message: {}", body);
                    User user = JSONObject.parseObject(body, User.class);
                    processUser(user);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        log.info("UserConsumer initialized with topic [{}] and group [{}].", topic, group);
    }

    private void processUser(User user) {
        log.info("Processing user with ID: {}", user.getId());
        // Handle user-related business logic
    }
}
Enter fullscreen mode Exit fullscreen mode

5. Producer Example Code

To produce batch messages, you can use the following UserProducer class:

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import java.util.ArrayList;
import java.util.List;

public class UserProducer {

    private DefaultMQProducer producer;

    public void sendBatchMessages(List<User> users, String topic) {
        List<Message> messages = new ArrayList<>();
        for (User user : users) {
            messages.add(new Message(topic, JSONObject.toJSONString(user).getBytes()));
        }
        try {
            producer.send(messages);
        } catch (Exception e) {
            log.error("Error sending batch messages", e);
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

6. Additional Optimization Suggestions

  • Performance Optimization: You can adjust the size of the consumer thread pool. By default, it's set to consumeThreadMin=20 and consumeThreadMax=20. In high-concurrency scenarios, increasing the thread pool size can enhance performance.

  • Error Handling: When consumption fails, be cautious with RECONSUME_LATER to avoid infinite retry loops. Set a maximum retry count based on your business requirements.

  • Tenant Isolation: Use different groups for different business modules to avoid consuming data from the wrong group. This is especially crucial in production environments.

Top comments (0)