Why Not Bookmark It? A Source-Code Walkthrough of Spark's Dynamic Memory Management!
I. Spark Memory Management Modes
Spark has two memory management modes: static memory management (StaticMemoryManager) and dynamic, or unified, memory management (UnifiedMemoryManager). Dynamic memory management was introduced in Spark 1.6. As the source in SparkEnv.scala shows, Spark uses dynamic memory management by default. Setting spark.memory.useLegacyMode to true switches it to static memory management.
```scala
// SparkEnv.scala
val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false)
val memoryManager: MemoryManager =
  if (useLegacyMemoryManager) {
    new StaticMemoryManager(conf, numUsableCores)
  } else {
    UnifiedMemoryManager(conf, numUsableCores)
  }
```
 
II. Space Allocation in Spark's Dynamic Memory Management
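Compared with the StaticMemoryManager mode, the UnifiedMemoryManager model removes the fixed boundary between storage memory and execution memory. Each region can grow and shrink dynamically, which lowers the risk of OOM errors. As the figure above shows, executor JVM memory consists mainly of the following regions: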
(1) Reserved Memory: this memory is set aside for the system. It defaults to 300 MB and can be changed with spark.testing.reservedMemory.
```scala
// UnifiedMemoryManager.scala
private val RESERVED_SYSTEM_MEMORY_BYTES = 300 * 1024 * 1024
```
 
The minimum JVM memory also depends on reserved memory: minSystemMemory = reservedMemory * 1.5. By default, the JVM therefore needs at least 300 MB * 1.5 = 450 MB.
```scala
// UnifiedMemoryManager.scala
val minSystemMemory = (reservedMemory * 1.5).ceil.toLong
```
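The startup rule above can be sketched in a few lines of Scala. This is a minimal sketch that assumes the heap size is passed in as systemMemory. The object name MinHeapCheck and its check method are hypothetical. Spark throws an IllegalArgumentException when the heap is too small; this version returns the error message instead.

```scala
// Hypothetical sketch of the minimum-heap rule: minSystemMemory = reservedMemory * 1.5
object MinHeapCheck {
  val ReservedSystemMemoryBytes: Long = 300L * 1024 * 1024 // 300 MB default

  // Smallest accepted heap, rounded up like the Spark snippet above
  def minSystemMemory(reservedMemory: Long): Long =
    (reservedMemory * 1.5).ceil.toLong

  // Some(error) when the heap is too small, None when it is large enough
  def check(systemMemory: Long,
            reservedMemory: Long = ReservedSystemMemoryBytes): Option[String] = {
    val min = minSystemMemory(reservedMemory)
    if (systemMemory < min) Some(s"System memory $systemMemory must be at least $min bytes")
    else None
  }

  def main(args: Array[String]): Unit = {
    val mb = 1024L * 1024
    println(minSystemMemory(ReservedSystemMemoryBytes) / mb) // 450
    println(check(400 * mb).isDefined)  // true: a 400 MB heap is rejected
    println(check(1024 * mb).isDefined) // false: a 1 GB heap is accepted
  }
}
```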
 
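(2) Spark Memory: this region is split into execution memory and storage memory. Once reserved memory is subtracted, a fraction of the remaining usableMemory goes to these two kinds of heap memory. The fraction defaults to 0.6 and can be set with spark.memory.fraction. For example, with a 1 GB JVM heap, the default memory for execution and storage is (1024 - 300) * 0.6 ≈ 434 MB.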
```scala
// UnifiedMemoryManager.scala
val usableMemory = systemMemory - reservedMemory
val memoryFraction = conf.getDouble("spark.memory.fraction", 0.6)
(usableMemory * memoryFraction).toLong
```
 
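The boundary between the two is set by spark.memory.storageFraction, which defaults to 0.5. By default, storage memory and execution memory therefore start out at a 1:1 ratio. In the snippet below, maxMemory is the Spark Memory computed above.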
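```scala
// UnifiedMemoryManager.scala (in UnifiedMemoryManager.apply; maxMemory = getMaxMemory(conf))
new UnifiedMemoryManager(
  conf,
  maxHeapMemory = maxMemory,
  onHeapStorageRegionSize =
    (maxMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong,
  numCores = numCores)
```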
 
(3) User Memory: this is the remaining memory, available for the user's own needs. By default it is (1 - 0.6) = 0.4 of usableMemory.
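The sketch below puts the three regions together and reproduces the 1 GB example. HeapBreakdown and Regions are hypothetical names. The fractions default to the spark.memory.fraction and spark.memory.storageFraction values quoted above. The storage/execution split it prints is only the starting point, because the unified manager moves that boundary at run time.

```scala
// Hypothetical sketch: default on-heap regions for a given executor heap size
object HeapBreakdown {
  val Mb: Long = 1024L * 1024
  val ReservedMemory: Long = 300 * Mb // RESERVED_SYSTEM_MEMORY_BYTES

  final case class Regions(reserved: Long, spark: Long, storage: Long,
                           execution: Long, user: Long)

  def compute(systemMemory: Long,
              memoryFraction: Double = 0.6,  // spark.memory.fraction default
              storageFraction: Double = 0.5  // spark.memory.storageFraction default
             ): Regions = {
    val usable = systemMemory - ReservedMemory
    val spark = (usable * memoryFraction).toLong   // execution + storage (maxHeapMemory)
    val storage = (spark * storageFraction).toLong // initial storage region
    Regions(ReservedMemory, spark, storage, spark - storage, usable - spark)
  }

  def main(args: Array[String]): Unit = {
    val r = compute(1024 * Mb)
    // prints: spark=434MB storage=217MB execution=217MB user=289MB
    println(s"spark=${r.spark / Mb}MB storage=${r.storage / Mb}MB " +
      s"execution=${r.execution / Mb}MB user=${r.user / Mb}MB")
  }
}
```

Reserved, Spark and user memory always add up to the whole heap. Storage and execution only divide the Spark Memory share between them.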
III. Memory Control in Detail
First, let's look at how Spark's memory management implementation classes relate to one another.
1. MemoryManager has three main responsibilities: (1) tracking how much storage memory and execution memory is in use; (2) acquiring storage, execution, and unroll memory; (3) releasing storage and execution memory.
Execution memory is used for shuffles, joins, sorts, and aggregations. Storage memory is used for caching and broadcasting data. Every JVM has one MemoryManager. Constructing a MemoryManager requires the onHeapStorageMemory and onHeapExecutionMemory parameters.
```scala
// MemoryManager.scala
private[spark] abstract class MemoryManager(
    conf: SparkConf,
    numCores: Int,
    onHeapStorageMemory: Long,
    onHeapExecutionMemory: Long) extends Logging {
  // ...
}
```
 
MemoryManager creates StorageMemoryPool and ExecutionMemoryPool objects. These form the on-heap and off-heap storage and execution memory pools, which manage how storage and execution memory is allocated.
```scala
// MemoryManager.scala
@GuardedBy("this")
protected val onHeapStorageMemoryPool = new StorageMemoryPool(this, MemoryMode.ON_HEAP)
@GuardedBy("this")
protected val offHeapStorageMemoryPool = new StorageMemoryPool(this, MemoryMode.OFF_HEAP)
@GuardedBy("this")
protected val onHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.ON_HEAP)
@GuardedBy("this")
protected val offHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.OFF_HEAP)
```
 
By default, off-heap memory is not used. It is enabled with spark.memory.offHeap.enabled. The default off-heap size is 0 and can be set with spark.memory.offHeap.size.
```scala
// MemoryManager.scala
final val tungstenMemoryMode: MemoryMode = {
  if (conf.getBoolean("spark.memory.offHeap.enabled", false)) {
    require(conf.getSizeAsBytes("spark.memory.offHeap.size", 0) > 0,
      "spark.memory.offHeap.size must be > 0 when spark.memory.offHeap.enabled == true")
    require(Platform.unaligned(),
      "No support for unaligned Unsafe. Set spark.memory.offHeap.enabled to false.")
    MemoryMode.OFF_HEAP
  } else {
    MemoryMode.ON_HEAP
  }
}
```
 
```scala
// MemoryManager.scala
protected[this] val maxOffHeapMemory = conf.getSizeAsBytes("spark.memory.offHeap.size", 0)
```
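You can mimic the off-heap decision outside Spark to see which settings pass validation. This is a minimal sketch that assumes the configuration is a plain Map[String, String]. OffHeapMode and its members are hypothetical, and the Platform.unaligned() check is omitted. The size is parsed as a raw byte count, whereas Spark's getSizeAsBytes also accepts suffixed values such as 2g.

```scala
// Hypothetical mirror of MemoryManager.tungstenMemoryMode, reading a plain Map
object OffHeapMode {
  sealed trait Mode
  case object OnHeap extends Mode
  case object OffHeap extends Mode

  def tungstenMemoryMode(conf: Map[String, String]): Mode = {
    val enabled = conf.get("spark.memory.offHeap.enabled").exists(_.trim.toBoolean)
    // Simplification: raw byte count only (Spark's getSizeAsBytes also accepts "2g")
    val size = conf.get("spark.memory.offHeap.size").map(_.trim.toLong).getOrElse(0L)
    if (enabled) {
      // Same rule as the Spark snippet: off-heap mode needs a positive size
      require(size > 0,
        "spark.memory.offHeap.size must be > 0 when spark.memory.offHeap.enabled == true")
      OffHeap // the Platform.unaligned() check is omitted here
    } else {
      OnHeap
    }
  }

  def main(args: Array[String]): Unit = {
    println(tungstenMemoryMode(Map.empty)) // OnHeap
    println(tungstenMemoryMode(Map(
      "spark.memory.offHeap.enabled" -> "true",
      "spark.memory.offHeap.size" -> (1L << 30).toString))) // OffHeap
  }
}
```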
 
Method that releases numBytes bytes of execution memory:
```scala
// MemoryManager.scala
def releaseExecutionMemory(
    numBytes: Long,
    taskAttemptId: Long,
    memoryMode: MemoryMode): Unit = synchronized {
  memoryMode match {
    case MemoryMode.ON_HEAP => onHeapExecutionMemoryPool.releaseMemory(numBytes, taskAttemptId)
    case MemoryMode.OFF_HEAP => offHeapExecutionMemoryPool.releaseMemory(numBytes, taskAttemptId)
  }
}
```
 
Method that releases all execution memory held by a given task and marks that task as inactive:
```scala
// MemoryManager.scala
private[memory] def releaseAllExecutionMemoryForTask(taskAttemptId: Long): Long = synchronized {
  onHeapExecutionMemoryPool.releaseAllMemoryForTask(taskAttemptId) +
    offHeapExecutionMemoryPool.releaseAllMemoryForTask(taskAttemptId)
}
```
 
Method that releases numBytes bytes of storage memory:
```scala
// MemoryManager.scala
def releaseStorageMemory(numBytes: Long, memoryMode: MemoryMode): Unit = synchronized {
  memoryMode match {
    case MemoryMode.ON_HEAP => onHeapStorageMemoryPool.releaseMemory(numBytes)
    case MemoryMode.OFF_HEAP => offHeapStorageMemoryPool.releaseMemory(numBytes)
  }
}
```
 
Method that releases all storage memory:
```scala
// MemoryManager.scala
final def releaseAllStorageMemory(): Unit = synchronized {
  onHeapStorageMemoryPool.releaseAllMemory()
  offHeapStorageMemoryPool.releaseAllMemory()
}
```
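The release methods above delegate to per-task bookkeeping inside the execution pools. The toy class below is a hypothetical stand-in for ExecutionMemoryPool. It assumes usage is tracked in a map from taskAttemptId to the bytes that task holds, and it has no blocking or fairness logic. In this toy, a task counts as active only while it has an entry in the map, so releasing all of its memory also makes it inactive.

```scala
import scala.collection.mutable

// Toy model of per-task execution memory accounting (not Spark's ExecutionMemoryPool)
class ToyExecutionPool(var poolSize: Long) {
  private val memoryForTask = mutable.HashMap[Long, Long]()

  def memoryUsed: Long = synchronized { memoryForTask.values.sum }
  def memoryFree: Long = synchronized { poolSize - memoryUsed }
  def numActiveTasks: Int = synchronized { memoryForTask.size }
  def memoryUsedByTask(taskAttemptId: Long): Long =
    synchronized { memoryForTask.getOrElse(taskAttemptId, 0L) }

  // Records a grant; no waiting and no per-task limits in this toy
  def grant(numBytes: Long, taskAttemptId: Long): Unit = synchronized {
    require(numBytes <= memoryFree, "not enough free memory in the toy pool")
    memoryForTask(taskAttemptId) = memoryUsedByTask(taskAttemptId) + numBytes
  }

  // Like releaseMemory: never release more than the task holds; drop empty entries
  def releaseMemory(numBytes: Long, taskAttemptId: Long): Unit = synchronized {
    val curMem = memoryUsedByTask(taskAttemptId)
    val toRelease = math.min(numBytes, curMem)
    if (toRelease == curMem) memoryForTask.remove(taskAttemptId)
    else memoryForTask(taskAttemptId) = curMem - toRelease
  }

  // Like releaseAllMemoryForTask: free everything the task holds and return the amount
  def releaseAllMemoryForTask(taskAttemptId: Long): Long = synchronized {
    val held = memoryUsedByTask(taskAttemptId)
    releaseMemory(held, taskAttemptId)
    held
  }
}
```

For example, if two tasks hold 300 and 200 bytes of a 1000-byte pool, calling releaseAllMemoryForTask on the second task returns 200 and leaves only one active task.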
 
2. Next, let's look at how UnifiedMemoryManager controls memory. How is dynamic memory actually implemented?
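UnifiedMemoryManager extends MemoryManager. It passes onHeapStorageRegionSize as the initial on-heap storage memory and maxHeapMemory - onHeapStorageRegionSize as the initial on-heap execution memory: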
```scala
// UnifiedMemoryManager.scala
private[spark] class UnifiedMemoryManager private[memory] (
    conf: SparkConf,
    val maxHeapMemory: Long,
    onHeapStorageRegionSize: Long,
    numCores: Int)
  extends MemoryManager(
    conf,
    numCores,
    onHeapStorageRegionSize,
    maxHeapMemory - onHeapStorageRegionSize) {
```
 
It overrides maxOnHeapStorageMemory: maximum storage memory = maximum heap memory - execution memory currently in use.
```scala
// UnifiedMemoryManager.scala
override def maxOnHeapStorageMemory: Long = synchronized {
  maxHeapMemory - onHeapExecutionMemoryPool.memoryUsed
}
```
 
Core method acquireStorageMemory: acquires storage memory.
```scala
// UnifiedMemoryManager.scala
override def acquireStorageMemory(
    blockId: BlockId,
    numBytes: Long,
    memoryMode: MemoryMode): Boolean = synchronized {
  assertInvariants()
  assert(numBytes >= 0)
  // Pick the execution pool, storage pool, and storage limit for this memory mode
  val (executionPool, storagePool, maxMemory) = memoryMode match {
    case MemoryMode.ON_HEAP => (
      onHeapExecutionMemoryPool,
      onHeapStorageMemoryPool,
      maxOnHeapStorageMemory)
    case MemoryMode.OFF_HEAP => (
      offHeapExecutionMemoryPool,
      offHeapStorageMemoryPool,
      maxOffHeapMemory)
  }
  if (numBytes > maxMemory) {
    // The request exceeds the maximum storage memory, so it fails
    logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " +
      s"memory limit ($maxMemory bytes)")
    return false
  }
  if (numBytes > storagePool.memoryFree) {
    // The storage pool lacks free memory, so borrow from the execution pool.
    // Storage can only borrow execution memory that is currently free.
    val memoryBorrowedFromExecution = Math.min(executionPool.memoryFree, numBytes)
    executionPool.decrementPoolSize(memoryBorrowedFromExecution) // shrink the execution pool
    storagePool.incrementPoolSize(memoryBorrowedFromExecution)   // grow the storage pool
  }
  storagePool.acquireMemory(blockId, numBytes)
}
```
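You can trace the borrowing rule with toy numbers. This sketch assumes on-heap mode and replaces Spark's pools with a hypothetical ToyPool that tracks only poolSize and memoryUsed. The storage limit is computed the same way as maxOnHeapStorageMemory. The final step simply succeeds if the grown pool has room, whereas the real StorageMemoryPool.acquireMemory can also evict cached blocks.

```scala
// Toy pool: only a size and a usage counter, no blocks and no eviction
final class ToyPool(var poolSize: Long, var memoryUsed: Long = 0L) {
  def memoryFree: Long = poolSize - memoryUsed
}

// Hypothetical sketch of on-heap acquireStorageMemory with toy pools
object ToyUnified {
  // Same formula as maxOnHeapStorageMemory: whatever execution is not using
  def maxOnHeapStorageMemory(maxHeapMemory: Long, execution: ToyPool): Long =
    maxHeapMemory - execution.memoryUsed

  def acquireStorageMemory(numBytes: Long, maxHeapMemory: Long,
                           execution: ToyPool, storage: ToyPool): Boolean = {
    require(numBytes >= 0)
    val maxMemory = maxOnHeapStorageMemory(maxHeapMemory, execution)
    if (numBytes > maxMemory) return false // can never fit, fail immediately
    if (numBytes > storage.memoryFree) {
      // Borrow only execution's free memory, using the same min(...) as the source
      val borrowed = math.min(execution.memoryFree, numBytes)
      execution.poolSize -= borrowed
      storage.poolSize += borrowed
    }
    // Simplification: real StorageMemoryPool.acquireMemory may also evict cached blocks
    if (numBytes <= storage.memoryFree) { storage.memoryUsed += numBytes; true }
    else false
  }
}
```

Storage never takes memory that execution is actively using. It only borrows execution's free space, and any request larger than maxHeapMemory minus execution's current usage is rejected outright.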
 
Core method acquireExecutionMemory: acquires execution memory.
```scala
// UnifiedMemoryManager.scala
override private[memory] def acquireExecutionMemory(
    numBytes: Long,
    taskAttemptId: Long,
    memoryMode: MemoryMode): Long = synchronized {
  // The method is synchronized on the memory manager. The call may block
  // (inside ExecutionMemoryPool.acquireMemory) until the execution pool has enough free memory.
  ...
  executionPool.acquireMemory(
    numBytes, taskAttemptId, maybeGrowExecutionPool, computeMaxExecutionPoolSize)
}
```
 
The method finally calls ExecutionMemoryPool's acquireMemory, which takes two functions as arguments: maybeGrowExecutionPool() and computeMaxExecutionPoolSize().
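The memory each task can use is bounded between poolSize / (2 * numActiveTasks) and maxPoolSize / numActiveTasks. Here maxPoolSize is the maximum size the execution pool can reach, and poolSize is the pool's current size. The upper bound is a hard cap. The lower bound is a guarantee: a task that cannot yet reach it waits until other tasks release memory.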
```scala
// ExecutionMemoryPool.scala
val maxPoolSize = computeMaxPoolSize()
val maxMemoryPerTask = maxPoolSize / numActiveTasks
val minMemoryPerTask = poolSize / (2 * numActiveTasks)
```
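Both bounds are enforced together in a single attempt of ExecutionMemoryPool.acquireMemory. What follows is a simplified, non-blocking sketch of that check, modeled on the Spark 2.x logic. FairShare, Attempt, Granted and WouldBlock are hypothetical names, and maxPoolSize stands for the value returned by computeMaxExecutionPoolSize(). The real method runs in a loop, calls maybeGrowExecutionPool first, and waits on a lock at the point where this version returns WouldBlock.

```scala
// Hypothetical sketch of one fairness check, assuming the pool has already tried to grow
object FairShare {
  sealed trait Attempt
  final case class Granted(bytes: Long) extends Attempt
  case object WouldBlock extends Attempt // the real pool waits on its lock instead

  def tryAcquire(numBytes: Long, curMem: Long, numActiveTasks: Int,
                 poolSize: Long, memoryFree: Long, maxPoolSize: Long): Attempt = {
    require(numActiveTasks > 0)
    val maxMemoryPerTask = maxPoolSize / numActiveTasks
    val minMemoryPerTask = poolSize / (2 * numActiveTasks)
    // Never let this task exceed 1/N of the largest possible pool
    val maxToGrant = math.min(numBytes, math.max(0L, maxMemoryPerTask - curMem))
    // Never grant more than is free right now
    val toGrant = math.min(maxToGrant, memoryFree)
    // Still short and below the 1/(2N) floor: wait for other tasks to free memory
    if (toGrant < numBytes && curMem + toGrant < minMemoryPerTask) WouldBlock
    else Granted(toGrant)
  }
}
```

Consider 4 active tasks and a 1000-byte pool. A task asking for 400 bytes is capped at 250 bytes (1/N of maxPoolSize). If only 50 bytes are free, it would wait, because 50 is below its 125-byte floor (1/(2N) of poolSize). A task already at its cap is granted 0 bytes rather than being blocked.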
 
maybeGrowExecutionPool() is how the execution region grows dynamically. Each time execution memory is requested, the execution pool may make several attempts, and each attempt may reclaim some storage memory.
```scala
// UnifiedMemoryManager.scala (defined inside acquireExecutionMemory)
def maybeGrowExecutionPool(extraMemoryNeeded: Long): Unit = {
  if (extraMemoryNeeded > 0) { // more execution memory is needed
    // Memory execution can reclaim from storage: the larger of storage's free memory
    // and the amount storage has borrowed beyond its own region
    val memoryReclaimableFromStorage = math.max(
      storagePool.memoryFree,
      storagePool.poolSize - storageRegionSize)
    if (memoryReclaimableFromStorage > 0) { // some memory can be reclaimed
      // Free min(needed, reclaimable), evicting cached blocks if free space is not enough
      val spaceToReclaim = storagePool.freeSpaceToShrinkPool(
        math.min(extraMemoryNeeded, memoryReclaimableFromStorage))
      storagePool.decrementPoolSize(spaceToReclaim)   // shrink the storage region
      executionPool.incrementPoolSize(spaceToReclaim) // grow the execution region
    }
  }
}
```
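You can exercise the reclaiming rule with a toy storage pool. This is a minimal sketch in which ToyStoragePool, ToyExecPool and GrowExecution are hypothetical. Here "eviction" just lowers memoryUsed by exactly the number of bytes needed, whereas Spark's freeSpaceToShrinkPool asks the memory store to evict cached blocks. The sketch shows the key asymmetry: execution can take back whatever storage borrowed beyond storageRegionSize, but it cannot evict storage that stays within its own region.

```scala
// Toy storage pool; "eviction" simply drops cached bytes (no real blocks)
final class ToyStoragePool(var poolSize: Long, var memoryUsed: Long) {
  def memoryFree: Long = poolSize - memoryUsed

  // Modeled on freeSpaceToShrinkPool: use free space first, then evict the remainder
  def freeSpaceToShrinkPool(spaceToFree: Long): Long = {
    val fromFree = math.min(spaceToFree, memoryFree)
    val remaining = spaceToFree - fromFree
    val evicted = math.min(remaining, memoryUsed) // hypothetical: evict exactly what is needed
    memoryUsed -= evicted
    fromFree + evicted
  }
}

final class ToyExecPool(var poolSize: Long)

// Hypothetical sketch of maybeGrowExecutionPool; returns the bytes moved to execution
object GrowExecution {
  def maybeGrowExecutionPool(extraMemoryNeeded: Long, storagePool: ToyStoragePool,
                             executionPool: ToyExecPool, storageRegionSize: Long): Long = {
    if (extraMemoryNeeded <= 0) return 0L
    // Reclaimable: storage's free space, or what it borrowed beyond its own region
    val reclaimable = math.max(storagePool.memoryFree, storagePool.poolSize - storageRegionSize)
    if (reclaimable <= 0) return 0L
    val spaceToReclaim =
      storagePool.freeSpaceToShrinkPool(math.min(extraMemoryNeeded, reclaimable))
    storagePool.poolSize -= spaceToReclaim   // storage region shrinks
    executionPool.poolSize += spaceToReclaim // execution region grows
    spaceToReclaim
  }
}
```

Suppose storage has grown to 700 bytes, holds 650, and has a 500-byte region. A request for 300 extra bytes then reclaims only 200: 50 bytes of free space plus 150 evicted. After that, storage sits exactly at its region and is full, so a further request reclaims nothing.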
 