偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

HDFS 底層交互原理,看這篇就夠了

大數(shù)據(jù)
HDFS全稱是 Hadoop Distribute File System,是 Hadoop最重要的組件之一,也被稱為分步式存儲之王。本文主要從 HDFS 高可用架構(gòu)組成、HDFS 讀寫流程、如何保證可用性以及高頻面試題出發(fā),提高大家對 HDFS 的認(rèn)識,掌握一些高頻的 HDFS 面試題。

[[423115]]

 前言

大家好,我是林哥!

HDFS全稱是 Hadoop Distribute File System,是 Hadoop最重要的組件之一,也被稱為分步式存儲之王。本文主要從 HDFS 高可用架構(gòu)組成、HDFS 讀寫流程、如何保證可用性以及高頻面試題出發(fā),提高大家對 HDFS 的認(rèn)識,掌握一些高頻的 HDFS 面試題。本篇文章概覽如下圖:

本篇文章概覽

1.HA 架構(gòu)組成

1.1HA架構(gòu)模型

在 HDFS 1.X 時,NameNode 是 HDFS 集群中可能發(fā)生單點故障的節(jié)點,集群中只有一個 NameNode,一旦 NameNode 宕機(jī),整個集群將處于不可用的狀態(tài)。

在 HDFS 2.X 時,HDFS 提出了高可用(High Availability, HA)的方案,解決了 HDFS 1.X 時的單點問題。在一個 HA 集群中,會配置兩個 NameNode ,一個是 Active NameNode(主),一個是 Stadby NameNode(備)。主節(jié)點負(fù)責(zé)執(zhí)行所有修改命名空間的操作,備節(jié)點則執(zhí)行同步操作,以保證與主節(jié)點命名空間的一致性。HA 架構(gòu)模型如下圖所示:

HA 架構(gòu)組成2

HA 集群中所包含的進(jìn)程的職責(zé)各不相同。為了使得主節(jié)點和備用節(jié)點的狀態(tài)一致,采用了 Quorum Journal Manger (QJM)方案解決了主備節(jié)點共享存儲問題,如圖 JournalNode 進(jìn)程,下面依次介紹各個進(jìn)程在架構(gòu)中所起的作用:

  • Active NameNode:它負(fù)責(zé)執(zhí)行整個文件系統(tǒng)中命名空間的所有操作;維護(hù)著數(shù)據(jù)的元數(shù)據(jù),包括文件名、副本數(shù)、文件的 BlockId 以及 Block 塊所對應(yīng)的節(jié)點信息;另外還接受 Client 端讀寫請求和 DataNode 匯報 Block 信息。
  • Standby NameNode:它是 Active NameNode 的備用節(jié)點,一旦主節(jié)點宕機(jī),備用節(jié)點會切換成主節(jié)點對外提供服務(wù)。它主要是監(jiān)聽 JournalNode Cluster 上 editlog 變化,以保證當(dāng)前命名空間盡可能的與主節(jié)點同步。任意時刻,HA 集群只有一臺 Active NameNode,另一個節(jié)點為 Standby NameNode。
  • JournalNode Cluster: 用于主備節(jié)點間共享 editlog 日志文件的共享存儲系統(tǒng)。負(fù)責(zé)存儲 editlog 日志文件, 當(dāng) Active NameNode 執(zhí)行了修改命名空間的操作時,它會定期將執(zhí)行的操作記錄在 editlog 中,并寫入 JournalNode Cluster 中。Standby NameNode 會一直監(jiān)聽 JournalNode Cluster 上 editlog 的變化,如果發(fā)現(xiàn) editlog 有改動,備用節(jié)點會讀取 JournalNode 上的 editlog 并與自己當(dāng)前的命名空間合并,從而實現(xiàn)了主備節(jié)點的數(shù)據(jù)一致性。

