偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

阿里面試:Fluss 是什么?如何使用 Fluss 構(gòu)建流式湖倉一體?

大數(shù)據(jù)
Fluss是一種支持亞秒級低延遲流式讀寫的流式存儲系統(tǒng)。通過Lakehouse Storage,F(xiàn)luss在Lakehouse之上提供實(shí)時流數(shù)據(jù)服務(wù),實(shí)現(xiàn)了數(shù)據(jù)流和數(shù)據(jù)湖倉的統(tǒng)一。

一、湖倉概述

1. 傳統(tǒng)湖倉架構(gòu)的挑戰(zhàn)

數(shù)據(jù)湖倉(Lakehouse)是一種結(jié)合了數(shù)據(jù)湖的可擴(kuò)展性和成本效益與數(shù)據(jù)倉庫的可靠性和性能的新型開放架構(gòu)。Apache Iceberg、Apache Paimon、Apache Hudi和Delta Lake等知名數(shù)據(jù)湖格式在湖倉架構(gòu)中扮演著關(guān)鍵角色,它們在單一統(tǒng)一平臺內(nèi)促進(jìn)了數(shù)據(jù)存儲、可靠性和分析能力之間的和諧平衡。 

然而,傳統(tǒng)湖倉架構(gòu)面臨著實(shí)時性與分析能力平衡的挑戰(zhàn):

(1) 實(shí)時性與分析效率的矛盾

  • 如果要求低延遲,就需要頻繁寫入和提交,這會產(chǎn)生大量小型Parquet文件,導(dǎo)致讀取效率低下
  • 如果要求讀取效率,就需要積累數(shù)據(jù)直到能寫入大型Parquet文件,但這會引入更高的延遲 

(2) 數(shù)據(jù)新鮮度限制

即使在最佳使用條件下,這些數(shù)據(jù)湖格式通常也只能在分鐘級粒度內(nèi)實(shí)現(xiàn)數(shù)據(jù)新鮮度。

二、流式湖倉一體化

1. 流與湖的統(tǒng)一

Fluss是一種支持亞秒級低延遲流式讀寫的流式存儲系統(tǒng)。通過Lakehouse Storage,F(xiàn)luss在Lakehouse之上提供實(shí)時流數(shù)據(jù)服務(wù),實(shí)現(xiàn)了數(shù)據(jù)流和數(shù)據(jù)湖倉的統(tǒng)一。這不僅為數(shù)據(jù)湖倉帶來了低延遲,還為數(shù)據(jù)流增加了強(qiáng)大的分析能力。 

為了構(gòu)建流式湖倉,F(xiàn)luss維護(hù)了一個分層服務(wù),該服務(wù)將實(shí)時數(shù)據(jù)從Fluss集群壓縮到存儲在Lakehouse Storage中的數(shù)據(jù)湖格式。Fluss集群中的數(shù)據(jù)(流式Arrow格式)針對低延遲讀寫進(jìn)行了優(yōu)化,而Lakehouse中的壓縮數(shù)據(jù)(帶壓縮的Parquet格式)針對強(qiáng)大的分析和長期數(shù)據(jù)存儲進(jìn)行了優(yōu)化。因此,F(xiàn)luss集群中的數(shù)據(jù)作為實(shí)時數(shù)據(jù)層,保留了具有亞秒級新鮮度的數(shù)據(jù);而Lakehouse中的數(shù)據(jù)作為歷史數(shù)據(jù)層,保留了具有分鐘級新鮮度的數(shù)據(jù)。

2. 共享數(shù)據(jù)與共享元數(shù)據(jù)

流式湖倉的核心理念是流和湖倉之間共享數(shù)據(jù)和共享元數(shù)據(jù),避免數(shù)據(jù)重復(fù)和元數(shù)據(jù)不一致。它提供了以下強(qiáng)大功能:

  • 統(tǒng)一元數(shù)據(jù):Fluss為流和湖倉中的數(shù)據(jù)提供統(tǒng)一的表元數(shù)據(jù)。用戶只需處理一個表,但可以訪問實(shí)時流數(shù)據(jù)、歷史數(shù)據(jù)或它們的聯(lián)合。
  • 聯(lián)合讀?。河嬎阋鎸Ρ韴?zhí)行查詢時將讀取實(shí)時流數(shù)據(jù)和湖倉數(shù)據(jù)的聯(lián)合。目前,只有Flink支持聯(lián)合讀取,但更多引擎正在規(guī)劃中。
  • 實(shí)時湖倉:聯(lián)合讀取幫助湖倉從近實(shí)時分析發(fā)展到真正的實(shí)時分析,使企業(yè)能夠從實(shí)時數(shù)據(jù)中獲得更有價值的洞察。
  • 分析流:聯(lián)合讀取幫助數(shù)據(jù)流具備強(qiáng)大的分析能力,這減少了開發(fā)流應(yīng)用程序的復(fù)雜性,簡化了調(diào)試,并允許立即訪問實(shí)時數(shù)據(jù)洞察。
  • 連接到湖倉生態(tài)系統(tǒng):Fluss在將數(shù)據(jù)壓縮到Lakehouse時保持表元數(shù)據(jù)與數(shù)據(jù)湖目錄同步,這允許Spark、StarRocks、Flink、Trino等外部引擎通過連接到數(shù)據(jù)湖目錄直接讀取數(shù)據(jù)。 

3. 數(shù)據(jù)湖集成

(1) Paimon集成

Apache Paimon創(chuàng)新地結(jié)合了湖格式和LSM結(jié)構(gòu),將高效更新引入湖架構(gòu)。 

