Paimon Python SDK (pypaimon) 詳細(xì)使用指南
一、pypaimon 簡(jiǎn)介
pypaimon 是 Apache Paimon 數(shù)據(jù)湖的 Python 客戶端 SDK,基于 Py4J 實(shí)現(xiàn) Python 與 Java 代碼的橋接,允許開發(fā)者通過 Python API 操作 Paimon 數(shù)據(jù)湖。作為 Apache Paimon 的重要組件,pypaimon 繼承了 Paimon 的核心特性:流批一體存儲(chǔ)、實(shí)時(shí)數(shù)據(jù)更新、ACID 事務(wù)支持和低成本存儲(chǔ),同時(shí)提供 Python 生態(tài)友好的接口,支持與 PyArrow、Pandas 等數(shù)據(jù)科學(xué)工具無縫集成。
核心定位:
- 技術(shù)橋梁:連接 Python 生態(tài)與 Paimon 數(shù)據(jù)湖,支持 Python 開發(fā)者直接操作數(shù)據(jù)湖表
 - 輕量化集成:無需編寫 Java 代碼即可利用 Paimon 的 LSM 樹存儲(chǔ)結(jié)構(gòu)和高效 compaction 機(jī)制
 - 多場(chǎng)景適配:適用于實(shí)時(shí)數(shù)據(jù)入湖、批式數(shù)據(jù)處理、OLAP 查詢加速等場(chǎng)景
 

二、環(huán)境準(zhǔn)備與安裝
1. 系統(tǒng)要求
依賴項(xiàng)  | 版本要求  | 
Python  | 3.8 及以上  | 
JRE  | 1.8  | 
Hadoop 環(huán)境  | 可選(本地測(cè)試可省略)  | 
PyArrow  | 推薦 7.0+  | 
Pandas  | 推薦 1.3+  | 
2. 安裝方式
(1) 阿里云 DLF 專用版本(推薦生產(chǎn)環(huán)境)
# 下載 pypaimon_dlf2 安裝包
wget https://help.aliyun.com/zh/dlf/dlf-2-0/use-cases/pypaimon-dlf-for-data-into-the-lake
pip3 install pypaimon_dlf2-0.3.dev0.tar.gz(2) 官方開發(fā)版(適合測(cè)試)
pip install paimon-python==0.9.0.dev12. 環(huán)境驗(yàn)證
# 檢查 Java 環(huán)境
import os
assert'JAVA_HOME'in os.environ,"請(qǐng)配置 JAVA_HOME 環(huán)境變量"
# 驗(yàn)證安裝
from pypaimon import Schema
print("pypaimon 安裝成功")三、核心 API 與基礎(chǔ)操作
1. Catalog 管理
Catalog 是 Paimon 數(shù)據(jù)湖的元數(shù)據(jù)入口,用于管理數(shù)據(jù)庫(kù)和表。pypaimon 支持多種 Catalog 類型,包括本地文件系統(tǒng)、HDFS、阿里云 DLF 等。
創(chuàng)建 DLF Catalog(阿里云場(chǎng)景):
from pypaimon.py4j import Catalog
catalog_options ={
'metastore':'dlf-paimon',
'dlf.region':'cn-hangzhou',
'dlf.endpoint':'dlf.cn-hangzhou.aliyuncs.com',
'dlf.catalog.id':'your-catalog-id',
'dlf.catalog.accessKeyId':'your-ak',
'dlf.catalog.accessKeySecret':'your-sk',
'max-workers':'4'# 并行讀取線程數(shù)
}
catalog = Catalog.create(catalog_options)2. 數(shù)據(jù)庫(kù)與表操作
(1) 創(chuàng)建數(shù)據(jù)庫(kù)
# 創(chuàng)建數(shù)據(jù)庫(kù)(忽略已存在錯(cuò)誤)
catalog.create_database(
    name='paimon_demo',
    ignore_if_exists=True
)(2) 定義表 Schema
通過 PyArrow 定義表結(jié)構(gòu),支持分區(qū)鍵、主鍵和表屬性配置:
import pyarrow as pa
from pypaimon import Schema
# 定義 PyArrow Schema
pa_schema = pa.schema([
('dt', pa.string()),
('user_id', pa.int64()),
('order_id', pa.int64()),
('amount', pa.float64())
])
# 轉(zhuǎn)換為 Paimon Schema
table_schema = Schema(
    pa_schema=pa_schema,
    partition_keys=['dt'],# 分區(qū)鍵
    primary_keys=['dt','order_id'],# 主鍵
    options={
'bucket':'8',# 分桶數(shù)
'file.format':'parquet'# 文件格式
},
    comment='電商訂單事實(shí)表'
)(3) 創(chuàng)建表
# 在指定數(shù)據(jù)庫(kù)創(chuàng)建表
catalog.create_table(
    identifier='paimon_demo.orders',
    schema=table_schema,
    ignore_if_exists=True
)
# 獲取表對(duì)象
table = catalog.get_table('paimon_demo.orders')3. 數(shù)據(jù)寫入與提交
pypaimon 支持 PyArrow Table 和 Pandas DataFrame 兩種寫入格式,通過兩階段提交保證數(shù)據(jù)一致性:
import pandas as pd
# 準(zhǔn)備測(cè)試數(shù)據(jù)
data ={
'dt':['2024-01-01','2024-01-01','2024-01-02'],
'user_id':[1001,1002,1001],
'order_id':[10001,10002,10003],
'amount':[299.5,159.0,499.9]
}
df = pd.DataFrame(data)
# 創(chuàng)建寫入器
write_builder = table.new_batch_write_builder()
table_write = write_builder.new_write()
table_commit = write_builder.new_commit()
try:
# 寫入 Pandas DataFrame
    table_write.write_pandas(df)
