偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

Hadoop 超燃之路

大數(shù)據(jù) Hadoop
以前的存儲手段跟分析方法現(xiàn)在行不通了!Hadoop 就是用來解決海量數(shù)據(jù)的 存儲 跟海量數(shù)據(jù)的 分析計算 問題的,創(chuàng)始人 Doug Cutting 在創(chuàng)建 Hadoop 時主要思想源頭是 Google 三輛馬車.

[[378296]]

1 Hadoop 簡介

1.1 Hadoop 由來

數(shù)據(jù)容量

 

大數(shù)據(jù)時代數(shù)據(jù)量超級大,數(shù)據(jù)具有如下特性:

  • Volume(大量)
  • Velocity(高速)
  • Variety(多樣)
  • Value(低價值密度)

以前的存儲手段跟分析方法現(xiàn)在行不通了!Hadoop 就是用來解決海量數(shù)據(jù)的 存儲 跟海量數(shù)據(jù)的 分析計算 問題的,創(chuàng)始人 Doug Cutting 在創(chuàng)建 Hadoop 時主要思想源頭是 Google 三輛馬車

  • 第一輛 GFS 產(chǎn)生了 HDFS。
  • 第二輛 MapReduce 產(chǎn)生了MR。
  • 第三輛 BigTable 產(chǎn)生了HBase。

現(xiàn)在說的 Hadoop 通常指的是 Hadoop 生態(tài)圈 這樣一個廣義概念,如下:

大數(shù)據(jù)知識體系

 

1.2 Hadoop 特點

1.2.1 Hadoop 特點

高可用

Hadoop 底層對同一個數(shù)據(jù)維護(hù)這多個復(fù)本,即使Hadoop某個計算元素或者存儲出現(xiàn)問題,也不會導(dǎo)致數(shù)據(jù)的丟失。

高擴展

在集群之間分配任務(wù)數(shù)據(jù),可以方便的擴展跟刪除多個節(jié)點,比如美團節(jié)點就在3K~5k 個節(jié)點

高效性

在MapReduce的思想下 Hadoop是并行工作的,以加快任務(wù)的處理速度

高容錯性

如果一個子任務(wù)速度過慢或者任務(wù)失敗 Hadoop會有響應(yīng)策略會自動重試跟任務(wù)分配。

1.2.2 Hadoop 架構(gòu)設(shè)計

Hadoop 的 1.x 跟 2.x 區(qū)別挺大,2.x 主要是將1.x MapReduce中資源調(diào)度的任務(wù)解耦合出來交 Yarn 來管理了(接下來本文以2.7開展探索)。

1.x跟2.x變化

 

HDFS

Hadoop Distributed File System 簡稱 HDFS,是一個分布式文件系統(tǒng)。HDFS 有著高容錯性,被設(shè)計用來部署在低廉的硬件上來提供高吞吐量的訪問應(yīng)用程序的數(shù)據(jù),適合超大數(shù)據(jù)集的應(yīng)用程序。

MapReduce

MapReduce是一種編程模型,包含Map(映射) 跟 Reduce(歸約)。你可以認(rèn)為是歸并排序的深入化思想。

Yarn

Apache Hadoop YARN (Yet Another Resource Negotiator,另一種資源協(xié)調(diào)者)是一種新的 Hadoop 資源管理器,它是一個通用資源管理系統(tǒng),可為上層應(yīng)用提供統(tǒng)一的資源管理和調(diào)度,它的引入為集群在利用率、資源統(tǒng)一管理和數(shù)據(jù)共享等方面帶來了巨大好處。

Common 組件

log組件。

獨有RPC體系ipc、I/O系統(tǒng)、序列化、壓縮。

配置文件conf。

公共方法類,比如checkSum校驗。

2 HDFS

產(chǎn)生背景:

隨著數(shù)據(jù)量變大,數(shù)據(jù)在一個OS的磁盤無法存儲了,需要將數(shù)據(jù)分配到多個OS管理的磁盤中,為了方面管理多個OS下的磁盤文件,迫切需要一種系統(tǒng)來管理多臺機器上的文件,這就是分布式文件管理系統(tǒng),HDFS 是通過目錄樹定位文件。需注意 HDFS 只是分布式文件系統(tǒng)中的其中一種。

2.1 HDFS 優(yōu)缺點

2.1.1 優(yōu)點

高容錯性

數(shù)據(jù)會自動保存多個副本,默認(rèn)為3個,通過增加副本來提高容錯性。

某個副本丟失后系統(tǒng)會自動恢復(fù)。

高擴展性

HDFS集群規(guī)模是可以動態(tài)伸縮的。

適合大數(shù)據(jù)處理

數(shù)據(jù)規(guī)模達(dá)到GB/TB/PB級別。

文件規(guī)模達(dá)到百萬規(guī)模以上。

流式訪問,它能保證數(shù)據(jù)的一致性。

低成本,部署廉價機器 提高了商業(yè)化能了。

統(tǒng)一對外接口,Hadoop本身用Java編寫,但基于此的應(yīng)用程序可以用其他語言編寫調(diào)用。

2.1.1 缺點

做不到低延時

Hadoop對高吞吐做了優(yōu)化,犧牲了獲取數(shù)據(jù)的延遲,比如毫秒級獲取數(shù)據(jù)在Hadoop上做不到。

不適合存儲大量小文件

存儲大量小文件的話,它會占用 NameNode 大量的內(nèi)存來存儲文件、目錄和塊信息。因此該文件系統(tǒng)所能存儲的文件總數(shù)受限于 NameNode 的內(nèi)存容量,根據(jù)經(jīng)驗,每個文件、目錄和數(shù)據(jù)塊的存儲信息大約占150字節(jié)。

小文件存儲的尋道時間會超過讀取時間,它違反了HDFS的設(shè)計目標(biāo)。

無法修改文件

對于上傳到HDFS上的文件,不支持修改文件,僅支持追加。HDFS適合一次寫入,多次讀取的場景。

無法并發(fā)寫入

HDFS不支持多用戶同時執(zhí)行寫操作,即同一時間,只能有一個用戶執(zhí)行寫操作。

