Hudi 在 vivo 湖倉一體的落地實踐

一、Hudi 基礎(chǔ)能力及相關(guān)概念介紹
1.1 流批同源能力
與Hive不同,Hudi數(shù)據(jù)在Spark/Flink寫入后,下游可以繼續(xù)使用Spark/Flink引擎以流讀的形式實時讀取數(shù)據(jù)。同一份Hudi數(shù)據(jù)源既可以批讀也支持流讀。
Flink、Hive、Spark的流轉(zhuǎn)批架構(gòu):

Hudi流批同源架構(gòu):

1.2 COW和MOR的概念
Hudi支持COW(Copy On Write)和MOR(Merge On Read)兩種類型:
(1)COW寫時拷貝:
每次更新的數(shù)據(jù)都會拷貝一份新的數(shù)據(jù)版本出來,用戶通過最新或者指定version的可以進行數(shù)據(jù)查詢。缺點是寫入的時候往往會有寫內(nèi)存放大的情況,優(yōu)點是查詢不需要合并,直接讀取效率相對比較高。JDK中的CopyOnWriteArrayList/
CopyOnWriteArraySet 容器正是采用了 COW 思想。
COW表的數(shù)據(jù)組織格式如下:

(2)MOR讀時合并:
每次更新或者插入新的數(shù)據(jù)時,并寫入parquet文件,而是寫入Avro格式的log文件中,數(shù)據(jù)按照FileGroup進行分組,每個FileGroup由base文件(parquet文件)和若干log文件組成,每個FileGroup有單獨的FileGroupID;在讀取的時候會在內(nèi)存中將base文件和log文件進行合并,進而返回查詢的數(shù)據(jù)。缺點是合并需要花費額外的合并時間,查詢的效率受到影響;優(yōu)點是寫入的時候效率相較于COW快很多,一般用于要求數(shù)據(jù)快速寫入的場景。
MOR數(shù)據(jù)組織格式如下:

1.3 Hudi的小文件治理方案
Hudi表會針對COW和MOR表制定不同的文件合并方案,分別對應(yīng)Clustering和Compaction。
Clustering顧名思義,就是將COW表中多個FileGroup下的parquet根據(jù)指定的數(shù)據(jù)大小重新編排合并為新的且文件體積更大的文件塊。如下圖所示:

Compaction即base parquet文件與相同F(xiàn)ileGroup下的其余l(xiāng)og文件進行合并,生成最新版本的base文件。如下圖所示:

1.4 周邊引擎查詢Hudi的原理
當前主流的OLAP引擎等都是從HMS中獲取Hudi的分區(qū)元數(shù)據(jù)信息,從InputFormat屬性中判斷需要啟動HiveCatalog還是HudiCatalog,然后生成查詢計劃最終執(zhí)行。當前StarRocks、Presto等引擎都支持以外表的形式對Hudi表進行查詢。
圖片
1.5 Procedure介紹
Hudi 支持多種Procedure,即過程處理程序,用戶可以通過這些Procedure方便快速的處理Hudi表的相關(guān)邏輯,比如Compaction、Clustering、Clean等相關(guān)處理邏輯,不需要進行編碼,直接通過sparksql的語句來執(zhí)行。
1.6 項目架構(gòu)
1). 按時效性要求進行分類
秒級延遲:
圖片
分鐘級延遲:

