偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

六個(gè)技術(shù)點(diǎn)帶你理解 Kafka 高性能背后的原理

云計(jì)算 Kafka
Kafka 是一款性能非常優(yōu)秀的消息隊(duì)列,每秒處理的消息體量可以達(dá)到千萬(wàn)級(jí)別。今天來(lái)聊一聊 Kafka 高性能背后的技術(shù)原理。

大家好,我是君哥。

Kafka 是一款性能非常優(yōu)秀的消息隊(duì)列,每秒處理的消息體量可以達(dá)到千萬(wàn)級(jí)別。今天來(lái)聊一聊 Kafka 高性能背后的技術(shù)原理。

1、批量發(fā)送

Kafka 收發(fā)消息都是批量進(jìn)行處理的。我們看一下 Kafka 生產(chǎn)者發(fā)送消息的代碼:

private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
 TopicPartition tp = null;
 try {
  //省略前面代碼
  Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
  //把消息追加到之前緩存的這一批消息上
  RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
    serializedValue, headers, interceptCallback, remainingWaitMs);
  //積累到設(shè)置的緩存大小,則發(fā)送出去
  if (result.batchIsFull || result.newBatchCreated) {
   log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
   this.sender.wakeup();
  }
  return result.future;
  // handling exceptions and record the errors;
  // for API exceptions return them in the future,
  // for other exceptions throw directly
 } catch /**省略 catch 代碼*/
}

從代碼中可以看到,生產(chǎn)者調(diào)用 doSend 方法后,并不會(huì)直接把消息發(fā)送出去,而是把消息緩存起來(lái),緩存消息量達(dá)到配置的批量大小后,才會(huì)發(fā)送出去。

注意:從上面 accumulator.append 代碼可以看到,一批消息屬于同一個(gè) topic 下面的同一個(gè) partition。

Broker 收到消息后,并不會(huì)把批量消息解析成單條消息后落盤,而是作為批量消息進(jìn)行落盤,同時(shí)也會(huì)把批量消息直接同步給其他副本。

消費(fèi)者拉取消息,也不會(huì)按照單條進(jìn)行拉取,而是按照批量進(jìn)行拉取,拉取到一批消息后,再解析成單條消息進(jìn)行消費(fèi)。

使用批量收發(fā)消息,減輕了客戶端和 Broker 的交互次數(shù),提升了 Broker 處理能力。

2、消息壓縮

如果消息體比較大,Kafka 消息吞吐量要達(dá)到千萬(wàn)級(jí)別,網(wǎng)卡支持的網(wǎng)絡(luò)傳輸帶寬會(huì)是一個(gè)瓶頸。Kafka 的解決方案是消息壓縮。發(fā)送消息時(shí),如果增加參數(shù) compression.type,就可以開啟消息壓縮:

public static void main(String[] args) {
 Properties props = new Properties();
 props.put("bootstrap.servers", "localhost:9092");
 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    //開啟消息壓縮
 props.put("compression.type", "gzip");
 Producer<String, String> producer = new KafkaProducer<>(props);

 ProducerRecord<String, String> record = new ProducerRecord<>("my_topic", "key1", "value1");

 producer.send(record, new Callback() {
  @Override
  public void onCompletion(RecordMetadata metadata, Exception exception) {
   if (exception != null) {
    logger.error("sending message error: ", e);
   } else {
    logger.info("sending message successful, Offset: ", metadata.offset());
   }
  }
 });

 producer.close();
}

如果 compression.type 的值設(shè)置為 none,則不開啟壓縮。那消息是在什么時(shí)候進(jìn)行壓縮呢?前面提到過(guò),生產(chǎn)者緩存一批消息后才會(huì)發(fā)送,在發(fā)送這批消息之前就會(huì)進(jìn)行壓縮,代碼如下:

public RecordAppendResult append(TopicPartition tp,
         long timestamp,
         byte[] key,
         byte[] value,
         Header[] headers,
         Callback callback,
         long maxTimeToBlock) throws InterruptedException {
 // ...
 try {
  // ...
  buffer = free.allocate(size, maxTimeToBlock);
  synchronized (dq) {
   //...
   RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
   if (appendResult != null) {
    // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
    return appendResult;
   }
            //這批消息緩存已滿,這里進(jìn)行壓縮
   MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
   ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
   FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));

   dq.addLast(batch);
   incomplete.add(batch);

   // Don't deallocate this buffer in the finally block as it's being used in the record batch
   buffer = null;

   return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
  }
 } finally {
  if (buffer != null)
   free.deallocate(buffer);
  appendsInProgress.decrementAndGet();
 }
}

上面的 recordsBuilder 方法最終調(diào)用了下面 MemoryRecordsBuilder 的構(gòu)造方法。

public MemoryRecordsBuilder(ByteBufferOutputStream bufferStream,
       byte magic,
       CompressionType compressionType,
       TimestampType timestampType,
       long baseOffset,
       long logAppendTime,
       long producerId,
       short producerEpoch,
       int baseSequence,
       boolean isTransactional,
       boolean isControlBatch,
       int partitionLeaderEpoch,
       int writeLimit) {
 //省略其他代碼
 this.appendStream = new DataOutputStream(compressionType.wrapForOutput(this.bufferStream, magic));
}

上面的 wrapForOutput 方法會(huì)根據(jù)配置的壓縮算法進(jìn)行壓縮或者選擇不壓縮。目前 Kafka 支持的壓縮算法包括:gzip、snappy、lz4,從 2.1.0 版本開始,Kafka 支持 Zstandard 算法。

