字節(jié)跳動基于Doris的湖倉分析探索實踐
01 Doris簡介
Apache Doris具備以下幾個特點:
- 良好的架構設計,支持高并發(fā)低延時的查詢服務,支持高吞吐量的交互式分析。多FE均可對外提供服務,并發(fā)增加時,線性擴充FE和BE即可支持高并發(fā)的查詢請求。
- 支持批量數(shù)據(jù)load和流式數(shù)據(jù)load,支持數(shù)據(jù)更新。支持Update/Delete語法,unique/aggregate數(shù)據(jù)模型,支持動態(tài)更新數(shù)據(jù),實時更新聚合指標。
- 提供了高可用,容錯處理,高擴展的企業(yè)級特性。FE Leader錯誤異常,F(xiàn)E Follower秒級切換為新Leader繼續(xù)對外提供服務。
- 支持聚合表和物化視圖。多種數(shù)據(jù)模型,支持aggregate, replace等多種數(shù)據(jù)模型,支持創(chuàng)建rollup表,支持創(chuàng)建物化視圖。rollup表和物化視圖支持動態(tài)更新,無需用戶手動處理。
- MySQL協(xié)議兼容,支持直接使用MySQL客戶端連接,非常易用的數(shù)據(jù)應用對接。
Doris 由 Frontend(以下簡稱FE)和 Backend(以下簡稱BE)組成,其中FE負責接受用戶請求、編譯、優(yōu)化、分發(fā)執(zhí)行計劃、元數(shù)據(jù)管理、BE節(jié)點的管理等功能,BE負責執(zhí)行由FE下發(fā)的執(zhí)行計劃,存儲和管理用戶數(shù)據(jù)。
?
02 數(shù)據(jù)湖格式Hudi簡介
Hudi是下一代流式數(shù)據(jù)湖平臺,為數(shù)據(jù)湖提供了表格式管理的能力,提供事務,ACID,MVCC,數(shù)據(jù)更新刪除,增量數(shù)據(jù)讀取等功能。支持Spark, Flink, Presto, Trino等多種計算引擎。
?
Hudi根據(jù)數(shù)據(jù)更新時行為不同分為兩種表類型:
針對Hudi的兩種表格式,存在3種不同的查詢類型:
03
Doris分析Hudi數(shù)據(jù)的技術背景
在數(shù)倉業(yè)務中,隨著業(yè)務對數(shù)據(jù)實時性的要求越來越高,T+1數(shù)倉業(yè)務逐漸往小時級、分鐘級,甚至秒級演進。實時數(shù)倉的應用也越來越廣,也經(jīng)歷了多個發(fā)展階段。目前存在著多種解決方案。
1. Lambda架構
Lambda將數(shù)據(jù)處理流分為在線分析和離線分析兩條不同的處理路徑,兩條路徑互相獨立,互不影響。
離線分析處理T+1數(shù)據(jù),使用Hive/Spark處理大數(shù)據(jù)量,不可變數(shù)據(jù),數(shù)據(jù)一般存儲在HDFS等系統(tǒng)上。如果遇到數(shù)據(jù)更新,需要overwrite整張表或整個分區(qū),成本比較高。
在線分析處理實時數(shù)據(jù),使用Flink/Spark Streaming處理流式數(shù)據(jù),分析處理秒級或分鐘級流式數(shù)據(jù),數(shù)據(jù)保存在Kafka或定期(分鐘級)保存到HDFS中。?
該套方案存在以下缺點:
- 同一套指標可能需要開發(fā)兩份代碼來進行在線分析和離線分析,維護復雜。
- 數(shù)據(jù)應用查詢指標時可能需要同時查詢離線數(shù)據(jù)和在線數(shù)據(jù),開發(fā)復雜。
- 同時部署批處理和流式計算兩套引擎,運維復雜。
- 數(shù)據(jù)更新需要overwrite整張表或分區(qū),成本高。
2. Kappa架構
隨著在線分析業(yè)務越來越多,Lambda架構的弊端就越來越明顯,增加一個指標需要在線離線分別開發(fā),維護困難,離線指標可能和在線指標對不齊,部署復雜,組件繁多。于是Kappa架構應運而生。
Kappa架構使用一套架構處理在線數(shù)據(jù)和離線數(shù)據(jù),使用同一套引擎同時處理在線和離線數(shù)據(jù),數(shù)據(jù)存儲在消息隊列上。?
Kappa架構也有一定的局限:
- 流式計算引擎批處理能力較弱,處理大數(shù)據(jù)量性能較弱。
- 數(shù)據(jù)存儲使用消息隊列,消息隊列對數(shù)據(jù)存儲有有效性限制,歷史數(shù)據(jù)無法回溯。
- 數(shù)據(jù)時序可能亂序,可能對部分在時序要求方面比較嚴格的應用造成數(shù)據(jù)錯誤。
- 數(shù)據(jù)應用需要從消息隊列中取數(shù),需要開發(fā)適配接口,開發(fā)復雜。
3. 基于數(shù)據(jù)湖的實時數(shù)倉
針對Lambda架構和Kappa架構的缺陷,業(yè)界基于數(shù)據(jù)湖開發(fā)了Iceberg, Hudi, DeltaLake這些數(shù)據(jù)湖技術,使得數(shù)倉支持ACID, Update/Delete,數(shù)據(jù)Time Travel, Schema Evolution等特性,使得數(shù)倉的時效性從小時級提升到分鐘級,數(shù)據(jù)更新也支持部分更新,大大提高了數(shù)據(jù)更新的性能。兼具流式計算的實時性和批計算的吞吐量,支持的是近實時的場景。
以上方案中其中基于數(shù)據(jù)湖的應用最廣,但數(shù)據(jù)湖模式無法支撐更高的秒級實時性,也無法直接對外提供數(shù)據(jù)服務,需要搭建其他的數(shù)據(jù)服務組件,系統(tǒng)較為復雜?;诖吮尘跋拢糠謽I(yè)務開始使用Doris來承接,業(yè)務數(shù)據(jù)分析師需要對Doris與Hudi中的數(shù)據(jù)進行聯(lián)邦分析,此外在Doris對外提供數(shù)據(jù)服務時既要能查詢Doris中數(shù)據(jù),也要能加速查詢離線業(yè)務中的數(shù)據(jù)湖數(shù)據(jù),因此我們開發(fā)了Doris訪問數(shù)據(jù)湖Hudi中數(shù)據(jù)的特性。
04 Doris分析Hudi數(shù)據(jù)的設計原理
基于以上背景,我們設計了Apache Doris中查詢數(shù)據(jù)湖格式Hudi數(shù)據(jù),因Hudi生態(tài)為java語言,而Apache Doris的執(zhí)行節(jié)點BE為C++環(huán)境,C++ 無法直接調(diào)用Hudi java SDK,針對這一點,我們有三種解決方案。
1.實現(xiàn)Hudi C++ client,在BE中直接調(diào)用Hudi C++ client去讀寫Hudi表。?
該方案需要完整實現(xiàn)一套Hudi C++ client,開發(fā)周期較長,后期Hudi行為變更需要同步修改Hudi C++ client,維護較為困難。
2.BE通過thrift協(xié)議發(fā)送讀寫請求至Broker,由Broker調(diào)用Hudi java client讀取Hudi表。
該方案需要在Broker中增加讀寫Hudi數(shù)據(jù)的功能,目前Broker定位僅為fs的操作接口,引入Hudi打破了Broker的定位。第二,數(shù)據(jù)需要在BE和Broker之間傳輸,性能較低。
3.在BE中使用JNI創(chuàng)建JVM,加載Hudi java client去讀寫Hudi表。
該方案需要在BE進程中維護JVM,有JVM調(diào)用Hudi java client對Hudi進行讀寫。讀寫邏輯使用Hudi社區(qū)java實現(xiàn),可以維護與社區(qū)同步;同時數(shù)據(jù)在同一個進程中進行處理,性能較高。但需要在BE維護一個JVM,管理較為復雜。
4.使用BE arrow parquet c++ api讀取hudi parquet base file,hudi表中的delta file暫不處理。?
該方案可以由BE直接讀取hudi表的parquet文件,性能最高。但當前不支持base file和delta file的合并讀取,因此僅支持COW表Snapshot Queries和MOR表的Read Optimized Queries,不支持Incremental Queries。
綜上,我們選擇方案四,第一期實現(xiàn)了COW表Snapshot Queries和MOR表的Read Optimized Queries,后面聯(lián)合Hudi社區(qū)開發(fā)base file和delta file合并讀取的C++接口。
05 Doris分析Hudi數(shù)據(jù)的技術實現(xiàn)
Doris中查詢分析Hudi外表使用步驟非常簡單。
1. 創(chuàng)建Hudi外表
建表時指定engine為Hudi,同時指定Hudi外表的相關信息,如hive metastore uri,在hive metastore中的database和table名字等。
建表僅僅在Doris的元數(shù)據(jù)中增加一張表,無任何數(shù)據(jù)移動。
建表時支持指定全部或部分hudi schema,也支持不指定schema創(chuàng)建hudi外表。指定schema時必須與hiveMetaStore中hudi表的列名,類型一致。
Example:
Plaintext
CREATE TABLE example_db.t_hudi
ENGINE=HUDI
PROPERTIES (
"hudi.database" = "hudi_db",
"hudi.table" = "hudi_table",
"hudi.hive.metastore.uris" = "thrift://127.0.0.1:9083"
);
CREATE TABLE example_db.t_hudi (
column1 int,
column2 string)
ENGINE=HUDI
PROPERTIES (
"hudi.database" = "hudi_db",
"hudi.table" = "hudi_table",
"hudi.hive.metastore.uris" = "thrift://127.0.0.1:9083"
);
2. 查詢Hudi外表
查詢Hudi數(shù)據(jù)表時,F(xiàn)E在analazy階段會查詢元數(shù)據(jù)獲取到Hudi外表的的hive metastore地址,從Hive metastore中獲取hudi表的schema信息與文件路徑。
- 獲取hudi表的數(shù)據(jù)地址。
- FE規(guī)劃fragment增加HudiScanNode。HudiScanNode中獲取Hudi table對應的data file文件列表。
- 根據(jù)Hudi table獲取的data file列表生成scanRange。
- 下發(fā)HudiScan 任務至BE節(jié)點。
- BE節(jié)點根據(jù)HudiScanNode指定的Hudi外表文件路徑調(diào)用native parquet reader進行數(shù)據(jù)讀取。
?
06 后期規(guī)劃
目前Apche Doris查詢Hudi表已合入社區(qū),當前已支持COW表的Snapshot Query,支持MOR表的Read Optimized Query。對MOR表的Snapshot Query暫時還未支持,流式場景中的Incremental Query也沒有支持。
后續(xù)還有幾項工作需要處理,我們和社區(qū)也在積極合作進行中:
- MOR表的Snapshot Query。MOR表實時讀需要合并讀取Data file與對應的Delta file,BE需要支持Delta file AVRO格式的讀取,需要增加avro的native讀取方式。
- COW/MOR表的Incremental Query。支持實時業(yè)務中的增量讀取。
- BE讀取Hudi base file和delta file的native接口。目前BE讀取Hudi數(shù)據(jù)時,僅能讀取data file,使用的是parquet的C++ SDK。后期我們和聯(lián)合Hudi社區(qū)提供Huid base file和delta file的C++/Rust等語言的讀取接口,在Doris BE中直接使用native接口來查詢Hudi數(shù)據(jù)。
今天的分享就到這里,謝謝大家。