偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

DataLeap 的 Catalog 系統(tǒng)近實時消息同步能力優(yōu)化

精選
大數(shù)據(jù)
Apache Atlas 對于實時消息的消費處理不滿足性能要求,內(nèi)部使用 Flink 任務(wù)的處理方案在 ToB 場景中也存在諸多限制,所以團隊自研了輕量級異步消息處理框架,很好的支持了字節(jié)內(nèi)部和火山引擎上同步元數(shù)據(jù)的訴求。本文定義了需求場景,并詳細介紹框架的設(shè)計與實現(xiàn)。

字節(jié)數(shù)據(jù)中臺 DataLeap 的 Data Catalog 系統(tǒng)通過接收 MQ 中的近實時消息來同步部分元數(shù)據(jù)。Apache Atlas 對于實時消息的消費處理不滿足性能要求,內(nèi)部使用 Flink 任務(wù)的處理方案在 ToB 場景中也存在諸多限制,所以團隊自研了輕量級異步消息處理框架,很好的支持了字節(jié)內(nèi)部和火山引擎上同步元數(shù)據(jù)的訴求。本文定義了需求場景,并詳細介紹框架的設(shè)計與實現(xiàn)。

1. 背景

1.1 動機

字節(jié)數(shù)據(jù)中臺 DataLeap 的 Data Catalog 系統(tǒng)基于 Apache Atlas 搭建,其中 Atlas 通過 Kafka 獲取外部系統(tǒng)的元數(shù)據(jù)變更消息。在開源版本中,每臺服務(wù)器支持的 Kafka Consumer 數(shù)量有限,在每日百萬級消息體量下,經(jīng)常有長延時等問題,影響用戶體驗。

在 2020 年底,我們針對 Atlas 的消息消費部分做了重構(gòu),將消息的消費和處理從后端服務(wù)中剝離出來,并編寫了 Flink 任務(wù)承擔(dān)這部分工作,比較好的解決了擴展性和性能問題。然而,到 2021 年年中,團隊開始重點投入私有化部署和火山公有云支持,對于 Flink 集群的依賴引入了可維護性的痛點。

在仔細的分析了使用場景和需求,并調(diào)研了現(xiàn)成的解決方案后,我們決定投入人力自研一個消息處理框架。當(dāng)前這個框架很好的支持了字節(jié)內(nèi)部以及 ToB 場景中 Data Catalog 對于消息消費和處理的場景。

本文會詳細介紹框架解決的問題,整體的設(shè)計,以及實現(xiàn)中的關(guān)鍵決定。

1.2 需求定義

使用下面的表格將具體場景定義清楚。

需求維度

需求描述

吞吐量

每日百萬級別,每秒峰值>100

服務(wù)質(zhì)量(QoS)

至少一次

延遲消息

支持將消息標(biāo)記為延遲處理,最高延遲 1 min

重試

自動對處理失敗消息重試,重試次數(shù)可定義

并行與順序處理

Partition 內(nèi)部支持按照某個 Key 重新分組,不同 Key 之間接受并行,同一個 Key 要求順序處理

消息處理時間

不同類型的消息,處理時間會有較大差別,從< 1 s~1 min

封裝

確保不丟消息的前提下,依賴框架做 Offset 的提交,業(yè)務(wù)側(cè)只需要編寫消息的處理邏輯;另外,將系統(tǒng)狀態(tài)以 Metric 方式暴露

輕量

支持與后端服務(wù)混合部署,不引入額外的維護成本

1.3 相關(guān)工作

在啟動自研之前,我們評估了兩個比較相關(guān)的方案,分別是 Flink 和 Kafka Streaming。

Flink 是我們之前生產(chǎn)上使用的方案,在能力上是符合要求的,最主要的問題是長期的可維護性。在公有云場景,那個階段 Flink 服務(wù)在火山云上還沒有發(fā)布,我們自己的服務(wù)又有嚴格的時間線,所以必須考慮替代;在私有化場景,我們不確認客戶的環(huán)境一定有 Flink 集群,即使部署的數(shù)據(jù)底座中帶有 Flink,后續(xù)的維護也是個頭疼的問題。另外一個角度,作為通用流式處理框架,F(xiàn)link 的大部分功能其實我們并沒有用到,對于單條消息的流轉(zhuǎn)路徑,其實只是簡單的讀取和處理,使用 Flink 有些“殺雞用牛刀”了。

