這重試器寫(xiě)的真地地地地地地道,你覺(jué)得地道嗎?
服務(wù)總是不穩(wěn)定的,有的時(shí)候需要編寫(xiě)重試邏輯,比如,HTTP的重試;定時(shí)任務(wù)的重試等。
一、簡(jiǎn)單的實(shí)現(xiàn)
我們可以使用while
或for
循環(huán),配置try-catch
和break
組合,完成循環(huán)邏輯。
final Random random = new Random();
final int maxRetryCount = 10;
int times = 0;
while (true) {
times++;
if (times > maxRetryCount) {
break;
}
try {
// 業(yè)務(wù)邏輯
System.out.println("最大重試" + maxRetryCount + "次,當(dāng)前是第" + times + "次");
if (random.nextInt(10) > 5) {
throw new RuntimeException("隨機(jī)數(shù)失敗");
}
if (random.nextInt(10) / 2 == 0) {
System.out.println("邏輯執(zhí)行成功");
break;
}
Thread.sleep(1000);
// 業(yè)務(wù)邏輯
} catch (Exception e) {
System.out.println("進(jìn)入異常捕獲");
}
}
System.out.println("業(yè)務(wù)邏輯執(zhí)行完畢");
其中一次執(zhí)行結(jié)果:
最大重試10次,當(dāng)前是第1次 進(jìn)入異常捕獲 最大重試10次,當(dāng)前是第2次 進(jìn)入異常捕獲 最大重試10次,當(dāng)前是第3次 最大重試10次,當(dāng)前是第4次 進(jìn)入異常捕獲 最大重試10次,當(dāng)前是第5次 最大重試10次,當(dāng)前是第6次 進(jìn)入異常捕獲 最大重試10次,當(dāng)前是第7次 最大重試10次,當(dāng)前是第8次 邏輯執(zhí)行成功 業(yè)務(wù)邏輯執(zhí)行完畢
上面這種實(shí)現(xiàn)算是重試邏輯的模板化代碼,大差不差的都是這種寫(xiě)法。
我們?cè)倏纯雌渌膶?xiě)法。
二、重試裝飾器的實(shí)現(xiàn)
本節(jié)我們使用裝飾器模式實(shí)現(xiàn),借助經(jīng)典的面向?qū)ο缶幊田L(fēng)格(通過(guò)類(lèi)和接口)。同時(shí),我們選擇更簡(jiǎn)潔的函數(shù)式方法。
首先,我們將聲明一個(gè)函數(shù),接收Supplier<T>
和最大調(diào)用次數(shù)作為參數(shù)。
然后,還是使用while
循環(huán)和try-catch
塊多次調(diào)用該函數(shù)。
最后,我們將通過(guò)返回另一個(gè)Supplier<T>
來(lái)保留原始數(shù)據(jù)類(lèi)型。
static <T> Supplier<T> retryFunction(Supplier<T> supplier, int maxRetries) {
return () -> {
int retries = 0;
while (retries < maxRetries) {
try {
return supplier.get();
} catch (Exception e) {
retries++;
}
}
throw new IllegalStateException(String.format("任務(wù)在 %s 次嘗試后失敗", maxRetries));
};
}
有了上面的函數(shù),我們可以基于這個(gè)函數(shù)裝飾器繼續(xù)創(chuàng)建CompletableFuture
:
static <T> CompletableFuture<T> retryTask(Supplier<T> supplier, int maxRetries) {
Supplier<T> retryableSupplier = retryFunction(supplier, maxRetries);
return CompletableFuture.supplyAsync(retryableSupplier);
}
模擬下業(yè)務(wù)場(chǎng)景,有個(gè)標(biāo)志位數(shù)字,我們需要檢查這個(gè)標(biāo)志位是否大于4,如果大于就結(jié)束任務(wù),如果小于4,重試。
final AtomicInteger retriesCounter = new AtomicInteger(0);
Supplier<Integer> codeToRun = () -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
int retryNr = retriesCounter.get();
System.out.println("Retrying: " + retryNr + "; thread:" + Thread.currentThread().getName());
if (retryNr < 4) {
retriesCounter.incrementAndGet();
throw new RuntimeException();
}
return 100;
};
然后我們借助retryTask()
函數(shù)完成重試邏輯,假設(shè)最大重試10次,根據(jù)上面的定義,正常會(huì)在第四次跳出:
CompletableFuture<Integer> result = retryTask(codeToRun, 10);
結(jié)果會(huì)打?。?/span>
Retrying: 0 Retrying: 1 Retrying: 2 Retrying: 3 Retrying: 4 100 4
如果重試次數(shù)小于4,就會(huì)觸發(fā)IllegalStateException
異常:
try {
result = retryTask(codeToRun, 3);
System.out.println(result.get());
} catch (Exception e) {
System.out.println("超過(guò)最大重試次數(shù)");
}
結(jié)果會(huì)打?。?/span>
Retrying: 0 Retrying: 1 Retrying: 2 超過(guò)最大重試次數(shù)
三、重試CompletableFuture
CompletableFuture
提供了內(nèi)部邏輯出現(xiàn)異常時(shí)處理的方法,比如exceptionally()
等方法,我們可以直接使用這些方法,不需要自定義裝飾器。
CompletableFuture
的使用可以參考由淺入深掌握CompletableFuture的七種用法。
(一)不安全重試
exceptionally()
方法允許指定一個(gè)替代函數(shù),當(dāng)主要邏輯出現(xiàn)異常時(shí),會(huì)調(diào)用指定的替代函數(shù)。
如果我們打算重試兩次,我們可以這樣寫(xiě):
static <T> CompletableFuture<T> retryTwice(Supplier<T> supplier) {
return CompletableFuture.supplyAsync(supplier)
.exceptionally(__ -> supplier.get())
.exceptionally(__ -> supplier.get());
}
如果重試次數(shù)可變,我們可以使用for
循環(huán):
static <T> CompletableFuture<T> retryUnsafe(Supplier<T> supplier, int maxRetries) {
CompletableFuture<T> cf = CompletableFuture.supplyAsync(supplier);
for (int i = 0; i < maxRetries; i++) {
cf = cf.exceptionally(__ -> supplier.get());
}
return cf;
}
上面的寫(xiě)法可以滿足需求,但是有一點(diǎn)需要注意,當(dāng)Supplier
運(yùn)行比較快,在CompletableFuture
和exceptionally()
回退創(chuàng)建之前執(zhí)行完畢,那exceptionally()
的方法就會(huì)在主線程執(zhí)行。比如,我們?cè)O(shè)定retryUnsafe
休眠1000ms,指定codeToRun
不做sleep:
static <T> CompletableFuture<T> retryUnsafe(Supplier<T> supplier, int maxRetries) throws InterruptedException {
CompletableFuture<T> cf = CompletableFuture.supplyAsync(supplier);
Thread.sleep(1000);
for (int i = 0; i < maxRetries; i++) {
cf = cf.exceptionally(__ -> supplier.get());
}
return cf;
}
codeToRun = () -> {
int retryNr = retriesCounter.get();
System.out.println("Retrying: " + retryNr + "; thread:" + Thread.currentThread().getName());
if (retryNr < 4) {
retriesCounter.incrementAndGet();
throw new RuntimeException();
}
return 100;
};
retryUnsafe(codeToRun, 3);
運(yùn)行結(jié)果將是:
Retrying: 0; thread:ForkJoinPool.commonPool-worker-1 Retrying: 1; thread:main Retrying: 2; thread:main Retrying: 3; thread:main
符合預(yù)期,后續(xù)調(diào)用是由主線程執(zhí)行的。如果初始調(diào)用很快,但后續(xù)調(diào)用預(yù)計(jì)會(huì)更慢,這可能會(huì)成為問(wèn)題。
(二)異步重試
如果是在Java12之后,我們可以通過(guò)exceptionallyAsync()
方法實(shí)現(xiàn),將所有重試都異步執(zhí)行。
static <T> CompletableFuture<T> retryExceptionallyAsync(Supplier<T> supplier, int maxRetries) {
CompletableFuture<T> cf = CompletableFuture.supplyAsync(supplier);
for (int i = 0; i < maxRetries; i++) {
cf = cf.exceptionallyAsync(__ -> supplier.get());
}
return cf;
}
這個(gè)時(shí)候運(yùn)行結(jié)果將是:
Retrying: 0; thread:ForkJoinPool.commonPool-worker-1
Retrying: 1; thread:ForkJoinPool.commonPool-worker-1
Retrying: 2; thread:ForkJoinPool.commonPool-worker-1
Retrying: 3; thread:ForkJoinPool.commonPool-worker-1
(三)嵌套CompletableFutures
如果是在Java12之前,我們需要實(shí)現(xiàn)兼容方案,那我們可以寫(xiě)一個(gè)增加邏輯,實(shí)現(xiàn)異步化,我們可以這樣寫(xiě):
static <T> CompletableFuture<T> retryNesting(Supplier<T> supplier, int maxRetries)
throws InterruptedException {
CompletableFuture<T> cf = CompletableFuture.supplyAsync(supplier);
Thread.sleep(1000);
for (int i = 0; i < maxRetries; i++) {
cf = cf.thenApply(CompletableFuture::completedFuture)
.exceptionally(__ -> CompletableFuture.supplyAsync(supplier))
.thenCompose(Function.identity());
}
return cf;
}
其實(shí)簡(jiǎn)單或就是有創(chuàng)建了一個(gè)CompletableFuture
,運(yùn)行結(jié)果也是符合預(yù)期的。
四、總結(jié)
在本文中,我們探討了在CompletableFuture中重試函數(shù)調(diào)用的概念。我們首先深入研究了以函數(shù)式風(fēng)格實(shí)現(xiàn)裝飾器模式,使我們能夠重試函數(shù)本身。
隨后,我們利用CompletableFuture API完成相同的任務(wù),同時(shí)保持異步流程。我們發(fā)現(xiàn)了Java 12中引入的exceptionallyAsync()
方法,它非常適合這個(gè)目的。最后,我們提出了一種替代方法,僅依賴于原始Java 8 API中的方法。
文末總結(jié)
本文介紹了使用實(shí)現(xiàn)重試邏輯的三種方式:簡(jiǎn)單模式、裝飾器模式、和利用CompletableFuture
原生API的實(shí)現(xiàn)。