偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

Kafka 為什么這么快的七大秘訣,漲知識了

云計算 Kafka
Kafka Reactor I/O 網(wǎng)絡(luò)模型是一種非阻塞 I/O 模型,利用事件驅(qū)動機制來處理網(wǎng)絡(luò)請求。該模型通過 Reactor 模式實現(xiàn),即一個或多個 I/O 多路復(fù)用器(如 Java 的 Selector)監(jiān)聽多個通道的事件,當(dāng)某個通道準(zhǔn)備好進行 I/O 操作時,觸發(fā)相應(yīng)的事件處理器進行處理。

我們都知道 Kafka 是基于磁盤進行存儲的,但 Kafka 官方又稱其具有高性能、高吞吐、低延時的特點,其吞吐量動輒幾十上百萬。

在座的靚仔和靚女們是不是有點困惑了,一般認(rèn)為在磁盤上讀寫數(shù)據(jù)是會降低性能的,因為尋址會比較消耗時間。那 Kafka 又是怎么做到其吞吐量動輒幾十上百萬的呢?

一、Kafka Reactor I/O 網(wǎng)絡(luò)模型

Kafka Reactor I/O 網(wǎng)絡(luò)模型是一種非阻塞 I/O 模型,利用事件驅(qū)動機制來處理網(wǎng)絡(luò)請求。

該模型通過 Reactor 模式實現(xiàn),即一個或多個 I/O 多路復(fù)用器(如 Java 的 Selector)監(jiān)聽多個通道的事件,當(dāng)某個通道準(zhǔn)備好進行 I/O 操作時,觸發(fā)相應(yīng)的事件處理器進行處理。

這種模型在高并發(fā)場景下具有很高的效率,能夠同時處理大量的網(wǎng)絡(luò)連接請求,而不需要為每個連接創(chuàng)建一個線程,從而節(jié)省系統(tǒng)資源。

Reactor 線程模型如圖 2 所示。

圖 2

Reacotr 模型主要分為三個角色。

  • Reactor:把 I/O 事件根據(jù)類型分配給分配給對應(yīng)的 Handler 處理。
  • Acceptor:處理客戶端連接事件。
  • Handler:處理讀寫等任務(wù)。

Kafka 基于 Reactor 模型架構(gòu)如圖 3 所示。

圖 3

Kafka 的網(wǎng)絡(luò)通信模型基于 NIO(New Input/Output)庫,通過 Reactor 模式實現(xiàn),具體包括以下幾個關(guān)鍵組件:

  • SocketServer:管理所有的網(wǎng)絡(luò)連接,包括初始化 Acceptor 和 Processor 線程。
  • Acceptor:監(jiān)聽客戶端的連接請求,并將其分配給 Processor 線程。Acceptor 使用 Java NIO 的 Selector 進行 I/O 多路復(fù)用,并注冊 OP_ACCEPT 事件來監(jiān)聽新的連接請求。每當(dāng)有新的連接到達時,Acceptor 會接受連接并創(chuàng)建一個 SocketChannel,然后將其分配給一個 Processor 線程進行處理。
  • Processor:處理具體的 I/O 操作,包括讀取客戶端請求和寫入響應(yīng)數(shù)據(jù)。Processor 同樣使用 Selector 進行 I/O 多路復(fù)用,注冊 OP_READ 和 OP_WRITE 事件來處理讀寫操作。每個 Processor 線程都有一個獨立的 Selector,用于管理多個 SocketChannel。
  • RequestChannel:充當(dāng) Processor 和請求處理線程之間的緩沖區(qū),存儲請求和響應(yīng)數(shù)據(jù)。Processor 將讀取的請求放入 RequestChannel 的請求隊列,而請求處理線程則從該隊列中取出請求進行處理。
  • KafkaRequestHandler:請求處理線程,從 RequestChannel 中讀取請求,調(diào)用 KafkaApis 進行業(yè)務(wù)邏輯處理,并將響應(yīng)放回 RequestChannel 的響應(yīng)隊列。KafkaRequestHandler 線程池中的線程數(shù)量由配置參數(shù) num.io.threads 決定。

圖 4

Chaya:該模型和如何提高 kafka 的性能和效率?

高并發(fā)處理能力:通過 I/O 多路復(fù)用機制,Kafka 能夠同時處理大量的網(wǎng)絡(luò)連接請求,而不需要為每個連接創(chuàng)建一個線程,從而節(jié)省了系統(tǒng)資源。

低延遲:非阻塞 I/O 操作避免了線程的阻塞等待,使得 I/O 操作能夠更快地完成,從而降低了系統(tǒng)的響應(yīng)延遲。

資源節(jié)省:通過減少線程的數(shù)量和上下文切換,Kafka 在處理高并發(fā)請求時能夠更有效地利用 CPU 和內(nèi)存資源。

