偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

Flink Task調(diào)度部署機(jī)制

開發(fā) 前端
Flink開源社區(qū)較活躍,Task側(cè)的部署鏈路也一直在演進(jìn)中,持續(xù)跟進(jìn)并深入了解內(nèi)部實(shí)現(xiàn)邏輯能更好的支持我們解決Flink個(gè)性化調(diào)度策略上的一些問題。

1背景

在日常Flink使用過程中,我們經(jīng)常遇到Flink任務(wù)中某些Slot或者TM負(fù)載過重的問題,對(duì)日常的資源調(diào)配、運(yùn)維以及降本都帶來了很大的影響,所以我們對(duì)Flink的task部署機(jī)制進(jìn)行了梳理和調(diào)研,準(zhǔn)備在后續(xù)的工作中進(jìn)行優(yōu)化。由于jobGraph的生成以及任務(wù)提交流程因任務(wù)部署方式而不同,對(duì)我們后續(xù)的分析也沒有影響,這里忽略前置流程,直接從Dispatcher出發(fā),重點(diǎn)關(guān)注submit后executionGraph構(gòu)建以及后續(xù)的任務(wù)部署過程。

2Flink Scheduling Components 構(gòu)成

2.1   SchedulerNG

在Dispatcher收到submit請(qǐng)求后,先是啟動(dòng)了JobManagerRunner,再啟動(dòng)JobMaster,在初始化jobMaster的過程中,我們注意到這里開始了整個(gè)作業(yè)的Scheduling第一步,創(chuàng)建SchedulerNG。

this.schedulerNG =
createScheduler(
slotPoolServiceSchedulerFactory,
executionDeploymentTracker,
jobManagerJobMetricGroup,
jobStatusListener);

我們看下SchedulerNG的職責(zé),可以看到調(diào)度的發(fā)起,作業(yè)狀態(tài)的跟蹤以及我們熟悉的cp,sp的trigger都是在這里:

圖片

我們這次主要跟蹤構(gòu)建executionGraph,然后根據(jù)Scheduling策略發(fā)起的整個(gè)部署過程。

2.2   ExecutionGraph

現(xiàn)階段(1.13)SchedulerNG默認(rèn)實(shí)現(xiàn)是DefaultScheduler,初始化過程中就會(huì)開始構(gòu)建我們的ExecutionGraph,ExecutionGraph中有幾個(gè)重要元素

  1. ExecutionJobVertex: 代表jobGraph中的一個(gè)JobVertex,是所有并行Task的集合
  2. ExecutionVertex: 代表ExecutionJobVertex中并行task中的一個(gè),一個(gè)ExecutionJobVertex可能同時(shí)有很多并行運(yùn)行的ExecutionVertex
  3. Execution: 代表ExecutionVertex的一次部署/執(zhí)行,一個(gè)ExecutionVertex可能會(huì)有很多次Execution

這里executionGraph通過jobGraph的拓?fù)鋱D構(gòu)建了自己的核心結(jié)構(gòu),看下從JobVertex到ExecutionJobVertex 的轉(zhuǎn)換流程:

// topologically sort the job vertices and attach the graph to the existing one
List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources();
executionGraph.attachJobGraph(sortedTopology){
1. executionGraph第一步拿到了jobGraph中的有序JobVertex列表
2. 接著一對(duì)一創(chuàng)建ExecutionJobVertex
3. 根據(jù)producer并行度生成producedDataSets(IntermediateDataSet)
4. 再根據(jù)自身并行度生成所屬的ExecutionVertex[]
5. 構(gòu)建stateBackend信息和checkpointStorage信息等
6. 最后完成executionGraph的拓?fù)錁?gòu)建executionTopology
}

2.3   執(zhí)行層拓?fù)浣Y(jié)構(gòu)

我們知道Flink引擎在不停的致力于批流一體建設(shè),調(diào)度層的統(tǒng)一也是其中核心的一層。為了提高failover后recovery速度,減少對(duì)Flink任務(wù)的影響,現(xiàn)在Flink對(duì)于批、流的任務(wù)task調(diào)度都是以pipeline region為基礎(chǔ)。

Pipeline region的構(gòu)建內(nèi)嵌在executionGraph的初始化過程中,我們知道Flink中各個(gè)節(jié)點(diǎn)之間的鏈接都會(huì)有IntermediateDataSet這一種邏輯結(jié)構(gòu),用來表示JobVertex的輸出,即該JobVertex中包含的算子會(huì)產(chǎn)生的數(shù)據(jù)集。這個(gè)數(shù)據(jù)集的ResultPartitionType有幾種類型:

BLOCKING:都上游處理完數(shù)據(jù)后,再交給下游處理。這個(gè)數(shù)據(jù)分區(qū)可以被消費(fèi)多次,也可以并發(fā)消費(fèi)。這個(gè)分區(qū)并不會(huì)被自動(dòng)銷毀,而是交給調(diào)度器判斷。
BLOCKING_PERSISTENT:類似于Blocking,但是其生命周期由用戶端指定。調(diào)用JobMaster或者ResourceManager的API來銷毀,而不是由調(diào)度器控制。
PIPELINED:流交換模式??梢杂糜谟薪绾蜔o界流。這種分區(qū)類型的數(shù)據(jù)只能被每個(gè)消費(fèi)者消費(fèi)一次。且這種分區(qū)可以保留任意數(shù)據(jù)。
PIPELINED_BOUNDED:該策略在PIPELINED的基礎(chǔ)上保留有限制的buffer,避免對(duì)barrier造成阻塞。
PIPELINED_APPROXIMATE:和PIPELINED_BOUNDED類似,可以支持下游task重啟后繼續(xù)消費(fèi),用來支持task failover后的Approximate Local-Recovery策略。

接下來我們看看executionGraph的核心拓?fù)浣Y(jié)構(gòu)ExecutionTopology是如何構(gòu)建的:

第一步 先根據(jù)executionTopology構(gòu)建rawPipelinedRegions,多個(gè)vertex能否組合成一個(gè)pipeline region的關(guān)鍵在于這個(gè)vertex的consumedResult.getResultType().isReconnectable(),如果支持重連,那么兩個(gè)vertex之間就會(huì)進(jìn)行拆分,劃到不同的region。這里的isReconnectable就和我們的ResultPartitionType類型有關(guān),流處理中的PIPELINED和PIPELINED_BOUNDED都是默認(rèn)的false,在這種情況下所有的vertex其實(shí)都會(huì)放入同一個(gè)region。故我們?nèi)粘5膄link作業(yè)其實(shí)都只會(huì)生成一個(gè)pipeline region。
第二步 根據(jù)不同的pipeline region構(gòu)建自己的resultPartition信息,這個(gè)是為了構(gòu)建后續(xù)的PartitionReleaseStrategy,決定一個(gè)resultPartition何時(shí)finish以及被release
第三步 對(duì)vertex的coLocation情況進(jìn)行校驗(yàn),保證co-located tasks必須在同一個(gè)pipeline Region里。這里是因?yàn)楹罄m(xù)的scheduling strategy里會(huì)保證不同pipeline region的調(diào)度部署是階段隔離的,可能無法滿足colocation-constraint

2.4   Scheduling 策略

SchedulerNG Scheduling策略默認(rèn)為PipelinedRegionSchedulingStrategy,在executionGraph完成之后,就可以根據(jù)生成的剛剛executionTopology來初步構(gòu)建初步的Scheduling策略了。這里看下startScheduling代碼,可以看到Scheduling過程就是我們常說的基于pipeline region的Scheduling。

@Override
public void startScheduling() {
final Set<SchedulingPipelinedRegion> sourceRegions =
IterableUtils.toStream(schedulingTopology.getAllPipelinedRegions())
.filter(this::isSourceRegion)
.collect(Collectors.toSet());
maybeScheduleRegions(sourceRegions);
}

2.5   Execution Slot 分配器

默認(rèn)實(shí)現(xiàn)是SlotSharingExecutionSlotAllocator,在schedulerNG完成executionGraph構(gòu)建完成后,需要進(jìn)一步構(gòu)建Execution Slot 分配器。用于將physical shared slots分配到我們的logical slots 上,并將logical slot 分配給我們executionGraph中的execution(task)。通過代碼我們可以看到ExecutionSlotAllocator的職責(zé)非常簡單,只有簡單的allocate和cancel。

圖片

但在實(shí)現(xiàn)上這里有幾個(gè)重要元素需要了解:

LocalInputPreferredSlotSharingStrategy :在Flink內(nèi)部,所有的slot分配都是基于sharingslot來操作的,在滿足co-location的基礎(chǔ)上,F(xiàn)link期望將producer和consumeNode task盡可能的分布在一起,以減少數(shù)據(jù)傳輸成本。

SlotProfile:slot的資源信息,對(duì)task -> logical slot -> physical slot的mapping有非常重要的作用,包含了task的資源信息,slot的物理資源信息,傾向的location(TaskManagerLocation),傾向的allocation以及整個(gè)executionGraph之前分配過的allocation(用于黑名單,重啟后盡量避免分配在之前的slot里)。

ResourceProfileRetriever: 用于獲取executionVertex的實(shí)際資源信息。默認(rèn)是unknown,如果有明細(xì)配置會(huì)用于后續(xù)的executionSlotSharingGroup資源構(gòu)建。

ExecutionSlotSharingGroup:Flink task資源申請(qǐng)的最終邏輯載體,用于將sharing到一起的task(execution group)組合成一個(gè)group用于生成資源,后續(xù)部署也會(huì)綁定對(duì)應(yīng)的task。

