Spring Cloud構(gòu)建微服務(wù)架構(gòu):消息驅(qū)動(dòng)的微服務(wù)(入門)【Dalston版】
之前在寫Spring Boot基礎(chǔ)教程的時(shí)候?qū)戇^一篇《Spring Boot中使用RabbitMQ》。在該文中,我們通過簡單的配置和注解就能實(shí)現(xiàn)向RabbitMQ中生產(chǎn)和消費(fèi)消息。實(shí)際上我們使用的對(duì)RabbitMQ的starter就是通過Spring Cloud Stream中對(duì)RabbitMQ的支持來實(shí)現(xiàn)的。下面我們就通過本文來了解一下Spring Cloud Stream。
Spring Cloud Stream是一個(gè)用來為微服務(wù)應(yīng)用構(gòu)建消息驅(qū)動(dòng)能力的框架。它可以基于Spring Boot來創(chuàng)建獨(dú)立的、可用于生產(chǎn)的Spring應(yīng)用程序。它通過使用Spring Integration來連接消息代理中間件以實(shí)現(xiàn)消息事件驅(qū)動(dòng)的微服務(wù)應(yīng)用。Spring Cloud Stream為一些供應(yīng)商的消息中間件產(chǎn)品提供了個(gè)性化的自動(dòng)化配置實(shí)現(xiàn),并且引入了發(fā)布-訂閱、消費(fèi)組以及消息分區(qū)這三個(gè)核心概念。簡單的說,Spring Cloud Stream本質(zhì)上就是整合了Spring Boot和Spring Integration,實(shí)現(xiàn)了一套輕量級(jí)的消息驅(qū)動(dòng)的微服務(wù)框架。通過使用Spring Cloud Stream,可以有效地簡化開發(fā)人員對(duì)消息中間件的使用復(fù)雜度,讓系統(tǒng)開發(fā)人員可以有更多的精力關(guān)注于核心業(yè)務(wù)邏輯的處理。由于Spring Cloud Stream基于Spring Boot實(shí)現(xiàn),所以它秉承了Spring Boot的優(yōu)點(diǎn),實(shí)現(xiàn)了自動(dòng)化配置的功能幫忙我們可以快速的上手使用,但是目前為止Spring Cloud Stream只支持下面兩個(gè)著名的消息中間件的自動(dòng)化配置:
- RabbitMQ
- Kafka
快速入門
下面我們通過構(gòu)建一個(gè)簡單的示例來對(duì)Spring Cloud Stream有一個(gè)初步認(rèn)識(shí)。該示例主要目標(biāo)是構(gòu)建一個(gè)基于Spring Boot的微服務(wù)應(yīng)用,這個(gè)微服務(wù)應(yīng)用將通過使用消息中間件RabbitMQ來接收消息并將消息打印到日志中。所以,在進(jìn)行下面步驟之前請(qǐng)先確認(rèn)已經(jīng)在本地安裝了RabbitMQ,具體安裝步驟請(qǐng)參考此文。
構(gòu)建一個(gè)Spring Cloud Stream消費(fèi)者
- 創(chuàng)建一個(gè)基礎(chǔ)的Spring Boot工程,命名為:stream-hello
- 編輯pom.xml中的依賴關(guān)系,引入Spring Cloud Stream對(duì)RabbitMQ的支持,具體如下:
- <parent>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-parent</artifactId>
- <version>1.5.9.RELEASE</version>
- <relativePath/> <!-- lookup parent from repository -->
- </parent>
- <dependencies>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-test</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.springframework.cloud</groupId>
- <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
- </dependency>
- </dependencies>
- <dependencyManagement>
- <dependencies>
- <dependency>
- <groupId>org.springframework.cloud</groupId>
- <artifactId>spring-cloud-dependencies</artifactId>
- <version>Dalston.SR4</version>
- <type>pom</type>
- <scope>import</scope>
- </dependency>
- </dependencies>
- </dependencyManagement>
- 創(chuàng)建用于接收來自RabbitMQ消息的消費(fèi)者SinkReceiver,具體如下:
- @EnableBinding(Sink.class)
- public class SinkReceiver {
- private static Logger logger = LoggerFactory.getLogger(SinkReceiver.class);
- @StreamListener(Sink.INPUT)
- public void receive(Object payload) {
- logger.info("Received: " + payload);
- }
- }
- 創(chuàng)建應(yīng)用主類,這里同其他Spring Boot一樣,沒有什么特別之處,具體如下:
- @SpringBootApplication
- public class SinkApplication {
- public static void main(String[] args) {
- SpringApplication.run(SinkApplication.class, args);
- }
- }
到這里,我們快速入門示例的編碼任務(wù)就已經(jīng)完成了。下面我們分別啟動(dòng)RabbitMQ以及該Spring Boot應(yīng)用,然后做下面的試驗(yàn),看看它們是如何運(yùn)作的。
手工測(cè)試驗(yàn)證
- 我們先來看一下Spring Boot應(yīng)用的啟動(dòng)日志。
- ...
- INFO 16272 --- [main] o.s.c.s.b.r.RabbitMessageChannelBinder : declaring queue for inbound: input.anonymous.Y8VsFILmSC27eS5StsXp6A, bound to: input
- INFO 16272 --- [main] o.s.a.r.c.CachingConnectionFactory : Created new connection: SimpleConnection@3c78e551 [delegate=amqp://guest@127.0.0.1:5672/]
- INFO 16272 --- [main] o.s.integration.channel.DirectChannel : Channel 'input.anonymous.Y8VsFILmSC27eS5StsXp6A.bridge' has 1 subscriber(s).
- INFO 16272 --- [main] o.s.i.a.i.AmqpInboundChannelAdapter : started inbound.input.anonymous.Y8VsFILmSC27eS5StsXp6A
- ...
從上面的日志內(nèi)容中,我們可以獲得以下信息:
- 使用guest用戶創(chuàng)建了一個(gè)指向127.0.0.1:5672位置的RabbitMQ連接,在RabbitMQ的控制臺(tái)中我們也可以發(fā)現(xiàn)它。
- 聲明了一個(gè)名為input.anonymous.Y8VsFILmSC27eS5StsXp6A的隊(duì)列,并通過RabbitMessageChannelBinder將自己綁定為它的消費(fèi)者。這些信息我們也能在RabbitMQ的控制臺(tái)中發(fā)現(xiàn)它們。
下面我們可以在RabbitMQ的控制臺(tái)中進(jìn)入input.anonymous.Y8VsFILmSC27eS5StsXp6A隊(duì)列的管理頁面,通過Publish Message功能來發(fā)送一條消息到該隊(duì)列中。
此時(shí),我們可以在當(dāng)前啟動(dòng)的Spring Boot應(yīng)用程序的控制臺(tái)中看到下面的內(nèi)容:
- INFO 16272 --- [C27eS5StsXp6A-1] com.didispace.HelloApplication : Received: [B@7cba610e
我們可以發(fā)現(xiàn)在應(yīng)用控制臺(tái)中輸出的內(nèi)容就是SinkReceiver中receive方法定義的,而輸出的具體內(nèi)容則是來自消息隊(duì)列中獲取的對(duì)象。這里由于我們沒有對(duì)消息進(jìn)行序列化,所以輸出的只是該對(duì)象的引用,在后面的小節(jié)中我們會(huì)詳細(xì)介紹接收消息后的處理。
在順利完成上面快速入門的示例后,我們簡單解釋一下上面的步驟是如何將我們的Spring Boot應(yīng)用連接上RabbitMQ來消費(fèi)消息以實(shí)現(xiàn)消息驅(qū)動(dòng)業(yè)務(wù)邏輯的。
首先,我們對(duì)Spring Boot應(yīng)用做的就是引入spring-cloud-starter-stream-rabbit依賴,該依賴包是Spring Cloud Stream對(duì)RabbitMQ支持的封裝,其中包含了對(duì)RabbitMQ的自動(dòng)化配置等內(nèi)容。從下面它定義的依賴關(guān)系中,我們還可以知道它等價(jià)于spring-cloud-stream-binder-rabbit依賴。
- <dependencies>
- <dependency>
- <groupId>org.springframework.cloud</groupId>
- <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
- </dependency>
- </dependencies>
接著,我們?cè)賮砜纯催@里用到的幾個(gè)Spring Cloud Stream的核心注解,它們都被定義在SinkReceiver中:
- @EnableBinding,該注解用來指定一個(gè)或多個(gè)定義了@Input或@Output注解的接口,以此實(shí)現(xiàn)對(duì)消息通道(Channel)的綁定。在上面的例子中,我們通過@EnableBinding(Sink.class)綁定了Sink接口,該接口是Spring Cloud Stream中默認(rèn)實(shí)現(xiàn)的對(duì)輸入消息通道綁定的定義,它的源碼如下:
- public interface Sink {
- String INPUT = "input";
- @Input(Sink.INPUT)
- SubscribableChannel input();
- }
它通過@Input注解綁定了一個(gè)名為input的通道。除了Sink之外,Spring Cloud Stream還默認(rèn)實(shí)現(xiàn)了綁定output通道的Source接口,還有結(jié)合了Sink和Source的Processor接口,實(shí)際使用時(shí)我們也可以自己通過@Input和@Output注解來定義綁定消息通道的接口。當(dāng)我們需要為@EnableBinding指定多個(gè)接口來綁定消息通道的時(shí)候,可以這樣定義:@EnableBinding(value = {Sink.class, Source.class})。
- @StreamListener:該注解主要定義在方法上,作用是將被修飾的方法注冊(cè)為消息中間件上數(shù)據(jù)流的事件監(jiān)聽器,注解中的屬性值對(duì)應(yīng)了監(jiān)聽的消息通道名。在上面的例子中,我們通過@StreamListener(Sink.INPUT)注解將receive方法注冊(cè)為對(duì)input消息通道的監(jiān)聽處理器,所以當(dāng)我們?cè)赗abbitMQ的控制頁面中發(fā)布消息的時(shí)候,receive方法會(huì)做出對(duì)應(yīng)的響應(yīng)動(dòng)作。
編寫消費(fèi)消息的單元測(cè)試用例
上面我們通過RabbitMQ的控制臺(tái)完成了發(fā)送消息來驗(yàn)證了消息消費(fèi)程序的功能,雖然這種方法比較low,但是通過上面的步驟,相信大家對(duì)RabbitMQ和Spring Cloud Stream的消息消費(fèi)已經(jīng)有了一些基礎(chǔ)的認(rèn)識(shí)。下面我們通過編寫生產(chǎn)消息的單元測(cè)試用例來完善我們的入門內(nèi)容。
- 在上面創(chuàng)建的工程中創(chuàng)建單元測(cè)試類:
- @RunWith(SpringRunner.class)
- @EnableBinding(value = {SinkApplicationTests.SinkSender.class})
- public class SinkApplicationTests {
- @Autowired
- private SinkSender sinkSender;
- @Test
- public void sinkSenderTester() {
- sinkSender.output().send(MessageBuilder.withPayload("produce a message :http://blog.didispace.com").build());
- }
- public interface SinkSender {
- String OUTPUT = "input";
- @Output(SinkSender.OUTPUT)
- MessageChannel output();
- }
- }
- 在應(yīng)用了上面的消息消費(fèi)者程序之后,運(yùn)行這里定義的單元測(cè)試程序,我們馬上就能在消息消費(fèi)者的控制臺(tái)中收到下面的內(nèi)容:
- INFO 50947 --- [L2W-c2AcChb2Q-1] com.didispace.stream.SinkReceiver : Received: produce a message :http://blog.didispace.com
在上面的單元測(cè)試中,我們通過@Output(SinkSender.OUTPUT)定義了一個(gè)輸出通過,而該輸出通道的名稱為input,與前文中的Sink中定義的消費(fèi)通道同名,所以這里的單元測(cè)試與前文的消費(fèi)者程序組成了一對(duì)生產(chǎn)者與消費(fèi)者。到這里,本文的內(nèi)容就次結(jié)束,如果您能夠獨(dú)立的完成上面的例子,那么對(duì)于Spring Cloud Stream的基礎(chǔ)使用算是入門了。但是,Spring Cloud Stream的使用遠(yuǎn)不止于此,在近期的博文中,我講繼續(xù)更新這部分內(nèi)容,幫助他們來理解和用好Spring Cloud Stream來構(gòu)建消息驅(qū)動(dòng)的微服務(wù)!
【本文為51CTO專欄作者“翟永超”的原創(chuàng)稿件,轉(zhuǎn)載請(qǐng)通過51CTO聯(lián)系作者獲取授權(quán)】