偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

記一次生產(chǎn)環(huán)境中 Kafka 集群面對(duì)消息量飆升后獲得五倍性能提升的優(yōu)化之旅

大數(shù)據(jù)
前幾天我碰到了一個(gè)Kafka集群在消息量突然飆升時(shí)遇到的性能瓶頸問題,經(jīng)過幾個(gè)小時(shí)的苦戰(zhàn)后,集群恢復(fù)了平穩(wěn)運(yùn)行,性能提升了5倍,本文將詳細(xì)闡述從問題發(fā)現(xiàn)、分析到最終解決的完整過程。

前幾天我碰到了一個(gè)Kafka集群在消息量突然飆升時(shí)遇到的性能瓶頸問題,經(jīng)過幾個(gè)小時(shí)的苦戰(zhàn)后,集群恢復(fù)了平穩(wěn)運(yùn)行,性能提升了5倍,本文將詳細(xì)闡述從問題發(fā)現(xiàn)、分析到最終解決的完整過程。

一、第一部分:問題發(fā)現(xiàn)

1. 告警觸發(fā)

周一早上9:00,監(jiān)控系統(tǒng)突然觸發(fā)了多項(xiàng)Kafka集群告警:

  • 消息積壓量急劇上升,部分主題的消費(fèi)延遲從毫秒級(jí)增加到分鐘級(jí)
  • 生產(chǎn)者客戶端報(bào)告大量請求超時(shí)錯(cuò)誤
  • Broker的CPU使用率飆升至90%以上
  • 網(wǎng)絡(luò)流量激增,接近物理網(wǎng)卡限制
  • 磁盤I/O使用率達(dá)到峰值

2. 初步情況評(píng)估

運(yùn)維團(tuán)隊(duì)立即展開初步調(diào)查,發(fā)現(xiàn):

  • 集群配置:5個(gè)broker節(jié)點(diǎn),每個(gè)節(jié)點(diǎn)16核CPU,64GB內(nèi)存,10Gbps網(wǎng)絡(luò),RAID10磁盤陣列
  • 主要業(yè)務(wù)主題的分區(qū)數(shù):30個(gè),副本因子為3
  • 正常情況下,集群處理能力約為每秒50,000條消息
  • 當(dāng)前情況:消息生產(chǎn)速率突然飆升至每秒200,000條,是平時(shí)的4倍
  • 消費(fèi)者組處理速度明顯滯后,無法跟上生產(chǎn)速度

3. 業(yè)務(wù)影響評(píng)估

  • 多個(gè)關(guān)鍵業(yè)務(wù)系統(tǒng)報(bào)告數(shù)據(jù)處理延遲
  • 用戶交易確認(rèn)時(shí)間延長,部分交易出現(xiàn)超時(shí)
  • 數(shù)據(jù)分析平臺(tái)無法及時(shí)獲取最新數(shù)據(jù)
  • 實(shí)時(shí)監(jiān)控系統(tǒng)數(shù)據(jù)更新滯后

二、第二部分:問題分析

1. 日志分析

首先檢查Kafka服務(wù)器日志,發(fā)現(xiàn)大量警告和錯(cuò)誤信息:

WARN [ReplicaManager brokerId=1] Produce request with correlation id 12345 from client client1 on partition topic1-15 failed due to request timeout  
ERROR [KafkaRequestHandlerPool-0] Error when handling request: clientId=client2, correlationId=23456, api=PRODUCE  
java.lang.OutOfMemoryError: Java heap space  
WARN [ReplicaFetcherThread-0-3] Error in fetch from broker 3 for partition topic2-5

這些日志表明集群正面臨嚴(yán)重的資源壓力,包括請求處理超時(shí)、內(nèi)存不足和副本同步問題。

2. JVM監(jiān)控分析

通過JVM監(jiān)控工具分析broker進(jìn)程:

  • 堆內(nèi)存使用率接近90%,頻繁觸發(fā)Full GC
  • GC暫停時(shí)間從正常的幾十毫秒增加到幾百毫秒
  • 年輕代內(nèi)存分配速率異常高,表明有大量對(duì)象創(chuàng)建

3. 系統(tǒng)資源分析

