偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

Apache Flink 漫談系列(12) - Time Interval(Time-windowed) JOIN

開發(fā) 開發(fā)工具
本篇將介紹在UnBounded數(shù)據(jù)流上按時(shí)間維度進(jìn)行數(shù)據(jù)劃分進(jìn)行JOIN操作 - Time Interval(Time-windowed)JOIN, 后面我們叫做Interval JOIN。

一、說什么

JOIN 算子是數(shù)據(jù)處理的核心算子,前面我們在《Apache Flink 漫談系列(09) - JOIN 算子》介紹了UnBounded的雙流JOIN,在《Apache Flink 漫談系列(10) - JOIN LATERAL》介紹了單流與UDTF的JOIN操作,在《Apache Flink 漫談系列(11) - Temporal Table JOIN》又介紹了單流與版本表的JOIN,本篇將介紹在UnBounded數(shù)據(jù)流上按時(shí)間維度進(jìn)行數(shù)據(jù)劃分進(jìn)行JOIN操作 - Time Interval(Time-windowed)JOIN, 后面我們叫做Interval JOIN。

二、實(shí)際問題

前面章節(jié)我們介紹了Flink中對各種JOIN的支持,那么想想下面的查詢需求之前介紹的JOIN能否滿足?需求描述如下:

比如有一個訂單表Orders(orderId, productName, orderTime)和付款表Payment(orderId, payType, payTime)。 假設(shè)我們要統(tǒng)計(jì)下單一小時(shí)內(nèi)付款的訂單信息。

1. 傳統(tǒng)數(shù)據(jù)庫解決方式

在傳統(tǒng)劉數(shù)據(jù)庫中完成上面的需求非常簡單,查詢sql如下::

  1. SELECT 
  2. o.orderId, 
  3. o.productName, 
  4. p.payType, 
  5. o.orderTime, 
  6. payTime 
  7. FROM 
  8. Orders AS o JOIN Payment AS p ON 
  9. o.orderId = p.orderId AND p.payTime >= orderTime AND p.payTime < orderTime + 3600 // 秒 

上面查詢可以***的完成查詢需求,那么在Apache Flink里面應(yīng)該如何完成上面的需求呢?

2. Apache Flink解決方式

(1) UnBounded 雙流 JOIN

上面查詢需求我們很容易想到利用《Apache Flink 漫談系列(09) - JOIN 算子》介紹了UnBounded的雙流JOIN,SQL語句如下:

  1. SELECT 
  2. o.orderId, 
  3. o.productName, 
  4. p.payType, 
  5. o.orderTime, 
  6. payTime 
  7. FROM 
  8. Orders AS o JOIN Payment AS p ON 
  9. o.orderId = p.orderId AND p.payTime >= orderTime AND p.payTime as timestamp < TIMESTAMPADD(SECOND, 3600, orderTime) 

UnBounded雙流JOIN可以解決上面問題,這個示例和本篇要介紹的Interval JOIN有什么關(guān)系呢?

(2) 性能問題

雖然我們利用UnBounded的JOIN能解決上面的問題,但是仔細(xì)分析用戶需求,會發(fā)現(xiàn)這個需求場景訂單信息和付款信息并不需要長期存儲,比如2018-12-27 14:22:22的訂單只需要保持1小時(shí),因?yàn)槌^1個小時(shí)的訂單如果沒有被付款就是無效訂單了。同樣付款信息也不需要長期保持,2018-12-27 14:22:22的訂單付款信息如果是2018-12-27 15:22:22以后到達(dá)的那么我們也沒有必要保存到State中。 而對于UnBounded的雙流JOIN我們會一直將數(shù)據(jù)保存到State中,如下示意圖:

這樣的底層實(shí)現(xiàn),對于當(dāng)前需求有不必要的性能損失。所以我們有必要開發(fā)一種新的可以清除State的JOIN方式(Interval JOIN)來高性能的完成上面的查詢需求。

(3) 功能擴(kuò)展

