從數(shù)據(jù)庫(kù)獲取數(shù)據(jù),必須要了解Python生成器
介紹
作為數(shù)據(jù)工程師,我們經(jīng)常面臨這樣的情況:我們必須從運(yùn)營(yíng)數(shù)據(jù)庫(kù)中獲取一個(gè)特別大的數(shù)據(jù)集,對(duì)其進(jìn)行一些轉(zhuǎn)換,然后將其寫(xiě)回分析數(shù)據(jù)庫(kù)或云對(duì)象存儲(chǔ)(例如S3桶)。
如果數(shù)據(jù)集太大無(wú)法裝入內(nèi)存,但同時(shí)使用分布式計(jì)算不值得或不可行,該怎么辦呢?
在這種情況下,我們需要找到一種方法,在不影響數(shù)據(jù)團(tuán)隊(duì)其他同事(例如通過(guò)使用Airflow實(shí)例中可用內(nèi)存的大部分)的情況下完成工作。這就是Python生成器可能會(huì)派上用場(chǎng)的時(shí)候,通過(guò)避免內(nèi)存峰值來(lái)高效地從數(shù)據(jù)庫(kù)獲取數(shù)據(jù)。
事實(shí)上,在本教程中,我們將通過(guò)旋轉(zhuǎn)運(yùn)行三個(gè)服務(wù)(PostgresDB、Jupyter Notebook和MinIO)的Docker容器來(lái)模擬一個(gè)真實(shí)的端到端數(shù)據(jù)工作流程,探討在數(shù)據(jù)工程師中使用生成器的兩個(gè)實(shí)際用例。

Python中使用生成器的優(yōu)點(diǎn)
在Python中,標(biāo)準(zhǔn)函數(shù)計(jì)算并返回單個(gè)值然后終止,而生成器可以隨時(shí)間產(chǎn)生一系列值,根據(jù)需要暫停和恢復(fù)。生成器是一種特殊的函數(shù),它使用`yield`子句而不是`return`來(lái)產(chǎn)生一系列的值。值逐個(gè)創(chuàng)建,無(wú)需將整個(gè)序列存儲(chǔ)在內(nèi)存中。
當(dāng)調(diào)用生成器函數(shù)時(shí),它返回一個(gè)迭代器對(duì)象,可以用于迭代生成器產(chǎn)生的值的序列。例如,讓我們創(chuàng)建一個(gè)squares_generator(n)函數(shù),該函數(shù)生成介于零和輸入變量n之間的數(shù)字的平方:
def squares_generator(n):
    num = 0
    while num < n:
        yield num * num
        num += 1當(dāng)調(diào)用該函數(shù)時(shí),它只返回一個(gè)迭代器:
squares_generator(n)
#Output:
# <generator object squares_generator at 0x10653bdd0>為了觸發(fā)整個(gè)值序列,我們必須在循環(huán)中調(diào)用生成器函數(shù):
for num in squares_generator(5):
  print(num)
#Output:
0
1
4
9
16另一種更優(yōu)雅的選擇是創(chuàng)建一個(gè)生成器表達(dá)式,它執(zhí)行與上述函數(shù)相同的操作,但作為一行代碼:
n = 5 
generator_exp = (num * num for num in range(n))現(xiàn)在,可以直接使用`next()`方法訪問(wèn)值:
print(next(generator_exp)) # 0
print(next(generator_exp)) # 1
print(next(generator_exp)) # 4
print(next(generator_exp)) # 9
print(next(generator_exp)) # 16正如我們所看到的,生成器函數(shù)返回值的方式并不像常規(guī)Python函數(shù)那樣直觀,這可能是為什么許多數(shù)據(jù)工程師沒(méi)有像他們應(yīng)該的那樣經(jīng)常使用生成器的原因。
目標(biāo)與設(shè)置
本教程的目標(biāo)是:
- 從Postgres數(shù)據(jù)庫(kù)中獲取數(shù)據(jù)并將其存儲(chǔ)為pandas數(shù)據(jù)框。
 - 將pandas數(shù)據(jù)框以parquet格式寫(xiě)入S3桶。
 
