偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

十分鐘入門(mén)Fink SQL

運(yùn)維 數(shù)據(jù)庫(kù)運(yùn)維
本篇文章主要講解了Flink SQL 入門(mén)操作,后面我會(huì)分享一些關(guān)于Flink SQL連接Kafka、輸出到kafka、MySQL等

[[358221]]

前言

Flink 本身是批流統(tǒng)一的處理框架,所以 Table API 和 SQL,就是批流統(tǒng)一的上層處理 API。目前功能尚未完善,處于活躍的開(kāi)發(fā)階段。 Table API 是一套內(nèi)嵌在 Java 和 Scala 語(yǔ)言中的查詢(xún) API,它允許我們以非常直觀的方式,組合來(lái)自一些關(guān)系運(yùn)算符的查詢(xún)(比如 select、filter 和 join)。而對(duì)于 Flink SQL,就是直接可以在代碼中寫(xiě) SQL,來(lái)實(shí)現(xiàn)一些查詢(xún)(Query)操作。Flink 的 SQL 支持,基于實(shí)現(xiàn)了 SQL 標(biāo)準(zhǔn)的 Apache Calcite(Apache 開(kāi)源 SQL 解析工具)。圖片

1、導(dǎo)入所需要的的依賴(lài)包

  1. <dependency> 
  2.           <groupId>org.apache.flink</groupId> 
  3.           <artifactId>flink-table-planner_2.12</artifactId> 
  4.           <version>1.10.1</version> 
  5.       </dependency> 
  6.       <dependency> 
  7.           <groupId>org.apache.flink</groupId> 
  8.           <artifactId>flink-table-api-scala-bridge_2.12</artifactId> 
  9.           <version>1.10.1</version> 
  10.       </dependency> 
  11.       <dependency> 
  12.           <groupId>org.apache.flink</groupId> 
  13.           <artifactId>flink-csv</artifactId> 
  14.           <version>1.10.1</version> 
  15.      </dependency> 

flink-table-planner:planner 計(jì)劃器,是 table API 最主要的部分,提供了運(yùn)行時(shí)環(huán)境和生成程序執(zhí)行計(jì)劃的 planner; flink-table-api-scala-bridge:bridge 橋接器,主要負(fù)責(zé) table API 和 DataStream/DataSet API的連接支持,按照語(yǔ)言分 java 和 scala。

這里的兩個(gè)依賴(lài),是 IDE 環(huán)境下運(yùn)行需要添加的;如果是生產(chǎn)環(huán)境,lib 目錄下默認(rèn)已經(jīng)有了 planner,就只需要有 bridge 就可以了。

當(dāng)然,如果想使用用戶自定義函數(shù),或是跟 kafka 做連接,需要有一個(gè) SQL client,這個(gè)包含在 flink-table-common 里。

2、兩種 planner(old& blink)的區(qū)別

  1. 批流統(tǒng)一:Blink 將批處理作業(yè),視為流式處理的特殊情況。所以,blink 不支持表和DataSet 之間的轉(zhuǎn)換,批處理作業(yè)將不轉(zhuǎn)換為 DataSet 應(yīng)用程序,而是跟流處理一樣,轉(zhuǎn)換為 DataStream 程序來(lái)處理。
  2. 因 為 批 流 統(tǒng) 一 , Blink planner 也 不 支 持 BatchTableSource , 而 使 用 有 界 的
  3. Blink planner 只支持全新的目錄,不支持已棄用的 ExternalCatalog。
  4. 舊 planner 和 Blink planner 的 FilterableTableSource 實(shí)現(xiàn)不兼容。舊的 planner 會(huì)把PlannerExpressions 下推到 filterableTableSource 中,而 blink planner 則會(huì)把 Expressions 下推。
  5. 基于字符串的鍵值配置選項(xiàng)僅適用于 Blink planner。
  6. PlannerConfig 在兩個(gè) planner 中的實(shí)現(xiàn)不同。
  7. Blink planner 會(huì)將多個(gè) sink 優(yōu)化在一個(gè) DAG 中(僅在 TableEnvironment 上受支持,而在 StreamTableEnvironment 上不受支持)。而舊 planner 的優(yōu)化總是將每一個(gè) sink 放在一個(gè)新的 DAG 中,其中所有 DAG 彼此獨(dú)立。
  8. 舊的 planner 不支持目錄統(tǒng)計(jì),而 Blink planner 支持。

