圖解 Kafka 超高并發(fā)網(wǎng)絡(luò)架構(gòu)演進(jìn)過(guò)程
大家好,我是 華仔, 又跟大家見(jiàn)面了。
上一篇作為專題系列的第一篇,我們深度剖析了關(guān)于 Kafka 存儲(chǔ)架構(gòu)設(shè)計(jì)的實(shí)現(xiàn)細(xì)節(jié),今天開(kāi)啟第二篇,我們來(lái)深度剖析下「Kafka Broker 端網(wǎng)絡(luò)架構(gòu)和請(qǐng)求處理流程」是如何設(shè)計(jì)的? 相信使用過(guò) Kafka 的朋友都知道其吞吐量可以高達(dá)百萬(wàn),但很少人理解其中的設(shè)計(jì)原理。
那么 Kafka Broker 端網(wǎng)絡(luò)架構(gòu)和請(qǐng)求處理到底是使用了哪些高大上的技術(shù)?它到底解決了什么問(wèn)題?究竟是怎么解決的?
只有了解了這些, 我們才能深刻掌握 Kafka 服務(wù)端設(shè)計(jì)精髓所在,更加深刻理解一個(gè)高并發(fā)、高性能服務(wù)端架構(gòu)該如何設(shè)計(jì)。
認(rèn)真讀完這篇文章,我相信你會(huì)對(duì)Kafka Broker請(qǐng)求處理流程和網(wǎng)絡(luò)架構(gòu)設(shè)計(jì)實(shí)現(xiàn)細(xì)節(jié),有更加深刻的理解。
這篇文章干貨很多,希望你可以耐心讀完。
一、總體概述
要想理解 Kafka Broker 請(qǐng)求處理架構(gòu)設(shè)計(jì),我們需要從簡(jiǎn)單請(qǐng)求處理模型來(lái)說(shuō)起。
對(duì)于日常系統(tǒng)開(kāi)發(fā),我們都知道是基于 Request/Response 的模式來(lái)實(shí)現(xiàn)的, 對(duì)于 Kafka 來(lái)說(shuō), 無(wú)論是 Producer 端、Consumer 端 還是 Broker 端,他們之間的請(qǐng)求交互也都是基于「Request/Response」模式來(lái)完成的。比如,客戶端會(huì)通過(guò)網(wǎng)絡(luò)發(fā)送消息生產(chǎn)請(qǐng)求給 Broker,而 Broker 處理完成后,會(huì)發(fā)送對(duì)應(yīng)的響應(yīng)給到客戶端。
下面,我會(huì)從自我設(shè)計(jì)角度出發(fā),如果是我們會(huì)如何設(shè)計(jì),帶你一步步演化出來(lái)「kafka Broker 的網(wǎng)絡(luò)請(qǐng)求處理」架構(gòu)。
在這個(gè)過(guò)程中,你會(huì)看到 Kafka 在處理請(qǐng)求的過(guò)程中會(huì)遇到哪些高性能和高并發(fā)問(wèn)題,以及架構(gòu)為什么要這樣演進(jìn),從而理解 Kafka 這么設(shè)計(jì)的意義和精妙之處。
二、順序處理模式
我們從最簡(jiǎn)單的網(wǎng)絡(luò)編程思路處理方式講起。
因?yàn)閷?duì)于 Kafka Broker 來(lái)說(shuō)就是用來(lái)接收生產(chǎn)者發(fā)送過(guò)來(lái)的請(qǐng)求,那這個(gè)時(shí)候最簡(jiǎn)單的實(shí)現(xiàn)大概是這樣的:
如上述代碼所示:我們可以理解 Kafka 每個(gè)服務(wù)器啟動(dòng)起來(lái)后就是一個(gè) while 循環(huán), 不斷的 accept 生產(chǎn)者提交上來(lái)的請(qǐng)求, 然后進(jìn)行處理并存儲(chǔ)到磁盤上,這種方式實(shí)現(xiàn)最簡(jiǎn)單,也非常好理解,但是這種方式存在2個(gè)致命的缺陷?
- 請(qǐng)求阻塞:只能順序處理每個(gè)請(qǐng)求,即每個(gè)請(qǐng)求都必須等待前一個(gè)請(qǐng)求處理完畢才能得到處理。
- 吞吐量非常差:由于只能順序處理,無(wú)法并發(fā),效率太低,所以吞吐量非常差,只適合請(qǐng)求發(fā)送非常不頻繁的系統(tǒng)。
從上面來(lái)看很明顯,如果你的 Kafka 系統(tǒng)請(qǐng)求并發(fā)量很大,意味著要處理的時(shí)間就會(huì)越久。那按照前面我們提到的 Kafka「吞吐量」的標(biāo)準(zhǔn),這個(gè)方案遠(yuǎn)遠(yuǎn)無(wú)法滿足我們對(duì)高性能、高并發(fā)的要求。
那有什么更好的方案可以快速處理請(qǐng)求嗎?
接下來(lái)我們可以試著采取這個(gè)方案:獨(dú)立線程異步處理模式。
三、多線程異步處理模式
既然同步方式會(huì)阻塞請(qǐng)求,吞吐量差, 我們可以嘗試著使用獨(dú)立線程異步方式進(jìn)行處理, 即經(jīng)典的 connection per thread 模型, 那這個(gè)時(shí)候的實(shí)現(xiàn)大概是這樣的:
如上述代碼所示:同上還是一個(gè) while 循環(huán)不斷的 accept 生產(chǎn)者提交上來(lái)的請(qǐng)求,但是這時(shí)候 Kafka 系統(tǒng)會(huì)為每個(gè)請(qǐng)求都創(chuàng)建一個(gè)「單獨(dú)的線程」來(lái)處理。
這個(gè)實(shí)現(xiàn)方案的好處就是:
- 吞吐量稍強(qiáng):相對(duì)上面同步方式的方案,一定程度上極大地提高了服務(wù)器的吞吐量。
- 非阻塞:它是完全異步的,每個(gè)請(qǐng)求的處理都不會(huì)阻塞下一個(gè)請(qǐng)求。
但同樣缺陷也同樣很明顯:即為每個(gè)請(qǐng)求都創(chuàng)建線程的做法開(kāi)銷很大,在某些高并發(fā)場(chǎng)景下會(huì)壓垮整個(gè)服務(wù)。可見(jiàn),這個(gè)方案也只適用于請(qǐng)求發(fā)送頻率很低的業(yè)務(wù)場(chǎng)景。還是無(wú)法滿足我們對(duì)高性能、高并發(fā)的要求。
既然這種方案還是不能滿足, 那么我們究竟該使用什么方案來(lái)支撐高并發(fā)呢?
這個(gè)時(shí)候我們可以想想我們?nèi)粘i_(kāi)發(fā)用到的7層負(fù)載Nginx或者Redis在處理高并發(fā)請(qǐng)求的時(shí)候是使用什么方案呢?
從上面啟發(fā)你可以看出,提升系統(tǒng) I/O 并發(fā)性能的關(guān)鍵思路就是:「事件驅(qū)動(dòng)」。
想必大家已經(jīng)猜到了,沒(méi)錯(cuò),就是「多路復(fù)用」。那么Kafka 是不是也是采用這種方案來(lái)實(shí)現(xiàn)呢?
這里我們先考慮采用基于「事件驅(qū)動(dòng)」的設(shè)計(jì)方案,當(dāng)有事件觸發(fā)時(shí),才會(huì)調(diào)用處理器進(jìn)行數(shù)據(jù)處理。
四、Reactor 模式
在高性能網(wǎng)絡(luò)編程領(lǐng)域,有一個(gè)非常著名的模式——Reactor模式。那么何為「Reactor模式」,首先它是基于事件驅(qū)動(dòng)的,有一個(gè)或多個(gè)并發(fā)輸入源,有一個(gè)Service Handler,有多個(gè)Request Handler;這個(gè)Service Handler會(huì)同步的將輸入的請(qǐng)求輪詢地分發(fā)給相應(yīng)的Request Handler進(jìn)行處理。
借助于 Doug Lea(就是那位讓人無(wú)限景仰的大爺)的 "Scalable IO in Java" 中講述的Reactor模式。
"Scalable IO in Java" 的地址是:
http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf。
簡(jiǎn)單來(lái)說(shuō),Reactor 模式特別適合應(yīng)用于處理多個(gè)客戶端并發(fā)向服務(wù)器端發(fā)送請(qǐng)求的場(chǎng)景。這里借用大神 PDF 中的一幅圖來(lái)說(shuō)明 Reactor 架構(gòu):
從上面這張圖中,我們可以看出多個(gè)客戶端會(huì)發(fā)送請(qǐng)求給到 Reactor。Reactor 有個(gè)請(qǐng)求分發(fā)線程 Dispatcher,也就是圖中的綠色的 Acceptor,它會(huì)將不同的請(qǐng)求下分發(fā)到多個(gè)工作線程中處理。
在這個(gè)架構(gòu)中,Acceptor 線程只是用來(lái)進(jìn)行請(qǐng)求分發(fā),所以非常輕量級(jí),因此會(huì)有很高的吞吐量。而這些工作線程可以根據(jù)實(shí)際系統(tǒng)負(fù)載情況動(dòng)態(tài)調(diào)節(jié)系統(tǒng)負(fù)載能力,從而達(dá)到請(qǐng)求處理的平衡性。
基于上面的 Reactor 架構(gòu), 我們來(lái)看看如果是我們?cè)撊绾卧O(shè)計(jì) Kafka 服務(wù)端的架構(gòu)?
- 這里我們采用多路復(fù)用方案,Reactor 設(shè)計(jì)模式,并引用Java NIO的方式可以更好的解決上面并發(fā)請(qǐng)求問(wèn)題。
- 當(dāng) Client 端將請(qǐng)求發(fā)送到 Server 端的時(shí)候, 首先在 Server 端有個(gè)多路復(fù)用器(Selector),然后會(huì)啟動(dòng)一個(gè) Accepter 線程將OP_CONNECT事件注冊(cè)到多路復(fù)用器上, 主要用來(lái)監(jiān)聽(tīng)連接事件到來(lái)。
- 當(dāng)監(jiān)聽(tīng)到連接事件后,就會(huì)在多路復(fù)用器上注冊(cè) OP_READ 事件, 這樣 Cient 端發(fā)送過(guò)來(lái)的請(qǐng)求, 都會(huì)被接收到。如果請(qǐng)求特別多的話, 我們這里進(jìn)行優(yōu)化, 創(chuàng)建一個(gè)Read HandlePool 線程池。
- 當(dāng) Read HandlePool 線程池接收到請(qǐng)求數(shù)據(jù)后,最終會(huì)交給 Handler ThreadPool 線程池進(jìn)行后續(xù)處理。比如如果是生產(chǎn)者發(fā)送過(guò)來(lái)的請(qǐng)求,肯定會(huì)解析請(qǐng)求體,處理并最終存儲(chǔ)到磁盤中,待處理完后要返回處理結(jié)果狀態(tài), 這時(shí)候就由它在多路復(fù)用器上注冊(cè)O(shè)P_WRITE事件來(lái)完成。這樣多路復(fù)用器遍歷到 OP_WRITE 事件后就會(huì)將請(qǐng)求返回到 Client 端。
- 在上圖中我們看到在整個(gè)流程中還有一個(gè) MessageQueue 的隊(duì)列組件存在, 為什么要加這個(gè)組件呢? 我們可以想象一下, 如果請(qǐng)求量非常大,直接交給 Handler ThreadPool 線程池進(jìn)行處理, 可能會(huì)出現(xiàn)該線程池處理不過(guò)來(lái)的情況發(fā)生,如果處理不過(guò)來(lái),也會(huì)出現(xiàn)阻塞瓶頸。所以這里我們?cè)?Server 端內(nèi)部也設(shè)計(jì)一個(gè)消息隊(duì)列, 起到一個(gè)緩沖的作用,Handler ThreadPool 線程池會(huì)根據(jù)自己的負(fù)載能力進(jìn)行處理。
以上就是我們引用了「多路復(fù)用」的設(shè)計(jì)方案,但是 Kafka Broker 端就是這樣的架構(gòu)設(shè)計(jì)方案嗎?如果我們是 Kafka 系統(tǒng)架構(gòu)的設(shè)計(jì)者,采用這樣的架構(gòu)設(shè)計(jì)方案會(huì)不會(huì)還是有什么問(wèn)題,有沒(méi)有哪個(gè)環(huán)節(jié)會(huì)出現(xiàn)系統(tǒng)性能瓶頸呢?
這是個(gè)值得思考的問(wèn)題, 很考驗(yàn)?zāi)愕募軜?gòu)設(shè)計(jì)能力。
細(xì)心的讀者可能會(huì)發(fā)現(xiàn):對(duì)于 Kafka 這種超高并發(fā)系統(tǒng)來(lái)說(shuō),一個(gè) Selector 多路復(fù)用器是 Hold 不住的,從上圖可以得出,我們監(jiān)聽(tīng)這些連接、接收請(qǐng)求、處理響應(yīng)結(jié)果都是同一個(gè) Selector 在進(jìn)行處理,很容易成為系統(tǒng)性能瓶頸。
接下來(lái),我們將進(jìn)一步進(jìn)行優(yōu)化,為了減輕當(dāng)前 Selector 的處理負(fù)擔(dān),引入另外一個(gè)Selector 處理隊(duì)列,如下圖所示:
- 首先上圖是目前我認(rèn)為最接近 Kafka Broker 真實(shí)架構(gòu)設(shè)計(jì)方案的。
- 整體架構(gòu)跟上一版的類似,只不過(guò)這里多引入了一個(gè)多 Selector 處理隊(duì)列,原來(lái)的 Selector 只負(fù)責(zé)監(jiān)聽(tīng)連接, 這時(shí)候有讀者就會(huì)有疑問(wèn),請(qǐng)求量超級(jí)大的時(shí)候,一個(gè) Selector 會(huì)不會(huì)成為瓶頸呢? 這里可以大可放心, 這時(shí)候它的工作非常單一,是完全能 hold 住的。
- 那么對(duì)于我們接收請(qǐng)求、處理請(qǐng)求、返回狀態(tài)操作都會(huì)交由多 Selector 處理隊(duì)列,至于這里到底需要多少個(gè) Selector,又會(huì)跟什么參數(shù)和配置有關(guān)系,我們后續(xù)再進(jìn)行分析,總之這里記住有多個(gè) Selector 就行了,這樣系統(tǒng)壓力就會(huì)被分散處理。
- 另外我們要搞清楚的一點(diǎn)就是對(duì)于 Kafka 服務(wù)端指的是每個(gè) Broker 節(jié)點(diǎn),如果我們的服務(wù)集群總共有10個(gè)節(jié)點(diǎn), 每個(gè)節(jié)點(diǎn)內(nèi)部都是上面的這樣的架構(gòu),這樣我們就有理由相信如果采用這樣的架構(gòu)設(shè)計(jì)方案,是可以支持高并發(fā)和高性能的。
架構(gòu)設(shè)計(jì)方案演進(jìn)到這里,基本上已經(jīng)差不多了,接下來(lái)我們看看 Kafka 真實(shí)超高并發(fā)的網(wǎng)絡(luò)架構(gòu)是如何設(shè)計(jì)的。
五、Kafka 超高并發(fā)網(wǎng)絡(luò)架構(gòu)
在上面 Kafka 高性能、高吞吐量架構(gòu)演進(jìn)的時(shí)候,我們提到了 Java NIO 以及 Reactor 設(shè)計(jì)模式。實(shí)際上,搞透了「Kafka 究竟是怎么使用 NIO 來(lái)實(shí)現(xiàn)網(wǎng)絡(luò)通信的」,不僅能讓我們掌握 Kafka 請(qǐng)求處理全流程處理,也能讓我們對(duì) Reactor 設(shè)計(jì)模式有更深的理解,還能幫助我們解決很多實(shí)際問(wèn)題。
那么接下來(lái)我們就來(lái)深入剖析下 Kafka 的 NIO 通訊機(jī)制吧。
我們先從整體上看一下完整的網(wǎng)絡(luò)通信層架構(gòu),如下圖所示:
- 從上圖中我們可以看出,Kafka 網(wǎng)絡(luò)通信架構(gòu)中用到的組件主要由兩大部分構(gòu)成:SocketServer和RequestHandlerPool。
- SocketServer 組件是 Kafka 超高并發(fā)網(wǎng)絡(luò)通信層中最重要的子模塊。它包含Acceptor 線程、Processor 線程和 RequestChannel等對(duì)象,都是網(wǎng)絡(luò)通信的重要組成部分。它主要實(shí)現(xiàn)了 Reactor 設(shè)計(jì)模式,主要用來(lái)處理外部多個(gè) Clients(這里的 Clients 可能包含 Producer、Consumer 或其他 Broker)的并發(fā)請(qǐng)求,并負(fù)責(zé)將處理結(jié)果封裝進(jìn) Response 中,返還給 Clients。
- RequestHandlerPool 組件就是我們常說(shuō)的 I/O 工作線程池,里面定義了若干個(gè) I/O 線程,主要用來(lái)執(zhí)行真實(shí)的請(qǐng)求處理邏輯。
- 這里注意的是:跟 RequestHandler 相比, 上面所說(shuō)的Acceptor、Processor 線程 還有 RequestChannel 等都不做請(qǐng)求處理,它們只是請(qǐng)求和響應(yīng)的「搬運(yùn)工」。
接下來(lái)我們來(lái)具體聊聊SocketServer中的實(shí)現(xiàn)原理,這里先來(lái)講講:
- Acceptor 線程
- Processor 線程
以Kafka 2.5版本,源碼位置:
https://github.com/apache/kafka/blob/2.5.0-rc3/core/src/main/scala/kafka/network/SocketServer.scala。
1、聊聊 Acceptor 線程
在經(jīng)典的 Reactor 設(shè)計(jì)模式有個(gè) 「Dispatcher」 的角色,主要用來(lái)接收外部請(qǐng)求并分發(fā)給下面的實(shí)際處理線程。通過(guò)上面分析我們知道在 Kafka 網(wǎng)絡(luò)架構(gòu)設(shè)計(jì)中,這個(gè) Dispatcher 就是「Acceptor 線程」。
Acceptor 線程是用來(lái)接收和創(chuàng)建外部 TCP 連接的線程。在Broker 端每個(gè) SocketServer 實(shí)例只會(huì)創(chuàng)建一個(gè) Acceptor 線程。它的主要功能就是創(chuàng)建連接,并將接收到的 Request 請(qǐng)求傳遞給下游的 Processor 線程處理。
/**
* Thread that accepts and configures new connections. There is one of these per endpoint.
*/
private[kafka] class Acceptor(val endPoint: EndPoint,
val sendBufferSize: Int,
val recvBufferSize: Int,
brokerId: Int,
connectionQuotas: ConnectionQuotas,
metricPrefix: String) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
// 1. 創(chuàng)建底層的NIO Selector對(duì)象,用來(lái)監(jiān)聽(tīng)連接創(chuàng)建請(qǐng)求、讀寫(xiě)請(qǐng)求等
private val nioSelector = NSelector.open()
// 2. Broker端創(chuàng)建對(duì)應(yīng)的ServerSocketChannel實(shí)例,然后將Channel注冊(cè)到Selector對(duì)象上
val serverChannel = openServerSocket(endPoint.host, endPoint.port)
// 3. 創(chuàng)建Processor線程池
private val processors = new ArrayBuffer[Processor]()
......
/**
* Accept loop that checks for new connection attempts
*/
def run(): Unit = {
//注冊(cè)O(shè)P_ACCEPT事件
serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
// 等待Acceptor線程啟動(dòng)完成
startupComplete()
try {
// 當(dāng)前使用的Processor序號(hào),從0開(kāi)始
var currentProcessorIndex = 0
while (isRunning) {
try {
// 每500毫秒獲取一次就緒I/O事件
val ready = nioSelector.select(500)
// 如果有I/O事件準(zhǔn)備就緒
if (ready > 0) {
........
// 調(diào)用accept方法創(chuàng)建Socket連接
accept(key).foreach { socketChannel =>
........
// 指定由哪個(gè)Processor線程進(jìn)行處理
processor = synchronized {
.........
processors(currentProcessorIndex)
}
// 更新Processor線程序號(hào)
currentProcessorIndex += 1
}
.........
}
}
這里重點(diǎn)看下 Acceptor 線程中三個(gè)非常關(guān)鍵且重要的屬性和方法:
- nioSelector:它就是我們所熟悉的 Java NIO 庫(kù)中的 Selector 對(duì)象實(shí)例,所有網(wǎng)絡(luò)通信組件實(shí)現(xiàn) Java NIO 機(jī)制的基礎(chǔ)。
- processors:通過(guò)源碼我們可以知道在Acceptor 線程在初始化時(shí),需要?jiǎng)?chuàng)建對(duì)應(yīng)的 Processor 線程池。由此可以得出,Processor 線程是在 Acceptor 線程中管理和維護(hù)的。
- run方法:它是處理 Reactor 模式中分發(fā)邏輯的主要實(shí)現(xiàn)方法。
- 從上述源碼中,我們可以看出 Acceptor 線程主要使用了 Java NIO 的 Selector 以及 SocketChannel 的方式循環(huán)的輪詢準(zhǔn)備就緒的 I/O 事件。
- 這里的 I/O 事件主要是指網(wǎng)絡(luò)連接創(chuàng)建事件即:SelectionKey.OP_ACCEPT。
- 這樣注冊(cè)好事件后,一旦后續(xù)接收到連接請(qǐng)求后,Acceptor 線程就會(huì)指定一個(gè) Processor 線程,并將該請(qǐng)求交給它并創(chuàng)建網(wǎng)絡(luò)連接用于后續(xù)處理。
2、聊聊 Processor 線程
從上面分析我們知道 Acceptor 只是做了請(qǐng)求入口連接處理的,那么,真正創(chuàng)建網(wǎng)絡(luò)連接以及分發(fā)網(wǎng)絡(luò)請(qǐng)求是由 Processor 線程來(lái)完成的。
override def run(): Unit = {
// 等待Processor線程啟動(dòng)完成
startupComplete()
try {
while (isRunning) {
try {
// 創(chuàng)建新連接
configureNewConnections()
// 發(fā)送Response
processNewResponses()
// 執(zhí)行NIO poll,獲取對(duì)應(yīng)SocketChannel上準(zhǔn)備就緒的I/O操作
poll()
// 將接收到的Request放入Request隊(duì)列
processCompletedReceives()
.......
} catch {
.........
}
}
} finally {
........
}
}
........
// 默認(rèn)連接對(duì)接大小
val ConnectionQueueSize = 20
// 保存要?jiǎng)?chuàng)建的新連接信息
private val newConnections = new ArrayBlockingQueue[SocketChannel](connectionQueueSize)
// 一個(gè)臨時(shí) Response 隊(duì)列
private val inflightResponses = mutable.Map[String, RequestChannel.Response]()
// Response 隊(duì)列
private val responseQueue = new LinkedBlockingDeque[RequestChannel.Response]()
從上面 Processor 線程源碼,可以看出 Kafka 的代碼邏輯實(shí)現(xiàn)的非常好,各個(gè)子方法的邊界非常清楚。
這里我們就不展開(kāi)源碼分析了, 更深入詳細(xì)的等到源碼分析專題再進(jìn)行。我們簡(jiǎn)單的看下 Processor 線程初始化時(shí)要做的事情。
看上面代碼最后部分,我們知道每個(gè) Processor 線程在創(chuàng)建時(shí)都會(huì)創(chuàng)建 3 個(gè)隊(duì)列。
- newConnections 隊(duì)列:它主要是用來(lái)保存要?jiǎng)?chuàng)建的新連接信息,也就是SocketChannel 對(duì)象,目前是硬編碼隊(duì)列長(zhǎng)度大小為20。每當(dāng) Processor 線程接收到新的連接請(qǐng)求時(shí),都會(huì)將對(duì)應(yīng)的 SocketChannel 對(duì)象放入隊(duì)列,等到后面創(chuàng)建連接時(shí),從該隊(duì)列中獲取 SocketChannel,然后注冊(cè)新的連接。
- inflightResponse 隊(duì)列:它是一個(gè)臨時(shí)的 Response 隊(duì)列, 當(dāng) Processor 線程將 Repsonse 返回給 Client 之后,要將 Response 放入該隊(duì)列。它存在的意義:由于有些 Response 回調(diào)邏輯要在 Response 被發(fā)送回 Request 發(fā)送方后,才能執(zhí)行,因此需要暫存到臨時(shí)隊(duì)列。
- ResponseQueue 隊(duì)列:它主要是存放需要返回給Request 發(fā)送方的所有 Response 對(duì)象。通過(guò)源碼得知:每個(gè) Processor 線程都會(huì)維護(hù)自己的 Response 隊(duì)列。
六、請(qǐng)求處理核心流程剖析
上面深入的剖析了 Kafka 超高并發(fā)網(wǎng)絡(luò)架構(gòu) 以及 SocketServer 中的 Acceptor 線程跟 Processor 線程的實(shí)現(xiàn)原理, 接下來(lái)我們來(lái)將請(qǐng)求處理核心流程給串起來(lái)。
只有搞透這部分的實(shí)現(xiàn)原理,才能幫助我們有針對(duì)性的進(jìn)行 Broker端請(qǐng)求處理的性能調(diào)優(yōu)。
比如:在上面網(wǎng)絡(luò)架構(gòu)圖,有兩個(gè)參數(shù)跟整個(gè)流程有關(guān)系,分別是num.network.threads、num.io.threads。如果我們不掌握請(qǐng)求處理的整個(gè)流程,就不能更好的對(duì)此進(jìn)行調(diào)整,來(lái)達(dá)到更高的性能要求。
其中 num.io.threads 就是 I/O 工作線程池的大小配置,即 KafkaRequestHandlerPool 線程池,它才是「真正處理 Kafka 請(qǐng)求」的地方。
以Kafka 2.5版本,源碼位置:
https://github.com/apache/kafka/blob/2.5.0-rc3/core/src/main/scala/kafka/server/KafkaRequestHandler.scala。
/**
* A thread that answers kafka requests.
*/
class KafkaRequestHandler(id: Int, //I/O線程序號(hào)
brokerId: Int, //所在Broker序號(hào),即broker.id值
val aggregateIdleMeter: Meter,
val totalHandlerThreads: AtomicInteger, //I/O線程池大小
val requestChannel: RequestChannel, //請(qǐng)求處理通道
apis: KafkaApis, //KafkaApis類,用于真正實(shí)現(xiàn)請(qǐng)求處理邏輯的類
time: Time) extends Runnable with Logging {
......
def run(): Unit = {
while (!stopped) {
val startSelectTime = time.nanoseconds
// 從請(qǐng)求隊(duì)列中獲取下一個(gè)待處理的請(qǐng)求
val req = requestChannel.receiveRequest(300)
val endTime = time.nanoseconds
// 統(tǒng)計(jì)線程空閑時(shí)間
val idleTime = endTime - startSelectTime
// 更新線程空閑百分比指標(biāo)
aggregateIdleMeter.mark(idleTime / totalHandlerThreads.get)
req match {
// 當(dāng)關(guān)閉線程請(qǐng)求處理
case RequestChannel.ShutdownRequest =>
......
// 當(dāng)普通請(qǐng)求到來(lái)時(shí)
case request: RequestChannel.Request =>
try {
request.requestDequeueTimeNanos = endTime
// 由KafkaApis.handle方法執(zhí)行相應(yīng)處理邏輯
apis.handle(request)
} catch {
....
} finally {
// 釋放請(qǐng)求對(duì)象資源
request.releaseBuffer()
}
case null => // continue
}
}
shutdownComplete.countDown()
}
}
下面我們結(jié)合 Kafka 超高并發(fā)網(wǎng)絡(luò)架構(gòu)圖來(lái)講解下一個(gè)完整請(qǐng)求處理核心流程:
- Clients 發(fā)送請(qǐng)求給 Acceptor 線程。
- Acceptor 線程會(huì)創(chuàng)建 NIO Selector 對(duì)象,并創(chuàng)建 ServerSocketChannel 實(shí)例,然后將Channel和OP_ACCEPT事件綁定到 Selector 多路復(fù)用器上。
- Acceptor 線程還會(huì)默認(rèn)創(chuàng)建3個(gè)大小的 Processor 線程池,參數(shù):num.network.threads, 并輪詢的將請(qǐng)求對(duì)象 SocketChannel 放入到連接隊(duì)列中(newConnections)。
- 這時(shí)候連接隊(duì)列就源源不斷有請(qǐng)求數(shù)據(jù)了,然后不停地執(zhí)行NIO Poll, 獲取對(duì)應(yīng) SocketChannel 上已經(jīng)準(zhǔn)備就緒的 I/O 事件。
- Processor 線程向 SocketChannel 注冊(cè)了OP_READ/OP_WRITE事件,這樣 客戶端發(fā)過(guò)來(lái)的請(qǐng)求就會(huì)被該 SocketChannel 對(duì)象獲取到,具體就是CompleteReceives。
- 這個(gè)時(shí)候客戶端就可以源源不斷進(jìn)行請(qǐng)求發(fā)送了,服務(wù)端通過(guò)Selector NIO Poll不停的獲取準(zhǔn)備就緒的 I/O 事件。
- 然后根據(jù)Channel中獲取已經(jīng)完成的 Receive 對(duì)象,構(gòu)建 Request 對(duì)象,并將其存入到Requestchannel的RequestQueue請(qǐng)求隊(duì)列中 。
- 這個(gè)時(shí)候就該 I/O 線程池上場(chǎng)了,KafkaRequestHandler 線程循環(huán)地從請(qǐng)求隊(duì)列中獲取 Request 實(shí)例,然后交由KafkaApis的handle方法,執(zhí)行真正的請(qǐng)求處理邏輯,并最終將數(shù)據(jù)存儲(chǔ)到磁盤中。
- 待處理完請(qǐng)求后,KafkaRequestHandler線程會(huì)將 Response 對(duì)象放入Processor線程的Response隊(duì)列。
- 然后 Processor 線程通過(guò) Request 中的ProcessorID不停地從 Response 隊(duì)列中來(lái)定位并取出 Response 對(duì)象,返還給 Request 發(fā)送方。
至此,我們深入剖析完畢 Kafka 網(wǎng)絡(luò)架構(gòu)請(qǐng)求「核心流程」。
七、系統(tǒng)調(diào)優(yōu)
搞透了 Kafka 超高并發(fā)網(wǎng)絡(luò)架構(gòu)設(shè)計(jì)和請(qǐng)求處理核心流程后,我們來(lái)聊聊 Broker 端參數(shù)調(diào)優(yōu)。
對(duì) Kafka 而言,性能一般是指吞吐量和延時(shí)。所以高吞吐量、低延時(shí)是我們調(diào)優(yōu) Kafka 集群的主要目標(biāo)。
Broker 端調(diào)優(yōu)主要就是合理地設(shè)置 Broker 端參數(shù)值,以匹配你的生產(chǎn)環(huán)境。另外還有一點(diǎn)要說(shuō)明的就是「保證服務(wù)器端和客戶端版本的一致」,做到這一點(diǎn),就能獲得很多性能收益了。
num.network.threads
創(chuàng)建 Processor 處理網(wǎng)絡(luò)請(qǐng)求線程個(gè)數(shù),建議設(shè)置為 Broker 當(dāng)前CPU核心數(shù)*2,這個(gè)值太低經(jīng)常出現(xiàn)網(wǎng)絡(luò)空閑太低而缺失副本。
num.io.threads
創(chuàng)建 KafkaRequestHandler 處理具體請(qǐng)求線程個(gè)數(shù),建議設(shè)置為Broker磁盤個(gè)數(shù)*2。
num.replica.fetchers
建議設(shè)置為CPU核心數(shù)/4,適當(dāng)提高可以提升CPU利用率及 Follower同步 Leader 數(shù)據(jù)當(dāng)并行度。
compression.type
建議采用lz4壓縮類型,壓縮可以提升CPU利用率同時(shí)可以減少網(wǎng)絡(luò)傳輸數(shù)據(jù)量。
queued.max.requests
在網(wǎng)絡(luò)線程停止讀取新請(qǐng)求之前,可以排隊(duì)等待I/O線程處理的最大請(qǐng)求個(gè)數(shù),生產(chǎn)環(huán)境建議配置最少500以上,默認(rèn)500。
log.flush.xxx
- ?log.flush.scheduler.interval.ms
- log.flush.interval.ms
- log.flush.interval.messages?
這幾個(gè)參數(shù)表示日志數(shù)據(jù)刷新到磁盤的策略,應(yīng)該保持默認(rèn)配置,刷盤策略讓操作系統(tǒng)去完成,由操作系統(tǒng)來(lái)決定什么時(shí)候把數(shù)據(jù)刷盤;如果設(shè)置來(lái)這個(gè)參數(shù),可能對(duì)吞吐量影響非常大。
auto.leader.rebalance.enable
表示是否開(kāi)啟leader自動(dòng)負(fù)載均衡,默認(rèn)true;我們應(yīng)該把這個(gè)參數(shù)設(shè)置為false,因?yàn)樽詣?dòng)負(fù)載均衡不可控,可能影響集群性能和穩(wěn)定。
八、總結(jié)
這里,我們一起來(lái)總結(jié)一下這篇文章的重點(diǎn)。
1、對(duì)于 Kafka 這樣一個(gè)優(yōu)秀的服務(wù)端系統(tǒng)架構(gòu)來(lái)說(shuō),應(yīng)該遵循高可用、高性能、高并發(fā) 3 大原則。
2、本文從最簡(jiǎn)單的網(wǎng)絡(luò)編程思路出發(fā)一步一步演進(jìn)到 Reactor 設(shè)計(jì)模式,假設(shè)我們就是 Kafka 架構(gòu)的設(shè)計(jì)者,我們?cè)撊绾卧O(shè)計(jì)其服務(wù)端網(wǎng)絡(luò)架構(gòu)。
3、通過(guò)本文的深度剖析,提升系統(tǒng)I/O性能的核心是基于「事件驅(qū)動(dòng)」模型實(shí)現(xiàn)。
4、在剖析完服務(wù)端網(wǎng)絡(luò)架構(gòu)后,我們也深度剖析了 SocketServer中兩個(gè)最重要的線程:Acceptor 線程和 Processor 線程。
5、接著我們結(jié)合 Kafka 超高并發(fā)網(wǎng)絡(luò)架構(gòu)圖又梳理了 Kafka 請(qǐng)求處理核心流程。
6、最后給大家分析并做了 Broker 端系統(tǒng)調(diào)優(yōu)的方案。