Android Bolts-更簡(jiǎn)單的完成線程調(diào)度和任務(wù)管理
尤塞恩·圣利奧·博爾特 Usain St Leo Bolt ,牙買(mǎi)加短跑運(yùn)動(dòng)員,男子100米、男子200米以及男子400米接力賽的世界紀(jì)錄保持人,同時(shí)是以上三項(xiàng)賽事的連續(xù)三屆奧運(yùn)金牌得主。
使用 Bolts 可以將一個(gè)完整的操作拆分成多個(gè)子任務(wù),這些子任務(wù)可以自由的拆分、組合和替換,每個(gè)任務(wù)作為整個(gè)任務(wù)鏈的一環(huán)可以運(yùn)行在指定線程中,同時(shí)既能從上行任務(wù)中獲取任務(wù)結(jié)果,又可以向下行任務(wù)發(fā)布當(dāng)前任務(wù)的結(jié)果,而不必考慮線程之間的交互。
前言
一個(gè)關(guān)于線程調(diào)度的簡(jiǎn)單需求,在子線程從網(wǎng)絡(luò)下載圖片,并返回下載的圖片,在主線程使用該圖片更新到 UI,同時(shí)返回當(dāng)前 UI 的狀態(tài) json,在子線程將 json 數(shù)據(jù)保存到本地文件,完成后在主線程彈出提示,這中間涉及到了 4 次線程切換,同時(shí)后面的任務(wù)需要前面任務(wù)完成后的返回值作為參數(shù)。
使用 Thread + Handler 實(shí)現(xiàn),線程調(diào)度很不靈活,代碼可讀性差,不美觀,擴(kuò)展性差,錯(cuò)誤處理異常麻煩。
- String url = "http://www.baidu.com";
- Handler handler = new Handler(Looper.getMainLooper());
- new Thread(() -> {
- // 下載
- Bitmap bitmap = downloadBitmap(url);
- handler.post(() -> {
- // 更新 UI
- String json = updateUI(bitmap);
- new Thread(() -> {
- // 向存儲(chǔ)寫(xiě)入U(xiǎn)I狀態(tài)
- saveUIState(json);
- // 保存成功后,提示
- handler.post(() -> toastMsg("save finish."));
- }).start();
- });
- }).start();
使用 RxJava 實(shí)現(xiàn),線程調(diào)度非常靈活,鏈?zhǔn)秸{(diào)用,代碼清晰,擴(kuò)展性好,有統(tǒng)一的異常處理機(jī)制,不過(guò) Rx 是一個(gè)很強(qiáng)大的庫(kù),如果只用來(lái)做線程調(diào)度的話, Rx 就顯得有點(diǎn)太重了。
- Observable.just(URL)
- // 下載
- .map(this::downloadBitmap)
- .subscribeOn(Schedulers.newThread())
- // 更新UI
- .observeOn(AndroidSchedulers.mainThread())
- .map(this::updateUI)
- // 存儲(chǔ) UI 狀態(tài)
- .observeOn(Schedulers.io())
- .map(this::saveUIState)
- // 顯示提示
- .observeOn(AndroidSchedulers.mainThread())
- .subscribe(rst -> toastMsg("save to " + rst),
- // handle error
- Throwable::printStackTrace);
使用 bolts 實(shí)現(xiàn),線程調(diào)度靈活,鏈?zhǔn)秸{(diào)用,代碼清晰,具有良好的擴(kuò)展性,具有統(tǒng)一的異常處理機(jī)制,雖然沒(méi)有 Rx 那么豐富的操作符,但是勝在類(lèi)庫(kù)非常非常小,只有 38 KB。
- Task
- .forResult(URL)
- // 下載
- .onSuccess(task -> downloadBitmap(task.getResult()), Task.BACKGROUND_EXECUTOR)
- // 更新UI
- .onSuccess(task -> updateUI(task.getResult()), Task.UI_THREAD_EXECUTOR)
- // 存儲(chǔ)UI狀態(tài)
- .onSuccess(task -> saveUIState(task.getResult()), Task.BACKGROUND_EXECUTOR)
- // 提示
- .onSuccess(task -> toastMsg("save to " + task.getResult()), Task.UI_THREAD_EXECUT
- // handle error
- .continueWith(task -> {
- if (task.isFaulted()) {
- task.getError().printStackTrace();
- return false;
- }
- return true;
- });
線程調(diào)度器
共有 4 種類(lèi)型執(zhí)行線程,將任務(wù)分發(fā)到指定線程執(zhí)行,分別是
- backgroud - 后臺(tái)線程池,可以并發(fā)執(zhí)行任務(wù)。
- scheduled - 單線程池,只有一個(gè)線程,主要用來(lái)執(zhí)行 delay 操作。
- immediate - 即時(shí)線程,如果線程調(diào)用棧小于 15,則在當(dāng)前線程執(zhí)行,否則代理給 background 。
- uiThread - 針對(duì) Android 設(shè)計(jì),使用 Handler 發(fā)送到主線程執(zhí)行。
backgroud
主要用來(lái)在后臺(tái)并發(fā)執(zhí)行多任務(wù)
- public static final ExecutorService BACKGROUND_EXECUTOR = BoltsExecutors.background();
在 Android 平臺(tái)下根據(jù) CPU 核數(shù)創(chuàng)建線程池,其他情況下,創(chuàng)建緩存線程池。
- background = !isAndroidRuntime()
- ? java.util.concurrent.Executors.newCachedThreadPool()
- : AndroidExecutors.newCachedThreadPool();
scheduled
主要用于任務(wù)之間做 delay 操作,并不實(shí)際執(zhí)行任務(wù)。
- scheduled = Executors.newSingleThreadScheduledExecutor();
immediate
主要用來(lái)簡(jiǎn)化那些不指定運(yùn)行線程的方法,默認(rèn)在當(dāng)前線程去執(zhí)行任務(wù),使用 ThreadLocal 保存每個(gè)線程調(diào)用棧的深度,如果深度不超過(guò) 15,則在當(dāng)前線程執(zhí)行,否則代理給 backgroud 執(zhí)行。
- private static final Executor IMMEDIATE_EXECUTOR = BoltsExecutors.immediate();
- // 關(guān)鍵方法
- @Override
- public void execute(Runnable command) {
- int depth = incrementDepth();
- try {
- if (depth <= MAX_DEPTH) {
- command.run();
- } else {
- BoltsExecutors.background().execute(command)
- }
- } finally {
- decrementDepth();
- }
- }
uiThread
為 Android 專(zhuān)門(mén)設(shè)計(jì),在主線程執(zhí)行任務(wù)。
- public static final Executor UI_THREAD_EXECUTOR = AndroidExecutors.uiThread();
- private static class UIThreadExecutor implements Executor {
- @Override
- public void execute(Runnable command) {
- new Handler(Looper.getMainLooper()).post(command);
- }
- }
核心類(lèi)
Task ,最核心的類(lèi),每個(gè)子任務(wù)都是一個(gè) Task ,它們負(fù)責(zé)自己需要執(zhí)行的任務(wù)。每個(gè) Task 具有 3 種狀態(tài) Result 、 Error 和 Cancel ,分別代表成功、異常和取消。
Continuation ,是一個(gè)接口,它就像鏈接子任務(wù)每一環(huán)的鎖扣,把一個(gè)個(gè)獨(dú)立的任務(wù)鏈接在一起。
通過(guò) Task - Continuation - Task - Continuation ... 的形式組成完整的任務(wù)鏈,順序在各自線程執(zhí)行。
創(chuàng)建 Task
根據(jù) Task 的 3 種狀態(tài),創(chuàng)建簡(jiǎn)單的 Task ,會(huì)復(fù)用已有的任務(wù)對(duì)象
- public static <TResult> Task<TResult> forResult(TResult value)
- public static <TResult> Task<TResult> forError(Exception error)
- public static <TResult> Task<TResult> cancelled()
使用 delay 方法,延時(shí)執(zhí)行并創(chuàng)建 Task
- public static Task delay(long delay)
- public static Task delay(long delay, CancellationToken cancellationToken)
使用 whenAny 方法,執(zhí)行多個(gè)任務(wù),當(dāng)任意任務(wù)返回結(jié)果時(shí),保存這個(gè)結(jié)果
- public static Task> whenAnyResult(Collection> tasks)
- public static Task> whenAny(Collection> tasks)
使用 whenAll 方法,執(zhí)行多個(gè)任務(wù),當(dāng)全部任務(wù)執(zhí)行完后,返回結(jié)果
- public static Task whenAll(Collection> tasks)
- public static Task> whenAllResult(final Collection> tasks)
使用 call 方法,執(zhí)行一個(gè)任務(wù),同時(shí)創(chuàng)建 Task
- public static Task call(final Callable callable, Executor executor,
- final CancellationToken ct)
鏈接子任務(wù)
使用 continueWith 方法,鏈接一個(gè)子任務(wù),如果前行任務(wù)已經(jīng)執(zhí)行完成,則立即執(zhí)行當(dāng)前任務(wù),否則加入隊(duì)列中,等待。
- public <TContinuationResult> Task<TContinuationResult> continueWith(
- final Continuation<TResult, TContinuationResult> continuation, final Executor executor,
- final CancellationToken ct)
使用 continueWithTask 方法,在當(dāng)前任務(wù)之后鏈接另一個(gè)任務(wù)鏈,這種做法是為了滿足那種將部分任務(wù)組合在一起分離出去,作為公共任務(wù)的場(chǎng)景,他接受將另外一個(gè)完全獨(dú)立的任務(wù)鏈,追加在當(dāng)前執(zhí)行的任務(wù)后面。
- public <TContinuationResult> Task<TContinuationResult> continueWithTask(
- final Continuation<TResult, Task<TContinuationResult>> continuation, final Executor executor,
- final CancellationToken ct)
使用 continueWhile 方法鏈接子任務(wù),與 continueWith 區(qū)別在于,他有一個(gè) predicate 表達(dá)式,只有當(dāng)表達(dá)式成立時(shí),才會(huì)追加子任務(wù),這樣做是在執(zhí)行任務(wù)前可以做一個(gè)攔截操作,也是為了不破環(huán)鏈?zhǔn)秸{(diào)用的整體風(fēng)格。
- public Task<Void> continueWhile(final Callable<Boolean> predicate,
- final Continuation<Void, Task<Void>> continuation, final Executor executor,
- final CancellationToken ct)
使用 onSuccess 和 onSuccessTask 鏈接單個(gè)任務(wù)個(gè)任務(wù)鏈,區(qū)別于 continueWith 在于, onSuccess 方法,前行任務(wù)如果失敗了,后行的任務(wù)也會(huì)直接失敗,不會(huì)再執(zhí)行,但是 continueWith 的各個(gè)子任務(wù)之間沒(méi)有關(guān)聯(lián),就算前行任務(wù)失敗,后行任務(wù)也會(huì)執(zhí)行。
- public <TContinuationResult> Task<TContinuationResult> onSuccess(
- final Continuation<TResult, TContinuationResult> continuation, Executor executor,
- final CancellationToken ct)
取消任務(wù)
- CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
- CancellationToken token = cancellationTokenSource.getToken();
- Task.call((Callable<String>) () -> null,
- Task.BACKGROUND_EXECUTOR,
- token);
- // 取消任務(wù)
- cancellationTokenSource.cancel();
異常的處理
關(guān)于異常的處理,整個(gè)機(jī)制下來(lái),每個(gè)任務(wù)作為一個(gè)獨(dú)立的單位,異常會(huì)被統(tǒng)一捕捉,因此不必針對(duì)任務(wù)中的方法進(jìn)行單獨(dú)的處理。
如果使用了 continueWith 鏈接任務(wù),那么當(dāng)前任務(wù)的的異常信息,將會(huì)保存在當(dāng)前 Task 中在下行任務(wù)中進(jìn)行處理,下行任務(wù)也可以不處理這個(gè)異常,直接執(zhí)行任務(wù),那么這個(gè)異常就到這里停止了,不會(huì)再向下傳遞,也就是說(shuō),只有下行任務(wù)才知道當(dāng)前任務(wù)的結(jié)果,不管是成功還是異常。
當(dāng)然了,如果任務(wù)之間有關(guān)聯(lián),由于上行任務(wù)的異常極大可能造成當(dāng)前任務(wù)的異常,那么當(dāng)前任務(wù)異常的信息,又會(huì)向下傳遞,但是上行任務(wù)的異常就到這里為止了。
如果使用 onSuccess 之類(lèi)的方法,如果上行任務(wù)異常了,那么下行任務(wù)根本不會(huì)執(zhí)行,而是直接將異常往下面?zhèn)鬟f,直到被處理掉。
任務(wù)的分離和組合
我們可以將一個(gè)完整的操作細(xì)分成多個(gè)任務(wù),每個(gè)任務(wù)都遵循單一職責(zé)的原則而盡量簡(jiǎn)單,這樣可以在任務(wù)之間再穿插新的任務(wù),或者將部分任務(wù)分離出來(lái)組合到一起等。
擴(kuò)展性
我們可以在兩個(gè)細(xì)分的任務(wù)之間添加一個(gè)新的操作,而不影響上行和下行任務(wù),如我們給文章開(kāi)頭的需求中更新 UI 之前,將 Bitmap 先保存到本地。
- Task
- .forResult(URL)
- // 下載
- .onSuccess(task -> downloadBitmap(task.getResult()), Task.BACKGROUND_EXECUTOR)
- // 保存在本地
- .onSuccess(task -> saveBitmapToFile(task.getResult()),Task.BACKGROUND_EXECUTOR)
- // 更新UI
- .onSuccess(task -> updateUI(task.getResult()), Task.UI_THREAD_EXECUTOR)
- ...
復(fù)用性
對(duì)一些公共的操作,可以單獨(dú)分離成新的任務(wù),當(dāng)需要做類(lèi)似操作時(shí),即可復(fù)用這部份功能,如可以將 下載圖片并更新 UI 、 保存狀態(tài)并彈出提示 兩塊功能分離出來(lái),作為公共的任務(wù)。
- // 下載圖片->更新UI
- public Continuation<String, Task<String>> downloadImageAndUpdateUI() {
- return task ->
- Task.call(() -> downloadBitmap(task.getResult()), Task.BACKGROUND_EXECUTOR)
- .continueWith(taskWithBitmap -> updateUI(taskWithBitmap.getResult()), Task.UI_THREAD_EXECUTOR);
- }
- // 保存狀態(tài)->提示信息
- public Continuation<String, Task<Boolean>> saveStateAndToast() {
- return task ->
- Task.call(() -> saveUIState(task.getResult()), Task.BACKGROUND_EXECUTOR)
- .continueWith(taskWithPath -> toastMsg("save to " + taskWithPath.getResult()));
- }
使用分離的任務(wù)
- Task
- .forResult(URL)
- .continueWithTask(downloadImageAndUpdateUI())
- .continueWithTask(saveStateAndToast())
- ...
總結(jié)
在 Task 中有一個(gè) continuations 是當(dāng)前任務(wù)后面追加的任務(wù)列表,當(dāng)當(dāng)前任務(wù)成功、異?;蛘呷∠麜r(shí),會(huì)去執(zhí)行列表中的后續(xù)任務(wù)。
通常情況下,我們使用鏈?zhǔn)秸{(diào)用構(gòu)建任務(wù)鏈,結(jié)果就是一條沒(méi)有分支的任務(wù)鏈。
添加任務(wù)時(shí):每次添加一個(gè) Continuation ,就會(huì)生成一個(gè) Task ,加到上行任務(wù)的 continuations 列表中,等待執(zhí)行,同時(shí)返回當(dāng)前的 Task ,以便后面的任務(wù)可以鏈接到當(dāng)前任務(wù)后面。
執(zhí)行任務(wù)時(shí):當(dāng)前任務(wù)執(zhí)行完之后,結(jié)果可能有 3 種,都會(huì)被保存到當(dāng)前的 Task 中,然后檢查 continuations 列表中的后續(xù)任務(wù),而當(dāng)前的 Task 就會(huì)作為參數(shù),傳遞到后續(xù)鏈接的任務(wù)中,來(lái)讓后面的任務(wù)得知上行任務(wù)的結(jié)果。