Kafka 為什么這么快的七大秘訣,漲知識了
我們都知道 Kafka 是基于磁盤進行存儲的,但 Kafka 官方又稱其具有高性能、高吞吐、低延時的特點,其吞吐量動輒幾十上百萬。
在座的靚仔和靚女們是不是有點困惑了,一般認(rèn)為在磁盤上讀寫數(shù)據(jù)是會降低性能的,因為尋址會比較消耗時間。那 Kafka 又是怎么做到其吞吐量動輒幾十上百萬的呢?
一、Kafka Reactor I/O 網(wǎng)絡(luò)模型
Kafka Reactor I/O 網(wǎng)絡(luò)模型是一種非阻塞 I/O 模型,利用事件驅(qū)動機制來處理網(wǎng)絡(luò)請求。
該模型通過 Reactor 模式實現(xiàn),即一個或多個 I/O 多路復(fù)用器(如 Java 的 Selector)監(jiān)聽多個通道的事件,當(dāng)某個通道準(zhǔn)備好進行 I/O 操作時,觸發(fā)相應(yīng)的事件處理器進行處理。
這種模型在高并發(fā)場景下具有很高的效率,能夠同時處理大量的網(wǎng)絡(luò)連接請求,而不需要為每個連接創(chuàng)建一個線程,從而節(jié)省系統(tǒng)資源。
Reactor 線程模型如圖 2 所示。

圖 2
Reacotr 模型主要分為三個角色。
- Reactor:把 I/O 事件根據(jù)類型分配給分配給對應(yīng)的 Handler 處理。
- Acceptor:處理客戶端連接事件。
- Handler:處理讀寫等任務(wù)。
Kafka 基于 Reactor 模型架構(gòu)如圖 3 所示。

圖 3
Kafka 的網(wǎng)絡(luò)通信模型基于 NIO(New Input/Output)庫,通過 Reactor 模式實現(xiàn),具體包括以下幾個關(guān)鍵組件:
- SocketServer:管理所有的網(wǎng)絡(luò)連接,包括初始化 Acceptor 和 Processor 線程。
- Acceptor:監(jiān)聽客戶端的連接請求,并將其分配給 Processor 線程。Acceptor 使用 Java NIO 的 Selector 進行 I/O 多路復(fù)用,并注冊 OP_ACCEPT 事件來監(jiān)聽新的連接請求。每當(dāng)有新的連接到達時,Acceptor 會接受連接并創(chuàng)建一個 SocketChannel,然后將其分配給一個 Processor 線程進行處理。
- Processor:處理具體的 I/O 操作,包括讀取客戶端請求和寫入響應(yīng)數(shù)據(jù)。Processor 同樣使用 Selector 進行 I/O 多路復(fù)用,注冊 OP_READ 和 OP_WRITE 事件來處理讀寫操作。每個 Processor 線程都有一個獨立的 Selector,用于管理多個 SocketChannel。
- RequestChannel:充當(dāng) Processor 和請求處理線程之間的緩沖區(qū),存儲請求和響應(yīng)數(shù)據(jù)。Processor 將讀取的請求放入 RequestChannel 的請求隊列,而請求處理線程則從該隊列中取出請求進行處理。
- KafkaRequestHandler:請求處理線程,從 RequestChannel 中讀取請求,調(diào)用 KafkaApis 進行業(yè)務(wù)邏輯處理,并將響應(yīng)放回 RequestChannel 的響應(yīng)隊列。KafkaRequestHandler 線程池中的線程數(shù)量由配置參數(shù) num.io.threads 決定。

