聊聊 RocketMQ 4.X 知識體系
本文將帶您深入了解 RocketMQ 4.X 的核心知識體系,從架構(gòu)設(shè)計到關(guān)鍵機制,一探這款高可用消息中間件的底層邏輯。
一、整體架構(gòu)
RocketMQ 4.X 架構(gòu)中包含四種角色 :
1.NameServer
名字服務(wù)是是一個幾乎無狀態(tài)節(jié)點,可集群部署,節(jié)點之間無任何信息同步。它是一個非常簡單的 Topic 路由注冊中心,其角色類似 Dubbo 中的 zookeeper ,支持 Broker 的動態(tài)注冊與發(fā)現(xiàn)。
2.BrokerServer
Broker 主要負(fù)責(zé)消息的存儲、投遞和查詢以及服務(wù)高可用保證 。
3.Producer
消息發(fā)布的角色,Producer 通過 MQ 的負(fù)載均衡模塊選擇相應(yīng)的 Broker 集群隊列進行消息投遞,投遞的過程支持快速失敗并且低延遲。
4.Consumer
消息消費的角色,支持以 push 推,pull 拉兩種模式對消息進行消費。
圖片
RocketMQ 集群工作流程:
1、啟動 NameServer,NameServer 起來后監(jiān)聽端口,等待 Broker、Producer 、Consumer 連上來,相當(dāng)于一個路由控制中心。
2、Broker 啟動,跟所有的 NameServer 保持長連接,定時發(fā)送心跳包。心跳包中包含當(dāng)前 Broker信息( IP+端口等 )以及存儲所有 Topic 信息。注冊成功后,NameServer 集群中就有 Topic 跟 Broker 的映射關(guān)系。
3、收發(fā)消息前,先創(chuàng)建 Topic,創(chuàng)建 Topic 時需要指定該 Topic 要存儲在哪些 Broker 上,也可以在發(fā)送消息時自動創(chuàng)建 Topic。
4、Producer 發(fā)送消息,啟動時先跟 NameServer 集群中的其中一臺建立長連接,并從 NameServer 中獲取當(dāng)前發(fā)送的 Topic 存在哪些 Broker 上,輪詢從隊列列表中選擇一個隊列,然后與隊列所在的 Broker 建立長連接從而向 Broker 發(fā)消息。
5、Consumer 跟 Producer 類似,跟其中一臺 NameServer 建立長連接,獲取當(dāng)前訂閱 Topic 存在哪些 Broker 上,然后直接跟 Broker 建立連接通道,開始消費消息。
二、發(fā)布訂閱模型
傳統(tǒng)的消息隊列 ActiveMQ 是典型的點對點模式。
圖片來自公眾號武哥漫談IT
圖片來自公眾號武哥漫談IT
- 在點對點模式中,消息發(fā)送者(生產(chǎn)者)將消息發(fā)送到一個特定的隊列,而消息接收者(消費者)從該隊列中接收消息。
- 消息在隊列中存儲,一旦一個消息被消費者接收,它就從隊列中移除,這確保了每個消息只被一個消費者處理。
- 這種模式適用于一對一的通信,其中一個生產(chǎn)者向一個特定的消費者發(fā)送消息,確保消息的可靠傳遞和處理。
RocketMQ 和 Kafka 是發(fā)布訂閱模式。
圖片來自公眾號武哥漫談IT
圖片來自公眾號武哥漫談IT
- 在發(fā)布訂閱模式中,消息發(fā)送者將消息發(fā)布到一個主題(topic),而消息訂閱者則訂閱感興趣的主題。
- 每個主題可以有多個訂閱者,因此消息會被廣播到所有訂閱了相同主題的消費者。
- 這種模式適用于一對多或多對多的通信,允許多個消費者同時接收和處理相同主題的消息。
- 發(fā)布訂閱模式通常用于構(gòu)建實時事件處理系統(tǒng)、日志處理、通知系統(tǒng)等,其中多個消費者需要訂閱相同類型的消息并進行處理。
三、通訊框架
圖片
1.通訊協(xié)議

