MapReduce連接:復(fù)制連接
如圖4.5所示,MapReduce復(fù)制連接工作原理如下:
-
使用分布式緩存(Districubted cache)將這個小數(shù)據(jù)集復(fù)制到所有運行map任務(wù)的節(jié)點。
-
用各個map任務(wù)初始化方法將這個小數(shù)據(jù)集裝載到一個哈希表(hashtable)中。
-
逐條用大數(shù)據(jù)集中的記錄遍歷這個哈希表,逐個判斷是否符合連接條件。
-
輸出符合連接條件的結(jié)果。
復(fù)制連接的實現(xiàn)非常直接明了。更具體的內(nèi)容可以參考《Hadoop in Action》。附錄D提供了一個通用的框架來實現(xiàn)復(fù)制連接。這個框架支持任意類型的InputFormat和OutputFormat的數(shù)據(jù)。(我們將在下一個技術(shù)中使用這個框架。)復(fù)制連接框架根據(jù)內(nèi)存足跡的大小從分布式緩存的內(nèi)容和輸入塊(input split)兩者中動態(tài)地決定需要緩存的對象。
如果所有的輸入數(shù)據(jù)集都不能夠小到可以放到緩存中,那有沒有辦法來優(yōu)化map端連接呢?那就到了看半連接(semi-join)的時間了。
附錄D.2 復(fù)制連接框架
復(fù)制連接是map端連接,得名于它的具體實現(xiàn):連接中最小的數(shù)據(jù)集將會被復(fù)制到所有的map主機節(jié)點。復(fù)制連接的實現(xiàn)非常直接明了。更具體的內(nèi)容可以參考Chunk Lam的《Hadoop in Action》。
這個部分的目標(biāo)是:創(chuàng)建一個可以支持任意類型的數(shù)據(jù)集的通用的復(fù)制連接框架。這個框架中提供了一個優(yōu)化的小功能:動態(tài)監(jiān)測分布式緩存內(nèi)容和輸入塊的大小,并判斷哪個更大。如果輸入塊較小,那么你就需要將map的輸入塊放到內(nèi)存緩沖中,然后在map的cleanup方法中執(zhí)行連接操作了。
圖D.4是這個框架的類圖,這里提供了連接類(GenericReplicatedJoin)的具體實現(xiàn),而不僅僅是一個抽象類。在這個框架外,這個類將和KeyValueTextInputFormat及TextOutputFormat協(xié)作。它的一個假設(shè)前提是:每個數(shù)據(jù)文件的***個標(biāo)記是連接鍵。此外,連接類也可以被繼承擴展來支持任意類型的輸入和輸出。
圖D.5是連接框架的算法。Map的setup方法判斷在map的輸入塊和分布式緩存中的內(nèi)容哪個大。如果分布式緩存的內(nèi)容比較小,那么它將被裝載到內(nèi)存緩存中。然后在Map函數(shù)開始連接操作。如果輸入塊比較小,map函數(shù)將輸入塊的鍵\值對裝載到內(nèi)存緩存中。Map的cleanup方法將從分布式緩存中讀取記錄,逐條記錄和在內(nèi)存緩存中的鍵\值對進行連接操作。
以下代碼是GenericReplicatedJoin類中setup方法。它在map的初始化階段被調(diào)用的。這個方法判斷分布式緩存中的文件和輸入塊哪個大。如果文件比較小,則將文件裝載到HashMap中。
- @Override
- protected void setup(Context context)
- throws IOException, InterruptedException {
- distributedCacheFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());
- int distCacheSizes = 0;
- for (Path distFile : distributedCacheFiles) {
- File distributedCacheFile = new File(distFile.toString());
- distCacheSizes += distributedCacheFile.length();
- }
- if(context.getInputSplit() instanceof FileSplit) {
- FileSplit split = (FileSplit) context.getInputSplit();
- long inputSplitSize = split.getLength();
- distributedCacheIsSmaller = (distCacheSizes < inputSplitSize);
- } else {
- distributedCacheIsSmaller = true;
- }
- if (distributedCacheIsSmaller) {
- for (Path distFile : distributedCacheFiles) {
- File distributedCacheFile = new File(distFile.toString());
- DistributedCacheFileReader reader = getDistributedCacheReader();
- reader.init(distributedCacheFile);
- for (Pair p : (Iterable<Pair>) reader) {
- addToCache(p);
- }
- reader.close();
- }
- }
- }
根據(jù)setup方法是否將分布式緩存的內(nèi)容裝載到內(nèi)存的緩存中,Map方法將會有不同的行為。如果分布式緩存中的內(nèi)容被裝載到內(nèi)存中,那么map方法就將輸入塊的記錄和內(nèi)存中的緩存做連接操作。如果分布式緩存中的內(nèi)容沒有被裝載到內(nèi)存中,那么map方法就將輸入塊的記錄裝載到內(nèi)存中,然后在cleanup方法中使用。
- @Override
- protected void map(Object key, Object value, Context context)
- throws IOException, InterruptedException {
- Pair pair = readFromInputFormat(key, value);
- if (distributedCacheIsSmaller) {
- joinAndCollect(pair, context);
- } else {
- addToCache(pair);
- }
- }
- public void joinAndCollect(Pair p, Context context)
- throws IOException, InterruptedException {
- List<Pair> cached = cachedRecords.get(p.getKey());
- if (cached != null) {
- for (Pair cp : cached) {
- Pair result;
- if (distributedCacheIsSmaller) {
- result = join(p, cp);
- } else {
- result = join(cp, p);
- }
- if (result != null) {
- context.write(result.getKey(), result.getData());
- }
- }
- }
- }
- public Pair join(Pair inputSplitPair, Pair distCachePair) {
- StringBuilder sb = new StringBuilder();
- if (inputSplitPair.getData() != null) {
- sb.append(inputSplitPair.getData());
- }
- sb.append("\t");
- if (distCachePair.getData() != null) {
- sb.append(distCachePair.getData());
- }
- return new Pair<Text, Text>(
- new Text(inputSplitPair.getKey().toString()),
- new Text(sb.toString()));
- }
當(dāng)所有的記錄都被傳輸給map方法后,MapReduce將會調(diào)用cleanup方法。如果分布式緩存中的內(nèi)容比輸入塊大,連接將會在cleanup中進行。連接的對象是map函數(shù)的緩存中的輸入塊的記錄和分布式緩存中的記錄。
- @Override
- protected void cleanup(Context context)
- throws IOException, InterruptedException {
- if (!distributedCacheIsSmaller) {
- for (Path distFile : distributedCacheFiles) {
- File distributedCacheFile = new File(distFile.toString());
- DistributedCacheFileReader reader = getDistributedCacheReader();
- reader.init(distributedCacheFile);
- for (Pair p : (Iterable<Pair>) reader) {
- joinAndCollect(p, context);
- }
- reader.close();
- }
- }
- }
***,作業(yè)的驅(qū)動代碼必須指定需要裝載到分布式緩存中的文件。以下的代碼可以處理一個文件,也可以處理MapReduce輸入結(jié)果的一個目錄。
- Configuration conf = new Configuration();
- FileSystem fs = smallFilePath.getFileSystem(conf);
- FileStatus smallFilePathStatus = fs.getFileStatus(smallFilePath);
- if(smallFilePathStatus.isDir()) {
- for(FileStatus f: fs.listStatus(smallFilePath)) {
- if(f.getPath().getName().startsWith("part")) {
- DistributedCache.addCacheFile(f.getPath().toUri(), conf);
- }
- }
- } else {
- DistributedCache.addCacheFile(smallFilePath.toUri(), conf);
- }
這個框架假設(shè)分布式緩存中的內(nèi)容和輸入塊的內(nèi)容都可以被裝載到內(nèi)存中。它的優(yōu)點在于兩個數(shù)據(jù)集之中較小的才會裝載到內(nèi)存中。
在論文《A Comparison of Join Algorithms for Log Processing in MapReduce》中,針對對于分布式緩存中的內(nèi)容較大時的場景對這個方法進行了更多的優(yōu)化。在他們的優(yōu)化中,他們將分布式緩存分成N個分區(qū),并將輸入塊放入N個哈希表。然后在cleanup方法中的優(yōu)化就更加高效。
在map端的復(fù)制連接的問題在于,map任務(wù)必須在啟動時讀取分布式緩存。上述論文提到的另一個優(yōu)化方案是重載FileInputFormat的splitting。將存在于同一個主機上的輸入塊合并成一個塊。然后就可以減少需要裝載分布式緩存的map任務(wù)的個數(shù)了。
***一個說明,Hadoop在org.apache.hadoop.mapred.join包中自帶了map端的連接。但是它需要有序的待連接的數(shù)據(jù)集的輸入文件,并要求將其分發(fā)到相同的分區(qū)中。這樣就造成了繁重的預(yù)處理工作。
原文鏈接:http://www.cnblogs.com/datacloud/p/3579333.html