圖 4
Chaya:該模型和如何提高 kafka 的性能和效率?
高并發(fā)處理能力:通過 I/O 多路復(fù)用機制,Kafka 能夠同時處理大量的網(wǎng)絡(luò)連接請求,而不需要為每個連接創(chuàng)建一個線程,從而節(jié)省了系統(tǒng)資源。
低延遲:非阻塞 I/O 操作避免了線程的阻塞等待,使得 I/O 操作能夠更快地完成,從而降低了系統(tǒng)的響應(yīng)延遲。
資源節(jié)省:通過減少線程的數(shù)量和上下文切換,Kafka 在處理高并發(fā)請求時能夠更有效地利用 CPU 和內(nèi)存資源。
擴展性強:Reactor 模式的分層設(shè)計使得 Kafka 的網(wǎng)絡(luò)模塊具有很好的擴展性,可以根據(jù)需要增加更多的 I/O 線程或調(diào)整事件處理器的邏輯。
二、零拷貝技術(shù)的運用
零拷貝技術(shù)是一種計算機操作系統(tǒng)技術(shù),用于在內(nèi)存和存儲設(shè)備之間進行數(shù)據(jù)傳輸時,避免 CPU 的參與,從而減少 CPU 的負(fù)擔(dān)并提高數(shù)據(jù)傳輸效率。
Kafka 使用零拷貝技術(shù)來優(yōu)化數(shù)據(jù)傳輸,特別是在生產(chǎn)者將數(shù)據(jù)寫入 Kafka 和消費者從 Kafka 讀取數(shù)據(jù)的過程中。在 Kafka 中,零拷貝主要通過以下幾種方式實現(xiàn):
- sendfile() 系統(tǒng)調(diào)用:在發(fā)送數(shù)據(jù)時,Kafka 使用操作系統(tǒng)的 sendfile() 系統(tǒng)調(diào)用直接將文件從磁盤發(fā)送到網(wǎng)絡(luò)套接字,而無需將數(shù)據(jù)復(fù)制到應(yīng)用程序的用戶空間。這減少了數(shù)據(jù)復(fù)制次數(shù),提高了傳輸效率。
- 文件內(nèi)存映射(Memory-Mapped Files):Kafka 使用文件內(nèi)存映射技術(shù)(mmap),將磁盤上的日志文件映射到內(nèi)存中,使得讀寫操作可以在內(nèi)存中直接進行,無需進行額外的數(shù)據(jù)復(fù)制。
比如 Broker 讀取磁盤數(shù)據(jù)并把數(shù)據(jù)發(fā)送給 Consumer 的過程,傳統(tǒng) I/O 經(jīng)歷以下步驟。
- 讀取數(shù)據(jù):通過read 系統(tǒng)調(diào)用將磁盤數(shù)據(jù)通過 DMA copy 到內(nèi)核空間緩沖區(qū)(Read buffer)。
- 拷貝數(shù)據(jù):將數(shù)據(jù)從內(nèi)核空間緩沖區(qū)(Read buffer) 通過 CPU copy 到用戶空間緩沖區(qū)(Application buffer)。
- 寫入數(shù)據(jù):通過write()系統(tǒng)調(diào)用將數(shù)據(jù)從用戶空間緩沖區(qū)(Application) CPU copy 到內(nèi)核空間的網(wǎng)絡(luò)緩沖區(qū)(Socket buffer)。
- 發(fā)送數(shù)據(jù):將內(nèi)核空間的網(wǎng)絡(luò)緩沖區(qū)(Socket buffer)DMA copy 到網(wǎng)卡目標(biāo)端口,通過網(wǎng)卡將數(shù)據(jù)發(fā)送到目標(biāo)主機。
這一過程經(jīng)過的四次 copy 如圖 5 所示。

圖 5
Chaya:零拷貝技術(shù)如何提高 Kakfa 的性能?
零拷貝技術(shù)通過減少 CPU 負(fù)擔(dān)和內(nèi)存帶寬消耗,提高了 Kakfa 性能。
- 降低 CPU 使用率:由于數(shù)據(jù)不需要在內(nèi)核空間和用戶空間之間多次復(fù)制,CPU 的參與減少,從而降低了 CPU 使用率,騰出更多的 CPU 資源用于其他任務(wù)。
- 提高數(shù)據(jù)傳輸速度:直接從磁盤到網(wǎng)絡(luò)的傳輸路徑減少了中間步驟,使得數(shù)據(jù)傳輸更加高效,延遲更低。
- 減少內(nèi)存帶寬消耗:通過減少數(shù)據(jù)在內(nèi)存中的復(fù)制次數(shù),降低了內(nèi)存帶寬的消耗,使得系統(tǒng)能夠處理更多的并發(fā)請求。
三、Partition 并發(fā)和分區(qū)負(fù)載均衡
在說 Topic patition 分區(qū)并發(fā)之前,我們先了解下 kafka 架構(gòu)設(shè)計。
1.Kafka 架構(gòu)
一個典型的 Kafka 架構(gòu)包含以下幾個重要組件,如圖 6 所示。