擴展性強:Reactor 模式的分層設(shè)計使得 Kafka 的網(wǎng)絡(luò)模塊具有很好的擴展性,可以根據(jù)需要增加更多的 I/O 線程或調(diào)整事件處理器的邏輯。

二、零拷貝技術(shù)的運用

零拷貝技術(shù)是一種計算機操作系統(tǒng)技術(shù),用于在內(nèi)存和存儲設(shè)備之間進行數(shù)據(jù)傳輸時,避免 CPU 的參與,從而減少 CPU 的負(fù)擔(dān)并提高數(shù)據(jù)傳輸效率。

Kafka 使用零拷貝技術(shù)來優(yōu)化數(shù)據(jù)傳輸,特別是在生產(chǎn)者將數(shù)據(jù)寫入 Kafka 和消費者從 Kafka 讀取數(shù)據(jù)的過程中。在 Kafka 中,零拷貝主要通過以下幾種方式實現(xiàn):

  • sendfile() 系統(tǒng)調(diào)用:在發(fā)送數(shù)據(jù)時,Kafka 使用操作系統(tǒng)的 sendfile() 系統(tǒng)調(diào)用直接將文件從磁盤發(fā)送到網(wǎng)絡(luò)套接字,而無需將數(shù)據(jù)復(fù)制到應(yīng)用程序的用戶空間。這減少了數(shù)據(jù)復(fù)制次數(shù),提高了傳輸效率。
  • 文件內(nèi)存映射(Memory-Mapped Files):Kafka 使用文件內(nèi)存映射技術(shù)(mmap),將磁盤上的日志文件映射到內(nèi)存中,使得讀寫操作可以在內(nèi)存中直接進行,無需進行額外的數(shù)據(jù)復(fù)制。

比如 Broker 讀取磁盤數(shù)據(jù)并把數(shù)據(jù)發(fā)送給 Consumer 的過程,傳統(tǒng) I/O 經(jīng)歷以下步驟。

  1. 讀取數(shù)據(jù):通過read 系統(tǒng)調(diào)用將磁盤數(shù)據(jù)通過 DMA copy 到內(nèi)核空間緩沖區(qū)(Read buffer)。
  2. 拷貝數(shù)據(jù):將數(shù)據(jù)從內(nèi)核空間緩沖區(qū)(Read buffer) 通過 CPU copy 到用戶空間緩沖區(qū)(Application buffer)。
  3. 寫入數(shù)據(jù):通過write()系統(tǒng)調(diào)用將數(shù)據(jù)從用戶空間緩沖區(qū)(Application) CPU copy 到內(nèi)核空間的網(wǎng)絡(luò)緩沖區(qū)(Socket buffer)。
  4. 發(fā)送數(shù)據(jù):將內(nèi)核空間的網(wǎng)絡(luò)緩沖區(qū)(Socket buffer)DMA copy 到網(wǎng)卡目標(biāo)端口,通過網(wǎng)卡將數(shù)據(jù)發(fā)送到目標(biāo)主機。

這一過程經(jīng)過的四次 copy 如圖 5 所示。

圖 5

Chaya:零拷貝技術(shù)如何提高 Kakfa 的性能?

零拷貝技術(shù)通過減少 CPU 負(fù)擔(dān)和內(nèi)存帶寬消耗,提高了 Kakfa 性能。

  • 降低 CPU 使用率:由于數(shù)據(jù)不需要在內(nèi)核空間和用戶空間之間多次復(fù)制,CPU 的參與減少,從而降低了 CPU 使用率,騰出更多的 CPU 資源用于其他任務(wù)。
  • 提高數(shù)據(jù)傳輸速度:直接從磁盤到網(wǎng)絡(luò)的傳輸路徑減少了中間步驟,使得數(shù)據(jù)傳輸更加高效,延遲更低。
  • 減少內(nèi)存帶寬消耗:通過減少數(shù)據(jù)在內(nèi)存中的復(fù)制次數(shù),降低了內(nèi)存帶寬的消耗,使得系統(tǒng)能夠處理更多的并發(fā)請求。

三、Partition 并發(fā)和分區(qū)負(fù)載均衡

在說 Topic patition 分區(qū)并發(fā)之前,我們先了解下 kafka 架構(gòu)設(shè)計。

1.Kafka 架構(gòu)

一個典型的 Kafka 架構(gòu)包含以下幾個重要組件,如圖 6 所示。

