偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

Flink SQL 知其所以然:TopN、Order By、Limit 操作

數(shù)據(jù)庫 其他數(shù)據(jù)庫
實時任務(wù)中,Order By 子句中必須要有時間屬性字段,并且時間屬性必須為升序時間屬性,即 WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '0.001' SECOND? 或者 WATERMARK FOR rowtime_column AS rowtime_column。

DML:Order By、Limit 子句

大家好,我是老羊,今天我們來學習 Flink SQL 中的 TopN、Order By、Limit 3個操作。

1.Order By 子句

支持 Batch\Streaming,但在實時任務(wù)中一般用的非常少。

實時任務(wù)中,Order By 子句中必須要有時間屬性字段,并且時間屬性必須為升序時間屬性,即 WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '0.001' SECOND 或者 WATERMARK FOR rowtime_column AS rowtime_column。

舉例:

CREATE TABLE source_table_1 (
user_id BIGINT NOT NULL,
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
WATERMARK FOR row_time AS row_time
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10',
'fields.user_id.min' = '1',
'fields.user_id.max' = '10'
);

CREATE TABLE sink_table (
user_id BIGINT
) WITH (
'connector' = 'print'
);

INSERT INTO sink_table
SELECT user_id
FROM source_table_1
Order By row_time, user_id desc

2.Limit 子句

支持 Batch\Streaming,但實時場景一般不使用,但是此處依然舉一個例子:

CREATE TABLE source_table_1 (
user_id BIGINT NOT NULL,
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
WATERMARK FOR row_time AS row_time
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10',
'fields.user_id.min' = '1',
'fields.user_id.max' = '10'
);

CREATE TABLE sink_table (
user_id BIGINT
) WITH (
'connector' = 'print'
);

INSERT INTO sink_table
SELECT user_id
FROM source_table_1
Limit 3

結(jié)果如下,只有 3 條輸出:

+I[5]
+I[9]
+I[4]

DML:TopN 子句

  • TopN 定義(支持 Batch\Streaming):TopN 其實就是對應(yīng)到離線數(shù)倉中的 row_number(),可以使用 row_number() 對某一個分組的數(shù)據(jù)進行排序
  • 應(yīng)用場景:根據(jù) 某個排序 條件,計算某個分組下的排行榜數(shù)據(jù)
  • SQL 語法標準:
SELECT [column_list]
FROM (
SELECT [column_list],
ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
FROM table_name)
WHERE rownum <= N [AND conditions]

ROW_NUMBER():標識 TopN 排序子句

PARTITION BY col1[, col2...]:標識分區(qū)字段,代表按照這個 col 字段作為分區(qū)粒度對數(shù)據(jù)進行排序取 topN,比如下述案例中的partition by key,就是根據(jù)需求中的搜索關(guān)鍵詞(key)做為分區(qū)

ORDER BY col1 [asc|desc][, col2 [asc|desc]...]:標識 TopN 的排序規(guī)則,是按照哪些字段、順序或逆序進行排序

WHERE rownum <= N:這個子句是一定需要的,只有加上了這個子句,F(xiàn)link 才能將其識別為一個 TopN 的查詢,其中 N 代表 TopN 的條目數(shù)

[AND conditions]:其他的限制條件也可以加上

  • 實際案例:取某個搜索關(guān)鍵詞下的搜索熱度前 10 名的詞條數(shù)據(jù)。

輸入數(shù)據(jù)為搜索詞條數(shù)據(jù)的搜索熱度數(shù)據(jù),當搜索熱度發(fā)生變化時,會將變化后的數(shù)據(jù)寫入到數(shù)據(jù)源的 Kafka 中:

數(shù)據(jù)源 schema:

-- 字段名         備注
-- key 搜索關(guān)鍵詞
-- name 搜索熱度名稱
-- search_cnt 熱搜消費熱度(比如 3000)
-- timestamp 消費詞條時間戳

CREATE TABLE source_table (
name BIGINT NOT NULL,
search_cnt BIGINT NOT NULL,
key BIGINT NOT NULL,
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
WATERMARK FOR row_time AS row_time
) WITH (
...
);

-- 數(shù)據(jù)匯 schema:

-- key 搜索關(guān)鍵詞
-- name 搜索熱度名稱
-- search_cnt 熱搜消費熱度(比如 3000)
-- timestamp 消費詞條時間戳

CREATE TABLE sink_table (
key BIGINT,
name BIGINT,
search_cnt BIGINT,
`timestamp` TIMESTAMP(3)
) WITH (
...
);

-- DML 邏輯
INSERT INTO sink_table
SELECT key, name, search_cnt, row_time as `timestamp`
FROM (
SELECT key, name, search_cnt, row_time,
-- 根據(jù)熱搜關(guān)鍵詞 key 作為 partition key,然后按照 search_cnt 倒排取前 100 名
ROW_NUMBER() OVER (PARTITION BY key
ORDER BY search_cnt desc) AS rownum
FROM source_table)
WHERE rownum <= 100

輸出結(jié)果:

-D[關(guān)鍵詞1, 詞條1, 4944]
+I[關(guān)鍵詞1, 詞條1, 8670]
+I[關(guān)鍵詞1, 詞條2, 1735]
-D[關(guān)鍵詞1, 詞條3, 6641]
+I[關(guān)鍵詞1, 詞條3, 6928]
-D[關(guān)鍵詞1, 詞條4, 6312]
+I[關(guān)鍵詞1, 詞條4, 7287]

可以看到輸出數(shù)據(jù)是有回撤數(shù)據(jù)的,為什么會出現(xiàn)回撤,我們來看看 SQL 語義。

  • SQL 語義

上面的 SQL 會翻譯成以下三個算子:

數(shù)據(jù)源:數(shù)據(jù)源即最新的詞條下面的搜索詞的搜索熱度數(shù)據(jù),消費到 Kafka 中數(shù)據(jù)后,按照 partition key 將數(shù)據(jù)進行 hash 分發(fā)到下游排序算子,相同的 key 數(shù)據(jù)將會發(fā)送到一個并發(fā)中

排序算子:為每個 Key 維護了一個 TopN 的榜單數(shù)據(jù),接受到上游的一條數(shù)據(jù)后,如果 TopN 榜單還沒有到達 N 條,則將這條數(shù)據(jù)加入 TopN 榜單后,直接下發(fā)數(shù)據(jù),如果到達 N 條之后,經(jīng)過 TopN 計算,發(fā)現(xiàn)這條數(shù)據(jù)比原有的數(shù)據(jù)排序靠前,那么新的 TopN 排名就會有變化,就變化了的這部分數(shù)據(jù)之前下發(fā)的排名數(shù)據(jù)撤回(即回撤數(shù)據(jù)),然后下發(fā)新的排名數(shù)據(jù)

數(shù)據(jù)匯:接收到上游的數(shù)據(jù)之后,然后輸出到外部存儲引擎中

上面三個算子也是會 24 小時一直運行的。

責任編輯:武曉燕 來源: 大數(shù)據(jù)羊說
相關(guān)推薦

2022-07-05 09:03:05

Flink SQLTopN

2022-06-10 09:01:04

OverFlinkSQL

2022-06-06 09:27:23

FlinkSQLGroup

2022-05-22 10:02:32

CREATESQL 查詢SQL DDL

2022-05-18 09:02:28

Flink SQLSQL字符串

2022-05-15 09:57:59

Flink SQL時間語義

2021-12-09 06:59:24

FlinkSQL 開發(fā)

2022-05-27 09:02:58

SQLHive語義

2022-06-18 09:26:00

Flink SQLJoin 操作

2022-05-12 09:02:47

Flink SQL數(shù)據(jù)類型

2021-11-28 11:36:08

SQL Flink Join

2022-08-10 10:05:29

FlinkSQL

2021-11-27 09:03:26

flink join數(shù)倉

2021-09-12 07:01:07

Flink SQL ETL datastream

2021-12-17 07:54:16

Flink SQLTable DataStream

2022-07-12 09:02:18

Flink SQL去重

2021-12-06 07:15:47

開發(fā)Flink SQL

2022-05-09 09:03:04

SQL數(shù)據(jù)流數(shù)據(jù)

2021-11-24 08:17:21

Flink SQLCumulate WiSQL

2018-08-27 06:30:49

InnoDBMySQLMyISAM
點贊
收藏

51CTO技術(shù)棧公眾號