圖 6
- Producer(生產(chǎn)者):發(fā)送消息的一方,負(fù)責(zé)發(fā)布消息到 Kafka 主題(Topic)。
- Consumer(消費者):接受消息的一方,訂閱主題并處理消息。Kafka 有ConsumerGroup 的概念,每個Consumer 只能消費所分配到的 Partition 的消息,每一個Partition只能被一個ConsumerGroup 中的一個Consumer 所消費,所以同一個ConsumerGroup 中Consumer 的數(shù)量如果超過了Partiton 的數(shù)量,將會出現(xiàn)有些Consumer 分配不到 partition 消費。
- Broker(代理):服務(wù)代理節(jié)點,Kafka 集群中的一臺服務(wù)器就是一個 broker,可以水平無限擴展,同一個 Topic 的消息可以分布在多個 broker 中。
- Topic(主題)與 Partition(分區(qū)) :Kafka 中的消息以 Topic 為單位進行劃分,生產(chǎn)者將消息發(fā)送到特定的 Topic,而消費者負(fù)責(zé)訂閱 Topic 的消息并進行消費。圖中 TopicA 有三個 Partiton(TopicA-par0、TopicA-par1、TopicA-par2)
為了提升整個集群的吞吐量,Topic 在物理上還可以細(xì)分多個Partition,一個 Partition 在磁盤上對應(yīng)一個文件夾。 - Replica(副本):副本,是 Kafka 保證數(shù)據(jù)高可用的方式,Kafka 同一 Partition 的數(shù)據(jù)可以在多 Broker 上存在多個副本,通常只有 leader 副本對外提供讀寫服務(wù),當(dāng) leader副本所在 broker 崩潰或發(fā)生網(wǎng)絡(luò)一場,Kafka 會在 Controller 的管理下會重新選擇新的 Leader 副本對外提供讀寫服務(wù)。
- ZooKeeper:管理 Kafka 集群的元數(shù)據(jù)和分布式協(xié)調(diào)。
2.Topic 主題
Topic 是 Kafka 中數(shù)據(jù)的邏輯分類單元,可以理解成一個隊列。Broker 是所有隊列部署的機器,Producer 將消息發(fā)送到特定的 Topic,而 Consumer 則從特定的 Topic 中消費消息。

3.Partition
為了提高并行處理能力和擴展性,Kafka 將一個 Topic 分為多個 Partition。每個 Partition 是一個有序的消息隊列,消息在 Partition 內(nèi)部是有序的,但在不同的 Partition 之間沒有順序保證。
Producer 可以并行地將消息發(fā)送到不同的 Partition,Consumer 也可以并行地消費不同的 Partition,從而提升整體處理能力。

因此,可以說,每增加一個 Paritition 就增加了一個消費并發(fā)。Partition的引入不僅提高了系統(tǒng)的可擴展性,還使得數(shù)據(jù)處理更加靈活。
4.Partition 分區(qū)策略
碼樓:“生產(chǎn)者將消息發(fā)送到哪個分區(qū)是如何實現(xiàn)的?不合理的分配會導(dǎo)致消息集中在某些 Broker 上,豈不是完犢子。”
主要有以下幾種分區(qū)策略:
- 輪詢策略:也稱Round-robin策略,即順序分配。
- 隨機策略:也稱Randomness策略。所謂隨機就是我們隨意地將消息放置到任意一個分區(qū)上。
- 按消息鍵保序策略。
- 基于地理位置分區(qū)策略。
輪詢策略
比如一個 Topic 下有 3個分區(qū),那么第一條消息被發(fā)送到分區(qū)0,第二條被發(fā)送到分區(qū)1,第三條被發(fā)送到分區(qū)2,以此類推。
當(dāng)生產(chǎn)第4條消息時又會重新開始,即將其分配到分區(qū)0,如圖 5 所示。

輪詢策略有非常優(yōu)秀的負(fù)載均衡表現(xiàn),它總是能保證消息最大限度地被平均分配到所有分區(qū)上,故默認(rèn)情況下它是最合理的分區(qū)策略,也是我們最常用的分區(qū)策略之一。
隨機策略
所謂隨機就是我們隨意地將消息放置到任意一個分區(qū)上。如圖所示,9 條消息隨機分配到不同分區(qū)。

按消息鍵分配策略
一旦消息被定義了 Key,那么你就可以保證同一個 Key 的所有消息都進入到相同的分區(qū)里面,比如訂單 ID,那么綁定同一個 訂單 ID 的消息都會發(fā)布到同一個分區(qū),由于每個分區(qū)下的消息處理都是有順序的,故這個策略被稱為按消息鍵保序策略,如圖所示。

