偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

小米面試:Paimon Join 用法有哪些?大規(guī)模數(shù)據(jù)場景下如何優(yōu)化 Join 性能?

大數(shù)據(jù)
本文將詳細(xì)介紹 Paimon 中的各類 Join 語法,包括 Lookup Join、Batch Join 以及其他特殊類型的 Join,并提供詳細(xì)的案例說明。

Apache Paimon 是一個開源的流式數(shù)據(jù)湖框架,它支持多種 Join 操作,使用戶能夠在不同場景下高效地關(guān)聯(lián)數(shù)據(jù)。本文將詳細(xì)介紹 Paimon 中的各類 Join 語法,包括 Lookup Join、Batch Join 以及其他特殊類型的 Join,并提供詳細(xì)的案例說明。

一、Lookup Join

Lookup Join 是 Paimon 中最重要的 Join 類型之一,主要用于流式查詢中將流數(shù)據(jù)與維度表數(shù)據(jù)進(jìn)行關(guān)聯(lián)。它要求一個表具有處理時間屬性,另一個表作為查找源。

1. 基本概念

Lookup Join 的核心思想是:當(dāng)流數(shù)據(jù)到達(dá)時,系統(tǒng)會查找維度表中的匹配記錄,然后將兩者關(guān)聯(lián)起來。這種 Join 方式特別適合于數(shù)據(jù)流需要與相對靜態(tài)的維度數(shù)據(jù)進(jìn)行豐富的場景。

2. 語法結(jié)構(gòu)

Paimon 中 Lookup Join 的基本語法如下:

SELECT [列名列表]  
FROM 流表 AS 別名1  
JOIN 維度表 FOR SYSTEM_TIME AS OF 別名1.處理時間列 AS 別名2  
ON 別名1.關(guān)聯(lián)列 = 別名2.關(guān)聯(lián)列

其中:

  • FOR SYSTEM_TIME AS OF:是 Lookup Join 的關(guān)鍵語法
  • 處理時間列:必須是流表中的 PROCTIME() 類型列
  • 維度表通常是 Paimon 表,具有主鍵

3. 基本案例

以下是一個基本的 Lookup Join 案例:

-- 創(chuàng)建維度表  
CREATE TABLE customers (  
    id INT PRIMARY KEY NOT ENFORCED,  
    name STRING,  
    country STRING,  
    zip STRING  
);  


-- 插入數(shù)據(jù)到維度表  
INSERT INTO customers VALUES (1, 'Alice', 'USA', '10001'), (2, 'Bob', 'UK', '20002');  


-- 創(chuàng)建流表  
CREATE TABLE orders (  
    order_id INT,  
    total INT,  
    customer_id INT,  
    proc_time AS PROCTIME()  
);  


-- Lookup Join 查詢  
SELECT o.order_id, o.total, c.name, c.country  
FROM orders AS o  
JOIN customers FOR SYSTEM_TIME AS OF o.proc_time AS c  
ON o.customer_id = c.id;

在這個案例中,每當(dāng)一條新的訂單記錄到達(dá)時,系統(tǒng)會查找對應(yīng)的客戶信息并關(guān)聯(lián)起來。

4. 緩存模式

Paimon 支持多種緩存策略來優(yōu)化 Lookup Join 的性能:

(1) AUTO 模式

AUTO 模式是默認(rèn)的緩存策略,系統(tǒng)會自動選擇最高效的緩存方式。

CREATE TABLE dim (  
  id INT PRIMARY KEY NOT ENFORCED,   
  value STRING  
) WITH (  
  'continuous.discovery-interval' = '1 s',  
  'lookup.cache' = 'auto'  
);

(2) FULL 模式

FULL 模式會緩存整個維度表,適用于小到中等大小的維度表。

CREATE TABLE dim (  
  id INT PRIMARY KEY NOT ENFORCED,   
  value STRING  
) WITH (  
  'continuous.discovery-interval' = '1 s',  
  'lookup.cache' = 'full'  
);

5. 高級特性

(1) 動態(tài)分區(qū)查找

對于分區(qū)表,Paimon 支持動態(tài)分區(qū)選擇,可以顯著提高性能:

-- 使用最新分區(qū)進(jìn)行查找  
SELECT * FROM orders AS o  
LEFT JOIN customers /*+ OPTIONS('lookup.dynamic-partition'='max_pt()') */  
FOR SYSTEM_TIME AS OF o.proc_time AS c  
ON o.customer_id = c.id;

分區(qū)表達(dá)式函數(shù)包括:

  • max_pt() - 最新分區(qū)
  • max_pt(n) - 第 n 個最新分區(qū)
  • 自定義 SQL 表達(dá)式