2.2 HDFS 組成架構(gòu)

2.2.1 Client

客戶端主要有如下功能:

  1. 文件切分,文件上傳 HDFS 的時候,Client 將文件切分成一個一個的Block,然后進(jìn)行存儲。
  2. 與 NameNode 交互,獲取文件的位置信息。
  3. 與 DataNode 交互,讀取或者寫入數(shù)據(jù)。
  4. Client 提供一些命令來管理 HDFS,比如啟動或者關(guān)閉 HDFS。
  5. Client 可以通過一些命令來訪問 HDFS。

2.2.2 NameNode

NameNode 簡稱NN,就是HDFS中的 Master,是個管理者,主要有如下功能:

  1. 管理HDFS的名稱空間。
  2. 配置副本策略
  3. 處理客戶端讀寫請求。
  4. 管理數(shù)據(jù)塊(Block)映射信息。

映射信息:NameNode(文件路徑,副本數(shù),{Block1,Block2},[Block1:[三個副本路徑],Block2:[三個副本路徑]])

2.2.3 DataNode

DataNode 簡稱 DN 就是HDFS集群中的Slave,NameNode 負(fù)責(zé)下達(dá)命令,DataNode執(zhí)行實際的操作。

  1. 存儲實際的數(shù)據(jù)塊。
  2. 執(zhí)行數(shù)據(jù)塊的讀/寫操作。

上面說過數(shù)據(jù)目錄信息存儲在NN中,而具體信息存儲在DN中,很形象的比喻如下

NN跟DN對比

 

DataNode 的工作機制

  1. 數(shù)據(jù)塊存儲在磁盤信息 包括 數(shù)據(jù) + 數(shù)據(jù)長度 + 校驗和 + 時間戳。
  2. DataNode 啟動后向 NameNode注冊,周期性(1小時)的向 NameNode 上報所有的塊信息。
  3. NN 跟 DN 之間 心跳 3秒一次,心跳返回結(jié)果帶有 NameNode 給該 DataNode 的命令如復(fù)制塊數(shù)據(jù)到另一臺機器,或刪除某個數(shù)據(jù)塊。如果超過10分鐘沒有收到某個 DataNode 的心跳,則認(rèn)為該節(jié)點不可用。
  4. 集群運行中可以安全加入和退出一些機器。

DataNode 確保數(shù)據(jù)完整性

  1. 當(dāng) DataNode 讀取 Block 的時候,它會計算 CheckSum。
  2. 如果計算后的 CheckSum,與 Block 創(chuàng)建時值不一樣,說明 Block 已經(jīng)損壞。
  3. Client 讀取其他 DataNode 上的 Block。
  4. DataNode 在其文件創(chuàng)建后周期驗證 CheckSum

DN 進(jìn)程死亡或無法跟 NN 通信后 NN 不會立即將 DN 判死,一般經(jīng)過十分鐘 + 30秒再判刑。

2.2.4 Secondary NameNode

當(dāng) NameNode 掛掉的時候,它并不能馬上替換 NameNode 并提供服務(wù)。需要通過 HA等手段實現(xiàn)自動切換。SNN 主要提供如下功能:

  1. 輔助 NameNode,分擔(dān)其工作量。
  2. 定期合并 Fsimage 和 Edits,并推送給 NameNode。
  3. 在緊急情況下,可輔助恢復(fù) NameNode。

2.2.5 Block

HDFS中的文件在物理上是分塊 Block 存儲的,在 1.x 版本中塊 = 64M,2.x中塊 = 128M。塊不是越大越好,也不是越小越好。因為用戶獲取數(shù)據(jù)信息時間 = 尋址塊時間 + 磁盤傳輸時間。

塊太小會增加尋址時間,程序大部分耗時在尋址上了。

快太大則會導(dǎo)致磁盤傳輸時間明顯大于尋址時間,程序處理塊數(shù)據(jù)時較慢。

2.3 HDFS 寫流程

2.3.1 具體寫流程

寫流程

 

  1. 客戶端通過 Distributed FileSystem 模塊向 NameNode 請求上傳文件,NameNode檢查目標(biāo)文件是否已存在,父目錄是否存在。
  2. NameNode 返回是否可以上傳。
  3. 客戶端請求第一個 Block上傳到哪幾個 DataNode 服務(wù)器上。
  4. NameNode 返回3個 DataNode 節(jié)點,分別為dn1、dn2、dn3。
  5. 客戶端通過 FSDataOutputStream 模塊請求dn1上傳數(shù)據(jù),dn1收到請求會繼續(xù)調(diào)用dn2,然后dn2調(diào)用dn3,將這個通信管道建立完成。
  6. dn1、dn2、dn3逐級應(yīng)答客戶端。
  7. 客戶端開始往dn1上傳第一個Block(先從磁盤讀取數(shù)據(jù)放到一個本地內(nèi)存緩存),以Packet為單位,dn1收到一個Packet就會傳給dn2,dn2傳給dn3;dn1每傳一個packet會放入一個應(yīng)答隊列等待應(yīng)答。
  8. 當(dāng)一個 Block 傳輸完成之后,客戶端再次請求NameNode上傳第二個Block的服務(wù)器。(重復(fù)執(zhí)行3-7步)。

2.3.2 節(jié)點距離計算

在 HDFS 寫數(shù)據(jù)的過程中,NameNode 會選擇距離待上傳數(shù)據(jù)最近距離的DataNode接收數(shù)據(jù)。

最近距離 = 兩個節(jié)點到達(dá)最近的共同祖先的距離總和。

節(jié)點距離計算

 

  • Distance(/d1/r1/n0,/d1/r1/n0) = 0 同一節(jié)點上的進(jìn)程
  • Distance(/d1/r1/n1,/d1/r1/n2) = 2 同一機架上不同節(jié)點
  • Distance(/d1/r2/n0,/d1/r3/n2) = 4 同一數(shù)據(jù)中心不同機架節(jié)點
  • Distance(/d1/r2/n1,/d2/r4/n1) = 6 不同數(shù)據(jù)中心