基于地理位置
這種策略一般只針對那些大規(guī)模的 Kafka 集群,特別是跨城市、跨國家甚至是跨大洲的集群。
我們就可以根據(jù) Broker 所在的 IP 地址實現(xiàn)定制化的分區(qū)策略。比如下面這段代碼:
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return partitions.stream()
.filter(p -> isSouth(p.leader().host()))
.map(PartitionInfo::partition)
.findAny()
.get();我們可以從所有分區(qū)中找出那些Leader副本在南方的所有分區(qū),然后隨機挑選一個進行消息發(fā)送。
四、Segment 日志文件和稀疏索引
前面已經(jīng)介紹過,Kafka 的 Topic 可以分為多個 Partition,每個 Partition 有多個副本,你可以理解為副本才是存儲消息的物理存在。其實每個副本都是以日志(Log)的形式存儲。
碼樓:“日志文件過大怎么辦?”
為了解決單一日志文件過大的問題,kafka采用了分段(Segment)的形式進行存儲。
所謂 Segment,就是當(dāng)一個日志文件大小到達一定條件之后,就新建一個新的 Segment,然后在新的Segment寫入數(shù)據(jù)。Topic、Partition、和日志的關(guān)系如圖 8 所示。

圖 8
一個 segment 對應(yīng)磁盤上多個文件。
.index: 消息的 offset 索引文件。.timeindex: 消息的時間索引文件(0.8版本加入的)。.log: 存儲實際的消息數(shù)據(jù)。.snapshot: 記錄了 producer 的事務(wù)信息。.swap: 用于 Segment 恢復(fù)。.txnindex文件,記錄了中斷的事務(wù)信息。
.log 文件存儲實際的 message,kafka為每一個日志文件添加了2 個索引文件 .index以及 .timeindex。
segment 文件命名規(guī)則:partition 第一個 segment 從 0 開始,后續(xù)每個 segment 文件名為上一個 segment 文件最后一條消息的 offset 值。數(shù)值最大為 64 位 long 大小,19 位數(shù)字字符長度,沒有數(shù)字用 0 填充。
碼樓:“為什么要有 .index 文件?”
為了提高查找消息的性能。kafka 為消息數(shù)據(jù)建了兩種稀疏索引,一種是方便 offset 查找的 .index 稀疏索引,還有一種是方便時間查找的 .timeindex 稀疏索引。
1.稀疏索引
Chaya:“為什么不創(chuàng)建一個哈希索引,從 offset 到物理消息日志文件偏移量的映射關(guān)系?”
萬萬不可,Kafka 作為海量數(shù)據(jù)處理的中間件,每秒高達幾百萬的消息寫入,這個哈希索引會把把內(nèi)存撐爆炸。
稀疏索引不會為每個記錄都保存索引,而是寫入一定的記錄之后才會增加一個索引值,具體這個間隔有多大則通過 log.index.interval.bytes 參數(shù)進行控制,默認(rèn)大小為 4 KB,意味著 Kafka 至少寫入 4KB 消息數(shù)據(jù)之后,才會在索引文件中增加一個索引項。
哈希稀疏索引把消息劃分為多個 block ,只索引每個 block 第一條消息的 offset 即可 。

- Offset 偏移量:表示第幾個消息。
- position:消息在磁盤的物理位置。
Chaya:如果消費者要查找 Offset 為 4 的消息,查找過程是怎樣的?
- 首先用二分法定位消息在哪個 Segment ,Segment 文件命名是 Partition 第一個 segment 從 0 開始,后續(xù)每個 segment 文件名為上一個 segment 文件最后一條消息的 offset 值。
- 打開這個 Segment 對應(yīng)的 index 索引文件,用二分法查找 offset 不大于 4 的索引條目,對應(yīng)上圖第二條條目,也就是 offset = 3 的那個索引。通過索引我們可以知道 offset 為 4 的消息所在的日志文件磁盤物理位置為 495。
- 打開日志文件,從 Position 為 495 位置開始開始順序掃描文件,將掃描過程中每條消息的 offset 與 4 比較,直到找到 offset 為 4 的那條Message。

.timeindex 文件同理,只不過它的查找結(jié)果是 offset,之后還要在走一遍 .index 索引查找流程。
由于 kafka 設(shè)計為順序讀寫磁盤,因此遍歷區(qū)間的數(shù)據(jù)并對速度有太大的影響,而選擇稀疏索引還能節(jié)約大量的磁盤空間。
2.mmap
有了稀疏索引,當(dāng)給定一個 offset 時,Kafka 采用的是二分查找來掃描索引定位不大于 offset 的物理位移 position,再到日志文件找到目標(biāo)消息。
利用稀疏索引,已經(jīng)基本解決了高效查詢的問題,但是這個過程中仍然有進一步的優(yōu)化空間,那便是通過 mmap(memory mapped files) 讀寫上面提到的稀疏索引文件,進一步提高查詢消息的速度。
就是基于 JDK nio 包下的 MappedByteBuffer 的 map 函數(shù),將磁盤文件映射到內(nèi)存中。
進程通過調(diào)用mmap系統(tǒng)函數(shù),將文件或物理內(nèi)存的一部分映射到其虛擬地址空間。這個過程中,操作系統(tǒng)會為映射的內(nèi)存區(qū)域分配一個虛擬地址,并將這個地址與文件或物理內(nèi)存的實際內(nèi)容關(guān)聯(lián)起來。
一旦內(nèi)存映射完成,進程就可以通過指針直接訪問映射的內(nèi)存區(qū)域。這種訪問方式就像訪問普通內(nèi)存一樣簡單和高效。

