RocketMQ這樣做,壓測(cè)后性能提高30%
從官方這邊獲悉,RocketMQ在4.9.1版本中對(duì)消息發(fā)送進(jìn)行了大量的優(yōu)化,性能提升十分顯著,接下來(lái)請(qǐng)跟著我一起來(lái)欣賞大神們的杰作。
根據(jù)RocketMQ4.9.1的更新日志,我們從中提取到關(guān)于消息發(fā)送性能優(yōu)化的【Issues:2883】,詳細(xì)鏈接如下:
具體優(yōu)化點(diǎn)如截圖所示:
首先先嘗試對(duì)上述優(yōu)化點(diǎn)做一個(gè)簡(jiǎn)單的介紹:
- 對(duì)WaitNotifyObject的鎖進(jìn)行優(yōu)化(item2)
- 移除HAService中的鎖(item3)
- 移除GroupCommitService中的鎖(item4)
- 消除HA中不必要的數(shù)組拷貝(item5)
- 調(diào)整消息發(fā)送幾個(gè)參數(shù)的默認(rèn)值(item7)
- sendMessageThreadPoolNums
- useReentrantLockWhenPutMessage
- flushCommitLogTimed
- endTransactionThreadPoolNums
減少瑣的作用范圍(item8-12)
通過(guò)閱讀上述的變更,總結(jié)出優(yōu)化手段主要包括如下三點(diǎn):
- 移除不必要的鎖
- 降低鎖粒度(范圍)
- 修改消息發(fā)送相關(guān)參數(shù)
接下來(lái)結(jié)合源碼,從中挑選具有代表性功能進(jìn)行詳細(xì)剖析,一起領(lǐng)悟Java高并發(fā)編程的魅力。
1、移除不必要的鎖
本次性能優(yōu)化,主要針對(duì)的是RocketMQ同步復(fù)制場(chǎng)景。
我們首先先來(lái)簡(jiǎn)單介紹一下RocketMQ主從同步在編程方面的技巧。
RocketMQ主節(jié)點(diǎn)將消息寫(xiě)入內(nèi)存后, 如果采用同步復(fù)制,需要等待從節(jié)點(diǎn)成功寫(xiě)入后才能向消息發(fā)送客戶(hù)端返回成功,在代碼編寫(xiě)方面也極具技巧性,時(shí)許圖如下圖所示:
溫馨提示:在RocketMQ4.7版本開(kāi)始對(duì)消息發(fā)送進(jìn)行了優(yōu)化,同步消息發(fā)送模型引入了jdk的CompletableFuture實(shí)現(xiàn)消息的異步發(fā)送。
核心步驟解讀:
- 消息發(fā)送線程調(diào)用Commitlog的aysncPutMessage方法寫(xiě)入消息。
- Commitlog調(diào)用submitReplicaRequest方法,將任務(wù)提交到GroupTransferService中,并獲取一個(gè)Future,實(shí)現(xiàn)異步編程。值得注意的是這里需要等待,待數(shù)據(jù)成功寫(xiě)入從節(jié)點(diǎn)(內(nèi)部基于CompletableFuture機(jī)制的內(nèi)部線程池ForkJoin)。
- GroupTransferService中對(duì)提交的任務(wù)依次進(jìn)行判斷,判斷對(duì)應(yīng)的請(qǐng)求是否已同步到從節(jié)點(diǎn)。
- 如果已經(jīng)復(fù)制到從節(jié)點(diǎn),則通過(guò)Future喚醒,并將結(jié)果返回給消息發(fā)送端。
GroupTransferService代碼如下圖所示:
為了更加方便大家理解接下來(lái)的優(yōu)化點(diǎn),首先再總結(jié)提煉一下GroupTransferService的設(shè)計(jì)理念:
- 首先引入兩個(gè)List結(jié)合,分別命名為讀、寫(xiě)鏈表。
- 外部調(diào)用GroupTransferService的putRequest請(qǐng)求,將存儲(chǔ)在寫(xiě)鏈表中(requestWrite)。
- GroupTransferService的run方法從requestRead鏈表中獲取任務(wù),判斷這些任務(wù)對(duì)應(yīng)的請(qǐng)求的數(shù)據(jù)是否成功寫(xiě)入到從節(jié)點(diǎn)。
- 每當(dāng)requestRead中沒(méi)有數(shù)據(jù)可讀時(shí),兩個(gè)隊(duì)列進(jìn)行交互,從而實(shí)現(xiàn)讀寫(xiě)分離,降低鎖競(jìng)爭(zhēng)。
新版本的優(yōu)化點(diǎn)主要包括:
- 更改putRequest的鎖類(lèi)型,用自旋鎖替換synchronized
- 去除doWaitTransfer方法中多余的鎖
1.1 使用自旋鎖替換synchronized
正入下圖所示,GroupTransferService向外提供接口putRequest,用來(lái)接受外部的同步任務(wù),需要對(duì)ArrayList加鎖進(jìn)行保護(hù),往ArrayList中添加數(shù)據(jù)屬于一個(gè)內(nèi)存操作,操作耗時(shí)小。
故這里沒(méi)必要采取synchronized這種synchronized,而是可以自旋鎖,自旋鎖的實(shí)現(xiàn)非常輕量級(jí),其實(shí)現(xiàn)如下圖所示:
整個(gè)鎖的實(shí)現(xiàn)就只需引入一個(gè)AtomicBoolean,加鎖、釋放鎖都是基于CAS操作,非常的輕量,并且自旋鎖不會(huì)發(fā)生線程切換。
1.2 去除多余的鎖
“鎖”的濫用是一個(gè)非常普遍的現(xiàn)象,多線程環(huán)境編程是一個(gè)非常復(fù)雜的交互過(guò)程,在編寫(xiě)代碼過(guò)程中我們可能覺(jué)得自己無(wú)法預(yù)知這段代碼是否會(huì)被多個(gè)線程并發(fā)執(zhí)行,為了謹(jǐn)慎起見(jiàn),就直接簡(jiǎn)單粗暴的對(duì)其進(jìn)行加鎖,帶來(lái)的自然是性能的損耗,這里將該鎖去除,我們就要結(jié)合該類(lèi)的調(diào)用鏈條,判斷是否需要加鎖。
整個(gè)GroupTransferService中在多線程環(huán)境中運(yùn)行需要被保護(hù)的主要是requestRead與requestWrite集合,引入的鎖的目的也是確保這兩個(gè)集合在多線程環(huán)境下安全訪問(wèn),故我們首先應(yīng)該梳理一下。
GroupTransferService的核心方法的運(yùn)作流程:
doWaitTransfer方法操作的主要對(duì)象是requestRead鏈表,而且該方法只會(huì)被GroupTransferService線程調(diào)用,并且requestRead中方法會(huì)在swapRequest中被修改,但這兩個(gè)方法是串行執(zhí)行,而且在同一個(gè)線程中,故無(wú)需引入鎖,該鎖可以移除。
但由于該鎖被移除,在swapRequests中進(jìn)行加鎖,因?yàn)閞equestWrite這個(gè)隊(duì)列會(huì)被多個(gè)線程訪問(wèn),優(yōu)化后的代碼如下:
從這個(gè)角度來(lái)看,其實(shí)主要是將鎖的類(lèi)型由synchronized替換為更加輕量的自旋鎖。
2、降低鎖的范圍
被鎖包裹的代碼塊是串行執(zhí)行,即無(wú)法并發(fā),在無(wú)法避免鎖的情況下,降低鎖的代碼塊,能有效提高并發(fā)度,圖解如下:
如果多個(gè)線程區(qū)訪問(wèn)lock1,lock2,在lock1中domSomeThing1、domSomeThing2這兩個(gè)方法都必須串行執(zhí)行,而多個(gè)線程同時(shí)訪問(wèn)lock2方法,doSomeThing1能被多個(gè)線程同時(shí)執(zhí)行,只有doSomething2時(shí)才需要串行執(zhí)行,其整體并發(fā)效果肯定是lock2,基于這樣理論:得出一個(gè)鎖使用的最佳實(shí)踐:被鎖包裹的代碼塊越少越好。
在老版本中,消息寫(xiě)入加鎖的代碼塊比較大,一些可以并發(fā)執(zhí)行的動(dòng)作也被鎖包裹,例如生成offsetMsgId。
新版本采用函數(shù)式編程的思路,只是定義來(lái)獲取msgId的方法,在進(jìn)行消息寫(xiě)入時(shí)并不會(huì)執(zhí)行,降低鎖的粒度,使得offsetMsgId的生成并行化,其編程手段之巧妙,值得我們學(xué)習(xí)。
3、調(diào)整消息發(fā)送相關(guān)的參數(shù)
sendMessageThreadPoolNums
Broker端消息發(fā)送端線程池?cái)?shù)量,該值在4.9.0版本之前默認(rèn)為1,新版本調(diào)整為操作系統(tǒng)的CPU核數(shù),并且不小于4。該參數(shù)的調(diào)整有利有弊。提高了消息發(fā)送的并發(fā)度,但同時(shí)會(huì)導(dǎo)致消息順序的亂序,其示例圖如下同步發(fā)送下不會(huì)有順序問(wèn)題,可放心修改。
在順序消費(fèi)場(chǎng)景,該參數(shù)不建議修改。在實(shí)際過(guò)程中應(yīng)該對(duì)RocketMQ集群進(jìn)行治理,順序消費(fèi)的場(chǎng)景使用專(zhuān)門(mén)集群。
useReentrantLockWhenPutMessage MQ消息寫(xiě)入時(shí)對(duì)內(nèi)存加鎖使用的鎖類(lèi)型,低版本之前默認(rèn)為false,表示默認(rèn)使用自旋鎖;新版本使用ReentrantLock。自旋主要的優(yōu)勢(shì)是沒(méi)有線程切換成本,但自旋容易造成CPU的浪費(fèi),內(nèi)存寫(xiě)入大部分情況下是很快,但RocketMQ比較依賴(lài)頁(yè)緩存,如果出現(xiàn)也緩存抖動(dòng),帶來(lái)的CPU浪費(fèi)是非常不值得,在sendMessageThreadPoolNums設(shè)置超過(guò)1之后,鎖的類(lèi)型使用ReentrantLock更加穩(wěn)定。
flushCommitLogTimed 首先我們通過(guò)觀察源碼了解一下該參數(shù)的含義:
其主要作用是控制刷盤(pán)線程阻塞等待的方式,低版本flushCommitLogTimed為false,默認(rèn)使用CountDownLatch,而高版本則直接使用Thread.sleep。猜想的原因是刷盤(pán)線程比較獨(dú)立,無(wú)需與其他線程進(jìn)行直接的交互協(xié)作,故無(wú)需使用CountDownLatch這種專(zhuān)門(mén)用來(lái)線程協(xié)作的“外來(lái)和尚”。
endTransactionThreadPoolNums
主要用于設(shè)置事務(wù)消息線程池的大小。
新版本主要是可通過(guò)調(diào)整發(fā)送線程池來(lái)動(dòng)態(tài)調(diào)節(jié)事務(wù)消息的值,這個(gè)大家可以根據(jù)壓測(cè)結(jié)果動(dòng)態(tài)調(diào)整。