每個(gè)目標(biāo)都將使用常規(guī)函數(shù)和生成器函數(shù)兩種方法實(shí)現(xiàn)。為了模擬這樣的工作流程,我們將使用三個(gè)服務(wù)旋轉(zhuǎn)一個(gè)Docker容器:
- Postgres數(shù)據(jù)庫(kù)(這個(gè)服務(wù)將是我們的源操作數(shù)據(jù)庫(kù),從中獲取數(shù)據(jù)。Docker-compose還涉及創(chuàng)建一個(gè)mainDB,以及在名為transactions的表中插入500萬(wàn)個(gè)模擬記錄。請(qǐng)注意:可以插入任意數(shù)量的行來(lái)模擬一個(gè)更大的數(shù)據(jù)集(在準(zhǔn)備本教程的材料時(shí),嘗試過(guò)5000萬(wàn)和1億行),但Docker服務(wù)的性能會(huì)受到嚴(yán)重影響。)
 - MinIO(這個(gè)服務(wù)將用于模擬AWS S3桶,然后使用awswrangler包幫助將pandas數(shù)據(jù)框以parquet格式寫(xiě)入其中。)
 - Jupyter Notebook(這個(gè)服務(wù)將用于通過(guò)熟悉的編譯器以交互方式運(yùn)行Python片段。)
 
下面的圖表是對(duì)到目前為止所描述的內(nèi)容的可視化表示:

第一步,我們項(xiàng)目的GitHub存儲(chǔ)庫(kù)并切換到相關(guān)文件夾:
git clone git@github.com:anbento0490/projects.git &&
cd fetch_data_with_python_generators然后,我們可以運(yùn)行docker-compose來(lái)啟動(dòng)這三個(gè)服務(wù):
docker compose up -d
[+] Running 5/5
 ? Network shared-network                 Created                                                 0.0s
 ? Container jupyter-notebooks            Started                                                 1.0s
 ? Container minio                        Started                                                 0.7s
 ? Container postgres-db                  Started                                                 0.9s
 ? Container mc                           Started最終,我們可以驗(yàn)證:
(1) 在Postgres數(shù)據(jù)庫(kù)中存在一個(gè)名為transactions的表,其中包含5百萬(wàn)條記錄。
docker exec -it postgres-db /bin/bash
root@9632469c70e0:/# psql -U postgres
psql (13.13 (Debian 13.13-1.pgdg120+1))
Type "help" for help.
postgres=# \c mainDB
You are now connected to database "mainDB" as user "postgres".
mainDB=# select count(*) from transactions;
  count
---------
 5000000
(1 row)(2) 可以通過(guò)端口localhost:9001訪問(wèn)MinIO UI(在要求憑據(jù)時(shí)插入管理員和密碼),并且已經(jīng)創(chuàng)建了一個(gè)名為generators-test-bucket的空桶:

MinIO UI端口9001處的用戶界面
(3) 可以通過(guò)localhost:8889訪問(wèn)Jupyter Notebook用戶界面,并通過(guò)以下方法檢索令牌:
docker exec -it jupyter-notebooks /bin/bash
root@eae08d1f4bf6:~# jupyter server list
Currently running servers:
http://eae08d1f4bf6:8888/?token=8a45d846d03cf0c0e4584c3b73af86ba5dk9e83c8ac47ee7 :: /home/jovyan
很好!我們已經(jīng)準(zhǔn)備好在Jupyter上運(yùn)行一些代碼了。但在我們這樣做之前,我們需要?jiǎng)?chuàng)建一個(gè)新的access_key和secret_access_key,以便能夠與MinIO桶進(jìn)行交互:

如何在MinIO中生成新的密鑰對(duì)
注意:MinIO桶的最酷的功能之一是,我們可以與它們交互,就像它們是AWS S3桶一樣(例如使用boto3、awswrangler等),但它們是免費(fèi)的,而且無(wú)需擔(dān)心暴露密鑰,因?yàn)樗鼈儍H存在于我們的本地環(huán)境中,并且除非持久保存,否則將在容器停止時(shí)被刪除。
現(xiàn)在,在生成器筆記本中,讓我們運(yùn)行以下代碼(確保替換secrets):
import psycopg2
import pandas as pd
import boto3
import awswrangler as wr
#######################################################
######## CONNECTING TO PG DB + CREATING CURSORS #######
connection = psycopg2.connect(user="postgres",
                              password="postgres",
                              port="5432",
                              database="mainDB")
cursor = connection.cursor()
query = "select * from transactions;"
#######################################################
######## CONNECTING TO MINIO BUCKET ###################
boto3.setup_default_session(aws_access_key_id = 'your_access_key',
                            aws_secret_access_key = 'your_secret_key')
bucket = 'generators-test-bucket'
folder_gen = 'data_gen'
folder_batch = 'data_batch'
parquet_file_name = 'transactions'
batch_size = 1000000
wr.config.s3_endpoint_url = 'http://minio:9000'這將創(chuàng)建一個(gè)連接到mainDB的連接以及用于執(zhí)行查詢(xún)的游標(biāo)。還將設(shè)置一個(gè)default_session,以與generators-test-bucket進(jìn)行交互。
用例 #1:從數(shù)據(jù)庫(kù)讀取數(shù)據(jù)
作為數(shù)據(jù)工程師,在將大型數(shù)據(jù)集從數(shù)據(jù)庫(kù)或外部服務(wù)抓取到Python管道中時(shí),我們經(jīng)常需要在以下方面找到合適的平衡:
- 內(nèi)存:一次性拉取整個(gè)數(shù)據(jù)集可能導(dǎo)致OOM錯(cuò)誤或影響整個(gè)實(shí)例/集群的性能。
 - 速度:逐行獲取數(shù)據(jù)也會(huì)導(dǎo)致昂貴的I/O網(wǎng)絡(luò)操作。
 
方法 #1:使用批處理
一個(gè)合理的折衷方案(在實(shí)踐中經(jīng)常使用)是以批處理方式獲取數(shù)據(jù),其中批處理的大小取決于可用內(nèi)存以及數(shù)據(jù)管道的速度要求。
# 1.1. CREATE DF USING BATCHES
def create_df_batch(cursor, batch_size):
    print('Creating pandas DF using generator...')
    colnames = ['transaction_id', 
                'user_id', 
                'product_name', 
                'transaction_date', 
                'amount_gbp']
    
    df = pd.DataFrame(columns=colnames)
    cursor.execute(query)
    while True:
        rows = cursor.fetchmany(batch_size)
        if not rows:
            break
        # some tramsformation
        batch_df = pd.DataFrame(data = rows, columns=colnames)        
        df = pd.concat([df, batch_df], ignore_index=True)
    print('DF successfully created!\n')
    return df上面的代碼執(zhí)行以下操作:
- 創(chuàng)建一個(gè)空的df(數(shù)據(jù)框);
 - 執(zhí)行查詢(xún),將整個(gè)結(jié)果緩存到游標(biāo)對(duì)象中;
 - 初始化一個(gè)while循環(huán),每次迭代都獲取等于指定batch_size的行數(shù)(在此示例中為1百萬(wàn)行),并使用這些數(shù)據(jù)創(chuàng)建一個(gè)batch_df(批數(shù)據(jù)框)。
 - 最終將batch_df附加到主df。該過(guò)程重復(fù)進(jìn)行,直到整個(gè)數(shù)據(jù)集被遍歷。
 
