我是如何將一個老系統(tǒng)的Kafka消費者服務的性能提升近百倍的?
如果問你,如何提高kafka隊列中的消息消費速度呢?
答案很簡單,topic多分幾個分片,然后使用消費者組(Consumer Group)去消費topic即可。
如果加個條件,對同一個對象的操作請求必須要嚴格按照順序進行處理呢?
答案也不難,topic分片之后,生產(chǎn)者定制分發(fā)策略,保證同一對象的操作請求都分發(fā)到同一個分片中,這樣每個消費者就都是在按照順序消費各自分片中的數(shù)據(jù)啦~
如果再加上一些條件:
這個消費者消費速度極慢、慢到需要100ms才能處理完一條消息,即使topic分100片也不滿足不了要求;
每個對象的操作請求數(shù)量存在嚴重傾斜的現(xiàn)象,有的分片消息數(shù)量很大,有的分片消息量很少,可能有的分片一直積壓、有的分片卻很閑;
請求操作很重要,需要確保每條請求都被可靠消費,要保證事務最終一致性;
數(shù)十年的老系統(tǒng),業(yè)務錯綜復雜,項目方不允許涉及業(yè)務邏輯以及整體架構的大改…
當上述各種條件疊加到一起,要求將消費性能明顯提升,如果是你,會如何破局呢?
前段時間,應業(yè)務部門的要求,給他們的一個線上歷史系統(tǒng)做個并發(fā)性能提升的方案,就遇到了上述各種要求疊加在一起的棘手情況。
先簡單說下遇到的業(yè)務場景:
一個互動類的論壇的帖子評論處理場景,要求每個帖子的評論請求操作都必須要嚴格遵循一定的順序(比如可能會有評論刪除、引用評論、回復評論等操作,所以請求順序必須要嚴格按照順序處理),帖子評論的操作請求發(fā)送到kafka里面,然后評論服務消費kafka處理各個請求,這個評論消費者服務消費太慢,需要提升下并發(fā)效率。
增加分片與消費者數(shù)量
正式開始著手去整改優(yōu)化。
首先是常規(guī)調(diào)整:根據(jù)kafka自身的機制,將topic進行分片調(diào)整,拆分為N個分片,然后增設消費者組,在消費者組內(nèi)部署與分片數(shù)相等的消費者服務節(jié)點,這樣每個消費者可以處理一個分片,這樣整個評論的消費性能就會提升N倍。
那么,這里為什么要強調(diào)消費者組里的服務節(jié)點數(shù)要等于topic分片數(shù)呢?這里提一下kafka中Consumer Group中消費者數(shù)量與topic分片數(shù)之間的相關邏輯。
看一下不同的消費者數(shù)量與topic分片數(shù)對應的處理消費場景:
所以說,消費者組里面的消費者數(shù)量并不是越多越好,而是受到了topic的分片數(shù)量的限制的:
消費者數(shù)量太少,會導致一個消費者需要消費多個分片的數(shù)據(jù),造成某一個消費者消費壓力提升;
消費者數(shù)量太多,會導致有的消費者并不會消費任何數(shù)據(jù),浪費部署資源。
也是基于這一點,上述我們的方案中,規(guī)劃消費者組里的消費者數(shù)量與topic的分片數(shù)一致,這樣可以保證每個消費者消費1個分片,達到最大效率協(xié)調(diào)。
再補充個知識點:為什么kafka要限制每個分片最多只能有1個消費者組里的消費者在處理呢?
因為消費者拉取消息需要提供offset, limit。如果offset放在broker端,那么一定會產(chǎn)生額外的通信開銷;如果offset放在Consumer端,如果在一個組有多個消費者,就需要有一個協(xié)調(diào)者,集中式的管理,解決鎖沖突,如果不解決沖突,那么勢必會產(chǎn)生重復消費、無用的消費,從而導致資源浪費。所以說,從性能與復雜度的取舍上,Kafka采用了相對簡單的一種解決策略。
保證分片內(nèi)寫入順序
通過上一章的方式,增加了topic分區(qū)數(shù)以及消費者組中消費者數(shù)量,對kafka中消息并行消費的效率是提升了,但是問題又來了:順序問題!
前面說過,由于業(yè)務明確要求確保順序消費,而kafka只是保證分片內(nèi)的消費順序是固定的,但是不同分片之間的消費順序是無法保證的。
對業(yè)務進行分析發(fā)現(xiàn),業(yè)務要求的順序處理,其實是有條件的順序處理。即對于同一個帖子的所有評論相關的操作必須要同步處理,對于不同帖子的評論相關操作并沒有順序的要求。那么問題就簡單了,只要保證同一個帖子的所有評論相關操作請求都被分發(fā)到同一個topic分區(qū)內(nèi)即可!
生產(chǎn)者寫入消息到kafka的topic時,kafka將依據(jù)不同的策略將數(shù)據(jù)分配到不同的分區(qū)中:
- 輪詢分區(qū)策略
- 隨機分區(qū)策略
- 按key分區(qū)分配策略
- 自定義分區(qū)策略
這里采用自定義分區(qū)策略,因為每個評論操作請求中都攜帶有一個原始帖子ID字段,所以分發(fā)策略也很簡單,直接帖子ID % 分片數(shù)將消息進行分發(fā),這樣同一個帖子ID的評論操作就都可以到同一個分片中,這樣順序的問題就解決了。
所以,對上一環(huán)節(jié)給定的初步方案進行優(yōu)化,補充下生產(chǎn)者端的定制化分發(fā)策略的要求,保證同一個帖子的評論操作都會到同一個Topic分片中:
方案設計到這里,似乎已經(jīng)是解決了并發(fā)消費的問題了。但是后來實際壓測之后,結果令人大跌眼鏡。
單消費者速度提升
按照前面給出的方案,部署了DEMO環(huán)境進行壓測(拆分成4個分片,部署4個消費者),最終發(fā)現(xiàn)集群消費速度的確是翻了4倍、但是整體并發(fā)量依舊是低的可憐,4臺機器最終消費并發(fā)量甚至不到100???
心靈受到暴擊之后,去分析下單個消費者節(jié)點的運行情況,發(fā)現(xiàn)壓測過程中整個機器CPU、IO、MEM、線程數(shù)都非常低、毫無任何波動。問業(yè)務方要了代碼權限,下載了代碼并走讀了一遍Consumer服務的代碼邏輯才發(fā)現(xiàn)其中玄機。
其實該業(yè)務整體交互邏輯其實很簡單,從kafka獲取一個消息,然后進行消費。但是這個消費邏輯,是需要按順序調(diào)用10余個周邊系統(tǒng)的HTTP接口!這也難怪CPU、內(nèi)存、IO都非常低了,整個進程中只有一個線程在處理業(yè)務、而這個線程大部分時間都是處于IO等待狀態(tài)。
所以要想提升整體集群的消費能力,要么無限擴機器、要么就提升單節(jié)點的消費能力 —— 顯然前者是不可能的,只能選擇后者。而對于單線程、多IO操作的場景,提升并發(fā)性能,首先想到的就是改為多線程并發(fā)處理。但是多線程并發(fā)的時候,又會涉及到如何保證順序消費的問題。
對前面的方案進行優(yōu)化,給出如下方案:
在前面方案的基礎上,主要是對消費者端的實現(xiàn)邏輯進行了調(diào)整:
在消費者內(nèi)部,區(qū)分Consumer Thread和Work Thread,Consumer Thread負責從kafka拉取消息,而Work Thread負責真正的消費邏輯處理。
單機內(nèi)存中維護若干個隊列,每個隊列對應一個Work Thread,負責消費該隊列中的數(shù)據(jù);
Consumer Thread基于親緣性分發(fā)策略對消息進行二次分發(fā),保證相同帖子ID的請求分發(fā)到不同的內(nèi)部隊列中。
再進行壓測,設置單個消費者服務Work Thread數(shù)量為100,集群內(nèi)4個消費者服務,整體消費速度達到了7000。單節(jié)點的消費性能從原來的20提升到1700,提升了近80倍!
如何保證消息不丟失
經(jīng)過將單機的消費模式改為多線程的方式,目前并發(fā)消費性能的問題是解決了,可是可靠性的問題又出現(xiàn)了。
原先的時候,消費者從kafka拉取一條消息,然后消費完成后,給kafka一個ack應答,然后去拉取下一條消息,這樣即使消費者中途宕機了,kafka依舊可以將消息分發(fā)給下一個可用的消費者去處理,可以保證請求消息不會丟失掉。
而前面的方案,消費者服務從kafka拉取到消息之后,并沒有等待處理完成,就繼續(xù)從kafka拉取消息然后緩存在本機內(nèi)存中等待work thread慢慢消費,這個時候,如果機器宕機,所有緩存的消息將全部丟失!
為了解決上述問題,考慮將kafka應答機制改為手動提交ack。但是由于多個線程之間亂序的處理kafka上的數(shù)據(jù),各個線程已經(jīng)處理的offset值是不一樣的。如下示意圖:
為了保證消息可靠不丟失,采用如下策略:定期手動提交當前的offset信息,提交的offset值,選擇當前節(jié)點已處理的最小offset值(對于上面示意圖,即提交1002這個offset值),可以通過在內(nèi)存中緩存下處理的offset列表的方式實現(xiàn),如下如實現(xiàn)策略:
正常情況下,提交的offset值不會有什么作用或影響,但是一旦出現(xiàn)異常情況,導致當前節(jié)點進程不可用,kafka重平衡將當前分片分給另一個消費者進行消費的時候,另一個消費者會從最后一次提交的offset位置開始繼續(xù)往后消費。這樣便解決了數(shù)據(jù)丟失的問題,保證了數(shù)據(jù)可靠。
但是,另一個問題又出現(xiàn)了:重復消費。好在,雖然這個業(yè)務系統(tǒng)是十多年前構建的,但是至少分布式消費者該有的一個關鍵特性還是具備的,那就是冪等,所以這個問題就不用考慮了。
數(shù)據(jù)積壓不可控場景兜底
到這里,總該一切都沒問題了吧?
是,也不是。正常情況下是沒問題了,但是作為一個"核心"系統(tǒng),極端的異常情況的保命策略還需要考慮下。
舉個例子,如果突然有一條帖子爆火,這條帖子的評論量遠超其余帖子的評論量,甚至遠超整個系統(tǒng)的額定最大負載請求量,這樣會出現(xiàn)個問題:
- kafka某一個分片數(shù)據(jù)量積壓嚴重,其余分片很空閑
- 該條火爆的帖子的相關評論請求,阻塞了與該帖子分配到同一分區(qū)的其余帖子的評論處理。
這個原計劃做一個動態(tài)伸縮的分片分發(fā)策略,但考慮到此場景過于極端,當前系統(tǒng)實施起來性價比不高,所以本著適當設計的原則,放棄了原先方案,改為了簡單的手動處理 + 補償服務方式,如下:
一旦出現(xiàn)未預料到的異常,導致系統(tǒng)積壓已經(jīng)超過正常的處理范圍了,且已經(jīng)遠超系統(tǒng)可以正?;謴偷南薅?,為了保證現(xiàn)有業(yè)務盡快的恢復正常,可以先跳過積壓的請求,先保證新過來的請求正常被處理,然后啟動補償進程,慢慢消費之前積壓的消息。
有一說一:
這個地方是整個方案里面我自己不太滿意的一個實現(xiàn),屬于遷就現(xiàn)實的一種妥協(xié)方案,寫這篇文檔的時候,自己還是打算近期將這部分按照一個更優(yōu)的方案進行實現(xiàn)。如果您也有興趣了解或者有更好的建議思路,歡迎聯(lián)系我,我們一起掰扯下。
總結梳理
至此呢,為了解決kafka消費者消費能力太慢場景的集群并發(fā)性能提升方案就全部設計完成了,業(yè)務要求的各種要求約束也都可以滿足了,最終實現(xiàn)了在業(yè)務邏輯沒有變的情況下,整體集群的性能提升了上百倍。整體的改動內(nèi)容如下: