偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

實(shí)時(shí)數(shù)倉(cāng)入門(mén)訓(xùn)練營(yíng):實(shí)時(shí)計(jì)算 Flink 版 SQL 實(shí)踐

數(shù)據(jù)庫(kù)
實(shí)時(shí)計(jì)算Flink版選擇了SQL這種聲明式語(yǔ)言作為頂層API,比較穩(wěn)定,也方便用戶(hù)使用。Flink SQL的應(yīng)用場(chǎng)景也比較廣泛,包括數(shù)據(jù)集成、實(shí)時(shí)報(bào)表、實(shí)時(shí)風(fēng)控,還有在線機(jī)器學(xué)習(xí)等場(chǎng)景。

 內(nèi)容簡(jiǎn)要:
一、實(shí)時(shí)計(jì)算Flink版SQL簡(jiǎn)介
二、實(shí)時(shí)計(jì)算Flink版SQL上手示例
三、開(kāi)發(fā)常見(jiàn)問(wèn)題和解法

實(shí)時(shí)計(jì)算Flink版SQL簡(jiǎn)介
(一)關(guān)于實(shí)時(shí)計(jì)算Flink版SQL

實(shí)時(shí)計(jì)算Flink版選擇了SQL這種聲明式語(yǔ)言作為頂層API,比較穩(wěn)定,也方便用戶(hù)使用。Flink SQL具備流批統(tǒng)一的特性,給用戶(hù)統(tǒng)一的開(kāi)發(fā)體驗(yàn),并且語(yǔ)義一致。另外,F(xiàn)link SQL能夠自動(dòng)優(yōu)化,包括屏蔽流計(jì)算里面State的復(fù)雜性,也提供了自動(dòng)優(yōu)化的Plan,并且還集成了AutoPilot自動(dòng)調(diào)優(yōu)的功能。Flink SQL的應(yīng)用場(chǎng)景也比較廣泛,包括數(shù)據(jù)集成、實(shí)時(shí)報(bào)表、實(shí)時(shí)風(fēng)控,還有在線機(jī)器學(xué)習(xí)等場(chǎng)景。

(二)基本操作

在基本操作上,可以看到SQL的語(yǔ)法和標(biāo)準(zhǔn)SQL非常類(lèi)似。示例中包括了基本的SELECT、FILTER操作。,可以使用內(nèi)置函數(shù),如日期的格式化,也可以使用自定義函數(shù),比如示例中的匯率轉(zhuǎn)換就是一個(gè)用戶(hù)自定義函數(shù),在平臺(tái)上注冊(cè)后就可以直接使用。

(三)維表 Lookup Join

在實(shí)際的數(shù)據(jù)處理過(guò)程中,維表的Lookup Join也是一個(gè)比較常見(jiàn)的例子。

在基本操作上,可以看到SQL的語(yǔ)法和標(biāo)準(zhǔn)SQL非常類(lèi)似。示例中包括了基本的SELECT、FILTER操作。,可以使用內(nèi)置函數(shù),如日期的格式化,也可以使用自定義函數(shù),比如示例中的匯率轉(zhuǎn)換就是一個(gè)用戶(hù)自定義函數(shù),在平臺(tái)上注冊(cè)后就可以直接使用。

(三)維表 Lookup Join

在實(shí)際的數(shù)據(jù)處理過(guò)程中,維表的Lookup Join也是一個(gè)比較常見(jiàn)的例子。

這里展示的是一個(gè)維表INNER JOIN示例。

例子中顯示的SOURCE表是一個(gè)實(shí)時(shí)變化的訂單信息表,它通過(guò)INNER JOIN去關(guān)聯(lián)維表信息,這里標(biāo)黃高亮的就是維表JOIN的語(yǔ)法,可以看到它和傳統(tǒng)的批處理有一個(gè)寫(xiě)法上的差異,多了FOR SYSTEM_TIME AS OF這個(gè)子句來(lái)標(biāo)明它是一個(gè)維表JOIN的操作。SOURCE表每來(lái)一條訂單消息,它都會(huì)觸發(fā)維表算子,去做一次對(duì)維表信息的查詢(xún),所以把它叫做一個(gè)Lookup Join。

(四)Window Aggregation

Window Aggregation(窗口聚合)操作也是常見(jiàn)的操作,F(xiàn)link SQL中內(nèi)置支持了幾種常用的Window類(lèi)型,比如Tumble Window,Session Window,Hop Window,還有新引入的Cumulate Window。

Tumble

Tumble Window可以理解成固定大小的時(shí)間窗口,也叫滾窗,比如說(shuō)5分鐘、10分鐘或者1個(gè)小時(shí)的固定間隔的窗口,窗口之間沒(méi)有重疊。