圖 6

  • Producer(生產(chǎn)者):發(fā)送消息的一方,負(fù)責(zé)發(fā)布消息到 Kafka 主題(Topic)。
  • Consumer(消費者):接受消息的一方,訂閱主題并處理消息。Kafka 有ConsumerGroup 的概念,每個Consumer 只能消費所分配到的 Partition 的消息,每一個Partition只能被一個ConsumerGroup 中的一個Consumer 所消費,所以同一個ConsumerGroup 中Consumer 的數(shù)量如果超過了Partiton 的數(shù)量,將會出現(xiàn)有些Consumer 分配不到 partition 消費。
  • Broker(代理):服務(wù)代理節(jié)點,Kafka 集群中的一臺服務(wù)器就是一個 broker,可以水平無限擴展,同一個 Topic 的消息可以分布在多個 broker 中。
  • Topic(主題)與 Partition(分區(qū)) :Kafka 中的消息以 Topic 為單位進行劃分,生產(chǎn)者將消息發(fā)送到特定的 Topic,而消費者負(fù)責(zé)訂閱 Topic 的消息并進行消費。圖中 TopicA 有三個 Partiton(TopicA-par0、TopicA-par1、TopicA-par2)
    為了提升整個集群的吞吐量,Topic 在物理上還可以細(xì)分多個Partition,一個 Partition 在磁盤上對應(yīng)一個文件夾。
  • Replica(副本):副本,是 Kafka 保證數(shù)據(jù)高可用的方式,Kafka 同一 Partition 的數(shù)據(jù)可以在多 Broker 上存在多個副本,通常只有 leader 副本對外提供讀寫服務(wù),當(dāng) leader副本所在 broker 崩潰或發(fā)生網(wǎng)絡(luò)一場,Kafka 會在 Controller 的管理下會重新選擇新的 Leader 副本對外提供讀寫服務(wù)。
  • ZooKeeper:管理 Kafka 集群的元數(shù)據(jù)和分布式協(xié)調(diào)。

2.Topic 主題

Topic 是 Kafka 中數(shù)據(jù)的邏輯分類單元,可以理解成一個隊列。Broker 是所有隊列部署的機器,Producer 將消息發(fā)送到特定的 Topic,而 Consumer 則從特定的 Topic 中消費消息。

3.Partition

為了提高并行處理能力和擴展性,Kafka 將一個 Topic 分為多個 Partition。每個 Partition 是一個有序的消息隊列,消息在 Partition 內(nèi)部是有序的,但在不同的 Partition 之間沒有順序保證。

Producer 可以并行地將消息發(fā)送到不同的 Partition,Consumer 也可以并行地消費不同的 Partition,從而提升整體處理能力。

因此,可以說,每增加一個 Paritition 就增加了一個消費并發(fā)。Partition的引入不僅提高了系統(tǒng)的可擴展性,還使得數(shù)據(jù)處理更加靈活。

4.Partition 分區(qū)策略

碼樓:“生產(chǎn)者將消息發(fā)送到哪個分區(qū)是如何實現(xiàn)的?不合理的分配會導(dǎo)致消息集中在某些 Broker 上,豈不是完犢子。”

主要有以下幾種分區(qū)策略:

  • 輪詢策略:也稱Round-robin策略,即順序分配。
  • 隨機策略:也稱Randomness策略。所謂隨機就是我們隨意地將消息放置到任意一個分區(qū)上。
  • 按消息鍵保序策略。
  • 基于地理位置分區(qū)策略。

輪詢策略

比如一個 Topic 下有 3個分區(qū),那么第一條消息被發(fā)送到分區(qū)0,第二條被發(fā)送到分區(qū)1,第三條被發(fā)送到分區(qū)2,以此類推。

當(dāng)生產(chǎn)第4條消息時又會重新開始,即將其分配到分區(qū)0,如圖 5 所示。

輪詢策略有非常優(yōu)秀的負(fù)載均衡表現(xiàn),它總是能保證消息最大限度地被平均分配到所有分區(qū)上,故默認(rèn)情況下它是最合理的分區(qū)策略,也是我們最常用的分區(qū)策略之一。

隨機策略

所謂隨機就是我們隨意地將消息放置到任意一個分區(qū)上。如圖所示,9 條消息隨機分配到不同分區(qū)。

按消息鍵分配策略

一旦消息被定義了 Key,那么你就可以保證同一個 Key 的所有消息都進入到相同的分區(qū)里面,比如訂單 ID,那么綁定同一個 訂單 ID 的消息都會發(fā)布到同一個分區(qū),由于每個分區(qū)下的消息處理都是有順序的,故這個策略被稱為按消息鍵保序策略,如圖所示。

基于地理位置

這種策略一般只針對那些大規(guī)模的 Kafka 集群,特別是跨城市、跨國家甚至是跨大洲的集群。

我們就可以根據(jù) Broker 所在的 IP 地址實現(xiàn)定制化的分區(qū)策略。比如下面這段代碼:

List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return partitions.stream()
  .filter(p -> isSouth(p.leader().host()))
  .map(PartitionInfo::partition)
  .findAny()
  .get();

我們可以從所有分區(qū)中找出那些Leader副本在南方的所有分區(qū),然后隨機挑選一個進行消息發(fā)送。