3Scheduling 主要過程

在JobMaster完成自身構(gòu)建之后,就委托SchedulerNG來開始了整個(gè)job的Scheduling:

@Override
protected void startSchedulingInternal() {
log.info(
"Starting scheduling with scheduling strategy [{}]",
schedulingStrategy.getClass().getName());
transitionToRunning();
schedulingStrategy.startScheduling();
}

可以看到這里是由schedulingStrategy來負(fù)責(zé)整個(gè)調(diào)度過程的,也就是我們的PipelinedRegionSchedulingStrategy,

one by one將pipeline region進(jìn)行部署

private void maybeScheduleRegions(final Set<SchedulingPipelinedRegion> regions) {
final List<SchedulingPipelinedRegion> regionsSorted =
SchedulingStrategyUtils.sortPipelinedRegionsInTopologicalOrder(
schedulingTopology, regions);


final Map<ConsumedPartitionGroup, Boolean> consumableStatusCache = new HashMap<>();
for (SchedulingPipelinedRegion region : regionsSorted) {
maybeScheduleRegion(region, consumableStatusCache);
}
}

遍歷region中的ExecutionVertex依次進(jìn)行部署

final List<ExecutionVertexDeploymentOption> vertexDeploymentOptions =
SchedulingStrategyUtils.createExecutionVertexDeploymentOptions(
regionVerticesSorted.get(region), id -> deploymentOption);
schedulerOperations.allocateSlotsAndDeploy(vertexDeploymentOptions);

將vertexDeployment交給SlotSharingExecutionSlotAllocator處理

private List<SlotExecutionVertexAssignment> allocateSlots(
final List<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions) {
return executionSlotAllocator.allocateSlotsFor(
executionVertexDeploymentOptions.stream()
.map(ExecutionVertexDeploymentOption::getExecutionVertexId)
.collect(Collectors.toList()));
}
接下來整個(gè)allocate的主要過程如下(忽略physical fail等情況)

通過SlotSharingStrategy拿到每個(gè)execution對(duì)應(yīng)的ExecutionSlotSharingGroup

  1. 先從 corresponding co-location constraint 去mapping中尋找是否有存在的slot sharing group
  2. 接著從producer 的角度來逐一檢查是否可以合并到同一個(gè)slot sharing group.
  3. 最后嘗試所有剩下的slot sharing group看是否符合execution 的要求(如同屬于一個(gè)job vertex的task不能分配到同一個(gè) slot sharing group).
  4. 如果以上都沒有滿足條件的就創(chuàng)建一個(gè)新的slot sharing group
  1. 檢查ExecutionSlotSharingGroup是否已經(jīng)有了對(duì)應(yīng)的sharedSlot
  2. 遍歷尚未得到分配的ExecutionSlotSharingGroup
  3. 計(jì)算對(duì)應(yīng)的SlotProfile
  4. 向PhysicalSlotProvider申請(qǐng)新的physical slot
  1. rm側(cè)會(huì)先檢查是否已經(jīng)有滿足條件的excess slot

  2. 如果沒有嘗試會(huì)申請(qǐng)新的woker以提供資源

  3. 由sharedSlotProfileRetriever來創(chuàng)建對(duì)應(yīng)的slotProfile并構(gòu)建PhysicalSlotRequest

  4. PhysicalSlotProvider向slotPool申請(qǐng)新的slot

  5. slotPool會(huì)向rm側(cè)申請(qǐng)新的slot

  1. 利用physical slot  future提前創(chuàng)建sharedSlotFutrue

  2. 將sharedSlotFutrue 分配給所有相關(guān)的executions

  3. 最后生成所有的SlotExecutionVertexAssignments

在完成所有的SlotExecutionVertexAssignment之后,生成對(duì)應(yīng)的DeploymentHandle并等待所有的assignedSlot創(chuàng)建完畢,正式開始部署對(duì)應(yīng)的任務(wù)。?

4問題思考

我們對(duì)整個(gè)Flink task的部署過程完成梳理后,重新對(duì)我們一開始的問題進(jìn)行思考:

4.1   為什么會(huì)出現(xiàn)slot負(fù)載過重的情況?如何避免?

問題的產(chǎn)生在于大量的task集中分配到了統(tǒng)一個(gè)sharedSlot,這個(gè)我們可以發(fā)現(xiàn)其實(shí)是在ExecutionSlotSharingGroup的構(gòu)建過程中產(chǎn)生的。我們看下源碼,可以很直接的看到整個(gè)group的分配是一個(gè)roundRobin過程,而executionVertices來自于有序拓?fù)浣Y(jié)構(gòu),中間傳遞過程也保證了有序性,所以最終會(huì)導(dǎo)致大量的task分配的index靠前的group中,最后落到了同一個(gè)slot。