要將Fluss與Paimon集成,必須啟用lakehouse存儲并將Paimon配置為lakehouse存儲。具體步驟如下:

① 配置Lakehouse存儲

首先,在server.yaml中配置lakehouse存儲:

datalake.format: paimon  


# Paimon目錄配置,假設(shè)使用Filesystem目錄  
datalake.paimon.metastore: filesystem  
datalake.paimon.warehouse: /tmp/paimon_data_warehouse

② 啟動數(shù)據(jù)湖分層服務(wù)

然后,啟動數(shù)據(jù)湖分層服務(wù),將Fluss的數(shù)據(jù)壓縮到lakehouse存儲:

# 切換到Fluss目錄  
cd $FLUSS_HOME  


# 啟動分層服務(wù),假設(shè)rest端點(diǎn)是localhost:8081  
./bin/lakehouse.sh -D flink.rest.address=localhost -D flink.rest.port=8081

③ 為表啟用Lakehouse存儲

要為表啟用lakehouse存儲,必須在創(chuàng)建表時使用選項(xiàng)'table.datalake.enabled' = 'true':

CREATE TABLE datalake_enriched_orders (  
    `order_key` BIGINT,  
    `cust_key` INT NOT NULL,  
    `total_price` DECIMAL(15, 2),  
    `order_date` DATE,  
    `order_priority` STRING,  
    `clerk` STRING,  
    `cust_name` STRING,  
    `cust_phone` STRING,  
    `cust_acctbal` DECIMAL(15, 2),  
    `cust_mktsegment` STRING,  
    `nation_name` STRING,  
    PRIMARY KEY (`order_key`) NOT ENFORCED  
) WITH ('table.datalake.enabled' = 'true');

當(dāng)在Fluss中創(chuàng)建或修改帶有選項(xiàng)'table.datalake.enabled' = 'true'的表時,F(xiàn)luss將創(chuàng)建一個具有相同表路徑的相應(yīng)Paimon表。Paimon表的模式與Fluss表的模式相同,只是在最后附加了兩個額外的列__offset和__timestamp。這兩列用于幫助Fluss客戶端以流式方式消費(fèi)Paimon中的數(shù)據(jù),例如按偏移量/時間戳查找等。 

然后,數(shù)據(jù)湖分層服務(wù)會持續(xù)將數(shù)據(jù)從Fluss壓縮到Paimon。對于主鍵表,它還將生成Paimon格式的變更日志,使您能夠以Paimon方式流式消費(fèi)它。 

(2) Iceberg集成規(guī)劃

目前,F(xiàn)luss支持Paimon作為Lakehouse存儲,更多種類的數(shù)據(jù)湖格式正在規(guī)劃中。 

Fluss社區(qū)正在積極努力增強(qiáng)流和湖倉統(tǒng)一功能,重點(diǎn)關(guān)注以下關(guān)鍵領(lǐng)域:

① 擴(kuò)展聯(lián)合讀取生態(tài)系統(tǒng)

目前,聯(lián)合讀取功能已與Apache Flink集成,實(shí)現(xiàn)了實(shí)時和歷史數(shù)據(jù)的無縫查詢。未來,社區(qū)計劃擴(kuò)展此功能以支持其他查詢引擎,如Apache Spark和StarRocks,進(jìn)一步擴(kuò)大其生態(tài)系統(tǒng)兼容性和采用。

② 多樣化湖存儲格式

目前,F(xiàn)luss支持Apache Paimon作為其主要湖存儲。為了滿足多樣化的用戶需求,社區(qū)計劃添加對更多湖格式的支持,包括Apache Iceberg和Apache Hudi,從而提供更大的靈活性和與更廣泛的湖倉生態(tài)系統(tǒng)的互操作性。

4. 聯(lián)合讀?。║nion Read)

(1) 實(shí)時數(shù)據(jù)與歷史數(shù)據(jù)的無縫結(jié)合

對于帶有選項(xiàng)'table.datalake.enabled' = 'true'的表,有兩部分?jǐn)?shù)據(jù):保留在Fluss中的數(shù)據(jù)和已經(jīng)在Paimon中的數(shù)據(jù)?,F(xiàn)在,您有兩種表視圖:一種是具有分鐘級延遲的Paimon數(shù)據(jù)視圖,一種是聯(lián)合Fluss和Paimon數(shù)據(jù)的完整數(shù)據(jù)視圖,具有秒級延遲。 

Flink使您能夠決定選擇哪種視圖:

  • 僅Paimon意味著更好的分析性能,但數(shù)據(jù)新鮮度較差
  • 結(jié)合Fluss和Paimon意味著更好的數(shù)據(jù)新鮮度,但分析性能下降

① 僅讀取Paimon中的數(shù)據(jù)

要指定讀取Paimon中的數(shù)據(jù),必須使用$lake后綴指定表,以下SQL顯示了如何執(zhí)行此操作:

-- 假設(shè)我們有一個名為`orders`的表  


-- 從paimon讀取  
SELECT COUNT(*) FROM orders$lake;  


-- 我們還可以查詢系統(tǒng)表  
SELECT * FROM orders$lake$snapshots;

當(dāng)在查詢中使用$lake后綴指定表時,它就像一個普通的Paimon表,因此它繼承了Paimon表的所有能力。您可以享受Flink在Paimon上支持/優(yōu)化的所有功能,如查詢系統(tǒng)表、時間旅行等。 

② 聯(lián)合讀取Fluss和Paimon中的數(shù)據(jù)

