偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

自適應(yīng)批作業(yè)調(diào)度器:為 Flink 批作業(yè)自動推導(dǎo)并行度

開發(fā)
為了控制批作業(yè)的執(zhí)行時長,算子的并行度應(yīng)該和其需要處理的數(shù)據(jù)量成正比。用戶需要通過預(yù)估算子需要處理的數(shù)據(jù)量來配置并行度。但準確預(yù)估算子需要處理的數(shù)據(jù)量是一件很困難的事情。

?01引言

對大部分用戶來說,為 Flink 算子配置合適的并行度并不是一件容易的事。對于批作業(yè),小的并行度會導(dǎo)致作業(yè)運行時間長,故障恢復(fù)慢,而不必要的大并行度會導(dǎo)致資源浪費,任務(wù)部署和數(shù)據(jù) shuffle 開銷也會變大。

為了控制批作業(yè)的執(zhí)行時長,算子的并行度應(yīng)該和其需要處理的數(shù)據(jù)量成正比。用戶需要通過預(yù)估算子需要處理的數(shù)據(jù)量來配置并行度。但準確預(yù)估算子需要處理的數(shù)據(jù)量是一件很困難的事情:需要處理的數(shù)據(jù)量可能每天都在變化,作業(yè)中可能會存在大量的 UDF 和復(fù)雜算子導(dǎo)致難以判斷其產(chǎn)出的數(shù)據(jù)量。

為了解決這個問題,我們在 Flink 1.15 中引入了一種新的調(diào)度器:自適應(yīng)批作業(yè)調(diào)度器(Adaptive Batch Scheduler)。自適應(yīng)批作業(yè)調(diào)度器會在作業(yè)運行時根據(jù)每個算子需要處理的實際數(shù)據(jù)量來自動推導(dǎo)并行度。它會帶來以下好處:

  1. 大大降低批處理作業(yè)并發(fā)度調(diào)優(yōu)的繁瑣程度;
  2. 可以根據(jù)處理的數(shù)據(jù)量為不同的算子配置不同的并行度,這對于之前只能配置全局并行度的 SQL 作業(yè)尤其有益;
  3. 可以更好的適應(yīng)每日變化的數(shù)據(jù)量。

02用法

使 Flink 自動推導(dǎo)算子的并行度,需要進行以下配置:

  1. 啟用自適應(yīng)批作業(yè)調(diào)度器;
  2. 配置算子的并行度為 -1。

2.1 啟用自適應(yīng)批作業(yè)調(diào)度器

啟用自適應(yīng)批作業(yè)調(diào)度器,需要進行以下配置:

  1. 配置 jobmanager.scheduler: AdaptiveBatch;
  2. 將 execution.batch-shuffle-mode 配置為 ALL-EXCHANGES-BLOCKING (默認值)。因為目前自適應(yīng)批作業(yè)調(diào)度器只支持 shuffle mode 為 ALL-EXCHANGES-BLOCKING 的作業(yè)。

此外,還有一些相關(guān)配置來指定自動推導(dǎo)的算子并行度的上下限、預(yù)期每個算子處理的數(shù)據(jù)量以及 source 算子的默認并行度,詳情請參閱 Flink 文檔 [1]。

2.2 配置算子的并行度為 -1

自適應(yīng)批作業(yè)調(diào)度器只會為用戶未指定并行度的算子(即并行度為默認值 -1)推導(dǎo)并行度。所以需要進行以下配置:

  1. 配置 parallelism.default: -1;
  2. 對于 SQL 作業(yè),需要配置 table.exec.resource.default-parallelism: -1;
  3. 對于 DataStream/DataSet 作業(yè),避免在作業(yè)中通過算子的 setParallelism() 方法來指定并行度;
  4. 對于 DataStream/DataSet 作業(yè),避免在作業(yè)中通過 StreamExecutionEnvironment/ExecutionEnvironment 的 setParallelism() 方法來指定并行度。

03實現(xiàn)細節(jié)

