Apache RocketMQ 實戰學習與架構指南
導讀:本指南由淺入深,涵蓋了 RocketMQ 的核心概念、本地 Docker 開發環境搭建、Spring Boot 實戰代碼,以及進階的底層架構思維(延時機制與進度書籤理論)。
一、 核心角色與資料模型 (Core Concepts)
1. 四大物理角色
- NameServer(路由中心):充當「通訊錄」。Broker 啟動時向它註冊自己管轄哪些 Topic;Producer / Consumer 啟動時向它詢問「我要發/收的 Topic 在哪台 Broker 上」。幾乎無狀態,可集群部署。
- Broker(訊息中轉站):MQ 的心臟,負責訊息的儲存、投遞、查詢與高可用保證。
- Producer(生產者):建立訊息並發送給 Broker 的應用程式(例如:訂單微服務)。
- Consumer(消費者):從 Broker 拉取訊息並執行後續業務邏輯的應用程式(例如:通知微服務)。
2. 邏輯資料模型
-
Topic(主題):訊息的第一級業務分類(如:
Order_Topic)。 -
Tag(標籤):Topic 下的第二級分類,方便消費者在 Broker 端直接過濾(如:
Pay_Success、Order_Cancel)。 - Message Queue(訊息隊列):一個 Topic 在實體上會被切分成多個 Queue 分散在不同的 Broker 上,這是 RocketMQ 實現並發擴展的關鍵。
-
Consumer Group(消費者組):承載特定業務功能的消費者集合(如:
group_notification)。
二、 本地開發環境搭建 (Docker Compose)
在空資料夾下建立 docker-compose.yml,此配置已完美相容 Apple Silicon (M系列晶片) 及跨容器直連本機:
version: '3.8'
services:
namesrv:
image: apache/rocketmq:5.3.0
platform: linux/amd64
container_name: rmqnamesrv
ports:
- "9876:9876"
environment:
- JAVA_OPT_EXT=-Xms256m -Xmx256m -Xmn128m
command: sh mqnamesrv
broker:
image: apache/rocketmq:5.3.0
platform: linux/amd64
container_name: rmqbroker
ports:
- "10909:10909"
- "10911:10911"
environment:
- NAMESRV_ADDR=namesrv:9876
- JAVA_OPT_EXT=-Xms512m -Xmx512m
volumes:
- ./broker.conf:/home/rocketmq/rocketmq-5.3.0/conf/broker.conf
command: sh mqbroker -c /home/rocketmq/rocketmq-5.3.0/conf/broker.conf
depends_on:
- namesrv
dashboard:
image: apacherocketmq/rocketmq-dashboard:latest
platform: linux/amd64
container_name: rmqdashboard
ports:
- "8082:8080" # 左邊為本機訪問 Port,右邊為容器內建 Port
environment:
- JAVA_OPTS=-Drocketmq.namesrv.addr=namesrv:9876 -Xms256m -Xmx256m
depends_on:
- namesrv
關鍵配套:同級目錄下的 broker.conf
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
brokerIP1 = 127.0.0.1
啟動指令:docker-compose up -d
視覺化後台:打開瀏覽器存取 http://localhost:8082
三、 Spring Boot 實戰代碼(模式一:主動發送)
1. Maven 依賴 (pom.xml)
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.3.1</version>
</dependency>
2. 設定檔 (application.yml)
rocketmq:
name-server: 127.0.0.1:9876
producer:
group: default_producer_group
3. 生產者發送 API(同步發送)
@Autowired
private RocketMQTemplate rocketMQTemplate;
public void sendNormalMessage(String orderId) {
rocketMQTemplate.convertAndSend("Order_Topic", orderId);
}
4. 消費者監聽器(宣告式優雅寫法)
@Component
@RocketMQMessageListener(
topic = "Order_Topic",
consumerGroup = "group_order_processor"
)
public class OrderConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("收到訂單訊息:" + message);
}
}
四、 核心實戰場景:延時訊息 (購物車超時取消)
業務場景:使用者落單後,給予 10 秒(實務上為 30 分鐘)付款時間,超時未付則取消訂單釋放庫存。
1. Producer 發送延時訊息
// 建立 Spring Message 封裝體
Message<String> msg = MessageBuilder.withPayload("ORD_999").build();
// 參數:Topic, 訊息, 發送Timeout(毫秒), 延時時間(秒)
rocketMQTemplate.syncSendDelayTimeSeconds("TestTopic", msg, 3000, 10);
2. 底層運作魔術(偷天換日大法)
當 Broker 收到這條延時 10 秒的訊息時,它不會立刻放進 TestTopic,而是:
把原本的 Topic (TestTopic) 存入訊息的隱藏屬性中。
將 Topic 強制篡改為系統內建的專用緩衝主題:SCHEDULE_TOPIC_XXX。
啟動內部 Timer,10 秒時間一到,將訊息撈出,把 Topic 還原回 TestTopic,正式推入隊列供 Consumer 消費。
五、 架構師深度思維 (Deep Dive)
1. 「圖書館書籤理論」:訊息被消費後真的消失了嗎?
答案:物理上絕對沒有消失。
CommitLog(實體書本):Broker 收到訊息後,會依序以 Append-only 方式寫入硬碟的 CommitLog 檔案中。管你多少個 Consumer 讀過,實體資料永遠貼在磁碟上(預設保留 48 小時後由系統整批清除)。
Consumer Offset(進度書籤):
當 Group_A 讀完了第 10 筆訊息,Broker 會在名冊上記下:Group_A 的 Offset = 11。
對 Group_A 來說,第 10 筆訊息「視覺上消失了」。
此時若登記一個全新的 Group_B 來聽同一個 Topic,Broker 會給它一本全新的進度名冊,Group_B 會從第 1 筆訊息重新讀起。
2. 負載均衡 (Clustering) vs. 廣播發佈 (Pub/Sub)
六、 開發避坑 Checklist
1. 消費邏輯必須具備「冪等性 (Idempotency)」:
在網路不穩時,MQ 保證的是「At least once(至少投遞一次)」,訊息極可能被重複派發。Consumer 收到訂單 ID 時,第一步永遠是查 DB 狀態:if (order.status != PENDING) return;。
2. 嚴禁在 Consumer 內開新 Thread 處理任務:
若在 onMessage() 內使用 CompletableFuture.runAsync(),主執行緒會立刻回傳成功給 Broker;萬一背景 Thread 跑到一半當機,該筆資料將永遠遺失且無法觸發 MQ 的重試機制。
3. 本地端測試先至 Dashboard 建立 Topic:
RocketMQ 5.x 預設關閉了 autoCreateTopicEnable,代碼發送前務必先去後台點選 ADD/UPDATE 建立 Topic。

Top comments (0)