要指定讀取聯(lián)合Fluss和Paimon的完整數(shù)據(jù),只需像查詢普通表一樣查詢它,無需任何后綴或其他內(nèi)容,以下SQL顯示了如何執(zhí)行此操作:

-- 查詢將聯(lián)合Fluss和Paimon的數(shù)據(jù)  
SELECT SUM(order_count) as total_orders FROM ads_nation_purchase_power;

查詢可能看起來比只查詢Paimon中的數(shù)據(jù)慢,但它查詢了完整數(shù)據(jù),這意味著更好的數(shù)據(jù)新鮮度。您可以多次運(yùn)行查詢,由于數(shù)據(jù)持續(xù)寫入表中,每次運(yùn)行都應(yīng)該得到不同的結(jié)果。 

(2) 查詢引擎支持

① Flink支持

Fluss向Flink用戶公開統(tǒng)一的API,允許他們選擇是使用聯(lián)合讀取還是僅在Lakehouse上進(jìn)行讀取,使用以下SQL:

SELECT * FROM orders

這將讀取orders表的完整數(shù)據(jù),F(xiàn)link將聯(lián)合讀取Fluss和Lakehouse中的數(shù)據(jù)。如果用戶只需要讀取數(shù)據(jù)湖上的數(shù)據(jù),可以在要讀取的表后添加$lake后綴。SQL如下:

-- 分析查詢  
SELECT COUNT(*), MAX(t), SUM(amount)   
FROM orders$lake  


-- 查詢系統(tǒng)表  
SELECT * FROM orders$lake$snapshots

5. 實(shí)時湖倉與分析流

(1) 為湖倉帶來真正的實(shí)時性

傳統(tǒng)的數(shù)據(jù)湖倉架構(gòu)通常只能提供分鐘級的數(shù)據(jù)新鮮度,這對于許多實(shí)時分析場景來說是不夠的。Fluss的流式湖倉一體化架構(gòu)通過聯(lián)合讀?。║nion Read)機(jī)制,為湖倉帶來了真正的實(shí)時性:

  • 秒級數(shù)據(jù)新鮮度:通過將Fluss的實(shí)時流數(shù)據(jù)與Paimon的歷史數(shù)據(jù)聯(lián)合起來,查詢可以訪問最新寫入的數(shù)據(jù),實(shí)現(xiàn)秒級數(shù)據(jù)新鮮度。
  • 實(shí)時分析能力增強(qiáng):企業(yè)可以在保持湖倉強(qiáng)大分析能力的同時,獲得實(shí)時數(shù)據(jù)洞察,使決策更加及時和準(zhǔn)確。
  • 無需數(shù)據(jù)復(fù)制:不需要將數(shù)據(jù)復(fù)制到單獨(dú)的實(shí)時系統(tǒng)中,減少了存儲成本和數(shù)據(jù)一致性問題。

(2) 為數(shù)據(jù)流增加強(qiáng)大的分析能力

傳統(tǒng)的流處理系統(tǒng)通常缺乏強(qiáng)大的分析能力,而Fluss通過與湖倉的集成,為數(shù)據(jù)流增加了這些能力:

  • 復(fù)雜分析查詢支持:流數(shù)據(jù)可以與歷史數(shù)據(jù)結(jié)合,支持更復(fù)雜的分析查詢,如聚合、窗口分析等。
  • 開發(fā)簡化:減少了開發(fā)流應(yīng)用程序的復(fù)雜性,簡化了調(diào)試過程。 
  • 生態(tài)系統(tǒng)集成:可以利用現(xiàn)有的湖倉生態(tài)系統(tǒng)工具和查詢引擎,如Spark、StarRocks、Trino等。

6. 流式湖倉一體化實(shí)施流程

下面是實(shí)施Fluss流式湖倉一體化的詳細(xì)步驟和流程圖:

(1) 準(zhǔn)備環(huán)境

確保已安裝Fluss和Flink,并且兩者都正常運(yùn)行。Fluss需要一個運(yùn)行中的Flink集群來執(zhí)行數(shù)據(jù)湖分層服務(wù)。

(2) 配置Lakehouse存儲

在Fluss的server.yaml配置文件中設(shè)置Lakehouse存儲:

# 指定數(shù)據(jù)湖格式為Paimon  
datalake.format: paimon  


# Paimon目錄配置  
datalake.paimon.metastore: filesystem  
datalake.paimon.warehouse: /path/to/paimon/warehouse

(3) 啟動數(shù)據(jù)湖分層服務(wù)

數(shù)據(jù)湖分層服務(wù)是一個Flink作業(yè),負(fù)責(zé)將數(shù)據(jù)從Fluss壓縮到Paimon:

# 切換到Fluss安裝目錄  
cd $FLUSS_HOME  


# 啟動分層服務(wù),指定Flink REST端點(diǎn)  
./bin/lakehouse.sh -D flink.rest.address=localhost -D flink.rest.port=8081

您還可以設(shè)置其他Flink配置參數(shù),例如:

# 設(shè)置檢查點(diǎn)間隔為10秒  
./bin/lakehouse.sh -D flink.rest.address=localhost -D flink.rest.port=8081 -D flink.execution.checkpointing.interval=10s  


# 僅同步特定數(shù)據(jù)庫中的表  
./bin/lakehouse.sh -D flink.rest.address=localhost -D flink.rest.port=8081 -D database=fluss_\\w+

(4) 創(chuàng)建啟用Lakehouse的表

使用Flink SQL創(chuàng)建啟用了Lakehouse的表,通過設(shè)置'table.datalake.enabled' = 'true'選項(xiàng):