另外一個比較標(biāo)準(zhǔn)的方案是 Kafka Streaming。作為 Kafka 官方提供的框架,對于流式處理的語義有較好的支持,也滿足我們對于輕量的訴求。最終沒有采用的主要考慮點是兩個:

  • 對于 Offset 的維護不夠靈活:我們的場景不能使用自動提交(會丟消息),而對于同一個 Partition 中的數(shù)據(jù)又要求一定程度的并行處理,使用 Kafka Streaming 的原生接口較難支持。
  • 與 Kafka 強綁定:大部分場景下,我們團隊不是元數(shù)據(jù)消息隊列的擁有者,也有團隊使用 RocketMQ 等提供元數(shù)據(jù)變更,在應(yīng)用層,我們希望使用同一套框架兼容。

2. 設(shè)計

2.1 概念說明

  • MQ Type:Message Queue 的類型,比如 Kafka與RocketMQ。后續(xù)內(nèi)容以 Kafka 為主,設(shè)計一定程度兼容其他 MQ。
  • Topic:一批消息的集合,包含多個 Partition,可以被多個 Consumer Group消費。
  • Consumer Group:一組 Consumer,同一 Group 內(nèi)的 Consumer 數(shù)據(jù)不會重復(fù)消費。
  • Consumer:消費消息的最小單位,屬于某個 Consumer Group。?
  • Partition:Topic 中的一部分數(shù)據(jù),同一 Partition 內(nèi)消息有序。同一 Consumer Group 內(nèi),一個 Partition 只會被其中一個 Consumer 消費。
  • Event:由 Topic 中的消息轉(zhuǎn)換而來,部分屬性如下。
  • Event Type:消息的類型定義,會與 Processor 有對應(yīng)關(guān)系;
  • Event Key:包含消息 Topic、Partition、Offset 等元數(shù)據(jù),用來對消息進行 Hash 操作;
  • Processor:消息處理的單元,針對某個 Event Type 定制的業(yè)務(wù)邏輯。
  • Task:消費消息并處理的一條 Pipeline,Task 之間資源是相互獨立的。

2.2 框架架構(gòu)

圖片

整個框架主要由 MQ Consumer, Message Processor 和 State Manager 組成。

  • MQ Consumer:負責(zé)從Kafka Topic拉取消息,并根據(jù) Event Key 將消息投放到內(nèi)部隊列,如果消息需要延時消費,會被投放到對應(yīng)的延時隊列;該模塊還負責(zé)定時查詢 State Manager 中記錄的消息狀態(tài),并根據(jù)返回提交消息 Offset;上報與消息消費相關(guān)的 Metric。
  • Message Processor:負責(zé)從隊列中拉取消息并異步進行處理,它會將消息的處理結(jié)果更新給 State Manager,同時上報與消息處理相關(guān)的 Metric。
  • State Manager:負責(zé)維護每個 Kafka Partition 的消息狀態(tài),并暴露當(dāng)前應(yīng)提交的 Offset 信息給 MQ Consumer。

3. 實現(xiàn)

3.1 線程模型

每個 Task 可以運行在一臺或多臺實例,建議部署到多臺機器,以獲得更好的性能和容錯能力。

每臺實例中,存在兩組線程池:

  • Consumer Pool:負責(zé)管理 MQ Consumer Thread 的生命周期,當(dāng)服務(wù)啟動時,根據(jù)配置拉起一定規(guī)模的線程,并在服務(wù)關(guān)閉時確保每個 Thread 安全退出或者超時停止。整體有效 Thread 的上限與 Topic 的 Partition 的總數(shù)有關(guān)。
  • Processor Pool:負責(zé)管理 Message Processor Thread 的生命周期,當(dāng)服務(wù)啟動時,根據(jù)配置拉起一定規(guī)模的線程,并在服務(wù)關(guān)閉時確保每個 Thread 安全退出或者超時停止。可以根據(jù) Event Type 所需要處理的并行度來靈活配置。