四、Segment 日志文件和稀疏索引

前面已經(jīng)介紹過,Kafka 的 Topic 可以分為多個 Partition,每個 Partition 有多個副本,你可以理解為副本才是存儲消息的物理存在。其實每個副本都是以日志(Log)的形式存儲。

碼樓:“日志文件過大怎么辦?”

為了解決單一日志文件過大的問題,kafka采用了分段(Segment)的形式進行存儲。

所謂 Segment,就是當(dāng)一個日志文件大小到達一定條件之后,就新建一個新的 Segment,然后在新的Segment寫入數(shù)據(jù)。Topic、Partition、和日志的關(guān)系如圖 8 所示。

圖 8

一個 segment 對應(yīng)磁盤上多個文件。

  • .index : 消息的 offset 索引文件。
  • .timeindex : 消息的時間索引文件(0.8版本加入的)。
  • .log  : 存儲實際的消息數(shù)據(jù)。
  • .snapshot : 記錄了 producer 的事務(wù)信息。
  • .swap : 用于 Segment 恢復(fù)。
  • .txnindex 文件,記錄了中斷的事務(wù)信息。

.log 文件存儲實際的 message,kafka為每一個日志文件添加了2 個索引文件 .index以及 .timeindex。

segment 文件命名規(guī)則:partition 第一個 segment 從 0 開始,后續(xù)每個 segment 文件名為上一個 segment 文件最后一條消息的 offset 值。數(shù)值最大為 64 位 long 大小,19 位數(shù)字字符長度,沒有數(shù)字用 0 填充。

碼樓:“為什么要有 .index 文件?”

為了提高查找消息的性能。kafka 為消息數(shù)據(jù)建了兩種稀疏索引,一種是方便 offset 查找的 .index 稀疏索引,還有一種是方便時間查找的 .timeindex 稀疏索引。

1.稀疏索引

Chaya:“為什么不創(chuàng)建一個哈希索引,從 offset 到物理消息日志文件偏移量的映射關(guān)系?”

萬萬不可,Kafka 作為海量數(shù)據(jù)處理的中間件,每秒高達幾百萬的消息寫入,這個哈希索引會把把內(nèi)存撐爆炸。

稀疏索引不會為每個記錄都保存索引,而是寫入一定的記錄之后才會增加一個索引值,具體這個間隔有多大則通過 log.index.interval.bytes 參數(shù)進行控制,默認(rèn)大小為 4 KB,意味著 Kafka 至少寫入 4KB 消息數(shù)據(jù)之后,才會在索引文件中增加一個索引項。

哈希稀疏索引把消息劃分為多個 block ,只索引每個 block 第一條消息的 offset 即可 。

  • Offset 偏移量:表示第幾個消息。
  • position:消息在磁盤的物理位置。

Chaya:如果消費者要查找 Offset 為 4 的消息,查找過程是怎樣的?

  • 首先用二分法定位消息在哪個 Segment ,Segment 文件命名是 Partition 第一個 segment 從 0 開始,后續(xù)每個 segment 文件名為上一個 segment 文件最后一條消息的 offset 值。
  • 打開這個 Segment 對應(yīng)的 index 索引文件,用二分法查找 offset 不大于 4 的索引條目,對應(yīng)上圖第二條條目,也就是 offset = 3 的那個索引。通過索引我們可以知道 offset 為 4 的消息所在的日志文件磁盤物理位置為 495。
  • 打開日志文件,從 Position 為 495 位置開始開始順序掃描文件,將掃描過程中每條消息的 offset 與 4 比較,直到找到 offset 為 4 的那條Message。

.timeindex 文件同理,只不過它的查找結(jié)果是 offset,之后還要在走一遍 .index 索引查找流程。

由于 kafka 設(shè)計為順序讀寫磁盤,因此遍歷區(qū)間的數(shù)據(jù)并對速度有太大的影響,而選擇稀疏索引還能節(jié)約大量的磁盤空間。

2.mmap

有了稀疏索引,當(dāng)給定一個 offset 時,Kafka 采用的是二分查找來掃描索引定位不大于 offset 的物理位移 position,再到日志文件找到目標(biāo)消息。

利用稀疏索引,已經(jīng)基本解決了高效查詢的問題,但是這個過程中仍然有進一步的優(yōu)化空間,那便是通過 mmap(memory mapped files) 讀寫上面提到的稀疏索引文件,進一步提高查詢消息的速度。

就是基于 JDK nio 包下的 MappedByteBuffer 的 map 函數(shù),將磁盤文件映射到內(nèi)存中。

進程通過調(diào)用mmap系統(tǒng)函數(shù),將文件或物理內(nèi)存的一部分映射到其虛擬地址空間。這個過程中,操作系統(tǒng)會為映射的內(nèi)存區(qū)域分配一個虛擬地址,并將這個地址與文件或物理內(nèi)存的實際內(nèi)容關(guān)聯(lián)起來。