-- 創(chuàng)建一個啟用了Lakehouse的日志表  
CREATE TABLE log_orders (  
    order_id BIGINT,  
    customer_id BIGINT,  
    order_time TIMESTAMP,  
    amount DECIMAL(10, 2),  
    status STRING  
) WITH (  
    'bucket.num' = '4',  
    'table.datalake.enabled' = 'true'  
);  


-- 創(chuàng)建一個啟用了Lakehouse的主鍵表  
CREATE TABLE pk_orders (  
    order_id BIGINT,  
    customer_id BIGINT,  
    order_time TIMESTAMP,  
    amount DECIMAL(10, 2),  
    status STRING,  
    PRIMARY KEY (order_id) NOT ENFORCED  
) WITH (  
    'bucket.num' = '4',  
    'table.datalake.enabled' = 'true'  
);

當(dāng)創(chuàng)建啟用了Lakehouse的表時,F(xiàn)luss會自動創(chuàng)建一個對應(yīng)的Paimon表,并在表末尾添加兩個額外的列__offset和__timestamp,用于支持流式消費(fèi)。

(5) 數(shù)據(jù)寫入

使用標(biāo)準(zhǔn)的Flink SQL向表中寫入數(shù)據(jù):

-- 插入數(shù)據(jù)到日志表  
INSERT INTO log_orders  
VALUES (1001, 101, TIMESTAMP '2025-05-18 10:30:00', 199.99, 'PENDING');  


-- 插入數(shù)據(jù)到主鍵表  
INSERT INTO pk_orders  
VALUES (1001, 101, TIMESTAMP '2025-05-18 10:30:00', 199.99, 'PENDING');  


-- 或者從其他表插入數(shù)據(jù)  
INSERT INTO pk_orders  
SELECT order_id, customer_id, order_time, amount, status  
FROM source_table;

數(shù)據(jù)寫入后,會首先存儲在Fluss的實(shí)時層中,然后由數(shù)據(jù)湖分層服務(wù)定期壓縮到Paimon中。對于主鍵表,還會生成Paimon格式的變更日志。

(6) 數(shù)據(jù)查詢

① 聯(lián)合讀取(實(shí)時+歷史數(shù)據(jù))

要查詢完整的數(shù)據(jù)(Fluss中的實(shí)時數(shù)據(jù)和Paimon中的歷史數(shù)據(jù)),直接查詢表名,無需任何后綴:

-- 聯(lián)合讀取實(shí)時和歷史數(shù)據(jù)  
SELECT COUNT(*) FROM pk_orders;  


-- 流式查詢,會持續(xù)獲取最新數(shù)據(jù)  
SELECT * FROM pk_orders WHERE amount > 100;

聯(lián)合讀取提供秒級數(shù)據(jù)新鮮度,但可能會比僅查詢Paimon數(shù)據(jù)慢一些。由于數(shù)據(jù)持續(xù)寫入,多次運(yùn)行相同的查詢可能會得到不同的結(jié)果。

② 僅讀取Lakehouse數(shù)據(jù)

要僅查詢Paimon中的歷史數(shù)據(jù),在表名后添加$lake后綴:

-- 僅查詢Paimon中的歷史數(shù)據(jù)  
SELECT COUNT(*) FROM pk_orders$lake;  


-- 查詢Paimon系統(tǒng)表  
SELECT * FROM pk_orders$lake$snapshots;  


-- 使用Paimon的時間旅行功能  
SELECT * FROM pk_orders$lake FOR TIMESTAMP AS OF '2025-05-17 00:00:00';

僅讀取Paimon數(shù)據(jù)提供更好的分析性能,但數(shù)據(jù)新鮮度較差(分鐘級)。您可以使用Paimon的所有功能,如查詢系統(tǒng)表、時間旅行等。

7. 流式湖倉一體化架構(gòu)圖

三、實(shí)際應(yīng)用場景示例

場景1:實(shí)時儀表盤

一個電子商務(wù)平臺需要一個實(shí)時儀表盤,顯示最新的銷售數(shù)據(jù)和趨勢。

實(shí)現(xiàn)方式:

  • 創(chuàng)建啟用Lakehouse的訂單表
  • 使用Flink SQL進(jìn)行聯(lián)合讀取,獲取最新的銷售數(shù)據(jù)
  • 將結(jié)果推送到儀表盤應(yīng)用
-- 創(chuàng)建啟用Lakehouse的訂單表  
CREATE TABLE sales_orders (  
    order_id BIGINT,  
    product_id BIGINT,  
    customer_id BIGINT,  
    order_time TIMESTAMP,  
    amount DECIMAL(10, 2),  
    PRIMARY KEY (order_id) NOT ENFORCED  
) WITH (  
    'bucket.num' = '8',  
    'table.datalake.enabled' = 'true'  
);  


-- 實(shí)時計算最近1小時的銷售額  
SELECT   
    TUMBLE_START(order_time, INTERVAL '5' MINUTE) AS window_start,  
    SUM(amount) AS total_sales  
FROM sales_orders  
WHERE order_time > CURRENT_TIMESTAMP - INTERVAL '1' HOUR  
GROUP BY TUMBLE(order_time, INTERVAL '5' MINUTE);

場景2:歷史數(shù)據(jù)分析與實(shí)時數(shù)據(jù)結(jié)合

一個金融機(jī)構(gòu)需要分析客戶的歷史交易模式,并將其與實(shí)時交易數(shù)據(jù)結(jié)合,以檢測潛在的欺詐行為。

