偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

理解RxJava線程模型

移動(dòng)開(kāi)發(fā) Android
RxJava作為目前一款超火的框架,它便捷的線程切換一直被人們津津樂(lè)道,本文從源碼的角度,來(lái)對(duì)RxJava的線程模型做一次深入理解。

[[182175]]

RxJava作為目前一款超火的框架,它便捷的線程切換一直被人們津津樂(lè)道,本文從源碼的角度,來(lái)對(duì)RxJava的線程模型做一次深入理解。(注:本文的多處代碼都并非原本的RxJava的源碼,而是用來(lái)說(shuō)明邏輯的偽代碼)

入手體驗(yàn)

RxJava 中切換線程非常簡(jiǎn)單,例如最常見(jiàn)的異步線程處理,主線程回調(diào)的模型,可以很優(yōu)雅的用如下代碼來(lái)做處理:

  1. Observable.just("magic"
  2.         .map(str -> doExpensiveWork(str)) 
  3.         .subscribeOn(Schedulers.io()) 
  4.         .observeOn(AndroidSchedulers.mainThread()) 
  5.         .subscribe(obj -> System.out.print(String.valueOf(obj)));  

如上,subscribeOn(Schedulers.io())保證了doExpensiveWork 函數(shù)發(fā)生在io線程,observeOn(AndroidSchedulers.mainThread())保證了subscribe 回調(diào)發(fā)生在Android 的主線程。所以,這自然而然的引出了本文的關(guān)鍵點(diǎn),subscribeOn與observeOn到底區(qū)別在哪里?

流程淺析

要想回答上面的問(wèn)題,我們首先需要對(duì)RxJava的流程有大體了解,一個(gè)Observable從產(chǎn)生,到最終執(zhí)行subscribe,中間可以經(jīng)歷n個(gè)變換,每次變換會(huì)產(chǎn)生一個(gè)新的Observable,就像奧運(yùn)開(kāi)幕的傳遞火炬一樣,每次火炬都會(huì)傳遞到下一個(gè)人,最終點(diǎn)燃圣火的是***一個(gè)火炬手,即最終執(zhí)行subscribe操作的是***一個(gè)Observable,所以,每個(gè)Observable之間必須有聯(lián)系,這種關(guān)系在代碼中的體現(xiàn)就是,每個(gè)變換后的Observable都會(huì)持有上一個(gè)Observable 中OnSubscribe對(duì)象的引用(Observable.create 函數(shù)所需的參數(shù)),最終 Observable的subscribe函數(shù)中的關(guān)鍵代碼是這一句:

  1. observable.onSubscribe.call(subscriber) 

這個(gè)observable就是***一個(gè)變換后的observable,那這個(gè)onSubscribe對(duì)象是誰(shuí)呢?如何一個(gè)observable沒(méi)有經(jīng)過(guò)任何變換,直接執(zhí)行了subscribe,當(dāng)然就是我們?cè)赾reate中傳入的onSubscribe, 但如果中間經(jīng)過(guò)map、reduce等變換,這個(gè)onSubscribe顯然就應(yīng)該是創(chuàng)建變換后的observable傳入的參數(shù),大部分變換最終都交由lift函數(shù):

  1. public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) { 
  2.     return new Observable<R>(new OnSubscribeLift<T, R>(onSubscribe, operator)); 
  3.  

所以,上文所提到的onSubscribe對(duì)象應(yīng)該是OnSubscribeLift的實(shí)例,而這個(gè)OnSubscribeLift所接收的兩個(gè)參數(shù),一個(gè)是前文提到的,上一個(gè)Observable中的OnSubscribe對(duì)象,而operator則是每種變換的一個(gè)抽象接口。再來(lái)看這個(gè)OnSubscribeLift對(duì)象的call方法:

  1. public void call(Subscriber<? super R> o) { 
  2. Subscriber<? super T> st = operator.call(o); 
  3. parent.call(st); 
  4. }  

operator與parent就是前文提到的兩個(gè)參數(shù),可見(jiàn),operator接口會(huì)擁有call方法,接收一個(gè)Subscriber, 并返回一個(gè)新的Subscriber對(duì)象,而接下來(lái)的parent.call(st)是回調(diào)上一層observable的onSubscribe的call方法,這樣如此繼續(xù),一直到一個(gè)onSubscribe截止。這樣我們首先理清了一條線路,就是從***一個(gè)observable的subscribe后,OnSubscribe調(diào)用的順序是從后向前的。

這就帶來(lái)了另外一個(gè)疑問(wèn),從上面的代碼可以看到,在執(zhí)行parent.call(st)之前已經(jīng)執(zhí)行了operator.call(o)方法,如果call方法里就把變換的操作執(zhí)行了的話,那似乎變換也會(huì)是從后向前傳遞的呀?所以這個(gè)operator.call方法絕對(duì)不是我們想象的那么簡(jiǎn)單。這里以map操作符為例,看源碼:

  1. public Subscriber<? super T> call(final Subscriber<? super R> s) { 
  2.     MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer); 
  3.     o.add(parent); 
  4.     return parent; 
  5.  

這里果然沒(méi)有執(zhí)行變換操作,而是生成一個(gè)MapSubscriber對(duì)象,這里需要注意MapSubscriber構(gòu)造函數(shù)的兩個(gè)參數(shù),transformer是真正要執(zhí)行變換的Func1對(duì)象,這很好理解,那對(duì)于o這個(gè)Subscriber是哪一個(gè)呢?什么意思?舉個(gè)🌰:

o1 -> o2 -> subscribe(Subscriber s0);

o1 經(jīng)過(guò)map操作變?yōu)閛2, o2執(zhí)行subscribe操作,如果你理解上文可以知道,這段流程的執(zhí)行順序?yàn)閟0會(huì)首先傳遞給o2, o2的lift操作會(huì)將s0轉(zhuǎn)換為s1傳遞給o1, 那么在生成o2 這個(gè)map操作的 call(final Subscriber<? super R> s)方法中,s值得是誰(shuí)呢?是s0還是s1呢?答案應(yīng)該是s0,也就是它的下一級(jí)Subscriber,原因很簡(jiǎn)單,call方法中返回的MapSubscriber對(duì)象parent才是s1.