2.3.3 副本節(jié)點選擇

  • 第一個副本在Client所在節(jié)點上,如果在集群外則隨機選個。
  • 第二個副本跟第一個副本位于同機架不同節(jié)點
  • 第三個部分位于不同機架,隨機節(jié)點。

機架感知

 

2.4 HDFS 讀流程

讀流程

 

  • 客戶端通過 Distributed FileSystem 向 NameNode 請求下載文件,NameNode 通過查詢元數(shù)據(jù),找到文件塊所在的 DataNode 地址。
  • 挑選一臺 DataNode(就近原則,然后隨機)服務(wù)器,請求讀取數(shù)據(jù)。
  • DataNode 開始傳輸數(shù)據(jù)給客戶端(從磁盤里面讀取數(shù)據(jù)輸入流,以Packet為單位來做校驗)。
  • 客戶端以 Packet 為單位接收,先在本地緩存,然后寫入目標(biāo)文件。

2.5 NameNode 和 Secondary NameNode

2.5.1 NN 和 2NN 工作機制

NameNode 中元數(shù)據(jù)單獨存到磁盤不方便讀寫。單獨存到內(nèi)存時,斷電會丟失。Hadoop 采用的是如下方式。

FsImage :

元數(shù)據(jù)序列化后在磁盤存儲的地方。包含HDFS文件系統(tǒng)的所有目錄跟文件inode序列化信息。

Memory:

元數(shù)據(jù)在內(nèi)存中存儲的地方。

Edit 文件:

Edit 記錄客戶端更新元數(shù)據(jù)信息的每一步操作(可通過Edits運算出元數(shù)據(jù))。

一旦元數(shù)據(jù)有更新跟添加,元數(shù)據(jù)修改追加到Edits中然后修改內(nèi)存中的元數(shù)據(jù),這樣一旦NameNode 節(jié)點斷電,通過 FsImage 跟 Edits 的合并生成元數(shù)據(jù)。

Edits文件不要過大,系統(tǒng)會定期的由 Secondary Namenode 完成 FsImage 和 Edits 的合并。

NN跟2NN工作機制

 

第一階段:NameNode 啟動

  • 第一次啟動 NameNode 格式化后,創(chuàng)建 Fsimage 和 Edits 文件。如果不是第一次啟動,直接加載編輯日志和鏡像文件到內(nèi)存。
  • 客戶端對元數(shù)據(jù)進(jìn)行增刪改的請求。
  • NameNode 記錄操作日志,更新滾動日志。
  • NameNode 在內(nèi)存中對數(shù)據(jù)進(jìn)行增刪改。

第二階段:Secondary NameNode 工作

Secondary NameNode 詢問 NameNode 是否需要 CheckPoint。直接帶回 NameNode 是否檢查結(jié)果。一般下面條件任意滿足即可:

  • CheckPoint 默認(rèn)1小時執(zhí)行一次。
  • 一分鐘檢查一次Edits文件操作次數(shù),達(dá)閾值 CheckPoint 。
  • Secondary NameNode 請求執(zhí)行 CheckPoint。
  • NameNode 滾動正在寫的 Edits 日志。
  • 將滾動前的編輯日志Edit_001 和 鏡像文件FsImage 拷貝到 Secondary NameNode。
  • Secondary NameNode 加載編輯日志和鏡像文件到內(nèi)存并合并。
  • 生成新的鏡像文件 FsImage.chkpoint。
  • 拷貝 FsImage.chkpoint 到 NameNode。
  • NameNode 將 FsImage.chkpoint 重新命名成 FsImage。

2.6 安全模式

NameNode 剛啟動時候系統(tǒng)進(jìn)入安全模式(只讀),如果整個文件系統(tǒng)中99.9%塊滿足最小副本,NameNode 會30秒后退出安全模式。

2.6.1 NameNode 啟動

將 FsImage 文件載入內(nèi)存再執(zhí)行Edits文件各種操作,最終內(nèi)存生成完整的元數(shù)據(jù)鏡像。

創(chuàng)建個新的 FsImage 跟空 Edits 文件。

NameNode 開始監(jiān)聽 DataNode。

整個過程 NameNode 一直運行在安全模式,NameNode 對于 Client 是只讀的。

2.6.2 DataNode 啟動

系統(tǒng)數(shù)據(jù)塊位置不是由 NameNode 維護(hù)的,而是以塊列表形式存儲在 DataNode 中。

安全模式下 DataNode 向 NameNode 發(fā)送最新塊列表信息,促使 NameNode 高效運行。

正常運行期 NameNode 內(nèi)存中保留所有塊位置映射信息。

2.7 HDFS-HA

HDFS 集群中 NameNode存在單點故障(SPOF),為了實現(xiàn) High Available,其實包括 HDFS-HA 和YARN-HA。HDFS 可以 通過配置Active/Standby 兩個 NameNodes 實現(xiàn)在集群中對 NameNode 的熱備來解決上述問題。如果出現(xiàn)故障,如機器崩潰或機器需要升級維護(hù),可將NameNode很快的切換到另外一臺機器。實現(xiàn) HA 功能主要依賴ZooKeeper 跟 ZKFC 進(jìn)程。

HA故障轉(zhuǎn)移

 

2.7.1 HDFS-HA工作要點

元數(shù)據(jù)管理方式需要改變

內(nèi)存中各自保存一份元數(shù)據(jù)。

Edits 日志只有 Active 狀態(tài)的 NameNode 節(jié)點可以做寫操作。

兩個 NameNode 都可以讀取 Edits。

共享的 Edits 放在一個共享存儲中管理(qjournal 或 NFS)。

需要一個狀態(tài)管理功能模塊

實現(xiàn)了一個ZKFC,常駐在每一個namenode所在的節(jié)點,每一個ZKFC負(fù)責(zé)監(jiān)控自己所在NameNode節(jié)點,利用zk進(jìn)行狀態(tài)標(biāo)識,當(dāng)需要進(jìn)行狀態(tài)切換時,由ZKFC來負(fù)責(zé)切換,切換時需要防止brain split現(xiàn)象的發(fā)生。

