偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

Flink SQL 知其所以然:基礎(chǔ) DML SQL 執(zhí)行語(yǔ)義!

數(shù)據(jù)庫(kù) 其他數(shù)據(jù)庫(kù)
如果這個(gè) SQL 放在 Hive 中執(zhí)行時(shí),假設(shè)其中 Orders 為 Hive 表,target_table 也為 Hive 表,其也會(huì)生成三個(gè)類(lèi)似的算子(雖然實(shí)際可能會(huì)被優(yōu)化為一個(gè)算子,這里為了方便對(duì)比,劃分為三個(gè)進(jìn)行介紹),離線和實(shí)時(shí)任務(wù)的執(zhí)行方式完全不同。

1.DML:With 子句?

  • 應(yīng)用場(chǎng)景(支持 Batch\Streaming):With 語(yǔ)句和離線 Hive SQL With 語(yǔ)句一樣的,xdm,語(yǔ)法糖 +1,使用它可以讓你的代碼邏輯更加清晰。
  • 直接上案例:
-- 語(yǔ)法糖+1
WITH orders_with_total AS (
SELECT
order_id
, price + tax AS total
FROM Orders
)
SELECT
order_id
, SUM(total)
FROM orders_with_total
GROUP BY
order_id;

2.DML:SELECT & WHERE 子句?

INSERT INTO target_table
SELECT * FROM Orders

INSERT INTO target_table
SELECT order_id, price + tax FROM Orders

INSERT INTO target_table
-- 自定義 Source 的數(shù)據(jù)
SELECT order_id, price FROM (VALUES (1, 2.0), (2, 3.1)) AS t (order_id, price)

INSERT INTO target_table
SELECT price + tax FROM Orders WHERE id = 10

-- 使用 UDF 做字段標(biāo)準(zhǔn)化處理
INSERT INTO target_table
SELECT PRETTY_PRINT(order_id) FROM Orders
-- 過(guò)濾條件
Where id > 3
  • SQL 語(yǔ)義:

其實(shí)理解一個(gè) SQL 最后生成的任務(wù)是怎樣執(zhí)行的,最好的方式就是理解其語(yǔ)義。

以下面的 SQL 為例,我們來(lái)介紹下其在離線中和在實(shí)時(shí)中執(zhí)行的區(qū)別,對(duì)比學(xué)習(xí)一下,大家就比較清楚了

INSERT INTO target_table
SELECT PRETTY_PRINT(order_id) FROM Orders
Where id > 3

這個(gè) SQL 對(duì)應(yīng)的實(shí)時(shí)任務(wù),假設(shè) Orders 為 kafka,target_table 也為 Kafka,在執(zhí)行時(shí),會(huì)生成三個(gè)算子:

  • 數(shù)據(jù)源算子(From Order):連接到 Kafka topic,數(shù)據(jù)源算子一直運(yùn)行,實(shí)時(shí)的從 Order Kafka 中一條一條的讀取數(shù)據(jù),然后一條一條發(fā)送給下游的 過(guò)濾和字段標(biāo)準(zhǔn)化算子。
  • 過(guò)濾和字段標(biāo)準(zhǔn)化算子(Where id > 3 和 PRETTY_PRINT(order_id)):接收到上游算子發(fā)的一條一條的數(shù)據(jù),然后判斷 id > 3?將判斷結(jié)果為 true 的數(shù)據(jù)執(zhí)行 PRETTY_PRINT UDF 后,一條一條將計(jì)算結(jié)果數(shù)據(jù)發(fā)給下游 數(shù)據(jù)匯算子
  • 數(shù)據(jù)匯算子(INSERT INTO target_table):接收到上游發(fā)的一條一條的數(shù)據(jù),寫(xiě)入到 target_table Kafka 中。

可以看到這個(gè)實(shí)時(shí)任務(wù)的所有算子是以一種 pipeline 模式運(yùn)行的,所有的算子在同一時(shí)刻都是處于 running 狀態(tài)的,24 小時(shí)一直在運(yùn)行,實(shí)時(shí)任務(wù)中也沒(méi)有離線中常見(jiàn)的分區(qū)概念。