使用系統(tǒng)監(jiān)控工具分析各節(jié)點(diǎn)資源使用情況:

  • CPU:用戶空間使用率85%,系統(tǒng)空間15%,幾乎沒有空閑
  • 內(nèi)存:物理內(nèi)存使用率95%,部分節(jié)點(diǎn)開始使用swap
  • 磁盤I/O:寫入速度接近物理極限,讀取隊(duì)列長度持續(xù)增加
  • 網(wǎng)絡(luò):入站流量8.5Gbps,接近10Gbps的物理限制

4. Kafka指標(biāo)分析

通過Kafka內(nèi)置指標(biāo)和JMX監(jiān)控,發(fā)現(xiàn)以下關(guān)鍵問題:

請求隊(duì)列積壓:

  • 請求隊(duì)列大小(request-queue-size)持續(xù)增長,表明網(wǎng)絡(luò)線程無法及時(shí)處理請求
  • 網(wǎng)絡(luò)處理線程CPU使用率接近100%

磁盤I/O瓶頸:

  • 日志刷盤延遲(log-flush-rate-and-time-ms)顯著增加
  • 磁盤寫入操作頻繁阻塞

副本同步問題:

  • ISR收縮事件頻繁發(fā)生,表明副本無法跟上leader的寫入速度
  • 副本同步延遲(replica-lag)持續(xù)增加

消息批處理效率低下:

  • 平均批次大小(batch-size-avg)遠(yuǎn)低于配置的最大值
  • 生產(chǎn)者請求頻率過高,導(dǎo)致網(wǎng)絡(luò)和處理開銷增加

5. 客戶端配置分析

檢查生產(chǎn)者客戶端配置,發(fā)現(xiàn)以下問題:

  • 批處理大小(batch.size)設(shè)置過小,默認(rèn)為16KB
  • linger.ms設(shè)置為0,導(dǎo)致消息立即發(fā)送而不等待批處理
  • 壓縮類型設(shè)置為none,未啟用任何壓縮
  • 生產(chǎn)者緩沖區(qū)(buffer.memory)設(shè)置不足,僅為32MB

檢查消費(fèi)者客戶端配置:

  • 消費(fèi)者拉取大小(fetch.max.bytes)設(shè)置過小,限制了單次拉取的數(shù)據(jù)量
  • 消費(fèi)者線程數(shù)不足,無法充分利用多核處理能力
  • 消費(fèi)者組重平衡策略不合理,導(dǎo)致頻繁的分區(qū)重分配

6. 根因分析

綜合以上分析,確定了以下核心問題:

(1) 資源配置不足:

  • Broker的JVM堆內(nèi)存配置不足以應(yīng)對(duì)突發(fā)流量
  • 網(wǎng)絡(luò)線程和I/O線程數(shù)量不足,無法充分利用多核CPU

(2) 參數(shù)配置不合理:

  • 生產(chǎn)者批處理和壓縮配置不合理,導(dǎo)致網(wǎng)絡(luò)和處理效率低下
  • 消費(fèi)者拉取配置限制了消費(fèi)速度
  • Broker端隊(duì)列大小限制不足以應(yīng)對(duì)突發(fā)流量

(3) 主題分區(qū)設(shè)計(jì)不合理:

  • 熱點(diǎn)主題分區(qū)數(shù)量不足,無法充分利用并行處理能力
  • 分區(qū)分布不均衡,導(dǎo)致部分broker負(fù)載過重

(4) 監(jiān)控預(yù)警不及時(shí):

  • 缺乏對(duì)生產(chǎn)速率變化的有效監(jiān)控和預(yù)警機(jī)制
  • 沒有針對(duì)突發(fā)流量的自動(dòng)擴(kuò)容策略

三、第三部分:緊急優(yōu)化方案

基于上述分析,制定了分階段的緊急優(yōu)化方案:

1. 第一階段:立即緩解壓力(0-2小時(shí))

(1) 增加關(guān)鍵資源配置

增加JVM堆內(nèi)存:

# 修改Kafka啟動(dòng)腳本中的KAFKA_HEAP_OPTS  
export KAFKA_HEAP_OPTS="-Xms16G -Xmx16G"