兩類 Thread 的性質(zhì)分別如下:

  • Consumer Thread:每個 MQ Consumer 會封裝一個 Kafka Consumer,可以消費 0 個或者多個 Partition。根據(jù) Kafka 的機制,當(dāng) MQ Consumer Thread 的個數(shù)超過 Partition 的個數(shù)時,當(dāng)前 Thread 不會有實際流量。
  • Processor Thread:唯一對應(yīng)一個內(nèi)部的隊列,并以 FIFO 的方式消費和處理其中的消息。

3.2 StateManager

圖片

在 State Manager 中,會為每個 Partition 維護一個優(yōu)先隊列(最小堆),隊列中的信息是 Offset,兩個優(yōu)先隊列的職責(zé)如下:

  • 處理中的隊列:一條消息轉(zhuǎn)化為 Event 后,MQ Consumer 會調(diào)用 StateManager 接口,將消息 Offset  插入該隊列。
  • 處理完的隊列:一條消息處理結(jié)束或最終失敗,Message Processor 會調(diào)用 StateManager 接口,將消息 Offset 插入該隊列。

MQ Consumer 會周期性的檢查當(dāng)前可以 Commit 的 Offset,情況枚舉如下:

  • 處理中的隊列堆頂 < 處理完的隊列堆頂或者處理完的隊列為空:代表當(dāng)前消費回來的消息還在處理過程中,本輪不做 Offset 提交。
  • 處理中的隊列堆頂 = 處理完的隊列堆頂:表示當(dāng)前消息已經(jīng)處理完,兩邊同時出隊,并記錄當(dāng)前堆頂為可提交的 Offset,重復(fù)檢查過程。
  • 處理中的隊列堆頂 > 處理完的隊列堆頂:異常情況,通常是數(shù)據(jù)回放到某些中間狀態(tài),將處理完的隊列堆頂出堆。

注意:當(dāng)發(fā)生 Consumer 的 Rebalance 時,需要將對應(yīng) Partition 的隊列清空

3.3 KeyBy 與 Delay Processing 的支持

因源頭的 Topic 和消息格式有可能不可控制,所以 MQ Consumer 的職責(zé)之一是將消息統(tǒng)一封裝為 Event。

根據(jù)需求,會從原始消息中拼裝出 Event Key,對 Key 取 Hash 后,相同結(jié)果的 Event 會進入同一個隊列,可以保證分區(qū)內(nèi)的此類事件處理順序的穩(wěn)定,同時將消息的消費與處理解耦,支持增大內(nèi)部隊列數(shù)量來增加吞吐。

Event 中也支持設(shè)置是否延遲處理屬性,可以根據(jù) Event Time 延遲固定時間后處理,需要被延遲處理的事件會被發(fā)送到有界延遲隊列中,有界延遲隊列的實現(xiàn)繼承了 DelayQueue,限制 DelayQueue 長度, 達到限定值入隊會被阻塞。

3.4 異常處理

Processor 在消息處理過程中,可能遇到各種異常情況,設(shè)計框架的動機之一就是為業(yè)務(wù)邏輯的編寫者屏蔽掉這種復(fù)雜度。Processor 相關(guān)框架的邏輯會與 State Manager 協(xié)作,處理異常并充分暴露狀態(tài)。比較典型的異常情況以及處理策略如下:

  • 處理消息失?。鹤詣佑|發(fā)重試,重試到用戶設(shè)置的最大次數(shù)或默認值后會將消息失敗狀態(tài)通知 State Manager。
  • 處理消息超時:超時對于吞吐影響較大,且通常重試的效果不明顯,因此當(dāng)前策略是不會對消息重試,直接通知 State Manager  消息處理失敗。
  • 處理消息較慢:上游 Topic 存在 Lag,Message Consumer 消費速率大于  Message Processor 處理速率時,消息會堆積在隊列中,達到隊列最大長度, Message Consumer 會被阻塞在入隊操作,停止拉取消息,類似 Flink 框架中的背壓。?

3.5 監(jiān)控