目前的UnBounded的雙流JOIN是后面是沒有辦法再進(jìn)行Event-Time的Window Aggregate的。也就是下面的語句在Apache Flink上面是無法支持的:

  1. SELECT COUNT(*) FROM ( 
  2. SELECT 
  3. ..., 
  4. payTime 
  5. FROM Orders AS o JOIN Payment AS p ON 
  6. o.orderId = p.orderId 
  7. ) GROUP BY TUMBLE(payTime, INTERVAL '15' MINUTE) 

因?yàn)樵赨nBounded的雙流JOIN中無法保證payTime的值一定大于WaterMark(WaterMark相關(guān)可以查閱<>). Apache Flink的Interval JOIN之后可以進(jìn)行Event-Time的Window Aggregate。

3. Interval JOIN

為了完成上面需求,并且解決性能和功能擴(kuò)展的問題,Apache Flink在1.4開始開發(fā)了Time-windowed Join,也就是本文所說的Interval JOIN。接下來我們詳細(xì)介紹Interval JOIN的語法,語義和實(shí)現(xiàn)原理。

三、什么是Interval JOIN

Interval JOIN 相對于UnBounded的雙流JOIN來說是Bounded JOIN。就是每條流的每一條數(shù)據(jù)會與另一條流上的不同時(shí)間區(qū)域的數(shù)據(jù)進(jìn)行JOIN。對應(yīng)Apache Flink官方文檔的 Time-windowed JOIN(release-1.7之前都叫Time-Windowed JOIN)。

1. Interval JOIN 語法

  1. SELECT ... FROM t1 JOIN t2 ON t1.key = t2.key AND TIMEBOUND_EXPRESSION 

TIMEBOUND_EXPRESSION 有兩種寫法,如下:

  • L.time between LowerBound(R.time) and UpperBound(R.time)
  • R.time between LowerBound(L.time) and UpperBound(L.time)
  • 帶有時(shí)間屬性(L.time/R.time)的比較表達(dá)式。

2. Interval JOIN 語義

Interval JOIN 的語義就是每條數(shù)據(jù)對應(yīng)一個 Interval 的數(shù)據(jù)區(qū)間,比如有一個訂單表Orders(orderId, productName, orderTime)和付款表Payment(orderId, payType, payTime)。 假設(shè)我們要統(tǒng)計(jì)在下單一小時(shí)內(nèi)付款的訂單信息。SQL查詢?nèi)缦?

  1. SELECT 
  2. o.orderId, 
  3. o.productName, 
  4. p.payType, 
  5. o.orderTime, 
  6. cast(payTime as timestamp) as payTime 
  7. FROM 
  8. Orders AS o JOIN Payment AS p ON 
  9. o.orderId = p.orderId AND 
  10. p.payTime BETWEEN orderTime AND 
  11. orderTime + INTERVAL '1' HOUR 
  • Orders訂單數(shù)據(jù)

  • Payment付款數(shù)據(jù)

符合語義的預(yù)期結(jié)果是 訂單id為003的信息不出現(xiàn)在結(jié)果表中,因?yàn)橄聠螘r(shí)間2018-12-26 04:53:24.0, 付款時(shí)間是 2018-12-26 05:53:30.0超過了1小時(shí)付款。

那么預(yù)期的結(jié)果信息如下:

這樣Id為003的訂單是無效訂單,可以更新庫存繼續(xù)售賣。

接下來我們以圖示的方式直觀說明Interval JOIN的語義,我們對上面的示例需求稍微變化一下: 訂單可以預(yù)付款(不管是否合理,我們只是為了說明語義)也就是訂單 前后 1小時(shí)的付款都是有效的。SQL語句如下:

  1. SELECT 
  2. ... 
  3. FROM 
  4. Orders AS o JOIN Payment AS p ON 
  5. o.orderId = p.orderId AND 
  6. p.payTime BETWEEN orderTime - INTERVAL '1' HOUR AND 
  7. orderTime + INTERVAL '1' HOUR 

這樣的查詢語義示意圖如下:

上圖有幾個關(guān)鍵點(diǎn),如下:

  • 數(shù)據(jù)JOIN的區(qū)間 - 比如Order時(shí)間為3的訂單會在付款時(shí)間為[2, 4]區(qū)間進(jìn)行JOIN。
  • WaterMark - 比如圖示Order***一條數(shù)據(jù)時(shí)間是3,Payment***一條數(shù)據(jù)時(shí)間是5,那么WaterMark是根據(jù)實(shí)際最小值減去UpperBound生成,即:Min(3,5)-1 = 2
  • 過期數(shù)據(jù) - 出于性能和存儲的考慮,要將過期數(shù)據(jù)清除,如圖當(dāng)WaterMark是2的時(shí)候時(shí)間為2以前的數(shù)據(jù)過期了,可以被清除。

3. Interval JOIN 實(shí)現(xiàn)原理

由于Interval JOIN和雙流JOIN類似都要存儲左右兩邊的數(shù)據(jù),所以底層實(shí)現(xiàn)中仍然是利用State進(jìn)行數(shù)據(jù)的存儲。流計(jì)算的特點(diǎn)是數(shù)據(jù)不停的流入,我們可以不停的進(jìn)行增量計(jì)算,也就是我們每條數(shù)據(jù)流入都可以進(jìn)行JOIN計(jì)算。我們還是以具體示例和圖示來說明內(nèi)部計(jì)算邏輯,如下圖:

簡單解釋一下每條記錄的處理邏輯如下:

實(shí)際的內(nèi)部邏輯會比描述的復(fù)雜的多,大家可以根據(jù)如上簡述理解內(nèi)部原理即可。

四、示例代碼

