偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

Flink Metrics監(jiān)控與 RestApi

安全 應(yīng)用安全
Flink 的 metrics 是 Flink 公開的一個(gè)度量系統(tǒng),metrics 也可以暴露給外部系統(tǒng),通過在 Flink 配置文件 conf/flink-conf.yaml 配置即可,F(xiàn)link原生已經(jīng)支持了很多reporter,如 JMX、InfluxDB、Prometheus 等等。

[[404556]]

本文轉(zhuǎn)載自微信公眾號(hào)「KK架構(gòu)師」,作者wangkai 。轉(zhuǎn)載本文請(qǐng)聯(lián)系KK架構(gòu)師公眾號(hào)。

 一、Flink metrics簡介

Flink 的 metrics 是 Flink 公開的一個(gè)度量系統(tǒng),metrics 也可以暴露給外部系統(tǒng),通過在 Flink 配置文件 conf/flink-conf.yaml 配置即可,F(xiàn)link原生已經(jīng)支持了很多reporter,如 JMX、InfluxDB、Prometheus 等等。

我們也可以自定義指標(biāo)通過 metric 收集,實(shí)際開發(fā)時(shí)經(jīng)常需要查看當(dāng)前程序的運(yùn)行狀況,flink 提供了 UI 界面,有比較詳細(xì)的統(tǒng)計(jì)信息。

但是 UI 界面也有不完善的地方,比如想要獲取 flink 的實(shí)時(shí)吞吐。本文將詳細(xì)介紹如何通過 metric 監(jiān)控 flink 程序,自定義監(jiān)控指標(biāo)以及 metrics 在 flink 的 UI 界面的應(yīng)用。

二、Metrics在UI頁面上的應(yīng)用

在 flink 的 UI 的界面上我們點(diǎn)擊任務(wù)詳情,然后點(diǎn)擊 Task Metrics 會(huì)彈出如下的界面,在 add metic 按鈕上 我們可以添加我需要的監(jiān)控指標(biāo)。

注意:如果點(diǎn)擊 Task Metrics 沒有顯示 Add metics 點(diǎn)擊一下任務(wù)的 DAG 圖就會(huì)顯示出來,當(dāng)我們點(diǎn)擊了 DAG 圖中某個(gè)算子的名字,那么 Add metric 顯示的就是該算子的監(jiān)控指標(biāo),且按照分區(qū)顯示,算子名前置的數(shù)字就是分區(qū)號(hào)。

三、各個(gè)指標(biāo)的含義

關(guān)于各個(gè)指標(biāo)的含義官網(wǎng)上有詳細(xì)介紹:

https://ci.apache.org/projects/flink/flink-docs-release-1.7/monitoring/metrics.html#availability

四、自定義監(jiān)控指標(biāo)

