阿里面試:Fluss 是什么?如何使用 Fluss 構(gòu)建流式湖倉一體?
一、湖倉概述
1. 傳統(tǒng)湖倉架構(gòu)的挑戰(zhàn)
數(shù)據(jù)湖倉(Lakehouse)是一種結(jié)合了數(shù)據(jù)湖的可擴(kuò)展性和成本效益與數(shù)據(jù)倉庫的可靠性和性能的新型開放架構(gòu)。Apache Iceberg、Apache Paimon、Apache Hudi和Delta Lake等知名數(shù)據(jù)湖格式在湖倉架構(gòu)中扮演著關(guān)鍵角色,它們在單一統(tǒng)一平臺內(nèi)促進(jìn)了數(shù)據(jù)存儲、可靠性和分析能力之間的和諧平衡。
然而,傳統(tǒng)湖倉架構(gòu)面臨著實(shí)時性與分析能力平衡的挑戰(zhàn):
(1) 實(shí)時性與分析效率的矛盾
- 如果要求低延遲,就需要頻繁寫入和提交,這會產(chǎn)生大量小型Parquet文件,導(dǎo)致讀取效率低下
- 如果要求讀取效率,就需要積累數(shù)據(jù)直到能寫入大型Parquet文件,但這會引入更高的延遲
(2) 數(shù)據(jù)新鮮度限制
即使在最佳使用條件下,這些數(shù)據(jù)湖格式通常也只能在分鐘級粒度內(nèi)實(shí)現(xiàn)數(shù)據(jù)新鮮度。
二、流式湖倉一體化
1. 流與湖的統(tǒng)一
Fluss是一種支持亞秒級低延遲流式讀寫的流式存儲系統(tǒng)。通過Lakehouse Storage,F(xiàn)luss在Lakehouse之上提供實(shí)時流數(shù)據(jù)服務(wù),實(shí)現(xiàn)了數(shù)據(jù)流和數(shù)據(jù)湖倉的統(tǒng)一。這不僅為數(shù)據(jù)湖倉帶來了低延遲,還為數(shù)據(jù)流增加了強(qiáng)大的分析能力。
為了構(gòu)建流式湖倉,F(xiàn)luss維護(hù)了一個分層服務(wù),該服務(wù)將實(shí)時數(shù)據(jù)從Fluss集群壓縮到存儲在Lakehouse Storage中的數(shù)據(jù)湖格式。Fluss集群中的數(shù)據(jù)(流式Arrow格式)針對低延遲讀寫進(jìn)行了優(yōu)化,而Lakehouse中的壓縮數(shù)據(jù)(帶壓縮的Parquet格式)針對強(qiáng)大的分析和長期數(shù)據(jù)存儲進(jìn)行了優(yōu)化。因此,F(xiàn)luss集群中的數(shù)據(jù)作為實(shí)時數(shù)據(jù)層,保留了具有亞秒級新鮮度的數(shù)據(jù);而Lakehouse中的數(shù)據(jù)作為歷史數(shù)據(jù)層,保留了具有分鐘級新鮮度的數(shù)據(jù)。
2. 共享數(shù)據(jù)與共享元數(shù)據(jù)
流式湖倉的核心理念是流和湖倉之間共享數(shù)據(jù)和共享元數(shù)據(jù),避免數(shù)據(jù)重復(fù)和元數(shù)據(jù)不一致。它提供了以下強(qiáng)大功能:
- 統(tǒng)一元數(shù)據(jù):Fluss為流和湖倉中的數(shù)據(jù)提供統(tǒng)一的表元數(shù)據(jù)。用戶只需處理一個表,但可以訪問實(shí)時流數(shù)據(jù)、歷史數(shù)據(jù)或它們的聯(lián)合。
- 聯(lián)合讀?。河嬎阋鎸Ρ韴?zhí)行查詢時將讀取實(shí)時流數(shù)據(jù)和湖倉數(shù)據(jù)的聯(lián)合。目前,只有Flink支持聯(lián)合讀取,但更多引擎正在規(guī)劃中。
- 實(shí)時湖倉:聯(lián)合讀取幫助湖倉從近實(shí)時分析發(fā)展到真正的實(shí)時分析,使企業(yè)能夠從實(shí)時數(shù)據(jù)中獲得更有價值的洞察。
- 分析流:聯(lián)合讀取幫助數(shù)據(jù)流具備強(qiáng)大的分析能力,這減少了開發(fā)流應(yīng)用程序的復(fù)雜性,簡化了調(diào)試,并允許立即訪問實(shí)時數(shù)據(jù)洞察。
- 連接到湖倉生態(tài)系統(tǒng):Fluss在將數(shù)據(jù)壓縮到Lakehouse時保持表元數(shù)據(jù)與數(shù)據(jù)湖目錄同步,這允許Spark、StarRocks、Flink、Trino等外部引擎通過連接到數(shù)據(jù)湖目錄直接讀取數(shù)據(jù)。
3. 數(shù)據(jù)湖集成
(1) Paimon集成
Apache Paimon創(chuàng)新地結(jié)合了湖格式和LSM結(jié)構(gòu),將高效更新引入湖架構(gòu)。
要將Fluss與Paimon集成,必須啟用lakehouse存儲并將Paimon配置為lakehouse存儲。具體步驟如下:
① 配置Lakehouse存儲
首先,在server.yaml中配置lakehouse存儲:
datalake.format: paimon
# Paimon目錄配置,假設(shè)使用Filesystem目錄
datalake.paimon.metastore: filesystem
datalake.paimon.warehouse: /tmp/paimon_data_warehouse
② 啟動數(shù)據(jù)湖分層服務(wù)
然后,啟動數(shù)據(jù)湖分層服務(wù),將Fluss的數(shù)據(jù)壓縮到lakehouse存儲:
# 切換到Fluss目錄
cd $FLUSS_HOME
# 啟動分層服務(wù),假設(shè)rest端點(diǎn)是localhost:8081
./bin/lakehouse.sh -D flink.rest.address=localhost -D flink.rest.port=8081
③ 為表啟用Lakehouse存儲
要為表啟用lakehouse存儲,必須在創(chuàng)建表時使用選項(xiàng)'table.datalake.enabled' = 'true':
CREATE TABLE datalake_enriched_orders (
`order_key` BIGINT,
`cust_key` INT NOT NULL,
`total_price` DECIMAL(15, 2),
`order_date` DATE,
`order_priority` STRING,
`clerk` STRING,
`cust_name` STRING,
`cust_phone` STRING,
`cust_acctbal` DECIMAL(15, 2),
`cust_mktsegment` STRING,
`nation_name` STRING,
PRIMARY KEY (`order_key`) NOT ENFORCED
) WITH ('table.datalake.enabled' = 'true');
當(dāng)在Fluss中創(chuàng)建或修改帶有選項(xiàng)'table.datalake.enabled' = 'true'的表時,F(xiàn)luss將創(chuàng)建一個具有相同表路徑的相應(yīng)Paimon表。Paimon表的模式與Fluss表的模式相同,只是在最后附加了兩個額外的列__offset和__timestamp。這兩列用于幫助Fluss客戶端以流式方式消費(fèi)Paimon中的數(shù)據(jù),例如按偏移量/時間戳查找等。
然后,數(shù)據(jù)湖分層服務(wù)會持續(xù)將數(shù)據(jù)從Fluss壓縮到Paimon。對于主鍵表,它還將生成Paimon格式的變更日志,使您能夠以Paimon方式流式消費(fèi)它。
(2) Iceberg集成規(guī)劃
目前,F(xiàn)luss支持Paimon作為Lakehouse存儲,更多種類的數(shù)據(jù)湖格式正在規(guī)劃中。
Fluss社區(qū)正在積極努力增強(qiáng)流和湖倉統(tǒng)一功能,重點(diǎn)關(guān)注以下關(guān)鍵領(lǐng)域:
① 擴(kuò)展聯(lián)合讀取生態(tài)系統(tǒng)
目前,聯(lián)合讀取功能已與Apache Flink集成,實(shí)現(xiàn)了實(shí)時和歷史數(shù)據(jù)的無縫查詢。未來,社區(qū)計劃擴(kuò)展此功能以支持其他查詢引擎,如Apache Spark和StarRocks,進(jìn)一步擴(kuò)大其生態(tài)系統(tǒng)兼容性和采用。
② 多樣化湖存儲格式
目前,F(xiàn)luss支持Apache Paimon作為其主要湖存儲。為了滿足多樣化的用戶需求,社區(qū)計劃添加對更多湖格式的支持,包括Apache Iceberg和Apache Hudi,從而提供更大的靈活性和與更廣泛的湖倉生態(tài)系統(tǒng)的互操作性。
4. 聯(lián)合讀?。║nion Read)
(1) 實(shí)時數(shù)據(jù)與歷史數(shù)據(jù)的無縫結(jié)合
對于帶有選項(xiàng)'table.datalake.enabled' = 'true'的表,有兩部分?jǐn)?shù)據(jù):保留在Fluss中的數(shù)據(jù)和已經(jīng)在Paimon中的數(shù)據(jù)?,F(xiàn)在,您有兩種表視圖:一種是具有分鐘級延遲的Paimon數(shù)據(jù)視圖,一種是聯(lián)合Fluss和Paimon數(shù)據(jù)的完整數(shù)據(jù)視圖,具有秒級延遲。
Flink使您能夠決定選擇哪種視圖:
- 僅Paimon意味著更好的分析性能,但數(shù)據(jù)新鮮度較差
- 結(jié)合Fluss和Paimon意味著更好的數(shù)據(jù)新鮮度,但分析性能下降
① 僅讀取Paimon中的數(shù)據(jù)
要指定讀取Paimon中的數(shù)據(jù),必須使用$lake后綴指定表,以下SQL顯示了如何執(zhí)行此操作:
-- 假設(shè)我們有一個名為`orders`的表
-- 從paimon讀取
SELECT COUNT(*) FROM orders$lake;
-- 我們還可以查詢系統(tǒng)表
SELECT * FROM orders$lake$snapshots;
當(dāng)在查詢中使用$lake后綴指定表時,它就像一個普通的Paimon表,因此它繼承了Paimon表的所有能力。您可以享受Flink在Paimon上支持/優(yōu)化的所有功能,如查詢系統(tǒng)表、時間旅行等。
② 聯(lián)合讀取Fluss和Paimon中的數(shù)據(jù)
要指定讀取聯(lián)合Fluss和Paimon的完整數(shù)據(jù),只需像查詢普通表一樣查詢它,無需任何后綴或其他內(nèi)容,以下SQL顯示了如何執(zhí)行此操作:
-- 查詢將聯(lián)合Fluss和Paimon的數(shù)據(jù)
SELECT SUM(order_count) as total_orders FROM ads_nation_purchase_power;
查詢可能看起來比只查詢Paimon中的數(shù)據(jù)慢,但它查詢了完整數(shù)據(jù),這意味著更好的數(shù)據(jù)新鮮度。您可以多次運(yùn)行查詢,由于數(shù)據(jù)持續(xù)寫入表中,每次運(yùn)行都應(yīng)該得到不同的結(jié)果。
(2) 查詢引擎支持
① Flink支持
Fluss向Flink用戶公開統(tǒng)一的API,允許他們選擇是使用聯(lián)合讀取還是僅在Lakehouse上進(jìn)行讀取,使用以下SQL:
SELECT * FROM orders
這將讀取orders表的完整數(shù)據(jù),F(xiàn)link將聯(lián)合讀取Fluss和Lakehouse中的數(shù)據(jù)。如果用戶只需要讀取數(shù)據(jù)湖上的數(shù)據(jù),可以在要讀取的表后添加$lake后綴。SQL如下:
-- 分析查詢
SELECT COUNT(*), MAX(t), SUM(amount)
FROM orders$lake
-- 查詢系統(tǒng)表
SELECT * FROM orders$lake$snapshots
5. 實(shí)時湖倉與分析流
(1) 為湖倉帶來真正的實(shí)時性
傳統(tǒng)的數(shù)據(jù)湖倉架構(gòu)通常只能提供分鐘級的數(shù)據(jù)新鮮度,這對于許多實(shí)時分析場景來說是不夠的。Fluss的流式湖倉一體化架構(gòu)通過聯(lián)合讀?。║nion Read)機(jī)制,為湖倉帶來了真正的實(shí)時性:
- 秒級數(shù)據(jù)新鮮度:通過將Fluss的實(shí)時流數(shù)據(jù)與Paimon的歷史數(shù)據(jù)聯(lián)合起來,查詢可以訪問最新寫入的數(shù)據(jù),實(shí)現(xiàn)秒級數(shù)據(jù)新鮮度。
- 實(shí)時分析能力增強(qiáng):企業(yè)可以在保持湖倉強(qiáng)大分析能力的同時,獲得實(shí)時數(shù)據(jù)洞察,使決策更加及時和準(zhǔn)確。
- 無需數(shù)據(jù)復(fù)制:不需要將數(shù)據(jù)復(fù)制到單獨(dú)的實(shí)時系統(tǒng)中,減少了存儲成本和數(shù)據(jù)一致性問題。
(2) 為數(shù)據(jù)流增加強(qiáng)大的分析能力
傳統(tǒng)的流處理系統(tǒng)通常缺乏強(qiáng)大的分析能力,而Fluss通過與湖倉的集成,為數(shù)據(jù)流增加了這些能力:
- 復(fù)雜分析查詢支持:流數(shù)據(jù)可以與歷史數(shù)據(jù)結(jié)合,支持更復(fù)雜的分析查詢,如聚合、窗口分析等。
- 開發(fā)簡化:減少了開發(fā)流應(yīng)用程序的復(fù)雜性,簡化了調(diào)試過程。
- 生態(tài)系統(tǒng)集成:可以利用現(xiàn)有的湖倉生態(tài)系統(tǒng)工具和查詢引擎,如Spark、StarRocks、Trino等。
6. 流式湖倉一體化實(shí)施流程
下面是實(shí)施Fluss流式湖倉一體化的詳細(xì)步驟和流程圖:
(1) 準(zhǔn)備環(huán)境
確保已安裝Fluss和Flink,并且兩者都正常運(yùn)行。Fluss需要一個運(yùn)行中的Flink集群來執(zhí)行數(shù)據(jù)湖分層服務(wù)。
(2) 配置Lakehouse存儲
在Fluss的server.yaml配置文件中設(shè)置Lakehouse存儲:
# 指定數(shù)據(jù)湖格式為Paimon
datalake.format: paimon
# Paimon目錄配置
datalake.paimon.metastore: filesystem
datalake.paimon.warehouse: /path/to/paimon/warehouse
(3) 啟動數(shù)據(jù)湖分層服務(wù)
數(shù)據(jù)湖分層服務(wù)是一個Flink作業(yè),負(fù)責(zé)將數(shù)據(jù)從Fluss壓縮到Paimon:
# 切換到Fluss安裝目錄
cd $FLUSS_HOME
# 啟動分層服務(wù),指定Flink REST端點(diǎn)
./bin/lakehouse.sh -D flink.rest.address=localhost -D flink.rest.port=8081
您還可以設(shè)置其他Flink配置參數(shù),例如:
# 設(shè)置檢查點(diǎn)間隔為10秒
./bin/lakehouse.sh -D flink.rest.address=localhost -D flink.rest.port=8081 -D flink.execution.checkpointing.interval=10s
# 僅同步特定數(shù)據(jù)庫中的表
./bin/lakehouse.sh -D flink.rest.address=localhost -D flink.rest.port=8081 -D database=fluss_\\w+
(4) 創(chuàng)建啟用Lakehouse的表
使用Flink SQL創(chuàng)建啟用了Lakehouse的表,通過設(shè)置'table.datalake.enabled' = 'true'選項(xiàng):
-- 創(chuàng)建一個啟用了Lakehouse的日志表
CREATE TABLE log_orders (
order_id BIGINT,
customer_id BIGINT,
order_time TIMESTAMP,
amount DECIMAL(10, 2),
status STRING
) WITH (
'bucket.num' = '4',
'table.datalake.enabled' = 'true'
);
-- 創(chuàng)建一個啟用了Lakehouse的主鍵表
CREATE TABLE pk_orders (
order_id BIGINT,
customer_id BIGINT,
order_time TIMESTAMP,
amount DECIMAL(10, 2),
status STRING,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'bucket.num' = '4',
'table.datalake.enabled' = 'true'
);
當(dāng)創(chuàng)建啟用了Lakehouse的表時,F(xiàn)luss會自動創(chuàng)建一個對應(yīng)的Paimon表,并在表末尾添加兩個額外的列__offset和__timestamp,用于支持流式消費(fèi)。
(5) 數(shù)據(jù)寫入
使用標(biāo)準(zhǔn)的Flink SQL向表中寫入數(shù)據(jù):
-- 插入數(shù)據(jù)到日志表
INSERT INTO log_orders
VALUES (1001, 101, TIMESTAMP '2025-05-18 10:30:00', 199.99, 'PENDING');
-- 插入數(shù)據(jù)到主鍵表
INSERT INTO pk_orders
VALUES (1001, 101, TIMESTAMP '2025-05-18 10:30:00', 199.99, 'PENDING');
-- 或者從其他表插入數(shù)據(jù)
INSERT INTO pk_orders
SELECT order_id, customer_id, order_time, amount, status
FROM source_table;
數(shù)據(jù)寫入后,會首先存儲在Fluss的實(shí)時層中,然后由數(shù)據(jù)湖分層服務(wù)定期壓縮到Paimon中。對于主鍵表,還會生成Paimon格式的變更日志。
(6) 數(shù)據(jù)查詢
① 聯(lián)合讀取(實(shí)時+歷史數(shù)據(jù))
要查詢完整的數(shù)據(jù)(Fluss中的實(shí)時數(shù)據(jù)和Paimon中的歷史數(shù)據(jù)),直接查詢表名,無需任何后綴:
-- 聯(lián)合讀取實(shí)時和歷史數(shù)據(jù)
SELECT COUNT(*) FROM pk_orders;
-- 流式查詢,會持續(xù)獲取最新數(shù)據(jù)
SELECT * FROM pk_orders WHERE amount > 100;
聯(lián)合讀取提供秒級數(shù)據(jù)新鮮度,但可能會比僅查詢Paimon數(shù)據(jù)慢一些。由于數(shù)據(jù)持續(xù)寫入,多次運(yùn)行相同的查詢可能會得到不同的結(jié)果。
② 僅讀取Lakehouse數(shù)據(jù)
要僅查詢Paimon中的歷史數(shù)據(jù),在表名后添加$lake后綴:
-- 僅查詢Paimon中的歷史數(shù)據(jù)
SELECT COUNT(*) FROM pk_orders$lake;
-- 查詢Paimon系統(tǒng)表
SELECT * FROM pk_orders$lake$snapshots;
-- 使用Paimon的時間旅行功能
SELECT * FROM pk_orders$lake FOR TIMESTAMP AS OF '2025-05-17 00:00:00';
僅讀取Paimon數(shù)據(jù)提供更好的分析性能,但數(shù)據(jù)新鮮度較差(分鐘級)。您可以使用Paimon的所有功能,如查詢系統(tǒng)表、時間旅行等。
7. 流式湖倉一體化架構(gòu)圖
三、實(shí)際應(yīng)用場景示例
場景1:實(shí)時儀表盤
一個電子商務(wù)平臺需要一個實(shí)時儀表盤,顯示最新的銷售數(shù)據(jù)和趨勢。
實(shí)現(xiàn)方式:
- 創(chuàng)建啟用Lakehouse的訂單表
- 使用Flink SQL進(jìn)行聯(lián)合讀取,獲取最新的銷售數(shù)據(jù)
- 將結(jié)果推送到儀表盤應(yīng)用
-- 創(chuàng)建啟用Lakehouse的訂單表
CREATE TABLE sales_orders (
order_id BIGINT,
product_id BIGINT,
customer_id BIGINT,
order_time TIMESTAMP,
amount DECIMAL(10, 2),
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'bucket.num' = '8',
'table.datalake.enabled' = 'true'
);
-- 實(shí)時計算最近1小時的銷售額
SELECT
TUMBLE_START(order_time, INTERVAL '5' MINUTE) AS window_start,
SUM(amount) AS total_sales
FROM sales_orders
WHERE order_time > CURRENT_TIMESTAMP - INTERVAL '1' HOUR
GROUP BY TUMBLE(order_time, INTERVAL '5' MINUTE);
場景2:歷史數(shù)據(jù)分析與實(shí)時數(shù)據(jù)結(jié)合
一個金融機(jī)構(gòu)需要分析客戶的歷史交易模式,并將其與實(shí)時交易數(shù)據(jù)結(jié)合,以檢測潛在的欺詐行為。
實(shí)現(xiàn)方式:
- 創(chuàng)建啟用Lakehouse的交易表
- 使用Paimon查詢歷史交易模式
- 使用聯(lián)合讀取將歷史模式與實(shí)時交易結(jié)合分析
-- 創(chuàng)建啟用Lakehouse的交易表
CREATE TABLE transactions (
transaction_id BIGINT,
account_id BIGINT,
transaction_time TIMESTAMP,
amount DECIMAL(15, 2),
merchant_id BIGINT,
location STRING,
PRIMARY KEY (transaction_id) NOT ENFORCED
) WITH (
'bucket.num' = '16',
'table.datalake.enabled' = 'true'
);
-- 查詢歷史交易模式(僅Paimon數(shù)據(jù))
SELECT
account_id,
AVG(amount) AS avg_amount,
STDDEV(amount) AS stddev_amount
FROM transactions$lake
WHERE transaction_time BETWEEN TIMESTAMP '2025-04-01 00:00:00' AND TIMESTAMP '2025-05-01 00:00:00'
GROUP BY account_id;
-- 將歷史模式與實(shí)時交易結(jié)合分析(聯(lián)合讀?。?
WITH historical_patterns AS (
SELECT
account_id,
AVG(amount) AS avg_amount,
STDDEV(amount) AS stddev_amount
FROM transactions$lake
WHERE transaction_time BETWEEN TIMESTAMP '2025-04-01 00:00:00' AND TIMESTAMP '2025-05-01 00:00:00'
GROUP BY account_id
)
SELECT
t.transaction_id,
t.account_id,
t.transaction_time,
t.amount,
t.merchant_id,
t.location,
h.avg_amount,
h.stddev_amount,
CASE
WHEN ABS(t.amount - h.avg_amount) > 3 * h.stddev_amount THEN 'SUSPICIOUS'
ELSE 'NORMAL'
END AS risk_level
FROM transactions t
JOIN historical_patterns h ON t.account_id = h.account_id
WHERE t.transaction_time > CURRENT_TIMESTAMP - INTERVAL '1' HOUR;
場景3:實(shí)時數(shù)據(jù)湖分層架構(gòu)
一個大型零售企業(yè)需要構(gòu)建一個數(shù)據(jù)湖分層架構(gòu)(Bronze、Silver、Gold),同時保持各層的數(shù)據(jù)新鮮度。
實(shí)現(xiàn)方式:
- 創(chuàng)建各層的啟用Lakehouse的表
- 使用Flink SQL將數(shù)據(jù)從一層轉(zhuǎn)換到下一層
- 利用聯(lián)合讀取確保各層都具有秒級數(shù)據(jù)新鮮度
-- Bronze層:創(chuàng)建原始銷售數(shù)據(jù)表
CREATE TABLE sales_raw (
event_time TIMESTAMP,
store_id BIGINT,
product_id BIGINT,
quantity INT,
amount DECIMAL(10, 2),
payment_method STRING,
source STRING
) WITH (
'bucket.num' = '16',
'table.datalake.enabled' = 'true'
);
-- Silver層:創(chuàng)建清洗后的銷售數(shù)據(jù)表
CREATE TABLE sales_cleaned (
event_time TIMESTAMP,
store_id BIGINT,
product_id BIGINT,
quantity INT,
amount DECIMAL(10, 2),
payment_method STRING,
source STRING,
PRIMARY KEY (store_id, product_id, event_time) NOT ENFORCED
) WITH (
'bucket.num' = '16',
'table.datalake.enabled' = 'true'
);
-- Gold層:創(chuàng)建聚合的銷售指標(biāo)表
CREATE TABLE sales_aggregated (
window_start TIMESTAMP,
window_end TIMESTAMP,
store_id BIGINT,
product_id BIGINT,
total_quantity BIGINT,
total_amount DECIMAL(15, 2),
PRIMARY KEY (window_start, window_end, store_id, product_id) NOT ENFORCED
) WITH (
'bucket.num' = '8',
'table.datalake.enabled' = 'true'
);
-- Silver層ETL:清洗和轉(zhuǎn)換數(shù)據(jù)
INSERT INTO sales_cleaned
SELECT
event_time,
store_id,
product_id,
CASE WHEN quantity <= 0 THEN 1 ELSE quantity END AS quantity,
amount,
payment_method,
source
FROM sales_raw
WHERE amount > 0;
-- Gold層ETL:聚合數(shù)據(jù)
INSERT INTO sales_aggregated
SELECT
TUMBLE_START(event_time, INTERVAL '1' HOUR) AS window_start,
TUMBLE_END(event_time, INTERVAL '1' HOUR) AS window_end,
store_id,
product_id,
SUM(quantity) AS total_quantity,
SUM(amount) AS total_amount
FROM sales_cleaned
GROUP BY TUMBLE(event_time, INTERVAL '1' HOUR), store_id, product_id;
四、流式湖倉一體化的技術(shù)優(yōu)勢
1. 數(shù)據(jù)分布一致性
Fluss和Paimon之間的數(shù)據(jù)分布是嚴(yán)格對齊的。Fluss支持分區(qū)表和桶,其分桶算法與Paimon完全一致。這確保了給定的數(shù)據(jù)始終分配到兩個系統(tǒng)中的相同桶,創(chuàng)建了Fluss桶和Paimon桶之間的一對一對應(yīng)關(guān)系。
這種數(shù)據(jù)分布的一致性提供了兩個顯著優(yōu)勢:
(1) 消除分層過程中的Shuffle開銷
當(dāng)將數(shù)據(jù)從Fluss分層到Paimon格式時:
- Fluss桶(例如bucket1)可以直接分層到相應(yīng)的Paimon桶(bucket1)
- 無需讀取Fluss桶中的數(shù)據(jù),計算每條數(shù)據(jù)屬于哪個Paimon桶,然后將其寫入適當(dāng)?shù)腜aimon桶
通過繞過這個中間重新分配步驟,架構(gòu)避免了昂貴的shuffle開銷,顯著提高了壓縮效率。
(2) 防止數(shù)據(jù)不一致
通過在Fluss和Paimon中使用相同的分桶算法,確保了數(shù)據(jù)一致性。該算法計算每條數(shù)據(jù)的桶分配如下:
bucket_id = hash(row) % bucket_num
2. 更高效的數(shù)據(jù)追蹤
在Fluss架構(gòu)中,歷史數(shù)據(jù)存儲在Lakehouse存儲中,而實(shí)時數(shù)據(jù)保留在Fluss中。在流式讀取期間,這種架構(gòu)實(shí)現(xiàn)了歷史和實(shí)時數(shù)據(jù)訪問的無縫結(jié)合:
- 歷史數(shù)據(jù)訪問:Fluss直接從Lakehouse存儲檢索歷史數(shù)據(jù),利用其固有優(yōu)勢,如:
- 高效的過濾下推:使查詢引擎能夠在存儲層應(yīng)用過濾條件,減少讀取的數(shù)據(jù)量并提高性能
- 列裁剪:允許僅檢索必要的列,優(yōu)化數(shù)據(jù)傳輸和查詢效率
- 高壓縮比:在保持快速檢索速度的同時最小化存儲開銷
- 實(shí)時數(shù)據(jù)訪問:Fluss同時從自己的存儲中讀取最新的實(shí)時數(shù)據(jù),確保毫秒級的新鮮度
3. 一致的數(shù)據(jù)新鮮度
在構(gòu)建數(shù)據(jù)倉庫時,通常按層組織和管理數(shù)據(jù),如Bronze、Silver和Gold層。當(dāng)數(shù)據(jù)流經(jīng)這些層時,維護(hù)數(shù)據(jù)新鮮度變得至關(guān)重要。
當(dāng)Paimon作為每一層的唯一存儲解決方案時,數(shù)據(jù)可見性取決于Flink檢查點(diǎn)間隔,這會引入累積延遲:
- 給定層的變更日志僅在Flink檢查點(diǎn)完成后才可見
- 當(dāng)這個變更日志傳播到后續(xù)層時,數(shù)據(jù)新鮮度延遲隨著每個檢查點(diǎn)間隔而增加
例如,使用1分鐘的Flink檢查點(diǎn)間隔:
- Bronze層經(jīng)歷1分鐘延遲
- Silver層增加另一個1分鐘延遲,總計2分鐘
- Gold層再增加1分鐘延遲,累積為3分鐘延遲
而使用Fluss和Paimon,我們獲得:
- 即時數(shù)據(jù)可見性:Fluss中的數(shù)據(jù)在攝入后立即可見,無需等待Flink檢查點(diǎn)完成。變更日志立即傳輸?shù)较乱粚?/li>
- 一致的數(shù)據(jù)新鮮度:所有層的數(shù)據(jù)新鮮度是一致的,以秒為單位,消除了累積延遲
4. 完整實(shí)施示例:電子商務(wù)實(shí)時分析平臺
下面是一個完整的電子商務(wù)實(shí)時分析平臺實(shí)施示例,展示了如何使用Fluss的流式湖倉一體化架構(gòu)構(gòu)建端到端解決方案。
步驟1:環(huán)境準(zhǔn)備
首先,確保已安裝并配置好Fluss和Flink環(huán)境:
# 下載并解壓Fluss
wget https://github.com/alibaba/fluss/releases/download/v0.6.0/fluss-0.6.0.tgz
tar -xzf fluss-0.6.0.tgz
cd fluss-0.6.0
# 啟動本地Fluss集群
./bin/start-local.sh
# 啟動本地Flink集群(用于數(shù)據(jù)湖分層服務(wù))
cd /path/to/flink
./bin/start-cluster.sh
在Fluss的conf/server.yaml中配置Paimon作為Lakehouse存儲:
datalake.format: paimon
datalake.paimon.metastore: filesystem
datalake.paimon.warehouse: /path/to/paimon/warehouse
步驟3:啟動數(shù)據(jù)湖分層服務(wù)
啟動數(shù)據(jù)湖分層服務(wù),將數(shù)據(jù)從Fluss壓縮到Paimon:
cd /path/to/fluss
./bin/lakehouse.sh -D flink.rest.address=localhost -D flink.rest.port=8081 -D flink.execution.checkpointing.interval=10s
步驟4:創(chuàng)建數(shù)據(jù)模型
使用Flink SQL創(chuàng)建電子商務(wù)分析所需的表結(jié)構(gòu):
-- 創(chuàng)建訂單表(主鍵表)
CREATE TABLE orders (
order_id BIGINT,
customer_id BIGINT,
order_time TIMESTAMP,
total_amount DECIMAL(10, 2),
status STRING,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'bucket.num' = '8',
'table.datalake.enabled' = 'true'
);
-- 創(chuàng)建訂單明細(xì)表(日志表)
CREATE TABLE order_items (
item_id BIGINT,
order_id BIGINT,
product_id BIGINT,
quantity INT,
unit_price DECIMAL(10, 2),
discount DECIMAL(5, 2)
) WITH (
'bucket.num' = '8',
'table.datalake.enabled' = 'true'
);
-- 創(chuàng)建產(chǎn)品表(主鍵表)
CREATE TABLE products (
product_id BIGINT,
name STRING,
category STRING,
brand STRING,
price DECIMAL(10, 2),
PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
'bucket.num' = '4',
'table.datalake.enabled' = 'true'
);
-- 創(chuàng)建客戶表(主鍵表)
CREATE TABLE customers (
customer_id BIGINT,
name STRING,
email STRING,
registration_time TIMESTAMP,
vip_level INT,
PRIMARY KEY (customer_id) NOT ENFORCED
) WITH (
'bucket.num' = '4',
'table.datalake.enabled' = 'true'
);
-- 創(chuàng)建實(shí)時銷售指標(biāo)表(主鍵表)
CREATE TABLE sales_metrics (
window_start TIMESTAMP,
window_end TIMESTAMP,
category STRING,
total_orders BIGINT,
total_sales DECIMAL(15, 2),
PRIMARY KEY (window_start, window_end, category) NOT ENFORCED
) WITH (
'bucket.num' = '4',
'table.datalake.enabled' = 'true'
);
步驟5:數(shù)據(jù)處理流程
使用Flink SQL實(shí)現(xiàn)數(shù)據(jù)處理流程:
-- 1. 計算實(shí)時銷售指標(biāo)(每5分鐘更新一次)
INSERT INTO sales_metrics
SELECT
TUMBLE_START(o.order_time, INTERVAL '5' MINUTE) AS window_start,
TUMBLE_END(o.order_time, INTERVAL '5' MINUTE) AS window_end,
p.category,
COUNT(DISTINCT o.order_id) AS total_orders,
SUM(i.quantity * i.unit_price * (1 - i.discount/100)) AS total_sales
FROM orders o
JOIN order_items i ON o.order_id = i.order_id
JOIN products p ON i.product_id = p.product_id
WHERE o.status = 'COMPLETED'
GROUP BY
TUMBLE(o.order_time, INTERVAL '5' MINUTE),
p.category;
-- 2. 創(chuàng)建VIP客戶實(shí)時推薦視圖
CREATE VIEW vip_recommendations AS
SELECT
c.customer_id,
c.name,
p.category,
p.brand,
COUNT(*) AS purchase_count
FROM customers c
JOIN orders o ON c.customer_id = o.customer_id
JOIN order_items i ON o.order_id = i.order_id
JOIN products p ON i.product_id = p.product_id
WHERE
c.vip_level >= 3 AND
o.order_time > CURRENT_TIMESTAMP - INTERVAL '30' DAY
GROUP BY
c.customer_id, c.name, p.category, p.brand
HAVING COUNT(*) >= 2;
步驟6:實(shí)時儀表盤查詢
使用聯(lián)合讀取查詢實(shí)時銷售指標(biāo):
-- 實(shí)時銷售趨勢(聯(lián)合讀?。?
SELECT
window_start,
category,
total_sales
FROM sales_metrics
WHERE
window_start >= CURRENT_TIMESTAMP - INTERVAL '24' HOUR
ORDER BY
window_start DESC, total_sales DESC;
步驟7:歷史分析查詢
使用Paimon查詢歷史數(shù)據(jù)分析:
-- 歷史銷售分析(僅Paimon數(shù)據(jù))
SELECT
p.category,
MONTH(o.order_time) AS month,
SUM(i.quantity * i.unit_price * (1 - i.discount/100)) AS total_sales
FROM orders$lake o
JOIN order_items$lake i ON o.order_id = i.order_id
JOIN products$lake p ON i.product_id = p.product_id
WHERE
o.order_time >= TIMESTAMP '2025-01-01 00:00:00' AND
o.order_time < TIMESTAMP '2025-05-01 00:00:00'
GROUP BY
p.category, MONTH(o.order_time)
ORDER BY
p.category, month;
這種查詢只訪問Paimon中的歷史數(shù)據(jù),提供更好的查詢性能,適合大規(guī)模歷史數(shù)據(jù)分析。
5. 流式湖倉一體化的關(guān)鍵技術(shù)點(diǎn)
(1) 統(tǒng)一元數(shù)據(jù)管理
在傳統(tǒng)架構(gòu)中,流存儲系統(tǒng)(如Kafka)和湖倉存儲解決方案(如Paimon)作為獨(dú)立實(shí)體運(yùn)行,各自維護(hù)自己的元數(shù)據(jù)。這給計算引擎(如Flink)帶來了兩個主要挑戰(zhàn):
雙重目錄:用戶需要創(chuàng)建和管理兩個獨(dú)立的目錄——一個用于流存儲,另一個用于湖存儲
手動切換:訪問數(shù)據(jù)需要手動在目錄之間切換,以確定是查詢流存儲還是湖存儲,導(dǎo)致操作復(fù)雜性和效率低下
而在Fluss中,雖然Fluss和Paimon仍然維護(hù)獨(dú)立的元數(shù)據(jù),但它們向計算引擎(如Flink)公開統(tǒng)一的目錄和單一的表抽象。這種統(tǒng)一方法提供了幾個關(guān)鍵優(yōu)勢:
- 簡化數(shù)據(jù)訪問:用戶可以通過單一目錄無縫訪問湖倉存儲(Paimon)和流存儲(Fluss),無需管理或在獨(dú)立目錄之間切換
- 集成查詢:統(tǒng)一表抽象允許直接訪問Fluss中的實(shí)時數(shù)據(jù)和Paimon中的歷史數(shù)據(jù)
- 操作效率:通過提供一致的接口,該架構(gòu)降低了操作復(fù)雜性,使用戶更容易在單一工作流中處理實(shí)時和歷史數(shù)據(jù)
(2) 數(shù)據(jù)分布對齊
Fluss和Paimon之間的數(shù)據(jù)分布是嚴(yán)格對齊的,這是通過使用相同的分桶算法實(shí)現(xiàn)的:
bucket_id = hash(row) % bucket_num
這種對齊確保了:
- 分層效率:Fluss桶可以直接分層到對應(yīng)的Paimon桶,無需重新分配數(shù)據(jù)
- 數(shù)據(jù)一致性:相同的數(shù)據(jù)在兩個系統(tǒng)中分配到相同的桶,防止數(shù)據(jù)不一致
(3) 檢查點(diǎn)間隔與數(shù)據(jù)新鮮度
在傳統(tǒng)的Paimon架構(gòu)中,數(shù)據(jù)新鮮度受Flink檢查點(diǎn)間隔的限制。例如,使用1分鐘的檢查點(diǎn)間隔時:
而在Fluss流式湖倉架構(gòu)中,數(shù)據(jù)在寫入后立即可見:
6. 流式湖倉一體化的最佳實(shí)踐
(1) 表設(shè)計最佳實(shí)踐
- 合理設(shè)置桶數(shù)量:根據(jù)數(shù)據(jù)量和查詢模式設(shè)置適當(dāng)?shù)耐皵?shù)量,通常為2的冪次方(4、8、16等)
- 選擇合適的主鍵:對于主鍵表,選擇具有良好分布特性的主鍵,避免熱點(diǎn)
- 考慮分區(qū)策略:對于大型表,使用分區(qū)來提高查詢性能和管理數(shù)據(jù)生命周期
(2) 數(shù)據(jù)湖分層服務(wù)配置
- 檢查點(diǎn)間隔:根據(jù)實(shí)時性需求和系統(tǒng)負(fù)載調(diào)整檢查點(diǎn)間隔,通常在10-60秒之間
- 資源分配:為數(shù)據(jù)湖分層服務(wù)分配足夠的資源,確保能夠及時處理數(shù)據(jù)
- 監(jiān)控:定期監(jiān)控分層服務(wù)的性能和延遲,確保數(shù)據(jù)及時壓縮到Paimon
( 3)查詢優(yōu)化
①選擇合適的查詢模式
- 對于需要最新數(shù)據(jù)的查詢,使用聯(lián)合讀取
- 對于大規(guī)模歷史數(shù)據(jù)分析,使用僅Paimon查詢
②過濾下推:在查詢中盡可能早地應(yīng)用過濾條件,利用Paimon的過濾下推能力
③列裁剪:只選擇需要的列,減少數(shù)據(jù)傳輸和處理量