select & where

關(guān)于看如何看一段 Flink SQL 最終的執(zhí)行計(jì)劃:

最好的方法就如上圖,看 Flink web ui 的算子圖,算子圖上詳細(xì)的標(biāo)記清楚了每一個(gè)算子做的事情。以上圖來(lái)說(shuō),我們可以看到主要有三個(gè)算子:

  • Source 算子:Source: TableSourceScan(table=[[default_catalog, default_database, Orders]], fields=[order_id, name]) -> Calc(select=[order_id, name, CAST(CURRENT_TIMESTAMP()) AS row_time]) -> WatermarkAssigner(rowtime=[row_time], watermark=[(row_time - 5000:INTERVAL SECOND)]) ,其中 Source 表名稱(chēng)為 table=[[default_catalog, default_database, Orders],字段為 select=[order_id, name, CAST(CURRENT_TIMESTAMP()) AS row_time],Watermark 策略為 rowtime=[row_time], watermark=[(row_time - 5000:INTERVAL SECOND)]。
  • 過(guò)濾算子:Calc(select=[order_id, name, row_time], where=[(order_id > 3)]) -> NotNullEnforcer(fields=[order_id]),其中過(guò)濾條件為 where=[(order_id > 3)],結(jié)果字段為 select=[order_id, name, row_time]
  • Sink 算子:Sink: Sink(table=[default_catalog.default_database.target_table], fields=[order_id, name, row_time]),其中最終產(chǎn)出的表名稱(chēng)為 table=[default_catalog.default_database.target_table],表字段為 fields=[order_id, name, row_time]

可以看到 Flink SQL 具體執(zhí)行了哪些操作是非常詳細(xì)的標(biāo)記在算子圖上。所以小伙伴萌一定要學(xué)會(huì)看算子圖,這是掌握 debug、調(diào)優(yōu)前最基礎(chǔ)的一個(gè)技巧。

那么如果這個(gè) SQL 放在 Hive 中執(zhí)行時(shí),假設(shè)其中 Orders 為 Hive 表,target_table 也為 Hive 表,其也會(huì)生成三個(gè)類(lèi)似的算子(雖然實(shí)際可能會(huì)被優(yōu)化為一個(gè)算子,這里為了方便對(duì)比,劃分為三個(gè)進(jìn)行介紹),離線和實(shí)時(shí)任務(wù)的執(zhí)行方式完全不同:

  • 數(shù)據(jù)源算子(From Order):數(shù)據(jù)源從 Order Hive 表(通常都是讀一天、一小時(shí)的分區(qū)數(shù)據(jù))中一次性讀取所有的數(shù)據(jù),然后將讀到的數(shù)據(jù)全部發(fā)給下游 過(guò)濾字段標(biāo)準(zhǔn)化算子,然后 數(shù)據(jù)源算子就運(yùn)行結(jié)束了,釋放資源了
  • 過(guò)濾和字段標(biāo)準(zhǔn)化算子(Where id > 3 和 PRETTY_PRINT(order_id)):接收到上游算子的所有數(shù)據(jù),然后遍歷所有數(shù)據(jù)判斷 id > 3?將判斷結(jié)果為 true 的數(shù)據(jù)執(zhí)行 PRETTY_PRINT UDF 后,將所有數(shù)據(jù)發(fā)給下游 數(shù)據(jù)匯算子,然后 過(guò)濾和字段標(biāo)準(zhǔn)化算子 就運(yùn)行結(jié)束了,釋放資源了
  • 數(shù)據(jù)匯算子(INSERT INTO target_table):接收到上游的所有數(shù)據(jù),將所有數(shù)據(jù)都寫(xiě)到 target_table Hive 表中,然后整個(gè)任務(wù)就運(yùn)行結(jié)束了,整個(gè)任務(wù)的資源也就都釋放了

可以看到離線任務(wù)的算子是分階段(stage)進(jìn)行運(yùn)行的,每一個(gè) stage 運(yùn)行結(jié)束之后,然后下一個(gè) stage 開(kāi)始運(yùn)行,全部的 stage 運(yùn)行完成之后,這個(gè)離線任務(wù)就跑結(jié)束了。

注意:

很多小伙伴都是之前做過(guò)離線數(shù)倉(cāng)的,熟悉了離線的分區(qū)、計(jì)算任務(wù)定時(shí)調(diào)度運(yùn)行這兩個(gè)概念,所以在最初接觸 Flink SQL 時(shí),會(huì)以為 Flink SQL 實(shí)時(shí)任務(wù)也會(huì)存在這兩個(gè)概念,這里博主做一下解釋。

  • 分區(qū)概念:離線由于能力限制問(wèn)題,通常都是進(jìn)行一批一批的數(shù)據(jù)計(jì)算,每一批數(shù)據(jù)的數(shù)據(jù)量都是有限的集合,這一批一批的數(shù)據(jù)自然的劃分方式就是時(shí)間,比如按小時(shí)、天進(jìn)行劃分分區(qū)。但是 在實(shí)時(shí)任務(wù)中,是沒(méi)有分區(qū)的概念的,實(shí)時(shí)任務(wù)的上游、下游都是無(wú)限的數(shù)據(jù)流。
  • 計(jì)算任務(wù)定時(shí)調(diào)度概念:同上,離線就是由于計(jì)算能力限制,數(shù)據(jù)要一批一批算,一批一批輸入、產(chǎn)出,所以要按照小時(shí)、天定時(shí)的調(diào)度和計(jì)算。但是在實(shí)時(shí)任務(wù)中,是沒(méi)有定時(shí)調(diào)度的概念的,實(shí)時(shí)任務(wù)一旦運(yùn)行起來(lái)就是 24 小時(shí)不間斷,不間斷的處理上游無(wú)限的數(shù)據(jù),不簡(jiǎn)單的產(chǎn)出數(shù)據(jù)給到下游。

3.DML:SELECT DISTINCT 子句

  • 應(yīng)用場(chǎng)景(支持 Batch\Streaming):語(yǔ)句和離線 Hive SQL SELECT DISTINCT 語(yǔ)句一樣的,xdm,用作根據(jù) key 進(jìn)行數(shù)據(jù)去重。
  • 直接上案例:
INSERT into target_table
SELECT
DISTINCT id
FROM Orders
  • SQL 語(yǔ)義:

也是拿離線和實(shí)時(shí)做對(duì)比。

這個(gè) SQL 對(duì)應(yīng)的實(shí)時(shí)任務(wù),假設(shè) Orders 為 kafka,target_table 也為 Kafka,在執(zhí)行時(shí),會(huì)生成三個(gè)算子:

  • 數(shù)據(jù)源算子(From Order):連接到 Kafka topic,數(shù)據(jù)源算子一直運(yùn)行,實(shí)時(shí)的從 Order Kafka 中一條一條的讀取數(shù)據(jù),然后一條一條發(fā)送給下游的 去重算子
  • 去重算子(DISTINCT id):接收到上游算子發(fā)的一條一條的數(shù)據(jù),然后判斷這個(gè) id 之前是否已經(jīng)來(lái)過(guò)了,判斷方式就是使用 Flink 中的 state 狀態(tài),如果狀態(tài)中已經(jīng)有這個(gè) id 了,則說(shuō)明已經(jīng)來(lái)過(guò)了,不往下游算子發(fā),如果狀態(tài)中沒(méi)有這個(gè) id,則說(shuō)明沒(méi)來(lái)過(guò),則往下游算子發(fā),也是一條一條發(fā)給下游 。 數(shù)據(jù)匯算子數(shù)據(jù)匯算子(INSERT INTO target_table):接收到上游發(fā)的一條一條的數(shù)據(jù),寫(xiě)入到target_table Kafka 中

select distinct

注意:

對(duì)于實(shí)時(shí)任務(wù),計(jì)算時(shí)的狀態(tài)可能會(huì)無(wú)限增長(zhǎng)。

狀態(tài)大小取決于不同 key(上述案例為 id 字段)的數(shù)量。為了防止?fàn)顟B(tài)無(wú)限變大,我們可以設(shè)置狀態(tài)的 TTL。但是這可能會(huì)影響查詢(xún)結(jié)果的正確性,比如某個(gè) key 的數(shù)據(jù)過(guò)期從狀態(tài)中刪除了,那么下次再來(lái)這么一個(gè) key,由于在狀態(tài)中找不到,就又會(huì)輸出一遍。

