Apache Spark源碼走讀之1:論文閱讀筆記
基本概念(Basic Concepts)
RDD - resillient distributed dataset 彈性分布式數(shù)據(jù)集
Operation - 作用于RDD的各種操作分為transformation和action
Job - 作業(yè),一個(gè)JOB包含多個(gè)RDD及作用于相應(yīng)RDD上的各種operation
Stage - 一個(gè)作業(yè)分為多個(gè)階段
Partition - 數(shù)據(jù)分區(qū), 一個(gè)RDD中的數(shù)據(jù)可以分成多個(gè)不同的區(qū)
DAG - Directed Acycle graph, 有向無環(huán)圖,反應(yīng)RDD之間的依賴關(guān)系
Narrow dependency - 窄依賴,子RDD依賴于父RDD中固定的data partition
Wide Dependency - 寬依賴,子RDD對(duì)父RDD中的所有data partition都有依賴
Caching Managenment -- 緩存管理,對(duì)RDD的中間計(jì)算結(jié)果進(jìn)行緩存管理以加快整體的處理速度
編程模型(Programming Model)
RDD是只讀的數(shù)據(jù)分區(qū)集合,注意是數(shù)據(jù)集。
作用于RDD上的Operation分為transformantion和action。 經(jīng)Transformation處理之后,數(shù)據(jù)集中的內(nèi)容會(huì)發(fā)生更改,由數(shù)據(jù)集A轉(zhuǎn)換成為數(shù)據(jù)集B;而經(jīng)Action處理之后,數(shù)據(jù)集中的內(nèi)容會(huì)被歸約為一個(gè)具體的數(shù)值。
只有當(dāng)RDD上有action時(shí),該RDD及其父RDD上的所有operation才會(huì)被提交到cluster中真正的被執(zhí)行。
從代碼到動(dòng)態(tài)運(yùn)行,涉及到的組件如下圖所示。
演示代碼
val sc = new SparkContext("Spark://...", "MyJob", home, jars) val file = sc.textFile("hdfs://...") val errors = file.filter(_.contains("ERROR"))
errors.cache()
errors.count()
運(yùn)行態(tài)(Runtime view)
不管什么樣的靜態(tài)模型,其在動(dòng)態(tài)運(yùn)行的時(shí)候無外乎由進(jìn)程,線程組成。
用Spark的術(shù)語來說,static view稱為dataset view,而dynamic view稱為parition view. 關(guān)系如圖所示
在Spark中的task可以對(duì)應(yīng)于線程,worker是一個(gè)個(gè)的進(jìn)程,worker由driver來進(jìn)行管理。
那么問題來了,這一個(gè)個(gè)的task是如何從RDD演變過來的呢?下節(jié)將詳細(xì)回答這個(gè)問題。
部署(Deployment view)
當(dāng)有Action作用于某RDD時(shí),該action會(huì)作為一個(gè)job被提交。
在提交的過程中,DAGScheduler模塊介入運(yùn)算,計(jì)算RDD之間的依賴關(guān)系。RDD之間的依賴關(guān)系就形成了DAG。
每一個(gè)JOB被分為多個(gè)stage,劃分stage的一個(gè)主要依據(jù)是當(dāng)前計(jì)算因子的輸入是否是確定的,如果是則將其分在同一個(gè)stage,避免多個(gè)stage之間的消息傳遞開銷。
當(dāng)stage被提交之后,由taskscheduler來根據(jù)stage來計(jì)算所需要的task,并將task提交到對(duì)應(yīng)的worker.
Spark支持以下幾種部署模式1)standalone 2)Mesos 3) yarn. 這些部署模式將作為taskscheduler的初始化入?yún)ⅰ?/p>
RDD接口(RDD Interface)
RDD由以下幾個(gè)主要部分組成
-
partitions -- partition集合,一個(gè)RDD中有多少data partition
-
dependencies -- RDD依賴關(guān)系
-
compute(parition) -- 對(duì)于給定的數(shù)據(jù)集,需要作哪些計(jì)算
-
preferredLocations -- 對(duì)于data partition的位置偏好
-
partitioner -- 對(duì)于計(jì)算出來的數(shù)據(jù)結(jié)果如何分發(fā)
緩存機(jī)制(caching)
RDD的中間計(jì)算結(jié)果可以被緩存起來,緩存先選Memory,如果Memory不夠的話,將會(huì)被寫入到磁盤中。
根據(jù)LRU(last-recent update)來決定哪先內(nèi)容繼續(xù)保存在內(nèi)存,哪些保存到磁盤。
容錯(cuò)性(Fault-tolerant)
從最初始的RDD到衍生出來的***一個(gè)RDD,中間要經(jīng)過一系列的處理。那么如何處理中間環(huán)節(jié)出現(xiàn)錯(cuò)誤的場(chǎng)景呢?
Spark提供的解決方案是只對(duì)失效的data partition進(jìn)行事件重演,而無須對(duì)整個(gè)數(shù)據(jù)全集進(jìn)行事件重演,這樣可以大大加快場(chǎng)景恢復(fù)的開銷。
RDD又是如何知道自己的data partition的number該是多少?如果是hdfs文件,那么hdfs文件的block將會(huì)成為一個(gè)重要的計(jì)算依據(jù)。
集群管理(cluster management)
task運(yùn)行在cluster之上,除了spark自身提供的standalone部署模式之外,spark還內(nèi)在支持yarn和mesos.
Yarn來負(fù)責(zé)計(jì)算資源的調(diào)度和監(jiān)控,根據(jù)監(jiān)控結(jié)果來重啟失效的task或者是重新distributed task一旦有新的node加入cluster的話。
這一部分的內(nèi)容需要參考yarn的文檔。
小結(jié)
在源碼閱讀時(shí),需要重點(diǎn)把握以下兩大主線。
-
靜態(tài)view 即 RDD, transformation and action
-
動(dòng)態(tài)view 即 life of a job, 每一個(gè)job又分為多個(gè)stage,每一個(gè)stage中可以包含多個(gè)rdd及其transformation,這些stage又是如何映射成為task被distributed到cluster中
參考資料(reference)
-
Introduction to Spark Internals http://files.meetup.com/3138542/dev-meetup-dec-2012.pptx
-
Resilient Distributed Datasets: A Fault-tolerant Abstraction for In-Memory Cluster Computing https://www.usenix.org/system/files/.../nsdi12-final138.pdf
-
Lightning-Fast Cluster Computing with Spark and Shark http://www.meetup.com/TriHUG/events/112474102/
原文鏈接:http://www.cnblogs.com/hseagle/p/3664933.html