面向數(shù)據(jù)科學(xué)的5個Apache Spark最佳實踐
譯文【51CTO.com快譯】
為什么轉(zhuǎn)向Spark?
雖然我們都在談?wù)摯髷?shù)據(jù),但通常在職場闖蕩一段時間后才遇到大數(shù)據(jù)。在我供職的Wix.com,有逾1.6億個用戶在生成大量數(shù)據(jù),因此需要擴展我們的數(shù)據(jù)流程。
雖然有其他選擇(比如Dask),但我們決定選擇Spark,原因主要有兩個:(1)它是目前的最新技術(shù),廣泛用于大數(shù)據(jù)。(2)我們擁有Spark所需的基礎(chǔ)架構(gòu)。
如何針對pandas人群用PySpark編寫代碼?
您可能很熟悉pandas,僅僅搞好語法可能開了個好頭,但確保PySpark項目成功還需要具備更多的條件,您要了解Spark的工作原理。
讓Spark正常工作很難,但一旦可以正常工作,它效果很棒!
Spark簡述
建議看看這篇文章,閱讀MapReduce方面的說明以便更深入的了解:《如何使用Spark處理大數(shù)據(jù)?》(https://towardsdatascience.com/the-hitchhikers-guide-to-handle-big-data-using-spark-90b9be0fe89a)。
我們在這里要了解的概念是橫向擴展。
從縱向擴展入手比較容易。如果我們有一個運行良好的pandas代碼,但后來數(shù)據(jù)對于它來說太大了,我們可能會轉(zhuǎn)移到一臺內(nèi)存更多、功能更強的機器上,希望它能應(yīng)付得了。這意味著我們?nèi)杂幸慌_機器同時在處理全部數(shù)據(jù)——這就是縱向擴展。
如果我們改而決定使用MapReduce,并將數(shù)據(jù)分成多個塊,然后讓不同的機器來處理每個塊,這就是橫向擴展。
五個Spark最佳實踐
這五個Spark最佳實踐幫助我將運行時間縮短至十分之一,并擴展項目。
1. 從小處入手——采樣數(shù)據(jù)
如果我們想讓大數(shù)據(jù)起作用,先要使用少量數(shù)據(jù)看到我們方向正確。在我的項目中,我采樣10%的數(shù)據(jù),并確保管道正常工作,這讓我可以使用Spark UI中的SQL部分,并查看數(shù)字流經(jīng)整個流程,不必等待太長的時間來運行流程。
憑我的經(jīng)驗,如果您用小樣本就能達到所需的運行時間,通??梢暂p松擴展。
2. 了解基礎(chǔ)部分:任務(wù)、分區(qū)和核心
這可能是使用Spark時要理解的最重要的一點:
1個分區(qū)用于在1個核心上運行的1個任務(wù)。
您要始終了解自己有多少分區(qū)——密切關(guān)注每個階段的任務(wù)數(shù)量,并在Spark連接中將它們與正確數(shù)量的核心進行匹配。幾個技巧和經(jīng)驗法則可以幫助您做到這一點(所有這些都需要根據(jù)您的情況進行測試):
- 任務(wù)與核心之間的比例應(yīng)該是每個核心約2至4個任務(wù)。
- 每個分區(qū)的大小應(yīng)約為200MB–400MB,這取決于每個worker的內(nèi)存,可根據(jù)需要來調(diào)整。
3. 調(diào)試Spark
Spark使用惰性求值,這意味著它在等到動作被調(diào)用后才執(zhí)行計算指令圖。動作示例包括show()和count()等。
這樣一來,很難知道我們代碼中的bug以及需要優(yōu)化的地方。我發(fā)現(xiàn)大有幫助的一個實踐是,使用df.cache()將代碼劃分為幾個部分,然后使用df.count()強制Spark在每個部分計算df。
現(xiàn)在使用Spark UI,您可以查看每個部分的計算,并找出問題所在。值得一提的是,如果不使用我們在(1)中提到的采樣就使用這種做法,可能會創(chuàng)建很長的運行時間,到時將很難調(diào)試。
4. 查找和解決偏度
讓我們從定義偏度開始。正如我們提到,我們的數(shù)據(jù)分到多個分區(qū);轉(zhuǎn)換后,每個分區(qū)的大小可能隨之變化。這會導(dǎo)致分區(qū)之間的大小出現(xiàn)很大的差異,這意味著我們的數(shù)據(jù)存在偏度。
可以通過在Spark UI中查看階段方面的細節(jié),并尋找最大數(shù)和中位數(shù)之間的顯著差異以找到偏度:
圖1. 很大的差異(中位數(shù)= 3秒,最大數(shù)= 7.5分鐘)意味著數(shù)據(jù)有偏度。
這意味著我們有幾個任務(wù)比其他任務(wù)要慢得多。
為什么這不好——這可能導(dǎo)致其他階段等待這幾項任務(wù),使核心處于等待狀態(tài)而無所事事。
如果您知道偏度來自何處,可以直接解決它并更改分區(qū)。如果您不知道/或沒辦法直接解決,嘗試以下操作:
調(diào)整任務(wù)與核心之間的比例
如前所述,如果擁有的任務(wù)比核心更多,我們希望當更長的任務(wù)運行時,其他核心仍然忙于處理其他任務(wù)。盡管這是事實,但前面提到的比例(2-4:1)無法真正解決任務(wù)持續(xù)時間之間這么大的差異。我們可以試著將比例提高到10:1,看看是否有幫助,但是這種方法可能有其他缺點。
為數(shù)據(jù)加入隨機字符串(salting)
Salting是指用隨機密鑰對數(shù)據(jù)重新分區(qū),以便可以平衡新分區(qū)。這是PySpark的代碼示例(使用通常會導(dǎo)致偏度的groupby):
圖2
5. Spark中迭代代碼方面的問題
這是個棘手的問題。如前所述,Spark使用惰性求值,因此運行代碼時,它僅構(gòu)建計算圖(DAG)。但當您有一個迭代過程時,該方法可能會很成問題,因為DAG重新打開了先前的迭代,而且變得很大。這可能太大了,驅(qū)動程序在內(nèi)存中裝不下。由于應(yīng)用程序卡住了,因此很難找到問題所在,但是在Spark UI中好像沒有作業(yè)在長時間運行(確實如此),直到驅(qū)動程序最終崩潰才發(fā)現(xiàn)并非如此。
這是目前Spark的一個固有問題,對我來說有用的解決方法是每5-6次迭代使用df.checkpoint()/ df.localCheckpoint()(試驗一番可找到適合您的數(shù)字)。這招管用的原因是,checkpoint()打破了譜系和DAG(不像cache()),保存了結(jié)果,并從新的檢查點開始。缺點在于,如果發(fā)生了什么岔子,您就沒有整個DAG來重新創(chuàng)建df。
原文標題:5 Apache Spark Best Practices For Data Science,作者:Zion Badash
【51CTO譯稿,合作站點轉(zhuǎn)載請注明原文譯者和出處為51CTO.com】