騰訊面試:Spark 內(nèi)存如何優(yōu)化?包含哪幾個方面?
一、Spark內(nèi)存架構概述
Apache Spark作為一個高效的分布式計算引擎,其性能很大程度上取決于內(nèi)存的使用效率。本文將詳細介紹Spark內(nèi)存管理機制及優(yōu)化策略,包括緩存優(yōu)化、內(nèi)存配置、狀態(tài)存儲優(yōu)化等方面及并給出相應的樣例代碼。

Spark的內(nèi)存管理分為執(zhí)行內(nèi)存(Execution Memory)和存儲內(nèi)存(Storage Memory)兩大部分:
- 執(zhí)行內(nèi)存:用于shuffle、join、sort等計算操作
- 存儲內(nèi)存:用于緩存RDD、DataFrame和廣播變量
在Spark 1.6之前,這兩部分內(nèi)存是靜態(tài)分配的,而在Spark 1.6及以后版本中,采用了統(tǒng)一內(nèi)存管理(Unified Memory Management),允許兩種類型的內(nèi)存相互借用。

二、緩存優(yōu)化策略
1. 緩存級別選擇
Spark提供了多種緩存級別,可以通過persist()或cache()方法設置:
以下是一個使用不同存儲級別的示例:
from
pyspark.storagelevel
import
StorageLevel
# 默認緩存級別 MEMORY_AND_DISK_DESER
df.cache()
# 僅使用內(nèi)存
df.persist(StorageLevel.MEMORY_ONLY)
# 僅使用磁盤
df.persist(StorageLevel.DISK_ONLY)
# 使用內(nèi)存和磁盤,序列化存儲
df.persist(StorageLevel.MEMORY_AND_DISK_SER)
# 使用堆外內(nèi)存
df.persist(StorageLevel.OFF_HEAP)2. 列式存儲優(yōu)化
Spark SQL使用列式存儲格式來緩存數(shù)據(jù),這種方式比行式存儲更節(jié)省內(nèi)存,并且支持壓縮。
列式存儲的主要優(yōu)勢:
- 更高的壓縮率:相同類型的數(shù)據(jù)放在一起,壓縮效率更高
- 謂詞下推:可以只讀取查詢所需的列
- 向量化處理:支持批量處理,提高CPU效率
以下配置可以優(yōu)化列式緩存:
// 啟用列式緩存壓縮
spark.conf.set("spark.sql.inMemoryColumnarStorage.compressed", true)
// 設置批處理大小
spark.conf.set("spark.sql.inMemoryColumnarStorage.batchSize", 10000)
// 啟用向量化讀取
spark.conf.set("spark.sql.inMemoryColumnarStorage.enableVectorizedReader", true)3. 堆外內(nèi)存使用
Spark支持使用堆外內(nèi)存(Off-heap Memory)來存儲數(shù)據(jù),減少GC壓力:
啟用堆外內(nèi)存的配置:
// 啟用堆外內(nèi)存
spark.conf.set("spark.memory.offHeap.enabled", true)
// 設置堆外內(nèi)存大?。ㄗ止?jié))
spark.conf.set("spark.memory.offHeap.size", "10g")
// 啟用列向量堆外內(nèi)存
spark.conf.set("spark.sql.columnVector.offheap.enabled", true)三、內(nèi)存配置優(yōu)化
1. 執(zhí)行器內(nèi)存配置
合理配置執(zhí)行器內(nèi)存是優(yōu)化Spark性能的關鍵:
// 設置執(zhí)行器內(nèi)存
spark.conf.set("spark.executor.memory", "8g")
// 設置內(nèi)存開銷因子
spark.conf.set("spark.executor.memoryOverheadFactor", "0.1")
// 設置執(zhí)行器核心數(shù)
spark.conf.set("spark.executor.cores", "4")2. 內(nèi)存分配比例調(diào)整
調(diào)整執(zhí)行內(nèi)存和存儲內(nèi)存的比例:
// 設置存儲內(nèi)存占比(默認0.5,即50%)
spark.conf.set("spark.memory.storageFraction", "0.4")3. 動態(tài)內(nèi)存管理
Spark 1.6引入的統(tǒng)一內(nèi)存管理允許執(zhí)行內(nèi)存和存儲內(nèi)存動態(tài)共享:
// 啟用動態(tài)內(nèi)存分配(默認開啟)
spark.conf.set("spark.memory.useLegacyMode", false)四、Shuffle優(yōu)化
Shuffle是Spark中最消耗內(nèi)存和磁盤I/O的操作之一。
1. Shuffle內(nèi)存占比
// 設置shuffle內(nèi)存占比
spark.conf.set("spark.shuffle.memoryFraction", "0.2")2. Shuffle合并
// 啟用shuffle文件合并
spark.conf.set("spark.shuffle.consolidateFiles", true)3. Shuffle溢出優(yōu)化
// 設置溢出前內(nèi)存中排序的條目數(shù)
spark.conf.set("spark.shuffle.sort.bypassMergeThreshold", 200)五、狀態(tài)存儲優(yōu)化
對于Structured Streaming等有狀態(tài)操作,內(nèi)存優(yōu)化尤為重要。
1. RocksDB狀態(tài)存儲
Spark 3.2引入了RocksDB狀態(tài)存儲實現(xiàn),可以有效減少JVM GC壓力:
啟用RocksDB狀態(tài)存儲:
// 設置狀態(tài)存儲提供者為RocksDB
spark.conf.set(
"spark.sql.streaming.stateStore.providerClass"
, "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider"
)2. RocksDB內(nèi)存管理
RocksDB提供了內(nèi)存使用限制功能,避免OOM問題:
配置RocksDB內(nèi)存限制:
// 啟用RocksDB內(nèi)存限制
spark.conf.set("spark.sql.streaming.stateStore.rocksdb.boundedMemoryUsage", true)
// 設置最大內(nèi)存使用量(MB)
spark.conf.set("spark.sql.streaming.stateStore.rocksdb.maxMemoryUsageMB", 1000)
// 設置寫緩沖區(qū)占比
spark.conf.set(
"spark.sql.streaming.stateStore.rocksdb.writeBufferCacheRatio", 0.4)
// 設置高優(yōu)先級池占比
spark.conf.set("spark.sql.streaming.stateStore.rocksdb.highPriorityPoolRatio", 0.1)3. 狀態(tài)存儲優(yōu)化示例
以下是一個使用RocksDB狀態(tài)存儲的Structured Streaming示例:
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.functions._
// 配置Spark Session
val spark = SparkSession.builder()
.appName("StatefulStreamingWithMemoryOptimization")
.config("spark.sql.streaming.stateStore.providerClass",
"org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
.config("spark.sql.streaming.stateStore.rocksdb.boundedMemoryUsage", true)
.config("spark.sql.streaming.stateStore.rocksdb.maxMemoryUsageMB", 500)
.getOrCreate()
// 創(chuàng)建輸入流
val inputStream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "input-topic")
.load()
// 解析并處理數(shù)據(jù)
val processedStream = inputStream
.selectExpr("CAST(value AS STRING)")
.as[String]
.flatMap(_.split(" "))
.groupBy("value")
.count()
// 輸出結果
val query = processedStream.writeStream
.outputMode("update")
.format("console")
.trigger(Trigger.ProcessingTime("10 seconds"))
.start()
query.awaitTermination()六、內(nèi)存泄漏檢測與處理
Spark提供了內(nèi)存泄漏檢測機制:
啟用內(nèi)存泄漏檢測:
// 內(nèi)存泄漏時拋出異常
spark.conf.set("spark.unsafe.exceptionOnMemoryLeak", true)七、實際案例分析與優(yōu)化
1. 數(shù)據(jù)傾斜處理
數(shù)據(jù)傾斜會導致某些執(zhí)行器內(nèi)存壓力過大。解決方案:
// 示例:處理傾斜的join
// 1. 識別傾斜鍵
val skewedKeys = df1.groupBy("key").count().filter("count > 1000").select("key")
// 2. 對傾斜鍵進行特殊處理
val skewedKeysBroadcast = spark.sparkContext.broadcast(skewedKeys.collect().map(_.getString(0)).toSet)
// 3. 將數(shù)據(jù)分為傾斜和非傾斜部分
val dfSkewed = df1.filter(r => skewedKeysBroadcast.value.contains(r.getString(0)))
val dfNormal = df1.filter(r => !skewedKeysBroadcast.value.contains(r.getString(0)))
// 4. 對傾斜部分進行加鹽處理
val saltedDfSkewed = dfSkewed.withColumn("salt", (rand() * 10).cast("int"))
.withColumn("salted_key", concat($"key", lit("_"), $"salt"))
val saltedDf2 = df2.join(skewedKeys, "key")
.withColumn("salt", explode(array((0 until 10).map(lit): _*)))
.withColumn("salted_key", concat($"key", lit("_"), $"salt"))
// 5. 分別join并合并結果
val joinSkewed = saltedDfSkewed.join(saltedDf2, "salted_key").drop("salt", "salted_key")
val joinNormal = dfNormal.join(df2, "key")
val result = joinSkewed.union(joinNormal)2. 緩存優(yōu)化實例
以下是一個優(yōu)化DataFrame緩存的實例:
import org.apache.spark.storage.StorageLevel
// 創(chuàng)建測試數(shù)據(jù)
val df = spark.range(0, 1000000)
.withColumn("square", $"id" * $"id")
.withColumn("cube", $"square" * $"id")
// 1. 基準測試 - 不使用緩存
val t1 = System.nanoTime()
val count1 = df.filter($"square" > 1000).count()
val count2 = df.filter($"cube" > 10000).count()
val duration1 = (System.nanoTime() - t1) / 1e9d
println(s"未緩存執(zhí)行時間: $duration1 秒")
// 2. 使用默認緩存
df.cache()
df.count() // 觸發(fā)緩存
val t2 = System.nanoTime()
val count3 = df.filter($"square" > 1000).count()
val count4 = df.filter($"cube" > 10000).count()
val duration2 = (System.nanoTime() - t2) / 1e9d
println(s"默認緩存執(zhí)行時間: $duration2 秒")
df.unpersist()
// 3. 使用序列化緩存
df.persist(StorageLevel.MEMORY_ONLY_SER)
df.count() // 觸發(fā)緩存
val t3 = System.nanoTime()
val count5 = df.filter($"square" > 1000).count()
val count6 = df.filter($"cube" > 10000).count()
val duration3 = (System.nanoTime() - t3) / 1e9d
println(s"序列化緩存執(zhí)行時間: $duration3 秒")
df.unpersist()
// 4. 使用列式緩存(默認已啟用)
spark.conf.set("spark.sql.inMemoryColumnarStorage.compressed", true)
spark.conf.set("spark.sql.inMemoryColumnarStorage.batchSize", 20000)
df.cache()
df.count() // 觸發(fā)緩存
val t4 = System.nanoTime()
val count7 = df.filter($"square" > 1000).count()
val count8 = df.filter($"cube" > 10000).count()
val duration4 = (System.nanoTime() - t4) / 1e9d
println(s"優(yōu)化列式緩存執(zhí)行時間: $duration4 秒")
df.unpersist()3. 內(nèi)存監(jiān)控與調(diào)優(yōu)
// 創(chuàng)建監(jiān)控函數(shù)
def monitorMemory(sc: SparkContext): Unit = {
val executorMemoryInfo = sc.getExecutorMemoryStatus
println("==== 內(nèi)存使用情況 ====")
executorMemoryInfo.foreach { case (executorId, (usedMem, maxMem)) =>
println(s"執(zhí)行器 $executorId: 已用內(nèi)存 ${usedMem / 1024 / 1024}MB, 最大內(nèi)存 ${maxMem / 1024 / 1024}MB")
}
// 打印存儲內(nèi)存使用情況
println("==== 緩存塊信息 ====")
sc.getRDDStorageInfo.foreach { info =>
println(s"RDD: ${info.name}, 分區(qū)數(shù): ${info.numPartitions}, " +
s"緩存級別: ${info.storageLevel}, 內(nèi)存使用: ${info.memoryUsed / 1024 / 1024}MB")
}
}
// 定期監(jiān)控內(nèi)存使用情況
val monitorThread = new Thread(() => {
while (true) {
monitorMemory(spark.sparkContext)
Thread.sleep(10000) // 每10秒監(jiān)控一次
}
})
monitorThread.setDaemon(true)
monitorThread.start()八、自適應查詢執(zhí)行(AQE)內(nèi)存優(yōu)化
Spark 3.0引入的自適應查詢執(zhí)行(Adaptive Query Execution)可以根據(jù)運行時統(tǒng)計信息動態(tài)調(diào)整執(zhí)行計劃,對內(nèi)存使用也有積極影響。
1. 啟用AQE
// 啟用自適應查詢執(zhí)行
spark.conf.set("spark.sql.adaptive.enabled", true)
// 設置合并shuffle分區(qū)的目標大小
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64m")
// 啟用shuffle分區(qū)合并
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", true)
// 設置合并后的最小分區(qū)大小
spark.conf.set("spark.sql.adaptive.coalescePartitions.minPartitionSize", "1m")2. AQE內(nèi)存優(yōu)化示例
// 創(chuàng)建測試數(shù)據(jù)
val largeDF = spark.range(0, 10000000)
.withColumn("key", $"id" % 1000)
.withColumn("value", rand() * 100)
// 設置較大的初始shuffle分區(qū)數(shù)
spark.conf.set("spark.sql.shuffle.partitions", 200)
// 執(zhí)行聚合查詢
val result = largeDF.groupBy("key")
.agg(
avg("value").as("avg_value"),
max("value").as("max_value"),
min("value").as("min_value")
)
.filter($"avg_value" > 50)
// 查看執(zhí)行計劃
result.explain(true)
// 執(zhí)行查詢并觀察實際使用的分區(qū)數(shù)
result.collect()九、列式存儲與壓縮優(yōu)化
Spark SQL的列式存儲是內(nèi)存優(yōu)化的重要手段,通過壓縮和編碼技術可以顯著減少內(nèi)存使用。
1. 列式存儲優(yōu)化示例
// 啟用列式存儲壓縮
spark.conf.set("spark.sql.inMemoryColumnarStorage.compressed", true)
// 設置批處理大小
spark.conf.set("spark.sql.inMemoryColumnarStorage.batchSize", 10000)
// 創(chuàng)建測試數(shù)據(jù)
val wideDF = spark.range(0, 100000)
.withColumn("col1", $"id" % 100)
.withColumn("col2", $"id" * 2)
.withColumn("col3", $"id" * 3)
.withColumn("col4", $"id" * 4)
.withColumn("col5", $"id" * 5)
.withColumn("col6", concat(lit("value-"), $"id".cast("string")))
// 緩存數(shù)據(jù)
wideDF.cache()
wideDF.count() // 觸發(fā)緩存
// 查看緩存統(tǒng)計信息
println(s"列式緩存內(nèi)存使用: ${spark.sparkContext.getRDDStorageInfo.filter(_.id == wideDF.rdd.id).map(_.memoryUsed).sum / 1024 / 1024}MB")
// 執(zhí)行查詢
val result = wideDF.filter($"col1" < 10).select("id", "col1", "col6")
result.explain()
result.show(5)十、堆外內(nèi)存優(yōu)化
堆外內(nèi)存(Off-heap Memory)是減輕GC壓力的有效方法,特別適合大數(shù)據(jù)量處理。
1. 堆外內(nèi)存示例
// 啟用堆外內(nèi)存
spark.conf.set("spark.memory.offHeap.enabled", true)
spark.conf.set("spark.memory.offHeap.size", "4g")
// 啟用列向量堆外內(nèi)存
spark.conf.set("spark.sql.columnVector.offheap.enabled", true)
// 創(chuàng)建測試數(shù)據(jù)
val largeDF = spark.range(0, 10000000)
.withColumn("value", rand() * 1000)
// 執(zhí)行聚合操作
val result = largeDF.groupBy($"id" % 100 as "key")
.agg(sum("value") as "sum_value")
.orderBy("key")
// 查看執(zhí)行計劃
result.explain()
// 執(zhí)行查詢
result.show()十一、Structured Streaming內(nèi)存優(yōu)化
對于Structured Streaming應用,狀態(tài)管理是內(nèi)存優(yōu)化的關鍵。
Structured Streaming優(yōu)化示例:
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.functions._
// 配置Spark Session
val spark = SparkSession.builder()
.appName("StreamingMemoryOptimization")
// 啟用RocksDB狀態(tài)存儲
.config("spark.sql.streaming.stateStore.providerClass",
"org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
// 啟用RocksDB內(nèi)存限制
.config("spark.sql.streaming.stateStore.rocksdb.boundedMemoryUsage", true)
.config("spark.sql.streaming.stateStore.rocksdb.maxMemoryUsageMB", 1000)
.config("spark.sql.streaming.stateStore.rocksdb.writeBufferCacheRatio", 0.4)
.config("spark.sql.streaming.stateStore.rocksdb.highPriorityPoolRatio", 0.1)
// 啟用RocksDB壓縮
.config("spark.sql.streaming.stateStore.rocksdb.compression", "lz4")
// 啟用RocksDB變更日志檢查點
.config("spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", true)
.getOrCreate()
// 創(chuàng)建輸入流
val inputStream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "input-topic")
.option("startingOffsets", "latest")
.load()
// 解析JSON數(shù)據(jù)
val parsedStream = inputStream
.selectExpr("CAST(value AS STRING) as json")
.select(from_json($"json",
"id STRING, timestamp LONG, value DOUBLE").as("data"))
.select("data.*")
.withColumn("event_time",
to_timestamp($"timestamp" / 1000))
.withWatermark("event_time", "10 minutes")
// 執(zhí)行窗口聚合
val windowedAggregation = parsedStream
.groupBy(
window($"event_time", "5 minutes", "1 minute"),
$"id"
)
.agg(
avg("value").as("avg_value"),
count("*").as("event_count")
)
.select(
$"window.start".as("window_start"),
$"window.end".as("window_end"),
$"id",
$"avg_value",
$"event_count"
)
// 輸出結果
val query = windowedAggregation.writeStream
.outputMode("update")
.format("console")
.option("truncate", false)
.trigger(Trigger.ProcessingTime("1 minute"))
.start()十二、內(nèi)存泄漏檢測與處理
Spark提供了內(nèi)存泄漏檢測機制,可以幫助識別和解決內(nèi)存問題。
內(nèi)存泄漏監(jiān)控示例:
// 啟用內(nèi)存泄漏檢測
spark.conf.set("spark.unsafe.exceptionOnMemoryLeak", true)
// 創(chuàng)建自定義累加器監(jiān)控內(nèi)存使用
val memoryLeakMonitor = spark.sparkContext.longAccumulator("MemoryLeakMonitor")
// 創(chuàng)建可能導致內(nèi)存泄漏的函數(shù)
def processWithPotentialLeak(df: DataFrame): DataFrame = {
// 模擬處理邏輯
val result = df.mapPartitions { iter =>
// 記錄處理前內(nèi)存
val runtime = Runtime.getRuntime
val beforeMem = runtime.totalMemory() - runtime.freeMemory()
// 處理數(shù)據(jù)
val resultIter = iter.map(row => {
// 處理邏輯
row
})
// 記錄處理后內(nèi)存
val afterMem = runtime.totalMemory() - runtime.freeMemory()
memoryLeakMonitor.add(afterMem - beforeMem)
resultIter
}
result
}
// 使用監(jiān)控函數(shù)處理數(shù)據(jù)
val df = spark.range(0, 1000000).withColumn("value", rand())
val processed = processWithPotentialLeak(df)
processed.count()
// 檢查累加器值
println(s"內(nèi)存增長: ${memoryLeakMonitor.value} 字節(jié)")十三、綜合優(yōu)化案例
以下是一個綜合應用多種內(nèi)存優(yōu)化技術的實際案例。
1. 大規(guī)模數(shù)據(jù)處理優(yōu)化
import org.apache.spark.storage.StorageLevel
import org.apache.spark.sql.functions._
// 配置Spark Session
val spark = SparkSession.builder()
.appName("ComprehensiveMemoryOptimization")
// 啟用自適應查詢執(zhí)行
.config("spark.sql.adaptive.enabled", true)
.config("spark.sql.adaptive.coalescePartitions.enabled", true)
// 啟用堆外內(nèi)存
.config("spark.memory.offHeap.enabled", true)
.config("spark.memory.offHeap.size", "8g")
.config("spark.sql.columnVector.offheap.enabled", true)
// 列式存儲優(yōu)化
.config("spark.sql.inMemoryColumnarStorage.compressed", true)
.config("spark.sql.inMemoryColumnarStorage.batchSize", 20000)
.getOrCreate()
// 讀取大規(guī)模數(shù)據(jù)
val rawData = spark.read
.format("parquet")
.load("/path/to/large/dataset")
// 數(shù)據(jù)預處理
val processedData = rawData
.filter($"value" > 0)
.withColumn("category", when($"value" < 100, "low")
.when($"value" < 500, "medium")
.otherwise("high"))
.withColumn("date", to_date($"timestamp"))
// 使用優(yōu)化的存儲級別緩存
processedData.persist(StorageLevel.MEMORY_AND_DISK_SER)
processedData.count() // 觸發(fā)緩存
// 檢測數(shù)據(jù)傾斜
val keyDistribution = processedData
.groupBy("key")
.count()
.cache()
val maxCount = keyDistribution.agg(max("count")).first().getLong(0)
val avgCount = keyDistribution.agg(avg("count")).first().getDouble(0)
println(s"最大鍵計數(shù): $maxCount, 平均鍵計數(shù): $avgCount, 傾斜比例: ${maxCount / avgCount}")
// 處理傾斜鍵
val skewThreshold = avgCount * 5
val skewedKeys = keyDistribution
.filter($"count" > skewThreshold)
.select("key")
.collect()
.map(_.getString(0))
.toSet
val skewedKeysBroadcast = spark.sparkContext.broadcast(skewedKeys)
// 分離傾斜和非傾斜數(shù)據(jù)
val skewedData = processedData
.filter(row => skewedKeysBroadcast.value.contains(row.getAs[String]("key")))
// 對兩部分數(shù)據(jù)分別進行聚合
val skewedAggregated = saltedSkewedData
.groupBy("salted_key")
.agg(
sum("value").as("sum_value"),
count("*").as("count")
)
.withColumn("key", split($"salted_key", "_").getItem(0))
.drop("salted_key")
.groupBy("key")
.agg(
sum("sum_value").as("sum_value"),
sum("count").as("count")
)
val normalAggregated = normalData
.groupBy("key")
.agg(
sum("value").as("sum_value"),
count("*").as("count")
)
// 合并結果
val finalResult = skewedAggregated.union(normalAggregated)2. 廣播變量優(yōu)化
廣播變量可以有效減少數(shù)據(jù)傳輸和內(nèi)存使用:
// 創(chuàng)建一個大型查找表
val lookupTable = spark.range(0, 100000)
.withColumn("value", rand() * 1000)
.collect()
.map(row => (row.getLong(0), row.getDouble(1)))
.toMap
// 廣播查找表
val broadcastLookupTable = spark.sparkContext.broadcast(lookupTable)
// 使用廣播變量進行查找
val result = spark.range(0, 1000000)
.withColumn("key", $"id" % 100000)
.mapPartitions { iter =>
val lookup = broadcastLookupTable.value
iter.map { row =>
val id = row.getLong(0)
val key = row.getLong(1)
val value = lookup.getOrElse(key, 0.0)
(id, key, value)
}
}十四、內(nèi)存調(diào)優(yōu)優(yōu)秀實踐
1. 內(nèi)存配置原則
- 合理設置執(zhí)行器內(nèi)存:根據(jù)集群節(jié)點內(nèi)存和并行度設置合適的執(zhí)行器內(nèi)存
- 避免過度分配:每個執(zhí)行器內(nèi)存不宜過大,以免GC時間過長
- 預留系統(tǒng)開銷:為操作系統(tǒng)和其他進程預留足夠內(nèi)存
- 調(diào)整存儲與執(zhí)行內(nèi)存比例:根據(jù)應用特點調(diào)整存儲內(nèi)存和執(zhí)行內(nèi)存的比例
// 示例:4節(jié)點集群,每節(jié)點64GB內(nèi)存,16核
// 設置每個執(zhí)行器使用4核
spark.conf.set("spark.executor.cores", "4")
// 每節(jié)點運行3個執(zhí)行器,每執(zhí)行器約16GB內(nèi)存
spark.conf.set("spark.executor.memory", "16g")
// 設置內(nèi)存開銷因子
spark.conf.set("spark.executor.memoryOverhead", "2g")
// 調(diào)整存儲內(nèi)存比例
spark.conf.set("spark.memory.storageFraction", "0.4")2. 緩存策略選擇
根據(jù)數(shù)據(jù)特點和查詢模式選擇合適的緩存策略:
數(shù)據(jù)特點 | 推薦緩存策略 |
高頻訪問,內(nèi)存充足 | MEMORY_ONLY |
高頻訪問,內(nèi)存有限 | MEMORY_ONLY_SER |
數(shù)據(jù)量大,查詢少 | MEMORY_AND_DISK_SER |
數(shù)據(jù)量極大,內(nèi)存緊張 | OFF_HEAP |
// 示例:根據(jù)數(shù)據(jù)大小選擇緩存策略
def smartCache(df: DataFrame): DataFrame = {
val sizeEstimate = SparkContext.getActive.get.estimateRDDSize(df.rdd)
val availableMemory = SparkContext.getActive.get.getExecutorMemoryStatus
.map(_._2._2).sum * 0.6 // 可用內(nèi)存的60%
if (sizeEstimate < availableMemory * 0.5) {
// 數(shù)據(jù)較小,使用MEMORY_ONLY
df.persist(StorageLevel.MEMORY_ONLY)
} else if (sizeEstimate < availableMemory * 0.8) {
// 數(shù)據(jù)中等,使用MEMORY_ONLY_SER
df.persist(StorageLevel.MEMORY_ONLY_SER)
} else {
// 數(shù)據(jù)較大,使用MEMORY_AND_DISK_SER
df.persist(StorageLevel.MEMORY_AND_DISK_SER)
}
df
}十五、高級內(nèi)存優(yōu)化技術
1. 列裁剪和謂詞下推
列裁剪和謂詞下推可以減少處理的數(shù)據(jù)量,從而降低內(nèi)存使用:
// 啟用列裁剪和謂詞下推
spark.conf.set("spark.sql.optimizer.columnPruning.enabled", true)
spark.conf.set("spark.sql.optimizer.nestedPredicatePushdown.enabled", true)
// 示例:只選擇需要的列并盡早過濾
val optimizedQuery = spark.table("large_table")
.select("id", "name", "value") // 列裁剪
.filter($"value" > 100) // 謂詞下推
.join(spark.table("small_table").select("id", "category"), "id")2. 分區(qū)修剪
分區(qū)修剪可以減少讀取的數(shù)據(jù)量:
// 啟用動態(tài)分區(qū)修剪
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", true)
// 示例:使用分區(qū)字段進行過濾
val result = spark.table("partitioned_table")
.filter($"date" >= "2023-01-01" && $"date" <= "2023-01-31")
.join(spark.table("dimension_table"), "id")3. 內(nèi)存使用監(jiān)控與調(diào)優(yōu)
定期監(jiān)控內(nèi)存使用情況,及時調(diào)整配置:
// 創(chuàng)建內(nèi)存使用監(jiān)控函數(shù)
def monitorMemoryUsage(sc: SparkContext): Unit = {
// 獲取執(zhí)行器內(nèi)存狀態(tài)
val executorMemoryStatus = sc.getExecutorMemoryStatus
// 計算總內(nèi)存和已用內(nèi)存
val totalMem = executorMemoryStatus.map(_._2._2).sum
val usedMem = executorMemoryStatus.map(_._2._1).sum
// 計算內(nèi)存使用率
val memoryUtilization = usedMem.toDouble / totalMem
println(s"內(nèi)存使用率: ${memoryUtilization * 100}%")
println(s"已用內(nèi)存: ${usedMem / 1024 / 1024} MB")
println(s"總內(nèi)存: ${totalMem / 1024 / 1024} MB")
// 獲取存儲內(nèi)存使用情況
val storageMemoryUsed = sc.getRDDStorageInfo.map(_.memoryUsed).sum
println(s"存儲內(nèi)存使用: ${storageMemoryUsed / 1024 / 1024} MB")
// 檢查是否需要調(diào)整配置
if (memoryUtilization > 0.85) {
println("警告:內(nèi)存使用率過高,考慮增加執(zhí)行器內(nèi)存或減少并行度")
}
if (storageMemoryUsed > usedMem * 0.7) {
println("警告:存儲內(nèi)存占比過高,考慮調(diào)整存儲內(nèi)存比例或減少緩存數(shù)據(jù)量")
}
}
// 定期執(zhí)行監(jiān)控
val monitoringThread = new Thread(() => {
while (true) {
try {
monitorMemoryUsage(spark.sparkContext)
Thread.sleep(60000) // 每分鐘監(jiān)控一次
} catch {
case e: Exception => println(s"監(jiān)控異常: ${e.getMessage}")
}
}
})
monitoringThread.setDaemon(true)
monitoringThread.start()十六、特定場景的內(nèi)存優(yōu)化
1. 機器學習應用優(yōu)化
機器學習應用通常需要處理大量特征和模型參數(shù):
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.classification.RandomForestClassifier
// 啟用ML優(yōu)化配置
spark.conf.set("spark.sql.shuffle.partitions", 200)
spark.conf.set("spark.memory.offHeap.enabled", true)
spark.conf.set("spark.memory.offHeap.size", "4g")
// 讀取數(shù)據(jù)
val data = spark.read.parquet("/path/to/features")
// 特征工程
val featureCols = data.columns.filter(_ != "label")
val assembler = new VectorAssembler()
.setInputCols(featureCols)
.setOutputCol("features")
// 使用列式存儲優(yōu)化特征數(shù)據(jù)
val assembled = assembler.transform(data)
.select("features", "label")
assembled.cache()
assembled.count()
// 訓練模型
val rf = new RandomForestClassifier()
.setNumTrees(100)
.setMaxDepth(10)
.setFeatureSubsetStrategy("sqrt")
// 使用checkpoint減少RDD依賴鏈
spark.sparkContext.setCheckpointDir("/tmp/checkpoints")
val model = rf.fit(assembled)2. 圖計算優(yōu)化
圖計算應用通常需要處理大量頂點和邊的數(shù)據(jù):
import org.apache.spark.graphx._
import org.apache.spark.storage.StorageLevel
// 配置圖計算優(yōu)化
spark.conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
spark.conf.set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")
// 創(chuàng)建頂點和邊
val vertices = spark.sparkContext.parallelize(
(1L to 1000000L).map(id => (id, Map("name" -> s"vertex_$id")))
)
val edges = spark.sparkContext.parallelize(
(1L to 2000000L).map { i =>
val src = scala.util.Random.nextInt(1000000) + 1L
val dst = scala.util.Random.nextInt(1000000) + 1L
Edge(src, dst, 1.0)
}
)
// 創(chuàng)建圖并使用優(yōu)化的存儲級別
val graph = Graph(vertices, edges, Map.empty[String, Any],
StorageLevel.MEMORY_AND_DISK_SER,
StorageLevel.MEMORY_AND_DISK_SER)
// 緩存圖
graph.cache()
graph.vertices.count()
graph.edges.count()
// 執(zhí)行PageRank算法
val ranks = graph.pageRank(0.0001).vertices十七、總結與優(yōu)秀實踐
1. 內(nèi)存優(yōu)化核心原則
- 了解應用特點:分析應用的數(shù)據(jù)量、計算模式和內(nèi)存需求
- 合理配置資源:根據(jù)集群規(guī)模和應用特點配置執(zhí)行器數(shù)量和內(nèi)存
- 選擇適當?shù)木彺娌呗裕焊鶕?jù)數(shù)據(jù)特點選擇合適的存儲級別
- 利用列式存儲和壓縮:減少內(nèi)存占用,提高查詢效率
- 使用堆外內(nèi)存:減輕GC壓力,提高大數(shù)據(jù)處理能力
- 監(jiān)控和調(diào)優(yōu):定期監(jiān)控內(nèi)存使用情況,及時調(diào)整配置
2. 常見問題及解決方案
問題 | 解決方案 |
OOM錯誤 | 增加執(zhí)行器內(nèi)存、減少并行度、使用序列化緩存 |
GC時間過長 | 使用堆外內(nèi)存、調(diào)整GC策略、減少執(zhí)行器內(nèi)存大小 |
數(shù)據(jù)傾斜 | 加鹽處理、拆分任務、使用AQE |
緩存效率低 | 調(diào)整批處理大小、啟用壓縮、使用列式存儲 |
Shuffle溢出 | 增加shuffle內(nèi)存比例、調(diào)整分區(qū)數(shù)、使用AQE |