實(shí)現(xiàn)方式:

  • 創(chuàng)建啟用Lakehouse的交易表
  • 使用Paimon查詢歷史交易模式
  • 使用聯(lián)合讀取將歷史模式與實(shí)時交易結(jié)合分析
-- 創(chuàng)建啟用Lakehouse的交易表  
CREATE TABLE transactions (  
    transaction_id BIGINT,  
    account_id BIGINT,  
    transaction_time TIMESTAMP,  
    amount DECIMAL(15, 2),  
    merchant_id BIGINT,  
    location STRING,  
    PRIMARY KEY (transaction_id) NOT ENFORCED  
) WITH (  
    'bucket.num' = '16',  
    'table.datalake.enabled' = 'true'  
);  


-- 查詢歷史交易模式(僅Paimon數(shù)據(jù))  
SELECT   
    account_id,  
    AVG(amount) AS avg_amount,  
    STDDEV(amount) AS stddev_amount  
FROM transactions$lake  
WHERE transaction_time BETWEEN TIMESTAMP '2025-04-01 00:00:00' AND TIMESTAMP '2025-05-01 00:00:00'  
GROUP BY account_id;
-- 將歷史模式與實(shí)時交易結(jié)合分析(聯(lián)合讀?。? 
WITH historical_patterns AS (  
    SELECT   
        account_id,  
        AVG(amount) AS avg_amount,  
        STDDEV(amount) AS stddev_amount  
    FROM transactions$lake  
    WHERE transaction_time BETWEEN TIMESTAMP '2025-04-01 00:00:00' AND TIMESTAMP '2025-05-01 00:00:00'  
    GROUP BY account_id  
)  
SELECT   
    t.transaction_id,  
    t.account_id,  
    t.transaction_time,  
    t.amount,  
    t.merchant_id,  
    t.location,  
    h.avg_amount,  
    h.stddev_amount,  
    CASE   
        WHEN ABS(t.amount - h.avg_amount) > 3 * h.stddev_amount THEN 'SUSPICIOUS'  
        ELSE 'NORMAL'  
    END AS risk_level  
FROM transactions t  
JOIN historical_patterns h ON t.account_id = h.account_id  
WHERE t.transaction_time > CURRENT_TIMESTAMP - INTERVAL '1' HOUR;

場景3:實(shí)時數(shù)據(jù)湖分層架構(gòu)

一個大型零售企業(yè)需要構(gòu)建一個數(shù)據(jù)湖分層架構(gòu)(Bronze、Silver、Gold),同時保持各層的數(shù)據(jù)新鮮度。

實(shí)現(xiàn)方式:

  • 創(chuàng)建各層的啟用Lakehouse的表
  • 使用Flink SQL將數(shù)據(jù)從一層轉(zhuǎn)換到下一層
  • 利用聯(lián)合讀取確保各層都具有秒級數(shù)據(jù)新鮮度

-- Bronze層:創(chuàng)建原始銷售數(shù)據(jù)表  
CREATE TABLE sales_raw (  
    event_time TIMESTAMP,  
    store_id BIGINT,  
    product_id BIGINT,  
    quantity INT,  
    amount DECIMAL(10, 2),  
    payment_method STRING,  
    source STRING  
) WITH (  
    'bucket.num' = '16',  
    'table.datalake.enabled' = 'true'  
);  


-- Silver層:創(chuàng)建清洗后的銷售數(shù)據(jù)表  
CREATE TABLE sales_cleaned (  
    event_time TIMESTAMP,  
    store_id BIGINT,  
    product_id BIGINT,  
    quantity INT,  
    amount DECIMAL(10, 2),  
    payment_method STRING,  
    source STRING,  
    PRIMARY KEY (store_id, product_id, event_time) NOT ENFORCED  
) WITH (  
    'bucket.num' = '16',  
    'table.datalake.enabled' = 'true'  
);  


-- Gold層:創(chuàng)建聚合的銷售指標(biāo)表  
CREATE TABLE sales_aggregated (  
    window_start TIMESTAMP,  
    window_end TIMESTAMP,  
    store_id BIGINT,  
    product_id BIGINT,  
    total_quantity BIGINT,  
    total_amount DECIMAL(15, 2),  
    PRIMARY KEY (window_start, window_end, store_id, product_id) NOT ENFORCED  
) WITH (  
    'bucket.num' = '8',  
    'table.datalake.enabled' = 'true'  
);  


-- Silver層ETL:清洗和轉(zhuǎn)換數(shù)據(jù)  
INSERT INTO sales_cleaned  
SELECT   
    event_time,  
    store_id,  
    product_id,  
    CASE WHEN quantity <= 0 THEN 1 ELSE quantity END AS quantity,  
    amount,  
    payment_method,  
    source  
FROM sales_raw  
WHERE amount > 0;  


-- Gold層ETL:聚合數(shù)據(jù)  
INSERT INTO sales_aggregated  
SELECT   
    TUMBLE_START(event_time, INTERVAL '1' HOUR) AS window_start,  
    TUMBLE_END(event_time, INTERVAL '1' HOUR) AS window_end,  
    store_id,  
    product_id,  
    SUM(quantity) AS total_quantity,  
    SUM(amount) AS total_amount  
FROM sales_cleaned  
GROUP BY TUMBLE(event_time, INTERVAL '1' HOUR), store_id, product_id;

四、流式湖倉一體化的技術(shù)優(yōu)勢

1. 數(shù)據(jù)分布一致性

Fluss和Paimon之間的數(shù)據(jù)分布是嚴(yán)格對齊的。Fluss支持分區(qū)表和桶,其分桶算法與Paimon完全一致。這確保了給定的數(shù)據(jù)始終分配到兩個系統(tǒng)中的相同桶,創(chuàng)建了Fluss桶和Paimon桶之間的一對一對應(yīng)關(guān)系。

這種數(shù)據(jù)分布的一致性提供了兩個顯著優(yōu)勢:

(1) 消除分層過程中的Shuffle開銷

當(dāng)將數(shù)據(jù)從Fluss分層到Paimon格式時:

  • Fluss桶(例如bucket1)可以直接分層到相應(yīng)的Paimon桶(bucket1)
  • 無需讀取Fluss桶中的數(shù)據(jù),計算每條數(shù)據(jù)屬于哪個Paimon桶,然后將其寫入適當(dāng)?shù)腜aimon桶

