DataLeap 的 Catalog 系統(tǒng)近實時消息同步能力優(yōu)化
精選字節(jié)數(shù)據(jù)中臺 DataLeap 的 Data Catalog 系統(tǒng)通過接收 MQ 中的近實時消息來同步部分元數(shù)據(jù)。Apache Atlas 對于實時消息的消費處理不滿足性能要求,內(nèi)部使用 Flink 任務(wù)的處理方案在 ToB 場景中也存在諸多限制,所以團隊自研了輕量級異步消息處理框架,很好的支持了字節(jié)內(nèi)部和火山引擎上同步元數(shù)據(jù)的訴求。本文定義了需求場景,并詳細介紹框架的設(shè)計與實現(xiàn)。
1. 背景
1.1 動機
字節(jié)數(shù)據(jù)中臺 DataLeap 的 Data Catalog 系統(tǒng)基于 Apache Atlas 搭建,其中 Atlas 通過 Kafka 獲取外部系統(tǒng)的元數(shù)據(jù)變更消息。在開源版本中,每臺服務(wù)器支持的 Kafka Consumer 數(shù)量有限,在每日百萬級消息體量下,經(jīng)常有長延時等問題,影響用戶體驗。
在 2020 年底,我們針對 Atlas 的消息消費部分做了重構(gòu),將消息的消費和處理從后端服務(wù)中剝離出來,并編寫了 Flink 任務(wù)承擔(dān)這部分工作,比較好的解決了擴展性和性能問題。然而,到 2021 年年中,團隊開始重點投入私有化部署和火山公有云支持,對于 Flink 集群的依賴引入了可維護性的痛點。
在仔細的分析了使用場景和需求,并調(diào)研了現(xiàn)成的解決方案后,我們決定投入人力自研一個消息處理框架。當(dāng)前這個框架很好的支持了字節(jié)內(nèi)部以及 ToB 場景中 Data Catalog 對于消息消費和處理的場景。
本文會詳細介紹框架解決的問題,整體的設(shè)計,以及實現(xiàn)中的關(guān)鍵決定。
1.2 需求定義
使用下面的表格將具體場景定義清楚。
需求維度 | 需求描述 |
吞吐量 | 每日百萬級別,每秒峰值>100 |
服務(wù)質(zhì)量(QoS) | 至少一次 |
延遲消息 | 支持將消息標(biāo)記為延遲處理,最高延遲 1 min |
重試 | 自動對處理失敗消息重試,重試次數(shù)可定義 |
并行與順序處理 | Partition 內(nèi)部支持按照某個 Key 重新分組,不同 Key 之間接受并行,同一個 Key 要求順序處理 |
消息處理時間 | 不同類型的消息,處理時間會有較大差別,從< 1 s~1 min |
封裝 | 確保不丟消息的前提下,依賴框架做 Offset 的提交,業(yè)務(wù)側(cè)只需要編寫消息的處理邏輯;另外,將系統(tǒng)狀態(tài)以 Metric 方式暴露 |
輕量 | 支持與后端服務(wù)混合部署,不引入額外的維護成本 |
1.3 相關(guān)工作
在啟動自研之前,我們評估了兩個比較相關(guān)的方案,分別是 Flink 和 Kafka Streaming。
Flink 是我們之前生產(chǎn)上使用的方案,在能力上是符合要求的,最主要的問題是長期的可維護性。在公有云場景,那個階段 Flink 服務(wù)在火山云上還沒有發(fā)布,我們自己的服務(wù)又有嚴格的時間線,所以必須考慮替代;在私有化場景,我們不確認客戶的環(huán)境一定有 Flink 集群,即使部署的數(shù)據(jù)底座中帶有 Flink,后續(xù)的維護也是個頭疼的問題。另外一個角度,作為通用流式處理框架,F(xiàn)link 的大部分功能其實我們并沒有用到,對于單條消息的流轉(zhuǎn)路徑,其實只是簡單的讀取和處理,使用 Flink 有些“殺雞用牛刀”了。
另外一個比較標(biāo)準(zhǔn)的方案是 Kafka Streaming。作為 Kafka 官方提供的框架,對于流式處理的語義有較好的支持,也滿足我們對于輕量的訴求。最終沒有采用的主要考慮點是兩個:
- 對于 Offset 的維護不夠靈活:我們的場景不能使用自動提交(會丟消息),而對于同一個 Partition 中的數(shù)據(jù)又要求一定程度的并行處理,使用 Kafka Streaming 的原生接口較難支持。
- 與 Kafka 強綁定:大部分場景下,我們團隊不是元數(shù)據(jù)消息隊列的擁有者,也有團隊使用 RocketMQ 等提供元數(shù)據(jù)變更,在應(yīng)用層,我們希望使用同一套框架兼容。
2. 設(shè)計
2.1 概念說明
- MQ Type:Message Queue 的類型,比如 Kafka與RocketMQ。后續(xù)內(nèi)容以 Kafka 為主,設(shè)計一定程度兼容其他 MQ。
- Topic:一批消息的集合,包含多個 Partition,可以被多個 Consumer Group消費。
- Consumer Group:一組 Consumer,同一 Group 內(nèi)的 Consumer 數(shù)據(jù)不會重復(fù)消費。
- Consumer:消費消息的最小單位,屬于某個 Consumer Group。?
- Partition:Topic 中的一部分數(shù)據(jù),同一 Partition 內(nèi)消息有序。同一 Consumer Group 內(nèi),一個 Partition 只會被其中一個 Consumer 消費。
- Event:由 Topic 中的消息轉(zhuǎn)換而來,部分屬性如下。
- Event Type:消息的類型定義,會與 Processor 有對應(yīng)關(guān)系;
- Event Key:包含消息 Topic、Partition、Offset 等元數(shù)據(jù),用來對消息進行 Hash 操作;
- Processor:消息處理的單元,針對某個 Event Type 定制的業(yè)務(wù)邏輯。
- Task:消費消息并處理的一條 Pipeline,Task 之間資源是相互獨立的。
2.2 框架架構(gòu)
整個框架主要由 MQ Consumer, Message Processor 和 State Manager 組成。
- MQ Consumer:負責(zé)從Kafka Topic拉取消息,并根據(jù) Event Key 將消息投放到內(nèi)部隊列,如果消息需要延時消費,會被投放到對應(yīng)的延時隊列;該模塊還負責(zé)定時查詢 State Manager 中記錄的消息狀態(tài),并根據(jù)返回提交消息 Offset;上報與消息消費相關(guān)的 Metric。
- Message Processor:負責(zé)從隊列中拉取消息并異步進行處理,它會將消息的處理結(jié)果更新給 State Manager,同時上報與消息處理相關(guān)的 Metric。
- State Manager:負責(zé)維護每個 Kafka Partition 的消息狀態(tài),并暴露當(dāng)前應(yīng)提交的 Offset 信息給 MQ Consumer。
3. 實現(xiàn)
3.1 線程模型
每個 Task 可以運行在一臺或多臺實例,建議部署到多臺機器,以獲得更好的性能和容錯能力。
每臺實例中,存在兩組線程池:
- Consumer Pool:負責(zé)管理 MQ Consumer Thread 的生命周期,當(dāng)服務(wù)啟動時,根據(jù)配置拉起一定規(guī)模的線程,并在服務(wù)關(guān)閉時確保每個 Thread 安全退出或者超時停止。整體有效 Thread 的上限與 Topic 的 Partition 的總數(shù)有關(guān)。
- Processor Pool:負責(zé)管理 Message Processor Thread 的生命周期,當(dāng)服務(wù)啟動時,根據(jù)配置拉起一定規(guī)模的線程,并在服務(wù)關(guān)閉時確保每個 Thread 安全退出或者超時停止。可以根據(jù) Event Type 所需要處理的并行度來靈活配置。
兩類 Thread 的性質(zhì)分別如下:
- Consumer Thread:每個 MQ Consumer 會封裝一個 Kafka Consumer,可以消費 0 個或者多個 Partition。根據(jù) Kafka 的機制,當(dāng) MQ Consumer Thread 的個數(shù)超過 Partition 的個數(shù)時,當(dāng)前 Thread 不會有實際流量。
- Processor Thread:唯一對應(yīng)一個內(nèi)部的隊列,并以 FIFO 的方式消費和處理其中的消息。
3.2 StateManager
在 State Manager 中,會為每個 Partition 維護一個優(yōu)先隊列(最小堆),隊列中的信息是 Offset,兩個優(yōu)先隊列的職責(zé)如下:
- 處理中的隊列:一條消息轉(zhuǎn)化為 Event 后,MQ Consumer 會調(diào)用 StateManager 接口,將消息 Offset 插入該隊列。
- 處理完的隊列:一條消息處理結(jié)束或最終失敗,Message Processor 會調(diào)用 StateManager 接口,將消息 Offset 插入該隊列。
MQ Consumer 會周期性的檢查當(dāng)前可以 Commit 的 Offset,情況枚舉如下:
- 處理中的隊列堆頂 < 處理完的隊列堆頂或者處理完的隊列為空:代表當(dāng)前消費回來的消息還在處理過程中,本輪不做 Offset 提交。
- 處理中的隊列堆頂 = 處理完的隊列堆頂:表示當(dāng)前消息已經(jīng)處理完,兩邊同時出隊,并記錄當(dāng)前堆頂為可提交的 Offset,重復(fù)檢查過程。
- 處理中的隊列堆頂 > 處理完的隊列堆頂:異常情況,通常是數(shù)據(jù)回放到某些中間狀態(tài),將處理完的隊列堆頂出堆。
注意:當(dāng)發(fā)生 Consumer 的 Rebalance 時,需要將對應(yīng) Partition 的隊列清空
3.3 KeyBy 與 Delay Processing 的支持
因源頭的 Topic 和消息格式有可能不可控制,所以 MQ Consumer 的職責(zé)之一是將消息統(tǒng)一封裝為 Event。
根據(jù)需求,會從原始消息中拼裝出 Event Key,對 Key 取 Hash 后,相同結(jié)果的 Event 會進入同一個隊列,可以保證分區(qū)內(nèi)的此類事件處理順序的穩(wěn)定,同時將消息的消費與處理解耦,支持增大內(nèi)部隊列數(shù)量來增加吞吐。
Event 中也支持設(shè)置是否延遲處理屬性,可以根據(jù) Event Time 延遲固定時間后處理,需要被延遲處理的事件會被發(fā)送到有界延遲隊列中,有界延遲隊列的實現(xiàn)繼承了 DelayQueue,限制 DelayQueue 長度, 達到限定值入隊會被阻塞。
3.4 異常處理
Processor 在消息處理過程中,可能遇到各種異常情況,設(shè)計框架的動機之一就是為業(yè)務(wù)邏輯的編寫者屏蔽掉這種復(fù)雜度。Processor 相關(guān)框架的邏輯會與 State Manager 協(xié)作,處理異常并充分暴露狀態(tài)。比較典型的異常情況以及處理策略如下:
- 處理消息失?。鹤詣佑|發(fā)重試,重試到用戶設(shè)置的最大次數(shù)或默認值后會將消息失敗狀態(tài)通知 State Manager。
- 處理消息超時:超時對于吞吐影響較大,且通常重試的效果不明顯,因此當(dāng)前策略是不會對消息重試,直接通知 State Manager 消息處理失敗。
- 處理消息較慢:上游 Topic 存在 Lag,Message Consumer 消費速率大于 Message Processor 處理速率時,消息會堆積在隊列中,達到隊列最大長度, Message Consumer 會被阻塞在入隊操作,停止拉取消息,類似 Flink 框架中的背壓。?
3.5 監(jiān)控
為了方便運維,在框架層面暴露了一組監(jiān)控指標(biāo),并支持用戶自定義 Metrics。其中默認支持的 Metrics 如下表所示:
監(jiān)控類別 | 監(jiān)控指標(biāo) |
Message Consumer | Consumer Lag |
Rebalance rate | |
Deserialize QPS | |
Consumer heartbeat | |
Message Enqueue Time | |
Message Processor | Process QPS |
Process time | |
Internal Queue | Queue length |
4. 線上運維 Case 舉例
實際生產(chǎn)環(huán)境運行時,偶爾需要做些運維操作,其中最常見的是消息堆積和消息重放。
對于 Conusmer Lag 這類問題的處理步驟大致如下:
- 查看 Enqueue Time,Queue Length 的監(jiān)控確定服務(wù)內(nèi)隊列是否有堆積。
- 如果隊列有堆積,查看 Process Time 指標(biāo),確定是否是某個 Processor 處理慢,如果是,根據(jù)指標(biāo)中的 Tag 確定事件類型等屬性特征,判斷業(yè)務(wù)邏輯或者 Key 設(shè)置是否合理;全部 Processor 處理慢,可以通過增加 Processor 并行度來解決。
- 如果隊列無堆積,排除網(wǎng)絡(luò)問題后,可以考慮增加 Consumer 并行度至 Topic Partition 上限。
消息重放被觸發(fā)的原因通常有兩種,要么是業(yè)務(wù)上需要重放部分數(shù)據(jù)做補全,要么是遇到了事故需要修復(fù)數(shù)據(jù)。為了應(yīng)對這種需求,我們在框架層面支持了根據(jù)時間戳重置 Offset 的能力。具體操作時的步驟如下:
- 使用服務(wù)側(cè)暴露的 API,啟動一臺實例使用新的 Consumer GroupId: {newConsumerGroup} 從某個 startupTimestamp 開始消費。
- 更改全部配置中的 Consumer GroupId 為 {newConsumerGroup}。
- 分批重啟所有實例。
5. 總結(jié)
為了解決字節(jié)數(shù)據(jù)中臺 DataLeap 中 Data Catalog 系統(tǒng)消費近實時元數(shù)據(jù)變更的業(yè)務(wù)場景,我們自研了輕量級消息處理框架。當(dāng)前該框架已在字節(jié)內(nèi)部生產(chǎn)環(huán)境穩(wěn)定運行超過 1 年,并支持了火山引擎上的數(shù)據(jù)地圖服務(wù)的元數(shù)據(jù)同步場景,滿足了我們團隊的需求。
下一步會根據(jù)優(yōu)先級排期支持 RocketMQ 等其他消息隊列,并持續(xù)優(yōu)化配置動態(tài)更新,監(jiān)控報警,運維自動化等方面。