秒級(jí)響應(yīng)!B站基于 Iceberg 的湖倉一體平臺(tái)構(gòu)建實(shí)踐
一、背景
我們使用 Iceberg 構(gòu)建湖倉一體平臺(tái)的初衷是希望解決業(yè)務(wù)方在使用 Hive 數(shù)倉時(shí)的一些痛點(diǎn)。主要包括以下幾大方面:
(1)Hive 的查詢性能達(dá)不到交互式分析的要求,所以經(jīng)常需要把 Hive 的數(shù)據(jù)儲(chǔ)存到其它引擎當(dāng)中。
(2)上一點(diǎn)造成了出倉鏈路越來越多,越來越復(fù)雜,維護(hù)成本高。
(3)另外,出倉的數(shù)據(jù)容易形成數(shù)據(jù)孤島,造成數(shù)據(jù)冗余,導(dǎo)致存儲(chǔ)成本上漲。
(4)最后,Hive 的時(shí)效性不好,即使用 FIink 流式的引擎寫入,延遲也會(huì)在小時(shí)級(jí)別。
我們希望我們的湖倉一體平臺(tái)能夠解決這些痛點(diǎn),我們的目標(biāo)是:
(1)首先,平臺(tái)要是互聯(lián)互通的,要支持各種引擎的訪問,避免數(shù)據(jù)孤島的出現(xiàn)。
(2)第二,查詢要高效,以滿足交互式分析的要求。
(3)第三,使用要盡可能的便捷,盡可能降低業(yè)務(wù)方的門檻。
我們的湖倉一體架構(gòu)如上圖所示,采用 Iceberg 來存儲(chǔ)數(shù)據(jù),數(shù)據(jù)是在 HDFS 上。入湖的幾條鏈路包括 FIink、Spark 引擎來寫入,也提供 java 的 API,業(yè)務(wù)方可以直接通過 API 來寫入數(shù)據(jù),后臺(tái)有一個(gè)叫做 Magnus 的服務(wù)對(duì) Iceberg 的數(shù)據(jù)進(jìn)行不斷的優(yōu)化。另外我們也用 Alluxio 來對(duì)數(shù)據(jù)進(jìn)行緩存加速。我們使用 Trino 來進(jìn)行交互式分析,對(duì)外提供查詢接口。寫入 Iceberg 的數(shù)據(jù)有一部分是要繼續(xù)寫入下游的 Iceberg 表。一般是數(shù)倉的分層建模的場(chǎng)景。雖然我們減少了 Hive 出倉的鏈路,但是有一些場(chǎng)景可能 Trino 的查詢還是達(dá)不到響應(yīng)時(shí)間的要求。比如毫秒級(jí)的響應(yīng),可能還是會(huì)出倉到 ClickHouse、ES 等其它存儲(chǔ)中。
下面簡(jiǎn)單介紹一下 Iceberg 的表結(jié)構(gòu),以及我們?yōu)槭裁催x Iceberg 作為存儲(chǔ)格式。
Iceberg 有文件級(jí)別的元數(shù)據(jù)管理。它基于 snapshot 來做多版本的控制。每一個(gè) snapshot 對(duì)應(yīng)一組 manifest,每一個(gè) manifest 再對(duì)應(yīng)具體的數(shù)據(jù)文件。我們選 Iceberg 的一個(gè)比較重要的原因是其開放的存儲(chǔ)格式。它有著比較好的 API 和存儲(chǔ)規(guī)范的定義,方便我們?cè)诤罄m(xù)對(duì)它做一些功能上的擴(kuò)展。
二、查詢加速
接下來介紹我們目前的一些比較重要的工作。其中最核心的一項(xiàng)是查詢加速。
因?yàn)槲覀兠鎸?duì)的是 OLAP 的場(chǎng)景,一般是會(huì)有過濾條件的。所以我們第一個(gè)思路是如何盡可能過濾掉不需要掃描的數(shù)據(jù)。Iceberg 在文件級(jí)別記錄了每一個(gè)列的一些統(tǒng)計(jì)信息,比如說 MinMax 值,這些統(tǒng)計(jì)可以在查詢計(jì)劃階段就把一些不需要的文件過濾掉。我們很直觀的一個(gè)想法是,如果對(duì)數(shù)據(jù)進(jìn)行排序,就會(huì)讓相同的數(shù)據(jù)有更好的聚集效果,在過濾的時(shí)候就會(huì)過濾掉更多的文件。
所以我們最早是做了多維的排序。過濾字段可能有多個(gè),不能用簡(jiǎn)單的線性排序來做。線性排序只對(duì)靠前的排序字段有比較好的聚集效果。所以我們比較了 Z-ORDER 和 Hibert Curve 這兩種排序方式。從多維排序的實(shí)現(xiàn)來比較,發(fā)現(xiàn) Hibert 的聚集性會(huì)更好一點(diǎn)。所以我們目前都是采用 Hibert 的方式。不管是 Z-ORDER 還是 Hibert ,都要求參與排序的字段是一個(gè)整型值。對(duì)于非整型的數(shù)據(jù),我們用 Boundary Index 的方式來參與計(jì)算。
我們會(huì)把數(shù)據(jù)按照需要多少區(qū)間,來切出不同的 Boundary。根據(jù)它的 Boundary Index 來參與 Z-ORDER 和 Hibert Curve 的計(jì)算。
有了排序以后,另一個(gè)問題是多維的排序字段是不可以無限增加的。一般來說排序字段的個(gè)數(shù)越多,其聚集效果會(huì)越差。我們對(duì)業(yè)務(wù)方的建議是一般不要超過四個(gè)排序字段。如果有更多的過濾字段怎么辦?我們考慮到對(duì)于一些基數(shù)比較高的過濾字段,不去做排序,而是通過創(chuàng)建索引的方式,也能有一個(gè)比較好的過濾效果。
我們實(shí)現(xiàn)的索引是為了判斷一個(gè)數(shù)據(jù)文件是否滿足查詢條件的要求。所以我們的索引是文件級(jí)別的,一個(gè)表可以針對(duì)不同的列創(chuàng)建不同的索引。一個(gè) DataFile 可能會(huì)關(guān)聯(lián)多個(gè)索引文件,我們把索引文件和 DataFile 的元數(shù)據(jù)一起存儲(chǔ)在 manifest 里。
下面介紹一下我們支持的索引種類:
(1)BloomFilter:計(jì)算比較簡(jiǎn)單,占用空間也比較小。存在 false positive 的問題,只支持等值的查詢。
(2)Bitmap:功能更強(qiáng)大,支持等值和范圍查詢,匹配更精準(zhǔn),更精準(zhǔn)是因?yàn)榭梢詫?duì)多個(gè)條件匹配到的數(shù)據(jù)進(jìn)行交并補(bǔ)計(jì)算,同時(shí)它返回的行號(hào)也可以幫助進(jìn)一步 skip 數(shù)據(jù)。Bitmap 的缺點(diǎn)是占用空間比較大,尤其是對(duì)一些高基數(shù)的字段,創(chuàng)建 Bitmap 索引,可能加載索引的時(shí)間已經(jīng)超過了過濾掉數(shù)據(jù)所節(jié)約的時(shí)間,甚至?xí)a(chǎn)生一些負(fù)向的效果。
(3)BloomRF:我們參考一篇論文,實(shí)現(xiàn)了一種 BloomRF 索引,它與 BloomFilter 的原理類似,但是用了多段的有序哈希函數(shù)來支持等值和范圍的查詢。它的存儲(chǔ)開銷也與 BloomFilter 類似。其問題也是會(huì)有 false positive。
(4)TokenBloomFilter、NgramBloomFilter,TokenBitmap、NgramBitmap:是針對(duì) token 的索引,是為日志場(chǎng)景設(shè)計(jì)的。相當(dāng)于對(duì)日志做一些分詞的操作。分詞完成以后,構(gòu)建 BloomFilter 或者 Bitmap 這樣的索引。TokenBloomFilter 和 TokenBitmap 針對(duì)的是英文的分詞,Ngram 針對(duì)的是中文的分詞。
除了索引以外,我們也在做對(duì)預(yù)計(jì)算的支持,內(nèi)部叫做 Cube,或者 AggIndex,是針對(duì)聚合計(jì)算的加速。目前支持單表和星型模型的查詢。一個(gè) Cube 的定義,主要定義兩個(gè)信息:一個(gè)是 Cube 的維度字段;另一個(gè)是 Cube 需要的聚合計(jì)算,常見的如 count、min、max、count distinct 等都是支持的。另外聚合是做在文件級(jí)別的。
舉一個(gè)例子:
它是一個(gè)星型模型,lineorder 表是事實(shí)表,會(huì)關(guān)聯(lián) dates 、part 和 supplier 維表。如果要對(duì)這樣一個(gè)查詢場(chǎng)景去定義 Cube,所有需要在 group by 、where 語句中使用的字段都要作為維度字段。大家可以看到預(yù)計(jì)算是定義在事實(shí)表上的。它的預(yù)計(jì)算的定義是跟 lineorder 表關(guān)聯(lián)的。但是這里使用到的一些列可能是有維表當(dāng)中的列。我們做了一個(gè)叫做關(guān)聯(lián)列的實(shí)現(xiàn)。事實(shí)表不僅可以用關(guān)聯(lián)列來定義 Cube,同時(shí)也能用關(guān)聯(lián)列對(duì)事實(shí)表的數(shù)據(jù)來進(jìn)行排序和索引。像查詢里,p_brand 上有一個(gè)過濾條件,Cube 數(shù)據(jù)也可以用到索引來進(jìn)行過濾。上面的過濾條件也可以用來過濾事實(shí)表的數(shù)據(jù)。
定義了 Cube 以后,Magnus 服務(wù)會(huì)在后臺(tái)去負(fù)責(zé) Cube 文件的生成。因?yàn)槭俏募?jí)別的聚合,所以生成的邏輯是每一個(gè)文件會(huì)去關(guān)聯(lián)其他的文件。比如這是事實(shí)表當(dāng)中的一個(gè) DataFile,它會(huì)去關(guān)聯(lián)三張維表。這三張維表關(guān)聯(lián)完以后會(huì)計(jì)算聚合值,最終會(huì)生成一個(gè) CubeFile。CubeFile 與索引的情況類似,它會(huì)跟 DataFile 關(guān)聯(lián)起來,一起保存在 Manifest 當(dāng)中。
對(duì)聚合值的處理,因?yàn)槲覀冏龅氖俏募?jí)別的聚合。所以真正查詢的時(shí)候,還需要把文件級(jí)別的聚合再做 global merge, 才能得到最終的一個(gè)聚合效果。這里分兩種情況:
一種是可以直接累加的一些聚合值,如 min、max、count,在生成 Cube 文件的時(shí)候,可以直接存儲(chǔ)聚合結(jié)果;有一些不能直接累加,比如 Average,存儲(chǔ)的是中間狀態(tài)。查詢時(shí)需要判斷能否用 Cube 來響應(yīng),比如下圖中展示的查詢:
它是一個(gè)原始的邏輯計(jì)劃。我們會(huì)去找查詢當(dāng)中的 aggregation 節(jié)點(diǎn)。對(duì)于 aggregation 節(jié)點(diǎn),判斷其 source 表中是否存在一個(gè) Cube 能滿足聚合計(jì)算的要求。如果找到,會(huì)把邏輯計(jì)劃進(jìn)行轉(zhuǎn)換。轉(zhuǎn)換完以后,原來的 table scan 就會(huì)切換成 Cube 模式,就不去讀原始的數(shù)據(jù)了,而是去讀 Cube 文件的數(shù)據(jù)。因?yàn)?Cube 文件是異步生成的,所以就肯定會(huì)存在一種情況,可能有一些文件已經(jīng)構(gòu)建了 Cube,有一些文件可能還沒有生成 Cube。查詢改寫這一側(cè)會(huì)稍微有一點(diǎn)不一樣。對(duì)于這種情況,我們的處理思路是把有 Cube 的部分,保持跟原來一樣的改寫方式;沒有 Cube 的部分,現(xiàn)場(chǎng)把 Cube 的數(shù)據(jù)算出來,與已有 Cube 的數(shù)據(jù)做一次 union 以后,再做 global merge,這樣可以得到一個(gè)最終的結(jié)果。
當(dāng)然這個(gè)做法只適用于只有少量文件還沒有 Cube 的情況。如果大部分文件都沒有 Cube,那么直接退化成原始的計(jì)算會(huì)更好。
Cube 做好之后,我們目前在探索用 star-tree index 對(duì) Cube 來做一個(gè)增強(qiáng)。我們參考了 Apache Pinot 的實(shí)現(xiàn)。
要解決的問題是,Cube 是可以響應(yīng)不同的維度組合的。比如 Cube 的定義可能選了三個(gè)維度,查詢的時(shí)候只用到了其中的兩個(gè)或者一個(gè),Cube 也是可以響應(yīng)的。所以從節(jié)省存儲(chǔ)的角度來說,用最細(xì)粒度的維度來定義 Cube。這樣只需要一個(gè) Cube,就可以響應(yīng)所有維度組合的查詢。
但是如果維度選的比較多,生成的 Cube,它的數(shù)據(jù)量也會(huì)比較大。而且維度多了以后,聚合效果會(huì)變差。如果用最細(xì)粒度定義的 Cube,去響應(yīng)很少維度的查詢,中間還需要額外做很多聚合的計(jì)算。
如果針對(duì)每一個(gè)查詢都去定義特定的 Cube,可以保證查詢的時(shí)候 Cube 一定是最優(yōu)的。但是它的問題是所需要的存儲(chǔ)成本就會(huì)比較高,所有不同的組合,都要實(shí)現(xiàn),生成不同的 Cube 文件。
Star-Tree Index 希望在兩者之間做一個(gè)折中。針對(duì)我們的 Cube 生成 Star-Tree Index 這樣一個(gè)數(shù)據(jù)結(jié)構(gòu)。
舉一個(gè)例子,比如我的 Cube 的定義是 Dim1、Dim2、Dim3 這三個(gè)字段,聚合值是 count。雖然維度一共有三個(gè),但是常用的可能是 Dim1、Dim2 這兩個(gè)。這時(shí)候就可以按照 Dim1、Dim2 指定這兩個(gè)維度字段來生成 star tree。star tree 是一個(gè)多叉樹,每一層對(duì)應(yīng)一個(gè)維度。每一層的節(jié)點(diǎn)是當(dāng)前這一個(gè)維度的取值。比如 Dim1 的取值是 1、2、3,Dim2 的取值是 a、 b 、c。Star-Tee Index 會(huì)針對(duì)不同的取值來構(gòu)造樹的節(jié)點(diǎn)。每一層還會(huì)有一個(gè)特殊的 star 節(jié)點(diǎn),star 節(jié)點(diǎn)的含義是忽略掉這一層的取值,或者我們認(rèn)為 star 是一個(gè)通配符。全部聚合在一起以后,它的聚合的結(jié)果是多少。對(duì)于 star 節(jié)點(diǎn),會(huì)額外生成一些 star record。star 節(jié)點(diǎn)下面的這些節(jié)點(diǎn)都會(huì)生成具體的一個(gè) star record。比如例子里面,Dim1 取值是 “*” 的時(shí)候,Dim2 可能有 a、d 這兩種。如果查詢當(dāng)中只用到了 Dim2 這一個(gè)維度,那么可以通過 star record 來進(jìn)行響應(yīng)。因?yàn)槲抑恍枰紤] Dim1 為 “*” 的情況。
三、智能優(yōu)化
介紹完查詢加速以后,再來講一下我們目前做的智能優(yōu)化的一些工作。
針對(duì)的是我們的 Magnus 服務(wù)。我們最根本的目標(biāo)是希望盡可能降低用戶的使用門檻。比如 Hive 用戶,他可能需要了解一些大數(shù)據(jù)的原理;小文件多了,應(yīng)該怎么處理,可能需要做一些合并;Hive 表應(yīng)該怎么做分桶,文件內(nèi)部怎么做排序。我們目前所處的一個(gè)階段,叫做自動(dòng)化的階段。用戶不需要知道這么多底層的知識(shí)。但是他還是需要告訴我一些業(yè)務(wù)上的邏輯。比如常用的過濾字段是哪些,常用的聚合的模型是什么樣子的。我們?cè)俑鶕?jù)用戶提供的信息來自動(dòng)幫他去創(chuàng)建索引,去創(chuàng)建 Cube。
最終我們是希望進(jìn)一步簡(jiǎn)化,用戶只是建表。表在建出來的使用過程當(dāng)中,我們可以對(duì)它做一個(gè)智能的持續(xù)的優(yōu)化。Magnus 服務(wù)就是以此為目的來開發(fā)的。它主要負(fù)責(zé)的功能包括:
(1)一個(gè)是自動(dòng)的后臺(tái)優(yōu)化,目前所有 Iceberg 表的寫入操作,Magnus 都會(huì)監(jiān)聽,當(dāng)監(jiān)聽到寫入事件后,它會(huì)根據(jù)自己內(nèi)部的一些調(diào)度邏輯,通過 spark 任務(wù)對(duì)表進(jìn)行一些操作,比如排序、創(chuàng)建索引、構(gòu)建 Cube 等。
(2)另一個(gè)比較重要的功能是,它可以幫助我們把 Iceberg 表的一些詳情做一個(gè)圖形化的展示,便于我們定位和排查問題。比如下圖中顯示的一張 Iceberg 表。
可以看到表是定義了排序字段的,在界面上可以看到它某一個(gè)分區(qū)下有多少個(gè)文件,這些文件有哪些已經(jīng)按照用戶的要求做了排序,有哪些已經(jīng)按照用戶的要求去構(gòu)建了索引等等。
(3)第三個(gè)功能是智能化的推薦。實(shí)現(xiàn)方式是使用 Trino 把查詢明細(xì)全部落庫。
查詢明細(xì)當(dāng)中包含了每張表用到的過濾字段,Magnus 服務(wù)會(huì)去定期去分析這些查詢明細(xì),結(jié)合用戶的歷史查詢以及 Iceberg 表本身的統(tǒng)計(jì)信息。當(dāng)然有一些統(tǒng)計(jì)信息可能是需要用 Trino 去現(xiàn)場(chǎng)計(jì)算出來的。結(jié)合這些信息以后,會(huì)給出一些優(yōu)化建議。
上面的例子展示的是 Magnus 對(duì)某一張表的一次優(yōu)化建議??梢钥吹奖砝锩嬗脩粼臼嵌x了排序和索引字段的。Magnus 分析結(jié)果來看,首先是排序可以增加幾個(gè)字段,同時(shí)可以刪掉一些不必要的字段。索引也是可以去掉一些用不到的索引。后續(xù)我們會(huì)考慮根據(jù)推薦去驗(yàn)證效果。如果效果好,后面可以考慮去自動(dòng)幫助用戶進(jìn)行修改。
四、現(xiàn)狀
最后來介紹一下我們目前落地的情況。
目前主要場(chǎng)景包括 BI 報(bào)表、指標(biāo)服務(wù)、A/B Test、人群圈選和日志等。
Iceberg 表總量大約為 5PB,日增 75TB。Trino 查詢每天在 20 萬左右,P95 的響應(yīng)時(shí)間是 5 秒。我們給自己的定位為秒級(jí)到 10 秒級(jí)。過濾的數(shù)據(jù)量(估算)為 500TB/ 天,占比約 100%~200%。