?理解 Spark 寫入 API 的數(shù)據(jù)處理能力
這張圖解釋了 Apache Spark DataFrame 寫入 API 的流程。它始于對寫入數(shù)據(jù)的 API 調(diào)用,支持的格式包括 CSV、JSON 或 Parquet。流程根據(jù)選擇的保存模式(追加、覆蓋、忽略或報(bào)錯(cuò))而分岔。每種模式執(zhí)行必要的檢查和操作,例如分區(qū)和數(shù)據(jù)寫入處理。流程以數(shù)據(jù)的最終寫入或錯(cuò)誤結(jié)束,取決于這些檢查和操作的結(jié)果。
Apache Spark 是一個(gè)開源的分布式計(jì)算系統(tǒng),提供了強(qiáng)大的平臺(tái)用于處理大規(guī)模數(shù)據(jù)。寫入 API 是 Spark 數(shù)據(jù)處理能力的基本組成部分,允許用戶將數(shù)據(jù)從他們的 Spark 應(yīng)用程序?qū)懭牖蜉敵龅讲煌臄?shù)據(jù)源。
一、理解 Spark 寫入 API
1.數(shù)據(jù)源
Spark 支持將數(shù)據(jù)寫入各種數(shù)據(jù)源,包括但不限于:
- 分布式文件系統(tǒng),如 HDFS
- 云存儲(chǔ),如 AWS S3、Azure Blob Storage
- 傳統(tǒng)數(shù)據(jù)庫(包括 SQL 和 NoSQL)
- 大數(shù)據(jù)文件格式(Parquet、Avro、ORC)
2.DataFrameWriter
寫入 API 的核心類是 DataFrameWriter。它提供配置和執(zhí)行寫入操作的功能。通過在 DataFrame 或 Dataset 上調(diào)用 .write 方法獲得 DataFrameWriter。
3.寫入模式
指定 Spark 在寫入數(shù)據(jù)時(shí)應(yīng)如何處理現(xiàn)有數(shù)據(jù)的模式。常見的模式包括:
- append:將新數(shù)據(jù)添加到現(xiàn)有數(shù)據(jù)中。
- overwrite:用新數(shù)據(jù)覆蓋現(xiàn)有數(shù)據(jù)。
- ignore:如果數(shù)據(jù)已存在,則忽略寫入操作。
- errorIfExists(默認(rèn)):如果數(shù)據(jù)已存在,則拋出錯(cuò)誤。
4.格式規(guī)范
可以使用 .format("formatType") 方法指定輸出數(shù)據(jù)的格式,如 JSON、CSV、Parquet 等。
5.分區(qū)
為了實(shí)現(xiàn)有效的數(shù)據(jù)存儲(chǔ),可以使用 .partitionBy("column") 方法根據(jù)一個(gè)或多個(gè)列對輸出數(shù)據(jù)進(jìn)行分區(qū)。
6.配置選項(xiàng)
可以使用 .option("key", "value") 方法設(shè)置特定于數(shù)據(jù)源的各種選項(xiàng),如壓縮、CSV 文件的自定義分隔符等。
7.保存數(shù)據(jù)
最后,使用 .save("path") 方法將 DataFrame 寫入指定的路徑。其他方法如 .saveAsTable("tableName") 也可用于不同的寫入場景。
from pyspark.sql import SparkSession
from pyspark.sql import Row
import os
# 初始化 SparkSession
spark = SparkSession.builder
.appName("DataFrameWriterSaveModesExample")
.getOrCreate()
# 示例數(shù)據(jù)
data = [
Row(name="Alice", age=25, country="USA"),
Row(name="Bob", age=30, country="UK")
]
# 附加數(shù)據(jù)用于追加模式
additional_data = [
Row(name="Carlos", age=35, country="Spain"),
Row(name="Daisy", age=40, country="Australia")
]
# 創(chuàng)建 DataFrames
df = spark.createDataFrame(data)
additional_df = spark.createDataFrame(additional_data)
# 定義輸出路徑
output_path = "output/csv_save_modes"
# 函數(shù):列出目錄中的文件
def list_files_in_directory(path):
files = os.listdir(path)
return files
# 顯示初始 DataFrame
print("初始 DataFrame:")
df.show()
# 使用覆蓋模式寫入 CSV 格式
df.write.csv(output_path, mode="overwrite", header=True)
print("覆蓋模式后的文件:", list_files_in_directory(output_path))
# 顯示附加 DataFrame
print("附加 DataFrame:")
additional_df.show()
# 使用追加模式寫入 CSV 格式
additional_df.write.csv(output_path, mode="append", header=True)
print("追加模式后的文件:", list_files_in_directory(output_path))
# 使用忽略模式寫入 CSV 格式
additional_df.write.csv(output_path, mode="ignore", header=True)
print("忽略模式后的文件:", list_files_in_directory(output_path))
# 使用 errorIfExists 模式寫入 CSV 格式
try:
additional_df.write.csv(output_path, mode="errorIfExists", header=True)
except Exception as e:
print("errorIfExists 模式中發(fā)生錯(cuò)誤:", e)
# 停止 SparkSession
spark.stop()
二、Spark 架構(gòu)概述
在 Apache Spark 中寫入 DataFrame 遵循一種順序流程。Spark 基于用戶 DataFrame 操作創(chuàng)建邏輯計(jì)劃,優(yōu)化為物理計(jì)劃,并分成階段。系統(tǒng)按分區(qū)處理數(shù)據(jù),對其進(jìn)行日志記錄以確保可靠性,并帶有定義的分區(qū)和寫入模式寫入到本地存儲(chǔ)。Spark 的架構(gòu)確保在計(jì)算集群中高效管理和擴(kuò)展數(shù)據(jù)寫入任務(wù)。
從 Spark 內(nèi)部架構(gòu)的角度來看,Apache Spark 寫入 API 涉及了解 Spark 如何在幕后管理數(shù)據(jù)處理、分發(fā)和寫入操作。讓我們來詳細(xì)了解:
三、Spark 架構(gòu)概述
- 驅(qū)動(dòng)程序和執(zhí)行器: Spark 采用主從架構(gòu)。驅(qū)動(dòng)節(jié)點(diǎn)運(yùn)行應(yīng)用程序的 main() 函數(shù)并維護(hù)有關(guān) Spark 應(yīng)用程序的信息。執(zhí)行器節(jié)點(diǎn)執(zhí)行數(shù)據(jù)處理和寫入操作。
- DAG 調(diào)度器: 當(dāng)觸發(fā)寫入操作時(shí),Spark 的 DAG(有向無環(huán)圖)調(diào)度器將高級轉(zhuǎn)換轉(zhuǎn)換為一系列可以在集群中并行執(zhí)行的階段。
- 任務(wù)調(diào)度器: 任務(wù)調(diào)度器在每個(gè)階段內(nèi)啟動(dòng)任務(wù)。這些任務(wù)分布在執(zhí)行器之間。
- 執(zhí)行計(jì)劃和物理計(jì)劃: Spark 使用 Catalyst 優(yōu)化器創(chuàng)建高效的執(zhí)行計(jì)劃。這包括將邏輯計(jì)劃(要做什么)轉(zhuǎn)換為物理計(jì)劃(如何做),考慮到分區(qū)、數(shù)據(jù)本地性和其他因素。
四、在 Spark 內(nèi)部寫入數(shù)據(jù)
(1) 數(shù)據(jù)分布: Spark 中的數(shù)據(jù)分布在分區(qū)中。當(dāng)啟動(dòng)寫入操作時(shí),Spark 首先確定這些分區(qū)中的數(shù)據(jù)布局。
(2) 寫入任務(wù)執(zhí)行: 每個(gè)分區(qū)的數(shù)據(jù)由一個(gè)任務(wù)處理。這些任務(wù)在不同的執(zhí)行器之間并行執(zhí)行。
寫入模式和一致性:
- 對于 overwrite 和 append 模式,Spark 確保一致性,通過管理數(shù)據(jù)文件的替換或添加來實(shí)現(xiàn)。
- 對于基于文件的數(shù)據(jù)源,Spark 以分階段的方式寫入數(shù)據(jù),先寫入臨時(shí)位置再提交到最終位置,有助于確保一致性和處理故障。
(3) 格式處理和序列化: 根據(jù)指定的格式(例如,Parquet、CSV),Spark 使用相應(yīng)的序列化器將數(shù)據(jù)轉(zhuǎn)換為所需的格式。執(zhí)行器處理此過程。
(4) 分區(qū)和文件管理:
- 如果指定了分區(qū),則Spark在寫入之前根據(jù)這些分區(qū)對數(shù)據(jù)進(jìn)行排序和組織。這通常涉及在執(zhí)行器之間移動(dòng)數(shù)據(jù)。
- Spark 試圖最小化每個(gè)分區(qū)創(chuàng)建的文件數(shù)量,以優(yōu)化大文件大小,在分布式文件系統(tǒng)中更有效。
(5) 錯(cuò)誤處理和容錯(cuò): 在寫入操作期間,如果任務(wù)失敗,Spark 可以重試任務(wù),確保容錯(cuò)。但并非所有寫入操作都是完全原子的,特定情況可能需要手動(dòng)干預(yù)以確保數(shù)據(jù)完整性。
(6) 優(yōu)化技術(shù):
- Catalyst 優(yōu)化器: 為效率優(yōu)化寫入計(jì)劃,例如最小化數(shù)據(jù)移動(dòng)。
- Tungsten: Spark 的 Tungsten 引擎優(yōu)化數(shù)據(jù)序列化和反序列化過程中的內(nèi)存和 CPU 使用。
(7) 寫入提交協(xié)議: Spark 使用寫入提交協(xié)議來協(xié)調(diào)特定數(shù)據(jù)源的任務(wù)提交和中止過程,確保對寫入數(shù)據(jù)的一致視圖。
Spark 的寫入 API 旨在實(shí)現(xiàn)高效和可靠的數(shù)據(jù)寫入,它以復(fù)雜的方式編排任務(wù)分發(fā)、數(shù)據(jù)序列化和文件管理。它利用 Spark 的核心組件,如 DAG 調(diào)度器、任務(wù)調(diào)度器和 Catalyst 優(yōu)化器,有效地執(zhí)行寫入操作。