偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

線上Kafka消息堆積,Consumer掉線,怎么辦?

開發(fā) 前端
服務(wù)端、客戶端都沒有特別的異常日志,kafka其他topic的生產(chǎn)和消費(fèi)都是正常,所以基本可以判斷是客戶端消費(fèi)存在問題。

線上kafka消息堆積,所有consumer全部掉線,到底怎么回事?

最近處理了一次線上故障,具體故障表現(xiàn)就是kafka某個topic消息堆積,這個topic的相關(guān)consumer全部掉線。

整體排查過程和事后的復(fù)盤都很有意思,并且結(jié)合本次故障,對kafka使用的最佳實踐有了更深刻的理解。

好了,一起來回顧下這次線上故障吧,最佳實踐總結(jié)放在最后,千萬不要錯過。

1、現(xiàn)象

線上kafka消息突然開始堆積

消費(fèi)者應(yīng)用反饋沒有收到消息(沒有處理消息的日志)

kafka的consumer group上看沒有消費(fèi)者注冊

消費(fèi)者應(yīng)用和kafka集群最近一周內(nèi)沒有代碼、配置相關(guān)變更

2、排查過程

服務(wù)端、客戶端都沒有特別的異常日志,kafka其他topic的生產(chǎn)和消費(fèi)都是正常,所以基本可以判斷是客戶端消費(fèi)存在問題。

所以我們重點放在客戶端排查上。

1)arthas在線修改日志等級,輸出debug

由于客戶端并沒有明顯異常日志,因此只能通過arthas修改應(yīng)用日志等級,來尋找線索。

果然有比較重要的發(fā)現(xiàn):

2022-10-25 17:36:17,774 DEBUG [org.apache.kafka.clients.consumer.internals.AbstractCoordinator] - [Consumer clientId=consumer-1, groupId=xxxx] Disabling heartbeat thread

2022-10-25 17:36:17,773 DEBUG [org.apache.kafka.clients.consumer.internals.AbstractCoordinator] - [Consumer clientId=consumer-1, groupId=xxxx] Sending LeaveGroup request to coordinator xxxxxx (id: 2147483644 rack: null)

看起來是kafka-client自己主動發(fā)送消息給kafka集群,進(jìn)行自我驅(qū)逐了。因此consumer都掉線了。

2)arthas查看相關(guān)線程狀態(tài)變量用arthas vmtool命令進(jìn)一步看下kafka-client相關(guān)線程的狀態(tài)。

圖片

可以看到 HeartbeatThread線程狀態(tài)是WAITING,Cordinator狀態(tài)是UNJOINED。

此時,結(jié)合源碼看,大概推斷是由于消費(fèi)時間過長,導(dǎo)致客戶端自我驅(qū)逐了。

于是立刻嘗試修改max.poll.records,減少一批拉取的消息數(shù)量,同時增大max.poll.interval.ms參數(shù),避免由于拉取間隔時間過長導(dǎo)致自我驅(qū)逐。

參數(shù)修改上線后,發(fā)現(xiàn)consumer確實不掉線了,但是消費(fèi)一段時間后,還是就停止消費(fèi)了。

3、最終原因

相關(guān)同學(xué)去查看了消費(fèi)邏輯,發(fā)現(xiàn)了業(yè)務(wù)代碼中的死循環(huán),確認(rèn)了最終原因。

消息內(nèi)容中的一個字段有新的值,觸發(fā)了消費(fèi)者消費(fèi)邏輯的死循環(huán),導(dǎo)致后續(xù)消息無法消費(fèi)。同時,消費(fèi)阻塞導(dǎo)致消費(fèi)者自我驅(qū)逐,partition重新reblance,所有消費(fèi)者逐個自我驅(qū)逐。

這里核心涉及到kafka的消費(fèi)者和kafka之間的?;顧C(jī)制,可以簡單了解一下。

圖片

kafka-client會有一個獨(dú)立線程HeartbeatThread跟kafka集群進(jìn)行定時心跳,這個線程跟lisenter無關(guān),完全獨(dú)立。

根據(jù)debug日志顯示的“Sending LeaveGroup request”信息,我們可以很容易定位到自我驅(qū)逐的邏輯。

圖片

HeartbeatThread線程在發(fā)送心跳前,會比較一下當(dāng)前時間跟上次poll時間,一旦大于max.poll.interval.ms 參數(shù),就會發(fā)起自我驅(qū)逐了。

4、進(jìn)一步思考

雖然最后原因找到了,但是回顧下整個排查過程,其實并不順利,主要有兩點:

kafka-client對某個消息消費(fèi)超時能否有明確異常?而不是只看到自我驅(qū)逐和rebalance

有沒有辦法通過什么手段發(fā)現(xiàn) 消費(fèi)死循環(huán)?

4.1 kafka-client對某個消息消費(fèi)超時能否有明確異常?

4.1.1 kafka似乎沒有類似機(jī)制

我們對消費(fèi)邏輯進(jìn)行斷點,可以很容易看到整個調(diào)用鏈路。

圖片

對消費(fèi)者來說,主要采用一個線程池來處理每個kafkaListener,一個listener就是一個獨(dú)立線程。

這個線程會同步處理 poll消息,然后動態(tài)代理回調(diào)用戶自定義的消息消費(fèi)邏輯,也就是我們在@KafkaListener中寫的業(yè)務(wù)。