所以,我們來(lái)看一下MapSubscriber的onNext方法做了什么呢?

  1. public void onNext(T t) { 
  2.     R result; 
  3.     result = transformer.call(t); 
  4.     s.onNext(result); 
  5.  

很明了,首先執(zhí)行變換,然后回調(diào)下一級(jí)的onNext函數(shù)。

至此,一個(gè)observable從初始,到變換,再到subscribe,我們已經(jīng)對(duì)整個(gè)流程有了大體了解。簡(jiǎn)單來(lái)講一個(gè)o1經(jīng)過(guò)map變?yōu)閛2,可以理解為o2對(duì)o1做了一層hook,會(huì)經(jīng)歷兩次流程,首先是onSubscribe對(duì)象的call流程會(huì)從o2流向o1,我們簡(jiǎn)稱流程a,到達(dá)o1后,o1又會(huì)出發(fā)Subscriber的onNext系列流程,簡(jiǎn)稱流程b,流程b才是真正執(zhí)行變換的流程,其走向是從o1流向o2.理解了這個(gè),我們就可以更近一步的理解RxJava中線程的模型了。

tip: 一定要深刻理解流程a與流程b的區(qū)別。這對(duì)下文理解線程切換至關(guān)重要。

切換方式

RxJava對(duì)線程模型的抽象是Scheduler,這是一個(gè)抽象類,包含一個(gè)抽象方法:

  1. public abstract Worker createWorker(); 

這個(gè)Worker是何方神圣呢?它其實(shí)是Scheduler的抽象內(nèi)部類,主要 包含兩個(gè)抽象方法:

  1. 1) public abstract Subscription schedule(Action0 action); 
  2. 2) public abstract Subscription schedule(final Action0 action, final long delayTime, final TimeUnit unit);  

可見(jiàn),Worker才是線程執(zhí)行的主力,兩個(gè)方法一個(gè)用與立即執(zhí)行任務(wù),另一個(gè)用與執(zhí)行延時(shí)任務(wù)。而Scheduler是Worker的工廠,用于對(duì)外提供Worker。

RxJava中共有兩種常見(jiàn)的方式來(lái)切換線程,分別是subscribeOn變換與observeOn變換,這兩者接收的參數(shù)都是Scheduler。接下來(lái)從源碼層面來(lái)對(duì)比這兩者的差別。

subscribeOn

