螞蟻面試:Kafka 如何做壓測?如何保證系統(tǒng)穩(wěn)定?
Kafka是大數(shù)據(jù)領(lǐng)域應(yīng)用非常廣泛的消息中間件,如何確定Kafka集群的最大吞吐量和延遲呢?又如何保證Kafka集群的穩(wěn)定呢?今天我們來介紹Kafka壓測方案,來確認(rèn)Kafka集群的各類指標(biāo)。

一、Kafka自帶性能測試工具
Kafka提供了內(nèi)置的性能測試工具,可以用于生產(chǎn)者和消費(fèi)者的基準(zhǔn)測試:
- 生產(chǎn)者性能測試工具:kafka-producer-perf-test.sh
 - 消費(fèi)者性能測試工具:kafka-consumer-perf-test.sh
 
第三方壓測工具JMeter:
- 可以使用JMeter的Kafka插件進(jìn)行壓測Tsung
 - 支持Kafka協(xié)議的分布式壓測工具Gatling
 - 可以通過Kafka插件進(jìn)行壓測
 
二、壓測場景設(shè)計(jì)
1. 生產(chǎn)者性能測試
測試不同消息大小、批處理設(shè)置和壓縮算法對生產(chǎn)者性能的影響:
# 測試100字節(jié)消息,無壓縮  
/opt/kafka/bin/kafka-producer-perf-test.sh \  
  --topic test-topic \  
  --num-records 10000000 \  
  --record-size 100 \  
  --throughput -1 \  
  --producer-props bootstrap.servers=broker1:9092,broker2:9092,broker3:9092 \  
  acks=1 \  
  batch.size=16384 \  
  linger.ms=0 \  
  compression.type=none  
# 測試1KB消息,使用lz4壓縮  
/opt/kafka/bin/kafka-producer-perf-test.sh \  
  --topic test-topic \  
  --num-records 10000000 \  
  --record-size 1024 \  
  --throughput -1 \  
  --producer-props bootstrap.servers=broker1:9092,broker2:9092,broker3:9092 \  
  acks=1 \  
  batch.size=65536 \  
  linger.ms=10 \  
  compression.type=lz42. 消費(fèi)者性能測試
測試不同消費(fèi)者組配置和分區(qū)數(shù)對消費(fèi)性能的影響:
# 基本消費(fèi)者性能測試  
/opt/kafka/bin/kafka-consumer-perf-test.sh \  
  --bootstrap-server broker1:9092,broker2:9092,broker3:9092 \  
  --topic test-topic \  
  --messages 10000000 \  
  --threads 1 \  
  --print-metrics  
# 多線程消費(fèi)者測試  
/opt/kafka/bin/kafka-consumer-perf-test.sh \  
  --bootstrap-server broker1:9092,broker2:9092,broker3:9092 \  
  --topic test-topic \  
  --messages 10000000 \  
  --threads 8 \  
  --print-metrics3. 端到端延遲測試
測量從生產(chǎn)到消費(fèi)的端到端延遲:
# 創(chuàng)建一個(gè)具有多個(gè)分區(qū)的測試主題  
/opt/kafka/bin/kafka-topics.sh \  
  --bootstrap-server broker1:9092 \  
  --create \  
  --topic latency-test \  
  --partitions 8 \  
  --replication-factor 3  
# 使用自定義Java程序測量端到端延遲
// 生產(chǎn)者代碼示例  
Properties props = new Properties();  
props.put("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092");  
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");  
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");  
props.put("acks", "all");  
Producer<String, String> producer = new KafkaProducer<>(props);  
for (int i = 0; i < 10000; i++) {  
    long timestamp = System.currentTimeMillis();  
    ProducerRecord<String, String> record =   
        new ProducerRecord<>("latency-test", null, timestamp, "key-" + i, "value-" + timestamp);  
    producer.send(record);  
    Thread.sleep(100); // 每秒發(fā)送10條消息  
}  
producer.close();  
// 消費(fèi)者代碼示例  
Properties props = new Properties();  
props.put("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092");  
props.put("group.id", "latency-test-group");  
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");  
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");  
props.put("auto.offset.reset", "earliest");  
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);  
consumer.subscribe(Collections.singletonList("latency-test"));  
while (true) {  
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));  
    for (ConsumerRecord<String, String> record : records) {  
        long latency = System.currentTimeMillis() - record.timestamp();  
        System.out.printf("Offset = %d, Latency = %d ms%n", record.offset(), latency);  
    }  
}4. 吞吐量與延遲權(quán)衡測試
測試不同配置下吞吐量與延遲的權(quán)衡關(guān)系:
# 高吞吐量配置測試  
/opt/kafka/bin/kafka-producer-perf-test.sh \  
  --topic throughput-test \  
  --num-records 5000000 \  
  --record-size 1024 \  
  --throughput -1 \  
  --producer-props bootstrap.servers=broker1:9092,broker2:9092,broker3:9092 \  
  acks=1 \  
  batch.size=131072 \  
  linger.ms=50 \  
  compression.type=lz4 \  
  buffer.memory=67108864  