優(yōu)化GC配置:

# 添加以下GC參數(shù)  
export KAFKA_JVM_PERFORMANCE_OPTS="-XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent"

增加網(wǎng)絡(luò)和I/O線程數(shù):修改server.properties配置

# 增加網(wǎng)絡(luò)線程數(shù),從默認(rèn)的3增加到16  
num.network.threads=16  


# 增加I/O線程數(shù),從默認(rèn)的8增加到32  
num.io.threads=32  


# 增加請求隊(duì)列大小  
queued.max.requests=1000

調(diào)整請求處理參數(shù):

# 增加socket接收緩沖區(qū)  
socket.receive.buffer.bytes=1048576  


# 增加socket發(fā)送緩沖區(qū)  
socket.send.buffer.bytes=1048576  


# 增加最大請求大小  
max.request.size=10485760

(2) 優(yōu)化生產(chǎn)者客戶端配置

為主要生產(chǎn)者應(yīng)用緊急推送配置更新:

# 增加批處理大小  
batch.size=131072  


# 設(shè)置linger.ms使消息有時(shí)間進(jìn)行批處理  
linger.ms=50  


# 啟用壓縮  
compression.type=lz4  


# 增加生產(chǎn)者緩沖區(qū)  
buffer.memory=536870912  


# 增加重試次數(shù)和重試間隔  
retries=10  
retry.backoff.ms=100

(3) 優(yōu)化消費(fèi)者客戶端配置

為主要消費(fèi)者應(yīng)用緊急推送配置更新:

# 增加單次拉取大小  
fetch.max.bytes=52428800  


# 增加最大拉取等待時(shí)間  
fetch.max.wait.ms=500  


# 增加消費(fèi)者緩沖區(qū)大小  
max.partition.fetch.bytes=10485760

(4) 臨時(shí)限流措施

為非關(guān)鍵業(yè)務(wù)實(shí)施臨時(shí)限流:

// 在生產(chǎn)者客戶端實(shí)施限流  
Properties props = new Properties();  
props.put("throttle.rate", "10000"); // 每秒限制10000條消息

2. 第二階段:系統(tǒng)優(yōu)化(2-6小時(shí))

(1) 增加熱點(diǎn)主題分區(qū)數(shù)

為熱點(diǎn)主題增加分區(qū)數(shù),提高并行處理能力:

# 使用kafka-topics.sh工具增加分區(qū)數(shù)  
bin/kafka-topics.sh --bootstrap-server broker1:9092 --alter --topic hot-topic --partitions 60

(2) 重平衡分區(qū)分配

執(zhí)行分區(qū)重分配,優(yōu)化分區(qū)在broker間的分布:

# 生成分區(qū)重分配計(jì)劃  
bin/kafka-reassign-partitions.sh --bootstrap-server broker1:9092 --generate --topics-to-move-json-file topics.json --broker-list "1,2,3,4,5"  


# 執(zhí)行分區(qū)重分配  
bin/kafka-reassign-partitions.sh --bootstrap-server broker1:9092 --execute --reassignment-json-file reassignment.json

(3) 優(yōu)化日志配置

修改server.properties配置,優(yōu)化日志處理:

# 增加日志段大小,減少小文件數(shù)量  
log.segment.bytes=1073741824  


# 優(yōu)化日志刷盤策略,避免頻繁刷盤  
log.flush.interval.messages=50000  
log.flush.interval.ms=10000  


# 優(yōu)化日志清理線程數(shù)  
log.cleaner.threads=4

(4) 調(diào)整副本同步參數(shù)

# 增加副本拉取線程數(shù)  
num.replica.fetchers=8  


# 增加副本拉取大小  
replica.fetch.max.bytes=10485760  


# 調(diào)整ISR收縮時(shí)間,避免頻繁的ISR變化  
replica.lag.time.max.ms=30000

(5) 實(shí)施動(dòng)態(tài)配置調(diào)整

利用Kafka的動(dòng)態(tài)配置功能,無需重啟即可調(diào)整關(guān)鍵參數(shù):

# 使用kafka-configs.sh工具動(dòng)態(tài)調(diào)整配置  
bin/kafka-configs.sh --bootstrap-server broker1:9092 --entity-type brokers --entity-name 1 --alter --add-config "num.io.threads=32,num.network.threads=16"