我們還是以訂單和付款示例,將完整代碼分享給大家,具體如下(代碼基于flink-1.7.0):

  1. import java.sql.Timestamp 
  2.  
  3. import org.apache.flink.api.scala._ 
  4. import org.apache.flink.streaming.api.TimeCharacteristic 
  5. import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor 
  6. import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment 
  7. import org.apache.flink.streaming.api.windowing.time.Time 
  8. import org.apache.flink.table.api.TableEnvironment 
  9. import org.apache.flink.table.api.scala._ 
  10. import org.apache.flink.types.Row 
  11.  
  12. import scala.collection.mutable 
  13.  
  14. object SimpleTimeIntervalJoin { 
  15. def main(args: Array[String]): Unit = { 
  16. val env = StreamExecutionEnvironment.getExecutionEnvironment 
  17. val tEnv = TableEnvironment.getTableEnvironment(env) 
  18. env.setParallelism(1) 
  19. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) 
  20. // 構(gòu)造訂單數(shù)據(jù) 
  21. val ordersData = new mutable.MutableList[(String, String, Timestamp)] 
  22. ordersData.+=(("001", "iphone", new Timestamp(1545800002000L))) 
  23. ordersData.+=(("002", "mac", new Timestamp(1545800003000L))) 
  24. ordersData.+=(("003", "book", new Timestamp(1545800004000L))) 
  25. ordersData.+=(("004", "cup", new Timestamp(1545800018000L))) 
  26.  
  27. // 構(gòu)造付款表 
  28. val paymentData = new mutable.MutableList[(String, String, Timestamp)] 
  29. paymentData.+=(("001", "alipay", new Timestamp(1545803501000L))) 
  30. paymentData.+=(("002", "card", new Timestamp(1545803602000L))) 
  31. paymentData.+=(("003", "card", new Timestamp(1545803610000L))) 
  32. paymentData.+=(("004", "alipay", new Timestamp(1545803611000L))) 
  33. val orders = env 
  34. .fromCollection(ordersData) 
  35. .assignTimestampsAndWatermarks(new TimestampExtractor[String, String]()) 
  36. .toTable(tEnv, 'orderId, 'productName, 'orderTime.rowtime) 
  37. val ratesHistory = env 
  38. .fromCollection(paymentData) 
  39. .assignTimestampsAndWatermarks(new TimestampExtractor[String, String]()) 
  40. .toTable(tEnv, 'orderId, 'payType, 'payTime.rowtime) 
  41.  
  42. tEnv.registerTable("Orders", orders) 
  43. tEnv.registerTable("Payment", ratesHistory) 
  44.  
  45. var sqlQuery = 
  46. ""
  47. |SELECT 
  48. | o.orderId, 
  49. | o.productName, 
  50. | p.payType, 
  51. | o.orderTime, 
  52. | cast(payTime as timestamp) as payTime 
  53. |FROM 
  54. | Orders AS o JOIN Payment AS p ON o.orderId = p.orderId AND 
  55. | p.payTime BETWEEN orderTime AND orderTime + INTERVAL '1' HOUR 
  56. |""".stripMargin 
  57. tEnv.registerTable("TemporalJoinResult", tEnv.sqlQuery(sqlQuery)) 
  58.  
  59. val result = tEnv.scan("TemporalJoinResult").toAppendStream[Row] 
  60. result.print() 
  61. env.execute() 
  62.  
  63.  
  64. class TimestampExtractor[T1, T2] 
  65. extends BoundedOutOfOrdernessTimestampExtractor[(T1, T2, Timestamp)](Time.seconds(10)) { 
  66. override def extractTimestamp(element: (T1, T2, Timestamp)): Long = { 
  67. element._3.getTime 

運(yùn)行結(jié)果如下:

五、小節(jié)

本篇由實(shí)際業(yè)務(wù)需求場景切入,介紹了相同業(yè)務(wù)需求既可以利用Unbounded 雙流JOIN實(shí)現(xiàn),也可以利用Time Interval JOIN來實(shí)現(xiàn),Time Interval JOIN 性能優(yōu)于UnBounded的雙流JOIN,并且Interval JOIN之后可以進(jìn)行Window Aggregate算子計(jì)算。然后介紹了Interval JOIN的語法,語義和實(shí)現(xiàn)原理,***將訂單和付款的完整示例代碼分享給大家。期望本篇能夠讓大家對Apache Flink Time Interval JOIN有一個具體的了解!

關(guān)于點(diǎn)贊和評論

本系列文章難免有很多缺陷和不足,真誠希望讀者對有收獲的篇章給予點(diǎn)贊鼓勵,對有不足的篇章給予反饋和建議,先行感謝大家!

作者:孫金城,花名 金竹,目前就職于阿里巴巴,自2015年以來一直投入于基于Apache Flink的阿里巴巴計(jì)算平臺Blink的設(shè)計(jì)研發(fā)工作。

【本文為51CTO專欄作者“金竹”原創(chuàng)稿件,轉(zhuǎn)載請聯(lián)系原作者】

戳這里,看該作者更多好文

責(zé)任編輯:趙寧寧 來源: 51CTO專欄
相關(guān)推薦

2022-07-13 12:53:59

數(shù)據(jù)存儲

2018-11-29 09:01:26

Apache FlinJOIN代碼

2018-11-20 07:59:43

Apache Flin JOIN算子代碼

2018-12-11 17:28:22

Apache FlinJOIN代碼

2022-06-10 17:26:07

數(shù)據(jù)集計(jì)算

2018-10-09 10:55:52

Apache FlinWatermark流計(jì)算

2018-09-26 07:50:52

Apache Flin流計(jì)算計(jì)算模式

2018-09-26 08:44:22

Apache Flin流計(jì)算計(jì)算模式

2018-10-16 08:54:35

Apache Flin流計(jì)算State

2018-11-14 09:01:23

Apache FlinSQL代碼

2018-10-22 21:43:39

Apache Flin流計(jì)算Fault Toler

2019-01-03 10:17:53

Apache FlinTable API代碼

2022-07-13 13:03:29

流計(jì)算亂序

2022-07-12 10:38:25

分布式框架

2018-11-07 08:48:31

Apache Flin持續(xù)查詢流計(jì)算

2019-01-15 08:50:12

Apache FlinKafka分布式

2018-10-30 14:08:45

Apache Flin流表對偶duality

2022-07-12 11:01:03

數(shù)據(jù)庫

2022-08-31 14:49:05

IoTDBIoTDatabase

2025-04-25 10:28:40

點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號