偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

FlinkSQL 中 Catalog 的使用場景及案例詳解

大數(shù)據(jù)
Catalog 在 Flink SQL 中是一個元數(shù)據(jù)管理組件,用于存儲和管理數(shù)據(jù)庫、表、視 圖、函數(shù)等元數(shù)據(jù)對象的抽象接口。

Catalog 在 Flink SQL 中是一個元數(shù)據(jù)管理組件,用于存儲和管理數(shù)據(jù)庫、表、視 圖、函數(shù)等元數(shù)據(jù)對象的抽象接口。它類似于傳統(tǒng)數(shù)據(jù)庫系統(tǒng)中的元數(shù)據(jù)倉庫,為 Flink SQL 提供了元數(shù)據(jù)管理能力。

Catalog 使 Flink 能夠:

  • 以統(tǒng)一的方式訪問不同的外部系統(tǒng)
  • 減少代碼中的硬編 碼配置
  • 實現(xiàn)表元數(shù)據(jù)的持久化
  • 支持跨會話的元數(shù)據(jù)共享。

1. Catalog的作用

(1) 管理元數(shù)據(jù)對象

Catalog 可以管理以下元數(shù)據(jù)對象: - 數(shù)據(jù)庫(Database) - 表(Table) - 視圖(View) - 函數(shù)(Function) - 分區(qū)(Partition)等

(2) 支持多樣化的元數(shù)據(jù)存儲

Flink 支持多種Catalog 實現(xiàn),可以連接各種外部元數(shù)據(jù)系統(tǒng): - 內(nèi)存Catalog(默認(rèn)) - Hive Catalog - JDBC Catalog - 自定義 Catalog

(3) 提供統(tǒng)一的數(shù)據(jù)訪問接口

無論底層元數(shù)據(jù)存儲在哪里,都可以通過統(tǒng)一的接口訪問和操作

(4) 簡化元數(shù)據(jù)管理

開發(fā)者可以通過Catalog 注冊永久表,而不是在代碼中重復(fù)定義表結(jié)構(gòu)

2. Flink內(nèi)置的catalog類型

(1) GenericInMemoryCatalog

默認(rèn)的內(nèi)存 Catalog,元數(shù)據(jù)只在單個 Flink 會話中有效,會話結(jié)束數(shù)據(jù)就會丟失。

// 創(chuàng)建內(nèi)存Catalog 
Catalog inmemory = new GenericInMemoryCatalog("in_memory_catalog"); 
tableEnv.registerCatalog("in_memory_catalog", inmemory); -- SQL 中創(chuàng)建和使用內(nèi)存Catalog 
CREATE CATALOG my_memory_catalog WITH ( 
'type' = 'generic_in_memory' 
); 
USE CATALOG my_memory_catalog;

(2) HiveCatalog

使用Hive Metastore 存儲元數(shù)據(jù),支持持久化,適合生產(chǎn)環(huán)境。

// 創(chuàng)建Hive Catalog 
String name = "my_hive_catalog"; 
String defaultDatabase = "default"; 
String hiveConfDir = "/path/to/hive/conf"; 
String version = "2.3.6"; 
HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, 
version); 
tableEnv.registerCatalog("my_hive_catalog", hive); -- SQL 中創(chuàng)建和使用Hive Catalog 
CREATE CATALOG my_hive_catalog WITH ( 
'type' = 'hive', 
'default-database' = 'default', 
'hive-conf-dir' = '/path/to/hive/conf', 
'hive-version' = '2.3.6' 
); 
USE CATALOG my_hive_catalog; 
3. JdbcCatalog 
使用關(guān)系型數(shù)據(jù)庫存儲元數(shù)據(jù)。 
// 創(chuàng)建JDBC Catalog 
String name = "my_jdbc_catalog"; 
String defaultDatabase = "default"; 
String username = "username"; 
String password = "password"; 


String baseUrl = "jdbc:mysql://localhost:3306"; 


JdbcCatalog jdbc = new JdbcCatalog(name, defaultDatabase, username, pas
 sword, baseUrl); 
tableEnv.registerCatalog("my_jdbc_catalog", jdbc); -- SQL中創(chuàng)建和使用JDBC Catalog(Flink 1.15+) 
CREATE CATALOG my_jdbc_catalog WITH ( 
    'type' = 'jdbc', 
    'default-database' = 'default', 
    'username' = 'username', 
    'password' = 'password', 
    'base-url' = 'jdbc:mysql://localhost:3306' 
); 


USE CATALOG my_jdbc_catalog;

(3) 使用Catalog的SQL操作

1. 創(chuàng)建和切換Catalog -- 創(chuàng)建Catalog 
CREATE CATALOG my_catalog WITH ( 
    'type' = 'hive', 
    'hive-conf-dir' = '/path/to/hive/conf' 
); 
 -- 查看所有Catalog 
SHOW CATALOGS; 
 -- 切換當(dāng)前Catalog 
