偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

spark 自己的分布式存儲(chǔ)系統(tǒng) - BlockManager

存儲(chǔ) 存儲(chǔ)軟件 大數(shù)據(jù) Spark 分布式
BlockManager 是 spark 中至關(guān)重要的一個(gè)組件, 在 spark的的運(yùn)行過(guò)程中到處都有 BlockManager 的身影, 只有搞清楚 BlockManager 的原理和機(jī)制,你才能更加深入的理解 spark。 今天我們來(lái)揭開(kāi) BlockaManager 的底層原理和設(shè)計(jì)思路,

整體架構(gòu)

BlockManager 是 spark 中至關(guān)重要的一個(gè)組件, 在 spark的的運(yùn)行過(guò)程中到處都有 BlockManager 的身影, 只有搞清楚 BlockManager 的原理和機(jī)制,你才能更加深入的理解 spark。 今天我們來(lái)揭開(kāi) BlockaManager 的底層原理和設(shè)計(jì)思路,

BlockManager 是一個(gè)嵌入在 spark 中的 key-value型分布式存儲(chǔ)系統(tǒng),是為 spark 量身打造的,

BlockManager 在一個(gè) spark 應(yīng)用中作為一個(gè)本地緩存運(yùn)行在所有的節(jié)點(diǎn)上, 包括所有 driver 和 executor上。 BlockManager 對(duì)本地和遠(yuǎn)程提供一致的 get 和set 數(shù)據(jù)塊接口, BlockManager 本身使用不同的存儲(chǔ)方式來(lái)存儲(chǔ)這些數(shù)據(jù), 包括 memory, disk, off-heap。

 

上面是一個(gè)整體的架構(gòu)圖, BlockManagerMaster擁有BlockManagerMasterEndpoint 的actor和所有BlockManagerSlaveEndpoint的ref, 可以通過(guò)這些引用對(duì) slave 下達(dá)命令

executor 節(jié)點(diǎn)上的BlockManagerMaster 則擁有BlockManagerMasterEndpoint的ref和自身BlockManagerSlaveEndpoint的actor??梢酝ㄟ^(guò) Master的引用注冊(cè)自己。

在master 和 slave 可以正常的通信之后, 就可以根據(jù)設(shè)計(jì)的交互協(xié)議進(jìn)行交互, 整個(gè)分布式緩存系統(tǒng)也就運(yùn)轉(zhuǎn)起來(lái)了,

初始化

我們知道, sparkEnv 啟動(dòng)的時(shí)候會(huì)啟動(dòng)各個(gè)組件, BlockManager 也不例外, 也是這個(gè)時(shí)候啟動(dòng)的,

