使用Python實(shí)現(xiàn)HIVE的UDF函數(shù)
在處理一些復(fù)雜邏輯時(shí)候,python這種面向過(guò)程的語(yǔ)言相比于SQL更符合人的思維方式。相信有不少同學(xué)曾經(jīng)感慨,如果能用python處理數(shù)據(jù)庫(kù)中的數(shù)據(jù)就好了。那么今天它來(lái)了。
首先用python寫(xiě)處理復(fù)雜邏輯的自定義的函數(shù)(一陽(yáng)指),再將函數(shù)代碼嵌入SQL(獅吼功)就能合并成了一整招:UDF
下面我用一個(gè)栗子來(lái)說(shuō)明一些兩者處理數(shù)據(jù)過(guò)程中的差異,在介紹栗子之前,先介紹一些with as。與python 創(chuàng)建函數(shù)或者類(lèi)一樣,with as 用于創(chuàng)建中間表
簡(jiǎn)單來(lái)做個(gè)介紹
- select
- *
- from(select * from table where dt='2021-03-30')a
可以寫(xiě)成
- with a as (select * from table where dt='2021-03-30' )
- select * from a
簡(jiǎn)單的SQL看不出這樣的優(yōu)勢(shì)(甚至有點(diǎn)多此一舉),但是當(dāng)邏輯復(fù)雜了之后我們就能看出這種語(yǔ)法的優(yōu)勢(shì),他能從底層抽取中間表格,讓我們只專(zhuān)注于當(dāng)前使用的表格,進(jìn)而可以將復(fù)雜的處理邏輯分解成簡(jiǎn)單的步驟。
如下面地表格記錄了用戶適用app過(guò)程中每個(gè)行為日志地時(shí)間戳,我們想統(tǒng)計(jì)一下用戶今天用了幾次app,以及每次的起始時(shí)間和結(jié)束時(shí)間是什么時(shí)候,這個(gè)問(wèn)題怎么解呢?
SQL實(shí)現(xiàn)方式
首先用with as 構(gòu)建一個(gè)中間表(注意看on 和 where條件)
- with t1 as
- (select
- x.uid,
- case when x.rank=1 then y.timestamp_ms
- else x.timestamp_ms
- end as start_time,
- case when x.rank=1 then x.timestamp_ms
- else y.timestamp_ms end as end_time
- from
- (select
- uid,
- timestamp_ms,
- row_number()over(partition by uid order by timestamp_ms) rank
- from tmp.tmpx) x
- left outer join
- (select
- uid,
- timestamp_ms,
- row_number()over(partition by uid order by timestamp_ms) rank
- from tmp.tmpx) y
- on x.uid=y.uid and x.rank=y.rank-1
- where x.rank=1 or y.rank is null or y.timestamp_ms-x.timestamp_ms>=300)
首先我們用開(kāi)窗函數(shù)錯(cuò)位相減,用where條件篩選出我們需要的列,其中
x.rank=1 抽取出第一行
y.rank is null 抽取最后一樣
y.timestamp_ms-x.timestamp_ms>=300抽取滿足條件的行,如下:
當(dāng)然這個(gè)結(jié)果并不是我們要的結(jié)果,需要將上述表格中某一行數(shù)據(jù)的end-time和下一條數(shù)據(jù)的start-time結(jié)合起來(lái)起來(lái),構(gòu)造出時(shí)間段
好的,按照上面我們所說(shuō)的那么下面我們不用關(guān)心底層的邏輯,將注意力專(zhuān)注于這張中間表t1
- select
- a.uid,end_time as start_time,start_time as end_time
- from
- (select uid,start_time,row_number()over(partition by uid order by start_time) as rank from t1) a
- join
- (select uid,end_time,row_number()over(partition by uid order by end_time) as rank from t1)b
- on
- a.uid=b.uid and a.rank=b.rank+1
同樣,排序后錯(cuò)位相減,然后就可以打完收工了~
UDF實(shí)現(xiàn)方式
首先我們假設(shè)上述數(shù)據(jù)存儲(chǔ)在csv中,
用python 處理本地文件data.csv,按照python的處理方式寫(xiě)代碼(這里就不一句句解釋了,會(huì)python的同學(xué)可以跳過(guò),不會(huì)的同學(xué)不妨自己動(dòng)手寫(xiě)一下)
- def life_cut(files):
- f=open(files)
- act_list=[]
- act_dict={}
- for line in f:
- line_list=line.strip().split()
- key=tuple(line_list[0:1])
- if key not in act_dict:
- act_dict.setdefault(key,[])
- act_dict[key].append(line_list[1])
- else:
- act_dict[key].append(line_list[1])
- for k,v in act_dict.items():
- k_str=k[0]+"\t"
- start_time = v[0]
- last_time=v[0]
- i=1
- while i<len(v)-1:
- if int(v[i])-int(last_time)>=300:
- print(k_str+"\t"+start_time+"\t"+v[i-1])
- start_time=v[i]
- last_time = v[i]
- i=i+1
- else:
- last_time = v[i]
- i=i+1
- print(k_str+"\t"+start_time+"\t"+v[len(v)-1])
- # print(k_str + "\t" + start_time + "\t" + v[i])
- if __name__=="__main__":
- life_cut("data.csv")
得到結(jié)果如下:

那么下面我們將上述函數(shù)寫(xiě)成udf的形式:
- #!/usr/bin/env python
- # -*- encoding:utf-8 -*-
- import sys
- act_list=[]
- act_dict={}
- for line in sys.stdin:
- line_list=line.strip().split("\t")
- key=tuple(line_list[0:1])
- if key not in act_dict:
- act_dict.setdefault(key,[])
- act_dict[key].append(line_list[1])
- else:
- act_dict[key].append(line_list[1])
- for k,v in act_dict.items():
- k_str=k[0]+"\t"
- start_time = v[0]
- last_time=v[0]
- i=1
- while i<len(v)-1:
- if int(v[i])-int(last_time)>=300:
- print(k_str+"\t"+start_time+"\t"+v[i-1])
- start_time=v[i]
- last_time = v[i]
- i=i+1
- else:
- last_time = v[i]
- i=i+1
- print(k_str+"\t"+start_time+"\t"+v[len(v)-1])
這個(gè)變化過(guò)程的關(guān)鍵點(diǎn)是將 for line in f 替換成 for line in sys.stdin,其他基本上沒(méi)什么變化
然后我們?cè)賮?lái)引用這個(gè)函數(shù)
先add這個(gè)函數(shù)的路徑add file /xxx/life_cut.py 加載udf路徑,然后再使用
- select
- TRANSFORM (uid,timestamp_ms) USING "python life_cut.py" as (uid,start_time,end_time)
- from tmp.tmpx
總結(jié)
從上述案例我們可以看出,
UDF和SQL的區(qū)別在于,在處理復(fù)雜邏輯時(shí)候,UDF相比SQL能更高效地組織起來(lái)邏輯并落地實(shí)現(xiàn)功能。UDF和普通腳本的關(guān)鍵區(qū)別所在在于將 for line in f 替換成 for line in sys.stdin,常規(guī)函數(shù)一般是將文件一行行讀入,UDF是從標(biāo)準(zhǔn)輸入一行行加載數(shù)據(jù)。希望大家平時(shí)沒(méi)事的時(shí)候好好練練python,切莫書(shū)到用時(shí)方恨少。