偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

MapReduce連接:復(fù)制連接

數(shù)據(jù)庫
復(fù)制連接是map端的連接。復(fù)制連接得名于它的具體實現(xiàn):連接中最小的數(shù)據(jù)集將會被復(fù)制到所有的map主機節(jié)點。復(fù)制連接有一個假設(shè)前提:在被連接的數(shù)據(jù)集中,有一個數(shù)據(jù)集足夠小到可以緩存在內(nèi)存中。

如圖4.5所示,MapReduce復(fù)制連接工作原理如下:

  1. 使用分布式緩存(Districubted cache)將這個小數(shù)據(jù)集復(fù)制到所有運行map任務(wù)的節(jié)點。

  2. 用各個map任務(wù)初始化方法將這個小數(shù)據(jù)集裝載到一個哈希表(hashtable)中。

  3. 逐條用大數(shù)據(jù)集中的記錄遍歷這個哈希表,逐個判斷是否符合連接條件。

  4. 輸出符合連接條件的結(jié)果。

 

 

復(fù)制連接的實現(xiàn)非常直接明了。更具體的內(nèi)容可以參考《Hadoop in Action》。附錄D提供了一個通用的框架來實現(xiàn)復(fù)制連接。這個框架支持任意類型的InputFormat和OutputFormat的數(shù)據(jù)。(我們將在下一個技術(shù)中使用這個框架。)復(fù)制連接框架根據(jù)內(nèi)存足跡的大小從分布式緩存的內(nèi)容和輸入塊(input split)兩者中動態(tài)地決定需要緩存的對象。

如果所有的輸入數(shù)據(jù)集都不能夠小到可以放到緩存中,那有沒有辦法來優(yōu)化map端連接呢?那就到了看半連接(semi-join)的時間了。

附錄D.2 復(fù)制連接框架 

復(fù)制連接是map端連接,得名于它的具體實現(xiàn):連接中最小的數(shù)據(jù)集將會被復(fù)制到所有的map主機節(jié)點。復(fù)制連接的實現(xiàn)非常直接明了。更具體的內(nèi)容可以參考Chunk Lam的《Hadoop in Action》。 

這個部分的目標(biāo)是:創(chuàng)建一個可以支持任意類型的數(shù)據(jù)集的通用的復(fù)制連接框架。這個框架中提供了一個優(yōu)化的小功能:動態(tài)監(jiān)測分布式緩存內(nèi)容和輸入塊的大小,并判斷哪個更大。如果輸入塊較小,那么你就需要將map的輸入塊放到內(nèi)存緩沖中,然后在map的cleanup方法中執(zhí)行連接操作了。 

圖D.4是這個框架的類圖,這里提供了連接類(GenericReplicatedJoin)的具體實現(xiàn),而不僅僅是一個抽象類。在這個框架外,這個類將和KeyValueTextInputFormat及TextOutputFormat協(xié)作。它的一個假設(shè)前提是:每個數(shù)據(jù)文件的***個標(biāo)記是連接鍵。此外,連接類也可以被繼承擴展來支持任意類型的輸入和輸出。

圖D.5是連接框架的算法。Map的setup方法判斷在map的輸入塊和分布式緩存中的內(nèi)容哪個大。如果分布式緩存的內(nèi)容比較小,那么它將被裝載到內(nèi)存緩存中。然后在Map函數(shù)開始連接操作。如果輸入塊比較小,map函數(shù)將輸入塊的鍵\值對裝載到內(nèi)存緩存中。Map的cleanup方法將從分布式緩存中讀取記錄,逐條記錄和在內(nèi)存緩存中的鍵\值對進行連接操作。

 