案例:在map算子內(nèi)計(jì)算輸入的總數(shù)據(jù),設(shè)置 :

  1. DataStream<String> userData = kafkaData.map(new RichMapFunction<String, String>() { 
  2.             Counter mapDataNub; 
  3.             @Override 
  4.             public void open(Configuration parameters) throws Exception { 
  5.                 mapDataNub=  getRuntimeContext() 
  6.                        .getMetricGroup() 
  7.                        .addGroup("flink_test_metric"
  8.                        .counter("mapDataNub"); 
  9.             } 
  10.             @Override 
  11.             public String map(String s)  { 
  12.                 String s1 =""
  13.                 try { 
  14.                     String[] split = s.split(","); 
  15.                     long userID = Long.parseLong(split[0]); 
  16.                     long itemId = Long.parseLong(split[1]); 
  17.                     long categoryId = Long.parseLong(split[2]); 
  18.                     String behavior = split[3]; 
  19.                     long timestamp = Long.parseLong(split[4]); 
  20.                     Map map = new HashMap(); 
  21.                     map.put("userID", userID); 
  22.                     map.put("itemId", itemId); 
  23.                     map.put("categoryId", categoryId); 
  24.                     map.put("behavior", behavior); 
  25.                     map.put("timestamp"timestamp); 
  26.                     s1 = JSON.toJSONString(map); 
  27.                     mapDataNub.inc(); 
  28.                     System.out.println("數(shù)據(jù)"+map.toString()); 
  29.                 } catch (NumberFormatException e) { 
  30.                     e.printStackTrace(); 
  31.                 } 
  32.                 return  s1; 
  33.             } 

程序啟動(dòng)之后就可以在任務(wù)的ui界面上查看

注意點(diǎn):

搜索自定義或者查看某個(gè)指標(biāo)需要點(diǎn)擊DAG圖中對(duì)應(yīng)算子的名稱

指標(biāo)的前綴0,1,2....是指算子的分區(qū)數(shù)

進(jìn)行監(jiān)控時(shí),盡量不要對(duì)算子進(jìn)行重命名,使用默認(rèn)的名字,這樣一套監(jiān)控程序可以監(jiān)控多個(gè)flink任務(wù),比如對(duì)sink重新命名,如果不同的flink程序?qū)ink的命名不一樣,則一套監(jiān)控?zé)o法監(jiān)控多個(gè)flink程序

  1. .addSink(KafkaSink.getProducer()).name("kafka_sink"); 

五、Flink UI 不顯示算子數(shù)據(jù)接收和發(fā)送的條數(shù)

有時(shí)候我們Flink任務(wù)正常運(yùn)行,數(shù)據(jù)也可以打印,而且都保存到數(shù)據(jù)庫了,但是UI上面卻不顯示數(shù)據(jù)接收和發(fā)送的條數(shù) ,導(dǎo)致無法進(jìn)行指標(biāo)監(jiān)控和查查flink任務(wù)運(yùn)行的具體情況,這是什么原因?qū)е碌哪?

原因:是因?yàn)槟J(rèn)情況下Flink開啟了operator chain,所以當(dāng)flink程序所有的算子都在一個(gè)chain里面時(shí),也就是在一個(gè)DAG(task)里面,所有沒有向下游發(fā)送數(shù)據(jù),所以顯示都為0。比如下圖的情況所有指標(biāo)都是0;

解決方案:第一種方法:在flink程序里添加自定義metric

第二種方法:使用startNewChain和disableChainin打斷程序默認(rèn)的operator chain

第三種方法:修改某個(gè)算子的并行度使其和上下游算子并行度不一致

六、Metric Reporter

Metrics可以暴露給外部系統(tǒng),通過在flink配置文件conf/flink-conf.yaml配置即可,flink原生已經(jīng)支持了很多reporter,如JMX、InfluxDB、Prometheus等等,同時(shí)也支持自定義reporter。

Flink自帶了很多Reporter,包括JMX、InfluxDB、Prometheus等等,接下來介紹下InfluxDB Reporter的使用。

只需在flink配置文件conf/flink-conf.yaml中配置Influxdb相關(guān)信息即可,主要包括域名、端口號(hào)、用戶密碼等等。

flink1.10之后采用

  1. metrics.reporter.influxdb.factory.class: org.apache.flink.metrics.influxdb.InfluxdbReporterFactory 
  2. metrics.reporter.influxdb.host: localhost 
  3. metrics.reporter.influxdb.port: 8086 
  4. metrics.reporter.influxdb.db: flink 
  5. metrics.reporter.influxdb.consistency: ANY 
  6. metrics.reporter.influxdb.connectTimeout: 60000 
  7. metrics.reporter.influxdb.writeTimeout: 60000 
  8. metrics.reporter.influxdb.interval: 30 SECONDS 

flink1.10之前

  1. metrics.reporters: influxdb 
  2. metrics.reporter.influxdb.class: org.apache.flink.metrics.influxdb.InfluxdbReporter 
  3. metrics.reporter.influxdb.host: localhost 
  4. metrics.reporter.influxdb.port: 8086 
  5. metrics.reporter.influxdb.db: flink_monitor 
  6. metrics.reporter.influxdb.username: flink-metrics 
  7. metrics.reporter.influxdb.password: 123 

注意事項(xiàng):收集flinkSQL任務(wù)的監(jiān)控指標(biāo),如果用戶書寫的sql語句 insert into 或者insert overwrite 中單引號(hào)帶有換行符,寫入influxdb會(huì)報(bào)錯(cuò)