為了方便運維,在框架層面暴露了一組監(jiān)控指標(biāo),并支持用戶自定義 Metrics。其中默認支持的 Metrics 如下表所示:

監(jiān)控類別

監(jiān)控指標(biāo)

Message Consumer

Consumer Lag


Rebalance rate


Deserialize QPS


Consumer heartbeat


Message Enqueue Time

Message Processor

Process QPS


Process time

Internal Queue

Queue length

4. 線上運維 Case 舉例

實際生產(chǎn)環(huán)境運行時,偶爾需要做些運維操作,其中最常見的是消息堆積和消息重放。

對于 Conusmer Lag 這類問題的處理步驟大致如下:

  • 查看 Enqueue Time,Queue Length 的監(jiān)控確定服務(wù)內(nèi)隊列是否有堆積。
  • 如果隊列有堆積,查看 Process Time 指標(biāo),確定是否是某個 Processor 處理慢,如果是,根據(jù)指標(biāo)中的 Tag 確定事件類型等屬性特征,判斷業(yè)務(wù)邏輯或者 Key 設(shè)置是否合理;全部 Processor 處理慢,可以通過增加 Processor 并行度來解決。
  • 如果隊列無堆積,排除網(wǎng)絡(luò)問題后,可以考慮增加 Consumer 并行度至 Topic Partition 上限。

消息重放被觸發(fā)的原因通常有兩種,要么是業(yè)務(wù)上需要重放部分數(shù)據(jù)做補全,要么是遇到了事故需要修復(fù)數(shù)據(jù)。為了應(yīng)對這種需求,我們在框架層面支持了根據(jù)時間戳重置 Offset 的能力。具體操作時的步驟如下:

  • 使用服務(wù)側(cè)暴露的 API,啟動一臺實例使用新的 Consumer GroupId: {newConsumerGroup} 從某個 startupTimestamp 開始消費。
  • 更改全部配置中的 Consumer GroupId 為 {newConsumerGroup}。
  • 分批重啟所有實例。

5. 總結(jié)

為了解決字節(jié)數(shù)據(jù)中臺 DataLeap 中 Data Catalog 系統(tǒng)消費近實時元數(shù)據(jù)變更的業(yè)務(wù)場景,我們自研了輕量級消息處理框架。當(dāng)前該框架已在字節(jié)內(nèi)部生產(chǎn)環(huán)境穩(wěn)定運行超過 1 年,并支持了火山引擎上的數(shù)據(jù)地圖服務(wù)的元數(shù)據(jù)同步場景,滿足了我們團隊的需求。

下一步會根據(jù)優(yōu)先級排期支持 RocketMQ 等其他消息隊列,并持續(xù)優(yōu)化配置動態(tài)更新,監(jiān)控報警,運維自動化等方面。

責(zé)任編輯:未麗燕 來源: 字節(jié)跳動技術(shù)團隊
相關(guān)推薦

2023-04-14 15:37:02

DataLeap存儲優(yōu)化MySQL

2015-04-01 15:03:58

Spark大數(shù)據(jù)

2023-05-03 08:58:46

數(shù)據(jù)庫開源

2022-06-08 09:55:19

Data Catal字節(jié)跳動業(yè)務(wù)系統(tǒng)

2023-06-27 07:11:37

湖倉一體MaxCompute

2013-06-27 09:59:26

網(wǎng)絡(luò)通信HTML5Web

2022-11-03 07:22:42

2022-11-24 08:50:07

數(shù)據(jù)中臺Data Catal

2022-04-12 08:22:54

Linux內(nèi)核操作系統(tǒng)

2011-06-22 10:37:08

rsyncinotify

2023-10-19 11:43:47

惡意軟件

2024-10-18 11:39:55

MySQL數(shù)據(jù)檢索

2009-04-28 10:00:52

中華網(wǎng)壓縮開支裁員

2020-03-18 07:11:24

實時同步搜索

2024-07-03 11:33:02

2013-05-16 10:15:11

信息泄密彭博Bloomberg

2020-10-09 15:00:56

實時消息編程語言

2024-07-03 08:02:19

MySQL數(shù)據(jù)搜索
點贊
收藏

51CTO技術(shù)棧公眾號