十分鐘入門(mén)Fink SQL
前言
Flink 本身是批流統(tǒng)一的處理框架,所以 Table API 和 SQL,就是批流統(tǒng)一的上層處理 API。目前功能尚未完善,處于活躍的開(kāi)發(fā)階段。 Table API 是一套內(nèi)嵌在 Java 和 Scala 語(yǔ)言中的查詢(xún) API,它允許我們以非常直觀的方式,組合來(lái)自一些關(guān)系運(yùn)算符的查詢(xún)(比如 select、filter 和 join)。而對(duì)于 Flink SQL,就是直接可以在代碼中寫(xiě) SQL,來(lái)實(shí)現(xiàn)一些查詢(xún)(Query)操作。Flink 的 SQL 支持,基于實(shí)現(xiàn)了 SQL 標(biāo)準(zhǔn)的 Apache Calcite(Apache 開(kāi)源 SQL 解析工具)。圖片
1、導(dǎo)入所需要的的依賴(lài)包
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-table-planner_2.12</artifactId>
- <version>1.10.1</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-table-api-scala-bridge_2.12</artifactId>
- <version>1.10.1</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-csv</artifactId>
- <version>1.10.1</version>
- </dependency>
flink-table-planner:planner 計(jì)劃器,是 table API 最主要的部分,提供了運(yùn)行時(shí)環(huán)境和生成程序執(zhí)行計(jì)劃的 planner; flink-table-api-scala-bridge:bridge 橋接器,主要負(fù)責(zé) table API 和 DataStream/DataSet API的連接支持,按照語(yǔ)言分 java 和 scala。
這里的兩個(gè)依賴(lài),是 IDE 環(huán)境下運(yùn)行需要添加的;如果是生產(chǎn)環(huán)境,lib 目錄下默認(rèn)已經(jīng)有了 planner,就只需要有 bridge 就可以了。
當(dāng)然,如果想使用用戶自定義函數(shù),或是跟 kafka 做連接,需要有一個(gè) SQL client,這個(gè)包含在 flink-table-common 里。
2、兩種 planner(old& blink)的區(qū)別
- 批流統(tǒng)一:Blink 將批處理作業(yè),視為流式處理的特殊情況。所以,blink 不支持表和DataSet 之間的轉(zhuǎn)換,批處理作業(yè)將不轉(zhuǎn)換為 DataSet 應(yīng)用程序,而是跟流處理一樣,轉(zhuǎn)換為 DataStream 程序來(lái)處理。
- 因 為 批 流 統(tǒng) 一 , Blink planner 也 不 支 持 BatchTableSource , 而 使 用 有 界 的
- Blink planner 只支持全新的目錄,不支持已棄用的 ExternalCatalog。
- 舊 planner 和 Blink planner 的 FilterableTableSource 實(shí)現(xiàn)不兼容。舊的 planner 會(huì)把PlannerExpressions 下推到 filterableTableSource 中,而 blink planner 則會(huì)把 Expressions 下推。
- 基于字符串的鍵值配置選項(xiàng)僅適用于 Blink planner。
- PlannerConfig 在兩個(gè) planner 中的實(shí)現(xiàn)不同。
- Blink planner 會(huì)將多個(gè) sink 優(yōu)化在一個(gè) DAG 中(僅在 TableEnvironment 上受支持,而在 StreamTableEnvironment 上不受支持)。而舊 planner 的優(yōu)化總是將每一個(gè) sink 放在一個(gè)新的 DAG 中,其中所有 DAG 彼此獨(dú)立。
- 舊的 planner 不支持目錄統(tǒng)計(jì),而 Blink planner 支持。
3、表(Table)的概念
TableEnvironment 可以注冊(cè)目錄 Catalog,并可以基于 Catalog 注冊(cè)表。它會(huì)維護(hù)一個(gè)Catalog-Table 表之間的 map。 表(Table)是由一個(gè)標(biāo)識(shí)符來(lái)指定的,由 3 部分組成:Catalog 名、數(shù)據(jù)庫(kù)(database)名和對(duì)象名(表名)。如果沒(méi)有指定目錄或數(shù)據(jù)庫(kù),就使用當(dāng)前的默認(rèn)值。
4、連接到文件系統(tǒng)(Csv 格式)
連接外部系統(tǒng)在 Catalog 中注冊(cè)表,直接調(diào)用 tableEnv.connect()就可以,里面參數(shù)要傳入一個(gè) ConnectorDescriptor,也就是 connector 描述器。對(duì)于文件系統(tǒng)的 connector 而言,flink內(nèi)部已經(jīng)提供了,就叫做 FileSystem()。
5、測(cè)試案例 (新)
需求: 將一個(gè)txt文本文件作為輸入流讀取數(shù)據(jù)過(guò)濾id不等于sensor_1的數(shù)據(jù)實(shí)現(xiàn)思路: 首先我們先構(gòu)建一個(gè)table的env環(huán)境通過(guò)connect提供的方法來(lái)讀取數(shù)據(jù)然后設(shè)置表結(jié)構(gòu)將數(shù)據(jù)注冊(cè)為一張表就可進(jìn)行我們的數(shù)據(jù)過(guò)濾了(使用sql或者流處理方式進(jìn)行解析)
準(zhǔn)備數(shù)據(jù)
- sensor_1,1547718199,35.8
- sensor_6,1547718201,15.4
- sensor_7,1547718202,6.7
- sensor_10,1547718205,38.1
- sensor_1,1547718206,32
- sensor_1,1547718208,36.2
- sensor_1,1547718210,29.7
- sensor_1,1547718213,30.9
代碼實(shí)現(xiàn)
- import org.apache.flink.streaming.api.scala._
- import org.apache.flink.table.api.{DataTypes}
- import org.apache.flink.table.api.scala._
- import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema}
- /**
- * @Package
- * @author 大數(shù)據(jù)老哥
- * @date 2020/12/12 21:22
- * @version V1.0
- * 第一個(gè)Flinksql測(cè)試案例
- */
- object FlinkSqlTable {
- def main(args: Array[String]): Unit = {
- // 構(gòu)建運(yùn)行流處理的運(yùn)行環(huán)境
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- // 構(gòu)建table環(huán)境
- val tableEnv = StreamTableEnvironment.create(env)
- //通過(guò) connect 讀取數(shù)據(jù)
- tableEnv.connect(new FileSystem().path("D:\\d12\\Flink\\FlinkSql\\src\\main\\resources\\sensor.txt"))
- .withFormat(new Csv()) //設(shè)置類(lèi)型
- .withSchema(new Schema() // 給數(shù)據(jù)添加元數(shù)信息
- .field("id", DataTypes.STRING())
- .field("time", DataTypes.BIGINT())
- .field("temperature", DataTypes.DOUBLE())
- ).createTemporaryTable("inputTable") // 創(chuàng)建一個(gè)臨時(shí)表
- val resTable = tableEnv.from("inputTable")
- .select("*").filter('id === "sensor_1")
- // 使用sql的方式查詢(xún)數(shù)據(jù)
- var resSql = tableEnv.sqlQuery("select * from inputTable where id='sensor_1'")
- // 將數(shù)據(jù)轉(zhuǎn)為流進(jìn)行輸出
- resTable.toAppendStream[(String, Long, Double)].print("resTable")
- resSql.toAppendStream[(String, Long, Double)].print("resSql")
- env.execute("FlinkSqlWrodCount")
- }
- }
6、TableEnvironment 的作用
- 注冊(cè) catalog
- 在內(nèi)部 catalog 中注冊(cè)表
- 執(zhí)行 SQL 查詢(xún)
- 注冊(cè)用戶自定義函數(shù)
- 注冊(cè)用戶自定義函數(shù)
- 保存對(duì) ExecutionEnvironment 或 StreamExecutionEnvironment 的引用
在創(chuàng)建 TableEnv 的時(shí)候,可以多傳入一個(gè) EnvironmentSettings 或者 TableConfig 參數(shù),可以用來(lái)配置 TableEnvironment 的一些特性。
7、 老版本創(chuàng)建流處理批處理
7.1老版本流處理
- val settings = EnvironmentSettings.newInstance()
- .useOldPlanner() // 使用老版本 planner
- .inStreamingMode() // 流處理模式
- .build()
- val tableEnv = StreamTableEnvironment.create(env, settings)
7.2 老版本批處理
- val batchEnv = ExecutionEnvironment.getExecutionEnvironment
- val batchTableEnv = BatchTableEnvironment.create(batchEnv)
7.3 blink 版本的流處理環(huán)境
- val bsSettings = EnvironmentSettings.newInstance()
- .useBlinkPlanner()
- .inStreamingMode().build()
- val bsTableEnv = StreamTableEnvironment.create(env, bsSettings)
7.4 blink 版本的批處理環(huán)境
- val bbSettings = EnvironmentSettings.newInstance()
- .useBlinkPlanner()
- .inBatchMode().build()
- val bbTableEnv = TableEnvironment.create(bbSettings)
總結(jié):
本篇文章主要講解了Flink SQL 入門(mén)操作,后面我會(huì)分享一些關(guān)于Flink SQL連接Kafka、輸出到kafka、MySQL等
本文轉(zhuǎn)載自微信公眾號(hào)「 大數(shù)據(jù)老哥」,可以通過(guò)以下二維碼關(guān)注。轉(zhuǎn)載本文請(qǐng)聯(lián)系 大數(shù)據(jù)老哥公眾號(hào)。