大數據有道之spark選擇去重
一.spark簡介
spark是基于內存運算的大數據分布式并行計算框架,本身具有豐富的API,可實現與HDFS、HBase、Hive、Kafka、Elasticsearch、Druid等組件的交互,同時也是優(yōu)秀的MapReduce替代方案。
spark卓越的計算性能得意于其核心的分布式數據架構:RDD和DataFrame。
1、RDD
RDD(Resilient Distributes Dataset), 是spark中最基礎、最常用的數據結構。其本身封裝了作業(yè)中input data數據,并以分區(qū)方式分布在內存或者磁盤上的Block中。但實質上RDD對象是一個元數據結構,存儲著Block、Node映射關系等元數據信息。
RDD常規(guī)去重算子:
2、DataFrame
DataFrame是一種以RDD為基礎的分布式數據集,具有schema元數據信息,即標注了DataFrame中每一列名稱和類型,能夠大幅提升Transform、Action的計算效率。
DataFrame常規(guī)去重算子:
3、RDD與DataFrame對比
二.選擇去重
接下來,大數據有道將和大家一起學習一下spark RDD和DataFrame選擇去重的技巧。
1、原始數據
江南皮革廠訂單數據(input),需要指出“original_price”和real_pay對應double類型、“create_time”和“modify_time”為long類型。
源數據預處理:
為了方便對每條訂單進行提取和計算,作業(yè)中封裝了訂單對象RiveSouthOrder:
2、RDD選擇去重
a.選擇去重代碼(scala):
b.執(zhí)行日志:
c.計算結果:
d.邏輯解析:
***部分,加載源數據并封裝到RiveSouthOrder樣例類中,生成RDD;
第二部分,首先通過groupBy對order_id數據做分組后生成RDD[(String, Iterable[RiveSouthOrder])]對象([K,V]結構),隨即使用map對每個Key(order_id)下多組記錄(Iterable[RiveSouthOrder])進行reduce操作(maxBy),***在maxBy算子傳入一個字面量函數(也可寫為x=>x.modify_time),即提取該order_id下每條記錄中的modify_time進行比對,然后選出***時間記錄(maxBy為高階函數,依賴reduceLeft實現);
第三部分,toDebugString方法打印RDD轉換過程,***值得注意collect才是真正觸發(fā)一系列運算的源頭。
3、DataFrame選擇去重
a.選擇去重代碼(scala):
b.執(zhí)行日志:
c.計算結果:
d.邏輯解析:
***部分,引入依賴和隱式轉換,分別對應DataFrame類型識別、使用sql格式的$"modify_time"和row_number()+Window()函數的使用;
第二部分,加載源數據,由于源數據由RiveSouthOrder封裝,可直接toDF;
第三部分,首先使用withColumn方法添加Num字段,Num是由row_number()+Window()+orderBy()實現(原理同Hive sql),原則是根據modify_time對每個order_id分區(qū)下的訂單進行降序排序,接著使用where做過濾(也可使用filter),***drop掉不再使用的Num字段;
第四部分,通過explain打印dataFrame的物理執(zhí)行過程,show方法作為action算子觸發(fā)了以上的系列運算。
三.歸納總結
spark RDD和DataFrame均提供了豐富的API接口,極大的提升了開發(fā)效率和計算性能;
RDD的計算更傾向于map和reduce方式,而DataFrame含有schema元信息更容易與sql計算方式相結合;
RDD選擇去重使用了groupBy+maxBy方法,一氣呵成;DataFrame則使用row_number+window+orderBy方法,邏輯清晰;兩者處理方式所展現的spark函數式編程的精妙之處都值得探索和學習。