聊聊Flink:Flink的運行時架構(gòu)
一、運行時架構(gòu)
上一篇我們可以看到Flink的核心組件的Deploy層,該層主要涉及了Flink的部署模式,F(xiàn)link支持多種部署模式:本地、集群(Standalone/YARN)、云(GCE/EC2)。
圖片
- Local(本地):單機模式,一般本地開發(fā)調(diào)試使用,像我們程序?qū)懙腤ordCountStream那個例子,直接運行main方法啟動。
- Cluster(集群)
- Standalone(獨立模式):Flink自帶集群,自己管理資源調(diào)度,生產(chǎn)環(huán)境也會有所應(yīng)用。
- YARN(YARN模式):計算資源統(tǒng)一由Hadoop YARN管理,生產(chǎn)環(huán)境應(yīng)用較多。
- Cloud(云端):AliCloud Realtime Compute、Amazon EMR、Huawei Cloud Stream Service 等。
我們這里主要來介紹Cluster集群的兩種模式Standalone、YARN。
二、YARN集群架構(gòu)
在講解Flink集群架構(gòu)之前,我們先了解一下YARN集群架構(gòu),我覺得是很有必要的。YARN集群總體上是經(jīng)典的主/從(Master/Slave)架構(gòu),主要由ResourceManager、NodeManager、ApplicationMaster和Container等幾個組件構(gòu)成。
圖片
2.1 ResourceManager
以后臺進程的形式運行,負責對集群資源進行統(tǒng)一管理和任務(wù)調(diào)度。ResourceManager的主要職責如下:
- 接收來自客戶端的請求。
- 啟動和管理各個應(yīng)用程序的ApplicationMaster。
- 接收來自ApplicationMaster的資源申請,并為其分配Container。
- 管理NodeManager,接收來自NodeManager的資源和節(jié)點健康情況匯報。
2.2 NodeManager
集群中每個節(jié)點上的資源和任務(wù)管理器,以后臺進程的形式運行。它會定時向ResourceManager匯報本節(jié)點上的資源(內(nèi)存、CPU)使用情況和各個Container的運行狀態(tài),同時會接收并處理來自ApplicationMaster的Container啟動/停止等請求。NodeManager不會監(jiān)視任務(wù),它僅監(jiān)視Container中的資源使用情況,例如。如果一個Container消耗的內(nèi)存比最初分配的更多,就會結(jié)束該Container。
2.3 Task
應(yīng)用程序具體執(zhí)行的任務(wù)。一個應(yīng)用程序可能有多個任務(wù),例如一個MapReduce程序可以有多個Map任務(wù)和多個Reduce任務(wù)。
2.4 Container
YARN中資源分配的基本單位,封裝了CPU和內(nèi)存資源的一個容器,相當于一個Task運行環(huán)境的抽象。從實現(xiàn)上看,Container是一個Java抽象類,定義了資源信息。應(yīng)用程序的Task將會被發(fā)布到Container中運行,從而限定了Task使用的資源量。
一個應(yīng)用程序所需的Container分為兩類:運行ApplicationMaster的Container和運行各類Task的Container。前者是由ResourceManager向內(nèi)部的資源調(diào)度器申請和啟動的,后者是由ApplicationMaster向ResourceManager申請的,并由ApplicationMaster請求NodeManager進行啟動。
我們可以將Container類比成數(shù)據(jù)庫連接池中的連接,需要的時候進行申請,使用完畢后進行釋放,而不需要每次獨自創(chuàng)建。
2.5 ApplicationMaster
ApplicationMaster可在Container內(nèi)運行任何類型的Task。例如,MapReduce ApplicationMaster請求一個容器來啟動Map Task或Reduce Task。也可以實現(xiàn)一個自定義的ApplicationMaster來運行特定的Task,以便任何分布式框架都可以受YARN支持,只要實現(xiàn)了相應(yīng)的ApplicationMaster即可。
我們可以這樣認為:ResourceManager管理整個集群,NodeManager管理集群中的單個節(jié)點,ApplicationMaster管理單個應(yīng)用程序(集群中可能同時有多個應(yīng)用程序在運行,每個應(yīng)用程序都有各自的ApplicationMaster)。
YARN集群中應(yīng)用程序的執(zhí)行流程如下圖所示:
- 客戶端提交應(yīng)用程序(可以是MapReduce程序、Spark程序等)到ResourceManager。
- ResourceManager分配用于運行ApplicationMaster的Container,然后與NodeManager通信,要求它在該Container中啟動ApplicationMaster。ApplicationMaster啟動后,它將負責此應(yīng)用程序的整個生命周期。
- ApplicationMaster向ResourceManager注冊(注冊后可以通過ResourceManager查看應(yīng)用程序的運行狀態(tài))并請求運行應(yīng)用程序各個Task所需的Container(資源請求是對一些Container的請求)。如果符合條件,ResourceManager會分配給ApplicationMaster所需的Container(表達為Container ID和主機名)。
- ApplicationMaster請求NodeManager使用這些Container來運行應(yīng)用程序的相應(yīng)Task(即將Task發(fā)布到指定的Container中運行)。
此外,各個運行中的Task會通過RPC協(xié)議向ApplicationMaster匯報自己的狀態(tài)和進度,這樣一旦某個Task運行失敗,ApplicationMaster就可以對其重新啟動。當應(yīng)用程序運行完成時,ApplicationMaster會向ResourceManager申請注銷自己。
圖片
三、Flink Standalone模式
Flink Standalone模式為經(jīng)典的主從(Master/Slave)架構(gòu),資源調(diào)度是Flink自己實現(xiàn)的。集群啟動后,主節(jié)點上會啟動一個JobManager進程,類似YARN集群的ResourceManager,因此主節(jié)點也稱為JobManager節(jié)點;各個從節(jié)點上會啟動一個TaskManager進程,類似YARN集群的NodeManager,因此從節(jié)點也稱為TaskManager節(jié)點。從Flink 1.6版本開始,將主節(jié)點上的進程名稱改為了StandaloneSessionClusterEntrypoint,從節(jié)點的進程名稱改為了TaskManagerRunner,在這里為了方便使用,仍然沿用之前版本的稱呼,即JobManager和TaskManager。
Client接收到Flink應(yīng)用程序后,將作業(yè)提交給JobManager。JobManager要做的第一件事就是分配Task(任務(wù))所需的資源。完成資源分配后,Task將被JobManager提交給相應(yīng)的TaskManager,TaskManager會啟動線程開始執(zhí)行。在執(zhí)行過程中,TaskManager會持續(xù)向JobManager匯報狀態(tài)信息,例如開始執(zhí)行、進行中或完成等狀態(tài)。作業(yè)執(zhí)行完成后,結(jié)果將通過JobManager發(fā)送給Client。
Flink所有組件之間的通信使用的是Akka框架,組件之間的數(shù)據(jù)交互使用的是Netty框架。
圖片
Client 不是運行時和程序執(zhí)行的一部分,而是用于準備數(shù)據(jù)流并將其發(fā)送給 JobManager。之后,客戶端可以斷開連接(分離模式),或保持連接來接收進程報告(附加模式)??蛻舳丝梢宰鳛橛|發(fā)執(zhí)行 Java/Scala 程序的一部分運行,也可以在命令行進程./bin/flink run …中運行。
可以通過多種方式啟動 JobManager 和 TaskManager:直接在機器上作為standalone 集群啟動、在容器中啟動、或者通過YARN等資源框架管理并啟動。TaskManager 連接到 JobManagers,宣布自己可用,并被分配工作。
3.1 JobManager
JobManager 具有許多與協(xié)調(diào) Flink 應(yīng)用程序的分布式執(zhí)行有關(guān)的職責:它決定何時調(diào)度下一個 task(或一組 task)、對完成的 task 或執(zhí)行失敗做出反應(yīng)、協(xié)調(diào) checkpoint、并且協(xié)調(diào)從失敗中恢復(fù)等等。這個進程由三個不同的組件組成:
- ResourceManager
ResourceManager 負責 Flink 集群中的資源提供、回收、分配 - 它管理 task slots,這是 Flink 集群中資源調(diào)度的單位(請參考TaskManagers)。Flink 為不同的環(huán)境和資源提供者(例如 YARN、Kubernetes 和 standalone 部署)實現(xiàn)了對應(yīng)的 ResourceManager。在 standalone 設(shè)置中,ResourceManager 只能分配可用 TaskManager 的 slots,而不能自行啟動新的 TaskManager。 - Dispatcher
Dispatcher 提供了一個 REST 接口,用來提交 Flink 應(yīng)用程序執(zhí)行,并為每個提交的作業(yè)啟動一個新的 JobMaster。它還運行 Flink WebUI 用來提供作業(yè)執(zhí)行信息。 - JobMaster
JobMaster 負責管理單個JobGraph的執(zhí)行。Flink 集群中可以同時運行多個作業(yè),每個作業(yè)都有自己的 JobMaster。
始終至少有一個 JobManager。高可用(HA)設(shè)置中可能有多個 JobManager,其中一個始終是 leader,其他的則是 standby。
3.2 TaskManager
TaskManager是Flink集群的工作進程。Task被調(diào)度到TaskManager上執(zhí)行。TaskManager相互通信,只為在后續(xù)的Task之間交換數(shù)據(jù)。
TaskManager的主要作用如下:
- 接收JobManager分配的任務(wù),負責具體的任務(wù)執(zhí)行。
- TaskManager會在同一個JVM進程內(nèi)以多線程的方式執(zhí)行任務(wù)。· 負責對應(yīng)任務(wù)在每個節(jié)點上的資源申請,管理任務(wù)的啟動、停止、銷毀、異?;謴?fù)等生命周期。
- 負責對數(shù)據(jù)進行緩存。TaskManager之間采用數(shù)據(jù)流的形式進行數(shù)據(jù)交互。
3.3 Tasks 和算子鏈
對于分布式執(zhí)行,F(xiàn)link 將算子的 subtasks 鏈接成 tasks。每個 task 由一個線程執(zhí)行。將算子鏈接成 task 是個有用的優(yōu)化:它減少線程間切換、緩沖的開銷,并且減少延遲的同時增加整體吞吐量。鏈行為是可以配置的。
下圖中樣例數(shù)據(jù)流用 5 個 subtask 執(zhí)行,因此有 5 個并行線程。

