偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

從來(lái)沒有一個(gè)人能把Flink講的這么透徹

開發(fā) 前端 開發(fā)工具
Flink使用java語(yǔ)言開發(fā),提供了scala編程的接口。使用java或者scala開發(fā)Flink是需要使用jdk8版本,如果使用Maven,maven版本需要使用3.0.4及以上。

一、 Filnk簡(jiǎn)介和編程模型

Flink使用java語(yǔ)言開發(fā),提供了scala編程的接口。使用java或者scala開發(fā)Flink是需要使用jdk8版本,如果使用Maven,maven版本需要使用3.0.4及以上。

[[286813]]

Dataflows:

從來(lái)沒有一個(gè)人能把Flink講的這么透徹,小編的出現(xiàn)算是一個(gè)意外

parallel Dataflows:

從來(lái)沒有一個(gè)人能把Flink講的這么透徹,小編的出現(xiàn)算是一個(gè)意外

Task和算子鏈:

從來(lái)沒有一個(gè)人能把Flink講的這么透徹,小編的出現(xiàn)算是一個(gè)意外

JobManager、TaskManager和clients:

從來(lái)沒有一個(gè)人能把Flink講的這么透徹,小編的出現(xiàn)算是一個(gè)意外

Flink運(yùn)行時(shí)包含兩種類型的進(jìn)程:

  • JobManger:也叫作masters,協(xié)調(diào)分布式執(zhí)行,調(diào)度task,協(xié)調(diào)checkpoint,協(xié)調(diào)故障恢復(fù)。在Flink程序中至少有一個(gè)JobManager,高可用可以設(shè)置多個(gè)JobManager,其中一個(gè)是Leader,其他都是standby狀態(tài)。
  • TaskManager:也叫workers,執(zhí)行dataflow生成的task,負(fù)責(zé)緩沖數(shù)據(jù),及TaskManager之間的交換數(shù)據(jù)。Flink程序中必須有一個(gè)TaskManager.

Flink程序可以運(yùn)行在standalone集群,Yarn或者M(jìn)esos資源調(diào)度框架中。

clients不是Flink程序運(yùn)行時(shí)的一部分,作用是向JobManager準(zhǔn)備和發(fā)送dataflow,之后,客戶端可以斷開連接或者保持連接。

TaskSlots 任務(wù)槽:

從來(lái)沒有一個(gè)人能把Flink講的這么透徹,小編的出現(xiàn)算是一個(gè)意外

每個(gè)Worker(TaskManager)是一個(gè)JVM進(jìn)程,可以執(zhí)行一個(gè)或者多個(gè)task,這些task可以運(yùn)行在任務(wù)槽上,每個(gè)worker上至少有一個(gè)任務(wù)槽。每個(gè)任務(wù)槽都有固定的資源,例如:TaskManager有三個(gè)TaskSlots,那么每個(gè)TaskSlot會(huì)將TaskMananger中的內(nèi)存均分,即每個(gè)任務(wù)槽的內(nèi)存是總內(nèi)存的1/3。任務(wù)槽的作用就是分離任務(wù)的托管內(nèi)存,不會(huì)發(fā)生cpu隔離。

通過調(diào)整任務(wù)槽的數(shù)據(jù)量,用戶可以指定每個(gè)TaskManager有多少任務(wù)槽,更多的任務(wù)槽意味著更多的task可以共享同一個(gè)JVM,同一個(gè)JVM中的task共享TCP連接和心跳信息,共享數(shù)據(jù)集和數(shù)據(jù)結(jié)構(gòu),從而減少TaskManager中的task開銷。

總結(jié):task slot的個(gè)數(shù)代表TaskManager可以并行執(zhí)行的task數(shù)。

二、 Flink 批處理

