基于 CompletionService 實現(xiàn)高效并發(fā):一成功即返回的策略
前言
在并發(fā)編程中,我們經常會遇到這樣的場景:需要同時調用多個服務或方法,只要其中一個成功就立即返回成功結果,只有當所有調用都失敗時才返回失敗。這種場景在服務容錯、多源數(shù)據(jù)獲取等領域非常常見。
什么是 CompletionService
圖片
CompletionService是Java并發(fā)包中的一個接口,它結合了Executor和BlockingQueue的功能,能夠更方便地管理異步任務的執(zhí)行和結果獲取。其主要優(yōu)勢在于:
- 可以按照任務完成的順序獲取結果,而不是提交順序
 - 提供了超時獲取結果的能力
 - 簡化了并發(fā)任務的管理流程
 
CompletionService的核心實現(xiàn)類是ExecutorCompletionService,它需要一個Executor來執(zhí)行任務,本質上是對Executor的一種裝飾。
CompletionService的實現(xiàn)目標是任務先完成可優(yōu)先獲取到,即結果按照完成先后順序排序。
與CompletableFuture區(qū)別
CompletionService
- 定位:
Java 5引入,是對Executor和BlockingQueue的組合封裝,核心解決按任務完成順序獲取結果的問題。 - 設計理念:簡化多個異步任務的結果收集流程,避免按提交順序等待任務完成(傳統(tǒng)
Future集合需要逐個檢查是否完成,效率低)。 - 本質:通過內部維護一個阻塞隊列,當任務完成時自動將結果放入隊列,用戶只需從隊列中獲取即可,無需關心任務提交順序。
 
CompletableFuture
- 定位:
Java 8引入,是Future接口的增強實現(xiàn),支持異步任務的鏈式調用、組合、依賴管理等復雜操作。 - 設計理念:基于函數(shù)式編程思想,提供非阻塞的回調機制,允許將多個異步任務串聯(lián)或并聯(lián)成一個完整的流程,解決 “回調地獄” 問題。
 - 本質:不僅能獲取任務結果,還能描述任務之間的依賴關系(如任務
A完成后執(zhí)行任務B任務A和B都完成后執(zhí)行任務C)。 
代碼案例
并發(fā)調用3個接口,按接口返回順序處理結果(不關心提交順序)
public class CompletionServiceDemo {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // 創(chuàng)建線程池和CompletionService
        ExecutorService executor = Executors.newFixedThreadPool(3);
        CompletionService<String> completionService = new ExecutorCompletionService<>(executor);
        // 提交3個任務(模擬不同接口調用,執(zhí)行時間不同)
        completionService.submit(() -> {
            Thread.sleep(3000); // 模擬耗時3秒
            return"接口A結果";
        });
        completionService.submit(() -> {
            Thread.sleep(1000); // 模擬耗時1秒
            return"接口B結果";
        });
        completionService.submit(() -> {
            Thread.sleep(2000); // 模擬耗時2秒
            return"接口C結果";
        });
        // 按完成順序獲取結果(預期順序:B→C→A)
        for (int i = 0; i < 3; i++) {
            Future<String> future = completionService.take(); // 阻塞等待下一個完成的任務
            System.out.println("處理結果:" + future.get());
        }
        executor.shutdown();
    }
}先調用接口A,再用A的結果調用接口B,最后將B的結果與接口C的結果合并。
public class CompletableFutureDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 接口A:返回用戶ID
        CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> {
            try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }
            return"user123";
        });
        // 接口B:依賴A的結果,返回用戶信息
        CompletableFuture<String> futureB = futureA.thenCompose(userId -> 
            CompletableFuture.supplyAsync(() -> {
                try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); }
                return"用戶信息:" + userId;
            })
        );
        // 接口C:獨立任務,返回用戶積分
        CompletableFuture<Integer> futureC = CompletableFuture.supplyAsync(() -> {
            try { Thread.sleep(800); } catch (InterruptedException e) { e.printStackTrace(); }
            return 1000;
        });
        // 合并B和C的結果
        CompletableFuture<String> combined = futureB.thenCombine(futureC, 
            (userInfo, score) -> userInfo + ",積分:" + score
        );
        System.out.println(combined.get()); // 阻塞獲取最終結果
    }
}實現(xiàn)思路
要實現(xiàn)只要有一個成功就立即返回,否則等所有失敗才返回的邏輯,我們可以采用以下策略:
- 創(chuàng)建一個線程池和對應的
CompletionService - 向
CompletionService提交所有需要執(zhí)行的任務 - 循環(huán)獲取已完成的任務結果
 - 一旦發(fā)現(xiàn)有成功的結果,立即取消所有未完成的任務并返回成功
 - 如果所有任務都執(zhí)行完畢且都失敗,則返回失敗
 