圖引自《碼農(nóng)的荒島求生》
五、順序讀寫磁盤
碼樓:“不管如何,Kafka 讀寫消息都要讀寫磁盤,如何變快呢?”
磁盤就一定很慢么?人們普遍錯誤地認(rèn)為硬盤很慢。然而,存儲介質(zhì)的性能,很大程度上依賴于數(shù)據(jù)被訪問的模式。
同樣在一塊普通的7200 RPM SATA硬盤上,隨機I/O(random I/O)與順序I/O相比,隨機I/O的性能要比順序I/O慢3到4個數(shù)量級。
合理的方式可以讓磁盤寫操作更加高效,減少了尋道時間和旋轉(zhuǎn)延遲。
碼樓,你還留著課本嗎?來,翻到講磁盤的章節(jié),讓我們回顧一下磁盤的運行原理。
碼樓:“鬼還留著哦,課程還沒上到一半書就沒了。要不是考試俺眼神好,就掛科了?!?/p>
磁盤的運行原理如圖所示。

硬盤在邏輯上被劃分為磁道、柱面以及扇區(qū)。硬盤的每個盤片的每個面都有一個讀寫磁頭。
完成一次磁盤 I/O ,需要經(jīng)過尋道、旋轉(zhuǎn)和數(shù)據(jù)傳輸三個步驟。
- 尋道:首先必須找到柱面,即磁頭需要移動到相應(yīng)磁道,這個過程叫做尋道,所耗費時間叫做尋道時間。尋道時間越短,I/O 操作越快,目前磁盤的平均尋道時間一般在 3-15ms。
- 旋轉(zhuǎn):磁盤旋轉(zhuǎn)將目標(biāo)扇區(qū)旋轉(zhuǎn)到磁頭下。這個過程耗費的時間叫做旋轉(zhuǎn)時間。旋轉(zhuǎn)延遲取決于磁盤轉(zhuǎn)速,通常用磁盤旋轉(zhuǎn)一周所需時間的 1/2 表示。比如:7200rpm 的磁盤平均旋轉(zhuǎn)延遲大約為 60*1000/7200/2 = 4.17ms,而轉(zhuǎn)速為 15000rpm 的磁盤其平均旋轉(zhuǎn)延遲為 2ms。
- 數(shù)據(jù)傳輸:數(shù)據(jù)在磁盤與內(nèi)存之間的實際傳輸。
因此,如果在寫磁盤的時候省去尋道、旋轉(zhuǎn)可以極大地提高磁盤讀寫的性能。
Kafka 采用順序?qū)懳募姆绞絹硖岣叽疟P寫入性能。順序?qū)懳募?,順?I/O 的時候,磁頭幾乎不用換道,或者換道的時間很短。減少了磁盤尋道和旋轉(zhuǎn)的次數(shù)。磁頭再也不用在磁道上亂舞了,而是一路向前飛速前行。
Kafka 中每個Partition 是一個有序的,不可變的消息序列,新的消息可以不斷追加到 Partition 的末尾,在 Kafka 中 Partition 只是一個邏輯概念,每個Partition 劃分為多個 Segment,每個 Segment 對應(yīng)一個物理文件,Kafka 對 Segment 文件追加寫,這就是順序?qū)懳募?/p>
每條消息在發(fā)送前會根據(jù)負(fù)載均衡策略計算出要發(fā)往的目標(biāo) Partition 中,broker 收到消息之后把該條消息按照追加的方式順序?qū)懭?Partition 的日志文件中。

如下圖所示,可以看到磁盤順序?qū)懙男阅苓h(yuǎn)高于磁盤隨機寫,甚至比內(nèi)存隨機寫還快。