啟動(dòng)的時(shí)候會(huì)根據(jù)自己是在 driver 還是 executor 上進(jìn)行不同的啟動(dòng)過(guò)程,

  1. def registerOrLookupEndpoint( 
  2.         name: String, endpointCreator: => RpcEndpoint): 
  3.       RpcEndpointRef = { 
  4.       if (isDriver) { 
  5.         logInfo("Registering " + name
  6.         rpcEnv.setupEndpoint(name, endpointCreator) 
  7.       } else { 
  8.         RpcUtils.makeDriverRef(name, conf, rpcEnv) 
  9.       } 
  10.     } 

上圖是 sparkEnv 在 master上啟動(dòng)的時(shí)候, 構(gòu)造了一個(gè) BlockManagerMasterEndpoint, 然后把這個(gè)Endpoint 注冊(cè)在 rpcEnv中, 同時(shí)也會(huì)啟動(dòng)自己的 BlockManager

上圖是 sparkEnv 在executor上啟動(dòng)的時(shí)候, 通過(guò) setupEndpointRef 方法獲取到了  BlockManagerMaster的引用 BlockManagerMasterRef, 同時(shí)也會(huì)啟動(dòng)自己的 BlockManager,

在 BlockManager 初始化自己的時(shí)候, 會(huì)向 BlockManagerMasterEndpoint 注冊(cè)自己, BlockManagerMasterEndpoint 發(fā)送 registerBlockManager消息,  BlockManagerMasterEndpoint 接受到消息, 把 BlockManagerSlaveEndpoint  的引用 保存在自己的  blockManagerInfo 數(shù)據(jù)結(jié)構(gòu)中以待后用。

分布式協(xié)議

下面的一個(gè)表格是 master 和 slave 接受到各種類型的消息, 以及接受到消息后,做的處理。

  • BlockManagerMasterEndpoint  接受的消息

  • BlockManagerSlaveEndpoint 接受的消息

根據(jù)以上的協(xié)議, 相信我們可以很清楚的猜測(cè)整個(gè)交互的流程, 一般過(guò)程應(yīng)該是這樣的, slave的 BlockManager  在自己接的上存儲(chǔ)一個(gè) Block, 然后把這個(gè) BlockId 匯報(bào)到master的BlockManager , 經(jīng)過(guò) cache, shuffle 或者 Broadcast后,別的節(jié)點(diǎn)需要上一步的Block的時(shí)候, 會(huì)到 master 獲取數(shù)據(jù)所在位置, 然后去相應(yīng)節(jié)點(diǎn)上去 fetch。

存儲(chǔ)層

在RDD層面上我們了解到RDD是由不同的partition組成的,我們所進(jìn)行的transformation和action是在partition上面進(jìn)行的;而在storage模塊內(nèi)部,RDD又被視為由不同的block組成,對(duì)于RDD的存取是以block為單位進(jìn)行的,本質(zhì)上partition和block是等價(jià)的,只是看待的角度不同。在Spark storage模塊中中存取數(shù)據(jù)的最小單位是block,所有的操作都是以block為單位進(jìn)行的。

 

BlockManager對(duì)象被創(chuàng)建的時(shí)候會(huì)創(chuàng)建出MemoryStore和DiskStore對(duì)象用以存取block,如果內(nèi)存中擁有足夠的內(nèi)存, 就 使用 MemoryStore存儲(chǔ),  如果 不夠, 就 spill 到 磁盤中, 通過(guò) DiskStore進(jìn)行存儲(chǔ)。

  • DiskStore 有一個(gè)DiskBlockManager,DiskBlockManager 主要用來(lái)創(chuàng)建并持有邏輯 blocks 與磁盤上的 blocks之間的映射,一個(gè)邏輯 block 通過(guò) BlockId 映射到一個(gè)磁盤上的文件。 在 DiskStore 中會(huì)調(diào)用  diskManager.getFile 方法, 如果子文件夾不存在,會(huì)進(jìn)行創(chuàng)建, 文件夾的命名方式為(spark-local-yyyyMMddHHmmss-xxxx, xxxx是一個(gè)隨機(jī)數(shù)), 所有的block都會(huì)存儲(chǔ)在所創(chuàng)建的folder里面。
  • MemoryStore 相對(duì)于DiskStore需要根據(jù)block id hash計(jì)算出文件路徑并將block存放到對(duì)應(yīng)的文件里面,MemoryStore管理block就顯得非常簡(jiǎn)單:MemoryStore內(nèi)部維護(hù)了一個(gè)hash map來(lái)管理所有的block,以block id為key將block存放到hash map中。而從MemoryStore中取得block則非常簡(jiǎn)單,只需從hash map中取出block id對(duì)應(yīng)的value即可。

BlockManager 的 PUT 和GET接口

BlockManager 提供了 Put 接口和 Get 接口, 這兩個(gè) api 屏蔽了底層的細(xì)節(jié), 我們來(lái)看下底層是如何實(shí)現(xiàn)的

  • GET操作 如果 local 中存在就直接返回, 從本地獲取一個(gè)Block, 會(huì)先判斷如果是 useMemory, 直接從內(nèi)存中取出, 如果是 useDisk, 會(huì)從磁盤中取出返回, 然后根據(jù)useMemory判斷是否在內(nèi)存中緩存一下,方便下次獲取,  如果local 不存在, 從其他節(jié)點(diǎn)上獲取, 當(dāng)然元信息是存在 drive上的,要根據(jù)我們上文中提到的 GETlocation 協(xié)議獲取 Block 所在節(jié)點(diǎn)位置, 然后到其他節(jié)點(diǎn)上獲取。
  • PUT操作 操作之前會(huì)加鎖來(lái)避免多線程的問(wèn)題, 存儲(chǔ)的時(shí)候會(huì)根據(jù) 存儲(chǔ)級(jí)別, 調(diào)用對(duì)應(yīng)的是 memoryStore 還是  diskStore, 然后在具體存儲(chǔ)器上面調(diào)用 存儲(chǔ)接口。 如果有 replication 需求, 會(huì)把數(shù)據(jù)備份到其他的機(jī)器上面。

blockManager 和 blockTransferService 關(guān)系

spark 歷史上使用過(guò)兩套網(wǎng)絡(luò)框架, 最開(kāi)始的時(shí)候, rpc 調(diào)用使用的是 akka, 大文件傳輸使用的是 netty,  后面統(tǒng)一全部使用 netty,  這里的大文件傳輸其實(shí)走的是 netty,  在啟動(dòng) blockManager的時(shí)候會(huì)啟動(dòng)一個(gè) blockTransferService 服務(wù), 這個(gè)服務(wù)就是用來(lái)傳輸大文件用的, 對(duì)應(yīng)的具體類是  NettyBlockTransferService, 這個(gè)實(shí)例中也會(huì)有 BlocakManager的引用, 會(huì)啟動(dòng)一個(gè) NettyBlockRpcServer的 netty Handler, 也擁有 BlocakManager 的引用,  用來(lái)提供服務(wù), BlocakManager 根據(jù) BlockId 獲取一個(gè) Block 然后包裝為一個(gè) ManagedBuffer 對(duì)象,

當(dāng)我們需要從遠(yuǎn)端獲取一個(gè) Block的時(shí)候,就需要 blockTransferService 傳輸大的字節(jié)數(shù)組,

首先需要從 driver上獲取到 Block的真正存儲(chǔ)位置, 然后調(diào)用 blockTransferService 的 fetchBlocks方法, 去其他真正存儲(chǔ)節(jié)點(diǎn)上去fetch數(shù)據(jù), 會(huì)從 client 資源池中獲取一個(gè)client,  如果是一對(duì)一的進(jìn)行fetch,  使用的是 OneForOneBlockFetcher, 這個(gè)Fetcher 是以 Chunks 為單位分別單獨(dú)fetch,  每個(gè) Chunks 也就對(duì)應(yīng)一個(gè)Block的數(shù)據(jù), 根據(jù)配置,會(huì)進(jìn)行重試直到***重試次數(shù),發(fā)送 OpenBlocks消息,  里面會(huì)包裝對(duì)應(yīng)的是哪個(gè)  BlockId,  其他節(jié)點(diǎn)服務(wù)端會(huì)根據(jù) BlockId 從 blockManager中拿到數(shù)據(jù), 然后用來(lái)傳輸, 使用的是 netty 的流式傳輸方式, 同時(shí)也會(huì)有回調(diào)函數(shù),

如果是備份的時(shí)候同步上傳一個(gè) Block,  其他節(jié)點(diǎn)服務(wù)端會(huì)根據(jù),uploadBlock消息中包含的BlockId, 在本地的BlockManager 中冗余存儲(chǔ)一份,

ChunkFetch也有一個(gè)類似Stream的概念,ChunkFetch的對(duì)象是“一個(gè)內(nèi)存中的Iterator[ManagedBuffer]”,即一組Buffer,每一個(gè)Buffer對(duì)應(yīng)一個(gè)chunkIndex,整個(gè)Iterator[ManagedBuffer]由一個(gè)StreamID標(biāo)識(shí)。Client每次的ChunkFetch請(qǐng)求是由(streamId,chunkIndex)組成的唯一的StreamChunkId,Server端根據(jù)StreamChunkId獲取為一個(gè)Buffer并返回給Client; 不管是Stream還是ChunkFetch,在Server的內(nèi)存中都需要管理一組由StreamID與資源之間映射,即StreamManager類,它提供了getChunk和openStream兩個(gè)接口來(lái)分別響應(yīng)ChunkFetch與Stream兩種操作,并且針對(duì)Server的ChunkFetch提供一個(gè)registerStream接口來(lái)注冊(cè)一組Buffer,比如可以將BlockManager中一組BlockID對(duì)應(yīng)的Iterator[ManagedBuffer]注冊(cè)到StreamManager,從而支持遠(yuǎn)程Block Fetch操作。

對(duì)于ExternalShuffleService(一種單獨(dú)shuffle服務(wù)進(jìn)程,對(duì)其他計(jì)算節(jié)點(diǎn)提供本節(jié)點(diǎn)上面的所有shuffle map輸出),它為遠(yuǎn)程Executor提供了一種OpenBlocks的RPC接口,即根據(jù)請(qǐng)求的appid,executorid,blockid(appid+executor對(duì)應(yīng)本地一組目錄,blockid拆封出)從本地磁盤中加載一組FileSegmentManagedBuffer到內(nèi)存,并返回加載后的streamId返回給客戶端,從而支持后續(xù)的ChunkFetch的操作。

Partition 與 Block 的關(guān)系

我們都知道, RDD 的運(yùn)算是基于 partition, 每個(gè) task 代表一個(gè) 分區(qū)上一個(gè) stage 內(nèi)的運(yùn)算閉包, task 被分別調(diào)度到 多個(gè) executor上去運(yùn)行, 那么是在哪里變成了 Block 呢,  我們以 spark 2.11 源碼為準(zhǔn), 看看這個(gè)轉(zhuǎn)變過(guò)程,

一個(gè) RDD 調(diào)度到 executor 上會(huì)運(yùn)行調(diào)用 getOrCompute方法,

  1. SparkEnv.get.blockManager.getOrElseUpdate(blockId, storageLevel, elementClassTag, () => { 
  2.       readCachedBlock = false 
  3.       computeOrReadCheckpoint(partition, context) 
  4.     }) 

如果 Block 在 BlockManager 中存在, 就會(huì)從 BlockManager 中獲取,如果不存在, 就進(jìn)行計(jì)算這個(gè)Block, 然后在 BlockManager 中進(jìn)行存儲(chǔ)持久化, 方便下次使用,

當(dāng)然獲取的時(shí)候是先從本地的 BlockManager 中獲取, 如果本地沒(méi)有, 然后再 從 remote 獲取, 先從 driver 上獲取到元數(shù)據(jù) Block的位置, 然后去到真正的節(jié)點(diǎn)上fetch

如果沒(méi)有, 就進(jìn)行計(jì)算, 然后根據(jù)存儲(chǔ)級(jí)別,存儲(chǔ)到計(jì)算節(jié)點(diǎn)本地的BlockManager 的內(nèi)存或磁盤中,

這樣RDD的transformation、action就和block數(shù)據(jù)建立了聯(lián)系,雖然抽象上我們的操作是在partition層面上進(jìn)行的,但是partition最終還是被映射成為block,因此實(shí)際上我們的所有操作都是對(duì)block的處理和存取。

blockManager 在 spark 中扮演的角色

blockManager 是非常非常重要的一個(gè) spark 組件, 我們隨便舉幾個(gè)例子, 你就知道 BlockManager 多重要了 ,

  • spark  shuffle 的過(guò)程總用到了 BlockManager 作為數(shù)據(jù)的中轉(zhuǎn)站
  • spark broadcast 調(diào)度 task 到多個(gè) executor 的時(shí)候, broadCast 底層使用的數(shù)據(jù)存儲(chǔ)層
  • spark streaming  一個(gè) ReceiverInputDStream 接受到的數(shù)據(jù)也是先放在 BlockManager 中, 然后封裝為一個(gè) BlockRdd 進(jìn)行下一步運(yùn)算的
  • 如果我們 對(duì)一個(gè) rdd 進(jìn)行了cache, cacheManager 也是把數(shù)據(jù)放在了 blockmanager 中, 截?cái)嗔擞?jì)算鏈依賴, 后續(xù)task 運(yùn)行的時(shí)候可以直接從 cacheManager 中獲取到 cacherdd ,不用再?gòu)念^計(jì)算。

