一口氣搞懂Flink Metrics監(jiān)控指標(biāo)和性能優(yōu)化,全靠這33張圖和7千字
本文轉(zhuǎn)載自微信公眾號(hào)「3分鐘秒懂大數(shù)據(jù)」,作者在IT中穿梭旅行。轉(zhuǎn)載本文請(qǐng)聯(lián)系3分鐘秒懂大數(shù)據(jù)公眾號(hào)。
前言
大家好,我是土哥。
最近在公司做 Flink 推理任務(wù)的性能測(cè)試,要對(duì) job 的全鏈路吞吐、全鏈路時(shí)延、吞吐時(shí)延指標(biāo)進(jìn)行監(jiān)控和調(diào)優(yōu),其中要使用 Flink Metrics 對(duì)指標(biāo)進(jìn)行監(jiān)控。
接下來(lái)這篇文章,干貨滿滿,我將帶領(lǐng)讀者全面了解 Flink Metrics 指標(biāo)監(jiān)控,并通過(guò)實(shí)戰(zhàn)案例,對(duì)全鏈路吞吐、全鏈路時(shí)延、吞吐時(shí)延的指標(biāo)進(jìn)行性能優(yōu)化,徹底掌握 Flink Metrics 性能調(diào)優(yōu)的方法和 Metrics 的使用。大綱目錄如下:
1 Flink Metrics 簡(jiǎn)介
Flink Metrics 是 Flink 集群運(yùn)行中的各項(xiàng)指標(biāo),包含機(jī)器系統(tǒng)指標(biāo),比如:CPU、內(nèi)存、線程、JVM、網(wǎng)絡(luò)、IO、GC 以及任務(wù)運(yùn)行組件(JM、TM、Slot、作業(yè)、算子)等相關(guān)指標(biāo)。
Flink Metrics 包含兩大作用:
- 實(shí)時(shí)采集監(jiān)控?cái)?shù)據(jù)。在 Flink 的 UI 界面上,用戶可以看到自己提交的任務(wù)狀態(tài)、時(shí)延、監(jiān)控信息等等。
- 對(duì)外提供數(shù)據(jù)收集接口。用戶可以將整個(gè) Flink 集群的監(jiān)控?cái)?shù)據(jù)主動(dòng)上報(bào)至第三方監(jiān)控系統(tǒng),如:prometheus、grafana 等,下面會(huì)介紹。
1.1 Flink Metric Types
Flink 一共提供了四種監(jiān)控指標(biāo):分別為 Counter、Gauge、Histogram、Meter。
1. Count 計(jì)數(shù)器
統(tǒng)計(jì)一個(gè) 指標(biāo)的總量。寫過(guò) MapReduce 的開(kāi)發(fā)人員就應(yīng)該很熟悉 Counter,其實(shí)含義都是一樣的,就是對(duì)一個(gè)計(jì)數(shù)器進(jìn)行累加,即對(duì)于多條數(shù)據(jù)和多兆數(shù)據(jù)一直往上加的過(guò)程。其中 Flink 算子的接收記錄總數(shù) (numRecordsIn) 和發(fā)送記錄總數(shù) (numRecordsOut) 屬于 Counter 類型。
使用方式:可以通過(guò)調(diào)用 counter(String name)來(lái)創(chuàng)建和注冊(cè) MetricGroup
2. Gauge 指標(biāo)瞬時(shí)值
Gauge 是最簡(jiǎn)單的 Metrics ,它反映一個(gè)指標(biāo)的瞬時(shí)值。比如要看現(xiàn)在 TaskManager 的 JVM heap 內(nèi)存用了多少,就可以每次實(shí)時(shí)的暴露一個(gè) Gauge,Gauge 當(dāng)前的值就是 heap 使用的量。
使用前首先創(chuàng)建一個(gè)實(shí)現(xiàn) org.apache.flink.metrics.Gauge 接口的類。返回值的類型沒(méi)有限制。您可以通過(guò)在 MetricGroup 上調(diào)用 gauge。
3. Meter 平均值
用來(lái)記錄一個(gè)指標(biāo)在某個(gè)時(shí)間段內(nèi)的平均值。Flink 中的指標(biāo)有 Task 算子中的 numRecordsInPerSecond,記錄此 Task 或者算子每秒接收的記錄數(shù)。
使用方式:通過(guò) markEvent() 方法注冊(cè)事件的發(fā)生。通過(guò)markEvent(long n) 方法注冊(cè)同時(shí)發(fā)生的多個(gè)事件。
4. Histogram 直方圖
Histogram 用于統(tǒng)計(jì)一些數(shù)據(jù)的分布,比如說(shuō) Quantile、Mean、StdDev、Max、Min 等,其中最重要一個(gè)是統(tǒng)計(jì)算子的延遲。此項(xiàng)指標(biāo)會(huì)記錄數(shù)據(jù)處理的延遲信息,對(duì)任務(wù)監(jiān)控起到很重要的作用。
使用方式:通過(guò)調(diào)用 histogram(String name, Histogram histogram) 來(lái)注冊(cè)一個(gè) MetricGroup。
1.2 Scope
Flink 的指標(biāo)體系按樹(shù)形結(jié)構(gòu)劃分,域相當(dāng)于樹(shù)上的頂點(diǎn)分支,表示指標(biāo)大的分類。每個(gè)指標(biāo)都會(huì)分配一個(gè)標(biāo)識(shí)符,該標(biāo)識(shí)符將基于 3 個(gè)組件進(jìn)行匯報(bào):
- 注冊(cè)指標(biāo)時(shí)用戶提供的名稱;
- 可選的用戶自定義域;
- 系統(tǒng)提供的域。
例如,如果 A.B 是系統(tǒng)域,C.D 是用戶域,E 是名稱,那么指標(biāo)的標(biāo)識(shí)符將是 A.B.C.D.E. 你可以通過(guò)設(shè)置 conf/flink-conf.yam 里面的 metrics.scope.delimiter 參數(shù)來(lái)配置標(biāo)識(shí)符的分隔符(默認(rèn)“.”)。
舉例說(shuō)明:以算子的指標(biāo)組結(jié)構(gòu)為例,其默認(rèn)為:
算子的輸入記錄數(shù)指標(biāo)為:
hlinkui.taskmanager.1234.wordcount.flatmap.0.numRecordsIn
1.3 Metrics 運(yùn)行機(jī)制
在生產(chǎn)環(huán)境下,為保證對(duì)Flink集群和作業(yè)的運(yùn)行狀態(tài)進(jìn)行監(jiān)控,F(xiàn)link 提供兩種集成方式:
1.3.1 主動(dòng)方式 MetricReport
Flink Metrics 通過(guò)在 conf/flink-conf.yaml 中配置一個(gè)或者一些 reporters,將指標(biāo)暴露給一個(gè)外部系統(tǒng).這些 reporters 將在每個(gè) job 和 task manager 啟動(dòng)時(shí)被實(shí)例化。
1.3.2 被動(dòng)方式 RestAPI
通過(guò)提供 Rest 接口,被動(dòng)接收外部系統(tǒng)調(diào)用,可以返回集群、組件、作業(yè)、Task、算子的狀態(tài)。Rest API 實(shí)現(xiàn)類是 WebMonitorEndpoint
2 Flink Metrics 監(jiān)控系統(tǒng)搭建
Flink 主動(dòng)方式共提供了 8 種 Report。
我們使用 PrometheusPushGatewayReporter 方式 通過(guò) prometheus + pushgateway + grafana 組件搭建 Flink On Yarn 可視化監(jiān)控。
當(dāng) 用戶 使用 Flink 通過(guò) session 模式向 yarn 集群提交一個(gè) job 后,F(xiàn)link 會(huì)通過(guò) PrometheusPushGatewayReporter 將 metrics push 到 pushgateway 的 9091 端口上,然后使用外部系統(tǒng) prometheus 從 pushgateway 進(jìn)行 pull 操作,將指標(biāo)采集過(guò)來(lái),通過(guò) Grafana可視化工具展示出來(lái)。原理圖如下:
首先,我們先在 Flink On Yarn 集群中提交一個(gè) Job 任務(wù),讓其運(yùn)行起來(lái),然后執(zhí)行下面的操作。
2.1 配置 Reporter
下面所有工具、jar 包已經(jīng)全部下載好,需要的朋友在公眾號(hào)后臺(tái)回復(fù):02,可以全部獲取到。
2.1.1 導(dǎo)包
將 flink-metrics-prometheus_2.11-1.13.2.jar 包導(dǎo)入 flink-1.13.2/bin 目錄下。
2.1.2 配置 Reporter
選取 PrometheusPushGatewayReporter 方式,通過(guò)在官網(wǎng)查詢 Flink 1.13.2 Metrics 的配置后,在 flink-conf.yaml 設(shè)置,配置如下:
- metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
- metrics.reporter.promgateway.host: 192.168.244.129
- metrics.reporter.promgateway.port: 9091
- metrics.reporter.promgateway.jobName: myJob
- metrics.reporter.promgateway.randomJobNameSuffix: true
- metrics.reporter.promgateway.deleteOnShutdown: false
- metrics.reporter.promgateway.groupingKey: k1=v1;k2=v2
- metrics.reporter.promgateway.interval: 60 SECONDS
2.2 部署 pushgateway
Pushgateway 是一個(gè)獨(dú)立的服務(wù),Pushgateway 位于應(yīng)用程序發(fā)送指標(biāo)和 Prometheus 服務(wù)器之間。
Pushgateway 接收指標(biāo),然后將其作為目標(biāo)被 Prometheus 服務(wù)器拉取??梢詫⑵淇醋鞔矸?wù),或者與 blackbox exporter 的行為相反,它接收度量,而不是探測(cè)它們。
2.2.1 解壓 pushgateway
2.2.2. 啟動(dòng) pushgateway
進(jìn)入到 pushgateway-1.4.1 目錄下
- ./pushgateway &
查看是否在后臺(tái)啟動(dòng)成功
- ps aux|grep pushgateway
2.2.3. 登錄 pushgateway webui
2.3 部署 prometheus
Prometheus(普羅米修斯)是一個(gè)最初在 SoundCloud 上構(gòu)建的監(jiān)控系統(tǒng)。自 2012 年成為社區(qū)開(kāi)源項(xiàng)目,擁有非?;钴S的開(kāi)發(fā)人員和用戶社區(qū)。為強(qiáng)調(diào)開(kāi)源及獨(dú)立維護(hù),Prometheus 于 2016 年加入云原生云計(jì)算基金會(huì)(CNCF),成為繼Kubernetes 之后的第二個(gè)托管項(xiàng)目。
2.3.1 解壓prometheus-2.30.0
2.3.2 編寫配置文件
- scrape_configs:
- - job_name: 'prometheus'
- static_configs:
- - targets: ['192.168.244.129:9090']
- labels:
- instance: 'prometheus'
- - job_name: 'linux'
- static_configs:
- - targets: ['192.168.244.129:9100']
- labels:
- instance: 'localhost'
- - job_name: 'pushgateway'
- static_configs:
- - targets: ['192.168.244.129:9091']
- labels:
- instance: 'pushgateway'
2.3.3 啟動(dòng)prometheus
- ./prometheus --config.file=prometheus.yml &
啟動(dòng)完后,可以通過(guò) ps 查看一下端口:
- ps aux|grep prometheus
2.3.4 登錄prometheus webui
2.4 部署 grafana
Grafana 是一個(gè)跨平臺(tái)的開(kāi)源的度量分析和可視化工具,可以通過(guò)將采集的數(shù)據(jù)查詢?nèi)缓罂梢暬恼故荆⒓皶r(shí)通知。它主要有以下六大特點(diǎn):
- 展示方式:快速靈活的客戶端圖表,面板插件有許多不同方式的可視化指標(biāo)和日志,官方庫(kù)中具有豐富的儀表盤插件,比如熱圖、折線圖、圖表等多種展示方式;
- 數(shù)據(jù)源:Graphite,InfluxDB,OpenTSDB,Prometheus,Elasticsearch,CloudWatch 和 KairosDB 等;
- 通知提醒:以可視方式定義最重要指標(biāo)的警報(bào)規(guī)則,Grafana將不斷計(jì)算并發(fā)送通知,在數(shù)據(jù)達(dá)到閾值時(shí)通過(guò) Slack、PagerDuty 等獲得通知;
- 混合展示:在同一圖表中混合使用不同的數(shù)據(jù)源,可以基于每個(gè)查詢指定數(shù)據(jù)源,甚至自定義數(shù)據(jù)源;
- 注釋:使用來(lái)自不同數(shù)據(jù)源的豐富事件注釋圖表,將鼠標(biāo)懸停在事件上會(huì)顯示完整的事件元數(shù)據(jù)和標(biāo)記;
- 過(guò)濾器:Ad-hoc 過(guò)濾器允許動(dòng)態(tài)創(chuàng)建新的鍵/值過(guò)濾器,這些過(guò)濾器會(huì)自動(dòng)應(yīng)用于使用該數(shù)據(jù)源的所有查詢。
2.4.1 解壓grafana-8.1.5
2.4.2 啟動(dòng)grafana-8.1.5
- ./bin/grafana-server web &
2.4.3 登錄 grafana
登錄用戶名和密碼都是 admin
grafana 配置中文教程:
https://grafana.com/docs/grafana/latest/datasources/prometheus/
2.4.4 配置數(shù)據(jù)源、創(chuàng)建系統(tǒng)負(fù)載監(jiān)控
要訪問(wèn) Prometheus 設(shè)置,請(qǐng)將鼠標(biāo)懸停在配置(齒輪)圖標(biāo)上,然后單擊數(shù)據(jù)源,然后單擊 Prometheus 數(shù)據(jù)源,根據(jù)下圖進(jìn)行操作。
操作完成后,點(diǎn)擊進(jìn)行驗(yàn)證。
2.4.5 添加儀表盤
點(diǎn)擊最左側(cè) + 號(hào),選擇 DashBoard,選擇新建一個(gè) pannel。
至此,F(xiàn)link 的 metrics 的指標(biāo)展示在 Grafana 中了。
flink 指標(biāo)對(duì)應(yīng)的指標(biāo)名比較長(zhǎng),可以在 Legend 中配置顯示內(nèi)容,在{{key}} 將 key 換成對(duì)應(yīng)需要展示的字段即可,如:{{job_name}},{{operator_name}}。
3 指標(biāo)性能測(cè)試
上述監(jiān)控系統(tǒng)搭建好了之后,我們可以進(jìn)行性能指標(biāo)監(jiān)控了?,F(xiàn)在以一個(gè)實(shí)戰(zhàn)案例進(jìn)行介紹:
3.1 業(yè)務(wù)場(chǎng)景介紹
金融風(fēng)控場(chǎng)景
3.1.1 業(yè)務(wù)需求:
Flink Source 從 data kafka topic 中讀取推理數(shù)據(jù),通過(guò) sql 預(yù)處理成模型推理要求的數(shù)據(jù)格式,在進(jìn)行 keyBy 分組后流入下游 connect 算子,與模型 connect 后進(jìn)入 Co-FlatMap 算子再進(jìn)行推理,原理圖如下:
3.1.2 業(yè)務(wù)要求:
根據(jù)模型的復(fù)雜程度,要求推理時(shí)延到達(dá) 20ms 以內(nèi),全鏈路耗時(shí) 50ms 以內(nèi), 吞吐量達(dá)到每秒 1.2w 條以上。
3.1.3 業(yè)務(wù)數(shù)據(jù):
推理數(shù)據(jù):3000w,推理字段 495 個(gè),機(jī)器學(xué)習(xí) Xgboost 模型字段:495。
3.2 指標(biāo)解析
由于性能測(cè)試要求全鏈路耗時(shí) 50ms 以內(nèi),應(yīng)該使用 Flink Metrics 的 Latency Marker 進(jìn)行計(jì)算。
3.2.1 全鏈路時(shí)延計(jì)算方式 :
全鏈路時(shí)延指的是一條推理數(shù)據(jù)進(jìn)入 source 算子到數(shù)據(jù)預(yù)處理算子直到最后一個(gè)算子輸出結(jié)果的耗時(shí),即處理一條數(shù)據(jù)需要多長(zhǎng)時(shí)間,包含算子內(nèi)處理邏輯時(shí)間,算子間數(shù)據(jù)傳遞時(shí)間,緩沖區(qū)內(nèi)等待時(shí)間。
全鏈路時(shí)延要使用 latency metric 計(jì)算。latency metric 是由 source 算子根據(jù)當(dāng)前本地時(shí)間生成的一個(gè) marker ,并不參與各個(gè)算子的邏輯計(jì)算,僅僅跟著數(shù)據(jù)往下游算子流動(dòng),每到達(dá)一個(gè)算子則算出當(dāng)前本地時(shí)間戳并與 source 生成的時(shí)間戳相減,得到 source 算子到當(dāng)前算子的耗時(shí),當(dāng)?shù)竭_(dá) sink 算子或者說(shuō)最后一個(gè)算子時(shí),算出當(dāng)前本地時(shí)間戳與 source 算子生成的時(shí)間戳相減,即得到全鏈路時(shí)延。原理圖如下:
由于使用到 Lateny marker,所有需要在 flink-conf.yaml 配置參數(shù)。
- latency.metrics.interval
系統(tǒng)配置截圖如下:
3.2.2 全鏈路吞吐計(jì)算方式 :
全鏈路吞吐 = 單位時(shí)間處理數(shù)據(jù)數(shù)量 / 單位時(shí)間。
3.3 提交任務(wù)到Flink on Yarn集群
**3.3.1 直接提交 Job **
- # -m jobmanager 的地址
- # -yjm 1024 指定 jobmanager 的內(nèi)存信息
- # -ytm 1024 指定 taskmanager 的內(nèi)存信息
- bin/flink run \
- -t yarn-per-job -yjm 4096 -ytm 8800 -s 96 \
- --detached -c com.threeknowbigdata.datastream.XgboostModelPrediction \
- examples/batch/WordCount.jar \
提交完成后,我們通過(guò) Flink WEBUI 可以看到 job 運(yùn)行的任務(wù)結(jié)果如下:
因?yàn)橥评砟P椭皇且粋€(gè) model,存在狀態(tài)中,所以全鏈路吞吐考慮的是每秒有多少條推理數(shù)據(jù)進(jìn)入 source 算子到倒數(shù)第二個(gè)算子(最后一個(gè)算子只是指標(biāo)匯總)流出,這個(gè)條數(shù)就是全鏈路吞吐。
可以看到在處理 2000W 條數(shù)據(jù)時(shí),代碼直接統(tǒng)計(jì)輸出的數(shù)值和 flink webUI 的統(tǒng)計(jì)數(shù)值基本一致,所以統(tǒng)計(jì)數(shù)值是可信的。
Flink WEBUI 跑的結(jié)果數(shù)據(jù)
打開(kāi) Prometheus 在對(duì)話框輸入全鏈路時(shí)延計(jì)算公式
- 計(jì)算公式:
- avg(flink_taskmanager_job_latency_source_id_
- operator_id _operator_subtask_index_latency{
- source_id="cbc357ccb763df2852fee8c4fc7d55f2",
- operator_id="c9c0ca46716e76f6b700eddf4366d243",quantile="0.999"})
3.4 優(yōu)化前性能分析
在將任務(wù)提交到集群后,經(jīng)過(guò)全鏈路時(shí)延計(jì)算公式、吞吐時(shí)延計(jì)算公式,最后得到優(yōu)化前的結(jié)果時(shí)延指標(biāo)統(tǒng)計(jì)圖如下:
吞吐指標(biāo)統(tǒng)計(jì)圖如下:
通過(guò)本次測(cè)試完后,從圖中可以發(fā)現(xiàn):
時(shí)延指標(biāo):加并行度,吞吐量也跟隨高,但是全鏈路時(shí)延大幅增長(zhǎng)( 1并行至32并行,時(shí)延從 110ms 增加至 3287ms )
這遠(yuǎn)遠(yuǎn)沒(méi)有達(dá)到要求的結(jié)果。
3.5 問(wèn)題分析
通過(guò) Prometheus分析后,結(jié)果如下:
3.5.1 并行度問(wèn)題 :
反壓現(xiàn)象:在 Flink WEB-UI 上,可以看到應(yīng)用存在著非常嚴(yán)重的反壓,這說(shuō)明鏈路中存在較為耗時(shí)的算子,阻塞了整個(gè)鏈路;
數(shù)據(jù)處理慢于拉取數(shù)據(jù):數(shù)據(jù)源消費(fèi)數(shù)據(jù)的速度,大于下游數(shù)據(jù)處理速度;
增加計(jì)算并行度:所以在接下來(lái)的測(cè)試中會(huì)調(diào)大推理算子并行度,相當(dāng)于提高下游數(shù)據(jù)處理能力。
3.5.2 Buffer 超時(shí)問(wèn)題 :
Flink 雖是純流式框架,但默認(rèn)開(kāi)啟了緩存機(jī)制(上游累積部分?jǐn)?shù)據(jù)再發(fā)送到下游);
緩存機(jī)制可以提高應(yīng)用的吞吐量,但是也增大了時(shí)延;
推理場(chǎng)景:為獲取最好的時(shí)延指標(biāo),第二輪測(cè)試超時(shí)時(shí)間置 0,記錄吞吐量。
3.5.3 Buffer 數(shù)量問(wèn)題 :
同上,F(xiàn)link 中的 Buffer 數(shù)量是可以配置的;
Buffer 數(shù)量越多,能緩存的數(shù)據(jù)也就越多;
推理場(chǎng)景:為獲取最好的時(shí)延指標(biāo),第二輪測(cè)試:減小 Flink 的 Buffer 數(shù)量來(lái)優(yōu)化時(shí)延指標(biāo)。
3.5.4 調(diào)優(yōu)參數(shù)配置
SOURCE 與 COFLATMAP 的并行度按照 1:12 配置;
Buffer 超時(shí)時(shí)間配置為 0ms (默認(rèn)100ms);
- //在代碼中設(shè)置
- senv.setBufferTimeout(0);
Buffer 數(shù)量的配置如下:
修改flink-conf.yaml
- memory.buffers-per-channel: 2
- memory.float-buffers-per-gate: 2
- memory.max-buffers-per-channel: 2
配置截圖如下:
3.6 優(yōu)化后性能分析
經(jīng)過(guò)修改配置后,將任務(wù)再次提交到集群后,經(jīng)過(guò)全鏈路時(shí)延計(jì)算公式、吞吐時(shí)延計(jì)算公式,最后得到優(yōu)化后的結(jié)果。
時(shí)延指標(biāo)統(tǒng)計(jì)圖如下:
吞吐指標(biāo)統(tǒng)計(jì)圖如下:
優(yōu)化后 LGB 推理測(cè)試總結(jié) :
時(shí)延指標(biāo):并行度提升,時(shí)延也會(huì)增加,但幅度很小(可接受)。實(shí)際上,在測(cè)試過(guò)程中存在一定反壓,若調(diào)大 SOURCE 與 COFLATMAP 的并行度比例,全鏈路時(shí)延可進(jìn)一步降低;吞吐量指標(biāo):隨著并行度的增加,吞吐量也隨著提高,當(dāng)并行度提高至 96 時(shí),吞吐量可以達(dá)到 1.3W,此時(shí)的時(shí)延維持在 50ms 左右(比較穩(wěn)定)。
3.7 優(yōu)化前后 LGB 分析總結(jié)
如下圖所示:
3.7.1吞吐量---影響因素:
內(nèi)存:對(duì)吞吐和時(shí)延沒(méi)什么影響, 并行度與吞吐成正相關(guān)。
- 增大 kafka 分區(qū),吞吐增加
- 增大 source、維表 source 并行度
- 增大 flatmap 推理并行度
3.7.2全鏈路時(shí)延---影響因素:
- Buffer 超時(shí)越短、個(gè)數(shù)越少、時(shí)延越低。
- 整個(gè)鏈路是否有算子堵塞(車道排隊(duì)模型)。
- 調(diào)大推理算子并行度,時(shí)延降低,吞吐升高(即增加了推理的處理能力)。