Session

Session Window(會(huì)話(huà)窗口) 定義了一個(gè)連續(xù)事件的范圍,窗口定義中的一個(gè)參數(shù)叫做Session Gap,表示兩條數(shù)據(jù)的間隔如果超過(guò)定義的時(shí)長(zhǎng),那么前一個(gè)Window就結(jié)束了,同時(shí)生成了一個(gè)新的窗口。

Hop

Hop Window不同于滾動(dòng)窗口的窗口不重疊,滑動(dòng)窗口的窗口之間可以重疊?;瑒?dòng)窗口有兩個(gè)參數(shù):size 和 slide。size 為窗口的大小,slide 為每次滑動(dòng)的步長(zhǎng)。如果slide < size,則窗口會(huì)重疊,同一條數(shù)據(jù)可能會(huì)被分配到多個(gè)窗口;如果 slide = size,則等同于 Tumble Window。如果 slide > size,窗口之間沒(méi)有重疊且有間隙。

Cumulate

Cumulate Window(累積窗口),是Flink社區(qū)1.13版本里新引入的,可以對(duì)比 Hop Window來(lái)理解,區(qū)別是從Window Start開(kāi)始不斷去累積。示例中Window 1、Window 2、Window 3是在不斷地增長(zhǎng)的。它有一個(gè)最大的窗口長(zhǎng)度,比如我們定義Window Size是一天,然后Step步長(zhǎng)是1個(gè)小時(shí),那么它會(huì)在一天中的每個(gè)小時(shí)產(chǎn)生累積到當(dāng)前小時(shí)的聚合結(jié)果。

看一個(gè)具體的Window聚合處理示例。

如上圖所示,比如說(shuō)需要進(jìn)行每5分鐘單個(gè)用戶(hù)的點(diǎn)擊數(shù)統(tǒng)計(jì)。

源數(shù)據(jù)是用戶(hù)的點(diǎn)擊日志,我們期望算出每5分鐘單個(gè)用戶(hù)的點(diǎn)擊總數(shù), SQL 中使用的是社區(qū)最新的 WindowTVF語(yǔ)法,先對(duì)源表開(kāi)窗,再 GROUP BY 窗口對(duì)應(yīng)的屬性 window_start和window_end, COUNT(*)就是點(diǎn)擊數(shù)統(tǒng)計(jì)。

可以看到,當(dāng)處理12:00到12:04的數(shù)據(jù),有2個(gè)用戶(hù)產(chǎn)生了4次點(diǎn)擊,分別能統(tǒng)計(jì)出來(lái)用戶(hù)Mary是3次,Bob是1次。在接下來(lái)一批數(shù)據(jù)里面,又來(lái)了3條數(shù)據(jù),對(duì)應(yīng)地更新到下一個(gè)窗口中,分別是1次和2次。

(五)Group Aggregation

相對(duì)于Window Aggregation來(lái)說(shuō),Group Aggregation直接觸發(fā)計(jì)算,并不需要等到窗口結(jié)束,適用的一個(gè)場(chǎng)景是計(jì)算累積值。

上圖的例子是單個(gè)用戶(hù)累積到當(dāng)前的點(diǎn)擊數(shù)統(tǒng)計(jì)。從Query上看,寫(xiě)法相對(duì)簡(jiǎn)單一點(diǎn),直接 GROUP BY user 去計(jì)算COUNT(*),就是累積計(jì)數(shù)。

可以看到,在結(jié)果上和Window的輸出是有差異的,在與Window相同的前4條輸入數(shù)據(jù),Group Aggregation輸出的結(jié)果是Mary的點(diǎn)擊數(shù)已更新到3次,具體的計(jì)算過(guò)程可能是從1變成2再變成3,Bob是1次,隨著后面3條數(shù)據(jù)的輸入,Bob對(duì)應(yīng)的點(diǎn)擊數(shù)又會(huì)更新成2次,對(duì)結(jié)果是持續(xù)更新的過(guò)程,這和Window的計(jì)算場(chǎng)景是有一些區(qū)別的。

之前Window窗口里面輸出的數(shù)據(jù),在窗口結(jié)束后結(jié)果就不會(huì)再改變,而在Group Aggregation里,同一個(gè)Group Key的結(jié)果是會(huì)產(chǎn)生持續(xù)更新的。

(六)Window Aggregation Vs Group Aggregation

更全面地對(duì)比一下Window和Group Aggregation的一些區(qū)別。

