偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

Flink執(zhí)行流程與源碼分析

大數(shù)據(jù)
整體的流程與架構(gòu)可能三兩張圖或者三言?xún)烧Z(yǔ)就可以勾勒出畫(huà)面,但是背后源碼的實(shí)現(xiàn)是艱辛的。源碼的復(fù)雜度和當(dāng)初設(shè)計(jì)框架的抓狂感,我們只有想象?,F(xiàn)在我們只是站在巨人的肩膀上去學(xué)習(xí)。

[[422512]]

本文轉(zhuǎn)載自微信公眾號(hào)「大數(shù)據(jù)左右手」,作者王了個(gè)博。轉(zhuǎn)載本文請(qǐng)聯(lián)系大數(shù)據(jù)左右手公眾號(hào)。

Flink主要組件

作業(yè)管理器(JobManager)

(1) 控制一個(gè)應(yīng)用程序執(zhí)行的主進(jìn)程,也就是說(shuō),每個(gè)應(yīng)用程序 都會(huì)被一個(gè)不同的Jobmanager所控制執(zhí)行

(2) Jobmanager會(huì)先接收到要執(zhí)行的應(yīng)用程序,這個(gè)應(yīng)用程序會(huì)包括:作業(yè)圖( Job Graph)、邏輯數(shù)據(jù)流圖( ogical dataflow graph)和打包了所有的類(lèi)、庫(kù)和其它資源的JAR包。

(3) Jobmanager會(huì)把 Jobgraph轉(zhuǎn)換成一個(gè)物理層面的 數(shù)據(jù)流圖,這個(gè)圖被叫做 “執(zhí)行圖”(Executiongraph),包含了所有可以并發(fā)執(zhí)行的任務(wù)。Job Manager會(huì)向資源管理器( Resourcemanager)請(qǐng)求執(zhí)行任務(wù)必要的資源,也就是 任務(wù)管理器(Taskmanager)上的插槽slot。一旦它獲取到了足夠的資源,就會(huì)將執(zhí)行圖分發(fā)到真正運(yùn)行它們的 Taskmanager上。而在運(yùn)行過(guò)程中Jobmanagera會(huì)負(fù)責(zé)所有需要中央?yún)f(xié)調(diào)的操作,比如說(shuō)檢查點(diǎn)(checkpoints)的協(xié)調(diào)。

任務(wù)管理器(Taskmanager)

(1) Flink中的工作進(jìn)程。通常在 Flink中會(huì)有多個(gè) Taskmanageria運(yùn)行, 每個(gè) Taskmanageri都包含了一定數(shù)量的插槽( slots)。插槽的數(shù)量限制了Taskmanageri能夠執(zhí)行的任務(wù)數(shù)量。

(2) 啟動(dòng)之后, Taskmanager會(huì)向資源管理器注冊(cè)它的插槽;收到資源管理器的指令后, Taskmanageri就會(huì)將一個(gè)或者多個(gè)插槽提供給Jobmanageri調(diào)用。Jobmanager就可以向插槽分配任務(wù)( tasks)來(lái)執(zhí)行了。

(3) 在執(zhí)行過(guò)程中, 一個(gè) Taskmanagera可以跟其它運(yùn)行同一應(yīng)用程序的Taskmanager交換數(shù)據(jù)。

資源管理器(Resource Manager)

(1) 主要負(fù)責(zé)管理任務(wù)管理器( Task Manager)的 插槽(slot)Taskmanger插槽是 Flink中定義的處理資源單元。

(2) Flink 為不同的環(huán)境和資源管理工具提供了不同資源管理器,比如YARNMesos、K8s,以及 standalone部署。

(3) 當(dāng) Jobmanager申請(qǐng)插槽資源時(shí), Resourcemanager會(huì)將有空閑插槽的Taskmanager?分配給Jobmanager。如果 Resourcemanagery沒(méi)有足夠的插槽來(lái)滿(mǎn)足 Jobmanager的請(qǐng)求, 它還可以向資源提供平臺(tái)發(fā)起會(huì)話,以提供啟動(dòng) Taskmanager進(jìn)程的容器。

分發(fā)器(Dispatcher)

(1) 可以跨作業(yè)運(yùn)行,它為應(yīng)用提交提供了REST接口。

(2)當(dāng)一個(gè)應(yīng)用被提交執(zhí)行時(shí),分發(fā)器就會(huì)啟動(dòng)并將應(yīng)用移交給Jobmanage

(3) Dispatcher他會(huì)啟動(dòng)一個(gè) WebUi,用來(lái)方便地 展示和監(jiān)控作業(yè)執(zhí)行的信息。

任務(wù)提交流程

  1. 提交應(yīng)用
  2. 啟動(dòng)并提交應(yīng)用
  3. 請(qǐng)求slots
  4. 任務(wù)啟動(dòng)
  5. 注冊(cè)slots
  6. 發(fā)出提供slot的指令
  7. 提供slots
  8. 提交要在slots中執(zhí)行的任務(wù)
  9. 交換數(shù)據(jù)

任務(wù)提交流程(YARN)

a. Flink任務(wù)提交后,Client向HDFS上傳Flink的Jar包和配置

b. 隨后向 Yarn ResourceManager提交任務(wù)ResourceManager分配 Container資源并通知對(duì)應(yīng)的NodeManager啟動(dòng)

c. ApplicationMaster,ApplicationMaster 啟動(dòng)后加載Flink的Jar包和配置構(gòu)建環(huán)境