3. 第三階段:架構(gòu)優(yōu)化(6-24小時(shí))

(1) 擴(kuò)展集群規(guī)模

緊急添加新的broker節(jié)點(diǎn),擴(kuò)展集群處理能力:

# 配置新的broker節(jié)點(diǎn)  
# server.properties for new brokers  
broker.id=6  
listeners=PLAINTEXT://new-broker:9092  
...  


# 啟動(dòng)新節(jié)點(diǎn)  
bin/kafka-server-start.sh config/server.properties

(2) 實(shí)施主題分區(qū)遷移

將部分熱點(diǎn)主題的分區(qū)遷移到新節(jié)點(diǎn):

# 生成遷移計(jì)劃,將部分分區(qū)遷移到新節(jié)點(diǎn)  
bin/kafka-reassign-partitions.sh --bootstrap-server broker1:9092 --generate --topics-to-move-json-file hot-topics.json --broker-list "1,2,3,4,5,6"  


# 執(zhí)行遷移計(jì)劃  
bin/kafka-reassign-partitions.sh --bootstrap-server broker1:9092 --execute --reassignment-json-file migration.json

(3) 優(yōu)化存儲(chǔ)架構(gòu)

將日志目錄分散到多個(gè)物理磁盤:

# 配置多個(gè)日志目錄,分散I/O壓力  
log.dirs=/data1/kafka-logs,/data2/kafka-logs,/data3/kafka-logs,/data4/kafka-logs

為不同類型的主題配置不同的存儲(chǔ)策略:

# 為高吞吐量主題配置專用存儲(chǔ)策略  
bin/kafka-configs.sh --bootstrap-server broker1:9092 --entity-type topics --entity-name high-throughput-topic --alter --add-config "retention.ms=86400000,cleanup.policy=delete"  


# 為需要長期保存的主題配置壓縮策略  
bin/kafka-configs.sh --bootstrap-server broker1:9092 --entity-type topics --entity-name archive-topic --alter --add-config "cleanup.policy=compact"

(4) 實(shí)施客戶端架構(gòu)優(yōu)化

改進(jìn)生產(chǎn)者設(shè)計(jì):

// 實(shí)現(xiàn)異步批量發(fā)送模式  
public class OptimizedProducer {  
    private final KafkaProducer<String, String> producer;  
    private final ScheduledExecutorService scheduler;  
    private final ConcurrentLinkedQueue<ProducerRecord<String, String>> messageQueue;  
    private final int batchSize;  


    public OptimizedProducer(Properties props, int batchSize) {  
        this.producer = new KafkaProducer<>(props);  
        this.scheduler = Executors.newScheduledThreadPool(1);  
        this.messageQueue = new ConcurrentLinkedQueue<>();  
        this.batchSize = batchSize;  


        // 定時(shí)批量發(fā)送  
        this.scheduler.scheduleAtFixedRate(this::sendBatch, 100, 100, TimeUnit.MILLISECONDS);  
    }  


    public void send(String topic, String key, String value) {  
        messageQueue.add(new ProducerRecord<>(topic, key, value));  
        if (messageQueue.size() >= batchSize) {  
            sendBatch();  
        }  
    }  


    private void sendBatch() {  
        List<ProducerRecord<String, String>> batch = new ArrayList<>();  
        ProducerRecord<String, String> record;  
        while ((record = messageQueue.poll()) != null && batch.size() < batchSize) {  
            batch.add(record);  
        }  


        for (ProducerRecord<String, String> r : batch) {  
            producer.send(r, (metadata, exception) -> {  
                if (exception != null) {  
                    // 處理失敗,重新入隊(duì)或記錄日志  
                    messageQueue.add(r);  
                }  
            });  
        }  
        producer.flush();  
    }  
}

優(yōu)化消費(fèi)者處理模型:

// 實(shí)現(xiàn)多線程消費(fèi)處理模型  
public class ParallelConsumer {  
    private final Consumer<String, String> consumer;  
    private final ExecutorService executor;  
    private final int numWorkers;  