以下代碼是GenericReplicatedJoin類中setup方法。它在map的初始化階段被調(diào)用的。這個方法判斷分布式緩存中的文件和輸入塊哪個大。如果文件比較小,則將文件裝載到HashMap中。

  1. @Override 
  2. protected void setup(Context context) 
  3.     throws IOException, InterruptedException { 
  4.      
  5.     distributedCacheFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration()); 
  6.     int distCacheSizes = 0; 
  7.      
  8.     for (Path distFile : distributedCacheFiles) { 
  9.         File distributedCacheFile = new File(distFile.toString()); 
  10.         distCacheSizes += distributedCacheFile.length(); 
  11.     } 
  12.      
  13.     if(context.getInputSplit() instanceof FileSplit) { 
  14.         FileSplit split = (FileSplit) context.getInputSplit(); 
  15.         long inputSplitSize = split.getLength(); 
  16.         distributedCacheIsSmaller = (distCacheSizes < inputSplitSize); 
  17.     } else { 
  18.         distributedCacheIsSmaller = true
  19.     } 
  20.      
  21.     if (distributedCacheIsSmaller) { 
  22.         for (Path distFile : distributedCacheFiles) { 
  23.             File distributedCacheFile = new File(distFile.toString()); 
  24.             DistributedCacheFileReader reader = getDistributedCacheReader(); 
  25.             reader.init(distributedCacheFile); 
  26.              
  27.             for (Pair p : (Iterable<Pair>) reader) { 
  28.                 addToCache(p); 
  29.             } 
  30.              
  31.             reader.close(); 
  32.         } 
  33.     } 