3、表(Table)的概念

TableEnvironment 可以注冊(cè)目錄 Catalog,并可以基于 Catalog 注冊(cè)表。它會(huì)維護(hù)一個(gè)Catalog-Table 表之間的 map。 表(Table)是由一個(gè)標(biāo)識(shí)符來(lái)指定的,由 3 部分組成:Catalog 名、數(shù)據(jù)庫(kù)(database)名和對(duì)象名(表名)。如果沒(méi)有指定目錄或數(shù)據(jù)庫(kù),就使用當(dāng)前的默認(rèn)值。

4、連接到文件系統(tǒng)(Csv 格式)

連接外部系統(tǒng)在 Catalog 中注冊(cè)表,直接調(diào)用 tableEnv.connect()就可以,里面參數(shù)要傳入一個(gè) ConnectorDescriptor,也就是 connector 描述器。對(duì)于文件系統(tǒng)的 connector 而言,flink內(nèi)部已經(jīng)提供了,就叫做 FileSystem()。

5、測(cè)試案例 (新)

需求: 將一個(gè)txt文本文件作為輸入流讀取數(shù)據(jù)過(guò)濾id不等于sensor_1的數(shù)據(jù)實(shí)現(xiàn)思路: 首先我們先構(gòu)建一個(gè)table的env環(huán)境通過(guò)connect提供的方法來(lái)讀取數(shù)據(jù)然后設(shè)置表結(jié)構(gòu)將數(shù)據(jù)注冊(cè)為一張表就可進(jìn)行我們的數(shù)據(jù)過(guò)濾了(使用sql或者流處理方式進(jìn)行解析)

準(zhǔn)備數(shù)據(jù)

  1. sensor_1,1547718199,35.8 
  2. sensor_6,1547718201,15.4 
  3. sensor_7,1547718202,6.7 
  4. sensor_10,1547718205,38.1 
  5. sensor_1,1547718206,32 
  6. sensor_1,1547718208,36.2 
  7. sensor_1,1547718210,29.7 
  8. sensor_1,1547718213,30.9 