注意:QJM 方案是基于 Paxos 算法實現(xiàn)的,集群由 2N + 1 JouranlNode 進(jìn)程組成,最多可以容忍 N 臺 JournalNode 宕機(jī),宕機(jī)數(shù)大于 N 臺,這個算法就失效了!

  • ZKFailoverController: ZKFC 以獨立進(jìn)程運行,每個 ZKFC 都監(jiān)控自己負(fù)責(zé)的 NameNode,它可以實現(xiàn) NameNode 自動故障切換:即當(dāng)主節(jié)點異常,監(jiān)控主節(jié)點的 ZKFC 則會斷開與 ZooKeeper 的連接,釋放分步式鎖,監(jiān)控備用節(jié)點的 ZKFC 進(jìn)程會去獲取鎖,同時把備用 NameNode 切換成 主 NameNode。
  • ZooKeeper: 為 ZKFC 進(jìn)程實現(xiàn)自動故障轉(zhuǎn)移提供統(tǒng)一協(xié)調(diào)服務(wù)。通過 ZooKeeper 中 Watcher 監(jiān)聽機(jī)制,通知 ZKFC 異常NameNode 下線;保證同一時刻只有一個主節(jié)點。
  • DataNode: DataNode 是實際存儲文件 Block 塊的地方,一個 Block 塊包含兩個文件:一個是數(shù)據(jù)本身,一個是元數(shù)據(jù)(數(shù)據(jù)塊長度、塊數(shù)據(jù)的校驗和、以及時間戳),DataNode 啟動后會向 NameNode 注冊,每 6 小時同時向主備兩個 NameNode 上報所有的塊信息,每 3 秒同時向主備兩個 NameNode 發(fā)送一次心跳。

DataNode 向 NameNode 匯報當(dāng)前塊信息的時間間隔,默認(rèn) 6 小時,其配置參數(shù)名如下:

  1. <property> 
  2.  <name>dfs.blockreport.intervalMsec</name
  3.  <value>21600000</value> 
  4.  <description>Determines block reporting interval in  
  5. milliseconds.</description> 
  6. </property> 

 

1.2HA主備故障切換流程

HA 集群剛啟動時,兩個 NameNode 節(jié)點狀態(tài)均為 Standby,之后兩個 NameNode 節(jié)點啟動 ZKFC 進(jìn)程后會去 ZooKeeper 集群搶占分步式鎖,成功獲取分步式鎖,ZooKeeper 會創(chuàng)建一個臨時節(jié)點,成功搶占分步式鎖的 NameNode 會成為 Active NameNode,ZKFC 便會實時監(jiān)控自己的 NameNode。

HDFS 提供了兩種 HA 狀態(tài)切換方式:一種是管理員手動通過DFSHAAdmin -faieover執(zhí)行狀態(tài)切換;另一種則是自動切換。下面分別從兩種情況分析故障的切換流程:

1.主 NameNdoe 宕機(jī)后,備用 NameNode 如何升級為主節(jié)點?

當(dāng)主 NameNode 宕機(jī)后,對應(yīng)的 ZKFC 進(jìn)程檢測到 NameNode 狀態(tài),便向 ZooKeeper 發(fā)生刪除鎖的命令,鎖刪除后,則觸發(fā)一個事件回調(diào)備用 NameNode 上的 ZKFC

ZKFC 得到消息后先去 ZooKeeper 爭奪創(chuàng)建鎖,鎖創(chuàng)建完成后會檢測原先的主 NameNode 是否真的掛掉(有可能由于網(wǎng)絡(luò)延遲,心跳延遲),掛掉則升級備用 NameNode 為主節(jié)點,沒掛掉則將原先的主節(jié)點降級為備用節(jié)點,將自己對應(yīng)的 NameNode 升級為主節(jié)點。

2.主 NameNode 上的 ZKFC 進(jìn)程掛掉,主 NameNode 沒掛,如何切換?

ZKFC 掛掉后,ZKFC 和 ZooKeeper 之間 TCP 鏈接會隨之?dāng)嚅_,session 也會隨之消失,鎖被刪除,觸發(fā)一個事件回調(diào)備用 NameNode ZKFC,ZKFC 得到消息后會先去 ZooKeeper 爭奪創(chuàng)建鎖,鎖創(chuàng)建完成后也會檢測原先的主 NameNode 是否真的掛掉,掛掉則升級 備用 NameNode 為主節(jié)點,沒掛掉則將主節(jié)點降級為備用節(jié)點,將自己對應(yīng)的 NameNode 升級為主節(jié)點。

1.3Block、packet及chunk 概念

在 HDFS 中,文件存儲是按照數(shù)據(jù)塊(Block)為單位進(jìn)行存儲的,在讀寫數(shù)據(jù)時,DFSOutputStream使用 Packet 類來封裝一個數(shù)據(jù)包。每個 Packet 包含了若干個 chunk 和對應(yīng)的 checksum。

  • Block: HDFS 上的文件都是分塊存儲的,即把一個文件物理劃分為一個 Block 塊存儲。Hadoop 2.X/3.X 默認(rèn)塊大小為 128 M,1.X 為 64M.
  • Packet: 是 Client 端向 DataNode 或 DataNode 的 Pipline 之間傳輸數(shù)據(jù)的基本單位,默認(rèn) 64 KB
  • Chunk: Chunk 是最小的單位,它是 Client 向 DataNode 或 DataNode PipLine 之間進(jìn)行數(shù)據(jù)校驗的基本單位,默認(rèn) 512 Byte ,因為用作校驗,所以每個 Chunk 需要帶有 4 Byte 的校驗位,實際上每個 Chunk 寫入 Packtet 的大小為 516 Byte。