3.4 Task Slots 和資源
每個 worker(TaskManager)都是一個 JVM 進程,可以在單獨的線程中執(zhí)行一個或多個 subtask。為了控制一個 TaskManager 中接受多少個 task,就有了所謂的 task slots(至少一個)。
每個 task slot 代表 TaskManager 中資源的固定子集。例如,具有 3 個 slot 的 TaskManager,會將其托管內(nèi)存 1/3 用于每個 slot。分配資源意味著 subtask 不會與其他作業(yè)的 subtask 競爭托管內(nèi)存,而是具有一定數(shù)量的保留托管內(nèi)存。注意此處沒有 CPU 隔離;當前 slot 僅分離 task 的托管內(nèi)存。
通過調(diào)整 task slot 的數(shù)量,用戶可以定義 subtask 如何互相隔離。每個 TaskManager 有一個 slot,這意味著每個 task 組都在單獨的 JVM 中運行(例如,可以在單獨的容器中啟動)。具有多個 slot 意味著更多 subtask 共享同一 JVM。同一 JVM 中的 task 共享 TCP 連接(通過多路復(fù)用)和心跳信息。它們還可以共享數(shù)據(jù)集和數(shù)據(jù)結(jié)構(gòu),從而減少了每個 task 的開銷。
圖片
默認情況下,F(xiàn)link 允許 subtask 共享 slot,即便它們是不同的 task 的 subtask,只要是來自于同一作業(yè)即可。結(jié)果就是一個 slot 可以持有整個作業(yè)管道。允許 slot 共享有兩個主要優(yōu)點:
- Flink 集群所需的 task slot 和作業(yè)中使用的最大并行度恰好一樣。無需計算程序總共包含多少個 task(具有不同并行度)。
- 容易獲得更好的資源利用。如果沒有 slot 共享,非密集 subtask(source/map())將阻塞和密集型 subtask(window) 一樣多的資源。通過 slot 共享,我們示例中的基本并行度從 2 增加到 6,可以充分利用分配的資源,同時確保繁重的 subtask 在 TaskManager 之間公平分配。
圖片
四、Flink On YARN模式
Flink On YARN模式遵循YARN的官方規(guī)范,YARN只負責資源的管理和調(diào)度,運行哪種應(yīng)用程序由用戶自己實現(xiàn),因此可能在YARN上同時運行MapReduce程序、Spark程序、Flink程序等。YARN很好地對每一個程序?qū)崿F(xiàn)了資源的隔離,這使得Spark、MapReduce、Flink等可以運行于同一個集群中,共享集群存儲資源與計算資源。Flink On YARN模式的運行架構(gòu)如下圖所示。
圖片
- 當啟動一個Client(客戶端)會話時,Client首先會上傳Flink應(yīng)用程序JAR包和配置文件到HDFS。
- Client向ResourceManager申請用于運行ApplicationMaster的Container。
- ResourceManager分配用于運行ApplicationMaster的Container,然后與NodeManager通信,要求它在該Container中啟動ApplicationMaster(ApplicationMaster與Flink JobManager運行于同一Container中,這樣ApplicationMaster就能知道Flink JobManager的地址)。ApplicationMaster啟動后,它將負責此應(yīng)用程序的整個生命周期。另外,ApplicationMaster還提供了Flink的WebUI服務(wù)。
- ApplicationMaster向ResourceManager注冊(注冊后可以通過ResourceManager查看應(yīng)用程序的運行狀態(tài))并請求運行Flink TaskManager所需的Container(資源請求是對一些Container的請求)。如果符合條件,ResourceManager會分配給ApplicationMaster所需的Container(表達為Container ID和主機名)。ApplicationMaster請求NodeManager使用這些Container來運行Flink TaskManager。各個NodeManager從HDFS中下載Flink JAR包和配置文件。至此,F(xiàn)link相關(guān)任務(wù)就可以運行了。
此外,各個運行中的Flink TaskManager會通過RPC協(xié)議向ApplicationMaster匯報自己的狀態(tài)和進度。