Window Aggregation在輸出模式上是按時(shí)輸出,是在定義的數(shù)據(jù)到期之后它才會(huì)輸出。比如定義5分鐘的窗口,結(jié)果是延遲輸出的,比如00:00~00:05這個(gè)時(shí)間段,它會(huì)等整個(gè)窗口數(shù)據(jù)都到齊之后,才完整輸出出來(lái),并且結(jié)果只輸出一次,不會(huì)再改變。

Group Aggregation是數(shù)據(jù)觸發(fā),比如第一條數(shù)據(jù)來(lái)它就會(huì)輸出結(jié)果,同一個(gè)Key 的第二條數(shù)據(jù)來(lái)結(jié)果會(huì)更新,所以在輸出流的性質(zhì)上兩者也是不一樣的。Window Aggregation一般情況下輸出的是Append Stream,而在Group Aggregation輸出的是Update Stream。

在狀態(tài)State處理上兩者的差異也比較大。Window Aggregation會(huì)自動(dòng)清理過(guò)期數(shù)據(jù),用戶(hù)就不需要額外再去關(guān)注 State的膨脹情況。Group Aggregation是基于無(wú)限的狀態(tài)去做累積,所以需要用戶(hù)根據(jù)自己的計(jì)算場(chǎng)景來(lái)定義State的TTL,就是State保存多久。

比如統(tǒng)計(jì)一天內(nèi)累計(jì)的PV和UV,不考慮數(shù)據(jù)延遲的情況,也至少要保證State的TTL要大于等于一天,這樣才能保證計(jì)算的精確性。如果State的TTL定義成半天,統(tǒng)計(jì)值就可能不準(zhǔn)確了。

對(duì)輸出的存儲(chǔ)要求也是由輸出流的性質(zhì)來(lái)決定的。在Window的輸出上,因?yàn)樗茿ppend流,所有的類(lèi)型都是可以對(duì)接輸出的。而Group Aggregatio輸出了更新流,所以要求目標(biāo)存儲(chǔ)支持更新,可以用Hologres、MySQL或者HBase這些支持更新的存儲(chǔ)。

實(shí)時(shí)計(jì)算 Flink 版SQL上手示例

下面通過(guò)具體的例子來(lái)看每一種SQL操作在真實(shí)的業(yè)務(wù)場(chǎng)景中會(huì)怎么使用,比如SQL基本的語(yǔ)法操作,包括一些常見(jiàn)的Aggregation的使用。

(一)示例場(chǎng)景說(shuō)明:電商交易數(shù)據(jù) - 實(shí)時(shí)數(shù)倉(cāng)場(chǎng)景

這里的例子是電商交易數(shù)據(jù)場(chǎng)景,模擬了實(shí)時(shí)數(shù)倉(cāng)里分層數(shù)據(jù)處理的情況。

在數(shù)據(jù)接入層,我們模擬了電商的交易訂單數(shù)據(jù),它包括了訂單ID,商品ID,用戶(hù)ID,交易金額,商品的葉子類(lèi)目,交易時(shí)間等基本信息,這是一個(gè)簡(jiǎn)化的表。

示例1會(huì)從接入層到數(shù)據(jù)明細(xì)層,完成一個(gè)數(shù)據(jù)清洗工作,此外還會(huì)做類(lèi)目信息的關(guān)聯(lián),然后數(shù)據(jù)的匯總層我們會(huì)演示怎么完成分鐘級(jí)的成交統(tǒng)計(jì)、小時(shí)級(jí)口徑怎么做實(shí)時(shí)成交統(tǒng)計(jì),最后會(huì)介紹下在天級(jí)累積的成交場(chǎng)景上,怎么去做準(zhǔn)實(shí)時(shí)統(tǒng)計(jì)。

- 示例環(huán)境:內(nèi)測(cè)版

演示環(huán)境是目前內(nèi)測(cè)版的實(shí)時(shí)計(jì)算Flink產(chǎn)品,在這個(gè)平臺(tái)可以直接做一站式的作業(yè)開(kāi)發(fā),包括調(diào)試,還有線上的運(yùn)維工作。

- 接入層數(shù)據(jù)

使用 SQL DataGen Connector 生成模擬電商交易數(shù)據(jù)。

接入層數(shù)據(jù):為了方便演示,簡(jiǎn)化了鏈路,用內(nèi)置的SQL DataGen Connector來(lái)模擬電商數(shù)據(jù)的產(chǎn)生。

這里面order_id是設(shè)計(jì)了一個(gè)自增序列,Connector的參數(shù)沒(méi)有完整貼出來(lái)。 DataGen Connector支持幾種生成模式,比如可以用Sequence產(chǎn)生自增序列,Random模式可以模擬隨機(jī)值,這里根據(jù)不同的字段業(yè)務(wù)含義,選擇了不同的生成策略。