2.源碼級讀寫流程

2.1HDFS 讀流程

HDFS讀流程

我們以從 HDFS 讀取一個 information.txt 文件為例,其讀取流程如上圖所示,分為以下幾個步驟:

1.打開 information.txt 文件:首先客戶端調(diào)用 DistributedFileSystem.open() 方法打開文件,這個方法在底層會調(diào)用DFSclient.open() 方法,該方法會返回一個 HdfsDataInputStream 對象用于讀取數(shù)據(jù)塊。但實際上真正讀取數(shù)據(jù)的是 DFSInputStream ,而 HdfsDataInputStream 是 DFSInputStream 的裝飾類(new HdfsDataInputStream(DFSInputStream))。

2.從 NameNode 獲取存儲 information.txt 文件數(shù)據(jù)塊的 DataNode 地址:即獲取組成 information.txt block 塊信息。在構(gòu)造輸出流 DFSInputStream 時,會通過調(diào)用 getBlockLocations() 方法向 NameNode 節(jié)點獲取組成 information.txt 的 block 的位置信息,并且 block 的位置信息是按照與客戶端的距離遠(yuǎn)近排好序。

3.連接 DataNode 讀取數(shù)據(jù)塊: 客戶端通過調(diào)用 DFSInputStream.read() 方法,連接到離客戶端最近的一個 DataNode 讀取 Block 塊,數(shù)據(jù)會以數(shù)據(jù)包(packet)為單位從 DataNode 通過流式接口傳到客戶端,直到一個數(shù)據(jù)塊讀取完成;DFSInputStream會再次調(diào)用 getBlockLocations() 方法,獲取下一個最優(yōu)節(jié)點上的數(shù)據(jù)塊位置。

4.直到所有文件讀取完成,調(diào)用 close() 方法,關(guān)閉輸入流,釋放資源。

從上述流程可知,整個過程最主要涉及到 open()、read()兩個方法(其它方法都是在這兩個方法的調(diào)用鏈中調(diào)用,如getBlockLocations()),下面依次介紹這2個方法的實現(xiàn)。

注:本文是以 hadoop-3.1.3 源碼為基礎(chǔ)!

  • open()方法

事實上,在調(diào)用 DistributedFileSystem.open()方法時,底層調(diào)用的是 DFSClient.open()方法打開文件,并構(gòu)造 DFSInputStream 輸入流對象。

  1. public DFSInputStream open(String src, int buffersize, boolean verifyChecksum) 
  2.       throws IOException { 
  3.     //檢查DFSClicent 的運行狀況 
  4.     checkOpen(); 
  5.     // 從 namenode 獲取 block 位置信息,并存到 LocatedBlocks 對象中,最終傳給 DFSInputStream 的構(gòu)造方法 
  6.     try (TraceScope ignored = newPathTraceScope("newDFSInputStream", src)) { 
  7.       LocatedBlocks locatedBlocks = getLocatedBlocks(src, 0); 
  8.       //調(diào)用 openInternal 方法,獲取輸入流 
  9.       return openInternal(locatedBlocks, src, verifyChecksum); 
  10.     } 
  11.   } 

整個 open()方法分為兩部分:

第一部分是,調(diào)用 checkOpen()方法檢查 DFSClient 的運行狀況,調(diào)用getLocateBlocks()方法,獲取 block 的位置消息