spark cache  與  spark   broadcast task

我隨便舉兩個(gè)例子, 看看具體 spark cache 和 spark  broadcast 調(diào)度 task 的時(shí)候怎么用的 blockManager的

spark cache

rdd 計(jì)算的時(shí)候, 首先根據(jù)RDD id和partition index構(gòu)造出block id (rdd_xx_xx), 接著從BlockManager中取出相應(yīng)的block, 如果該block存在,表示此RDD在之前已經(jīng)被計(jì)算過(guò)和存儲(chǔ)在BlockManager中,因此取出即可,無(wú)需再重新計(jì)算。 如果 block 不存在我們可以 計(jì)算出來(lái), 然后吧 block 通過(guò)   doPutIterator 函數(shù)存儲(chǔ)在 節(jié)點(diǎn)上的 BlockManager上面, 匯報(bào)block信息到 driver, 下次如果使用同一個(gè) rdd, 就可以直接從分布式存儲(chǔ)中 直接取出相應(yīng)的 block

下面看一下源碼

  1. final def iterator(split: Partition, context: TaskContext): Iterator[T] = { 
  2.     if (storageLevel != StorageLevel.NONE) { 
  3.       getOrCompute(split, context) 
  4.     } else { 
  5.       computeOrReadCheckpoint(split, context) 
  6.     } 
  7.   } 