d. 然后啟動(dòng)JobManager , 之后ApplicationMaster 向ResourceManager 申請(qǐng)資源啟動(dòng)TaskManager

e. ResourceManager 分配 Container 資源后 , 由ApplicationMaster通知資源所在節(jié)點(diǎn)的NodeManager啟動(dòng)TaskManager

f. NodeManager 加載 Flink 的 Jar 包和配置構(gòu)建環(huán)境并啟動(dòng) TaskManager

g. TaskManager 啟動(dòng)后向 JobManager 發(fā)送心跳包,并等待 JobManager 向其分配任務(wù)。

源碼分析--集群?jiǎn)?dòng) JobManager 啟動(dòng)分析

JobManager 的內(nèi)部包含非常重要的三大組件

  • WebMonitorEndpoint
  • ResourceManager
  • Dispatcher

入口,啟動(dòng)主類(lèi):StandaloneSessionClusterEntrypoint

  1. // 入 口 
  2. StandaloneSessionClusterEntrypoint.main() ClusterEntrypoint.runClusterEntrypoint(entrypoint); 
  3. clusterEntrypoint.startCluster();  
  4. runCluster(configuration, pluginManager); 
  5.  
  6. // 第一步:初始化各種服務(wù) 
  7.  /** 
  8.   * 初始化了 主節(jié)點(diǎn)對(duì)外提供服務(wù)的時(shí)候所需要的 三大核心組件啟動(dòng)時(shí)所需要的基礎(chǔ)服務(wù) 
  9.   *  初始化服務(wù),如 JobManager 的 Akka RPC 服務(wù),HA 服務(wù),心跳檢查服務(wù),metric service 
  10.   *  這些服務(wù)都是 Master 節(jié)點(diǎn)要使用到的一些服務(wù) 
  11.   *  1、commonRpcService:  基于 Akka 的 RpcService 實(shí)現(xiàn)。RPC 服務(wù)啟動(dòng) Akka 參與者來(lái)接收從 RpcGateway 調(diào)用 RPC 
  12.   *  2、haServices:    提供對(duì)高可用性所需的所有服務(wù)的訪問(wèn)注冊(cè),分布式計(jì)數(shù)器和領(lǐng)導(dǎo)人選舉 
  13.   *  3、blobServer:    負(fù)責(zé)偵聽(tīng)傳入的請(qǐng)求生成線程來(lái)處理這些請(qǐng)求。它還負(fù)責(zé)創(chuàng)建要存儲(chǔ)的目錄結(jié)構(gòu) blob 或臨時(shí)緩存它們 
  14.   *  4、heartbeatServices:  提供心跳所需的所有服務(wù)。這包括創(chuàng)建心跳接收器和心跳發(fā)送者。 
  15.   *  5、metricRegistry:   跟蹤所有已注冊(cè)的 Metric,它作為連接 MetricGroup 和 MetricReporter 
  16.   *  6、archivedExecutionGraphStore:   存儲(chǔ)執(zhí)行圖ExecutionGraph的可序列化形式。 
  17. */ 
  18. initializeServices(configuration, pluginManager); 
  19.  
  20. // 創(chuàng)建 DispatcherResourceManagerComponentFactory, 初始化各種組件的 
  21. 工廠實(shí)例 
  22. // 其實(shí)內(nèi)部包含了三個(gè)重要的成員變量: 
  23. // 創(chuàng)建 ResourceManager 的工廠實(shí)例 
  24. // 創(chuàng)建 Dispatcher 的工廠實(shí)例 
  25. // 創(chuàng)建 WebMonitorEndpoint 的工廠實(shí)例 
  26. createDispatcherResourceManagerComponentFactory(configuration); 
  27.  
  28. // 創(chuàng)建 集群運(yùn)行需要的一些組件:Dispatcher, ResourceManager 等 
  29. // 創(chuàng) 建 ResourceManager 
  30. // 創(chuàng) 建 Dispatcher 
  31. // 創(chuàng) 建 WebMonitorEndpoint 
  32. clusterComponent = dispatcherResourceManagerComponentFactory.create(...) 

1. initializeServices():初始化各種服務(wù)

  1. // 初 始 化 和 啟 動(dòng) AkkaRpcService, 內(nèi) 部 其 實(shí) 包 裝 了 一 個(gè) ActorSystem commonRpcService = AkkaRpcServiceUtils.createRemoteRpcService(...) 
  2.  
  3. // 初始化一個(gè)負(fù)責(zé) IO 的線程池 
  4. ioExecutor = Executors.newFixedThreadPool(...) 
  5. // 初始化 HA 服務(wù)組件,負(fù)責(zé) HA 服務(wù)的是:ZooKeeperHaServices haServices = createHaServices(configuration, ioExecutor); 
  6.  
  7. // 初始化 BlobServer 服務(wù)端 
  8. blobServer = new BlobServer(configuration, haServices.createBlobStore()); blobServer.start(); 
  9.  
  10. // 初始化心跳服務(wù)組件, heartbeatServices = HeartbeatServices heartbeatServices = createHeartbeatServices(configuration); 
  11.  
  12. // 初始化一個(gè)用來(lái)存儲(chǔ) ExecutionGraph 的 Store, 實(shí)現(xiàn)是: 
  13. FileArchivedExecutionGraphStore 
  14. archivedExecutionGraphStore = createSerializableExecutionGraphStore(...) 

