偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

Flink SQL 知其所以然:在 Flink 中還能使用 Hive Udf?(附源碼)

數(shù)據(jù)庫(kù) 其他數(shù)據(jù)庫(kù)
本文主要介紹 flink sql 流任務(wù)中,不能使用 create temporary function 去引入一個(gè)用戶自定義的 hive udf。因此博主只能通過(guò) flink sql 提供的 module 插件能力,自定義了 module,來(lái)支持引入用戶自定義的 hive udf。

[[440014]]

 1.序篇

廢話不多說(shuō),咱們先直接上本文的目錄和結(jié)論,小伙伴可以先看結(jié)論快速了解博主期望本文能給小伙伴們帶來(lái)什么幫助:

  • 背景及應(yīng)用場(chǎng)景介紹:博主期望你能了解到,其實(shí)很多場(chǎng)景下實(shí)時(shí)數(shù)倉(cāng)的建設(shè)都是隨著離線數(shù)倉(cāng)而建設(shè)的(相同的邏輯在實(shí)時(shí)數(shù)倉(cāng)中重新實(shí)現(xiàn)一遍),因此能夠在 flink sql 中復(fù)用 hive udf 是能夠大大提高人效的。
  • flink 擴(kuò)展支持 hive 內(nèi)置 udf:flink sql 提供了擴(kuò)展 udf 的能力,即 module,并且 flink sql 也內(nèi)置了 HiveModule(需要你主動(dòng)加載進(jìn)環(huán)境),來(lái)支持一些 hive 內(nèi)置的 udf (比如 get_json_object)給小伙伴們使用。
  • flink 擴(kuò)展支持用戶自定義的 hive udf:主要介紹 flink sql 流任務(wù)中,不能使用 create temporary function 去引入一個(gè)用戶自定義的 hive udf。因此博主只能通過(guò) flink sql 提供的 module 插件能力,自定義了 module,來(lái)支持引入用戶自定義的 hive udf。

2.背景及應(yīng)用場(chǎng)景介紹

其實(shí)大多數(shù)公司都是從離線數(shù)倉(cāng)開(kāi)始建設(shè)的。相信大家必然在自己的生產(chǎn)環(huán)境中開(kāi)發(fā)了非常多的 hive udf。隨著需求對(duì)于時(shí)效性要求的增高,越來(lái)越多的公司也開(kāi)始建設(shè)起實(shí)時(shí)數(shù)倉(cāng)。很多場(chǎng)景下實(shí)時(shí)數(shù)倉(cāng)的建設(shè)都是隨著離線數(shù)倉(cāng)而建設(shè)的。實(shí)時(shí)數(shù)據(jù)使用 flink 產(chǎn)出,離線數(shù)據(jù)使用 hive\spark 產(chǎn)出。

那么回到我們文章標(biāo)題的問(wèn)題:為什么需要 flink 支持 hive udf 呢?

博主分析了下,結(jié)論如下:

站在數(shù)據(jù)需求的角度來(lái)說(shuō),一般會(huì)有以下兩種情況:

  • 以前已經(jīng)有了離線數(shù)據(jù)鏈路,需求方也想要實(shí)時(shí)數(shù)據(jù)。如果直接能用已經(jīng)開(kāi)發(fā)好的 hive udf,則不用將相同的邏輯遷移到 flink udf 中,并且后續(xù)無(wú)需費(fèi)時(shí)費(fèi)力維護(hù)兩個(gè) udf 的邏輯一致性。
  • 實(shí)時(shí)和離線的需求都是新的,需要新開(kāi)發(fā)。如果只開(kāi)發(fā)一套 udf,則事半功倍。

因此在 flink 中支持 hive udf 這件事對(duì)開(kāi)發(fā)人員提效來(lái)說(shuō)是非常有好處的。

3.在擴(kuò)展前,你需要知道一些基本概念

  • flink 支持 hive udf 這件事分為兩個(gè)部分。
  • flink 擴(kuò)展支持 hive 內(nèi)置 udf

flink 擴(kuò)展支持用戶自定義 hive udf

第一部分:flink 擴(kuò)展支持 hive 內(nèi)置 udf,比如 get_json_object,rlike 等等。

