生產(chǎn)事故!Kafka 消費延遲十小時,我用這三招起死回生
兄弟們,凌晨兩點,手機像被踩了尾巴的貓一樣狂震。我迷迷糊糊摸到手機,鎖屏上跳出運維監(jiān)控群的99+消息——某核心業(yè)務(wù)的Kafka消費延遲突破10小時大關(guān),像脫韁的野馬在報警大屏上狂奔。作為負責(zé)這個系統(tǒng)的背鍋俠,我瞬間清醒,套上拖鞋就往公司趕,心里暗罵:“Kafka你個老六,平時挺穩(wěn)當(dāng)?shù)模趺赐蝗唤o我整這出?”
一、事故現(xiàn)場:當(dāng)訂單堆積成“珠穆朗瑪”
到公司打開監(jiān)控頁面,好家伙,Kafka消費組的Lag數(shù)值像坐了火箭,直接竄到120萬條。再看業(yè)務(wù)系統(tǒng),訂單處理模塊的吞吐量幾乎歸零,數(shù)據(jù)庫里待處理的訂單表像個被吹脹的氣球,隨時可能爆炸??头块T已經(jīng)傳來戰(zhàn)報,用戶投訴量直線上升,說下單后半天收不到確認短信,還以為自己遇到了詐騙APP。
趕緊連到Kafka服務(wù)器,用kafka-consumer-groups.sh
命令一查,發(fā)現(xiàn)問題出在某個關(guān)鍵Topic的消費組上。這個Topic平時承載著用戶下單、支付、物流等核心事件,下游有10多個消費者組在消費,偏偏我們這個組掉了鏈子。再仔細看消費者實例,明明配置了8個實例,怎么每個實例的消費速率都低得可憐?每秒處理量不到200條,而生產(chǎn)端的消息寫入速率可是穩(wěn)定在5000條/秒,這就好比一個水龍頭開得嘩嘩響,下面接水的杯子卻只有針眼大,不堵才怪。
抽絲剝繭:到底是誰拖了后腿?
剛開始懷疑是網(wǎng)絡(luò)問題,畢竟之前有過機房交換機故障導(dǎo)致吞吐量下降的先例。但登錄服務(wù)器一查,網(wǎng)卡流量連峰值的10%都沒到,帶寬穩(wěn)穩(wěn)的。再看CPU和內(nèi)存,CPU利用率倒是不低,平均在70%左右,但內(nèi)存還有一大半空閑,難道是CPU瓶頸?
不對啊,我們的消費者實例用的可是4核8G的配置,按理說處理這種量級的消息不該這么吃力。突然想到,Kafka消費者的性能和分區(qū)分配有很大關(guān)系。用list-consumer-groups
和describe-consumer-groups
命令一看,好家伙,8個消費者實例,居然有2個實例各分到了20個分區(qū),剩下6個實例只分到5個分區(qū)。這就好比班里分作業(yè),有的同學(xué)抱了一摞,有的同學(xué)卻只拿到可憐的幾本,忙的忙死,閑的閑死。
再深入分析消費者的日志,發(fā)現(xiàn)大量的時間花在了反序列化和業(yè)務(wù)處理上。我們用的是Avro序列化格式,按道理反序列化效率不低,但業(yè)務(wù)處理里有個坑:每處理一條消息,都要去調(diào)用3個不同的微服務(wù)接口,而且還是串行調(diào)用,每個接口的平均耗時居然超過200ms。這就相當(dāng)于你吃個漢堡,非要先去買面包,再去煎肉餅,最后去摘生菜,每一步都得等上半天,效率能高才怪。
二、第一招:讓消費者“多線程搬磚”
既然問題出在分區(qū)分配不均和處理效率上,那就先從消費者的并行度下手。Kafka的消費者是通過分區(qū)來并行消費的,每個分區(qū)只能被同一個消費組中的一個消費者實例處理,所以合理分配分區(qū)是關(guān)鍵。
1. 調(diào)整分區(qū)分配策略
默認的分區(qū)分配策略是RangeAssignor
,這種策略在分區(qū)數(shù)量不能被消費者實例數(shù)整除時,容易導(dǎo)致分配不均。比如我們有100個分區(qū),8個消費者,100÷8=12余4,前4個消費者會分到13個分區(qū),后4個分到12個。雖然這次的分配更離譜,但本質(zhì)還是分配策略的問題。我們換成RoundRobinAssignor
,它會把分區(qū)按順序輪流分配給消費者,能更均勻一些。
修改消費者配置,加上partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
,然后重啟消費者實例。神奇的事情發(fā)生了,每個實例分到的分區(qū)數(shù)基本一致,再也沒有“勞模”和“摸魚黨”之分了。
2. 增加消費者實例數(shù)
既然分區(qū)數(shù)量是100個,那消費者實例數(shù)最好和分區(qū)數(shù)匹配,或者是分區(qū)數(shù)的因數(shù)。我們之前用8個實例,和100不匹配,那就加到20個實例,這樣每個實例分到5個分區(qū),壓力均勻多了。這里要注意,消費者實例數(shù)不能超過分區(qū)數(shù),否則多余的實例會閑置。
3. 優(yōu)化業(yè)務(wù)處理線程池
消費者處理消息是在poll循環(huán)里,默認是單線程處理。我們的業(yè)務(wù)處理耗時太長,必須用多線程來加速。在消費者的回調(diào)函數(shù)里,把消息丟到一個線程池里異步處理,這樣poll可以盡快去取下一批消息,不會被阻塞。
ExecutorService executor = Executors.newFixedThreadPool(10);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
executor.submit(() -> processMessage(record));
}
consumer.commitAsync();
}
這里要注意線程池的大小不能太大,否則會占用太多資源,一般設(shè)置為CPU核心數(shù)的2倍左右比較合適。另外,異步處理時要做好異常處理,避免消息處理失敗后丟失。
三、第二招:給Kafka“松綁”,別讓它“背鍋”
處理完消費者端的問題,發(fā)現(xiàn)延遲下降了一些,但還是在5小時左右徘徊。這時候意識到,可能生產(chǎn)者端也有問題,或者Kafka本身的配置需要優(yōu)化。
1. 檢查生產(chǎn)者批次大小
登錄生產(chǎn)者服務(wù)器,查看配置,發(fā)現(xiàn)batch.size
設(shè)置得太小,只有16KB。Kafka生產(chǎn)者會把消息攢成批次發(fā)送,批次太小會導(dǎo)致發(fā)送頻率過高,網(wǎng)絡(luò)開銷增大。我們把它調(diào)到160KB,這樣每次發(fā)送的消息更多,效率更高。不過也不能調(diào)得太大,否則會增加延遲,需要根據(jù)實際情況平衡。
2. 調(diào)整消費者拉取參數(shù)
消費者端的fetch.maxBytes
和fetch.max等待時間
也很重要。默認fetch.maxBytes
是50MB,可能太大了,導(dǎo)致消費者每次拉取需要處理很久。我們調(diào)到10MB,同時把fetch.max.wait.ms
從500ms調(diào)到200ms,讓消費者在沒有足夠數(shù)據(jù)時不用等太久,及時處理已有的數(shù)據(jù)。
3. 清理無效的舊數(shù)據(jù)
查看Kafka Topic的配置,發(fā)現(xiàn)retention.hours
設(shè)置為72小時,但我們的業(yè)務(wù)其實只需要保留24小時的數(shù)據(jù)。大量的舊數(shù)據(jù)堆積在磁盤上,不僅占用空間,還會影響消費者的拉取速度。趕緊修改配置,執(zhí)行kafka-topics.sh --alter --topic my_topic --config retention.hours=24
,然后等待Kafka自動清理舊數(shù)據(jù)。不過要注意,清理過程中可能會對性能有一定影響,最好選擇業(yè)務(wù)低峰期操作。
四、第三招:給消息處理“減肥”,拒絕“無效勞動”
經(jīng)過前兩招,延遲已經(jīng)降到了2小時,但離我們的目標(biāo)還有差距。這時候必須深入業(yè)務(wù)處理邏輯,看看有沒有可以優(yōu)化的地方。
1. 合并微服務(wù)調(diào)用
之前每處理一條消息,都要串行調(diào)用3個微服務(wù)接口,總耗時超過600ms。其實這3個接口之間沒有嚴(yán)格的依賴關(guān)系,可以改成并行調(diào)用。用CompletableFuture來實現(xiàn)異步并行調(diào)用,然后合并結(jié)果。
CompletableFuture<Result1> future1 = CompletableFuture.supplyAsync(() -> callService1());
CompletableFuture<Result2> future2 = CompletableFuture.supplyAsync(() -> callService2());
CompletableFuture<Result3> future3 = CompletableFuture.supplyAsync(() -> callService3());
CompletableFuture.allOf(future1, future2, future3).join();
Result1 result1 = future1.get();
Result2 result2 = future2.get();
Result3 result3 = future3.get();
這樣總耗時降到了200ms左右,效率提升了3倍。
2. 增加本地緩存
有些頻繁調(diào)用的基礎(chǔ)數(shù)據(jù),比如商品類目、用戶等級等,每次都去數(shù)據(jù)庫查詢,耗時很長。我們在消費者實例里增加了本地緩存,用Caffeine緩存,設(shè)置5分鐘的過期時間,減少數(shù)據(jù)庫訪問次數(shù)。這就好比你每天上班都要帶鑰匙,每次回家都要翻包找,不如在門口裝個密碼鎖,直接輸入密碼更快捷。
3. 跳過無效消息
通過監(jiān)控發(fā)現(xiàn),有一部分消息是重復(fù)的或者狀態(tài)無效的,比如已經(jīng)取消的訂單再次發(fā)送確認消息。我們在消息處理前增加了一個過濾環(huán)節(jié),先檢查消息的狀態(tài),如果是無效的,直接跳過,不進行后續(xù)處理。這就像分揀快遞,先把明顯破損或者地址錯誤的包裹挑出來,剩下的再慢慢處理,效率自然提高。
五、勝利時刻:延遲從10小時到30分鐘的逆襲
經(jīng)過這三招組合拳,凌晨五點,監(jiān)控頁面上的Lag數(shù)值開始穩(wěn)步下降,像泄了氣的氣球一樣,到早上八點,已經(jīng)降到了30分鐘以內(nèi),吞吐量也恢復(fù)到了每秒5000條以上,和生產(chǎn)端基本持平。再看業(yè)務(wù)系統(tǒng),訂單處理終于跟上了節(jié)奏,數(shù)據(jù)庫里的積壓訂單也慢慢消化完了。
六、復(fù)盤總結(jié):這些坑以后別再踩
- 分區(qū)分配要均衡:根據(jù)分區(qū)數(shù)量合理設(shè)置消費者實例數(shù),選擇合適的分配策略,避免“勞逸不均”。
- 業(yè)務(wù)處理要輕量:盡量減少同步阻塞操作,能用異步并行的就別串行,能緩存的就別頻繁查數(shù)據(jù)庫。
- 參數(shù)配置要調(diào)優(yōu):生產(chǎn)者和消費者的參數(shù)不是一成不變的,要根據(jù)實際吞吐量和延遲情況動態(tài)調(diào)整,比如
batch.size
、fetch.maxBytes
等。 - 監(jiān)控報警要完善:這次事故能及時發(fā)現(xiàn),多虧了完善的監(jiān)控體系。以后要繼續(xù)優(yōu)化監(jiān)控指標(biāo),比如消費者延遲、吞吐量、CPU內(nèi)存使用率等,設(shè)置合理的報警閾值。
Kafka雖然強大,但也不是萬能的。當(dāng)遇到消費延遲問題時,不要慌,先冷靜分析原因,從消費者、生產(chǎn)者、業(yè)務(wù)處理三個維度去排查。記住,沒有最好的配置,只有最適合自己業(yè)務(wù)的配置。平時多做壓力測試,模擬高并發(fā)場景,提前發(fā)現(xiàn)潛在問題,才能在生產(chǎn)事故來臨時不慌不亂,從容應(yīng)對。