偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

非常簡(jiǎn)單的SpringCloudStream集成Kafka教程!

開(kāi)發(fā) 架構(gòu)
本章初步介紹了Spring Cloud Stream 集成Kafka的簡(jiǎn)單示例,實(shí)現(xiàn)了簡(jiǎn)單的發(fā)布-訂閱功能。但是Spring Cloud Stream肯定還有更多的功能,我們后續(xù)還將繼續(xù)深入學(xué)習(xí)更多Stream的功能。

哈嘍,大家好,我是指北君。

開(kāi)發(fā)中,服務(wù)與服務(wù)之間通信通常會(huì)用到消息中間件,如果我們使用了某一個(gè)MQ,那么消息中間件與我們的系統(tǒng)算是高耦合。將來(lái)有一天,要替換成另外的MQ,我們的改動(dòng)就會(huì)比較大。為了解決這個(gè)問(wèn)題,我們可以使用Spring Cloud Stream 來(lái)整合我們的消息中間件,降低耦合度,使服務(wù)可以更多關(guān)注自己的業(yè)務(wù)邏輯等。

今天為大家?guī)?lái)一個(gè)人人可實(shí)操的SpringCloudStream集成Kafka的快速入門(mén)示例。

1.前言

SpringCloudStream是一個(gè)構(gòu)建高擴(kuò)展性的事件消息驅(qū)動(dòng)的微服務(wù)框架。簡(jiǎn)單點(diǎn)說(shuō)就是幫助你操作MQ,可以與底層MQ框架解耦。將來(lái)想要替換MQ框架的時(shí)候會(huì)比較容易。

Kafka是一個(gè)分布式發(fā)布 - 訂閱消息系統(tǒng),源于LinkedIn的一個(gè)項(xiàng)目,2011年成為開(kāi)源Apache項(xiàng)目。

ZooKeeper 是 Apache 軟件基金會(huì)的一個(gè)軟件項(xiàng)目,它為大型分布式計(jì)算提供開(kāi)源的分布式配置服務(wù)、同步服務(wù)和命名注冊(cè),Kafka的實(shí)現(xiàn)同時(shí)也依賴(lài)于zookeeper。

2.Windows搭建簡(jiǎn)單的Kafka

2.1 啟動(dòng)zookeeper

使用Kafka首先需要啟動(dòng)zookeeper,windows中搭建zookeeper也很簡(jiǎn)單。以下幾步即可完成:

  • 下載zookeeper (本文使用3.7.0版本,下載鏈接在文章末尾。)
  • 配置基本環(huán)境變量:

將conf文件夾下面的 zoo_sample.cfg 重命名zoo.cfg。并修改其工作目錄dataDir。

bin文件夾下面有zkEnv.cmd有zookeeper相關(guān)的配置,其中就包括JAVA_HOME,所以系統(tǒng)環(huán)境變量需要配置JAVA_HOME,或者直接用Java的路徑來(lái)替換。

  • 啟動(dòng),在bin目錄下運(yùn)行zkServer.cmd腳本啟動(dòng)zookeeper。

默認(rèn)啟動(dòng)端口2181為。

正常啟動(dòng)如下:

2.2 搭建Kafka

本地使用kafka同樣也是如下的幾個(gè)步驟:

  • 下載Kafka(本文使用2.11版本,下載鏈接見(jiàn)文章末尾)。
  • 環(huán)境變量配置:

查看config文件下面的 server.properties配置文件中的zookeeper的配置。

zookeeper.connect=localhost:2181

在bin/windows文件夾下面kafka-run-class.bat文件中有JAVA_HOME的配置,同樣也可以直接改成系統(tǒng)的Java路徑。

  • 在kafka根目錄下使用如下命令啟動(dòng)kafka,并在zookeeper中注冊(cè)。
# .\bin\windows\kafka-server-start.bat .\config\server.properties
  • 創(chuàng)建topic,在bin\windows目錄下使用如下命令。創(chuàng)建名稱(chēng)為“test”的topic
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 
--partitions 1 --topic test
  • 使用windows命令窗口的producer和consumer,在bin\windows目錄下使用如下命令
#test topic的消息生產(chǎn)者
kafka-console-producer.bat --broker-list localhost:9092 --topic test
#test topic的消息消費(fèi)者
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test
#test topic的消息消費(fèi)者(從頭消費(fèi))
kafka-console-consumer.bat --bootstrap-server localhost:9092 --from-beginning --topic

kafka啟動(dòng)windows界面如下:

3 SpringCloudStream集成Kafka

3.1 引入依賴(lài)

由于我們直接使用Spring Cloud Stream 集成Kafka,官方也已經(jīng)有現(xiàn)成的starter。

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
<version>2.1.0.RELEASE</version>
</dependency>

3.2 關(guān)于kafka的配置

