騰訊大數(shù)據(jù)實(shí)時(shí)湖倉智能優(yōu)化實(shí)踐
一、湖倉架構(gòu)
騰訊大數(shù)據(jù)的湖倉架構(gòu)如下圖所示:
這里分為三個(gè)部分,分別是數(shù)據(jù)湖計(jì)算、數(shù)據(jù)湖管理和數(shù)據(jù)湖存儲(chǔ)。
數(shù)據(jù)湖計(jì)算部分,Spark 作為 ETL Batch 任務(wù)的主要批處理引擎,F(xiàn)link 作為準(zhǔn)實(shí)時(shí)計(jì)算的流處理引擎,StarRocks 和 Presto 作為即席查詢的 OLAP 引擎。數(shù)據(jù)湖管理層以 Iceberg 為核心,同時(shí)開放了一些簡單的 API,支持用戶通過 SDK 的方式去調(diào)用。在 Iceberg 之上構(gòu)建了一套 Auto Optimize Service 服務(wù),幫助用戶在使用 Iceberg 的過程中實(shí)現(xiàn)查詢性能的提升和存儲(chǔ)成本的降低。數(shù)據(jù)湖底層存儲(chǔ)基于 HDFS 和 COS,COS 是騰訊云的云對象存儲(chǔ),可以滿足云上用戶的大規(guī)模結(jié)構(gòu)化/非結(jié)構(gòu)化存儲(chǔ)需求,在上層計(jì)算框架和底層存儲(chǔ)系統(tǒng)之間,也會(huì)引入 Alluxio 構(gòu)建了一個(gè)統(tǒng)一的存儲(chǔ) Cache 層,進(jìn)行數(shù)據(jù)緩存提速。本次分享的重點(diǎn)主要是圍繞智能優(yōu)化服務(wù)(Auto Optimize Service)展開。
二、智能優(yōu)化服務(wù)
智能優(yōu)化服務(wù)主要由六個(gè)部分組成,分別是:Compaction Service(合并小文件)、Expiration Service(淘汰過期快照)、Cleaning Service(生命周期管理和孤兒文件清理)、Clustering Service(數(shù)據(jù)重分布)、Index Service(二級索引推薦)和 Auto Engine Service(自動(dòng)引擎加速)。以下就各模塊近期做的重點(diǎn)工作展開介紹。
1. Compaction Service
(1)小文件合并優(yōu)化
小文件合并有讀和寫兩個(gè)階段,由于 Iceberg 主要以 PARQUET/ORC 列存格式為主,讀寫列存面臨著兩次行列轉(zhuǎn)換和編解碼,開銷非常大。針對這個(gè)痛點(diǎn),我們對 Parquet 存儲(chǔ)模型進(jìn)行了分析,主要由 RowGroup、Column Chunk、Page 以及 Footer 組成,相對位置如下圖所示,不同列的最小存儲(chǔ)單元以 Page 級別組織,數(shù)據(jù)水平方向上以 RowGroup 大小劃分?jǐn)?shù)據(jù)塊,以便上層引擎按照 RowGroup 級別分配 task 加載數(shù)據(jù)。
基于存儲(chǔ)模型的特點(diǎn),我們針對性地采用了 RowGroup Level 和 Page Level 兩種拷貝優(yōu)化,對于大文件合并大文件且僅涉及重新壓縮、僅涉及列裁剪的場景,使用 RowGroup Copy;對于小文件合并大文件、不涉及列變化、不涉及 BloomFilter 的場景,使用 Page Copy。
下面是我們內(nèi)部全部升級優(yōu)化之后的落地效果,合并時(shí)間&資源減少 5 倍多。
(2)更多優(yōu)化
我們還增強(qiáng)了 Delete Files 合并優(yōu)化和增量 Rewrite 策略。
在大規(guī)模 Update 的場景下,會(huì)產(chǎn)生大量的 Delete Files,數(shù)據(jù)讀取時(shí)會(huì)頻繁地進(jìn)行 Delete File Apply Data File 的操作,這個(gè)過程是串行的,I/O 開銷巨大。當(dāng)合并的速度低于 Delete File Apply 的速度,就會(huì)因?yàn)榉e攢了大量的 Delete Files 導(dǎo)致合并失敗。針對這個(gè)痛點(diǎn),我們使用 Left Anti Join 拆分出了關(guān)聯(lián) Delete File 的 DataFile 和未關(guān)聯(lián) Delete File 的 DataFile,然后將兩者進(jìn)行 Union All。此外還在 Delete File Apply Data File 的過程中使用了 Bloom Index 加速尋找,及時(shí)刪除未關(guān)聯(lián) Data File 的 Delete File。
增量 Rewrite 優(yōu)化會(huì)通過在 DataFile 中引入 Modify Time 來決策,進(jìn)行分區(qū)級別的增量更新。
2. Index Service
(1)Iceberg Core Framework
Iceberg 較 Hive 增加了 min-max 索引,記錄了 DataFile 所有 column 列的最大值和最小值,在執(zhí)行引擎計(jì)算時(shí)可以協(xié)助做文件級別的過濾,但是文件級別的索引粒度較粗,在隨機(jī)寫數(shù)據(jù)的時(shí)候 min-max 存在交叉,導(dǎo)致索引失效。所以我們在這個(gè)基礎(chǔ)之上進(jìn)一步拓展了二級索引,來提高 Data Skipping 的能力,加速查詢。索引的構(gòu)建和加載過程在 Iceberg Core 層的框架支持實(shí)現(xiàn)如下:
(2)Iceberg scan metrics
對于專注于業(yè)務(wù)開發(fā)的用戶來說,索引的選擇往往是比較困難的,如何精準(zhǔn)的判斷是不是需要索引,需要什么索引,索引是否有效,索引是否會(huì)帶來副作用等,往往需要經(jīng)過一些額外的任務(wù)來進(jìn)行分析,如果靠用戶自己的決策選擇,獲得大規(guī)模的適配收益很難?;谶@個(gè)想法,我們做了智能推薦索引的支持,而智能的推薦,首先是需要一套 metrics 框架的支持,能夠記錄表的 Scan,F(xiàn)ilter 等各種事件,收集 Partition Status 信息,然后對這些事件進(jìn)行分析,統(tǒng)計(jì)列的查詢頻次,過濾條件,根據(jù)規(guī)則區(qū)分高/低基數(shù)列等。最后根據(jù)分析結(jié)果,進(jìn)行 Index 的推薦。
(3)索引智能推薦流程
整個(gè)端到端的 Index Service 流程如下圖:1)首先是 SQL 提取,由于我們獲取到的 SQL 是引擎優(yōu)化后的,并不是原始 SQL,所以需要進(jìn)行 SQL 重構(gòu)。2)是索引粗篩,根據(jù)拿到的信息,比如列和分區(qū)的查詢頻度,初步判斷怎么建立索引是有效的。3)開始嘗試構(gòu)建索引,支持構(gòu)建分區(qū)級別增量索引。4)在用戶無感知的情況下進(jìn)行任務(wù)雙跑。5)根據(jù)雙跑結(jié)果進(jìn)行索引優(yōu)化的效果評估。6)將索引優(yōu)化數(shù)據(jù)輸出給用戶,推薦用戶使用。7)由于索引構(gòu)建是復(fù)雜的,一個(gè)表會(huì)被多任務(wù)引用,一個(gè)任務(wù)也會(huì)去訪問多張表,我們提供任務(wù)級別和表級別的索引構(gòu)建,盡可能實(shí)現(xiàn)表級和任務(wù)級的同步優(yōu)化。
3. Clustering Service
由于 Iceberg 的 min-max 索引在隨機(jī)寫的情況下是普遍失效的,導(dǎo)致 Data Skipping 能力較差,所以如果需要精確覆蓋 min-max,可以將數(shù)據(jù)進(jìn)行重排分布。當(dāng)用戶進(jìn)行單列查詢的時(shí)候,提前對數(shù)據(jù)列排序?qū)懭?,如果是多列查詢的情況,由于無法保證多個(gè)列都分布在一個(gè)文件中,我們使用 Z-order,對每個(gè)列進(jìn)行數(shù)字化處理,采樣計(jì)算 Range-ID,生成交錯(cuò)位Z-Value,根據(jù) Z-Value 進(jìn)行重分區(qū),可以保證不同列之間的相對有序性。
實(shí)際業(yè)務(wù)中,Data Clustering 和 Data Skipping 都實(shí)現(xiàn)了四倍以上的效果提升。
4. AutoEngine Service
相對于 OLAP 引擎來講,Iceberg 表,Hudi 表都是外表,這些外表基本都是 TB 級別,使用 StarRocks,Doris 查詢外表并不能發(fā)揮 OLAP 的查詢優(yōu)勢。AutoEngine Service 通過收集 OLAP 引擎的 Event Message,對相應(yīng)的分區(qū)進(jìn)行加熱,也就是將相關(guān)分區(qū)數(shù)據(jù)路由到 StarRocks 集群,上層引擎可以在 StarRocks 集群中發(fā)現(xiàn)該分區(qū)的元數(shù)據(jù),由此實(shí)現(xiàn)基于存儲(chǔ)計(jì)算引擎的選擇優(yōu)化。
三、場景化能力
1. 多流拼接
關(guān)于多流拼接,這里舉個(gè)例子簡單說明, 如圖所示,有兩個(gè) MQ 同時(shí)往下游寫數(shù)據(jù),MQ1 更新列 data1,MQ2 更新列 data2,最終根據(jù) id 聚合,取時(shí)間戳 orderColumn 排序最靠前的一條,作為 join 之后的 source。要實(shí)現(xiàn)這個(gè)合并更新能力,往往需要外接各種臨時(shí)存儲(chǔ) Redis/Hbase/MQ 等組件。
那在 Iceberg 層面是怎么優(yōu)化的呢?由于 Iceberg 本身支持事務(wù)和列級的更新刪除操作,類似于代碼倉庫的 Branch 概念,因此可以通過打 tag 的方式去標(biāo)記狀態(tài)。具體實(shí)現(xiàn)是,初始化階段,數(shù)據(jù)寫入主流程,同時(shí)多流往其他 Merged Branch 去寫入,寫完之后的話會(huì)有一個(gè)異步的 Compaction 任務(wù),定期和主流程合并,當(dāng)用戶在讀的時(shí)候,直接讀取 Merged Branch。
2. 主鍵表
通過多流 Join 的實(shí)現(xiàn)方法依賴 Compaction Service 的調(diào)度性能,當(dāng)數(shù)據(jù)規(guī)模不斷增加,多流 join 聚合計(jì)算更新的拼接方式可能存在性能瓶頸,所以我們也引入主鍵表作為行級更新的另一種實(shí)現(xiàn)方式。比如這里我們根據(jù) id 分成四個(gè)桶,存在多個(gè)任務(wù)往一個(gè)桶去寫數(shù)據(jù),一個(gè)桶內(nèi)的數(shù)據(jù)是有序的,那么下游在讀取桶數(shù)據(jù)的時(shí)候會(huì)更輕松。但是當(dāng) id 的基數(shù)很大的時(shí)候,比如當(dāng) id 為 4/8/16 的時(shí)候,都會(huì)往一個(gè)桶內(nèi)寫數(shù),會(huì)產(chǎn)生 DataFile 的重疊,在下游從桶內(nèi)讀數(shù)的時(shí)候,就需要合并一個(gè)桶內(nèi)的多個(gè) DataFile 到一個(gè) Reader 處理。如果分桶數(shù)量設(shè)置的不合適,單點(diǎn)壓力就會(huì)過大,此時(shí)可以使用 Rescale 實(shí)現(xiàn)桶的彈性擴(kuò)縮容。另外在桶的基礎(chǔ)上擴(kuò)展列族 Column Family 的概念,相當(dāng)于每個(gè)列都作為獨(dú)立的文件寫入,多個(gè) Column Family 行拼接 Full Outer Join 即可。
3. In Place 遷移
由于對數(shù)據(jù)湖的高階特性能力的需要,很多業(yè)務(wù)做了架構(gòu)的升級,同時(shí)也面臨著存量 Thive(騰訊自研 Hive)和 Hive 的數(shù)據(jù)遷移到 Iceberg。這里需要重點(diǎn)支持的工作包括:存儲(chǔ)數(shù)據(jù)的遷移,計(jì)算任務(wù)的遷移。
首先存儲(chǔ)數(shù)據(jù)的遷移,我們提供了 data in-place 的方案,不搬移原來的 data files,僅僅重新生成 Iceberg 新表所需的 metadata 即可,遷移的過程支持了 STRICT/APPEND/OVERWRITE 等三種模式。
其次是計(jì)算任務(wù)的遷移支持, 我們改進(jìn)支持了新的 Name Mapping 機(jī)制,增強(qiáng)支持了 Identity partition pruning 能力,使得對于場景的 built-in functions 裁剪能力取得數(shù)量級性能提升,優(yōu)化實(shí)現(xiàn)如下:
4. PyIceberg
Iceberg Table Spec 是開發(fā)性的實(shí)現(xiàn),可以支持多種語言 API 接入,AI生態(tài)圈數(shù)據(jù)科學(xué)等主要以 Python 環(huán)境為主,要求高性能 Native 解碼,對 JVM 環(huán)境無強(qiáng)依賴,PySpark 雖然具備接入 Iceberg 的能力,但是太重了。我們可以直接利用 PyIceberg 能力,無JVM 依賴,加載解碼一次即可,提供廣泛的機(jī)器學(xué)習(xí)類庫的優(yōu)勢,拓展 Python的技術(shù)棧到 Iceberg 元數(shù)據(jù)層面,構(gòu)造 Pandas,Tensorflow,Pytorch 等不同的 DataFrame,方便進(jìn)行數(shù)據(jù)分析和 AI 模型訓(xùn)練的編程探索,我們內(nèi)部也深度支持了 PyIceberg SQL 的列裁剪和謂詞下推能力,結(jié)合 DuckDB 做一些小數(shù)據(jù)集的算法快速調(diào)試。
四、總結(jié)和展望
未來還將從以下方面著手,進(jìn)行實(shí)時(shí)湖倉的優(yōu)化:
1. Auto Optimize Service
- 冷熱分離降本提效
- 物化視圖提速
- AE 服務(wù)智能化感知
- Compaction 能力打磨
- 更多 Transform UDF Partition Pruning 優(yōu)化
2. 主鍵表優(yōu)化
拓展 Deletion Vector,解決謂詞下推必須聯(lián)合去重的性能問題
3. AI 探索
- 落地適合模型訓(xùn)練的湖倉格式。
- 探索實(shí)現(xiàn)分布式 dataFrame,整合 metadata 和引擎。