接下來我們將介紹自適應(yīng)批作業(yè)調(diào)度器的實現(xiàn)細節(jié)。在此之前,我們簡要介紹一下涉及到的一些術(shù)語概念:

  1. 邏輯節(jié)點(JobVertex)[2] 和邏輯拓撲(JobGraph)[3]:邏輯節(jié)點是為了更優(yōu)的性能而將幾個算子鏈接到一起形成的算子鏈,邏輯拓撲則是多個邏輯節(jié)點連接組成的數(shù)據(jù)流圖。
  2. 執(zhí)行節(jié)點(ExecutionVertex)[4] 和執(zhí)行拓撲(ExecutionGraph)[5]:執(zhí)行節(jié)點對應(yīng)一個可部署物理任務(wù),是邏輯節(jié)點根據(jù)并行度進行展開生成的。例如,如果一個邏輯節(jié)點的并行度為 100,就會生成 100 個對應(yīng)的執(zhí)行節(jié)點。執(zhí)行拓撲則是所有執(zhí)行節(jié)點連接組成的物理執(zhí)行圖。

以上概念的介紹可以參見 Flink 文檔 [6]。需要注意的是,自適應(yīng)批作業(yè)調(diào)度器是通過推導(dǎo)邏輯節(jié)點的并行度來決定該節(jié)點包含的算子的并行度的。

實現(xiàn)細節(jié)主要包括以下幾部分:

  1. 使調(diào)度器能夠收集執(zhí)行節(jié)點產(chǎn)出數(shù)據(jù)的大?。?/li>
  2. 引入一個新組件 VertexParallelismDecider [7] 來負責(zé)根據(jù)邏輯節(jié)點需要處理的數(shù)據(jù)量計算其并行度;
  3. 支持動態(tài)構(gòu)建執(zhí)行拓撲,即執(zhí)行拓撲從一個空的執(zhí)行拓撲開始,然后隨著作業(yè)調(diào)度逐漸添加執(zhí)行節(jié)點;
  4. 引入自適應(yīng)批作業(yè)調(diào)度器來更新和調(diào)度執(zhí)行拓撲。

后續(xù)章節(jié)會對以上內(nèi)容進行詳細介紹。

圖片

圖 1 - 自動推導(dǎo)并行度的整體結(jié)構(gòu)

3.1 收集執(zhí)行節(jié)點產(chǎn)出的數(shù)據(jù)量

自適應(yīng)批作業(yè)調(diào)度器是根據(jù)邏輯節(jié)點需要處理的數(shù)據(jù)量來決定其并行度的,因此需要收集上游節(jié)點產(chǎn)出的數(shù)據(jù)量。為此,我們引入了一個 numBytesProduced 計數(shù)器來記錄每個執(zhí)行節(jié)點產(chǎn)出的數(shù)據(jù)分區(qū)(ResultPartition)的數(shù)據(jù)量,并在執(zhí)行節(jié)點運行完成時將累計值發(fā)送給調(diào)度器。

3.2 為邏輯節(jié)點決定合適的并行度

我們引入了一個新組件 VertexParallelismDecider 來負責(zé)為邏輯節(jié)點計算并行度。計算算法如下:

假設(shè)

  1. V 是用戶配置的期望每個執(zhí)行節(jié)點處理的數(shù)據(jù)量;
  2. totalBytenon-broadcast 是邏輯節(jié)點需要處理的非廣播數(shù)據(jù)的總量;
  3. totalBytesbroadcast 是邏輯節(jié)點需要處理的廣播數(shù)據(jù)的總量;
  4. maxBroadcastRatio 是每個執(zhí)行節(jié)點處理的廣播數(shù)據(jù)的比例上限;
  5. normalize(x) 是一個輸出與 x 最接近的 2 的冪的函數(shù)。

計算并行度的公式如下:

圖片

值得注意的是,我們在這個公式中引入了兩個特殊處理:

  1. 限制每個執(zhí)行節(jié)點處理的廣播數(shù)據(jù)的比例;
  2. 將并行度調(diào)整為 2 的冪。

