從源碼解密Spark內(nèi)存管理
內(nèi)存不過(guò)是計(jì)算機(jī)分級(jí)存儲(chǔ)系統(tǒng)中的靠近c(diǎn)pu的一個(gè)存儲(chǔ)介質(zhì)
1.spark運(yùn)行起來(lái)內(nèi)存里都存的啥?
2.如何管理里面所存的東西?
3.spark用java和scala這樣的jvm語(yǔ)言寫(xiě)的,沒(méi)有像c語(yǔ)言那樣顯式申請(qǐng)釋放內(nèi)存,如何進(jìn)行內(nèi)存的管理的?
4.我們應(yīng)該如何設(shè)置spark關(guān)于內(nèi)存的參數(shù)?
一、內(nèi)存模型
遠(yuǎn)古大神曾告訴我們這個(gè)神秘公式:程序=算法+數(shù)據(jù)。
1.1 什么是內(nèi)存模型
內(nèi)存模型就是告訴我們?cè)趺磩澐謨?nèi)存、怎么合理利用我們的內(nèi)存。
首先我們要存什么,根據(jù)大神的公式,我們這樣來(lái)分析:
數(shù)據(jù): 就是我們代碼操作的數(shù)據(jù),比如人的數(shù)據(jù)(年齡、職位等)或者輸入的某個(gè)值。這些可在運(yùn)行時(shí)將要計(jì)算的部分?jǐn)?shù)據(jù)加載到內(nèi)存。
算法:就是操作數(shù)據(jù)的邏輯,表現(xiàn)形式就是代碼或者編譯后的指令。當(dāng)然它要運(yùn)行起來(lái),會(huì)依賴一部分內(nèi)存,來(lái)儲(chǔ)存程序計(jì)數(shù)器(代碼執(zhí)行到那一句了)、函數(shù)調(diào)用棧等運(yùn)行時(shí)需要的數(shù)據(jù)??偠灾褪菆?zhí)行數(shù)據(jù)操作邏輯所必要的內(nèi)存。
這下我們就可以把我們需要儲(chǔ)存的東西分為數(shù)據(jù)區(qū)和執(zhí)行區(qū)。
二、spark內(nèi)存模型
2.1 spark為啥快
我們都知道spark之所以比mapreduce計(jì)算的快,是因?yàn)樗腔趦?nèi)存的,不用每次計(jì)算完都寫(xiě)磁盤(pán),再讀取出來(lái)進(jìn)行下一次計(jì)算,spark直接把內(nèi)存作為數(shù)據(jù)的臨時(shí)儲(chǔ)存介質(zhì)。所以mapreduce就沒(méi)有強(qiáng)調(diào)內(nèi)存管理,而spark需要管理內(nèi)存。
2.2 spark管理的內(nèi)存
系統(tǒng)區(qū):spark運(yùn)行自身的代碼需要一定的空間。
用戶區(qū):我們自己寫(xiě)的一些udf之類的代碼也需要一定的空間來(lái)運(yùn)行。
存儲(chǔ)區(qū):spark的任務(wù)就是操作數(shù)據(jù),spark為了快可能把數(shù)據(jù)存內(nèi)存,而這些數(shù)據(jù)也需要占用空間。
執(zhí)行區(qū):spark操作數(shù)據(jù)的單元是partition,spark在執(zhí)行一些shuffle、join、sort、aggregation之類的操作,需要把partition加載到內(nèi)存進(jìn)行運(yùn)算,這也會(huì)運(yùn)用到部分內(nèi)存。
2.3 spark內(nèi)存模型
上圖就是spark內(nèi)存劃分的圖了
我們從下到上一層一層的解釋:
第1層:整個(gè)excutor所用到的內(nèi)存
第2層:分為jvm中的內(nèi)存和jvm外的內(nèi)存,這里的jvm內(nèi)存在yarn的時(shí)候就是指申請(qǐng)的container的內(nèi)存
第3層:對(duì)于spark來(lái)內(nèi)存分為jvm堆內(nèi)的和memoryoverhead、off-heap
jvm堆內(nèi)的下一層再說(shuō)
memoryOverhead: 對(duì)應(yīng)的參數(shù)就是spark.yarn.executor.memoryOverhead 這塊內(nèi)存是用于虛擬機(jī)的開(kāi)銷、內(nèi)部的字符串、還有一些本地開(kāi)銷(比如python需要用到的內(nèi)存)等。其實(shí)就是額外的內(nèi)存,spark并不會(huì)對(duì)這塊內(nèi)存進(jìn)行管理。
off-heap : 這里特指的spark.memory.offHeap.size這個(gè)參數(shù)指定的內(nèi)存(廣義上是指所有堆外的)。這部分內(nèi)存的申請(qǐng)和釋放是直接進(jìn)行的不通過(guò)jvm管控所以沒(méi)有GC,被spark分為storage和excution兩部分和第5層講的一同被spark統(tǒng)一進(jìn)行管理。
第4層:jvm堆內(nèi)的內(nèi)存分為三個(gè)部分
reservedMemory: 預(yù)留內(nèi)存300M,用于保障spark正常運(yùn)行
other memory: 用于spark內(nèi)部的一些元數(shù)據(jù)、用戶的數(shù)據(jù)結(jié)構(gòu)、防止出現(xiàn)對(duì)內(nèi)存估計(jì)不足導(dǎo)致oom時(shí)的內(nèi)存緩沖、占用空間比較大的記錄做緩沖
memory faction: spark主要控制的內(nèi)存,由參數(shù)spark.memory.fraction控制。
第5層:分成storage和execution 由參數(shù)spark.memory.storageFraction控制它兩的大小,但是
execution: 用于spark的計(jì)算:shuffle、sort、aggregation等這些計(jì)算時(shí)會(huì)用到的內(nèi)存,如果計(jì)算是內(nèi)存不足會(huì)向storage部分借,如果還是不夠就會(huì)spill到磁盤(pán)。
storage: 主要用于rdd的緩存,如果execution來(lái)借內(nèi)存,可能會(huì)犧牲自己丟棄緩存來(lái)借給execution,storage也可以向execution借內(nèi)存,但execution不會(huì)犧牲自己。
三、源碼層面
3.1 整體架構(gòu)
- 內(nèi)存申請(qǐng)和釋放(綠色):
 