在 Broker 端,會(huì)解壓 header 做一些校驗(yàn),但不會(huì)解壓消息體。消息體的解壓是在消費(fèi)端,消費(fèi)者拉取到一批消息后,首先會(huì)進(jìn)行解壓,然后進(jìn)行消息處理。

因?yàn)閴嚎s和解壓都是耗費(fèi) CPU 的操作,所以在開啟消息壓縮時(shí),也要考慮生產(chǎn)者和消費(fèi)者的 CPU 資源情況。

有了消息批量收集和壓縮,kafka 生產(chǎn)者發(fā)送消息的過(guò)程如下圖:

3、磁盤順序讀寫

順序讀寫省去了尋址的時(shí)間,只要一次尋址,就可以連續(xù)讀寫。

在固態(tài)硬盤上,順序讀寫的性能是隨機(jī)讀寫的好幾倍。而在機(jī)械硬盤上,尋址時(shí)需要移動(dòng)磁頭,這個(gè)機(jī)械運(yùn)動(dòng)會(huì)花費(fèi)很多時(shí)間,因此機(jī)械硬盤的順序讀寫性能是隨機(jī)讀寫的幾十倍。

Kafka 的 Broker 在寫消息數(shù)據(jù)時(shí),首先為每個(gè) Partition 創(chuàng)建一個(gè)文件,然后把數(shù)據(jù)順序地追加到該文件對(duì)應(yīng)的磁盤空間中,如果這個(gè)文件寫滿了,就再創(chuàng)建一個(gè)新文件繼續(xù)追加寫。這樣大大減少了尋址時(shí)間,提高了讀寫性能。

4、PageCache

在 Linux 系統(tǒng)中,所有文件 IO 操作都要通過(guò) PageCache,PageCache 是磁盤文件在內(nèi)存中建立的緩存。當(dāng)應(yīng)用程序讀寫文件時(shí),并不會(huì)直接讀寫磁盤上的文件,而是操作 PageCache。

應(yīng)用程序?qū)懳募r(shí),都先會(huì)把數(shù)據(jù)寫入 PageCache,然后操作系統(tǒng)定期地將 PageCache 的數(shù)據(jù)寫到磁盤上。如下圖:

而應(yīng)用程序在讀取文件數(shù)據(jù)時(shí),首先會(huì)判斷數(shù)據(jù)是否在 PageCache 中,如果在則直接讀取,如果不在,則讀取磁盤,并且將數(shù)據(jù)緩存到 PageCache。

Kafka 充分利用了 PageCache 的優(yōu)勢(shì),當(dāng)生產(chǎn)者生產(chǎn)消息的速率和消費(fèi)者消費(fèi)消息的速率差不多時(shí),Kafka 基本可以不用落盤就能完成消息的傳輸。

5、零拷貝

Kafka Broker 將消息發(fā)送給消費(fèi)端時(shí),即使命中了 PageCache,也需要將 PageCache 中的數(shù)據(jù)先復(fù)制到應(yīng)用程序的內(nèi)存空間,然后從應(yīng)用程序的內(nèi)存空間復(fù)制到 Socket 緩存區(qū),將數(shù)據(jù)發(fā)送出去。如下圖:

Kafka 采用了零拷貝技術(shù)把數(shù)據(jù)直接從 PageCache 復(fù)制到 Socket 緩沖區(qū)中,這樣數(shù)據(jù)不用復(fù)制到用戶態(tài)的內(nèi)存空間,同時(shí) DMA 控制器直接完成數(shù)據(jù)復(fù)制,不需要 CPU 參與。如下圖:

Java 零拷貝技術(shù)采用 FileChannel.transferTo() 方法,底層調(diào)用了 sendfile 方法。

6、mmap

Kafka 的日志文件分為數(shù)據(jù)文件(.log)和索引文件(.index),Kafka 為了提高索引文件的讀取性能,對(duì)索引文件采用了 mmap 內(nèi)存映射,將索引文件映射到進(jìn)程的內(nèi)存空間,這樣讀取索引文件就不需要從磁盤進(jìn)行讀取。如下圖:

7、總結(jié)

本文介紹了 Kafka 實(shí)現(xiàn)高性能用到的關(guān)鍵技術(shù),這些技術(shù)可以為我們學(xué)習(xí)和工作提供參考。

責(zé)任編輯:姜華 來(lái)源: 君哥聊技術(shù)
相關(guān)推薦

2009-02-12 09:44:48

Web應(yīng)用高性能習(xí)慣

2018-04-02 10:37:10

Linux命令size

2012-04-24 09:59:05

APP移動(dòng)社交

2009-07-08 11:27:05

敏捷方法

2023-10-10 18:24:46

PostgreSQL性能RDBMS

2010-03-18 14:07:09

無(wú)線USB技術(shù)特點(diǎn)

2023-08-08 11:32:38

光互連技術(shù)光纖電纜

2024-01-02 18:01:12

SQLSELECT查詢

2024-04-11 08:29:35

Kafka異步發(fā)送發(fā)送端重試

2022-06-28 08:42:03

磁盤kafka高性能

2009-07-29 17:40:05

高性能計(jì)算刀片服務(wù)器多核

2019-10-17 09:23:49

Kafka高性能架構(gòu)

2019-07-16 13:15:38

Kafka分布式數(shù)據(jù)

2024-11-11 16:22:15

2009-01-04 10:32:28

2020-01-07 16:16:57

Kafka開源消息系統(tǒng)

2018-07-06 11:18:46

HBaseHFile數(shù)據(jù)庫(kù)

2024-04-16 13:29:53

2018-06-19 16:58:36

UCloud彭晶鑫存儲(chǔ)

2023-05-08 14:56:00

Kafka高可靠高性能
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)