此外,上述公式不能直接用來決定 source 節(jié)點的并行度,因為 source 節(jié)點不會消費數(shù)據(jù)。為了解決這個問題,我們引入了配置選項 jobmanager.adaptive-batch-scheduler.default-source-parallelism,允許用戶手動配置 source 節(jié)點的并行度。請注意,并非所有 source 都需要此選項,因為某些 source 可以自己推導(dǎo)并行度(例如,HiveTableSource,詳情請參閱 HiveParallelismInference),對于這些source,更推薦由它們自己推導(dǎo)并行度。

3.2.1 限制每個執(zhí)行節(jié)點處理的廣播數(shù)據(jù)的比例

我們在公式限制每個執(zhí)行節(jié)點處理的廣播數(shù)據(jù)上限比例為 maxBroadcastRatio。 即每個執(zhí)行節(jié)點處理的非廣播數(shù)據(jù)至少為 (1-maxBroadcastRatio) * V。如果不這樣做,當(dāng)廣播數(shù)據(jù)的數(shù)據(jù)量接近 V 時,即使非廣播數(shù)據(jù)的量非常小,也可能會被計算出很大的并行度,這是不必要的,會導(dǎo)致資源浪費和任務(wù)部署的開銷變大。

通常情況下,一個執(zhí)行節(jié)點需要處理的廣播數(shù)據(jù)量會小于要處理的非廣播數(shù)據(jù)。 因此,我們將 maxBroadcastRatio 默認設(shè)置為 0.5。目前,這個值是硬編碼在代碼中的,我們后續(xù)會考慮將其改為可配置的。

3.2.2 將并行度調(diào)整為 2 的冪

normalize 函數(shù)會將并行度調(diào)整為最近的 2 的冪,這樣做是為了避免引入數(shù)據(jù)傾斜。為了更好的理解本節(jié),我們建議您先閱讀子分區(qū)動態(tài)映射部分。

以圖 4(b)為例,A1/A2 產(chǎn)生 4 個子分區(qū),B 最終被決定的并行度為 3。這種情況下,B1 將消費 1 個子分區(qū),B2 將消費 1 個子分區(qū),B3 將消費 2 個子分區(qū)。我們假設(shè)不同子分區(qū)的數(shù)據(jù)量都相同,這樣 B3 需要消費的數(shù)據(jù)量是 B1/B2 的 2 倍,從而導(dǎo)致了數(shù)據(jù)傾斜。

為了解決這個問題,我們需要讓所有下游執(zhí)行節(jié)點消費的子分區(qū)數(shù)量都一樣,也就是說上游產(chǎn)出的子分區(qū)數(shù)量應(yīng)該是下游邏輯節(jié)點并行度的整數(shù)倍。為簡單起見,我們希望用戶指定的最大并行度為 2^N(如果不是則會被自動調(diào)整到不超過配置值的 2^N),然后將下游邏輯節(jié)點的并行度調(diào)整到最接近的 2^M(M <= N),這樣就可以保證子分區(qū)被下游均勻消費。

不過這只是一個臨時的解決方案,最終應(yīng)該通過自動負載均衡來解決,我們將在后續(xù)版本中實現(xiàn)。

3.3 動態(tài)構(gòu)建執(zhí)行拓撲

在引入自適應(yīng)批作業(yè)調(diào)度器之前,執(zhí)行拓撲是以靜態(tài)方式構(gòu)建的,也就是在調(diào)度開始前執(zhí)行拓撲就被完全創(chuàng)建出來了。為了使邏輯節(jié)點并行度可以在運行時決定,執(zhí)行拓撲需要支持動態(tài)構(gòu)建。

3.3.1 向執(zhí)行拓撲動態(tài)添加節(jié)點和邊

動態(tài)構(gòu)建執(zhí)行拓撲是指一個 Flink 作業(yè)從一個空的執(zhí)行拓撲開始,然后隨著調(diào)度逐步附加執(zhí)行節(jié)點,如圖 2 所示。