# 低延遲配置測試  
/opt/kafka/bin/kafka-producer-perf-test.sh \  
  --topic latency-test \  
  --num-records 1000000 \  
  --record-size 1024 \  
  --throughput -1 \  
  --producer-props bootstrap.servers=broker1:9092,broker2:9092,broker3:9092 \  
  acks=1 \  
  batch.size=8192 \  
  linger.ms=0 \  
  compression.type=none三、壓測指標(biāo)分析
1. 生產(chǎn)者關(guān)鍵指標(biāo)
- 吞吐量(Throughput):每秒處理的消息數(shù)或字節(jié)數(shù)
 - 延遲(Latency):消息從發(fā)送到確認(rèn)的時(shí)間
 - CPU使用率:生產(chǎn)者進(jìn)程的CPU使用情況
 - 內(nèi)存使用率:生產(chǎn)者進(jìn)程的內(nèi)存使用情況
 - 批處理率:每批次的平均消息數(shù)
 
2. 消費(fèi)者關(guān)鍵指標(biāo)
- 吞吐量:每秒消費(fèi)的消息數(shù)或字節(jié)數(shù)
 - 延遲:消息從生產(chǎn)到消費(fèi)的時(shí)間
 - 消費(fèi)者滯后(Consumer Lag):消費(fèi)者落后于生產(chǎn)者的消息數(shù)
 - 處理時(shí)間:消費(fèi)者處理每條消息的時(shí)間
 - 提交率:偏移量提交的頻率和成功率
 
3. Broker關(guān)鍵指標(biāo)
- 請求處理率:每秒處理的請求數(shù)
 - 請求隊(duì)列大?。旱却幚淼恼埱髷?shù)
 - 網(wǎng)絡(luò)吞吐量:進(jìn)出Broker的網(wǎng)絡(luò)流量
 - 磁盤使用率:日志文件的增長速率
 - GC暫停時(shí)間:垃圾收集對性能的影響
 
四、壓測結(jié)果解讀
1. 生產(chǎn)者性能分析
以下是一個(gè)典型的生產(chǎn)者性能測試結(jié)果示例:
100000 records sent, 25000.0 records/sec (24.41 MB/sec), 15.2 ms avg latency, 293.0 ms max latency.  
200000 records sent, 26315.8 records/sec (25.67 MB/sec), 12.8 ms avg latency, 128.0 ms max latency.  
300000 records sent, 27272.7 records/sec (26.61 MB/sec), 11.5 ms avg latency, 98.0 ms max latency.結(jié)果解讀:
- 吞吐量隨時(shí)間穩(wěn)定在約26,000條記錄/秒(約25MB/秒)
 - 平均延遲約為13毫秒,最大延遲為293毫秒
 - 隨著測試進(jìn)行,延遲趨于穩(wěn)定,表明系統(tǒng)性能良好
 
2. 消費(fèi)者性能分析
以下是一個(gè)典型的消費(fèi)者性能測試結(jié)果示例:
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec  
2023-05-01 10:00:00, 2023-05-01 10:01:00, 1024.00, 17.07, 1048576, 17476.27, 20, 60000, 17.07, 17476.27結(jié)果解讀:
- 消費(fèi)速率為17.07MB/秒,約17,476條消息/秒
 - 重平衡時(shí)間為20毫秒,表明消費(fèi)者組協(xié)調(diào)效率高
 - 獲取時(shí)間為60秒,與測試持續(xù)時(shí)間一致
 
3. 瓶頸識別與解決
常見的性能瓶頸及解決方案:
CPU瓶頸:
- 增加broker數(shù)量
 - 優(yōu)化消息壓縮算法
 - 調(diào)整JVM參數(shù)
 
內(nèi)存瓶頸:
- 增加堆內(nèi)存大小
 - 優(yōu)化生產(chǎn)者/消費(fèi)者客戶端緩沖區(qū)大小
 - 減少不必要的對象創(chuàng)建
 
磁盤I/O瓶頸:
- 使用更快的存儲(如SSD)
 - 增加數(shù)據(jù)目錄數(shù)量,分散I/O負(fù)載
 - 優(yōu)化日志段大小和刷盤策略
 
網(wǎng)絡(luò)瓶頸:
- 增加網(wǎng)絡(luò)帶寬
 - 優(yōu)化消息批處理大小
 - 使用更高效的壓縮算法
 















 
 
 














 
 
 
 