Flink SQL 知其所以然:Deduplication去重以及如何獲取最新狀態(tài)操作
作者:antigeneral了呀
今天我們來學(xué)習(xí) Flink SQL 中的 Deduplication 去重以及如何通過 Deduplication 操作獲取最新的狀態(tài)。

大家好,我是老羊,今天我們來學(xué)習(xí) Flink SQL 中的 Deduplication 去重以及如何通過 Deduplication 操作獲取最新的狀態(tài)。
- Deduplication 定義(支持 Batch\Streaming):Deduplication 其實就是去重,也即上文介紹到的 TopN 中 row_number = 1 的場景,但是這里有一點不一樣在于其排序字段一定是時間屬性列,不能是其他非時間屬性的普通列。在 row_number = 1 時,如果排序字段是普通列 planner 會翻譯成 TopN 算子,如果是時間屬性列 planner 會翻譯成 Deduplication,這兩者最終的執(zhí)行算子是不一樣的,Deduplication 相比 TopN 算子專門做了對應(yīng)的優(yōu)化,性能會有很大提升。
- 應(yīng)用場景:比如上游數(shù)據(jù)發(fā)重了,或者計算 DAU 明細數(shù)據(jù)等場景,都可以使用 Deduplication 語法去做去重。
- SQL 語法標準:
SELECT [column_list]
FROM (
SELECT [column_list],
ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
ORDER BY time_attr [asc|desc]) AS rownum
FROM table_name)
WHERE rownum = 1
其中:
- ROW_NUMBER():標識當(dāng)前數(shù)據(jù)的排序值。
- PARTITION BY col1[, col2...]:標識分區(qū)字段,代表按照這個 col 字段作為分區(qū)粒度對數(shù)據(jù)進行排序。
- ORDER BY time_attr [asc|desc]:標識排序規(guī)則,必須為時間戳列,當(dāng)前 Flink SQL 支持處理時間、事件時間,ASC 代表保留第一行,DESC 代表保留最后一行。
- WHERE rownum = 1:這個子句是一定需要的,而且必須為 rownum = 1。
- 實際案例:
博主這里舉兩個案例:
- 案例 1(事件時間):是騰訊 QQ 用戶等級的場景,每一個 QQ 用戶都有一個 QQ 用戶等級,需要求出當(dāng)前用戶等級在星星,月亮,太陽 的用戶數(shù)分別有多少。
-- 數(shù)據(jù)源:當(dāng)每一個用戶的等級初始化及后續(xù)變化的時候的數(shù)據(jù),即用戶等級變化明細數(shù)據(jù)。
CREATE TABLE source_table (
user_id BIGINT COMMENT '用戶 id',
level STRING COMMENT '用戶等級',
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)) COMMENT '事件時間戳',
WATERMARK FOR row_time AS row_time
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.level.length' = '1',
'fields.user_id.min' = '1',
'fields.user_id.max' = '1000000'
);
-- 數(shù)據(jù)匯:輸出即每一個等級的用戶數(shù)
CREATE TABLE sink_table (
level STRING COMMENT '等級',
uv BIGINT COMMENT '當(dāng)前等級用戶數(shù)',
row_time timestamp(3) COMMENT '時間戳'
) WITH (
'connector' = 'print'
);
-- 處理邏輯:
INSERT INTO sink_table
select
level
, count(1) as uv
, max(row_time) as row_time
from (
SELECT
user_id,
level,
row_time,
row_number() over(partition by user_id order by row_time) as rn
FROM source_table
)
where rn = 1
group by
level
輸出結(jié)果:
+I[等級 1, 6928, 2021-1-28T22:34]
-I[等級 1, 6928, 2021-1-28T22:34]
+I[等級 1, 8670, 2021-1-28T22:34]
-I[等級 1, 8670, 2021-1-28T22:34]
+I[等級 1, 77287, 2021-1-28T22:34]
...
可以看到其有回撤數(shù)據(jù)。
其對應(yīng)的 SQL 語義如下:
- 數(shù)據(jù)源:消費到 Kafka 中數(shù)據(jù)后,將數(shù)據(jù)按照 partition by 的 key 通過 hash 分發(fā)策略發(fā)送到下游去重算子。
- Deduplication 去重算子:接受到上游數(shù)據(jù)之后,根據(jù) order by 中的條件判斷當(dāng)前的這條數(shù)據(jù)和之前數(shù)據(jù)時間戳大小,以上面案例來說,如果當(dāng)前數(shù)據(jù)時間戳大于之前數(shù)據(jù)時間戳,則撤回之前向下游發(fā)的中間結(jié)果,然后將最新的結(jié)果發(fā)向下游(發(fā)送策略也為 hash,具體的 hash 策略為按照 group by 中 key 進行發(fā)送),如果當(dāng)前數(shù)據(jù)時間戳小于之前數(shù)據(jù)時間戳,則不做操作。次算子產(chǎn)出的結(jié)果就是每一個用戶的對應(yīng)的最新等級信息。
- Group by 聚合算子:接受到上游數(shù)據(jù)之后,根據(jù) Group by 聚合粒度對數(shù)據(jù)進行聚合計算結(jié)果(每一個等級的用戶數(shù)),發(fā)往下游數(shù)據(jù)匯算子。
- 數(shù)據(jù)匯:接收到上游的數(shù)據(jù)之后,然后輸出到外部存儲引擎中。
- 案例 2(處理時間):最原始的日志是明細數(shù)據(jù),需要我們根據(jù)用戶 id 篩選出這個用戶當(dāng)天的第一條數(shù)據(jù),發(fā)往下游,下游可以據(jù)此計算分各種維度的 DAU。
-- 數(shù)據(jù)源:原始日志明細數(shù)據(jù)
CREATE TABLE source_table (
user_id BIGINT COMMENT '用戶 id',
name STRING COMMENT '用戶姓名',
server_timestamp BIGINT COMMENT '用戶訪問時間戳',
proctime AS PROCTIME()
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.name.length' = '1',
'fields.user_id.min' = '1',
'fields.user_id.max' = '10',
'fields.server_timestamp.min' = '1',
'fields.server_timestamp.max' = '100000'
);
-- 數(shù)據(jù)匯:根據(jù) user_id 去重的第一條數(shù)據(jù)
CREATE TABLE sink_table (
user_id BIGINT,
name STRING,
server_timestamp BIGINT
) WITH (
'connector' = 'print'
);
-- 處理邏輯:
INSERT INTO sink_table
select user_id,
name,
server_timestamp
from (
SELECT
user_id,
name,
server_timestamp,
row_number() over(partition by user_id order by proctime) as rn
FROM source_table
)
where rn = 1
輸出結(jié)果:
+I[1, 用戶 1, 2021-1-28T22:34]
+I[2, 用戶 2, 2021-1-28T22:34]
+I[3, 用戶 3, 2021-1-28T22:34]
...
可以看到這個處理邏輯是沒有回撤數(shù)據(jù)的。其對應(yīng)的 SQL 語義如下:
- 數(shù)據(jù)源:消費到 Kafka 中數(shù)據(jù)后,將數(shù)據(jù)按照 partition by 的 key 通過 hash 分發(fā)策略發(fā)送到下游去重算子。
- Deduplication 去重算子:處理時間語義下,如果是當(dāng)前 key 的第一條數(shù)據(jù),則直接發(fā)往下游,如果判斷(根據(jù) state 中是否存儲過改 key)不是第一條,則直接丟棄。
- 數(shù)據(jù)匯:接收到上游的數(shù)據(jù)之后,然后輸出到外部存儲引擎中。
注意:
在 Deduplication 關(guān)于是否會出現(xiàn)回撤流,博主總結(jié)如下:
- ? Order by 事件時間 DESC:會出現(xiàn)回撤流,因為當(dāng)前 key 下可能會有 比當(dāng)前事件時間還大的數(shù)據(jù)。
- ? Order by 事件時間 ASC:會出現(xiàn)回撤流,因為當(dāng)前 key 下可能會有 比當(dāng)前事件時間還小的數(shù)據(jù)。
- ? Order by 處理時間 DESC:會出現(xiàn)回撤流,因為當(dāng)前 key 下可能會有 比當(dāng)前處理時間還大的數(shù)據(jù)。
- ? Order by 處理時間 ASC:不會出現(xiàn)回撤流,因為當(dāng)前 key 下不可能會有 比當(dāng)前處理時間還小的數(shù)據(jù)。
責(zé)任編輯:姜華
來源:
大數(shù)據(jù)羊說