執(zhí)行拓撲由執(zhí)行節(jié)點和執(zhí)行邊(ExecutionEdge)組成。只有在以下情況下,才會將邏輯節(jié)點展開創(chuàng)建執(zhí)行節(jié)點并將其添加到執(zhí)行拓撲:

  1. 對應(yīng)邏輯節(jié)點的并行度已經(jīng)被確定(以便 Flink 知道應(yīng)該創(chuàng)建多少個執(zhí)行節(jié)點);
  2. 所有上游邏輯節(jié)點都已經(jīng)被展開(以便 Flink 通過執(zhí)行邊將新創(chuàng)建的執(zhí)行節(jié)點和上游執(zhí)行節(jié)點連接起來)。

圖片

圖 2 - 動態(tài)構(gòu)建執(zhí)行拓撲

3.3.2 子分區(qū)動態(tài)映射

在引入自適應(yīng)批作業(yè)調(diào)度器之前,在部署執(zhí)行節(jié)點時,F(xiàn)link 需要知道其下游邏輯節(jié)點的并行度。因為下游邏輯節(jié)點的并行度決定了上游執(zhí)行節(jié)點需要產(chǎn)出的子分區(qū)數(shù)量。以圖 3 為例,下游 B 的并行度為 2,因此上游的 A1/A2 需要產(chǎn)生 2 個子分區(qū),索引為 0 的子分區(qū)被 B1 消費,索引為 1 的子分區(qū)被 B2 消費。

圖片

圖 3 - 靜態(tài)執(zhí)行拓撲消費子分區(qū)的方式

但顯然,這不適用于動態(tài)圖,因為當(dāng)部署上游執(zhí)行節(jié)點時,下游邏輯節(jié)點的并行度可能尚未確定(即部署 A1/A2 時,B 的并行度還未確定)。為了解決這個問題,我們需要使上游執(zhí)行節(jié)點產(chǎn)生的子分區(qū)數(shù)量與下游邏輯節(jié)點的并行度解耦。

我們通過以下方法實現(xiàn)解耦:將上游執(zhí)行節(jié)點產(chǎn)生子分區(qū)的數(shù)量設(shè)置為下游邏輯節(jié)點的最大并行度(最大并行度是一個可配置的固定值),然后在下游邏輯節(jié)點并行度被確定后,將這些子分區(qū)均分給不同的下游執(zhí)行節(jié)點進行消費。也就是說,部署下游執(zhí)行節(jié)點時,每個下游執(zhí)行節(jié)點都會被分配到一個子分區(qū)范圍來消費。假設(shè) N 是下游邏輯節(jié)點并行度,P 是子分區(qū)的數(shù)量。對于第 k 個下游執(zhí)行節(jié)點,消費的子分區(qū)范圍應(yīng)該是:

圖片

以圖 4 為例,B 的最大并行度為 4,因此 A1/A2 有 4 個子分區(qū)。然后如果B的確定并行度為 2,則子分區(qū)映射將為圖 4(a),如果B的確定并行度為 3,則子分區(qū)映射將為圖 4(b)。

圖片

圖 4 - 動態(tài)執(zhí)行拓撲消費子分區(qū)的方式

3.4 動態(tài)更新并調(diào)度執(zhí)行拓撲

自適應(yīng)批作業(yè)調(diào)度器調(diào)度作業(yè)的方式和默認調(diào)度器基本相同,唯一的區(qū)別是:自適應(yīng)批作業(yè)調(diào)度器是從一個空的執(zhí)行拓撲開始調(diào)度,在處理任何調(diào)度事件之前,都會嘗試決定所有邏輯節(jié)點的并行度,然后嘗試為邏輯節(jié)點生成對應(yīng)的執(zhí)行節(jié)點,并通過執(zhí)行邊連接上游節(jié)點,更新執(zhí)行拓撲。

調(diào)度器會在每次調(diào)度之前嘗試按照拓撲順序決定所有邏輯節(jié)點的并行度:

  1. 對于 source 節(jié)點,其并行度會在開始調(diào)度之前就進行確定;
  2. 對于非 source 節(jié)點,需要在其所有上游節(jié)點數(shù)據(jù)產(chǎn)出完成后才能確定其并行度。