有同學(xué)問(wèn)了,這么基本的 udf,flink 都沒(méi)有嗎?

確實(shí)沒(méi)有。關(guān)于 flink sql 內(nèi)置的 udf 見(jiàn)如下鏈接,大家可以看看 flink 支持了哪些 udf:https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/functions/systemfunctions/

那么如果我如果強(qiáng)行使用 get_json_object 這個(gè) udf,會(huì)發(fā)生啥呢?結(jié)果如下圖。

直接報(bào)錯(cuò)找不到 udf。

第二部分:flink 擴(kuò)展支持用戶自定義 hive udf。

內(nèi)置函數(shù)解決不了用戶的復(fù)雜需求,用戶就需要自己寫 hive udf,并且這部分自定義 udf 也想在 flink sql 中使用。

下面看看怎么在 flink sql 中進(jìn)行這兩種擴(kuò)展。

4.hive udf 擴(kuò)展支持

4.1.flink sql module

涉及到擴(kuò)展 udf 就不得不提到 flink 提供的 module。見(jiàn)官網(wǎng)下圖。

從第一句話就可以看到,module 的作用就是讓用戶去擴(kuò)展 udf 的。

flink 本身已經(jīng)內(nèi)置了一個(gè) module,名字叫 CoreModule,其中已經(jīng)包含了一些 udf。

那我們要怎么使用 module 這玩意去擴(kuò)展我們的 hive udf 呢?

4.2.flink 擴(kuò)展支持 hive 內(nèi)置 udf

步驟如下:

引入 hive 的 connector。其中包含了 flink 官方提供的一個(gè) HiveModule。在 HiveModule 中包含了 hive 內(nèi)置的 udf。

  1. <dependency> 
  2.     <groupId>org.apache.flink</groupId> 
  3.     <artifactId>flink-connector-hive_${scala.binary.version}</artifactId> 
  4.     <version>${flink.version}</version> 
  5. </dependency> 

在 StreamTableEnvironment 中加載 HiveModule。

  1. String name = "default"
  2. String version = "3.1.2"
  3. tEnv.loadModule(name, new HiveModule(version)); 

然后在控制臺(tái)打印一下目前有的 module。

  1. String[] modules = tEnv.listModules(); 
  2. Arrays.stream(modules).forEach(System.out::println); 

然后可以看到除了 core module,還有我們剛剛加載進(jìn)去的 default module。

  1. default 
  2. core 

查看所有 module 的所有 udf。在控制臺(tái)打印一下。

  1. String[] functions = tEnv.listFunctions(); 
  2. Arrays.stream(functions).forEach(System.out::println); 

就會(huì)將 default 和 core module 中的所有包含的 udf 給列舉出來(lái),當(dāng)然也就包含了 hive module 中的 get_json_object。

然后我們?cè)偃ピ?flink sql 中使用 get_json_object 這個(gè) udf,就沒(méi)有報(bào)錯(cuò),能正常輸出結(jié)果了。

使用 flink hive connector 自帶的 HiveModule,已經(jīng)能夠解決很大一部分常見(jiàn) udf 使用的問(wèn)題了。

4.2.flink 擴(kuò)展支持用戶自定義 hive udf

原本博主是直接想要使用 flink sql 中的 create temporary function 去執(zhí)行引入自定義 hive udf 的。

舉例如下:

  1. CREATE TEMPORARY FUNCTION test_hive_udf as 'flink.examples.sql._09.udf._02_stream_hive_udf.TestGenericUDF'

發(fā)現(xiàn)在執(zhí)行這句 sql 時(shí),是可以執(zhí)行成功,將 udf 注冊(cè)進(jìn)去的。

但是在后續(xù) udf 初始化時(shí)就報(bào)錯(cuò)了。具體錯(cuò)誤如下圖。直接報(bào)錯(cuò) ClassCastException。

看了下源碼,flink 流環(huán)境下(未連接 hive catalog 時(shí))在創(chuàng)建 udf 時(shí)會(huì)認(rèn)為這個(gè) udf 是 flink 生態(tài)體系中的 udf。

