字節(jié)跳動(dòng) Spark Shuffle 大規(guī)模云原生化演進(jìn)實(shí)踐
在字節(jié)跳動(dòng)內(nèi)部,Spark 計(jì)算引擎被廣泛應(yīng)用于大規(guī)模數(shù)據(jù)處理,機(jī)器學(xué)習(xí)等場(chǎng)景,天任務(wù)數(shù)超過(guò) 150W。線上集群磁盤(pán)類(lèi)型多樣,包括 SSD、HDD 及混合等。每天會(huì)產(chǎn)生超過(guò) 100PB 以上的 Shuffle 數(shù)據(jù),同時(shí)單個(gè)任務(wù)的 Shuffle 數(shù)據(jù)量可能達(dá)到數(shù)百 TB。巨量的 Shuffle 數(shù)據(jù)和復(fù)雜的計(jì)算資源環(huán)境也給 Spark 運(yùn)行過(guò)程中的 Shuffle 性能帶來(lái)了很多挑戰(zhàn)。本文將從背景介紹、穩(wěn)定性資源場(chǎng)景和混部資源場(chǎng)景分享字節(jié)跳動(dòng)在 Spark Shuffle 云原生化方面的大規(guī)模演進(jìn)實(shí)踐。
一、背景介紹
Spark 是字節(jié)跳動(dòng)內(nèi)使用廣泛的計(jì)算引擎,已廣泛應(yīng)用于各種大規(guī)模數(shù)據(jù)處理、機(jī)器學(xué)習(xí)和大數(shù)據(jù)場(chǎng)景。目前中國(guó)區(qū)域內(nèi)每天的任務(wù)數(shù)已經(jīng)超過(guò) 150 萬(wàn),每天的 Shuffle 讀寫(xiě)數(shù)據(jù)量超過(guò) 500 PB。同時(shí)某些單個(gè)任務(wù)的 Shuffle 數(shù)據(jù)能夠達(dá)到數(shù)百 TB 級(jí)別。
與此同時(shí)作業(yè)量與 Shuffle 的數(shù)據(jù)量還在增長(zhǎng),相比去年,今年的天任務(wù)數(shù)增加了 50 萬(wàn),總體數(shù)據(jù)量的增長(zhǎng)超過(guò)了 200 PB,達(dá)到了 50% 的增長(zhǎng)。Shuffle 是用戶作業(yè)中會(huì)經(jīng)常觸發(fā)的功能,各種 ReduceByKey、groupByKey、join、sortByKey 和 repartition 的操作都會(huì)使用到 Shuffle。所以在大規(guī)模的 Spark 集群內(nèi),Spark Shuffle 經(jīng)常會(huì)成為性能及穩(wěn)定性的瓶頸。Shuffle 的計(jì)算涉及到頻繁的磁盤(pán)和網(wǎng)絡(luò) IO 操作,主要是需要把所有節(jié)點(diǎn)的數(shù)據(jù)進(jìn)行重新分區(qū)并組合。
1、原理
在社區(qū)版 ESS 模式下默認(rèn)使用的 Shuffle 模式的基本原理中,剛才提到 Shuffle 的計(jì)算會(huì)把數(shù)據(jù)進(jìn)行重新分區(qū),這里就是把 Map 的數(shù)據(jù)重新組合到所有的 Reducers 上。如果有 M 個(gè) Mappers,和 R 個(gè) Reducers,就會(huì)把 M 個(gè) Mappers 的 Partition 數(shù)據(jù)分區(qū)成后面 R 個(gè) Reducers 的 Partition。
Shuffle 的過(guò)程可以分為兩個(gè)階段——Shuffle Write 和 Shuffle Read。
Shuffle Write 的時(shí)候,mapper 會(huì)把當(dāng)前的 Partition 按照 Reduce 的 Partition 分成 R 個(gè)新的 Partition,并排序后寫(xiě)到本地磁盤(pán)上。生成的 map output 包含兩個(gè)文件:索引文件和按 Partition 排序后的數(shù)據(jù)文件。
當(dāng)所有的 Mappers 寫(xiě)完 map output 后,就會(huì)開(kāi)始第二個(gè)階段,Shuffle Read 階段。這個(gè)時(shí)候每個(gè) Reducer 會(huì)向包含它的 Reducer Partition 的所有 ESS 訪問(wèn),并讀取對(duì)應(yīng) Reduce Partition 的數(shù)據(jù)。這里有可能會(huì)請(qǐng)求到所有 Partition 所在的 ESS,直到這個(gè) Reducer 獲取到所有對(duì)應(yīng)的 Reduce Partition 的數(shù)據(jù)。
在 Shuffle Fetch 階段,每個(gè) ESS 會(huì)收到所有 Reducer 的請(qǐng)求并返回相應(yīng)的數(shù)據(jù)。這將產(chǎn)生 M 乘 R 級(jí)別的網(wǎng)絡(luò)連接和隨機(jī)的磁盤(pán)讀寫(xiě) IO,涉及到大量的磁盤(pán)讀寫(xiě)和網(wǎng)絡(luò)傳輸。這就是為什么 Shuffle 會(huì)對(duì)磁盤(pán)以及網(wǎng)絡(luò) IO 的請(qǐng)求都特別頻繁的原因。
由于 Shuffle 對(duì)資源的需求和消耗都非常高,所以 CPU、磁盤(pán)和網(wǎng)絡(luò)開(kāi)銷(xiāo)都很有可能是造成 Fetch failure 的原因或 Shuffle 速度較慢的瓶頸。在字節(jié)跳動(dòng)大規(guī)模的 Shuffle 場(chǎng)景中,同一個(gè) ESS 節(jié)點(diǎn)可能需要同時(shí)服務(wù)多個(gè)商戶,而這些集群沒(méi)有進(jìn)行 IO 的隔離,就可能會(huì)導(dǎo)致 Shuffle 成為用戶作業(yè)失敗的主要原因和痛點(diǎn)問(wèn)題。
字節(jié)跳動(dòng)從 2021 年初開(kāi)始了 Spark Shuffle 的云原生化相關(guān)工作,Spark 作業(yè)與其他大數(shù)據(jù)生態(tài)開(kāi)始了從 Yarn Godel 的遷移。Godel 是字節(jié)跳動(dòng)基于 Kubernetes 自研的調(diào)度器,遷移時(shí)也提供了 Hadoop 上云的遷移方案——Yodel(Yarn on Godel),是一個(gè)完全兼容 Hadoop Yarn 的協(xié)議,目標(biāo)是將所有大數(shù)據(jù)應(yīng)用平滑地遷移到 Kubernetes 體系上。
在這套遷移工作中,ESS 也做了定制化的相關(guān)工作,完成了從之前 Yarn Node Manager 模式下的 Yarn Auxiliary Service 遷移至 Kubernetes DaemonSet 部署模方式的適配工作,并開(kāi)始 Shuffle 作業(yè)的遷移工作。歷時(shí)兩年,在 2023 年,順利將所有大數(shù)據(jù)應(yīng)用包括 Spark 應(yīng)用都遷移到了如今的云原生生態(tài)上。
2、挑戰(zhàn)
在云原生化的遷移過(guò)程中,也遇到了很多挑戰(zhàn):
- 首先,從 NM 遷移到 DaemonSet 的過(guò)程中,DaemonSet 上 ESS 的 CPU 有非常嚴(yán)格的限制,而在之前的 NM 模式下,ESS 基本上可以使用所有的 CPU 資源。所以在這個(gè)遷移實(shí)踐中,往往最開(kāi)始設(shè)置的 ESS 的 CPU 資源是不夠的,需要經(jīng)過(guò)持續(xù)不斷的調(diào)整。后續(xù),某些高優(yōu)集群甚至直接放開(kāi)對(duì) ESS 的 CPU 使用。
- 同時(shí),DaemonSet 和 Pod 對(duì) Spark 作業(yè)的 CPU 有更嚴(yán)格的限制。這也導(dǎo)致不少用戶的作業(yè)遷移到了新的架構(gòu)后變得更加緩慢了。這是因?yàn)樵谥暗哪J较?,CPU 是有一定的超發(fā)的,因此需要對(duì)這個(gè)情況進(jìn)行調(diào)整。我們?cè)?nbsp;Kubernetes 和 Godel 架構(gòu)下開(kāi)啟了 CPU Shares 模式,使用戶在遷移過(guò)程中感知不到性能上的差異。
- 另外,Pod 對(duì)內(nèi)存的限制也非常嚴(yán)格,這導(dǎo)致 Shuffle Read 時(shí)無(wú)法使用空閑的 page cache 資源,從而導(dǎo)致 Shuffle Read 時(shí) page cache 的命中率非常低。這個(gè)過(guò)程會(huì)帶來(lái)更多的磁盤(pán) IO 開(kāi)銷(xiāo),導(dǎo)致整體性能變差。對(duì)此我們采取了相應(yīng)的措施,通過(guò)適當(dāng)開(kāi)放 Pod 對(duì) page cache 的使用,降低 Shuffle 在遷移后對(duì)性能的影響。
3、收益
完成遷移工作之后,我們成功地將所有的離線資源池完成統(tǒng)一,在調(diào)度層面能夠更友好地實(shí)施一些優(yōu)化和調(diào)度策略,從而提高整體的資源使用率。ESS Daemonset 相比于 Yarn Auxilary Service 也獲得了不少的收益。首先,ESS DaemonSet 被獨(dú)立出來(lái)成為一個(gè)服務(wù),脫離與 NM 的緊耦合,減少了運(yùn)維成本。另外,Kubernetes 和 Pod 對(duì) ESS 資源的隔離也增加了 ESS 的穩(wěn)定性,這意味著 ESS 不會(huì)再受到其他作業(yè)或者節(jié)點(diǎn)上其它服務(wù)的影響。
云原生化后的 Spark 作業(yè)目前有兩個(gè)主要的運(yùn)行環(huán)境:
- 穩(wěn)定資源集群環(huán)境。這些穩(wěn)定資源的集群主要以服務(wù)高優(yōu)和 SLA 的任務(wù)為主。部署的磁盤(pán)是性能比較好的 SSD 磁盤(pán)。對(duì)于這些穩(wěn)定資源集群,主要使用基于社區(qū)、深度定制化后的 ESS 服務(wù)。使用 SSD 磁盤(pán),ESS 讀寫(xiě),也可以使用到本地的高性能 SSD 磁盤(pán)。部署在 Daemonset 模式,Godel 架構(gòu)下。
- 混部資源集群環(huán)境。這些集群主要服務(wù)于中低游的作業(yè),以一些臨時(shí)查詢(xún)、調(diào)試或者測(cè)試任務(wù)為主。這些集群的資源主要都部署在 HDD 磁盤(pán)上,有些是通過(guò)線上資源出讓或與其他服務(wù)共用的或者其他線上的服務(wù)共同部署的一些資源。這就導(dǎo)致集群的資源都不是獨(dú)占的,整體的磁盤(pán)性能以及儲(chǔ)存環(huán)境也都不是特別優(yōu)異。
二、穩(wěn)定資源場(chǎng)景
在穩(wěn)定集群環(huán)境中,存在較多的高優(yōu)作業(yè),首要任務(wù)是提高這些作業(yè) Shuffle 的穩(wěn)定性,以及運(yùn)行時(shí)的作業(yè)時(shí)長(zhǎng),以確保這些作業(yè)的 SLA。為了解決 Shuffle 的問(wèn)題,對(duì) ESS 深度定制了以下三方面能力:增強(qiáng) ESS 的監(jiān)控/治理能力、增加 ESS Shuffle 的限流功能、增加 Shuffle 溢寫(xiě)分裂功能。
1、ESS 深度定制
(1)增強(qiáng) ESS 的監(jiān)控及治理能力
在監(jiān)控方面,我們使用開(kāi)源版本的過(guò)程中發(fā)現(xiàn)現(xiàn)有的監(jiān)控不足以深度排查遇到的 Shuffle 問(wèn)題和當(dāng)前的 ESS 狀況。這就導(dǎo)致沒(méi)有辦法快速定位是哪些節(jié)點(diǎn)造成的 Shuffle 問(wèn)題,也沒(méi)有辦法感知到有問(wèn)題的節(jié)點(diǎn),因此,我們對(duì)監(jiān)控能力進(jìn)行了一些增強(qiáng)。
首先,我們?cè)黾恿吮O(jiān)控 Shuffle 慢和 Fetch Rate 能力的一些關(guān)鍵指標(biāo),包括 Queued Chunks 和 Chunk Fetch Rate。Queued Chunks 用于監(jiān)控當(dāng)前請(qǐng)求 ESS 節(jié)點(diǎn)上請(qǐng)求的堆積,而 Chunk Fetch Rate 用于監(jiān)控這些節(jié)點(diǎn)上請(qǐng)求的流量。同時(shí),我們還將 ESS 的 Metrics 指標(biāo)接入了字節(jié)跳動(dòng)的 Metrics 系統(tǒng),使我們能夠通過(guò)系統(tǒng)提供的 Application 維度的指標(biāo)快速定位 ESS 節(jié)點(diǎn)的堆積情況。在用戶界面 (UI) 方面,我們?cè)?nbsp;Stage 詳情頁(yè)加入了兩個(gè)新功能,用于展示當(dāng)前 Stage 里每個(gè) Task Shuffle 遇到最慢的幾個(gè)節(jié)點(diǎn),以及經(jīng)過(guò) Stage 統(tǒng)計(jì)后所有 Task 遇到 Shuffle 次數(shù)最多的 top 節(jié)點(diǎn)。這不僅方便用戶查詢(xún),也可以利用這些指標(biāo)進(jìn)行相關(guān)大盤(pán)的搭建。
- 收益
有了這些監(jiān)控與 UI 改善后,當(dāng)用戶在 UI 上看到 Shuffle 慢的時(shí)候可以通過(guò) UI 打開(kāi)對(duì)應(yīng)的 Shuffle 監(jiān)控。方便用戶和我們團(tuán)隊(duì)快速定位到導(dǎo)致 Shuffle 問(wèn)題的 ESS 節(jié)點(diǎn),看到這些節(jié)點(diǎn)上的實(shí)際情況,并快速定位這些堆積請(qǐng)求量是來(lái)自于哪些 Application。
新增的監(jiān)控也會(huì)在運(yùn)行排查 Shuffle 問(wèn)題時(shí)感知到 ESS 節(jié)點(diǎn)上實(shí)際的 Chunk 堆積、latency 等關(guān)鍵指標(biāo)。這在遇到 Shuffle 慢的情況下有助于更有效地實(shí)時(shí)采取措施。一旦定位到 Shuffle 問(wèn)題,我們可以分析情況并提供治理方向和優(yōu)化。
治理工作主要是通過(guò) BatchBrain 系統(tǒng)來(lái)實(shí)施。BatchBrain 是專(zhuān)門(mén)為 Spark 作業(yè)設(shè)計(jì)的一套智能作業(yè)調(diào)優(yōu)系統(tǒng),它主要對(duì)作業(yè)數(shù)據(jù)進(jìn)行采集,并進(jìn)行離線與實(shí)時(shí)分析。采集的數(shù)據(jù)包括 Spark 本身的 Event Log、內(nèi)部打入更詳細(xì)的 Timeline event 以及各種 Metrics 指標(biāo),包括對(duì) ESS 加上的定制化 Shuffle 指標(biāo)等。
在離線分析中主要需要治理周期性作業(yè),根據(jù)每個(gè)作業(yè)的歷史特征,結(jié)合采集的數(shù)據(jù),對(duì)這些作業(yè)的 Shuffle Stage 性能進(jìn)行分析,并經(jīng)過(guò)多次迭代調(diào)整,最終提供一套適合的Shuffle 參數(shù),使這些作業(yè)在重新運(yùn)行時(shí)可以對(duì)優(yōu)化后的Shuffle 參數(shù)進(jìn)行運(yùn)行,從而獲得更好的性能和效果。
BatchBrain 在實(shí)時(shí)分析部分也可以利用之前添加的 Shuffle 指標(biāo)進(jìn)行自動(dòng)掃描。用戶還可以通過(guò) BatchBrain API 查詢(xún)他們集群內(nèi)作業(yè)的 Shuffle 狀況,以及有效定位遇到 Shuffle 堆積的節(jié)點(diǎn)和作業(yè),并通過(guò)報(bào)警通知相關(guān)人員。如果發(fā)現(xiàn) Shuffle 慢是由于其他的作業(yè)或者異常作業(yè)導(dǎo)致的,用戶也可以直接采取治理動(dòng)作,例如停止或者驅(qū)逐這些作業(yè),以便為更高優(yōu)先級(jí)的作業(yè)騰出更多資源進(jìn)行 Shuffle。
(2)Shuffle 限流功能
通過(guò) Shuffle 的監(jiān)控和治理,我們發(fā)現(xiàn)在 ESS 節(jié)點(diǎn)上遇到 Shuffle 慢的情況,通常是因?yàn)槟承┤蝿?wù)的數(shù)據(jù)量過(guò)于龐大或者設(shè)置了不妥的參數(shù),導(dǎo)致這些 Shuffle Stage 的 Mapper 和 Reducer 數(shù)量都異常地大。異常大量的 Mapper 和 Reducer 數(shù)量可能會(huì)導(dǎo)致 ESS 節(jié)點(diǎn)上出現(xiàn)大量的請(qǐng)求堆積,而這些請(qǐng)求的 chunk size 也可能非常小。有些異常作業(yè)的平均 Chunk size 可能連 20 KB 都沒(méi)達(dá)到。這些作業(yè)對(duì) ESS 發(fā)送很大的請(qǐng)求量,這種情況可能會(huì)導(dǎo)致 ESS 無(wú)法及時(shí)處理所有的請(qǐng)求,從而引發(fā)請(qǐng)求堆積,甚至導(dǎo)致作業(yè)的延遲或直接失敗。
針對(duì)這些現(xiàn)象,我們采取的解決方案是對(duì) ESS 節(jié)點(diǎn)上每個(gè) Application 的總請(qǐng)求量進(jìn)行限制。當(dāng)某個(gè) Application 的 Fetch 請(qǐng)求達(dá)到了上限,ESS 將拒絕該 Application 發(fā)送的新 Fetch 請(qǐng)求,直到該 Application 等待現(xiàn)有請(qǐng)求的部分結(jié)束后才能繼續(xù)發(fā)送新的請(qǐng)求。這樣可以防止出現(xiàn)單個(gè) Application 占用節(jié)點(diǎn)上過(guò)大的資源而導(dǎo)致 ESS 沒(méi)有辦法正常為其他作業(yè)請(qǐng)求提供服務(wù)的情況,也可以避免其他作業(yè)失敗或 Shuffle 速度變慢。這個(gè)方案可以緩解異?;虼笠?guī)模的 Shuffle 作業(yè)對(duì)集群 Shuffle 的負(fù)面影響。
Shuffle 限流功能的特征
- 在作業(yè)運(yùn)行正常的時(shí)候,即使開(kāi)啟了限流功能,也不會(huì)對(duì)作業(yè)有任何影響。節(jié)點(diǎn)如果可以正常服務(wù),是不需要觸發(fā)任何限流的。
- 只有當(dāng)節(jié)點(diǎn)的負(fù)載超過(guò)可以承受的范圍,且 Shuffle IO 超過(guò)設(shè)置的閾值后,才會(huì)啟動(dòng)限流機(jī)制,減少異常任務(wù)可以向 ESS 發(fā)送的請(qǐng)求數(shù)量,減低這個(gè) ESS 服務(wù)當(dāng)前的壓力。由于這時(shí)候 ESS 服務(wù)的負(fù)載能力已經(jīng)超過(guò)了可承受的范圍,即使它收到這些請(qǐng)求,也無(wú)法正常返回這些請(qǐng)求,因此,限制異常任務(wù)過(guò)多的請(qǐng)求反而可能更好地提高這些任務(wù)本身的性能。
- 在限流的情況下,也會(huì)考慮作業(yè)的優(yōu)先級(jí)。對(duì)于高優(yōu)的任務(wù),會(huì)允許更大的流量。
- 當(dāng)限流生效后,如果發(fā)現(xiàn) ESS 的流量已經(jīng)恢復(fù)正常了將迅速解除限流。受限流的 Application 很快就可以恢復(fù)到之前的流量水平。
限流的詳細(xì)流程
限流功能主要在 ESS 服務(wù)端進(jìn)行,每隔 5 秒在節(jié)點(diǎn)上進(jìn)行 latency 指標(biāo)的掃描,當(dāng)這個(gè) latency 指標(biāo)超過(guò)設(shè)置的閾值時(shí),會(huì)判定該節(jié)點(diǎn)的負(fù)載已經(jīng)超出能夠承受的負(fù)載了。接著會(huì)對(duì) ESS 節(jié)點(diǎn)當(dāng)前所有正在進(jìn)行 Shuffle 的 Application 進(jìn)行評(píng)估,判斷是否要開(kāi)啟限流。利用之前加上的指標(biāo),可以統(tǒng)計(jì)近 5 分鐘這個(gè)節(jié)點(diǎn)上 Fetch 的總流量和 IO,根據(jù)總流量的上限,對(duì)每個(gè) ESS 節(jié)點(diǎn)當(dāng)前正在運(yùn)行 Shuffle 的 Application 合理地分配每個(gè) Application 的流量并進(jìn)行限制。流量分配也會(huì)根據(jù) Application 的優(yōu)先級(jí)進(jìn)行調(diào)整。如果有任何 Application 的 Shuffle 或者當(dāng)前堆積的 Chunk Fetch Rate 已經(jīng)超過(guò)了其分配的流量,它們將受到限流,新發(fā)送的請(qǐng)求也會(huì)被拒絕,直到堆積的請(qǐng)求已經(jīng)部分解除為止。
對(duì)于限流的分配,也有一個(gè)分級(jí)系統(tǒng)。首先,根據(jù)當(dāng)前節(jié)點(diǎn)上運(yùn)行 Shuffle 的 Application 的數(shù)量進(jìn)行分配,Application 的數(shù)量越多,每個(gè) Application 可以分配到的流量就越少。當(dāng)節(jié)點(diǎn)上 Application 數(shù)量比較少的時(shí)候,每個(gè) Application 可以分配更多的流量。限流級(jí)別也會(huì)根據(jù)節(jié)點(diǎn)上的實(shí)際情況每 30 秒進(jìn)行調(diào)整。
在限流的情況下,如果節(jié)點(diǎn)上的 latency 沒(méi)有改善,且 Shuffle 的總流量也沒(méi)有恢復(fù),就會(huì)升級(jí)限流,對(duì)所有 Application 進(jìn)行更嚴(yán)格的流量限制。相反,如果 latency 有好轉(zhuǎn)或者節(jié)點(diǎn)流量已經(jīng)在恢復(fù),就會(huì)降級(jí)限流甚至直接解除掉。最后,限流也會(huì)根據(jù)所有作業(yè)的優(yōu)先級(jí)進(jìn)行適當(dāng)調(diào)整。
上圖中有個(gè)例子,在作業(yè)較少的情況下,對(duì)一個(gè)高優(yōu)作業(yè)進(jìn)行限流,作業(yè)分配的流量可能會(huì)更高,然而,如果節(jié)點(diǎn)的負(fù)載一直沒(méi)有緩解,限流也會(huì)升級(jí)。同等的情況下,一個(gè)中低優(yōu)的作業(yè),會(huì)給它分配更少的流量。開(kāi)通限流功能之后,線上許多高優(yōu)集群都觀察到了性能的顯著提升。
首先,Chunk 的堆積問(wèn)題得到了明顯的減輕。由于受到限流的限制,異常任務(wù)引發(fā)的 Chunk 堆積情況有效的減少了,大大降低了集群中某些節(jié)點(diǎn)上出現(xiàn)大量請(qǐng)求堆積的情況。
另外,Latency 的狀況也得到了改善。在開(kāi)啟限流前,我們經(jīng)常會(huì)看到集群中的節(jié)點(diǎn)出現(xiàn)高延遲的情況。而在啟用限流功能后,整體的 Latency 狀況得到了明顯緩解。通過(guò)減少無(wú)必要和無(wú)效的請(qǐng)求,以及對(duì)各種大型或異常任務(wù)對(duì) ESS 節(jié)點(diǎn)發(fā)起的請(qǐng)求量進(jìn)行限制,我們避免了這些異常大型任務(wù)對(duì) ESS 服務(wù)負(fù)載的負(fù)面影響,減少了對(duì)其他高優(yōu)任務(wù)運(yùn)行的影響。
(3)Shuffle 溢寫(xiě)分裂的功能
在分析一些慢 Shuffle 的作業(yè)時(shí),我們也發(fā)現(xiàn)了另一個(gè)現(xiàn)象,一個(gè)作業(yè)中每個(gè) Executor 寫(xiě) Shuffle 數(shù)據(jù)的數(shù)量可能非常不均衡。由于 ESS 使用了 Dynamic Allocation 機(jī)制,每個(gè) Executor 的運(yùn)行時(shí)長(zhǎng)和分配的 Map Task 數(shù)量可能不同。這導(dǎo)致在作業(yè)運(yùn)行期間,大量的 Shuffle 數(shù)據(jù)可能集中在少數(shù)的 Executor 上,導(dǎo)致 Shuffle 數(shù)據(jù)實(shí)際上都集中在少數(shù)節(jié)點(diǎn)上。
例如下圖中,我們發(fā)現(xiàn)有 5 個(gè) Executor 的 Shuffle 寫(xiě)入量超過(guò)了其他 Executor 的 10 倍以上。在這種情況下,Shuffle 的請(qǐng)求可能會(huì)集中在這幾個(gè)節(jié)點(diǎn)上,導(dǎo)致這幾個(gè) ESS 節(jié)點(diǎn)的負(fù)載非常高,這也間接增加了 Fetch Failure 的可能性。
針對(duì)這種情況,我們提供的解決方案是控制每個(gè)容器或每個(gè)節(jié)點(diǎn)寫(xiě)入磁盤(pán)的 Shuffle 數(shù)據(jù)總量。這個(gè)功能可以從兩個(gè)角度實(shí)現(xiàn)。首先,通過(guò) Spark 本身來(lái)控制 Executor 的 Shuffle Write Size,也就是每個(gè) Executor 在執(zhí)行 Shuffle 時(shí)寫(xiě)入的最大數(shù)據(jù)量。每個(gè) Executor 會(huì)計(jì)算其當(dāng)前寫(xiě)入的 Shuffle 數(shù)據(jù)量,并將這信息匯報(bào)給 Spark Driver。Spark Driver 可以使用 Exclude on Failure 機(jī)制主動(dòng)將那些寫(xiě)入數(shù)據(jù)已經(jīng)超出閾值的 Executor 排除在調(diào)度范圍之外,并回收這些 Executor。此外,我們還通過(guò) Godel 調(diào)度器改善調(diào)度策略,盡量將新的 Executor 調(diào)度到其他節(jié)點(diǎn),避免單個(gè)容器的 Shuffle 寫(xiě)入數(shù)據(jù)過(guò)多,從而導(dǎo)致該節(jié)點(diǎn)的磁盤(pán)被填滿,或者在 Shuffle Fetch 階段數(shù)據(jù)集中在這幾個(gè) ESS 節(jié)點(diǎn)上。
2、云原生優(yōu)化
同時(shí),在云原生優(yōu)化方面,我們也進(jìn)行了一些 Executor 的調(diào)度和功能優(yōu)化,通過(guò) Godel 調(diào)度器的策略,提升 Shuffle 能力。Godel 調(diào)度器提供的調(diào)度策略,可以在調(diào)度 Executor 時(shí)盡量避免負(fù)載高的 Shuffle 節(jié)點(diǎn),從而降低這些節(jié)點(diǎn)后續(xù)遇到 Shuffle 問(wèn)題的可能性。此外,調(diào)度器還可以為 Executor 的 Shuffle Write 提供更多的功能以實(shí)現(xiàn)打散。例如,它可以在磁盤(pán)壓力特別大的節(jié)點(diǎn)上驅(qū)逐 Executor,或者在磁盤(pán)剩余空間不足時(shí),驅(qū)逐那些已經(jīng)寫(xiě)入大量 Shuffle 數(shù)據(jù)的容器。
Spark Driver 控制 Executor 的 Shuffle 與云原生調(diào)度功能結(jié)合可以將整體的 Shuffle 數(shù)據(jù)分散到更多的節(jié)點(diǎn)上,使 Shuffle Fetch 階段的數(shù)據(jù)和請(qǐng)求更加均衡分布。
- 效果
在線上開(kāi)啟了上述深度定制的 Shuffle 優(yōu)化后,我們觀察到了顯著的效果。以下是來(lái)自三個(gè)高優(yōu)集群的一些運(yùn)行數(shù)據(jù),每天在這三個(gè)高優(yōu)集群中的任務(wù)總數(shù)可能超過(guò) 30 萬(wàn),但平均每天因?yàn)?nbsp;Shuffle Fetch 失敗而最終失敗的作業(yè)總數(shù)平均在 20 到 30 左右,可以說(shuō)達(dá)到了低于 1/10000 的失敗率。如下圖可以觀察到這三個(gè)高優(yōu)集群在優(yōu)化后的穩(wěn)定性都有了顯著的提升,也大幅度減少了用戶在 Shuffle 上遇到的問(wèn)題。
三、混部資源場(chǎng)景
接下來(lái)介紹在混部場(chǎng)景中進(jìn)行的優(yōu)化。首先值得注意的是,在混部集群場(chǎng)景下,F(xiàn)etch Failure 的情況通常比在穩(wěn)定資源環(huán)境中嚴(yán)重得多。每天平均的 Fetch Failure 次數(shù)非常高,主要原因是這些資源大多來(lái)自于線上資源空閑的出讓?zhuān)鼈兊拇疟P(pán) IO 能力和磁盤(pán)空間都比較有限。此外,由于磁盤(pán) IOPS 和磁盤(pán)空間可能非常有限,與 HDFS 或其他服務(wù)混合部署的資源對(duì)集群的 Shuffle 性能影響較大,因此發(fā)生失敗的概率也較高。混部資源治理以降低作業(yè)的失敗率,確保作業(yè)的穩(wěn)定性為主要目標(biāo),同時(shí)需要提高整個(gè)集群的 Shuffle 性能,減少資源浪費(fèi)。
對(duì)于混部資源的集群,主要的方案是自研的 Cloud Shuffle Service(CSS),通過(guò)提供一個(gè)遠(yuǎn)端的 Shuffle 服務(wù)來(lái)減少這些作業(yè)對(duì)本地磁盤(pán)的依賴(lài)。
1、CSS 功能介紹
首先,CSS 提供了一個(gè) Push Based Shuffle 模式,與剛才介紹的 ESS 模式不同,在 Push Based Shuffle 模式下,不同 Mapper 的同一個(gè) Reducer Partition 數(shù)據(jù)都會(huì)發(fā)送到一個(gè)共同的遠(yuǎn)程服務(wù)上,在這個(gè)服務(wù)上進(jìn)行合并,最后在某個(gè) Worker 上寫(xiě)上一個(gè)或者多個(gè)文件,使得 Reduce 階段可以通過(guò) Sequential Read 模式讀取這些 Partition 數(shù)據(jù),減少隨機(jī) IO 的開(kāi)銷(xiāo)。
CSS 也支持 Partition Group 功能,它的作用是將多個(gè)分區(qū)數(shù)據(jù)分配到一個(gè) Reducer Partition Group。這樣,在 Map 階段 Mapper 可以通過(guò) Batch Push 方式傳送數(shù)據(jù),將批量數(shù)據(jù)直接傳輸?shù)綄?duì)應(yīng)分區(qū)組的工作節(jié)點(diǎn)上,從而降低了批量模式下 IO 的開(kāi)銷(xiāo),提高了批量模式的性能。
CSS 也提供了一個(gè)快速雙寫(xiě)備份的功能。由于使用的是 push based Shuffle 和聚合模式,所有的數(shù)據(jù)其實(shí)都聚集在一個(gè) Worker 上,如果這個(gè) Worker 數(shù)據(jù)丟失的話,等于所有的 Mapper 都要重新計(jì)算所對(duì)應(yīng)的數(shù)據(jù),因此對(duì)于 push 聚合的功能,使用一個(gè)雙寫(xiě)備份是比較重要的。CSS 提高寫(xiě)入的速度的方式是采用雙寫(xiě) In-memory 副本模式并進(jìn)行異步刷盤(pán),這樣 Mapper 無(wú)需等待刷盤(pán)結(jié)束就可以繼續(xù)推送后續(xù)的數(shù)據(jù)。
CSS 本身也具有一個(gè)負(fù)載均衡功能。CSS 通過(guò)一個(gè) Cluster Manager 去管理所有服務(wù)上的節(jié)點(diǎn)。Cluster Manager 會(huì)定期去采集和收取 CSS Worker 節(jié)點(diǎn)匯報(bào)的負(fù)載信息,當(dāng)有新的 Application 提交的時(shí)候,它會(huì)進(jìn)行資源的均衡分配,以確保 Shuffle Write 和 Shuffle Read 會(huì)優(yōu)先分配到集群上使用率較低的節(jié)點(diǎn),從而實(shí)現(xiàn)集群中更好的 Shuffle 負(fù)載均衡。
2、CSS 整體架構(gòu)
- Cluster Manager 負(fù)責(zé)集群的資源分配,并維護(hù)集群 Worker 和 Application 狀態(tài),它可以通過(guò) Zookeeper 或者本地磁盤(pán)保存這些信息,達(dá)到具有 High Availability 的服務(wù)。
- Worker 支持兩種寫(xiě)入模式,分別是磁盤(pán)模式和 HDFS 模式。目前常用的是磁盤(pán)模式,每個(gè)分區(qū)的數(shù)據(jù)會(huì)寫(xiě)入兩個(gè)不同的 Worker 節(jié)點(diǎn),以實(shí)現(xiàn)數(shù)據(jù)冗余。
- CSS Master 位于 Spark driver 端,主要負(fù)責(zé)與 Cluster Manager 的心跳聯(lián)系以及 Application Lifecycle。作業(yè)啟動(dòng)時(shí),也會(huì)向 Cluster Manager 申請(qǐng) Worker。Shuffle Stage 的過(guò)程也會(huì)統(tǒng)計(jì) Shuffle Stage 的元數(shù)據(jù)以及的進(jìn)展。
- Shuffle Client 是一個(gè)接入了 Spark Shuffle API 的組件,允許任何 Spark 作業(yè)直接使用 CSS 而無(wú)需額外配置。每個(gè) Executor 會(huì)使用 ShuffleClient 進(jìn)行讀寫(xiě)。Shuffle Client 在寫(xiě)入時(shí)進(jìn)行雙寫(xiě),在讀的時(shí)候,它可以向任何一個(gè)存有數(shù)據(jù)的 Worker 讀取這些數(shù)據(jù),如果其中一個(gè) Worker 讀取失敗的話,也會(huì)自動(dòng)切換到另一個(gè) Worker 上,并對(duì)多讀的數(shù)據(jù)進(jìn)行去重。
CSS 在寫(xiě)入時(shí) Worker 會(huì)直接發(fā)送數(shù)據(jù),Mapper 會(huì)同時(shí)將數(shù)據(jù)發(fā)送到兩個(gè) Worker,Worker 不會(huì)等到刷磁盤(pán)之后返回給 Mapper,而是異步返回給 Mapper 結(jié)果,如果遇到失敗,會(huì)在下一個(gè)請(qǐng)求再通知 Mapper。這時(shí) Mapper 會(huì)重新跟節(jié)點(diǎn)申請(qǐng)兩個(gè)新的 Worker,重新推送傳送失敗的數(shù)據(jù)。讀的時(shí)候可以從任何一個(gè)節(jié)點(diǎn)讀取數(shù)據(jù),通過(guò) Map ID,Attempt ID 和 Batch ID 進(jìn)行去重。
3、CSS 性能與未來(lái)演進(jìn)
在 1TB 的 TPC-DS Benchmark 性能測(cè)試下,CSS 在 30% 以上的 Query 中得到了提升。
CSS 作為一個(gè)遠(yuǎn)端 Shuffle 服務(wù),特別適合云原生化,支持彈性部署和更多的遠(yuǎn)程儲(chǔ)蓄服務(wù)。目前 CSS 已經(jīng)完成了開(kāi)源,有興趣的朋友可以去 CSS 開(kāi)源網(wǎng)站了解更多信息,也希望把后面的一些迭代和優(yōu)化同步到社區(qū)上。在未來(lái)云原生化的演進(jìn)中需要支持彈性部署、支持遠(yuǎn)程存儲(chǔ)服務(wù)等相關(guān)能力。
以上就是本次分享的內(nèi)容,謝謝大家。