必須保證兩個 NameNode 之間能夠ssh無密碼登錄

防腦裂,同一時刻僅僅有一個 NameNode 對外提供服務(wù)。

2.7.2 ZooKeeper

ZooKeeper 提供如下功能:

  1. 故障檢測:集群中每個 NameNode 在 ZooKeeper 中維護(hù)一個持久會話,如果機器崩潰,ZooKeeper中的會話將終止,ZooKeeper通知另一個NameNode需要觸發(fā)故障轉(zhuǎn)移。
  2. 現(xiàn)役NameNode選擇:ZooKeeper提供了一個簡單的機制用于唯一的選擇一個節(jié)點為active狀態(tài)。如果目前現(xiàn)役NameNode崩潰,另一個節(jié)點可能從ZooKeeper獲得特殊的排外鎖以表明它應(yīng)該成為現(xiàn)役NameNode。

2.7.3 ZKFC進(jìn)程

在 NameNode 主機上有個 ZKFC(ZKFailoverController) 這樣的ZK客戶端,負(fù)責(zé)監(jiān)視管理 NameNode 狀態(tài)。ZKFC負(fù)責(zé):

  1. 健康監(jiān)測:ZKFC周期性檢測同主機下NameNode監(jiān)控撞庫。
  2. ZooKeeper會話管理:NameNode健康時候ZKFC保持跟ZK集群會話打開狀態(tài),ZKFC還持有個znode鎖,如果會話終止,鎖節(jié)點將自動刪除。
  3. 基于ZooKeeper的選擇:ZKFC發(fā)現(xiàn)本地NameNode健康前提下會嘗試獲取znode鎖,獲得成功則Active狀態(tài)。

3 MapReduce

MapReduce是個分布式運算程序的編程框架,是基于 Hadoop 的 數(shù)據(jù)分析計算核心框架。處理過程分為兩個階段:Map 階段跟 Reduce 階段。

Map 負(fù)責(zé)把一個任務(wù)分解成多個任務(wù)。該階段的 MapTask 并發(fā)實例,完全并行運行,互不相干。

Reduce 負(fù)責(zé)把多個任務(wù)處理結(jié)果匯總。該階段的 ReduceTask 并發(fā)實例互不相干,但是他們的數(shù)據(jù)依賴于上一個階段的所有 MapTask 并發(fā)實例的輸出。

MapReduce 編程模型只能包含一個 Map 階段和一個 Reduce 階段,如果用戶的業(yè)務(wù)邏輯非常復(fù)雜,那就只能多個MapReduce程序串行運行。

用戶編寫MR任務(wù)時候 程序?qū)崿F(xiàn)部分分成三個部分:Mapper、Reducer、Driver(提交運行mr程序的客戶端)。

3.1 優(yōu)缺點

3.1.1 優(yōu)點

易于編程

簡單實現(xiàn)了一些接口就可以完成個分布式程序,你寫個分布式程序跟寫個串行化程序一樣,類似八股文編程。

良好的擴展

計算資源不足時可以簡單的增加機器來擴展計算能力。

高容錯性

MapReduce任務(wù)部署在多臺機器上后如果其中一臺掛了,系統(tǒng)會進(jìn)行自動化的任務(wù)轉(zhuǎn)移來保證任務(wù)正確執(zhí)行。

適合PB級數(shù)據(jù)離線處理

比如 美團3K個節(jié)點的集群并發(fā),提供超大數(shù)據(jù)處理能力。

3.1.2 缺點

不擅長實時計算

MapReduce 不會想 MySQL 一樣毫秒級返回結(jié)果。

不擅長流式計算

流式計算的 輸入數(shù)據(jù)是動態(tài)的,而 MapReduce 的輸入數(shù)據(jù)集是靜態(tài)的。

不擅長DAG計算

多個應(yīng)用程序存在依賴關(guān)系,MapReduce的作業(yè)結(jié)果會落盤導(dǎo)致大量磁盤IO,性能賊低,此時上Spark吧!

3.2 序列化

序列化

把內(nèi)存中的對象,轉(zhuǎn)換成字節(jié)序列(或其他數(shù)據(jù)傳輸協(xié)議)以便于存儲(持久化)和網(wǎng)絡(luò)傳輸。

反序列化

將收到字節(jié)序列(或其他數(shù)據(jù)傳輸協(xié)議)或者是硬盤的持久化數(shù)據(jù),轉(zhuǎn)換成內(nèi)存中的對象。

因為 Hadoop 在集群之間進(jìn)行通訊或者 RPC 調(diào)用時是需要序列化的,而且要求序列化要快、且體積要小、占用帶寬要小。而Java自帶的序列化是重量級框架,對象序列化后會附帶額外信息,比如各種校驗信息,header,繼承體系等。所以 Hadoop 自研了序列化框架。

Java類型 Hadoop Writable類型
boolean BooleanWritable
byte ByteWritable
int IntWritable
float FloatWritable
long LongWritable
double DoubleWritable
String Text
map MapWritable
array ArrayWritable

3.3 MapTask 并行度

數(shù)據(jù)塊:Block 是 HDFS 物理上把數(shù)據(jù)分成一塊一塊。

數(shù)據(jù)切片:數(shù)據(jù)切片只是在邏輯上對輸入進(jìn)行分片,并不會在磁盤上將其切分成片進(jìn)行存儲。

切片核心注意點:

  • 一個 Job 的 Map 階段并行度又客戶端提交Job時的切片數(shù)決定
  • 每個 Split 切片分配個 MapTask 并行實例處理
  • 模型情況下 切片大小 = BlockSize
  • 切片時不會考慮數(shù)據(jù)集整體大小,而是逐個針對每個文件單獨切片的。