通過繞過這個中間重新分配步驟,架構(gòu)避免了昂貴的shuffle開銷,顯著提高了壓縮效率。

(2) 防止數(shù)據(jù)不一致

通過在Fluss和Paimon中使用相同的分桶算法,確保了數(shù)據(jù)一致性。該算法計算每條數(shù)據(jù)的桶分配如下:

bucket_id = hash(row) % bucket_num

2. 更高效的數(shù)據(jù)追蹤

在Fluss架構(gòu)中,歷史數(shù)據(jù)存儲在Lakehouse存儲中,而實(shí)時數(shù)據(jù)保留在Fluss中。在流式讀取期間,這種架構(gòu)實(shí)現(xiàn)了歷史和實(shí)時數(shù)據(jù)訪問的無縫結(jié)合:

  • 歷史數(shù)據(jù)訪問:Fluss直接從Lakehouse存儲檢索歷史數(shù)據(jù),利用其固有優(yōu)勢,如:
  • 高效的過濾下推:使查詢引擎能夠在存儲層應(yīng)用過濾條件,減少讀取的數(shù)據(jù)量并提高性能
  • 列裁剪:允許僅檢索必要的列,優(yōu)化數(shù)據(jù)傳輸和查詢效率
  • 高壓縮比:在保持快速檢索速度的同時最小化存儲開銷
  • 實(shí)時數(shù)據(jù)訪問:Fluss同時從自己的存儲中讀取最新的實(shí)時數(shù)據(jù),確保毫秒級的新鮮度

3. 一致的數(shù)據(jù)新鮮度

在構(gòu)建數(shù)據(jù)倉庫時,通常按層組織和管理數(shù)據(jù),如Bronze、Silver和Gold層。當(dāng)數(shù)據(jù)流經(jīng)這些層時,維護(hù)數(shù)據(jù)新鮮度變得至關(guān)重要。

當(dāng)Paimon作為每一層的唯一存儲解決方案時,數(shù)據(jù)可見性取決于Flink檢查點(diǎn)間隔,這會引入累積延遲:

  • 給定層的變更日志僅在Flink檢查點(diǎn)完成后才可見
  • 當(dāng)這個變更日志傳播到后續(xù)層時,數(shù)據(jù)新鮮度延遲隨著每個檢查點(diǎn)間隔而增加

例如,使用1分鐘的Flink檢查點(diǎn)間隔:

  • Bronze層經(jīng)歷1分鐘延遲
  • Silver層增加另一個1分鐘延遲,總計2分鐘
  • Gold層再增加1分鐘延遲,累積為3分鐘延遲

而使用Fluss和Paimon,我們獲得:

  • 即時數(shù)據(jù)可見性:Fluss中的數(shù)據(jù)在攝入后立即可見,無需等待Flink檢查點(diǎn)完成。變更日志立即傳輸?shù)较乱粚?/li>
  • 一致的數(shù)據(jù)新鮮度:所有層的數(shù)據(jù)新鮮度是一致的,以秒為單位,消除了累積延遲

4. 完整實(shí)施示例:電子商務(wù)實(shí)時分析平臺

下面是一個完整的電子商務(wù)實(shí)時分析平臺實(shí)施示例,展示了如何使用Fluss的流式湖倉一體化架構(gòu)構(gòu)建端到端解決方案。

步驟1:環(huán)境準(zhǔn)備

首先,確保已安裝并配置好Fluss和Flink環(huán)境:

# 下載并解壓Fluss  
wget https://github.com/alibaba/fluss/releases/download/v0.6.0/fluss-0.6.0.tgz  
tar -xzf fluss-0.6.0.tgz  
cd fluss-0.6.0  


# 啟動本地Fluss集群  
./bin/start-local.sh  


# 啟動本地Flink集群(用于數(shù)據(jù)湖分層服務(wù))  
cd /path/to/flink  
./bin/start-cluster.sh
步驟2:配置Lakehouse存儲

在Fluss的conf/server.yaml中配置Paimon作為Lakehouse存儲:

datalake.format: paimon  
datalake.paimon.metastore: filesystem  
datalake.paimon.warehouse: /path/to/paimon/warehouse

步驟3:啟動數(shù)據(jù)湖分層服務(wù)

啟動數(shù)據(jù)湖分層服務(wù),將數(shù)據(jù)從Fluss壓縮到Paimon:

cd /path/to/fluss  
./bin/lakehouse.sh -D flink.rest.address=localhost -D flink.rest.port=8081 -D flink.execution.checkpointing.interval=10s

步驟4:創(chuàng)建數(shù)據(jù)模型

使用Flink SQL創(chuàng)建電子商務(wù)分析所需的表結(jié)構(gòu):

