在SpringBoot項目中使用CompletableFuture優(yōu)化并發(fā)REST調(diào)用的正確姿勢
環(huán)境:SpringBoot2.7.18
1. 簡介
在項目開發(fā)時,經(jīng)常會遇到從不同的接口服務拉取數(shù)據(jù)并將其匯總到響應中。在微服務中,這些數(shù)據(jù)源通常是外部 REST API。在本篇文章中,我們將使用 Java 的 CompletableFuture 高效地并行請求多個外部 REST API 中的數(shù)據(jù)。同時,會對整個請求過程中的異常處理、請求超時進行詳細的介紹。
2. 為什么要并行調(diào)用?
假設我們需要更新對象中的多個字段,每個字段的值都來自外部 REST 調(diào)用。一種方法也是最簡單的方式是依次調(diào)用每個 API 來更新每個字段。
但是,等待一個 REST 調(diào)用完成后再啟動另一個會增加服務的整體響應時間。例如,如果我們調(diào)用兩個應用程序接口,每個需要 5 秒鐘,那么總時間至少要 10 秒鐘,因為第二個調(diào)用需要等待第一個調(diào)用完成。
相反,我們可以并行調(diào)用所有 API,這樣總時間就是最慢(耗時最長)的 REST 調(diào)用時間。例如,一個調(diào)用需要 7 秒,另一個需要 5 秒。在這種情況下,我們將等待 7 秒,因為我們已經(jīng)并行處理了所有內(nèi)容,必須等待所有結果完成。
因此,并行化是減少服務響應時間、提高服務可擴展性和改善用戶體驗的絕佳選擇。
3. 實戰(zhàn)案例
3.1 定義用于更新的目標 POJO
public class Purchase {
  private String orderDescription ;
  private String paymentDescription ;
  private String buyerName ;
  private String orderId ;
  private String paymentId ;
  private String userId ;
  // getters and setters
}該采購類有三個需要更新的字段,每個字段都需要通過 ID 進行不同的 REST 調(diào)用來查詢。
接下來,先創(chuàng)建一個類,定義一個 RestTemplate Bean 和一個用于 REST 調(diào)用的域 URL:
@Component
public class PurchaseRestCallsAsyncExecutor {
  private static final String BASE_URL = "http://www.pack.com" ;
  private final RestTemplate restTemplate ;
  public PurchaseRestCallsAsyncExecutor(RestTemplate restTemplate) {
    this.restTemplate = restTemplate ;
  }
}接下來,分別編寫3個REST接口調(diào)用的方法
現(xiàn)在,讓我們來定義 /orders API 調(diào)用:
public String getOrderDescription(String orderId) {
  ResponseEntity<String> result = restTemplate.getForEntity(
    String.format("%s/orders/%s", BASE_URL, orderId), 
    String.class) ;
  return result.getBody() ;
}然后,讓我們定義 /payments API 調(diào)用:
public String getPaymentDescription(String paymentId) {
  ResponseEntity<String> result = restTemplate.getForEntity(
    String.format("%s/payments/%s", BASE_URL, paymentId),
    String.class) ;
  return result.getBody() ;
}最后,我們定義了 /users API 調(diào)用:
public String getUserName(String userId) {
  ResponseEntity<String> result = restTemplate.getForEntity(
    String.format("%s/users/%s", BASE_URL, userId),
    String.class) ;
  return result.getBody() ;
}這三個接口方法都使用 getForEntity() 方法進行 REST 調(diào)用,并將結果封裝在一個 ResponseEntity 對象中。
3.2 使用 CompletableFuture 進行多次 REST 調(diào)用
現(xiàn)在,我們就可以創(chuàng)建一個方法,用于構建和運行一組三個 CompletableFutures:
public void updatePurchase(Purchase purchase) {
  CompletableFuture.allOf(
    CompletableFuture.supplyAsync(() -> getOrderDescription(purchase.getOrderId()))
      .thenAccept(purchase::setOrderDescription),
    CompletableFuture.supplyAsync(() -> getPaymentDescription(purchase.getPaymentId()))
      .thenAccept(purchase::setPaymentDescription),
    CompletableFuture.supplyAsync(() -> getUserName(purchase.getUserId()))
      .thenAccept(purchase::setBuyerName)
  ).join() ;
}我們使用allOf()方法來構建CompletableFuture的步驟。每個參數(shù)都是一個并行任務,這些任務以另一個通過REST調(diào)用及其結果構建的CompletableFuture的形式存在。
我們首先使用supplyAsync()方法提供了一個Supplier,從這個Supplier中我們將檢索數(shù)據(jù)。然后,我們使用thenAccept()來消費supplyAsync()的結果,并將其設置到Purchase類中相應的字段上。
在allOf()方法結束時,我們只是構建了這些任務,但尚未執(zhí)行任何操作。
最后,我們在所有任務構建完畢后調(diào)用join()方法來并行運行所有任務并收集它們的結果。由于join()是一個線程阻塞操作,我們只在最后調(diào)用它,而不是在每個任務步驟之后調(diào)用,這是為了通過減少線程阻塞來優(yōu)化應用程序性能。
由于我們沒有為supplyAsync()方法提供一個自定義的ExecutorService,因此所有任務都在同一個Executor中運行。默認情況下,Java使用ForkJoinPool.commonPool()。
建議為supplyAsync()方法指定一個自定義的ExecutorService是一個好習慣,這樣我們可以對線程池參數(shù)有更多的控制。
3.3 錯誤處理
在分布式系統(tǒng)中,服務不可用或網(wǎng)絡故障是很常見的。這些故障可能發(fā)生在外部 REST API 中,而我們作為該 API 的客戶端卻并不知情。例如,如果應用程序宕機,這就導致發(fā)送的請求將永遠無法完成。
因此,我們可以使用 handle() 方法單獨處理每個 REST 調(diào)用異常:
public <U> CompletableFuture<U> handle(
  BiFunction<? super T, Throwable, ? extends U> fn) ;該方法的參數(shù)是一個 BiFunction,其中包含作為參數(shù)的上一個任務的結果和異常。 接下來我們將 handle() 步驟添加到 CompletableFuture 的一個步驟中