3.3.1 FileInputFormat 切片源碼追蹤

  • FileInputFormat切片源碼追蹤
  • 程序先找到目標(biāo)數(shù)據(jù)存儲目錄
  • 開始遍歷目錄下每個文件。每個文件都會做如下操作
  • 獲取切片大小,默認(rèn)情況下切片大小 = blocksize
  • 開始切片,每次切片都要判斷剩余部分是否大于塊的1.1倍,不大于則就劃分到一個切片。
  • 切片信息寫到切片規(guī)劃文件中。
  • 切片核心過程在getSplit方法完成。
  • InputSplit只是記錄了切片元數(shù)據(jù)信息,如起始位置、長度跟所在節(jié)點列表等。

3.3.2 切片大小計算

SplitSize= Math.max(minSize,Math.min(maxSize,blockSize))

  • mapreduce.input.fileinputformat.split.minsize 默認(rèn) 1
  • mapreduce.input.fileinputformat.split.maxsize 默認(rèn) Long.MAXValue
  • blockSize 默認(rèn)128M
  • maxsize :該參數(shù)如果比blockSize小灰導(dǎo)致切片變小,且就等于配置的整個參數(shù)。
  • minsize :該參數(shù)如果調(diào)的比blockSize大,則切片大小會比blockSize還大。

3.3.3 切片舉例

切片舉例

 

3.4 FileInputFormat

3.4.1 實現(xiàn)類簡介

MR任務(wù)輸入文件個數(shù)各有不同,針對不同類型MR定義了一個接口跟若干實現(xiàn)類來讀取不同的數(shù)據(jù)。


 

input繼承關(guān)系

 

TextInputFormat

默認(rèn)使用類,按行讀取每條數(shù)據(jù),Key是該行數(shù)據(jù)的 offset,Value = 行內(nèi)容。

KeyValueTExtInputFormat

每行都是一條記錄,被指定分隔符分割為Key跟Value,默認(rèn)是 \t 。

NLineInputFormat

該模型下每個 map 處理 InputSplit 時不再按照 Block 塊去劃分,而是按照指定的行數(shù)N來劃分文件。

自定義InputFormat

基礎(chǔ)接口,改寫 RecordReader,實現(xiàn)一次讀取一個完整文件封裝為 KV,使用 SequenceFileOutPutFormat 輸出合并文件。

CombineTextInputFormat

用于小文件過多場景,邏輯上合并多個小文件個一個切片任務(wù)。較重要 中

3.4.2 CombineTextInputFormat

默認(rèn)框架 TextInputFormat 切片機制是對任務(wù)按文件規(guī)劃切片,不管文件多小,都會是一個單獨的切片,都會交給一個MapTask,這樣如果有大量小文件,就會產(chǎn)生大量的MapTask,處理效率極其低下。CombineTextInputFormat 可以將多個小文件從邏輯上規(guī)劃到一個切片中,這樣多個小文件就可以交給一個MapTask處理。主要包含 虛擬存儲過程 跟 切片過程。

CombineTextInputFormat.setMaxInputSplitSize(job, 4194304); // 4m

虛擬存儲過程:

  1. 文件 <= SplitSize 則單獨一塊。
  2. 1 * SplitSize < 文件 < 2 * SplitSize 時對半分。
  3. 文件 >= 2*SplitSize時,以 SplitSize 切割一塊,剩余部分若 < 2 * SplitSize 則對半分。

切片過程:

判斷虛擬存儲的文件大小是否大于setMaxInputSplitSize值,大于等于則單獨形成一個切片。

如果不大于則跟下一個虛擬存儲文件進(jìn)行合并,共同形成一個切片。

切片過程

 

3.6 OutputFormat

OutputFormat 是 MapReduce 輸出的基類,常見的實現(xiàn)類如下:

3.5.1 TextOutputFormat

系統(tǒng)默認(rèn)輸出格式,把每條記錄寫為文本行,他的K跟V是任意類型,系統(tǒng)在寫入時候會統(tǒng)一轉(zhuǎn)化為字符串。

3.5.2 SequenceFileOutputFormat

此模式下的輸出結(jié)果作為后續(xù)MapReduce任務(wù)的輸入,該模式下數(shù)據(jù)格式緊湊,很容易被壓縮。

3.5.3 自定義OutputFormat

如果需求不滿足可按需求進(jìn)行自定義。

  1. 自定義類繼承自FileOutputFormat。
  2. 重寫RecordWriter,改寫具體輸出數(shù)據(jù)的方法write。

3.6 MapReduce 流程

3.6.1 整體流程圖

MapReduce流程

 

MapTask 工作機制

  • Read階段:MapTask 通過用戶編寫的RecordReader,從輸入InputSplit中解析出一個個key/value。
  • Map階段:將解析出的key/value交給用戶編寫map()函數(shù)處理,并產(chǎn)生一系列新的key/value。
  • Collect收集階段:它會將生成的key/value分區(qū)(調(diào)用Partitioner),并寫入一個環(huán)形內(nèi)存緩沖區(qū)中。
  • Spill階段:先按照分區(qū)進(jìn)行排序,然后區(qū)內(nèi)按照字典對key進(jìn)行快排,并在必要時對數(shù)據(jù)進(jìn)行合并、壓縮等操作。
  • Combine階段:選擇性可進(jìn)行MapTask內(nèi)的優(yōu)化提速。

ReduceTask 工作機制

  • Copy階段:從所有的MapTask中收集結(jié)果然后決定將數(shù)據(jù)放入緩存還是磁盤。
  • Merge階段:copy數(shù)據(jù)時后天會對磁盤還有內(nèi)存數(shù)據(jù)進(jìn)行Merge。
  • Sort階段:ReduceTask需對所有數(shù)據(jù)進(jìn)行一次歸并排序,方便執(zhí)行reduce 函數(shù)。
  • Reduce階段:調(diào)用用戶 reduce() 函數(shù)將計算結(jié)果寫到HDFS上。

3.6.2 Shuffle

Shuffle機制

 