讓我們明確一下:這只是一個(gè)基本示例,我們可以在while循環(huán)的一部分執(zhí)行許多其他操作(過(guò)濾、排序、聚合、將數(shù)據(jù)寫(xiě)入其他位置等),而不僅僅是一次一個(gè)批次地創(chuàng)建df。當(dāng)在筆記本中執(zhí)行該函數(shù)時(shí),我們得到:
%%time 
df_batch = create_df_batch(cursor, batch_size)
df_batch.head()
Output:
Creating pandas DF using generator...
DF successfully created!
CPU times: user 9.97 s, sys: 13.7 s, total: 23.7 s
Wall time: 25 s
df_batch數(shù)據(jù)框的前5行
方法 #2:使用生成器
一種不太常見(jiàn)但強(qiáng)大的數(shù)據(jù)工程師策略是使用生成器以流的形式獲取數(shù)據(jù):
# AUXILIARY FUNCTION
def generate_dataset(cursor):
    
    cursor.execute(query)
    
    for row in cursor.fetchall():
        # some tramsformation
        yield row 
# 2.1. CREATE DF USING GENERATORS
def create_df_gen(cursor):
    print('Creating pandas DF using generator...')
    colnames = ['transaction_id', 
                'user_id', 
                'product_name', 
                'transaction_date', 
                'amount_gbp']
    
    df = pd.DataFrame(data = generate_dataset(cursor), columns=colnames)
    print('DF successfully created!\n')
    
    return df在上面的代碼片段中,我們創(chuàng)建了`generate_dataset` 輔助函數(shù),該函數(shù)執(zhí)行查詢(xún),然后將行作為序列生成。該函數(shù)直接傳遞給`pd.DataFrame()` 子句的`data`參數(shù),該子句在背后遍歷所有獲取的記錄,直到行被耗盡。
同樣,這個(gè)例子非?;A(chǔ)(主要是為了演示目的),但我們可以在輔助函數(shù)中執(zhí)行任何類(lèi)型的過(guò)濾或轉(zhuǎn)換。當(dāng)執(zhí)行該函數(shù)時(shí),我們得到df_gen數(shù)據(jù)框的前5行
%%time 
df_gen = create_df_gen(cursor)
df_gen.head()
Creating pandas DF using generator...
DF successfully created!
CPU times: user 9.04 s, sys: 2.1 s, total: 11.1 s
Wall time: 14.4 s
看起來(lái)似乎兩種方法最終都使用了同樣的內(nèi)存量(因?yàn)閐f都是以不同方式返回的),但事實(shí)并非如此,因?yàn)閿?shù)據(jù)在生成df本身時(shí)的處理方式是不同的:
- 對(duì)于方法 #1,是急切地獲取數(shù)據(jù),通過(guò)網(wǎng)絡(luò)進(jìn)行數(shù)據(jù)交換有點(diǎn)低效,導(dǎo)致內(nèi)存占用峰值較高;
 - 對(duì)于方法 #2,是懶惰地獲取數(shù)據(jù),只有在需要時(shí)才計(jì)算,并且逐個(gè)計(jì)算,從而降低內(nèi)存占用。
 
