偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

基于 CompletionService 實現(xiàn)高效并發(fā):一成功即返回的策略

開發(fā) 前端
在并發(fā)編程中,我們經常會遇到這樣的場景:需要同時調用多個服務或方法,只要其中一個成功就立即返回成功結果,只有當所有調用都失敗時才返回失敗。這種場景在服務容錯、多源數(shù)據(jù)獲取等領域非常常見。

前言

在并發(fā)編程中,我們經常會遇到這樣的場景:需要同時調用多個服務或方法,只要其中一個成功就立即返回成功結果,只有當所有調用都失敗時才返回失敗。這種場景在服務容錯、多源數(shù)據(jù)獲取等領域非常常見。

什么是 CompletionService

圖片圖片

CompletionServiceJava并發(fā)包中的一個接口,它結合了ExecutorBlockingQueue的功能,能夠更方便地管理異步任務的執(zhí)行和結果獲取。其主要優(yōu)勢在于:

  • 可以按照任務完成的順序獲取結果,而不是提交順序
  • 提供了超時獲取結果的能力
  • 簡化了并發(fā)任務的管理流程

CompletionService的核心實現(xiàn)類是ExecutorCompletionService,它需要一個Executor來執(zhí)行任務,本質上是對Executor的一種裝飾。

CompletionService的實現(xiàn)目標是任務先完成可優(yōu)先獲取到,即結果按照完成先后順序排序。

與CompletableFuture區(qū)別

CompletionService
  • 定位:Java 5引入,是對ExecutorBlockingQueue的組合封裝,核心解決按任務完成順序獲取結果的問題。
  • 設計理念:簡化多個異步任務的結果收集流程,避免按提交順序等待任務完成(傳統(tǒng)Future集合需要逐個檢查是否完成,效率低)。
  • 本質:通過內部維護一個阻塞隊列,當任務完成時自動將結果放入隊列,用戶只需從隊列中獲取即可,無需關心任務提交順序。
CompletableFuture
  • 定位:Java 8引入,是Future接口的增強實現(xiàn),支持異步任務的鏈式調用、組合、依賴管理等復雜操作。
  • 設計理念:基于函數(shù)式編程思想,提供非阻塞的回調機制,允許將多個異步任務串聯(lián)或并聯(lián)成一個完整的流程,解決 “回調地獄” 問題。
  • 本質:不僅能獲取任務結果,還能描述任務之間的依賴關系(如任務A完成后執(zhí)行任務B任務AB都完成后執(zhí)行任務C)。
代碼案例

并發(fā)調用3個接口,按接口返回順序處理結果(不關心提交順序)

public class CompletionServiceDemo {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // 創(chuàng)建線程池和CompletionService
        ExecutorService executor = Executors.newFixedThreadPool(3);
        CompletionService<String> completionService = new ExecutorCompletionService<>(executor);

        // 提交3個任務(模擬不同接口調用,執(zhí)行時間不同)
        completionService.submit(() -> {
            Thread.sleep(3000); // 模擬耗時3秒
            return"接口A結果";
        });
        completionService.submit(() -> {
            Thread.sleep(1000); // 模擬耗時1秒
            return"接口B結果";
        });
        completionService.submit(() -> {
            Thread.sleep(2000); // 模擬耗時2秒
            return"接口C結果";
        });

        // 按完成順序獲取結果(預期順序:B→C→A)
        for (int i = 0; i < 3; i++) {
            Future<String> future = completionService.take(); // 阻塞等待下一個完成的任務
            System.out.println("處理結果:" + future.get());
        }

        executor.shutdown();
    }
}

先調用接口A,再用A的結果調用接口B,最后將B的結果與接口C的結果合并。

public class CompletableFutureDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 接口A:返回用戶ID
        CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> {
            try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }
            return"user123";
        });

        // 接口B:依賴A的結果,返回用戶信息
        CompletableFuture<String> futureB = futureA.thenCompose(userId -> 
            CompletableFuture.supplyAsync(() -> {
                try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); }
                return"用戶信息:" + userId;
            })
        );

        // 接口C:獨立任務,返回用戶積分
        CompletableFuture<Integer> futureC = CompletableFuture.supplyAsync(() -> {
            try { Thread.sleep(800); } catch (InterruptedException e) { e.printStackTrace(); }
            return 1000;
        });

        // 合并B和C的結果
        CompletableFuture<String> combined = futureB.thenCombine(futureC, 
            (userInfo, score) -> userInfo + ",積分:" + score
        );

        System.out.println(combined.get()); // 阻塞獲取最終結果
    }
}

實現(xiàn)思路

要實現(xiàn)只要有一個成功就立即返回,否則等所有失敗才返回的邏輯,我們可以采用以下策略:

  • 創(chuàng)建一個線程池和對應的CompletionService
  • CompletionService提交所有需要執(zhí)行的任務
  • 循環(huán)獲取已完成的任務結果
  • 一旦發(fā)現(xiàn)有成功的結果,立即取消所有未完成的任務并返回成功
  • 如果所有任務都執(zhí)行完畢且都失敗,則返回失敗

示例代碼

// 任務結果封裝類
class TaskResult {
    private boolean success;
    private String message;
    private String taskName;

    public TaskResult(boolean success, String message, String taskName) {
        this.success = success;
        this.message = message;
        this.taskName = taskName;
    }

    public boolean isSuccess() {
        return success;
    }

    public String getMessage() {
        return message;
    }

    public String getTaskName() {
        return taskName;
    }
}

// 模擬業(yè)務任務
class BusinessTask implements Callable<TaskResult> {
    private String taskName;
    private int executionTime; // 執(zhí)行時間(毫秒)
    private boolean shouldSucceed; // 是否應該成功

