Flink執(zhí)行流程與源碼分析
本文轉(zhuǎn)載自微信公眾號「大數(shù)據(jù)左右手」,作者王了個博。轉(zhuǎn)載本文請聯(lián)系大數(shù)據(jù)左右手公眾號。
Flink主要組件
作業(yè)管理器(JobManager)
(1) 控制一個應(yīng)用程序執(zhí)行的主進(jìn)程,也就是說,每個應(yīng)用程序 都會被一個不同的Jobmanager所控制執(zhí)行
(2) Jobmanager會先接收到要執(zhí)行的應(yīng)用程序,這個應(yīng)用程序會包括:作業(yè)圖( Job Graph)、邏輯數(shù)據(jù)流圖( ogical dataflow graph)和打包了所有的類、庫和其它資源的JAR包。
(3) Jobmanager會把 Jobgraph轉(zhuǎn)換成一個物理層面的 數(shù)據(jù)流圖,這個圖被叫做 “執(zhí)行圖”(Executiongraph),包含了所有可以并發(fā)執(zhí)行的任務(wù)。Job Manager會向資源管理器( Resourcemanager)請求執(zhí)行任務(wù)必要的資源,也就是 任務(wù)管理器(Taskmanager)上的插槽slot。一旦它獲取到了足夠的資源,就會將執(zhí)行圖分發(fā)到真正運行它們的 Taskmanager上。而在運行過程中Jobmanagera會負(fù)責(zé)所有需要中央?yún)f(xié)調(diào)的操作,比如說檢查點(checkpoints)的協(xié)調(diào)。
任務(wù)管理器(Taskmanager)
(1) Flink中的工作進(jìn)程。通常在 Flink中會有多個 Taskmanageria運行, 每個 Taskmanageri都包含了一定數(shù)量的插槽( slots)。插槽的數(shù)量限制了Taskmanageri能夠執(zhí)行的任務(wù)數(shù)量。
(2) 啟動之后, Taskmanager會向資源管理器注冊它的插槽;收到資源管理器的指令后, Taskmanageri就會將一個或者多個插槽提供給Jobmanageri調(diào)用。Jobmanager就可以向插槽分配任務(wù)( tasks)來執(zhí)行了。
(3) 在執(zhí)行過程中, 一個 Taskmanagera可以跟其它運行同一應(yīng)用程序的Taskmanager交換數(shù)據(jù)。
資源管理器(Resource Manager)
(1) 主要負(fù)責(zé)管理任務(wù)管理器( Task Manager)的 插槽(slot)Taskmanger插槽是 Flink中定義的處理資源單元。
(2) Flink 為不同的環(huán)境和資源管理工具提供了不同資源管理器,比如YARNMesos、K8s,以及 standalone部署。
(3) 當(dāng) Jobmanager申請插槽資源時, Resourcemanager會將有空閑插槽的Taskmanager?分配給Jobmanager。如果 Resourcemanagery沒有足夠的插槽來滿足 Jobmanager的請求, 它還可以向資源提供平臺發(fā)起會話,以提供啟動 Taskmanager進(jìn)程的容器。
分發(fā)器(Dispatcher)
(1) 可以跨作業(yè)運行,它為應(yīng)用提交提供了REST接口。
(2)當(dāng)一個應(yīng)用被提交執(zhí)行時,分發(fā)器就會啟動并將應(yīng)用移交給Jobmanage
(3) Dispatcher他會啟動一個 WebUi,用來方便地 展示和監(jiān)控作業(yè)執(zhí)行的信息。
任務(wù)提交流程
- 提交應(yīng)用
 - 啟動并提交應(yīng)用
 - 請求slots
 - 任務(wù)啟動
 - 注冊slots
 - 發(fā)出提供slot的指令
 - 提供slots
 - 提交要在slots中執(zhí)行的任務(wù)
 - 交換數(shù)據(jù)
 
