高級測試:如何使用Flink對Strom任務的邏輯功能進行復現(xiàn)測試?
Flink和Strom都是時下較為流行的數(shù)據(jù)流平臺,考慮以下一種應用場景:已經(jīng)使用Strom完成了對于某一邏輯功能的開發(fā),如果現(xiàn)在期望使用Flink實現(xiàn)相同的邏輯,那么就需要考慮如何使用Flink來對Strom任務的邏輯功能進行最簡單的復現(xiàn)測試。

使用Flink來測試Strom任務的邏輯主要存在兩個最基本的問題:第一,Storm通過自定義的Bolt類實現(xiàn)自定義的邏輯,在Flink中如何實現(xiàn)?第二,Storm按照自定義標準實現(xiàn)數(shù)據(jù)分發(fā)的邏輯,在Flink中如何實現(xiàn)?
本文主要通過兩個最基本的Flink程序?qū)嵗龑ι鲜鰞蓚€使用Flink測試Strom任務邏輯存在的基本問題進行解答。
第一個問題,我們可以通過Flink的ProcessFuction類進行實現(xiàn),通過繼承該類,在該類的processElement方法中實現(xiàn)自定義邏輯。ProcessFuction類如下圖所示,我們可以通過var1這個參數(shù)直接獲取當前流中的數(shù)據(jù),然后進行自定義的邏輯加工,再通過Collector類var3的collect方法將處理后的數(shù)據(jù)發(fā)送到下一個流中。

假設某一Strom任務的功能邏輯是:① 對初始數(shù)據(jù)源(一個字符串)末尾添加一個字符串。② 然后再次添加另一個字符串。
我們以上述對字符串加工的Strom任務為例,說明Flink程序如何通過ProcessFuction類對該任務實現(xiàn)復現(xiàn)測試。
(1)Flink主程序,假設初始數(shù)據(jù)源為“abc”。

(2)第一個業(yè)務加工類,給數(shù)據(jù)流末尾添加“def”。

(3)第二個業(yè)務加工類,給數(shù)據(jù)流末尾添加“ghi”。

(4)執(zhí)行Flink程序,觀察輸出結(jié)果,“abc”被二次加工為“abcdefghi”。

第二個分發(fā)數(shù)據(jù)的問題,我們假設某一Strom任務的功能邏輯是對數(shù)據(jù)源(股票對象)進行分類,將股價高于X的分為一類,將股價小于等于X的分為另一類。
我們以上述對股票數(shù)據(jù)對象分類處理的Strom任務為例,說明Flink程序如何通過旁路輸出特性實現(xiàn)對數(shù)據(jù)流按照自定義標準分類,輸出到不同的子數(shù)據(jù)流中處理。

Flink 的旁路輸出依然涉及ProcessFunction類的processElement方法,該方法的Context類型的var2參數(shù)的主要作用是利用其output方法進行旁路輸出(我們用于進行數(shù)據(jù)分流)。
Flink的旁路輸出特性可以用來對數(shù)據(jù)進行分流,通過創(chuàng)建一個流的標簽(OutputTag),再利用這個OutputTag標簽對象作為參數(shù),調(diào)用初始/父級數(shù)據(jù)流的getSideOutput(OutputTag)方法獲取子數(shù)據(jù)流。
每個流標簽都有一個id,也可以不創(chuàng)建對象,只要流標簽的id相同,其中的數(shù)據(jù)就相同。因此,可以通過匿名內(nèi)部類的形式來獲取子數(shù)據(jù)流。第一個參數(shù)是id,第二個參數(shù)是數(shù)據(jù)類型(不能省略)。
(1)創(chuàng)建股票類Stock,屬性包括名稱和價格。

(2)創(chuàng)建消費消息的Flink程序。

(3)創(chuàng)建生產(chǎn)消息的Flink程序。

我們用“STOCK_LOW_PRICE”和“STOCK_HIGH_PRICE”這兩個ID作為兩個旁路輸出標簽的ID。
在processElement方法中,我們通過判斷股票的價格是否大于50區(qū)分出低價股和高價股,利用Context對象的output方法進行旁路輸出,把price小于50的Stock對象輸出到ID為“STOCK_LOW_PRICE”的低價股標簽旁路中,而把price大于等于50的Stock對象輸出到ID為“STOCK_HIGH_PRICE”的高價股標簽旁路中。

(4)依次啟動消費者程序、生產(chǎn)者程序,觀察消費者程序控制臺中的輸出:

此時,桌面生成了兩個文件夾,當中記錄了股票數(shù)據(jù),result1記錄了小于50的低價股,result2中記錄了股價大于等于50的高價股。

? ?