# 準(zhǔn)備提交
    commit_msg = table_write.prepare_commit()
# 執(zhí)行提交
    table_commit.commit(commit_msg)
finally:
# 釋放資源
    table_write.close()
    table_commit.close()4. 數(shù)據(jù)查詢與過濾
支持謂詞下推和投影優(yōu)化,通過 ReadBuilder 配置查詢參數(shù):
# 創(chuàng)建讀取器
read_builder = table.new_read_builder()
# 構(gòu)建過濾條件 (dt = '2024-01-01')
predicate_builder = read_builder.new_predicate_builder()
predicate = predicate_builder.equal('dt','2024-01-01')
read_builder = read_builder.with_filter(predicate)
# 執(zhí)行查詢
table_scan = read_builder.new_scan()
splits = table_scan.plan().splits()# 獲取數(shù)據(jù)分片
# 轉(zhuǎn)換為 PyArrow Table
table_read = read_builder.new_read()
pa_table = table_read.to_arrow(splits)
# 轉(zhuǎn)換為 Pandas DataFrame
result_df = pa_table.to_pandas()
print(result_df)四、高級(jí)特性與性能優(yōu)化
1. 并發(fā)控制與事務(wù)
pypaimon 采用樂觀并發(fā)控制,通過兩階段提交協(xié)議保證寫入原子性。對(duì)于對(duì)象存儲(chǔ)(如 S3/OSS),需額外配置:
# 啟用元數(shù)據(jù)鎖(對(duì)象存儲(chǔ)必需)
catalog_options['lock.enabled']='true'
catalog_options['metastore']='jdbc'# 使用 JDBC 元存儲(chǔ)2. 數(shù)據(jù)類型映射
pypaimon 通過 PyArrow 實(shí)現(xiàn) Python 與 Paimon 數(shù)據(jù)類型的自動(dòng)映射:
Python 類型  | PyArrow 類型  | Paimon 類型  | 
int  | pa.int64()  | BIGINT  | 
float  | pa.float64()  | DOUBLE  | 
str  | pa.string()  | STRING  | 
datetime.datetime  | pa.timestamp('ns')  | TIMESTAMP  | 
list  | pa.list_(pa.int64())  | ARRAY<BIGINT>  | 
3. 性能調(diào)優(yōu)參數(shù)
參數(shù)  | 說明  | 推薦值  | 
max-workers  | 并行讀取線程數(shù)  | 4-8  | 
bucket  | 分桶數(shù)(主鍵表)  | 8-32  | 
compaction.delta-commits  | 增量壓縮觸發(fā)閾值  | 10  | 
file.index.bloom-filter  | 啟用布隆過濾器索引  | 'user_id'  | 
五、典型應(yīng)用場(chǎng)景
1. 實(shí)時(shí)數(shù)據(jù)入湖(CDC 同步)
通過 Debezium 捕獲 MySQL 變更數(shù)據(jù),經(jīng) Flink 處理后寫入 Paimon,pypaimon 負(fù)責(zé)批式補(bǔ)數(shù)據(jù):
# 補(bǔ)傳歷史數(shù)據(jù)
historical_df = pd.read_csv('historical_orders.csv')
table_write.write_pandas(historical_df)
table_commit.commit(table_write.prepare_commit())2. 流批一體分析
同一份數(shù)據(jù)同時(shí)支持批式報(bào)表和實(shí)時(shí)查詢:
# 批式查詢(T+1報(bào)表)
batch_read = table.new_read_builder().with_snapshot('20240101').build()
# 實(shí)時(shí)查詢(實(shí)時(shí)dashboard)
stream_read = table.new_read_builder().with_start_snapshot('LATEST').build()3. 機(jī)器學(xué)習(xí)樣本存儲(chǔ)
存儲(chǔ)特征數(shù)據(jù)并支持高效讀?。?/p>
# 讀取特征數(shù)據(jù)用于模型訓(xùn)練
features = table_read.to_arrow(splits).to_pandas()
X = features[['user_age','order_count']]
y = features['label']六、注意事項(xiàng)與優(yōu)秀實(shí)踐
1. 環(huán)境依賴
- 確保 JRE 8 環(huán)境變量配置正確:export JAVA_HOME=/path/to/jre8
 - 本地測(cè)試推薦使用 Flink 預(yù)綁定的 Hadoop jar:export HADOOP_CLASSPATH=$(flink classpath)
 
2. 數(shù)據(jù)一致性
- 寫入后必須調(diào)用 commit() 方法,否則數(shù)據(jù)不會(huì)持久化
 - 多writer場(chǎng)景需避免同一主鍵并發(fā)寫入,可能導(dǎo)致快照沖突
 
3. 資源配置
- 大表查詢建議設(shè)置 max-workers=8 提升并行度
 - 內(nèi)存受限場(chǎng)景啟用 spill 機(jī)制:sort-spill-threshold=10
 
pypaimon 作為 Apache Paimon 的 Python 客戶端,填補(bǔ)了 Python 生態(tài)與數(shù)據(jù)湖之間的鴻溝,使數(shù)據(jù)科學(xué)家和 Python 開發(fā)者能夠直接操作流批一體數(shù)據(jù)湖。其核心優(yōu)勢(shì)在于:
- 簡(jiǎn)單易用:Python 友好的 API 設(shè)計(jì),降低數(shù)據(jù)湖使用門檻
 - 生態(tài)融合:無縫對(duì)接 Pandas、PyArrow 等數(shù)據(jù)科學(xué)工具
 - 性能卓越:繼承 Paimon 的 LSM 樹結(jié)構(gòu)和高效 compaction 機(jī)制
 















 
 
 







 
 
 
 