如果存儲(chǔ)級(jí)別不是 NONE類型就會(huì)調(diào)用 getOrCompute 這個(gè)我們已經(jīng)看過(guò)了,  里面實(shí)際調(diào)用  SparkEnv.get.blockManager.getOrElseUpdate 方法, 如果 Block 在 BlockManager 中存在, 就會(huì)從 BlockManager 中獲取,如果不存在, 就進(jìn)行計(jì)算這個(gè)Block, 然后在 BlockManager 中進(jìn)行存儲(chǔ)持久化, 方便下次使用,

在  BlockManager 進(jìn)行存儲(chǔ)后, 會(huì)調(diào)用下面的代碼把 匯報(bào)block信息到 driver,

  1. private def tryToReportBlockStatus( 
  2.      blockId: BlockId, 
  3.      status: BlockStatus, 
  4.      droppedMemorySize: Long = 0L): Boolean = { 
  5.    val storageLevel = status.storageLevel 
  6.    val inMemSize = Math.max(status.memSize, droppedMemorySize) 
  7.    val onDiskSize = status.diskSize 
  8.    master.updateBlockInfo(blockManagerId, blockId, storageLevel, inMemSize, onDiskSize) 
  9.  } 

實(shí)際上上想  masterEndpoint 的引用發(fā)送一條 UpdateBlockInfo消息,  master 會(huì)把這個(gè) blockId 對(duì)應(yīng)的 location 放在 driver 上,