spring:
application:
name: shop-server
cloud:
stream:
bindings:
#配置自己定義的通道與哪個(gè)中間件交互
input: #MessageChannel里Input和Output的值
destination: test #目標(biāo)主題 相當(dāng)于kafka的topic
output:
destination: test1 #本例子創(chuàng)建了另外一個(gè)topic (test1)用于區(qū)分不同的功能區(qū)分。
default-binder: kafka #默認(rèn)的binder是kafka
kafka:
binder:
zk-nodes: localhost:2181
bootstrap-servers: localhost:9092 #kafka服務(wù)地址,集群部署的時(shí)候需要配置多個(gè),
consumer:
group-id: consumer1
producer:
key-serializer: org.apache.kafka.common.serialization.ByteArraySerializer
value-serializer: org.apache.kafka.common.serialization.ByteArraySerializer
client-id: producer1
server:
port: 8100

3.3 消費(fèi)者示例

首先需要定義SubscribableChannel 接口方法使用Input注解。

public interface Sink {
String INPUT = "input";

@Input("input")
SubscribableChannel input();
}

然后簡(jiǎn)單的使用 StreamListener 監(jiān)聽(tīng)某一通道的消息。

@Service
@EnableBinding(Sink.class)
public class MessageSinkHandler {

@StreamListener(Sink.INPUT)
public void handler(Message<String> msg){
System.out.println(" received message : "+msg);

}
}

cloud stream配置中綁定了對(duì)應(yīng)的Kafka topic,如下:

cloud:
stream:
bindings:
#配置自己定義的通道與哪個(gè)中間件交互
input: #SubscribableChannel里Input值
destination: test #目標(biāo)主題

我們使用Kafka console producer 生產(chǎn)消息。

kafka-console-producer.bat --broker-list localhost:9092 --topic test

同時(shí)啟動(dòng)我們的示例SpringBoot項(xiàng)目,使用producer推送幾條消息。

我們同時(shí)啟動(dòng)一個(gè)Kafka console consumer。

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test

消費(fèi)結(jié)果如下:

Spring Boot 項(xiàng)目消費(fèi)消息如下:

3.4 生產(chǎn)者示例

首先需要定義生產(chǎn)者M(jìn)essageChannel,這里會(huì)用到Output注解。

public interface KafkaSource {
String OUTPUT = "output";

@Output(KafkaSource.OUTPUT)
MessageChannel output();
}

使用MessageChannel 發(fā)送消息。

@Component
public class MessageService {

@Autowired
private KafkaSource source;

public Object sendMessage(Object msg) {
source.output().send(MessageBuilder.withPayload(msg).build());
return msg;
}

定義一個(gè)Rest API 來(lái)觸發(fā)消息發(fā)送。

@RestController
public class MessageController {

@Autowired
private MessageService messageService;

@GetMapping(value = "/sendMessage/{msg}")
public String sendMessage(@PathVariable("msg") String msg){
messageService.sendMessage("messageService send out : " + msg + LocalDateTime.now());
return "sent message";
}
}

配置中關(guān)于producer的配置如下:

cloud:
stream:
bindings:
input:
destination: test
output:
destination: test1 #目標(biāo)topic

啟動(dòng)SpringBoot App, 并觸發(fā)如下API call。

??http://localhost:8100/sendMessage/JavaNorthProducer??

我們同時(shí)啟動(dòng)一個(gè)Kafka console consumer,這里我們使用另一個(gè)test1 topic

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test1

console consumer消費(fèi)消息如下:

總結(jié)

本章初步介紹了Spring Cloud Stream 集成Kafka的簡(jiǎn)單示例,實(shí)現(xiàn)了簡(jiǎn)單的發(fā)布-訂閱功能。但是Spring Cloud Stream肯定還有更多的功能,我們后續(xù)還將繼續(xù)深入學(xué)習(xí)更多Stream的功能。

以上示例倉(cāng)庫(kù):https://github.com/javatechnorth/java-study-note/tree/master/kafka

下載鏈接:

??https://dlcdn.apache.org/zookeeper/zookeeper-3.7.0/apache-zookeeper-3.7.0-bin.tar.gz??

??https://kafka.apache.org/downloads??

責(zé)任編輯:武曉燕 來(lái)源: Java技術(shù)指北
相關(guān)推薦

2018-08-02 15:13:35

2021-12-01 12:30:43

NiceUmiJS前端

2020-09-29 15:08:47

Go UI框架開(kāi)發(fā)

2010-02-05 13:56:56

Ubuntu Linu

2010-03-11 16:22:08

Python教程

2009-06-24 10:58:21

jQuery插件教程

2014-07-17 11:36:27

Android Stu使用教程

2019-12-03 11:00:08

spring bootspring-kafkJava

2014-04-24 13:35:11

OpenGL ES2.iOSAndroid

2011-05-11 15:10:21

jQueryCSS導(dǎo)航欄

2019-05-27 17:01:02

PHPPDO編程語(yǔ)言

2011-07-07 09:01:52

HTML 5

2024-08-05 08:45:35

SpringKafkaSCRAM

2010-07-06 11:09:52

Server 2008

2024-10-31 11:49:41

Kafka管理死信隊(duì)列

2009-09-29 10:40:12

政府應(yīng)急指揮平臺(tái)

2011-08-30 15:32:08

QtQuickQML

2020-02-21 17:33:17

SparkKafka數(shù)據(jù)

2009-07-06 14:43:30

JSP元素

2023-01-11 15:11:36

SpringEhcache
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)