偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

生產者的實現(xiàn)邏輯-kafka知識體系(二)

開發(fā) 架構 Kafka
Kafka是最初由Linkedin公司開發(fā),是一個分布式、支持分區(qū)的(partition)、多副本的(replica),基于zookeeper協(xié)調的分布式消息系統(tǒng),它的最大的特性就是可以實時的處理大量數(shù)據以滿足各種需求場景。

[[409180]]

kafka 是單條發(fā)送還是批量發(fā)送消息?

kafka 怎么做到單條發(fā)送?

kafka 發(fā)送消息是順序的嗎?

生產者什么情況下可能會頻繁FullGC?

消息發(fā)送的邏輯

上帝視角來看消息發(fā)送的流程。

生產者的設計

消費發(fā)送機制:

1)序列化器:序列化消息對象轉成字節(jié)數(shù)組,然后通過網絡傳輸。

2)分區(qū)器:計算消息發(fā)往的具體分區(qū);如果顯示指定了partition,便不會走分區(qū)器。

3)消息緩沖池:客戶端的消息緩沖池,默認大小32M,見參數(shù)buffer.memory。

4)批量發(fā)送:緩沖池中消息會按batch分批次發(fā)送,默認批次大小16KB,見參數(shù)batch.size。

負載均衡設計:

由于消息topic 由多個partition 組成,且partition 會均衡分布到不同broker 上。因此,為了有效利用broker 集群的性能,提高消息的吞吐量,producer 可以通過隨機或者hash 等方式,將消息平均發(fā)送到多個partition 上,以實現(xiàn)負載均衡。

分區(qū)策略:

  1. 輪詢策略,默認策略
  2. 隨機策略,實際表現(xiàn)來看,它要遜于輪詢策略
  3. 按消息鍵保序策略,一旦消息被定義了 Key,那么你就可以保證同一個 Key 的所有消息都進入到相同的分區(qū)里面,由于每個分區(qū)下的消息處理都是有順序的。

KafkaProducer

源碼

  1. //客戶端ID。在創(chuàng)建 KafkaProducer 時可通過 client.id 定義 clientId,如果未指定,則默認 producer- seq,seq 在進程內遞增,強烈建議客戶端顯示指定 clientId。 
  2.  private final String clientId; 
  3.    //度量的相關存儲容器,例如消息體大小、發(fā)送耗時等與監(jiān)控相關的指標。 
  4.     final Metrics metrics; 
  5.     //分區(qū)負載均衡算法,通過參數(shù) partitioner.class 指定。 
  6.     private final Partitioner partitioner; 
  7.     //調用 send 方法發(fā)送的最大請求大小,包括 key、消息體序列化后的消息總大小不能超過該值。通過參數(shù) max.request.size 來設置。 
  8.     private final int maxRequestSize; 
  9.     //生產者緩存所占內存的總大小,通過參數(shù) buffer.memory 設置。 
  10.     private final long totalMemorySize; 
  11.     //元數(shù)據信息,例如 topic 的路由信息,由 KafkaProducer 自動更新。 
  12.     private final Metadata metadata; 
  13.     //消息記錄累積器 
  14.     private final RecordAccumulator accumulator; 
  15.     //用于封裝消息發(fā)送的邏輯,即向 broker 發(fā)送消息的處理邏輯。 
  16.     private final Sender sender; 
  17.     //用于消息發(fā)送的后臺線程,一個獨立的線程,內部使用 Sender 來向 broker 發(fā)送消息。 
  18.     private final Thread ioThread; 
  19.     //壓縮類型,默認不啟用壓縮,可通過參數(shù) compression.type 配置??蛇x值:none、gzip、snappy、lz4、zstd。 
  20.     private final CompressionType compressionType; 
  21.     //錯誤信息收集器,當成一個 metrics,用來做監(jiān)控的。 
  22.     private final Sensor errors; 
  23.     //用于獲取系統(tǒng)時間或線程睡眠等。 
  24.     private final Time time
  25.     //用于對消息的 key 進行序列化。 
  26.     private final ExtendedSerializer<K> keySerializer; 
  27.     //Serializer< V> valueSerializer 
  28.     private final ExtendedSerializer<V> valueSerializer; 
  29.     //生產者的配置信息。 
  30.     private final ProducerConfig producerConfig; 
  31.     //最大阻塞時間,當生產者使用的緩存已經達到規(guī)定值后,此時消息發(fā)送會阻塞,通過參數(shù) max.block.ms 來設置最多等待多久。 
  32.     private final long maxBlockTimeMs; 
  33.    //配置控制客戶機等待請求響應的最長時間。如果在超時超時之前沒有收到響應,客戶端將在需要時重新發(fā)送請求,或者在重試耗盡時失敗請求。 
  34.     private final int requestTimeoutMs; 
  35.     //生產者端的攔截器,在消息發(fā)送之前進行一些定制化處理。 
  36.     private final ProducerInterceptors<K, V> interceptors; 
  37.     //維護 api 版本的相關元信息,該類只能在 kafka 內部使用。 
  38.     private final ApiVersions apiVersions; 
  39.     //kafka 消息事務管理器。 
  40.     private final TransactionManager transactionManager; 
  41.     //kafka 生產者事務上下文環(huán)境初始結果。 
  42.     private TransactionalRequestResult initTransactionsResult; 