public void updatePurchaseHandlingExceptions(Purchase purchase) {
  CompletableFuture.allOf(
    CompletableFuture.supplyAsync(() -> getPaymentDescription(purchase.getPaymentId()))
      .thenAccept(purchase::setPaymentDescription)
      .handle((result, exception) -> {
        if (exception != null) {
          // 異常處理
          return null ;
        }
        return result ;
      })
  ).join() ;
}在示例中,handle() 從 thenAccept() 調(diào)用的 setPaymentDescription() 中獲取一個 Void 類型,然后將 thenAccept() 動作中拋出的任何錯誤存儲到異常中。最后,如果沒有異常拋出,則 handle() 返回作為參數(shù)傳遞的值。否則,返回空值。
3.4 處理 REST 調(diào)用超時
當我們使用 CompletableFuture 時,我們可以指定一個任務超時,類似于我們在 REST 調(diào)用中定義的超時。因此,如果任務沒有在指定時間內(nèi)完成,Java 會以超時異常(TimeoutException)結束任務執(zhí)行,修改代碼如下:
public void updatePurchaseHandlingExceptions(Purchase purchase) {
  CompletableFuture.allOf(
    CompletableFuture.supplyAsync(() -> getOrderDescription(purchase.getOrderId()))
      .thenAccept(purchase::setOrderDescription)
      // 設置超時時間5s
      .orTimeout(5, TimeUnit.SECONDS)
      .handle((result, exception) -> {
        if (exception instanceof TimeoutException) {
          // 異常處理
          return null ;
        }
        return result ;
      })
  ).join() ;
}我們在 CompletableFuture 中通過 orTimeout() 方法設置超時時間,如果在 5 秒內(nèi)未完成任務時停止任務執(zhí)行。同時還在 handle() 方法中添加了 if 語句,以便單獨處理 TimeoutException。在 CompletableFuture 中添加超時可確保任務始終完成。這對于避免線程無限期地等待可能永遠不會完成的操作結果非常重要。因此,它減少了處于長時間運行狀態(tài)的線程數(shù)量,提高了應用程序的健康度。















 
 
 








 
 
 
 