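Understanding Windows in FlinkSQL in One Article
Preface
Time semantics only become useful in combination with window operations. Their main use is, of course, opening windows and computing over time periods. Below we look at how to use time fields for window operations in the Table API and SQL. There are two main kinds of windows in the Table API and SQL: Group Windows and Over Windows.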
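I. Group Windows
Group windows aggregate rows into finite groups based on time or row-count intervals, and evaluate an aggregate function once per group. In the Table API, group windows are defined with the .window(w: GroupWindow) clause and must be given an alias with the as clause. To group a table by window, the window alias must be referenced in the groupBy clause like a regular grouping field. Example:

    val table = input
      .window([w: GroupWindow] as 'w)  // define the window with alias w
      .groupBy('w, 'a)                 // group by window w and field a
      .select('a, 'w.start, 'w.end, 'w.rowtime, 'b.count)  // aggregate per group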
 
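The Table API provides a set of predefined Window classes with specific semantics, which are translated into window operations of the underlying DataStream or DataSet.
The window definitions supported by the Table API are the familiar three: tumbling (Tumbling), sliding (Sliding), and session (Session).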
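1.1 Tumbling windows
Tumbling windows are defined with the Tumble class, together with three methods:
- over: defines the window length
- on: the time field used for grouping (by time interval) or ordering (by row count)
- as: the alias, which must appear in the subsequent groupBy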
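
How a tumbling window buckets rows can be illustrated without Flink. The following is a minimal plain-Scala sketch, assuming windows are aligned to the epoch with no offset and timestamps are in seconds. `TumblingSketch`, `windowEnd`, and `countPerWindow` are hypothetical names for illustration, not Flink APIs.

```scala
// Plain-Scala illustration only; none of these names are Flink APIs.
object TumblingSketch {
  // End (exclusive) of the epoch-aligned tumbling window that contains `ts`.
  // `ts` and `size` use the same unit (seconds here).
  def windowEnd(ts: Long, size: Long): Long = ts - (ts % size) + size

  // Counts events per (id, window end), similar to grouping by id and the window alias.
  def countPerWindow(events: Seq[(String, Long)], size: Long): Map[(String, Long), Int] =
    events
      .groupBy { case (id, ts) => (id, windowEnd(ts, size)) }
      .map { case (key, rows) => key -> rows.size }

  def main(args: Array[String]): Unit = {
    val events = Seq(
      ("sensor_1", 1547718199L),
      ("sensor_1", 1547718206L),
      ("sensor_1", 1547718208L),
      ("sensor_1", 1547718210L)
    )
    countPerWindow(events, 10L).toSeq.sortBy(_._1._2).foreach(println)
  }
}
```

With a 10-second window, the two sensor_1 readings at 1547718206 and 1547718208 land in the window ending at 1547718210, so that group has a count of 2, while the reading at 1547718210 already belongs to the next window.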
 
Implementation example
1. Requirement
Use a 10-second tumbling window to count how many times each id appears.
2. Data preparation
- sensor_1,1547718199,35.8
 - sensor_6,1547718201,15.4
 - sensor_7,1547718202,6.7
 - sensor_10,1547718205,38.1
 - sensor_1,1547718206,32
 - sensor_1,1547718208,36.2
 - sensor_1,1547718210,29.7
 - sensor_1,1547718213,30.9
 
3. Code implementation

    package windows

    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.table.api.scala._
    import org.apache.flink.table.api.{EnvironmentSettings, Table, Tumble}
    import org.apache.flink.types.Row

    /**
     * @Package windows
     * @File :FlinkSQLTumBlingTie.scala
     * @author 大數據老哥
     * @date 2020/12/25 21:58
     * @version V1.0
     * Tumbling window example
     */
    object FlinkSQLTumBlingTie {
      def main(args: Array[String]): Unit = {
        // Build the execution environment
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
        val settings = EnvironmentSettings.newInstance()
          .useBlinkPlanner()
          .inStreamingMode()
          .build()
        val tableEnv = StreamTableEnvironment.create(env, settings)

        // Read the data
        val inputPath = "./data/sensor.txt"
        val inputStream = env.readTextFile(inputPath)

        // Convert each line to the case class, then assign timestamps and watermarks
        val dataStream = inputStream
          .map(data => {
            val arr = data.split(",")
            SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
          })
          .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
            // The input timestamps are in seconds; Flink expects milliseconds
            override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L
          })
        val sensorTable: Table = tableEnv.fromDataStream(dataStream, 'id, 'temperature, 'timestamp.rowtime as 'ts)

        // Register the table
        tableEnv.createTemporaryView("sensor", sensorTable)

        // Table API implementation
        val resultTable = sensorTable
          .window(Tumble over 10.seconds on 'ts as 'tw) // tumbling time window, one result every 10 seconds
          .groupBy('id, 'tw)
          .select('id, 'id.count, 'tw.end)

        // SQL implementation
        val sqlTable = tableEnv.sqlQuery(
          """
            |select
            |id,
            |count(id),
            |tumble_end(ts, interval '10' second)
            |from sensor
            |group by
            |id,
            |tumble(ts, interval '10' second)
            |""".stripMargin)

        /*
         * Other ways to define a tumbling window:
         * .window(Tumble over 10.minutes on 'rowtime as 'w)   (event-time field rowtime)
         * .window(Tumble over 10.minutes on 'proctime as 'w)  (processing-time field proctime)
         * .window(Tumble over 10.rows on 'proctime as 'w)     (row-count window: ordered by processing time, 10 rows per group)
         */
        resultTable.toAppendStream[Row].print("table")
        sqlTable.toRetractStream[Row].print("sqlTable")
        env.execute("FlinkSQLTumBlingTie")
      }

      case class SensorReading(id: String, timestamp: Long, temperature: Double)
    }
 
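4. Run results
1.2 Sliding windows
Sliding windows are defined with the Slide class, together with four methods:
- over: defines the window length
- every: defines the slide step
- on: the time field used for grouping (by time interval) or ordering (by row count)
- as: the alias, which must appear in the subsequent groupBy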
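
Unlike a tumbling window, a sliding window can place one row in several overlapping windows. The following plain-Scala sketch lists the windows that contain a given timestamp, assuming epoch-aligned windows with no offset and timestamps in seconds. `SlidingSketch` and `windowsFor` are hypothetical names for illustration, not Flink APIs.

```scala
// Plain-Scala illustration only; none of these names are Flink APIs.
object SlidingSketch {
  // All [start, end) windows of length `size`, advancing by `slide`,
  // that contain `ts`. Windows are assumed to be epoch-aligned.
  def windowsFor(ts: Long, size: Long, slide: Long): List[(Long, Long)] = {
    val lastStart = ts - (ts % slide) // latest window start that is <= ts
    Iterator
      .iterate(lastStart)(_ - slide)            // walk backwards one slide at a time
      .takeWhile(start => start > ts - size)    // stop once the window no longer covers ts
      .map(start => (start, start + size))
      .toList
  }

  def main(args: Array[String]): Unit =
    windowsFor(1547718206L, size = 10L, slide = 5L).foreach(println)
}
```

With a length of 10 seconds and a slide of 5 seconds, every row belongs to 10 / 5 = 2 windows, which is why a sliding-window query emits more result rows than the tumbling version for the same input.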
 
Implementation example
1. Requirement description
Set the window size to 10 seconds and the slide to 5 seconds, and count how many times each id appears.
2. Data preparation
- sensor_1,1547718199,35.8
 - sensor_6,1547718201,15.4
 - sensor_7,1547718202,6.7
 - sensor_10,1547718205,38.1
 - sensor_1,1547718206,32
 - sensor_1,1547718208,36.2
 - sensor_1,1547718210,29.7
 - sensor_1,1547718213,30.9
 
3. Implementation code

    package windows

    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.table.api.{EnvironmentSettings, Slide, Table}
    import org.apache.flink.table.api.scala._
    import org.apache.flink.types.Row
    import windows.FlinkSQLTumBlingTie.SensorReading

    /**
     * @Package windows
     * @File :FlinkSQLSlideTime.scala
     * @author 大數據老哥
     * @date 2020/12/27 22:19
     * @version V1.0
     * Sliding window example
     */
    object FlinkSQLSlideTime {
      def main(args: Array[String]): Unit = {
        // Build the execution environment
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1) // parallelism 1 makes the output easier to follow when testing
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // event time
        val settings = EnvironmentSettings.newInstance()
          .useBlinkPlanner()
          .inStreamingMode()
          .build()
        // Create the table environment
        val tableEnv = StreamTableEnvironment.create(env, settings)

        // Read the data
        val inputPath = "./data/sensor.txt"
        val inputStream = env.readTextFile(inputPath)

        // Convert each line to the case class, then assign timestamps and watermarks
        val dataStream = inputStream
          .map(data => {
            val arr = data.split(",")
            SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
          })
          .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
            // The input timestamps are in seconds; Flink expects milliseconds
            override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L
          })
        val sensorTable: Table = tableEnv.fromDataStream(dataStream, 'id, 'temperature, 'timestamp.rowtime as 'ts)

        // Register the table
        tableEnv.createTemporaryView("sensor", sensorTable)

        // Table API implementation
        val tableApi = sensorTable
          .window(Slide over 10.seconds every 5.seconds on 'ts as 'w)
          .groupBy('w, 'id)
          .select('id, 'id.count, 'w.end)

        // SQL implementation
        val tableSql = tableEnv.sqlQuery(
          """
            |select
            |id,
            |count(id),
            |HOP_END(ts, INTERVAL '10' SECOND, INTERVAL '5' SECOND) as w
            |from sensor
            |group by
            |HOP(ts, INTERVAL '10' SECOND, INTERVAL '5' SECOND), id
            |""".stripMargin)

        tableApi.toAppendStream[Row].print("tableApi")
        tableSql.toAppendStream[Row].print("tableSql")

        /*
         * Other ways to define a sliding window:
         * .window(Slide over 10.minutes every 5.minutes on 'rowtime as 'w)   (event-time field rowtime)
         * .window(Slide over 10.minutes every 5.minutes on 'proctime as 'w)  (processing-time field proctime)
         * .window(Slide over 10.rows every 5.rows on 'proctime as 'w)        (row-count window: ordered by processing time, 10 rows per group, sliding by 5 rows)
         */
        env.execute("FlinkSQLSlideTime")
      }
    }
 
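4. Run results
1.3 Session windows
Session windows are defined with the Session class, together with three methods:
- withGap: the session gap
- on: the time field used for grouping (by time interval) or ordering (by row count)
- as: the alias, which must appear in the subsequent groupBy

Implementation example
1. Requirement description
Set a session gap of 10 seconds and count the rows per id.
2. Data preparation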
- sensor_1,1547718199,35.8
 - sensor_6,1547718201,15.4
 - sensor_7,1547718202,6.7
 - sensor_10,1547718205,38.1
 - sensor_1,1547718206,32
 - sensor_1,1547718208,36.2
 - sensor_1,1547718210,29.7
 - sensor_1,1547718213,30.9
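
For the sample rows above, the effect of a 10-second session gap can be worked out without Flink. The sketch below is plain Scala and rests on one stated assumption: consecutive timestamps whose difference is at most the gap stay in the same session, and the session ends at the last timestamp plus the gap. `SessionSketch` and `sessions` are hypothetical names for illustration, not Flink APIs.

```scala
// Plain-Scala illustration only; none of these names are Flink APIs.
object SessionSketch {
  // Splits the timestamps of one key into sessions and returns
  // (session end, row count) for each session, in time order.
  // Assumption: a new session starts when the distance to the previous
  // timestamp is greater than `gap`.
  def sessions(timestamps: Seq[Long], gap: Long): List[(Long, Int)] =
    timestamps.sorted
      .foldLeft(List.empty[List[Long]]) {
        // Extend the current session (most recent timestamp is kept at the head)
        case (current :: done, ts) if ts - current.head <= gap => (ts :: current) :: done
        // Otherwise open a new session
        case (acc, ts) => List(ts) :: acc
      }
      .reverse
      .map(session => (session.head + gap, session.size))

  def main(args: Array[String]): Unit = {
    // Timestamps (in seconds) of the sensor_1 rows in the sample data
    val sensor1 = Seq(1547718199L, 1547718206L, 1547718208L, 1547718210L, 1547718213L)
    sessions(sensor1, gap = 10L).foreach(println)
  }
}
```

All five sensor_1 rows are less than 10 seconds apart from their neighbours, so under this assumption they form a single session with a count of 5.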
 
3. Code

    package windows

    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.table.api.{EnvironmentSettings, Session, Table}
    import org.apache.flink.table.api.scala._
    import org.apache.flink.types.Row
    import windows.FlinkSQLTumBlingTie.SensorReading

    /**
     * @Package windows
     * @File :FlinkSqlSessionTime.scala
     * @author 大數據老哥
     * @date 2020/12/27 22:52
     * @version V1.0
     * Session window example
     */
    object FlinkSqlSessionTime {
      def main(args: Array[String]): Unit = {
        // Build the execution environment
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1) // parallelism 1 makes the output easier to follow when testing
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // event time
        val settings = EnvironmentSettings.newInstance()
          .useBlinkPlanner()
          .inStreamingMode()
          .build()
        // Create the table environment
        val tableEnv = StreamTableEnvironment.create(env, settings)

        // Read the data
        val inputPath = "./data/sensor.txt"
        val inputStream = env.readTextFile(inputPath)

        // Convert each line to the case class, then assign timestamps and watermarks
        val dataStream = inputStream
          .map(data => {
            val arr = data.split(",")
            SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
          })
          .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
            // The input timestamps are in seconds; Flink expects milliseconds
            override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L
          })
        val sensorTable: Table = tableEnv.fromDataStream(dataStream, 'id, 'temperature, 'timestamp.rowtime as 'ts)

        // Register the table
        tableEnv.createTemporaryView("sensor", sensorTable)

        // Table API implementation
        val tableApi = sensorTable
          .window(Session withGap 10.seconds on 'ts as 'w)
          .groupBy('id, 'w)
          .select('id, 'id.count, 'w.end)

        // SQL implementation
        val tableSQL = tableEnv.sqlQuery(
          """
            |SELECT
            |id,
            |COUNT(id),
            |SESSION_END(ts, INTERVAL '10' SECOND) AS w
            |FROM sensor
            |GROUP BY
            |id,
            |SESSION(ts, INTERVAL '10' SECOND)
            |""".stripMargin)

        tableApi.toAppendStream[Row].print("tableApi")
        tableSQL.toAppendStream[Row].print("tableSQL")

        /*
         * Other ways to define a session window:
         * .window(Session withGap 10.minutes on 'rowtime as 'w)   (event-time field rowtime)
         * .window(Session withGap 10.minutes on 'proctime as 'w)  (processing-time field proctime)
         */
        env.execute("FlinkSqlSessionTime")
      }
    }
 
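4. Run results
II. Over Windows
Over window aggregation already exists in standard SQL (the OVER clause) and can be defined in the SELECT clause of a query. For each input row, an over window aggregate is computed over a range of neighbouring rows. In the Table API, over windows are defined with the .window(w: OverWindow*) clause and referenced by alias in the select() method. Example:

    val table = input
      .window([w: OverWindow] as 'w)                 // define the over window with alias w
      .select('a, 'b.sum over 'w, 'c.min over 'w)    // aggregate over w for every row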
 
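The Table API provides the Over class to configure the properties of an over window. Over windows can be defined on event time or processing time, and over ranges specified either as a time interval or as a row count.
An unbounded over window is specified with a constant: the time interval is given as UNBOUNDED_RANGE, or the row-count interval as UNBOUNDED_ROW. A bounded over window is specified by the size of the interval.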
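
The difference between an unbounded and a bounded row-count over window can be sketched in plain Scala. The example assumes a single partition whose rows are already ordered, and treats "N preceding" as the current row plus up to N earlier rows, as in the SQL clause `rows between N preceding and current row`. `OverSketch`, `unboundedSum`, and `boundedSum` are hypothetical names for illustration, not Flink APIs.

```scala
// Plain-Scala illustration only; none of these names are Flink APIs.
object OverSketch {
  // Unbounded: every row aggregates all rows from the first one up to itself.
  def unboundedSum(values: Seq[Double]): List[Double] =
    values.scanLeft(0.0)(_ + _).tail.toList

  // Bounded by row count: every row aggregates itself plus at most
  // `preceding` earlier rows.
  def boundedSum(values: Seq[Double], preceding: Int): List[Double] =
    values.indices.map { i =>
      values.slice(math.max(0, i - preceding), i + 1).sum
    }.toList

  def main(args: Array[String]): Unit = {
    val values = Seq(1.0, 2.0, 3.0, 4.0)
    println(unboundedSum(values))   // running total over all earlier rows
    println(boundedSum(values, 1))  // current row plus one preceding row
  }
}
```

In both cases one output value is produced per input row, which is the key difference from group windows, where one value is produced per group.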
2.1 Unbounded over windows

    // Unbounded event-time over window (time field "rowtime")
    .window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_RANGE as 'w)
    // Unbounded processing-time over window (time field "proctime")
    .window(Over partitionBy 'a orderBy 'proctime preceding UNBOUNDED_RANGE as 'w)
    // Unbounded event-time row-count over window (time field "rowtime")
    .window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_ROW as 'w)
    // Unbounded processing-time row-count over window (time field "proctime")
    .window(Over partitionBy 'a orderBy 'proctime preceding UNBOUNDED_ROW as 'w)
 
2.2 Bounded over windows

    // Bounded event-time over window (time field "rowtime", preceding 1 minute)
    .window(Over partitionBy 'a orderBy 'rowtime preceding 1.minutes as 'w)
    // Bounded processing-time over window (time field "proctime", preceding 1 minute)
    .window(Over partitionBy 'a orderBy 'proctime preceding 1.minutes as 'w)
    // Bounded event-time row-count over window (time field "rowtime", preceding 10 rows)
    .window(Over partitionBy 'a orderBy 'rowtime preceding 10.rows as 'w)
    // Bounded processing-time row-count over window (time field "proctime", preceding 10 rows)
    .window(Over partitionBy 'a orderBy 'proctime preceding 10.rows as 'w)
 
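2.3 Code exercise
We can combine what we have learned and implement a concrete requirement with a complete program. For example: for each sensor and each reading, compute the average temperature over that reading and the two preceding rows.
Data preparation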
- sensor_1,1547718199,35.8
 - sensor_6,1547718201,15.4
 - sensor_7,1547718202,6.7
 - sensor_10,1547718205,38.1
 - sensor_1,1547718206,32
 - sensor_1,1547718208,36.2
 - sensor_1,1547718210,29.7
 - sensor_1,1547718213,30.9
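
Before running the Flink job, the values to expect for this data can be worked out by hand. The plain-Scala sketch below partitions readings by id and, for each reading, computes the count and average over that reading and up to two preceding readings of the same sensor. It assumes the input is already in timestamp order; `OverAvgSketch` and `lastThreeAvg` are hypothetical names for illustration, not Flink APIs.

```scala
// Plain-Scala illustration only; none of these names are Flink APIs.
object OverAvgSketch {
  // For each reading (id, temperature), returns (id, count, average) over the
  // current row and up to two preceding rows of the same id.
  // Results are grouped by id; the order of the ids themselves is not specified.
  def lastThreeAvg(readings: Seq[(String, Double)]): List[(String, Int, Double)] =
    readings
      .groupBy(_._1) // partition by id; row order inside each partition is preserved
      .toList
      .flatMap { case (id, rows) =>
        val temps = rows.map(_._2)
        temps.indices.map { i =>
          val window = temps.slice(math.max(0, i - 2), i + 1)
          (id, window.size, window.sum / window.size)
        }
      }

  def main(args: Array[String]): Unit = {
    val readings = Seq(
      ("sensor_1", 35.8), ("sensor_6", 15.4), ("sensor_7", 6.7), ("sensor_10", 38.1),
      ("sensor_1", 32.0), ("sensor_1", 36.2), ("sensor_1", 29.7), ("sensor_1", 30.9)
    )
    lastThreeAvg(readings).filter(_._1 == "sensor_1").foreach(println)
  }
}
```

For sensor_1 the counts are 1, 2, 3, 3, 3: the window grows until it holds three rows and then slides, so the second average is (35.8 + 32) / 2 = 33.9 and the last one covers only 36.2, 29.7, and 30.9.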
 
Code:

    package windows

    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.table.api.{EnvironmentSettings, Over}
    import org.apache.flink.table.api.scala._
    import org.apache.flink.types.Row

    /**
     * @Package windows
     * @File :FlinkSqlTumBlingOverTime.scala
     * @author 大數據老哥
     * @date 2020/12/28 21:45
     * @version V1.0
     * Over window example
     */
    object FlinkSqlTumBlingOverTime {
      def main(args: Array[String]): Unit = {
        // Build the execution environment
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1) // parallelism 1 makes the output easier to follow when testing
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // event time
        val settings = EnvironmentSettings.newInstance()
          .useBlinkPlanner()
          .inStreamingMode()
          .build()
        // Build the table environment
        val tableEnv = StreamTableEnvironment.create(env, settings)

        // Read the data
        val inputPath = "./data/sensor.txt"
        val inputStream = env.readTextFile(inputPath)

        // Parse each line into the case class, then assign timestamps and watermarks
        val dataStream = inputStream
          .map(data => {
            val arr = data.split(",")
            SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
          })
          .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
            // The input timestamps are in seconds; Flink expects milliseconds
            override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L
          })

        // Register the data as a temporary view
        val dataTable = tableEnv.fromDataStream(dataStream, 'id, 'temperature, 'timestamp.rowtime as 'ts)
        tableEnv.createTemporaryView("sensor", dataTable)

        // Table API implementation: current row plus the two preceding rows per sensor
        val tableRes = dataTable
          .window(Over partitionBy 'id orderBy 'ts preceding 2.rows as 'ow)
          .select('id, 'ts, 'id.count over 'ow, 'temperature.avg over 'ow)

        // SQL implementation
        val tableSql = tableEnv.sqlQuery(
          """
            |select
            |id,
            |ts,
            |count(id) over ow,
            |avg(temperature) over ow
            |from sensor
            |window ow as (
            |  partition by id
            |  order by ts
            |  rows between 2 preceding and current row
            |)
            |""".stripMargin)

        tableRes.toAppendStream[Row].print("tableRes")
        tableSql.toAppendStream[Row].print("tableSql")
        env.execute("FlinkSqlTumBlingOverTime")
      }

      case class SensorReading(id: String, timestamp: Long, temperature: Double)
    }
 
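Run results
Summary
That wraps up window usage in FlinkSQL. If you liked it, feel free to like, share, and bookmark. FlinkSQL offers quite a few ways to use windows, so plenty of practice is still needed. As the old saying goes, the master leads you to the door, but the practice is up to you.
This article is reposted from the WeChat official account 「大數據老哥」. To repost it, please contact the 大數據老哥 official account.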