-- 創(chuàng)建訂單表(主鍵表)  
CREATE TABLE orders (  
    order_id BIGINT,  
    customer_id BIGINT,  
    order_time TIMESTAMP,  
    total_amount DECIMAL(10, 2),  
    status STRING,  
    PRIMARY KEY (order_id) NOT ENFORCED  
) WITH (  
    'bucket.num' = '8',  
    'table.datalake.enabled' = 'true'  
);  


-- 創(chuàng)建訂單明細(xì)表(日志表)  
CREATE TABLE order_items (  
    item_id BIGINT,  
    order_id BIGINT,  
    product_id BIGINT,  
    quantity INT,  
    unit_price DECIMAL(10, 2),  
    discount DECIMAL(5, 2)  
) WITH (  
    'bucket.num' = '8',  
    'table.datalake.enabled' = 'true'  
);  


-- 創(chuàng)建產(chǎn)品表(主鍵表)  
CREATE TABLE products (  
    product_id BIGINT,  
    name STRING,  
    category STRING,  
    brand STRING,  
    price DECIMAL(10, 2),  
    PRIMARY KEY (product_id) NOT ENFORCED  
) WITH (  
    'bucket.num' = '4',  
    'table.datalake.enabled' = 'true'  
);  


-- 創(chuàng)建客戶表(主鍵表)  
CREATE TABLE customers (  
    customer_id BIGINT,  
    name STRING,  
    email STRING,  
    registration_time TIMESTAMP,  
    vip_level INT,  
    PRIMARY KEY (customer_id) NOT ENFORCED  
) WITH (  
    'bucket.num' = '4',  
    'table.datalake.enabled' = 'true'  
);  


-- 創(chuàng)建實(shí)時銷售指標(biāo)表(主鍵表)  
CREATE TABLE sales_metrics (  
    window_start TIMESTAMP,  
    window_end TIMESTAMP,  
    category STRING,  
    total_orders BIGINT,  
    total_sales DECIMAL(15, 2),  
    PRIMARY KEY (window_start, window_end, category) NOT ENFORCED  
) WITH (  
    'bucket.num' = '4',  
    'table.datalake.enabled' = 'true'  
);

步驟5:數(shù)據(jù)處理流程

使用Flink SQL實(shí)現(xiàn)數(shù)據(jù)處理流程:

-- 1. 計算實(shí)時銷售指標(biāo)(每5分鐘更新一次)  
INSERT INTO sales_metrics  
SELECT   
    TUMBLE_START(o.order_time, INTERVAL '5' MINUTE) AS window_start,  
    TUMBLE_END(o.order_time, INTERVAL '5' MINUTE) AS window_end,  
    p.category,  
    COUNT(DISTINCT o.order_id) AS total_orders,  
    SUM(i.quantity * i.unit_price * (1 - i.discount/100)) AS total_sales  
FROM orders o  
JOIN order_items i ON o.order_id = i.order_id  
JOIN products p ON i.product_id = p.product_id  
WHERE o.status = 'COMPLETED'  
GROUP BY   
    TUMBLE(o.order_time, INTERVAL '5' MINUTE),  
    p.category;  


-- 2. 創(chuàng)建VIP客戶實(shí)時推薦視圖  
CREATE VIEW vip_recommendations AS  
SELECT   
    c.customer_id,  
    c.name,  
    p.category,  
    p.brand,  
    COUNT(*) AS purchase_count  
FROM customers c  
JOIN orders o ON c.customer_id = o.customer_id  
JOIN order_items i ON o.order_id = i.order_id  
JOIN products p ON i.product_id = p.product_id  
WHERE   
    c.vip_level >= 3 AND  
    o.order_time > CURRENT_TIMESTAMP - INTERVAL '30' DAY  
GROUP BY   
    c.customer_id, c.name, p.category, p.brand  
HAVING COUNT(*) >= 2;

步驟6:實(shí)時儀表盤查詢

使用聯(lián)合讀取查詢實(shí)時銷售指標(biāo):