任務(wù)提交流程(YARN)
a. Flink任務(wù)提交后,Client向HDFS上傳Flink的Jar包和配置
b. 隨后向 Yarn ResourceManager提交任務(wù)ResourceManager分配 Container資源并通知對應(yīng)的NodeManager啟動
c. ApplicationMaster,ApplicationMaster 啟動后加載Flink的Jar包和配置構(gòu)建環(huán)境
d. 然后啟動JobManager , 之后ApplicationMaster 向ResourceManager 申請資源啟動TaskManager
e. ResourceManager 分配 Container 資源后 , 由ApplicationMaster通知資源所在節(jié)點的NodeManager啟動TaskManager
f. NodeManager 加載 Flink 的 Jar 包和配置構(gòu)建環(huán)境并啟動 TaskManager
g. TaskManager 啟動后向 JobManager 發(fā)送心跳包,并等待 JobManager 向其分配任務(wù)。
源碼分析--集群啟動 JobManager 啟動分析
JobManager 的內(nèi)部包含非常重要的三大組件
- WebMonitorEndpoint
 - ResourceManager
 - Dispatcher
 
入口,啟動主類:StandaloneSessionClusterEntrypoint
- // 入 口
 - StandaloneSessionClusterEntrypoint.main() ClusterEntrypoint.runClusterEntrypoint(entrypoint);
 - clusterEntrypoint.startCluster();
 - runCluster(configuration, pluginManager);
 - // 第一步:初始化各種服務(wù)
 - /**
 - * 初始化了 主節(jié)點對外提供服務(wù)的時候所需要的 三大核心組件啟動時所需要的基礎(chǔ)服務(wù)
 - * 初始化服務(wù),如 JobManager 的 Akka RPC 服務(wù),HA 服務(wù),心跳檢查服務(wù),metric service
 - * 這些服務(wù)都是 Master 節(jié)點要使用到的一些服務(wù)
 - * 1、commonRpcService: 基于 Akka 的 RpcService 實現(xiàn)。RPC 服務(wù)啟動 Akka 參與者來接收從 RpcGateway 調(diào)用 RPC
 - * 2、haServices: 提供對高可用性所需的所有服務(wù)的訪問注冊,分布式計數(shù)器和領(lǐng)導(dǎo)人選舉
 - * 3、blobServer: 負(fù)責(zé)偵聽傳入的請求生成線程來處理這些請求。它還負(fù)責(zé)創(chuàng)建要存儲的目錄結(jié)構(gòu) blob 或臨時緩存它們
 - * 4、heartbeatServices: 提供心跳所需的所有服務(wù)。這包括創(chuàng)建心跳接收器和心跳發(fā)送者。
 - * 5、metricRegistry: 跟蹤所有已注冊的 Metric,它作為連接 MetricGroup 和 MetricReporter
 - * 6、archivedExecutionGraphStore: 存儲執(zhí)行圖ExecutionGraph的可序列化形式。
 - */
 - initializeServices(configuration, pluginManager);
 - // 創(chuàng)建 DispatcherResourceManagerComponentFactory, 初始化各種組件的
 - 工廠實例
 - // 其實內(nèi)部包含了三個重要的成員變量:
 - // 創(chuàng)建 ResourceManager 的工廠實例
 - // 創(chuàng)建 Dispatcher 的工廠實例
 - // 創(chuàng)建 WebMonitorEndpoint 的工廠實例
 - createDispatcherResourceManagerComponentFactory(configuration);
 - // 創(chuàng)建 集群運行需要的一些組件:Dispatcher, ResourceManager 等
 - // 創(chuàng) 建 ResourceManager
 - // 創(chuàng) 建 Dispatcher
 - // 創(chuàng) 建 WebMonitorEndpoint
 - clusterComponent = dispatcherResourceManagerComponentFactory.create(...)
 
1. initializeServices():初始化各種服務(wù)
- // 初 始 化 和 啟 動 AkkaRpcService, 內(nèi) 部 其 實 包 裝 了 一 個 ActorSystem commonRpcService = AkkaRpcServiceUtils.createRemoteRpcService(...)
 - // 初始化一個負(fù)責(zé) IO 的線程池
 - ioExecutor = Executors.newFixedThreadPool(...)
 - // 初始化 HA 服務(wù)組件,負(fù)責(zé) HA 服務(wù)的是:ZooKeeperHaServices haServices = createHaServices(configuration, ioExecutor);
 - // 初始化 BlobServer 服務(wù)端
 - blobServer = new BlobServer(configuration, haServices.createBlobStore()); blobServer.start();
 - // 初始化心跳服務(wù)組件, heartbeatServices = HeartbeatServices heartbeatServices = createHeartbeatServices(configuration);
 - // 初始化一個用來存儲 ExecutionGraph 的 Store, 實現(xiàn)是:
 - FileArchivedExecutionGraphStore
 - archivedExecutionGraphStore = createSerializableExecutionGraphStore(...)
 