查看influxdb收集到監(jiān)控信息,發(fā)現(xiàn)會(huì)自動(dòng)給我生成數(shù)據(jù)庫和measurement,所有的指標(biāo)都存儲(chǔ)在了具體的measurement中

七、flink metric監(jiān)控程序

前面介紹了flink公共的監(jiān)控指標(biāo)以及如何自定義監(jiān)控指標(biāo),那么實(shí)際開發(fā)flink任務(wù)我們需要及時(shí)知道這些監(jiān)控指標(biāo)的數(shù)據(jù),去獲取程序的健康值以及狀態(tài)。這時(shí)候就需要我們通過 flink REST API ,自己編寫監(jiān)控程序去獲取這些指標(biāo)。很簡單,當(dāng)我們知道每個(gè)指標(biāo)請(qǐng)求的URL,我們便可以編寫程序通過http請(qǐng)求獲取指標(biāo)的監(jiān)控?cái)?shù)據(jù)。

八、flink REST API監(jiān)控程序

為了獲取flink任務(wù)運(yùn)行狀態(tài)和吞吐量我們需要注意一下兩點(diǎn):

  • flink集群模式需要知道 JobManager 的地址和端口(5004)
  • 對(duì)于 flink on yarn 模式來說,則需要知道 RM 代理的 JobManager UI 地址,例如 http://yarn-resource-manager-ui/proxy/application_155316436xxxx_xxxx

1.獲取flink任務(wù)運(yùn)行狀態(tài)(我們可以在瀏覽器進(jìn)行測試,輸入如下的連接)

http://yarn-resource-manager-ui/proxy/application_155316436xxxx_xxxx/jobs

返回的結(jié)果

  1.  jobs: [{ 
  2.    id: "ce793f18efab10127f0626a37ff4b4d4"
  3.    status: "RUNNING" 
  4.   } 
  5.  ] 

2.獲取 job 詳情

需要在/jobs/jobid

http://yarn-resource-manager-ui/proxy/application_155316436xxxx_xxxx/jobs/ce793f18efab10127f0626a37ff4b4d4

  1.  jid: "ce793f18efab10127f0626a37ff4b4d4"
  2.  name"Test"
  3.  isStoppable: false
  4.  state: "RUNNING"
  5.  start - time: 1551577191874, 
  6.  end - time: -1, 
  7.  duration: 295120489, 
  8.  now: 1551872312363, 
  9.  。。。。。。 
  10.       此處省略n行 
  11.     。。。。。。 
  12.    }, { 
  13.     id: "cbc357ccb763df2852fee8c4fc7d55f2"
  14.     parallelism: 12, 
  15.     operator: ""
  16.     operator_strategy: ""
  17.     description: "Source: Custom Source -&gt; Flat Map"
  18.     optimizer_properties: {} 
  19.    } 
  20.   ] 
  21.  } 

九、更靈活的方式獲取每個(gè)指標(biāo)的請(qǐng)求連接

有人可能會(huì)問,這么多指標(biāo),難道我要把每個(gè)指標(biāo)的請(qǐng)求的URL格式都記住嗎?

今天教大家一個(gè)小技巧,一個(gè)前端技術(shù),就是進(jìn)入flink任務(wù)的UI界面,按住F12進(jìn)入開發(fā)者模式,然后我們點(diǎn)擊任意一個(gè)metric指標(biāo),便能立即看到每個(gè)指標(biāo)的請(qǐng)求的URL。比如獲取flink任務(wù)的背壓情況:

如下圖我們點(diǎn)擊某一個(gè)task的status,按一下f12,便看到了backpressue,點(diǎn)開backpressue就是獲取任務(wù)背壓情況的連接如下:

http://127.0.0.1/proxy/application_12423523_133234/jobs/86eb310874aeccb37b58ae2892feced3/vertices/cbc357ccb763df2852fee8c4fc7d55f2/backpressure

請(qǐng)求連接返回的json字符串如下:我們可以獲取每一個(gè)分區(qū)的背壓情況,如果不是OK狀態(tài)便可以進(jìn)行任務(wù)報(bào)警,其他的指標(biāo)獲取監(jiān)控值都可以這樣獲取 簡單而又便捷。

 