MapReduce 的核心就是 Shuffle 過程,Shuffle 過程是貫穿于 map 和 reduce 兩個過程的!在Map端包括Spill過程,在Reduce端包括copy和sort過程。 具體Shuffle過程如下:

  1. MapTask 收集我們的map()方法輸出的kv對,放到內(nèi)存緩沖區(qū)中。
  2. 從內(nèi)存緩沖區(qū)不斷溢出本地磁盤文件,可能會溢出多個文件,溢出前會按照分區(qū)針對key進(jìn)行區(qū)內(nèi)快排。
  3. 多個溢出文件會被合并成大的溢出文件。
  4. 在溢出過程及合并的過程中,都要調(diào)用 Partitioner 進(jìn)行分區(qū)和針對key進(jìn)行排序。
  5. ReduceTask 根據(jù)自己的分區(qū)號,去各個 MapTask 機器上取相應(yīng)的結(jié)果分區(qū)數(shù)據(jù)。
  6. ReduceTask 對收集后的數(shù)據(jù)進(jìn)行合并跟歸并排序。
  7. 進(jìn)入 ReduceTask 的邏輯運算過程,調(diào)用用戶自定義的reduce()方法。
  8. Shuffle 中的緩沖區(qū)大小會影響到 MapReduce 程序的執(zhí)行效率,原則上說,緩沖區(qū)越大,磁盤io的次數(shù)越少,執(zhí)行速度就越快。

3.6.3 Partition

MapReduce 默認(rèn)的分區(qū)方式是hashPartition,在這種分區(qū)方式下,KV 對根據(jù) key 的 hashcode 值與reduceTask個數(shù)進(jìn)行取模,決定該鍵值對該要訪問哪個ReduceTask。

  1. public int getPartition(K2 key, V2 value, int numReduceTasks) { 
  2.     return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; 
  3.     // numReduceTasks 默認(rèn) = 1 所以導(dǎo)致默認(rèn)的reduce結(jié)果 = 1 
  4.   } 

自定義的時候一般就是類繼承Partitioner然后重寫getPartition 方法。用戶也可以設(shè)置ReduceTask數(shù)量,不過會遵循如下規(guī)則。

  • 如果 ReduceTask 數(shù) > getPartition 數(shù), 會多產(chǎn)生幾個空的輸出part-r-000xx。
  • 如果 1 < ReduceTask < getPartition 數(shù),會有部分?jǐn)?shù)據(jù)無法安放導(dǎo)致報錯。
  • 如果ReduceTask = 1,不管MapTask端輸出多少分區(qū)文件結(jié)果都是一個文件。
  • 分區(qū)必須從0開始,逐步累加。

比如 假設(shè)自定義分區(qū)數(shù)為5。

  • job.setNumReduceTasks(1):會正常運行,只不過會產(chǎn)生一個輸出文件。
  • job.setNumReduceTasks(2):會報錯。
  • job.setNumReduceTasks(6):大于5,程序會正常運行,會產(chǎn)生空文件。

3.6.4 環(huán)形緩沖區(qū)

Map 的輸出結(jié)果由 Collector 處理,每個 Map 任務(wù)不斷地將鍵值對輸出到在內(nèi)存中構(gòu)造的一個環(huán)形數(shù)據(jù)結(jié)構(gòu)中。使用環(huán)形數(shù)據(jù)結(jié)構(gòu)是為了更有效地使用內(nèi)存空間,在內(nèi)存中放置盡可能多的數(shù)據(jù)。

環(huán)形數(shù)據(jù)結(jié)構(gòu)其實就是個字節(jié)數(shù)組byte[],叫kvbuffer,默認(rèn)值100M。里面主要存儲 數(shù)據(jù) 跟 元數(shù)據(jù)。中間有個分界點,并且分界點是變化的。當(dāng)環(huán)形緩沖區(qū)寫入的buffer的大小達(dá)到 80% 滿足溢寫條件的時候,開始溢寫spill。系統(tǒng)有兩個線程一個負(fù)責(zé)寫入數(shù)據(jù),一個負(fù)責(zé)spill數(shù)據(jù)。

數(shù)據(jù):

存儲 Key + Value + bufindex。其中 bufindex(即數(shù)據(jù)的存儲方向)是一直悶著頭地向上增長,比如bufindex初始值為0,一個Int型的key寫完之后,bufindex增長為4,一個Int型的value寫完之后,bufindex增長為8。

元數(shù)據(jù):

元數(shù)據(jù)是為了排序而生,是關(guān)于數(shù)據(jù)描述的數(shù)據(jù)。

Kvmeta = Partition + keystart + valstart + valLength , 共占用4個Int長度,其中K的長度 = V的起點 - K的起點。

Kvmeta 的存放指針 Kvindex 每次都是向下跳四個 格子,然后再向上一個格子一個格子地填充四元組的數(shù)據(jù)。比如Kvindex初始位置是-4,當(dāng)?shù)谝粋€鍵值對寫完之后,(Kvindex+0)的位置存放partition的起始位置、(Kvindex+1)的位置存放keystart、(Kvindex+2)的位置存放valstart、(Kvindex+3)的位置存放value length,然后Kvindex跳到 -8位置,等第二個鍵值對和索引寫完之后,Kvindex跳到-12位置。

  1. kvmeta.put(kvindex + PARTITION, partition); 
  2. 2kvmeta.put(kvindex + KEYSTART, keystart); 
  3. 3kvmeta.put(kvindex + VALSTART, valstart); 
  4. 4kvmeta.put(kvindex + VALLEN, distanceTo(valstart, valend)); 
  5. 5// advance kvindex 改變每次index的值 每次4個位置! 
  6. 6kvindex = (kvindex - NMETA + kvmeta.capacity()) % kvmeta.capacity(); 

環(huán)形緩沖區(qū)

 

3.6.5 Combiner 合并

  • Combiner 是 MR 程序中 Mapper 跟 Reducer 之外的組件。
  • Combiner 是在每一個MapTask 所在節(jié)點運行,Reducer 是接受全部 Mapper 輸出結(jié)果。
  • Combiner 屬于局部匯總的意思,來減少網(wǎng)絡(luò)傳輸。
  • Combiner 用的時候要注意不能影響最終業(yè)務(wù)邏輯!比如求平均值就不能用。求和就OK。

3.6.6 關(guān)于 MapReduce 排序

