Spark體系架構(gòu)必讀
最近看到一篇關(guān)于Spark架構(gòu)的博文,作者是 Alexey Grishchenko??催^Alexey博文的同學(xué)應(yīng)該都知道,他對Spark理解地非常深入,讀完他的 “spark-architecture” 這篇博文,有種醍醐灌頂?shù)母杏X,從JVM內(nèi)存分配到Spark集群的資源管理,步步深入,感觸頗多。因此,在周末的業(yè)余時(shí)間里,將此文的核心內(nèi)容譯成中文,并在這里與大家分享。如在翻譯過程中有文字上的表達(dá)紕漏,還請大家指出。
首先來看一張Spark 1.3.0 官方給出的圖片,如下:

在這張圖中,你會(huì)看到很多的術(shù)語 ,諸如“executor”, “task”, “cache”, “Worker Node” 等。原作者表示,在他開始學(xué)spark的時(shí)候,上述圖是唯一一張可以找到的圖片(Spark 1.3.0),形勢很不樂觀。更加不幸地是,這張圖并沒有很好地表達(dá)出Spark內(nèi)在的一些概念。因此,通過不斷地學(xué)習(xí),作者將自己所學(xué)的知識(shí)整理成一個(gè)系列,而此文僅是其中的一篇。下面進(jìn)入核心要點(diǎn)。
Spark 內(nèi)存分配
在你的cluster或是local machine上正常運(yùn)行的任何Spark程序都是一個(gè)JVM進(jìn)程。對于任何的JVM進(jìn)程,你都可以使用-Xmx和-Xms配置它的堆大小(heap size)。問題是:這些進(jìn)程是如何使用它的堆內(nèi)存(heap memory)以及為何需要它呢?下面圍繞這個(gè)問題慢慢展開。
首先來看看下面這張Spark JVM堆內(nèi)存分配圖:

Heap Size
默認(rèn)情況下,Spark啟動(dòng)時(shí)會(huì)初始化512M的JVM 堆內(nèi)存。處于安全角度以及避免OOM錯(cuò)誤,Spark只允許使用90%的的堆內(nèi)存,該參數(shù)可以通過Spark的spark.storage.safetyFraction參數(shù)進(jìn)行控制。 OK,你可能聽說Spark是基于內(nèi)存的工具,它允許你將數(shù)據(jù)存在內(nèi)存中。如果你讀過作者的 Spark Misconceptions 這篇文章,那么你應(yīng)該知道Spark其實(shí)不是真正的基于內(nèi)存(in-memory)的工具。它僅僅是在LRU cache (http://en.wikipedia.org/wiki/Cache_algorithms) 過程中使用內(nèi)存。所以一部分的內(nèi)存用在數(shù)據(jù)緩存上,這部分通常占安全堆內(nèi)存(90%)的60%,該參數(shù)也可以通過配置spark.storage.memoryFraction進(jìn)行控制。因此,如果你想知道在Spark中可以緩存多少數(shù)據(jù),你可以通過對所有executor的堆大小求和,然后乘以safetyFraction 和storage.memoryFraction即可,默認(rèn)情況下是0.9 * 0.6 = 0.54,即總的堆內(nèi)存的54%可供Spark使用。
Shuffle Memory
接下來談?wù)剆huffle memory,計(jì)算公式是 “Heap Size” * spark.shuffle.safetyFraction * spark.shuffle.memoryFraction。spark.shuffle.safetyFraction的默認(rèn)值是 0.8 或80%, spark.shuffle.memoryFraction的默認(rèn)值是0.2或20%,所以你***可以用于shuffle的JVM heap 內(nèi)存大小是 0.8*0.2=0.16,即總heap size的16%。 問題是Spark是如何來使用這部分內(nèi)存呢?官方的Github上面有更詳細(xì)的解釋(https://github.com/apache/spark/blob/branch-1.3/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala)。
總得來說,Spark將這部分memory 用于Shuffle階段調(diào)用其他的具體task。當(dāng)shuffle執(zhí)行之后,有時(shí)你需要對數(shù)據(jù)進(jìn)行sort。在sort階段,通常你還需要一個(gè)類似緩沖的buffer來存儲(chǔ)已經(jīng)排序好的數(shù)據(jù)(謹(jǐn)記,不能修改已經(jīng)LRU cache中的數(shù)據(jù),因?yàn)檫@些數(shù)據(jù)可能會(huì)再次使用)。因此,需要一定數(shù)量的RAM來存儲(chǔ)已經(jīng)sorted的數(shù)據(jù)塊。如果你沒有足夠的memory用來排序,該怎么做呢?在wikipedia 搜一下“external sorting” (外排序),仔細(xì)研讀一下即可。外排序允許你對塊對數(shù)據(jù)塊進(jìn)行分類,然后將***的結(jié)果合并到一起。
unroll Memory
關(guān)于RAM***要講到”unroll” memory,用于unroll 進(jìn)程的內(nèi)存總量計(jì)算公式為:spark.storage.unrollFraction * spark.storage.memoryFraction *spark.storage.safetyFraction。默認(rèn)情況下是 0.2 * 0.6 * 0.9 = 0.108,即10.8%的heap size。 當(dāng)你需要在內(nèi)存中將數(shù)據(jù)塊展開的時(shí)候使用它。為什么需要 unroll 操作呢?在Spark中,允許以 序列化(serialized )和反序列化(deserialized) 兩種方式存儲(chǔ)數(shù)據(jù),而對于序列化后的數(shù)據(jù)是無法直接使用的,所以在使用時(shí)必須對其進(jìn)行unroll操作,因此這部分RAM是用于unrolling操作的內(nèi)存。unroll memory 與storage RAM 是共享的,也就是當(dāng)你在對數(shù)據(jù)執(zhí)行unroll操作時(shí),如果需要內(nèi)存,而這個(gè)時(shí)候內(nèi)存卻不夠,那么可能會(huì)致使撤銷存儲(chǔ)在 Spark LRU cache中少些數(shù)據(jù)塊。
Spark 集群模式JVM分配
OK,通過上面的講解,我們應(yīng)該對Spark進(jìn)程有了進(jìn)一步的理解,并且已經(jīng)知道它是如何利用JVM進(jìn)程中的內(nèi)存。現(xiàn)在切換到集群上,以YARN模式為例。
在YARN集群里,它有一個(gè)YARN ResourceMananger 守護(hù)進(jìn)程控制著集群資源(也就是memory),還有一系列運(yùn)行在集群各個(gè)節(jié)點(diǎn)的YARN Node Managers控制著節(jié)點(diǎn)資源的使用。從YARN的角度來看,每個(gè)節(jié)點(diǎn)可以看做是可分配的RAM池,當(dāng)你向ResourceManager發(fā)送request請求資源時(shí),它會(huì)返回一些NodeManager信息,這些NodeManager將會(huì)為你提供execution container,而每個(gè)execution container 都是一個(gè)你發(fā)送請求時(shí)指定的heap size的JVM進(jìn)程。JVM的位置是由 YARN ResourceMananger 管理的,你沒有控制權(quán)限。如果某個(gè)節(jié)點(diǎn)有64GB的RAM被YARN控制著(可通過設(shè)置yarn-site.xml 配置文件中參數(shù) yarn.nodemanager.resource.memory-mb ),當(dāng)你請求10個(gè)4G內(nèi)存的executors時(shí),這些executors可能運(yùn)行在同一個(gè)節(jié)點(diǎn)上,即便你的集群跟大也無濟(jì)于事。
當(dāng)以YARN模式啟動(dòng)spark集群時(shí),你可以指定executors的數(shù)量(-num-executors 或者 spark.executor.instances 參數(shù)),可以指定每個(gè)executor 固有的內(nèi)存大小(-executor-memory 或者 spark.executor.memory),可以指定每個(gè)executor使用的cpu核數(shù)(-executor-cores 或者 spark.executor.cores),可以指定分配給每個(gè)task的core的數(shù)量(spark.task.cpus),還可以指定 driver 上使用的內(nèi)存(-driver-memory 或者spark.driver.memory)。
當(dāng)你在集群上執(zhí)行應(yīng)用程序時(shí),job程序會(huì)被切分成多個(gè)stages,每個(gè)stage又會(huì)被切分成多個(gè)task,每個(gè)task單獨(dú)調(diào)度,可以把每個(gè)executor的JVM進(jìn)程看做是task執(zhí)行槽池,每個(gè)executor 會(huì)給你的task設(shè)置 spark.executor.cores/ spark.task.cpus execution個(gè)執(zhí)行槽。例如,在集群的YARN NodeManager中運(yùn)行有12個(gè)節(jié)點(diǎn),每個(gè)節(jié)點(diǎn)有64G內(nèi)存和32個(gè)CPU核(16個(gè)超線程物理core)。每個(gè)節(jié)點(diǎn)可以啟動(dòng)2個(gè)26G內(nèi)存的executor(剩下的RAM用于系統(tǒng)程序、YARN NM 和DataNode),每個(gè)executor有12個(gè)cpu核可以用于執(zhí)行task(剩下的用于系統(tǒng)程序、YARN NM 和DataNode),這樣整個(gè)集群可以處理 12 machines * 2 executors per machine * 12 cores per executor / 1 core = 288 個(gè)task 執(zhí)行槽,這意味著你的spark集群可以同時(shí)跑288個(gè)task,幾乎充分利用了所有的資源。整個(gè)集群用于緩存數(shù)據(jù)的內(nèi)存有0.9 spark.storage.safetyFraction * 0.6 spark.storage.memoryFraction * 12 machines * 2 executors per machine * 26 GB per executor = 336.96 GB. 實(shí)際上沒有那么多,但在大多數(shù)情況下,已經(jīng)足夠了。
到這里,大概已經(jīng)了解了spark是如何使用JVM的內(nèi)存,并且知道什么是集群的執(zhí)行槽。而關(guān)于task,它是Spark執(zhí)行的工作單元,并且作為exector JVM 進(jìn)程中的一個(gè)thread執(zhí)行。這也是為什么Spark job啟動(dòng)時(shí)間快的原因,在JVM中啟動(dòng)一個(gè)線程比啟動(dòng)一個(gè)單獨(dú)的JVM進(jìn)程塊,而在Hadoop中執(zhí)行MapReduce應(yīng)用會(huì)啟動(dòng)多個(gè)JVM進(jìn)程。
Spark Partition
下面來談?wù)凷park的另一個(gè)抽象概念”partition”。在Spark程序運(yùn)行過程中,所有的數(shù)據(jù)都會(huì)被切分成多個(gè)Partion。問題是一個(gè)parition是什么并且如何決定partition的數(shù)量呢?首先Partition的大小完全依賴于你的數(shù)據(jù)源。在Spark中,大部分用于讀取數(shù)據(jù)的method都可以指定生成的RDD中Partition數(shù)量。當(dāng)你從hdfs上讀取一個(gè)文件時(shí),你會(huì)使用Hadoop的InputFormat來指定,默認(rèn)情況下InputFormat返回每個(gè)InputSplit都會(huì)映射到RDD中的一個(gè)Partition上。對于HDFS上的大部分文件,每個(gè)數(shù)據(jù)塊都會(huì)生成一個(gè)InputSplit,大小近似為64 MB/128 MB的數(shù)據(jù)。近似情況下,HDFS上數(shù)據(jù)的塊邊界是按字節(jié)來算的(64MB一個(gè)塊),但是當(dāng)數(shù)據(jù)被處理時(shí),它會(huì)按記錄進(jìn)行切分。對于文本文件來說切分的字符就是換行符,對于sequence文件,它以塊結(jié)束等等。比較特殊的是壓縮文件,由于整個(gè)文件被壓縮了,因此不能按行進(jìn)行切分了,整個(gè)文件只有一個(gè)inputsplit,這樣spark中也會(huì)只有一個(gè)parition,在處理的時(shí)候需要手動(dòng)對它進(jìn)行repatition。