2. createDispatcherResourceManagerComponentFactory(configuration)初始化了多組件的工廠實(shí)例

  1. 1、DispatcherRunnerFactory,默認(rèn)實(shí)現(xiàn):DefaultDispatcherRunnerFactory  
  2.  
  3. 2、ResourceManagerFactory,默認(rèn)實(shí)現(xiàn):StandaloneResourceManagerFactory  
  4.  
  5. 3、RestEndpointFactory,默認(rèn)實(shí)現(xiàn):SessionRestEndpointFactory 
  6.  
  7. clusterComponent = dispatcherResourceManagerComponentFactory 
  8.     .create(configuration, ioExecutor, commonRpcService, haServices, 
  9.      blobServer, heartbeatServices, metricRegistry, 
  10.      archivedExecutionGraphStore, 
  11.      new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService()), 
  12.      this); 

3. 創(chuàng)建 WebMonitorEndpoint

  1. /************************************************* 
  2.   *  創(chuàng)建 WebMonitorEndpoint 實(shí)例, 在 Standalone 模式下:DispatcherRestEndpoint 
  3.   *  1、restEndpointFactory = SessionRestEndpointFactory 
  4.   *  2、webMonitorEndpoint = DispatcherRestEndpoint 
  5.   *  3、highAvailabilityServices.getClusterRestEndpointLeaderElectionService() = ZooKeeperLeaderElectionService 
  6.   *  當(dāng)前這個(gè) DispatcherRestEndpoint 的作用是: 
  7.   *  1、初始化的過(guò)程中,會(huì)一大堆的 Handler 
  8.   *  2、啟動(dòng)一個(gè) Netty 的服務(wù)端,綁定了這些 Handler 
  9.   *  3、當(dāng) client 通過(guò) flink 命令執(zhí)行了某些操作(發(fā)起 restful 請(qǐng)求), 服務(wù)端由 webMonitorEndpoint 來(lái)執(zhí)行處理 
  10.   *  4、舉個(gè)例子: 如果通過(guò) flink run 提交一個(gè) Job,那么最后是由 webMonitorEndpoint 中的 JobSubmitHandler 來(lái)執(zhí)行處理 
  11.   *  5、補(bǔ)充一個(gè):job 由 JobSubmitHandler 執(zhí)行完畢之后,轉(zhuǎn)交給 Dispatcher 去調(diào)度執(zhí)行 
  12.   */ 
  13.  webMonitorEndpoint = restEndpointFactory.createRestEndpoint( 
  14.   configuration, dispatcherGatewayRetriever, resourceManagerGatewayRetriever, 
  15.   blobServer, executor, metricFetcher, 
  16.   highAvailabilityServices.getClusterRestEndpointLeaderElectionService(), 
  17.   fatalErrorHandler 
  18.  ); 

