Kafka集群在馬蜂窩大數(shù)據(jù)平臺的優(yōu)化與應(yīng)用擴(kuò)展
Kafka 是當(dāng)下熱門的消息隊(duì)列中間件,它可以實(shí)時(shí)地處理海量數(shù)據(jù),具備高吞吐、低延時(shí)等特性及可靠的消息異步傳遞機(jī)制,可以很好地解決不同系統(tǒng)間數(shù)據(jù)的交流和傳遞問題。
Kafka 在馬蜂窩也有非常廣泛的應(yīng)用,為很多核心的業(yè)務(wù)提供支撐。本文將圍繞 Kafka 在馬蜂窩大數(shù)據(jù)平臺的應(yīng)用實(shí)踐,介紹相關(guān)業(yè)務(wù)場景、在 Kafka 應(yīng)用的不同階段我們遇到了哪些問題以及如何解決、之后還有哪些計(jì)劃等。
一、應(yīng)用場景
從 Kafka 在大數(shù)據(jù)平臺的應(yīng)用場景來看,主要分為以下三類:
- 第一類是將 Kafka 作為數(shù)據(jù)庫,提供大數(shù)據(jù)平臺對實(shí)時(shí)數(shù)據(jù)的存儲服務(wù)。從來源和用途兩個(gè)維度來說,可以將實(shí)時(shí)數(shù)據(jù)分為業(yè)務(wù)端 DB 數(shù)據(jù)、監(jiān)控類型日志、基于埋點(diǎn)的客戶端日志(H5、WEB、APP、小程序)和服務(wù)端日志。
- 第二類是為數(shù)據(jù)分析提供數(shù)據(jù)源,各埋點(diǎn)日志會作為數(shù)據(jù)源,支持并對接公司離線數(shù)據(jù)、實(shí)時(shí)數(shù)據(jù)倉庫及分析系統(tǒng),包括多維查詢、實(shí)時(shí) Druid OLAP、日志明細(xì)等。
- 第三類是為業(yè)務(wù)方提供數(shù)據(jù)訂閱。除了在大數(shù)據(jù)平臺內(nèi)部的應(yīng)用之外,我們還使用 Kafka 為推薦搜索、大交通、酒店、內(nèi)容中心等核心業(yè)務(wù)提供數(shù)據(jù)訂閱服務(wù),如用戶實(shí)時(shí)特征計(jì)算、用戶實(shí)時(shí)畫像訓(xùn)練及實(shí)時(shí)推薦、反作弊、業(yè)務(wù)監(jiān)控報(bào)警等。
主要應(yīng)用如下圖所示:

二、演進(jìn)之路
1、四個(gè)階段
早期大數(shù)據(jù)平臺之所以引入 Kafka 作為業(yè)務(wù)日志的收集處理系統(tǒng),主要是考慮到它高吞吐低延遲、多重訂閱、數(shù)據(jù)回溯等特點(diǎn),可以更好地滿足大數(shù)據(jù)場景的需求。
但隨著業(yè)務(wù)量的迅速增加,以及在業(yè)務(wù)使用和系統(tǒng)維護(hù)中遇到的問題,例如注冊機(jī)制、監(jiān)控機(jī)制等的不完善,導(dǎo)致出現(xiàn)問題無法快速定位,以及一些線上實(shí)時(shí)任務(wù)發(fā)生故障后沒有快速恢復(fù)導(dǎo)致消息積壓等, 使 Kafka 集群的穩(wěn)定性和可用性得受到挑戰(zhàn),經(jīng)歷了幾次嚴(yán)重的故障。
解決以上問題對我們來說迫切而棘手。針對大數(shù)據(jù)平臺在使用 Kafka 上存在的一些痛點(diǎn),我們從集群使用到應(yīng)用層擴(kuò)展做了一系列的實(shí)踐,整體來說包括四個(gè)階段:
第一階段:版本升級
圍繞平臺數(shù)據(jù)生產(chǎn)和消費(fèi)方面存在的一些瓶頸和問題,我們針對目前的 Kafka 版本進(jìn)行技術(shù)選型,最終確定使用 1.1.1 版本。
第二階段:資源隔離
為了支持業(yè)務(wù)的快速發(fā)展,我們完善了多集群建設(shè)以及集群內(nèi) Topic 間的資源隔離。
第三階段:權(quán)限控制和監(jiān)控告警
首先在安全方面,早期的 Kafka 集群處于裸跑狀態(tài)。由于多產(chǎn)品線共用 Kafka,很容易由于誤讀其他業(yè)務(wù)的 Topic 導(dǎo)致數(shù)據(jù)安全問題。因此我們基于 SASL/ SCRAM + ACL 增加了鑒權(quán)的功能。
在監(jiān)控告警方面,Kafka 目前已然成為實(shí)時(shí)計(jì)算中輸入數(shù)據(jù)源的標(biāo)配,那么其中 Lag 積壓情況、吞吐情況就成為實(shí)時(shí)任務(wù)是否健康的重要指標(biāo)。因此,大數(shù)據(jù)平臺構(gòu)建了統(tǒng)一的 Kafka 監(jiān)控告警平臺并命名「雷達(dá)」,多維度監(jiān)控 Kafka 集群及使用方情況。
第四階段:應(yīng)用擴(kuò)展
早期 Kafka 在對公司各業(yè)務(wù)線開放的過程中,由于缺乏統(tǒng)一的使用規(guī)范,導(dǎo)致了一些業(yè)務(wù)方的不正確使用。為解決該痛點(diǎn),我們構(gòu)建了實(shí)時(shí)訂閱平臺,通過應(yīng)用服務(wù)的形式賦能給業(yè)務(wù)方,實(shí)現(xiàn)數(shù)據(jù)生產(chǎn)和消費(fèi)申請、平臺的用戶授權(quán)、使用方監(jiān)控告警等眾多環(huán)節(jié)流程化自動化,打造從需求方使用到資源全方位管控的整體閉環(huán)。
下面圍繞幾個(gè)關(guān)鍵點(diǎn)為大家展開介紹。
2、核心實(shí)踐
1)版本升級
之前大數(shù)據(jù)平臺一直使用的是 0.8.3 這一 Kafka 早期版本,而截止到當(dāng)前,Kafka 官方最新的 Release 版本已經(jīng)到了 2.3,于是長期使用 0.8 版本過程中漸漸遇到的很多瓶頸和問題,我們是能夠通過版本升級來解決的。
舉例來說,以下是一些之前使用舊版時(shí)常見的問題:
- 缺少對 Security 的支持:存在數(shù)據(jù)安全性問題及無法通過認(rèn)證授權(quán)對資源使用細(xì)粒度管理;
- broker under replicated:發(fā)現(xiàn) broker 處于 under replicated 狀態(tài),但不確定問題的產(chǎn)生原因,難以解決;
- 新的 feature 無法使用:如事務(wù)消息、冪等消息、消息時(shí)間戳、消息查詢等;
- 客戶端的對 offset 的管理依賴 zookeeper, 對 zookeeper 的使用過重, 增加運(yùn)維的復(fù)雜度;
- 監(jiān)控指標(biāo)不完善:如 topic、partition、broker 的數(shù)據(jù) size 指標(biāo), 同時(shí) kafka manager 等監(jiān)控工具對低版本 kafka 支持不好。
同時(shí)對一些目標(biāo)版本的特性進(jìn)行了選型調(diào)研,如:
- 0.9 版本, 增加了配額和安全性, 其中安全認(rèn)證和授權(quán)是我們最關(guān)注的功能;
- 0.10 版本,更細(xì)粒度的時(shí)間戳. 可以基于偏移量進(jìn)行快速的數(shù)據(jù)查找,找到所要的時(shí)間戳。這在實(shí)時(shí)數(shù)據(jù)處理中基于 Kafka 數(shù)據(jù)源的數(shù)據(jù)重播是極其重要的;
- 0.11 版本, 冪等性和 Transactions 的支持及副本數(shù)據(jù)丟失/數(shù)據(jù)不一致的解決;
- 冪等性意味著對于同一個(gè) Partition,面對 Data 的多次發(fā)布,Kafka broker 端就可以做到自動去重;
- 對 Transactions 的支持使一個(gè)事務(wù)下發(fā)布多條信息到多個(gè)Topic Partition 時(shí),我們可以使它以原子性的方式被完成。在我們的下游消費(fèi)者中,很多都是用 Flink 做一些流處理的工作,因此在數(shù)據(jù)處理及故障恢復(fù)時(shí)僅一次語義則顯得尤為重要。而0.11 版本對于事務(wù)的支持則可以保證與 Kafka 交互的 Flink 應(yīng)用實(shí)現(xiàn)端到端僅一次語義, 支持 EOS 可以對數(shù)據(jù)可靠性有絕對要求, 比如交易、風(fēng)控等場景下的重要支持;
- Leader Epoch:解決了原先依賴水位表示副本進(jìn)度可能造成的數(shù)據(jù)丟失/數(shù)據(jù)不一致問題;
1.1 版本,運(yùn)維性的提升。比如當(dāng) Controller Shut Down,想要關(guān)閉一個(gè) Broker 的時(shí)候,之前需要一個(gè)很長很復(fù)雜的過程在 1.0 版本得到很大的改善。
最終選擇 1.1 版本, 則是因?yàn)槌鲇?Camus 與 Kafka 版本的兼容性及 1.1 版本已經(jīng)滿足了使用場景中重要新特性的支持的綜合考量。這里再簡單說一下 Camus 組件,同樣是由 Linkedin 開源,在我們的大數(shù)據(jù)平臺中主要作為 Kafka 數(shù)據(jù) Dump 到 HDFS 的重要方式。
2)資源隔離
之前由于業(yè)務(wù)的復(fù)雜性和規(guī)模不大,大數(shù)據(jù)平臺對于 Kafka 集群的劃分比較簡單。于是,一段時(shí)間以后導(dǎo)致公司業(yè)務(wù)數(shù)據(jù)混雜在一起,某一個(gè)業(yè)務(wù)主題存在的不合理使用都有可能導(dǎo)致某些 Broker 負(fù)載過重,影響到其他正常的業(yè)務(wù),甚至某些 Broker 的故障會出現(xiàn)影響整個(gè)集群,導(dǎo)致全公司業(yè)務(wù)不可用的風(fēng)險(xiǎn)。
針對以上的問題,在集群改造上做了兩方面實(shí)踐
- 按功能屬性拆分獨(dú)立的集群;
- 集群內(nèi)部 Topic 粒度的資源隔離。
①集群拆分
按照功能維度拆分多個(gè) Kafka 物理集群,進(jìn)行業(yè)務(wù)隔離,降低運(yùn)維復(fù)雜度。
以目前最重要的埋點(diǎn)數(shù)據(jù)使用來說, 目前拆分為三類集群,各類集群的功能定義如下:
Log 集群:各端的埋點(diǎn)數(shù)據(jù)采集后會優(yōu)先落地到該集群, 所以這個(gè)過程不能出現(xiàn)由于 Kafka 問題導(dǎo)致采集中斷,這對 Kafka 可用性要求很高。因此該集群不會對外提供訂閱,保證消費(fèi)方可控;同時(shí)該集群業(yè)務(wù)也作為離線采集的源頭,數(shù)據(jù)會通過 Camus 組件按小時(shí)時(shí)間粒度 dump 到 HDFS 中,這部分?jǐn)?shù)據(jù)參與后續(xù)的離線計(jì)算。
全量訂閱集群:該集群 Topic 中的絕大部分?jǐn)?shù)據(jù)是從 Log 集群實(shí)時(shí)同步過來的。上面我們提到了 Log 集群的數(shù)據(jù)是不對外的,因此全量集群就承擔(dān)了消費(fèi)訂閱的職責(zé)。目前主要是用于平臺內(nèi)部的實(shí)時(shí)任務(wù)中,來對多個(gè)業(yè)務(wù)線的數(shù)據(jù)分析并提供分析服務(wù)。
個(gè)性定制集群:之前提到過,我們可以根據(jù)業(yè)務(wù)方需求來拆分、合并數(shù)據(jù)日志源,同時(shí)我們還支持定制化 Topic,該集群只需要提供分流后 Topic 的落地存儲。
集群整體架構(gòu)劃分如下圖:

②資源隔離
Topic 的流量大小是集群內(nèi)部進(jìn)行資源隔離的重要依據(jù)。例如,我們在業(yè)務(wù)中埋點(diǎn)日志量較大的兩個(gè)數(shù)據(jù)源分別是后端埋點(diǎn)數(shù)據(jù)源 server-event 和端上的埋點(diǎn) mobile-event 數(shù)據(jù)源,我們要避免存儲兩個(gè)數(shù)據(jù)的主題分區(qū)分配到集群中同一個(gè) Broker 上的節(jié)點(diǎn)。通過在不同 Topic 進(jìn)行物理隔離,就可以避免 Broker 上的流量發(fā)生傾斜。
3)權(quán)限控制和監(jiān)控告警
①權(quán)限控制
開始介紹時(shí)我們說過,早期 Kafka 集群沒有設(shè)置安全驗(yàn)證處于裸跑狀態(tài),因此只要知道 Broker 的連接地址即可生產(chǎn)消費(fèi),存在嚴(yán)重的數(shù)據(jù)安全性問題。
一般來說, 使用 SASL 的用戶多會選擇 Kerberos,但就平臺 Kafka 集群的使用場景來說,用戶系統(tǒng)并不復(fù)雜,使用 Kerberos 就有些大材小用, 同時(shí) Kerberos 相對復(fù)雜,存在引發(fā)其他問題的風(fēng)險(xiǎn)。另外,在 Encryption 方面, 由于都是運(yùn)行在內(nèi)網(wǎng)環(huán)境,所以并沒有使用 SSL 加密。
最終平臺 Kafka 集群使用 SASL 作為鑒權(quán)方式, 基于 SASL/ SCRAM + ACL 的輕量級組合方式,實(shí)現(xiàn)動態(tài)創(chuàng)建用戶,保障數(shù)據(jù)安全。
②監(jiān)控告警
之前在集群的使用中我們經(jīng)常發(fā)現(xiàn),消費(fèi)應(yīng)用的性能無緣無故變差了。分析問題的原因, 通常是滯后 Consumer 讀取的數(shù)據(jù)大概率沒有命中 Page- cache,導(dǎo)致 Broker 端機(jī)器的內(nèi)核要首先從磁盤讀取數(shù)據(jù)加載到 Page- cache 中后,才能將結(jié)果返還給 Consumer,相當(dāng)于本來可以服務(wù)于寫操作的磁盤現(xiàn)在要讀取數(shù)據(jù)了, 影響了使用方讀寫同時(shí)降低的集群的性能。
這時(shí)就需要找出滯后 Consumer 的應(yīng)用進(jìn)行事前的干預(yù)從而減少問題發(fā)生,因此監(jiān)控告警無論對平臺還是用戶都有著重大的意義。下面介紹一下我們的實(shí)踐思路。
整體方案:
- 整體方案主要是基于開源組件 Kafka JMX Metrics+OpenFalcon+Grafana:
- Kafka JMX Metrics:Kafka broker 的內(nèi)部指標(biāo)都以 JMX Metrics 的形式暴露給外部。1.1.1 版本 提供了豐富的監(jiān)控指標(biāo),滿足監(jiān)控需要;
- OpenFalcon:小米開源的一款企業(yè)級、高可用、可擴(kuò)展的開源監(jiān)控系統(tǒng);
- Grafana:Metrics 可視化系統(tǒng),大家比較熟悉,可對接多種 Metrics 數(shù)據(jù)源。
關(guān)于監(jiān)控:
- Falcon-agent:部署到每臺 Broker 上, 解析 Kafka JMX 指標(biāo)上報(bào)數(shù)據(jù);
- Grafana:用來可視化 Falcon Kafka Metrics 數(shù)據(jù),對 Cluster、Broker、Topic、Consumer 4 個(gè)角色制作監(jiān)控大盤;
- Eagle:獲取消費(fèi)組 Active 狀態(tài)、消費(fèi)組 Lag 積壓情況,同時(shí)提供 API,為監(jiān)控告警系統(tǒng)「雷達(dá)」提供監(jiān)控?cái)?shù)據(jù)。
關(guān)于告警:
雷達(dá)系統(tǒng): 自研監(jiān)控系統(tǒng),通過 Falcon 及 Eagle 獲取 Kafka 指標(biāo),結(jié)合設(shè)定閾值進(jìn)行告警。以消費(fèi)方式舉例,Lag 是衡量消費(fèi)情況是否正常的一個(gè)重要指標(biāo),如果 Lag 一直增加,必須要對它進(jìn)行處理。
發(fā)生問題的時(shí)候,不僅 Consumer 管理員要知道,它的用戶也要知道,所以報(bào)警系統(tǒng)也需要通知到用戶。具體方式是通過企業(yè)微信告警機(jī)器人自動提醒對應(yīng)消費(fèi)組的負(fù)責(zé)人或使用者及 Kafka 集群的管理者。
監(jiān)控示例:

4)應(yīng)用擴(kuò)展
①實(shí)時(shí)數(shù)據(jù)訂閱平臺
實(shí)時(shí)數(shù)據(jù)訂閱平臺是一個(gè)提供 Kafka 使用全流程管理的系統(tǒng)應(yīng)用,以工單審批的方式將數(shù)據(jù)生產(chǎn)和消費(fèi)申請、平臺用戶授權(quán)、使用方監(jiān)控告警等眾多環(huán)節(jié)流程化自動化, 并提供統(tǒng)一管控。
核心思想是基于 Kafka 數(shù)據(jù)源的身份認(rèn)證和權(quán)限控制,增加數(shù)據(jù)安全性的同時(shí)對 Kafka 下游應(yīng)用進(jìn)行管理。
②標(biāo)準(zhǔn)化的申請流程
無論生產(chǎn)者還是消費(fèi)者的需求,使用方首先會以工單的方式提出訂閱申請。申請信息包括業(yè)務(wù)線、Topic、訂閱方式等信息;工單最終會流轉(zhuǎn)到平臺等待審批;如果審批通過,使用方會分配到授權(quán)賬號及 Broker 地址。至此,使用方就可以進(jìn)行正常的生產(chǎn)消費(fèi)了。

③監(jiān)控告警
對于平臺來說,權(quán)限與資源是綁定的,資源可以是用于生產(chǎn)的 Topic 或消費(fèi)使用的 GroupTopic。一旦權(quán)限分配后,對于該部分資源的使用就會自動在我們的雷達(dá)監(jiān)控系統(tǒng)進(jìn)行注冊,用于資源整個(gè)生命的周期的監(jiān)控。
④數(shù)據(jù)重播
出于對數(shù)據(jù)完整性和準(zhǔn)確性的考量,目前 Lamda 架構(gòu)已經(jīng)是大數(shù)據(jù)的一種常用架構(gòu)方式。但從另一方面來說, Lamda 架構(gòu)也存在資源的過多使用和開發(fā)難度高等問題。
實(shí)時(shí)訂閱平臺可以為消費(fèi)組提供任意位點(diǎn)的重置,支持對實(shí)時(shí)數(shù)據(jù)按時(shí)間、位點(diǎn)等多種方式的數(shù)據(jù)重播, 并提供對 Kappa 架構(gòu)場景的支持,來解決以上痛點(diǎn)。