然后,調(diào)度程序?qū)L試按照拓撲順序?qū)⑦壿嫻?jié)點展開生成執(zhí)行節(jié)點。一個可以被展開的邏輯節(jié)點應(yīng)該滿足以下條件:

  1. 該邏輯節(jié)點并行度已確定;
  2. 所有上游邏輯節(jié)點都已經(jīng)被展開。

04未來展望 - 自動負載均衡

運行批作業(yè)時,可能會出現(xiàn)數(shù)據(jù)傾斜(某個執(zhí)行節(jié)點需要處理的數(shù)據(jù)遠多于其他執(zhí)行節(jié)點),這會導(dǎo)作業(yè)出現(xiàn)長尾現(xiàn)象,拖慢作業(yè)的完成速度。如果 Flink 可以自動改善或者解決這個問題,可以給用戶很大的幫助。

一種典型的數(shù)據(jù)傾斜情況是某些子分區(qū)的數(shù)據(jù)量明顯大于其他子分區(qū)。這種情況可以通過劃分更細粒度的子分區(qū),并根據(jù)子分區(qū)大小來平衡工作負載來解決(如圖 5)。自適應(yīng)批作業(yè)調(diào)度器的工作可以被認為是邁向它的第一步,因為自動重新平衡的要求類似于自適應(yīng)批作業(yè)調(diào)度器,它們都需要動態(tài)圖的支持和結(jié)果分區(qū)大小的采集。

基于自適應(yīng)批作業(yè)調(diào)度器的實現(xiàn),我們可以通過增加最大并行度(為了更細粒度的子分區(qū))和簡單地更改子分區(qū)范圍劃分算法(為了平衡工作負載)來解決上述問題。在目前的設(shè)計中,子分區(qū)范圍是按照子分區(qū)的個數(shù)來劃分的,我們可以改成按照子分區(qū)中的數(shù)據(jù)量來劃分,這樣每個子分區(qū)范圍內(nèi)的數(shù)據(jù)量可以大致相同,從而平衡下游執(zhí)行節(jié)點的工作量。

圖片

圖 5 - 自動負載均衡

注釋

[1] https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/deployment/elastic_scaling/#adaptive-batch-scheduler

[2] https://github.com/apache/flink/blob/release-1.15/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java

[3] https://github.com/apache/flink/blob/release-1.15/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java

[4] https://github.com/apache/flink/blob/release-1.15/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java

[5] https://github.com/apache/flink/blob/release-1.15/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java

[6] https://nightlies.apache.org/flink/flink-docs-master/zh/docs/internals/job_scheduling/#jobmanager-數(shù)據(jù)結(jié)構(gòu)

[7] https://github.com/apache/flink/blob/release-1.15/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/VertexParallelismDecider.java?

責(zé)任編輯:未麗燕 來源: Apache Flink
相關(guān)推薦

2011-04-06 14:16:49

SQL Server自動備份

2021-11-05 15:55:35

作業(yè)幫Kubernetes調(diào)度器

2024-03-15 15:09:28

2010-04-15 10:41:13

2022-07-26 16:54:08

QuartzJava

2011-03-30 14:29:13

QuartzJava

2024-07-08 00:00:02

.NET系統(tǒng)調(diào)度器

2017-06-06 10:30:12

前端Web寬度自適應(yīng)

2012-05-16 11:13:35

傲游瀏覽器手機版

2022-11-09 17:12:38

AI模型

2023-08-28 08:00:45

2021-04-18 12:12:29

systemd定時器系統(tǒng)運維

2015-08-12 15:10:46

Ubuntucronlinux

2010-08-30 10:26:20

DIV自適應(yīng)高度

2023-07-31 08:24:34

MySQL索引計數(shù)

2011-12-13 20:08:54

云計算BMC

2011-10-19 08:04:12

2010-08-30 09:52:03

DIV高度自適應(yīng)

2012-05-09 10:58:25

JavaMEJava

2014-09-05 10:10:32

Android自適應(yīng)布局設(shè)計
點贊
收藏

51CTO技術(shù)棧公眾號