根據(jù)setup方法是否將分布式緩存的內(nèi)容裝載到內(nèi)存的緩存中,Map方法將會有不同的行為。如果分布式緩存中的內(nèi)容被裝載到內(nèi)存中,那么map方法就將輸入塊的記錄和內(nèi)存中的緩存做連接操作。如果分布式緩存中的內(nèi)容沒有被裝載到內(nèi)存中,那么map方法就將輸入塊的記錄裝載到內(nèi)存中,然后在cleanup方法中使用。

  1. @Override 
  2. protected void map(Object key, Object value, Context context) 
  3.     throws IOException, InterruptedException { 
  4.     Pair pair = readFromInputFormat(key, value); 
  5.      
  6.     if (distributedCacheIsSmaller) { 
  7.         joinAndCollect(pair, context); 
  8.     } else { 
  9.         addToCache(pair); 
  10.     } 
  11.  
  12. public void joinAndCollect(Pair p, Context context) 
  13.     throws IOException, InterruptedException { 
  14.     List<Pair> cached = cachedRecords.get(p.getKey()); 
  15.      
  16.     if (cached != null) { 
  17.         for (Pair cp : cached) { 
  18.             Pair result; 
  19.              
  20.             if (distributedCacheIsSmaller) { 
  21.                 result = join(p, cp); 
  22.             } else { 
  23.                 result = join(cp, p); 
  24.             } 
  25.              
  26.             if (result != null) { 
  27.                 context.write(result.getKey(), result.getData()); 
  28.             } 
  29.         } 
  30.     } 
  31.  
  32. public Pair join(Pair inputSplitPair, Pair distCachePair) { 
  33.     StringBuilder sb = new StringBuilder(); 
  34.      
  35.     if (inputSplitPair.getData() != null) { 
  36.         sb.append(inputSplitPair.getData()); 
  37.     } 
  38.      
  39.     sb.append("\t"); 
  40.      
  41.     if (distCachePair.getData() != null) { 
  42.         sb.append(distCachePair.getData()); 
  43.     } 
  44.      
  45.     return new Pair<Text, Text>( 
  46.                 new Text(inputSplitPair.getKey().toString()), 
  47.                 new Text(sb.toString())); 

當(dāng)所有的記錄都被傳輸給map方法后,MapReduce將會調(diào)用cleanup方法。如果分布式緩存中的內(nèi)容比輸入塊大,連接將會在cleanup中進行。連接的對象是map函數(shù)的緩存中的輸入塊的記錄和分布式緩存中的記錄。

  1. @Override 
  2. protected void cleanup(Context context) 
  3.     throws IOException, InterruptedException { 
  4.      
  5.     if (!distributedCacheIsSmaller) { 
  6.      
  7.         for (Path distFile : distributedCacheFiles) { 
  8.             File distributedCacheFile = new File(distFile.toString()); 
  9.             DistributedCacheFileReader reader = getDistributedCacheReader(); 
  10.             reader.init(distributedCacheFile); 
  11.              
  12.             for (Pair p : (Iterable<Pair>) reader) { 
  13.                 joinAndCollect(p, context); 
  14.             } 
  15.          
  16.             reader.close(); 
  17.         } 
  18.     } 

***,作業(yè)的驅(qū)動代碼必須指定需要裝載到分布式緩存中的文件。以下的代碼可以處理一個文件,也可以處理MapReduce輸入結(jié)果的一個目錄。

  1. Configuration conf = new Configuration(); 
  2.  
  3. FileSystem fs = smallFilePath.getFileSystem(conf); 
  4. FileStatus smallFilePathStatus = fs.getFileStatus(smallFilePath); 
  5.  
  6. if(smallFilePathStatus.isDir()) { 
  7.     for(FileStatus f: fs.listStatus(smallFilePath)) { 
  8.         if(f.getPath().getName().startsWith("part")) { 
  9.             DistributedCache.addCacheFile(f.getPath().toUri(), conf); 
  10.         } 
  11.     } 
  12. else { 
  13.     DistributedCache.addCacheFile(smallFilePath.toUri(), conf); 

這個框架假設(shè)分布式緩存中的內(nèi)容和輸入塊的內(nèi)容都可以被裝載到內(nèi)存中。它的優(yōu)點在于兩個數(shù)據(jù)集之中較小的才會裝載到內(nèi)存中。

在論文《A Comparison of Join Algorithms for Log Processing in MapReduce》中,針對對于分布式緩存中的內(nèi)容較大時的場景對這個方法進行了更多的優(yōu)化。在他們的優(yōu)化中,他們將分布式緩存分成N個分區(qū),并將輸入塊放入N個哈希表。然后在cleanup方法中的優(yōu)化就更加高效。

在map端的復(fù)制連接的問題在于,map任務(wù)必須在啟動時讀取分布式緩存。上述論文提到的另一個優(yōu)化方案是重載FileInputFormat的splitting。將存在于同一個主機上的輸入塊合并成一個塊。然后就可以減少需要裝載分布式緩存的map任務(wù)的個數(shù)了。

***一個說明,Hadoop在org.apache.hadoop.mapred.join包中自帶了map端的連接。但是它需要有序的待連接的數(shù)據(jù)集的輸入文件,并要求將其分發(fā)到相同的分區(qū)中。這樣就造成了繁重的預(yù)處理工作。

原文鏈接:http://www.cnblogs.com/datacloud/p/3579333.html

責(zé)任編輯:彭凡 來源: 博客園
相關(guān)推薦

2014-03-18 10:23:11

MapReduce

2017-06-23 22:00:13

MySqlsslcentos

2015-08-21 13:50:49

Oracle連接

2009-07-22 10:53:42

MySQL左連接

2010-05-10 15:48:37

Unix連接

2021-03-24 09:06:01

MySQL長連接短連接

2011-03-28 14:04:10

SQL左連接右連接

2018-06-06 11:01:25

HTTP長連接短連接

2011-06-01 13:54:10

MySQL

2015-04-23 18:46:38

TCPTCP協(xié)議

2010-01-04 09:51:52

ADO連接對象

2010-11-08 15:47:01

SQL Server外

2010-11-11 13:51:36

SQL Server內(nèi)

2010-06-07 15:24:34

Java連接MYSQL

2019-09-16 09:29:01

TCP全連接隊列半連接隊列

2014-01-02 14:04:39

PostgreSQLPerl

2014-01-02 15:41:24

PostgreSQLPHP

2014-01-02 13:22:01

PythonPostgreSQL

2023-01-31 18:09:12

物聯(lián)網(wǎng)移動物聯(lián)網(wǎng)

2010-08-24 09:29:37

內(nèi)連接全連接
點贊
收藏

51CTO技術(shù)棧公眾號