用例 #2:寫(xiě)入云對(duì)象存儲(chǔ)
有時(shí),數(shù)據(jù)工程師需要獲取存儲(chǔ)在數(shù)據(jù)庫(kù)中的大量數(shù)據(jù),并將這些記錄外部共享(例如與監(jiān)管機(jī)構(gòu)、審計(jì)員、合作伙伴共享)。
一種常見(jiàn)的解決方案是創(chuàng)建一個(gè)云對(duì)象存儲(chǔ),數(shù)據(jù)將被傳遞到該存儲(chǔ)中,以便第三方(具有適當(dāng)訪問(wèn)權(quán)限的人)能夠讀取并將數(shù)據(jù)復(fù)制到其系統(tǒng)中。
實(shí)際上,我們創(chuàng)建了一個(gè)名為`generators-test-bucket`的桶,數(shù)據(jù)將以parquet格式寫(xiě)入其中,利用了`awswrangler`包。
`awswrangler`的優(yōu)勢(shì)在于它與pandas數(shù)據(jù)框非常有效地配合,并允許以保留數(shù)據(jù)集結(jié)構(gòu)的方式將它們轉(zhuǎn)換為parquet格式。
方法 #1:使用批處理
與第一個(gè)用例一樣,一個(gè)常見(jiàn)的解決方案是以批處理方式獲取數(shù)據(jù),然后寫(xiě)入數(shù)據(jù),直到整個(gè)數(shù)據(jù)集被遍歷:
# 1.2 WRITING DF TO MINIO BUCKET IN PARQUET FORMAT USING BATCHES
def write_df_to_s3_batch(cursor, bucket, folder, parquet_file_name, batch_size):
    colnames = ['transaction_id', 
                'user_id', 
                'product_name', 
                'transaction_date', 
                'amount_gbp']
    cursor.execute(query)
    batch_num = 1
    while True:
        rows = cursor.fetchmany(batch_size)
        if not rows:
            break
        print(f"Writing DF batch #{batch_num} to S3 bucket...")
        wr.s3.to_parquet(df= pd.DataFrame(data = rows, columns=colnames),
                         path=f's3://{bucket}/{folder}/{parquet_file_name}',
                         compression='gzip',
                         mode = 'append',
                         dataset=True)
        print('Batch successfully written to S3 bucket!\n')
        batch_num += 1執(zhí)行`write_df_to_s3_batch()` 函數(shù)會(huì)在桶中創(chuàng)建五個(gè)parquet文件,每個(gè)文件包含1百萬(wàn)條記錄:
write_df_to_s3_batch(cursor, bucket, folder_batch, parquet_file_name, batch_size)
Writing DF batch #1 to S3 bucket...
Batch successfully written to S3 bucket!
Writing DF batch #2 to S3 bucket...
Batch successfully written to S3 bucket!
Writing DF batch #3 to S3 bucket...
Batch successfully written to S3 bucket!
Writing DF batch #4 to S3 bucket...
Batch successfully written to S3 bucket!
Writing DF batch #5 to S3 bucket...
Batch successfully written to S3 bucket!
在MinIO中以批處理方式寫(xiě)入的數(shù)據(jù)
方法 #2:使用生成器
或者,可以通過(guò)利用生成器提取數(shù)據(jù)并將其寫(xiě)入桶中。由于生成器在提取和移動(dòng)數(shù)據(jù)時(shí)不會(huì)導(dǎo)致內(nèi)存效率問(wèn)題,我們甚至可以決定一次性寫(xiě)入整個(gè)df:
# 2.2 WRITING DF TO MINIO BUCKET IN PARQUET FORMAT USING GENERATORS
def write_df_to_s3_gen(cursor, bucket, folder, parquet_file_name):
    print('Writing DF to S3 bucket...')
    colnames = ['transaction_id', 
                'user_id', 
                'product_name', 
                'transaction_date', 
                'amount_gbp']
    
    wr.s3.to_parquet(df= pd.DataFrame(data = generate_dataset(cursor), columns=colnames),
             path=f's3://{bucket}/{folder}/{parquet_file_name}',
             compression='gzip',
             mode = 'append',
             dataset=True)
    print('Data successfully written to S3 bucket!\n')當(dāng)執(zhí)行`write_df_to_s3_gen()` 函數(shù)時(shí),將一個(gè)包含所有5百萬(wàn)行的唯一較大parquet文件保存到桶中:
write_df_to_s3_gen(cursor, bucket, folder_gen, parquet_file_name)
Writing DF to S3 bucket...
Data successfully written to S3 bucket!
利用生成器寫(xiě)入MinIO的數(shù)據(jù)















 
 
 







 
 
 
 