如何用Spring WebFlux構(gòu)建Reactive REST API
譯文【51CTO.com快譯】
在本文中,我們將討論如何使用Spring WebFlux來構(gòu)建響應(yīng)式REST API。在正式討論之前,讓我們首先來看看系統(tǒng)的開發(fā),傳統(tǒng)REST在實現(xiàn)中遇到的問題,以及當(dāng)前API的普遍需求。
下圖簡要地羅列了傳統(tǒng)應(yīng)用和現(xiàn)代應(yīng)用系統(tǒng)的主要特點(diǎn)。如今的系統(tǒng)講求的是:分布式應(yīng)用、云原生、高可用性和可擴(kuò)展性。因此,有效地利用系統(tǒng)現(xiàn)有的資源是至關(guān)重要的。
應(yīng)用程序API需求的演變
那么傳統(tǒng)的REST API請求處理又是如何工作的呢?
傳統(tǒng)REST API模型
如上圖所示,傳統(tǒng)REST API會帶來如下問題:
- 阻塞和同步 → 通常,請求線程會去等待各種阻塞的I/O直至結(jié)束之后,才能被釋放,進(jìn)而將響應(yīng)返回給調(diào)用方。
- 每個請求的線程數(shù) → Web容器會用到基于請求的線程(thread-per-request)模型。該模型限制了待處理的并發(fā)請求數(shù)量。也就是說,容器會對請求進(jìn)行排隊,進(jìn)而最終影響到API的性能。
- 處理高并發(fā)用戶的限制 → 正是由于Web容器使用了基于請求的線程模型,因此我們無法去處理那些高并發(fā)量的請求。
- 無法更好地利用系統(tǒng)資源 → 阻塞的I/O會造成線程處于空閑狀態(tài),進(jìn)而導(dǎo)致Web容器無法接受更多的請求,我們也就無法有效地利用現(xiàn)有的系統(tǒng)資源。
- 沒有背壓(backpressure)支持 → 由于我們無法從客戶端或服務(wù)器處施加背壓,因此應(yīng)用程序在負(fù)載量大時,無法維持正常運(yùn)行。也就是說,倘若應(yīng)用突然面臨大量的請求,則服務(wù)器或客戶端可能會由于中斷,而無法訪問到該應(yīng)用。
下面,讓我們來看看響應(yīng)式API的優(yōu)勢,以及如何使用響應(yīng)式編程,來解決上述問題。
- 異步和無阻塞 → 響應(yīng)式編程為編寫異步和非阻塞應(yīng)用程序提供了靈活性。
- 事件/消息驅(qū)動 → 系統(tǒng)能夠為任何活動生成對應(yīng)的事件或消息。例如,那些來自數(shù)據(jù)庫的數(shù)據(jù)會被視為事件流。
- 支持背壓 → 我們可以通過施加背壓,來“優(yōu)雅地”處理從一個系統(tǒng)到另一個系統(tǒng)的壓力,從而避免了拒絕服務(wù)的出現(xiàn)。
- 可預(yù)測的應(yīng)用響應(yīng)時間 → 由于線程是異步且非阻塞的,因此我們可以預(yù)測負(fù)載下的應(yīng)用響應(yīng)時間。
- 更好地利用系統(tǒng)資源 → 同樣由于線程是異步且非阻塞的,因此各種線程不會被I/O所占用,它們能夠支持更多的用戶請求。
- 基于負(fù)載的擴(kuò)容方式
- 擺脫基于請求的線程 → 借助響應(yīng)式API,并得益于異步且非阻塞的線程,我們可以擺脫基于請求的線程模型。在請求被產(chǎn)生后,模型會與服務(wù)器一起創(chuàng)建事件,并通過請求線程,去處理其他的請求。
那么,響應(yīng)式編程的具體流程是怎樣的呢?如下圖所示,一旦應(yīng)用程序調(diào)用了從某個數(shù)據(jù)源獲取數(shù)據(jù)的操作,那么就會立即返回一個線程,并且讓來自該數(shù)據(jù)源的數(shù)據(jù)作為數(shù)據(jù)/事件流出現(xiàn)。在此,應(yīng)用程序是訂閱者(subscriber),數(shù)據(jù)源是發(fā)布者(publisher)。一旦數(shù)據(jù)流完成后,onComplete事件就會被觸發(fā)。
數(shù)據(jù)流工作流程
如下圖所示,如果發(fā)生了任何異常情況,發(fā)布者將會觸發(fā)onError事件。
數(shù)據(jù)流工作流程
在某些情況下,例如:從數(shù)據(jù)庫中刪除一個條目,發(fā)布者只會立即觸發(fā)onComplete/onError事件,而不會調(diào)用onNext事件,畢竟沒有任何數(shù)據(jù)可以返回。
數(shù)據(jù)流工作流程
下面,我們進(jìn)一步討論:什么是背壓,以及如何將背壓應(yīng)用于響應(yīng)流。例如,我們有一個客戶端應(yīng)用正在向另一個服務(wù)請求數(shù)據(jù)。該服務(wù)能夠以1000 TPS(吞吐量)的速率發(fā)布事件,而客戶端應(yīng)用只能以200 TPS的速率處理事件。
那么在這種情況下,客戶端應(yīng)用程序需要通過緩沖數(shù)據(jù)來進(jìn)行處理。而在隨后的調(diào)用中,客戶端應(yīng)用程序可能會緩沖更多的數(shù)據(jù),以致最終耗盡內(nèi)存。顯然,這對于那些依賴該客戶端應(yīng)用的其他程序,會造成級聯(lián)效應(yīng)。為了避免此類情況,客戶端應(yīng)用可以要求服務(wù)在事件的末尾進(jìn)行緩沖,并以客戶端應(yīng)用的速率去推送各種事件。這就是所謂的背壓,具體流程請見下圖。
背壓示例
下面,我們將介紹響應(yīng)流的規(guī)范(請參見--https://www.reactive-streams.org/),以及一個實現(xiàn)案例--Project Reactor(請參見--https://projectreactor.io/)。通常,響應(yīng)流的規(guī)范定義了如下接口類型:
- 發(fā)布者(Publisher) → 發(fā)布者是那些具有無限數(shù)量順序元素的提供者。它可以按照訂閱者的要求進(jìn)行發(fā)布。其Java代碼如下所示:
- public interface Publisher<T> {
- public void subscribe(Subscriber<? super T> s);
- }
- 訂閱者(Subscriber) → 訂閱者恰好是那些具有無限數(shù)量順序元素的使用者。其Java代碼如下所示:
- public interface Subscriber<T> {
- public void onSubscribe(Subscription s);
- public void onNext(T t);
- public void onError(Throwable t);
- public void onComplete();
- }
- 訂閱(Subscription) → 表示訂閱者向發(fā)布者訂閱的某個一對一的周期。其Java代碼如下所示:
- public interface Subscription {
- public void request(long n);
- public void cancel();
- }
- 處理器(Processor) → 表示一個處理階段,即訂閱者和發(fā)布者之間根據(jù)約定進(jìn)行處理。
下面是響應(yīng)流規(guī)范的類圖:
響應(yīng)流規(guī)范
其實,響應(yīng)流規(guī)范具有許多種實現(xiàn)方式,上述Project Reactor只是其中的一種。Reactor可以完全實現(xiàn)無阻塞、且有效的請求管理。它能夠提供兩個響應(yīng)式和可組合的API,即:Flux [N](請參見-- https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html)和Mono [0|1](請參見--https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html)。它們廣泛地實現(xiàn)了響應(yīng)式擴(kuò)展(Reactive Extensions)。Reactor為HTTP(包括Websocket)提供了非阻塞的背壓式網(wǎng)絡(luò)引擎、TCP和UDP。它也非常適合于微服務(wù)的架構(gòu)。
- Flux→ 它是發(fā)布者帶有各種rx運(yùn)算符的響應(yīng)流(Reactive Streams),它會發(fā)出0到N個元素,然后輸出成功、或帶有某個錯誤的完成結(jié)果。其流程圖如下所示:
圖片來源:https://projectreactor.io
- Mono → 它也是發(fā)布者具有各種基本rx運(yùn)算符的響應(yīng)流,能夠通過發(fā)出0到1個元素,輸出成功、或帶有某個錯誤的完成結(jié)果。其流程圖如下所示:
圖片來源:https://projectreactor.io
由于Reactor的實施往往涉及到Spring 5.x,因此,我們可以使用帶有Spring servlet棧的命令式編程,來構(gòu)建REST API。下圖展示了Spring如何支持響應(yīng)式和servlet棧的實現(xiàn)。
圖片來源:spring.io
下面是一個公布了響應(yīng)式REST API的應(yīng)用。在該應(yīng)用中,我們使用到了:
- 帶有WebFlux的Spring Boot
- 具有響應(yīng)式支持的Spring數(shù)據(jù)
- Cassandra數(shù)據(jù)庫
下圖是該應(yīng)用的整體架構(gòu):
下面是build.gradle文件的Groovy代碼,它包含了與Spring WebFlux協(xié)同使用的各種依賴項。
- plugins {
- id 'org.springframework.boot' version '2.2.6.RELEASE'
- id 'io.spring.dependency-management' version '1.0.9.RELEASE'
- id 'java'
- }
- group = 'org.smarttechie'
- version = '0.0.1-SNAPSHOT'
- sourceCompatibility = '1.8'
- repositories {
- mavenCentral()
- }
- dependencies {
- implementation 'org.springframework.boot:spring-boot-starter-data-cassandra-reactive'
- implementation 'org.springframework.boot:spring-boot-starter-webflux'
- testImplementation('org.springframework.boot:spring-boot-starter-test') {
- exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
- }
- testImplementation 'io.projectreactor:reactor-test'
- }
- test {
- useJUnitPlatform()
- }
在此應(yīng)用程序中,我公布了如下API。您可以通過GitHub的相關(guān)鏈接--https://github.com/2013techsmarts/Spring-Reactive-Examples,下載源代碼。
在構(gòu)建響應(yīng)式API時,我們可以使用功能性樣式編程模型來構(gòu)建API,而無需使用RestController。當(dāng)然,您需要具有如下的router和handler組件:
Router:
- package org.smarttechie.router;
- import org.smarttechie.handler.ProductHandler;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.http.MediaType;
- import org.springframework.web.reactive.function.server.RouterFunction;
- import org.springframework.web.reactive.function.server.RouterFunctions;
- import org.springframework.web.reactive.function.server.ServerResponse;
- import static org.springframework.web.reactive.function.server.RequestPredicates.*;
- @Configuration
- public class ProductRouter {
- /**
- * The router configuration for the product handler.
- * @param productHandler
- * @return
- */
- @Bean
- public RouterFunction<ServerResponse> productsRoute(ProductHandler productHandler){
- return RouterFunctions.route(GET("/products").and(accept(MediaType.APPLICATION_JSON)) ,productHandler::getAllProducts).andRoute(POST("/product").and(accept(MediaType.APPLICATION_JSON)),productHandler::createProduct).andRoute(DELETE("/product/{id}").and(accept(MediaType.APPLICATION_JSON)) ,productHandler::deleteProduct).andRoute(PUT("/product/{id}").and(accept(MediaType.APPLICATION_JSON)),productHandler::updateProduct);
- }
- }
Handler:
- package org.smarttechie.handler;
- import org.smarttechie.model.Product;
- import org.smarttechie.service.ProductService;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.http.MediaType;
- import org.springframework.stereotype.Component;
- import org.springframework.web.reactive.function.server.ServerRequest;
- import org.springframework.web.reactive.function.server.ServerResponse;
- import reactor.core.publisher.Mono;
- import static org.springframework.web.reactive.function.BodyInserters.fromObject;
- @Component
- public class ProductHandler {
- @Autowired
- private ProductService productService;
- static Mono<ServerResponse> notFound = ServerResponse.notFound().build();
- /**
- * The handler to get all the available products.
- * @param serverRequest
- * @return - all the products info as part of ServerResponse
- */
- public Mono<ServerResponse> getAllProducts(ServerRequest serverRequest) {
- return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(productService.getAllProducts(), Product.class);
- }
- /**
- * The handler to create a product
- * @param serverRequest
- * @return - return the created product as part of ServerResponse
- */
- public Mono<ServerResponse> createProduct(ServerRequest serverRequest) {
- Mono<Product> productToSave = serverRequest.bodyToMono(Product.class);
- return productToSave.flatMap(product ->
- ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(productService.save(product), Product.class));
- }
- /**
- * The handler to delete a product based on the product id.
- * @param serverRequest
- * @return - return the deleted product as part of ServerResponse
- */
- public Mono<ServerResponse> deleteProduct(ServerRequest serverRequest) {
- String id = serverRequest.pathVariable("id");
- Mono<Void> deleteItem = productService.deleteProduct(Integer.parseInt(id));
- return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(deleteItem, Void.class);
- }
- /**
- * The handler to update a product.
- * @param serverRequest
- * @return - The updated product as part of ServerResponse
- */
- public Mono<ServerResponse> updateProduct(ServerRequest serverRequest) {
- return productService.update(serverRequest.bodyToMono(Product.class)).flatMap(product ->ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(fromObject(product))) .switchIfEmpty(notFound);
- }
- }
至止,我們已經(jīng)對如何公布響應(yīng)式REST API有所了解。針對上述實現(xiàn),我們使用了Gatling(譯者注:是一款功能強(qiáng)大的負(fù)載測試工具),在響應(yīng)式API和非響應(yīng)式API(使用Spring RestController構(gòu)建非響應(yīng)式API)上,進(jìn)行了簡單的基準(zhǔn)化測試。其結(jié)果比較如下圖所示。具體的Gatling負(fù)載測試腳本,請參考GitHub上的鏈接:https://github.com/2013techsmarts/Spring-Reactive-Examples。
負(fù)載測試結(jié)果比較
原標(biāo)題:Build Reactive REST APIs With Spring WebFlux ,作者:Siva Prasad Rao Janapati
【51CTO譯稿,合作站點(diǎn)轉(zhuǎn)載請注明原文譯者和出處為51CTO.com】