圖片

所以,從這里可以知道兩件事情。

第一點,如果業(yè)務(wù)消費(fèi)邏輯很慢或者卡住了,會影響poll。

第二點,這里沒有看到直接設(shè)置消費(fèi)超時的參數(shù),其實也不太好做。

因為這里做了超時中斷,那么poll也會被中斷,是在同一個線程中。所以要么poll和消費(fèi)邏輯在兩個工作線程,要么中斷掉當(dāng)前線程后,重新起一個線程poll。

所以從業(yè)務(wù)使用角度來說,可能的實現(xiàn),還是自己設(shè)置業(yè)務(wù)超時。比較通用的實現(xiàn),可以是在消費(fèi)邏輯中,用線程池處理消費(fèi)邏輯,同時用Future get阻塞超時中斷。

google了一下,發(fā)現(xiàn)kafka 0.8 曾經(jīng)有consumer.timeout.ms這個參數(shù),但是現(xiàn)在的版本沒有這個參數(shù)了,不知道是不是類似的作用。

4.1.2 RocketMQ有點相關(guān)機(jī)制

然后去看了下RocketMQ是否有相關(guān)實現(xiàn),果然有發(fā)現(xiàn)。

在RocketMQ中,可以對consumer設(shè)置consumeTimeout,這個超時就跟我們的設(shè)想有一點像了。

consumer會啟動一個異步線程池對正在消費(fèi)的消息做定時做 cleanExpiredMsg() 處理。

圖片

注意,如果消息類型是順序消費(fèi)(orderly),這個機(jī)制就不生效。

如果是并發(fā)消費(fèi),那么就會進(jìn)行超時判斷,如果超時了,就會將這條消息的信息通過sendMessageBack() 方法發(fā)回給broker進(jìn)行重試。

圖片

如果消息重試超過一定次數(shù),就會進(jìn)入RocketMQ的死信隊列。

spring-kafka其實也有做類似的封裝,可以自定義一個死信topic,做異常處理

4.2 有辦法快速發(fā)現(xiàn)死循環(huán)嗎?

一般來說,死循環(huán)的線程會導(dǎo)致CPU飆高、OOM等現(xiàn)象,在本次故障中,并沒有相關(guān)異常表現(xiàn),所以并沒有聯(lián)系到死循環(huán)的問題。

那通過這次故障后,對kafka相關(guān)機(jī)制有了更深刻了解,poll間隔超時很有可能就是消費(fèi)阻塞甚至死循環(huán)導(dǎo)致。

所以,如果下次出現(xiàn)類似問題,消費(fèi)者停止消費(fèi),但是kafkaListener線程還在,可以直接通過arthas的 thread id 命令查看對應(yīng)線程的調(diào)用棧,看看是否有異常方法死循環(huán)調(diào)用。

5、最佳實踐

通過此次故障,我們也可以總結(jié)幾點kafka使用的最佳實踐:

  • 使用消息隊列進(jìn)行消費(fèi)時,一定需要多考慮異常情況,包括冪等、耗時處理(甚至死循環(huán))的情況。
  • 盡量提高客戶端的消費(fèi)速度,消費(fèi)邏輯另起線程進(jìn)行處理,并最好做超時控制。
  • 減少Group訂閱Topic的數(shù)量,一個Group訂閱的Topic最好不要超過5個,建議一個Group只訂閱一個Topic。
  • 參考以下說明調(diào)整參數(shù)值:max.poll.records:降低該參數(shù)值,建議遠(yuǎn)遠(yuǎn)小于<單個線程每秒消費(fèi)的條數(shù)> * <消費(fèi)線程的個數(shù)> * <max.poll.interval.ms>的積。max.poll.interval.ms: 該值要大于<max.poll.records> / (<單個線程每秒消費(fèi)的條數(shù)> * <消費(fèi)線程的個數(shù)>)的值。
責(zé)任編輯:武曉燕 來源: 阿丸筆記
相關(guān)推薦

2021-02-24 08:38:48

Kafka消息Consumer

2022-05-10 07:31:49

消息隊列CPUQPS

2024-03-20 08:33:00

Kafka線程安全Rebalance

2020-09-29 12:15:13

生死鎖MySQL

2024-12-12 14:56:48

消息積壓MQ分區(qū)

2022-06-24 09:22:15

MySQL自增id

2022-07-14 10:16:22

Flink

2022-07-14 10:23:39

數(shù)據(jù)

2023-12-21 08:01:41

RocketMQ消息堆積

2022-12-19 11:31:57

緩存失效數(shù)據(jù)庫

2009-11-03 08:56:02

linux死機(jī)操作系統(tǒng)

2024-04-22 08:17:23

MySQL誤刪數(shù)據(jù)

2017-02-21 13:11:43

SDN網(wǎng)絡(luò)體系SDN架構(gòu)

2022-05-19 08:01:49

PostgreSQL數(shù)據(jù)庫

2019-10-12 09:50:46

Redis內(nèi)存數(shù)據(jù)庫

2022-07-05 11:48:47

MySQL死鎖表鎖

2018-01-28 20:39:39

戴爾

2015-10-22 09:09:59

BAT投資VC

2021-11-08 15:38:15

消息延遲堆積

2019-08-29 07:35:29

網(wǎng)站404空白nginx
點贊
收藏

51CTO技術(shù)棧公眾號