中小型企業(yè)大數(shù)據(jù)體系建設(shè)的核心技術(shù)選型
本文分享的主題是中小型企業(yè)基于大數(shù)據(jù)技術(shù)的項(xiàng)目實(shí)踐,筆者將從大數(shù)據(jù)技術(shù)棧開始說起,并在后文分享自己在工程實(shí)踐中的一些具體經(jīng)驗(yàn)。
一、大數(shù)據(jù)技術(shù)初探
首先我們從大數(shù)據(jù)技術(shù)的干貨介紹開始,這部分內(nèi)容對(duì)于有基礎(chǔ)的童鞋來說,可以快速略過。
準(zhǔn)確來說 “大數(shù)據(jù)” 這個(gè)概念并不存在,其就是在曾經(jīng)我們提到過的 “海量數(shù)據(jù)” 的基礎(chǔ)上,數(shù)據(jù)量級(jí)再一次增大,導(dǎo)致傳統(tǒng)的處理手段無法進(jìn)行及時(shí)、有效地處理。
為了表征與傳統(tǒng)數(shù)據(jù)處理手段的區(qū)別,表明技術(shù)的先進(jìn)性,提出來了一個(gè)新詞——大數(shù)據(jù)。
作為 DT 時(shí)代的代表技術(shù)之一,大數(shù)據(jù)緊緊地與人工智能,云計(jì)算技術(shù)相結(jié)合,三者相輔相成,共同促進(jìn)產(chǎn)業(yè)變革,技術(shù)進(jìn)步。無論在學(xué)術(shù)界還是工業(yè)界,這 “三駕馬車” 無疑都是最熱門和前沿的。
作為近幾年火起來的一項(xiàng)技術(shù),大數(shù)據(jù)技術(shù)的主要應(yīng)用場(chǎng)景是日志收集與處理、數(shù)據(jù)分析、機(jī)器學(xué)習(xí)模型的訓(xùn)練等?;谶@些,我們可以實(shí)現(xiàn)商業(yè)智能(BI)、科學(xué)決策等。
所謂的大數(shù)據(jù)技術(shù)棧,無外乎 Hadoop 生態(tài)系統(tǒng),如下圖所示:
那么,作為一個(gè)大數(shù)據(jù)工程師,是否有必要掌握上述全部?jī)?nèi)容呢?答案是否定的!
大數(shù)據(jù)技術(shù)主要表現(xiàn)在:
- 大規(guī)模數(shù)據(jù)存儲(chǔ)
- 彈性計(jì)算
- 集群資源調(diào)度
- 數(shù)據(jù)收集
- 集群一致性保證
1、大規(guī)模數(shù)據(jù)存儲(chǔ)
網(wǎng)盤就是一個(gè)典型的大數(shù)據(jù)存儲(chǔ)應(yīng)用。毫無疑問,網(wǎng)盤上存儲(chǔ)的數(shù)據(jù)量是海量的,這需要一個(gè)集群去存儲(chǔ),也就是我們說的云存儲(chǔ)。
類似地,我們?cè)诠I(yè)實(shí)踐中,也會(huì)遇到各種各樣數(shù)據(jù),這些數(shù)據(jù)有些是冷數(shù)據(jù),也有的是熱數(shù)據(jù)。但是,無論是冷的、熱的,只要是有存儲(chǔ)意義的數(shù)據(jù)我們必然要給他存儲(chǔ)起來,以便后續(xù)使用。舉個(gè)例子,一個(gè)訪問量大的網(wǎng)站,每天產(chǎn)生的日質(zhì)量是很大的,這些數(shù)據(jù)我們可以存儲(chǔ)起來,以便后續(xù)使用。
Hadoop 的 HDFS 可以認(rèn)為是實(shí)際上的工業(yè)標(biāo)準(zhǔn),其存儲(chǔ)模式是文件分塊存儲(chǔ)、多機(jī)備份(冗余),通過 standby 節(jié)點(diǎn)來進(jìn)行心跳探測(cè),保證可用性。除了 HDFS,我們使用云產(chǎn)品時(shí),可能也會(huì)用亞馬遜的公有云產(chǎn)品,也即是 AWS 的 S3 存儲(chǔ)系統(tǒng)。
由于筆者所在公司的業(yè)務(wù)是面向海外市場(chǎng)的,云服務(wù)選擇的是 AWS,用的云存儲(chǔ)是亞馬遜的 S3,免去了自己部署 Hadoop HDFS 的過程。Hadoop 的 HDFS 是自帶讀取 AWS S3 的 API 的。但是,值得說明的是,Hadoop 的 HDFS 并不太適合頻繁更改或者是海量的小文件存儲(chǔ),畢竟一個(gè)文件塊就很大了,有的版本默認(rèn)是 128M,有的是 64M,海量小文件,一般使用的是 FastDFS 或者淘寶開源的 TFS。
2、彈性計(jì)算
所謂彈性計(jì)算,也就是之前學(xué)術(shù)界所說的網(wǎng)格計(jì)算,現(xiàn)在很流行的分布式計(jì)算。我們知道,單節(jié)點(diǎn)的算力是有限的,包括超級(jí)計(jì)算機(jī)的架構(gòu)也是上千個(gè) CPU 和 GPU 們組成的。我們?cè)谄綍r(shí)使用的時(shí)候,自然不會(huì)設(shè)計(jì)出超級(jí)計(jì)算機(jī)這樣復(fù)雜的硬件基礎(chǔ)設(shè)施,會(huì)通過 TCP/IP 協(xié)議來傳送數(shù)據(jù),在不同的節(jié)點(diǎn)上進(jìn)行并行計(jì)算,最后再講結(jié)果匯總,這種算法我們叫做 Map/Reduce 算法。這種理念是 Google 提出來的。
Hadoop 有三個(gè)組件,用于大規(guī)模數(shù)據(jù)存儲(chǔ)的 HDFS、分布式計(jì)算的 Map/Reduce 引擎和資源調(diào)度 Yarn。只不過 Hadoop 的同名計(jì)算引擎 MapReduce 在涉及到中間數(shù)據(jù)緩存時(shí),要寫入 HDFS 上,我們知道 HDFS 本身就是建立在外存上的,而且還要有冗余備份,整個(gè)讀取和寫入速度都比較慢,所以現(xiàn)在真正使用的就是 Spark 計(jì)算引擎,MR(MapReduce)引擎都快被廢掉了。
Spark 是一個(gè)通用的計(jì)算引擎,其除了核心 Core,為應(yīng)用層封裝了機(jī)器學(xué)習(xí)、圖計(jì)算、流式計(jì)算框架和 SparkSQL 即席查詢四個(gè)模塊,用起來很是方便,我們?cè)趯?shí)際工程中,用得最多的也就是 Spark 了。Spark 與 Hadoop 的 MR 引擎不同的是,Spark 的中間數(shù)據(jù)存儲(chǔ)在內(nèi)存中,所以速度特別快。但Spark 的內(nèi)存要求比較大,不過內(nèi)存畢竟也不算太貴。
3、集群資源調(diào)度
所謂的資源調(diào)度,主要指的就是 CPU 和內(nèi)存資源的調(diào)度,集群中哪臺(tái)節(jié)點(diǎn)比較閑,就給它多點(diǎn)任務(wù),這樣可使整體的集群負(fù)載均衡,這對(duì)于分布式集群來說是十分重要的,直接影響了集群的計(jì)算性能。
Hadoop 自帶的模塊是 Yarn,Spark 也自帶一個(gè),叫做 Mesos,不過我們說Spark 是 Hadoop 生態(tài)系統(tǒng)中的成員,自然而然 Spark 也可以使用 Hadoop 的 Yarn 資源調(diào)度引擎,避免了部署上的麻煩。
4、數(shù)據(jù)收集
數(shù)據(jù)分為流式數(shù)據(jù)和批處理數(shù)據(jù)。所謂的流式數(shù)據(jù)是像流水一樣的數(shù)據(jù),通常用的計(jì)算引擎是 Spark Streaming 和 Storm,我們公司主要用到的是 Spark Streaming。
二者的區(qū)別就是,Spark Streaming 不是嚴(yán)格意義的實(shí)時(shí),是一種準(zhǔn)實(shí)時(shí),每隔一段時(shí)間來對(duì)收集到的數(shù)據(jù)運(yùn)算一次,這樣達(dá)到一種流式計(jì)算的效果,而 Storm 是嚴(yán)格意義的實(shí)時(shí),來一條數(shù)據(jù)處理一條。
對(duì)于我們公司來講,不需要這么實(shí)時(shí)的效果,同時(shí) Spark streaming 直接就用 Spark 框架編寫就 ok 了,團(tuán)隊(duì)成員的技術(shù)棧比較吻合,避免了再次學(xué)習(xí) Storm 的成本,也減少了版本發(fā)布和維護(hù)上的苦難。但是具體的選型,還要結(jié)合公司的實(shí)際情況。
說到流式數(shù)據(jù)的收集,我們不得不提到 Kafka 這個(gè)消息中間件。它是發(fā)布 / 訂閱模式的,可以用來做流式數(shù)據(jù)收集的消息隊(duì)列,起到緩存與緩沖的作用,詳細(xì)介紹請(qǐng)看 http://kafka.apache.org/intro.html。
這是一整套流式數(shù)據(jù)處理的架構(gòu),在網(wǎng)上找到這幾篇博文,感覺還可以,推薦給大家:
http://shiyanjun.cn/archives/1097.html
http://spark.apache.org/docs/latest/streaming-kafka-integration.html
此外,再介紹一個(gè)叫做 Flume 的東西,它的官方介紹是:
- Apache Flume is a distributed, reliable, and available system for efficiently collecting, aggregating and moving large amounts of log data from many different sources to a centralized data store.
- The use of Apache Flume is not only restricted to log data aggregation. Since data sources are customizable, Flume can be used to transport massive quantities of event data including but not limited to network traffic data, social-media-generated data, email messages and pretty much any data source possible.
Flume 多用作日志的收集,常用來收集諸如 Nginx 日志等,配合 Kafka 使用,可以做到數(shù)據(jù)的流式收集。
5、集群一致性保證
作為一個(gè)集群,一致性是應(yīng)高考慮的一個(gè)重要因素。例如,我們?cè)谝粋€(gè)集群上兩個(gè)不同節(jié)點(diǎn)讀取到的數(shù)據(jù)不一樣,那么我們是相信誰的?很容易就無法做出下一步的處理。所以,我們?cè)谏厦娴? Hadoop 生態(tài)系統(tǒng)的圖示中可以看到一個(gè)貫穿始終的叫做 ZooKeeper 的東西,這個(gè)東西就是用來保障集群一致性的。
ZooKeeper 主要提供的是 Java API,它通過觀察者模式來實(shí)現(xiàn)的,不同節(jié)點(diǎn)注冊(cè)一個(gè) watcher來監(jiān)聽事件。它實(shí)現(xiàn)了 Paxos 算法,Paxos 算法是一個(gè)比較復(fù)雜的算法,整個(gè)算法的推倒與證明過程一頁 A4 紙都寫不下。
ZooKeeper 實(shí)現(xiàn)的 Paxos 算法也是 Fast Paxos,或者說是 Paxos 算法的精簡(jiǎn)版本。通過 ZooKeeper 我們可以保證整個(gè)集群的一致性,也就為后來基于 ZooKeeper 的應(yīng)用提供了高可用(HA)的基礎(chǔ)。
二、大數(shù)據(jù)技術(shù)工程實(shí)踐
筆者以大數(shù)據(jù)技術(shù)使用的一個(gè)典型場(chǎng)景為例展開探討,場(chǎng)景描述:
應(yīng)用場(chǎng)景是針對(duì)一款 app 的日志分析,該 app 的架構(gòu)方式是基于 HTTP 的微服務(wù),app 算是典型的社交軟件。包括聊天,更新狀態(tài),群組討論,更新個(gè)人信息等都是通過調(diào)用 HTTP 接口來實(shí)現(xiàn)的,當(dāng)然,這些內(nèi)容都是加密過的,包括服務(wù)器之間的通訊也都是通過證書來驗(yàn)證的。
這樣的微服務(wù)架構(gòu)為我們的日志分析提供了方便,可以認(rèn)為日志上的 url 路徑包含了很多的信息,基于不同的 url 我們可以發(fā)現(xiàn)用戶的行為,并針對(duì)用戶的行為進(jìn)行數(shù)據(jù)分析。
1、數(shù)據(jù)的收集
如果是做離線計(jì)算的,可以直接把日志下載到本機(jī),然后再對(duì)本機(jī)上的所有日志進(jìn)行統(tǒng)一的計(jì)算。
Spark 是支持 AWS S3 的,不過這得基于 Hadoop 來實(shí)現(xiàn),還得安裝 Hadoop,在實(shí)際使用中坑很多。Spark 讀取 S3 數(shù)據(jù)可以使用亞馬遜官方的 Java driver 來做,相對(duì)來說坑比較少。不過,Spark 直接讀取 HDFS 上的數(shù)據(jù)相對(duì)容易很多,坑也沒有多少;在實(shí)際使用的時(shí)候,可以嘗試用流式日志下載的方式,在下載的同時(shí),進(jìn)行數(shù)據(jù)的分析,實(shí)際還是比較高效的。
2、數(shù)據(jù)的 ETL
ETL( Extract-Transform-Load )用來描述將數(shù)據(jù)從來源端經(jīng)過抽取(extract)、轉(zhuǎn)換(transform)、加載(load)至目的端的過程。ETL 的方式有很多,有基于現(xiàn)有用具進(jìn)行 ETL 的,也有自己編寫代碼進(jìn)行 ETL 的。
筆者所采用的 ETL 方式是基于 Spark 的 ETL,基于 Spark 的 ETL 有諸如靈活快速等特點(diǎn),這里有幾篇博文,介紹了 Spark 的 ETL,總的來說,用 Spark 來做 ETL 是比較高大上的。
http://blog.csdn.net/u011204847/article/details/51247306
http://blog.csdn.net/zbc1090549839/article/details/54407876
上面說到,筆者的日志數(shù)據(jù)存儲(chǔ)在 AWS 的 S3 上,故而介紹寫 AWS S3 的日志格式,原文鏈接請(qǐng)查閱 https://goo.gl/8CHKdp
s3 文件的路徑格式:
- bucket[/prefix]/AWSLogs/aws-account-id/elasticloadbalancing/region/yyyy/mm/dd/aws-account-id_elasticloadbalancing_region_load-balancer-name_end-time_ip-address_random-string.log
- 日志的存儲(chǔ)格式:
- timestamp elb client:port backend:port request_processing_time backend_processing_time response_processing_time elb_status_code backend_status_code received_bytes sent_bytes “request” “user_agent” ssl_cipher ssl_protocol
總之,就是包括了用戶的請(qǐng)求 IP、請(qǐng)求設(shè)備、時(shí)間、請(qǐng)求方法、請(qǐng)求路徑和服務(wù)器的相應(yīng)和處理時(shí)間等。這里有專門針對(duì) AWS 日志的分析系統(tǒng)的博文供大家學(xué)習(xí):
http://blog.csdn.net/wireless_com/article/details/43533607
我們的目標(biāo)是利用 Spark 將這種存儲(chǔ)于亞馬遜 S3 的原始日志格式進(jìn)行轉(zhuǎn)換,存儲(chǔ)在數(shù)據(jù)倉(cāng)庫(kù)中。對(duì)于數(shù)據(jù)倉(cāng)庫(kù),比較著名的應(yīng)該是 HBase 了,HBase 是基于 HDFS 的一個(gè) NoSQL 列式數(shù)據(jù)庫(kù),存儲(chǔ)容量大。
不過,對(duì)于我的業(yè)務(wù)場(chǎng)景來說,選用 HBase 并不太適合,因?yàn)楹芏鄶?shù)據(jù)存儲(chǔ)很長(zhǎng)時(shí)間并沒有必要,最多只需要存儲(chǔ)最近一個(gè)月的經(jīng)過 ETL 后的數(shù)據(jù)就可以了,沒有必要存儲(chǔ)那么多冷數(shù)據(jù),所以我選擇了 MongoDB 進(jìn)行數(shù)據(jù)的存儲(chǔ)。
那么我們就明確了 ETL 的目標(biāo),將來自于 AWS S3 的原始數(shù)據(jù)(raw log)經(jīng)過 ETL,存儲(chǔ)在 MongoDB 中,MongoDB 中存儲(chǔ)的格式類似于:
- {
- "time":"2017-2-1-26 UTC xx:xx:xx",
- "url":"http://foo.com/ab?c=d&e=f",
- "uri":"ab",
- "uid":"10000"
- }
3、MongoSpark
MongoDB 和 Spark 之間是可以用來做高速地?cái)?shù)據(jù)傳輸?shù)模覀兪褂?MongoDB 來作為 Spark 的數(shù)據(jù)持久層,MongoDB 的 Spark driver 名稱就叫做 MongoSpark。
這有一篇文章,非常詳細(xì)地介紹了 MongoDB 和 Spark 用于大數(shù)據(jù)解決方案的架構(gòu)設(shè)計(jì),很推薦,詳見 http://www.mongoing.com/tj/mongodb_shanghai_spark
原文部分內(nèi)容摘要
1) HDFS VS MongoDB
- 既然我們說 MongoDB 可以用在 HDFS 的地方,那我們來詳細(xì)看看兩者之間的差異性。
- 在說區(qū)別之前,其實(shí)我們可以先來注意一下兩者的共同點(diǎn)。HDFS 和 MongoDB 都是基于廉價(jià) x86 服務(wù)器的橫向擴(kuò)展架構(gòu),都能支持到 TB 到 PB 級(jí)的數(shù)據(jù)量。
- 數(shù)據(jù)會(huì)在多節(jié)點(diǎn)自動(dòng)備份,來保證數(shù)據(jù)的高可用和冗余。兩者都支持非結(jié)構(gòu)化數(shù)據(jù)的存儲(chǔ)等。
2) HDFS 和 MongoDB 的區(qū)別
- 如在存儲(chǔ)方式上 HDFS 的存儲(chǔ)是以文件為單位,每個(gè)文件 64MB 到 128MB 不等。而 MongoDB 則是細(xì)顆?;?、以文檔為單位的存儲(chǔ)。
- HDFS 不支持索引的概念,對(duì)數(shù)據(jù)的操作局限于掃描性質(zhì)的讀,MongoDB 則支持基于二級(jí)索引的快速檢索。
- MongoDB 可以支持常見的增刪改查場(chǎng)景,而 HDFS 一般只是一次寫入后就很難進(jìn)行修改。
- 從響應(yīng)時(shí)間上來說,HDFS 一般是分鐘級(jí)別而 MongoDB 對(duì)手請(qǐng)求的響應(yīng)時(shí)間通常以毫秒作為單位。
3) MongoDB-Spark 架構(gòu)
4) 什么時(shí)候選用 MongoDB
- 涉及到快速讀取數(shù)據(jù)
- 建立索引
- 對(duì)數(shù)據(jù)的存儲(chǔ)粒度要求較細(xì)(文檔形式)
- 能夠?qū)?shù)據(jù)進(jìn)行修改的場(chǎng)合。
5) 什么時(shí)候選用 HDFS
- HDFS 數(shù)據(jù)存儲(chǔ)節(jié)點(diǎn)不要求就有較大的內(nèi)存,而 MongoDB 要想保證讀寫迅速的前提是要占據(jù)較大的內(nèi)存空間;
- 對(duì)數(shù)據(jù)修改的要求不高,例如圖片,音視頻文件,一般寫入后不需要再次修改;
- HDFS 被設(shè)計(jì)部署在低廉的硬件設(shè)備上,對(duì)硬件的要求不苛刻,能夠保證高可用性,集群的數(shù)據(jù)吞吐量也很高;
- 相比之下,MongoDB 對(duì) CPU 和內(nèi)存的要求要高得多。
6) MongoDB 的地理位置搜索
MongoDB 具有很多高級(jí)搜索功能,譬如微信搜索附近的人,我們可以通過 MongoDB 的 GEO 搜索來完成,這是 MongoDB 的又一大好處,有關(guān)地理位置搜索,推薦這篇博文:https://goo.gl/FYiZJg。
4、數(shù)據(jù)的分析
我們首先來回顧一下,日志中主要包括的內(nèi)容有:在我們的日志 url 中記錄了用戶的 id、用戶的行為、用戶的行為屬性、用戶的設(shè)備、用戶的 IP、用戶訪問時(shí)間、服務(wù)器處理時(shí)間、服務(wù)器響應(yīng)時(shí)間等。
上述數(shù)據(jù)是來自日志的原始數(shù)據(jù),經(jīng)過 ETL 后,被存儲(chǔ)到 MongoDB 的 raw 數(shù)據(jù)庫(kù)中,以 K-V 對(duì)文檔的形式存儲(chǔ)起來,下面我們將要對(duì)存儲(chǔ)到 MongoDB 中,經(jīng)過整理后的數(shù)據(jù)進(jìn)行分析。
4.1 宏觀分析
宏觀分析是最基礎(chǔ)也是最簡(jiǎn)單的,例如:
- 我們可以統(tǒng)計(jì)一天 24Hour,那個(gè)小時(shí)用戶的活躍量最多;
- 我們可以根據(jù)用戶的 IP 來判斷哪個(gè)區(qū)域的用戶最多;
- 我們可以根據(jù)使用設(shè)備,來判斷使用什么終端的用戶最多;
- 同樣我們也可以用服務(wù)器的響應(yīng)時(shí)間來判斷服務(wù)器的運(yùn)轉(zhuǎn)情況。
宏觀分析,在用 Spark 進(jìn)行編程時(shí),首先經(jīng)過 map 過程轉(zhuǎn)換成我們想要的形式,例如:我們要統(tǒng)計(jì) 24 小時(shí),分時(shí)統(tǒng)計(jì)用戶活躍量。這樣,我們經(jīng)過 map 后,就可以形成這樣的一個(gè)形式:
- // 我們假設(shè) ,rdd 的存儲(chǔ)格式是一個(gè) Document,Document 是 MongoDB driver 的存儲(chǔ)格式,它實(shí)現(xiàn)了 Map 接口。
- val rdd = MongoSpark.load(...)
- // 從 MongoDB 中直接加載某個(gè) table,也就是說,rdd 的類型是 RDD[Document]. 這里用到的是 scala 編程,與 Java 類似
- val count =
- rdd.map(x=>{
- (parse2Hour(x.getString("time")),1)
- }).reduceByKey(_+_)
- // 得到了分時(shí)統(tǒng)計(jì)結(jié)果,與寫 wordcount 是類似的。
- //parse2Hour() 是一個(gè)函數(shù),實(shí)現(xiàn)了將存儲(chǔ)的 UTC 格式的 time 提取出小時(shí),這個(gè)其實(shí)自己實(shí)現(xiàn)一個(gè)簡(jiǎn)單的文本分割就搞定了。
- count.foreach(println)
- // 打印出統(tǒng)計(jì)的結(jié)果
4.2 微觀分析
所謂微觀分析,就是粒度更細(xì)致的分析了。
我們?cè)谏厦嬷皇欠治龀鏊械挠脩羧后w,在哪個(gè)時(shí)間段更加活躍。現(xiàn)在我們?cè)倏戳硗庖粋€(gè)例子:我們想要分析 uid 為 1000 的用戶,在一天 24 小時(shí)中,哪個(gè)小時(shí)活動(dòng)最頻繁。統(tǒng)計(jì)出來的結(jié)果,可以直接用做給它推送消息的推送時(shí)間點(diǎn)來使用。
其實(shí),這個(gè)編程與上面的宏觀統(tǒng)計(jì)類似,只不過,我們要將所有的 rdd 進(jìn)行一個(gè) group 分組,把所有 uid 相同的全都放到一起去。之后,再在這個(gè)子 rdd 中分析該用戶在哪個(gè)時(shí)間段最活躍即可。
示例代碼如下:
- val rdd = MongoSpark.load(...)
- // 從 MongoDB 中直接加載某個(gè) table
- val user = rdd.groupBy(_.getString("uid"))
- // 通過用戶的 uid 不同,來劃分為不同的子 rdd
- val count = user.map(x=>{
- // 每個(gè)劃分出來的子 rdd 的格式是這樣的:
- // ("uid",[Document1,Document2,...])
- /*
- 我們可以看出來,劃分出來的結(jié)果實(shí)際上是一個(gè)元組,元組的第一個(gè)元素就是我們劃分的依據(jù),元組的第二個(gè)元素就是一個(gè) List, 這個(gè) List 把所有屬于這個(gè)元組的 Document 都包括進(jìn)去了。
- */
- // 后面,我們?cè)賹?duì)這個(gè) List 進(jìn)行一個(gè)暴力掃描,掃描出其中我們想要的結(jié)果就 ok 了 , 這里根據(jù)業(yè)務(wù)不同,代碼省略,如果不會(huì)分布式并行編程,就給 collect() 到本地,編寫相關(guān)的業(yè)務(wù)代碼也 Ok.
- ...
- // 最后返回結(jié)果:
- (uid, 某個(gè)小時(shí))
- })
4.3 機(jī)器學(xué)習(xí)
其實(shí)在我們實(shí)踐當(dāng)中,最常用到的機(jī)器學(xué)習(xí)算法恐怕就是聚類算法了。
聚類是一種無監(jiān)督學(xué)習(xí),我們最常用到的聚類算法就是 Kmeans 算法,Spark 的 MLlib 庫(kù)為我們實(shí)現(xiàn)了 Kmeans 算法,我們直接調(diào)用就 OK 了。
通過聚類算法,我們可以實(shí)現(xiàn):因?yàn)槲覀冊(cè)谌罩局惺前脩舻男袨樘卣鞯?,根?jù)這些行為特征,我們可以通過聚類算法來實(shí)現(xiàn)用戶的分群。
這里簡(jiǎn)單介紹下 Kmeans 算法的原理:
- Kmeans 算法需要指定參數(shù) k ,用來告訴算法需要分成幾個(gè)類別;
- 然后將事先輸入的 n 個(gè)數(shù)據(jù)對(duì)象劃分為 k 個(gè)聚類以便使得所獲得的聚類滿足以下條件:
- 同一聚類中的對(duì)象相似度較高;
- 不同聚類中的對(duì)象相似度較小。
- 聚類相似度是利用各聚類中對(duì)象的均值所獲得一個(gè) “中心對(duì)象” 來進(jìn)行計(jì)算的。
- 聚類算法是一種迭代算法,通過反復(fù)迭代,來使得結(jié)果趨向于最優(yōu)。這個(gè)迭代次數(shù)也是可以指定的,不過也不是越多越好,因?yàn)樵酵蟾淖兙驮叫?,效果不理想,反而浪費(fèi)時(shí)間,這個(gè)需要具體去調(diào)試。
那么,我們?cè)谶M(jìn)行聚類時(shí),我們可以統(tǒng)計(jì)某個(gè)用戶(叫他小明吧),下面我舉個(gè)例子,假設(shè)下面的數(shù)據(jù)都是針對(duì)小明同學(xué)行為產(chǎn)生的日志情況,進(jìn)行統(tǒng)計(jì)分析的結(jié)果:
- 小明的基本用戶信息:
- {
- “name”:” 小明 “,
- “age”:”18”,
- “gender”:”male”,
- “country”:”china”,
- …
- }
- 日志統(tǒng)計(jì)信息:
- {
- “ 發(fā)送聊天記錄 “:250,
- “ 陌生人聊天 “:200,
- “ 好友聊天 “:25,
- “ 群組聊天 “:25,
- “ 給別人照片點(diǎn)贊 “:100,
- “ 瀏覽別人發(fā)的說說 “:100,
- “ 給別人說說點(diǎn)贊 “,52,
- “ 搜索附近的人 “,100,
- “ 勾搭過幾個(gè)陌生人 “:50,
- “ 閱讀推薦文章 “:0,
- …
- }
當(dāng)然了,上面的日志統(tǒng)計(jì)結(jié)果我只是舉個(gè)例子,我們可以選擇其中的某幾個(gè)具有代表性的作為特征向量,根據(jù)這些特征向量來對(duì)用戶進(jìn)行聚類。譬如,我們可以選擇:聊天記錄、陌生人聊天比例、搜索陌生人次數(shù)、勾搭過幾個(gè)陌生人等來衡量某些人對(duì)陌生人交友的喜好程度。
4.4 歸一化
這里順便說一下歸一化的問題。
在上面的例子中,我們可以看到,如果某個(gè)人搜索附近的人頻次特別高,而且只有這個(gè)人的水平特別高,可能達(dá)到了 100000000 這個(gè)量級(jí),而除他之外的所有人可能都是 200 一下的量級(jí)。
這樣在進(jìn)行數(shù)據(jù)計(jì)算時(shí),直接用 100000000 這個(gè)數(shù)字帶進(jìn)去算很容易對(duì)結(jié)果造成干擾,甚至數(shù)字還有溢出的可能。
我們想辦法將這些數(shù)字映射到 [0,1] 的區(qū)間中,用小數(shù)來表示,這樣我們叫做歸一化。
比較簡(jiǎn)單的歸一化可以是用某個(gè)用戶的值除以全體的總數(shù);也可以是用某個(gè)用戶的值處理這個(gè)群體中最大的那個(gè)值;這樣都可以保證結(jié)果是在 [0,1] 之間,當(dāng)然,對(duì)于某些特別 “奇葩” 的用戶,我們也可以用 sigmoid 函數(shù)來進(jìn)行映射,sigmoid 函數(shù)是一種 S 型曲線函數(shù),它的圖像是:
這個(gè)當(dāng)作了解就行了,實(shí)際上在一些分工明確的公司里,會(huì)有專門的算法組來進(jìn)行優(yōu)化和設(shè)計(jì)的。詳見百度百科https://goo.gl/m8NMqC
通過 Kmeans 算法,我們可以對(duì)用戶進(jìn)行聚類,相同類型的人,會(huì)被聚類到一起,可以供我們進(jìn)行統(tǒng)計(jì)分析、科學(xué)決策和相似用戶推薦等。
4.5 推薦系統(tǒng)
諸如涉及到評(píng)分相關(guān)內(nèi)容的都可以用作推薦系統(tǒng)。推薦系統(tǒng),只要保證能夠維護(hù)好這幾個(gè)數(shù)據(jù)表就可以做了:用戶信息表、產(chǎn)品信息表、用戶對(duì)產(chǎn)品的評(píng)分表。
現(xiàn)在在工業(yè)界最常用的推薦系統(tǒng)算法是協(xié)同過濾相關(guān)算法,Spark 的 MLlib 庫(kù)為我們實(shí)現(xiàn)了推薦系統(tǒng)的算法。
算法比較常用的一個(gè)是基于產(chǎn)品信息的(ItemCF),一個(gè)是基于用戶的(UserCF),這里有一篇博文 https://goo.gl/n1sjnF ,介紹了上面兩種算法。在實(shí)際應(yīng)用場(chǎng)景中,可能并非具有具體評(píng)分值,那么就需要我們根據(jù)用戶的具體行為來為其指定具體的分?jǐn)?shù),譬如一張圖片,衡量用戶對(duì)其的喜歡程度:
瀏覽圖片算作 1,評(píng)論算作 2(舉個(gè)例子,這個(gè)有歧義,也可能是差評(píng)),點(diǎn)擊大圖觀看算作 3,點(diǎn)贊算作 4,分享算作 5,等。
5、任務(wù)調(diào)度系統(tǒng)
大數(shù)據(jù)的任務(wù)調(diào)度系統(tǒng)主要有 Hadoop 的 Oozie,不過相對(duì)而言,筆者更喜歡用領(lǐng)英開源的任務(wù)調(diào)度系統(tǒng)——Azkaban,Azkaban 的官方簡(jiǎn)介是:
- azkaban was implemented at LinkedIn to solve the problem of Hadoop job dependencies.We had jobs that needed to run in order, from ETL jobs to data analytics products.
- Initially a single server solution, with the increased number of Hadoop users over the years, Azkaban has evolved to be a more robust solution.
可以看到,領(lǐng)英官方就用它來做大數(shù)據(jù)相關(guān)的任務(wù)調(diào)度使用,這里推薦一篇博文 https://goo.gl/w6bXdA,詳細(xì)介紹了 Azkaban 用作大數(shù)據(jù)領(lǐng)域任務(wù)調(diào)度系統(tǒng)的配置和應(yīng)用方法。
通過 Azkaban 就可以做到解放人力:任務(wù)的自動(dòng)調(diào)用和執(zhí)行,而且可以指定調(diào)用順序,定時(shí)觸發(fā)還有報(bào)錯(cuò)功能,的確是件神器。
三、經(jīng)驗(yàn)之談
1、合理架構(gòu)
在考慮實(shí)現(xiàn)大數(shù)據(jù)平臺(tái)時(shí),要對(duì)需要實(shí)現(xiàn)的產(chǎn)品做一個(gè)全方位的衡量,選擇適合自己業(yè)務(wù)需要的方式針對(duì)性地架構(gòu),不應(yīng)直接從網(wǎng)上 copy 一種方案便開始實(shí)施。
舉一個(gè)例子,某種場(chǎng)合下,我們可以提出多級(jí) ETL 的方式,來實(shí)現(xiàn)數(shù)據(jù)的復(fù)用,這些數(shù)據(jù)之間的關(guān)系呈現(xiàn)出金字塔狀,如圖所示:
越在金字塔上部分的數(shù)據(jù)量越小,經(jīng)過 ETL 也變得更加細(xì)粒度,這部分?jǐn)?shù)據(jù)的冗余部分相對(duì)較少,越在下面的數(shù)據(jù)冗余越大,越是冷數(shù)據(jù)。
假設(shè)這樣不同層的數(shù)據(jù),我們可以對(duì)其進(jìn)行復(fù)用,那么我們就有必要進(jìn)行多級(jí)的 ETL,如果這種復(fù)用情況很沒有必要,我們也沒有必要進(jìn)行多級(jí)的 ETL。
具體是否適合我們的應(yīng)用場(chǎng)景,要依據(jù)自身具體的業(yè)務(wù)情況來進(jìn)行分析,不能按圖索驥。
2、保證任務(wù)調(diào)度順序
任務(wù)調(diào)度系統(tǒng)我們使用 Azkaban 而不使用 Croncat(Linux 自帶的工具),是因?yàn)?Azkaban 可以讓我們自行指定任務(wù)之間的依賴關(guān)系。
這些依賴是一個(gè) DAG,我們?cè)?Azkaban 中配置任務(wù)之間順序時(shí),一定要把握好任務(wù)之間的關(guān)系,當(dāng)涉及到并行事務(wù)時(shí),要考慮到二者之間的執(zhí)行順序和耦合關(guān)系,否則將會(huì)造成任務(wù)的失敗。
3、保證集群的高負(fù)載
一個(gè)計(jì)算集群都不能浪費(fèi)掉,因?yàn)榧旱膬r(jià)格比較昂貴,我們往往都是使用的云服務(wù)。對(duì)于不是按量付費(fèi)的云服務(wù),我們要保證集群的高負(fù)載。也就是讓集群始終處于一種工作狀態(tài),不要將集群空著,這樣比較浪費(fèi)資源。對(duì)于流式數(shù)據(jù)處理來講,集群自然是保證一直在工作。但對(duì)于離線計(jì)算來講,可能當(dāng)我們提交完一個(gè)作業(yè)之后,很快任務(wù)就執(zhí)行結(jié)束,如果確定沒有什么額外的計(jì)算任務(wù),請(qǐng)選擇按量付費(fèi),這樣能節(jié)約很大一筆開銷。
對(duì)于很多云服務(wù)商來講,他們往往提供了 MapReduce 的云服務(wù),在有條件的情況下,也可以購(gòu)買這種云服務(wù),避免配置的繁瑣,也能夠合理地按量付費(fèi)。
4、充分挖掘節(jié)點(diǎn)算力
Spark 的默認(rèn)設(shè)置,每個(gè)節(jié)點(diǎn)都有內(nèi)存使用上的限制,我們可以通過修改 conf 目錄中的配置文件,來修改 Spark 使用的內(nèi)存量。
譬如 spark-env.sh 文件中的參數(shù) SPARK_WORKER_MEMORY 可以設(shè)置工作節(jié)點(diǎn)的內(nèi)存使用,這個(gè)使用值盡可能設(shè)得大一些,可以提高集群性能。
5、考慮批處理調(diào)用 HTTP API
由于 Spark 是一種并行編程思想,在某些調(diào)用上是并行地取執(zhí)行。例如我們通過 HTTP 微服務(wù)的方式,查詢一個(gè)用戶的性別:
http://foo.com/getGender/10001
每一個(gè)并行的執(zhí)行操作都會(huì)去調(diào)用一次 HTTP 請(qǐng)求,來查詢某個(gè)用戶的性別。實(shí)際上,對(duì)于查詢這種操作,遠(yuǎn)程的服務(wù)器是通過掃描數(shù)據(jù)庫(kù)中的內(nèi)容來完成的,多次反復(fù)掃描和一次批量地掃描效率相比是要差很多的。
以 MongoDB 為例,執(zhí)行兩次 findOne() 和執(zhí)行一次 findMany() 相比,開銷可能要達(dá)到 1.8 倍左右,這還不算遠(yuǎn)程服務(wù)器響應(yīng)并發(fā)時(shí)的性能消耗。對(duì)于這些操作,可以合并執(zhí)行,將 HTTP API 改成:
http://foo.com/getGender/100001,100002,1111,112333
6、降低耦合
通過分析日志中的 URL 請(qǐng)求來完成大數(shù)據(jù)分析,避免修改現(xiàn)有的代碼,可以實(shí)現(xiàn)大數(shù)據(jù)平臺(tái)與現(xiàn)有平臺(tái)之間的分離,實(shí)現(xiàn)松耦合。
大數(shù)據(jù)平臺(tái)的數(shù)據(jù)源來源于日志文件,避免對(duì)現(xiàn)有的業(yè)務(wù)代碼侵犯,可以對(duì)現(xiàn)有數(shù)據(jù)采用讀取的方式豐富數(shù)據(jù)來源,但盡量不要取修改業(yè)務(wù)系統(tǒng)中的數(shù)據(jù)。這樣把大數(shù)據(jù)平臺(tái)作為一個(gè)單獨(dú)的系統(tǒng)來實(shí)現(xiàn),可以避免修改現(xiàn)有的業(yè)務(wù)系統(tǒng)。
四、總結(jié)
在本文中,我們談到了中小型企業(yè)基于大數(shù)據(jù)技術(shù)的項(xiàng)目實(shí)踐。其實(shí),對(duì)于中小型企業(yè)來講,可能數(shù)據(jù)量并沒有大型公司想象得那么多,一般一天產(chǎn)生的日志條數(shù)幾千萬到一億的居多。
對(duì)于這種離線計(jì)算場(chǎng)景,其實(shí)并不一定就非得用分布式集群去消費(fèi)數(shù)據(jù),如果公司尚有閑置的單節(jié)點(diǎn)內(nèi)存容量達(dá)到 16G,雙核心及以上的一臺(tái)機(jī)器,實(shí)際上在做離線計(jì)算的時(shí)候也夠用了。
作者介紹
Tumweeg,學(xué)生時(shí)期曾為微軟中國(guó)打雜并取得過相關(guān)專利,現(xiàn)在某海外業(yè)務(wù)社交類移動(dòng)互聯(lián)網(wǎng)公司任大數(shù)據(jù)工程師。熟悉大數(shù)據(jù)平臺(tái)研發(fā)、架構(gòu),以及數(shù)據(jù)的處理和分析,熟悉Web架構(gòu)和高性能/高并發(fā)/高可用系統(tǒng),熱愛技術(shù)交流,共同提高。