看上圖綠色那塊,就是內(nèi)存的申請(qǐng)和釋放模塊。MemoryAllocator接口負(fù)責(zé)內(nèi)存申請(qǐng),有兩個(gè)子類實(shí)現(xiàn)分別負(fù)責(zé)堆內(nèi)內(nèi)存和off-heap內(nèi)存。
- 內(nèi)存池(粉色):
 
MemoryPool內(nèi)存池有兩個(gè)子類分別管理著執(zhí)行內(nèi)存和儲(chǔ)存內(nèi)存。可以看到兩種內(nèi)存池的申請(qǐng)方法的參數(shù)有很明顯的區(qū)別,執(zhí)行內(nèi)存主要是面向task的,而儲(chǔ)存內(nèi)存主要是面向block的也就是用于rdd緩存呀啥的。
- 統(tǒng)一內(nèi)存管理:
 
MemoryManager負(fù)責(zé)記錄內(nèi)存的消耗,管理這4個(gè)內(nèi)存池,子類UnifiedMemoryManager負(fù)責(zé)把這執(zhí)行內(nèi)存和儲(chǔ)存內(nèi)存統(tǒng)一起來(lái)管理,實(shí)現(xiàn)相互借用之類的功能。
- MemoryManager的使用場(chǎng)景
 
一個(gè)是BlockManager用于管理儲(chǔ)存,還有一部分是運(yùn)行Task是的內(nèi)存使用,主要有executor的使用,shuffle時(shí)spill呀外部排序呀,這樣的場(chǎng)景。
3.2 如何實(shí)現(xiàn)內(nèi)存申請(qǐng)釋放。
spark是用scala和java實(shí)現(xiàn)的,印象中沒(méi)有管理內(nèi)存申請(qǐng)釋放的api,spark是如何利用這些jvm語(yǔ)言管理內(nèi)存的呢。
我們來(lái)看看源碼片段
- //HeapMemoryAllocator.scalaprivate final Map<Long, LinkedList<WeakReference<long[]>>> bufferPoolsBySize = new HashMap<>();
 - …… public MemoryBlock allocate(long size) throws OutOfMemoryError {
 - …… 上面是些內(nèi)存的判斷 …… long[] array = new long[numWords];//上面這就很關(guān)鍵了
 - MemoryBlock memory = new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, size); if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
 - memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE);
 - } return memory;
 - }
 
HeapMemoryAllocator可以看到上面的源碼片段,實(shí)際的內(nèi)存申請(qǐng)是這個(gè)代碼:new long[numWords]; 就是new了個(gè)數(shù)組來(lái)占著內(nèi)存,用MemoryBlock 包裝了一下。bufferPoolsBySize這個(gè)是為了防止內(nèi)存頻繁申請(qǐng)和釋放做的buffer。
接下來(lái)看看off-heap是怎么申請(qǐng)內(nèi)存的。
- //UnsafeMemoryAllocator
 - public MemoryBlock allocate(long size) throws OutOfMemoryError { long address = Platform.allocateMemory(size);
 - MemoryBlock memory = new MemoryBlock(null, address, size); if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
 - memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE);
 - } return memory;
 - }
 
offheap的就和C語(yǔ)言一樣的了可以直接使用api來(lái)申請(qǐng)。這部分內(nèi)存就需要自己進(jìn)行管理了,沒(méi)有jvm的控制,沒(méi)有內(nèi)存回收機(jī)制。
當(dāng)然這也不意味了你能***制的使用內(nèi)存,在yarn的情況下,yarn是監(jiān)測(cè)子進(jìn)程的內(nèi)存占用來(lái)看你是否超了內(nèi)存,如果超了直接kill掉。
四、總結(jié)
我們能回答開(kāi)頭提出的幾個(gè)問(wèn)題了嗎?還是又有了更多的問(wèn)題呢。

















 
 
 










 
 
 
 