一旦內(nèi)存映射完成,進程就可以通過指針直接訪問映射的內(nèi)存區(qū)域。這種訪問方式就像訪問普通內(nèi)存一樣簡單和高效。

圖引自《碼農(nóng)的荒島求生》

五、順序讀寫磁盤

碼樓:“不管如何,Kafka 讀寫消息都要讀寫磁盤,如何變快呢?”

磁盤就一定很慢么?人們普遍錯誤地認(rèn)為硬盤很慢。然而,存儲介質(zhì)的性能,很大程度上依賴于數(shù)據(jù)被訪問的模式。

同樣在一塊普通的7200 RPM SATA硬盤上,隨機I/O(random I/O)與順序I/O相比,隨機I/O的性能要比順序I/O慢3到4個數(shù)量級。

合理的方式可以讓磁盤寫操作更加高效,減少了尋道時間和旋轉(zhuǎn)延遲。

碼樓,你還留著課本嗎?來,翻到講磁盤的章節(jié),讓我們回顧一下磁盤的運行原理。

碼樓:“鬼還留著哦,課程還沒上到一半書就沒了。要不是考試俺眼神好,就掛科了?!?/p>

磁盤的運行原理如圖所示。

硬盤在邏輯上被劃分為磁道、柱面以及扇區(qū)。硬盤的每個盤片的每個面都有一個讀寫磁頭。

完成一次磁盤 I/O ,需要經(jīng)過尋道、旋轉(zhuǎn)和數(shù)據(jù)傳輸三個步驟。

  • 尋道:首先必須找到柱面,即磁頭需要移動到相應(yīng)磁道,這個過程叫做尋道,所耗費時間叫做尋道時間。尋道時間越短,I/O 操作越快,目前磁盤的平均尋道時間一般在 3-15ms。
  • 旋轉(zhuǎn):磁盤旋轉(zhuǎn)將目標(biāo)扇區(qū)旋轉(zhuǎn)到磁頭下。這個過程耗費的時間叫做旋轉(zhuǎn)時間。旋轉(zhuǎn)延遲取決于磁盤轉(zhuǎn)速,通常用磁盤旋轉(zhuǎn)一周所需時間的 1/2 表示。比如:7200rpm 的磁盤平均旋轉(zhuǎn)延遲大約為 60*1000/7200/2 = 4.17ms,而轉(zhuǎn)速為 15000rpm 的磁盤其平均旋轉(zhuǎn)延遲為 2ms。
  • 數(shù)據(jù)傳輸:數(shù)據(jù)在磁盤與內(nèi)存之間的實際傳輸。

因此,如果在寫磁盤的時候省去尋道、旋轉(zhuǎn)可以極大地提高磁盤讀寫的性能。

Kafka 采用順序?qū)懳募姆绞絹硖岣叽疟P寫入性能。順序?qū)懳募?,順?I/O 的時候,磁頭幾乎不用換道,或者換道的時間很短。減少了磁盤尋道和旋轉(zhuǎn)的次數(shù)。磁頭再也不用在磁道上亂舞了,而是一路向前飛速前行。

Kafka 中每個Partition 是一個有序的,不可變的消息序列,新的消息可以不斷追加到 Partition 的末尾,在 Kafka 中 Partition 只是一個邏輯概念,每個Partition 劃分為多個 Segment,每個 Segment 對應(yīng)一個物理文件,Kafka 對 Segment 文件追加寫,這就是順序?qū)懳募?/p>

每條消息在發(fā)送前會根據(jù)負(fù)載均衡策略計算出要發(fā)往的目標(biāo) Partition 中,broker 收到消息之后把該條消息按照追加的方式順序?qū)懭?Partition 的日志文件中。

如下圖所示,可以看到磁盤順序?qū)懙男阅苓h(yuǎn)高于磁盤隨機寫,甚至比內(nèi)存隨機寫還快。

六、PageCache

Chaya:“碼哥,使用稀疏索引和 mmap 內(nèi)存映射技術(shù)提高讀消息的性能;Topic Partition 加磁盤順序?qū)懗志没⒌脑O(shè)計已經(jīng)很快了,但是與內(nèi)存順序?qū)戇€是慢了,還有優(yōu)化空間么?”

小姑娘,你的想法很好,作為快到令人發(fā)指的 Kafka,確實想到了一個方式來提高讀寫寫磁盤文件的性能。這就是接下來的主角 Page Cache 。