首先看subscribeOn的部分

  1. public final Observable<T> subscribeOn(Scheduler scheduler) { 
  2.     return create(new OperatorSubscribeOn<T>(this, scheduler)); 
  3.  

create一個(gè)新的Observable,傳入的參數(shù)是OperatorSubscribeOn,很明顯這應(yīng)該是OnSubscribe的一個(gè)實(shí)現(xiàn),關(guān)注這個(gè)OperatorSubscribeOn的call實(shí)現(xiàn)方法:

  1. public void call(final Subscriber<? super T> subscriber) { 
  2.      final Worker inner = scheduler.createWorker();         
  3.      inner.schedule(new Action0() { 
  4.            @Override 
  5.             public void call() { 
  6.                 final Thread t = Thread.currentThread(); 
  7.                  
  8.                 Subscriber<T> s = new Subscriber<T>(subscriber) { 
  9.                     @Override 
  10.                     public void onNext(T t) { 
  11.                         subscriber.onNext(t); 
  12.                     } 
  13.                      
  14.                     ... 
  15.                      
  16.                 }; 
  17.                  
  18.                 source.unsafeSubscribe(s); 
  19.             } 
  20.     }); 
  21.  

這里比較關(guān)鍵了,上文提到了流程a與流程b,首先明確一點(diǎn),這個(gè)call方法的執(zhí)行時(shí)機(jī)是流程a,也就是說(shuō)這個(gè)call發(fā)生在流程b之前,call方法里首先通過(guò)外部傳入的scheduler創(chuàng)建Worker – inner對(duì)象,接著在inner中執(zhí)行了一段代碼,神奇了,Action0中call方法這段代碼就在worker線程中執(zhí)行了,也就是此刻程進(jìn)行了切換。注意***一句代碼source.unsafeSubscribe(s),source 代表創(chuàng)建OperatorSubscribeOn對(duì)象是傳進(jìn)來(lái)的上一個(gè)Observable, 這句的源碼如下:

  1. public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) { 
  2.             return onSubscribe.call(subscriber); 

和上文提到的lift方法中OnSubscribeLift對(duì)象的call方法中parent.call(st)作用類似,就是將當(dāng)前的Observable與上一個(gè)Observable通過(guò)onSubscribe關(guān)聯(lián)起來(lái)。

至此,我們可以大致了解了subscribeOn的原理,它會(huì)在流程a就進(jìn)行了線程切換,但由于流程a上實(shí)際上都是Observable之間串聯(lián)關(guān)系的代碼,并且是從后面的Observable流向前面的Observable,這帶來(lái)的一個(gè)隱含意思就是,對(duì)于流程b而言,最早的subscribeOn會(huì)屏蔽其后面的subscribeOn! 比如:

  1. Observable.just("magic"
  2.           .map(file -> doExpensiveWork(file)) 
  3.           .subscribeOn(Schedulers.io()) 
  4.           .subscribeOn(AndroidSchedulers.mainThread()) 
  5.           .subscribe(obj -> doAction(obj)));  

這段代碼中無(wú)論是doExpensiveWork函數(shù)還是doAction函數(shù),都會(huì)在io線程出觸發(fā)。

observeOn

理解了subscribeOn,那理解observeOn就會(huì)更容易一下,observeOn函數(shù)最終會(huì)轉(zhuǎn)換到這個(gè)函數(shù):

  1. public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) { 
  2.         return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize)); 
  3.  

很明顯,這是做了一次lift操作,我們需要關(guān)注OperatorObserveOn這個(gè)Operator,查看其call方法:

  1. public Subscriber<? super T> call(Subscriber<? super T> child) { 
  2.     ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize); 
  3.     parent.init(); 
  4.     return parent; 
  5.  

這里返回的是一個(gè)ObserveOnSubscriber對(duì)象,我們關(guān)注這個(gè)Subscriber的onNext函數(shù),

  1. public void onNext(final T t) { 
  2.     schedule(); 
  3.  

它只是簡(jiǎn)單的執(zhí)行了schedule函數(shù),來(lái)看下這個(gè)schedule:

  1. protected void schedule() { 
  2.         recursiveScheduler.schedule(this); 
  3.  

這里亂入的recursiveScheduler.schedule是什么鬼?它并不神奇,它就是ObserveOnSubscriber構(gòu)造函數(shù)傳進(jìn)來(lái)的scheduler創(chuàng)建的worker:

  1. this.recursiveScheduler = scheduler.createWorker(); 

所以,magic再次產(chǎn)生,observeOn在其onNext中進(jìn)行了線程的切換,那這個(gè)onNext是在什么時(shí)候執(zhí)行的呢?通過(guò)上文可知,是在流程b中。所以observeOn會(huì)影響其后面的流程,直到出現(xiàn)下一次observeOn或者結(jié)束。

周邊技巧

線程模型的選擇

RxJava為我們內(nèi)置了幾種線程模型,主要區(qū)別如下:

  • computation內(nèi)部是一個(gè)線程,線程池的大小cpu核數(shù):Runtime.getRuntime().availableProcessors(),這種線程比較適合做純cpu運(yùn)算,如求100億以內(nèi)的斐波那契數(shù)列的和之類。
  • newThread每次createWorker都會(huì)生成一個(gè)新的線程。
  • io與newThread類似,但內(nèi)部是一個(gè)沒(méi)有上線的線程池,一般來(lái)講,使用io會(huì)比newThread好一些,因?yàn)槠鋬?nèi)部的線程池可以重用線程。
  • immediate在當(dāng)前線程立即執(zhí)行
  • trampoline