十、案例:實(shí)時(shí)獲取yarn上flink任務(wù)運(yùn)行狀態(tài)

我們使用 flink REST API的方式,通過http請(qǐng)求實(shí)時(shí)獲取flink任務(wù)狀態(tài),不是RUNNING狀態(tài)則進(jìn)行電話或郵件報(bào)警,達(dá)到實(shí)時(shí)監(jiān)控的效果。

  1. public class SendGet { 
  2.     public static String sendGet(String url) { 
  3.         String result = ""
  4.         BufferedReader in = null
  5.         try { 
  6.             String urlNameString = url; 
  7.             URL realUrl = new URL(urlNameString); 
  8.             // 打開和URL之間的連接 
  9.             URLConnection connection = realUrl.openConnection(); 
  10.             // 設(shè)置通用的請(qǐng)求屬性 
  11.             connection.setRequestProperty("accept""*/*"); 
  12.             connection.setRequestProperty("connection""Keep-Alive"); 
  13.             connection.setRequestProperty("user-agent"
  14.                     "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;SV1)"); 
  15.             // 建立實(shí)際的連接 
  16.             connection.connect(); 
  17.             in = new BufferedReader(new InputStreamReader( 
  18.                     connection.getInputStream())); 
  19.             String line; 
  20.             while ((line = in.readLine()) != null) { 
  21.                 result += line; 
  22.             } 
  23.         } catch (Exception e) { 
  24.             System.out.println("發(fā)送GET請(qǐng)求出現(xiàn)異常!" + e); 
  25.             e.printStackTrace(); 
  26.         } 
  27.         // 使用finally塊來關(guān)閉輸入流 
  28.         finally { 
  29.             try { 
  30.                 if (in != null) { 
  31.                     in.close(); 
  32.                 } 
  33.             } catch (Exception e2) { 
  34.                 e2.printStackTrace(); 
  35.             } 
  36.         } 
  37.         return result; 
  38.     } 
  39.  
  40.     public static void main(String[] args) { 
  41.         String s = sendGet("http://127.0.0.1:5004/proxy/application_1231435364565_0350/jobs"); 
  42.         JSONObject jsonObject = JSON.parseObject(s); 
  43.         String string = jsonObject.getString("jobs"); 
  44.         String substring = string.substring(1, string.length() - 1); 
  45.         JSONObject jsonObject1 = JSONObject.parseObject(substring); 
  46.         String status = jsonObject1.getString("status"); 
  47.         System.out.println(status); 
  48.     } 

結(jié)果

 

責(zé)任編輯:武曉燕 來源: KK架構(gòu)師
相關(guān)推薦

2021-09-11 21:02:24

監(jiān)控Sentry Web性能

2021-06-03 09:00:00

Kubernetes集群容器

2021-09-30 06:35:23

監(jiān)控性能優(yōu)化

2024-03-13 13:44:43

開發(fā)插件開源

2022-07-26 07:47:14

架構(gòu)

2024-01-03 16:29:01

Agent性能優(yōu)化

2021-09-08 10:47:33

Flink執(zhí)行流程

2014-12-04 09:47:59

2015-04-13 10:13:29

2021-04-16 08:20:00

Flink CEP直播監(jiān)控

2022-05-18 07:30:51

OperatorprometheusVM 集群

2013-11-06 10:46:58

OpenStack監(jiān)控監(jiān)控系統(tǒng)

2010-09-17 10:41:27

SIP協(xié)議視頻監(jiān)控

2021-04-29 08:27:06

druidundertowMetrics

2017-07-07 14:30:27

Flink架構(gòu)拓?fù)?/a>

2012-10-29 10:14:07

APPHadoopSplunk

2022-07-12 16:54:54

字節(jié)跳動(dòng)Flink狀態(tài)查詢

2022-06-20 05:52:27

FlinkTTL流查詢

2022-08-25 18:23:07

攜程HBase存儲(chǔ)Metrics

2018-05-21 14:57:38

云監(jiān)控服務(wù)監(jiān)控原因
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)