簡而言之:利用操作系統(tǒng)的緩存技術(shù),在讀寫磁盤日志文件時,操作的是內(nèi)存,而不是文件,由操作系統(tǒng)決定什么在某個時間將 Page Cache 的數(shù)據(jù)刷寫到磁盤中。

  • Producer 發(fā)送消息到 Broker 時,Broker 會使用 pwrite() 系統(tǒng)調(diào)用寫入數(shù)據(jù),此時數(shù)據(jù)都會先寫入page cache。
  • Consumer 消費消息時,Broker 使用 sendfile() 系統(tǒng)調(diào)用函數(shù),通零拷貝技術(shù)地將 Page Cache 中的數(shù)據(jù)傳輸?shù)?Broker 的 Socket buffer,再通過網(wǎng)絡(luò)傳輸?shù)?Consumer。
  • leader 與 follower 之間的同步,與上面 consumer 消費數(shù)據(jù)的過程是同理的。

Kafka重度依賴底層操作系統(tǒng)提供的PageCache功能。當(dāng)上層有寫操作時,操作系統(tǒng)只是將數(shù)據(jù)寫入PageCache,同時標(biāo)記Page屬性為Dirty。

當(dāng)讀操作發(fā)生時,先從PageCache中查找,如果發(fā)生缺頁才進行磁盤調(diào)度,最終返回需要的數(shù)據(jù)。

于是我們得到一個重要結(jié)論:如果Kafka producer的生產(chǎn)速率與consumer的消費速率相差不大,那么就能幾乎只靠對broker page cache的讀寫完成整個生產(chǎn)-消費過程,磁盤訪問非常少。

實際上PageCache是把盡可能多的空閑內(nèi)存都當(dāng)做了磁盤緩存來使用。

七、數(shù)據(jù)壓縮和批量處理

數(shù)據(jù)壓縮在 Kafka 中有助于減少磁盤空間的使用和網(wǎng)絡(luò)帶寬的消耗,從而提升整體性能。

通過減少消息的大小,壓縮可以顯著降低生產(chǎn)者和消費者之間的數(shù)據(jù)傳輸時間。

Chaya:Kafka 支持的壓縮算法有哪些?

在Kafka 2.1.0版本之前,Kafka支持3種壓縮算法:GZIP、Snappy和LZ4。從2.1.0開始,Kafka正式支持Zstandard算法(簡寫為zstd)。

Chaya:這么多壓縮算法,我如何選擇?

一個壓縮算法的優(yōu)劣,有兩個重要的指標(biāo):壓縮比,文件壓縮前的大小與壓縮后的大小之比,比如源文件占用 1000 M 內(nèi)存,經(jīng)過壓縮后變成了 200 M,壓縮比 = 1000 /200 = 5,壓縮比越高越高;另一個指標(biāo)是壓縮/解壓縮吞吐量,比如每秒能壓縮或者解壓縮多少 M 數(shù)據(jù),吞吐量越高越好。

1.生產(chǎn)者壓縮

Kafka 的數(shù)據(jù)壓縮主要在生產(chǎn)者端進行。具體步驟如下:

  • 生產(chǎn)者配置壓縮方式:在 KafkaProducer 配置中設(shè)置 compression.type 參數(shù),可以選擇 gzip、snappy、lz4  zstd
  • 消息壓縮:生產(chǎn)者將消息批量收集到一個 batch 中,然后對整個 batch 進行壓縮。這種批量壓縮方式可以獲得更高的壓縮率。
  • 壓縮消息存儲:壓縮后的 batch 以壓縮格式存儲在 Kafka 的主題(Topic)分區(qū)中。
  • 消費者解壓縮:消費者從 Kafka 主題中獲取消息時,首先對接收到的 batch 進行解壓縮,然后處理其中的每一條消息。

2.解壓縮

有壓縮,那必有解壓縮。通常情況下,Producer 發(fā)送壓縮后的消息到 Broker ,原樣保存起來。

Consumer 消費這些消息的時候,Broker 原樣發(fā)給 Consumer,由 Consumer 執(zhí)行解壓縮還原出原本的信息。

Chaya:Consumer 咋知道用什么壓縮算法解壓縮?

Kafka會將啟用了哪種壓縮算法封裝進消息集合中,這樣當(dāng)Consumer讀取到消息集合時,它自然就知道了這些消息使用的是哪種壓縮算法。

總之一句話:Producer端壓縮、Broker端保持、Consumer端解壓縮。

3.批量數(shù)據(jù)處理

Kafka Producer 向 Broker 發(fā)送消息不是一條消息一條消息的發(fā)送,將多條消息打包成一個批次發(fā)送。

批量數(shù)據(jù)處理可以顯著提高 Kafka 的吞吐量并減少網(wǎng)絡(luò)開銷。

Kafka Producer 的執(zhí)行流程如下圖所示:

發(fā)送消息依次經(jīng)過以下處理器:

  • Serialize:鍵和值都根據(jù)傳遞的序列化器進行序列化。優(yōu)秀的序列化方式可以提高網(wǎng)絡(luò)傳輸?shù)男省?/li>
  • Partition:決定將消息寫入主題的哪個分區(qū),默認(rèn)情況下遵循 murmur2 算法。自定義分區(qū)程序也可以傳遞給生產(chǎn)者,以控制應(yīng)將消息寫入哪個分區(qū)。
  • Compression:默認(rèn)情況下,在 Kafka 生產(chǎn)者中不啟用壓縮。Compression 不僅可以更快地從生產(chǎn)者傳輸?shù)酱?,還可以在復(fù)制過程中進行更快的傳輸。壓縮有助于提高吞吐量,降低延遲并提高磁盤利用率。
  • Record Accumulator:Accumulate顧名思義,就是一個消息累計器。其內(nèi)部為每個 Partition 維護一個Deque雙端隊列,隊列保存將要發(fā)送的 Batch批次數(shù)據(jù),Accumulate將數(shù)據(jù)累計到一定數(shù)量,或者在一定過期時間內(nèi),便將數(shù)據(jù)以批次的方式發(fā)送出去。記錄被累積在主題每個分區(qū)的緩沖區(qū)中。根據(jù)生產(chǎn)者批次大小屬性將記錄分組。主題中的每個分區(qū)都有一個單獨的累加器 / 緩沖區(qū)。
  • Group Send:記錄累積器中分區(qū)的批次按將它們發(fā)送到的代理分組。批處理中的記錄基于 batch.size  linger.ms 屬性發(fā)送到代理。記錄由生產(chǎn)者根據(jù)兩個條件發(fā)送。當(dāng)達到定義的批次大小或達到定義的延遲時間時。
  • Send Thread:發(fā)送線程,從 Accumulator 的隊列取出待發(fā)送的 Batch 批次消息發(fā)送到 Broker。
  • Broker 端處理:Kafka Broker 接收到 batch 后,將其存儲在對應(yīng)的主題分區(qū)中。
  • 消費者端的批量消費:消費者可以配置一次拉取多條消息的數(shù)量,通過 fetch.min.bytes  fetch.max.wait.ms 參數(shù)控制批量大小和等待時間。

八、無鎖輕量級 offset

Offset 是 Kafka 中的一個重要概念,用于標(biāo)識消息在分區(qū)中的位置。

每個分區(qū)中的消息都有一個唯一的 offset,消費者通過維護自己的 offset 來確保準(zhǔn)確消費消息。offset 的高效管理對于 Kafka 的性能至關(guān)重要。

offset 是從 0 開始的,每當(dāng)有新的消息寫入分區(qū)時,offset 就會加 1。offset 是不可變的,即使消息被刪除或過期,offset 也不會改變或重用。

Consumer需要向Kafka匯報自己的位移數(shù)據(jù),這個匯報過程被稱為提交位移(Committing Offsets)。因為Consumer能夠同時消費多個partition的數(shù)據(jù),所以位移的提交實際上是在partition粒度上進行的,即Consumer需要為分配給它的每個partition提交各自的位移數(shù)據(jù)。

提交位移主要是為了表征Consumer的消費進度,這樣當(dāng)Consumer發(fā)生故障重啟之后,就能夠從Kafka中讀取之前提交的位移值,然后從相應(yīng)的位移處繼續(xù)消費。

在傳統(tǒng)的消息隊列系統(tǒng)中,offset 通常需要通過鎖機制來保證一致性,但這會帶來性能瓶頸。Kafka 的設(shè)計哲學(xué)是盡量減少鎖的使用,以提升并發(fā)處理能力和整體性能。

1.無鎖設(shè)計思想

Kafka 在 offset 設(shè)計中采用了一系列無鎖的技術(shù),使其能夠在高并發(fā)的環(huán)境中保持高效。

  • 順序?qū)懭?/strong>:Kafka 使用順序?qū)懭氲姆绞綄⑾⒆芳拥饺罩疚募哪┪玻苊饬宋募恢玫念l繁變動,從而減少了鎖的使用。
  • MMAP 內(nèi)存映射文件:Kafka 使用內(nèi)存映射文件(Memory Mapped File)來訪問日志數(shù)據(jù)和索引文件。這種方式使得文件數(shù)據(jù)可以直接映射到進程的虛擬地址空間中,從而減少了系統(tǒng)調(diào)用的開銷,提高了數(shù)據(jù)訪問的效率。
  • 零拷貝:Kafka 使用零拷貝(Zero Copy)技術(shù),將數(shù)據(jù)從磁盤直接傳輸?shù)骄W(wǎng)絡(luò),繞過了用戶態(tài)的復(fù)制過程,大大提高了數(shù)據(jù)傳輸?shù)男省?/li>
  • 批量處理:Kafka 支持批量處理消息,在一個批次中同時處理多個消息,減少了網(wǎng)絡(luò)和 I/O 的開銷。

2.消費者 Offset 管理流程

