Java并行編程:從并行任務集獲取反饋
在并行任務啟動后,強制性地從并行任務得到反饋。
假想有一個程序,可以發(fā)送批郵件,還使用了多線程機制。你想知道有多少郵件成功發(fā)送嗎?你想知道在實際發(fā)送過程期間,這個批處理工作的實時進展嗎?
要實現多線程的這種反饋,我們可以使用Callable接口。此接口的工作方式基本上與Runnable相同,但是執(zhí)行方法(call())會返回一個值,該值反映了執(zhí)行計算的結果。
- package com.ricardozuasti;
 - import java.util.concurrent.Callable;
 - public class FictionalEmailSender implements Callable<Boolean>{
 - private String to;
 - private String subject;
 - private String body;
 - public FictionalEmailSender(String to, String subject, String body){
 - this.to = to;
 - this.subject = subject;
 - this.body = body;
 - }
 - @Override
 - public Boolean call() throws InterruptedException {
 - // 在0~0.5秒間模擬發(fā)送郵件
 - Thread.sleep(Math.round(Math.random()*0.5*1000));
 - // 假設我們有80%的幾率成功發(fā)送郵件
 - if(Math.random()>0.2){
 - return true;
 - }else{
 - return false;
 - }
 - }
 - }
 
注意:Callable接口可用于返回任意數據類型,因此我們的任務可以返回我們需要的任何信息。
現在,我們使用一個線程池ExecutorService來發(fā)送郵件,由于我們的任務是以Callable接口實現的,我們提交執(zhí)行的每個新任務,都會得到一個Future引用。注意我們要使用直接的構造器創(chuàng)建ExecutorService,而不是使用來自Executors的工具方法創(chuàng)建。這是因為使用指定類ThreadPoolExecutor提供了一些方法可以派上用場。
- package com.ricardozuasti;
 - import java.util.concurrent.Future;
 - import java.util.concurrent.LinkedBlockingQueue;
 - import java.util.concurrent.ThreadPoolExecutor;
 - import java.util.concurrent.TimeUnit;
 - import java.util.ArrayList;
 - import java.util.List;
 - public class Concurrency2 {
 - public static void main(String[] args){
 - try{
 - ThreadPoolExecutor executor = new ThreadPoolExecutor(30, 30, 1,
 - TimeUnit.SECONDS, new LinkedBlockingQueue());
 - List<Future<Boolean>> futures = new ArrayList<Future<Boolean>>(9000);
 - // 發(fā)送垃圾郵件, 用戶名假設為4位數字
 - for(int i=1000; i<10000; i++){
 - futures.add(executor.submit(new FictionalEmailSender(i+"@sina.com",
 - "Knock, knock, Neo", "The Matrix has you...")));
 - }
 - // 提交所有的任務后,關閉executor
 - System.out.println("Starting shutdown...");
 - executor.shutdown();
 - // 每秒鐘打印執(zhí)行進度
 - while(!executor.isTerminated()){
 - executor.awaitTermination(1, TimeUnit.SECONDS);
 - int progress = Math.round((executor.getCompletedTaskCount()
 - *100)/executor.getTaskCount());
 - System.out.println(progress + "% done (" +
 - executor.getCompletedTaskCount() + " emails have been sent).");
 - }
 - // 現在所有郵件已發(fā)送完, 檢查futures, 看成功發(fā)送的郵件有多少
 - int errorCount = 0;
 - int successCount = 0;
 - for(Future<Boolean> future : futures){
 - if(future.get()){
 - successCount++;
 - }else{
 - errorCount++;
 - }
 - }
 - System.out.println(successCount + " emails were successfully sent, but " +
 - errorCount + " failed.");
 - }catch(Exception ex){
 - ex.printStackTrace();
 - }
 - }
 - }
 
執(zhí)行這個類,輸出結果如下:
- Starting shutdown...
 - 1% done (118 emails have been sent).
 - 2% done (232 emails have been sent).
 - 3% done (358 emails have been sent).
 - 5% done (478 emails have been sent).
 - 6% done (587 emails have been sent).
 - 7% done (718 emails have been sent).
 - 9% done (850 emails have been sent).
 - 10% done (969 emails have been sent).
 - ……
 
所有的任務都由ExecutorService提交,我們開始它的關閉(防止提交新任務)并使用一個循環(huán)(實時場景,可能你會繼續(xù)做其它的事情)來等待,直至所有任務都被執(zhí)行完成、計算和打印當前每次迭代的進度。
注意,你可以存儲executor引用,也可以在任意時間從其它線程查詢它的計算結果和報告進程進度。
最后,使用Future集合引用,我們得到ExecutorService提交的每個Callable接口,通知成功發(fā)送的郵件數量和發(fā)送失敗的郵件數量。
此結構不但易于使用,還使得相關性得到清晰的隔離,在調度程序和實際任務之間提供了一個預定義的通信機制。
原文鏈接:http://blog.csdn.net/chszs/article/details/7418880
【編輯推薦】















 
 
 







 
 
 
 