那么如果這個(gè) SQL 放在 Hive 中執(zhí)行時(shí),假設(shè)其中 Orders 為 Hive 表,target_table 也為 Hive 表,其也會(huì)生成三個(gè)相同的算子(雖然可能會(huì)被優(yōu)化為一個(gè)算子,這里為了方便對(duì)比,劃分為三個(gè)進(jìn)行介紹),但是其和實(shí)時(shí)任務(wù)的執(zhí)行方式完全不同:

  • 數(shù)據(jù)源算子(From Order):數(shù)據(jù)源從 Order Hive 表(通常都有天、小時(shí)分區(qū)限制)中一次性讀取所有的數(shù)據(jù),然后將讀到的數(shù)據(jù)全部發(fā)給下游去重算子,然后 數(shù)據(jù)源算子 就運(yùn)行結(jié)束了,釋放資源了。
  • 去重算子(DISTINCT id):接收到上游算子的所有數(shù)據(jù),然后遍歷所有數(shù)據(jù)進(jìn)行去重,將去重完的所有結(jié)果數(shù)據(jù)發(fā)給下游 數(shù)據(jù)匯算子,然后 去重算子就運(yùn)行結(jié)束了,釋放資源了。
  • 數(shù)據(jù)匯算子(INSERT INTO target_table):接收到上游的所有數(shù)據(jù),將所有數(shù)據(jù)都寫(xiě)到 target_table Hive 中,然后整個(gè)任務(wù)就運(yùn)行結(jié)束了,整個(gè)任務(wù)的資源也就都釋放了。