2. createDispatcherResourceManagerComponentFactory(configuration)初始化了多組件的工廠實例
- 1、DispatcherRunnerFactory,默認(rèn)實現(xiàn):DefaultDispatcherRunnerFactory
 - 2、ResourceManagerFactory,默認(rèn)實現(xiàn):StandaloneResourceManagerFactory
 - 3、RestEndpointFactory,默認(rèn)實現(xiàn):SessionRestEndpointFactory
 - clusterComponent = dispatcherResourceManagerComponentFactory
 - .create(configuration, ioExecutor, commonRpcService, haServices,
 - blobServer, heartbeatServices, metricRegistry,
 - archivedExecutionGraphStore,
 - new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService()),
 - this);
 
3. 創(chuàng)建 WebMonitorEndpoint
- /*************************************************
 - * 創(chuàng)建 WebMonitorEndpoint 實例, 在 Standalone 模式下:DispatcherRestEndpoint
 - * 1、restEndpointFactory = SessionRestEndpointFactory
 - * 2、webMonitorEndpoint = DispatcherRestEndpoint
 - * 3、highAvailabilityServices.getClusterRestEndpointLeaderElectionService() = ZooKeeperLeaderElectionService
 - * 當(dāng)前這個 DispatcherRestEndpoint 的作用是:
 - * 1、初始化的過程中,會一大堆的 Handler
 - * 2、啟動一個 Netty 的服務(wù)端,綁定了這些 Handler
 - * 3、當(dāng) client 通過 flink 命令執(zhí)行了某些操作(發(fā)起 restful 請求), 服務(wù)端由 webMonitorEndpoint 來執(zhí)行處理
 - * 4、舉個例子: 如果通過 flink run 提交一個 Job,那么最后是由 webMonitorEndpoint 中的 JobSubmitHandler 來執(zhí)行處理
 - * 5、補充一個:job 由 JobSubmitHandler 執(zhí)行完畢之后,轉(zhuǎn)交給 Dispatcher 去調(diào)度執(zhí)行
 - */
 - webMonitorEndpoint = restEndpointFactory.createRestEndpoint(
 - configuration, dispatcherGatewayRetriever, resourceManagerGatewayRetriever,
 - blobServer, executor, metricFetcher,
 - highAvailabilityServices.getClusterRestEndpointLeaderElectionService(),
 - fatalErrorHandler
 - );
 
4. 創(chuàng)建 resourceManager
- /*************************************************
 - * 創(chuàng)建 StandaloneResourceManager 實例對象
 - * 1、resourceManager = StandaloneResourceManager
 - * 2、resourceManagerFactory = StandaloneResourceManagerFactory
 - */
 - resourceManager = resourceManagerFactory.createResourceManager(
 - configuration, ResourceID.generate(),
 - rpcService, highAvailabilityServices, heartbeatServices,
 - fatalErrorHandler, new ClusterInformation(hostname, blobServer.getPort()),
 - webMonitorEndpoint.getRestBaseUrl(), metricRegistry, hostname
 - );
 
- protected ResourceManager<ResourceID> createResourceManager(
 - Configuration configuration,
 - ResourceID resourceId,
 - RpcService rpcService,
 - HighAvailabilityServices highAvailabilityServices,
 - HeartbeatServices heartbeatServices,
 - FatalErrorHandler fatalErrorHandler,
 - ClusterInformation clusterInformation,
 - @Nullable String webInterfaceUrl,
 - ResourceManagerMetricGroup resourceManagerMetricGroup,
 - ResourceManagerRuntimeServices resourceManagerRuntimeServices) {
 - final Time standaloneClusterStartupPeriodTime = ConfigurationUtils.getStandaloneClusterStartupPeriodTime(configuration);
 - /*************************************************
 - * 注釋: 得到一個 StandaloneResourceManager 實例對象
 - */
 - return new StandaloneResourceManager(
 - rpcService,
 - resourceId,
 - highAvailabilityServices,
 - heartbeatServices,
 - resourceManagerRuntimeServices.getSlotManager(),
 - ResourceManagerPartitionTrackerImpl::new,
 - resourceManagerRuntimeServices.getJobLeaderIdService(),
 - clusterInformation,
 - fatalErrorHandler,
 - resourceManagerMetricGroup,
 - standaloneClusterStartupPeriodTime,
 - AkkaUtils.getTimeoutAsTime(configuration)
 - );
 - }
 