為了避免這種情況,我們的做法其實(shí)有比較多,一種是在保證各種constraint的同時(shí)添加隨機(jī)性,以打散各個(gè)不均勻的task;還有一種就是構(gòu)建基于load-balance的分配過程,以盡可能的將task分布均勻。

附Flink部分源碼:

private void findAvailableOrCreateNewExecutionSlotSharingGroupFor(
final List<SchedulingExecutionVertex> executionVertices) {


for (SchedulingExecutionVertex executionVertex : executionVertices) {
final SlotSharingGroup slotSharingGroup =
getSlotSharingGroup(executionVertex.getId());
final List<ExecutionSlotSharingGroup> groups =
executionSlotSharingGroups.computeIfAbsent(
slotSharingGroup.getSlotSharingGroupId(), k -> new ArrayList<>());


ExecutionSlotSharingGroup group = null;
for (ExecutionSlotSharingGroup executionSlotSharingGroup : groups) {
if (isGroupAvailableForVertex(
executionSlotSharingGroup, executionVertex.getId())) {
group = executionSlotSharingGroup;
break;
}
}


if (group == null) {
group = new ExecutionSlotSharingGroup();
group.setResourceProfile(slotSharingGroup.getResourceProfile());
groups.add(group);
}


addVertexToExecutionSlotSharingGroup(executionVertex, group);
}
}
4.2   如何避免tm級(jí)別的負(fù)載過重?

這個(gè)問題主要是在于說有一些過重的task對(duì)應(yīng)的slot都分配在了同一個(gè)tm上,導(dǎo)致整個(gè)tm壓力過大,資源難以協(xié)調(diào)。在整個(gè)過程中其實(shí)我們有看到tm信息的交互,在co-location constraint上。我們看下該hint職責(zé):

The co-location group is used to make sure that the i-th subtasks for iteration head and iteration tail are scheduled on the same TaskManager.

也就是說其實(shí)是為了解決算子間相同index的task數(shù)據(jù)傳遞之類的問題,但對(duì)于task的均衡負(fù)載無法介入。對(duì)此我們嘗試去做的事情:

在當(dāng)前不使用細(xì)粒度資源配置的情況下,考慮task-slot之間均衡分布的同事,task-tm也能做到一定的負(fù)載均衡。這種情況可以通過tm單slot來解決,也可以在保證task-slotSharingGroup足夠隨機(jī)性的同時(shí),保證slotSharingGroup-tm的足夠隨機(jī)性。

在后續(xù)使用使用細(xì)粒度資源配置的情況下,不使用slotsharing,且將相同jobVertex對(duì)應(yīng)的task盡量分布在同一個(gè)task當(dāng)中。這個(gè)我們后續(xù)準(zhǔn)備在slotProfile中加入jobVertex相關(guān)的tag,SlotAllocator做slot matching的時(shí)候加入jobVertex constraint來保證task的位置分配。

5寫在最后

Flink開源社區(qū)較活躍,Task側(cè)的部署鏈路也一直在演進(jìn)中,持續(xù)跟進(jìn)并深入了解內(nèi)部實(shí)現(xiàn)邏輯能更好的支持我們解決Flink個(gè)性化調(diào)度策略上的一些問題。后續(xù)我們也準(zhǔn)備進(jìn)一步完善Flink在operator級(jí)別的細(xì)粒度資源配置能力,降低資源使用率的同時(shí)進(jìn)一步提高Flink作業(yè)穩(wěn)定性。

責(zé)任編輯:武曉燕 來源: 得物技術(shù)
相關(guān)推薦

2024-02-27 08:05:32

Flink分區(qū)機(jī)制數(shù)據(jù)傳輸

2014-01-06 17:09:10

ApacheMesos

2025-08-27 10:00:00

FlinkCheckpoint大數(shù)據(jù)

2022-01-14 07:56:38

Checkpoint機(jī)制Flink

2025-01-15 09:13:53

2021-11-02 06:58:55

FlinkWindow機(jī)制

2022-12-20 10:22:16

計(jì)算函數(shù)

2024-06-04 15:56:48

Task?.NET異步編程

2015-03-24 16:29:55

默認(rèn)線程池java

2020-10-10 14:21:49

CDH6.3.2flink部署

2025-09-08 02:00:00

2013-08-05 17:09:57

2023-06-20 07:32:04

2025-10-29 07:00:00

FlinkSpark大數(shù)據(jù)

2021-07-30 19:44:51

AndroidJava線程

2020-03-03 08:29:07

時(shí)延敏感網(wǎng)絡(luò)TSN網(wǎng)絡(luò)

2022-06-20 06:38:50

Flink批作業(yè)算子

2021-02-01 11:30:13

React前端調(diào)度

2025-09-15 01:45:00

2025-06-03 07:00:00

大數(shù)據(jù)Flink并行度
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)