如何基于Spark Streaming構(gòu)建實(shí)時(shí)計(jì)算平臺(tái)
1、前言
隨著互聯(lián)網(wǎng)技術(shù)的迅速發(fā)展,用戶對(duì)于數(shù)據(jù)處理的時(shí)效性、準(zhǔn)確性與穩(wěn)定性要求越來越高,如何構(gòu)建一個(gè)穩(wěn)定易用并提供齊備的監(jiān)控與預(yù)警功能的實(shí)時(shí)計(jì)算平臺(tái)也成了很多公司一個(gè)很大的挑戰(zhàn)。
自2015年攜程實(shí)時(shí)計(jì)算平臺(tái)搭建以來,經(jīng)過兩年多不斷的技術(shù)演進(jìn),目前實(shí)時(shí)集群規(guī)模已達(dá)上百臺(tái),平臺(tái)涵蓋各個(gè)SBU與公共部門數(shù)百個(gè)實(shí)時(shí)應(yīng)用,全年JStorm集群穩(wěn)定性達(dá)到100%。目前實(shí)時(shí)平臺(tái)主要基于JStorm與Spark Streaming構(gòu)建而成,相信關(guān)注攜程實(shí)時(shí)平臺(tái)的朋友在去年已經(jīng)看到一篇關(guān)于攜程實(shí)時(shí)平臺(tái)的分享:攜程實(shí)時(shí)大數(shù)據(jù)平臺(tái)實(shí)踐分享。
本次分享將著重于介紹攜程如何基于Spark Streaming構(gòu)建實(shí)時(shí)計(jì)算平臺(tái),文章將從以下幾個(gè)方面分別闡述平臺(tái)的構(gòu)建與應(yīng)用:
- Spark Streaming vs JStorm
- Spark Streaming設(shè)計(jì)與封裝
- Spark Streaming在攜程的實(shí)踐
- 曾經(jīng)踩過的坑
- 未來展望
2、Spark Streaming vsJStorm
攜程實(shí)時(shí)平臺(tái)在接入Spark Streaming之前,JStorm已穩(wěn)定運(yùn)行有一年半,基本能夠滿足大部分的應(yīng)用場(chǎng)景。接入Spark Streaming主要有以下幾點(diǎn)考慮:首先攜程使用的JStorm版本為2.1.1版本,此版本的JStorm封裝與抽象程度較低,并沒有提供High Level抽象方法以及對(duì)窗口、狀態(tài)和Sql等方面的功能支持,這大大的提高了用戶使用JStorm實(shí)現(xiàn)實(shí)時(shí)應(yīng)用的門檻以及開發(fā)復(fù)雜實(shí)時(shí)應(yīng)用場(chǎng)景的難度。在這幾個(gè)方面,SparkStreaming表現(xiàn)就相對(duì)好的多,不但提供了高度集成的抽象方法(各種算子),并且用戶還可以與SparkSQL相結(jié)合直接使用SQL處理數(shù)據(jù)。
其次,用戶在處理數(shù)據(jù)的過程中往往需要維護(hù)兩套數(shù)據(jù)處理邏輯,實(shí)時(shí)計(jì)算使用JStorm,離線計(jì)算使用Hive或Spark。為了降低開發(fā)和維護(hù)成本,實(shí)現(xiàn)流式與離線計(jì)算引擎的統(tǒng)一,Spark為此提供了良好的支撐。
最后,在引入Spark Streaming之前,我們重點(diǎn)分析了Spark與Flink兩套技術(shù)的引入成本。Flink當(dāng)時(shí)的版本為1.2版本,Spark的版本為2.0.1。相比較于Spark,F(xiàn)link在SQL與MLlib上的支持相對(duì)弱于Spark,并且公司許多部門都是基于Spark SQL與MLlib開發(fā)離線任務(wù)與算法模型,使得大大降低了用戶使用Spark的學(xué)習(xí)成本。
下圖簡(jiǎn)單的給出了當(dāng)前我們使用Spark Streaming與JStorm的對(duì)比:
3、Spark Streaming設(shè)計(jì)與封裝
在接入Spark Streaming的初期,首先需要考慮的是如何基于現(xiàn)有的實(shí)時(shí)平臺(tái)無縫的嵌入SparkStreaming。原先的實(shí)時(shí)平臺(tái)已經(jīng)包含了許多功能:元數(shù)據(jù)管理、監(jiān)控與告警等功能,所以第一步我們先針對(duì)SparkStreaming進(jìn)行了封裝并提供了豐富的功能。整套體系總共包含了Muise Spark Core、Muise Portal以及外部系統(tǒng)。
3.1 Muise Spark Core
MuiseSpark Core是我們基于Spark Streaming實(shí)現(xiàn)的二次封裝,用于支持?jǐn)y程多種消息隊(duì)列,其中HermesKafka與源生的Kafka基于Direct Approach的方式消費(fèi)數(shù)據(jù),Hermes Mysql與Qmq基于Receiver的方式消費(fèi)數(shù)據(jù)。接下來將要講的諸多特性主要是針對(duì)Kafka類型的數(shù)據(jù)源。
Muisespark core主要包含了以下特性:
- Kafka Offset自動(dòng)管理
- 支持Exactly Once與At Least Once語(yǔ)義
- 提供Metric注冊(cè)系統(tǒng),用戶可注冊(cè)自定義metric
- 基于系統(tǒng)與用戶自定義metric進(jìn)行預(yù)警
- Long running on Yarn,提供容錯(cuò)機(jī)制
3.1.1 Kafka Offset自動(dòng)管理
封裝muise spark core的第一目標(biāo)就是簡(jiǎn)單易用,讓用戶以最簡(jiǎn)單的方式能夠上手使用SparkStreaming。首先我們實(shí)現(xiàn)了幫助用戶自動(dòng)讀取與存儲(chǔ)Kafka Offset的功能,用戶無需關(guān)心Offset是如何被處理的。其次我們也對(duì)Kafka Offset的有效性進(jìn)行了校驗(yàn),有的用戶的作業(yè)可能在停止了較長(zhǎng)時(shí)間后重新運(yùn)行會(huì)出現(xiàn)Offset失效的情形,我們也對(duì)此作了對(duì)應(yīng)的操作,目前的操作是將失效的Offset設(shè)置為當(dāng)前有效的最老的Offset。下圖展現(xiàn)了用戶基于muise spark core編寫一個(gè)Spark streaming作業(yè)的簡(jiǎn)單示例,用戶只需要短短幾行代碼即可完成代碼的初始化并創(chuàng)建好對(duì)應(yīng)的DStream:
默認(rèn)情況下,作業(yè)每次都是基于上次存儲(chǔ)的Kafka Offset繼續(xù)消費(fèi),但是用戶也可以自行決定Offset的消費(fèi)起點(diǎn)。下圖中展示了設(shè)置消費(fèi)起點(diǎn)的三種方式:
3.1.2 Exactly Once的實(shí)現(xiàn)
如果實(shí)時(shí)作業(yè)要實(shí)現(xiàn)端對(duì)端的exactly once則需要數(shù)據(jù)源、數(shù)據(jù)處理與數(shù)據(jù)存儲(chǔ)的三個(gè)階段都保證exactly once的語(yǔ)義。目前基于Kafka Direct API加上Spark RDD算子精確一次的保證能夠?qū)崿F(xiàn)端對(duì)端的exactly once的語(yǔ)義。在數(shù)據(jù)存儲(chǔ)階段一般實(shí)現(xiàn)exactly once需要保證存儲(chǔ)的過程是冪等操作或事務(wù)操作。很多系統(tǒng)本身就支持了冪等操作,比如相同數(shù)據(jù)寫hdfs同一個(gè)文件,這本身就是冪等操作,保證了多次操作最終獲取的值還是相同;HBase、ElasticSearch與redis等都能夠?qū)崿F(xiàn)冪等操作。對(duì)于關(guān)系型數(shù)據(jù)庫(kù)的操作一般都是能夠支持事務(wù)性操作。
官方在創(chuàng)建DirectKafkaInputStream時(shí)只需要輸入消費(fèi)Kafka的From Offset,然后其自行獲取本次消費(fèi)的End Offset,也就是當(dāng)前最新的Offset。保存的Offset是本批次的End Offset,下次消費(fèi)從上次的End Offset開始消費(fèi)。當(dāng)程序宕機(jī)或重啟任務(wù)后,這其中存在一些問題。如果在數(shù)據(jù)處理完成前存儲(chǔ)Offset,則可能存在作業(yè)處理數(shù)據(jù)失敗與作業(yè)宕機(jī)等情況,重啟后會(huì)無法追溯上次處理的數(shù)據(jù)導(dǎo)致數(shù)據(jù)出現(xiàn)丟失。如果在數(shù)據(jù)處理完成后存儲(chǔ)Offset,但是存儲(chǔ)Offset過程中發(fā)生失敗或作業(yè)宕機(jī)等情況,則在重啟后會(huì)重復(fù)消費(fèi)上次已經(jīng)消費(fèi)過的數(shù)據(jù)。而且此時(shí)又無法保證重啟后消費(fèi)的數(shù)據(jù)與宕機(jī)前的數(shù)據(jù)量相同數(shù)據(jù)相當(dāng),這又會(huì)引入另外一個(gè)問題,如果是基于聚合統(tǒng)計(jì)指標(biāo)作更新操作,這會(huì)帶來無法判斷上次數(shù)據(jù)是否已經(jīng)更新成功。
所以在muise spark core中我們加入了自己的實(shí)現(xiàn)用以保證Exactly once的語(yǔ)義。具體的實(shí)現(xiàn)是我們對(duì)Spark源碼進(jìn)行了改造,保證在創(chuàng)建DirectKafkaInputStream可以同時(shí)輸入From Offset與End Offset,并且我們?cè)诖鎯?chǔ)Kafka Offset的時(shí)候保存了每個(gè)批次的起始Offset與結(jié)束Offset,具體格式如下:
如此做的用意在于能夠確保無論是宕機(jī)還是人為重啟,重啟后的第一個(gè)批次與重啟前的最后一個(gè)批次數(shù)據(jù)一模一樣。這樣的設(shè)計(jì)使得后面用戶在后面對(duì)于第一個(gè)批次的數(shù)據(jù)處理非常靈活可變,如果用戶直接忽略第一個(gè)批次的數(shù)據(jù),那此時(shí)保證的是at most once的語(yǔ)義,因?yàn)槲覀儫o法獲知重啟前的最后一個(gè)批次數(shù)據(jù)操作是否有成功完成;如果用戶依照原有邏輯處理第一個(gè)批次的數(shù)據(jù),不對(duì)其做去重操作,那此時(shí)保證的是at least once的語(yǔ)義,最終結(jié)果中可能存在重復(fù)數(shù)據(jù);最后如果用戶想要實(shí)現(xiàn)exactlyonce,muise spark core提供了根據(jù)topic、partition與offset生成UID的功能,只要確保兩個(gè)批次消費(fèi)的Offset相同,則最終生成的UID也相同,用戶可以根據(jù)此UID作為判斷上個(gè)批次數(shù)據(jù)是否有存儲(chǔ)成功的依據(jù)。下面簡(jiǎn)單的給出了重啟后第一個(gè)批次操作的行為。
3.1.3 Metrics系統(tǒng)
Musiespark core基于Spark本身的metrics系統(tǒng)進(jìn)行了改造,添加了許多定制的metrics,并且向用戶暴露了metrics注冊(cè)接口,用戶可以非常方便的注冊(cè)自己的metrics并在程序中更新metrics的數(shù)值。最后所有的metrics會(huì)根據(jù)作業(yè)設(shè)定的批次間隔寫入Graphite,基于公司定制的預(yù)警系統(tǒng)進(jìn)行報(bào)警,前端可以通過Grafana展現(xiàn)各項(xiàng)metrics指標(biāo)。
Muisespark core本身定制的metrics包含以下三種:
- Fail,批次時(shí)間內(nèi)spark task失敗次數(shù)超過4次便報(bào)警,用于監(jiān)控程序的運(yùn)行狀態(tài)
- Ack,批次時(shí)間內(nèi)spark streaming處理的數(shù)據(jù)量小0便報(bào)警,用于監(jiān)控程序是否在正常消費(fèi)數(shù)據(jù)
- Lag,批次時(shí)間內(nèi)數(shù)據(jù)消費(fèi)延遲大于設(shè)定值便報(bào)警
其中由于我們大部分作業(yè)開啟了Back Pressure功能,這就導(dǎo)致在Spark UI中看到每個(gè)批次數(shù)據(jù)都能在正常時(shí)間內(nèi)消費(fèi)完成,然而可能此時(shí)kafka中已經(jīng)積壓了大量數(shù)據(jù),故每個(gè)批次我們都會(huì)計(jì)算當(dāng)前消費(fèi)時(shí)間與數(shù)據(jù)本身時(shí)間的一個(gè)平均差值,如果這個(gè)差值大于批次時(shí)間,說明本身數(shù)據(jù)消費(fèi)就已經(jīng)存在了延遲。
下圖展現(xiàn)了預(yù)警系統(tǒng)中,基于用戶自定義注冊(cè)的Metrics以及系統(tǒng)定制的Metrics進(jìn)行預(yù)警。
3.1.4 容錯(cuò)
其實(shí)在上面Exactly Once一章中已經(jīng)詳細(xì)的描述了muise spark core如何在程序宕機(jī)后能夠保證數(shù)據(jù)正確的處理。但是為了能夠讓Spark Sreaming能夠長(zhǎng)時(shí)間穩(wěn)定的運(yùn)行在Yarn集群上,還需要添加許多配置,感興趣的朋友可以查看:Long running Spark Streaming Jobs on YarnCluster。
除了上述容錯(cuò)保證之外,Muise Portal(后面會(huì)講)也提供了對(duì)Spark Streaming作業(yè)定時(shí)檢測(cè)的功能。目前每過5分鐘對(duì)當(dāng)前所有數(shù)據(jù)庫(kù)中狀態(tài)標(biāo)記為Running的Spark Streaming作業(yè)進(jìn)行狀態(tài)檢測(cè),通過Yarn提供的REST APIs可以根據(jù)每個(gè)作業(yè)的Application Id查詢作業(yè)在Yarn上的狀態(tài),如果狀態(tài)處于非運(yùn)行狀態(tài),則會(huì)嘗試重啟作業(yè)。
3.2 Muise Portal
在封裝完所有的Spark Streaming之后,我們就需要有一個(gè)平臺(tái)能夠管理配置作業(yè),MuisePortal就是這樣的存在。Muise Portal目前主要支持了Storm與Spark Streaming兩類作業(yè),支持新建作業(yè)、Jar包發(fā)布、作業(yè)運(yùn)行與停止等一系列功能。下圖展現(xiàn)了新建作業(yè)的界面:
SparkStreaming作業(yè)基于Yarn Cluster模式運(yùn)行,所有作業(yè)通過在Muise Portal上的Spark客戶端提交到Y(jié)arn集群上運(yùn)行。具體的一個(gè)作業(yè)運(yùn)行流程如下圖所示:
3.3 整體架構(gòu)
最后這邊給出一下目前攜程實(shí)時(shí)平臺(tái)的整體架構(gòu)。
4、Spark Streaming在攜程的實(shí)踐
目前Spark Streaming在攜程的業(yè)務(wù)場(chǎng)景主要可以分為以下幾塊:ETL、實(shí)時(shí)報(bào)表統(tǒng)計(jì)、個(gè)性化推薦類的營(yíng)銷場(chǎng)景以及風(fēng)控與安全的應(yīng)用。從抽象上來說,主要可以分為數(shù)據(jù)過濾抽取、數(shù)據(jù)指標(biāo)統(tǒng)計(jì)與模型算法的使用。
4.1 ETL
如今市面上有形形色色的工具可以從Kafka實(shí)時(shí)消費(fèi)數(shù)據(jù)并進(jìn)行過濾清洗最終落地到對(duì)應(yīng)的存儲(chǔ)系統(tǒng),如:Camus、Flume等。相比較于此類產(chǎn)品,Spark Streaming的優(yōu)勢(shì)首先在于可以支持更為復(fù)雜的處理邏輯,其次基于Yarn系統(tǒng)的資源調(diào)度使得Spark Streaming的資源配置更加靈活,最后用戶可以將Spark RDD數(shù)據(jù)轉(zhuǎn)換成Spark Dataframe數(shù)據(jù),使得可以與Spark SQL相結(jié)合,并且最終將數(shù)據(jù)輸出到HDFS和Alluxio等分布式文件系統(tǒng)時(shí)可以存儲(chǔ)為Parquet之類的格式化數(shù)據(jù),用戶在后續(xù)使用Spark SQL處理數(shù)據(jù)時(shí)更為的簡(jiǎn)便。
目前在ETL使用場(chǎng)景中較為典型的是攜程度假部門的Data Lake應(yīng)用,度假部門使用Spark Streaming對(duì)數(shù)據(jù)做ETL操作最終將數(shù)據(jù)存儲(chǔ)至Alluxio,期間基于muise-spark-core的自定義metric功能對(duì)數(shù)據(jù)的數(shù)據(jù)量、字段數(shù)、數(shù)據(jù)格式與重復(fù)數(shù)據(jù)進(jìn)行了數(shù)據(jù)質(zhì)量校驗(yàn)與監(jiān)控,具體的監(jiān)控預(yù)警已在上面說過。
4.2 實(shí)時(shí)報(bào)表統(tǒng)計(jì)
實(shí)時(shí)報(bào)表統(tǒng)計(jì)與展現(xiàn)也是Spark Streaming使用較多的一個(gè)場(chǎng)景,數(shù)據(jù)可以基于Process Time統(tǒng)計(jì),也可以基于Event Time統(tǒng)計(jì)。由于本身Spark Streaming不同批次的job可以視為一個(gè)個(gè)的滾動(dòng)窗口,某個(gè)獨(dú)立的窗口中包含了多個(gè)時(shí)間段的數(shù)據(jù),這使得使用SparkStreaming基于Event Time統(tǒng)計(jì)時(shí)存在一定的限制。一般較為常用的方式是統(tǒng)計(jì)每個(gè)批次中不同時(shí)間維度的累積值并導(dǎo)入到外部系統(tǒng),如ES;然后在報(bào)表展現(xiàn)的時(shí)基于時(shí)間做二次聚合獲得完整的累加值最終求得聚合值。下圖展示了攜程IBU基于Spark Streaming實(shí)現(xiàn)的實(shí)時(shí)看板。
4.3 個(gè)性化推薦與風(fēng)控安全
這兩類應(yīng)用的共同點(diǎn)莫過于它們都需要基于算法模型對(duì)用戶的行為作出相對(duì)應(yīng)的預(yù)測(cè)或分類,攜程目前所有模型都是基于離線數(shù)據(jù)每天定時(shí)離線訓(xùn)練。在引入Spark Streaming之后,許多部門開始積極的嘗試特征的實(shí)時(shí)提取、模型的在線訓(xùn)練。并且Spark Streaming可以很好的與Spark MLlib相結(jié)合,其中最為成功的案例為信安部門以前是基于各類過濾條件抓取攻擊請(qǐng)求,后來他們采用離線模型訓(xùn)練,Spark Streaming加Spark MLlib對(duì)用戶進(jìn)行實(shí)時(shí)預(yù)測(cè),性能上較JStorm(基于大量正則表達(dá)式匹配用戶,十分消耗CPU)提高了十倍,漏報(bào)率降低了20%。
5、曾經(jīng)踩過的坑
目前攜程的Spark Streaming作業(yè)運(yùn)行的YARN集群與離線作業(yè)同屬一個(gè)集群,這對(duì)作業(yè)無論是性能還是穩(wěn)定性都帶來了諸多影響。尤其是當(dāng)YARN或者Hadoop集群需要更新維護(hù)重啟服務(wù)時(shí),在很大程度上會(huì)導(dǎo)致Spark Streaming作業(yè)出現(xiàn)報(bào)錯(cuò)、掛掉等狀況,雖然有諸多的容錯(cuò)保障,但也會(huì)導(dǎo)致數(shù)據(jù)積壓數(shù)據(jù)處理延遲。后期將會(huì)獨(dú)立部署Hadoop與Yarn集群,所有的實(shí)時(shí)作業(yè)都運(yùn)行在獨(dú)立的集群上,不受外部的影響,這也方便后期對(duì)于Flink作業(yè)的開發(fā)與維護(hù)。后期通過Alluxio實(shí)現(xiàn)主集群與子集群間的數(shù)據(jù)共享。
在使用過程中,也遇到了形形色色不同的Bug,這邊簡(jiǎn)單的介紹幾個(gè)較為嚴(yán)重的問題。首先第一個(gè)問題是,Spark Streaming每個(gè)批次Job都會(huì)通過DirectKafkaInputStream的comput方法獲取消費(fèi)的Kafka Topic當(dāng)前最新的offset,如果此時(shí)kafka集群由于某些原因不穩(wěn)定,就會(huì)導(dǎo)致java.lang.RuntimeException: No leader found for partition xx的問題,由于此段代碼運(yùn)行在Driver端,如果沒有做任何配置和處理的情況下,會(huì)導(dǎo)致程序直接掛掉。對(duì)應(yīng)的解決方法是配置spark.streaming.kafka.maxRetries大于1,并且可以通過配置refresh.leader.backoff.ms參數(shù)設(shè)置每次重試的間隔時(shí)間。
其次在使用Spark Streaming與Spark Sql相結(jié)合的過程中,也會(huì)有諸多問題。比如在使用過程中可能出現(xiàn)out of memory:PermGen space,這是由于Spark sql使用code generator導(dǎo)致大量使用PermGen space,通過在spark.driver.extraJavaOptions中添加-XX:MaxPermSize=1024m-XX:PermSize=512m解決。還有Spark Sql需要?jiǎng)?chuàng)建Spark Warehouse,如果基于Yarn來運(yùn)行,默認(rèn)可能是在HDFS上創(chuàng)建相對(duì)應(yīng)的目錄,如果沒有權(quán)限會(huì)報(bào)出Permission denied的問題,用戶可以通過配置config(“spark.sql.warehouse.dir”,”file:${system:user.dir}/spark-warehouse”)來解決。
6、未來展望
上面主要針對(duì)Spark Streaming在攜程實(shí)時(shí)平臺(tái)中的運(yùn)用做了詳細(xì)的介紹,在使用SparkStreaming過程中還是存在一些痛點(diǎn),比如窗口功能比較單一、基于Event Time統(tǒng)計(jì)指標(biāo)過于繁瑣以及官方在新的版本中基本沒有新的特性加入等,這使得我們更加傾向于嘗試Flink。Flink基本實(shí)現(xiàn)了Google提出的各類實(shí)時(shí)處理的理念,引入了WaterMark的實(shí)現(xiàn),感興趣的朋友可以查看Google官方文檔:The world beyond batch: Streaming 102。
目前Flink 1.4 release版本發(fā)布在即,Spark 2.2.0基于kafka數(shù)據(jù)源的Structured Streaming也支持了更多的特性。前期我們已對(duì)Flink做了充分的調(diào)研,下半年主要工作將放在Flink的對(duì)接上。在提供了諸多實(shí)時(shí)計(jì)算框架的支持后,隨之而來的是帶來了更多的學(xué)習(xí)成本,今后我們的重心將放在如何使用戶更加容易的實(shí)現(xiàn)實(shí)時(shí)計(jì)算邏輯。其中Apache Beam對(duì)各種實(shí)時(shí)場(chǎng)景提供了良好的封裝并對(duì)多種實(shí)時(shí)計(jì)算引擎做了支持,其次基于Stream Sql實(shí)現(xiàn)復(fù)雜的實(shí)時(shí)應(yīng)用場(chǎng)景都將是我們主要調(diào)研的方向。