同樣的如果一個(gè) Block已經(jīng)計(jì)算過(guò)了,會(huì)到 driver 上獲取到 location 信息

  1. private def getLocations(blockId: BlockId): Seq[BlockManagerId] = { 
  2.    val locs = Random.shuffle(master.getLocations(blockId)) 
  3.    val (preferredLocs, otherLocs) = locs.partition { loc => blockManagerId.host == loc.host } 
  4.    preferredLocs ++ otherLocs 
  5.  } 

spark   broadcast task

這個(gè)調(diào)度 task 到多個(gè) task 上面過(guò)程代碼太多,我就不貼了, 直接說(shuō)一下流程,

  • DAGScheduler 在  submitMissingTasks 方法提交 task的時(shí)候, 會(huì)把 task 包裝為一個(gè) Broadcast 類型, 里面使用 TorrentBroadcastFactory 創(chuàng)建一個(gè) TorrentBroadcast 的類型, 使用的是p2p的協(xié)議, 會(huì)減輕 master 的壓力,  這個(gè)里面會(huì) 調(diào)用 writeBlocks 里面把taskBinary  通過(guò) blockManager.putSingle 放在 BlockManager 緩存中
  • ShuffleMapTask 或者 ResultTask,然后調(diào)用 runTask 方法, 里面實(shí)際上會(huì)調(diào)用 Broadcast 的value 方法, 里面最終調(diào)用了 BlockManager 的 getLocalBytes 或者 getRemoteBytes 方法

