大數(shù)據(jù)計(jì)算框架Spark之任務(wù)調(diào)度
Spark有幾種資源調(diào)度設(shè)施。每個(gè)Spark Application(SparkContext實(shí)例)獨(dú)立地運(yùn)行在一組executor進(jìn)程內(nèi)。cluster manager為應(yīng)用間的調(diào)度提供設(shè)施。在每個(gè)Spark應(yīng)用內(nèi),如果將多個(gè)job(多個(gè)spark action)提交給不同的線程,那么他們會(huì)并行運(yùn)行。
1 Application間的資源調(diào)度
集群上,每個(gè)Spark application獲得獨(dú)立的一組executor JVM,這組executor JVM只為那個(gè)application運(yùn)行task和存儲(chǔ)數(shù)據(jù)。如果多個(gè)用戶要共享集群,有不同的策略管理資源分配,這取決于使用的cluster manager。
資源的靜態(tài)分區(qū)(static partitioning)可被所有的cluster manager獲得,這樣每個(gè)application在他的生命周期內(nèi)都可獲得他能使用的最多資源。standalone、YARN、coarse-grained Mesos mode這三種模式使用的就是這種方式。
1.1控制資源使用
集群類型下,如下配置資源分配:
- Standalone mode:application提交到standalone mode集群,將會(huì)以FIFO的順序運(yùn)行,每個(gè)application會(huì)盡可能地使用所有可用節(jié)點(diǎn),配置spark.cores.max來限制application使用節(jié)點(diǎn)的數(shù)目,或者設(shè)置spark.deploy.defaultCores。除了可以設(shè)置application可用內(nèi)核數(shù),還可以設(shè)置spark.executor.memory來控制內(nèi)存的使用。
- Mesos:為了使用靜態(tài)分區(qū)(static partitioning)在Mesos集群上,spark.mesos.coarse=true,可以通過設(shè)置spark.cores.max來限制每個(gè)application的資源共享,通過設(shè)置spark.executor.memory來控制executor內(nèi)存的使用。
- YARN:通過設(shè)置--num-executors選項(xiàng),spark YARN客戶端可控制集群上有多少executor被分配(對(duì)應(yīng)的配置屬性為spark.executor.instances),--executor-memory(對(duì)應(yīng)的配置屬性spark.executor.memory)和--executor-cores(對(duì)應(yīng)的配置屬性spark.executor.cores)控制了分配給每個(gè)executor的資源。
應(yīng)用之間無法共享內(nèi)存。
1.2動(dòng)態(tài)資源分配
Spark提供了依據(jù)應(yīng)用的工作量動(dòng)態(tài)調(diào)整資源的機(jī)制。這意味著你的application不在使用的資源會(huì)返還給集群,當(dāng)需要的時(shí)候再申請(qǐng)分配資源,這種特性對(duì)于多應(yīng)用共享集群特別有用。
這個(gè)特性默認(rèn)失效,但在所有coarse-grained cluster manager上都可用,如:standalone mode, YARN mode, 和Mesos coarse-grained mode。
使用這個(gè)特性有兩個(gè)要求。首先用于必須設(shè)置spark.dynamicAllocation.enabled=true,其次要設(shè)置external shuffle service在集群上的每個(gè)worker node并設(shè)置spark.shuffle.service.enabled=true。設(shè)置external shuffle service目的是executor可被移除但是不刪除他們生成的shuffle文件。
設(shè)置這個(gè)變量的方式為:
- 在standalone模式:設(shè)置spark.shuffle.service.enabled=true
- Mesos coarse-grained模式:在所有從節(jié)點(diǎn)運(yùn)行$SPARK_HOME/sbin/start-mesos-shuffle-service.sh設(shè)置spark.shuffle.service.enabled=true
- YARN:詳見運(yùn)行spark與YARN
1.3資源分配策略
當(dāng)Spark不再使用executor時(shí)就出讓它,需要的時(shí)候再獲取它。因?yàn)闆]有一個(gè)確定的方式預(yù)測(cè)將要被移除的executor是否在不久的將來會(huì)被使用,或者一個(gè)將要被添加的新executor實(shí)際上是否是空閑的,所以我們需要一系列試探來確定是移除executor(可能會(huì)移除多個(gè))還是請(qǐng)求executor(可能會(huì)請(qǐng)求多個(gè))。
請(qǐng)求策略
開啟Spark application動(dòng)態(tài)分配資源特性,當(dāng)pending task等待被調(diào)度時(shí),Spark application會(huì)請(qǐng)求額外的executor。這就意味著,當(dāng)前的這些executor無法同時(shí)滿足所有的task,這些task已經(jīng)被提交,但是還沒有執(zhí)行完。
Spark輪流請(qǐng)求executor。當(dāng)task等待的時(shí)間大于spark.dynamicAllocation.schedulerBacklogTimeout時(shí),真正的請(qǐng)求(申請(qǐng)executor的請(qǐng)求)被觸發(fā),之后,如果未完成task隊(duì)列存在,那么每隔spark.dynamicAllocation.sustainedSchedulerBacklogTimeout秒請(qǐng)求被觸發(fā)一次。每一輪請(qǐng)求的executor數(shù)量以指數(shù)級(jí)增長。例如,***輪請(qǐng)求一個(gè)executor,第二輪請(qǐng)求2個(gè),第三,四輪分別請(qǐng)求4,8個(gè)。
按指數(shù)形式增長的動(dòng)機(jī)有兩個(gè),首先,起初應(yīng)用應(yīng)該慎重地請(qǐng)求executor,以防只需幾個(gè)executor就能滿足需求,這和TCP慢啟動(dòng)類似。其次,當(dāng)應(yīng)用確實(shí)需要更多的executor時(shí),應(yīng)用應(yīng)該能夠及時(shí)地增加資源的使用。
移除策略
當(dāng)executor閑置超過spark.dynamicAllocation.executorIdleTimeout秒時(shí),就將他移除。注意,大多數(shù)情況下,executor的移除條件和請(qǐng)求條件是互斥的,這樣如果仍然有待調(diào)度的task的情況下executor是不會(huì)被移除的。
executor優(yōu)雅地退役
非動(dòng)態(tài)分配資源情況下,一個(gè)Spark executor或者是由于失敗而退出,或者是因相關(guān)application退出而退出。這兩種情況下,不在需要與executor相關(guān)聯(lián)的狀態(tài)并且這些狀態(tài)可以被安全地丟棄。動(dòng)態(tài)分配資源的情況下,當(dāng)executor被明確移除時(shí),application仍然在運(yùn)行。如果application要想使用這些由executor存儲(chǔ)和寫下的狀態(tài),就必須重新計(jì)算狀態(tài)。這樣就需要一種優(yōu)雅的退役機(jī)制,即在executor退役前保留他的狀態(tài)。
這個(gè)機(jī)制對(duì)于shuffles特別重要。shuffle期間,executor自己的map輸出寫入本地磁盤。當(dāng)其他的executor要獲取這些文件的時(shí)候,這個(gè)executor充當(dāng)了文件服務(wù)器的角色。對(duì)于那些落后的executor,他們的task執(zhí)行時(shí)間比同輩要長,在shuffle完成之前,動(dòng)態(tài)資源分配可能移除了一個(gè)executor,這種情形下,那個(gè)executor寫入本地的文件(即executor的狀態(tài))不必重新計(jì)算。
保留shuffle文件的辦法就是使用外部的shuffle服務(wù),這是在Spark 1.2中引入的。這個(gè)外部的shuffle服務(wù)指的是長時(shí)間運(yùn)行的進(jìn)程,它運(yùn)行與集群的每個(gè)節(jié)點(diǎn)上,獨(dú)立于application和executor。如果這個(gè)服務(wù)可用,executor就從這個(gè)服務(wù)獲shuffle file,而不是彼此之間獲取shuffle file。這意味著executor生成的任何shuffle文件都可能被服務(wù)包含,即使在executor生命周期之外也是如此。
executor除了寫shuffle 文件到本地硬盤,還緩存數(shù)據(jù)到硬盤或內(nèi)存中。但是,當(dāng)executor被移除后,緩存到內(nèi)存中的數(shù)據(jù)將不可用。為了解決這一問題,默認(rèn)地緩存數(shù)據(jù)到內(nèi)存的executor永遠(yuǎn)不會(huì)被刪除??梢酝ㄟ^spark.dynamicAllocation.cachedExecutorIdleTimeout配置這一行為,
2 Application內(nèi)的資源調(diào)度
概述
給定的application內(nèi)部(SparkContext 實(shí)例),如果多個(gè)并行的job被提交到不同的線程上,那么這些job可以同時(shí)執(zhí)行。這里的job指的是Spark action及Spark action觸發(fā)的計(jì)算task。Spark scheduler是線程安全的,支持spark application服務(wù)于多個(gè)請(qǐng)求。
默認(rèn)地Spark scheduler以FIFO的順序執(zhí)行job,每個(gè)job被切分為一到多個(gè)stage(例如,map和reduce),當(dāng)***個(gè)job的stage的task啟動(dòng)后,這個(gè)job優(yōu)先獲得所有可用資源,然后才是第二,三個(gè)job......。如果隊(duì)頭的job不必使用整個(gè)集群,之后的job就能立即啟動(dòng)。如果隊(duì)頭的job較大,那么之后的job啟動(dòng)延遲會(huì)比較明顯。
從Spark 0.8開始,也可以通過配置實(shí)現(xiàn)隊(duì)列間的公平調(diào)度。Job間的task資源分配采用單循環(huán)的方式。所有job都會(huì)獲得大致相同的集群資源。這就意味著,當(dāng)有長job存在時(shí),提交的短job可以立即獲得資源啟動(dòng)運(yùn)行而不必等到長job執(zhí)行完畢??梢栽O(shè)置spark.scheduler.mode為FAIR
- val conf = new SparkConf().setMaster(...).setAppName(...)
- conf.set("spark.scheduler.mode", "FAIR")
- val sc = new SparkContext(conf)
公平調(diào)度池(可能多個(gè))
公平調(diào)度器也支持在池中對(duì)job分組并給每個(gè)池配置不同的選項(xiàng)。這有助于為更重要的job設(shè)置高優(yōu)先級(jí)池,例如把每個(gè)用戶的job分到一組,并且給這些用戶相等的資源不論有多少并行task,而不是給每個(gè)job相等的資源。
不需要任何干預(yù),新job會(huì)進(jìn)入默認(rèn)池,但是可以使用spark.scheduler.pool設(shè)置job池。
- sc.setLocalProperty("spark.scheduler.pool", "pool1")
設(shè)置完后,這個(gè)線程(通過調(diào)用RDD.save, count, collect)提交的所有job都會(huì)使用這個(gè)資源池的名稱。設(shè)置是針對(duì)每一個(gè)線程的,這樣更容易實(shí)現(xiàn)一個(gè)線程運(yùn)行一個(gè)用戶的多個(gè)job。如果想清除與一個(gè)線程相關(guān)的池,調(diào)用:sc.setLocalProperty("spark.scheduler.pool", null)
池默認(rèn)行為
默認(rèn)地每個(gè)池都能獲得相等的資源(在默認(rèn)池中每個(gè)job都能獲得相等的資源),但在每個(gè)池內(nèi)部,job以FIFO 的順序運(yùn)行。例如如果為每一個(gè)用戶創(chuàng)建一個(gè)池,這就意味著每一個(gè)用戶將獲得相等的資源,并且每個(gè)用戶的查詢都會(huì)按順序運(yùn)行而不會(huì)出現(xiàn)后來的查詢搶占了前面查詢的資源
配置池屬性
可以通過修改配置文件改變池屬性。每個(gè)池都支持三種屬性:
- schedulingMode:可以是FIFO或FAIR,控制池中的job排隊(duì)等候或公平地分享集群資源。
- weight:控制資源分配的比例。默認(rèn)所有池分配資源比重都是1。如果指定一個(gè)池的比重為2,那么他獲得的資源是其他池的2倍。如果將一個(gè)池的比重設(shè)的很高,比如1000,那么不論他是否有活躍的job,他總是***個(gè)開始執(zhí)行task。
- minShare:除了設(shè)置總體的占比之外,還可以對(duì)每個(gè)池設(shè)定一個(gè)最小資源分配(例如CPU核數(shù))。在根據(jù)比重重新分配資源之前,公平調(diào)度器總是試圖滿足所有活躍池的最小資源需求。minShare屬性能以另一種方式確保一個(gè)池快速地獲得一定數(shù)量的資源(10個(gè)核)而不必給他更高的優(yōu)先級(jí)。默認(rèn)地minShare=0。
調(diào)用SparkConf.set,可以通過XML文件配置池屬性:
- conf.set("spark.scheduler.allocation.file", "/path/to/file")
每個(gè)池一個(gè),在XML文件中沒有配置的池使用默認(rèn)配置(調(diào)度模式 FIFO, weight 1, minShare 0),例如:
- <?xml version="1.0"?><allocations>
- <pool name="production">
- <schedulingMode>FAIR</schedulingMode>
- <weight>1</weight>
- <minShare>2</minShare>
- </pool>
- <pool name="test">
- <schedulingMode>FIFO</schedulingMode>
- <weight>2</weight>
- <minShare>3</minShare>
- </pool></allocations>