-- 實(shí)時銷售趨勢(聯(lián)合讀?。? 
SELECT   
    window_start,  
    category,  
    total_sales  
FROM sales_metrics  
WHERE   
    window_start >= CURRENT_TIMESTAMP - INTERVAL '24' HOUR  
ORDER BY   
    window_start DESC, total_sales DESC;

步驟7:歷史分析查詢

使用Paimon查詢歷史數(shù)據(jù)分析:

-- 歷史銷售分析(僅Paimon數(shù)據(jù))  
SELECT   
    p.category,  
    MONTH(o.order_time) AS month,  
    SUM(i.quantity * i.unit_price * (1 - i.discount/100)) AS total_sales  
FROM orders$lake o  
JOIN order_items$lake i ON o.order_id = i.order_id  
JOIN products$lake p ON i.product_id = p.product_id  
WHERE   
    o.order_time >= TIMESTAMP '2025-01-01 00:00:00' AND  
    o.order_time < TIMESTAMP '2025-05-01 00:00:00'  
GROUP BY   
    p.category, MONTH(o.order_time)  
ORDER BY   
    p.category, month;

這種查詢只訪問Paimon中的歷史數(shù)據(jù),提供更好的查詢性能,適合大規(guī)模歷史數(shù)據(jù)分析。

5. 流式湖倉一體化的關(guān)鍵技術(shù)點(diǎn)

(1) 統(tǒng)一元數(shù)據(jù)管理

在傳統(tǒng)架構(gòu)中,流存儲系統(tǒng)(如Kafka)和湖倉存儲解決方案(如Paimon)作為獨(dú)立實(shí)體運(yùn)行,各自維護(hù)自己的元數(shù)據(jù)。這給計算引擎(如Flink)帶來了兩個主要挑戰(zhàn):

雙重目錄:用戶需要創(chuàng)建和管理兩個獨(dú)立的目錄——一個用于流存儲,另一個用于湖存儲

手動切換:訪問數(shù)據(jù)需要手動在目錄之間切換,以確定是查詢流存儲還是湖存儲,導(dǎo)致操作復(fù)雜性和效率低下

而在Fluss中,雖然Fluss和Paimon仍然維護(hù)獨(dú)立的元數(shù)據(jù),但它們向計算引擎(如Flink)公開統(tǒng)一的目錄和單一的表抽象。這種統(tǒng)一方法提供了幾個關(guān)鍵優(yōu)勢:

  • 簡化數(shù)據(jù)訪問:用戶可以通過單一目錄無縫訪問湖倉存儲(Paimon)和流存儲(Fluss),無需管理或在獨(dú)立目錄之間切換
  • 集成查詢:統(tǒng)一表抽象允許直接訪問Fluss中的實(shí)時數(shù)據(jù)和Paimon中的歷史數(shù)據(jù)
  • 操作效率:通過提供一致的接口,該架構(gòu)降低了操作復(fù)雜性,使用戶更容易在單一工作流中處理實(shí)時和歷史數(shù)據(jù)

(2) 數(shù)據(jù)分布對齊

Fluss和Paimon之間的數(shù)據(jù)分布是嚴(yán)格對齊的,這是通過使用相同的分桶算法實(shí)現(xiàn)的:

bucket_id = hash(row) % bucket_num

這種對齊確保了:

  • 分層效率:Fluss桶可以直接分層到對應(yīng)的Paimon桶,無需重新分配數(shù)據(jù)
  • 數(shù)據(jù)一致性:相同的數(shù)據(jù)在兩個系統(tǒng)中分配到相同的桶,防止數(shù)據(jù)不一致

(3) 檢查點(diǎn)間隔與數(shù)據(jù)新鮮度

在傳統(tǒng)的Paimon架構(gòu)中,數(shù)據(jù)新鮮度受Flink檢查點(diǎn)間隔的限制。例如,使用1分鐘的檢查點(diǎn)間隔時:

而在Fluss流式湖倉架構(gòu)中,數(shù)據(jù)在寫入后立即可見:

6. 流式湖倉一體化的最佳實(shí)踐

(1) 表設(shè)計最佳實(shí)踐

  • 合理設(shè)置桶數(shù)量:根據(jù)數(shù)據(jù)量和查詢模式設(shè)置適當(dāng)?shù)耐皵?shù)量,通常為2的冪次方(4、8、16等)
  • 選擇合適的主鍵:對于主鍵表,選擇具有良好分布特性的主鍵,避免熱點(diǎn)
  • 考慮分區(qū)策略:對于大型表,使用分區(qū)來提高查詢性能和管理數(shù)據(jù)生命周期

(2) 數(shù)據(jù)湖分層服務(wù)配置

  • 檢查點(diǎn)間隔:根據(jù)實(shí)時性需求和系統(tǒng)負(fù)載調(diào)整檢查點(diǎn)間隔,通常在10-60秒之間
  • 資源分配:為數(shù)據(jù)湖分層服務(wù)分配足夠的資源,確保能夠及時處理數(shù)據(jù)
  • 監(jiān)控:定期監(jiān)控分層服務(wù)的性能和延遲,確保數(shù)據(jù)及時壓縮到Paimon

( 3)查詢優(yōu)化

①選擇合適的查詢模式

  • 對于需要最新數(shù)據(jù)的查詢,使用聯(lián)合讀取
  • 對于大規(guī)模歷史數(shù)據(jù)分析,使用僅Paimon查詢

②過濾下推:在查詢中盡可能早地應(yīng)用過濾條件,利用Paimon的過濾下推能力

③列裁剪:只選擇需要的列,減少數(shù)據(jù)傳輸和處理量

責(zé)任編輯:趙寧寧 來源: 大數(shù)據(jù)技能圈
相關(guān)推薦

2022-09-29 09:22:33

數(shù)據(jù)倉

2022-12-13 17:42:47

Arctic存儲湖倉

2023-08-30 07:14:27

MaxCompute湖倉一體

2024-09-03 14:59:00

2021-06-07 11:22:38

大數(shù)據(jù)數(shù)據(jù)倉庫湖倉一體

2023-06-28 07:28:36

湖倉騰訊架構(gòu)

2023-12-14 13:01:00

Hudivivo

2020-12-02 17:20:58

數(shù)據(jù)倉庫阿里云數(shù)據(jù)湖

2023-05-16 07:24:25

數(shù)據(jù)湖快手

2023-05-26 06:45:08

2023-06-19 07:13:51

云原生湖倉一體

2021-06-07 10:45:16

大數(shù)據(jù)數(shù)據(jù)倉庫數(shù)據(jù)湖

2021-08-31 10:07:16

Flink Hud數(shù)據(jù)湖阿里云

2024-03-05 08:21:23

湖倉一體數(shù)據(jù)湖數(shù)據(jù)倉庫

2021-07-07 10:13:56

大數(shù)據(jù)Delta Lake 湖倉一體

2021-06-11 14:01:51

數(shù)據(jù)倉庫湖倉一體 Flink

2025-01-21 17:02:14

谷歌多模態(tài)AI
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號