4. 創(chuàng)建 resourceManager

  1. /************************************************* 
  2.  *  創(chuàng)建 StandaloneResourceManager 實(shí)例對(duì)象 
  3.  *  1、resourceManager = StandaloneResourceManager 
  4.  *  2、resourceManagerFactory = StandaloneResourceManagerFactory 
  5. */ 
  6. resourceManager = resourceManagerFactory.createResourceManager( 
  7.  configuration, ResourceID.generate(), 
  8.  rpcService, highAvailabilityServices, heartbeatServices, 
  9.  fatalErrorHandler, new ClusterInformation(hostname, blobServer.getPort()), 
  10.  webMonitorEndpoint.getRestBaseUrl(), metricRegistry, hostname 
  11. ); 
  1. protected ResourceManager<ResourceID> createResourceManager( 
  2.   Configuration configuration, 
  3.   ResourceID resourceId, 
  4.   RpcService rpcService, 
  5.   HighAvailabilityServices highAvailabilityServices, 
  6.   HeartbeatServices heartbeatServices, 
  7.   FatalErrorHandler fatalErrorHandler, 
  8.   ClusterInformation clusterInformation, 
  9.   @Nullable String webInterfaceUrl, 
  10.   ResourceManagerMetricGroup resourceManagerMetricGroup, 
  11.   ResourceManagerRuntimeServices resourceManagerRuntimeServices) { 
  12.  
  13.  final Time standaloneClusterStartupPeriodTime = ConfigurationUtils.getStandaloneClusterStartupPeriodTime(configuration); 
  14.  
  15.  /************************************************* 
  16.   *  注釋?zhuān)?nbsp;得到一個(gè) StandaloneResourceManager 實(shí)例對(duì)象 
  17.   */ 
  18.  return new StandaloneResourceManager( 
  19.   rpcService, 
  20.   resourceId, 
  21.   highAvailabilityServices, 
  22.   heartbeatServices, 
  23.   resourceManagerRuntimeServices.getSlotManager(), 
  24.   ResourceManagerPartitionTrackerImpl::new, 
  25.   resourceManagerRuntimeServices.getJobLeaderIdService(), 
  26.   clusterInformation, 
  27.   fatalErrorHandler, 
  28.   resourceManagerMetricGroup, 
  29.   standaloneClusterStartupPeriodTime, 
  30.   AkkaUtils.getTimeoutAsTime(configuration) 
  31.  ); 
  32.  
  33.  } 
  34.   
  1. /** 
  2. requestSlot():接受 solt請(qǐng)求 
  3. sendSlotReport(..): 將solt請(qǐng)求發(fā)送TaskManager 
  4. registerJobManager(...): 注冊(cè)job管理者。 該job指的是 提交給flink的應(yīng)用程序 
  5. registerTaskExecutor(...): 注冊(cè)task執(zhí)行者。 
  6. **/ 
  7. public ResourceManager(RpcService rpcService, ResourceID resourceId, HighAvailabilityServices highAvailabilityServices, 
  8.   HeartbeatServices heartbeatServices, SlotManager slotManager, ResourceManagerPartitionTrackerFactory clusterPartitionTrackerFactory, 
  9.   JobLeaderIdService jobLeaderIdService, ClusterInformation clusterInformation, FatalErrorHandler fatalErrorHandler, 
  10.   ResourceManagerMetricGroup resourceManagerMetricGroup, Time rpcTimeout) { 
  11.  
  12.  /************************************************* 
  13.   *  注釋?zhuān)?nbsp;當(dāng)執(zhí)行完畢這個(gè)構(gòu)造方法的時(shí)候,會(huì)觸發(fā)調(diào)用 onStart() 方法執(zhí)行 
  14.   */ 
  15.  super(rpcService, AkkaRpcServiceUtils.createRandomName(RESOURCE_MANAGER_NAME), null); 
  1. protected RpcEndpoint(final RpcService rpcService, final String endpointId) { 
  2.  this.rpcService = checkNotNull(rpcService, "rpcService"); 
  3.  this.endpointId = checkNotNull(endpointId, "endpointId"); 
  4.  
  5.  /************************************************* 
  6.   *  注釋?zhuān)篟esourceManager 或者 TaskExecutor 中的 RpcServer 實(shí)現(xiàn) 
  7.   *  以 ResourceManager 為例說(shuō)明: 
  8.   *  啟動(dòng) ResourceManager 的 RPCServer 服務(wù) 
  9.   *  這里啟動(dòng)的是 ResourceManager 的 Rpc 服務(wù)端。 
  10.   *  接收 TaskManager 啟動(dòng)好了而之后, 進(jìn)行注冊(cè)和心跳,來(lái)匯報(bào) Taskmanagaer 的資源情況 
  11.   *  通過(guò)動(dòng)態(tài)代理的形式構(gòu)建了一個(gè)Server 
  12.   */ 
  13.  this.rpcServer = rpcService.startServer(this); 

5. 在創(chuàng)建resourceManager同級(jí):?jiǎn)?dòng)任務(wù)接收器Starting Dispatcher

  1. /************************************************* 
  2.  
  3.  *  創(chuàng)建 并啟動(dòng) Dispatcher 
  4.  *  1、dispatcherRunner = DispatcherRunnerLeaderElectionLifecycleManager 
  5.  *  2、dispatcherRunnerFactory = DefaultDispatcherRunnerFactory 
  6.  *  第一個(gè)參數(shù):ZooKeeperLeaderElectionService 
  7.  *  - 
  8.  *  老版本: 這個(gè)地方是直接創(chuàng)建一個(gè) Dispatcher 對(duì)象然后調(diào)用 dispatcher.start() 來(lái)啟動(dòng) 
  9.  *  新版本: 直接創(chuàng)建一個(gè) DispatcherRunner, 內(nèi)部就是要?jiǎng)?chuàng)建和啟動(dòng) Dispatcher 
  10.  *  - 
  11.  *  DispatcherRunner 是對(duì) Dispatcher 的封裝。 
  12.  *  DispatcherRunner被創(chuàng)建的代碼的內(nèi)部,會(huì)創(chuàng)建 Dispatcher并啟動(dòng) 
  13.  */ 
  14. log.debug("Starting Dispatcher."); 
  15. dispatcherRunner = dispatcherRunnerFactory.createDispatcherRunner( 
  16.  highAvailabilityServices.getDispatcherLeaderElectionService(), fatalErrorHandler, 
  17.  // TODO_ZYM 注釋?zhuān)?nbsp;注意第三個(gè)參數(shù) 
  18.  new HaServicesJobGraphStoreFactory(highAvailabilityServices), 
  19.  ioExecutor, rpcService, partialDispatcherServices 
  20. ); 

Dispatcher 啟動(dòng)后,將會(huì)等待任務(wù)提交,如果有任務(wù)提交,則會(huì)經(jīng)過(guò)submitJob(...)函數(shù)進(jìn)入后續(xù)處理。

提交(一個(gè)Flink應(yīng)用的提交必須經(jīng)過(guò)三個(gè)graph的轉(zhuǎn)換)

首先看下一些名詞

StreamGraph

是根據(jù)用戶(hù)通過(guò) Stream API 編寫(xiě)的代碼生成的最初的圖。用來(lái)表示程序的拓?fù)浣Y(jié)構(gòu)??梢杂靡粋€(gè) DAG 來(lái)表示),DAG 的頂點(diǎn)是 StreamNode,邊是 StreamEdge,邊包含了由哪個(gè) StreamNode 依賴(lài)哪個(gè) StreamNode。

  • StreamNode:用來(lái)代表 operator 的類(lèi),并具有所有相關(guān)的屬性,如并發(fā)度、入邊和出邊等。
  • StreamEdge:表示連接兩個(gè)StreamNode的邊。

DataStream 上常見(jiàn)的 transformation 有 map、flatmap、filter等(見(jiàn)DataStream Transformation了解更多)。這些transformation會(huì)構(gòu)造出一棵 StreamTransformation 樹(shù),通過(guò)這棵樹(shù)轉(zhuǎn)換成 StreamGraph