第二部分是,調(diào)用openInternal()方法,獲取輸入流。

  • openInternal( )方法
  1. private DFSInputStream openInternal(LocatedBlocks locatedBlocks, String src, 
  2.       boolean verifyChecksum) throws IOException { 
  3.     if (locatedBlocks != null) { 
  4.         //獲取糾刪碼策略,糾刪碼是 Hadoop 3.x 的新特性,默認(rèn)不啟用糾刪碼策略 
  5.       ErasureCodingPolicy ecPolicy = locatedBlocks.getErasureCodingPolicy(); 
  6.       if (ecPolicy != null) { 
  7.           //如果用戶指定了糾刪碼策略,將返回一個 DFSStripedInputStream 對象 
  8.           //DFSStripedInputStream 會將數(shù)據(jù)邏輯字節(jié)范圍的請求轉(zhuǎn)換為存儲在 DataNode 上的內(nèi)部塊 
  9.         return new DFSStripedInputStream(this, src, verifyChecksum, ecPolicy, 
  10.             locatedBlocks); 
  11.       } 
  12.      //如果未指定糾刪碼策略,調(diào)用 DFSInputStream 的構(gòu)造方法,并且返回該 DFSInputStream 的對象 
  13.       return new DFSInputStream(this, src, verifyChecksum, locatedBlocks); 
  14.     } else { 
  15.       throw new IOException("Cannot open filename " + src); 
  16.     } 
  17.   } 
  • DFSInputStream 構(gòu)造方法
  1. DFSInputStream(DFSClient dfsClient, String src, boolean verifyChecksum, 
  2.       LocatedBlocks locatedBlocks) throws IOException { 
  3.     this.dfsClient = dfsClient; 
  4.     this.verifyChecksum = verifyChecksum; 
  5.     this.src = src; 
  6.     synchronized (infoLock) { 
  7.       this.cachingStrategy = dfsClient.getDefaultReadCachingStrategy(); 
  8.     } 
  9.     this.locatedBlocks = locatedBlocks; 
  10.     //調(diào)用 openInfo 方法,參數(shù):refreshLocatedBlocks,是否要更新 locateBlocks 屬性。 
  11.     openInfo(false); 
  12.   } 

構(gòu)造方法做了2件事:

第一部分是初始化 DFSInputStream 屬性,其中 verifyChecksum 含義是:讀取數(shù)據(jù)時是否進(jìn)行校驗,cachingStrategy,指的是緩存策略。

第二部分,調(diào)用 openInfo()方法。

思考:為甚么要更新最后一個數(shù)據(jù)塊長度?

因為可能會有這種情況出現(xiàn),當(dāng)客戶端在讀取文件時,最后一個文件塊可能還在構(gòu)建的狀態(tài)(正在被寫入),Datanode 還未上報最后一個文件塊,那么 namenode 所保存的數(shù)據(jù)塊長度有可能小于 Datanode實際存儲的數(shù)據(jù)塊長度,所以需要與 Datanode 通信以確認(rèn)最后一個數(shù)據(jù)塊的真實長度。

獲取到 DFSInputStream 流對象后,并且得到了文件的所有 Block 塊的位置信息,接下來調(diào)用read()方法,從 DataNode 讀取數(shù)據(jù)塊。

注:在openInfo() 方法

在openInfp()中,會從 namenode 獲取當(dāng)前正在讀取文件的最后一個數(shù)據(jù)塊的長度 lastBlockBeingWrittenLength,如果返回的最后一個數(shù)據(jù)塊的長度為 -1 ,這是一種特殊情況:即集群剛重啟,DataNode 可能還沒有向 NN 進(jìn)行完整的數(shù)據(jù)塊匯報,這時部分?jǐn)?shù)據(jù)塊位置信息還獲取不到,也獲取不到這些塊的長度,則默認(rèn)會重試 3 次,默認(rèn)每次等待 4 秒,重新去獲取文件對應(yīng)的數(shù)據(jù)塊的位置信息以及最后數(shù)據(jù)塊長度;如果最后一個數(shù)據(jù)塊的長度不為 -1,則表明,最后一個數(shù)據(jù)塊已經(jīng)是完整狀態(tài)。

  • read()方法
  1. public synchronized int read(@Nonnull final byte buf[], int offint len) 
  2.       throws IOException { 
  3.     //驗證輸入的參數(shù)是否可用 
  4.     validatePositionedReadArgs(pos, buf, off, len); 
  5.     if (len == 0) { 
  6.       return 0; 
  7.     } 
  8.     //構(gòu)造字節(jié)數(shù)組作為容器  
  9.     ReaderStrategy byteArrayReader = 
  10.         new ByteArrayStrategy(buf, off, len, readStatistics, dfsClient); 
  11.     //調(diào)用 readWithStrategy()方法讀取數(shù)據(jù) 
  12.     return readWithStrategy(byteArrayReader); 
  13.   } 

當(dāng)用戶代碼調(diào)用read()方法時,其底層調(diào)用的是 DFSInputStream.read()方法。該方法從輸入流的 off 位置開始讀取,讀取 len 個字節(jié),然后存入 buf 字節(jié)數(shù)組中。源碼中構(gòu)造了一個 ByteArrayStrategy 對象,該對象封裝了 5 個屬性,分別是:字節(jié)數(shù)組 buf,讀取到的字節(jié)存入該字節(jié)數(shù)組;off,讀取的偏移量;len,將要讀取的目標(biāo)長度;readStatistics,統(tǒng)計計數(shù)器,客戶端。最后通過調(diào)用 readWithStrategy()方法去讀取文件數(shù)據(jù)塊的數(shù)據(jù)。

