開源“Chaperone”:Uber是如何對Kafka進行端到端審計的
隨著Uber業(yè)務(wù)規(guī)模不斷增長,我們的系統(tǒng)也在持續(xù)不斷地產(chǎn)生更多的事件、服務(wù)間的消息和日志。這些數(shù)據(jù)在得到處理之前需要經(jīng)過Kafka。那么我們的平臺是如何實時地對這些數(shù)據(jù)進行審計的呢?
為了監(jiān)控Kafka數(shù)據(jù)管道的健康狀況并對流經(jīng)Kafka的每個消息進行審計,我們完全依賴我們的審計系統(tǒng)Chaperone。Chaperone自2016年1月成為Uber的跨數(shù)據(jù)中心基礎(chǔ)設(shè)施以來,每天處理萬億的消息量。下面我們會介紹它的工作原理,并說明我們?yōu)槭裁磿?gòu)建Chaperone。
Uber的Kafka數(shù)據(jù)管道概覽
Uber的服務(wù)以雙活的模式運行在多個數(shù)據(jù)中心。Apache Kafka和uReplicator是連接Uber生態(tài)系統(tǒng)各個部分的消息總線。
截止2016年11月份,Uber的Kafka數(shù)據(jù)管道概覽。數(shù)據(jù)從兩個數(shù)據(jù)中心聚合到一個Kafka集群上。
要讓Uber的Kafka對下游的消費者做出即時響應(yīng)是很困難的。為了保證吞吐量,我們盡可能地使用批次,并嚴(yán)重依賴異步處理。服務(wù)使用自家的客戶端把消息發(fā)布到Kafka代理,代理把這些消息分批轉(zhuǎn)發(fā)到本地的Kafka集群上。有些Kafka的主題會被本地集群直接消費,而剩下的大部分會跟來自其他數(shù)據(jù)中心的數(shù)據(jù)一起被組合到一個聚合Kafka集群上,我們使用uReplicator來完成這種面向大規(guī)模流或批處理的工作。
Uber的Kafka數(shù)據(jù)管道可以分為四層,它們跨越了多個數(shù)據(jù)中心。Kafka代理和它的客戶端分別是第二層和***層。它們被作為消息進入第三層的網(wǎng)關(guān),也就是每個數(shù)據(jù)中心的本地Kafka集群。本地集群的部分?jǐn)?shù)據(jù)會被復(fù)制到聚合集群,也就是數(shù)據(jù)管道的***一層。
Kafka數(shù)據(jù)管道的數(shù)據(jù)都會經(jīng)過分批和確認(rèn)(發(fā)送確認(rèn)):
Kafka數(shù)據(jù)管道的數(shù)據(jù)流經(jīng)的路徑概覽。
Uber的數(shù)據(jù)從代理客戶端流向Kafka需要經(jīng)過幾個階段:
- 應(yīng)用程序通過調(diào)用代理客戶端的produce方法向代理客戶端發(fā)送消息。
- 代理客戶端把收到的消息放到客戶端的緩沖區(qū)中,并讓方法調(diào)用返回。
- 代理客戶端把緩沖區(qū)里的消息進行分批并發(fā)送到代理服務(wù)器端。
- 代理服務(wù)器把消息放到生產(chǎn)者緩沖區(qū)并對代理客戶端進行確認(rèn)。這時,消息批次已經(jīng)被分好區(qū),并根據(jù)不同的主題名稱放在了相應(yīng)的緩沖區(qū)里。
- 代理服務(wù)器對緩沖區(qū)里的消息進行分批并發(fā)送到本地Kafka服務(wù)器上。
- 本地Kafka服務(wù)器把消息追加到本地日志并對代理服務(wù)器進行確認(rèn)(acks=1)。
- uReplicator從本地Kafka服務(wù)器獲取消息并發(fā)送到聚合服務(wù)器上。
- 聚合服務(wù)器把消息追加到本地日志并對uReplicator進行確認(rèn)(acks=1)。
我們?yōu)榱俗孠afka支持高吞吐量,做出了一些權(quán)衡。數(shù)以千計的微服務(wù)使用Kafka來處理成百上千的并發(fā)業(yè)務(wù)流量(而且還在持續(xù)增長)會帶來潛在的問題。Chaperone的目標(biāo)是在數(shù)據(jù)流經(jīng)數(shù)據(jù)管道的每個階段,能夠抓住每個消息,統(tǒng)計一定時間段內(nèi)的數(shù)據(jù)量,并盡早準(zhǔn)確地檢測出數(shù)據(jù)的丟失、延遲和重復(fù)情況。
Chaperone概覽
Chaperone由四個組件組成:AuditLibrary、ChaperoneService、ChaperoneCollector和WebService。
Chaperone架構(gòu):AuditLibrary、ChaperoneService、ChaperoneCollector和WebService,它們會收集數(shù)據(jù),并進行相關(guān)計算,自動檢測出丟失和延遲的數(shù)據(jù),并展示審計結(jié)果。
AuditLibrary實現(xiàn)了審計算法,它會定時收集并打印統(tǒng)計時間窗。這個庫被其它三個組件所依賴。它的輸出模塊是可插拔的(可以使用Kafka、HTTP等)。在代理客戶端,審計度量指標(biāo)被發(fā)送到Kafka代理。而在其它層,度量指標(biāo)直接被發(fā)送到專門的Kafka主題上。
審計算法是AuditLibrary的核心,Chaperone使用10分鐘的滾動時間窗來持續(xù)不斷地從每個主題收集消息。消息里的事件時間戳被用來決定該消息應(yīng)該被放到哪個時間窗里。對于同一個時間窗內(nèi)的消息,Chaperone會計算它們的數(shù)量和p99延遲。Chaperone會定時把每個時間窗的統(tǒng)計信息包裝成審計消息發(fā)送到可插拔的后端,它們可能是Kafka代理或者之前提到的Kafka服務(wù)器。
Chaperone根據(jù)消息的事件時間戳把消息聚合到滾動時間窗內(nèi)。
審計消息里的tier字段很重要,通過它可以知道審計是在哪里發(fā)生的,也可以知道消息是否到達(dá)了某一個地方。通過比較一定時間段內(nèi)不同層之間的消息數(shù)量,我們可以知道這段時間內(nèi)所生成的消息是否被成功送達(dá)。
ChaperoneService是工作負(fù)載***的一個組件,而且總是處在饑餓的狀態(tài)。它消費Kafka的每一個消息并記錄時間戳。ChaperoneService是基于uReplicator的HelixKafkaConsumer構(gòu)建的,這個消費者組件已經(jīng)被證明比Kafka自帶的消費者組件(Kafka 0.8.2)更可靠,也更好用。ChaperoneService通過定時向特定的Kafka主題生成審計消息來記錄狀態(tài)。
ChaperoneCollector監(jiān)聽特定的Kafka主題,并獲取所有的審計消息,然后把它們存到數(shù)據(jù)庫。同時,它還會生產(chǎn)多個儀表盤:
Chaperone創(chuàng)建的儀表盤,從上面我們看出數(shù)據(jù)的丟失情況。
從上圖可以看出每個層的主題消息總量,它們是通過聚合所有數(shù)據(jù)中心的消息得出的。如果沒有數(shù)據(jù)丟失,所有的線會***地重合起來。如果層之間有數(shù)據(jù)丟失,那么線與線之間會出現(xiàn)裂縫。例如,從下圖可以看出,Kafka代理丟掉了一些消息,不過在之后的層里沒有消息丟失。從儀表盤可以很容易地看出數(shù)據(jù)丟失的時間窗,從而可以采取相應(yīng)的行動。
從儀表盤上還能看出消息的延遲情況,借此我們就能夠知道消息的及時性以及它們是否在某些層發(fā)生了傳輸延遲。用戶可以直接從這一個儀表盤上看出主題的健康狀況,而無需去查看Kafka服務(wù)器或uReplicator的儀表盤:
Chaperone提供一站式的儀表盤來查看每個數(shù)據(jù)中心的主題狀態(tài)。
***,WebService提供了REST接口來查詢Chaperone收集到的度量指標(biāo)。通過這些接口,我們可以準(zhǔn)確地計算出數(shù)據(jù)丟失的數(shù)量。在知道了數(shù)據(jù)丟失的時間窗后,我們可以從Chaperone查到確切的數(shù)量:
Chaperone的Web界面。
Chaperone的兩個設(shè)計目標(biāo)
在設(shè)計Chaperone時,為了能夠做到準(zhǔn)確的審計,我們把注意力集中在兩個必須完成的任務(wù)上:
1)每個消息只被審計一次
為了確保每個消息只被審計一次,ChaperoneService使用了預(yù)寫式日志(WAL)。ChaperoneService每次在觸發(fā)Kafka審計消息時,會往審計消息里添加一個UUID。這個帶有相關(guān)偏移量的消息在發(fā)送到Kafka之前被保存在WAL里。在得到Kafka的確認(rèn)之后,WAL里的消息被標(biāo)記為已完成。如果ChaperoneService崩潰,在重啟后它可以重新發(fā)送WAL里未被標(biāo)記的審計消息,并定位到最近一次的審計偏移量,然后繼續(xù)消費。WAL確保了每個Kafka消息只被審計一次,而且每個審計消息至少會被發(fā)送一次。
接下來,ChaperoneCollector使用ChaperoneService之前添加過的UUID來移除重復(fù)消息。有了UUID和WAL,我們可以確保審計的一次性。在代理客戶端和服務(wù)器端難以實現(xiàn)一次性保證,因為這樣會給它們帶來額外的開銷。我們依賴它們的優(yōu)雅關(guān)閉操作,這樣它們的狀態(tài)才會被沖刷出去。
2)在層間使用一致性的時間戳
因為Chaperone可以在多個層里看到相同的Kafka消息,所以為消息內(nèi)嵌時間戳是很有必要的。如果沒有這些時間戳,在計數(shù)時會發(fā)生時間錯位。在Uber,大部分發(fā)送到Kafka的數(shù)據(jù)要么使用avro風(fēng)格的schema編碼,要么使用JSON格式。對于使用schema編碼的消息,可以直接獲取時間戳。而對于JSON格式的消息,需要對JSON數(shù)據(jù)進行解碼才能拿到時間戳。為了加快這個過程,我們實現(xiàn)了一個基于流的JSON消息解析器,這個解析器無需預(yù)先解碼整個消息就可以掃描到時間戳。這個解析器用在ChaperoneService里是很高效的,不過對代理客戶端和服務(wù)器來說仍然需要付出很高代價。所以在這兩個層里,我們使用的是消息的處理時間戳。因為時間戳的不一致造成的層間計數(shù)差異可能會觸發(fā)錯誤的數(shù)據(jù)丟失警告。我們正在著手解決時間戳不一致問題,之后也會把解決方案公布出來。
Chaperone在Uber的兩大用途
1. 檢測數(shù)據(jù)丟失
在Chaperone之前,數(shù)據(jù)丟失的***個征兆來自數(shù)據(jù)消費者,他們會出來抱怨數(shù)據(jù)的丟失情況。但是等他們出來抱怨已經(jīng)為時已晚,而且我們無法知道是數(shù)據(jù)管道的哪一部分出現(xiàn)了問題。有了Chaperone之后,我們創(chuàng)建了一個用于檢測丟失數(shù)據(jù)的作業(yè),它會定時地從Chaperone拉取度量指標(biāo),并在層間的消息數(shù)量出現(xiàn)不一致時發(fā)出告警。告警包含了Kafka數(shù)據(jù)管道端到端的信息,從中可以看出那些管道組件的度量指標(biāo)無法告訴我們的問題。檢測作業(yè)會自動地發(fā)現(xiàn)新主題,并且你可以根據(jù)數(shù)據(jù)的重要性配置不同的告警規(guī)則和閾值。數(shù)據(jù)丟失的通知會通過多種通道發(fā)送出去,比如頁式調(diào)度系統(tǒng)、企業(yè)聊天系統(tǒng)或者郵件系統(tǒng),總之會很快地通知到你。
2. 在Kafka里通過偏移量之外的方式讀取數(shù)據(jù)
我們生產(chǎn)環(huán)境的大部分集群仍然在使用Kafka 0.8.x,這一版本的Kafka對從時間戳到偏移量的索引沒有提供原生支持。于是我們在Chaperone里自己構(gòu)建了這樣的索引。這種索引可以用來做基于時間區(qū)間的查詢,所以我們不僅限于使用Kafka的偏移量來讀取數(shù)據(jù),我們可以使用Chaperone提供的時間戳來讀取數(shù)據(jù)。
Kafka對數(shù)據(jù)的保留是有期限的,不過我們對消息進行了備份,并把消息的偏移量也原封不動地保存起來。借助Chaperone提供的索引,用戶可以基于時間區(qū)間讀取這些備份數(shù)據(jù),而不是僅僅局限于Kafka現(xiàn)存的數(shù)據(jù),而且使用的訪問接口跟Kafka是一樣的。有了這個特性,Kafka用戶可以通過檢查任意時間段里的消息來對他們的服務(wù)進行問題診斷,在必要時可以回填消息。當(dāng)下游系統(tǒng)的審計結(jié)果跟Chaperone出現(xiàn)不一致,我們可以把一些特定的消息導(dǎo)出來進行比較,以便定位問題的根源。
總結(jié)
我們構(gòu)建了Chaperone來解決以下問題:
- 是否有數(shù)據(jù)丟失?如果是,那么丟失了多少數(shù)據(jù)?它們是在數(shù)據(jù)管道的哪個地方丟失的?
- 端到端的延遲是多少?如果有消息延遲,是從哪里開始的?
- 是否有數(shù)據(jù)重復(fù)?
Chaperone不僅僅告訴我們系統(tǒng)的健康情況,它還告訴我們是否有數(shù)據(jù)丟失。例如,在Kafka服務(wù)器返回非預(yù)期的錯誤時,uReplicator會出現(xiàn)死循環(huán),而此時uReplicator和Kafka都不會觸發(fā)任何告警,不過我們的檢測作業(yè)會很快地把問題暴露出來。
如果你想更多地了解Chaperone,可以自己去探究。我們已經(jīng)把Chaperone開源,它的源代碼放在Github上。



