代碼實(shí)現(xiàn)

  1. import org.apache.flink.streaming.api.scala._ 
  2. import org.apache.flink.table.api.{DataTypes} 
  3. import org.apache.flink.table.api.scala._ 
  4. import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema
  5.  
  6. /** 
  7.  * @Package 
  8.  * @author 大數(shù)據(jù)老哥 
  9.  * @date 2020/12/12 21:22 
  10.  * @version V1.0 
  11.  *          第一個(gè)Flinksql測(cè)試案例 
  12.  */ 
  13.  
  14. object FlinkSqlTable { 
  15.   def main(args: Array[String]): Unit = { 
  16.     // 構(gòu)建運(yùn)行流處理的運(yùn)行環(huán)境 
  17.     val env = StreamExecutionEnvironment.getExecutionEnvironment 
  18.     // 構(gòu)建table環(huán)境 
  19.     val tableEnv = StreamTableEnvironment.create(env) 
  20.      //通過(guò) connect 讀取數(shù)據(jù) 
  21.     tableEnv.connect(new FileSystem().path("D:\\d12\\Flink\\FlinkSql\\src\\main\\resources\\sensor.txt")) 
  22.       .withFormat(new Csv()) //設(shè)置類(lèi)型 
  23.       .withSchema(new Schema() // 給數(shù)據(jù)添加元數(shù)信息 
  24.         .field("id", DataTypes.STRING()) 
  25.         .field("time", DataTypes.BIGINT()) 
  26.         .field("temperature", DataTypes.DOUBLE()) 
  27.       ).createTemporaryTable("inputTable")  // 創(chuàng)建一個(gè)臨時(shí)表 
  28.      
  29.     val resTable = tableEnv.from("inputTable"
  30.       .select("*").filter('id === "sensor_1"
  31.     // 使用sql的方式查詢(xún)數(shù)據(jù) 
  32.     var resSql = tableEnv.sqlQuery("select * from inputTable where id='sensor_1'"
  33.     // 將數(shù)據(jù)轉(zhuǎn)為流進(jìn)行輸出 
  34.     resTable.toAppendStream[(String, Long, Double)].print("resTable"
  35.     resSql.toAppendStream[(String, Long, Double)].print("resSql"
  36.  
  37.     env.execute("FlinkSqlWrodCount"
  38.   } 

6、TableEnvironment 的作用

  • 注冊(cè) catalog
  • 在內(nèi)部 catalog 中注冊(cè)表
  • 執(zhí)行 SQL 查詢(xún)
  • 注冊(cè)用戶自定義函數(shù)
  • 注冊(cè)用戶自定義函數(shù)
  • 保存對(duì) ExecutionEnvironment 或 StreamExecutionEnvironment 的引用

在創(chuàng)建 TableEnv 的時(shí)候,可以多傳入一個(gè) EnvironmentSettings 或者 TableConfig 參數(shù),可以用來(lái)配置 TableEnvironment 的一些特性。

7、 老版本創(chuàng)建流處理批處理

7.1老版本流處理

  1. val settings = EnvironmentSettings.newInstance() 
  2. .useOldPlanner() // 使用老版本 planner 
  3. .inStreamingMode() // 流處理模式 
  4. .build() 
  5. val tableEnv = StreamTableEnvironment.create(env, settings) 

7.2 老版本批處理

  1. val batchEnv = ExecutionEnvironment.getExecutionEnvironment  
  2. val batchTableEnv = BatchTableEnvironment.create(batchEnv) 

7.3 blink 版本的流處理環(huán)境

  1. val bsSettings = EnvironmentSettings.newInstance() 
  2. .useBlinkPlanner() 
  3. .inStreamingMode().build() 
  4. val bsTableEnv = StreamTableEnvironment.create(env, bsSettings) 

7.4 blink 版本的批處理環(huán)境

  1. val bbSettings = EnvironmentSettings.newInstance() 
  2. .useBlinkPlanner() 
  3. .inBatchMode().build() 
  4. val bbTableEnv = TableEnvironment.create(bbSettings) 

總結(jié):

本篇文章主要講解了Flink SQL 入門(mén)操作,后面我會(huì)分享一些關(guān)于Flink SQL連接Kafka、輸出到kafka、MySQL等

本文轉(zhuǎn)載自微信公眾號(hào)「 大數(shù)據(jù)老哥」,可以通過(guò)以下二維碼關(guān)注。轉(zhuǎn)載本文請(qǐng)聯(lián)系 大數(shù)據(jù)老哥公眾號(hào)。

 

責(zé)任編輯:武曉燕 來(lái)源: 大數(shù)據(jù)老哥
相關(guān)推薦

2022-06-16 07:31:41

Web組件封裝HTML 標(biāo)簽

2012-07-10 01:22:32

PythonPython教程

2024-05-13 09:28:43

Flink SQL大數(shù)據(jù)

2019-04-01 14:59:56

負(fù)載均衡服務(wù)器網(wǎng)絡(luò)

2023-06-07 08:27:10

Docker容器

2024-06-19 09:58:29

2021-09-07 09:40:20

Spark大數(shù)據(jù)引擎

2023-04-12 11:18:51

甘特圖前端

2023-11-30 10:21:48

虛擬列表虛擬列表工具庫(kù)

2015-09-06 09:22:24

框架搭建快速高效app

2023-10-07 00:06:09

SQL數(shù)據(jù)庫(kù)

2019-09-16 09:14:51

2009-10-09 14:45:29

VB程序

2022-08-26 09:01:07

CSSFlex 布局

2023-07-15 18:26:51

LinuxABI

2024-11-07 16:09:53

2023-11-09 14:44:27

Docker鏡像容器

2020-12-11 09:40:10

DevOpsCICD

2015-11-06 11:03:36

2022-04-13 22:01:44

錯(cuò)誤監(jiān)控系統(tǒng)
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)