責(zé)任編輯:武曉燕 來(lái)源: 大數(shù)據(jù)羊說(shuō)
相關(guān)推薦

2022-05-15 09:57:59

Flink SQL時(shí)間語(yǔ)義

2022-05-22 10:02:32

CREATESQL 查詢(xún)SQL DDL

2022-05-18 09:02:28

Flink SQLSQL字符串

2022-07-05 09:03:05

Flink SQLTopN

2022-06-10 09:01:04

OverFlinkSQL

2022-06-06 09:27:23

FlinkSQLGroup

2021-12-09 06:59:24

FlinkSQL 開(kāi)發(fā)

2022-05-12 09:02:47

Flink SQL數(shù)據(jù)類(lèi)型

2022-06-29 09:01:38

FlinkSQL時(shí)間屬性

2021-11-28 11:36:08

SQL Flink Join

2022-08-10 10:05:29

FlinkSQL

2021-11-27 09:03:26

flink join數(shù)倉(cāng)

2021-09-12 07:01:07

Flink SQL ETL datastream

2021-12-17 07:54:16

Flink SQLTable DataStream

2022-06-18 09:26:00

Flink SQLJoin 操作

2022-05-29 22:34:23

滾動(dòng)窗口Flink SQL

2021-12-06 07:15:47

開(kāi)發(fā)Flink SQL

2022-05-09 09:03:04

SQL數(shù)據(jù)流數(shù)據(jù)

2021-11-24 08:17:21

Flink SQLCumulate WiSQL

2021-12-13 07:57:47

Flink SQL Flink Hive Udf
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)