數(shù)據(jù)倉(cāng)庫(kù)中的SQL性能優(yōu)化(Hive篇)
一個(gè)Hive查詢生成多個(gè)map reduce job,一個(gè)map reduce job又有map,reduce,spill,shuffle,sort等多個(gè)階段,所以針對(duì)hive查詢的優(yōu)化可以大致分為針對(duì)M/R中單個(gè)步驟的優(yōu)化,針對(duì)M/R全局的優(yōu)化,和針對(duì)整個(gè)查詢(多M/R job)的優(yōu)化,下文會(huì)分別闡述。
要說明的是,這個(gè)優(yōu)化只是針對(duì)Hive 0.9版本。由于Hortonwork發(fā)起了Stinger項(xiàng)目,Hive后續(xù)版本應(yīng)該能更加快速的響應(yīng)查詢。目前已經(jīng)發(fā)布的Hive 0.11就有不少新feature,比如針對(duì)數(shù)據(jù)倉(cāng)庫(kù)中常用的星型模型的優(yōu)化等等,這些就不在本文的討論范圍之內(nèi)了。
Map階段的優(yōu)化
Map階段的優(yōu)化,主要是確定合適的map數(shù)。那么首先要了解map數(shù)的計(jì)算公式,即:
- num_map_tasks = max[${mapred.min.split.size},
- min(${dfs.block.size}, ${mapred.max.split.size})]
其中mapred.min.split.size指的是數(shù)據(jù)的最小分割單元大小;mapred.max.split.size指的是數(shù)據(jù)的***分割單元大小;dfs.block.size指的是HDFS設(shè)置的數(shù)據(jù)塊大小。
一般來說dfs.block.size這個(gè)值是一個(gè)已經(jīng)指定好的值,而且這個(gè)參數(shù)默認(rèn)情況下hive是識(shí)別不到的(除非在hive-site.xml中明確指定),即:
- hive> set dfs.block.size;
- dfs.block.size is undefined
所以默認(rèn)情況下只有mapred.min.split.size和mapred.max.split.size這兩個(gè)參數(shù)(本節(jié)內(nèi)容后面就以min和max指代這兩個(gè)參數(shù))來決定map數(shù)量。
在hive中min的默認(rèn)值是1B,max的默認(rèn)值是256MB,即:
- hive> set mapred.min.split.size;
- mapred.min.split.size=1
- hive> set mapred.max.split.size;
- mapred.max.split.size=256000000
所以如果不做修改的話,就是1個(gè)map task處理256MB數(shù)據(jù),我們就以調(diào)整max為主。通過調(diào)整max可以起到調(diào)整map數(shù)的作用,減小max可以增加map數(shù),增大max可以減少map數(shù)。需要提醒的是,直接調(diào)整mapred.map.tasks這個(gè)參數(shù)是沒有效果的。
調(diào)整大小的時(shí)機(jī)根據(jù)查詢的不同而不同,總的來講可以通過觀察map task的完成時(shí)間來確定是否需要增加map資源。如果map task的完成時(shí)間都是接近1分鐘,甚至幾分鐘了,那么往往增加map數(shù)量,使得每個(gè)map task處理的數(shù)據(jù)量減少,能夠讓map task更快完成;而如果map task的運(yùn)行時(shí)間已經(jīng)很少了,比如10-20秒,這個(gè)時(shí)候增加map不太可能讓map task更快完成,反而可能因?yàn)閙ap需要的初始化時(shí)間反而讓job總體速度變慢,這個(gè)時(shí)候反而需要考慮是否可以把map的數(shù)量減少,這樣可以節(jié)省更多資源給其他Job。
Reduce階段的優(yōu)化
這里說的reduce階段,是指前面流程圖中的reduce phase(實(shí)際的reduce計(jì)算)而非圖中整個(gè)reduce task。Reduce階段優(yōu)化的主要工作也是選擇合適的reduce task數(shù)量,跟上面的map優(yōu)化類似。
與map優(yōu)化不同的是,reduce優(yōu)化時(shí),可以直接設(shè)置mapred.reduce.tasks參數(shù)從而直接指定reduce的個(gè)數(shù)。當(dāng)然直接指定reduce個(gè)數(shù)雖然比較方便,但是不利于自動(dòng)擴(kuò)展。Reduce數(shù)的設(shè)置雖然相較map更靈活,但是也需要像map一樣設(shè)定一個(gè)自動(dòng)生成規(guī)則,這樣運(yùn)行定時(shí)job的時(shí)候就不用擔(dān)心原來設(shè)置的固定reduce數(shù)會(huì)由于數(shù)據(jù)量的變化而不合適。
Hive估算reduce數(shù)量的時(shí)候,使用的是下面的公式:
- num_reduce_tasks = min(${hive.exec.reducers.max},
- ${input.size} / ${ hive.exec.reducers.bytes.per.reducer})
也就是說,根據(jù)輸入的數(shù)據(jù)量大小來決定reduce的個(gè)數(shù),默認(rèn)hive.exec.reducers. bytes.per.reducer為1G,而且reduce個(gè)數(shù)不能超過一個(gè)上限參數(shù)值,這個(gè)參數(shù)的默認(rèn)取值為999。所以我們以調(diào)整hive.exec.reducers.bytes.per.reducer為主來設(shè)置reduce個(gè)數(shù)。
設(shè)置reduce數(shù)同樣也是根據(jù)運(yùn)行時(shí)間作為參考調(diào)整,并且可以根據(jù)特定的業(yè)務(wù)需求、工作負(fù)載類型總結(jié)出經(jīng)驗(yàn),所以不再贅述。
Map與Reduce之間的優(yōu)化
所謂map和reduce之間,主要有3道工序。首先要把map輸出的結(jié)果進(jìn)行排序后做成中間文件,其次這個(gè)中間文件就能分發(fā)到各個(gè)reduce,***reduce端在執(zhí)行reduce phase之前把收集到的排序子文件合并成一個(gè)排序文件。
***個(gè)階段中,由于內(nèi)存不夠,數(shù)據(jù)可能沒辦法在內(nèi)存中一次性排序完成,那么就只能把局部排序的文件先保存到磁盤上,這個(gè)動(dòng)作叫spill,然后spill出來的多個(gè)文件可以在***進(jìn)行merge。如果發(fā)生spill,可以通過設(shè)置io.sort.mb來增大mapper輸出buffer的大小,避免spill的發(fā)生。另外合并時(shí)可以通過設(shè)置io.sort.factor來使得一次性能夠合并更多的數(shù)據(jù)。調(diào)試參數(shù)的時(shí)候,一個(gè)要看spill的時(shí)間成本,一個(gè)要看merge的時(shí)間成本,還需要注意不要撐爆內(nèi)存(io.sort.mb是算在map的內(nèi)存里面的)。Reduce端的merge也是一樣可以用io.sort.factor。一般情況下這兩個(gè)參數(shù)很少需要調(diào)整,除非很明確知道這個(gè)地方是瓶頸。
關(guān)于文件從map端copy到reduce端,默認(rèn)情況下在5%的map完成的情況下reduce就開始啟動(dòng)copy,這個(gè)有時(shí)候是很浪費(fèi)資源的,因?yàn)閞educe一旦啟動(dòng)就被占用,一直等到map全部完成,收集到所有數(shù)據(jù)才可以進(jìn)行后面的動(dòng)作,所以我們可以等比較多的map完成之后再啟動(dòng)reduce流程,這個(gè)比例可以通過mapred.reduce.slowstart. completed.maps去調(diào)整,他的默認(rèn)值就是5%。如果覺得這么做會(huì)減慢reduce端copy的進(jìn)度,可以把copy過程的線程增大。tasktracker.http.threads可以決定作為server端的map用于提供數(shù)據(jù)傳輸服務(wù)的線程,mapred.reduce.parallel.copies可以決定作為client端的reduce同時(shí)從map端拉取數(shù)據(jù)的并行度(一次同時(shí)從多少個(gè)map拉數(shù)據(jù)),修改參數(shù)的時(shí)候這兩個(gè)注意協(xié)調(diào)一下,server端能處理client端的請(qǐng)求即可。
文件格式的優(yōu)化
文件格式方面有兩個(gè)問題,一個(gè)是給輸入和輸出選擇合適的文件格式,另一個(gè)則是小文件問題。小文件問題在目前的hive環(huán)境下已經(jīng)得到了比較好的解決,hive的默認(rèn)配置中就可以在小文件輸入時(shí)自動(dòng)把多個(gè)文件合并給1個(gè)map處理(當(dāng)然,如果能直接讀取大文件更好),輸出時(shí)如果文件很小也會(huì)進(jìn)行一輪單獨(dú)的合并,所以這里就不專門討論了。相關(guān)的參數(shù)可以在這里找到。
關(guān)于文件格式,Hive中目前主要是3種,textfile,sequencefile和rcfile??傮w上來說,rcfile的壓縮比例和查詢時(shí)間稍好一點(diǎn),所以推薦使用。
關(guān)于使用方法,在建表結(jié)構(gòu)時(shí)可以指定格式,然后指定壓縮插入:
- create table rc_file_test( col int ) stored as rcfile;
- set hive.exec.compress.output = true;
- insert overwrite table rc_file_test
- select * from source_table;
另外create table as select時(shí)也可以指定輸出格式,這個(gè)時(shí)候就要通過hive.default. fileformat來設(shè)定:
- set hive.default.fileformat = SequenceFile;
- set hive.exec.compress.output = true;
- set mapred.output.compression.type = BLOCK; /*對(duì)于sequence file,壓縮方式有record和block兩種可選擇,block壓縮比更高*/
- insert overwrite table seq_file_test
- select * from source_table;
***要說的是,sequencefile和rcfile都是不支持空表要導(dǎo)入本地?cái)?shù)據(jù)的,但是textfile格式的表可以支持文本在本地壓縮完成之后直接以壓縮格式導(dǎo)入,具體的做法可以看這里的詳細(xì)介紹。
Job整體優(yōu)化
有一些問題必須從job的整體角度去觀察。這里討論幾個(gè)問題:Job執(zhí)行模式(本地執(zhí)行v.s.分布式執(zhí)行)、索引、Join算法、以及數(shù)據(jù)傾斜。
Job執(zhí)行模式
Hadoop的map reduce job可以有3種模式執(zhí)行,即本地模式,偽分布式,還有真正的分布式。本地模式和偽分布式都是在最初學(xué)習(xí)hadoop的時(shí)候往往被說成是做單機(jī)開發(fā)的時(shí)候用到。但是實(shí)際上對(duì)于處理數(shù)據(jù)量非常小的job,直接啟動(dòng)分布式j(luò)ob會(huì)消耗大量資源,而真正執(zhí)行計(jì)算的時(shí)間反而非常少。這個(gè)時(shí)候就應(yīng)該使用本地模式執(zhí)行mr job,這樣執(zhí)行的時(shí)候不會(huì)啟動(dòng)分布式j(luò)ob,執(zhí)行速度就會(huì)快很多。比如一般來說啟動(dòng)分布式j(luò)ob,無論多小的數(shù)據(jù)量,執(zhí)行時(shí)間一般不會(huì)少于20s,而使用本地mr模式,10秒左右就能出結(jié)果。
設(shè)置執(zhí)行模式的主要參數(shù)有三個(gè),一個(gè)是hive.exec.mode.local.auto,把他設(shè)為true就能夠自動(dòng)開啟local mr模式。但是這還不足以啟動(dòng)local mr,輸入的文件數(shù)量和數(shù)據(jù)量大小必須要控制,這兩個(gè)參數(shù)分別為hive.exec.mode.local.auto.tasks.max和hive.exec.mode.local.auto.inputbytes.max,默認(rèn)值分別為4和128MB,即默認(rèn)情況下,map處理的文件數(shù)不超過4個(gè)并且總大小小于128MB就啟用local mr模式。
索引
總體上來說,hive的索引目前還是一個(gè)不太適合使用的東西,這里只是考慮到敘述完整性,對(duì)其進(jìn)行基本的介紹。
Hive中的索引架構(gòu)開放了一個(gè)接口,允許你根據(jù)這個(gè)接口去實(shí)現(xiàn)自己的索引。目前hive自己有一個(gè)參考的索引實(shí)現(xiàn)(CompactIndex),后來在0.8版本中又加入位圖索引。這里就講講CompactIndex。
CompactIndex的實(shí)現(xiàn)原理類似一個(gè)lookup table,而非傳統(tǒng)數(shù)據(jù)庫(kù)中的B樹。如果你對(duì)table A的col1做了索引,索引文件本身就是一個(gè)table,這個(gè)table會(huì)有3列,分別是col1的枚舉值,每個(gè)值對(duì)應(yīng)的數(shù)據(jù)文件位置,以及在這個(gè)文件位置中的偏移量。通過這種方式,可以減少你查詢的數(shù)據(jù)量(偏移量可以告訴你從哪個(gè)位置開始找,自然只需要定位到相應(yīng)的block),起到減少資源消耗的作用。但是就其性能來說,并沒有很大的改善,很可能還不如構(gòu)建索引需要花的時(shí)間。所以在集群資源充足的情況下,沒有太大必要考慮索引。
CompactIndex的還有一個(gè)缺點(diǎn)就是使用起來不友好,索引建完之后,使用之前還需要根據(jù)查詢條件做一個(gè)同樣剪裁才能使用,索引的內(nèi)部結(jié)構(gòu)完全暴露,而且還要花費(fèi)額外的時(shí)間。具體看看下面的使用方法就了解了:
- /*在index_test_table表的id字段上創(chuàng)建索引*/
- create index idx on table index_test_table(id)
- as 'org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler'
- with deferred rebuild;
- alter index idx on index_test_table rebuild;
- /*索引的剪裁。找到上面建的索引表,根據(jù)你最終要用的查詢條件剪裁一下。如果你想跟RDBMS一樣建完索引就用,那是不行的,會(huì)直接報(bào)錯(cuò),這也是其麻煩的地方。*/
- create table my_index
- as select `_bucketname`, `_offsets`
- from default__index_test_table_idx__ where id = 10;
- /*現(xiàn)在可以用索引了,注意最終查詢條件跟上面的剪裁條件一致*/
- set hive.index.compact.file = /user/hive/warehouse/my_index;
- set hive.input.format = org.apache.hadoop.hive.ql.index.compact.HiveCompactIndexInputFormat;
- select count(*) from index_test_table where id = 10;
Join算法
處理分布式j(luò)oin,一般有兩種方法。一種是replication join:把其中一個(gè)表復(fù)制到所有節(jié)點(diǎn),這樣另一個(gè)表在每個(gè)節(jié)點(diǎn)上面的分片就可以跟這個(gè)完整的表join了;另一種方法是repartition join:把兩份數(shù)據(jù)按照join key進(jìn)行hash重分布,讓每個(gè)節(jié)點(diǎn)處理hash值相同的join key數(shù)據(jù),也就是做局部的join。這兩種方式在M/R Job中分別對(duì)應(yīng)了map side join和reduce side join。在一些MPP DB中,數(shù)據(jù)可以按照某列字段預(yù)先進(jìn)行hash分布,這樣在跟這個(gè)表以這個(gè)字段為join key進(jìn)行join的時(shí)候,該表肯定不需要做數(shù)據(jù)重分布了,這種功能是以HDFS作為底層文件系統(tǒng)的hive所沒有的。
在默認(rèn)情況下,hive的join策略是進(jìn)行reduce side join。當(dāng)兩個(gè)表中有一個(gè)是小表的時(shí)候,就可以考慮用map join了,因?yàn)樾”韽?fù)制的代價(jià)會(huì)好過大表shuffle的代價(jià)。使用map join的配置方法有兩種,一種直接在sql中寫hint,語(yǔ)法是/*+MAPJOIN (tbl)*/,其中tbl就是你想要做replication的表。另一種方法是設(shè)置hive.auto.convert.join = true,這樣hive會(huì)自動(dòng)判斷當(dāng)前的join操作是否合適做map join,主要是找join的兩個(gè)表中有沒有小表。至于多大的表算小表,則是由hive.smalltable.filesize決定,默認(rèn)25MB。
但是有的時(shí)候,沒有一個(gè)表足夠小到能夠放進(jìn)內(nèi)存,但是還是想用map join怎么辦?這個(gè)時(shí)候就要用到bucket map join。其方法是兩個(gè)join表在join key上都做hash bucket,并且把你打算復(fù)制的那個(gè)(相對(duì))小表的bucket數(shù)設(shè)置為大表的倍數(shù)。這樣數(shù)據(jù)就會(huì)按照join key做hash bucket。小表依然復(fù)制到所有節(jié)點(diǎn),map join的時(shí)候,小表的每一組bucket加載成hashtable,與對(duì)應(yīng)的一個(gè)大表bucket做局部join,這樣每次只需要加載部分hashtable就可以了。
然后在兩個(gè)表的join key都具有唯一性的時(shí)候(也就是可做主鍵),還可以進(jìn)一步做sort merge bucket map join。做法還是兩邊要做hash bucket,而且每個(gè)bucket內(nèi)部要進(jìn)行排序。這樣一來當(dāng)兩邊bucket要做局部join的時(shí)候,只需要用類似merge sort算法中的merge操作一樣把兩個(gè)bucket順序遍歷一遍即可完成,這樣甚至都不用把一個(gè)bucket完整的加載成hashtable,這對(duì)性能的提升會(huì)有很大幫助。
然后這里以一個(gè)完整的實(shí)驗(yàn)說明這幾種join算法如何操作。
首先建表要帶上bucket:
- create table map_join_test(id int)
- clustered by (id) sorted by (id) into 32 buckets
- stored as textfile;
然后插入我們準(zhǔn)備好的800萬行數(shù)據(jù),注意要強(qiáng)制劃分成bucket(也就是用reduce劃分hash值相同的數(shù)據(jù)到相同的文件):
- set hive.enforce.bucketing = true;
- insert overwrite table map_join_test
- select * from map_join_source_data;
這樣這個(gè)表就有了800萬id值(且里面沒有重復(fù)值,所以可以做sort merge),占用80MB左右。
接下來我們就可以一一嘗試map join的算法了。首先是普通的map join:
- select /*+mapjoin(a) */count(*)
- from map_join_test a
- join map_join_test b on a.id = b.id;
然后就會(huì)看到分發(fā)hash table的過程:
- 2013-08-31 09:08:43 Starting to launch local task to process map join; maximum memory = 1004929024
- 2013-08-31 09:08:45 Processing rows: 200000 Hashtable size: 199999 Memory usage: 38823016 rate: 0.039
- 2013-08-31 09:08:46 Processing rows: 300000 Hashtable size: 299999 Memory usage: 56166968 rate: 0.056
- ……
- ……
- ……
- 2013-08-31 09:12:39 Processing rows: 4900000 Hashtable size: 4899999 Memory usage: 896968104 rate: 0.893
- 2013-08-31 09:12:47 Processing rows: 5000000 Hashtable size: 4999999 Memory usage: 922733048 rate: 0.918
- Execution failed with exit status: 2
- Obtaining error information
- Task failed!
- Task ID:
- Stage-4
不幸的是,居然內(nèi)存不夠了,直接做map join失敗了。但是80MB的大小為何用1G的heap size都放不下?觀察整個(gè)過程就會(huì)發(fā)現(xiàn),平均一條記錄需要用到200字節(jié)的存儲(chǔ)空間,這個(gè)overhead太大了。不過這里我也搞不清楚hive為什么需要這么大空間,是否可以修改,總之對(duì)于map join的小表size一定要好好評(píng)估,如果有幾十萬記錄數(shù)就要小心了。
所以接下來我們就用bucket map join,之前分的bucket就派上用處了。只需要在上述sql的前面加上如下的設(shè)置:
- set hive.optimize.bucketmapjoin = true;
然后還是會(huì)看到hash table分發(fā):
- 2013-08-31 09:20:39 Starting to launch local task to process map join; maximum memory = 1004929024
- 2013-08-31 09:20:41 Processing rows: 200000 Hashtable size: 199999 Memory usage: 38844832 rate: 0.039
- 2013-08-31 09:20:42 Processing rows: 275567 Hashtable size: 275567 Memory usage: 51873632 rate: 0.052
- 2013-08-31 09:20:42 Dump the hashtable into file: file:/tmp/hadoop/hive_2013-08-31_21-20-37_444_1135806892100127714/-local-10003/HashTable-Stage-1/MapJoin-a-10-000000_0.hashtable
- 2013-08-31 09:20:46 Upload 1 File to: file:/tmp/hadoop/hive_2013-08-31_21-20-37_444_1135806892100127714/-local-10003/HashTable-Stage-1/MapJoin-a-10-000000_0.hashtable File size: 11022975
- 2013-08-31 09:20:47 Processing rows: 300000 Hashtable size: 24432 Memory usage: 8470976 rate: 0.008
- 2013-08-31 09:20:47 Processing rows: 400000 Hashtable size: 124432 Memory usage: 25368080 rate: 0.025
- 2013-08-31 09:20:48 Processing rows: 500000 Hashtable size: 224432 Memory usage: 42968080 rate: 0.043
- 2013-08-31 09:20:49 Processing rows: 551527 Hashtable size: 275960 Memory usage: 52022488 rate: 0.052
- 2013-08-31 09:20:49 Dump the hashtable into file: file:/tmp/hadoop/hive_2013-08-31_21-20-37_444_1135806892100127714/-local-10003/HashTable-Stage-1/MapJoin-a-10-000001_0.hashtable
- ……
這次就會(huì)看到每次構(gòu)建完一個(gè)hash table(也就是所對(duì)應(yīng)的對(duì)應(yīng)一個(gè)bucket),會(huì)把這個(gè)hash table寫入文件,重新構(gòu)建新的hash table。這樣一來由于每個(gè)hash table的量比較小,也就不會(huì)有內(nèi)存不足的問題,整個(gè)sql也能成功運(yùn)行。不過光光是這個(gè)復(fù)制動(dòng)作就要花去3分半的時(shí)間,所以如果整個(gè)job本來就花不了多少時(shí)間的,那這個(gè)時(shí)間就不可小視。
***我們?cè)囋噑ort merge bucket map join,在bucket map join的基礎(chǔ)上加上下面的設(shè)置即可:
- set hive.optimize.bucketmapjoin.sortedmerge = true;
- set hive.input.format = org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
sort merge bucket map join是不會(huì)產(chǎn)生hash table復(fù)制的步驟的,直接開始做實(shí)際map端join操作了,數(shù)據(jù)在join的時(shí)候邊做邊讀。跳過復(fù)制的步驟,外加join算法的改進(jìn),使得sort merge bucket map join的效率要明顯好于bucket map join。
關(guān)于join的算法雖然有這么些選擇,但是個(gè)人覺得,對(duì)于日常使用,掌握默認(rèn)的reduce join和普通的(無bucket)map join已經(jīng)能解決大多數(shù)問題。如果小表不能完全放內(nèi)存,但是小表相對(duì)大表的size量級(jí)差別也非常大的時(shí)候也可以試試bucket map join,不過其hash table分發(fā)的過程會(huì)浪費(fèi)不少時(shí)間,需要評(píng)估下是否能夠比reduce join更高效。而sort merge bucket map join雖然性能不錯(cuò),但是把數(shù)據(jù)做成bucket本身也需要時(shí)間,另外其發(fā)動(dòng)條件比較特殊,就是兩邊join key必須都唯一(很多介紹資料中都不提這一點(diǎn)。強(qiáng)調(diào)下必須都是唯一,哪怕只有一個(gè)表不唯一,出來的結(jié)果也是錯(cuò)的)。這樣的場(chǎng)景相對(duì)比較少見,“用戶基本表 join 用戶擴(kuò)展表”以及“用戶今天的數(shù)據(jù)快照 join 用戶昨天的數(shù)據(jù)快照”這類場(chǎng)景可能比較合適。
數(shù)據(jù)傾斜
所謂數(shù)據(jù)傾斜,說的是由于數(shù)據(jù)分布不均勻,個(gè)別值集中占據(jù)大部分?jǐn)?shù)據(jù)量,加上hadoop的計(jì)算模式,導(dǎo)致計(jì)算資源不均勻引起性能下降。
還是拿博客網(wǎng)站的訪問日志說事吧。假設(shè)網(wǎng)站訪問日志中會(huì)記錄用戶的user_id,并且對(duì)于注冊(cè)用戶使用其用戶表的user_id,對(duì)于非注冊(cè)用戶使用一個(gè)user_id=0代表。那么鑒于大多數(shù)用戶是非注冊(cè)用戶(只看不寫),所以u(píng)ser_id=0占據(jù)了絕大多數(shù)。而如果進(jìn)行計(jì)算的時(shí)候如果以u(píng)ser_id作為group by的維度或者是join key,那么個(gè)別reduce會(huì)收到比其他reduce多得多的數(shù)據(jù)——因?yàn)樗邮账衭ser_id=0的記錄進(jìn)行處理,使得其處理效果會(huì)非常差,其他reduce都跑完很久了它還在運(yùn)行。
group by造成的傾斜和join造成的傾斜需要分開看。group by造成的傾斜有兩個(gè)參數(shù)可以解決,一個(gè)是hive.map.aggr,默認(rèn)值已經(jīng)為true,意思是會(huì)做map端的combiner。所以如果你的group by查詢只是做count(*)的話,其實(shí)是看不出傾斜效果的,但是如果你做的是count(distinct),那么還是會(huì)看出一點(diǎn)傾斜效果。另一個(gè)參數(shù)是hive.groupby.skewindata。這個(gè)參數(shù)的意思是做reduce操作的時(shí)候,拿到的key并不是所有相同值給同一個(gè)reduce,而是隨機(jī)分發(fā),然后reduce做聚合,做完之后再做一輪MR,拿前面聚合過的數(shù)據(jù)再算結(jié)果。所以這個(gè)參數(shù)其實(shí)跟hive.map.aggr做的是類似的事情,只是拿到reduce端來做,而且要額外啟動(dòng)一輪job,所以其實(shí)不怎么推薦用,效果不明顯。
join造成的傾斜就比如上面描述的網(wǎng)站訪問日志和用戶表兩個(gè)表join:
- select a.* from logs a join users b on a.user_id = b.user_id;
hive給出的解決方案是,把這種user_id = 0的特殊值先不在reduce端計(jì)算掉,而是先寫入hdfs,然后啟動(dòng)一輪map join專門做這個(gè)特殊值的計(jì)算,期望能提高計(jì)算這部分值的處理速度。當(dāng)然你要告訴hive這個(gè)join是個(gè)skew join,即set hive.optimize.skewjoin = true;還有要告訴hive如何判斷特殊值,根據(jù)hive.skewjoin.key設(shè)置的數(shù)量hive可以知道,比如默認(rèn)值是100000,那么超過100000條記錄的值就是特殊值。
另外對(duì)于特殊值的處理往往跟業(yè)務(wù)有關(guān)系,所以也可以從業(yè)務(wù)角度重寫sql解決。比如前面這種傾斜join,可以把特殊值隔離開來(從業(yè)務(wù)角度說,users表應(yīng)該不存在user_id = 0的情況,但是這里還是假設(shè)有這個(gè)值,使得這個(gè)寫法更加具有通用性):
- select a.* from
- (
- select a.*
- from (select * from logs where user_id = 0) a
- join (select * from users where user_id = 0) b
- on a.user_id = b.user_id
- union all
- select a.*
- from logs a join users b
- on a.user_id <> 0 and a.user_id = b.user_id
- )t;
SQL整體優(yōu)化
前面對(duì)于單個(gè)job如何做優(yōu)化已經(jīng)做過詳細(xì)討論,但是hive查詢會(huì)生成多個(gè)job,針對(duì)多個(gè)job,有什么地方需要優(yōu)化?
首先,在hive生成的多個(gè)job中,在有些情況下job之間是可以并行的,典型的就是子查詢。當(dāng)需要執(zhí)行多個(gè)子查詢union all或者join操作的時(shí)候,job間并行就可以使用了。比如下面的代碼就是一個(gè)可以并行的場(chǎng)景示意:
- select * from
- (
- select count(*) from logs
- where log_date = 20130801 and item_id = 1
- union all
- select count(*) from logs
- where log_date = 20130802 and item_id = 2
- union all
- select count(*) from logs
- where log_date = 20130803 and item_id = 3
- ) t
設(shè)置job間并行的參數(shù)是hive.exec.parallel,將其設(shè)為true即可。默認(rèn)的并行度***為8,也就是允許sql中8個(gè)job并行。如果想要更高的并行度,可以通過hive.exec.parallel. thread.number參數(shù)進(jìn)行設(shè)置,但要避免設(shè)置過大而占用過多資源。
另外在實(shí)際開發(fā)過程中也發(fā)現(xiàn),一些實(shí)現(xiàn)思路會(huì)導(dǎo)致生成多余的job而顯得不夠高效。比如這個(gè)需求:取出cnblog某一天訪問日志中同時(shí)看過博主“小張”和博主“小李”的人數(shù)。低效的思路是面向明細(xì)的,先取出看過博主“小張”的用戶,再取出看過博主“小李”的用戶,然后取交集,代碼如下:
- select count(*) from
- (select distinct user_id
- from cnblogs_visit_20130801 where blog_owner = ‘小張’) a
- join
- (select distinct user_id
- from cnblogs_visit_20130801 where blog_owner = ‘小李’) b
- on a.user_id = b.user_id;
這樣一來,就要產(chǎn)生2個(gè)求子查詢的job(當(dāng)然,可以并行),一個(gè)join job,還有一個(gè)計(jì)算count的job。
但是我們直接用面向統(tǒng)計(jì)的方法去計(jì)算的話,則會(huì)更加符合M/R的模式:
- select count(*) from
- (
- select user_id,
- count(case when blog_owner = ‘小張’ then 1 end) as visit_z,
- count(case when blog_owner = ‘小李’ then 1 end) as visit_l
- from cnblogs_visit_20130801 group by user_id
- ) t
- where visit_z > 0 and visit_l > 0;
這種實(shí)現(xiàn)方式轉(zhuǎn)換成job就只會(huì)有2個(gè):內(nèi)層的子查詢和外層的統(tǒng)計(jì),更少的job也就帶來更高效的執(zhí)行結(jié)果。
***種查詢方法符合思考問題的直覺,是工程師和分析師在實(shí)際查數(shù)據(jù)中***想到的寫法,然而想要更加快速的跑出結(jié)果,懂一點(diǎn)工具的內(nèi)部機(jī)理,也是必須的。
【本文為51CTO專欄作者“王森豐”的原創(chuàng)稿件,轉(zhuǎn)載請(qǐng)注明出處】
