blockManager 在  spark streaming 中的應(yīng)用

  • ReceiverTracker 在啟動(dòng)的時(shí)候,會(huì)運(yùn)行一個(gè) job, 這個(gè)job 就是到 各個(gè)executor上去啟動(dòng) ReceiverSupervisorImpl, 然后啟動(dòng)各個(gè)具體的數(shù)據(jù)接收器,  如果是SocketInputDStream, 就會(huì)啟動(dòng)一個(gè) SocketReceiver,
  • Receiver 接收到數(shù)據(jù)后, 先在 BlockGenerator 中緩存, 等到達(dá)一定的大小后,  調(diào)用 BlockManagerBasedBlockHandler 的 storeBlock方法持久化到 BlockManager 中, 然后把數(shù)據(jù)信息匯報(bào)到 ReceiverTracker上, 最終 匯總到   ReceivedBlockTracker 中的 timeToAllocatedBlocks中,
  • ReceiverInputDStream compute的時(shí)候,  receivedBlockTracker 會(huì)根據(jù)時(shí)間獲取到  BlockManager 中的元信息,里面最終對(duì)應(yīng)的還是 BlockManager 的存儲(chǔ)位置, 最終獲取到數(shù)據(jù)進(jìn)行計(jì)算,

測(cè)試 blockManager

我們做一個(gè)簡(jiǎn)單的測(cè)試,兩端代碼的區(qū)別就是 一個(gè) 進(jìn)行了cache ,一個(gè)沒(méi)有進(jìn)行cache。

  1. val file = sc.textFile("/fusionlog/midsourcenew/2017-03-13-18-15_2.gz" 
  2. file.count()  
  3. file.count() 

我們從日志可以觀察出來(lái), ***段代碼, 兩個(gè) job 中都從 hdfs 中讀取文件, 讀取了兩次,

  1. val file = sc.textFile("/fusionlog/midsourcenew/2017-03-13-18-15_2.gz").cache() 
  2. file.count() 
  3. file.count() 

有以下日志

  1. MemoryStore: Block rdd_1_0 stored as values in memory (estimated size 1354.9 MB, free 4.9 GB) 
  2. BlockManager: Found block rdd_1_0 locally 

我們發(fā)現(xiàn)在***次讀取文件后, 把文件 cache 在了 blockManager 中, 下一個(gè) job 運(yùn)行的時(shí)候, 在本地 BlockManager 直接發(fā)現(xiàn)獲取到了 block , 沒(méi)有讀取 hdfs 文件 ,

在 spark ui 中也發(fā)現(xiàn)了 cache的 Block, 全部是在內(nèi)存中緩存的, 

責(zé)任編輯:武曉燕 來(lái)源: spark技術(shù)分享
相關(guān)推薦

2017-04-14 09:48:25

分布式存儲(chǔ)系統(tǒng)

2018-09-29 14:08:04

存儲(chǔ)系統(tǒng)分布式

2017-10-16 10:24:47

LogDevice存儲(chǔ)系統(tǒng)

2017-07-18 09:51:36

文件存儲(chǔ)系統(tǒng)

2017-10-12 09:36:54

分布式存儲(chǔ)系統(tǒng)

2017-10-19 08:45:15

存儲(chǔ)系統(tǒng)HBase

2018-11-20 09:19:58

存儲(chǔ)系統(tǒng)雪崩效應(yīng)

2017-10-17 08:33:31

存儲(chǔ)系統(tǒng)分布式

2017-12-18 10:47:04

分布式存儲(chǔ)數(shù)據(jù)

2019-10-15 10:59:43

分布式存儲(chǔ)系統(tǒng)

2019-05-13 15:20:42

存儲(chǔ)系統(tǒng)算法

2018-10-24 11:01:53

分布式存儲(chǔ)系統(tǒng)

2013-12-27 10:56:42

分布式對(duì)象存儲(chǔ)Sheepdog性能測(cè)試

2014-02-19 11:37:57

分布式對(duì)象存儲(chǔ)Sheepdog

2010-07-02 10:08:12

BigtableGoogle

2018-03-13 08:45:08

存儲(chǔ)系統(tǒng)DHT算法

2018-10-29 12:42:23

Ceph分布式存儲(chǔ)

2025-01-26 11:54:39

分布式存儲(chǔ)系統(tǒng)

2021-08-07 05:00:20

存儲(chǔ)系統(tǒng)

2021-07-04 07:07:06

Ceph分布式存儲(chǔ)架構(gòu)
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)