MapReduce框架最重要的操作就是排序,MapTask 跟 ReduceTask 都會根據(jù)key進(jìn)行按照字典順序進(jìn)行快排。

MapTask 將緩沖區(qū)數(shù)據(jù)快排后寫入到磁盤,然后磁盤文件會進(jìn)行歸并排序。

ReduceTask統(tǒng)一對內(nèi)存跟磁盤所有數(shù)據(jù)進(jìn)行歸并排序。

3.6.7 ReduceJoin 跟 MapJoin

Reducejoin

思路:通過將關(guān)聯(lián)條件作為Map 輸出的 Key,將兩表滿足 Join條件的數(shù)據(jù)并攜帶數(shù)據(jù)源文件發(fā)送同一個ReduceTask,在Reduce端進(jìn)行數(shù)據(jù)串聯(lián)信息合并。

缺點:合并操作在Reduce端完成,Reduce 端處理壓力太大,并且Reduce端易產(chǎn)生數(shù)據(jù)傾斜。

MapJoin

適用:適用于一張表十分小、一張表很大的場景。

思路:在 Map 端緩存多張表,提前處理業(yè)務(wù)邏輯,這樣增加 Map 端業(yè)務(wù),減少 Reduce 端數(shù)據(jù)的壓力,盡可能的減少數(shù)據(jù)傾斜。

3.6.8 注意點

ReduceTask = 0 說明沒有Reduce節(jié)點,輸出文件個數(shù)和 Map 個數(shù)一樣。

ReduceTask 默認(rèn)= 1,所以結(jié)果是一個文件。

ReduceTask 的個數(shù)不是任意設(shè)置的,需跟集群性能還有結(jié)果需求而定。

邏輯處理 Mapper 時候可根據(jù)業(yè)務(wù)需求實現(xiàn)其中三個方法,map、setup、cleanup。

3.7 壓縮

壓縮是提高Hadoop運行效率的一種優(yōu)化策略,通過在Mapper、Reducer運行過程的數(shù)據(jù)進(jìn)行壓縮來減少磁盤空間跟網(wǎng)絡(luò)傳輸,最終實現(xiàn)提高M(jìn)R運行速度。但需注意壓縮也給CPU運算帶來了負(fù)擔(dān)。

壓縮的基本原則:

運算密集型任務(wù) ,少壓縮。

IO密集型任務(wù),多壓縮。

壓縮格式 自帶 算法 擴展名 可切分嗎 壓縮后,代碼修改
DEFLATE DEFLATE .deflate 不需要修改
Gzip DEFLATE .gz 不需要修改
bzip2 bzip2 .bz2 不需要修改
Snappy Snappy .snappy 不需要修改
LZO LZO .lzo 需要建索引
還需要指定輸入格式

4 YARN

Yarn 是一個資源調(diào)度平臺,負(fù)責(zé)為運算程序提供服務(wù)器運算資源,相當(dāng)于一個分布式的操作系統(tǒng)平臺,而 MapReduce 等運算程序則相當(dāng)于運行于操作系統(tǒng)之上的應(yīng)用程序。

4.1 基本組成

Yarn架構(gòu)

 

YARN主要由 ResourceManager、NodeManager、ApplicationMaster 和 Container 等組件構(gòu)成。

ResourceManager

處理客戶端請求

監(jiān)控NodeMananger

啟動或監(jiān)控ApplicationMaster

計算資源的分配跟調(diào)度

NodeManager

管理單個節(jié)點上資源

處理來著ResourceManager的命令

處理來自ApplicationMaster的命令

ApplicationMaster

負(fù)責(zé)數(shù)據(jù)切分。

為應(yīng)用程序申請資源并分配給內(nèi)部任務(wù)。

任務(wù)的監(jiān)控跟容錯。

Container

Container 是 YARN 中資源的抽象,封裝了某個節(jié)點上的多維度資源,比如內(nèi)存、CPU、磁盤、網(wǎng)絡(luò)等。

YarnChild 其實它就是一個運行程序的進(jìn)程。MrAppMaster 運行程序時向 Resouce Manager 請求的 Maptask / ReduceTask。

4.2 Yarn 調(diào)度 MapReduce 任務(wù)

Yarn調(diào)度流程

 

當(dāng) MR 程序提交到客戶端所在的節(jié)點時后 大致運行流程如下:

作業(yè)提交

Client 調(diào)用 job.waitForCompletion 方法 YarnRunner ,向整個集群提交MapReduce作業(yè)。Client 向 RM 申請一個作業(yè)id。

RM 給 Client 返回該 job 資源的提交路徑和作業(yè) id。

Client 提交jar包、切片信息和配置文件到指定的資源提交路徑。

Client 提交完資源后,向 RM 申請運行 MrAppMaster。

作業(yè)初始化

當(dāng) RM 收到 Client 的請求后,將該 Task 添加到容量調(diào)度器中。

某一個空閑的 NodeManager 領(lǐng)取到該 Task 。

該 NodeManager 創(chuàng)建 Container,并產(chǎn)生 MRAppMaster。

下載 Client 提交的資源 到本地。

任務(wù)分配

MRAppMaster 向 RM 申請運行多個 MapTask 任務(wù)資源。

RM 將運行 MapTask 任務(wù)分配給倆 NodeManager。其中分配原則 是優(yōu)先 jar 跟 數(shù)據(jù)在一臺機器上,其次就盡可能在一個機房。最后 隨便來個空閑機器。

任務(wù)運行

MR 向兩個接收到任務(wù)的 NodeManager 發(fā)送程序啟動腳本,這兩個 NodeManager 分別啟動MapTask,MapTask 對數(shù)據(jù)分區(qū)排序。

MrAppMaster 等待所有 MapTask 運行完畢后,向RM申請容器 運行ReduceTask。

ReduceTask 向 MapTask 獲取相應(yīng)分區(qū)的數(shù)據(jù)。

程序運行完畢后,MR會向RM申請注銷自己。

進(jìn)度和狀態(tài)更新

