Integrating Kafka Streams with Spring Boot for Real-Time Statistics
Environment: Spring Boot 2.3.12.RELEASE + kafka_2.13-2.7.0 + zookeeper-3.6.2
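Introduction to Kafka Streams
Kafka introduced the Streams API in version 0.10, adding the ability to process and analyze data stored in Kafka as streams.
Stream processing is usually contrasted with batch processing. A batch job typically takes a fixed data set as input and computes a result from it. The input to a streaming job is usually unbounded (Unbounded Data) and arrives continuously, so the full data set is never available for computation. The results are emitted continuously as well: at any moment you can only obtain the result as of that moment, not a final result.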
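To make the contrast concrete, here is a minimal plain-Java sketch. It does not use Kafka at all; the class and method names (`BatchVsStream`, `batchSum`, `runningSums`) are made up for illustration. The batch method sees the whole data set and returns one final value, while the streaming method emits an intermediate value after every record.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration (not Kafka API): a batch sum versus a streaming running sum.
class BatchVsStream {

    // Batch: the whole bounded data set is available, so one final result is produced.
    static long batchSum(List<Long> dataset) {
        long total = 0;
        for (long v : dataset) {
            total += v;
        }
        return total;
    }

    // Streaming: records arrive one at a time and a result is emitted after each one.
    // Every emitted value reflects only the records seen so far.
    static List<Long> runningSums(Iterable<Long> arrivals) {
        List<Long> emitted = new ArrayList<>();
        long total = 0;
        for (long v : arrivals) {
            total += v;
            emitted.add(total);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Long> data = Arrays.asList(3L, 5L, 2L);
        System.out.println(batchSum(data));    // a single final value
        System.out.println(runningSums(data)); // one intermediate value per record
    }
}
```

In a real stream the loop never ends, so there is no point at which the last emitted value can be called final.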
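Kafka Streams is a client library for processing and analyzing data stored in Kafka. It builds on several important stream-processing concepts: distinguishing event time from processing time, windowing support, and simple yet efficient management and real-time querying of application state.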
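The difference between event time and processing time matters most when records are windowed. The following plain-Java sketch assumes fixed-size (tumbling) windows aligned to the epoch and non-negative millisecond timestamps; `TumblingWindowSketch` and its methods are hypothetical names, not Kafka Streams classes. A record is assigned to a window by the timestamp it carries, so a record that arrives late still lands in the window it belongs to.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration (not Kafka API): counting records per event-time window.
class TumblingWindowSketch {

    // Start of the epoch-aligned window that contains the given event timestamp.
    static long windowStart(long eventTimeMs, long windowSizeMs) {
        return eventTimeMs - (eventTimeMs % windowSizeMs);
    }

    // Counts records per window; the array order is the arrival (processing) order.
    static Map<Long, Integer> countPerWindow(long[] eventTimesMs, long windowSizeMs) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (long ts : eventTimesMs) {
            counts.merge(windowStart(ts, windowSizeMs), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // The last record (event time 2000) arrives after the record with event time 6000,
        // but it is still counted in the window starting at 0.
        long[] arrivals = {1000L, 4000L, 6000L, 2000L};
        System.out.println(countPerWindow(arrivals, 5000L));
    }
}
```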
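The barrier to entry for Kafka Streams is very low: writing a Kafka Streams application is not much different from writing an ordinary Kafka message-processing program, and scaling, load balancing, and high availability are achieved by deploying multiple instances (using the Kafka consumer parallelism model).
Some features of Kafka Streams:
- Designed as a simple, lightweight client library that can be embedded in any Java application
- No external dependencies other than Kafka itself; uses Kafka's partitioning model to scale horizontally while preserving ordering guarantees
- Efficient stateful operations (windowed joins and aggregations) backed by fault-tolerant state stores
- Supports exactly-once semantics
- Record-at-a-time processing with millisecond latency
- Offers a high-level Streams DSL and a low-level Processor API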
 
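Stream Processing Topology
    - A stream is the most important abstraction provided by Kafka Streams: it represents an unbounded, continuously updating data set. A stream is an ordered, replayable, and fault-tolerant sequence of immutable data records, where a data record is defined as a key-value pair.
 
    - A stream processing application is any program that uses the Kafka Streams library. It defines its computational logic through processor topologies, where each processor topology is a graph of stream processors (nodes) connected by streams.
 
    - A stream processor is a node in the processor topology. It represents a processing step that transforms the data in a stream: it receives one input record at a time from its upstream processors in the topology, applies its operation to that record, and may then produce one or more output records for its downstream processors.
 
There are two special kinds of processor:
Source Processor: a special type of stream processor that has no upstream processors. It produces an input stream for its topology by consuming records from one or more Kafka topics and forwarding them to its downstream processors.
Sink Processor: a special type of stream processor that has no downstream processors. It sends any records received from its upstream processors to a specified Kafka topic.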
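The roles described above can be sketched in plain Java. This is only an in-memory analogy under simplifying assumptions: a real topology is a graph, whereas this sketch uses a linear chain, and `TopologySketch`, `run`, and `demo` are hypothetical names. The input list stands in for the source topic, each function is a stream processor that turns one record into zero or more records, and the returned list stands in for the sink topic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Hypothetical illustration (not Kafka API): source -> processors -> sink as a linear chain.
class TopologySketch {

    static List<String> run(List<String> sourceRecords,
                            List<Function<String, List<String>>> processors) {
        List<String> sink = new ArrayList<>();
        // Source: records enter the topology one at a time.
        for (String record : sourceRecords) {
            List<String> inFlight = new ArrayList<>();
            inFlight.add(record);
            // Each processor receives records from upstream and forwards its output downstream.
            for (Function<String, List<String>> processor : processors) {
                List<String> next = new ArrayList<>();
                for (String r : inFlight) {
                    next.addAll(processor.apply(r));
                }
                inFlight = next;
            }
            // Sink: whatever reaches the end of the chain is written out.
            sink.addAll(inFlight);
        }
        return sink;
    }

    static List<String> demo() {
        List<Function<String, List<String>>> processors = new ArrayList<>();
        // One input record may produce several output records.
        processors.add(line -> Arrays.asList(line.split(" ")));
        // One input record produces exactly one output record.
        processors.add(word -> Collections.singletonList(word.toUpperCase()));
        return run(Arrays.asList("a b", "c"), processors);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```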

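For more on these core concepts, see the Core Concepts section of the official Kafka Streams documentation.
The rest of this post demonstrates Kafka Streams in a Spring Boot application.
Dependencies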
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
</dependency>
 
Configuration
server:
  port: 9090
spring:
  application:
    name: kafka-demo
  kafka:
    streams:
      application-id: ${spring.application.name}
      properties:
        spring.json.trusted.packages: '*'
    bootstrap-servers:
    - localhost:9092
    - localhost:9093
    - localhost:9094
    producer:
      acks: 1
      retries: 10
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer #org.apache.kafka.common.serialization.StringSerializer
      properties:
        spring.json.trusted.packages: '*'
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer #org.apache.kafka.common.serialization.StringDeserializer
      enable-auto-commit: false
      group-id: ConsumerTest
      auto-offset-reset: latest
      properties:
        session.timeout.ms: 12000
        heartbeat.interval.ms: 3000
        max.poll.records: 100
        spring.json.trusted.packages: '*'
    listener:
      ack-mode: manual-immediate
      type: batch
      concurrency: 8
    properties:
      max.poll.interval.ms: 300000
 
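Sending messages
import javax.annotation.Resource;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class MessageSend {

    @Resource
    private KafkaTemplate<String, Message> kafkaTemplate;

    // Sends the message to the "test" topic without a key and logs the outcome asynchronously.
    public void sendMessage2(Message message) {
        kafkaTemplate.send(new ProducerRecord<String, Message>("test", message)).addCallback(result -> {
            System.out.println("Send succeeded..." + Thread.currentThread().getName());
        }, ex -> {
            System.out.println("Send failed");
            ex.printStackTrace();
        });
    }
}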
 
Listening for messages
// Batch listener for the "test" topic; offsets are committed manually through ack.
@KafkaListener(topics = {"test"})
public void listener2(List<ConsumerRecord<String, Message>> records, Acknowledgment ack) {
    for (ConsumerRecord<String, Message> record : records) {
        System.out.println(this.getClass().hashCode() + ", Thread" + Thread.currentThread().getName() + ", key: " + record.key() + ", received message: " + record.value() + ", partition: " + record.partition() + ", offset: " + record.offset());
    }
    try {
        // Placeholder for simulated processing time; a zero-second sleep returns immediately.
        TimeUnit.SECONDS.sleep(0);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    ack.acknowledge();
}

// Batch listener for the "demo" topic, which receives the output of the stream processing below.
@KafkaListener(topics = {"demo"})
public void listenerDemo(List<ConsumerRecord<String, Message>> records, Acknowledgment ack) {
    for (ConsumerRecord<String, Message> record : records) {
        System.out.println("Demo Topic: " + this.getClass().hashCode() + ", Thread" + Thread.currentThread().getName() + ", key: " + record.key() + ", received message: " + record.value() + ", partition: " + record.partition() + ", offset: " + record.offset());
    }
    ack.acknowledge();
}
 
Kafka Streams processing
Transforming messages and forwarding them to another topic
@Bean
public KStream<Object, Object> kStream(StreamsBuilder streamsBuilder) {
    // No serdes are specified here, so the default serdes apply; the cast below assumes values arrive as byte[].
    KStream<Object, Object> stream = streamsBuilder.stream("test");
    stream.map((key, value) -> {
        System.out.println("Original message content: " + new String((byte[]) value, StandardCharsets.UTF_8));
        // Replace the payload with a fixed JSON document and forward it to the "demo" topic.
        return new KeyValue<>(key, "{\"title\": \"123123\", \"message\": \"Redefined content\"}".getBytes(StandardCharsets.UTF_8));
    }).to("demo");
    return stream;
}
 

Processing objects in a stream
@Bean
public KStream<String, Message> kStream4(StreamsBuilder streamsBuilder) {
    // JSON serde for Message values; all packages are trusted so the payload type can be deserialized.
    JsonSerde<Message> jsonSerde = new JsonSerde<>();
    JsonDeserializer<Message> descri = (JsonDeserializer<Message>) jsonSerde.deserializer();
    descri.addTrustedPackages("*");
    KStream<String, Message> stream = streamsBuilder.stream("test", Consumed.with(Serdes.String(), jsonSerde));
    stream.map((key, value) -> {
        // Overwrite the title, then forward the modified object to the "demo" topic.
        value.setTitle("XXXXXXX");
        return new KeyValue<>(key, value);
    }).to("demo", Produced.with(Serdes.String(), jsonSerde));
    return stream;
}
 

Grouping
@Bean
public KStream<String, Message> kStream5(StreamsBuilder streamsBuilder) {
    JsonSerde<Message> jsonSerde = new JsonSerde<>();
    JsonDeserializer<Message> descri = (JsonDeserializer<Message>) jsonSerde.deserializer();
    descri.addTrustedPackages("*");
    KStream<String, Message> stream = streamsBuilder.stream("test", Consumed.with(Serdes.String(), jsonSerde));
    // Re-key each record by its orgCode, group by that key, and count the records per group.
    stream.selectKey((key, value) -> value.getOrgCode())
        .groupByKey(Grouped.with(Serdes.String(), jsonSerde))
        .count()
        // Convert the resulting table back to a stream and print each count update.
        .toStream().print(Printed.toSysOut());
    return stream;
}
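What the grouped count above produces is a stream of updated counts per key rather than a single total. The following plain-Java sketch mimics that behavior in memory; it is not the Kafka Streams API, `RunningCountSketch` and `countByKey` are hypothetical names, and a real application may emit fewer updates because consecutive updates for the same key can be coalesced by caching.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration (not Kafka API): a running count per key, emitted as updates.
class RunningCountSketch {

    // Each incoming orgCode bumps the count stored for that key and emits the new value.
    static List<String> countByKey(List<String> orgCodes) {
        Map<String, Long> store = new HashMap<>(); // plays the role of the state store
        List<String> updates = new ArrayList<>();
        for (String orgCode : orgCodes) {
            long count = store.merge(orgCode, 1L, Long::sum);
            updates.add(orgCode + "=" + count);
        }
        return updates;
    }

    public static void main(String[] args) {
        System.out.println(countByKey(Arrays.asList("1", "2", "1")));
    }
}
```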
 

Aggregation
@Bean
public KStream<String, Message> kStream6(StreamsBuilder streamsBuilder) {
    JsonSerde<Message> jsonSerde = new JsonSerde<>();
    JsonDeserializer<Message> descri = (JsonDeserializer<Message>) jsonSerde.deserializer();
    descri.addTrustedPackages("*");
    KStream<String, Message> stream = streamsBuilder.stream("test", Consumed.with(Serdes.String(), jsonSerde));
    stream.selectKey((key, value) -> value.getOrgCode())
        .groupByKey(Grouped.with(Serdes.String(), jsonSerde))
        // Start each key at 0 and add 1 per record; the result is stored in the "kvs" state store.
        .aggregate(() -> 0L, (key, value, aggValue) -> {
            System.out.println("key = " + key + ", value = " + value + ", agg = " + aggValue);
            return aggValue + 1;
        }, Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("kvs").withValueSerde(Serdes.Long()))
        .toStream().print(Printed.toSysOut());
    return stream;
}
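The aggregation above is built from two pieces: an initializer that supplies the starting value for a key, and an aggregator that folds each record into the current value. The plain-Java sketch below illustrates that mechanism in memory under simplifying assumptions; `AggregateSketch`, its `Aggregator` interface, and `sampleRecords` are hypothetical names and not the Kafka Streams types.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical illustration (not Kafka API): per-key aggregation with an initializer and an aggregator.
class AggregateSketch {

    // Same shape as the aggregator lambda: (key, value, currentAggregate) -> newAggregate.
    interface Aggregator<K, V, A> {
        A apply(K key, V value, A aggregate);
    }

    static <K, V, A> Map<K, A> aggregate(List<Map.Entry<K, V>> records,
                                         Supplier<A> initializer,
                                         Aggregator<K, V, A> aggregator) {
        Map<K, A> store = new LinkedHashMap<>();
        for (Map.Entry<K, V> record : records) {
            K key = record.getKey();
            // The initializer is used only the first time a key is seen.
            A current = store.containsKey(key) ? store.get(key) : initializer.get();
            store.put(key, aggregator.apply(key, record.getValue(), current));
        }
        return store;
    }

    static List<Map.Entry<String, String>> sampleRecords() {
        List<Map.Entry<String, String>> records = new ArrayList<>();
        records.add(new SimpleEntry<>("1", "a"));
        records.add(new SimpleEntry<>("2", "bb"));
        records.add(new SimpleEntry<>("1", "ccc"));
        return records;
    }

    public static void main(String[] args) {
        // Count per key: start at 0 and add 1 per record.
        System.out.println(aggregate(sampleRecords(), () -> 0L, (k, v, agg) -> agg + 1));
        // Same mechanism with a different aggregator: total value length per key.
        System.out.println(aggregate(sampleRecords(), () -> 0L, (k, v, agg) -> agg + v.length()));
    }
}
```

Counting is the special case where the aggregator ignores the value; swapping in a different aggregator yields sums, maxima, or any other fold over the records of a key.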
 

Filtering data with filter
@Bean
public KStream<String, Message> kStream7(StreamsBuilder streamsBuilder) {
    JsonSerde<Message> jsonSerde = new JsonSerde<>();
    JsonDeserializer<Message> descri = (JsonDeserializer<Message>) jsonSerde.deserializer();
    descri.addTrustedPackages("*");
    KStream<String, Message> stream = streamsBuilder.stream("test", Consumed.with(Serdes.String(), jsonSerde));
    stream.selectKey((key, value) -> value.getOrgCode())
        .groupByKey(Grouped.with(Serdes.String(), jsonSerde))
        .aggregate(() -> 0L, (key, value, aggValue) -> {
            System.out.println("key = " + key + ", value = " + value + ", agg = " + aggValue);
            return aggValue + 1;
        }, Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("kvs").withValueSerde(Serdes.Long()))
        .toStream()
        // Keep only the updates whose key is not "2".
        .filter((key, value) -> !"2".equals(key))
        .print(Printed.toSysOut());
    return stream;
}
 
The filter keeps only the records whose key is not "2".
Branching into multiple streams
@Bean
public KStream<String, Message> kStream8(StreamsBuilder streamsBuilder) {
    JsonSerde<Message> jsonSerde = new JsonSerde<>();
    JsonDeserializer<Message> descri = (JsonDeserializer<Message>) jsonSerde.deserializer();
    descri.addTrustedPackages("*");
    KStream<String, Message> stream = streamsBuilder.stream("test", Consumed.with(Serdes.String(), jsonSerde));
    // Split the stream into two branches by the sex field: "男" (male) and "女" (female).
    KStream<String, Message>[] arrStream = stream.branch(
        (key, value) -> "男".equals(value.getSex()),
        (key, value) -> "女".equals(value.getSex()));
    // Stream here is java.util.stream.Stream; each branch prints its own records.
    Stream.of(arrStream).forEach(as -> {
        as.foreach((key, message) -> {
            System.out.println(Thread.currentThread().getName() + ", key = " + key + ", message = " + message);
        });
    });
    return stream;
}
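Two details of branching are easy to miss: a record goes to the branch of the first predicate it matches, even if later predicates would also match, and a record that matches no predicate is dropped. The plain-Java sketch below imitates that routing in memory; it is not the Kafka Streams API, and `BranchSketch`, `branch`, and `demo` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical illustration (not Kafka API): first-match routing of records into branches.
class BranchSketch {

    static <T> List<List<T>> branch(List<T> records, List<Predicate<T>> predicates) {
        List<List<T>> branches = new ArrayList<>();
        for (int i = 0; i < predicates.size(); i++) {
            branches.add(new ArrayList<>());
        }
        for (T record : records) {
            for (int i = 0; i < predicates.size(); i++) {
                if (predicates.get(i).test(record)) {
                    branches.get(i).add(record);
                    break; // the first matching predicate wins
                }
            }
            // A record that matched no predicate is not added anywhere, so it is dropped.
        }
        return branches;
    }

    static List<List<String>> demo() {
        List<Predicate<String>> predicates = new ArrayList<>();
        predicates.add(s -> s.startsWith("a"));
        predicates.add(s -> s.length() > 2);
        // "apple" matches both predicates but goes to the first branch; "xy" matches neither.
        return branch(Arrays.asList("apple", "kiwi", "ab", "xy"), predicates);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

If unmatched records must not be lost, a final catch-all predicate that always returns true is a common way to collect them.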
 

Grouping by multiple fields
Chaining several selectKey calls does not work, because each call replaces the key set by the previous one. Combine the fields into a single key instead:
@Bean
public KStream<String, Message> kStreamM2(StreamsBuilder streamsBuilder) {
    JsonSerde<Message> jsonSerde = new JsonSerde<>();
    JsonDeserializer<Message> descri = (JsonDeserializer<Message>) jsonSerde.deserializer();
    descri.addTrustedPackages("*");
    KStream<String, Message> stream = streamsBuilder.stream("test", Consumed.with(Serdes.String(), jsonSerde));
    stream
        .selectKey((key, value) -> {
            System.out.println(Thread.currentThread().getName());
            // Build one composite key from both fields.
            return value.getTime() + " | " + value.getOrgCode();
        })
        .groupByKey(Grouped.with(Serdes.String(), jsonSerde))
        .count()
        .toStream().print(Printed.toSysOut());
    return stream;
}
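The composite key above joins the two fields with a separator, and the separator is doing real work. The plain-Java sketch below shows why, using made-up sample values; `CompositeKeySketch` and its methods are hypothetical names. Without a separator, different field combinations can collapse into the same key and be counted together.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration (not Kafka API): grouping by a key built from two fields.
class CompositeKeySketch {

    // Same format as the selectKey lambda: time and orgCode joined by " | ".
    static String compositeKey(String time, String orgCode) {
        return time + " | " + orgCode;
    }

    // Each record is a {time, orgCode} pair; counts are kept per composite key.
    static Map<String, Long> countByTimeAndOrg(List<String[]> records) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String[] record : records) {
            counts.merge(compositeKey(record[0], record[1]), 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String[]> records = Arrays.asList(
            new String[] {"2021-01", "A"},
            new String[] {"2021-01", "B"},
            new String[] {"2021-01", "A"},
            new String[] {"2021-02", "A"});
        System.out.println(countByTimeAndOrg(records));

        // Plain concatenation would merge these two distinct pairs into the same key "abc".
        System.out.println(("ab" + "c").equals("a" + "bc"));
        // With the separator they stay distinct.
        System.out.println(compositeKey("ab", "c").equals(compositeKey("a", "bc")));
    }
}
```

The separator should be a string that cannot occur inside either field; otherwise collisions are still possible.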
 