比如order_id是自增的,商品ID是隨機(jī)選取了1~10萬(wàn),用戶(hù)ID是1~1000萬(wàn),交易金額用分做單位, cate_id是葉子類(lèi)目ID,這里共模擬100個(gè)葉子類(lèi)目,直接通過(guò)計(jì)算列對(duì)商品ID取余來(lái)生成,訂單創(chuàng)建時(shí)間使用當(dāng)前時(shí)間模擬,這樣就可以在開(kāi)發(fā)平臺(tái)上調(diào)試,而不需要去創(chuàng)建Kafka或者DataHub做接入層的模擬。

(二)示例1-1 數(shù)據(jù)清洗

- 電商交易數(shù)據(jù)-訂單過(guò)濾

這是一個(gè)數(shù)據(jù)清洗的場(chǎng)景,比如需要完成業(yè)務(wù)上的訂單過(guò)濾,業(yè)務(wù)方可能會(huì)對(duì)交易金額有最大最小的異常過(guò)濾,比如要大于1元,小于1萬(wàn)才保留為有效數(shù)據(jù)。

交易的創(chuàng)建時(shí)間是選取某個(gè)時(shí)刻之后的,通過(guò)WHERE條件組合過(guò)濾,就可以完成這個(gè)邏輯。

真實(shí)的業(yè)務(wù)場(chǎng)景可能會(huì)復(fù)雜很多,下面來(lái)看下SQL如何運(yùn)行。

這是使用調(diào)試模式,在平臺(tái)上點(diǎn)擊運(yùn)行按鈕進(jìn)行本地調(diào)試,可以看到金額這一列被過(guò)濾,訂單創(chuàng)建時(shí)間也都是大于要求的時(shí)間值。

從這個(gè)簡(jiǎn)單的清洗場(chǎng)景可以看到,實(shí)時(shí)和傳統(tǒng)的批處理相比,在寫(xiě)法上包括輸出結(jié)果差異并不大,流作業(yè)主要的差異是運(yùn)行起來(lái)之后是長(zhǎng)周期保持運(yùn)行的,而不像傳統(tǒng)批處理,處理完數(shù)據(jù)之后就結(jié)束了。

(三)示例1-2 類(lèi)目信息關(guān)聯(lián)

接下來(lái)看一下怎么做維表關(guān)聯(lián)。

根據(jù)剛才接入層的訂單數(shù)據(jù),因?yàn)樵紨?shù)據(jù)里面是葉子類(lèi)目信息,在業(yè)務(wù)上需要關(guān)聯(lián)類(lèi)目的維度表,維度表里面記錄了葉子類(lèi)目到一級(jí)類(lèi)目的關(guān)聯(lián)關(guān)系,ID和名稱(chēng),清洗過(guò)程需要完成的目標(biāo)是用原始表里面葉子類(lèi)目ID去關(guān)聯(lián)維表,補(bǔ)齊一級(jí)類(lèi)目的ID和Name。這里通過(guò)INNER JOIN維表的寫(xiě)法,關(guān)聯(lián)之后把維表對(duì)應(yīng)的字段選出來(lái)。

和批處理的寫(xiě)法差異僅僅在于維表的特殊語(yǔ)法FOR SYSTEM_TIME AS OF。

如上所示,平臺(tái)上可以上傳自己的數(shù)據(jù)用于調(diào)試,比如這里使用了1個(gè)CSV的測(cè)試數(shù)據(jù),把100個(gè)葉子類(lèi)目映射到10個(gè)一級(jí)類(lèi)目上。

對(duì)應(yīng)葉子類(lèi)目ID的個(gè)位數(shù)就是它一級(jí)類(lèi)目的ID,會(huì)關(guān)聯(lián)到對(duì)應(yīng)的一級(jí)類(lèi)目信息,返回它的名稱(chēng)。本地調(diào)試運(yùn)行優(yōu)點(diǎn)是速度比較快,可以即時(shí)看到結(jié)果。在本地調(diào)試模式中,終端收到1000條數(shù)據(jù)之后,會(huì)自動(dòng)暫停,防止結(jié)果過(guò)大而影響使用。

(四)示例2-1 分鐘級(jí)成交統(tǒng)計(jì)

接下來(lái)我們來(lái)看一下基于Window的統(tǒng)計(jì)。

第一個(gè)場(chǎng)景是分鐘級(jí)成交統(tǒng)計(jì),這是在匯總層比較常用的計(jì)算邏輯。