USE CATALOG my_catalog; 
2. 創(chuàng)建和使用數(shù)據(jù)庫 -- 創(chuàng)建數(shù)據(jù)庫 
CREATE DATABASE my_database; 
 -- 查看所有數(shù)據(jù)庫 
SHOW DATABASES; 
 -- 切換當(dāng)前數(shù)據(jù)庫 
USE my_database; 
3. 管理表和視圖 -- 創(chuàng)建表 
CREATE TABLE my_table ( 
    id INT, 
    name STRING, 
    create_time TIMESTAMP(3) 


) WITH ( 
'connector' = 'kafka', 
'topic' = 'my_topic', 
'properties.bootstrap.servers' = 'localhost:9092', 
'format' = 'json' 
); -- 查看所有表 
SHOW TABLES; -- 查看表結(jié)構(gòu) 
DESCRIBE my_table; 
4. 管理函數(shù) -- 創(chuàng)建自定義函數(shù) 
CREATE FUNCTION my_function AS 'com.example.MyFunction'; -- 查看所有函數(shù) 
SHOW FUNCTIONS;

(4) Catalog 的實際應(yīng)用示

// 跨會話持久化元數(shù)據(jù) 
// 會話1:注冊Hive Catalog和表 
StreamExecutionEnvironment env1 = StreamExecutionEnvironment.getExecuti
 onEnvironment(); 
StreamTableEnvironment tEnv1 = StreamTableEnvironment.create(env1); 
tEnv1.executeSql("CREATE CATALOG hive_catalog WITH ('type' = 'hive')"); 
tEnv1.executeSql("USE CATALOG hive_catalog"); 
tEnv1.executeSql("CREATE DATABASE IF NOT EXISTS db1"); 
tEnv1.executeSql("USE db1"); 
tEnv1.executeSql( 
"CREATE TABLE IF NOT EXISTS orders (" + 
"  order_id STRING, " + 
"  price DECIMAL(10, 2)" + 
") WITH (" + 
"  'connector' = 'kafka', " + 
"  'topic' = 'orders'" + 
")"); 
// 會話2:直接使用之前注冊的表 
StreamExecutionEnvironment env2 = StreamExecutionEnvironment.getExecuti
 onEnvironment(); 
StreamTableEnvironment tEnv2 = StreamTableEnvironment.create(env2); 
tEnv2.executeSql("USE CATALOG hive_catalog"); 
tEnv2.executeSql("USE db1"); 

// 不需要重新定義表結(jié)構(gòu),可以直接查詢 
tEnv2.executeSql("SELECT * FROM orders"); 
使用不同類型的Catalog實現(xiàn)多源數(shù)據(jù)集成 
// 注冊多個Catalog訪問不同系統(tǒng) 
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutio
 nEnvironment(); 
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); 
// 注冊Hive Catalog 
tEnv.executeSql("CREATE CATALOG hive_catalog WITH ('type' = 'hive')"); 
// 注冊JDBC Catalog 
tEnv.executeSql( 
"CREATE CATALOG jdbc_catalog WITH (" + 
"  'type' = 'jdbc', " + 
"  'default-database' = 'default', " + 
"  'username' = 'user', " + 
"  'password' = 'password', " + 
"  'base-url' = 'jdbc:mysql://localhost:3306'" + 
")"); 
// 從不同Catalog中的表關(guān)聯(lián)查詢 
tEnv.executeSql( 
"SELECT o.order_id, o.price, c.name " + 
"FROM hive_catalog.db1.orders o " + 
"JOIN jdbc_catalog.default.customers c " + 
"ON o.customer_id = c.id" 
); 
責(zé)任編輯:趙寧寧 來源: 大數(shù)據(jù)技能圈
相關(guān)推薦

2013-12-25 16:03:39

GitGit 命令

2022-07-29 07:48:15

HTTP常用狀態(tài)碼

2025-02-07 14:33:04

2020-02-14 13:50:32

JavaScript前端技術(shù)

2023-05-15 08:50:58

ContextGolang

2024-10-10 08:46:28

2024-10-06 12:35:50

2020-09-04 13:30:43

Java自定義代碼

2023-05-16 07:47:18

RabbitMQ消息隊列系統(tǒng)

2013-07-10 15:52:17

fragmentAndroid

2024-04-16 12:13:07

usingC#開發(fā)

2024-01-30 09:43:43

Java緩存技術(shù)

2022-12-08 11:54:55

元宇宙

2024-11-12 06:27:16

Python列表元組

2018-08-15 09:48:27

數(shù)據(jù)庫Redis應(yīng)用場景

2009-08-03 11:38:57

linux at命令詳linux at命令

2011-05-16 15:49:58

JAVA

2009-05-18 13:07:44

類隱藏Java關(guān)鍵字

2024-09-06 11:52:47

2025-02-11 09:49:12

點贊
收藏

51CTO技術(shù)棧公眾號