- /**
 - requestSlot():接受 solt請求
 - sendSlotReport(..): 將solt請求發(fā)送TaskManager
 - registerJobManager(...): 注冊job管理者。 該job指的是 提交給flink的應(yīng)用程序
 - registerTaskExecutor(...): 注冊task執(zhí)行者。
 - **/
 - public ResourceManager(RpcService rpcService, ResourceID resourceId, HighAvailabilityServices highAvailabilityServices,
 - HeartbeatServices heartbeatServices, SlotManager slotManager, ResourceManagerPartitionTrackerFactory clusterPartitionTrackerFactory,
 - JobLeaderIdService jobLeaderIdService, ClusterInformation clusterInformation, FatalErrorHandler fatalErrorHandler,
 - ResourceManagerMetricGroup resourceManagerMetricGroup, Time rpcTimeout) {
 - /*************************************************
 - * 注釋: 當(dāng)執(zhí)行完畢這個構(gòu)造方法的時候,會觸發(fā)調(diào)用 onStart() 方法執(zhí)行
 - */
 - super(rpcService, AkkaRpcServiceUtils.createRandomName(RESOURCE_MANAGER_NAME), null);
 
- protected RpcEndpoint(final RpcService rpcService, final String endpointId) {
 - this.rpcService = checkNotNull(rpcService, "rpcService");
 - this.endpointId = checkNotNull(endpointId, "endpointId");
 - /*************************************************
 - * 注釋:ResourceManager 或者 TaskExecutor 中的 RpcServer 實現(xiàn)
 - * 以 ResourceManager 為例說明:
 - * 啟動 ResourceManager 的 RPCServer 服務(wù)
 - * 這里啟動的是 ResourceManager 的 Rpc 服務(wù)端。
 - * 接收 TaskManager 啟動好了而之后, 進(jìn)行注冊和心跳,來匯報 Taskmanagaer 的資源情況
 - * 通過動態(tài)代理的形式構(gòu)建了一個Server
 - */
 - this.rpcServer = rpcService.startServer(this);
 
5. 在創(chuàng)建resourceManager同級:啟動任務(wù)接收器Starting Dispatcher
- /*************************************************
 - * 創(chuàng)建 并啟動 Dispatcher
 - * 1、dispatcherRunner = DispatcherRunnerLeaderElectionLifecycleManager
 - * 2、dispatcherRunnerFactory = DefaultDispatcherRunnerFactory
 - * 第一個參數(shù):ZooKeeperLeaderElectionService
 - * -
 - * 老版本: 這個地方是直接創(chuàng)建一個 Dispatcher 對象然后調(diào)用 dispatcher.start() 來啟動
 - * 新版本: 直接創(chuàng)建一個 DispatcherRunner, 內(nèi)部就是要創(chuàng)建和啟動 Dispatcher
 - * -
 - * DispatcherRunner 是對 Dispatcher 的封裝。
 - * DispatcherRunner被創(chuàng)建的代碼的內(nèi)部,會創(chuàng)建 Dispatcher并啟動
 - */
 - log.debug("Starting Dispatcher.");
 - dispatcherRunner = dispatcherRunnerFactory.createDispatcherRunner(
 - highAvailabilityServices.getDispatcherLeaderElectionService(), fatalErrorHandler,
 - // TODO_ZYM 注釋: 注意第三個參數(shù)
 - new HaServicesJobGraphStoreFactory(highAvailabilityServices),
 - ioExecutor, rpcService, partialDispatcherServices
 - );
 
Dispatcher 啟動后,將會等待任務(wù)提交,如果有任務(wù)提交,則會經(jīng)過submitJob(...)函數(shù)進(jìn)入后續(xù)處理。
提交(一個Flink應(yīng)用的提交必須經(jīng)過三個graph的轉(zhuǎn)換)
首先看下一些名詞
StreamGraph
是根據(jù)用戶通過 Stream API 編寫的代碼生成的最初的圖。用來表示程序的拓?fù)浣Y(jié)構(gòu)。可以用一個 DAG 來表示),DAG 的頂點是 StreamNode,邊是 StreamEdge,邊包含了由哪個 StreamNode 依賴哪個 StreamNode。
- StreamNode:用來代表 operator 的類,并具有所有相關(guān)的屬性,如并發(fā)度、入邊和出邊等。
 - StreamEdge:表示連接兩個StreamNode的邊。
 
