Spark Streaming 數(shù)據(jù)清理機(jī)制
前言
為啥要了解機(jī)制呢?這就好比JVM的垃圾回收,雖然JVM的垃圾回收已經(jīng)巨牛了,但是依然會遇到很多和它相關(guān)的case導(dǎo)致系統(tǒng)運(yùn)行不正常。
這個內(nèi)容我記得自己剛接觸Spark Streaming的時候,老板也問過我,運(yùn)行期間會保留多少個RDD? 當(dāng)時沒回答出來。后面在群里也有人問到了,所以就整理了下。文中如有謬誤之處,還望指出。
DStream 和 RDD
我們知道Spark Streaming 計算還是基于Spark Core的,Spark Core 的核心又是RDD. 所以Spark Streaming 肯定也要和RDD扯上關(guān)系。然而Spark Streaming 并沒有直接讓用戶使用RDD而是自己抽象了一套DStream的概念。 DStream 和 RDD 是包含的關(guān)系,你可以理解為Java里的裝飾模式,也就是DStream 是對RDD的增強(qiáng),但是行為表現(xiàn)和RDD是基本上差不多的。都具備幾個條件:
具有類似的tranformation動作,比如map,reduceByKey等,也有一些自己獨(dú)有的,比如Window,mapWithStated等
都具有Action動作,比如foreachRDD,count等
從編程模型上看是一致的。
所以很可能你寫的那堆Spark Streaming代碼看起來好像和Spark 一致的,然而并不能直接復(fù)用,因為一個是DStream的變換,一個是RDD的變化。
Spark Streaming中 DStream 介紹
DStream 下面包含幾個類:
- 數(shù)據(jù)源類,比如InputDStream,具體如DirectKafkaInputStream等
- 轉(zhuǎn)換類,典型比如MappedDStream,ShuffledDStream
- 輸出類,典型比如ForEachDStream
從上面來看,數(shù)據(jù)從開始(輸入)到結(jié)束(輸出)都是DStream體系來完成的,也就意味著用戶正常情況是無法直接去產(chǎn)生和操作RDD的,這也就是說,DStream有機(jī)會和義務(wù)去負(fù)責(zé)RDD的生命周期。
這就回答了前言中的問題了。Spark Streaming具備自動清理功能。
RDD 在Spark Stream中產(chǎn)生的流程
在Spark Streaming中RDD的生命流程大體如下:
- 在InputDStream會將接受到的數(shù)據(jù)轉(zhuǎn)化成RDD,比如DirectKafkaInputStream 產(chǎn)生的就是 KafkaRDD
- 接著通過MappedDStream等進(jìn)行數(shù)據(jù)轉(zhuǎn)換,這個時候是直接調(diào)用RDD對應(yīng)的map方法進(jìn)行轉(zhuǎn)換的
- 在進(jìn)行輸出類操作時,才暴露出RDD,可以讓用戶執(zhí)行相應(yīng)的存儲,其他計算等操作。
我們這里就以下面的代碼來進(jìn)行更詳細(xì)的解釋:
- val source = KafkaUtils.createDirectInputStream(....)
- source.map(....).foreachRDD{rdd=>
- rdd.saveTextFile(....)
- }
foreachRDD 產(chǎn)生ForEachDStream,因為foreachRDD是個Action,所以會觸發(fā)任務(wù)的執(zhí)行,會被調(diào)用generateJob方法。
- override def generateJob(time: Time): Option[Job] = {
- parent.getOrCompute(time) match {
- case Some(rdd) =>
- val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
- foreachFunc(rdd, time)
- }
- Some(new Job(time, jobFunc))
- case None => None
- }
- }
對應(yīng)的parent是MappedDStream,也就是說調(diào)用MappedDStream.getOrCompute.該方法在DStream中,首先會在MappedDStream對象中的generatedRDDs 變量中查找是否已經(jīng)有RDD,如果沒有則觸發(fā)計算,并且將產(chǎn)生的RDD放到generatedRDDs
- @transientprivate[streaming] var generatedRDDs = new HashMap[Time, RDD[T]] ()
- private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
- // If RDD was already generated, then retrieve it from HashMap,
- // or else compute the RDD
- generatedRDDs.get(time).orElse {
- ....
- generatedRDDs.put(time, newRDD)
- ....
計算RDD是調(diào)用的compute方法,MappedDStream 的compute方法很簡單,直接調(diào)用的父類也就是DirectKafkaInputStream的getOrCompute方法:
- override def compute(validTime: Time): Option[RDD[U]] = {
- parent.getOrCompute(validTime).map(_.map[U](mapFunc))
- }
在上面的例子中,MappedDStream 的parent是DirectKafkaInputStream中,這是個數(shù)據(jù)源,所以他的compute方法會直接new出一個RDD.
從上面可以得出幾個結(jié)論:
- 數(shù)據(jù)源以及轉(zhuǎn)換類DStream都會維護(hù)一個generatedRDDs,可以按batchTime 進(jìn)行獲取
- 內(nèi)部本質(zhì)還是進(jìn)行的RDD的轉(zhuǎn)換
- 如果我們調(diào)用了cache會發(fā)生什么
這里又會有兩種情況,一種是調(diào)用DStream.cache,第二種是RDD.cache。事實上他們是完全一樣的。
- DStream的cache 動作只是將DStream的變量storageLevel 設(shè)置為MEMORY_ONLY_SER,然后在產(chǎn)生(或者獲取)RDD的時候,調(diào)用RDD的persit方法進(jìn)行設(shè)置。所以DStream.cache 產(chǎn)生的效果等價于RDD.cache(也就是你自己調(diào)用foreachRDD 將RDD 都設(shè)置一遍)
- 進(jìn)入正題,我們是怎么釋放Cache住的RDD的
其實無所謂Cache不Cache住,RDD最終都是要釋放的,否則運(yùn)行久了,光RDD對象也能承包了你的內(nèi)存。我們知道,在Spark Streaming中,周期性產(chǎn)生事件驅(qū)動Spark Streaming 的類其實是:
- org.apache.spark.streaming.scheduler.JobGenerator
他內(nèi)部有個永動機(jī)(定時器),定時發(fā)布一個產(chǎn)生任務(wù)的事件:
- private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds, longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")
然后通過processEvent進(jìn)行事件處理:
- /** Processes all events */
- private def processEvent(event: JobGeneratorEvent) {
- logDebug("Got event " + event)
- event match {
- case GenerateJobs(time) => generateJobs(time)
- case ClearMetadata(time) => clearMetadata(time)
- case DoCheckpoint(time, clearCheckpointDataLater) =>
- doCheckpoint(time, clearCheckpointDataLater)
- case ClearCheckpointData(time) => clearCheckpointData(time)
- }
- }
目前我們只關(guān)注ClearMetadata 事件。對應(yīng)的方法為:
- private def clearMetadata(time: Time) {
- ssc.graph.clearMetadata(time)
- // If checkpointing is enabled, then checkpoint,
- // else mark batch to be fully processed
- if (shouldCheckpoint) {
- eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = true))
- } else {
- // If checkpointing is not enabled, then delete metadata information about
- // received blocks (block data not saved in any case). Otherwise, wait for
- // checkpointing of this batch to complete.
- val maxRememberDuration = graph.getMaxInputStreamRememberDuration()
- jobScheduler.receiverTracker.cleanupOldBlocksAndBatches(time - maxRememberDuration)
- jobScheduler.inputInfoTracker.cleanup(time - maxRememberDuration)
- markBatchFullyProcessed(time)
- }
- }
首先是清理輸出DStream(比如ForeachDStream),接著是清理輸入類(基于Receiver模式)的數(shù)據(jù)。
ForeachDStream 其實調(diào)用的也是DStream的方法。該方法大體如下:
- private[streaming] def clearMetadata(time: Time) {
- val unpersistData = ssc.conf.getBoolean("spark.streaming.unpersist", true)
- val oldRDDs = generatedRDDs.filter(_._1 <= (time - rememberDuration))
- logDebug("Clearing references to old RDDs: [" +
- oldRDDs.map(x => s"${x._1} -> ${x._2.id}").mkString(", ") + "]")
- generatedRDDs --= oldRDDs.keys
- if (unpersistData) {
- logDebug("Unpersisting old RDDs: " + oldRDDs.values.map(_.id).mkString(", "))
- oldRDDs.values.foreach { rdd =>
- rdd.unpersist(false)
- // Explicitly remove blocks of BlockRDD
- rdd match {
- case b: BlockRDD[_] =>
- logInfo("Removing blocks of RDD " + b + " of time " + time)
- b.removeBlocks()
- case _ =>
- }
- }
- }
- logDebug("Cleared " + oldRDDs.size + " RDDs that were older than " +
- (time - rememberDuration) + ": " + oldRDDs.keys.mkString(", "))
- dependencies.foreach(_.clearMetadata(time))
- }
大體執(zhí)行動作如下描述:
- 根據(jù)記憶周期得到應(yīng)該剔除的RDD
- 根據(jù)是否要清理cache數(shù)據(jù),進(jìn)行unpersit 操作,并且顯示的移除block
- 根據(jù)依賴調(diào)用其他的DStream進(jìn)行動作清理
這里我們還可以看到,通過參數(shù)spark.streaming.unpersist 你是可以決定是否手工控制是否需要對cache住的數(shù)據(jù)進(jìn)行清理。
這里你會有兩個疑問:
- dependencies 是什么?
- rememberDuration 是怎么來的?
dependencies 你可以簡單理解為父DStream,通過dependencies 我們可以獲得已完整DStream鏈。
rememberDuration 的設(shè)置略微復(fù)雜些,大體是 slideDuration,如果設(shè)置了checkpointDuration 則是2*checkpointDuration 或者通過DStreamGraph.rememberDuration(如果設(shè)置了的話,譬如通過StreamingContext.remember方法,不過通過該方法設(shè)置的值要大于計算得到的值會生效)
另外值得一提的就是后面的DStream 會調(diào)整前面的DStream的rememberDuration,譬如如果你用了window* 相關(guān)的操作,則在此之前的DStream 的rememberDuration 都需要加上windowDuration。
然后根據(jù)Spark Streaming的定時性,每個周期只要完成了,都會觸發(fā)清理動作,這個就是清理動作發(fā)生的時機(jī)。代碼如下:
- def onBatchCompletion(time: Time) {
- eventLoop.post(ClearMetadata(time))
- }
總結(jié)下
Spark Streaming 會在每個Batch任務(wù)結(jié)束時進(jìn)行一次清理動作。每個DStream 都會被掃描,不同的DStream根據(jù)情況不同,保留的RDD數(shù)量也是不一致的,但都是根據(jù)rememberDuration變量決定,而該變量會被下游的DStream所影響,所以不同的DStream的rememberDuration取值是不一樣的。




