⑤主題管理
為什么提供主題管理?舉一些很簡單的例子,比如當(dāng)我們想讓一個(gè)用戶在集群上創(chuàng)建他自己的 Kafka Topic,這時(shí)顯然是不希望讓他直接到一個(gè)節(jié)點(diǎn)上操作的。因此剛才所講的服務(wù),不管是對用戶來講,還是管理員來講,我們都需要有一個(gè)界面操作它,因?yàn)椴豢赡芩腥硕纪ㄟ^ SSH 去連服務(wù)器。
因此需要一個(gè)提供管理功能的服務(wù),創(chuàng)建統(tǒng)一的入口并引入主題管理的服務(wù),包括主題的創(chuàng)建、資源隔離指定、主題元數(shù)據(jù)管理等。

⑥數(shù)據(jù)分流
在之前的架構(gòu)中, 使用方消費(fèi) Kafka 數(shù)據(jù)的粒度都是每個(gè) Kafka Topic 保存 LogSource 的全量數(shù)據(jù),但在使用中很多消費(fèi)方只需要消費(fèi)各 LogSource 的部分?jǐn)?shù)據(jù),可能也就是某一個(gè)應(yīng)用下幾個(gè)埋點(diǎn)事件的數(shù)據(jù)。如果需要下游應(yīng)用自己寫過濾規(guī)則,肯定存在資源的浪費(fèi)及使用便捷性的問題;另外還有一部分場景是需要多個(gè)數(shù)據(jù)源 Merge 在一起來使用的。
基于上面的兩種情況, 我人實(shí)現(xiàn)了按業(yè)務(wù)方需求拆分、合并并定制化 Topic 支持跨數(shù)據(jù)源的數(shù)據(jù)合并及 appcode 和 event code 的任意組個(gè)條件的過濾規(guī)則。

三、后續(xù)計(jì)劃
1、解決數(shù)據(jù)重復(fù)問題。為了解決目前平臺實(shí)時(shí)流處理中因故障恢復(fù)等因素導(dǎo)致數(shù)據(jù)重復(fù)的問題,我們正在嘗試用 Kafka 的事務(wù)機(jī)制結(jié)合 Flink 的兩段提交協(xié)議實(shí)現(xiàn)端到端的僅一次語義。目前已經(jīng)在平臺上小范圍試用, 如果通過測試,將會在生產(chǎn)環(huán)境下推廣。
2、Consumer 限流。在一寫多讀場景中, 如果某一個(gè) Consumer 操作大量讀磁盤, 會影響 Produce 級其他消費(fèi)者操作的延遲。l因此,通過 Kafka Quota 機(jī)制對 Consume 限流及支持動態(tài)調(diào)整閾值也是我們后續(xù)的方向
3、場景擴(kuò)展。基于 Kafka 擴(kuò)展 SDK、HTTP 等多種消息訂閱及生產(chǎn)方式,滿足不同語言環(huán)境及場景的使用需求。
以上就是關(guān)于 Kafka 在馬蜂窩大數(shù)據(jù)平臺應(yīng)用實(shí)踐的分享,如果大家有什么建議或者問題,歡迎在后臺留言。