談一談消息隊(duì)列
本文轉(zhuǎn)載自微信公眾號(hào)「SH的全棧筆記」,作者SH。轉(zhuǎn)載本文請(qǐng)聯(lián)系SH的全棧筆記公眾號(hào)。
本篇文章聊聊消息隊(duì)列相關(guān)的東西,內(nèi)容局限于我們?yōu)槭裁匆孟㈥?duì)列,消息隊(duì)列究竟解決了什么問(wèn)題,消息隊(duì)列的選型。
為了更容易的理解消息隊(duì)列,我們首先通過(guò)一個(gè)開(kāi)發(fā)場(chǎng)景來(lái)切入。
不使用消息隊(duì)列的場(chǎng)景
首先,我們假設(shè)A同學(xué)負(fù)責(zé)訂單系統(tǒng)的開(kāi)發(fā),B、C同學(xué)負(fù)責(zé)開(kāi)發(fā)積分系統(tǒng)、倉(cāng)儲(chǔ)系統(tǒng)。我們知道,在一般的購(gòu)物電商平臺(tái)上,我們下單完成后,積分系統(tǒng)會(huì)給下單的用戶增加積分,然后倉(cāng)儲(chǔ)系統(tǒng)會(huì)按照下單時(shí)填寫的信息,發(fā)出用戶購(gòu)買的商品。
那問(wèn)題來(lái)了,積分系統(tǒng)、倉(cāng)儲(chǔ)系統(tǒng)如何感知到用戶的下單操作?
你可能會(huì)說(shuō),當(dāng)然是訂單系統(tǒng)在創(chuàng)建完訂單之后調(diào)用積分系統(tǒng)、倉(cāng)儲(chǔ)系統(tǒng)的接口了
OK,直接調(diào)用接口的方式在目前來(lái)看沒(méi)有什么問(wèn)題。于是B、C同學(xué)就找到A同學(xué),說(shuō)讓他在訂單完成后,調(diào)用一下他們的接口來(lái)通知一下積分系統(tǒng)和倉(cāng)儲(chǔ)系統(tǒng),來(lái)給用戶增加積分、發(fā)貨。A同學(xué)想著,就這兩個(gè)系統(tǒng),應(yīng)該還好,OK我給你加了。
但是隨著系統(tǒng)的迭代,需要感知訂單操作的系統(tǒng)也越來(lái)越多,從之前的積分系統(tǒng)、倉(cāng)儲(chǔ)系統(tǒng)2個(gè)系統(tǒng),擴(kuò)充到了5個(gè)。每個(gè)系統(tǒng)的負(fù)責(zé)同學(xué)都需要去找A同學(xué),讓他人肉的把對(duì)應(yīng)系統(tǒng)的通知接口加上。然后就因?yàn)榧恿诉@一個(gè)接口,又需要把訂單重新發(fā)布一遍。
這對(duì)A同學(xué)來(lái)說(shuō)實(shí)際上是很痛苦的一件事情,因?yàn)锳同學(xué)有自己的任務(wù)、排期,一有新系統(tǒng)就需要去添加通知接口,發(fā)布服務(wù),會(huì)打亂A的開(kāi)發(fā)計(jì)劃,增加開(kāi)發(fā)量。同時(shí)還需要去梳理在開(kāi)發(fā)期間,新增的代碼到底能不能夠上線。一旦不能上線,但是又沒(méi)有檢查到,上線就直接炸了
而且,如果5個(gè)系統(tǒng)如果有哪個(gè)需要額外的字段,或者是更新了接口什么的,都需要麻煩A同學(xué)修改。5個(gè)系統(tǒng)就這樣跟A系統(tǒng)強(qiáng)耦合在了一起。
除此之外,整個(gè)創(chuàng)建訂單的調(diào)用鏈因?yàn)橥秸{(diào)用這5個(gè)系統(tǒng)的通知接口而加長(zhǎng),這減慢了接口的響應(yīng)速度,降低了用戶側(cè)的購(gòu)物、下單體驗(yàn)。前面的至少影響的還是內(nèi)部的員工,但是現(xiàn)在直接是影響到了用戶,明顯是不可取的方案。
可以看到,整個(gè)的調(diào)用鏈路加長(zhǎng)了,更別提,在同步調(diào)用中,如果其余的系統(tǒng)發(fā)生了錯(cuò)誤,或者是調(diào)用其他系統(tǒng)的時(shí)候出現(xiàn)了網(wǎng)絡(luò)抖動(dòng),核心的下單流程就會(huì)被阻塞住,甚至?xí)谙聠蔚慕缑嫣崾咎崾居脩舫鲥e(cuò),整個(gè)的購(gòu)物體驗(yàn)又被拉低了一個(gè)檔次。更何況,在實(shí)際的業(yè)務(wù)中,調(diào)用鏈比這個(gè)長(zhǎng)的多。
可能有人會(huì)說(shuō)了, 這不就是個(gè)同步調(diào)用問(wèn)題嘛?訂單系統(tǒng)的核心邏輯,我還是采用同步來(lái)處理,但是后續(xù)的通知我采用異步的方式,用線程池去處理,這樣調(diào)用鏈路不就恢復(fù)正常了?
就單純對(duì)于減少鏈路來(lái)說(shuō),的確可行。但是如果某一個(gè)流程失敗了呢?難道失敗就失敗了嗎?我下單成功了不漲積分?該給我發(fā)的貨甚至沒(méi)有發(fā)貨?這合理嗎?
同時(shí),失敗了訂單系統(tǒng)是不是要去處理呢?否則因?yàn)槠渌南到y(tǒng)拉垮了整個(gè)主流程,誰(shuí)還來(lái)你這買東西呢?
那有什么辦法,既能夠減少調(diào)用的鏈路,又能夠在發(fā)生錯(cuò)誤的時(shí)候重試呢?歸根結(jié)底,核心思想就是像增加積分、返優(yōu)惠券的流程不應(yīng)該和主流程耦和在一起,更不應(yīng)該影響主流程。
試想,我們能不能在訂單系統(tǒng)完成自己的核心邏輯之后,把訂單創(chuàng)建的消息放到一個(gè)隊(duì)列中去,然后訂單系統(tǒng)就返回給用戶下單成功的結(jié)果了。然后其他的系統(tǒng)從這個(gè)隊(duì)列中收到了下單成功的消息,就各自的去執(zhí)行各自的操作,例如增加積分、返優(yōu)惠券等等操作。
后續(xù)如果有新的系統(tǒng)需要感知訂單創(chuàng)建的消息,直接去訂閱這個(gè)隊(duì)列,消費(fèi)里面的消息就好了?這雖然跟真實(shí)的消息的隊(duì)列有些出入,但其思路是完成吻合的。
為什么需要消息隊(duì)列
通過(guò)上面的例子,我們大致就能夠理解為什么要引入消息隊(duì)列了,這里簡(jiǎn)單總結(jié)一下。
異步
對(duì)于實(shí)時(shí)性不是很高的業(yè)務(wù),例如給用戶發(fā)送短信、郵件通知,調(diào)用第三方的接口,都可以放到消息隊(duì)列里去。因?yàn)橄鄬?duì)于核心訂單流程來(lái)說(shuō),短信、郵件晚一些發(fā)送,對(duì)用戶來(lái)說(shuō)影響不是很大。同時(shí)還可以提升整個(gè)鏈路的響應(yīng)時(shí)間。
削峰
假設(shè)我們有服務(wù)A,是個(gè)無(wú)狀態(tài)的服務(wù)。通過(guò)橫向擴(kuò)展,它可以輕松抗住1w的并發(fā)量,但是這N個(gè)服務(wù)實(shí)例,底層訪問(wèn)的都是同一個(gè)數(shù)據(jù)庫(kù)。數(shù)據(jù)庫(kù)能抗住的并發(fā)量是有限的,如果你的機(jī)器足夠好的話,可能能夠抗住5000的并發(fā),如果服務(wù)A的所有請(qǐng)求全部打向數(shù)據(jù)庫(kù),會(huì)直接把數(shù)據(jù)打掛。
解耦
像上文舉的例子,訂單系統(tǒng)在創(chuàng)建了訂單之后需要通知其他的所有系統(tǒng),這樣一來(lái)就把訂單系統(tǒng)和其余的系統(tǒng)強(qiáng)耦合在了一起。后續(xù)的可維護(hù)性、擴(kuò)展性都大大降低了。
而通過(guò)消息隊(duì)列來(lái)關(guān)聯(lián)所有系統(tǒng),可以達(dá)到解耦的目的。
像上圖這種模式,如果后續(xù)再有新系統(tǒng)需要感知訂單創(chuàng)建的消息,只需要去消費(fèi)「訂單系統(tǒng)」發(fā)送到MQ中的消息即可。同樣,訂單系統(tǒng)如果需要感知其余系統(tǒng)的某些事件,也只是從MQ中消費(fèi)即可。
通過(guò)MQ,達(dá)成服務(wù)之間的松耦合,服務(wù)內(nèi)的高內(nèi)聚,提升了服務(wù)的自治性。
消息隊(duì)列選型
已知的消息隊(duì)列有Kafka、RocketMQ、RabbitMQ和ActiveMQ。但是由于ActiveMQ現(xiàn)在用的公司比較少了,這里就不做討論,我們著重討論前三種。
Kafka
Kafka最初來(lái)自于LinkedIn,是用于做日志收集的工具,采用Java和Scala開(kāi)發(fā)。其實(shí)那個(gè)時(shí)候已經(jīng)有ActiveMQ了,但是在當(dāng)時(shí)ActiveMQ沒(méi)有辦法滿足LinkedIn的需求,于是Kafka就應(yīng)運(yùn)而生。
在2010年底,Kakfa的0.7.0被開(kāi)源到了Github上。到了2011年,由于Kafka非常受關(guān)注,被納入了Apache Incubator,所有想要成為Apache正式項(xiàng)目的外部項(xiàng)目,都必須要經(jīng)過(guò)Incubator,翻譯過(guò)來(lái)就是孵化器。旨在將一些項(xiàng)目孵化成完全成熟的Apache開(kāi)源項(xiàng)目。
你也可以把它想象成一個(gè)學(xué)校,所有想要成為Apache正式開(kāi)源項(xiàng)目的外部項(xiàng)目都必須要進(jìn)入Incubator學(xué)習(xí),并且拿到畢業(yè)證,才能走入社會(huì)。于是在2012年,Kafka成功從Apache Incubator畢業(yè),正式成為Apache中的一員。
Kafka擁有很高的吞吐量,單機(jī)能夠抗下十幾w的并發(fā),而且寫入的性能也很高,能夠達(dá)到毫秒級(jí)別。但是有優(yōu)點(diǎn)就有缺點(diǎn),能夠達(dá)到這么高的并發(fā)的代價(jià)是,可能會(huì)出現(xiàn)消息的丟失。至于具體的丟失場(chǎng)景,我們后續(xù)會(huì)討論。
所以一般Kafka都用于大數(shù)據(jù)的日志收集,這種日志丟個(gè)一兩條無(wú)傷大雅。
而且Kafka的功能較為簡(jiǎn)單,就是簡(jiǎn)單的接收生產(chǎn)者的消息,消費(fèi)者從Kafka消費(fèi)消息。
RabbitMQ
RabbitMQ是很多公司對(duì)于ActiveMQ的替代方法,現(xiàn)在仍然有很多公司在使用。其優(yōu)點(diǎn)在于能保證消息不丟失,同Kafka,天平往數(shù)據(jù)的可靠性方向傾斜必然導(dǎo)致其吞吐量下降。其吞吐量只能夠達(dá)到幾萬(wàn),比起Kafka的十萬(wàn)吞吐來(lái)說(shuō),的確是較低的。如果遇到需要支撐特別高并發(fā)的情況,RabbitMQ可能會(huì)無(wú)法勝任。
但是RabbitMQ有比Kafka更多的高級(jí)特性,例如消息重試和死信隊(duì)列,而且寫入的延遲能夠降低到微妙級(jí),這也是RabbitMQ一大特點(diǎn)。
但RabbitMQ還有一個(gè)致命的弱點(diǎn),其開(kāi)發(fā)語(yǔ)言為Erlang,現(xiàn)在國(guó)內(nèi)精通Erlang的人不多,社區(qū)也不怎么活躍。這也就導(dǎo)致可能公司內(nèi)沒(méi)有人能夠去閱讀Erlang的源碼,更別說(shuō)基于其源碼進(jìn)行二次開(kāi)發(fā)或者排查問(wèn)題了。所以就存在RabbitMQ出了問(wèn)題可能公司里沒(méi)人能夠兜的住,維護(hù)成本非常的高。
之所以有中小型公司還在使用,是覺(jué)得其不會(huì)面臨高并發(fā)的場(chǎng)景,RabbitMQ的功能已經(jīng)完全夠用了。
RocketMQ
RocketMQ來(lái)自阿里,同Kakfa一樣也是從Apache Incubator出來(lái)的頂級(jí)項(xiàng)目,用Java語(yǔ)言進(jìn)行開(kāi)發(fā),單機(jī)吞吐量和Kafka一樣,也是十w量級(jí)。
RocketMQ的前身是阿里的MetaQ項(xiàng)目,2012年在淘寶內(nèi)部大量的使用,在阿里內(nèi)部迭代到3.0版本之后,將MetaQ的核心功能抽離出來(lái),就有了RocketMQ。RocketMQ整合了Kafka和RabbitMQ的優(yōu)點(diǎn),例如較高的吞吐量和通過(guò)參數(shù)配置能夠做到消息絕對(duì)不丟失。
其底層的設(shè)計(jì)參考了Kafka,具有低延遲、高性能、高可用的特點(diǎn)。不同于Kafka的單一日志收集功能,RocketMQ被廣泛運(yùn)用于訂單、交易、計(jì)算、消息推送、binlog分發(fā)等場(chǎng)景。
之所以能夠被運(yùn)用到多種場(chǎng)景,這要?dú)w功于RocketMQ提供的豐富的功能。例如延遲消息、事務(wù)消息、消息回溯、死信隊(duì)列等等。
- 延遲消息 就是不會(huì)立即消費(fèi)的消息,例如某個(gè)活動(dòng)開(kāi)始前15分鐘提醒用戶這樣的場(chǎng)景
- 事務(wù)消息 其主要解決數(shù)據(jù)庫(kù)事務(wù)和MQ消息的數(shù)據(jù)一致性,例如用戶下單,先發(fā)送消息到MQ,積分增加了,但是訂單系統(tǒng)在發(fā)出消息之后掛了。這樣用戶并沒(méi)有下單成功,但是積分卻增加了,明顯是不符合預(yù)期的
- 消息回溯 顧名思義,就是去消費(fèi)某個(gè)Topic下某段時(shí)間的歷史消息
- 死信隊(duì)列 沒(méi)有被正常消費(fèi)的消息,首先會(huì)按照RocketMQ的重試機(jī)制重試,當(dāng)達(dá)到了最大的重試次數(shù)之后,如果消費(fèi)仍然失敗,RocketMQ不會(huì)立即丟掉這條消息,而是會(huì)把消息放入死信隊(duì)列中。放入死信隊(duì)列的消息會(huì)在3天后過(guò)期,所以需要及時(shí)的處理
消息隊(duì)列會(huì)丟消息嗎
在不使用消息隊(duì)列的場(chǎng)景中,我們吹了很多消息隊(duì)列的優(yōu)點(diǎn),但同時(shí)也提到了消息隊(duì)列可能會(huì)丟失消息,我們也可以通過(guò)參數(shù)的配置來(lái)使消息絕對(duì)不丟失。
那消息是在什么情況下丟失的呢?消息隊(duì)列中的角色可以分為3類,分別是生產(chǎn)者、MQ和消費(fèi)者。一條消息在整個(gè)的傳輸鏈路中需要經(jīng)過(guò)如下的流程。
生產(chǎn)者將消息發(fā)送給MQ,MQ接收到這條消息后會(huì)將消息存儲(chǔ)到磁盤上,消費(fèi)者來(lái)消費(fèi)的時(shí)候就會(huì)把消息返給消費(fèi)者。先給出結(jié)論,在這3種場(chǎng)景下,消息都有可能會(huì)丟失。
接下來(lái)我們一步一步來(lái)分析一下。
生產(chǎn)者發(fā)送消息給MQ
生產(chǎn)者在發(fā)送消息的過(guò)程中,由于某些意外的情況例如網(wǎng)絡(luò)抖動(dòng)等,導(dǎo)致本次網(wǎng)絡(luò)通信失敗,消息并沒(méi)有被發(fā)送給MQ。
MQ存儲(chǔ)消息
當(dāng)MQ接收到了來(lái)自生產(chǎn)者的消息之后,還沒(méi)有來(lái)得及處理,MQ就突然宕機(jī),此時(shí)該消息也會(huì)丟失。
即使MQ開(kāi)始處理消息,并且將該消息寫入了磁盤,消息仍然可能會(huì)丟失。因?yàn)楝F(xiàn)代的操作系統(tǒng)都會(huì)有自己的OS Cache,因?yàn)楹痛疟P交互是一件代價(jià)相當(dāng)大的事情,所以當(dāng)我們寫入文件的時(shí)候會(huì)先將數(shù)據(jù)寫入OS Cache中,然后由OS調(diào)度,根據(jù)策略觸發(fā)真正的I/O操作,將數(shù)據(jù)刷入磁盤。
而在刷入磁盤之前,MQ如果宕機(jī),在OS Cache中的數(shù)據(jù)就會(huì)全部丟失。
消費(fèi)者消費(fèi)消息
當(dāng)消息順利的經(jīng)歷了生產(chǎn)者、MQ之后,消費(fèi)者拉取到了這條消息,但是當(dāng)其還沒(méi)來(lái)得及處理的時(shí)候,消費(fèi)者突然宕機(jī)了,這條消息就丟了(當(dāng)然你如果沒(méi)有提交Offset的話,重啟之后仍然可以消費(fèi)到這條消息)
原來(lái)我們以為用上了消息隊(duì)列,就萬(wàn)無(wú)一失了,沒(méi)想到逐步分析下來(lái)能有這么多坑。任何一個(gè)步驟出錯(cuò)都有可能導(dǎo)致消息丟失。那既然這樣,上文提到的可以通過(guò)參數(shù)配置來(lái)實(shí)現(xiàn)消息不會(huì)丟失是怎么一回事呢?
這里我們不去聊具體的MQ是如何實(shí)現(xiàn)的,我們來(lái)聊聊消息零丟失的實(shí)現(xiàn)思路。
消息最終一致性方案
涉及到的系統(tǒng)有訂單系統(tǒng)、MQ和積分系統(tǒng),訂單系統(tǒng)為生產(chǎn)者,積分系統(tǒng)為消費(fèi)者。
首先訂單系統(tǒng)發(fā)送一個(gè)訂單創(chuàng)建的消息給MQ,該消息的狀態(tài)為Prepare狀態(tài),狀態(tài)為Prepare狀態(tài)的消息不會(huì)被消費(fèi)者給消費(fèi)到,所以可以放心的發(fā)送。
然后訂單系統(tǒng)開(kāi)始執(zhí)行自身的核心邏輯,你可能會(huì)說(shuō),訂單系統(tǒng)本身的邏輯執(zhí)行失敗了怎么辦?剛剛的prepare消息不就成了臟數(shù)據(jù)?實(shí)際上在訂單系統(tǒng)的事務(wù)失敗之后,就會(huì)觸發(fā)回滾操作,就會(huì)向MQ發(fā)送消息,將該條狀態(tài)為Prepare的數(shù)據(jù)給刪除。
訂單系統(tǒng)核心事務(wù)成功之后,就會(huì)發(fā)送消息給MQ,將狀態(tài)為prepare的消息更新為commit。沒(méi)錯(cuò),這就是2PC,一個(gè)保證分布式事務(wù)數(shù)據(jù)一致性的協(xié)議。
眼尖的你可能發(fā)現(xiàn)了一個(gè)問(wèn)題,我發(fā)送了prepare消息之后,還沒(méi)來(lái)得及執(zhí)行本地事務(wù),訂單系統(tǒng)就掛了怎么辦?此時(shí)訂單系統(tǒng)即使重啟也不會(huì)向MQ發(fā)送刪除操作,這個(gè)prepare消息不就是一直存在MQ中了?
先給出結(jié)論,不會(huì)。
如果訂單系統(tǒng)發(fā)送了prepare消息給MQ之后自己就宕機(jī)了,MQ確實(shí)會(huì)存在一條不會(huì)被commit的數(shù)據(jù)。MQ為了解決這個(gè)問(wèn)題,會(huì)定時(shí)輪詢所有prepare的消息,跟對(duì)應(yīng)的系統(tǒng)溝通,這條prepare消息是要進(jìn)行重試還是回滾。所以prepare消息不會(huì)一直存在于MQ中。這樣一來(lái),就保證了消息對(duì)于生產(chǎn)者的DB事務(wù)和MQ中消息的數(shù)據(jù)一致性。
再來(lái)看一種更加極端的情況,假設(shè)訂單系統(tǒng)本地事務(wù)執(zhí)行成功之后,發(fā)送了commit消息到MQ,此時(shí)MQ突然掛了。導(dǎo)致MQ沒(méi)有收到該commit消息,在MQ中該消息仍然處于prepare狀態(tài),這怎么辦?
同樣的,依賴于MQ的輪詢機(jī)制和訂單系統(tǒng)溝通,訂單系統(tǒng)會(huì)告訴MQ這個(gè)事務(wù)已經(jīng)完成了,MQ就會(huì)將這條消息設(shè)置成commit,消費(fèi)者就可以消費(fèi)到該消息了。
接下來(lái)的流程就是消息被消費(fèi)者消費(fèi)了,如果消費(fèi)者消費(fèi)消息的時(shí)候本地事務(wù)失敗了,則會(huì)進(jìn)行重試,再次嘗試消費(fèi)這條消息。