YARN 中的任務(wù)將其進(jìn)度和狀態(tài)(包括counter)返回給應(yīng)用管理器, 客戶端每秒向應(yīng)用管理器請求進(jìn)度更新來展示給用戶。

作業(yè)完成

除了向應(yīng)用管理器請求作業(yè)進(jìn)度外, 客戶端每5秒都會通過調(diào)用 waitForCompletion() 來檢查作業(yè)是否完成。作業(yè)完成之后, 應(yīng)用管理器和Container會清理工作狀態(tài)。作業(yè)的信息會被作業(yè)歷史服務(wù)器存儲以備之后用戶核查。

4.3 資源調(diào)度器

目前,Hadoop作業(yè)調(diào)度器主要有三種:FIFO、Capacity Scheduler 和 Fair Scheduler。Hadoop2.7.2默認(rèn)的資源調(diào)度器是Capacity Scheduler。

4.3.1 FIFO

FIFO調(diào)度

 

4.3.2 容量調(diào)度器 Capacity Scheduler

容量調(diào)度器

 

  • 支持多個隊列,每個隊列配置一定資源,每個隊列采用FIFO策略。
  • 為防止同一個童虎作業(yè)獨占隊列資源,會對同一用戶提交作業(yè)所占資源量限制。
  • 計算每個隊列中在跑任務(wù)數(shù)與其應(yīng)該分得的計算只有比值,選擇個比值最小的隊列(最閑的)。
  • 按照作業(yè)優(yōu)先級跟提交時間,同時還考慮用戶資源限制跟內(nèi)存限制對隊列任務(wù)排序。
  • 比如job1、job2、job3分配排在最前面也是并行運行。

4.3.3 公平調(diào)度器 Fair Scheduler

支持多隊列多用戶,每個隊列中資源可以配置,同一隊列中作業(yè)公平共享隊列中所有資源。

公平調(diào)度器

 

比如有queue1、queue2、queue3三個任務(wù)隊列,每個隊列中的job按照優(yōu)先級分配資源,優(yōu)先級高獲得資源多,但會確保每個任務(wù)被分配到資源。

每個任務(wù)理想所需資源跟實際獲得資源的差距叫缺額,同一個隊列中是按照缺額高低來先后執(zhí)行的,缺額越大越優(yōu)先獲得資源。

4.4 任務(wù)推測執(zhí)行

作業(yè)完成時間取決于最慢的任務(wù)完成時間。系統(tǒng)中有99%的Map任務(wù)都完成了,只有少數(shù)幾個Map老是進(jìn)度很慢,此時系統(tǒng)會發(fā)現(xiàn)拖后腿的任務(wù),比如某個任務(wù)運行速度遠(yuǎn)慢于任務(wù)平均速度。為拖后腿任務(wù)啟動一個備份任務(wù),同時運行。誰先運行完,則采用誰的結(jié)果。

5 MapReduce 優(yōu)化方法

MapReduce優(yōu)化方法主要從六個方面考慮:數(shù)據(jù)輸入、Map階段、Reduce階段、IO傳輸、數(shù)據(jù)傾斜問題和常用的調(diào)優(yōu)參數(shù)。

5.1 數(shù)據(jù)輸入

數(shù)據(jù)采集時,用 Hadoop Archive 將多個小文件打包成一個Har文件。

業(yè)務(wù)處理前,SequenceFile 由一系列KV組成,key=文件名,value=文件內(nèi)容,將大批小文件合并成大文件。

在 MapReduce 處理時,采用CombineTextInputFormat來作為輸入,解決輸入端大量小文件場景。

對于大量小文件任務(wù)開啟JVM 重用可提速,JVM 重用可以使得 JVM 實例在同一個 job 中重新使用N次。N的值可以在Hadoop的mapred-site.xml文件中進(jìn)行配置,通常在10-20之間。

5.2 Map 階段

減少溢寫 Spill 次數(shù),調(diào)整循環(huán)緩存區(qū)大小,減少磁盤IO。

減少合并 Merge 次數(shù),增大Merge文件大小減少次數(shù)。

在不影響業(yè)務(wù)的情況下在Map端進(jìn)行Combine處理。

5.3 Reduce 階段

設(shè)置合理的Map跟REduce數(shù),太少會導(dǎo)致Task等待。太多會導(dǎo)致競爭資源激烈。

設(shè)置Map跟Reduce階段共存,map運行一定程度后Reduce 也可以運行。

規(guī)避使用Reduce,Reduce 端的Buffer也要合理設(shè)置,盡量防止溢寫到磁盤。

5.4 IO 傳輸

采用數(shù)據(jù)壓縮方式來減少網(wǎng)絡(luò)IO時間。

使用SequenceFile二進(jìn)制文件。

5.5 數(shù)據(jù)傾斜

通過對數(shù)據(jù)抽樣得到結(jié)果集來設(shè)置分區(qū)邊界值。

自定義分區(qū)。

使用Combine來減少數(shù)據(jù)傾斜。

采用MapJoin,盡量避免ReduceJoin。

參考

 

HDFS-Shell 指令:http://hadoop.apache.org/docs/r1.0.4/cn/hdfs_shell.html

 

責(zé)任編輯:武曉燕 來源: sowhat1412
相關(guān)推薦

2022-03-03 10:49:46

Python自動追蹤代碼

2021-09-18 14:51:10

平安金融云可信云

2018-08-06 16:08:21

三星SSD

2017-08-14 11:01:08

iWeb

2017-05-26 13:59:29

大數(shù)據(jù)數(shù)字天網(wǎng)通信運營

2025-04-15 19:52:04

2017-08-11 18:30:22

羅技耳機

2018-04-03 17:08:08

程序員技能面試

2011-05-10 10:53:04

綜合布線線纜

2013-10-18 17:19:30

Windows 8.1PC

2019-01-11 09:59:21

超融合

2017-11-22 09:14:50

HPC數(shù)據(jù)超算

2019-06-21 14:25:07

HadoopCTO數(shù)據(jù)庫

2018-06-29 13:10:02

阿里巴巴監(jiān)控系統(tǒng)人工智能
點贊
收藏

51CTO技術(shù)棧公眾號