六、PageCache
Chaya:“碼哥,使用稀疏索引和 mmap 內(nèi)存映射技術(shù)提高讀消息的性能;Topic Partition 加磁盤順序?qū)懗志没⒌脑O(shè)計已經(jīng)很快了,但是與內(nèi)存順序?qū)戇€是慢了,還有優(yōu)化空間么?”
小姑娘,你的想法很好,作為快到令人發(fā)指的 Kafka,確實想到了一個方式來提高讀寫寫磁盤文件的性能。這就是接下來的主角 Page Cache 。
簡而言之:利用操作系統(tǒng)的緩存技術(shù),在讀寫磁盤日志文件時,操作的是內(nèi)存,而不是文件,由操作系統(tǒng)決定什么在某個時間將 Page Cache 的數(shù)據(jù)刷寫到磁盤中。

- Producer 發(fā)送消息到 Broker 時,Broker 會使用
pwrite()系統(tǒng)調(diào)用寫入數(shù)據(jù),此時數(shù)據(jù)都會先寫入page cache。 - Consumer 消費消息時,Broker 使用
sendfile()系統(tǒng)調(diào)用函數(shù),通零拷貝技術(shù)地將 Page Cache 中的數(shù)據(jù)傳輸?shù)?Broker 的 Socket buffer,再通過網(wǎng)絡(luò)傳輸?shù)?Consumer。 - leader 與 follower 之間的同步,與上面 consumer 消費數(shù)據(jù)的過程是同理的。
Kafka重度依賴底層操作系統(tǒng)提供的PageCache功能。當(dāng)上層有寫操作時,操作系統(tǒng)只是將數(shù)據(jù)寫入PageCache,同時標(biāo)記Page屬性為Dirty。
當(dāng)讀操作發(fā)生時,先從PageCache中查找,如果發(fā)生缺頁才進行磁盤調(diào)度,最終返回需要的數(shù)據(jù)。

于是我們得到一個重要結(jié)論:如果Kafka producer的生產(chǎn)速率與consumer的消費速率相差不大,那么就能幾乎只靠對broker page cache的讀寫完成整個生產(chǎn)-消費過程,磁盤訪問非常少。
實際上PageCache是把盡可能多的空閑內(nèi)存都當(dāng)做了磁盤緩存來使用。
七、數(shù)據(jù)壓縮和批量處理
數(shù)據(jù)壓縮在 Kafka 中有助于減少磁盤空間的使用和網(wǎng)絡(luò)帶寬的消耗,從而提升整體性能。
通過減少消息的大小,壓縮可以顯著降低生產(chǎn)者和消費者之間的數(shù)據(jù)傳輸時間。
Chaya:Kafka 支持的壓縮算法有哪些?
在Kafka 2.1.0版本之前,Kafka支持3種壓縮算法:GZIP、Snappy和LZ4。從2.1.0開始,Kafka正式支持Zstandard算法(簡寫為zstd)。
Chaya:這么多壓縮算法,我如何選擇?
一個壓縮算法的優(yōu)劣,有兩個重要的指標(biāo):壓縮比,文件壓縮前的大小與壓縮后的大小之比,比如源文件占用 1000 M 內(nèi)存,經(jīng)過壓縮后變成了 200 M,壓縮比 = 1000 /200 = 5,壓縮比越高越高;另一個指標(biāo)是壓縮/解壓縮吞吐量,比如每秒能壓縮或者解壓縮多少 M 數(shù)據(jù),吞吐量越高越好。
1.生產(chǎn)者壓縮
Kafka 的數(shù)據(jù)壓縮主要在生產(chǎn)者端進行。具體步驟如下:
- 生產(chǎn)者配置壓縮方式:在 KafkaProducer 配置中設(shè)置
compression.type參數(shù),可以選擇gzip、snappy、lz4或zstd。 - 消息壓縮:生產(chǎn)者將消息批量收集到一個
batch中,然后對整個batch進行壓縮。這種批量壓縮方式可以獲得更高的壓縮率。 - 壓縮消息存儲:壓縮后的
batch以壓縮格式存儲在 Kafka 的主題(Topic)分區(qū)中。 - 消費者解壓縮:消費者從 Kafka 主題中獲取消息時,首先對接收到的
batch進行解壓縮,然后處理其中的每一條消息。
2.解壓縮
有壓縮,那必有解壓縮。通常情況下,Producer 發(fā)送壓縮后的消息到 Broker ,原樣保存起來。
Consumer 消費這些消息的時候,Broker 原樣發(fā)給 Consumer,由 Consumer 執(zhí)行解壓縮還原出原本的信息。
Chaya:Consumer 咋知道用什么壓縮算法解壓縮?
Kafka會將啟用了哪種壓縮算法封裝進消息集合中,這樣當(dāng)Consumer讀取到消息集合時,它自然就知道了這些消息使用的是哪種壓縮算法。
總之一句話:Producer端壓縮、Broker端保持、Consumer端解壓縮。
3.批量數(shù)據(jù)處理
Kafka Producer 向 Broker 發(fā)送消息不是一條消息一條消息的發(fā)送,將多條消息打包成一個批次發(fā)送。
批量數(shù)據(jù)處理可以顯著提高 Kafka 的吞吐量并減少網(wǎng)絡(luò)開銷。
Kafka Producer 的執(zhí)行流程如下圖所示:

發(fā)送消息依次經(jīng)過以下處理器:
- Serialize:鍵和值都根據(jù)傳遞的序列化器進行序列化。優(yōu)秀的序列化方式可以提高網(wǎng)絡(luò)傳輸?shù)男省?/li>
- Partition:決定將消息寫入主題的哪個分區(qū),默認(rèn)情況下遵循 murmur2 算法。自定義分區(qū)程序也可以傳遞給生產(chǎn)者,以控制應(yīng)將消息寫入哪個分區(qū)。
- Compression:默認(rèn)情況下,在 Kafka 生產(chǎn)者中不啟用壓縮。Compression 不僅可以更快地從生產(chǎn)者傳輸?shù)酱?,還可以在復(fù)制過程中進行更快的傳輸。壓縮有助于提高吞吐量,降低延遲并提高磁盤利用率。
- Record Accumulator:
Accumulate顧名思義,就是一個消息累計器。其內(nèi)部為每個 Partition 維護一個Deque雙端隊列,隊列保存將要發(fā)送的 Batch批次數(shù)據(jù),Accumulate將數(shù)據(jù)累計到一定數(shù)量,或者在一定過期時間內(nèi),便將數(shù)據(jù)以批次的方式發(fā)送出去。記錄被累積在主題每個分區(qū)的緩沖區(qū)中。根據(jù)生產(chǎn)者批次大小屬性將記錄分組。主題中的每個分區(qū)都有一個單獨的累加器 / 緩沖區(qū)。 - Group Send:記錄累積器中分區(qū)的批次按將它們發(fā)送到的代理分組。批處理中的記錄基于
batch.size和linger.ms屬性發(fā)送到代理。記錄由生產(chǎn)者根據(jù)兩個條件發(fā)送。當(dāng)達到定義的批次大小或達到定義的延遲時間時。 - Send Thread:發(fā)送線程,從 Accumulator 的隊列取出待發(fā)送的 Batch 批次消息發(fā)送到 Broker。
- Broker 端處理:Kafka Broker 接收到
batch后,將其存儲在對應(yīng)的主題分區(qū)中。 - 消費者端的批量消費:消費者可以配置一次拉取多條消息的數(shù)量,通過
fetch.min.bytes和fetch.max.wait.ms參數(shù)控制批量大小和等待時間。
八、無鎖輕量級 offset
Offset 是 Kafka 中的一個重要概念,用于標(biāo)識消息在分區(qū)中的位置。
每個分區(qū)中的消息都有一個唯一的 offset,消費者通過維護自己的 offset 來確保準(zhǔn)確消費消息。offset 的高效管理對于 Kafka 的性能至關(guān)重要。