DataStream 上常見的 transformation 有 map、flatmap、filter等(見DataStream Transformation了解更多)。這些transformation會構(gòu)造出一棵 StreamTransformation 樹,通過這棵樹轉(zhuǎn)換成 StreamGraph
以map方法為例,看看源碼
- public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper) {
 - // 通過java reflection抽出mapper的返回值類型
 - TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(clean(mapper), getType(),
 - Utils.getCallLocationName(), true);
 - // 返回一個新的DataStream,SteramMap 為 StreamOperator 的實現(xiàn)類
 - return transform("Map", outType, new StreamMap<>(clean(mapper)));
 - }
 - public <R> SingleOutputStreamOperator<R> transform(String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {
 - // read the output type of the input Transform to coax out errors about MissingTypeInfo
 - transformation.getOutputType();
 - // 新的transformation會連接上當(dāng)前DataStream中的transformation,從而構(gòu)建成一棵樹
 - OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
 - this.transformation,
 - operatorName,
 - operator,
 - outTypeInfo,
 - environment.getParallelism());
 - @SuppressWarnings({ "unchecked", "rawtypes" })
 - SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);
 - // 所有的transformation都會存到 env 中,調(diào)用execute時遍歷該list生成StreamGraph
 - getExecutionEnvironment().addOperator(resultTransform);
 - return returnStream;
 - }
 
map轉(zhuǎn)換將用戶自定義的函數(shù)MapFunction包裝到StreamMap這個Operator中,再將StreamMap包裝到OneInputTransformation,最后該transformation存到env中,當(dāng)調(diào)用env.execute時,遍歷其中的transformation集合構(gòu)造出StreamGraph
JobGraph
(1) StreamGraph經(jīng)過優(yōu)化后生成了 JobGraph,提交給 JobManager 的數(shù)據(jù)結(jié)構(gòu)。主要的優(yōu)化為,將多個符合條件的節(jié)點 chain 在一起作為一個節(jié)點。
- 將并不涉及到 shuffle 的算子進(jìn)行合并。
 - 對于同一個 operator chain 里面的多個算子,會在同一個 task 中執(zhí)行。
 - 對于不在同一個 operator chain 里的算子,會在不同的 task 中執(zhí)行。
 
(2) JobGraph 用來由 JobClient 提交給 JobManager,是由頂點(JobVertex)、中間結(jié)果(IntermediateDataSet)和邊(JobEdge)組成的 DAG 圖。
(3) JobGraph 定義作業(yè)級別的配置,而每個頂點和中間結(jié)果定義具體操作和中間數(shù)據(jù)的設(shè)置。
JobVertex
JobVertex 相當(dāng)于是 JobGraph 的頂點。經(jīng)過優(yōu)化后符合條件的多個StreamNode可能會chain在一起生成一個JobVertex,即一個JobVertex包含一個或多個operator,JobVertex的輸入是JobEdge,輸出是IntermediateDataSet。
IntermediateDataSet
JobVertex的輸出,即經(jīng)過operator處理產(chǎn)生的數(shù)據(jù)集。
JobEdge
job graph中的一條數(shù)據(jù)傳輸通道。source 是IntermediateDataSet,sink 是 JobVertex。即數(shù)據(jù)通過JobEdge由IntermediateDataSet傳遞給目標(biāo)JobVertex。
(1) 首先是通過API會生成transformations,通過transformations會生成StreamGraph。
(2)將StreamGraph的某些StreamNode Chain在一起生成JobGraph,前兩步轉(zhuǎn)換都是在客戶端完成。
(3)最后會將JobGraph轉(zhuǎn)換為ExecutionGraph,相比JobGraph會增加并行度的概念,這一步是在Jobmanager里完成。
ExecutionJobVertex
ExecutionJobVertex一一對應(yīng)JobGraph中的JobVertex
ExecutionVertex
一個ExecutionJobVertex對應(yīng)n個ExecutionVertex,其中n就是算子的并行度。ExecutionVertex就是并行任務(wù)的一個子任務(wù)
Execution
Execution 是對 ExecutionVertex 的一次執(zhí)行,通過 ExecutionAttemptId 來唯一標(biāo)識。
IntermediateResult
在 JobGraph 中用 IntermediateDataSet 表示 JobVertex 的對外輸出,一個 JobGraph 可能有 n(n >=0) 個輸出。在 ExecutionGraph 中,與此對應(yīng)的就是 IntermediateResult。每一個 IntermediateResult 就有 numParallelProducers(并行度) 個生產(chǎn)者,每個生產(chǎn)者的在相應(yīng)的 IntermediateResult 上的輸出對應(yīng)一個 IntermediateResultPartition。IntermediateResultPartition 表示的是 ExecutionVertex 的一個輸出分區(qū)
ExecutionEdge
ExecutionEdge 表示 ExecutionVertex 的輸入,通過 ExecutionEdge 將 ExecutionVertex 和 IntermediateResultPartition 連接起來,進(jìn)而在不同的 ExecutionVertex 之間建立聯(lián)系。
ExecutionGraph的構(gòu)建
- 構(gòu)建JobInformation
 - 構(gòu)建ExecutionGraph
 - 將JobGraph進(jìn)行拓?fù)渑判?獲取sortedTopology頂點集合
 