當前Hudi主要還是應(yīng)用在準實時場景:
上游從Kafka以append模式接入ods的cow表,下游部分dw層業(yè)務(wù)根據(jù)流量大小選擇不同類型的索引表,比如bucket index的mor表,在數(shù)據(jù)去重后進行dw構(gòu)建,從而提供統(tǒng)一數(shù)據(jù)服務(wù)層給下游的實時和離線的業(yè)務(wù),同時ods層和dw層統(tǒng)一以insert overwrite的方式進行分區(qū)級別的容災(zāi)保障,Timeline上寫入一個replacecommit的instant,不會引發(fā)下游流量驟增,如下圖所示:
圖片
1.7 線上達成能力
實時場景:
支持1億條/min量級準實時寫入;流讀延遲穩(wěn)定在分鐘級
離線場景:
支持千億級別數(shù)據(jù)單批次離線寫入;查詢性能與查詢Hive持平(部分線上任務(wù)較查詢Hive提高20%以上)
小文件治理:
95%以上的合并任務(wù)單次執(zhí)行控制在10min內(nèi)完成
二、組件能力優(yōu)化
2.1 組件版本
當前線上所有Hudi的版本已從0.12 升級到 0.14,主要考慮到0.14版本的組件能力更加完備,且與社區(qū)前沿動態(tài)保持一致。
2.2 流計算場景
1). 限流
數(shù)據(jù)積壓嚴重的情況下,默認情況會消費所有未消費的commits,往往因消費的commits數(shù)目過大,導(dǎo)致任務(wù)頻繁O(jiān)OM,影響任務(wù)穩(wěn)定性;優(yōu)化后每次用戶可以攝取指定數(shù)目的commits,很大程度上避免任務(wù)OOM,提高了任務(wù)穩(wěn)定性。

2). 外置clean算子
避免單并行度的clean算子最終階段影響數(shù)據(jù)實時寫入的性能;將clean單獨剝離到
compaction/clustering執(zhí)行。這樣的好處是單個clean算子,不會因為其生成clean計劃和執(zhí)行導(dǎo)致局部某些Taskmanager出現(xiàn)熱點的問題,極大程度提升了實時任務(wù)穩(wěn)定性。

3). JM內(nèi)存優(yōu)化
部分大流量場景中,盡管已經(jīng)對Hudi進行了最大程度的調(diào)優(yōu),但是JM的內(nèi)存仍然在較高水位波動,還是會間隔性出現(xiàn)內(nèi)存溢出影響穩(wěn)定性。這種情況下我們嘗試對 state.backend.fs.memory-threshold 參數(shù)進行調(diào)整;從默認的20KB調(diào)整到1KB,JM內(nèi)存顯著下降;同時運行至今state相關(guān)數(shù)據(jù)未產(chǎn)生小文件影響。

2.3 批計算場景
1). Bucket index下的BulkInsert優(yōu)化
0.14版本后支持了bucket表的bulkinsert,實際使用過程中發(fā)現(xiàn)分區(qū)數(shù)很大的情況下,寫入延遲耗時與計算資源消耗較高;分析后主要是打開的句柄數(shù)較多,不斷CPU IO 頻繁切換影響寫入性能。
因此在hudi內(nèi)核進行了優(yōu)化,主要是基于partition path和bucket id組合進行預(yù)排序,并提前關(guān)閉空閑寫入句柄,進而優(yōu)化cpu資源使用率。
這樣原先50分鐘的任務(wù)能降低到30分鐘以內(nèi),數(shù)據(jù)寫入性能提高約30% ~ 40%。
優(yōu)化前:

優(yōu)化后:

2). 查詢優(yōu)化
0.14版本中,部分情況下分區(qū)裁剪會失效,從而導(dǎo)致條件查詢往往會掃描不相關(guān)的分區(qū),在分區(qū)數(shù)龐大的情況下,會導(dǎo)致driver OOM,對此問題進行了修復(fù),提高了查詢?nèi)蝿?wù)的速度和穩(wěn)定性。
eg:select * from `hudi_test`.`tmp_hudi_test` where day='2023-11-20' and hour=23;
(其中tmp_hudi_test是一張按日期和小時二級分區(qū)的表)
修復(fù)前:

修復(fù)后:

優(yōu)化后不僅包括減少分區(qū)的掃描數(shù)目,也減少了一些無效文件RPC的stage。
3). 多種OLAP引擎支持
此外,為了提高MOR表管理的效率,我們禁止了RO/RT表的生成;同時修復(fù)了原表的元數(shù)據(jù)不能正常同步到HMS的缺陷(這種情況下,OLAP引擎例如Presto、StarRocks查詢原表數(shù)據(jù)默認僅支持對RO/RT表的查詢,原表查詢?yōu)榭战Y(jié)果)。
圖片
2.4 小文件合并
1). 序列化問題修復(fù)
0.14版本Hudi在文件合并場景中,Compaction的性能相較0.12版本有30%左右的資源優(yōu)化,比如:原先0.12需要6G資源才能正常啟動單個executor的場景下,0.14版本 4G就可以啟動并穩(wěn)定執(zhí)行任務(wù);但是clustering存在因TypedProperties重復(fù)序列化導(dǎo)致的性能缺陷。完善后,clustering的性能得到30%以上的提升。
可以從executor的修復(fù)前后的火焰圖進行比對。
修復(fù)前:

修復(fù)后:

2). 分批compaction/clustering
compaction/clustering默認不支持按commits數(shù)分批次執(zhí)行,為了更好的兼容平臺調(diào)度能力,對compaction/clustering相關(guān)procedure進行了改進,支持按批次執(zhí)行。
同時對其他部分procedure也進行了優(yōu)化,比如copy_to_table支持了列裁剪拷貝、
delete_procedures支持了批量執(zhí)行等,降低sparksql的執(zhí)行時間。
3). clean優(yōu)化
Hudi0.14 在多分區(qū)表的場景下clean的時候很容易OOM,主要是因為構(gòu)建
HoodieTableFileSystemView的時候需要頻繁訪問TimelineServer,因產(chǎn)生大量分區(qū)信息請求對象導(dǎo)致內(nèi)存溢出。具體情況如下:

對此我們對partition request Job做了相關(guān)優(yōu)化,將多個task分為多個batch來執(zhí)行,降低對TimelineSever的內(nèi)存壓力,同時增加了請求前的緩存判斷,如果已經(jīng)緩存的將不會發(fā)起請求。
改造后如下:

此外實際情況下還可以在FileSystemViewManager構(gòu)建過程中將 remoteview 和 secondview 的順序互調(diào),絕大部分場景下也能避免clean oom的問題,直接優(yōu)先從secondview中獲取分區(qū)信息即可。
2.5 生命周期管理
當前計算平臺支持用戶表級別生命周期設(shè)置,為了提高刪除的效率,我們設(shè)計實現(xiàn)了直接從目錄對數(shù)據(jù)進行刪除的方案,這樣的收益有:
- 降低了元數(shù)據(jù)交互時間,執(zhí)行時間快;
- 無須加鎖、無須停止任務(wù);
- 不會影響后續(xù)compaction/clustering 相關(guān)任務(wù)執(zhí)行(比如執(zhí)行合并的時候不會報文件不存在等異常)。
刪除前會對compaction/clustering等instants的元數(shù)據(jù)信息進行掃描,經(jīng)過合法性判斷后區(qū)分用戶需要刪除的目錄是否存在其中,如果有就保存;否則直接刪除。流程如下:

三、總結(jié)
我們分別在流批場景、小文件治理、生命周期管理等方向做了相關(guān)優(yōu)化,上線后的收益主要體現(xiàn)這四個方向:
- 部分實時鏈路可以進行合并,降低了計算和存儲資源成本;
- 基于watermark有效識別分區(qū)寫入的完成度,接入湖倉的后續(xù)離線任務(wù)平均SLA提前時間不低于60分鐘;
- 部分流轉(zhuǎn)批后的任務(wù)上線后執(zhí)行時間減少約40%(比如原先執(zhí)行需要150秒的任務(wù)可以縮短到100秒左右完成 ;
- 離線增量更新場景,部分任務(wù)相較于原先Hive任務(wù)可以下降30%以上的計算資源。
同時跟進用戶實際使用情況,發(fā)現(xiàn)了一些有待優(yōu)化的問題:
- Hudi生成文件的體積相較于原先Hive,體積偏大(平均有1.3 ~ 1.4的比例);
- 流讀的指標不夠準確;
- Hive—>Hudi遷移需要有一定的學(xué)習(xí)成本;
針對上述問題,我們也做了如下后續(xù)計劃:
- 對hoodie parquet索引文件進行精簡優(yōu)化,此外業(yè)務(wù)上對主鍵的重新設(shè)計也會直接影響到文件體積大小;
- 部分流讀的指標不準,我們已經(jīng)完成初步的指標修復(fù),后續(xù)需要補充更多實時的任務(wù)指標來提高用戶體驗;
- 完善Hudi遷移流程,提供更快更簡潔的遷移工具,此外也會向更多的業(yè)務(wù)推廣Hudi組件,進一步挖掘Hudi組件的潛在使用價值。


