批處理WordCount:

  1. ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); 
  2. DataSource<String> ds = env.readTextFile("./data/words"); 
  3. FlatMapOperator<String, String> flatMap = ds.flatMap(new FlatMapFunction<String, String>() { 
  4. @Override 
  5. public void flatMap(String s, Collector<String> collector) throws Exception { 
  6. String[] ssplit = s.split(" "); 
  7. for (String cs : split) { 
  8. collector.collect(cs); 
  9. }); 
  10. MapOperator<String, Tuple2<String, Integer>> map = flatMap.map(new MapFunction<String, Tuple2<String, Integer>>() { 
  11. @Override 
  12. public Tuple2<String, Integer> map(String s) throws Exception { 
  13. return new Tuple2<String, Integer>(s, 1); 
  14. }); 
  15. UnsortedGrouping<Tuple2<String, Integer>> groupBy = map.groupBy(0); 
  16. AggregateOperator<Tuple2<String, Integer>> sum = groupBy.sum(1); 
  17. // sum.print();//可以觸發(fā)算子執(zhí)行 
  18. //排序,目前不支持全局排序 
  19. SortPartitionOperator<Tuple2<String, Integer>> sort = sum.sortPartition(1, Order.DESCENDING).setParallelism(1); 
  20. sort.writeAsText("./TempResult/result").setParallelism(1); 
  21. env.execute("my-wordcount"); 

三、 Flink 執(zhí)行流程

數(shù)據(jù)源分為有界和無(wú)界之分,有界數(shù)據(jù)源可以編寫批處理程序,無(wú)界數(shù)據(jù)源可以編寫流式程序。DataSet API用于批處理,DataStream API用于流式處理。

批處理使用ExecutionEnvironment和DataSet,流式處理使用StreamingExecutionEnvironment和DataStream。

DataSet和DataStream是Flink中表示數(shù)據(jù)的特殊類,DataSet處理的數(shù)據(jù)是有界的,DataStream處理的數(shù)據(jù)是無(wú)界的,這兩個(gè)類都是不可變的,一旦創(chuàng)建出來(lái)就無(wú)法添加或者刪除數(shù)據(jù)元。

Flink程序的執(zhí)行過程:

  • 獲取flink的執(zhí)行環(huán)境(execution environment)
  • 加載數(shù)據(jù)-- soure
  • 對(duì)加載的數(shù)據(jù)進(jìn)行轉(zhuǎn)換 -- transformation
  • 對(duì)結(jié)果進(jìn)行保存或者打印 --sink
  • 觸發(fā)flink程序的執(zhí)行(execute(),count(),collect(),print()),例如:調(diào)用ExecutionEnvironment或者StreamExecutionEnvironment的execute()方法。

四、 Flink standalone集群搭建

Flink可以在Linux和window中運(yùn)行,F(xiàn)link集群需要有一個(gè)Master節(jié)點(diǎn)和一個(gè)或者多個(gè)Worker節(jié)點(diǎn)組成。

安裝Flink集群之前需要準(zhǔn)備:1.每臺(tái)幾點(diǎn)需要配置jdk8環(huán)境變量。2.需要每臺(tái)節(jié)點(diǎn)有ssh服務(wù),且有免密通信。

步驟:

1. 進(jìn)入https://flink.apache.org/downloads.html 下載flink.

下載Flink版本,這里選擇了基于Scala2.11和Hadoop2.6的1.7.1版本.

從來(lái)沒有一個(gè)人能把Flink講的這么透徹,小編的出現(xiàn)算是一個(gè)意外

2. 下載好Flink之后上傳到Master(node1)節(jié)點(diǎn)上解壓:

從來(lái)沒有一個(gè)人能把Flink講的這么透徹,小編的出現(xiàn)算是一個(gè)意外

3. 進(jìn)入../conf/flink-conf.yaml中配置:

  • jobmanager.rpc.address: node1 設(shè)置Master節(jié)點(diǎn)地址
  • jobmanager.heap.size: 1024m 設(shè)置Master使用的最大內(nèi)存,單位是MB
  • taskmanager.heap.size: 1024m 設(shè)置Worker使用的最大內(nèi)存,單位是MB

4. 配置../conf/slaves ,配置Worker節(jié)點(diǎn)列表

從來(lái)沒有一個(gè)人能把Flink講的這么透徹,小編的出現(xiàn)算是一個(gè)意外

5. 將配置好的Flink發(fā)送到其他worker節(jié)點(diǎn)(node2,node3)上。

從來(lái)沒有一個(gè)人能把Flink講的這么透徹,小編的出現(xiàn)算是一個(gè)意外

6. 啟動(dòng)Flink集群,訪問webui

在Master節(jié)點(diǎn)上,../bin/start-cluster.sh 啟動(dòng)集群。訪問webui:http:node1:8081

從來(lái)沒有一個(gè)人能把Flink講的這么透徹,小編的出現(xiàn)算是一個(gè)意外

7. 停止集群:在Master節(jié)點(diǎn)中../bin/stop-cluster.sh

五、 將Flink任務(wù)提交到standalone集群運(yùn)行

將以上FlinkSocketWordCount 案例打包提交到集群中運(yùn)行,無(wú)論在Master節(jié)點(diǎn)還是在Worker節(jié)點(diǎn)提交都可以。

首先需要在node5節(jié)點(diǎn)中啟動(dòng)socket 9999端口:

  1. nc –lk 9999 

提交命令如下:

  1. ./flink run /root/test/MyFlink-1.0-SNAPSHOT-jar-with-dependencies.jar --port 9999 

在node5節(jié)點(diǎn)上輸入數(shù)據(jù)后在webUI中查看日志:

從來(lái)沒有一個(gè)人能把Flink講的這么透徹,小編的出現(xiàn)算是一個(gè)意外

六、 Flink流處理

1. 讀取Socket數(shù)據(jù)統(tǒng)計(jì)WordCount

  1. public class SocketWindowWordCount { 
  2. public static void main(String[] args) throws Exception { 
  3. StreamExecutionEnvironment env = 
  4. StreamExecutionEnvironment.getExecutionEnvironment(); 
  5. DataStreamSource<String> socketStream = env.socketTextStream("node5", 9999); 
  6. SingleOutputStreamOperator<Tuple2<String, Integer>> pairWords = 
  7. socketStream.flatMap(new Splitter()); 
  8. KeyedStream<Tuple2<String, Integer>, Tuple> keyBy = pairWords.keyBy(0); 
  9. WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> windowStream = 
  10. keyBy.timeWindow(Time.seconds(5)); 
  11. DataStream<Tuple2<String, Integer>> dataStream = windowStream.sum(1); 
  12. dataStream.print(); 
  13. env.execute("socket wordcount"); 
  14. //Splitter 實(shí)現(xiàn)了 FlatMapFunction ,將輸入的一行數(shù)據(jù)按照空格進(jìn)行切分,返回tuple<word,1> 
  15. public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> { 
  16. @Override 
  17. public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception { 
  18. for (String word: sentence.split(" ")) { 
  19. out.collect(new Tuple2<String, Integer>(word, 1)); 

2. 數(shù)據(jù)源Source

Source 是Flink獲取數(shù)據(jù)的地方。以下source中和批處理的source類似,但是以下源作為dataStream流處理時(shí),是一條條處理,最終得到的不是一個(gè)總結(jié)果,而是每次處理后都會(huì)得到一個(gè)結(jié)果。

  • socketTextStream – 讀取Socket數(shù)據(jù)流
  • readTextFile() -- 逐行讀取文本文件獲取數(shù)據(jù)流,每行都返回字符串。
  • fromCollection() – 從集合中創(chuàng)建數(shù)據(jù)流。
  • fromElements – 從給定的數(shù)據(jù)對(duì)象創(chuàng)建數(shù)據(jù)流,所有數(shù)據(jù)類型要一致。
  • addSource – 添加新的源函數(shù),例如從kafka中讀取數(shù)據(jù),參見讀取kafka數(shù)據(jù)案例。

3. 數(shù)據(jù)寫出 Sink

  • writeAsText() – 以字符串的形式逐行寫入文件,調(diào)用每個(gè)元素的toString()得到寫入的字符串。
  • writeAsCsv() – 將元組寫出以逗號(hào)分隔的csv文件。注意:只能作用到元組數(shù)據(jù)上。
  • print() – 控制臺(tái)直接輸出結(jié)果,調(diào)用對(duì)象的toString()方法得到輸出結(jié)果。
  • addSink() – 自定義接收函數(shù)。例如將結(jié)果保存到kafka中,參見kafka案例。

七、 Flink讀取Socket數(shù)據(jù)WordCount案例

1. 創(chuàng)建maven項(xiàng)目

2. 導(dǎo)入maven依賴

flink1.7.1 使用jdk1.8,scala2.11或者2.12.這里使用的scala2.11.如果只是使用java開發(fā)flink,Scala的版本選擇多少都可以。如果使用Scala開發(fā)那么就必須使用Scala對(duì)應(yīng)的版本。

  1. <properties> 
  2. <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> 
  3. <maven.compiler.source>1.8</maven.compiler.source> 
  4. <maven.compiler.target>1.8</maven.compiler.target> 
  5. <flink.version>1.7.1</flink.version> 
  6. </properties> 
  7.  
  8. <dependency> 
  9. <groupId>org.apache.flink</groupId> 
  10. <artifactId>flink-java</artifactId> 
  11. <version>${flink.version}</version> 
  12. </dependency> 
  13. <dependency> 
  14. <groupId>org.apache.flink</groupId> 
  15. <artifactId>flink-streaming-java_2.11</artifactId> 
  16. <version>${flink.version}</version> 
  17. </dependency> 
  18. <dependency> 
  19. <groupId>org.apache.flink</groupId> 
  20. <artifactId>flink-clients_2.11</artifactId> 
  21. <version>${flink.version}</version> 
  22. </dependency> 
  23. <dependency> 
  24. <groupId>org.apache.flink</groupId> 
  25. <artifactId>flink-connector-wikiedits_2.11</artifactId> 
  26. <version>${flink.version}</version> 
  27. </dependency> 

 

3. 創(chuàng)建StreamExecutionEnvironment 或者ExecutionEnvironment(批處理作業(yè))。用于設(shè)置執(zhí)行參數(shù)并創(chuàng)建從外部系統(tǒng)讀取的源。

代碼如下:

  1. public class FlinkSocketWordCount { 
  2. public static void main(String[] args) throws Exception { 
  3. final int port ; 
  4. try{ 
  5. final ParameterTool params = ParameterTool.fromArgs(args); 
  6. port = params.getInt("port"); 
  7. }catch (Exception e){ 
  8. System.err.println("No port specified. Please run 'FlinkSocketWordCount --port <port>'"); 
  9. return; 
  10. //獲取執(zhí)行環(huán)境 
  11. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 
  12. //從socket中獲取數(shù)據(jù)。 
  13. DataStreamSource<String> text = env.socketTextStream("node5", port); 
  14. SingleOutputStreamOperator<WordWithCount> wordWithCountInfos = text.flatMap(new FlatMapFunction<String, WordWithCount>() { 
  15. @Override 
  16. public void flatMap(String line, Collector<WordWithCount> collector) throws Exception { 
  17. for (String word : line.split(" ")) { 
  18. collector.collect(new WordWithCount(word, 1L)); 
  19. }); 
  20. //keyBy中所寫的字段必須是類WordWithCount中的字段,WordWithCount中如果重寫構(gòu)造必須寫上無(wú)參構(gòu)造 
  21. KeyedStream<WordWithCount, Tuple> keyedInfos = wordWithCountInfos.keyBy("word"); 
  22. WindowedStream<WordWithCount, Tuple, TimeWindow> windowedInfo = keyedInfos.timeWindow(Time.seconds(5), Time.seconds(1)); 
  23. SingleOutputStreamOperator<WordWithCount> windowCounts = windowedInfo.reduce(new ReduceFunction<WordWithCount>() { 
  24. @Override 
  25. public WordWithCount reduce(WordWithCount w1, WordWithCount w2) throws Exception { 
  26. return new WordWithCount(w1.getWord(), w1.getCount() + w2.getCount()); 
  27. }); 
  28. windowCounts.print(); 
  29. env.execute("Socket Window WordCount"); 
  30. public static class WordWithCount { 
  31. public String word; 
  32. public Long count; 
  33. public WordWithCount() { } 
  34. public WordWithCount(String word, Long count) { 
  35. this.word = word; 
  36. this.count = count; 
  37. public String getWord() { 
  38. return word; 
  39. public void setWord(String word) { 
  40. this.word = word; 
  41. public Long getCount() { 
  42. return count; 
  43. public void setCount(Long count) { 
  44. this.count = count; 
  45. @Override 
  46. public String toString() { 
  47. return word + " : " + count; 

八、 如何指定keys

比如某些算子(join,coGroup,keyBy,groupB y)要求在數(shù)據(jù)元上定義key。另外有些算子操作(reduce,groupReduce,Aggregate,Windows)允許數(shù)據(jù)在處理之前根據(jù)key進(jìn)行分組。在Flink中數(shù)據(jù)模型不是基于Key,Value格式處理的,因此不需將數(shù)據(jù)處理成鍵值對(duì)的格式,key是“虛擬的”,可以人為的來(lái)指定,實(shí)際數(shù)據(jù)處理過程中根據(jù)指定的key來(lái)對(duì)數(shù)據(jù)進(jìn)行分組,DataSet中使用groupBy來(lái)指定key,DataStream中使用keyBy來(lái)指定key。如何指定keys?

1. 使用Tuples來(lái)指定key

定義元組來(lái)指定key可以指定tuple中的第幾個(gè)元素當(dāng)做key,或者指定tuple中的聯(lián)合元素當(dāng)做key。需要使用org.apache.flink.api.java.tuple.TupleXX包下的tuple,最多支持25個(gè)元素且Tuple必須new創(chuàng)建。如果Tuple是嵌套的格式,例如:DataStream

2. 使用Field Expression來(lái)指定key

可以使用Field Expression來(lái)指定key,一般作用的對(duì)象可以是類對(duì)象,或者嵌套的Tuple格式的數(shù)據(jù)。

使用注意點(diǎn):

(1) 對(duì)于類對(duì)象可以使用類中的字段來(lái)指定key。

類對(duì)象定義需要注意:

  • 類的訪問級(jí)別必須是public
  • 必須寫出默認(rèn)的空的構(gòu)造函數(shù)
  • 類中所有的字段必須是public的或者必須有g(shù)etter,setter方法。例如類中有個(gè)字段是foo,那么這個(gè)字段的getter,setter方法為:getFoo() 和 setFoo().
  • Flink必須支持字段的類型。一般類型都支持

(2) 對(duì)于嵌套的Tuple類型的Tuple數(shù)據(jù)可以使用”xx.f0”表示嵌套tuple中第一個(gè)元素,也可以直接使用”xx.0”來(lái)表示第一個(gè)元素,參照案例GroupByUseFieldExpressions。

3. 使用Key Selector Functions來(lái)指定key

使用key Selector這種方式選擇key,非常方便,可以從數(shù)據(jù)類型中指定想要的key.

九、 累加器(Accumulator)和計(jì)數(shù)器(Counter)

Accumulator即累加器,可以在分布式統(tǒng)計(jì)數(shù)據(jù),只有在任務(wù)結(jié)束之后才能獲取累加器的最終結(jié)果。計(jì)數(shù)器是累加器的具體實(shí)現(xiàn),有:IntCounter,LongCounter和DoubleCounter。

累加器注意事項(xiàng):

  • 需要在算子內(nèi)部創(chuàng)建累加器對(duì)象
  • 通常在Rich函數(shù)中的open方法中注冊(cè)累加器,指定累加器的名稱
  • 在當(dāng)前算子內(nèi)任意位置可以使用累加器
  • 必須當(dāng)任務(wù)執(zhí)行結(jié)束后,通過env.execute(xxx)執(zhí)行后的JobExecutionResult對(duì)象獲取累加器的值。

IntCounter舉例:

  1. ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); 
  2. DataSource<String> dataSource = env.fromElements("a", "b", "c", "d", "e", "f"); 
  3. MapOperator<String, String> map = dataSource.map(new RichMapFunction<String, String>() { 
  4. //1.創(chuàng)建累加器,在算子中創(chuàng)建累加器對(duì)象 
  5. private IntCounter numLines = new IntCounter(); 
  6. //2.注冊(cè)累加器對(duì)象,通常在Rich函數(shù)的open方法中使用 
  7. // getRuntimeContext().addAccumulator("num-lines", this.numLines);注冊(cè)累加器 
  8. public void open(Configuration parameters) throws Exception { 
  9. getRuntimeContext().addAccumulator("num-lines", this.numLines); 
  10. @Override 
  11. public String map(String s) throws Exception { 
  12. //3.使用累加器 ,可以在任意操作中使用,包括在open或者close方法中 
  13. this.numLines.add(1); 
  14. return s; 
  15. }).setParallelism(8); 
  16. map.writeAsText("./TempResult/result",FileSystem.WriteMode.OVERWRITE); 
  17. JobExecutionResult myJobExecutionResult = env.execute("IntCounterTest"); 
  18. //4.當(dāng)作業(yè)執(zhí)行完成之后,在JobExecutionResult對(duì)象中獲取累加器的值。 
  19. int accumulatorResult = myJobExecutionResult.getAccumulatorResult("num-lines"); 
  20. System.out.println("accumulator value = "+accumulatorResult); 

十、 Flink + kafka 整合使用

1. 在pom.xml中添加Flink Kafka連接器的依賴,如果添加了不要重復(fù)添加

  1. <!-- Flink Kafka連接器的依賴--> 
  2. <dependency> 
  3. <groupId>org.apache.flink</groupId> 
  4. <artifactId>flink-connector-kafka-0.11_2.11</artifactId> 
  5. <version>1.7.1</version> 
  6. </dependency> 

2. 從kafka中讀取數(shù)據(jù)處理,并將結(jié)果打印到控制臺(tái)

  1. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 
  2. Properties props = new Properties(); 
  3. props.setProperty("bootstrap.servers", "node1:9092,node2:9092,node3:9092"); 
  4. props.setProperty("group.id", "flink-group"); 
  5. /** 
  6. * 第一個(gè)參數(shù)是topic 
  7. * 第二個(gè)參數(shù)是value的反序列化格式 
  8. * 第三個(gè)參數(shù)是kafka配置 
  9. */ 
  10. FlinkKafkaConsumer011<String> consumer011 = new FlinkKafkaConsumer011<>("FlinkTopic", new SimpleStringSchema(), props); 
  11. DataStreamSource<String> stringDataStreamSource = env.addSource(consumer011); 
  12. SingleOutputStreamOperator<String> flatMap = stringDataStreamSource.flatMap(new FlatMapFunction<String, String>() { 
  13. @Override 
  14. public void flatMap(String s, Collector<String> outCollector) throws Exception { 
  15. String[] ssplit = s.split(" "); 
  16. for (String currentOne : split) { 
  17. outCollector.collect(currentOne); 
  18. }); 
  19. //注意這里的tuple2需要使用org.apache.flink.api.java.tuple.Tuple2 這個(gè)包下的tuple2 
  20. SingleOutputStreamOperator<Tuple2<String, Integer>> map = flatMap.map(new MapFunction<String, Tuple2<String, Integer>>() { 
  21. @Override 
  22. public Tuple2<String, Integer> map(String word) throws Exception { 
  23. return new Tuple2<>(word, 1); 
  24. }); 
  25. //keyby 將數(shù)據(jù)根據(jù)key 進(jìn)行分區(qū),保證相同的key分到一起,默認(rèn)是按照hash 分區(qū) 
  26. KeyedStream<Tuple2<String, Integer>, Tuple> keyByResult = map.keyBy(0); 
  27. WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> windowResult = keyByResult.timeWindow(Time.seconds(5)); 
  28. SingleOutputStreamOperator<Tuple2<String, Integer>> endResult = windowResult.sum(1); 
  29. //sink 直接控制臺(tái)打印 
  30. //執(zhí)行flink程序,設(shè)置任務(wù)名稱。console 控制臺(tái)每行前面的數(shù)字代表當(dāng)前數(shù)據(jù)是哪個(gè)并行線程計(jì)算得到的結(jié)果 
  31. endResult.print(); 
  32. //最后要調(diào)用execute方法啟動(dòng)flink程序 
  33. env.execute("kafka word count"); 

3. 將結(jié)果寫入kafka

  1. //sink 將結(jié)果存入kafka topic中,存入kafka中的是String類型,所有endResult需要做進(jìn)一步的轉(zhuǎn)換 
  2. FlinkKafkaProducer011<String> producer = new FlinkKafkaProducer011<>("node1:9092,node2:9092,node3:9092","FlinkResult",new SimpleStringSchema()); 
  3. //將tuple2格式數(shù)據(jù)轉(zhuǎn)換成String格式 
  4. endResult.map(new MapFunction<Tuple2<String,Integer>, String>() { 
  5. @Override 
  6. public String map(Tuple2<String, Integer> tp2) throws Exception { 
  7. return tp2.f0+"-"+tp2.f1; 
  8. }).addSink(producer); 

4. 將結(jié)果寫入文件

  1. //sink 將結(jié)果存入文件,FileSystem.WriteMode.OVERWRITE 文件目錄存在就覆蓋 
  2. endResult.writeAsText("./result/kafkaresult",FileSystem.WriteMode.OVERWRITE); 
  3. // endResult.writeAsText("./result/kafkaresult",FileSystem.WriteMode.NO_OVERWRITE); 

十一、 Flink + Kafka 整合數(shù)據(jù)一致性保證

1. Flink消費(fèi)kafka數(shù)據(jù)起始o(jì)ffset配置

Flink讀取Kafka數(shù)據(jù)確定開始位置有以下幾種設(shè)置方式:

(1) flinkKafkaConsumer.setStartFromEarliest()

從topic的最早offset位置開始處理數(shù)據(jù),如果kafka中保存有消費(fèi)者組的消費(fèi)位置將被忽略。

(2) flinkKafkaConsumer.setStartFromLatest()

從topic的最新offset位置開始處理數(shù)據(jù),如果kafka中保存有消費(fèi)者組的消費(fèi)位置將被忽略。

(3) flinkKafkaConsumer.setStartFromTimestamp(…)

從指定的時(shí)間戳(毫秒)開始消費(fèi)數(shù)據(jù),Kafka中每個(gè)分區(qū)中數(shù)據(jù)大于等于設(shè)置的時(shí)間戳的數(shù)據(jù)位置將被當(dāng)做開始消費(fèi)的位置。如果kafka中保存有消費(fèi)者組的消費(fèi)位置將被忽略。

(4) flinkKafkaConsumer.setStartFromGroupOffsets()

默認(rèn)的設(shè)置。根據(jù)代碼中設(shè)置的group.id設(shè)置的消費(fèi)者組,去kafka中或者zookeeper中找到對(duì)應(yīng)的消費(fèi)者offset位置消費(fèi)數(shù)據(jù)。如果沒有找到對(duì)應(yīng)的消費(fèi)者組的位置,那么將按照auto.offset.reset設(shè)置的策略讀取offset。

  1. FlinkKafkaConsumer011<String> consumer011 = new FlinkKafkaConsumer011<>("FlinkTopic", new SimpleStringSchema(), props); 
  2. // consumer011.setStartFromEarliest(); 
  3. // consumer011.setStartFromLatest(); 
  4. // consumer011.setStartFromGroupOffsets(); 
  5. // consumer011.setStartFromTimestamp(111111);  
  6. DataStreamSource<String> dateSource = env.addSource(consumer011); 
  7. dateSource… … 

2. Flink消費(fèi)kafka數(shù)據(jù),消費(fèi)者offset提交配置

Flink提供了消費(fèi)kafka數(shù)據(jù)的offset如何提交給Kafka或者zookeeper(kafka0.8之前)的配置。注意,F(xiàn)link并不依賴提交給Kafka或者zookeeper中的offset來(lái)保證容錯(cuò)。提交的offset只是為了外部來(lái)查詢監(jiān)視kafka數(shù)據(jù)消費(fèi)的情況。

配置offset的提交方式取決于是否為job設(shè)置開啟checkpoint。可以使用env.enableCheckpointing(5000)來(lái)設(shè)置開啟checkpoint。

(1) 關(guān)閉checkpoint:

如何禁用了checkpoint,那么offset位置的提交取決于Flink讀取kafka客戶端的配置,enable.auto.commit ( auto.commit.enable【Kafka 0.8】)配置是否開啟自動(dòng)提交offset, auto.commit.interval.ms決定自動(dòng)提交offset的周期。

(2) 開啟checkpoint:

如果開啟了checkpoint,那么當(dāng)checkpoint保存狀態(tài)完成后,將checkpoint中保存的offset位置提交到kafka。這樣保證了Kafka中保存的offset和checkpoint中保存的offset一致,可以通過配置setCommitOffsetsOnCheckpoints(boolean)來(lái)配置是否將checkpoint中的offset提交到kafka中(默認(rèn)是true)。如果使用這種方式,那么properties中配置的kafka offset自動(dòng)提交參數(shù)enable.auto.commit和周期提交參數(shù)auto.commit.interval.ms參數(shù)將被忽略。

3. 使用checkpoint + 兩階段提交來(lái)保證僅一次消費(fèi)kafka中的數(shù)據(jù)

當(dāng)談及“exactly-once semantics”僅一次處理數(shù)據(jù)時(shí),指的是每條數(shù)據(jù)只會(huì)影響最終結(jié)果一次。Flink可以保證當(dāng)機(jī)器出現(xiàn)故障或者程序出現(xiàn)錯(cuò)誤時(shí),也沒有重復(fù)的數(shù)據(jù)或者未被處理的數(shù)據(jù)出現(xiàn),實(shí)現(xiàn)僅一次處理的語(yǔ)義。Flink開發(fā)出了checkpointing機(jī)制,這種機(jī)制是在Flink應(yīng)用內(nèi)部實(shí)現(xiàn)僅一次處理數(shù)據(jù)的基礎(chǔ)。

checkpoint中包含:

  • 當(dāng)前應(yīng)用的狀態(tài)
  • 當(dāng)前消費(fèi)流數(shù)據(jù)的位置

在Flink1.4版本之前,F(xiàn)link僅一次處理數(shù)據(jù)只限于Flink應(yīng)用內(nèi)部(可以使用checkpoint機(jī)制實(shí)現(xiàn)僅一次數(shù)據(jù)數(shù)據(jù)語(yǔ)義),當(dāng)Flink處理完的數(shù)據(jù)需要寫入外部系統(tǒng)時(shí),不保證僅一次處理數(shù)據(jù)。為了提供端到端的僅一次處理數(shù)據(jù),在將數(shù)據(jù)寫入外部系統(tǒng)時(shí)也要保證僅一次處理數(shù)據(jù),這些外部系統(tǒng)必須提供一種手段來(lái)允許程序提交或者回滾寫入操作,同時(shí)還要保證與Flink的checkpoint機(jī)制協(xié)調(diào)使用。

在分布式系統(tǒng)中協(xié)調(diào)提交和回滾的常見方法就是兩階段提交協(xié)議。下面給出一個(gè)實(shí)例了解Flink如何使用兩階段提交協(xié)議來(lái)實(shí)現(xiàn)數(shù)據(jù)僅一次處理語(yǔ)義。

該實(shí)例是從kafka中讀取數(shù)據(jù),經(jīng)過處理數(shù)據(jù)之后將結(jié)果再寫回kafka。kafka0.11版本之后支持事務(wù),這也是Flink與kafka交互時(shí)僅一次處理的必要條件。【注意:當(dāng)Flink處理完的數(shù)據(jù)寫入kafka時(shí),即當(dāng)sink為kafka時(shí),自動(dòng)封裝了兩階段提交協(xié)議】。Flink支持僅一次處理數(shù)據(jù)不僅僅限于和Kafka的結(jié)合,只要sink提供了必要的兩階段協(xié)調(diào)實(shí)現(xiàn),可以對(duì)任何sink都能實(shí)現(xiàn)僅一次處理數(shù)據(jù)語(yǔ)義。

其原理如下:

從來(lái)沒有一個(gè)人能把Flink講的這么透徹,小編的出現(xiàn)算是一個(gè)意外

上圖Flink程序包含以下組件:

  • 一個(gè)從kafka中讀取數(shù)據(jù)的source
  • 一個(gè)窗口聚合操作
  • 一個(gè)將結(jié)果寫往kafka的sink。

要使sink支持僅一次處理數(shù)據(jù)語(yǔ)義,必須以事務(wù)的方式將數(shù)據(jù)寫往kafka,將兩次checkpoint之間的操作當(dāng)做一個(gè)事務(wù)提交,確保出現(xiàn)故障時(shí)操作能夠被回滾。假設(shè)出現(xiàn)故障,在分布式多并發(fā)執(zhí)行sink的應(yīng)用程序中,僅僅執(zhí)行單次提交或回滾事務(wù)是不夠的,因?yàn)榉植际街械母鱾€(gè)sink程序都必須對(duì)這些提交或者回滾達(dá)成共識(shí),這樣才能保證兩次checkpoint之間的數(shù)據(jù)得到一個(gè)一致性的結(jié)果。Flink使用兩階段提交協(xié)議(pre-commit+commit)來(lái)實(shí)現(xiàn)這個(gè)問題。

Filnk checkpointing開始時(shí)就進(jìn)入到pre-commit階段,具體來(lái)說,一旦checkpoint開始,F(xiàn)link的JobManager向輸入流中寫入一個(gè)checkpoint barrier將流中所有消息分隔成屬于本次checkpoint的消息以及屬于下次checkpoint的消息,barrier也會(huì)在操作算子間流轉(zhuǎn),對(duì)于每個(gè)operator來(lái)說,該barrier會(huì)觸發(fā)operator的State Backend來(lái)為當(dāng)前的operator來(lái)打快照。如下圖示:

從來(lái)沒有一個(gè)人能把Flink講的這么透徹,小編的出現(xiàn)算是一個(gè)意外

Flink DataSource中存儲(chǔ)著Kafka消費(fèi)的offset,當(dāng)完成快照保存后,將chechkpoint barrier傳遞給下一個(gè)operator。這種方式只有在Flink內(nèi)部狀態(tài)的場(chǎng)景是可行的,內(nèi)部狀態(tài)指的是由Flink的State Backend管理狀態(tài),例如上面的window的狀態(tài)就是內(nèi)部狀態(tài)管理。只有當(dāng)內(nèi)部狀態(tài)時(shí),pre-commit階段無(wú)需執(zhí)行額外的操作,僅僅是寫入一些定義好的狀態(tài)變量即可,checkpoint成功時(shí)Flink負(fù)責(zé)提交這些狀態(tài)寫入,否則就不寫入當(dāng)前狀態(tài)。

但是,一旦operator操作包含外部狀態(tài),事情就不一樣了。我們不能像處理內(nèi)部狀態(tài)一樣處理外部狀態(tài),因?yàn)橥獠繝顟B(tài)涉及到與外部系統(tǒng)的交互。這種情況下,外部系統(tǒng)必須要支持可以與兩階段提交協(xié)議綁定的事務(wù)才能保證僅一次處理數(shù)據(jù)。

本例中的data sink是將數(shù)據(jù)寫往kafka,因?yàn)閷懲鵮afka是有外部狀態(tài)的,這種情況下,pre-commit階段下data sink 在保存狀態(tài)到State Backend的同時(shí),還必須pre-commit外部的事務(wù)。如下圖:

從來(lái)沒有一個(gè)人能把Flink講的這么透徹,小編的出現(xiàn)算是一個(gè)意外

當(dāng)checkpoint barrier在所有的operator都傳遞一遍切對(duì)應(yīng)的快照都成功完成之后,pre-commit階段才算完成。這個(gè)過程中所有創(chuàng)建的快照都被視為checkpoint的一部分,checkpoint中保存著整個(gè)應(yīng)用的全局狀態(tài),當(dāng)然也包含pre-commit階段提交的外部狀態(tài)。當(dāng)程序出現(xiàn)崩潰時(shí),我們可以回滾狀態(tài)到最新已經(jīng)完成快照的時(shí)間點(diǎn)。

下一步就是通知所有的operator,告訴它們checkpoint已經(jīng)完成,這便是兩階段提交的第二個(gè)階段:commit階段。這個(gè)階段中JobManager會(huì)為應(yīng)用中的每個(gè)operator發(fā)起checkpoint已經(jīng)完成的回調(diào)邏輯。本例中,DataSource和Winow操作都沒有外部狀態(tài),因此在該階段,這兩個(gè)operator無(wú)需執(zhí)行任何邏輯,但是Data Sink是有外部狀態(tài)的,因此此時(shí)我們需要提交外部事務(wù)。如下圖示:

從來(lái)沒有一個(gè)人能把Flink講的這么透徹,小編的出現(xiàn)算是一個(gè)意外

匯總以上信息,總結(jié)得出:

(1) 一旦所有的operator完成各自的pre-commit,他們會(huì)發(fā)起一個(gè)commit操作。

(2) 如果一個(gè)operator的pre-commit失敗,所有其他的operator 的pre-commit必須被終止,并且Flink會(huì)回滾到最近成功完成的checkpoint位置。

(3) 一旦pre-commit完成,必須要確保commit也要成功,內(nèi)部的operator和外部的系統(tǒng)都要對(duì)此進(jìn)行保證。假設(shè)commit失敗【網(wǎng)絡(luò)故障原因】,F(xiàn)link程序就會(huì)崩潰,然后根據(jù)用戶重啟策略執(zhí)行重啟邏輯,重啟之后會(huì)再次commit。

因此,所有的operator必須對(duì)checkpoint最終結(jié)果達(dá)成共識(shí),即所有的operator都必須認(rèn)定數(shù)據(jù)提交要么成功執(zhí)行,要么被終止然后回滾。

(4) Flink中外部狀態(tài)實(shí)現(xiàn)兩階段提交

Flink外部狀態(tài)實(shí)現(xiàn)兩階段提交將邏輯封裝到TwoPhaseComitSinkFunction類中,下面擴(kuò)展TwoPhaseCommitSinkFunction來(lái)實(shí)現(xiàn)就文件的sink。若要實(shí)現(xiàn)支持exactly-once語(yǔ)義的文件sink,需要實(shí)現(xiàn)以下4個(gè)方法:

  • beginTransaction:開啟一個(gè)事務(wù),創(chuàng)建一個(gè)臨時(shí)文件,將數(shù)據(jù)寫入到臨時(shí)文件中
  • preCommit:在pre-commit階段,flush緩存數(shù)據(jù)到磁盤,然后關(guān)閉這個(gè)文件,確保不會(huì)有新的數(shù)據(jù)寫入到這個(gè)文件,同時(shí)開啟一個(gè)新事務(wù)執(zhí)行屬于下一個(gè)checkpoint的寫入操作
  • commit:在commit階段,我們以原子性的方式將上一階段的文件寫入真正的文件目錄下?!咀⒁猓簲?shù)據(jù)有延時(shí),不是實(shí)時(shí)的】
  • abort:一旦異常終止事務(wù),程序如何處理。這里要清除臨時(shí)文件。

 

責(zé)任編輯:趙寧寧 來(lái)源: 架構(gòu)師之巔
相關(guān)推薦

2025-03-13 10:31:20

DeepSeek開源EPLB

2015-06-12 15:29:06

一個(gè)人的爆品

2024-11-14 14:30:00

模型結(jié)構(gòu)AI

2013-08-14 10:23:22

創(chuàng)業(yè)個(gè)人創(chuàng)業(yè)互聯(lián)網(wǎng)創(chuàng)業(yè)

2011-07-06 14:29:49

中國(guó)移動(dòng)王建宙4G

2013-03-08 02:52:03

個(gè)人開發(fā)項(xiàng)目糾錯(cuò)

2011-06-16 14:21:43

習(xí)慣管理

2013-06-07 10:42:53

2025-03-07 09:18:10

2009-02-26 10:19:56

2017-07-13 12:33:15

戴爾

2014-05-29 10:43:29

斯諾登棱鏡監(jiān)聽

2022-11-10 09:28:40

框架開發(fā)

2014-08-08 15:34:53

安全漏洞漏洞防護(hù)安全防守

2009-09-27 16:04:49

CCIE資格

2012-05-29 09:22:50

游戲設(shè)計(jì)開發(fā)

2019-12-17 18:25:35

物聯(lián)網(wǎng)電腦互聯(lián)網(wǎng)

2009-03-20 09:12:56

阿里巴巴衛(wèi)哲馬云

2025-02-07 09:34:12

點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)