這重試器寫(xiě)的真地地地地地地道,你覺(jué)得地道嗎?
服務(wù)總是不穩(wěn)定的,有的時(shí)候需要編寫(xiě)重試邏輯,比如,HTTP的重試;定時(shí)任務(wù)的重試等。
一、簡(jiǎn)單的實(shí)現(xiàn)
我們可以使用while或for循環(huán),配置try-catch和break組合,完成循環(huán)邏輯。
final Random random = new Random();
final int maxRetryCount = 10;
int times = 0;
while (true) {
    times++;
    if (times > maxRetryCount) {
        break;
    }
    try {
        // 業(yè)務(wù)邏輯
        System.out.println("最大重試" + maxRetryCount + "次,當(dāng)前是第" + times + "次");
        if (random.nextInt(10) > 5) {
            throw new RuntimeException("隨機(jī)數(shù)失敗");
        }
        if (random.nextInt(10) / 2 == 0) {
            System.out.println("邏輯執(zhí)行成功");
            break;
        }
        Thread.sleep(1000);
        // 業(yè)務(wù)邏輯
    } catch (Exception e) {
        System.out.println("進(jìn)入異常捕獲");
    }
}
System.out.println("業(yè)務(wù)邏輯執(zhí)行完畢");其中一次執(zhí)行結(jié)果:
最大重試10次,當(dāng)前是第1次 進(jìn)入異常捕獲 最大重試10次,當(dāng)前是第2次 進(jìn)入異常捕獲 最大重試10次,當(dāng)前是第3次 最大重試10次,當(dāng)前是第4次 進(jìn)入異常捕獲 最大重試10次,當(dāng)前是第5次 最大重試10次,當(dāng)前是第6次 進(jìn)入異常捕獲 最大重試10次,當(dāng)前是第7次 最大重試10次,當(dāng)前是第8次 邏輯執(zhí)行成功 業(yè)務(wù)邏輯執(zhí)行完畢上面這種實(shí)現(xiàn)算是重試邏輯的模板化代碼,大差不差的都是這種寫(xiě)法。
我們?cè)倏纯雌渌膶?xiě)法。
二、重試裝飾器的實(shí)現(xiàn)
本節(jié)我們使用裝飾器模式實(shí)現(xiàn),借助經(jīng)典的面向?qū)ο缶幊田L(fēng)格(通過(guò)類(lèi)和接口)。同時(shí),我們選擇更簡(jiǎn)潔的函數(shù)式方法。
首先,我們將聲明一個(gè)函數(shù),接收Supplier<T>和最大調(diào)用次數(shù)作為參數(shù)。
然后,還是使用while循環(huán)和try-catch塊多次調(diào)用該函數(shù)。
最后,我們將通過(guò)返回另一個(gè)Supplier<T>來(lái)保留原始數(shù)據(jù)類(lèi)型。
static <T> Supplier<T> retryFunction(Supplier<T> supplier, int maxRetries) {
    return () -> {
        int retries = 0;
        while (retries < maxRetries) {
            try {
                return supplier.get();
            } catch (Exception e) {
                retries++;
            }
        }
        throw new IllegalStateException(String.format("任務(wù)在 %s 次嘗試后失敗", maxRetries));
    };
}有了上面的函數(shù),我們可以基于這個(gè)函數(shù)裝飾器繼續(xù)創(chuàng)建CompletableFuture:
static <T> CompletableFuture<T> retryTask(Supplier<T> supplier, int maxRetries) {
    Supplier<T> retryableSupplier = retryFunction(supplier, maxRetries);
    return CompletableFuture.supplyAsync(retryableSupplier);
}模擬下業(yè)務(wù)場(chǎng)景,有個(gè)標(biāo)志位數(shù)字,我們需要檢查這個(gè)標(biāo)志位是否大于4,如果大于就結(jié)束任務(wù),如果小于4,重試。
final AtomicInteger retriesCounter = new AtomicInteger(0);
Supplier<Integer> codeToRun = () -> {
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
    int retryNr = retriesCounter.get();
    System.out.println("Retrying: " + retryNr + "; thread:" + Thread.currentThread().getName());
    if (retryNr < 4) {
        retriesCounter.incrementAndGet();
        throw new RuntimeException();
    }
    return 100;
};然后我們借助retryTask()函數(shù)完成重試邏輯,假設(shè)最大重試10次,根據(jù)上面的定義,正常會(huì)在第四次跳出:
CompletableFuture<Integer> result = retryTask(codeToRun, 10);結(jié)果會(huì)打?。?/span>
Retrying: 0 Retrying: 1 Retrying: 2 Retrying: 3 Retrying: 4 100 4如果重試次數(shù)小于4,就會(huì)觸發(fā)IllegalStateException異常:
try {
    result = retryTask(codeToRun, 3);
    System.out.println(result.get());
} catch (Exception e) {
    System.out.println("超過(guò)最大重試次數(shù)");
}結(jié)果會(huì)打?。?/span>
Retrying: 0 Retrying: 1 Retrying: 2 超過(guò)最大重試次數(shù)三、重試CompletableFuture
CompletableFuture提供了內(nèi)部邏輯出現(xiàn)異常時(shí)處理的方法,比如exceptionally()等方法,我們可以直接使用這些方法,不需要自定義裝飾器。
CompletableFuture的使用可以參考由淺入深掌握CompletableFuture的七種用法。
(一)不安全重試
exceptionally()方法允許指定一個(gè)替代函數(shù),當(dāng)主要邏輯出現(xiàn)異常時(shí),會(huì)調(diào)用指定的替代函數(shù)。
如果我們打算重試兩次,我們可以這樣寫(xiě):
static <T> CompletableFuture<T> retryTwice(Supplier<T> supplier) {
    return CompletableFuture.supplyAsync(supplier)
     .exceptionally(__ -> supplier.get())
     .exceptionally(__ -> supplier.get());
}如果重試次數(shù)可變,我們可以使用for循環(huán):
static <T> CompletableFuture<T> retryUnsafe(Supplier<T> supplier, int maxRetries) {
    CompletableFuture<T> cf = CompletableFuture.supplyAsync(supplier);
    for (int i = 0; i < maxRetries; i++) {
        cf = cf.exceptionally(__ -> supplier.get());
    }
    return cf;
}上面的寫(xiě)法可以滿(mǎn)足需求,但是有一點(diǎn)需要注意,當(dāng)Supplier運(yùn)行比較快,在CompletableFuture和exceptionally()回退創(chuàng)建之前執(zhí)行完畢,那exceptionally()的方法就會(huì)在主線(xiàn)程執(zhí)行。比如,我們?cè)O(shè)定retryUnsafe休眠1000ms,指定codeToRun不做sleep:
static <T> CompletableFuture<T> retryUnsafe(Supplier<T> supplier, int maxRetries) throws InterruptedException {
    CompletableFuture<T> cf = CompletableFuture.supplyAsync(supplier);
    Thread.sleep(1000);
    for (int i = 0; i < maxRetries; i++) {
        cf = cf.exceptionally(__ -> supplier.get());
    }
    return cf;
}
codeToRun = () -> {
    int retryNr = retriesCounter.get();
    System.out.println("Retrying: " + retryNr + "; thread:" + Thread.currentThread().getName());
    if (retryNr < 4) {
        retriesCounter.incrementAndGet();
        throw new RuntimeException();
    }
    return 100;
};
retryUnsafe(codeToRun, 3);運(yùn)行結(jié)果將是:
Retrying: 0; thread:ForkJoinPool.commonPool-worker-1 Retrying: 1; thread:main Retrying: 2; thread:main Retrying: 3; thread:main符合預(yù)期,后續(xù)調(diào)用是由主線(xiàn)程執(zhí)行的。如果初始調(diào)用很快,但后續(xù)調(diào)用預(yù)計(jì)會(huì)更慢,這可能會(huì)成為問(wèn)題。
(二)異步重試
如果是在Java12之后,我們可以通過(guò)exceptionallyAsync()方法實(shí)現(xiàn),將所有重試都異步執(zhí)行。
static <T> CompletableFuture<T> retryExceptionallyAsync(Supplier<T> supplier, int maxRetries) {
   CompletableFuture<T> cf = CompletableFuture.supplyAsync(supplier);
   for (int i = 0; i < maxRetries; i++) {
      cf = cf.exceptionallyAsync(__ -> supplier.get());
   }
   return cf;
}這個(gè)時(shí)候運(yùn)行結(jié)果將是:
Retrying: 0; thread:ForkJoinPool.commonPool-worker-1
Retrying: 1; thread:ForkJoinPool.commonPool-worker-1
Retrying: 2; thread:ForkJoinPool.commonPool-worker-1
Retrying: 3; thread:ForkJoinPool.commonPool-worker-1(三)嵌套CompletableFutures
如果是在Java12之前,我們需要實(shí)現(xiàn)兼容方案,那我們可以寫(xiě)一個(gè)增加邏輯,實(shí)現(xiàn)異步化,我們可以這樣寫(xiě):
static <T> CompletableFuture<T> retryNesting(Supplier<T> supplier, int maxRetries)
        throws InterruptedException {
    CompletableFuture<T> cf = CompletableFuture.supplyAsync(supplier);
    Thread.sleep(1000);
    for (int i = 0; i < maxRetries; i++) {
        cf = cf.thenApply(CompletableFuture::completedFuture)
                .exceptionally(__ -> CompletableFuture.supplyAsync(supplier))
                .thenCompose(Function.identity());
    }
    return cf;
}其實(shí)簡(jiǎn)單或就是有創(chuàng)建了一個(gè)CompletableFuture,運(yùn)行結(jié)果也是符合預(yù)期的。
四、總結(jié)
在本文中,我們探討了在CompletableFuture中重試函數(shù)調(diào)用的概念。我們首先深入研究了以函數(shù)式風(fēng)格實(shí)現(xiàn)裝飾器模式,使我們能夠重試函數(shù)本身。
隨后,我們利用CompletableFuture API完成相同的任務(wù),同時(shí)保持異步流程。我們發(fā)現(xiàn)了Java 12中引入的exceptionallyAsync()方法,它非常適合這個(gè)目的。最后,我們提出了一種替代方法,僅依賴(lài)于原始Java 8 API中的方法。
文末總結(jié)
本文介紹了使用實(shí)現(xiàn)重試邏輯的三種方式:簡(jiǎn)單模式、裝飾器模式、和利用CompletableFuture原生API的實(shí)現(xiàn)。
















 
 
 







 
 
 
 