DEV Community

Cyrus Tse
Cyrus Tse

Posted on

RocketMQ 實戰學習與架構指南

Apache RocketMQ 實戰學習與架構指南

導讀:本指南由淺入深,涵蓋了 RocketMQ 的核心概念、本地 Docker 開發環境搭建、Spring Boot 實戰代碼,以及進階的底層架構思維(延時機制與進度書籤理論)。


一、 核心角色與資料模型 (Core Concepts)

1. 四大物理角色

  • NameServer(路由中心):充當「通訊錄」。Broker 啟動時向它註冊自己管轄哪些 Topic;Producer / Consumer 啟動時向它詢問「我要發/收的 Topic 在哪台 Broker 上」。幾乎無狀態,可集群部署。
  • Broker(訊息中轉站):MQ 的心臟,負責訊息的儲存、投遞、查詢與高可用保證。
  • Producer(生產者):建立訊息並發送給 Broker 的應用程式(例如:訂單微服務)。
  • Consumer(消費者):從 Broker 拉取訊息並執行後續業務邏輯的應用程式(例如:通知微服務)。

2. 邏輯資料模型

  • Topic(主題):訊息的第一級業務分類(如:Order_Topic)。
  • Tag(標籤):Topic 下的第二級分類,方便消費者在 Broker 端直接過濾(如:Pay_SuccessOrder_Cancel)。
  • Message Queue(訊息隊列):一個 Topic 在實體上會被切分成多個 Queue 分散在不同的 Broker 上,這是 RocketMQ 實現並發擴展的關鍵。
  • Consumer Group(消費者組):承載特定業務功能的消費者集合(如:group_notification)。

二、 本地開發環境搭建 (Docker Compose)

在空資料夾下建立 docker-compose.yml,此配置已完美相容 Apple Silicon (M系列晶片)跨容器直連本機

version: '3.8'
services:
  namesrv:
    image: apache/rocketmq:5.3.0
    platform: linux/amd64
    container_name: rmqnamesrv
    ports:
      - "9876:9876"
    environment:
      - JAVA_OPT_EXT=-Xms256m -Xmx256m -Xmn128m
    command: sh mqnamesrv

  broker:
    image: apache/rocketmq:5.3.0
    platform: linux/amd64
    container_name: rmqbroker
    ports:
      - "10909:10909"
      - "10911:10911"
    environment:
      - NAMESRV_ADDR=namesrv:9876
      - JAVA_OPT_EXT=-Xms512m -Xmx512m
    volumes:
      - ./broker.conf:/home/rocketmq/rocketmq-5.3.0/conf/broker.conf
    command: sh mqbroker -c /home/rocketmq/rocketmq-5.3.0/conf/broker.conf
    depends_on:
      - namesrv

  dashboard:
    image: apacherocketmq/rocketmq-dashboard:latest
    platform: linux/amd64
    container_name: rmqdashboard
    ports:
      - "8082:8080" # 左邊為本機訪問 Port,右邊為容器內建 Port
    environment:
      - JAVA_OPTS=-Drocketmq.namesrv.addr=namesrv:9876 -Xms256m -Xmx256m
    depends_on:
      - namesrv
Enter fullscreen mode Exit fullscreen mode

關鍵配套:同級目錄下的 broker.conf

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
brokerIP1 = 127.0.0.1
Enter fullscreen mode Exit fullscreen mode

啟動指令:docker-compose up -d

視覺化後台:打開瀏覽器存取 http://localhost:8082

三、 Spring Boot 實戰代碼(模式一:主動發送)

1. Maven 依賴 (pom.xml)

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.3.1</version>
</dependency>
Enter fullscreen mode Exit fullscreen mode

2. 設定檔 (application.yml)

rocketmq:
  name-server: 127.0.0.1:9876
  producer:
    group: default_producer_group
Enter fullscreen mode Exit fullscreen mode

3. 生產者發送 API(同步發送)

@Autowired
private RocketMQTemplate rocketMQTemplate;

public void sendNormalMessage(String orderId) {
    rocketMQTemplate.convertAndSend("Order_Topic", orderId);
}
Enter fullscreen mode Exit fullscreen mode

4. 消費者監聽器(宣告式優雅寫法)

@Component
@RocketMQMessageListener(
    topic = "Order_Topic", 
    consumerGroup = "group_order_processor"
)
public class OrderConsumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        System.out.println("收到訂單訊息:" + message);
    }
}
Enter fullscreen mode Exit fullscreen mode

四、 核心實戰場景:延時訊息 (購物車超時取消)

業務場景:使用者落單後,給予 10 秒(實務上為 30 分鐘)付款時間,超時未付則取消訂單釋放庫存。

1. Producer 發送延時訊息

// 建立 Spring Message 封裝體
Message<String> msg = MessageBuilder.withPayload("ORD_999").build();

// 參數:Topic, 訊息, 發送Timeout(毫秒), 延時時間(秒)
rocketMQTemplate.syncSendDelayTimeSeconds("TestTopic", msg, 3000, 10);
Enter fullscreen mode Exit fullscreen mode

2. 底層運作魔術(偷天換日大法)

當 Broker 收到這條延時 10 秒的訊息時,它不會立刻放進 TestTopic,而是:

把原本的 Topic (TestTopic) 存入訊息的隱藏屬性中。

將 Topic 強制篡改為系統內建的專用緩衝主題:SCHEDULE_TOPIC_XXX。

啟動內部 Timer,10 秒時間一到,將訊息撈出,把 Topic 還原回 TestTopic,正式推入隊列供 Consumer 消費。

五、 架構師深度思維 (Deep Dive)

1. 「圖書館書籤理論」:訊息被消費後真的消失了嗎?

答案:物理上絕對沒有消失。

CommitLog(實體書本):Broker 收到訊息後,會依序以 Append-only 方式寫入硬碟的 CommitLog 檔案中。管你多少個 Consumer 讀過,實體資料永遠貼在磁碟上(預設保留 48 小時後由系統整批清除)。

Consumer Offset(進度書籤):

當 Group_A 讀完了第 10 筆訊息,Broker 會在名冊上記下:Group_A 的 Offset = 11。

對 Group_A 來說,第 10 筆訊息「視覺上消失了」。

此時若登記一個全新的 Group_B 來聽同一個 Topic,Broker 會給它一本全新的進度名冊,Group_B 會從第 1 筆訊息重新讀起。

2. 負載均衡 (Clustering) vs. 廣播發佈 (Pub/Sub)

六、 開發避坑 Checklist

1. 消費邏輯必須具備「冪等性 (Idempotency)」:

在網路不穩時,MQ 保證的是「At least once(至少投遞一次)」,訊息極可能被重複派發。Consumer 收到訂單 ID 時,第一步永遠是查 DB 狀態:if (order.status != PENDING) return;。

2. 嚴禁在 Consumer 內開新 Thread 處理任務:

若在 onMessage() 內使用 CompletableFuture.runAsync(),主執行緒會立刻回傳成功給 Broker;萬一背景 Thread 跑到一半當機,該筆資料將永遠遺失且無法觸發 MQ 的重試機制。

3. 本地端測試先至 Dashboard 建立 Topic:

RocketMQ 5.x 預設關閉了 autoCreateTopicEnable,代碼發送前務必先去後台點選 ADD/UPDATE 建立 Topic。

Top comments (0)