Flink 和 Iceberg 如何解決數(shù)據(jù)入湖面臨的挑戰(zhàn)
一、數(shù)據(jù)入湖的核心挑戰(zhàn)
數(shù)據(jù)實(shí)時(shí)入湖可以分成三個(gè)部分,分別是數(shù)據(jù)源、數(shù)據(jù)管道和數(shù)據(jù)湖(數(shù)倉(cāng)),本文的內(nèi)容將圍繞這三部分展開(kāi)。
1. Case #1:程序 BUG 導(dǎo)致數(shù)據(jù)傳輸中斷
首先,當(dāng)數(shù)據(jù)源通過(guò)數(shù)據(jù)管道傳到數(shù)據(jù)湖(數(shù)倉(cāng))時(shí),很有可能會(huì)遇到作業(yè)有 BUG 的情況,導(dǎo)致數(shù)據(jù)傳到一半,對(duì)業(yè)務(wù)造成影響;
第二個(gè)問(wèn)題是當(dāng)遇到這種情況的時(shí)候,如何重啟作業(yè),并保證數(shù)據(jù)不重復(fù)也不缺失,完整地同步到數(shù)據(jù)湖(數(shù)倉(cāng))中。
2. Case #2:數(shù)據(jù)變更太痛苦
數(shù)據(jù)變更當(dāng)發(fā)生數(shù)據(jù)變更的情況時(shí),會(huì)給整條鏈路帶來(lái)較大的壓力和挑戰(zhàn)。以下圖為例,原先是一個(gè)表定義了兩個(gè)字段,分別是 ID 和 NAME。此時(shí),業(yè)務(wù)方面的同學(xué)表示需要將地址加上,以方便更好地挖掘用戶的價(jià)值。首先,我們需要把 Source 表加上一個(gè)列 Address,然后再把到 Kafka 中間的鏈路加上鏈,然后修改作業(yè)并重啟。接著整條鏈路得一路改過(guò)去,添加新列,修改作業(yè)并重啟,最后把數(shù)據(jù)湖(數(shù)倉(cāng))里的所有數(shù)據(jù)全部更新,從而實(shí)現(xiàn)新增列。這個(gè)過(guò)程的操作不僅耗時(shí),而且會(huì)引入一個(gè)問(wèn)題,就是如何保證數(shù)據(jù)的隔離性,在變更的過(guò)程中不會(huì)對(duì)分析作業(yè)的讀取造成影響。
分區(qū)變更如下圖所示,數(shù)倉(cāng)里面的表是以 “月” 為單位進(jìn)行分區(qū),現(xiàn)在希望改成以 “天” 為單位做分區(qū),這可能就需要將很多系統(tǒng)的數(shù)據(jù)全部更新一遍,然后再用新的策略進(jìn)行分區(qū),這個(gè)過(guò)程十分耗時(shí)。
3. Case #3:越來(lái)越慢的近實(shí)時(shí)報(bào)表?
當(dāng)業(yè)務(wù)需要更加近實(shí)時(shí)的報(bào)表時(shí),需要將數(shù)據(jù)的導(dǎo)入周期,從 “天” 改到 “小時(shí)”,甚至 “分鐘” 級(jí)別,這可能會(huì)帶來(lái)一系列問(wèn)題。
如上圖所示,首先帶來(lái)的第一個(gè)問(wèn)題是:文件數(shù)以肉眼可見(jiàn)的速度增長(zhǎng),這將對(duì)外面的系統(tǒng)造成越來(lái)越大的壓力。壓力主要體現(xiàn)在兩個(gè)方面:
第一個(gè)壓力是,啟動(dòng)分析作業(yè)越來(lái)越慢,Hive Metastore 面臨擴(kuò)展難題,如下圖所示。隨著小文件越來(lái)越多,使用中心化的 Metastore 的瓶頸會(huì)越來(lái)越嚴(yán)重,這會(huì)造成啟動(dòng)分析作業(yè)越來(lái)越慢,因?yàn)閱?dòng)作業(yè)的時(shí)候,會(huì)把所有的小文件原數(shù)據(jù)都掃一遍。第二是因?yàn)?Metastore 是中心化的系統(tǒng),很容易碰到 Metastore 擴(kuò)展難題。例如 Hive,可能就要想辦法擴(kuò)后面的 MySQL,造成較大的維護(hù)成本和開(kāi)銷。
第二個(gè)壓力是掃描分析作業(yè)越來(lái)越慢。隨著小文件增加,在分析作業(yè)起來(lái)之后,會(huì)發(fā)現(xiàn)掃描的過(guò)程越來(lái)越慢。本質(zhì)是因?yàn)樾∥募罅吭黾?,?dǎo)致掃描作業(yè)在很多個(gè) Datanode 之間頻繁切換。
4. Case #4:實(shí)時(shí)地分析 CDC 數(shù)據(jù)很困難
大家調(diào)研 Hadoop 里各種各樣的系統(tǒng),發(fā)現(xiàn)整個(gè)鏈路需要跑得又快又好又穩(wěn)定,并且有好的并發(fā),這并不容易。
首先從源端來(lái)看,比如要將 MySQL 的數(shù)據(jù)同步到數(shù)據(jù)湖進(jìn)行分析,可能會(huì)面臨一個(gè)問(wèn)題,就是 MySQL 里面有存量數(shù)據(jù),后面如果不斷產(chǎn)生增量數(shù)據(jù),如何完美地同步全量和增量數(shù)據(jù)到數(shù)據(jù)湖中,保證數(shù)據(jù)不多也不少。
此外,假設(shè)解決了源頭的全量跟增量切換,如果在同步過(guò)程中遇到異常,如上游的 Schema 變更導(dǎo)致作業(yè)中斷,如何保證 CDC 數(shù)據(jù)一行不少地同步到下游。
整條鏈路的搭建,需要涉及源頭全量跟同步的切換,包括中間數(shù)據(jù)流的串通,還有寫入到數(shù)據(jù)湖(數(shù)倉(cāng))的流程,搭建整個(gè)鏈路需要寫很多代碼,開(kāi)發(fā)門檻較高。
最后一個(gè)問(wèn)題,也是關(guān)鍵的一個(gè)問(wèn)題,就是我們發(fā)現(xiàn)在開(kāi)源的生態(tài)和系統(tǒng)中,很難找到高效、高并發(fā)分析 CDC 這種變更性質(zhì)的數(shù)據(jù)。
5. 數(shù)據(jù)入湖面臨的核心挑戰(zhàn)
數(shù)據(jù)同步任務(wù)中斷無(wú)法有效隔離寫入對(duì)分析的影響;同步任務(wù)不保證 exactly-once 語(yǔ)義。
端到端數(shù)據(jù)變更DDL 導(dǎo)致全鏈路更新升級(jí)復(fù)雜;修改湖/倉(cāng)中存量數(shù)據(jù)困難。
越來(lái)越慢的近實(shí)時(shí)報(bào)表頻繁寫入產(chǎn)生大量小文件;Metadata 系統(tǒng)壓力大, 啟動(dòng)作業(yè)慢;大量小文件導(dǎo)致數(shù)據(jù)掃描慢。
無(wú)法近實(shí)時(shí)分析 CDC 數(shù)據(jù)難以完成全量到增量同步的切換;涉及端到端的代碼開(kāi)發(fā),門檻高;開(kāi)源界缺乏高效的存儲(chǔ)系統(tǒng)。
二、Apache Iceberg 介紹
1. Netflix:Hive 上云痛點(diǎn)總結(jié)
Netflix 做 Iceberg 最關(guān)鍵的原因是想解決 Hive 上云的痛點(diǎn),痛點(diǎn)主要分為以下三個(gè)方面:
1.1 痛點(diǎn)一:數(shù)據(jù)變更和回溯困難
不提供 ACID 語(yǔ)義。在發(fā)生數(shù)據(jù)改動(dòng)時(shí),很難隔離對(duì)分析任務(wù)的影響。典型操作如:INSERT OVERWRITE;修改數(shù)據(jù)分區(qū);修改 Schema;
無(wú)法處理多個(gè)數(shù)據(jù)改動(dòng),造成沖突問(wèn)題;
無(wú)法有效回溯歷史版本。
1.2 痛點(diǎn)二:替換 HDFS 為 S3 困難
數(shù)據(jù)訪問(wèn)接口直接依賴 HDFS API;
依賴 RENAME 接口的原子性,這在類似 S3 這樣的對(duì)象存儲(chǔ)上很難實(shí)現(xiàn)同樣的語(yǔ)義;
大量依賴文件目錄的 list 接口,這在對(duì)象存儲(chǔ)系統(tǒng)上很低效。
1.3 痛點(diǎn)三:太多細(xì)節(jié)問(wèn)題
Schema 變更時(shí),不同文件格式行為不一致。不同 FileFormat 甚至連數(shù)據(jù)類型的支持都不一致;
Metastore 僅維護(hù) partition 級(jí)別的統(tǒng)計(jì)信息,造成不 task plan 開(kāi)銷; Hive Metastore 難以擴(kuò)展;
非 partition 字段不能做 partition prune。
2. Apache Iceberg 核心特性
通用化標(biāo)準(zhǔn)設(shè)計(jì)完美解耦計(jì)算引擎Schema 標(biāo)準(zhǔn)化開(kāi)放的數(shù)據(jù)格式支持 Java 和 Python
完善的 Table 語(yǔ)義Schema 定義與變更靈活的 Partition 策略ACID 語(yǔ)義Snapshot 語(yǔ)義
豐富的數(shù)據(jù)管理存儲(chǔ)的流批統(tǒng)一可擴(kuò)展的 META 設(shè)計(jì)支持批更新和 CDC支持文件加密
性價(jià)比計(jì)算下推設(shè)計(jì)低成本的元數(shù)據(jù)管理向量化計(jì)算輕量級(jí)索引
3. Apache Iceberg File Layout
上方為一個(gè)標(biāo)準(zhǔn)的 Iceberg 的 TableFormat 結(jié)構(gòu),核心分為兩部分,一部分是 Data,一部分是 Metadata,無(wú)論哪部分都是維護(hù)在 S3 或者是 HDFS 之上的。
4. Apache Iceberg Snapshot View
上圖為 Iceberg 的寫入跟讀取的大致流程。
可以看到這里面分三層:
最上面黃色的是快照;
中間藍(lán)色的是 Manifest;
最下面是文件。
每次寫入都會(huì)產(chǎn)生一批文件,一個(gè)或多個(gè) Manifest,還有快照。
比如第一次形成了快照 Snap-0,第二次形成快照 Snap-1,以此類推。但是在維護(hù)原數(shù)據(jù)的時(shí)候,都是增量一步一步做追加維護(hù)的。
這樣的話可以幫助用戶在一個(gè)統(tǒng)一的存儲(chǔ)上做批量的數(shù)據(jù)分析,也可以基于存儲(chǔ)之上去做快照之間的增量分析,這也是 Iceberg 在流跟批的讀寫上能夠做到一些支持的原因。
5. 選擇 Apache Iceberg 的公司
上圖為目前在使用 Apache Iceberg 的部分公司,國(guó)內(nèi)的例子大家都較為熟悉,這里大致介紹一下國(guó)外公司的使用情況。
NetFlix 現(xiàn)在是有數(shù)百PB的數(shù)據(jù)規(guī)模放到 Apache Iceberg 之上,F(xiàn)link 每天的數(shù)據(jù)增量是上百T的數(shù)據(jù)規(guī)模。
Adobe 每天的數(shù)據(jù)新增量規(guī)模為數(shù)T,數(shù)據(jù)總規(guī)模在幾十PB左右。
AWS 把 Iceberg 作為數(shù)據(jù)湖的底座。
Cloudera 基于 Iceberg 構(gòu)建自己整個(gè)公有云平臺(tái),像 Hadoop 這種 HDFS 私有化部署的趨勢(shì)在減弱,上云的趨勢(shì)逐步上升,Iceberg 在 Cloudera 數(shù)據(jù)架構(gòu)上云的階段中起到關(guān)鍵作用。
蘋果有兩個(gè)團(tuán)隊(duì)在使用:一是整個(gè) iCloud 數(shù)據(jù)平臺(tái)基于 Iceberg 構(gòu)建;二是人工智能語(yǔ)音服務(wù) Siri,也是基于 Flink 跟 Iceberg 來(lái)構(gòu)建整個(gè)數(shù)據(jù)庫(kù)的生態(tài)。
三、Flink 和 Iceberg 如何解決問(wèn)題
回到最關(guān)鍵的內(nèi)容,下面闡述 Flink 和 Iceberg 如何解決第一部分所遇到的一系列問(wèn)題。
1. Case #1:程序 BUG 導(dǎo)致數(shù)據(jù)傳輸中斷
首先,同步鏈路用 Flink,可以保證 exactly once 的語(yǔ)義,當(dāng)作業(yè)出現(xiàn)故障時(shí),能夠做嚴(yán)格的恢復(fù),保證數(shù)據(jù)的一致性。
第二個(gè)是 Iceberg,它提供嚴(yán)謹(jǐn)?shù)?ACID 語(yǔ)義,可以幫用戶輕松隔離寫入對(duì)分析任務(wù)的不利影響。
2. Case #2:數(shù)據(jù)變更太痛苦
如上所示,當(dāng)發(fā)生數(shù)據(jù)變更時(shí),用 Flink 和 Iceberg 可以解決這個(gè)問(wèn)題。
Flink 可以捕捉到上游 Schema 變更的事件,然后把這個(gè)事件同步到下游,同步之后下游的 Flink 直接把數(shù)據(jù)往下轉(zhuǎn)發(fā),轉(zhuǎn)發(fā)之后到存儲(chǔ),Iceberg 可以瞬間把 Schema 給變更掉。
當(dāng)做 Schema 這種 DDL 的時(shí)候,Iceberg 直接維護(hù)了多個(gè)版本的 Schema,然后老的數(shù)據(jù)源完全不動(dòng),新的數(shù)據(jù)寫新的 Schema,實(shí)現(xiàn)一鍵 Schema 隔離。
另外一個(gè)例子是分區(qū)變更的問(wèn)題,Iceberg 做法如上圖所示。
之前按 “月” 做分區(qū)(上方黃色數(shù)據(jù)塊),如果希望改成按 “天” 做分區(qū),可以直接一鍵把 Partition 變更,原來(lái)的數(shù)據(jù)不變,新的數(shù)據(jù)全部按 “天” 進(jìn)行分區(qū),語(yǔ)義做到 ACID 隔離。
3. Case #3:越來(lái)越慢的近實(shí)時(shí)報(bào)表?
第三個(gè)問(wèn)題是小文件對(duì) Metastore 造成的壓力。
首先對(duì)于 Metastore 而言,Iceberg 是把原數(shù)據(jù)統(tǒng)一存到文件系統(tǒng)里,然后用 metadata 的方式維護(hù)。整個(gè)過(guò)程其實(shí)是去掉了中心化的 Metastore,只依賴文件系統(tǒng)擴(kuò)展,所以擴(kuò)展性較好。
另一個(gè)問(wèn)題是小文件越來(lái)越多,導(dǎo)致數(shù)據(jù)掃描會(huì)越來(lái)越慢。在這個(gè)問(wèn)題上,F(xiàn)link 和 Iceberg 提供了一系列解決方案:
第一個(gè)方案是在寫入的時(shí)候優(yōu)化小文件的問(wèn)題,按照 Bucket 來(lái) Shuffle 方式寫入,因?yàn)?Shuffle 這個(gè)小文件,寫入的文件就自然而然的小。
第二個(gè)方案是批作業(yè)定期合并小文件。
第三個(gè)方案相對(duì)智能,就是自動(dòng)增量地合并小文件。
4. Case #4:實(shí)時(shí)地分析CDC數(shù)據(jù)很困難
首先是是全量跟增量數(shù)據(jù)同步的問(wèn)題,社區(qū)其實(shí)已有 Flink CDC Connected 方案,就是說(shuō) Connected 能夠自動(dòng)做全量跟增量的無(wú)縫銜接。
第二個(gè)問(wèn)題是在同步過(guò)程中,如何保證 Binlog 一行不少地同步到湖中, 即使中間碰到異常。
對(duì)于這個(gè)問(wèn)題,F(xiàn)link 在 Engine 層面能夠很好地識(shí)別不同類型的事件,然后借助 Flink 的 exactly once 的語(yǔ)義,即使碰到故障,它也能自動(dòng)做恢復(fù)跟處理。
第三個(gè)問(wèn)題是搭建整條鏈路需要做不少代碼開(kāi)發(fā),門檻太高。
在用了 Flink 和 Data Lake 方案后,只需要寫一個(gè) source 表和 sink 表,然后一條 INSERT INTO,整個(gè)鏈路就可以打通,無(wú)需寫任何業(yè)務(wù)代碼。
最后是存儲(chǔ)層面如何支持近實(shí)時(shí)的 CDC 數(shù)據(jù)分析。
四、社區(qū) Roadmap
上圖為 Iceberg 的 Roadmap,可以看到 Iceberg 在 2019 年只發(fā)了一個(gè)版本, 卻在 2020 年直接發(fā)了三個(gè)版本,并在 0.9.0 版本就成為頂級(jí)項(xiàng)目。
上圖為 Flink 與 Iceberg 的 Roadmap,可以分為 4 個(gè)階段。
第一個(gè)階段是 Flink 與 Iceberg 建立連接。
第二階段是 Iceberg 替換 Hive 場(chǎng)景。在這個(gè)場(chǎng)景下,有很多公司已經(jīng)開(kāi)始上線,落地自己的場(chǎng)景。
第三個(gè)階段是通過(guò) Flink 與 Iceberg 解決更復(fù)雜的技術(shù)問(wèn)題。
第四個(gè)階段是把這一套從單純的技術(shù)方案,到面向更完善的產(chǎn)品方案角度去做。

































 
 
 










 
 
 
 