KafkaProducer 具有如下特征:

  1. KafkaProducer 是線程安全的,可以被多個線程交叉使用。
  2. KafkaProducer 內部包含一個緩存池,存放待發(fā)送消息,即 ProducerRecord 隊列,與此同時會開啟一個IO線程將 ProducerRecord 對象發(fā)送到 Kafka 集群。
  3. KafkaProducer 的消息發(fā)送 API send 方法是異步,只負責將待發(fā)送消息 ProducerRecord 發(fā)送到緩存區(qū)中,立即返回,并返回一個結果憑證 Future。

acks 參數(shù)的作用

KafkaProducer 提供了一個核心參數(shù) acks 用來定義消息“已提交”的條件(標準),就是 Broker 端向客戶端承偌已提交的條件,可選值如下:

  1. 0:只要調用 KafkaProducer 的 send 方法返回后即認為成功
  2. all 或 -1:表示消息不僅需要 Leader 節(jié)點已存儲該消息,并且要求其副本(準確的來說是 ISR 中的節(jié)點)全部存儲才認為已提交,才向客戶端返回提交成功。這是最嚴格的持久化保障,當然性能也最低。
  3. 1:表示消息只需要寫入 Leader 節(jié)點后就可以向客戶端返回提交成功。

retries 參數(shù)的作用

kafka 在生產端提供的另外一個核心屬性,用來控制消息在發(fā)送失敗后的重試次數(shù),設置為 0 表示不重試,重試就有可能造成消息在發(fā)送端的重復。從消息發(fā)送接口來看:

  1. Future<RecordMetadata> send(ProducerRecord<K, V> record);Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback); 

從上面的 API 可以得知,用戶在使用 KafkaProducer 發(fā)送消息時,首先需要將待發(fā)送的消息封裝成 ProducerRecord,返回的是一個 Future 對象,典型的 Future 設計模式。

Kafka 消息追加流程

KafkaProducer 的 send 方法,并不會直接向 broker 發(fā)送消息,kafka 將消息發(fā)送異步化,即分解成兩個步驟,send 方法的職責是將消息追加到內存中(分區(qū)的緩存隊列中),然后會由專門的 Send 線程異步將緩存中的消息批量發(fā)送到 Kafka Broker 中。

主要的方法在KafkaProducer#doSend

將消息追加到生產者的發(fā)送緩存區(qū),其實現(xiàn)類為:RecordAccumulator。我們先來看一下 Kafka 一條消息寫到內存的流程圖:

Sender線程

到此為止,我們看到,當我們調用send 方法的時候,其實只是發(fā)送到了 生產者客戶端的服務內存中。還沒有到Broker。Kafka producer 客戶端后臺會啟動一個線程不停的輪詢消息批次存放的區(qū)域,把消息發(fā)送給Broker。

消息批次的內存結構和分配

根據上面的源碼我們可以了解到,每一個ProducerBatch 是一塊 大小為batch.size 字節(jié)大小的內存。而且用到了池化技術。