offset 是從 0 開始的,每當(dāng)有新的消息寫入分區(qū)時,offset 就會加 1。offset 是不可變的,即使消息被刪除或過期,offset 也不會改變或重用。
Consumer需要向Kafka匯報自己的位移數(shù)據(jù),這個匯報過程被稱為提交位移(Committing Offsets)。因為Consumer能夠同時消費多個partition的數(shù)據(jù),所以位移的提交實際上是在partition粒度上進行的,即Consumer需要為分配給它的每個partition提交各自的位移數(shù)據(jù)。
提交位移主要是為了表征Consumer的消費進度,這樣當(dāng)Consumer發(fā)生故障重啟之后,就能夠從Kafka中讀取之前提交的位移值,然后從相應(yīng)的位移處繼續(xù)消費。
在傳統(tǒng)的消息隊列系統(tǒng)中,offset 通常需要通過鎖機制來保證一致性,但這會帶來性能瓶頸。Kafka 的設(shè)計哲學(xué)是盡量減少鎖的使用,以提升并發(fā)處理能力和整體性能。
1.無鎖設(shè)計思想
Kafka 在 offset 設(shè)計中采用了一系列無鎖的技術(shù),使其能夠在高并發(fā)的環(huán)境中保持高效。
- 順序?qū)懭?/strong>:Kafka 使用順序?qū)懭氲姆绞綄⑾⒆芳拥饺罩疚募哪┪玻苊饬宋募恢玫念l繁變動,從而減少了鎖的使用。
- MMAP 內(nèi)存映射文件:Kafka 使用內(nèi)存映射文件(Memory Mapped File)來訪問日志數(shù)據(jù)和索引文件。這種方式使得文件數(shù)據(jù)可以直接映射到進程的虛擬地址空間中,從而減少了系統(tǒng)調(diào)用的開銷,提高了數(shù)據(jù)訪問的效率。
- 零拷貝:Kafka 使用零拷貝(Zero Copy)技術(shù),將數(shù)據(jù)從磁盤直接傳輸?shù)骄W(wǎng)絡(luò),繞過了用戶態(tài)的復(fù)制過程,大大提高了數(shù)據(jù)傳輸?shù)男省?/li>
- 批量處理:Kafka 支持批量處理消息,在一個批次中同時處理多個消息,減少了網(wǎng)絡(luò)和 I/O 的開銷。
2.消費者 Offset 管理流程
graph TD;
A[啟動消費者] --> B[從分區(qū)讀取消息];
B --> C[處理消息];
C --> D{是否成功處理?};
D --> |是| E[更新 Offset];
D --> |否| F[記錄失敗, 重新處理];
E --> G[提交 Offset];
G --> H[繼續(xù)處理下一個消息];
F --> B;
H --> B;- 啟動消費者:消費者啟動并訂閱 Kafka 主題的某個分區(qū)。
- 從分區(qū)讀取消息:消費者從指定分區(qū)中讀取消息。
- 處理消息:消費者處理讀取到的消息。
- 是否成功處理:判斷消息是否成功處理。
- 如果成功處理,更新 Offset。
- 如果處理失敗,記錄失敗原因并準(zhǔn)備重新處理。
- 更新 Offset:成功處理消息后,更新 Offset 以記錄已處理消息的位置。
- 提交 Offset:將更新后的 Offset 提交到 Kafka,以確保消息處理進度的持久化。
- 繼續(xù)處理下一個消息:提交 Offset 后,繼續(xù)讀取并處理下一個消息。
Kafka 通過無鎖輕量級 offset 的設(shè)計,實現(xiàn)了高性能、高吞吐和低延時的目標(biāo)。
九、總結(jié)
Kafka 通過無鎖輕量級 offset 的設(shè)計,實現(xiàn)了高性能、高吞吐和低延時的目標(biāo)。
其 Reactor I/O 網(wǎng)絡(luò)模型、磁盤順序?qū)懭?、?nèi)存映射文件、零拷貝、數(shù)據(jù)壓縮和批量處理等技術(shù),為 Kafka 提供了強大的數(shù)據(jù)處理能力和高效的消息隊列服務(wù)。
- Reactor I/O 網(wǎng)絡(luò)模型:通過 I/O 多路復(fù)用機制,Kafka 能夠同時處理大量的網(wǎng)絡(luò)連接請求,而不需要為每個連接創(chuàng)建一個線程,從而節(jié)省了系統(tǒng)資源。
- 順序?qū)懭?/strong>:Kafka 使用順序?qū)懭氲姆绞綄⑾⒆芳拥饺罩疚募哪┪?,避免了文件位置的頻繁變動,從而減少了鎖的使用。
- MMAP 內(nèi)存映射文件:Kafka 使用內(nèi)存映射文件(Memory Mapped File)來訪問日志數(shù)據(jù)和索引文件。這種方式使得文件數(shù)據(jù)可以直接映射到進程的虛擬地址空間中,從而減少了系統(tǒng)調(diào)用的開銷,提高了數(shù)據(jù)訪問的效率。
- 零拷貝:Kafka 使用零拷貝(Zero Copy)技術(shù),將數(shù)據(jù)從磁盤直接傳輸?shù)骄W(wǎng)絡(luò),繞過了用戶態(tài)的復(fù)制過程,大大提高了數(shù)據(jù)傳輸?shù)男省?/li>
- 數(shù)據(jù)壓縮和批量處理:數(shù)據(jù)壓縮在 Kafka 中有助于減少磁盤空間的使用和網(wǎng)絡(luò)帶寬的消耗,從而提升整體性能。;Kafka 支持批量處理消息,在一個批次中同時處理多個消息,減少了網(wǎng)絡(luò)和 I/O 的開銷。


























