面試官:你對(duì)Kafka比較熟? 那說說kafka日志段如何讀寫的吧?
之所以寫這篇文章是因?yàn)橹懊嬖嚂r(shí)候被面試官問到(倒)了,面試官說:“你說你對(duì)Kafka比較熟?看過源碼? 那說說kafka日志段如何讀寫的吧?”
我心里默默的說了句 “擦…我說看過一點(diǎn)點(diǎn)源碼,不是億點(diǎn)點(diǎn)。早知道不提這句了!”,那怎么辦呢,只能回家等通知了啊。
但是為了以后找回場子,咱也不能坐以待斃,日拱一卒從一點(diǎn)點(diǎn)到億點(diǎn)點(diǎn)。今天我們就來看看源碼層面來Kafka日志段的是如何讀寫的。
Kafka的存儲(chǔ)結(jié)構(gòu)
總所周知,Kafka的Topic可以有多個(gè)分區(qū),分區(qū)其實(shí)就是最小的讀取和存儲(chǔ)結(jié)構(gòu),即Consumer看似訂閱的是Topic,實(shí)則是從Topic下的某個(gè)分區(qū)獲得消息,Producer也是發(fā)送消息也是如此。
topic-partition關(guān)系
上圖是總體邏輯上的關(guān)系,映射到實(shí)際代碼中在磁盤上的關(guān)系則是如下圖所示:
每個(gè)分區(qū)對(duì)應(yīng)一個(gè)Log對(duì)象,在磁盤中就是一個(gè)子目錄,子目錄下面會(huì)有多組日志段即多Log Segment,每組日志段包含:消息日志文件(以log結(jié)尾)、位移索引文件(以index結(jié)尾)、時(shí)間戳索引文件(以timeindex結(jié)尾)。其實(shí)還有其它后綴的文件,例如.txnindex、.deleted等等。篇幅有限,暫不提起。
以下為日志的定義
以下為日志段的定義
indexIntervalBytes可以理解為插了多少消息之后再建一個(gè)索引,由此可以看出Kafka的索引其實(shí)是稀疏索引,這樣可以避免索引文件占用過多的內(nèi)存,從而可以在內(nèi)存中保存更多的索引。對(duì)應(yīng)的就是Broker 端參數(shù)log.index.interval.bytes 值,默認(rèn)4KB。
實(shí)際的通過索引查找消息過程是先通過offset找到索引所在的文件,然后通過二分法找到離目標(biāo)最近的索引,再順序遍歷消息文件找到目標(biāo)文件。這波操作時(shí)間復(fù)雜度為O(log2n)+O(m),n是索引文件里索引的個(gè)數(shù),m為稀疏程度。
這就是空間和時(shí)間的互換,又經(jīng)過數(shù)據(jù)結(jié)構(gòu)與算法的平衡,妙啊!
再說下rollJitterMs,這其實(shí)是個(gè)擾動(dòng)值,對(duì)應(yīng)的參數(shù)是log.roll.jitter.ms,這其實(shí)就要說到日志段的切分了,log.segment.bytes,這個(gè)參數(shù)控制著日志段文件的大小,默認(rèn)是1G,即當(dāng)文件存儲(chǔ)超過1G之后就新起一個(gè)文件寫入。這是以大小為維度的,還有一個(gè)參數(shù)是log.segment.ms,以時(shí)間為維度切分。
那配置了這個(gè)參數(shù)之后如果有很多很多分區(qū),然后因?yàn)檫@個(gè)參數(shù)是全局的,因此同一時(shí)刻需要做很多文件的切分,這磁盤IO就頂不住了啊,因此需要設(shè)置個(gè)rollJitterMs,來岔開它們。
怎么樣有沒有聯(lián)想到redis緩存的過期時(shí)間?過期時(shí)間加個(gè)隨機(jī)數(shù),防止同一時(shí)刻大量緩存過期導(dǎo)致緩存擊穿數(shù)據(jù)庫??纯粗R(shí)都是通的啊!
日志段的寫入
1、判斷下當(dāng)前日志段是否為空,空的話記錄下時(shí)間,來作為之后日志段的切分依據(jù)
2、確保位移值合法,最終調(diào)用的是AbstractIndex.toRelative(..)方法,即使判斷offset是否小于0,是否大于int最大值。
3、append消息,實(shí)際上就是通過FileChannel將消息寫入,當(dāng)然只是寫入內(nèi)存中及頁緩存,是否刷盤看配置。
4、更新日志段最大時(shí)間戳和最大時(shí)間戳對(duì)應(yīng)的位移值。這個(gè)時(shí)間戳其實(shí)用來作為定期刪除日志的依據(jù)
5、更新索引項(xiàng),如果需要的話(bytesSinceLastIndexEntry > indexIntervalBytes)
最后再來個(gè)流程圖
消息寫入流程
日志段的讀取
1、根據(jù)第一條消息的offset,通過OffsetIndex找到對(duì)應(yīng)的消息所在的物理位置和大小。
2、獲取LogOffsetMetadata,元數(shù)據(jù)包含消息的offset、消息所在segment的起始o(jì)ffset和物理位置
3、判斷minOneMessage是否為true,若是則調(diào)整為必定返回一條消息大小,其實(shí)就是在單條消息大于maxSize的情況下得以返回,防止消費(fèi)者餓死
4、再計(jì)算最大的fetchSize,即(最大物理位移-此消息起始物理位移)和adjustedMaxSize的最小值(這波我不是很懂,因?yàn)橐陨弦徊ú僮鱝djustedMaxSize已經(jīng)最小為一條消息的大小了)
5、調(diào)用 FileRecords 的 slice 方法從指定位置讀取指定大小的消息集合,并且構(gòu)造FetchDataInfo返回
再來個(gè)流程圖:
消息讀取流程
小結(jié)
從哪里跌倒就從哪里爬起來對(duì)吧,這波操作下來咱也不怕下次遇到面試官問了。
區(qū)區(qū)源碼不過爾爾,哈哈哈哈(首先得要有氣勢(shì))
實(shí)際上這只是Kafka源碼的冰山一角,長路漫漫。雖說Kafka Broker都是由Scala寫的,不過語言不是問題,這不看下來也沒什么難點(diǎn),注釋也很豐富。遇到不知道的語法小查一下搞定。
所以強(qiáng)烈建議大家入手源碼,從源碼上理解。今天說的 append 和 read 是很核心的功能,但一看也并不復(fù)雜,所以不要被源碼這兩個(gè)字嚇到了。
看源碼可以讓我們深入的理解內(nèi)部的設(shè)計(jì)原理,精進(jìn)我們的代碼功力(經(jīng)常看著看著,我擦還能這么寫)。當(dāng)然還有系統(tǒng)架構(gòu)能力。
然后對(duì)我而言最重要的是可以裝逼了(哈哈哈)。
情景劇
老白正目不轉(zhuǎn)睛盯著監(jiān)控大屏,“為什么?為什么Kafka Broker物理磁盤 I/O 負(fù)載突然這么高?”。寥寥無幾的秀發(fā)矗立在老白的頭上,顯得如此的無助。
“是不是設(shè)置了 log.segment.ms參數(shù) ?試試 log.roll.jitter.ms吧”,老白抬頭間我已走出了辦公室,留下了一個(gè)偉岸的背影和一顆锃亮的光頭!
“我變禿了,也變強(qiáng)了”