以map方法為例,看看源碼

  1. public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper) { 
  2.   // 通過(guò)java reflection抽出mapper的返回值類(lèi)型 
  3.   TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(clean(mapper), getType(), 
  4.       Utils.getCallLocationName(), true); 
  5.  
  6.   // 返回一個(gè)新的DataStream,SteramMap 為 StreamOperator 的實(shí)現(xiàn)類(lèi) 
  7.   return transform("Map", outType, new StreamMap<>(clean(mapper))); 
  8.  
  9. public <R> SingleOutputStreamOperator<R> transform(String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) { 
  10.   // read the output type of the input Transform to coax out errors about MissingTypeInfo 
  11.   transformation.getOutputType(); 
  12.  
  13.   // 新的transformation會(huì)連接上當(dāng)前DataStream中的transformation,從而構(gòu)建成一棵樹(shù) 
  14.   OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>( 
  15.       this.transformation, 
  16.       operatorName, 
  17.       operator, 
  18.       outTypeInfo, 
  19.       environment.getParallelism()); 
  20.  
  21.   @SuppressWarnings({ "unchecked""rawtypes" }) 
  22.   SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform); 
  23.  
  24.   // 所有的transformation都會(huì)存到 env 中,調(diào)用execute時(shí)遍歷該list生成StreamGraph 
  25.   getExecutionEnvironment().addOperator(resultTransform); 
  26.  
  27.   return returnStream; 

map轉(zhuǎn)換將用戶(hù)自定義的函數(shù)MapFunction包裝到StreamMap這個(gè)Operator中,再將StreamMap包裝到OneInputTransformation,最后該transformation存到env中,當(dāng)調(diào)用env.execute時(shí),遍歷其中的transformation集合構(gòu)造出StreamGraph

JobGraph

(1) StreamGraph經(jīng)過(guò)優(yōu)化后生成了 JobGraph,提交給 JobManager 的數(shù)據(jù)結(jié)構(gòu)。主要的優(yōu)化為,將多個(gè)符合條件的節(jié)點(diǎn) chain 在一起作為一個(gè)節(jié)點(diǎn)。

  • 將并不涉及到 shuffle 的算子進(jìn)行合并。
  • 對(duì)于同一個(gè) operator chain 里面的多個(gè)算子,會(huì)在同一個(gè) task 中執(zhí)行。
  • 對(duì)于不在同一個(gè) operator chain 里的算子,會(huì)在不同的 task 中執(zhí)行。

(2) JobGraph 用來(lái)由 JobClient 提交給 JobManager,是由頂點(diǎn)(JobVertex)、中間結(jié)果(IntermediateDataSet)和邊(JobEdge)組成的 DAG 圖。

(3) JobGraph 定義作業(yè)級(jí)別的配置,而每個(gè)頂點(diǎn)和中間結(jié)果定義具體操作和中間數(shù)據(jù)的設(shè)置。

JobVertex

JobVertex 相當(dāng)于是 JobGraph 的頂點(diǎn)。經(jīng)過(guò)優(yōu)化后符合條件的多個(gè)StreamNode可能會(huì)chain在一起生成一個(gè)JobVertex,即一個(gè)JobVertex包含一個(gè)或多個(gè)operator,JobVertex的輸入是JobEdge,輸出是IntermediateDataSet。

IntermediateDataSet

JobVertex的輸出,即經(jīng)過(guò)operator處理產(chǎn)生的數(shù)據(jù)集。

JobEdge

job graph中的一條數(shù)據(jù)傳輸通道。source 是IntermediateDataSet,sink 是 JobVertex。即數(shù)據(jù)通過(guò)JobEdge由IntermediateDataSet傳遞給目標(biāo)JobVertex。

(1) 首先是通過(guò)API會(huì)生成transformations,通過(guò)transformations會(huì)生成StreamGraph。

(2)將StreamGraph的某些StreamNode Chain在一起生成JobGraph,前兩步轉(zhuǎn)換都是在客戶(hù)端完成。

(3)最后會(huì)將JobGraph轉(zhuǎn)換為ExecutionGraph,相比JobGraph會(huì)增加并行度的概念,這一步是在Jobmanager里完成。

ExecutionJobVertex

ExecutionJobVertex一一對(duì)應(yīng)JobGraph中的JobVertex

ExecutionVertex

一個(gè)ExecutionJobVertex對(duì)應(yīng)n個(gè)ExecutionVertex,其中n就是算子的并行度。ExecutionVertex就是并行任務(wù)的一個(gè)子任務(wù)

Execution

Execution 是對(duì) ExecutionVertex 的一次執(zhí)行,通過(guò) ExecutionAttemptId 來(lái)唯一標(biāo)識(shí)。

IntermediateResult

在 JobGraph 中用 IntermediateDataSet 表示 JobVertex 的對(duì)外輸出,一個(gè) JobGraph 可能有 n(n >=0) 個(gè)輸出。在 ExecutionGraph 中,與此對(duì)應(yīng)的就是 IntermediateResult。每一個(gè) IntermediateResult 就有 numParallelProducers(并行度) 個(gè)生產(chǎn)者,每個(gè)生產(chǎn)者的在相應(yīng)的 IntermediateResult 上的輸出對(duì)應(yīng)一個(gè) IntermediateResultPartition。IntermediateResultPartition 表示的是 ExecutionVertex 的一個(gè)輸出分區(qū)

ExecutionEdge

ExecutionEdge 表示 ExecutionVertex 的輸入,通過(guò) ExecutionEdge 將 ExecutionVertex 和 IntermediateResultPartition 連接起來(lái),進(jìn)而在不同的 ExecutionVertex 之間建立聯(lián)系。

ExecutionGraph的構(gòu)建

  1. 構(gòu)建JobInformation
  2. 構(gòu)建ExecutionGraph
  3. 將JobGraph進(jìn)行拓?fù)渑判?獲取sortedTopology頂點(diǎn)集合
  1. // ExecutionGraphBuilder 
  2.  public static ExecutionGraph buildGraph( 
  3.   @Nullable ExecutionGraph prior
  4.   JobGraph jobGraph, 
  5.   ...) throws JobExecutionException, JobException { 
  6.   // 構(gòu)建JobInformation 
  7.    
  8.   // 構(gòu)建ExecutionGraph 
  9.    
  10.   // 將JobGraph進(jìn)行拓?fù)渑判?獲取sortedTopology頂點(diǎn)集合 
  11.   List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources(); 
  12.    
  13.   executionGraph.attachJobGraph(sortedTopology); 
  14.  
  15.   return executionGraph; 
  16.  } 