分鐘級(jí)統(tǒng)計(jì)很容易想到Tumble Window,每一分鐘都是各算各的,需要計(jì)算幾個(gè)指標(biāo),包括總訂單數(shù)、總金額、成交商品數(shù)、成交用戶(hù)數(shù)等。成交的商品數(shù)和用戶(hù)數(shù)要做去重,所以在寫(xiě)法上做了一個(gè)Distinct處理。
窗口是剛剛介紹過(guò)的Tumble Window,按照訂單創(chuàng)建時(shí)間去劃一分鐘的窗口,然后按一級(jí)類(lèi)目的維度統(tǒng)計(jì)每一分鐘的成交情況。

- 運(yùn)行模式

上圖和剛才的調(diào)試模式有點(diǎn)區(qū)別,上線之后就真正提交到集群里去運(yùn)行一個(gè)作業(yè),它的輸出采用了調(diào)試輸出,直接Print到Log里。展開(kāi)作業(yè)拓?fù)?,可以看到自?dòng)開(kāi)啟了Local-Global的兩階段優(yōu)化。

- 運(yùn)行日志 - 查看調(diào)試輸出結(jié)果

在運(yùn)行一段時(shí)間之后,通過(guò)Task里面的日志可以看到最終的輸出結(jié)果。

用的是Print Sink,會(huì)直接打到Log里面。在真實(shí)場(chǎng)景的輸出上,比如寫(xiě)到Hologres/MySQL,那就需要去對(duì)應(yīng)存儲(chǔ)的數(shù)據(jù)庫(kù)上查看。

可以看到,輸出的數(shù)據(jù)相對(duì)于數(shù)據(jù)的原始時(shí)間是存在一定滯后的。

在19:46:05的時(shí)候,輸出了19:45:00這一個(gè)窗口的數(shù)據(jù),延遲了5秒鐘左右輸出前1分鐘的聚合結(jié)果。

這5秒鐘實(shí)際上和定義源表時(shí)WATERMARK的設(shè)定是有關(guān)系的,在聲明WATERMARK時(shí)是相對(duì)gmt_create字段加了5秒的offset。這樣起到的效果是,當(dāng)?shù)竭_(dá)的最早數(shù)據(jù)是 19:46:00 時(shí),我們認(rèn)為水位線是到了19:45:55,這就是5秒的延遲效果,來(lái)實(shí)現(xiàn)對(duì)亂序數(shù)據(jù)的寬容處理。

(五)示例2-2 小時(shí)級(jí)實(shí)時(shí)成交統(tǒng)計(jì)

第二個(gè)例子是做小時(shí)級(jí)實(shí)時(shí)成交統(tǒng)計(jì)。

如上圖所示,當(dāng)要求實(shí)時(shí)統(tǒng)計(jì),直接把Tumble Window開(kāi)成1小時(shí)Size的Tumble Window,這樣能滿(mǎn)足實(shí)時(shí)性嗎?按照剛才展示的輸出結(jié)果,具有一定的延遲效果。因此開(kāi)一個(gè)小時(shí)的窗口,必須等到這一個(gè)小時(shí)的數(shù)據(jù)都收到之后,在下一個(gè)小時(shí)的開(kāi)始,才能輸出上一個(gè)小時(shí)的結(jié)果,延遲在小時(shí)級(jí)別的,滿(mǎn)足不了實(shí)時(shí)性的要求?;仡欀敖榻B的 Group Aggregation 是可以滿(mǎn)足實(shí)時(shí)要求的。

具體來(lái)看,比如需要完成小時(shí)+類(lèi)目以及只算小時(shí)的兩個(gè)口徑統(tǒng)計(jì),兩個(gè)統(tǒng)計(jì)一起做,在傳統(tǒng)批處理中常用的GROUPING SETS功能,在實(shí)時(shí)Flink上也是支持的。

我們可以直接GROUP BY GROUPING SETS,第一個(gè)是小時(shí)全口徑,第二個(gè)是類(lèi)目+小時(shí)的統(tǒng)計(jì)口徑,然后計(jì)算它的訂單數(shù),包括總金額,去重的商品數(shù)和用戶(hù)數(shù)。

這種寫(xiě)法對(duì)結(jié)果加了空值轉(zhuǎn)換處理便于查看數(shù)據(jù),就是對(duì)小時(shí)全口徑的統(tǒng)計(jì),輸出的一級(jí)類(lèi)目是空的,需要對(duì)它做一個(gè)空值轉(zhuǎn)換處理。

具體來(lái)看,比如需要完成小時(shí)+類(lèi)目以及只算小時(shí)的兩個(gè)口徑統(tǒng)計(jì),兩個(gè)統(tǒng)計(jì)一起做,在傳統(tǒng)批處理中常用的GROUPING SETS功能,在實(shí)時(shí)Flink上也是支持的。

