線上Kafka消息堆積,Consumer掉線,怎么辦?
線上kafka消息堆積,所有consumer全部掉線,到底怎么回事?
最近處理了一次線上故障,具體故障表現(xiàn)就是kafka某個topic消息堆積,這個topic的相關(guān)consumer全部掉線。
整體排查過程和事后的復(fù)盤都很有意思,并且結(jié)合本次故障,對kafka使用的最佳實踐有了更深刻的理解。
好了,一起來回顧下這次線上故障吧,最佳實踐總結(jié)放在最后,千萬不要錯過。
1、現(xiàn)象
線上kafka消息突然開始堆積
消費(fèi)者應(yīng)用反饋沒有收到消息(沒有處理消息的日志)
kafka的consumer group上看沒有消費(fèi)者注冊
消費(fèi)者應(yīng)用和kafka集群最近一周內(nèi)沒有代碼、配置相關(guān)變更
2、排查過程
服務(wù)端、客戶端都沒有特別的異常日志,kafka其他topic的生產(chǎn)和消費(fèi)都是正常,所以基本可以判斷是客戶端消費(fèi)存在問題。
所以我們重點放在客戶端排查上。
1)arthas在線修改日志等級,輸出debug
由于客戶端并沒有明顯異常日志,因此只能通過arthas修改應(yīng)用日志等級,來尋找線索。
果然有比較重要的發(fā)現(xiàn):
看起來是kafka-client自己主動發(fā)送消息給kafka集群,進(jìn)行自我驅(qū)逐了。因此consumer都掉線了。
2)arthas查看相關(guān)線程狀態(tài)變量用arthas vmtool命令進(jìn)一步看下kafka-client相關(guān)線程的狀態(tài)。

可以看到 HeartbeatThread線程狀態(tài)是WAITING,Cordinator狀態(tài)是UNJOINED。
此時,結(jié)合源碼看,大概推斷是由于消費(fèi)時間過長,導(dǎo)致客戶端自我驅(qū)逐了。
于是立刻嘗試修改max.poll.records,減少一批拉取的消息數(shù)量,同時增大max.poll.interval.ms參數(shù),避免由于拉取間隔時間過長導(dǎo)致自我驅(qū)逐。
參數(shù)修改上線后,發(fā)現(xiàn)consumer確實不掉線了,但是消費(fèi)一段時間后,還是就停止消費(fèi)了。
3、最終原因
相關(guān)同學(xué)去查看了消費(fèi)邏輯,發(fā)現(xiàn)了業(yè)務(wù)代碼中的死循環(huán),確認(rèn)了最終原因。
消息內(nèi)容中的一個字段有新的值,觸發(fā)了消費(fèi)者消費(fèi)邏輯的死循環(huán),導(dǎo)致后續(xù)消息無法消費(fèi)。同時,消費(fèi)阻塞導(dǎo)致消費(fèi)者自我驅(qū)逐,partition重新reblance,所有消費(fèi)者逐個自我驅(qū)逐。
這里核心涉及到kafka的消費(fèi)者和kafka之間的?;顧C(jī)制,可以簡單了解一下。

kafka-client會有一個獨(dú)立線程HeartbeatThread跟kafka集群進(jìn)行定時心跳,這個線程跟lisenter無關(guān),完全獨(dú)立。
根據(jù)debug日志顯示的“Sending LeaveGroup request”信息,我們可以很容易定位到自我驅(qū)逐的邏輯。

HeartbeatThread線程在發(fā)送心跳前,會比較一下當(dāng)前時間跟上次poll時間,一旦大于max.poll.interval.ms 參數(shù),就會發(fā)起自我驅(qū)逐了。
4、進(jìn)一步思考
雖然最后原因找到了,但是回顧下整個排查過程,其實并不順利,主要有兩點:
kafka-client對某個消息消費(fèi)超時能否有明確異常?而不是只看到自我驅(qū)逐和rebalance
有沒有辦法通過什么手段發(fā)現(xiàn) 消費(fèi)死循環(huán)?
4.1 kafka-client對某個消息消費(fèi)超時能否有明確異常?
4.1.1 kafka似乎沒有類似機(jī)制
我們對消費(fèi)邏輯進(jìn)行斷點,可以很容易看到整個調(diào)用鏈路。