(2) 異步查找

Paimon 支持異步查找,可以提高 Lookup Join 的性能:

SELECT o.order_id, o.total, c.country, c.zip  
FROM orders AS o  
JOIN customers /*+ OPTIONS('lookup.async'='true', 'lookup.async-thread-number'='16') */  
FOR SYSTEM_TIME AS OF o.proc_time AS c  
ON o.customer_id = c.id;

(3) 重試機(jī)制

對于可能因數(shù)據(jù)不就緒而導(dǎo)致的 Join 失敗,Paimon 提供了重試機(jī)制:

-- 同步重試  
SELECT /*+ LOOKUP('table'='c', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='1s', 'max-attempts'='600') */  
o.order_id, o.total, c.country, c.zip  
FROM orders AS o  
JOIN customers  
FOR SYSTEM_TIME AS OF o.proc_time AS c  
ON o.customer_id = c.id;  


-- 異步重試  
SELECT /*+ LOOKUP('table'='c', 'retry-predicate'='lookup_miss', 'output-mode'='allow_unordered', 'retry-strategy'='fixed_delay', 'fixed-delay'='1s', 'max-attempts'='600') */  
o.order_id, o.total, c.country, c.zip  
FROM orders AS o  
JOIN customers /*+ OPTIONS('lookup.async'='true', 'lookup.async-thread-number'='16') */  
FOR SYSTEM_TIME AS OF o.proc_time AS c  
ON o.customer_id = c.id;

(4) 刷新黑名單時段

可以定義不應(yīng)刷新查找緩存的時間段,這對于防止在高峰負(fù)載期間或維護(hù)窗口期間刷新緩存很有用:

CREATE TABLE dim (  
  id INT PRIMARY KEY NOT ENFORCED,   
  value STRING  
) WITH (  
  'lookup.refresh-time-periods-blacklist' = '2023-10-31 12:00->2023-10-31 16:00'  
);

格式為 start-time->end-time,時間戳采用 yyyy-MM-dd HH:mm 格式。多個時段可以用逗號分隔。

6. 查詢服務(wù)優(yōu)化

Paimon 提供了查詢服務(wù)功能,可以顯著提高 Lookup Join 的性能:

-- 啟動查詢服務(wù)  
CALL sys.query_service('database_name.table_name', parallelism);  


-- 使用查詢服務(wù)的 Lookup Join  
SELECT o.order_id, o.total, c.country, c.zip  
FROM orders AS o  
JOIN customers FOR SYSTEM_TIME AS OF o.proc_time AS c  
ON o.customer_id = c.id;

7. 大規(guī)模 Lookup Join 優(yōu)化

對于數(shù)據(jù)量較大的維度表,Paimon 提供了 shuffle lookup 優(yōu)化:

-- 啟用 shuffle lookup 優(yōu)化  
SELECT /*+ LOOKUP('table'='c', 'shuffle'='true') */  
o.order_id, o.total, c.country, c.zip  
FROM orders AS o  
JOIN customers  
FOR SYSTEM_TIME AS OF o.proc_time AS c  
ON o.customer_id = c.id;

8. 復(fù)雜條件 Lookup Join

Paimon 支持在 Lookup Join 中使用復(fù)雜的連接條件:

-- 帶過濾條件的 Lookup Join  
SELECT T.i, D.j, D.k1   
FROM T   
LEFT JOIN DIM for system_time as of T.proctime AS D   
ON T.i = D.i AND D.k1 > 111;

二、Batch Join

除了 Lookup Join,Paimon 還支持批處理場景下的各種 Join 操作。

1. 基本 Join 類型

(1) INNER JOIN

內(nèi)連接返回兩個表中滿足連接條件的記錄:

SELECT a.id, a.name, b.value  
FROM table_a a  
INNER JOIN table_b b  
ON a.id = b.id;

(2) LEFT JOIN

左連接返回左表中的所有記錄,以及右表中滿足連接條件的記錄:

SELECT a.id, a.name, b.value  
FROM table_a a  
LEFT JOIN table_b b  
ON a.id = b.id;

(3) RIGHT JOIN

右連接返回右表中的所有記錄,以及左表中滿足連接條件的記錄:

SELECT a.id, a.name, b.value  
FROM table_a a  
RIGHT JOIN table_b b  
ON a.id = b.id;

(4) FULL JOIN

全連接返回兩個表中的所有記錄,無論它們是否滿足連接條件:

SELECT a.id, a.name, b.value  
FROM table_a a  
FULL JOIN table_b b  
ON a.id = b.id;

2. 動態(tài)分區(qū)剪枝

Paimon 支持動態(tài)分區(qū)剪枝,可以顯著提高分區(qū)表的 Join 性能:

-- 創(chuàng)建維度表  
CREATE TABLE dim (x INT PRIMARY KEY NOT ENFORCED, y STRING, z INT);  
INSERT INTO dim VALUES (1, 'a', 1), (2, 'b', 1), (3, 'c', 2);  


-- 創(chuàng)建分區(qū)事實表  
CREATE TABLE fact (a INT, b BIGINT, c STRING, p INT, PRIMARY KEY (a, p) NOT ENFORCED) PARTITIONED BY (p);  
INSERT INTO fact PARTITION (p = 1) VALUES (10, 100, 'aaa'), (11, 101, 'bbb'), (12, 102, 'ccc');  
INSERT INTO fact PARTITION (p = 2) VALUES (20, 200, 'aaa'), (21, 201, 'bbb'), (22, 202, 'ccc');  
INSERT INTO fact PARTITION (p = 3) VALUES (30, 300, 'aaa'), (31, 301, 'bbb'), (32, 302, 'ccc');  


-- 使用動態(tài)分區(qū)剪枝的 Join  
SELECT a, b, c, p, x, y FROM fact INNER JOIN dim ON x = p and z = 1 ORDER BY a;

3. Bucket Join

對于使用相同 bucket 策略的表,Paimon 支持高效的 Bucket Join:

-- 創(chuàng)建具有相同 bucket 配置的表  
CREATE TABLE t1 (id INT, c STRING) using paimon TBLPROPERTIES ('primary-key' = 'id', 'bucket'='10');  
INSERT INTO t1 VALUES (1, 'x1'), (2, 'x3'), (3, 'x3'), (4, 'x4'), (5, 'x5');  


CREATE TABLE t2 (id INT, c STRING) using paimon TBLPROPERTIES ('primary-key' = 'id', 'bucket'='10');  
INSERT INTO t2 VALUES (1, 'x1'), (2, 'x3'), (3, 'x3'), (4, 'x4'), (5, 'x5');  


-- Bucket Join  
SELECT * FROM t1 JOIN t2 on t1.id = t2.id order by t1.id;

4. 多表 Join

Paimon 支持多表 Join,可以同時關(guān)聯(lián)多個表:

-- 多表 Join  
SELECT a.id, a.name, b.value, c.description  
FROM table_a a  
JOIN table_b b ON a.id = b.id  
JOIN table_c c ON a.id = c.id;

5. 自連接

Paimon 支持自連接,即表與自身進(jìn)行 Join:

-- 自連接  
SELECT a.id, a.name, b.name as manager_name  
FROM employees a  
JOIN employees b ON a.manager_id = b.id;

三、特殊 Join 類型

1. 大規(guī)模數(shù)據(jù)的 Bucket Shuffle Join

對于數(shù)據(jù)量特別大的維度表,傳統(tǒng)的 Lookup Join 可能會導(dǎo)致每個 Flink 子任務(wù)都需要存儲整個維度表的副本,這會造成內(nèi)存壓力。Paimon 提供了 Bucket Shuffle 優(yōu)化策略,特別適用于固定桶的 Paimon 表。

-- 啟用 bucket shuffle 優(yōu)化的 Lookup Join  
SELECT /*+ LOOKUP('table'='c', 'shuffle'='true') */  
o.order_id, o.total, c.country, c.zip  
FROM orders AS o  
JOIN customers  
FOR SYSTEM_TIME AS OF o.proc_time AS c  
ON o.customer_id = c.id;

這種優(yōu)化將相同桶的數(shù)據(jù)發(fā)送到指定的子任務(wù),使每個 Flink 子任務(wù)只需存儲部分?jǐn)?shù)據(jù),而不是整個表的數(shù)據(jù)。這種方式適用于 Flink 2.0+ 版本和使用固定桶策略的 Paimon 表。

2. 動態(tài)分區(qū) Lookup Join

在傳統(tǒng)數(shù)據(jù)倉庫中,每個分區(qū)通常維護(hù)最新的完整數(shù)據(jù)。對于這種場景,Paimon 專門開發(fā)了 max_pt() 功能,允許 Lookup Join 只關(guān)聯(lián)最新分區(qū)的數(shù)據(jù)。

首先,創(chuàng)建一個分區(qū)表:

CREATE TABLE customers (  
  id INT,  
  name STRING,  
  country STRING,  
  zip STRING,  
  dt STRING,  
  PRIMARY KEY (id, dt) NOT ENFORCED  
) PARTITIONED BY (dt);

然后,使用動態(tài)分區(qū) Lookup Join:

SELECT o.order_id, o.total, c.country, c.zip  
FROM orders AS o  
JOIN customers /*+ OPTIONS('scan.partitions'='max_pt()', 'lookup.dynamic-partition.refresh-interval'='1 h') */  
FOR SYSTEM_TIME AS OF o.proc_time AS c  
ON o.customer_id = c.id;

Lookup 節(jié)點會自動刷新最新分區(qū)并查詢最新分區(qū)的數(shù)據(jù)。scan.partitions 選項也可以指定固定分區(qū),格式為 key1=value1,key2=value2,多個分區(qū)用分號 (;) 分隔。

3. Merge Into 操作

Paimon 支持 Merge Into 操作,這是一種特殊的 Join 類型,用于根據(jù)源表數(shù)據(jù)更新目標(biāo)表。它使用"upsert"語義,意味著如果行存在則更新,否則插入。

MERGE INTO target_table [AS target_alias]  
USING source_table [AS source_alias]  
ON merge-condition  
WHEN MATCHED [AND matched-condition]  
  THEN UPDATE SET xxx  
WHEN MATCHED [AND matched-condition]  
  THEN DELETE  
WHEN NOT MATCHED [AND not_matched_condition]  
  THEN INSERT VALUES (xxx)  
WHEN NOT MATCHED BY SOURCE [AND not-matched-by-source-condition]  
  THEN UPDATE SET xxx  
WHEN NOT MATCHED BY SOURCE [AND not-matched-by-source-condition]  
  THEN DELETE

例如,查找源表中提到的所有訂單,如果價格高于 100 則標(biāo)記為重要,如果價格低于 10 則刪除:

MERGE INTO T  
USING S  
ON T.id = S.order_id  
WHEN MATCHED AND T.price > 100  
  THEN UPDATE SET mark = 'important'  
WHEN MATCHED AND T.price < 10  
  THEN DELETE

Merge Into 操作在 Spark 中也有支持:

// 只更新匹配的記錄  
MERGE INTO target  
USING source  
ON target.a = source.a  
WHEN MATCHED THEN  
UPDATE SET a = source.a, b = source.b, c = source.c

四、Paimon 中 Join 的優(yōu)化技術(shù)

1. 查詢服務(wù)優(yōu)化

Paimon 提供了查詢服務(wù)功能,可以顯著提高 Lookup Join 的性能。通過運行一個 Flink 流式作業(yè)來啟動表的查詢服務(wù),當(dāng)查詢服務(wù)存在時,F(xiàn)link Lookup Join 會優(yōu)先從中獲取數(shù)據(jù)。

-- 使用 Flink SQL 啟動查詢服務(wù)  
CALL sys.query_service('database_name.table_name', parallelism);

也可以使用 Flink Action 啟動查詢服務(wù)。

<FLINK_HOME>/bin/flink run \  
    /path/to/paimon-flink-action-{{< version >}}.jar \  
    query_service \  
    --warehouse <warehouse-path> \  
    --database <database-name> \  
    --table <table-name> \  
    [--parallelism <parallelism>] \  
    [--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]]


責(zé)任編輯:趙寧寧 來源: 大數(shù)據(jù)技能圈
相關(guān)推薦

2024-05-20 10:03:15

線程池優(yōu)先級隊列排序方法

2013-08-27 14:04:29

2022-09-13 07:50:26

小米面試官MySQL

2023-05-31 08:37:06

Java并發(fā)編程

2024-04-02 14:29:12

網(wǎng)絡(luò)安全數(shù)據(jù)泄露

2023-07-28 07:18:39

final繼承結(jié)構(gòu)

2025-06-04 07:48:46

2017-09-07 16:50:47

MySQL性能優(yōu)化

2012-02-27 10:03:19

小米雷軍小米之家

2023-10-26 01:26:04

Vaex數(shù)據(jù)數(shù)據(jù)集

2024-08-21 15:14:21

2022-06-24 09:00:00

數(shù)據(jù)管理數(shù)據(jù)卷數(shù)據(jù)存儲

2020-07-23 14:03:09

數(shù)據(jù)中心數(shù)據(jù)網(wǎng)絡(luò)

2025-01-15 08:05:06

MySQLLEFT JOIN數(shù)據(jù)庫

2023-11-22 09:27:15

數(shù)據(jù)遷移

2023-05-26 15:46:23

數(shù)據(jù)結(jié)構(gòu)布隆過濾器開發(fā)

2023-10-05 12:43:48

數(shù)據(jù)處理

2025-04-29 10:24:01

大數(shù)據(jù)StarRocksJOIN

2022-09-06 11:57:32

ClickHouse火山引擎數(shù)據(jù)

2024-11-11 00:00:01

線程池工具
點贊
收藏

51CTO技術(shù)棧公眾號