技術(shù)干貨|阿里云基于Hudi構(gòu)建Lakehouse實(shí)踐探索
一、數(shù)據(jù)湖與Lakehouse
2021年開(kāi)發(fā)者大會(huì)上,我們的一位研究員分享的一個(gè)議題,提到了很多數(shù)據(jù),主要想闡述的是行業(yè)發(fā)展到現(xiàn)在這個(gè)階段,數(shù)據(jù)的膨脹非常厲害,數(shù)據(jù)增速非??膳?。無(wú)論是數(shù)據(jù)規(guī)模還是生產(chǎn)處理的實(shí)時(shí)化,到生產(chǎn)處理的智能化,以及數(shù)據(jù)加速上云的云化過(guò)程。
這些數(shù)據(jù)來(lái)自Gartner、IDC的分析,都是行業(yè)最權(quán)威的分析報(bào)告里沉淀總結(jié)出來(lái)的。這就意味著我們?cè)跀?shù)據(jù)領(lǐng)域尤其是分析領(lǐng)域的機(jī)遇和挑戰(zhàn)都很大。
在海量的數(shù)據(jù)上,我們要真正做好數(shù)據(jù)價(jià)值的挖掘和使用會(huì)面臨很多挑戰(zhàn),第一是現(xiàn)有的架構(gòu)慢慢都要往云上架構(gòu)遷移;第二個(gè)是數(shù)據(jù)量;第三個(gè)是Serverless按量付費(fèi),慢慢從嘗試性的選擇成為默認(rèn)選擇;第四是還有多樣化的應(yīng)用、異構(gòu)數(shù)據(jù)源。相信大家接觸過(guò)云都知道,無(wú)論是哪個(gè)云廠商都有很多種云服務(wù)可供,尤其是數(shù)據(jù)類服務(wù)數(shù)量繁多。這時(shí)候,大量數(shù)據(jù)源一定會(huì)帶來(lái)一個(gè)問(wèn)題:分析難度大,尤其是想做關(guān)聯(lián)分析的時(shí)候,異構(gòu)數(shù)據(jù)源怎么連接起來(lái)是很大的問(wèn)題。其次是差異化的數(shù)據(jù)格式,通常我們做數(shù)據(jù)寫入會(huì)時(shí)選擇方便、簡(jiǎn)單的格式,比如CSV、Json格式,但對(duì)分析來(lái)講,這些格式往往是非常低效的,尤其是數(shù)據(jù)到了TB、PB級(jí)的時(shí)候,根本沒(méi)法分析。這時(shí)候Parquet、ORC等面向分析的列存格式就衍生出來(lái)了。當(dāng)然還包括鏈路安全以及差異化群體等等,數(shù)據(jù)量膨脹的過(guò)程中又增加了很多的分析難度。
在真實(shí)的客戶場(chǎng)景里,很多數(shù)據(jù)已經(jīng)上云了和“入湖”了。湖是什么?我們對(duì)湖的定義和理解更像AWS的S3或者阿里云OSS這種對(duì)象存儲(chǔ),是簡(jiǎn)單易用的API形式,可以存各種各樣差異化的數(shù)據(jù)格式,有無(wú)限的容量、按量付費(fèi)等非常多的好處。之前想基于湖做分析非常麻煩,很多時(shí)候要做T+1的建倉(cāng)和各種云服務(wù)投遞。有時(shí)候數(shù)據(jù)格式不對(duì)就要人肉做ETL,如果數(shù)據(jù)已經(jīng)在湖里要做元信息發(fā)現(xiàn)分析等等,整個(gè)運(yùn)維鏈路很復(fù)雜,問(wèn)題也很多。這里都是線上客戶實(shí)際面臨的離線數(shù)據(jù)湖問(wèn)題,有些優(yōu)先級(jí)偏高,有些低些,總而言之問(wèn)題非常多。
其實(shí)Databricks大概19年就開(kāi)始將研究重點(diǎn)從Spark方向,慢慢往Lakehouse方向調(diào)整了。他們發(fā)表了兩篇論文,這兩篇論文為數(shù)據(jù)湖怎么被統(tǒng)一訪問(wèn)、怎么被更好地訪問(wèn)提供了理論層面的定義。
基于Lakehouse的新概念,想做到的是屏蔽格式上的各種差異,為不同的應(yīng)用提供統(tǒng)一的接口以及更加簡(jiǎn)化的數(shù)據(jù)訪問(wèn)、數(shù)據(jù)分析能力。架構(gòu)說(shuō)實(shí)現(xiàn)數(shù)據(jù)倉(cāng)、數(shù)據(jù)湖、Lakehouse一步步演進(jìn)。
他的兩篇論文闡述了很多新概念:第一,怎么設(shè)計(jì)和實(shí)現(xiàn)MVCC,能讓離線數(shù)倉(cāng)也有像數(shù)據(jù)庫(kù)一樣的MVCC能力,從而滿足大部分對(duì)批事務(wù)的需求;第二,提供不同的存儲(chǔ)模式,能夠適應(yīng)不同的讀和寫Workload;第三,提供一些近實(shí)時(shí)的寫入和合并能力,為數(shù)據(jù)量提供鏈路能力??傊?,他的思路能夠較好解決離線數(shù)據(jù)分析的難題。
目前業(yè)界有三款產(chǎn)品相對(duì)比較流行,第一個(gè)是Delta Lake,它是Databricks自己發(fā)布的數(shù)據(jù)湖管理協(xié)議;第二個(gè)是Iceberg,Iceberg也是Apache的一個(gè)開(kāi)源項(xiàng)目;第三個(gè)是Hudi,Hudi最早由Uber內(nèi)部研發(fā),后來(lái)開(kāi)源的項(xiàng)目(早期用得比較多的是Hive的ACID)。目前這三個(gè)產(chǎn)品因?yàn)榭梢詫?duì)接HDFS的API,可以適配底層的湖存儲(chǔ),而OSS又可以適配到HDFS存儲(chǔ)接口。由于核心原理相似,三個(gè)產(chǎn)品各方面的能力都在逐漸靠近,同時(shí)有了論文做理論支撐,我們才會(huì)有方向去實(shí)踐。
對(duì)我們來(lái)說(shuō),當(dāng)時(shí)選擇Hudi也是因?yàn)槠洚a(chǎn)品成熟度方面的原因,還有它面向數(shù)據(jù)庫(kù)方面的數(shù)據(jù)入湖能力,形態(tài)上比較滿足我們?cè)跀?shù)據(jù)庫(kù)團(tuán)隊(duì)做CDC方面的業(yè)務(wù)需求。
Hudi早期的定義是Hadoop Updates anD Incrementals的縮寫,后面是面向Hadoop的Update、Delete、Insert的概念,核心邏輯是事務(wù)版本化、狀態(tài)機(jī)控制和異步化執(zhí)行,模擬整個(gè)MVCC的邏輯,提供對(duì)于內(nèi)部列存文件比如Parquet、ORC等對(duì)象列表的增量式管理,實(shí)現(xiàn)高效的存儲(chǔ)讀寫。它和Databricks定義的Lakehouse概念很相似,不謀而合,Iceberg也是一樣,它的能力也在逐步往這個(gè)方向提升。
Hudi官方網(wǎng)站對(duì)外提供的架構(gòu)是這樣的形態(tài)。之前我們做技術(shù)選型、調(diào)研的時(shí)候發(fā)現(xiàn)很多同行也已經(jīng)充分使用Hudi做數(shù)據(jù)入湖和離線數(shù)據(jù)管理的方案選型。第一,因?yàn)楫a(chǎn)品比較成熟;第二,它符合我們CDC的需求;第三,Delta Lake有一套開(kāi)源版本,一套內(nèi)部?jī)?yōu)化版本,對(duì)外只提供開(kāi)源版本,我們認(rèn)為它不一定把最好的東西呈現(xiàn)。Iceberg起步比較晚,早期相比其他兩個(gè)產(chǎn)品能力不太完全,所以沒(méi)有考慮它。因?yàn)槲覀兌际荍ava團(tuán)隊(duì),也有自己的Spark產(chǎn)品,Hudi正好比較契合我們用自己的runtime支持?jǐn)?shù)據(jù)入湖的能力,因此也就選擇了Hudi。
當(dāng)然我們也一直在關(guān)注這三個(gè)產(chǎn)品的發(fā)展,后來(lái)國(guó)內(nèi)的一個(gè)開(kāi)源項(xiàng)目StarLake,也是做類似的事情。每種產(chǎn)品都在進(jìn)步,長(zhǎng)期來(lái)看能力基本對(duì)齊,我覺(jué)得會(huì)和論文里定義的能力慢慢吻合。
“以開(kāi)源Hudi為列式、多版本格式為基礎(chǔ),將異構(gòu)數(shù)據(jù)源增量、低延遲入湖,存儲(chǔ)在開(kāi)放、低成本的對(duì)象存儲(chǔ)上,并且在這個(gè)過(guò)程中要實(shí)現(xiàn)數(shù)據(jù)布局優(yōu)化、元信息進(jìn)化的能力,最終實(shí)現(xiàn)離線數(shù)據(jù)統(tǒng)一管理,無(wú)差別支持上面的計(jì)算和分析能力,這是整體的方案。”這是我們對(duì)Lakehouse的理解,以及我們的技術(shù)探索方向。
二、阿里云Lakehouse實(shí)踐
下面介紹一下阿里云Lakehouse的技術(shù)探索和具體的實(shí)踐。首先,大概介紹一下阿里云數(shù)據(jù)庫(kù)團(tuán)隊(duì)近年來(lái)一直提的概念“數(shù)據(jù)庫(kù)、倉(cāng)、湖一體化”戰(zhàn)略。
大家都知道數(shù)據(jù)庫(kù)產(chǎn)品分為四個(gè)層次:一是DB;二是NewSQL/NoSQL產(chǎn)品;三是數(shù)倉(cāng)產(chǎn)品;四是湖數(shù)據(jù)產(chǎn)品。越往上數(shù)據(jù)的價(jià)值密度越大,會(huì)以元表元倉(cāng)形式的數(shù)據(jù)關(guān)聯(lián)到分析中去,比如DB數(shù)據(jù)格式非常簡(jiǎn)單、清晰; 越往下數(shù)據(jù)量越來(lái)越龐大,數(shù)據(jù)形式越來(lái)越復(fù)雜,有各種各樣的存儲(chǔ)格式,數(shù)據(jù)湖形式有結(jié)構(gòu)化、半結(jié)構(gòu)化、非結(jié)構(gòu)化,要分析就必須要做一定的提煉、挖掘,才能真正把數(shù)據(jù)價(jià)值用起來(lái)。
四個(gè)存儲(chǔ)方向有各自的領(lǐng)域,同時(shí)又有關(guān)聯(lián)分析訴求,主要就是要打破數(shù)據(jù)孤島,讓數(shù)據(jù)一體化,才能讓價(jià)值更立體化。如果只是做一些日志分析,例如關(guān)聯(lián)的地域、客戶來(lái)源的話,也只是使用了GroupBy或者是Count等相對(duì)簡(jiǎn)單的分析能力。對(duì)于底層數(shù)據(jù),可能要做多次清洗、回流,才能往向在線化、高并發(fā)的場(chǎng)景一層層分析。這里不僅僅直接將數(shù)據(jù)從湖到庫(kù)寫入,也可以到倉(cāng),到NoSQL/NewSQL的產(chǎn)品里,到KV系統(tǒng)里去,利用好在線化的查詢能力,等等。
反過(guò)來(lái)也是一樣,這些數(shù)據(jù)庫(kù)/NewSQL產(chǎn)品甚至數(shù)倉(cāng)中的數(shù)據(jù)也會(huì)向下流動(dòng),構(gòu)建低成本、大容量的存儲(chǔ)備份、歸檔,降低上面的存儲(chǔ)壓力、分析吞吐壓力,且可以形成強(qiáng)大的聯(lián)合分析能力。這也是我自己對(duì)數(shù)據(jù)庫(kù)、倉(cāng)、湖一體化的理解。
剛才講了數(shù)據(jù)庫(kù)的發(fā)展方向和定位,再看看數(shù)據(jù)庫(kù)下面OLAP本身的分層數(shù)倉(cāng)體系中Lakehouse是怎樣的定位。做過(guò)數(shù)倉(cāng)產(chǎn)品的同學(xué)都比我熟悉,(PPT圖示)基本上是這樣的分層體系,最開(kāi)始各種各樣的形態(tài)非數(shù)倉(cāng)或者非數(shù)據(jù)湖系統(tǒng)外面有各種各樣的形式存儲(chǔ)數(shù)據(jù),我們理解通過(guò)Lakehouse的能力,做入湖、建倉(cāng),通過(guò)清洗、沉淀和聚合,形成ODS或者是CDM層,這里做了初步的數(shù)據(jù)聚合和匯總能力,形成數(shù)據(jù)集市的概念。
這些數(shù)據(jù)在阿里云上我們會(huì)基于Hudi的協(xié)議,基于Parquet文件格式存到整個(gè)OSS上面,內(nèi)部通過(guò)ETL把初始數(shù)據(jù)集進(jìn)一步聚合為更清晰、更面向業(yè)務(wù)的數(shù)據(jù)集上,然后再構(gòu)建ETL,往實(shí)時(shí)數(shù)倉(cāng)里導(dǎo)入,等等?;蛘哌@些數(shù)據(jù)集直接面向低頻的交互式分析、BI分析,或面向Spark等引擎做機(jī)器學(xué)習(xí),最終輸出到整個(gè)數(shù)據(jù)應(yīng)用上,這是整體的分層體系。
整個(gè)過(guò)程中,我們會(huì)接入統(tǒng)一的元信息體系。因?yàn)槿绻到y(tǒng)的每個(gè)部分都有自己的術(shù)語(yǔ),都要保留一份自己的元信息,對(duì)OLAP體系來(lái)講是分裂的,因此元信息一定要統(tǒng)一,調(diào)度也是一樣。不同數(shù)據(jù)倉(cāng)層次的表在不同的地方要串聯(lián)起來(lái),一定要有完整、統(tǒng)一的調(diào)度能力。以上是我理解Lakehouse在OLAP體系中的的定位,主要是貼源層,匯聚離線數(shù)據(jù)的能力。
前面介紹了Lakehouse在數(shù)據(jù)庫(kù)和OLAP團(tuán)隊(duì)里的定位,后面重點(diǎn)介紹一下Lakehouse在我們的領(lǐng)域設(shè)計(jì)是怎樣的形態(tài)。因?yàn)橹拔易约河眠^(guò)K8s做分析系統(tǒng)上云,所以對(duì)K8s的很多理念還是比較清楚。
在我們自己設(shè)計(jì)的時(shí)候也試圖參考、學(xué)習(xí)一下K8s的體系。K8s有我們經(jīng)常提到的DevOps概念,這是一種實(shí)踐范式。在這個(gè)范式下會(huì)創(chuàng)建很多實(shí)例,在實(shí)例里會(huì)管理很多應(yīng)用,這些應(yīng)用最終通過(guò)Pod方式被原子性調(diào)度執(zhí)行,Pod里再跑一些業(yè)務(wù)邏輯,各種各樣的Container。
我們認(rèn)為L(zhǎng)akehouse也是一種范式,一種處理離線數(shù)據(jù)的范式。在這里,數(shù)據(jù)集是我們的核心概念,比如要構(gòu)建一套面向某種場(chǎng)景、某個(gè)方向的數(shù)據(jù)集。我們能要定義A、B、C不同數(shù)據(jù)集,在我們看來(lái)這是一個(gè)實(shí)例。圍繞這個(gè)數(shù)據(jù)集編排各種各樣的Workload工作負(fù)載,比如做DB入湖。還有分析優(yōu)化類的Workload,比如索引構(gòu)建,比如像z-ordering、Clustering、Compaction等技術(shù),查詢優(yōu)化能力提升得更好。還有就是Management類型的Workload,比如定期把歷史數(shù)據(jù)清理了,做冷熱存儲(chǔ)分層,因?yàn)镺SS提供了很多這樣的能力,把這些能力用好。最下面一層是各種Job,我們內(nèi)部是基于Spark建設(shè)離線計(jì)算能力,我們把Workload前后編排成小的job,原子的job全部彈性到Spark上執(zhí)行,以上是我們對(duì)于Lakehouse在技術(shù)實(shí)踐中的領(lǐng)域設(shè)計(jì)。
這是整體的技術(shù)架構(gòu)。首先,在云上有各種各樣的數(shù)據(jù)源,通過(guò)編排定義各種各樣的Workload,跑在我們自己的Spark彈性計(jì)算上。核心的存儲(chǔ)是基于Hudi+OSS,我們也支持別的HDFS系統(tǒng),比如阿里云的LindormDFS,內(nèi)部元信息系統(tǒng)管理庫(kù)、表、列等元信息。后面基于K8s調(diào)度所有的管控服務(wù)。上層通過(guò)原生的Hudi接口,對(duì)接計(jì)算、分析能力。這是整個(gè)彈性架構(gòu)。
其實(shí)Serverless Spark是我們的計(jì)算基礎(chǔ),提供作業(yè)級(jí)彈性,因?yàn)镾park本身也支持Spark Streaming,通過(guò)短時(shí)間彈出一個(gè)Spark作業(yè)實(shí)現(xiàn)流計(jì)算。選擇OSS和LindormDFS做存儲(chǔ)基礎(chǔ),主要利用低成本、無(wú)限容量的好處。
在這個(gè)架構(gòu)上,怎么連通用戶的數(shù)據(jù)實(shí)現(xiàn)數(shù)據(jù)入湖到存儲(chǔ)、到分析的能力呢?以上是我們基于VPC構(gòu)建的安全方案。首先我們是共享集群模式,用戶側(cè)可以通過(guò)SDK和VPDN網(wǎng)絡(luò)連接過(guò)來(lái),再由阿里云內(nèi)部網(wǎng)關(guān)打通計(jì)算集群,實(shí)現(xiàn)管理和調(diào)度;再通過(guò)阿里云彈性網(wǎng)卡技術(shù),聯(lián)通用戶的VPC實(shí)現(xiàn)數(shù)據(jù)通路,同時(shí)實(shí)現(xiàn)路由能力和網(wǎng)絡(luò)隔離能力,不同用戶還可能有子網(wǎng)網(wǎng)段沖突問(wèn)題,通過(guò)彈性網(wǎng)卡技術(shù)可以實(shí)現(xiàn)相同網(wǎng)段同時(shí)連接同一個(gè)計(jì)算集群的能力。
用過(guò)阿里云OSS的同學(xué)都知道,OSS本身是阿里云VPC網(wǎng)絡(luò)里的公網(wǎng),它是共享區(qū),不需要復(fù)雜的網(wǎng)絡(luò)。而RDS和Kafka是部署在用戶的VPC里,通過(guò)一套網(wǎng)絡(luò)架構(gòu)就可以實(shí)現(xiàn)多種網(wǎng)絡(luò)打通。對(duì)比VPC網(wǎng)段沖突,共享區(qū)域沒(méi)有這樣的問(wèn)題。其次,數(shù)據(jù)之間隔離,ENI有端到端的限制,比如VPC有ID標(biāo)志、有不同的授權(quán)要求,非法用戶嘗試連接VPC,如果不是這個(gè)網(wǎng)卡則網(wǎng)絡(luò)包無(wú)法聯(lián)通,就可以實(shí)現(xiàn)安全的隔離和數(shù)據(jù)的通路。
網(wǎng)絡(luò)架構(gòu)已經(jīng)確定了,怎么運(yùn)行執(zhí)行呢?在整個(gè)設(shè)計(jì)里,我們會(huì)以K8s的DSL設(shè)計(jì)為榜樣,前面提到會(huì)定義很多入湖任務(wù),一個(gè)Workload可能有很多小任務(wù),這時(shí)候需要類似定義DSL的編排能力,job1、job2、再到j(luò)ob3,定義一套編排腳本;這些編排腳本,通過(guò)SDK、控制臺(tái)等入口提交過(guò)來(lái),再通過(guò)API Server接收并由Scheduler調(diào)度起來(lái)。這個(gè)Scheduler會(huì)和Spark的網(wǎng)關(guān)之間打通,實(shí)現(xiàn)任務(wù)管理、狀態(tài)管理、任務(wù)分發(fā)等,最終調(diào)度內(nèi)部的K8s拉起作業(yè)來(lái)執(zhí)行。有些全量作業(yè)跑一次,比如DB拉一次就行了,還有常駐的流式作業(yè)、有觸發(fā)式的異步作業(yè)、定時(shí)異步作業(yè)等等,不同的形態(tài)相同的調(diào)度能力,從而可以擴(kuò)展。過(guò)程中有作業(yè)狀態(tài)持續(xù)反饋狀態(tài)、間隙性統(tǒng)計(jì)等等。在K8s里,K8s Master承擔(dān)了這樣的角色,同樣有API Server和Scheduler的角色。在我們這里也是類似,也是通過(guò)一主多從架構(gòu)實(shí)現(xiàn)調(diào)度能力HA機(jī)制等等。
在這里,為什么我們要把一個(gè)Workload面向用戶側(cè)的任務(wù)拆成N個(gè)不同的job?因?yàn)檫@些任務(wù)完全放在一個(gè)進(jìn)程里跑,整個(gè)Workload的水位變化非常大,做彈性調(diào)度非常難。全量任務(wù)跑一次就可以了,但是配多少資源合適呢?很多時(shí)候Spark沒(méi)有那么靈活,尤其是異步任務(wù)和定時(shí)任務(wù)拉起來(lái)消耗很大,但是用完之后又不知道下一次什么時(shí)候來(lái),很難預(yù)測(cè)。就像很多信號(hào)系統(tǒng)處理里,需要做傅里葉變換一樣,把復(fù)雜的波型拆成多個(gè)簡(jiǎn)單的波型,信號(hào)處理就簡(jiǎn)單起來(lái)。我們也是有這樣感性的理解。用不同的Job來(lái)執(zhí)行Workload中不同角色的任務(wù),就很容易實(shí)現(xiàn)彈性能力。像定時(shí)或臨時(shí)性的觸發(fā)Job,臨時(shí)拉一個(gè)job,資源消耗與常駐的流式任務(wù)完全無(wú)關(guān),就可以完全不影響流式任務(wù)的穩(wěn)定性、入湖延遲等等。這是設(shè)計(jì)背后的思考,就是讓復(fù)雜的問(wèn)題簡(jiǎn)單化。因?yàn)榛趶椥缘慕嵌葋?lái)講,拆得波形越簡(jiǎn)單,彈性就會(huì)更好做,預(yù)測(cè)也會(huì)簡(jiǎn)單一點(diǎn)。
入湖里會(huì)涉及很多用戶的賬密信息,因?yàn)椴皇撬性飘a(chǎn)品都以AWS的IAM或阿里云的RAM等系統(tǒng)來(lái)構(gòu)建完全云化的資源權(quán)限控制。很多產(chǎn)品還是以賬密方式做認(rèn)證和授權(quán)管理,包括用戶自建的系統(tǒng),數(shù)據(jù)庫(kù)系統(tǒng)等等。這樣,用戶要把所有的連接賬密都交給我們,怎么更安全的管理它們?我們是基于阿里云的兩套體系:一套是KMS,以硬件級(jí)數(shù)據(jù)加密體系來(lái)加密用戶數(shù)據(jù);第二套是STS,完全云化的三方鑒權(quán)能力,實(shí)現(xiàn)用戶數(shù)據(jù)的安全訪問(wèn),尤其是敏感數(shù)據(jù)的隔離或者保護(hù)的機(jī)制,這就是我們現(xiàn)在的整個(gè)體系。
還有一個(gè)問(wèn)題,不同用戶之間通過(guò)各種機(jī)制完全隔離開(kāi)了,但是同一個(gè)用戶有很多的任務(wù)。在Lakehouse概念中有四層結(jié)構(gòu),一個(gè)數(shù)據(jù)集下面有多個(gè)庫(kù),庫(kù)下面有多個(gè)表,表下面有不同的分區(qū),分區(qū)下面是不同的數(shù)據(jù)文件。用戶有子賬號(hào)體系、有各種不同的作業(yè),因此操作數(shù)據(jù)時(shí)可能會(huì)出現(xiàn)相互影響。
比如不同的入湖任務(wù)都想要寫同一張表,線上A任務(wù)已經(jīng)正常運(yùn)行了,結(jié)果另外的用戶配置了B任務(wù),也要寫入同一個(gè)空間,這就有可能把已經(jīng)上線的A任務(wù)數(shù)據(jù)全部沖掉,這是很危險(xiǎn)的事情。還有其他用戶刪除作業(yè)的行為,可能會(huì)刪掉線上正在運(yùn)行任務(wù)的數(shù)據(jù),有可能其他任務(wù)還在訪問(wèn),但又不能感知它;還比如通過(guò)別的云服務(wù)、或是VPC內(nèi)別的程序、自己部署的服務(wù)等等,都可能操作這個(gè)表,導(dǎo)致數(shù)據(jù)出問(wèn)題。因此我們?cè)O(shè)計(jì)了一整套機(jī)制,一方面是在表級(jí)別實(shí)現(xiàn)鎖的機(jī)制,如果有任務(wù)最早就占有一張數(shù)據(jù)寫入權(quán)限時(shí),后面的任務(wù)在這個(gè)任務(wù)生命周期結(jié)束之前,都不允許再寫入,不可以寫臟了。
另一方面基于OSS的Bucket Policy能力,構(gòu)建不同程序的權(quán)限校驗(yàn)?zāi)芰?。只允許Lakehouse的的任務(wù)有權(quán)限寫數(shù)據(jù),而其他程序不允許寫,但其他程序可以讀。同一個(gè)賬號(hào)的這些數(shù)據(jù)本來(lái)就是為了共享、為了分析,為了各種應(yīng)用場(chǎng)景的接入,就是可以讀,但絕對(duì)不可以污染它。我們?cè)谶@些方面做了可靠性工作。
我們更多講的架構(gòu)體系,回到整體看一下怎么理解數(shù)據(jù)模型,我們認(rèn)為整個(gè)過(guò)程是以行為中心(因?yàn)閿?shù)倉(cāng)還是一行行的數(shù)據(jù),存儲(chǔ)在表的范圍內(nèi)),以行數(shù)據(jù)構(gòu)建統(tǒng)一入湖、存儲(chǔ)、分析,元信息模型等。首先有各種各樣的數(shù)據(jù)源(有文本或二進(jìn)制,binlog就是二進(jìn)制的數(shù)據(jù);或者類似Kafka中可以存儲(chǔ)各種二進(jìn)制),這些數(shù)據(jù)最終通過(guò)各種各樣Connector、Reader(不同的系統(tǒng)有不同的叫法),把數(shù)據(jù)讀過(guò)來(lái),映射成行數(shù)據(jù)。在這些行數(shù)據(jù)中,有關(guān)鍵的描述信息,比如來(lái)源信息、變更類型等等,還有可變的列集合。再通過(guò)一系列的規(guī)則轉(zhuǎn)化,比如濾掉某些數(shù)據(jù),要為數(shù)據(jù)生成主鍵,要段定義版本、類型轉(zhuǎn)換等等;最后再通過(guò)Hudi Payload封裝、轉(zhuǎn)換、元信息信息維護(hù)、文件生成等等方式,最終寫到湖存儲(chǔ)里。
在存儲(chǔ)里通過(guò)元信息、分區(qū)等數(shù)據(jù)維護(hù),并對(duì)接后續(xù)計(jì)算和分析,就無(wú)縫看到湖、倉(cāng)里所有存的數(shù)據(jù)的元信息,無(wú)縫對(duì)接不同形態(tài)的應(yīng)用場(chǎng)景。
下面介紹一下我們對(duì)常見(jiàn)數(shù)據(jù)源接入形式的支持。DB入湖是最常見(jiàn)的場(chǎng)景,在阿里云上,有RDS和PolarDB等產(chǎn)品。以MySQL引擎舉例,一般都是有主庫(kù)、從庫(kù)、離線庫(kù)等架構(gòu),可能還有主從接入點(diǎn),但是萬(wàn)變不離其宗。DB入湖要先做一次全量同步,再做增量同步。對(duì)用戶來(lái)講,DB入湖是明確的Workload,但對(duì)系統(tǒng)來(lái)講要先做好全量同步這件事情,再自動(dòng)對(duì)接增量同步這件事情,數(shù)據(jù)還要通過(guò)一定的機(jī)制把位點(diǎn)銜接住,確保數(shù)據(jù)的正確性。整個(gè)調(diào)度過(guò)程通過(guò)統(tǒng)一的管控服務(wù)獲取DB信息,自動(dòng)選擇從庫(kù)或線上壓力最小的實(shí)例,進(jìn)行全量同步寫到庫(kù)里,并維護(hù)好相應(yīng)的Watermark,記錄全量從什么時(shí)間點(diǎn)開(kāi)始的、從庫(kù)和主庫(kù)之間有多少延遲等。全量做完之后,開(kāi)始做增量任務(wù),利用DTS等同步binlog服務(wù),基于前面的Watermark做數(shù)據(jù)回溯,開(kāi)始做增量。利用Hudi里的Upsert能力,以用戶定義的PK和版本按照一定邏輯把數(shù)據(jù)合并,確保數(shù)據(jù)最終一致,分析側(cè)的正確性。
在整個(gè)Watremark維護(hù)上需要考慮很多,如果全量掛了,再重試一下,位點(diǎn)應(yīng)該從哪里開(kāi)始,如果增量掛了,不僅要考慮增量之前已經(jīng)進(jìn)行到哪里,還要漸進(jìn)式的維護(hù)增量位點(diǎn),不能每次增量一掛就回退到最開(kāi)始全量前的位點(diǎn),那后面數(shù)據(jù)延遲太嚴(yán)重了。在Lakehouse表級(jí)別維護(hù)這些信息,在Workload運(yùn)行時(shí)、重啟、重試等過(guò)程可以自動(dòng)銜接,對(duì)用戶透明。
第二個(gè)是像類消息產(chǎn)品的入湖,我們也做了一些技術(shù)探索和業(yè)務(wù)嘗試,它的數(shù)據(jù)不像DB一樣Schema都很明確。像阿里云現(xiàn)有的Kafka服務(wù)里,它的Schema只有兩個(gè)字段,Key和Value,Key描述消息Id,value自定義,大部分時(shí)候是一個(gè)Json,或者是二進(jìn)制串。首先要解決怎么映射成行,會(huì)有很多邏輯處理,比如先做一些Schema推斷,得到原始的結(jié)構(gòu)。Json原來(lái)的嵌套格式比較容易存儲(chǔ),但是分析起來(lái)比較費(fèi)勁,只有打平成一個(gè)寬表分析才方便,所以還要做一些嵌套打平、格式展開(kāi)等等邏輯,再配合前面提到的核心邏輯,最終實(shí)現(xiàn)文件寫入、元信息合并等等。這個(gè)元信息合并就是指,源頭的列的個(gè)數(shù)不確定,對(duì)于不同的行有時(shí)候有這個(gè)列,有時(shí)候沒(méi)有。而對(duì)于Hudi來(lái)講,需要在應(yīng)用層把元信息維護(hù)好。Lakehouse里的Schema Evolution,就是Schema的合并、列的兼容處理、新增列的自動(dòng)維護(hù)等等。
我們內(nèi)部有基于Lindorm的方案。Lindorm是我們自研兼容HBase、Cassandra等大寬表接口的KV行存。它有很多的歷史文件和很多Log數(shù)據(jù),通過(guò)內(nèi)部的LTS服務(wù)調(diào),把全量和增量數(shù)據(jù)通過(guò)Lakehouse方式存在轉(zhuǎn)換成列存文件,支持分析。
對(duì)Kafka、SLS系統(tǒng)中都有分片(Partition、Shard)概念,流量變化很大時(shí)需要自動(dòng)擴(kuò)縮容,因此消費(fèi)側(cè)要主動(dòng)感知變化,不影響數(shù)據(jù)正確性的持續(xù)消費(fèi)。并且這種數(shù)據(jù)都是偏Append-Only,正好可以利用好Hudi小文件合并能力,讓下游分析更簡(jiǎn)單、更快、更高效。
三、客戶最佳實(shí)踐
以上是技術(shù)探索的分享,接下來(lái)會(huì)介紹一下在客戶的應(yīng)用。之前一個(gè)跨境電商的客戶,他們問(wèn)題就是DB數(shù)據(jù)不容易分析,目前有PolarDB和MongoDB系統(tǒng),希望把所有數(shù)據(jù)近實(shí)時(shí)入湖到OSS上做分析?,F(xiàn)在業(yè)界聯(lián)邦分析FederatedAnalytics,問(wèn)題在于直連查詢數(shù)據(jù)時(shí)原庫(kù)的壓力很大,最好的方式就是入湖到離線湖中里做分析。通過(guò)Lakehouse方式構(gòu)建離線湖倉(cāng),再對(duì)接計(jì)算和分析,或者對(duì)接ETL清晰,規(guī)避對(duì)線上數(shù)據(jù)的影響,同一架構(gòu)把整體數(shù)據(jù)平臺(tái)構(gòu)建起來(lái),應(yīng)用、分析百花齊放,不影響任何東西。
這個(gè)客戶的難點(diǎn)是他們有很多庫(kù)、表以及各種各樣的應(yīng)用case,我們?cè)贖udi上做了很多優(yōu)化,也完成了20多個(gè)patch貢獻(xiàn)到社區(qū)里完善Hudi,包括元信息打通、部分Schema Evolution能力,在客戶側(cè)也應(yīng)用起來(lái)。
另一個(gè)客戶數(shù)是Kafka日志近實(shí)時(shí)分析。原來(lái)他們的方案需要人肉做很多步驟,包括入湖、數(shù)據(jù)管理、小文件合并等。通過(guò)Lakehouse方案,對(duì)接客戶數(shù)據(jù),自動(dòng)合并入湖,維護(hù)元信息,客戶直接應(yīng)用就可以了,內(nèi)部直接打通了。
還有一個(gè)問(wèn)題小文件,在他們的場(chǎng)景里與Hudi社區(qū)一起參與Clustering技術(shù)的建設(shè)。Clustering就是自動(dòng)將小文件合并成大文件,因?yàn)榇笪募诜治?。其次,在合并過(guò)程中,可以按照某些特定列把數(shù)據(jù)排序,后續(xù)訪問(wèn)這些數(shù)據(jù)列時(shí),性能會(huì)好很多。
四、未來(lái)展望
最后,我再分享一下我們團(tuán)隊(duì)對(duì)未來(lái)的思考,Lakehouse可以怎么應(yīng)用起來(lái)。
第一,更豐富的入湖數(shù)據(jù)源。Lakehous重要的價(jià)值在于屏蔽各種數(shù)據(jù)差異,打破數(shù)據(jù)孤島。在云上很多系統(tǒng)中有各種各樣的數(shù)據(jù),有很大的分析價(jià)值,未來(lái)要統(tǒng)一更多的數(shù)據(jù)源,只支持一個(gè)DB或Kafka,客戶價(jià)值不是最大化的。只有把足量的數(shù)據(jù)匯總到一起,形成大的離線湖倉(cāng),并且屏蔽復(fù)雜度,對(duì)用戶的價(jià)值才愈發(fā)明顯。除了云產(chǎn)品,還有其他形式的入湖,像專有云、自建系統(tǒng)、自主上傳場(chǎng)景等。主要還是強(qiáng)化貼源層的能力。
第二,更低成本、更可靠的存儲(chǔ)能力,圍繞數(shù)據(jù)生命周期管理。因?yàn)榘⒗镌芆SS有非常豐富的計(jì)費(fèi)方式,支持多種存儲(chǔ)(標(biāo)準(zhǔn)存儲(chǔ)、低頻存儲(chǔ)、冷存儲(chǔ)以及更冷的存儲(chǔ))等等,計(jì)費(fèi)邏輯里幾十項(xiàng),一般人不完全清楚。但對(duì)用戶來(lái)講,成本永遠(yuǎn)是設(shè)計(jì)中心心,尤其是構(gòu)建海量的離線湖倉(cāng),因?yàn)閿?shù)據(jù)量越來(lái)越大、成本就越來(lái)越多。
之前接觸過(guò)一個(gè)客戶,他需要存儲(chǔ)三十年的數(shù)據(jù),他們的業(yè)務(wù)是股票分析,要把交易所、券商的所有數(shù)據(jù)全部爬下來(lái),傳到大的湖倉(cāng)里。因?yàn)橐鋈甑姆治?,成本?yōu)化是非常關(guān)鍵的。原來(lái)選擇在線系統(tǒng),存幾個(gè)月就扛不住了,因?yàn)閿?shù)據(jù)量太大了。分析數(shù)據(jù)是有從冷到熱、從相對(duì)低頻到高頻訪問(wèn)的特點(diǎn),Lakehouse利用這些特點(diǎn),通過(guò)定義規(guī)則和邏輯,自動(dòng)屏蔽用戶對(duì)哪些目錄需要冷存儲(chǔ)、哪些目錄需要熱存儲(chǔ)的復(fù)雜維護(hù),幫用戶走得更進(jìn)一步。
第三,更強(qiáng)的分析能力。在Hudi加速分析的能力里,除了前面提到的Clustering,還有Compaction。Clustering就是小文件合并,比如日志場(chǎng)景,每寫入一批就產(chǎn)生一個(gè)文件,這些文件一般都不是很大,但文件越小越碎分析時(shí)的訪問(wèn)代價(jià)很大。訪問(wèn)一個(gè)文件就要做鑒權(quán)、建連接、元信息訪問(wèn)。訪問(wèn)一個(gè)大文件這些過(guò)程只做一次,而訪問(wèn)小文件則成倍放大,開(kāi)銷非常大。在Append場(chǎng)景,通過(guò)Clustering快速合并小文件成大文件,規(guī)避因?yàn)閷懭攵鴮?dǎo)致的分析性能線性退化問(wèn)題,確保分析高效。
在Hudi中如果是Merge On Read類型的表,比如Delete、Update都會(huì)快速寫到log文件,在后續(xù)讀的時(shí)候Merge數(shù)據(jù),形成完整的邏輯的數(shù)據(jù)視圖。這里問(wèn)題也很明顯,如果有1000個(gè)log文件,每次讀需要合并1000次,分析能力退化肯定非常嚴(yán)重。這時(shí)Hudi的Compaction能力就會(huì)定期把log文件合并起來(lái)。前面提到,如果完全要在同一個(gè)入湖作業(yè)里實(shí)現(xiàn),尤其是文件合并,計(jì)算開(kāi)銷很大,在做這些重負(fù)載的時(shí)候,對(duì)入湖鏈路的延遲影響很大,一定要通過(guò)異步化調(diào)度的方式,實(shí)現(xiàn)寫延遲保障。并且這些過(guò)程都是可彈性的,不論是100個(gè)文件要合還是1萬(wàn)個(gè)文件要合,都是可以快速?gòu)椥远挥绊懷舆t,非常有優(yōu)勢(shì)。
第四,更豐富的場(chǎng)景化應(yīng)用。個(gè)人覺(jué)得Lakehouse還是面向貼源層的能力,配合做一定程度的聚合。因?yàn)楦邔哟蔚木酆闲院蛯?shí)時(shí)性,有更多實(shí)時(shí)數(shù)倉(cāng)選擇,現(xiàn)在業(yè)界比較火的DorisDB、ClickHouse對(duì)實(shí)時(shí)的高頻分析有很大優(yōu)勢(shì)?;贖udi、Lakehouse、OSS做實(shí)時(shí)分析沒(méi)有太多優(yōu)勢(shì),所以還是以構(gòu)建貼源層的能力為主。
原來(lái)都是近實(shí)時(shí)入湖場(chǎng)景,但是可能有些用戶沒(méi)有這么多實(shí)時(shí)性要求,周期性的T+1邏輯建倉(cāng)可以滿足,可以利用Hudi+Lakehouse能力,每天查詢一部分邏輯增量數(shù)據(jù)并寫入Hudi,并維護(hù)分區(qū),和實(shí)現(xiàn)Schema Evolution能力。
早期數(shù)據(jù)量越來(lái)越大,客戶通過(guò)分庫(kù)分表實(shí)現(xiàn)邏輯拆分。分析的時(shí)候發(fā)現(xiàn)庫(kù)、表太多了,分析、關(guān)聯(lián)難度大,這時(shí)候可以通過(guò)構(gòu)建多庫(kù)多表合并建倉(cāng)能力,匯總到一張表后做分析。
然后是跨區(qū)域融合分析,有很多客戶提這樣的需求,尤其是海外。有些客戶要服務(wù)海外用戶,必須有部分業(yè)務(wù)在海外,特別在跨境電商的場(chǎng)景,而它的采購(gòu)體系、倉(cāng)儲(chǔ)體系、物流體系、分銷體系等又都在國(guó)內(nèi)建設(shè),很多數(shù)據(jù)想要融合分析怎么辦?首先OSS提供了跨域復(fù)制,但也只是到數(shù)據(jù)層面,沒(méi)有任何邏輯,在這里可以通過(guò)Lakehouse做邏輯層建設(shè),把不同region數(shù)據(jù)混合在一起,匯總到同一個(gè)區(qū)域之后,提供統(tǒng)一的SQL join、union等能力。
最后Hudi有TimeTravel、Incremental query的能力,這時(shí)候構(gòu)建incremental ETL清洗不同的表,在一定程度上通用化,讓用戶用得更簡(jiǎn)單。未來(lái)內(nèi)置更多場(chǎng)景化能力,讓用戶構(gòu)建和應(yīng)用湖倉(cāng)更加簡(jiǎn)單!