構(gòu)建ExecutionJobVertex,連接IntermediateResultPartition和ExecutionVertex

  1. //ExecutionGraph 
  2.  public void attachJobGraph(List<JobVertex> topologiallySorted) throws JobException { 
  3.   for (JobVertex jobVertex : topologiallySorted) { 
  4.    // 構(gòu)建ExecutionJobVertex 
  5.    ExecutionJobVertex ejv = new ExecutionJobVertex( 
  6.      this, 
  7.      jobVertex, 
  8.      1, 
  9.      maxPriorAttemptsHistoryLength, 
  10.      rpcTimeout, 
  11.      globalModVersion, 
  12.      createTimestamp); 
  13.    // 連接IntermediateResultPartition和ExecutionVertex 
  14.    ev.connectToPredecessors(this.intermediateResults); 
  15.  } 
  16.    
  17.    
  18.   // ExecutionJobVertex 
  19.  public void connectToPredecessors(Map<IntermediateDataSetID, IntermediateResult> intermediateDataSets) throws JobException { 
  20.   List<JobEdge> inputs = jobVertex.getInputs(); 
  21.    
  22.   for (int num = 0; num < inputs.size(); num++) { 
  23.    JobEdge edge = inputs.get(num); 
  24.    IntermediateResult ires = intermediateDataSets.get(edge.getSourceId()); 
  25.    this.inputs.add(ires); 
  26.    int consumerIndex = ires.registerConsumer(); 
  27.     
  28.    for (int i = 0; i < parallelism; i++) { 
  29.     ExecutionVertex ev = taskVertices[i]; 
  30.     ev.connectSource(num, ires, edge, consumerIndex); 
  31.    } 
  32.   } 
  33.  } 

拆分計(jì)劃(可執(zhí)行能力)

  1. // ExecutionVertex 
  2.  public void connectSource(int inputNumber, IntermediateResult source, JobEdge edge, int consumerNumber) { 
  3.  
  4.   final DistributionPattern pattern = edge.getDistributionPattern(); 
  5.   final IntermediateResultPartition[] sourcePartitions = source.getPartitions(); 
  6.  
  7.   ExecutionEdge[] edges; 
  8.  
  9.   switch (pattern) { 
  10.    // 下游 JobVertex 的輸入 partition 算法,如果是 forward 或 rescale 的話為 POINTWISE 
  11.    case POINTWISE: 
  12.     edges = connectPointwise(sourcePartitions, inputNumber); 
  13.     break; 
  14.    // 每一個(gè)并行的ExecutionVertex節(jié)點(diǎn)都會(huì)鏈接到源節(jié)點(diǎn)產(chǎn)生的所有中間結(jié)果IntermediateResultPartition 
  15.    case ALL_TO_ALL: 
  16.     edges = connectAllToAll(sourcePartitions, inputNumber); 
  17.     break; 
  18.  
  19.    default
  20.     throw new RuntimeException("Unrecognized distribution pattern."); 
  21.  
  22.   } 
  23.  
  24.   inputEdges[inputNumber] = edges; 
  25.   for (ExecutionEdge ee : edges) { 
  26.    ee.getSource().addConsumer(ee, consumerNumber); 
  27.   } 
  28.  } 
  29.  
  30.  
  31.  private ExecutionEdge[] connectPointwise(IntermediateResultPartition[] sourcePartitions, int inputNumber) { 
  32.   final int numSources = sourcePartitions.length; 
  33.   final int parallelism = getTotalNumberOfParallelSubtasks(); 
  34.  
  35.   // 如果并發(fā)數(shù)等于partition數(shù),則一對(duì)一進(jìn)行連接 
  36.   if (numSources == parallelism) { 
  37.    return new ExecutionEdge[] { new ExecutionEdge(sourcePartitions[subTaskIndex], this, inputNumber) }; 
  38.   } 
  39.   //  如果并發(fā)數(shù)大于partition數(shù),則一對(duì)多進(jìn)行連接 
  40.   else if (numSources < parallelism) { 
  41.  
  42.    int sourcePartition; 
  43.  
  44.    if (parallelism % numSources == 0) { 
  45.     int factor = parallelism / numSources; 
  46.     sourcePartition = subTaskIndex / factor; 
  47.    } 
  48.    else { 
  49.     float factor = ((float) parallelism) / numSources; 
  50.     sourcePartition = (int) (subTaskIndex / factor); 
  51.    } 
  52.  
  53.    return new ExecutionEdge[] { new ExecutionEdge(sourcePartitions[sourcePartition], this, inputNumber) }; 
  54.   } 
  55.   // 果并發(fā)數(shù)小于partition數(shù),則多對(duì)一進(jìn)行連接 
  56.   else { 
  57.    if (numSources % parallelism == 0) { 
  58.     int factor = numSources / parallelism; 
  59.     int startIndex = subTaskIndex * factor; 
  60.  
  61.     ExecutionEdge[] edges = new ExecutionEdge[factor]; 
  62.     for (int i = 0; i < factor; i++) { 
  63.      edges[i] = new ExecutionEdge(sourcePartitions[startIndex + i], this, inputNumber); 
  64.     } 
  65.     return edges; 
  66.    } 
  67.    else { 
  68.     float factor = ((float) numSources) / parallelism; 
  69.  
  70.     int start = (int) (subTaskIndex * factor); 
  71.     int end = (subTaskIndex == getTotalNumberOfParallelSubtasks() - 1) ? 
  72.       sourcePartitions.length : 
  73.       (int) ((subTaskIndex + 1) * factor); 
  74.  
  75.     ExecutionEdge[] edges = new ExecutionEdge[end - start]; 
  76.     for (int i = 0; i < edges.length; i++) { 
  77.      edges[i] = new ExecutionEdge(sourcePartitions[start + i], this, inputNumber); 
  78.     } 
  79.  
  80.     return edges; 
  81.    } 
  82.   } 
  83.  } 
  84.  
  85.  
  86.  private ExecutionEdge[] connectAllToAll(IntermediateResultPartition[] sourcePartitions, int inputNumber) { 
  87.   ExecutionEdge[] edges = new ExecutionEdge[sourcePartitions.length]; 
  88.  
  89.   for (int i = 0; i < sourcePartitions.length; i++) { 
  90.    IntermediateResultPartition irp = sourcePartitions[i]; 
  91.    edges[i] = new ExecutionEdge(irp, this, inputNumber); 
  92.   } 
  93.  
  94.   return edges; 
  95.  } 

返回ExecutionGraph

TaskManager

TaskManager啟動(dòng)

  1. public static void runTaskManager(Configuration configuration, ResourceID resourceId) throws Exception { 
  2.         //主要初始化一堆的service,并新建一個(gè)org.apache.flink.runtime.taskexecutor.TaskExecutor 
  3.   final TaskManagerRunner taskManagerRunner = new TaskManagerRunner(configuration,resourceId); 
  4.   //調(diào)用TaskExecutor的start()方法 
  5.         taskManagerRunner.start(); 

TaskExecutor :submitTask()

接著的重要函數(shù)是shumitTask()函數(shù),該函數(shù)會(huì)通過(guò)AKKA機(jī)制,向TaskManager發(fā)出一個(gè)submitTask的消息請(qǐng)求,TaskManager收到消息請(qǐng)求后,會(huì)執(zhí)行submitTask()方法。(省略了部分代碼)。

  1. public CompletableFuture<Acknowledge> submitTask( 
  2.    TaskDeploymentDescriptor tdd, 
  3.    JobMasterId jobMasterId, 
  4.    Time timeout) { 
  5.  
  6.     jobInformation = tdd.getSerializedJobInformation().deserializeValue(getClass().getClassLoader()); 
  7.     taskInformation = tdd.getSerializedTaskInformation().deserializeValue(getClass().getClassLoader()); 
  8.     
  9.    TaskMetricGroup taskMetricGroup = taskManagerMetricGroup.addTaskForJob(xxx); 
  10.  
  11.    InputSplitProvider inputSplitProvider = new RpcInputSplitProvider(xxx); 
  12.  
  13.    TaskManagerActions taskManagerActions = jobManagerConnection.getTaskManagerActions(); 
  14.    CheckpointResponder checkpointResponder = jobManagerConnection.getCheckpointResponder(); 
  15.  
  16.    LibraryCacheManager libraryCache = jobManagerConnection.getLibraryCacheManager(); 
  17.    ResultPartitionConsumableNotifier resultPartitionConsumableNotifier = jobManagerConnection.getResultPartitionConsumableNotifier(); 
  18.    PartitionProducerStateChecker partitionStateChecker = jobManagerConnection.getPartitionStateChecker(); 
  19.  
  20.    final TaskLocalStateStore localStateStore = localStateStoresManager.localStateStoreForSubtask( 
  21.     jobId, 
  22.     tdd.getAllocationId(), 
  23.     taskInformation.getJobVertexId(), 
  24.     tdd.getSubtaskIndex()); 
  25.  
  26.    final JobManagerTaskRestore taskRestore = tdd.getTaskRestore(); 
  27.  
  28.    final TaskStateManager taskStateManager = new TaskStateManagerImpl( 
  29.     jobId, 
  30.     tdd.getExecutionAttemptId(), 
  31.     localStateStore, 
  32.     taskRestore, 
  33.     checkpointResponder); 
  34.             //新建一個(gè)Task 
  35.    Task task = new Task(xxxx); 
  36.  
  37.    log.info("Received task {}.", task.getTaskInfo().getTaskNameWithSubtasks()); 
  38.  
  39.    boolean taskAdded; 
  40.  
  41.    try { 
  42.     taskAdded = taskSlotTable.addTask(task); 
  43.    } catch (SlotNotFoundException | SlotNotActiveException e) { 
  44.     throw new TaskSubmissionException("Could not submit task.", e); 
  45.    } 
  46.  
  47.    if (taskAdded) { 
  48.        //啟動(dòng)任務(wù) 
  49.     task.startTaskThread(); 
  50.  
  51.     return CompletableFuture.completedFuture(Acknowledge.get()); 
  52.    }  

最后創(chuàng)建執(zhí)行Task的線程,然后調(diào)用startTaskThread()來(lái)啟動(dòng)具體的執(zhí)行線程,Task線程內(nèi)部的run()方法承載了被執(zhí)行的核心邏輯。

Task是執(zhí)行在TaskExecutor進(jìn)程里的一個(gè)線程,下面來(lái)看看其run方法

(1) 檢測(cè)當(dāng)前狀態(tài),正常情況為CREATED,如果是FAILED或CANCELING直接返回,其余狀態(tài)將拋異常。

(2) 讀取DistributedCache文件。

(3) 啟動(dòng)ResultPartitionWriter和InputGate。

(4) 向taskEventDispatcher注冊(cè)partitionWriter。

(5) 根據(jù)nameOfInvokableClass加載對(duì)應(yīng)的類(lèi)并實(shí)例化。

(6) 將狀態(tài)置為RUNNING并執(zhí)行invoke方法。

  1. public void run() { 
  2.         while (true) { 
  3.             ExecutionState current = this.executionState; 
  4.             invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass); 
  5.             network.registerTask(this); 
  6.             Environment env = new RuntimeEnvironment(. . . . ); 
  7.             invokable.setEnvironment(env); 
  8.             //  actual task core work 
  9.             if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) { 
  10.             } 
  11.             // notify everyone that we switched to running 
  12.             notifyObservers(ExecutionState.RUNNING, null); 
  13.             executingThread.setContextClassLoader(userCodeClassLoader); 
  14.             // run the invokable 
  15.             invokable.invoke(); 
  16.  
  17.             if (transitionState(ExecutionState.RUNNING, ExecutionState.FINISHED)) { 
  18.                 notifyObservers(ExecutionState.FINISHED, null); 
  19.             } 
  20.             Finally{ 
  21.                 // free the network resources 
  22.                 network.unregisterTask(this); 
  23.                 // free memory resources 
  24.                 if (invokable != null) { 
  25.                     memoryManager.releaseAll(invokable); 
  26.                 } 
  27.                 libraryCache.unregisterTask(jobId, executionId); 
  28.                 removeCachedFiles(distributedCacheEntries, fileCache); 

總結(jié)

整體的流程與架構(gòu)可能三兩張圖或者三言?xún)烧Z(yǔ)就可以勾勒出畫(huà)面,但是背后源碼的實(shí)現(xiàn)是艱辛的。源碼的復(fù)雜度和當(dāng)初設(shè)計(jì)框架的抓狂感,我們只有想象?,F(xiàn)在我們只是站在巨人的肩膀上去學(xué)習(xí)。

本篇的主題是"Flink架構(gòu)與執(zhí)行流程",做下小結(jié),F(xiàn)link on Yarn的提交執(zhí)行流程:

1 Flink任務(wù)提交后,Client向HDFS上傳Flink的Jar包和配置。

2 向Yarn ResourceManager提交任務(wù)。

3 ResourceManager分配Container資源并通知對(duì)應(yīng)的NodeManager啟動(dòng)ApplicationMaster。

4 ApplicationMaster啟動(dòng)后加載Flink的Jar包和配置構(gòu)建環(huán)境。

5 啟動(dòng)JobManager之后ApplicationMaster向ResourceManager申請(qǐng)資源啟動(dòng)TaskManager。

6 ResourceManager分配Container資源后,由ApplicationMaster通知資源所在節(jié)點(diǎn)的NodeManager啟動(dòng)TaskManager。

7 NodeManager加載Flink的Jar包和配置構(gòu)建環(huán)境并啟動(dòng)TaskManager。

8 TaskManager啟動(dòng)后向JobManager發(fā)送心跳包,并等待JobManager向其分配任務(wù)。

 

責(zé)任編輯:武曉燕 來(lái)源: 大數(shù)據(jù)左右手
相關(guān)推薦

2022-04-05 12:59:07

源碼線程onEvent

2022-08-27 08:02:09

SQL函數(shù)語(yǔ)法

2016-10-21 13:03:18

androidhandlerlooper

2012-08-30 09:48:02

Struts2Java

2024-07-15 09:58:03

OpenRestyNginx日志

2016-11-25 13:26:50

Flume架構(gòu)源碼

2016-11-29 09:38:06

Flume架構(gòu)核心組件

2016-11-25 13:14:50

Flume架構(gòu)源碼

2020-07-13 09:09:23

Sentinel源碼Bucket

2022-06-07 10:33:29

Camera組件鴻蒙

2025-05-26 09:05:00

2015-01-14 13:22:36

OpenStack創(chuàng)建快照glance api

2009-12-22 13:36:39

Linux Sysfs

2017-08-22 13:45:27

2009-07-08 10:30:57

WebWork

2022-07-15 08:52:03

Linux優(yōu)化

2024-10-21 10:45:52

2017-04-19 15:32:46

ReactRouter構(gòu)建源碼

2016-11-29 16:59:46

Flume架構(gòu)源碼

2011-03-15 11:33:18

iptables
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)