傳輸內(nèi)容分為以下四個部分:
1)消息長度:總長度,四個字節(jié)存儲,占用一個 int 類型;
2)序列化類型 & 消息頭長度:占用一個 int 類型,第一個字節(jié)表示序列化類型,后面三個字節(jié)表示消息頭長度;
3)消息頭數(shù)據(jù):經(jīng)過序列化后的消息頭數(shù)據(jù);
4)消息主體數(shù)據(jù):消息主體的二進制字節(jié)數(shù)據(jù)內(nèi)容。
消息頭數(shù)據(jù)序列化默認(rèn)是 JSON 格式 ,示例如下:
圖片
2.Reactor 模型
Reactor 線程模型抽象出三種組件:
- Reactor(反應(yīng)器):Reactor 負(fù)責(zé)監(jiān)聽和分發(fā)事件,它是整個 Reactor 模型的調(diào)度中心。
- Acceptor(接收器):用于處理 IO 連接請求。
- Handlers(處理器):Handlers 負(fù)責(zé)具體的事件處理邏輯,即執(zhí)行與事件相關(guān)的業(yè)務(wù)操作
Remoting 通訊框架采用了典型的主從多線程模型 ,但還是有變化,即:獨立的業(yè)務(wù)線程池對應(yīng)不同的請求業(yè)務(wù)類型。
圖片
一個 Reactor 主線程 ( eventLoopGroupBoss )責(zé)監(jiān)聽 TCP網(wǎng)絡(luò)連接請求,建立好連接,創(chuàng)建 SocketChannel , 并注冊到 selector 上。
RocketMQ 源碼會自動根據(jù) OS 的類型選擇 NIO 和 Epoll ,也可以通過參數(shù)配置 ), 然后監(jiān)聽真正的網(wǎng)絡(luò)數(shù)據(jù)。
拿到網(wǎng)絡(luò)數(shù)據(jù)后,再丟給 Worker 線程池(eventLoopGroupSelector ),再真正執(zhí)行業(yè)務(wù)邏輯之前需要進行 SSL 驗證、編解碼、空閑檢查、網(wǎng)絡(luò)連接管理,這些工作都交給 defaultEventExecutorGroup 去做。
而業(yè)務(wù)操作由業(yè)務(wù)線程池中處理,根據(jù) RemotingCommand 的業(yè)務(wù)請求編號 requestCode , 從處理器表 processorTable 這個本地緩存中找到對應(yīng)的處理器 , 然后封裝成 task 任務(wù)后,提交到對應(yīng)的業(yè)務(wù)處理器的線程池執(zhí)行。
圖片
RocketMQ 的線程模型如下所示 :
線程數(shù) | 線程名 | 線程具體說明 |
1 | NettyBoss_%d | Reactor 主線程 |
N | NettyServerEPOLLSelector_%d_%d | Reactor 線程池 |
M1 | NettyServerCodecThread_%d | Worker線程池 |
M2 | RemotingExecutorThread_%d | 業(yè)務(wù) processor 處理線程池 |
三、文件存儲機制
我們先進入 broker 的文件存儲目錄 。
圖片
消息存儲和下面三個文件關(guān)系非常緊密:
- 數(shù)據(jù)文件 commitlog
消息主體以及元數(shù)據(jù)的存儲主體 ; - 消費文件 consumequeue
消息消費隊列,引入的目的主要是提高消息消費的性能 ; - 索引文件 indexfile
索引文件,提供了一種可以通過 key 或時間區(qū)間來查詢消息。
圖片
RocketMQ 采用的是混合型的存儲結(jié)構(gòu),Broker 單個實例下所有的隊列共用一個數(shù)據(jù)文件(commitlog)來存儲。
生產(chǎn)者發(fā)送消息至 Broker 端,然后 Broker 端使用同步或者異步的方式對消息刷盤持久化,保存至 commitlog 文件中。只要消息被刷盤持久化至磁盤文件 commitlog 中,那么生產(chǎn)者發(fā)送的消息就不會丟失。
Broker 端的后臺服務(wù)線程會不停地分發(fā)請求并異步構(gòu)建 consumequeue(消費文件)和 indexfile(索引文件)。
四、高性能讀寫
1.順序?qū)?/h3>
首先消息是一條一條寫入到文件,每條消息的格式是固定的,這種設(shè)計對于文件讀寫來講有兩點優(yōu)勢:
磁盤的存取速度相對內(nèi)存來講并不快,一次磁盤 IO 的耗時主要取決于:尋道時間和盤片旋轉(zhuǎn)時間,提高磁盤 IO 性能最有效的方法就是:減少隨機 IO,增加順序 IO 。
圖片
《 The Pathologies of Big Data 》這篇文章指出:內(nèi)存隨機讀寫的速度遠(yuǎn)遠(yuǎn)低于磁盤順序讀寫的速度。磁盤順序?qū)懭胨俣瓤梢赃_(dá)到幾百兆/s,而隨機寫入速度只有幾百 KB /s,相差上千倍。
因為消息是一條一條寫入到 commitlog 文件 ,寫入完成后,我們可以得到這條消息的物理偏移量。
每條消息的物理偏移量是唯一的, commitlog 文件名是遞增的,可以根據(jù)消息的物理偏移量通過二分查找,定位消息位于那個文件中,并獲取到消息實體數(shù)據(jù)。
2.內(nèi)存映射機制
mmap 是 Linux 提供的一種內(nèi)存映射文件的機制,它實現(xiàn)了將內(nèi)核中讀緩沖區(qū)地址與用戶空間緩沖區(qū)地址進行映射,從而實現(xiàn)內(nèi)核緩沖區(qū)與用戶緩沖區(qū)的共享。
圖片
基于 mmap + write 系統(tǒng)調(diào)用的零拷貝方式,整個拷貝過程會發(fā)生 4 次上下文切換,1 次 CPU 拷貝和 2 次 DMA 拷貝。
圖片
用戶程序讀寫數(shù)據(jù)的流程如下:
- 用戶進程通過 mmap() 函數(shù)向內(nèi)核發(fā)起系統(tǒng)調(diào)用,上下文從用戶態(tài)切換為內(nèi)核態(tài)。
- 將用戶進程的內(nèi)核空間的讀緩沖區(qū)與用戶空間的緩存區(qū)進行內(nèi)存地址映射。
- CPU 利用 DMA 控制器將數(shù)據(jù)從主存或硬盤拷貝到內(nèi)核空間的讀緩沖區(qū)。
- 上下文從內(nèi)核態(tài)切換回用戶態(tài),mmap 系統(tǒng)調(diào)用執(zhí)行返回。
- 用戶進程通過 write() 函數(shù)向內(nèi)核發(fā)起系統(tǒng)調(diào)用,上下文從用戶態(tài)切換為內(nèi)核態(tài)。
- CPU 將讀緩沖區(qū)中的數(shù)據(jù)拷貝到的網(wǎng)絡(luò)緩沖區(qū)。
- CPU 利用 DMA 控制器將數(shù)據(jù)從網(wǎng)絡(luò)緩沖區(qū)(socket buffer)拷貝到網(wǎng)卡進行數(shù)據(jù)傳輸。
- 上下文從內(nèi)核態(tài)切換回用戶態(tài),write 系統(tǒng)調(diào)用執(zhí)行返回。
拷貝方式 | CPU拷貝 | DMA拷貝 | 系統(tǒng)調(diào)用 | 上下文切換 |
傳統(tǒng)方式(read + write) | 2 | 2 | read / write | 4 |
內(nèi)存映射(mmap + write) | 1 | 2 | mmap / write | 4 |
sendfile | 1 | 2 | sendfile | 2 |
sendfile + DMA gather copy | 0 | 2 | sendfile | 2 |
RocketMQ 選擇了 mmap + write 這種零拷貝方式,適用于業(yè)務(wù)級消息這種小塊文件的數(shù)據(jù)持久化和傳輸;
而 Kafka 采用的是 sendfile 這種零拷貝方式,適用于系統(tǒng)日志消息這種高吞吐量的大塊文件的數(shù)據(jù)持久化和傳輸。
五、消費流程
圖片
核心流程如下:
- 消費者啟動后,觸發(fā)負(fù)載均衡服務(wù) ,負(fù)載均衡服務(wù)為消費者實例分配對應(yīng)的隊列 ;
- 分配完隊列后,負(fù)載均衡服務(wù)會為每個分配的新隊列創(chuàng)建一個消息拉取請求
pullRequest, 拉取請求保存一個處理隊列processQueue,內(nèi)部是紅黑樹(TreeMap),用來保存拉取到的消息 ; - 拉取消息服務(wù)單線程從拉取請求隊列
pullRequestQueue中彈出拉取消息,執(zhí)行拉取任務(wù) ,拉取請求是異步回調(diào)模式,將拉取到的消息放入到處理隊列; - 拉取請求在一次拉取消息完成之后會復(fù)用,重新被放入拉取請求隊列
pullRequestQueue中 ; - 拉取完成后,調(diào)用消費消息服務(wù)
consumeMessageService的submitConsumeRequest方法 ,消費消息服務(wù)內(nèi)部有一個消費線程池; - 消費線程池的消費線程從消費任務(wù)隊列中獲取消費請求,執(zhí)行消費監(jiān)聽器
listener.consumeMessage; - 消費完成后,若消費成功,則更新偏移量
updateOffset,先更新到內(nèi)存offsetTable,定時上報到 Broker ;若消費失敗,則將失敗消費發(fā)送到 Broker 。 - Broker 端接收到請求后, 調(diào)用消費進度管理器的
commitOffset方法修改內(nèi)存的消費進度,定時刷盤到consumerOffset.json。
六、傳統(tǒng)部署模式
1.雙 Master 模式
所有節(jié)點都是 master 主節(jié)點(比如 2 個或 3 個主節(jié)點),沒有 slave 從節(jié)點的模式。
圖片
該模式的優(yōu)缺點如下:
- 優(yōu)點
配置簡單 , 性能極高。一個 master 節(jié)點的宕機或者重啟(維護)對應(yīng)用程序沒有影響。
當(dāng)磁盤配置為 RAID10 時,消息不會丟失,因為 RAID10 磁盤非常可靠,即使機器不可恢復(fù)(消息異步刷盤模式的情況下,會丟失少量消息;如果消息是同步刷盤模式,不會丟失任何消息)。
- 缺點
單臺機器宕機時,本機未消費的消息,直到機器恢復(fù)后才會訂閱,影響消息實時性。
2.多 Master 多 Slave(異步)
每個主節(jié)點配置多個從節(jié)點,多對主從。HA 采用異步復(fù)制,主節(jié)點和從節(jié)點之間有短消息延遲(毫秒)。
圖片
所謂異步復(fù)制,是指消息發(fā)送到的 master 后直接返回,不必等待主從復(fù)制,而是內(nèi)部通過異步的方式進行復(fù)制。
圖片
這種模式的優(yōu)缺點如下:
- 優(yōu)點
即使磁盤損壞,也只會丟失極少的消息,不影響消息的實時性能。
同時,當(dāng)主節(jié)點宕機時,消費者仍然可以消費從節(jié)點的消息,這個過程對應(yīng)用本身是透明的,不需要人為干預(yù)。
性能幾乎與多 Master 模式一樣高。
- 缺點:
主節(jié)點宕機、磁盤損壞時,會丟失少量消息。
3.多 Master 多 Slave (同步)
每個 master 節(jié)點配置多個 slave 節(jié)點,有多對 Master-Slave 。
HA 采用同步雙寫,即只有消息成功寫入到主節(jié)點并復(fù)制到多個從節(jié)點,才會返回成功響應(yīng)給應(yīng)用程序。
圖片
異步復(fù)制指 producer 發(fā)送一條消息給 broker 的主節(jié)點,只有主節(jié)點將數(shù)據(jù)同步到從節(jié)點才會返回結(jié)果。
圖片
這種模式的優(yōu)缺點如下:
- 優(yōu)點
數(shù)據(jù)和服務(wù)都沒有單點故障。在 master 節(jié)點關(guān)閉的情況下,消息也沒有延遲。同時服務(wù)可用性和數(shù)據(jù)可用性非常高。
- 缺點:
這種模式下的性能略低于異步復(fù)制模式(大約低 10%)。發(fā)送單條消息的 RT 略高,目前版本,master 節(jié)點宕機后,slave 節(jié)點無法自動切換到 master 。
七、Deleger 集群部署
在 RocketMQ 4.5 版本之前,RocketMQ 只有一種 Master/Slave 的部署方式。在這種模式下,一組 broker 包含一個 Master 和零到多個 Slave,Slave 通過同步或異步復(fù)制的方式與 Master 保持?jǐn)?shù)據(jù)一致。
但這種部署模式提供了一定程度的高可用性,但也存在一些缺陷。例如,在故障轉(zhuǎn)移方面,如果主節(jié)點發(fā)生故障,仍然需要手動重啟或切換,無法自動將一個從節(jié)點轉(zhuǎn)換為主節(jié)點。
因此,核心問題是:多副本架構(gòu)需要解決自動故障轉(zhuǎn)移的問題,也就是自動選主。
這個問題的解決方案基本可以分為兩種:
1、第三方協(xié)調(diào)服務(wù)
我們利用第三方協(xié)調(diào)服務(wù)集群(如 Zookeeper 或 etcd)進行選主,但這樣會引入額外的外部組件,增加了部署、運維和故障診斷的成本,我們不僅需要維護 RocketMQ 集群,還需要維護 Zookeeper 集群。
所以,我們看到 Kafka 的新版本已經(jīng)擯棄了 Zookeeper 而是選擇了第二種方案。
2、不需要引入外部組件,使用 Raft 協(xié)議進行自動選主
自動選主邏輯集成在各個節(jié)點的進程中,節(jié)點之間通過通信即可完成選主。
因此,最終選擇 Raft 協(xié)議來解決這個問題,而 DLedger 就是基于 Raft 協(xié)議的 commitlog 存儲庫,是 RocketMQ 實現(xiàn)新的高可用多副本架構(gòu)的關(guān)鍵。
圖片
如圖,我們定義了兩個 DLedger Group ,分別是:RaftNode00 和 RaftNode01。
每個 DLedger Group 要求包含 至少 3 臺機器 部署,每臺機器部署 Broker 服務(wù) , 機器數(shù)量為奇數(shù)。
通過 Raft 自動選舉出一個 Leader,其余節(jié)點作為 Follower,并在 Leader 和 Follower 之間復(fù)制數(shù)據(jù)以保證高可用。
RocketMQ 的 DLedger 模式能自動容災(zāi)切換,并保證數(shù)據(jù)一致,同時支持水平擴展的,即:部署任意多個 RocketMQ Group 同時對外提供服務(wù)。
圖片
八、事務(wù)消息
RocketMQ 事務(wù)消息是支持在分布式場景下保障消息生產(chǎn)和本地事務(wù)的最終一致性。交互流程如下圖所示:
圖片
1、生產(chǎn)者將消息發(fā)送至 Broker 。
2、Broker 將消息持久化成功之后,向生產(chǎn)者返回 Ack 確認(rèn)消息已經(jīng)發(fā)送成功,此時消息被標(biāo)記為"暫不能投遞",這種狀態(tài)下的消息即為半事務(wù)消息。
3、生產(chǎn)者開始執(zhí)行本地事務(wù)邏輯。
4、生產(chǎn)者根據(jù)本地事務(wù)執(zhí)行結(jié)果向服務(wù)端提交二次確認(rèn)結(jié)果( Commit 或是 Rollback ),Broker 收到確認(rèn)結(jié)果后處理邏輯如下:
- 二次確認(rèn)結(jié)果為 Commit :Broker 將半事務(wù)消息標(biāo)記為可投遞,并投遞給消費者。
- 二次確認(rèn)結(jié)果為 Rollback :Broker 將回滾事務(wù),不會將半事務(wù)消息投遞給消費者。
5、在斷網(wǎng)或者是生產(chǎn)者應(yīng)用重啟的特殊情況下,若 Broker 未收到發(fā)送者提交的二次確認(rèn)結(jié)果,或 Broker 收到的二次確認(rèn)結(jié)果為 Unknown 未知狀態(tài),經(jīng)過固定時間后,服務(wù)端將對消息生產(chǎn)者即生產(chǎn)者集群中任一生產(chǎn)者實例發(fā)起消息回查。
- 生產(chǎn)者收到消息回查后,需要檢查對應(yīng)消息的本地事務(wù)執(zhí)行的最終結(jié)果。
- 生產(chǎn)者根據(jù)檢查到的本地事務(wù)的最終狀態(tài)再次提交二次確認(rèn),服務(wù)端仍按照步驟4對半事務(wù)消息進行處理。
九、廣播消息
當(dāng)使用 RocketMQ 廣播消費模式時,每條消息推送給集群內(nèi)所有的消費者,保證消息至少被每個消費者消費一次。
圖片
廣播消費主要用于兩種場景:消息推送和緩存同步。
1.消息推送
筆者第一次接觸廣播消費的業(yè)務(wù)場景是神州專車司機端的消息推送。
用戶下單之后,訂單系統(tǒng)生成專車訂單,派單系統(tǒng)會根據(jù)相關(guān)算法將訂單派給某司機,司機端就會收到派單推送。
圖片
推送服務(wù)是一個 TCP 服務(wù)(自定義協(xié)議),同時也是一個消費者服務(wù),消息模式是廣播消費。
司機打開司機端 APP 后,APP 會通過負(fù)載均衡和推送服務(wù)創(chuàng)建長連接,推送服務(wù)會保存 TCP 連接引用 (比如司機編號和 TCP channel 的引用)。
派單服務(wù)是生產(chǎn)者,將派單數(shù)據(jù)發(fā)送到 MetaQ , 每個推送服務(wù)都會消費到該消息,推送服務(wù)判斷本地內(nèi)存中是否存在該司機的 TCP channel , 若存在,則通過 TCP 連接將數(shù)據(jù)推送給司機端。
肯定有同學(xué)會問:假如網(wǎng)絡(luò)原因,推送失敗怎么處理 ?有兩個要點:
- 司機端 APP 定時主動拉取派單信息;
- 當(dāng)推送服務(wù)沒有收到司機端的 ACK 時 ,也會一定時限內(nèi)再次推送,達(dá)到閾值后,不再推送。
2.緩存同步
高并發(fā)場景下,很多應(yīng)用使用本地緩存,提升系統(tǒng)性能 。
圖片
如上圖,應(yīng)用A啟動后,作為一個 RocketMQ 消費者,消息模式設(shè)置為廣播消費。為了提升接口性能,每個應(yīng)用節(jié)點都會將字典表加載到本地緩存里。
當(dāng)字典表數(shù)據(jù)變更時,可以通過業(yè)務(wù)系統(tǒng)發(fā)送一條消息到 RocketMQ ,每個應(yīng)用節(jié)點都會消費消息,刷新本地緩存。
十、順序消息
順序消息是指對于一個指定的 Topic ,消息嚴(yán)格按照先進先出(FIFO)的原則進行消息發(fā)布和消費,即先發(fā)布的消息先消費,后發(fā)布的消息后消費。
順序消息分為分區(qū)順序消息和全局順序消息。
1.分區(qū)順序消息
對于指定的一個 Topic ,所有消息根據(jù) Sharding Key 進行區(qū)塊分區(qū),同一個分區(qū)內(nèi)的消息按照嚴(yán)格的先進先出(FIFO)原則進行發(fā)布和消費。同一分區(qū)內(nèi)的消息保證順序,不同分區(qū)之間的消息順序不做要求。
- 適用場景:適用于性能要求高,以 Sharding Key 作為分區(qū)字段,在同一個區(qū)塊中嚴(yán)格地按照先進先出(FIFO)原則進行消息發(fā)布和消費的場景。
- 示例:電商的訂單創(chuàng)建,以訂單 ID 作為 Sharding Key ,那么同一個訂單相關(guān)的創(chuàng)建訂單消息、訂單支付消息、訂單退款消息、訂單物流消息都會按照發(fā)布的先后順序來消費。
2.全局順序消息
對于指定的一個 Topic ,所有消息按照嚴(yán)格的先入先出(FIFO)的順序來發(fā)布和消費。
- 適用場景:適用于性能要求不高,所有的消息嚴(yán)格按照 FIFO 原則來發(fā)布和消費的場景。
- 示例:在證券處理中,以人民幣兌換美元為 Topic,在價格相同的情況下,先出價者優(yōu)先處理,則可以按照 FIFO 的方式發(fā)布和消費全局順序消息。
全局順序消息實際上是一種特殊的分區(qū)順序消息,即 Topic 中只有一個分區(qū),因此全局順序和分區(qū)順序的實現(xiàn)原理相同。
因為分區(qū)順序消息有多個分區(qū),所以分區(qū)順序消息比全局順序消息的并發(fā)度和性能更高。
圖片
消息的順序需要由兩個階段保證:
- 消息發(fā)送如上圖所示,A1、B1、A2、A3、B2、B3 是訂單 A 和訂單 B 的消息產(chǎn)生的順序,業(yè)務(wù)上要求同一訂單的消息保持順序,例如訂單 A 的消息發(fā)送和消費都按照 A1、A2、A3 的順序。如果是普通消息,訂單A 的消息可能會被輪詢發(fā)送到不同的隊列中,不同隊列的消息將無法保持順序,而順序消息發(fā)送時 RocketMQ 支持將 Sharding Key 相同(例如同一訂單號)的消息序路由到同一個隊列中。下圖是生產(chǎn)者發(fā)送順序消息的封裝,原理是發(fā)送消息時,實現(xiàn) MessageQueueSelector 接口, 根據(jù) Sharding Key 使用 Hash 取模法來選擇待發(fā)送的隊列。
生產(chǎn)者順序發(fā)送消息封裝
- 生產(chǎn)者順序發(fā)送消息封裝
- 消息消費
消費者消費消息時,需要保證單線程消費每個隊列的消息數(shù)據(jù),從而實現(xiàn)消費順序和發(fā)布順序的一致。
順序消費服務(wù)的類是 ConsumeMessageOrderlyService ,在負(fù)載均衡階段,并發(fā)消費和順序消費并沒有什么大的差別。
最大的差別在于:順序消費會向 Borker 申請鎖 。消費者根據(jù)分配的隊列 messageQueue ,向 Borker 申請鎖 ,如果申請成功,則會拉取消息,如果失敗,則定時任務(wù)每隔20秒會重新嘗試。
圖片
十一、架構(gòu)缺點
RocketMQ 包含兩種部署架構(gòu): Master-Slave 架構(gòu) 和 Deleger 架構(gòu) 。
圖片
首先是 Master-Slave 架構(gòu),它的問題很明顯,由于組內(nèi)沒有 failover 能力,所以
- Master 故障后,故障組的消息發(fā)送將會中斷。雖然客戶端可以向其他 Master 進行發(fā)送,但Topic整體可寫入分區(qū)數(shù)將減少并短時間內(nèi)無法恢復(fù),這會影響對分區(qū)敏感的業(yè)務(wù),比如順序消息或者流計算應(yīng)用。
- Master 故障后,一些僅限于在Master上進行的操作將無法進行,這里包括一些順序消息的上鎖,管控中searchOffset、maxOffset、minOffset等操作,會影響到順序消息的消費以及一些管控操作。
- Master故障后,故障Broker組上的二級消息消費將會中斷,二級消息特點是它可以分為兩個階段,第一階段是把消息發(fā)送到CommitLog上的特殊Topic,第二階段是將滿足要求的消息還原投放回CommitLog。比如延遲消息,第一階段是投放到名為SCHEDULE_TOPIC_XXXX的Topic上,等掃描線程發(fā)現(xiàn)消息到期后再還原成原來的Topic重新投遞,這樣它就能被下游消費到。
但如果Master Broker下線,掃描和重投放都會停止,因此會出現(xiàn)二級消息的消費延遲或丟失,具體會影響到延遲消息、事務(wù)消息等二級消息。
然后是 Deleger 架構(gòu) ,通過 Master 故障后短時間內(nèi)重新選出新的 Master 來解決上述問題,但是由于 Raft 選主和復(fù)制能力在復(fù)制鏈路上,因此存在以下問題:
- Broker 組內(nèi)的副本數(shù)必須是 3副本 及以上才有切換能力,因此成本是有上升的。
- Raft 多數(shù)派限制導(dǎo)致三副本副本必須兩副本響應(yīng)才能返回,五副本需要三副本才能返回,因此ACK是不夠靈活的,這也導(dǎo)致發(fā)送延遲和副本冗余間沒有一個很好的可協(xié)商的方案。
- 由于存儲復(fù)制鏈路用的是 OpenMessaging DLedger庫,導(dǎo)致 RocketMQ 原生的一些存儲能力沒辦法利用,包括像 TransientPool、零拷貝的能力,如果要在Raft模式下使用的話,就需要移植一遍到DLedger庫,開發(fā)特性以及bug修復(fù)也需要做兩次,這樣的維護和開發(fā)成本是非常高的。
同時,我們提到了 RocketMQ 4.X 的消費流程,它的消費邏輯有兩個非常明顯的特點:
- 客戶端代碼邏輯較重。假如要支持一種新的編程語言,那么客戶端就必須實現(xiàn)完整的負(fù)載均衡邏輯,此外還需要實現(xiàn)拉消息、位點管理、消費失敗后將消息發(fā)回 Broker 重試等邏輯。這給多語言客戶端的支持造成很大的阻礙。
- 保證冪等非常重要。當(dāng)客戶端升級或者下線時,或者 Broker 宕機,都要進行負(fù)載均衡操作,可能造成消息堆積,同時有一定幾率造成重復(fù)消費。
RocketMQ 5.0 引入了全新的彈性無狀態(tài)代理模式,將當(dāng)前的Broker職責(zé)進行拆分,對于客戶端協(xié)議適配、權(quán)限管理、消費管理等計算邏輯進行抽離,獨立無狀態(tài)的代理角色提供服務(wù),Broker則繼續(xù)專注于存儲能力的持續(xù)優(yōu)化。這套模式可以更好地實現(xiàn)在云環(huán)境的資源彈性調(diào)度。 值得注意的是RocketMQ 5.0的全新模式是和4.0的極簡架構(gòu)模式相容相通的,5.0的代理架構(gòu)完全可以以Local模式運行,實現(xiàn)與4.0架構(gòu)完全一致的效果。開發(fā)者可以根據(jù)自身的業(yè)務(wù)場景自由選擇架構(gòu)部署。
圖片