    public BusinessTask(String taskName, int executionTime, boolean shouldSucceed) {
        this.taskName = taskName;
        this.executionTime = executionTime;
        this.shouldSucceed = shouldSucceed;
    }

    @Override
    public TaskResult call() throws Exception {
        System.out.println("任務 " + taskName + " 開始執(zhí)行");
        // 模擬任務執(zhí)行時間
        Thread.sleep(executionTime);
        
        if (shouldSucceed) {
            return new TaskResult(true, taskName + " 執(zhí)行成功", taskName);
        } else {
            throw new Exception(taskName + " 執(zhí)行失敗");
        }
    }
}

public class ConcurrentTaskDemo {
    public static void main(String[] args) {
        // 創(chuàng)建線程池
        ExecutorService executor = Executors.newFixedThreadPool(3);
        CompletionService<TaskResult> completionService = new ExecutorCompletionService<>(executor);
        
        // 存儲所有提交的任務,用于后續(xù)可能的取消操作
        List<Future<TaskResult>> futures = new ArrayList<>();
        
        try {
            // 提交三個任務
            futures.add(completionService.submit(new BusinessTask("支付接口A", 1000, false)));
            futures.add(completionService.submit(new BusinessTask("支付接口B", 2000, true)));
            futures.add(completionService.submit(new BusinessTask("支付接口C", 3000, false)));
            
            // 等待任務完成,只要有一個成功就返回
            TaskResult successResult = null;
            int taskCount = futures.size();
            
            for (int i = 0; i < taskCount; i++) {
                try {
                    // 獲取已完成的任務結果,設置超時時間
                    Future<TaskResult> future = completionService.poll(5, TimeUnit.SECONDS);
                    if (future == null) {
                        System.out.println("獲取任務結果超時");
                        continue;
                    }
                    
                    // 如果沒有異常拋出,說明任務成功
                    TaskResult result = future.get();
                    successResult = result;
                    break; // 只要有一個成功就跳出循環(huán)
                    
                } catch (ExecutionException e) {
                    // 任務執(zhí)行失敗,繼續(xù)等待其他任務
                    System.out.println("任務執(zhí)行失敗: " + e.getCause().getMessage());
                } catch (TimeoutException e) {
                    System.out.println("等待任務完成超時");
                }
            }
            
            // 處理最終結果
            if (successResult != null) {
                System.out.println("整體執(zhí)行成功: " + successResult.getMessage());
                // 取消其他可能還在執(zhí)行的任務
                cancelRemainingTasks(futures);
            } else {
                System.out.println("所有任務都執(zhí)行失敗");
            }
            
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            System.out.println("線程被中斷: " + e.getMessage());
        } finally {
            // 關閉線程池
            executor.shutdown();
        }
    }
    
    // 取消所有未完成的任務
    private static void cancelRemainingTasks(List<Future<TaskResult>> futures) {
        for (Future<TaskResult> future : futures) {
            if (!future.isDone()) {
                future.cancel(true);
                System.out.println("取消未完成的任務");
            }
        }
    }
}

關鍵技術點

  • CompletionService的使用:通過其poll方法可以按完成順序獲取任務結果,還可以設置超時時間。
  • 任務取消機制:當一個任務成功后,我們需要取消其他未完成的任務以節(jié)省資源,這通過Futurecancel方法實現(xiàn)。
  • 異常處理:任務執(zhí)行失敗會拋出ExecutionException,我們捕獲這個異常并繼續(xù)等待其他任務。
  • 線程池管理:使用完線程池后必須關閉,以釋放資源。

注意事項

  • 線程安全:確保提交給CompletionService的任務是線程安全的,避免共享可變狀態(tài)。
  • 資源釋放:無論任務執(zhí)行成功與否,都要確保線程池被正確關閉。
  • 超時設置:合理設置poll方法的超時時間,避免無限等待。
  • 任務取消的局限性:對于已經開始執(zhí)行的任務,cancel (true)只能通過中斷來嘗試停止,如果任務不響應中斷,可能無法真正取消。
  • 線程池大小:根據(jù)任務特性合理設置線程池大小,過多的線程會消耗更多資源,過少則可能影響并發(fā)效率。
責任編輯:武曉燕 來源: 一安未來
相關推薦

2009-09-24 09:12:22

開源商業(yè)模式Intel

2012-07-20 13:27:19

Firefox 15瀏覽器

2010-03-19 09:10:50

Windows 7系統(tǒng)更新

2013-05-29 00:32:21

WordPress開源博客開源

2012-06-01 10:46:09

Windows 8微軟

2022-12-07 15:38:56

無線技術Wi-Fi

2021-05-26 15:37:31

物聯(lián)網互聯(lián)網IoT

2013-01-15 11:15:18

Ubuntu手機系統(tǒng)

2019-12-13 15:14:00

IT團隊企業(yè)

2024-03-19 15:02:28

云原生工業(yè)4.0

2016-02-24 09:41:44

IT人士經驗教訓

2009-01-27 20:57:00

服務器Server 2003案例

2013-03-28 11:11:02

云宕機SLA云安全

2009-12-15 17:55:57

Linux操作系統(tǒng)

2009-02-25 08:27:00

Windows 3.1N95Symbian

2012-10-10 13:23:24

2017-12-19 07:22:00

IPv6流量遷移

2020-04-03 16:25:26

機器視覺工業(yè)4.0工業(yè)物聯(lián)網

2023-11-03 08:32:53

Flask高并發(fā)
點贊
收藏

51CTO技術棧公眾號