總結(jié):HDFS 讀取一個文件,調(diào)用流程如下:(中間涉及到的部分方法未列出)

usercode 調(diào)用 open() ---> DistributedFileSystem.open() ---> DFSClient.open() ---> 返回一個 DFSInputStream 對象給 DistributedFileSystem ---> new hdfsDataInputStream(DFSInputStream) 并返回給用戶;

usercode 調(diào)用 read() ---> 底層DFSIputStream.read() ---> readWithStrategy(bytArrayReader)

2.2HDFS 寫流程

介紹完 HDFS 讀的流程,接下來看看一個文件的寫操作的實現(xiàn)。從下圖中可以看出,HDFS 寫流程涉及的方法比較多,過程也比較復(fù)雜。

1.在 namenode 創(chuàng)建文件: 當(dāng) client 寫一個新文件時,首先會調(diào)用 DistributeedFileSytem.creat() 方法,DistributeFileSystem 是客戶端創(chuàng)建的一個對象,在收到 creat 命令之后,DistributeFileSystem 通過 RPC 與 NameNode 通信,讓它在文件系統(tǒng)的 namespace 創(chuàng)建一個獨立的新文件;namenode 會先確認(rèn)文件是否存在以及客戶端是否有權(quán)限,確認(rèn)成功后,會返回一個 HdfsDataOutputStream 對象,與讀流程類似,這個對象底層包裝了一個 DFSOutputStream 對象,它才是寫數(shù)據(jù)的真正執(zhí)行者。

2.建立數(shù)據(jù)流 pipeline 管道: 客戶端得到一個輸出流對象,還需要通過調(diào)用 ClientProtocol.addBlock()向 namenode 申請新的空數(shù)據(jù)塊,addBlock( ) 會返回一個 LocateBlock 對象,該對象保存了可寫入的 DataNode 的信息,并構(gòu)成一個 pipeline,默認(rèn)是有三個 DataNode 組成。

3.通過數(shù)據(jù)流管道寫數(shù)據(jù): 當(dāng) DFSOutputStream調(diào)用 write()方法把數(shù)據(jù)寫入時,數(shù)據(jù)會先被緩存在一個緩沖區(qū)中,寫入的數(shù)據(jù)會被切分成多個數(shù)據(jù)包,每當(dāng)達(dá)到一個數(shù)據(jù)包長度(默認(rèn)65536字節(jié))時,

DFSOutputStream會構(gòu)造一個 Packet 對象保存這個要發(fā)送的數(shù)據(jù)包;新構(gòu)造的 Packet 對象會被放到 DFSOutputStream維護(hù)的 dataQueue 隊列中,DataStreamer 線程會從 dataQueue 隊列中取出 Packet 對象,通過底層 IO 流發(fā)送到 pipeline 中的第一個 DataNode,然后繼續(xù)將所有的包轉(zhuǎn)到第二個 DataNode 中,以此類推。發(fā)送完畢后,

這個 Packet 會被移出 dataQueue,放入 DFSOutputStream 維護(hù)的確認(rèn)隊列 ackQueue 中,該隊列等待下游 DataNode 的寫入確認(rèn)。當(dāng)一個包已經(jīng)被 pipeline 中所有的 DataNode 確認(rèn)了寫入磁盤成功,這個數(shù)據(jù)包才會從確認(rèn)隊列中移除。

4.關(guān)閉輸入流并提交文件: 當(dāng)客戶端完成了整個文件中所有的數(shù)據(jù)塊的寫操作之后,會調(diào)用 close() 方法關(guān)閉輸出流,客戶端還會調(diào)用 ClientProtoclo.complete( ) 方法通知 NameNode 提交這個文件中的所有數(shù)據(jù)塊,

NameNode 還會確認(rèn)該文件的備份數(shù)是否滿足要求。對于 DataNode 而言,它會調(diào)用 blockReceivedAndDelete() 方法向 NameNode 匯報,NameNode 會更新內(nèi)存中的數(shù)據(jù)塊與數(shù)據(jù)節(jié)點的對應(yīng)關(guān)系。