我們可以直接GROUP BY GROUPING SETS,第一個(gè)是小時(shí)全口徑,第二個(gè)是類(lèi)目+小時(shí)的統(tǒng)計(jì)口徑,然后計(jì)算它的訂單數(shù),包括總金額,去重的商品數(shù)和用戶(hù)數(shù)。

這種寫(xiě)法對(duì)結(jié)果加了空值轉(zhuǎn)換處理便于查看數(shù)據(jù),就是對(duì)小時(shí)全口徑的統(tǒng)計(jì),輸出的一級(jí)類(lèi)目是空的,需要對(duì)它做一個(gè)空值轉(zhuǎn)換處理。

上方為調(diào)試模式的運(yùn)行過(guò)程,可以看到Datagen生成的數(shù)據(jù)實(shí)時(shí)更新到一級(jí)類(lèi)目和它對(duì)應(yīng)的小時(shí)上。

這里可以看到,兩個(gè)不同GROUP BY的結(jié)果在一起輸出,中間有一列ALL是通過(guò)空值轉(zhuǎn)換來(lái)的,這就是全口徑的統(tǒng)計(jì)值。本地調(diào)試相對(duì)來(lái)說(shuō)比較直觀和方便,有興趣的話(huà)也可以到阿里云官網(wǎng)申請(qǐng)或購(gòu)買(mǎi)進(jìn)行體驗(yàn)。

(六)示例2-3 天級(jí)累積成交準(zhǔn)實(shí)時(shí)統(tǒng)計(jì)

第三個(gè)示例是天級(jí)累計(jì)成交統(tǒng)計(jì),業(yè)務(wù)要求是準(zhǔn)實(shí)時(shí),比如說(shuō)能夠接受分鐘級(jí)的更新延遲。

按照剛才Group Aggregation小時(shí)的實(shí)時(shí)統(tǒng)計(jì),容易聯(lián)想到直接把Query改成天維度,就可以實(shí)現(xiàn)這個(gè)需求,而且實(shí)時(shí)性比較高,數(shù)據(jù)觸發(fā)之后可以達(dá)到秒級(jí)的更新。

回顧下之前提到的Window和Group Aggregation對(duì)于內(nèi)置狀態(tài)處理上的區(qū)別,Window Aggregation可以實(shí)現(xiàn)State的自動(dòng)清理,Group Aggregation需要用戶(hù)自己去調(diào)整 TTL。由于業(yè)務(wù)上是準(zhǔn)實(shí)時(shí)的要求,在這里可以有一個(gè)替代的方案,比如用新引入的Cumulate Window做累積的Window計(jì)算,天級(jí)的累積然后使用分鐘級(jí)的步長(zhǎng),可以實(shí)現(xiàn)每分鐘更新的準(zhǔn)實(shí)時(shí)要求。

回顧一下Cumulate Window,如上所示。天級(jí)累積的話(huà),Window的最大Size是到天,它的Window Step就是一分鐘,這樣就可以表達(dá)天級(jí)的累積統(tǒng)計(jì)。

具體的Query如上,這里使用新的TVF語(yǔ)法,通過(guò)一個(gè)TABLE關(guān)鍵字把Windows的定義包含在中間,然后 Cumulate Window引用輸入表,接著定義它的時(shí)間屬性,步長(zhǎng)和size 參數(shù)。GROUP BY就是普通寫(xiě)法,因?yàn)樗刑崆拜敵?,所以我們把窗口的開(kāi)始時(shí)間和結(jié)束時(shí)間一起打印出來(lái)。

這個(gè)例子也通過(guò)線上運(yùn)行的方式去看Log輸出。

- 運(yùn)行模式

可以看到,它和之前Tumble Window運(yùn)行的結(jié)構(gòu)類(lèi)似,也是預(yù)聚合加上全局聚合,它和Tumble Window的區(qū)別就是并不需要等到這一天數(shù)據(jù)都到齊了才輸出結(jié)果。

- 運(yùn)行日志 – 觀察調(diào)試結(jié)果

從上方示例可以看到,在20:47:00的時(shí)候,已經(jīng)有00:00:00到20:47:00的結(jié)果累積,還有對(duì)應(yīng)的4列統(tǒng)計(jì)值。下一個(gè)輸出就是接下來(lái)的累計(jì)窗口,可以看到20:47:00到20:48:00就是一個(gè)累計(jì)的步長(zhǎng),這樣既滿(mǎn)足了天級(jí)別的累計(jì)統(tǒng)計(jì)需求,也能夠滿(mǎn)足準(zhǔn)實(shí)時(shí)的要求。

(七)示例小結(jié):電商交易數(shù)據(jù)-實(shí)時(shí)數(shù)倉(cāng)場(chǎng)景

然后我們來(lái)整體總結(jié)一下以上的示例。

在接入層到明細(xì)層的清洗處理特點(diǎn)是相對(duì)簡(jiǎn)單,也比較明確,比如業(yè)務(wù)邏輯上需要做固定的過(guò)濾條件,包括維度的擴(kuò)展,這都是非常明確和直接的。

從明細(xì)層到匯總層,例子中的分鐘級(jí)統(tǒng)計(jì),我們是用了Tumble Window,而小時(shí)級(jí)因?yàn)閷?shí)時(shí)性的要求,換成了Group Aggregation,然后到天級(jí)累積分別展示Group Aggregation和新引入的Cumulate Window。

從匯總層的計(jì)算特點(diǎn)來(lái)說(shuō),我們需要去關(guān)注業(yè)務(wù)上的實(shí)時(shí)性要求和數(shù)據(jù)準(zhǔn)確性要求,然后根據(jù)實(shí)際情況選擇Window聚合或者Group 聚合。

這里為什么要提到數(shù)據(jù)準(zhǔn)確性?

在一開(kāi)始比較Window Aggregation和Group Aggregation的時(shí)候,提到Group Aggregation的實(shí)時(shí)性非常好,但是它的數(shù)據(jù)準(zhǔn)確性是依賴(lài)于State的TTL,當(dāng)統(tǒng)計(jì)的周期大于TTL,那么TTL的數(shù)據(jù)可能會(huì)失真。

相反,在Window Aggregation上,對(duì)亂序的容忍度有一個(gè)上限,比如最多接受等一分鐘,但在實(shí)際的業(yè)務(wù)數(shù)據(jù)中,可能99%的數(shù)據(jù)能滿(mǎn)足這樣的要求,還有1%的數(shù)據(jù)可能需要一個(gè)小時(shí)后才來(lái)。基于WATERMARK的處理,默認(rèn)它就是一個(gè)丟棄策略,超過(guò)了最大的offset的這些數(shù)據(jù)就會(huì)被丟棄,不納入統(tǒng)計(jì),此時(shí)數(shù)據(jù)也會(huì)失去它的準(zhǔn)確性,所以這是一個(gè)相對(duì)的指標(biāo),需要根據(jù)具體的業(yè)務(wù)場(chǎng)景做選擇。

開(kāi)發(fā)常見(jiàn)問(wèn)題和解法
(一)開(kāi)發(fā)中的常見(jiàn)問(wèn)題

上方是實(shí)時(shí)計(jì)算真實(shí)業(yè)務(wù)接觸過(guò)程中比較高頻的問(wèn)題。

首先是實(shí)時(shí)計(jì)算不知道該如何下手,怎么開(kāi)始做實(shí)時(shí)計(jì)算,比如有些同學(xué)有批處理的背景,然后剛開(kāi)始接觸Flink SQL,不知道從哪開(kāi)始。

另外一類(lèi)問(wèn)題是SQL寫(xiě)完了,也清楚輸入處理的數(shù)據(jù)量大概是什么級(jí)別,但是不知道實(shí)時(shí)作業(yè)運(yùn)行起來(lái)之后需要設(shè)定多大的資源

還有一類(lèi)是SQL寫(xiě)得比較復(fù)雜,這個(gè)時(shí)候要去做調(diào)試,比如要查為什么計(jì)算出的數(shù)據(jù)不符合預(yù)期等類(lèi)似問(wèn)題,許多同學(xué)反映無(wú)從下手。

作業(yè)跑起來(lái)之后如何調(diào)優(yōu),這也是一個(gè)非常高頻的問(wèn)題。

(二)開(kāi)發(fā)常見(jiàn)問(wèn)題解法

1.實(shí)時(shí)計(jì)算如何下手?

對(duì)于上手的問(wèn)題,社區(qū)有很多官方的文檔,也提供了一些示例,大家可以從簡(jiǎn)單的例子上手,慢慢了解SQL里面不同的算子,在流式計(jì)算的時(shí)候會(huì)有一些什么樣的特性。

此外,還可以關(guān)注開(kāi)發(fā)者社區(qū)實(shí)時(shí)計(jì)算 Flink 版、 ververica.cn網(wǎng)站、 B 站的Apache Flink 公眾號(hào)等分享內(nèi)容。

逐漸熟悉了SQL之后,如果想應(yīng)用到生產(chǎn)環(huán)境中去解決真實(shí)的業(yè)務(wù)問(wèn)題,阿里云的行業(yè)解決方案里也提供了一些典型的架構(gòu)設(shè)計(jì),可以作為參考。

2.復(fù)雜作業(yè)如何調(diào)試?

如果遇到千行級(jí)別的復(fù)雜SQL,即使對(duì)于Flink的開(kāi)發(fā)同學(xué)來(lái)也不能一目了然地把問(wèn)題定位出來(lái),其實(shí)還是需要遵循由簡(jiǎn)到繁的過(guò)程,可能需要借助一些調(diào)試的工具,比如前面演示的平臺(tái)調(diào)試功能,然后做分段的驗(yàn)證,把小段SQL局部的結(jié)果正確性調(diào)試完之后,再一步一步組裝起來(lái),最終讓這個(gè)復(fù)雜作業(yè)能達(dá)到正確性的要求。

另外,可以利用SQL語(yǔ)法上的特性,把SQL組織得更加清晰一點(diǎn)。實(shí)時(shí)計(jì)算Flink產(chǎn)品上有一個(gè)代碼結(jié)構(gòu)功能,可以比較方便地定位長(zhǎng)SQL里具體的語(yǔ)句,這都是一些輔助工具。

3.作業(yè)初始資源設(shè)置,如何調(diào)優(yōu)?

我們有一個(gè)經(jīng)驗(yàn)是根據(jù)輸入的數(shù)據(jù),初始做小并發(fā)測(cè)試一下,看它的性能如何,然后再去估算。在大并發(fā)壓測(cè)的時(shí)候,按照需求的吞吐量,逐步逼近,然后拿到預(yù)期的性能配置,這個(gè)是比較直接但也比較可靠的方式。

調(diào)優(yōu)這一塊主要是借助于作業(yè)的運(yùn)行是情況,我們會(huì)去關(guān)注一些重點(diǎn)指標(biāo),比如說(shuō)有沒(méi)有產(chǎn)生數(shù)據(jù)的傾斜,維表的Lookup Join需要訪問(wèn)外部存儲(chǔ),有沒(méi)有產(chǎn)生IO的瓶頸,這都是影響作業(yè)性能的常見(jiàn)瓶頸點(diǎn),需要加以關(guān)注。

在實(shí)時(shí)計(jì)算Flink產(chǎn)品上集成了一個(gè)叫AutoPilot的功能,可以理解為類(lèi)似于自動(dòng)駕駛,在這種功能下,初始資源設(shè)多少就不是一個(gè)麻煩問(wèn)題了。

在產(chǎn)品上,設(shè)定作業(yè)最大的資源限制后,根據(jù)實(shí)際的數(shù)據(jù)處理量,該用多少資源可以由引擎自動(dòng)幫我們?nèi)フ{(diào)到最優(yōu)狀態(tài),根據(jù)負(fù)載情況來(lái)做伸縮。

原文鏈接:http://click.aliyun.com/m/1000283891/

 

責(zé)任編輯:梁菲 來(lái)源: 阿里云云棲號(hào)
相關(guān)推薦

2025-05-20 10:03:59

數(shù)據(jù)倉(cāng)庫(kù)Flink SQLPaimon

2023-08-29 10:20:00

2021-08-31 10:18:34

Flink 數(shù)倉(cāng)一體快手

2018-10-19 14:16:09

Flink數(shù)據(jù)倉(cāng)庫(kù)數(shù)據(jù)系統(tǒng)

2021-07-13 07:04:19

Flink數(shù)倉(cāng)數(shù)據(jù)

2021-03-10 08:22:47

FlinktopN計(jì)算

2023-10-13 07:25:50

2022-12-29 09:13:02

實(shí)時(shí)計(jì)算平臺(tái)

2022-09-28 07:08:25

技術(shù)實(shí)時(shí)數(shù)倉(cāng)

2021-07-22 18:29:58

AI

2023-07-27 07:44:07

云音樂(lè)數(shù)倉(cāng)平臺(tái)

2022-06-27 09:09:34

快手Flink數(shù)倉(cāng)建設(shè)

2022-08-01 15:58:48

數(shù)據(jù)倉(cāng)庫(kù)架構(gòu)數(shù)據(jù)

2019-11-21 09:49:29

架構(gòu)運(yùn)維技術(shù)

2021-06-06 13:10:12

FlinkPvUv

2023-05-06 07:19:48

數(shù)倉(cāng)架構(gòu)技術(shù)架構(gòu)

2022-01-05 18:18:01

Flink 數(shù)倉(cāng)連接器

2019-02-18 15:23:21

馬蜂窩MESLambda

2020-12-01 15:06:46

KafkaFlink數(shù)據(jù)倉(cāng)庫(kù)
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)