RocketMQ延時(shí)消息解析!你學(xué)會(huì)了嗎?
什么是延時(shí)消息?
指的是當(dāng)消息寫入到Broker后,不能立刻被消費(fèi)者消費(fèi),需要等待指定的時(shí)長后才可被消費(fèi)處理的消息。
延時(shí)消息等級(jí)
RocketMQ延時(shí)消息的延遲時(shí)長不支持隨意時(shí)長的延遲。
- 是通過特定的延遲等級(jí)來指定的。
默認(rèn)支持18個(gè)等級(jí)的延遲消息。
延時(shí)等級(jí)定義在RocketMQ服務(wù)端的MessageStoreConfig類中的如下變量中:
// MessageStoreConfig.java
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";level 有以下三種情況:
- level == 0,消息為非延遲消息。
- 1<=level<=maxLevel,消息延遲特定時(shí)間,例如:level==1,延遲1s。
- level > maxLevel,則level== maxLevel,例如level==20,延遲2h。
發(fā)消息時(shí),設(shè)置delayLevel等級(jí)即可:msg.setDelayLevel(level)。
例如指定的延時(shí)等級(jí)為3,則表示延遲時(shí)長為10s,延遲等級(jí)是從1開始計(jì)數(shù)的。
使用場景
1、電商交易系統(tǒng)的訂單超時(shí)未支付,自動(dòng)取消訂單。
2、超時(shí)自動(dòng)審批,系統(tǒng)審批流程可以設(shè)置為超過設(shè)定時(shí)間后自動(dòng)執(zhí)行通過或者拒絕流程。
3、限時(shí)優(yōu)惠活動(dòng),商品需要促銷,在活動(dòng)開始時(shí),發(fā)送一個(gè)兩小時(shí)后觸發(fā)的定時(shí)消息,用于在活動(dòng)結(jié)束時(shí)恢復(fù)原價(jià)。
為什么不支持任意時(shí)間
按照《RocketMQ Developer Guide》中的說法:
- 如果提供任意時(shí)間,就會(huì)涉及到消息的排序,會(huì)有一定的性能損耗。
而RocketMQ這種利用固定延遲級(jí)別到單個(gè)隊(duì)列的實(shí)現(xiàn)方式是一種妥協(xié),靈活性和極致性能的妥協(xié)。
延遲消息與消費(fèi)重試的關(guān)系
消息重試的16個(gè)級(jí)別,實(shí)際上是把延遲消息18個(gè)級(jí)別的前兩個(gè)Level去掉了。
事實(shí)上,RocketMQ的消息重試也是基于延遲消息來完成的。
- 在消息消費(fèi)失敗的情況下,將其重新當(dāng)做延遲消息投遞回Broker。
在投遞回去時(shí),會(huì)跳過前兩個(gè)Level,因此只重試16次。
詳細(xì)內(nèi)容可以看我之前的文章?。?!
實(shí)現(xiàn)原理
RocketMQ延時(shí)消息會(huì)暫存在名為SCHEDULE_TOPIC_XXXX的Topic中。
- 并根據(jù)delayTimeLevel存入特定的Queue。
queueId = delayTimeLevel – 1:即一個(gè)Queue只存相同延遲的消息,保證具有相同發(fā)送延遲的消息能夠順序消費(fèi)。
Broker會(huì)調(diào)度地消費(fèi)SCHEDULE_TOPIC_XXXX,將消息寫入真實(shí)的Topic。
圖片
主要步驟:
圖片
修改消息Topic名稱和隊(duì)列信息:
RocketMQ Broker端在存儲(chǔ)生產(chǎn)者寫入的消息時(shí),首先都會(huì)將其寫入到CommitLog中。
之后根據(jù)消息中的Topic信息和隊(duì)列信息,將其轉(zhuǎn)發(fā)到目標(biāo)Topic的指定隊(duì)列(ConsumeQueue)中。
由于消息一旦存儲(chǔ)到ConsumeQueue中,消費(fèi)者就能消費(fèi)到,而延遲消息不能被立即消費(fèi)。
所以將Topic的名稱修改為SCHEDULE_TOPIC_XXXX,并根據(jù)延遲級(jí)別確定要投遞到哪個(gè)隊(duì)列下。
- 同時(shí),還會(huì)將消息原來要發(fā)送到的目標(biāo)Topic和隊(duì)列信息存儲(chǔ)到消息的屬性中。
轉(zhuǎn)發(fā)消息到延遲主題SCHEDULE_TOPIC_XXXX的CosumeQueue中:
CommitLog中的消息轉(zhuǎn)發(fā)到CosumeQueue中是異步進(jìn)行的。
在轉(zhuǎn)發(fā)過程中,會(huì)對(duì)延遲消息進(jìn)行特殊處理,主要是計(jì)算這條延遲消息需要在什么時(shí)候進(jìn)行投遞。
- 投遞時(shí)間 = 消息存儲(chǔ)時(shí)間(StoreTimestamp) + 延遲級(jí)別對(duì)應(yīng)的時(shí)間。
圖片
延遲服務(wù)消費(fèi)SCHEDULE_TOPIC_XXXX消息:
Broker內(nèi)部有一個(gè)ScheduleMessageService類,其充當(dāng)延遲服務(wù)。
- 主要是消費(fèi)SCHEDULE_TOPIC_XXXX中的消息,并投遞到目標(biāo)Topic中。
ScheduleMessageService在啟動(dòng)時(shí),其會(huì)創(chuàng)建一個(gè)定時(shí)器Timer,并根據(jù)延遲級(jí)別的個(gè)數(shù),啟動(dòng)對(duì)應(yīng)數(shù)量的TimerTask。
- 每個(gè)TimerTask負(fù)責(zé)一個(gè)延遲級(jí)別的消費(fèi)與投遞。
如果可以投放,則在投放到原本的目的Topic。
每隔100ms,從TopicSCHEDULE_TOPIC_XXXX判斷18個(gè)隊(duì)列里的第一個(gè)消息是否可以被投放。
需要注意
每個(gè)TimeTask在檢查消息是否到期時(shí):
- 首先檢查對(duì)應(yīng)隊(duì)列中尚未投遞第一條消息。
- 如果這條消息沒到期,那么之后的消息都不會(huì)檢查。
- 如果到期了,則進(jìn)行投遞,并檢查之后的消息是否到期。
圖片
圖片
將信息重新存儲(chǔ)到CommitLog中:
在將消息到期后,需要投遞到目標(biāo)Topic。
由于在第一步已經(jīng)記錄了原來的Topic和隊(duì)列信息,因此這里重新設(shè)置,再存儲(chǔ)到CommitLog即可。
將消息投遞到目標(biāo)Topic中:
由于消息
的Topic名稱已經(jīng)改為了目標(biāo)Topic。
因此消息會(huì)直接投遞到目標(biāo)Topic的ConsumeQueue中,之后消費(fèi)者即消費(fèi)到這條消息。
消費(fèi)者消費(fèi)目標(biāo)Topic中的數(shù)據(jù)。






