從上述流程來看,整個寫流程主要涉及到了 creat()、write()這些方法,下面著重介紹下這兩個方法的實現(xiàn)。當(dāng)調(diào)用 DistributeedFileSytem.creat() 方法時,其底層調(diào)用的其實是 DFSClient.create()方法,其源碼如下:

  • create( )方法
  1. public DFSOutputStream create(String src, FsPermission permission, 
  2.           EnumSet<CreateFlag> flag, boolean createParent,  
  3.                     short replication,long blockSize, 
  4.                     Progressable progress, int buffersize, 
  5.                     ChecksumOpt checksumOpt,  
  6.                     InetSocketAddress[] favoredNodes, 
  7.                     String ecPolicyName) throws IOException { 
  8.     //檢查客戶端是否已經(jīng)打開 
  9.     checkOpen(); 
  10.     final FsPermission masked = applyUMask(permission); 
  11.     LOG.debug("{}: masked={}", src, masked); 
  12.     //調(diào)用 DFSOutputStream.newStreamForCreate()創(chuàng)建輸出流對象 
  13.     final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this, 
  14.         src, masked, flag, createParent, replication, blockSize, progress, 
  15.         dfsClientConf.createChecksum(checksumOpt), 
  16.         getFavoredNodesStr(favoredNodes), ecPolicyName); 
  17.     //獲取 HDFS 文件的租約 
  18.     beginFileLease(result.getFileId(), result); 
  19.     return result; 
  20.   } 

DistributeFileSystem.create()在底層會調(diào)用 DFSClient.create()方法。該方法主要完成三件事:

租約:指的是租約持有者在規(guī)定時間內(nèi)獲得該文件權(quán)限(寫文件權(quán)限)的合同

第一,檢查客戶端是否已經(jīng)打開

第二,調(diào)用靜態(tài)的 newStreamForCreate() 方法,通過 RPC 與 NameNode 通信創(chuàng)建新文件,并構(gòu)建出 DFSOutputStream流

第三,執(zhí)行 beginFileLease() 方法,獲取新J建文件的租約

  • newStreamForCreate() 方法
  1. static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src, 
  2.       FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent, 
  3.       short replication, long blockSize, Progressable progress, 
  4.       DataChecksum checksum, String[] favoredNodes, String ecPolicyName) 
  5.       throws IOException { 
  6.   
  7.     try (TraceScope ignored = 
  8.              dfsClient.newPathTraceScope("newStreamForCreate", src)) { 
  9.       HdfsFileStatus stat = null
  10.  
  11.       // 如果發(fā)生異常,并且異常為 RetryStartFileException ,便重新調(diào)用create()方法,重試次數(shù)為 10 
  12.       boolean shouldRetry = true
  13.       //重試次數(shù)為 10 
  14.       int retryCount = CREATE_RETRY_COUNT; 
  15.       while (shouldRetry) { 
  16.         shouldRetry = false
  17.         try { 
  18.            //調(diào)用 ClientProtocol.create() 方法,在命名空間中創(chuàng)建 HDFS 文件 
  19.           stat = dfsClient.namenode.create(src, masked, dfsClient.clientName, 
  20.               new EnumSetWritable<>(flag), createParent, replication, 
  21.               blockSize, SUPPORTED_CRYPTO_VERSIONS, ecPolicyName); 
  22.           break; 
  23.         } catch (RemoteException re) { 
  24.           IOException e = re.unwrapRemoteException(AccessControlException.class, 
  25.               //....此處省略了部分異常類型 
  26.               UnknownCryptoProtocolVersionException.class); 
  27.           if (e instanceof RetryStartFileException) {//如果發(fā)生異常,判斷異常是否為 RetryStartFileException 
  28.             if (retryCount > 0) { 
  29.               shouldRetry = true
  30.               retryCount--; 
  31.             } else { 
  32.               throw new IOException("Too many retries because of encryption" + 
  33.                   " zone operations", e); 
  34.             } 
  35.           } else { 
  36.             throw e; 
  37.           } 
  38.         } 
  39.       } 
  40.       Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!"); 
  41.       final DFSOutputStream out
  42.       if(stat.getErasureCodingPolicy() != null) { 
  43.         //如果用戶指定了糾刪碼策略,將創(chuàng)建一個 DFSStripedOutputStream 對象 
  44.         out = new DFSStripedOutputStream(dfsClient, src, stat, 
  45.             flag, progress, checksum, favoredNodes); 
  46.       } else { 
  47.         //如果沒指定糾刪碼策略,調(diào)用構(gòu)造方法創(chuàng)建一個 DFSOutputStream 對象 
  48.         out = new DFSOutputStream(dfsClient, src, stat, 
  49.             flag, progress, checksum, favoredNodes, true); 
  50.       } 
  51.       //啟動輸出流對象的 Datastreamer 線程 
  52.       out.start(); 
  53.       return out
  54.     } 
  55.   } 

