Kafka不常見(jiàn)但是很高級(jí)的功能:Kafka 攔截器
既然是不常見(jiàn),那就說(shuō)明在實(shí)際場(chǎng)景中并沒(méi)有太高的出場(chǎng)率,但它們依然是很高級(jí)很實(shí)用的。下面就有請(qǐng)今天的主角登場(chǎng):Kafka 攔截器。
什么是攔截器?
如果你用過(guò) Spring Interceptor 或是 Apache Flume,那么應(yīng)該不會(huì)對(duì)攔截器這個(gè)概念感到陌生,其基本思想就是允許應(yīng)用程序在不修改邏輯的情況下,動(dòng)態(tài)地實(shí)現(xiàn)一組可插拔的事件處理邏輯鏈。它能夠在主業(yè)務(wù)操作的前后多個(gè)時(shí)間點(diǎn)上插入對(duì)應(yīng)的“攔截”邏輯。下面這張圖展示了 Spring MVC 攔截器的工作原理:
圖片來(lái)源:https://o7planning.org/en/11229/spring-mvc-interceptors-tutorial
攔截器 1 和攔截器 2 分別在請(qǐng)求發(fā)送之前、發(fā)送之后以及完成之后三個(gè)地方插入了對(duì)應(yīng)的處理邏輯。而 Flume 中的攔截器也是同理,它們插入的邏輯可以是修改待發(fā)送的消息,也可以是創(chuàng)建新的消息,甚至是丟棄消息。這些功能都是以配置攔截器類(lèi)的方式動(dòng)態(tài)插入到應(yīng)用程序中的,故可以快速地切換不同的攔截器而不影響主程序邏輯。
Kafka 攔截器借鑒了這樣的設(shè)計(jì)思路。你可以在消息處理的前后多個(gè)時(shí)點(diǎn)動(dòng)態(tài)植入不同的處理邏輯,比如在消息發(fā)送前或者在消息被消費(fèi)后。
作為一個(gè)非常小眾的功能,Kafka 攔截器自 0.10.0.0 版本被引入后并未得到太多的實(shí)際應(yīng)用,我也從未在任何 Kafka 技術(shù)峰會(huì)上看到有公司分享其使用攔截器的成功案例。但即便如此,在自己的 Kafka 工具箱中放入這么一個(gè)有用的東西依然是值得的。今天我們就讓它來(lái)發(fā)揮威力,展示一些非??犰诺墓δ?。
Kafka 攔截器
Kafka 攔截器分為生產(chǎn)者攔截器和消費(fèi)者攔截器。生產(chǎn)者攔截器允許你在發(fā)送消息前以及消息提交成功后植入你的攔截器邏輯;而消費(fèi)者攔截器支持在消費(fèi)消息前以及提交位移后編寫(xiě)特定邏輯。值得一提的是,這兩種攔截器都支持鏈的方式,即你可以將一組攔截器串連成一個(gè)大的攔截器,Kafka 會(huì)按照添加順序依次執(zhí)行攔截器邏輯。
舉個(gè)例子,假設(shè)你想在生產(chǎn)消息前執(zhí)行兩個(gè)“前置動(dòng)作”:第一個(gè)是為消息增加一個(gè)頭信息,封裝發(fā)送該消息的時(shí)間,第二個(gè)是更新發(fā)送消息數(shù)字段,那么當(dāng)你將這兩個(gè)攔截器串聯(lián)在一起統(tǒng)一指定給 Producer 后,Producer 會(huì)按順序執(zhí)行上面的動(dòng)作,然后再發(fā)送消息。
當(dāng)前 Kafka 攔截器的設(shè)置方法是通過(guò)參數(shù)配置完成的。生產(chǎn)者和消費(fèi)者兩端有一個(gè)相同的參數(shù),名字叫 interceptor.classes,它指定的是一組類(lèi)的列表,每個(gè)類(lèi)就是特定邏輯的攔截器實(shí)現(xiàn)類(lèi)。拿上面的例子來(lái)說(shuō),假設(shè)第一個(gè)攔截器的完整類(lèi)路徑是com.yourcompany.kafkaproject.interceptors.AddTimeStampInterceptor,第二個(gè)類(lèi)是 com.yourcompany.kafkaproject.interceptors.UpdateCounterInterceptor,那么你需要按照以下方法在 Producer 端指定攔截器:
現(xiàn)在問(wèn)題來(lái)了,我們應(yīng)該怎么編寫(xiě) AddTimeStampInterceptor 和 UpdateCounterInterceptor 類(lèi)呢?其實(shí)很簡(jiǎn)單,這兩個(gè)類(lèi)以及你自己編寫(xiě)的所有 Producer 端攔截器實(shí)現(xiàn)類(lèi)都要繼承org.apache.kafka.clients.producer.ProducerInterceptor 接口。該接口是 Kafka 提供的,里面有兩個(gè)核心的方法。
- onSend:該方法會(huì)在消息發(fā)送之前被調(diào)用。如果你想在發(fā)送之前對(duì)消息“美美容”,這個(gè)方法是你唯一的機(jī)會(huì)。
- onAcknowledgement:該方法會(huì)在消息成功提交或發(fā)送失敗之后被調(diào)用。還記得我在上一期中提到的發(fā)送回調(diào)通知 callback 嗎?onAcknowledgement 的調(diào)用要早于 callback 的調(diào)用。值得注意的是,這個(gè)方法和 onSend 不是在同一個(gè)線(xiàn)程中被調(diào)用的,因此如果你在這兩個(gè)方法中調(diào)用了某個(gè)共享可變對(duì)象,一定要保證線(xiàn)程安全哦。還有一點(diǎn)很重要,這個(gè)方法處在 Producer 發(fā)送的主路徑中,所以最好別放一些太重的邏輯進(jìn)去,否則你會(huì)發(fā)現(xiàn)你的 Producer TPS 直線(xiàn)下降。
同理,指定消費(fèi)者攔截器也是同樣的方法,只是具體的實(shí)現(xiàn)類(lèi)要實(shí)現(xiàn)org.apache.kafka.clients.consumer.ConsumerInterceptor 接口,這里面也有兩個(gè)核心方法。
- onConsume:該方法在消息返回給 Consumer 程序之前調(diào)用。也就是說(shuō)在開(kāi)始正式處理消息之前,攔截器會(huì)先攔一道,搞一些事情,之后再返回給你。
- onCommit:Consumer 在提交位移之后調(diào)用該方法。通常你可以在該方法中做一些記賬類(lèi)的動(dòng)作,比如打日志等。
一定要注意的是,指定攔截器類(lèi)時(shí)要指定它們的全限定名,即 full qualified name。通俗點(diǎn)說(shuō)就是要把完整包名也加上,不要只有一個(gè)類(lèi)名在那里,并且還要保證你的 Producer 程序能夠正確加載你的攔截器類(lèi)。
典型使用場(chǎng)景
Kafka 攔截器都能用在哪些地方呢?其實(shí),跟很多攔截器的用法相同,Kafka 攔截器可以應(yīng)用于包括客戶(hù)端監(jiān)控、端到端系統(tǒng)性能檢測(cè)、消息審計(jì)等多種功能在內(nèi)的場(chǎng)景。
我以端到端系統(tǒng)性能檢測(cè)和消息審計(jì)為例來(lái)展開(kāi)介紹下。
今天 Kafka 默認(rèn)提供的監(jiān)控指標(biāo)都是針對(duì)單個(gè)客戶(hù)端或 Broker 的,你很難從具體的消息維度去追蹤集群間消息的流轉(zhuǎn)路徑。同時(shí),如何監(jiān)控一條消息從生產(chǎn)到最后消費(fèi)的端到端延時(shí)也是很多 Kafka 用戶(hù)迫切需要解決的問(wèn)題。
從技術(shù)上來(lái)說(shuō),我們可以在客戶(hù)端程序中增加這樣的統(tǒng)計(jì)邏輯,但是對(duì)于那些將 Kafka 作為企業(yè)級(jí)基礎(chǔ)架構(gòu)的公司來(lái)說(shuō),在應(yīng)用代碼中編寫(xiě)統(tǒng)一的監(jiān)控邏輯其實(shí)是很難的,畢竟這東西非常靈活,不太可能提前確定好所有的計(jì)算邏輯。另外,將監(jiān)控邏輯與主業(yè)務(wù)邏輯耦合也是軟件工程中不提倡的做法。
現(xiàn)在,通過(guò)實(shí)現(xiàn)攔截器的邏輯以及可插拔的機(jī)制,我們能夠快速地觀(guān)測(cè)、驗(yàn)證以及監(jiān)控集群間的客戶(hù)端性能指標(biāo),特別是能夠從具體的消息層面上去收集這些數(shù)據(jù)。這就是 Kafka 攔截器的一個(gè)非常典型的使用場(chǎng)景。
我們?cè)賮?lái)看看消息審計(jì)(message audit)的場(chǎng)景。設(shè)想你的公司把 Kafka 作為一個(gè)私有云消息引擎平臺(tái)向全公司提供服務(wù),這必然要涉及多租戶(hù)以及消息審計(jì)的功能。
作為私有云的 PaaS 提供方,你肯定要能夠隨時(shí)查看每條消息是哪個(gè)業(yè)務(wù)方在什么時(shí)間發(fā)布的,之后又被哪些業(yè)務(wù)方在什么時(shí)刻消費(fèi)。一個(gè)可行的做法就是你編寫(xiě)一個(gè)攔截器類(lèi),實(shí)現(xiàn)相應(yīng)的消息審計(jì)邏輯,然后強(qiáng)行規(guī)定所有接入你的 Kafka 服務(wù)的客戶(hù)端程序必須設(shè)置該攔截器。
案例分享
下面我以一個(gè)具體的案例來(lái)說(shuō)明一下攔截器的使用。在這個(gè)案例中,我們通過(guò)編寫(xiě)攔截器類(lèi)來(lái)統(tǒng)計(jì)消息端到端處理的延時(shí),非常實(shí)用,我建議你可以直接移植到你自己的生產(chǎn)環(huán)境中。
我曾經(jīng)給一個(gè)公司做 Kafka 培訓(xùn),在培訓(xùn)過(guò)程中,那個(gè)公司的人提出了一個(gè)訴求。他們的場(chǎng)景很簡(jiǎn)單,某個(gè)業(yè)務(wù)只有一個(gè) Producer 和一個(gè) Consumer,他們想知道該業(yè)務(wù)消息從被生產(chǎn)出來(lái)到最后被消費(fèi)的平均總時(shí)長(zhǎng)是多少,但是目前 Kafka 并沒(méi)有提供這種端到端的延時(shí)統(tǒng)計(jì)。
學(xué)習(xí)了攔截器之后,我們現(xiàn)在知道可以用攔截器來(lái)滿(mǎn)足這個(gè)需求。既然是要計(jì)算總延時(shí),那么一定要有個(gè)公共的地方來(lái)保存它,并且這個(gè)公共的地方還是要讓生產(chǎn)者和消費(fèi)者程序都能訪(fǎng)問(wèn)的。在這個(gè)例子中,我們假設(shè)數(shù)據(jù)被保存在 Redis 中。
Okay,這個(gè)需求顯然要實(shí)現(xiàn)生產(chǎn)者攔截器,也要實(shí)現(xiàn)消費(fèi)者攔截器。我們先來(lái)實(shí)現(xiàn)前者:
上面的代碼比較關(guān)鍵的是在發(fā)送消息前更新總的已發(fā)送消息數(shù)。為了節(jié)省時(shí)間,我沒(méi)有考慮發(fā)送失敗的情況,因?yàn)榘l(fā)送失敗可能導(dǎo)致總發(fā)送數(shù)不準(zhǔn)確。不過(guò)好在處理思路是相同的,你可以有針對(duì)性地調(diào)整下代碼邏輯。
下面是消費(fèi)者端的攔截器實(shí)現(xiàn),代碼如下:
在上面的消費(fèi)者攔截器中,我們?cè)谡嬲M(fèi)一批消息前首先更新了它們的總延時(shí),方法就是用當(dāng)前的時(shí)鐘時(shí)間減去封裝在消息中的創(chuàng)建時(shí)間,然后累計(jì)得到這批消息總的端到端處理延時(shí)并更新到 Redis 中。之后的邏輯就很簡(jiǎn)單了,我們分別從 Redis 中讀取更新過(guò)的總延時(shí)和總消息數(shù),兩者相除即得到端到端消息的平均處理延時(shí)。
創(chuàng)建好生產(chǎn)者和消費(fèi)者攔截器后,我們按照上面指定的方法分別將它們配置到各自的 Producer 和 Consumer 程序中,這樣就能計(jì)算消息從 Producer 端到 Consumer 端平均的處理延時(shí)了。這種端到端的指標(biāo)監(jiān)控能夠從全局角度俯察和審視業(yè)務(wù)運(yùn)行情況,及時(shí)查看業(yè)務(wù)是否滿(mǎn)足端到端的 SLA 目標(biāo)。
小結(jié)
今天我們花了一些時(shí)間討論 Kafka 提供的冷門(mén)功能:攔截器。如之前所說(shuō),攔截器的出場(chǎng)率極低,以至于我從未看到過(guò)國(guó)內(nèi)大廠(chǎng)實(shí)際應(yīng)用 Kafka 攔截器的報(bào)道。但冷門(mén)不代表沒(méi)用。事實(shí)上,我們可以利用攔截器滿(mǎn)足實(shí)際的需求,比如端到端系統(tǒng)性能檢測(cè)、消息審計(jì)等。