緩沖池的內存持有類是 BufferPool,我們先來看下 BufferPool 都有哪些成員:

  1. public class BufferPool { 
  2.   // 總的內存大小 
  3.   private final long totalMemory; 
  4.   // 每個內存塊大小,即 batch.size 
  5.   private final int poolableSize; 
  6.   // 申請、歸還內存的方法的同步鎖 
  7.   private final ReentrantLock lock; 
  8.   // 空閑的內存塊 
  9.   private final Deque<ByteBuffer> free
  10.   // 需要等待空閑內存塊的事件 
  11.   private final Deque<Condition> waiters; 
  12.   /** Total available memory is the sum of nonPooledAvailableMemory and the number of byte buffers in free * poolableSize.  */ 
  13.   // 緩沖池還未分配的空閑內存,新申請的內存塊就是從這里獲取內存值 
  14.   private long nonPooledAvailableMemory; 
  15.   // ... 

從 BufferPool 的成員可看出,緩沖池實際上由一個個 ByteBuffer 組成的,BufferPool 持有這些內存塊,并保存在成員 free 中,free 的總大小由 totalMemory 作限制,而 nonPooledAvailableMemory 則表示還剩下緩沖池還剩下多少內存還未被分配。

當 Batch 的消息發(fā)送完畢后,就會將它持有的內存塊歸還到 free 中,以便后面的 Batch 申請內存塊時不再創(chuàng)建新的 ByteBuffer,從 free 中取就可以了,從而避免了內存塊被 JVM 回收的問題。

創(chuàng)建內存塊的流程如下:

歸還內存塊的邏輯流程

如果歸還的內存塊大小等于 batchSize,則將其清空后添加到緩沖池的 free 中,即將其歸還給緩沖池,避免了 JVM GC 回收該內存塊。如果不等于就直接將內存大小累加到未分配并且空閑的內存大小值中即可,內存就無需歸還了,等待 JVM GC 回收掉,最后喚醒正在等待空閑內存的線程。

Java生產者是如何管理TCP連接的

為何采用 TCP?

Apache Kafka 的所有通信都是基于 TCP 的,而不是基于 HTTP 或其他協(xié)議。無論是生產者、消費者,還是 Broker 之間的通信都是如此。

從社區(qū)的角度來看,在開發(fā)客戶端時,人們能夠利用 TCP 本身提供的一些高級功能,比如多路復用請求以及同時輪詢多個連接的能力。

TCP 的多路復用請求會在一條物理連接上創(chuàng)建若干個虛擬連接,每個虛擬連接負責流轉各自對應的數(shù)據流。其實嚴格來說,TCP 并不能多路復用,它只是提供可靠的消息交付語義保證,比如自動重傳丟失的報文。

而且目前已知的 HTTP 庫在很多編程語言中都略顯簡陋。

何時創(chuàng)建 TCP 連接?

TCP 連接是在創(chuàng)建 KafkaProducer 實例時建立的 ,在創(chuàng)建 KafkaProducer 實例時,生產者應用會在后臺創(chuàng)建并啟動一個名為 Sender 的線程,該 Sender 線程開始運行時首先會創(chuàng)建與 Broker 的連接。

  1. Properties properties = new Properties(); 
  2. properties.put("bootstrap.servers""localhost:9092"); 
  3. properties.put("key.serializer", StringSerializer.class.getName()); 
  4. properties.put("value.serializer", StringSerializer.class.getName()); 
  5. // try-with-resources 
  6. // 創(chuàng)建KafkaProducer實例時,會在后臺創(chuàng)建并啟動Sender線程,Sender線程開始運行時首先會創(chuàng)建與Broker的TCP連接 
  7. try (Producer<String, String> producer = new KafkaProducer<>(properties)) { 
  8.     ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, KEY, VALUE); 
  9.     Callback callback = (metadata, exception) -> { 
  10.     }; 
  11.     producer.send(record, callback); 
  1. bootstrap.servers是Producer的核心參數(shù)之一,指定了Producer啟動時要連接的Broker地址
  2. 如果bootstrap.servers指定了1000個Broker,那么Producer啟動時會首先創(chuàng)建與這1000個Broker的TCP連接
  3. 因此不建議把集群中所有的Broker信息都配置到bootstrap.servers中,通常配置3~4臺足夠
  4. Producer一旦連接到集群中的任意一臺Broker,就能拿到整個集群的Broker信息(metadata request)

TCP 連接還可能在兩個地方被創(chuàng)建:一個是在更新元數(shù)據后,另一個是在消息發(fā)送時。

  • 當Producer更新了集群的元數(shù)據后,如果發(fā)現(xiàn)與某些Broker當前沒有連接,那么Producer會創(chuàng)建一個TCP連接

【場景1】

當Producer嘗試向不存在的主題發(fā)送消息時,Broker會告訴Producer這個主題不存在,此時Producer會發(fā)送metadata request到Kafka集群,去嘗試獲取最新的元數(shù)據信息,與集群中所有的Broker建立TCP連接。

【場景2】

Producer通過metadata.max.age.ms參數(shù)定期地去更新元數(shù)據信息,默認值300000,即5分鐘。

  • 當Producer要發(fā)送消息時,Producer發(fā)現(xiàn)與目標Broker(依賴負載均衡算法)還沒有連接,也會創(chuàng)建一個TCP連接。

何時關閉 TCP 連接?

Producer端關閉TCP連接有兩種方式:用戶主動關閉、Kafka自動關閉。

【用戶主動關閉】

廣義的主動關閉,包括用戶調用kill -9來殺掉Producer,最推薦的方式:producer.close()

【Kafka自動關閉】

Producer端參數(shù)connections.max.idle.ms,默認值540000,即9分鐘

如果9分鐘內沒有任何請求經過某個TCP連接,Kafka會主動把TCP連接關閉

connections.max.idle.ms=-1會禁用這種機制,TCP連接將成為永久長連接

Kafka創(chuàng)建的Socket連接都開啟了keepalive。

【注意】

關閉TCP連接的發(fā)起方是Kafka客戶端,屬于被動關閉的場景

被動關閉的后果就是會產生大量的CLOSE_WAIT連接

Producer端或Client端沒有機會顯式地觀測到此TCP連接已被中斷

總結

現(xiàn)在我們可以回答開頭的3個問題了。

1、kafka 是單條發(fā)送還是批量發(fā)送消息?

正常情況下都是批量發(fā)送的。封裝成一個ProducerBatch 發(fā)送。

2.kafka 怎么做到單條發(fā)送?

只能設置單生產者單線程同步調用send 方法。

3.kafka 發(fā)送消息是順序的嗎?

不是的,如果需求順序必須設置key,并且是生產者是單線程的。

4.生產者什么情況下可能會頻繁FullGC?

如果你的消息大小比 batchSize 還要大,則不會從 free 中循環(huán)獲取已分配好的內存塊,而是重新創(chuàng)建一個新的 ByteBuffer,并且該 ByteBuffer 不會被歸還到緩沖池中(JVM GC 回收),如果此時 nonPooledAvailableMemory 比消息體還要小,還會將 free 中空閑的內存塊銷毀(JVM GC 回收),以便緩沖池中有足夠的內存空間提供給用戶申請,這些動作都會導致頻繁 GC 的問題出現(xiàn)。

因此,需要根據業(yè)務消息的大小,適當調整 batch.size 的大小,避免頻繁 GC。

責任編輯:姜華 來源: 今日頭條
相關推薦

2021-07-08 05:52:34

Kafka架構主從架構

2021-07-07 07:06:31

Brokerkafka架構

2021-07-08 07:16:24

RocketMQ數(shù)據結構Message

2021-07-14 17:18:14

RocketMQ消息分布式

2021-07-13 11:52:47

順序消息RocketMQkafka

2015-07-28 17:52:36

IOS知識體系

2020-08-04 10:45:05

運維架構技術

2021-09-09 06:55:43

kafka冪等生產者

2021-07-12 10:25:03

RocketMQ數(shù)據結構kafka

2022-05-10 10:06:03

Kafka

2021-07-02 06:27:00

Kafka架構主從架構

2021-12-22 11:00:05

模型Golang語言

2017-06-22 13:07:21

2012-03-08 11:13:23

企業(yè)架構

2017-04-03 15:35:13

知識體系架構

2017-02-27 16:42:23

Spark識體系

2022-05-23 08:20:29

Kafka生產者元數(shù)據管理

2015-08-26 09:39:30

java消費者

2020-04-17 14:49:34

Kafka分區(qū)數(shù)據

2021-12-28 12:01:59

Kafka 消費者機制
點贊
收藏

51CTO技術棧公眾號