Flink SQL 知其所以然:基礎(chǔ) DML SQL 執(zhí)行語(yǔ)義!
1.DML:With 子句?
- 應(yīng)用場(chǎng)景(支持 Batch\Streaming):With 語(yǔ)句和離線 Hive SQL With 語(yǔ)句一樣的,xdm,語(yǔ)法糖 +1,使用它可以讓你的代碼邏輯更加清晰。
- 直接上案例:
-- 語(yǔ)法糖+1
WITH orders_with_total AS (
SELECT
order_id
, price + tax AS total
FROM Orders
)
SELECT
order_id
, SUM(total)
FROM orders_with_total
GROUP BY
order_id;
2.DML:SELECT & WHERE 子句?
INSERT INTO target_table
SELECT * FROM Orders
INSERT INTO target_table
SELECT order_id, price + tax FROM Orders
INSERT INTO target_table
-- 自定義 Source 的數(shù)據(jù)
SELECT order_id, price FROM (VALUES (1, 2.0), (2, 3.1)) AS t (order_id, price)
INSERT INTO target_table
SELECT price + tax FROM Orders WHERE id = 10
-- 使用 UDF 做字段標(biāo)準(zhǔn)化處理
INSERT INTO target_table
SELECT PRETTY_PRINT(order_id) FROM Orders
-- 過(guò)濾條件
Where id > 3
- SQL 語(yǔ)義:
其實(shí)理解一個(gè) SQL 最后生成的任務(wù)是怎樣執(zhí)行的,最好的方式就是理解其語(yǔ)義。
以下面的 SQL 為例,我們來(lái)介紹下其在離線中和在實(shí)時(shí)中執(zhí)行的區(qū)別,對(duì)比學(xué)習(xí)一下,大家就比較清楚了。
INSERT INTO target_table
SELECT PRETTY_PRINT(order_id) FROM Orders
Where id > 3
這個(gè) SQL 對(duì)應(yīng)的實(shí)時(shí)任務(wù),假設(shè) Orders 為 kafka,target_table 也為 Kafka,在執(zhí)行時(shí),會(huì)生成三個(gè)算子:
- 數(shù)據(jù)源算子(From Order):連接到 Kafka topic,數(shù)據(jù)源算子一直運(yùn)行,實(shí)時(shí)的從 Order Kafka 中一條一條的讀取數(shù)據(jù),然后一條一條發(fā)送給下游的 過(guò)濾和字段標(biāo)準(zhǔn)化算子。
- 過(guò)濾和字段標(biāo)準(zhǔn)化算子(Where id > 3 和 PRETTY_PRINT(order_id)):接收到上游算子發(fā)的一條一條的數(shù)據(jù),然后判斷 id > 3?將判斷結(jié)果為 true 的數(shù)據(jù)執(zhí)行 PRETTY_PRINT UDF 后,一條一條將計(jì)算結(jié)果數(shù)據(jù)發(fā)給下游 數(shù)據(jù)匯算子。
- 數(shù)據(jù)匯算子(INSERT INTO target_table):接收到上游發(fā)的一條一條的數(shù)據(jù),寫(xiě)入到 target_table Kafka 中。
可以看到這個(gè)實(shí)時(shí)任務(wù)的所有算子是以一種 pipeline 模式運(yùn)行的,所有的算子在同一時(shí)刻都是處于 running 狀態(tài)的,24 小時(shí)一直在運(yùn)行,實(shí)時(shí)任務(wù)中也沒(méi)有離線中常見(jiàn)的分區(qū)概念。
select & where
關(guān)于看如何看一段 Flink SQL 最終的執(zhí)行計(jì)劃:
最好的方法就如上圖,看 Flink web ui 的算子圖,算子圖上詳細(xì)的標(biāo)記清楚了每一個(gè)算子做的事情。以上圖來(lái)說(shuō),我們可以看到主要有三個(gè)算子:
- Source 算子:Source: TableSourceScan(table=[[default_catalog, default_database, Orders]], fields=[order_id, name]) -> Calc(select=[order_id, name, CAST(CURRENT_TIMESTAMP()) AS row_time]) -> WatermarkAssigner(rowtime=[row_time], watermark=[(row_time - 5000:INTERVAL SECOND)]) ,其中 Source 表名稱(chēng)為 table=[[default_catalog, default_database, Orders],字段為 select=[order_id, name, CAST(CURRENT_TIMESTAMP()) AS row_time],Watermark 策略為 rowtime=[row_time], watermark=[(row_time - 5000:INTERVAL SECOND)]。
- 過(guò)濾算子:Calc(select=[order_id, name, row_time], where=[(order_id > 3)]) -> NotNullEnforcer(fields=[order_id]),其中過(guò)濾條件為 where=[(order_id > 3)],結(jié)果字段為 select=[order_id, name, row_time]
- Sink 算子:Sink: Sink(table=[default_catalog.default_database.target_table], fields=[order_id, name, row_time]),其中最終產(chǎn)出的表名稱(chēng)為 table=[default_catalog.default_database.target_table],表字段為 fields=[order_id, name, row_time]。
可以看到 Flink SQL 具體執(zhí)行了哪些操作是非常詳細(xì)的標(biāo)記在算子圖上。所以小伙伴萌一定要學(xué)會(huì)看算子圖,這是掌握 debug、調(diào)優(yōu)前最基礎(chǔ)的一個(gè)技巧。
那么如果這個(gè) SQL 放在 Hive 中執(zhí)行時(shí),假設(shè)其中 Orders 為 Hive 表,target_table 也為 Hive 表,其也會(huì)生成三個(gè)類(lèi)似的算子(雖然實(shí)際可能會(huì)被優(yōu)化為一個(gè)算子,這里為了方便對(duì)比,劃分為三個(gè)進(jìn)行介紹),離線和實(shí)時(shí)任務(wù)的執(zhí)行方式完全不同:
- 數(shù)據(jù)源算子(From Order):數(shù)據(jù)源從 Order Hive 表(通常都是讀一天、一小時(shí)的分區(qū)數(shù)據(jù))中一次性讀取所有的數(shù)據(jù),然后將讀到的數(shù)據(jù)全部發(fā)給下游 過(guò)濾字段標(biāo)準(zhǔn)化算子,然后 數(shù)據(jù)源算子就運(yùn)行結(jié)束了,釋放資源了。
- 過(guò)濾和字段標(biāo)準(zhǔn)化算子(Where id > 3 和 PRETTY_PRINT(order_id)):接收到上游算子的所有數(shù)據(jù),然后遍歷所有數(shù)據(jù)判斷 id > 3?將判斷結(jié)果為 true 的數(shù)據(jù)執(zhí)行 PRETTY_PRINT UDF 后,將所有數(shù)據(jù)發(fā)給下游 數(shù)據(jù)匯算子,然后 過(guò)濾和字段標(biāo)準(zhǔn)化算子 就運(yùn)行結(jié)束了,釋放資源了。
- 數(shù)據(jù)匯算子(INSERT INTO target_table):接收到上游的所有數(shù)據(jù),將所有數(shù)據(jù)都寫(xiě)到 target_table Hive 表中,然后整個(gè)任務(wù)就運(yùn)行結(jié)束了,整個(gè)任務(wù)的資源也就都釋放了。
可以看到離線任務(wù)的算子是分階段(stage)進(jìn)行運(yùn)行的,每一個(gè) stage 運(yùn)行結(jié)束之后,然后下一個(gè) stage 開(kāi)始運(yùn)行,全部的 stage 運(yùn)行完成之后,這個(gè)離線任務(wù)就跑結(jié)束了。
注意:
很多小伙伴都是之前做過(guò)離線數(shù)倉(cāng)的,熟悉了離線的分區(qū)、計(jì)算任務(wù)定時(shí)調(diào)度運(yùn)行這兩個(gè)概念,所以在最初接觸 Flink SQL 時(shí),會(huì)以為 Flink SQL 實(shí)時(shí)任務(wù)也會(huì)存在這兩個(gè)概念,這里博主做一下解釋。
- 分區(qū)概念:離線由于能力限制問(wèn)題,通常都是進(jìn)行一批一批的數(shù)據(jù)計(jì)算,每一批數(shù)據(jù)的數(shù)據(jù)量都是有限的集合,這一批一批的數(shù)據(jù)自然的劃分方式就是時(shí)間,比如按小時(shí)、天進(jìn)行劃分分區(qū)。但是 在實(shí)時(shí)任務(wù)中,是沒(méi)有分區(qū)的概念的,實(shí)時(shí)任務(wù)的上游、下游都是無(wú)限的數(shù)據(jù)流。
- 計(jì)算任務(wù)定時(shí)調(diào)度概念:同上,離線就是由于計(jì)算能力限制,數(shù)據(jù)要一批一批算,一批一批輸入、產(chǎn)出,所以要按照小時(shí)、天定時(shí)的調(diào)度和計(jì)算。但是在實(shí)時(shí)任務(wù)中,是沒(méi)有定時(shí)調(diào)度的概念的,實(shí)時(shí)任務(wù)一旦運(yùn)行起來(lái)就是 24 小時(shí)不間斷,不間斷的處理上游無(wú)限的數(shù)據(jù),不簡(jiǎn)單的產(chǎn)出數(shù)據(jù)給到下游。
3.DML:SELECT DISTINCT 子句
- 應(yīng)用場(chǎng)景(支持 Batch\Streaming):語(yǔ)句和離線 Hive SQL SELECT DISTINCT 語(yǔ)句一樣的,xdm,用作根據(jù) key 進(jìn)行數(shù)據(jù)去重。
- 直接上案例:
INSERT into target_table
SELECT
DISTINCT id
FROM Orders
- SQL 語(yǔ)義:
也是拿離線和實(shí)時(shí)做對(duì)比。
這個(gè) SQL 對(duì)應(yīng)的實(shí)時(shí)任務(wù),假設(shè) Orders 為 kafka,target_table 也為 Kafka,在執(zhí)行時(shí),會(huì)生成三個(gè)算子:
- 數(shù)據(jù)源算子(From Order):連接到 Kafka topic,數(shù)據(jù)源算子一直運(yùn)行,實(shí)時(shí)的從 Order Kafka 中一條一條的讀取數(shù)據(jù),然后一條一條發(fā)送給下游的 去重算子。
- 去重算子(DISTINCT id):接收到上游算子發(fā)的一條一條的數(shù)據(jù),然后判斷這個(gè) id 之前是否已經(jīng)來(lái)過(guò)了,判斷方式就是使用 Flink 中的 state 狀態(tài),如果狀態(tài)中已經(jīng)有這個(gè) id 了,則說(shuō)明已經(jīng)來(lái)過(guò)了,不往下游算子發(fā),如果狀態(tài)中沒(méi)有這個(gè) id,則說(shuō)明沒(méi)來(lái)過(guò),則往下游算子發(fā),也是一條一條發(fā)給下游 。 數(shù)據(jù)匯算子數(shù)據(jù)匯算子(INSERT INTO target_table):接收到上游發(fā)的一條一條的數(shù)據(jù),寫(xiě)入到target_table Kafka 中。
select distinct
注意:
對(duì)于實(shí)時(shí)任務(wù),計(jì)算時(shí)的狀態(tài)可能會(huì)無(wú)限增長(zhǎng)。
狀態(tài)大小取決于不同 key(上述案例為 id 字段)的數(shù)量。為了防止?fàn)顟B(tài)無(wú)限變大,我們可以設(shè)置狀態(tài)的 TTL。但是這可能會(huì)影響查詢(xún)結(jié)果的正確性,比如某個(gè) key 的數(shù)據(jù)過(guò)期從狀態(tài)中刪除了,那么下次再來(lái)這么一個(gè) key,由于在狀態(tài)中找不到,就又會(huì)輸出一遍。
那么如果這個(gè) SQL 放在 Hive 中執(zhí)行時(shí),假設(shè)其中 Orders 為 Hive 表,target_table 也為 Hive 表,其也會(huì)生成三個(gè)相同的算子(雖然可能會(huì)被優(yōu)化為一個(gè)算子,這里為了方便對(duì)比,劃分為三個(gè)進(jìn)行介紹),但是其和實(shí)時(shí)任務(wù)的執(zhí)行方式完全不同:
- 數(shù)據(jù)源算子(From Order):數(shù)據(jù)源從 Order Hive 表(通常都有天、小時(shí)分區(qū)限制)中一次性讀取所有的數(shù)據(jù),然后將讀到的數(shù)據(jù)全部發(fā)給下游去重算子,然后 數(shù)據(jù)源算子 就運(yùn)行結(jié)束了,釋放資源了。
- 去重算子(DISTINCT id):接收到上游算子的所有數(shù)據(jù),然后遍歷所有數(shù)據(jù)進(jìn)行去重,將去重完的所有結(jié)果數(shù)據(jù)發(fā)給下游 數(shù)據(jù)匯算子,然后 去重算子就運(yùn)行結(jié)束了,釋放資源了。
- 數(shù)據(jù)匯算子(INSERT INTO target_table):接收到上游的所有數(shù)據(jù),將所有數(shù)據(jù)都寫(xiě)到 target_table Hive 中,然后整個(gè)任務(wù)就運(yùn)行結(jié)束了,整個(gè)任務(wù)的資源也就都釋放了。