graph TD;
    A[啟動消費者] --> B[從分區(qū)讀取消息];
    B --> C[處理消息];
    C --> D{是否成功處理?};
    D --> |是| E[更新 Offset];
    D --> |否| F[記錄失敗, 重新處理];
    E --> G[提交 Offset];
    G --> H[繼續(xù)處理下一個消息];
    F --> B;
    H --> B;
  • 啟動消費者:消費者啟動并訂閱 Kafka 主題的某個分區(qū)。
  • 從分區(qū)讀取消息:消費者從指定分區(qū)中讀取消息。
  • 處理消息:消費者處理讀取到的消息。
  • 是否成功處理:判斷消息是否成功處理。
  • 如果成功處理,更新 Offset。
  • 如果處理失敗,記錄失敗原因并準(zhǔn)備重新處理。
  • 更新 Offset:成功處理消息后,更新 Offset 以記錄已處理消息的位置。
  • 提交 Offset:將更新后的 Offset 提交到 Kafka,以確保消息處理進度的持久化。
  • 繼續(xù)處理下一個消息:提交 Offset 后,繼續(xù)讀取并處理下一個消息。

Kafka 通過無鎖輕量級 offset 的設(shè)計,實現(xiàn)了高性能、高吞吐和低延時的目標(biāo)。

九、總結(jié)

Kafka 通過無鎖輕量級 offset 的設(shè)計,實現(xiàn)了高性能、高吞吐和低延時的目標(biāo)。

其 Reactor I/O 網(wǎng)絡(luò)模型、磁盤順序?qū)懭?、?nèi)存映射文件、零拷貝、數(shù)據(jù)壓縮和批量處理等技術(shù),為 Kafka 提供了強大的數(shù)據(jù)處理能力和高效的消息隊列服務(wù)。

  • Reactor I/O 網(wǎng)絡(luò)模型:通過 I/O 多路復(fù)用機制,Kafka 能夠同時處理大量的網(wǎng)絡(luò)連接請求,而不需要為每個連接創(chuàng)建一個線程,從而節(jié)省了系統(tǒng)資源。
  • 順序?qū)懭?/strong>:Kafka 使用順序?qū)懭氲姆绞綄⑾⒆芳拥饺罩疚募哪┪?,避免了文件位置的頻繁變動,從而減少了鎖的使用。
  • MMAP 內(nèi)存映射文件:Kafka 使用內(nèi)存映射文件(Memory Mapped File)來訪問日志數(shù)據(jù)和索引文件。這種方式使得文件數(shù)據(jù)可以直接映射到進程的虛擬地址空間中,從而減少了系統(tǒng)調(diào)用的開銷,提高了數(shù)據(jù)訪問的效率。
  • 零拷貝:Kafka 使用零拷貝(Zero Copy)技術(shù),將數(shù)據(jù)從磁盤直接傳輸?shù)骄W(wǎng)絡(luò),繞過了用戶態(tài)的復(fù)制過程,大大提高了數(shù)據(jù)傳輸?shù)男省?/li>
  • 數(shù)據(jù)壓縮和批量處理:數(shù)據(jù)壓縮在 Kafka 中有助于減少磁盤空間的使用和網(wǎng)絡(luò)帶寬的消耗,從而提升整體性能。;Kafka 支持批量處理消息,在一個批次中同時處理多個消息,減少了網(wǎng)絡(luò)和 I/O 的開銷。
責(zé)任編輯:姜華 來源: 碼哥跳動
相關(guān)推薦

2024-07-30 09:01:12

2024-02-26 21:15:20

Kafka緩存參數(shù)

2020-03-30 15:05:46

Kafka消息數(shù)據(jù)

2024-11-26 08:52:34

SQL優(yōu)化Kafka

2020-02-27 21:03:30

調(diào)度器架構(gòu)效率

2020-02-27 15:44:41

Nginx服務(wù)器反向代理

2010-09-09 16:26:54

CSS選擇符

2020-10-13 17:54:18

開發(fā)Kafka數(shù)據(jù)

2023-08-29 07:46:08

Redis數(shù)據(jù)ReHash

2020-10-15 09:19:36

Elasticsear查詢速度

2021-05-27 20:56:51

esbuild 工具JavaScript

2022-04-21 15:57:37

數(shù)字化轉(zhuǎn)型疫情云服務(wù)

2021-03-22 08:30:33

Kafka源碼架構(gòu)開發(fā)技術(shù)

2013-04-23 10:11:41

PaaS

2025-05-27 04:00:01

Docker容器掛載

2020-08-13 09:19:10

Kafka存儲MQ

2022-09-24 09:52:42

TopicQueuekafka

2021-05-31 07:44:08

Kafka分布式系統(tǒng)

2023-03-21 08:02:36

Redis6.0IO多線程

2020-04-27 07:13:37

Nginx底層進程
點贊
收藏

51CTO技術(shù)棧公眾號