newStreamForCreate()方法總共涉及三個部分:

當(dāng)構(gòu)建完 DFSOutputStream 輸出流時,客戶端調(diào)用 write() 方法把數(shù)據(jù)包寫入 dataQueue 隊列,在將數(shù)據(jù)包發(fā)送到 DataNode 之前,DataStreamer會向 NameNode 申請分配一個新的數(shù)據(jù)塊

然后建立寫這個數(shù)據(jù)塊的數(shù)據(jù)流管道(pipeline),之后DataStreamer 會從 dataQueue 隊列取出數(shù)據(jù)包,通過 pipeline 依次發(fā)送給各個 DataNode。每個數(shù)據(jù)包(packet)都有對應(yīng)的序列號,當(dāng)一個數(shù)據(jù)塊中所有的數(shù)據(jù)包都發(fā)送完畢,

并且都得到了 ack 消息確認(rèn)后,Datastreamer會將當(dāng)前數(shù)據(jù)塊的 pipeline 關(guān)閉。通過不斷循環(huán)上述過程,直到該文件(一個文件會被切分為多個 Block)的所有數(shù)據(jù)塊都寫完成。

調(diào)用 ClientProtocol.create()方法,創(chuàng)建文件,如果發(fā)生異常為 RetryStartFileException ,則默認(rèn)重試10次

調(diào)用 DFSStripedOutputStream 或 DFSOutputStream 構(gòu)造方法,構(gòu)造輸出流對象

啟動 Datastreamer 線程,Datastreamer 是 DFSOutputStream 中的一個內(nèi)部類,負(fù)責(zé)構(gòu)建 pipeline 管道,并將數(shù)據(jù)包發(fā)送到 pipeline 中的第一個 DataNode

  • writeChunk()方法
  1. protected synchronized void writeChunk(ByteBuffer buffer, int len, 
  2.       byte[] checksum, int ckoff, int cklen) throws IOException { 
  3.     writeChunkPrepare(len, ckoff, cklen); 
  4.   
  5.     //將當(dāng)前校驗數(shù)據(jù)、校驗塊寫入數(shù)據(jù)包中 
  6.     currentPacket.writeChecksum(checksum, ckoff, cklen); 
  7.     currentPacket.writeData(buffer, len); 
  8.     currentPacket.incNumChunks(); 
  9.     getStreamer().incBytesCurBlock(len); 
  10.  
  11.     // 如果當(dāng)前數(shù)據(jù)包已經(jīng)滿了,或者寫滿了一個數(shù)據(jù)塊,則將當(dāng)前數(shù)據(jù)包放入發(fā)送隊列中 
  12.     if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() || 
  13.             getStreamer().getBytesCurBlock() == blockSize) { 
  14.       enqueueCurrentPacketFull(); 
  15.     } 
  16.   } 

最終寫數(shù)據(jù)調(diào)用都的是 writeChunk ()方法,其會首先調(diào)用 checkChunkPrepare()構(gòu)造一個 Packet 對象保存數(shù)據(jù)包,

然后調(diào)用writeCheckSum()和writeData()方法,將校驗塊數(shù)據(jù)和校驗和寫入 Packet 對象中。

當(dāng) Packet 對象寫滿時(每個數(shù)據(jù)包都可以寫入 maxChunks 個校驗塊),則調(diào)用 enqueueCurrentPacketFull()方法,將當(dāng)前的 Packet 對象放入 dataQueue 隊列中,等待 DataStreamer 線程的處理。

如果當(dāng)前數(shù)據(jù)塊中的所有數(shù)據(jù)都已經(jīng)發(fā)送完畢,則發(fā)送一個空數(shù)據(jù)包標(biāo)識所有數(shù)據(jù)已經(jīng)發(fā)送完畢。

3.HDFS 如何保證可用性?