在當(dāng)前線程執(zhí)行,與immediate不同的是,它并不會(huì)立即執(zhí)行,而是將其存入隊(duì)列,等待當(dāng)前Scheduler中其它任務(wù)執(zhí)行完畢后執(zhí)行,這個(gè)在我們時(shí)常使用的并不多,它主要服務(wù)于repeat ,retry這類特殊的變換操作。

  • from接收一個(gè)Executor,允許我們自定義Scheduler。

Scheduler.Worker強(qiáng)勢(shì)搶鏡

其實(shí)RxJava中的Worker完全可以抽出來(lái)為我所用,如下面這種寫法,就是新開(kāi)線程執(zhí)行了一個(gè)action。

  1. Scheduler.Worker worker = Schedulers. newThread().createWorker(); 
  2. worker.schedule(new Action0() { 
  3.             @Override 
  4.             public void call() { 
  5.                 throw new RuntimeException("surprise"); 
  6.             } 
  7.         });  

當(dāng)然,你要選擇合適的時(shí)機(jī)去關(guān)閉(unsubscribe)worker來(lái)釋放資源。

自帶光環(huán)的操作符

某些操作符是有默認(rèn)的線程模型的,比如前文提到的repeat 與retry會(huì)默認(rèn)在trampoline線程模型下執(zhí)行, buffer ,debounce之類會(huì)默認(rèn)切換到computation。這里不做深入探討,只是當(dāng)你遇到某些問(wèn)題時(shí)記得,有些人物是自帶裝備與光環(huán)的。

總結(jié)

理解RxJava的線程模型最重要的是要與我們平常對(duì)異步的理解來(lái)區(qū)分開(kāi):

  1. doAsync("magic", new Callback() { 
  2.     @Override 
  3.     public void handle(Object msg) { 
  4.         a) .... 
  5.     } 
  6. }); 
  7.          
  8. b)....  

這是之前我們常寫的代碼,通常只會(huì)區(qū)分UI線程和非UI 線程,doAsync函數(shù)開(kāi)始后,程序進(jìn)行了分流,一個(gè)線程在執(zhí)行一個(gè)doAsync, 另一個(gè)線程在執(zhí)行b段代碼。RxJava另辟蹊徑,對(duì)整個(gè)線程做了抽象,RxJava的處理順序像一條流水,這不僅僅表現(xiàn)在代碼寫起來(lái)像一條鎖鏈上,邏輯上也是如此,對(duì)Observable自身而言,更改線程只是變換了流水前進(jìn)的軌道,并不是進(jìn)行分流,Android中常見(jiàn) 非UI線程處理數(shù)據(jù),UI 線程展示數(shù)據(jù)也只是這條流水變換的一種方式。

就我個(gè)人的理解,對(duì)于RxJava的線程切換,把它理解為異步,非異步,阻塞,非阻塞都有些不恰當(dāng),暫且只能理解為變換。so amazing! 

責(zé)任編輯:龐桂玉 來(lái)源: 安卓開(kāi)發(fā)精選
相關(guān)推薦

2024-05-31 08:10:58

Netty線程模型多路復(fù)用模型

2017-03-22 13:20:07

RxJavaSingleCompletable

2017-12-18 16:33:55

多線程對(duì)象模型

2025-03-05 00:22:00

2023-11-05 12:05:35

JVM內(nèi)存

2025-07-03 02:15:00

DOM對(duì)象模型JavaScript

2019-01-28 08:50:09

線程安全

2011-07-22 14:14:23

java

2020-10-14 08:50:38

搞懂 Netty 線程

2009-12-04 18:00:46

PHP開(kāi)發(fā)MVC模型

2013-07-11 10:37:20

Java內(nèi)存模型

2009-06-18 09:56:44

ADO.NET對(duì)象模型

2022-07-07 08:00:51

Java內(nèi)存模型

2015-07-01 13:34:22

Kubernetes應(yīng)用部署模型

2025-03-07 08:00:00

LLM蒸餾和量化大模型

2025-01-10 07:10:00

2024-05-24 14:35:49

2009-07-14 11:30:15

Swing線程

2012-01-17 17:21:24

JavaSwing
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)