- // ExecutionGraphBuilder
 - public static ExecutionGraph buildGraph(
 - @Nullable ExecutionGraph prior,
 - JobGraph jobGraph,
 - ...) throws JobExecutionException, JobException {
 - // 構(gòu)建JobInformation
 - // 構(gòu)建ExecutionGraph
 - // 將JobGraph進(jìn)行拓?fù)渑判?獲取sortedTopology頂點集合
 - List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources();
 - executionGraph.attachJobGraph(sortedTopology);
 - return executionGraph;
 - }
 
構(gòu)建ExecutionJobVertex,連接IntermediateResultPartition和ExecutionVertex
- //ExecutionGraph
 - public void attachJobGraph(List<JobVertex> topologiallySorted) throws JobException {
 - for (JobVertex jobVertex : topologiallySorted) {
 - // 構(gòu)建ExecutionJobVertex
 - ExecutionJobVertex ejv = new ExecutionJobVertex(
 - this,
 - jobVertex,
 - 1,
 - maxPriorAttemptsHistoryLength,
 - rpcTimeout,
 - globalModVersion,
 - createTimestamp);
 - // 連接IntermediateResultPartition和ExecutionVertex
 - ev.connectToPredecessors(this.intermediateResults);
 - }
 - // ExecutionJobVertex
 - public void connectToPredecessors(Map<IntermediateDataSetID, IntermediateResult> intermediateDataSets) throws JobException {
 - List<JobEdge> inputs = jobVertex.getInputs();
 - for (int num = 0; num < inputs.size(); num++) {
 - JobEdge edge = inputs.get(num);
 - IntermediateResult ires = intermediateDataSets.get(edge.getSourceId());
 - this.inputs.add(ires);
 - int consumerIndex = ires.registerConsumer();
 - for (int i = 0; i < parallelism; i++) {
 - ExecutionVertex ev = taskVertices[i];
 - ev.connectSource(num, ires, edge, consumerIndex);
 - }
 - }
 - }
 
拆分計劃(可執(zhí)行能力)
- // ExecutionVertex
 - public void connectSource(int inputNumber, IntermediateResult source, JobEdge edge, int consumerNumber) {
 - final DistributionPattern pattern = edge.getDistributionPattern();
 - final IntermediateResultPartition[] sourcePartitions = source.getPartitions();
 - ExecutionEdge[] edges;
 - switch (pattern) {
 - // 下游 JobVertex 的輸入 partition 算法,如果是 forward 或 rescale 的話為 POINTWISE
 - case POINTWISE:
 - edges = connectPointwise(sourcePartitions, inputNumber);
 - break;
 - // 每一個并行的ExecutionVertex節(jié)點都會鏈接到源節(jié)點產(chǎn)生的所有中間結(jié)果IntermediateResultPartition
 - case ALL_TO_ALL:
 - edges = connectAllToAll(sourcePartitions, inputNumber);
 - break;
 - default:
 - throw new RuntimeException("Unrecognized distribution pattern.");
 - }
 - inputEdges[inputNumber] = edges;
 - for (ExecutionEdge ee : edges) {
 - ee.getSource().addConsumer(ee, consumerNumber);
 - }
 - }
 - private ExecutionEdge[] connectPointwise(IntermediateResultPartition[] sourcePartitions, int inputNumber) {
 - final int numSources = sourcePartitions.length;
 - final int parallelism = getTotalNumberOfParallelSubtasks();
 - // 如果并發(fā)數(shù)等于partition數(shù),則一對一進(jìn)行連接
 - if (numSources == parallelism) {
 - return new ExecutionEdge[] { new ExecutionEdge(sourcePartitions[subTaskIndex], this, inputNumber) };
 - }
 - // 如果并發(fā)數(shù)大于partition數(shù),則一對多進(jìn)行連接
 - else if (numSources < parallelism) {
 - int sourcePartition;
 - if (parallelism % numSources == 0) {
 - int factor = parallelism / numSources;
 - sourcePartition = subTaskIndex / factor;
 - }
 - else {
 - float factor = ((float) parallelism) / numSources;
 - sourcePartition = (int) (subTaskIndex / factor);
 - }
 - return new ExecutionEdge[] { new ExecutionEdge(sourcePartitions[sourcePartition], this, inputNumber) };
 - }
 - // 果并發(fā)數(shù)小于partition數(shù),則多對一進(jìn)行連接
 - else {
 - if (numSources % parallelism == 0) {
 - int factor = numSources / parallelism;
 - int startIndex = subTaskIndex * factor;
 - ExecutionEdge[] edges = new ExecutionEdge[factor];
 - for (int i = 0; i < factor; i++) {
 - edges[i] = new ExecutionEdge(sourcePartitions[startIndex + i], this, inputNumber);
 - }
 - return edges;
 - }
 - else {
 - float factor = ((float) numSources) / parallelism;
 - int start = (int) (subTaskIndex * factor);
 - int end = (subTaskIndex == getTotalNumberOfParallelSubtasks() - 1) ?
 - sourcePartitions.length :
 - (int) ((subTaskIndex + 1) * factor);
 - ExecutionEdge[] edges = new ExecutionEdge[end - start];
 - for (int i = 0; i < edges.length; i++) {
 - edges[i] = new ExecutionEdge(sourcePartitions[start + i], this, inputNumber);
 - }
 - return edges;
 - }
 - }
 - }
 - private ExecutionEdge[] connectAllToAll(IntermediateResultPartition[] sourcePartitions, int inputNumber) {
 - ExecutionEdge[] edges = new ExecutionEdge[sourcePartitions.length];
 - for (int i = 0; i < sourcePartitions.length; i++) {
 - IntermediateResultPartition irp = sourcePartitions[i];
 - edges[i] = new ExecutionEdge(irp, this, inputNumber);
 - }
 - return edges;
 - }
 
返回ExecutionGraph
TaskManager
TaskManager啟動
- public static void runTaskManager(Configuration configuration, ResourceID resourceId) throws Exception {
 - //主要初始化一堆的service,并新建一個org.apache.flink.runtime.taskexecutor.TaskExecutor
 - final TaskManagerRunner taskManagerRunner = new TaskManagerRunner(configuration,resourceId);
 - //調(diào)用TaskExecutor的start()方法
 - taskManagerRunner.start();
 - }
 
TaskExecutor :submitTask()
接著的重要函數(shù)是shumitTask()函數(shù),該函數(shù)會通過AKKA機制,向TaskManager發(fā)出一個submitTask的消息請求,TaskManager收到消息請求后,會執(zhí)行submitTask()方法。(省略了部分代碼)。
- public CompletableFuture<Acknowledge> submitTask(
 - TaskDeploymentDescriptor tdd,
 - JobMasterId jobMasterId,
 - Time timeout) {
 - jobInformation = tdd.getSerializedJobInformation().deserializeValue(getClass().getClassLoader());
 - taskInformation = tdd.getSerializedTaskInformation().deserializeValue(getClass().getClassLoader());
 - TaskMetricGroup taskMetricGroup = taskManagerMetricGroup.addTaskForJob(xxx);
 - InputSplitProvider inputSplitProvider = new RpcInputSplitProvider(xxx);
 - TaskManagerActions taskManagerActions = jobManagerConnection.getTaskManagerActions();
 - CheckpointResponder checkpointResponder = jobManagerConnection.getCheckpointResponder();
 - LibraryCacheManager libraryCache = jobManagerConnection.getLibraryCacheManager();
 - ResultPartitionConsumableNotifier resultPartitionConsumableNotifier = jobManagerConnection.getResultPartitionConsumableNotifier();
 - PartitionProducerStateChecker partitionStateChecker = jobManagerConnection.getPartitionStateChecker();
 - final TaskLocalStateStore localStateStore = localStateStoresManager.localStateStoreForSubtask(
 - jobId,
 - tdd.getAllocationId(),
 - taskInformation.getJobVertexId(),
 - tdd.getSubtaskIndex());
 - final JobManagerTaskRestore taskRestore = tdd.getTaskRestore();
 - final TaskStateManager taskStateManager = new TaskStateManagerImpl(
 - jobId,
 - tdd.getExecutionAttemptId(),
 - localStateStore,
 - taskRestore,
 - checkpointResponder);
 - //新建一個Task
 - Task task = new Task(xxxx);
 - log.info("Received task {}.", task.getTaskInfo().getTaskNameWithSubtasks());
 - boolean taskAdded;
 - try {
 - taskAdded = taskSlotTable.addTask(task);
 - } catch (SlotNotFoundException | SlotNotActiveException e) {
 - throw new TaskSubmissionException("Could not submit task.", e);
 - }
 - if (taskAdded) {
 - //啟動任務(wù)
 - task.startTaskThread();
 - return CompletableFuture.completedFuture(Acknowledge.get());
 - }
 
最后創(chuàng)建執(zhí)行Task的線程,然后調(diào)用startTaskThread()來啟動具體的執(zhí)行線程,Task線程內(nèi)部的run()方法承載了被執(zhí)行的核心邏輯。
Task是執(zhí)行在TaskExecutor進(jìn)程里的一個線程,下面來看看其run方法
(1) 檢測當(dāng)前狀態(tài),正常情況為CREATED,如果是FAILED或CANCELING直接返回,其余狀態(tài)將拋異常。
(2) 讀取DistributedCache文件。
(3) 啟動ResultPartitionWriter和InputGate。
(4) 向taskEventDispatcher注冊partitionWriter。
(5) 根據(jù)nameOfInvokableClass加載對應(yīng)的類并實例化。
(6) 將狀態(tài)置為RUNNING并執(zhí)行invoke方法。
- public void run() {
 - while (true) {
 - ExecutionState current = this.executionState;
 - invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass);
 - network.registerTask(this);
 - Environment env = new RuntimeEnvironment(. . . . );
 - invokable.setEnvironment(env);
 - // actual task core work
 - if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
 - }
 - // notify everyone that we switched to running
 - notifyObservers(ExecutionState.RUNNING, null);
 - executingThread.setContextClassLoader(userCodeClassLoader);
 - // run the invokable
 - invokable.invoke();
 - if (transitionState(ExecutionState.RUNNING, ExecutionState.FINISHED)) {
 - notifyObservers(ExecutionState.FINISHED, null);
 - }
 - Finally{
 - // free the network resources
 - network.unregisterTask(this);
 - // free memory resources
 - if (invokable != null) {
 - memoryManager.releaseAll(invokable);
 - }
 - libraryCache.unregisterTask(jobId, executionId);
 - removeCachedFiles(distributedCacheEntries, fileCache);
 
總結(jié)
整體的流程與架構(gòu)可能三兩張圖或者三言兩語就可以勾勒出畫面,但是背后源碼的實現(xiàn)是艱辛的。源碼的復(fù)雜度和當(dāng)初設(shè)計框架的抓狂感,我們只有想象?,F(xiàn)在我們只是站在巨人的肩膀上去學(xué)習(xí)。
本篇的主題是"Flink架構(gòu)與執(zhí)行流程",做下小結(jié),F(xiàn)link on Yarn的提交執(zhí)行流程:
1 Flink任務(wù)提交后,Client向HDFS上傳Flink的Jar包和配置。
2 向Yarn ResourceManager提交任務(wù)。
3 ResourceManager分配Container資源并通知對應(yīng)的NodeManager啟動ApplicationMaster。
4 ApplicationMaster啟動后加載Flink的Jar包和配置構(gòu)建環(huán)境。
5 啟動JobManager之后ApplicationMaster向ResourceManager申請資源啟動TaskManager。
6 ResourceManager分配Container資源后,由ApplicationMaster通知資源所在節(jié)點的NodeManager啟動TaskManager。
7 NodeManager加載Flink的Jar包和配置構(gòu)建環(huán)境并啟動TaskManager。
8 TaskManager啟動后向JobManager發(fā)送心跳包,并等待JobManager向其分配任務(wù)。























 
 
 







 
 
 
 