在 1.1 節(jié)中已經(jīng)闡述了 HDFS 的高可用的架構(gòu),分別涉及到 NameNode,DataNode,Journalnode,ZKFC等組件。所以,在談及 HDFS 如何保證可用性,要從多個方面去回答。

  • 在 Hadoop 2.X 時,主備 NameNode 節(jié)點通過 JournalNode 的數(shù)據(jù)同步,來保證數(shù)據(jù)一致性,2個 ZKFC 進(jìn)程負(fù)責(zé)各自的 NameNode 健康監(jiān)控,從而實現(xiàn)了 NameNode 的高可用。Hadoop 3.X 時,NameNode 數(shù)量可以大于等于 2。
  • 對于 JournalNode 來講,也是分布式的,保證了可用性。因為有選舉機(jī)制,所以 JournalNode 個數(shù) 一般都為 2N+1 個。在 主NameNode向 JournalNode寫入 editlog 文件時,當(dāng)有一半以上的(≥N+1) JournalNode返回寫操作成功時即認(rèn)為該次寫成功。所以 JournalNode集群能容忍最多 N 臺節(jié)點宕掉,如果多于 N 臺機(jī)器掛掉,服務(wù)才不可用。
  • ZKFC 主要輔助 ZooKeeper 做 Namenode 的健康監(jiān)控,能夠保證故障自動轉(zhuǎn)移,它是部署在兩臺 NameNode 節(jié)點上的獨立的進(jìn)程。此外,ZooKeeper 集群也是一個獨立的分布式系統(tǒng),它通過 Zab 協(xié)議來保證數(shù)據(jù)一致,和主備節(jié)點的選舉切換等機(jī)制來保證可用性。
  • DataNode 節(jié)點主要負(fù)責(zé)存儲數(shù)據(jù),通過 3 副本策略來保證數(shù)據(jù)的完整性,所以其最大可容忍 2 臺 DataNode 掛掉,同時 NameNode 會保證副本的數(shù)量。
  • 最后,關(guān)于數(shù)據(jù)的可用性保證,HDFS 提供了數(shù)據(jù)完整性校驗的機(jī)制。當(dāng)客戶端創(chuàng)建文件時,它會計算每個文件的數(shù)據(jù)塊的checknums,也就是校驗和,并存儲在 NameNode 中。當(dāng)客戶端去讀取文件時,會驗證從 DataNode 接收的數(shù)據(jù)塊的校驗和,如果校驗和不一致,說明該數(shù)據(jù)塊已經(jīng)損壞,此時客戶端會選擇從其它 DataNode 獲取該數(shù)據(jù)塊的可用副本。

4.HDFS 高頻面試題

  • HDFS 客戶端是如何與 DataNode 、NameNode 交互的?
  • ZKFC 是如何實現(xiàn)主 NameNode 故障自動轉(zhuǎn)移的?
  • NameNode 存儲了哪些數(shù)據(jù)?
  • Zookeeper 在故障轉(zhuǎn)移過程中是如何起作用的?
  • HDFS 的讀寫流程?
  • 在 Hadoop 2.X 時,HDFS block 塊為什么設(shè)置為 128 M?

本文轉(zhuǎn)載自微信公眾號「小林玩大數(shù)據(jù)」,可以通過以下二維碼關(guān)注。轉(zhuǎn)載本文請聯(lián)系小林玩大數(shù)據(jù)公眾號。

 

責(zé)任編輯:武曉燕 來源: 小林玩大數(shù)據(jù)
相關(guān)推薦

2021-12-13 10:43:45

HashMapJava集合容器

2019-08-16 09:41:56

UDP協(xié)議TCP

2021-09-30 07:59:06

zookeeper一致性算法CAP

2021-04-09 10:03:12

大數(shù)據(jù)exactly-onc

2022-03-29 08:23:56

項目數(shù)據(jù)SIEM

2021-05-07 07:52:51

Java并發(fā)編程

2023-09-25 08:32:03

Redis數(shù)據(jù)結(jié)構(gòu)

2023-10-04 00:32:01

數(shù)據(jù)結(jié)構(gòu)Redis

2023-11-07 07:46:02

GatewayKubernetes

2021-07-28 13:29:57

大數(shù)據(jù)PandasCSV

2024-08-27 11:00:56

單例池緩存bean

2017-03-30 22:41:55

虛擬化操作系統(tǒng)軟件

2023-11-22 07:54:33

Xargs命令Linux

2021-10-21 06:52:17

ZooKeeper分布式配置

2023-12-07 09:07:58

2018-09-26 11:02:46

微服務(wù)架構(gòu)組件

2021-04-11 08:30:40

VRAR虛擬現(xiàn)實技術(shù)

2021-11-10 07:47:48

Traefik邊緣網(wǎng)關(guān)

2022-08-18 20:45:30

HTTP協(xié)議數(shù)據(jù)

2020-12-09 08:01:38

Mybatis關(guān)系型數(shù)據(jù)庫
點贊
收藏

51CTO技術(shù)棧公眾號