汽車之家:基于 Flink + Iceberg 的湖倉一體架構(gòu)實踐
內(nèi)容簡要:
一、數(shù)據(jù)倉庫架構(gòu)升級的背景
二、基于 Iceberg 的湖倉一體架構(gòu)實踐
三、總結(jié)與收益
四、后續(xù)規(guī)劃
一、數(shù)據(jù)倉庫架構(gòu)升級的背景
1. 基于 Hive 的數(shù)據(jù)倉庫的痛點
原有的數(shù)據(jù)倉庫完全基于 Hive 建造而成,主要存在三大痛點:
痛點一:不支持 ACID
1)不支持 Upsert 場景;
2)不支持 Row-level delete,數(shù)據(jù)修正成本高。
痛點二:時效性難以提升
1)數(shù)據(jù)難以做到準(zhǔn)實時可見;
2)無法增量讀取,無法實現(xiàn)存儲層面的流批統(tǒng)一;
3)無法支持分鐘級延遲的數(shù)據(jù)分析場景。
痛點三:Table Evolution
1)寫入型 Schema,對 Schema 變更支持不好;
2)Partition Spec 變更支持不友好。
2. Iceberg 關(guān)鍵特性
Iceberg 主要有四大關(guān)鍵特性:支持 ACID 語義、增量快照機(jī)制、開放的表格式和流批接口支持。
支持 ACID 語義不會讀到不完整的 Commit;基于樂觀鎖支持并發(fā) Commit;Row-level delete,支持 Upsert。
增量快照機(jī)制Commit 后數(shù)據(jù)即可見(分鐘級);可回溯歷史快照。
開放的表格式數(shù)據(jù)格式:parquet、orc、avro計算引擎:Spark、Flink、Hive、Trino/Presto
流批接口支持支持流、批寫入;支持流、批讀取。
二、基于 Iceberg 的湖倉一體架構(gòu)實踐
湖倉一體的意義就是說我不需要看見湖和倉,數(shù)據(jù)有著打通的元數(shù)據(jù)的格式,它可以自由的流動,也可以對接上層多樣化的計算生態(tài)。
——賈揚(yáng)清(阿里云計算平臺高級研究員)
1. Append 流入湖的鏈路
上圖為日志類數(shù)據(jù)入湖的鏈路,日志類數(shù)據(jù)包含客戶端日志、用戶端日志以及服務(wù)端日志。這些日志數(shù)據(jù)會實時錄入到 Kafka,然后通過 Flink 任務(wù)寫到 Iceberg 里面,最終存儲到 HDFS。
2. Flink SQL 入湖鏈路打通
我們的 Flink SQL 入湖鏈路打通是基于 “Flink 1.11 + Iceberg 0.11” 完成的,對接 Iceberg Catalog 我們主要做了以下內(nèi)容:
1)Meta Server 增加對 Iceberg Catalog 的支持;
2)SQL SDK 增加 Iceberg Catalog 支持。
然后在這基礎(chǔ)上,平臺開放 Iceberg 表的管理功能,使得用戶可以自己在平臺上建 SQL 的表。
3. 入湖 - 支持代理用戶
第二步是內(nèi)部的實踐,對接現(xiàn)有預(yù)算體系、權(quán)限體系。
因為之前平臺做實時作業(yè)的時候,平臺都是默認(rèn)為 Flink 用戶去運(yùn)行的,之前存儲不涉及 HDFS 存儲,因此可能沒有什么問題,也就沒有思考預(yù)算劃分方面的問題。
但是現(xiàn)在寫 Iceberg 的話,可能就會涉及一些問題。比如數(shù)倉團(tuán)隊有自己的集市,數(shù)據(jù)就應(yīng)該寫到他們的目錄下面,預(yù)算也是劃到他們的預(yù)算下,同時權(quán)限和離線團(tuán)隊賬號的體系打通。
如上所示,這塊主要是在平臺上做了代理用戶的功能,用戶可以去指定用哪個賬號去把這個數(shù)據(jù)寫到 Iceberg 里面,實現(xiàn)過程主要有以下三個。
增加 Table 級別配置:'iceberg.user.proxy' = 'targetUser’1)啟用 Superuser2)團(tuán)隊賬號鑒權(quán)
訪問 HDFS 時啟用代理用戶:
訪問 Hive Metastore 時指定代理用戶1)參考 Spark 的相關(guān)實現(xiàn):org.apache.spark.deploy.security.HiveDelegationTokenProvider2)動態(tài)代理 HiveMetaStoreClient,使用代理用戶訪問 Hive metastore
4. Flink SQL 入湖示例
DDL + DML
5. CDC 數(shù)據(jù)入湖鏈路
如上所示,我們有一個 AutoDTS 平臺,負(fù)責(zé)業(yè)務(wù)庫數(shù)據(jù)的實時接入。我們會把這些業(yè)務(wù)庫的數(shù)據(jù)接入到 Kafka 里面,同時它還支持在平臺上配置分發(fā)任務(wù),相當(dāng)于把進(jìn) Kafka 的數(shù)據(jù)分發(fā)到不同的存儲引擎里,在這個場景下是分發(fā)到 Iceberg 里。
6. Flink SQL CDC 入湖鏈路打通
下面是我們基于 “Flink1.11 + Iceberg 0.11” 支持 CDC 入湖所做的改動:
改進(jìn) Iceberg Sink:
Flink 1.11 版本為 AppendStreamTableSink,無法處理 CDC 流,修改并適配。
表管理1)支持 Primary key(PR1978)2)開啟 V2 版本:'iceberg.format.version' = '2'
7. CDC 數(shù)據(jù)入湖
1. 支持 Bucket
Upsert 場景下,需要確保同一條數(shù)據(jù)寫入到同一 Bucket 下,這又如何實現(xiàn)?
目前 Flink SQL 語法不支持聲明 bucket 分區(qū),通過配置的方式聲明 Bucket:
'partition.bucket.source'='id', // 指定 bucket 字段
'partition.bucket.num'='10', // 指定 bucket 數(shù)量
2. Copy-on-write sink
做 Copy-on-Write 的原因是原本社區(qū)的 Merge-on-Read 不支持合并小文件,所以我們臨時去做了 Copy-on-write sink 的實現(xiàn)。目前業(yè)務(wù)一直在測試使用,效果良好。
上方為 Copy-on-Write 的實現(xiàn),其實跟原來的 Merge-on-Read 比較類似,也是有 StreamWriter 多并行度寫入和 FileCommitter 單并行度順序提交。
在 Copy-on-Write 里面,需要根據(jù)表的數(shù)據(jù)量合理設(shè)置 Bucket 數(shù),無需額外做小文件合并。
StreamWriter 在 snapshotState 階段多并行度寫入1)增加 Buffer;2)寫入前需要判斷上次 checkpoint 已經(jīng) commit 成功;3)按 bucket 分組、合并,逐個 Bucket 寫入。
FileCommitter 單并行度順序提交1)table.newOverwrite()2)Flink.last.committed.checkpoint.id
8. 示例 - CDC 數(shù)據(jù)配置入湖
如上圖所示,在實際使用中,業(yè)務(wù)方可以在 DTS 平臺上創(chuàng)建或配置分發(fā)任務(wù)即可。
實例類型選擇 Iceberg 表,然后選擇目標(biāo)庫,表明要把哪個表的數(shù)據(jù)同步到 Iceberg 里,然后可以選原表和目標(biāo)表的字段的映射關(guān)系是什么樣的,配置之后就可以啟動分發(fā)任務(wù)。啟動之后,會在實時計算平臺 Flink 里面提交一個實時任務(wù),接著用 Copy-on-write sink 去實時地把數(shù)據(jù)寫到 Iceberg 表里面。
9. 入湖其他實踐
實踐一:減少 empty commit
問題描述:
在上游 Kafka 長期沒有數(shù)據(jù)的情況下,每次 Checkpoint 依舊會生成新的 Snapshot,導(dǎo)致大量的空文件和不必要的 Snapshot。
解決方案(PR - 2042):
增加配置 Flink.max-continuousempty-commits,在連續(xù)指定次數(shù) Checkpoint 都沒有數(shù)據(jù)后才真正觸發(fā) Commit,生成 Snapshot。
實踐二:記錄 watermark
問題描述:
目前 Iceberg 表本身無法直接反映數(shù)據(jù)寫入的進(jìn)度,離線調(diào)度難以精準(zhǔn)觸發(fā)下游任務(wù)。
解決方案( PR - 2109 ):
在 Commit 階段將 Flink 的 Watermark 記錄到 Iceberg 表的 Properties 中,可直觀的反映端到端的延遲情況,同時可以用來判斷分區(qū)數(shù)據(jù)完整性,用于調(diào)度觸發(fā)下游任務(wù)。
實踐三:刪表優(yōu)化
問題描述:
刪除 Iceberg 可能會很慢,導(dǎo)致平臺接口相應(yīng)超時。因為 Iceberg 是面向?qū)ο蟠鎯沓橄?IO 層的,沒有快速清除目錄的方法。
解決方案:
擴(kuò)展 FileIO,增加 deleteDir 方法,在 HDFS 上快速刪除表數(shù)據(jù)。
10. 小文件合并及數(shù)據(jù)清理
定期為每個表執(zhí)行批處理任務(wù)(spark 3),分為以下三個步驟:
1. 定期合并新增分區(qū)的小文件:
rewriteDataFilesAction.execute(); 僅合并小文件,不會刪除舊文件。
2. 刪除過期的 snapshot,清理元數(shù)據(jù)及數(shù)據(jù)文件:
table.expireSnapshots().expireOld erThan(timestamp).commit();
3. 清理 orphan 文件,默認(rèn)清理 3 天前,且無法觸及的文件:
removeOrphanFilesAction.older Than(timestamp).execute();
11. 計算引擎 – Flink
Flink 是實時平臺的核心計算引擎,目前主要支持?jǐn)?shù)據(jù)入湖場景,主要有以下幾個方面的特點。
數(shù)據(jù)準(zhǔn)實時入湖:Flink 和 Iceberg 在數(shù)據(jù)入湖方面集成度最高,F(xiàn)link 社區(qū)主動擁抱數(shù)據(jù)湖技術(shù)。
平臺集成:AutoStream 引入 IcebergCatalog,支持通過 SQL 建表、入湖 AutoDTS 支持將 MySQL、SQLServer、TiDB 表配置入湖。
流批一體:在流批一體的理念下,F(xiàn)link 的優(yōu)勢會逐漸體現(xiàn)出來。
12. 計算引擎 – Hive
Hive 在 SQL 批處理層面 Iceberg 和 Spark 3 集成度更高,主要提供以下三個方面的功能。
定期小文件合并及 meta 信息查詢:SELECT * FROM prod.db.table.history 還可查看 snapshots, files, manifests。
離線數(shù)據(jù)寫入:1)Insert into 2)Insert overwrite 3)Merge into
分析查詢:主要支持日常的準(zhǔn)實時分析查詢場景。
13. 計算引擎 – Trino/Presto
AutoBI 已經(jīng)和 Presto 集成,用于報表、分析型查詢場景。
Trino1)直接將 Iceberg 作為報表數(shù)據(jù)源2)需要增加元數(shù)據(jù)緩存機(jī)制:https://github.com/trinodb/trino/issues/7551
Presto社區(qū)集成中:https://github.com/prestodb/presto/pull/15836
14. 踩過的坑
1. 訪問 Hive Metastore 異常
問題描述:HiveConf 的構(gòu)造方法的誤用,導(dǎo)致 Hive 客戶端中聲明的配置被覆蓋,導(dǎo)致訪問 Hive metastore 時異常
解決方案(PR-2075):修復(fù) HiveConf 的構(gòu)造,顯示調(diào)用 addResource 方法,確保配置不會被覆蓋:hiveConf.addResource(conf);
2.Hive metastore 鎖未釋放
問題描述:“CommitFailedException: Timed out after 181138 ms waiting for lock xxx.” 原因是 hiveMetastoreClient.lock 方法,在未獲得鎖的情況下,也需要顯示 unlock,否則會導(dǎo)致上面異常。
解決方案(PR-2263):優(yōu)化 HiveTableOperations#acquireLock 方法,在獲取鎖失敗的情況下顯示調(diào)用 unlock 來釋放鎖。
3. 元數(shù)據(jù)文件丟失
問題描述:Iceberg 表無法訪問,報 “NotFoundException Failed to open input stream for file : xxx.metadata.json”
解決方案(PR-2328):當(dāng)調(diào)用 Hive metastore 更新 iceberg 表的 metadata_location 超時后,增加檢查機(jī)制,確認(rèn)元數(shù)據(jù)未保存成功后再刪除元數(shù)據(jù)文件。
三、收益與總結(jié)
1. 總結(jié)
通過對湖倉一體、流批融合的探索,我們分別做了總結(jié)。
湖倉一體1)Iceberg 支持 Hive Metastore;2)總體使用上與 Hive 表類似:相同數(shù)據(jù)格式、相同的計算引擎。
流批融合準(zhǔn)實時場景下實現(xiàn)流批統(tǒng)一:同源、同計算、同存儲。
2. 業(yè)務(wù)收益
數(shù)據(jù)時效性提升:入倉延遲從 2 小時以上降低到 10 分鐘以內(nèi);算法核心任務(wù) SLA 提前 2 小時完成。
準(zhǔn)實時的分析查詢:結(jié)合 Spark 3 和 Trino,支持準(zhǔn)實時的多維分析查詢。
特征工程提效:提供準(zhǔn)實時的樣本數(shù)據(jù),提高模型訓(xùn)練時效性。
CDC 數(shù)據(jù)準(zhǔn)實時入倉:可以在數(shù)倉針對業(yè)務(wù)表做準(zhǔn)實時分析查詢。
3. 架構(gòu)收益 - 準(zhǔn)實時數(shù)倉
上方也提到了,我們支持準(zhǔn)實時的入倉和分析,相當(dāng)于是為后續(xù)的準(zhǔn)實時數(shù)倉建設(shè)提供了基礎(chǔ)的架構(gòu)驗證。準(zhǔn)實時數(shù)倉的優(yōu)勢是一次開發(fā)、口徑統(tǒng)一、統(tǒng)一存儲,是真正的批流一體。劣勢是實時性較差,原來可能是秒級、毫秒級的延遲,現(xiàn)在是分鐘級的數(shù)據(jù)可見性。
但是在架構(gòu)層面上,這個意義還是很大的,后續(xù)我們能看到一些希望,可以把整個原來 “T + 1” 的數(shù)倉,做成準(zhǔn)實時的數(shù)倉,提升數(shù)倉整體的數(shù)據(jù)時效性,然后更好地支持上下游的業(yè)務(wù)。
四、后續(xù)規(guī)劃
1. 跟進(jìn) Iceberg 版本
全面開放 V2 格式,支持 CDC 數(shù)據(jù)的 MOR 入湖。
2. 建設(shè)準(zhǔn)實時數(shù)倉
基于 Flink 通過 Data pipeline 模式對數(shù)倉各層表全面提速。
3. 流批一體
隨著 upsert 功能的逐步完善,持續(xù)探索存儲層面流批一體。
4. 多維分析
基于 Presto/Spark3 輸出準(zhǔn)實時多維分析。