示例代碼
// 任務結果封裝類
class TaskResult {
    private boolean success;
    private String message;
    private String taskName;
    public TaskResult(boolean success, String message, String taskName) {
        this.success = success;
        this.message = message;
        this.taskName = taskName;
    }
    public boolean isSuccess() {
        return success;
    }
    public String getMessage() {
        return message;
    }
    public String getTaskName() {
        return taskName;
    }
}
// 模擬業(yè)務任務
class BusinessTask implements Callable<TaskResult> {
    private String taskName;
    private int executionTime; // 執(zhí)行時間(毫秒)
    private boolean shouldSucceed; // 是否應該成功
    public BusinessTask(String taskName, int executionTime, boolean shouldSucceed) {
        this.taskName = taskName;
        this.executionTime = executionTime;
        this.shouldSucceed = shouldSucceed;
    }
    @Override
    public TaskResult call() throws Exception {
        System.out.println("任務 " + taskName + " 開始執(zhí)行");
        // 模擬任務執(zhí)行時間
        Thread.sleep(executionTime);
        
        if (shouldSucceed) {
            return new TaskResult(true, taskName + " 執(zhí)行成功", taskName);
        } else {
            throw new Exception(taskName + " 執(zhí)行失敗");
        }
    }
}
public class ConcurrentTaskDemo {
    public static void main(String[] args) {
        // 創(chuàng)建線程池
        ExecutorService executor = Executors.newFixedThreadPool(3);
        CompletionService<TaskResult> completionService = new ExecutorCompletionService<>(executor);
        
        // 存儲所有提交的任務,用于后續(xù)可能的取消操作
        List<Future<TaskResult>> futures = new ArrayList<>();
        
        try {
            // 提交三個任務
            futures.add(completionService.submit(new BusinessTask("支付接口A", 1000, false)));
            futures.add(completionService.submit(new BusinessTask("支付接口B", 2000, true)));
            futures.add(completionService.submit(new BusinessTask("支付接口C", 3000, false)));
            
            // 等待任務完成,只要有一個成功就返回
            TaskResult successResult = null;
            int taskCount = futures.size();
            
            for (int i = 0; i < taskCount; i++) {
                try {
                    // 獲取已完成的任務結果,設置超時時間
                    Future<TaskResult> future = completionService.poll(5, TimeUnit.SECONDS);
                    if (future == null) {
                        System.out.println("獲取任務結果超時");
                        continue;
                    }
                    
                    // 如果沒有異常拋出,說明任務成功
                    TaskResult result = future.get();
                    successResult = result;
                    break; // 只要有一個成功就跳出循環(huán)
                    
                } catch (ExecutionException e) {
                    // 任務執(zhí)行失敗,繼續(xù)等待其他任務
                    System.out.println("任務執(zhí)行失敗: " + e.getCause().getMessage());
                } catch (TimeoutException e) {
                    System.out.println("等待任務完成超時");
                }
            }
            
            // 處理最終結果
            if (successResult != null) {
                System.out.println("整體執(zhí)行成功: " + successResult.getMessage());
                // 取消其他可能還在執(zhí)行的任務
                cancelRemainingTasks(futures);
            } else {
                System.out.println("所有任務都執(zhí)行失敗");
            }
            
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            System.out.println("線程被中斷: " + e.getMessage());
        } finally {
            // 關閉線程池
            executor.shutdown();
        }
    }
    
    // 取消所有未完成的任務
    private static void cancelRemainingTasks(List<Future<TaskResult>> futures) {
        for (Future<TaskResult> future : futures) {
            if (!future.isDone()) {
                future.cancel(true);
                System.out.println("取消未完成的任務");
            }
        }
    }
}關鍵技術點
CompletionService的使用:通過其poll方法可以按完成順序獲取任務結果,還可以設置超時時間。- 任務取消機制:當一個任務成功后,我們需要取消其他未完成的任務以節(jié)省資源,這通過
Future的cancel方法實現(xiàn)。 - 異常處理:任務執(zhí)行失敗會拋出
ExecutionException,我們捕獲這個異常并繼續(xù)等待其他任務。 - 線程池管理:使用完線程池后必須關閉,以釋放資源。
 
注意事項
- 線程安全:確保提交給
CompletionService的任務是線程安全的,避免共享可變狀態(tài)。 - 資源釋放:無論任務執(zhí)行成功與否,都要確保線程池被正確關閉。
 - 超時設置:合理設置
poll方法的超時時間,避免無限等待。 - 任務取消的局限性:對于已經開始執(zhí)行的任務,
cancel (true)只能通過中斷來嘗試停止,如果任務不響應中斷,可能無法真正取消。 - 線程池大小:根據(jù)任務特性合理設置線程池大小,過多的線程會消耗更多資源,過少則可能影響并發(fā)效率。
 















 
 
 




 
 
 
 