螞蟻面試:Flink 并行度、算子、算子鏈、Slot、Slot 共享組之間的關(guān)系是什么?如何設(shè)置能夠使資源利用最大化?
一、Flink并行度
1. 并行度的概念
在Flink中,并行度(Parallelism)是指在Flink作業(yè)中并行執(zhí)行任務(wù)的程度,它決定了作業(yè)中任務(wù)的數(shù)量以及任務(wù)之間的數(shù)據(jù)劃分和分配方式,是實(shí)現(xiàn)高吞吐量和低延遲流處理的關(guān)鍵概念。一個特定算子的子任務(wù)(subtask)的個數(shù)被稱之為其并行度。包含并行子任務(wù)的數(shù)據(jù)流,就是并行數(shù)據(jù)流,它需要多個分區(qū)(stream partition)來分配并行任務(wù)。一般情況下,一個流程序的并行度,可以認(rèn)為就是其所有算子中最大的并行度。一個程序中,不同的算子可能具有不同的并行度。
例如,在一個Flink程序中,Source算子的并行度設(shè)置為2,Map算子的并行度設(shè)置為4,Sink算子的并行度設(shè)置為1,那么這個程序的并行度就是4。
Flink程序本質(zhì)上是并行的和分布式的,在執(zhí)行過程中,一個流(stream)包含一個或多個流分區(qū),而每一個operator包含一個或多個operator子任務(wù)。操作子任務(wù)間彼此獨(dú)立,在不同的線程中執(zhí)行,甚至是在不同的機(jī)器或不同的容器上。operator子任務(wù)的數(shù)量是這一特定operator的并行度。相同程序中的不同operator有不同級別的并行度。
2. 并行度的設(shè)置
Flink可以在不同的級別設(shè)置并行度,包括算子層次、執(zhí)行環(huán)境層次、客戶端層次和系統(tǒng)層次,且優(yōu)先級為:算子層次 > 執(zhí)行環(huán)境層次 > 客戶端層次 > 系統(tǒng)層次。
(1) 算子層次
單個算子、數(shù)據(jù)源和數(shù)據(jù)接收器的并行度可以通過調(diào)用 setParallelism() 方法來指定。這種方式設(shè)置的并行度,只針對當(dāng)前算子有效。
finalStreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text =[...];
DataStream<Tuple2<String,Integer>> wordCounts = text
.flatMap(newLineSplitter())
.keyBy(value -> value.f0)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.sum(1).setParallelism(5);
wordCounts.print();
env.execute("Word Count Example");
在上述代碼中,sum(1) 算子的并行度被設(shè)置為5,這意味著該算子會啟動5個并發(fā)任務(wù)來處理數(shù)據(jù)。
(2) 執(zhí)行環(huán)境層次
Flink程序運(yùn)行在執(zhí)行環(huán)境的上下文中,執(zhí)行環(huán)境為所有執(zhí)行的算子、數(shù)據(jù)源、數(shù)據(jù)接收器(sink)定義了一個默認(rèn)的并行度。可以顯式配置算子層次的并行度去覆蓋執(zhí)行環(huán)境的并行度??梢酝ㄟ^調(diào)用 setParallelism() 方法指定執(zhí)行環(huán)境的默認(rèn)并行度。
finalStreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
DataStream<String> text =[...];
DataStream<Tuple2<String,Integer>> wordCounts =[...];
wordCounts.print();
env.execute("Word Count Example");
在上述代碼中,執(zhí)行環(huán)境的并行度被設(shè)置為3,這意味著如果沒有在算子層次顯式設(shè)置并行度,所有算子的并行度都將為3。
(3) 客戶端層次
將作業(yè)提交到Flink時可在客戶端設(shè)定其并行度。對于CLI客戶端,可以通過 -p 參數(shù)指定并行度。
./bin/flink run -p 10 WordCount-java.jar
在上述命令中,作業(yè)的并行度被設(shè)置為10。
(4) 系統(tǒng)層次
可以通過設(shè)置 ./conf/flink-conf.yaml 文件中的 parallelism.default 參數(shù),在系統(tǒng)層次來指定所有執(zhí)行環(huán)境的默認(rèn)并行度。
parallelism.default:2
在上述配置中,系統(tǒng)默認(rèn)的并行度被設(shè)置為2。
3. 如何合理規(guī)劃并行度
(1) 理解任務(wù)特性和需求
- 任務(wù)類型:CPU密集型任務(wù)可能需要較高的并行度來充分利用計(jì)算資源,而I/O密集型任務(wù)可能需要較低的并行度以減少資源競爭和網(wǎng)絡(luò)開銷。例如,對于一個復(fù)雜的數(shù)據(jù)分析任務(wù),可能是CPU密集型的,此時可以適當(dāng)提高并行度;而對于一個從數(shù)據(jù)庫讀取數(shù)據(jù)的任務(wù),可能是I/O密集型的,過高的并行度可能會導(dǎo)致網(wǎng)絡(luò)擁塞。
- 數(shù)據(jù)分布:如果數(shù)據(jù)分布不均勻,可能會導(dǎo)致某些任務(wù)負(fù)載過重,影響整體性能。此時,調(diào)整并行度可以使數(shù)據(jù)分布更均勻。比如,在處理用戶行為數(shù)據(jù)時,可能某些熱門用戶的數(shù)據(jù)量遠(yuǎn)遠(yuǎn)大于其他用戶,這時候可以通過調(diào)整并行度來避免數(shù)據(jù)傾斜。
(2) 考慮集群資源限制
- 資源可用性:集群的資源(如CPU核心數(shù)、內(nèi)存大小、網(wǎng)絡(luò)帶寬等)會限制可以設(shè)置的并行度。需要根據(jù)集群的實(shí)際情況來合理設(shè)置。例如,如果集群的CPU核心數(shù)有限,設(shè)置過高的并行度會導(dǎo)致任務(wù)競爭CPU資源,反而降低性能。
- 資源競爭:過高的并行度可能導(dǎo)致資源競爭加劇,反而降低整體性能。比如,多個任務(wù)同時競爭內(nèi)存資源,可能會導(dǎo)致頻繁的內(nèi)存交換,影響任務(wù)的執(zhí)行效率。
(3) 分析作業(yè)結(jié)構(gòu)和數(shù)據(jù)流動
- 算子依賴關(guān)系:作業(yè)中不同算子之間的依賴關(guān)系會影響數(shù)據(jù)流動和并行度的設(shè)置。需要確保數(shù)據(jù)能夠高效地在算子之間傳遞。例如,如果一個算子依賴于另一個算子的輸出結(jié)果,那么它們的并行度設(shè)置需要相互匹配,以避免數(shù)據(jù)阻塞。
- 數(shù)據(jù)傾斜:某些算子可能處理的數(shù)據(jù)量遠(yuǎn)大于其他算子,導(dǎo)致數(shù)據(jù)傾斜。通過調(diào)整并行度可以減少數(shù)據(jù)傾斜的影響。比如,對于一個聚合算子,如果某些分組的數(shù)據(jù)量過大,可以適當(dāng)提高該算子的并行度,將數(shù)據(jù)分散到更多的任務(wù)中處理。
(4) 實(shí)際應(yīng)用中的設(shè)置方法
- 算子級并行度:通過調(diào)用 setParallelism() 方法可以在算子操作后設(shè)置其并行度。這種方法允許對特定算子進(jìn)行精細(xì)控制。例如:
DataStream<String> stream =...;
stream.map(newMyMapFunction()).setParallelism(2);
- 作業(yè)級并行度:在創(chuàng)建執(zhí)行環(huán)境后,可以通過調(diào)用 setParallelism() 方法設(shè)置全局的默認(rèn)并行度。這種方法適用于對整個作業(yè)進(jìn)行統(tǒng)一配置。例如:
StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
- 客戶端設(shè)置:在提交任務(wù)時,可以通過命令行接口(CLI)的 -p 參數(shù)或Java程序中的相應(yīng)設(shè)置來指定并行度。例如:
./bin/flink run -p 10 your-job.jar
- 集群默認(rèn)設(shè)置:在集群的配置文件(如 flink-conf.yaml)中設(shè)置默認(rèn)并行度,這將影響集群上提交的所有作業(yè)。例如:
parallelism.default:2
(5) 監(jiān)控和調(diào)整
- 監(jiān)控執(zhí)行情況:通過Flink的Web UI或其他監(jiān)控工具監(jiān)控作業(yè)的執(zhí)行情況和集群資源利用率。例如,觀察任務(wù)的處理延遲、資源使用情況等指標(biāo)。
- 動態(tài)調(diào)整:根據(jù)實(shí)際情況動態(tài)調(diào)整并行度,以適應(yīng)不同的工作負(fù)載和數(shù)據(jù)流量。例如,如果發(fā)現(xiàn)某個算子的處理延遲過高,可以適當(dāng)提高其并行度。
(6) 注意事項(xiàng)
- 并行度與性能的關(guān)系:并行度并非越高越好,需要根據(jù)實(shí)際情況進(jìn)行權(quán)衡。過高的并行度可能導(dǎo)致資源競爭和開銷增加,反而降低性能。
- 考慮未來擴(kuò)展性:在設(shè)置并行度時,還需要考慮作業(yè)的擴(kuò)展性和未來可能的需求變化。例如,隨著業(yè)務(wù)的發(fā)展,數(shù)據(jù)量可能會增加,此時需要預(yù)留一定的并行度擴(kuò)展空間。
4. 不同組件的并行度設(shè)置建議
(1) 數(shù)據(jù)源(Source)
- Kafka數(shù)據(jù)源:如果數(shù)據(jù)源是Kafka,Source的并行度通常設(shè)置為Kafka對應(yīng)Topic的分區(qū)數(shù)。這樣可以確保每個分區(qū)的數(shù)據(jù)由一個獨(dú)立的并行任務(wù)來處理,充分利用資源。例如,如果Kafka的某個Topic有10個分區(qū),那么Source的并行度可以設(shè)置為10。如果消費(fèi)速度仍跟不上數(shù)據(jù)生產(chǎn)速度,可以考慮擴(kuò)大Kafka的分區(qū)數(shù),并相應(yīng)地調(diào)大并行度。但要注意,如果并行度多于Kafka的分區(qū)數(shù),會造成有的并行度空閑,浪費(fèi)資源。
- 其他數(shù)據(jù)源:對于其他數(shù)據(jù)源,需要根據(jù)數(shù)據(jù)源的特性和數(shù)據(jù)量來設(shè)置并行度。如果數(shù)據(jù)源支持并行讀取,可以適當(dāng)提高并行度;如果數(shù)據(jù)源的讀取性能有限,過高的并行度可能會導(dǎo)致資源競爭和性能下降。
(2) 轉(zhuǎn)換算子(Transform)
- Keyby之前的算子:如map、filter、flatmap等,這些算子一般不會做太重的操作,并行度可以和Source保持一致,使得算子之間可以做到forward傳輸數(shù)據(jù),不經(jīng)過網(wǎng)絡(luò)傳輸,提高處理效率。
- Keyby之后的算子:如果并發(fā)較大,建議設(shè)置并行度為2的整數(shù)次冪,例如128、256、512等。這是因?yàn)镕link內(nèi)部的一些機(jī)制(如狀態(tài)管理)在并行度為2的整數(shù)次冪時能更好地工作,使數(shù)據(jù)相對均勻地shuffle到下游算子。對于小并發(fā)任務(wù),并行度不一定需要設(shè)置成2的整數(shù)次冪。如果大并發(fā)任務(wù)沒有KeyBy操作,并行度也無需設(shè)置為2的整數(shù)次冪。
(3) 數(shù)據(jù)接收器(Sink)
Sink是數(shù)據(jù)流向下游的地方,可以根據(jù)Sink的數(shù)據(jù)量及下游的服務(wù)抗壓能力進(jìn)行評估。如果Sink是Kafka,可以設(shè)為Kafka對應(yīng)Topic的分區(qū)數(shù),并且Sink并行度最好和Kafka partition成倍數(shù)關(guān)系,否則可能會出現(xiàn)到Kafka partition數(shù)據(jù)不均勻的情況。但大多數(shù)情況下,Sink算子并行度不需要特別設(shè)置,只需要和整個任務(wù)的并行度相同就行。如果下游服務(wù)的抗壓能力有限,需要適當(dāng)降低并行度;如果下游服務(wù)能夠處理大量數(shù)據(jù),可以提高并行度以提高吞吐量。
二、Flink Slot
1. Slot的概念
在Apache Flink中,Task Slot(任務(wù)槽)是TaskManager的一個關(guān)鍵概念,它用于資源管理和并行任務(wù)的調(diào)度。每個TaskManager可以擁有多個Task Slot,每個Task Slot能夠獨(dú)立地運(yùn)行一個或多個子任務(wù)(subtask)。Slot是計(jì)算資源的隔離單元,一個Slot可以運(yùn)行多個SubTask,但是這些SubTask必須是來自同一個application的不同階段的subTask。
Slot數(shù)量通常與每個TaskManager節(jié)點(diǎn)的可用CPU內(nèi)核數(shù)成比例,一般Slot數(shù)量是每個節(jié)點(diǎn)的CPU內(nèi)核數(shù)。Slot的數(shù)量由集群中 flink-conf.yml 配置文件中 taskmanager.numberOfTaskSlots 設(shè)置。例如,在 flink-conf.yml 中設(shè)置 taskmanager.numberOfTaskSlots: 4,表示每個TaskManager有4個Slot。
2. Slot的特性
(1) 并行度與Task Slot數(shù)量的關(guān)系
每個Task Slot可以執(zhí)行一個并行任務(wù)實(shí)例(即一個subtask)。因此,Job的最大并行度受限于所有TaskManager上可用Task Slot的總數(shù)。如果Job的并行度大于總Task Slot數(shù),則部分任務(wù)將排隊(duì)等待空閑的Task Slot;反之,如果Task Slot數(shù)量過多而實(shí)際并行度較低,則會造成資源浪費(fèi)。
(2) 資源共享與隔離
在同一個TaskManager內(nèi)的不同Task Slot之間,網(wǎng)絡(luò)連接、文件句柄等非內(nèi)存資源是共享的,但每個Task Slot有自己獨(dú)立的內(nèi)存空間。這種設(shè)計(jì)既保證了一定程度上的資源隔離(如避免內(nèi)存溢出),又允許一定程度的資源共享(如減少網(wǎng)絡(luò)開銷),從而提高了整體效率。
(3) 鏈?zhǔn)秸{(diào)度(Chaining)
當(dāng)多個連續(xù)的任務(wù)屬于同一算子并且具有相同的并行度時,F(xiàn)link可以將它們合并到同一個Task Slot中執(zhí)行,稱為“鏈?zhǔn)秸{(diào)度”。這樣做的好處是可以減少線程切換和序列化/反序列化的開銷。鏈?zhǔn)秸{(diào)度的前提條件是這些任務(wù)之間沒有其他類型的任務(wù)插入,且它們的操作不會導(dǎo)致阻塞。
(4) 動態(tài)調(diào)整
用戶可以在配置文件中設(shè)置默認(rèn)的Task Slot數(shù)量,也可以在啟動集群時通過命令行參數(shù)指定。此外,還可以根據(jù)具體作業(yè)的需求動態(tài)調(diào)整每個TaskManager的Task Slot數(shù)量。動態(tài)調(diào)整Task Slot數(shù)量的能力有助于更好地適應(yīng)不同類型的負(fù)載變化,例如高峰期增加Task Slot來提升吞吐量,在低谷期減少Task Slot以節(jié)省資源。
3. Slot的配置方式
(1) 全局配置
在 flink-conf.yaml 文件中設(shè)置 taskmanager.numberOfTaskSlots 參數(shù),為整個集群設(shè)定默認(rèn)的Task Slot數(shù)量。
taskmanager.numberOfTaskSlots:4
(2) 命令行參數(shù)
在啟動Flink集群時,使用 -D taskmanager.numberOfTaskSlots=4 參數(shù)覆蓋默認(rèn)值。
./bin/start-cluster.sh -D taskmanager.numberOfTaskSlots=4
(3) 動態(tài)配置
對于某些特定的應(yīng)用場景,可能需要更靈活地控制每個TaskManager的Task Slot數(shù)量。這時可以利用Flink提供的REST API或者YARN/Kubernetes等平臺提供的機(jī)制來進(jìn)行動態(tài)調(diào)整。
4. Slot的最佳實(shí)踐
(1) 合理規(guī)劃并行度
確保Job的并行度與集群中的Task Slot總數(shù)相匹配,既能充分利用現(xiàn)有資源,又不會造成不必要的等待。例如,如果集群中有10個TaskManager,每個TaskManager有4個Slot,那么集群總共有40個Slot,Job的并行度可以設(shè)置為40以內(nèi)。
(2) 考慮任務(wù)特性
對于計(jì)算密集型任務(wù),可以適當(dāng)增加Task Slot的數(shù)量以提高并發(fā)處理能力;而對于I/O密集型任務(wù),則應(yīng)關(guān)注網(wǎng)絡(luò)帶寬和磁盤I/O性能。例如,對于一個計(jì)算密集型的數(shù)據(jù)分析任務(wù),可以增加Task Slot的數(shù)量,讓更多的子任務(wù)并行執(zhí)行。
(3) 監(jiān)控資源使用情況
定期檢查Task Slot的使用率、內(nèi)存消耗等指標(biāo),及時發(fā)現(xiàn)并解決潛在的問題。例如,可以使用Flink的監(jiān)控工具,查看每個TaskManager的Slot使用率和內(nèi)存使用情況。
(4) 測試與調(diào)優(yōu)
在生產(chǎn)環(huán)境中部署之前,先在小規(guī)模集群上進(jìn)行充分測試,找到最適合當(dāng)前工作負(fù)載的Task Slot配置。例如,可以在測試環(huán)境中調(diào)整Task Slot的數(shù)量,觀察作業(yè)的性能變化,找到一個最優(yōu)的配置。
5. 算子和Slot的關(guān)系
(1) 算子的子任務(wù)與Slot的分配
在Flink中,每個算子會根據(jù)其并行度被拆分成多個子任務(wù)(subtask),這些子任務(wù)需要被分配到不同的Slot中執(zhí)行。例如,一個算子的并行度為3,那么它會有3個子任務(wù),這3個子任務(wù)需要被分配到3個不同的Slot中。默認(rèn)情況下,F(xiàn)link允許子任務(wù)共享Slot,只要這些子任務(wù)屬于同一作業(yè)。這樣可以提高資源利用率,例如將資源密集型和非密集型的任務(wù)同時放到一個Slot中,它們可以自行分配對資源占用的比例。
(2) 算子鏈對Slot使用的影響
算子鏈(Operator Chain)是Flink中的一種優(yōu)化技術(shù),它將多個算子連接在一起形成一個鏈?zhǔn)浇Y(jié)構(gòu),以減少數(shù)據(jù)序列化和網(wǎng)絡(luò)傳輸開銷。當(dāng)算子形成算子鏈后,它們會被合并成一個任務(wù),這個任務(wù)只需要一個Slot來執(zhí)行。例如,Source算子和Map算子形成了算子鏈,它們的并行度都為2,那么合并后的任務(wù)也有2個子任務(wù),只需要2個Slot來執(zhí)行。
(3) Slot共享組對算子和Slot關(guān)系的影響
通過 slotSharingGroup 方法可以將算子分配到指定的共享組中,同一共享組的算子會盡可能共享Slot。例如,將多個算子都設(shè)置為同一個共享組,那么這些算子的子任務(wù)可以共享同一個Slot,從而提高資源利用率。但如果不同的算子設(shè)置了不同的共享組,它們的子任務(wù)就不能共享Slot,需要分別分配Slot。
三、Flink算子鏈
1. 算子鏈的概念
算子鏈(Operator Chain)是Flink中的一種優(yōu)化技術(shù),用于將多個算子連接在一起形成一個鏈?zhǔn)浇Y(jié)構(gòu),以減少數(shù)據(jù)序列化和網(wǎng)絡(luò)傳輸開銷,提高整體的處理性能。在Flink中,并行度相同的一對一(one to one)算子操作,可以直接鏈接在一起形成一個 “大” 的任務(wù)(task),每個task會被一個線程執(zhí)行。
例如,在一個WordCount程序中,Source算子和Map算子之間滿足算子鏈的要求,可以直接合并在一起,形成一個任務(wù);因?yàn)椴⑿卸葹?,所以合并后的任務(wù)也有兩個并行子任務(wù)。這樣,這個數(shù)據(jù)流圖所表示的作業(yè)最終會有5個任務(wù),由5個線程并行執(zhí)行。
2. 算子間的數(shù)據(jù)傳輸模式
(1) 一對一(One-to-one,forwarding)
這種模式下,數(shù)據(jù)流維護(hù)著分區(qū)以及元素的順序。比如source和map算子,source算子讀取數(shù)據(jù)之后,可以直接發(fā)送給map算子做處理,它們之間不需要重新分區(qū),也不需要調(diào)整數(shù)據(jù)的順序。這就意味著map算子的子任務(wù),看到的元素個數(shù)和順序跟source算子的子任務(wù)產(chǎn)生的完全一樣,保證著“一對一”的關(guān)系。map、filter、flatMap 等算子都是這種one-to-one的對應(yīng)關(guān)系。這種關(guān)系類似于Spark中的窄依賴。
(2) 重分區(qū)(Redistributing)
在這種模式下,數(shù)據(jù)流的分區(qū)會發(fā)生改變。比如 map 和后面的 keyBy/window 算子之間,以及 keyBy/window 算子和 Sink 算子之間,都是這樣的關(guān)系。每一個算子的子任務(wù),會根據(jù)數(shù)據(jù)傳輸?shù)牟呗裕褦?shù)據(jù)發(fā)送到不同的下游目標(biāo)任務(wù)。這些傳輸方式都會引起重分區(qū)的過程,這一過程類似于Spark中的shuffle。
3. 算子鏈的創(chuàng)建條件
(1) 上下游的并行度一致
上下游算子的并行度必須相同,才能形成算子鏈。例如,如果Source算子的并行度為2,Map算子的并行度為4,那么它們之間就不能形成算子鏈。
(2) 下游節(jié)點(diǎn)的入度為1
下游節(jié)點(diǎn)的入度為1,即下游節(jié)點(diǎn)沒有來自其他節(jié)點(diǎn)的輸入。例如,如果一個算子有兩個輸入流,那么它就不能與上游算子形成算子鏈。
(3) 上下游節(jié)點(diǎn)都在同一個slot group中
上下游節(jié)點(diǎn)都必須在同一個slot group中,才能形成算子鏈。關(guān)于slot group的概念,將在后面的共享組部分詳細(xì)介紹。
(4) 下游節(jié)點(diǎn)的chain策略為ALWAYS
下游節(jié)點(diǎn)的chain策略為ALWAYS,表示可以與上下游鏈接,map、flatmap、filter 等默認(rèn)是ALWAYS。
(5) 上游節(jié)點(diǎn)的chain策略為ALWAYS或HEAD
上游節(jié)點(diǎn)的chain策略為ALWAYS或HEAD,HEAD表示只能與下游鏈接,不能與上游鏈接,Source默認(rèn)是HEAD。
(6) 兩個節(jié)點(diǎn)間數(shù)據(jù)分區(qū)方式是forward
兩個節(jié)點(diǎn)間的數(shù)據(jù)分區(qū)方式必須是forward,即數(shù)據(jù)不需要重新分區(qū)。
(7) 用戶沒有禁用chain
用戶沒有通過編程API禁用算子鏈。例如,沒有調(diào)用 disableChaining() 方法。
4. 算子鏈的控制方法
(1) 全局禁用算子鏈
可以通過調(diào)用 StreamExecutionEnvironment.disableOperatorChaining() 來全局禁用算子鏈。示例代碼如下:
StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
env.disableOperatorChaining();
(2) 從當(dāng)前算子開始新鏈
可以通過在DataStream的operator后面調(diào)用 startNewChain() 來指示從該operator開始一個新的chain(與前面截?cái)?,不會被chain到前面)。示例代碼如下:
DataStream<String> stream = env.fromElements("a","b","c");
stream.map(value -> value.toUpperCase())
.startNewChain()
.filter(value -> value.startsWith("A"));
(3) 禁用算子鏈
可以通過調(diào)用 disableChaining() 來指示該operator不參與chaining(不會與前后的operator chain一起)。示例代碼如下:
DataStream<String> stream = env.fromElements("a","b","c");
stream.map(value -> value.toUpperCase())
.disableChaining()
.filter(value -> value.startsWith("A"));
5. 算子鏈與并行度、Slot的最優(yōu)搭配
(1) 并行度相同時的搭配
當(dāng)上下游算子的并行度相同時,更容易形成算子鏈,從而減少線程切換和數(shù)據(jù)傳輸開銷。此時,應(yīng)該盡量讓這些算子在同一個Slot中執(zhí)行,以提高資源利用率。例如,如果Source算子和Map算子的并行度都為4,且滿足算子鏈的其他條件,那么它們可以形成算子鏈,只需要4個Slot來執(zhí)行這兩個算子的任務(wù)。
(2) 并行度不同時的處理
如果上下游算子的并行度不同,無法形成算子鏈。這時需要根據(jù)具體情況調(diào)整并行度或分配Slot。例如,如果Source算子的并行度為2,而Map算子的并行度為4,可以考慮將Source算子的并行度提高到4,或者將Map算子的并行度降低到2,以嘗試形成算子鏈。如果無法調(diào)整并行度,就需要為每個算子的子任務(wù)分別分配Slot。
(3) 考慮算子的資源需求
對于資源密集型的算子,如aggregate、reduce、sum、window等,即使并行度相同,也可以考慮不與其他算子形成算子鏈,而是單獨(dú)分配Slot,以確保其有足夠的資源執(zhí)行。例如,對于一個復(fù)雜的窗口操作,可以使用 startNewChain() 或 disableChaining() 方法將其與其他算子分開,使其獨(dú)享一個Slot的資源。而對于非資源密集型的算子,如source、map、sink等,可以盡量與其他算子形成算子鏈,共享Slot資源。
四、Flink共享組
1. 共享組的概念
在Apache Flink中,slotSharingGroup() 是一個用于控制算子(operator)之間資源共享的機(jī)制。它允許多個算子共享相同的slot(即資源容器)。Slot是Flink中的資源單位,slot共享可以提高資源利用率,但在某些情況下,我們希望更精細(xì)地控制不同算子的資源分配,slotSharingGroup 就提供了這種能力。
2. 共享組的作用
(1) 控制資源分配
將算子分配到不同的slot sharing group,可以將某些關(guān)鍵算子隔離出來,確保它們不會與其他算子爭用資源。例如,對于一些重要的窗口操作或聚合操作,可以為其分配獨(dú)立的slot sharing group,避免受到其他輕量級算子的干擾。
(2) 提高性能和穩(wěn)定性
通過分組隔離,防止某些算子占用過多資源,從而影響其他算子的執(zhí)行性能。例如,如果某個算子由于處理復(fù)雜度高或其他原因產(chǎn)生背壓,可能會影響同一slot sharing group中的其他算子。通過 slotSharingGroup() 隔離算子,可以減少背壓的擴(kuò)散。
(3) 解決背壓問題
對于某些復(fù)雜的算子,可能會導(dǎo)致算子鏈中的其他算子受到背壓影響。通過將其分配到不同的slot sharing group,可以減少此類問題。例如,對于一個計(jì)算密集型的算子,可以將其分配到一個獨(dú)立的slot sharing group,避免對其他算子產(chǎn)生背壓。
3. 共享組的使用場景
(1) 算子資源隔離
當(dāng)某些算子需要較高的資源或執(zhí)行較復(fù)雜的邏輯時,可能希望將它們與其他輕量級算子隔離開來,避免干擾。比如某些窗口操作、聚合操作可能消耗大量內(nèi)存和計(jì)算資源,此時可以為其分配獨(dú)立的slot sharing group。
(2) 優(yōu)化并行度與資源利用率
在具有不同并行度的算子間,可以通過不同的slot sharing group來優(yōu)化資源利用,避免算子在同一slot中因?yàn)椴⑿卸炔町惗霈F(xiàn)負(fù)載不均的問題。例如,對于并行度較高的算子和并行度較低的算子,可以將它們分配到不同的slot sharing group,提高資源利用率。
(3) 避免背壓擴(kuò)散
如果某個算子由于處理復(fù)雜度高或其他原因產(chǎn)生背壓,可能會影響同一slot sharing group中的其他算子。通過 slotSharingGroup() 隔離算子,可以減少背壓的擴(kuò)散。例如,對于一個容易產(chǎn)生背壓的算子,可以將其分配到一個獨(dú)立的slot sharing group,避免影響其他算子的執(zhí)行。
4. 共享組的代碼示例
// 定義兩個數(shù)據(jù)流
DataStream<String> stream1 = env.fromElements("a","b","c");
DataStream<String> stream2 = env.fromElements("1","2","3");
// 給第一個算子鏈設(shè)置 slotSharingGroup
stream1.map(value -> value.toUpperCase())
.slotSharingGroup("group1")
.filter(value -> value.startsWith("A"))
.slotSharingGroup("group1");
// 給第二個算子鏈設(shè)置不同的 slotSharingGroup
stream2.map(value -> value +"X")
.slotSharingGroup("group2")
.filter(value -> value.endsWith("X"))
.slotSharingGroup("group2");
// 匯聚兩個流并繼續(xù)處理
stream1.union(stream2)
.map(value ->"Processed: "+ value)
.slotSharingGroup("group3");
env.execute();
在上述代碼中,stream1 的算子被分配到了 "group1",stream2 的算子被分配到了 "group2",兩者之間的算子不會共享相同的slot,從而實(shí)現(xiàn)了資源隔離。最后,通過 union() 操作將兩個流合并并設(shè)置為 "group3",合并后的流將使用一個新的共享組。
5. 共享組的效果
(1) 資源隔離
在上面的示例中,不同的算子鏈被分配到了不同的slot sharing group,實(shí)現(xiàn)了資源隔離。這可以確保關(guān)鍵算子不會受到其他算子的干擾,提高了作業(yè)的穩(wěn)定性和性能。
(2) 優(yōu)化資源分配
通過給不同的算子鏈分配不同的slot sharing group,F(xiàn)link在作業(yè)執(zhí)行時會為每個共享組分配不同的slot,避免了在同一個slot中同時運(yùn)行可能會競爭資源的算子。這可以提高資源利用率,避免資源浪費(fèi)。
(3) 減少資源爭用和背壓傳播
當(dāng)某些復(fù)雜算子引發(fā)的背壓或資源消耗比較高時,其他不相關(guān)的算子不會受到其影響,從而提高了作業(yè)的穩(wěn)定性和性能。例如,如果某個算子產(chǎn)生了背壓,由于它被分配到了一個獨(dú)立的slot sharing group,不會影響其他共享組中的算子。
6. 共享組的注意事項(xiàng)
(1) 默認(rèn)設(shè)置
默認(rèn)情況下,F(xiàn)link的所有算子都屬于同一個默認(rèn)的slot sharing group。如果不顯式設(shè)置 slotSharingGroup(),所有算子都會共享同一個slot。
(2) 資源不足問題
分配給一個slot sharing group的所有算子會被Flink盡可能分配到同一個slot中運(yùn)行。如果算子的并行度較高,而集群資源不足,可能會導(dǎo)致部分算子不能有效共享slot,這時可以通過調(diào)整集群資源或者優(yōu)化slot分配策略來解決。
(3) 鏈?zhǔn)讲僮饔绊?/p>
為可鏈?zhǔn)讲僮鞯乃阕釉O(shè)置不同的slot sharing group可能會導(dǎo)致鏈?zhǔn)讲僮?nbsp;operator chains 產(chǎn)生割裂,從而改變性能。因此,在設(shè)置slot sharing group時,需要考慮算子之間的鏈?zhǔn)疥P(guān)系。
(4) 算子調(diào)度
slot 共享組僅僅意味著調(diào)度器可以使被分組的算子被部署到同一個slot中,但無法保證將被分組的算子部署在一起。如果被分組算子被部署到單獨(dú)的slot中,slot資源將從特定的資源組需求中派生而來。
7. 共享組與并行度、Slot、算子鏈的最優(yōu)搭配
(1) 相同共享組內(nèi)的搭配
將資源需求相似的算子分配到同一個共享組中,可以提高資源利用率。例如,將所有的非資源密集型算子(如source、map、sink)分配到一個共享組,將資源密集型算子(如aggregate、reduce、sum、window)分配到另一個共享組。在同一個共享組內(nèi),盡量讓并行度相同的算子形成算子鏈,在同一個Slot中執(zhí)行。例如,在一個共享組內(nèi),Source算子和Map算子的并行度都為3,且滿足算子鏈的條件,它們可以形成算子鏈,只需要3個Slot。
(2) 不同共享組的隔離
對于資源需求差異較大或容易產(chǎn)生背壓的算子,分配到不同的共享組中,以避免資源競爭和背壓傳播。例如,將容易產(chǎn)生背壓的窗口操作算子分配到一個獨(dú)立的共享組,與其他算子隔離開來。這樣即使窗口操作算子產(chǎn)生背壓,也不會影響其他共享組中的算子。
(3) 綜合考慮并行度和Slot數(shù)量
在設(shè)置共享組時,需要綜合考慮整個作業(yè)的并行度和集群中可用的Slot數(shù)量。如果共享組內(nèi)的算子并行度之和超過了可用的Slot數(shù)量,可能會導(dǎo)致部分任務(wù)等待或資源不足。因此,需要合理調(diào)整算子的并行度和共享組的分配,以確保作業(yè)能夠高效執(zhí)行。