所以在初始化我們引入的 TestGenericUDF 時(shí),默認(rèn)會(huì)按照 flink 的 UserDefinedFunction 強(qiáng)轉(zhuǎn),因此才會(huì)報(bào)強(qiáng)轉(zhuǎn)錯(cuò)誤。

那么我們就不能使用 hive udf 了嗎?

錯(cuò)誤,小伙伴萌豈敢有這種想法。博主都把這個(gè)標(biāo)題列出來(lái)了(牛逼都吹出去了),還能給不出解決方案嘛。

思路見(jiàn)下一章節(jié)。

4.3.flink 擴(kuò)展支持用戶自定義 hive udf 的增強(qiáng) module

其實(shí)思路很簡(jiǎn)單。

使用 flink sql 中的 create temporary function 雖然不能執(zhí)行,但是 flink 提供了插件化的自定義 module。

我們可以擴(kuò)展一個(gè)支持用戶自定義 hive udf 的 module,使用這個(gè) module 來(lái)支持自定義的 hive udf。

實(shí)現(xiàn)的代碼也非常簡(jiǎn)單。簡(jiǎn)單的把 flink hive connector 提供的 HiveModule 做一個(gè)增強(qiáng)即可,即下圖中的 HiveModuleV2。

使用方式如下圖所示:

然后程序就正常跑起來(lái)了。

肥腸滴好用!

5.總結(jié)與展望

本文主要介紹了如果在 flink sql 使用 hive 內(nèi)置 udf 及用戶自定義 hive udf,總結(jié)如下:

  • 背景及應(yīng)用場(chǎng)景介紹:博主期望你能了解到,其實(shí)很多場(chǎng)景下實(shí)時(shí)數(shù)倉(cāng)的建設(shè)都是隨著離線數(shù)倉(cāng)而建設(shè)的(相同的邏輯在實(shí)時(shí)數(shù)倉(cāng)中重新實(shí)現(xiàn)一遍),因此能夠在 flink sql 中復(fù)用 hive udf 是能夠大大提高人效的。
  • flink 擴(kuò)展支持 hive 內(nèi)置 udf:flink sql 提供了擴(kuò)展 udf 的能力,即 module,并且 flink sql 也內(nèi)置了 HiveModule(需要你主動(dòng)加載進(jìn)環(huán)境),來(lái)支持一些 hive 內(nèi)置的 udf (比如 get_json_object)給小伙伴們使用。
  • flink 擴(kuò)展支持用戶自定義的 hive udf:主要介紹 flink sql 流任務(wù)中,不能使用 create temporary function 去引入一個(gè)用戶自定義的 hive udf。因此博主只能通過(guò) flink sql 提供的 module 插件能力,自定義了 module,來(lái)支持引入用戶自定義的 hive udf。

 

責(zé)任編輯:姜華 來(lái)源: 大數(shù)據(jù)羊說(shuō)
相關(guān)推薦

2022-05-22 10:02:32

CREATESQL 查詢SQL DDL

2022-06-06 09:27:23

FlinkSQLGroup

2022-07-05 09:03:05

Flink SQLTopN

2022-06-10 09:01:04

OverFlinkSQL

2021-12-09 06:59:24

FlinkSQL 開(kāi)發(fā)

2022-05-15 09:57:59

Flink SQL時(shí)間語(yǔ)義

2022-05-18 09:02:28

Flink SQLSQL字符串

2022-06-29 09:01:38

FlinkSQL時(shí)間屬性

2022-05-12 09:02:47

Flink SQL數(shù)據(jù)類型

2022-05-27 09:02:58

SQLHive語(yǔ)義

2021-09-12 07:01:07

Flink SQL ETL datastream

2022-08-10 10:05:29

FlinkSQL

2021-11-27 09:03:26

flink join數(shù)倉(cāng)

2021-11-28 11:36:08

SQL Flink Join

2021-12-05 08:28:39

Flink SQLbatch lookuSQL

2022-06-18 09:26:00

Flink SQLJoin 操作

2021-12-17 07:54:16

Flink SQLTable DataStream

2021-12-06 07:15:47

開(kāi)發(fā)Flink SQL

2021-11-30 23:30:45

sql 性能異步

2021-11-24 08:17:21

Flink SQLCumulate WiSQL
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)