對消費(fèi)者來說,主要采用一個線程池來處理每個kafkaListener,一個listener就是一個獨(dú)立線程。
這個線程會同步處理 poll消息,然后動態(tài)代理回調(diào)用戶自定義的消息消費(fèi)邏輯,也就是我們在@KafkaListener中寫的業(yè)務(wù)。

所以,從這里可以知道兩件事情。
第一點,如果業(yè)務(wù)消費(fèi)邏輯很慢或者卡住了,會影響poll。
第二點,這里沒有看到直接設(shè)置消費(fèi)超時的參數(shù),其實也不太好做。
因為這里做了超時中斷,那么poll也會被中斷,是在同一個線程中。所以要么poll和消費(fèi)邏輯在兩個工作線程,要么中斷掉當(dāng)前線程后,重新起一個線程poll。
所以從業(yè)務(wù)使用角度來說,可能的實現(xiàn),還是自己設(shè)置業(yè)務(wù)超時。比較通用的實現(xiàn),可以是在消費(fèi)邏輯中,用線程池處理消費(fèi)邏輯,同時用Future get阻塞超時中斷。
google了一下,發(fā)現(xiàn)kafka 0.8 曾經(jīng)有consumer.timeout.ms這個參數(shù),但是現(xiàn)在的版本沒有這個參數(shù)了,不知道是不是類似的作用。
4.1.2 RocketMQ有點相關(guān)機(jī)制
然后去看了下RocketMQ是否有相關(guān)實現(xiàn),果然有發(fā)現(xiàn)。
在RocketMQ中,可以對consumer設(shè)置consumeTimeout,這個超時就跟我們的設(shè)想有一點像了。
consumer會啟動一個異步線程池對正在消費(fèi)的消息做定時做 cleanExpiredMsg() 處理。

注意,如果消息類型是順序消費(fèi)(orderly),這個機(jī)制就不生效。
如果是并發(fā)消費(fèi),那么就會進(jìn)行超時判斷,如果超時了,就會將這條消息的信息通過sendMessageBack() 方法發(fā)回給broker進(jìn)行重試。

如果消息重試超過一定次數(shù),就會進(jìn)入RocketMQ的死信隊列。
spring-kafka其實也有做類似的封裝,可以自定義一個死信topic,做異常處理
4.2 有辦法快速發(fā)現(xiàn)死循環(huán)嗎?
一般來說,死循環(huán)的線程會導(dǎo)致CPU飆高、OOM等現(xiàn)象,在本次故障中,并沒有相關(guān)異常表現(xiàn),所以并沒有聯(lián)系到死循環(huán)的問題。
那通過這次故障后,對kafka相關(guān)機(jī)制有了更深刻了解,poll間隔超時很有可能就是消費(fèi)阻塞甚至死循環(huán)導(dǎo)致。
所以,如果下次出現(xiàn)類似問題,消費(fèi)者停止消費(fèi),但是kafkaListener線程還在,可以直接通過arthas的 thread id 命令查看對應(yīng)線程的調(diào)用棧,看看是否有異常方法死循環(huán)調(diào)用。
5、最佳實踐
通過此次故障,我們也可以總結(jié)幾點kafka使用的最佳實踐:
- 使用消息隊列進(jìn)行消費(fèi)時,一定需要多考慮異常情況,包括冪等、耗時處理(甚至死循環(huán))的情況。
- 盡量提高客戶端的消費(fèi)速度,消費(fèi)邏輯另起線程進(jìn)行處理,并最好做超時控制。
- 減少Group訂閱Topic的數(shù)量,一個Group訂閱的Topic最好不要超過5個,建議一個Group只訂閱一個Topic。
- 參考以下說明調(diào)整參數(shù)值:max.poll.records:降低該參數(shù)值,建議遠(yuǎn)遠(yuǎn)小于<單個線程每秒消費(fèi)的條數(shù)> * <消費(fèi)線程的個數(shù)> * <max.poll.interval.ms>的積。max.poll.interval.ms: 該值要大于<max.poll.records> / (<單個線程每秒消費(fèi)的條數(shù)> * <消費(fèi)線程的個數(shù)>)的值。





























