RocketMQ中各類重復(fù)消費的原理淺析
Labs 導(dǎo)讀
隨著大數(shù)據(jù)和云計算時代的到來,我國的各個產(chǎn)業(yè)每天都在產(chǎn)生不可估計的數(shù)據(jù),以及對數(shù)據(jù)的各式各樣的需求,消息中間件在處理數(shù)據(jù)、消費數(shù)據(jù)的過程中越來越受到重視。在高并發(fā)、微服務(wù)、分布式的場景下,如何合理地利用消息中間件,如何保證MQ消費消息的冪等性?所謂知其然,才能知其所以然。
Part 01、 RocketMQ如何生產(chǎn)和消費消息
先簡單介紹下RocketMQ正常生產(chǎn)消息和消費消息的流程,如下圖。
1.生產(chǎn)者在發(fā)送消息之前根據(jù)負載均衡策略(默認是輪詢)選擇一個Queue,然后跟這個Queue所在的機器建立連接,把消息發(fā)送到這個Queue上。
2.消費者消費這個Queue,就能獲取到對應(yīng)的消息。
圖片
- 問題出現(xiàn)
當異常情況出現(xiàn)時,如消息發(fā)送超時或者消息消費超時,RocketMQ為保證消息發(fā)送成功,會啟動重試機制,選擇另一臺機器的Queue重發(fā)?,F(xiàn)在假設(shè)有這樣一種情況,消費者實際正確接收到了消息,只是由于網(wǎng)絡(luò)波動導(dǎo)致響應(yīng)超時了,那就會出現(xiàn)消息重復(fù)發(fā)送,導(dǎo)致消費者重復(fù)消費的情況出現(xiàn)。
那除此之外,還有沒有其他情況會導(dǎo)致消息重復(fù)消費的情況呢?總結(jié)起來一共有如下幾種情況。
1)消息發(fā)送異常時的重復(fù)消費
2)消費消息時拋出了異常
3)消費者提交offset失敗
4)Broker持久化offset失敗
5)主從同步失敗
6)重平衡
7)清理長時間消費的消息
Part 02、 淺析各類情況
- 消費消息時拋出異常
- 問題分析一
RocketMQ在并發(fā)消費的模式下會調(diào)用MessageListenerConcurrently的consumeMessage方法,入?yún)⑹莔sgs集合。當調(diào)用該方法消費消息出現(xiàn)異常時,返回的結(jié)果status就會是null。這種情況下會導(dǎo)致status被設(shè)置為RECONSUME_LATER,也就是說消息之后會被重復(fù)消費。
- 問題分析二
傳入的是msgs集合。上述原因一中消息處理之后,不管成功失敗,都會對結(jié)果進行處理。而集合中的任意一個失敗,都會導(dǎo)致status被設(shè)置為RECONSUME_LATER。在對結(jié)果處理是,判斷到RECONSUME_LATER時,就會對msgs重新遍歷并發(fā)送消息,重新消費,從而導(dǎo)致之前成功處理的消息都會被重復(fù)消費。不過好在msgs消息的數(shù)量默認情況下是1。
圖片
- 消費者提交offset失敗
- 何為offset
producer發(fā)送消息到broker,Rocketmq會將消息的內(nèi)容持久化到commitLog文件中,再分發(fā)到topic下的消費隊列consume Queue,消費者提交消費請求時,broker從該consumer負責(zé)的消費隊列中根據(jù)請求參數(shù)傳入的起始offset來獲取需要消費的消息索引信息,再從commitLog中獲取具體的消息內(nèi)容返回給consumer。消費成功之后,消費者提交offset,來記錄這個queue消費到哪個位置了。
- 問題分析
RocketMq設(shè)計的時候,消費完消息,并不是同步提交offset,而是將offset保存到內(nèi)存中,通過一個定時任務(wù)(默認是5S一次),以網(wǎng)絡(luò)請求的方式將offset提交給broker。如果最新的offset還沒提交,此時服務(wù)器宕機了,那么重啟之后,就會從broker中讀取到之前的提交的offset,并從此處開始消費,此時就會出現(xiàn)重復(fù)消費的情況了。
圖片
- broker持久化offset失敗
- 問題分析
與消費者提交offset同理,Broker為了防止數(shù)據(jù)丟失,會將offset持久化到磁盤中。同樣的也是通過一個默認5S的定時任務(wù)來處理持久化操作。所以offset的完整過程就如下圖。當broker宕機時,就會導(dǎo)致offset丟失,此時如果消費者重新拉取消費進度,就會比實際消費的進度要低,導(dǎo)致重復(fù)消費。
圖片
- 主從同步失敗
- 問題分析
為保證RocketMQ服務(wù)的高可用,一般項目中都會啟用主從備份的模式,當主節(jié)點掛掉之后,從節(jié)點就會升級為主節(jié)點對外提供服務(wù)。因此就需要進行主從同步,保證數(shù)據(jù)的一致性。默認情況下每隔10S,從節(jié)點會向主節(jié)點請求,同步元數(shù)據(jù),包括消費進度。此時如果主節(jié)點宕機了,從節(jié)點就無法獲取到10S之內(nèi)的消費進度,自然也就會導(dǎo)致重復(fù)消費。
圖片
- 重平衡
- 何為重平衡
RocketMQ的消費者有兩種模式,集群消費模式和廣播消費模式,絕大多數(shù)場景采用的都是集群消費模式。前面提到的消費進度就是在集群消費模式下才會存在。集群消費模式中有一個消費組的概念。一個消費組可以有多個消費者,不同消費組之間消費消息互不干擾,而同一消費組的消費者按照一定的算法分配消息隊列進行消息消費,保證一個消息只能被一個消費組消費一次。當消費組中的消費組增加或者減少時就會觸發(fā)重平衡。如圖,原先消費組中有兩個消費者,平均消費4個隊列,每個消費組2個隊列;當加入了一個新的消費者時,為了保證新的消費者能夠消費消息,就會進行重平衡,重新分配消息隊列。
圖片
- 問題分析
假設(shè)在重平衡發(fā)生時,此時消費者2還在正常消費Queue4,當消費者3加入,重平衡完成時,此時消費者2判斷到Queue4已經(jīng)不屬于自己消費了,就會將Queue4設(shè)置為dropped,消費完成時,發(fā)現(xiàn)隊列是dropped狀態(tài),那么消費者2的消費進度offset就不會被提交。成功消費了消息,但是消費進度卻沒有被提交,于是當消費者3開始消費消息時,就會從服務(wù)端拉取到之前的消費進度,造成隊列4的消息被重復(fù)消費。
- 清理長時間消費的消息
- 清理機制講解
RocketMQ中有一個機制會定時清理長時間正在消費的消息,默認是15分鐘執(zhí)行一次清理任務(wù)。之所以這么做,是有原因的。我們說過,消息被消費之后,就會提交offset。當一個線程消費了所有消息時,就會把消息從集合中移除,提交的消息進度offset就是msg5的offset+1。
假設(shè),現(xiàn)在是兩個線程消費,線程2消費完成,之后提交offset,但是此時線程1還在處理前兩條消息,因此為了保證消費消息的不丟失,移除之后發(fā)現(xiàn)集合中還有剩余消息,就會把msg1的offset 返回提交上去。而一旦集合最前面的消息長時間處理,就會導(dǎo)致這個消費進度一直在最前面。此時如果服務(wù)器重啟,就會導(dǎo)致很多消費過的消息都會被重復(fù)消費。因此引入了清理長時間消費的機制。
圖片
- 問題分析
引入清理長時間消費的消息機制后,一旦發(fā)現(xiàn)某個消息已經(jīng)處理超過15分鐘了,就會將消息移除,保障后續(xù)消息消費進度的正常提交,之后會隔一定的時間再次消費這個被移除的消息。但是,這個消息雖然被移除了,卻并不是沒有消費過,因此再次消費就會導(dǎo)致重復(fù)消費的問題出現(xiàn)。
Part 03、 總結(jié)
RocketMq的官方文檔中對消息傳遞有這樣的解釋:RocketMq確保所有消息至少被傳遞一次,在大多數(shù)情況下,消息不會重復(fù)??梢奟ocketMq為了保證消息的不丟失,犧牲了消息投遞的重復(fù)率。因此我們在使用RokcetMq時需要合理使用它的特點,設(shè)計合理的冪等技術(shù)方案來解決重復(fù)消費的問題。