    public ParallelConsumer(Properties props, int numWorkers) {  
        this.consumer = new KafkaConsumer<>(props);  
        this.numWorkers = numWorkers;  
        this.executor = Executors.newFixedThreadPool(numWorkers);  
    }  


    public void start(String topic, MessageProcessor processor) {  
        consumer.subscribe(Collections.singleton(topic));  


        while (true) {  
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));  
            if (!records.isEmpty()) {  
                // 將記錄分組,并行處理  
                List<Future<?>> futures = new ArrayList<>();  
                for (TopicPartition partition : records.partitions()) {  
                    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);  
                    futures.add(executor.submit(() -> {  
                        for (ConsumerRecord<String, String> record : partitionRecords) {  
                            processor.process(record);  
                        }  
                    }));  
                }  


                // 等待所有處理完成  
                for (Future<?> future : futures) {  
                    try {  
                        future.get();  
                    } catch (Exception e) {  
                        // 處理異常  
                    }  
                }  


                // 手動(dòng)提交偏移量  
                consumer.commitSync();  
            }  
        }  
    }  


    public interface MessageProcessor {  
        void process(ConsumerRecord<String, String> record);  
    }  
}

(5) 實(shí)施主題分區(qū)策略優(yōu)化

根據(jù)消息流量特征重新設(shè)計(jì)分區(qū)策略:

# 為高流量主題增加分區(qū)數(shù)  
bin/kafka-topics.sh --bootstrap-server broker1:9092 --alter --topic high-traffic-topic --partitions 100  


# 為關(guān)鍵業(yè)務(wù)主題增加副本因子  
bin/kafka-topics.sh --bootstrap-server broker1:9092 --alter --topic critical-topic --replica-assignment "1:2:3:4:5,2:3:4:5:1,..."

實(shí)施自定義分區(qū)器,避免熱點(diǎn)分區(qū):

public class BalancedPartitioner implements Partitioner {  
    private Random random = new Random();  


    @Override  
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {  
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);  
        int numPartitions = partitions.size();  


        if (keyBytes == null) {  
            // 隨機(jī)分區(qū)  
            return random.nextInt(numPartitions);  
        } else {  
            // 使用一致性哈希算法,但避免熱點(diǎn)  
            int hashCode = Arrays.hashCode(keyBytes);  
            return Math.abs(hashCode) % numPartitions;  
        }  
    }  


    @Override  
    public void close() {}  


    @Override  
    public void configure(Map<String, ?> configs) {}  
}
責(zé)任編輯:趙寧寧 來源: 大數(shù)據(jù)技能圈
相關(guān)推薦

2020-08-10 11:00:02

Python優(yōu)化代碼

2020-11-03 07:34:12

Kafka后端工程師

2022-06-01 06:17:42

微服務(wù)Kafka

2021-03-01 06:14:50

環(huán)境高并發(fā)延遲

2019-08-15 11:30:06

SQL數(shù)據(jù)庫ASH

2019-08-19 01:34:38

數(shù)據(jù)庫SQL數(shù)據(jù)庫優(yōu)化

2019-01-21 11:17:13

CPU優(yōu)化定位

2018-12-06 16:25:39

數(shù)據(jù)庫服務(wù)器線程池

2019-12-16 07:18:42

數(shù)據(jù)庫SQL代碼

2019-09-27 17:24:26

數(shù)據(jù)庫優(yōu)化sql

2020-09-25 07:57:42

生產(chǎn)事故系統(tǒng)

2019-11-18 13:42:55

MySQL數(shù)據(jù)庫遷移

2019-11-22 08:05:01

數(shù)據(jù)庫mysql分區(qū)

2019-12-12 10:38:10

mysql數(shù)據(jù)庫nnodb

2021-01-12 07:57:36

MySQLBinlog故障處理

2019-12-02 08:09:57

境數(shù)據(jù)庫連接超時(shí)自動(dòng)回收

2021-01-27 11:50:07

Python優(yōu)化代碼

2019-09-24 07:00:01

SQL Server服務(wù)器卡頓內(nèi)存分配

2020-12-15 11:18:43

系統(tǒng)磁盤lvm數(shù)據(jù)庫

2025-05-19 09:22:32

點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)