Java異步非阻塞編程的幾種方式
一 從一個(gè)同步的Http調(diào)用說(shuō)起
一個(gè)很簡(jiǎn)單的業(yè)務(wù)邏輯,其他后端服務(wù)提供了一個(gè)接口,我們需要通過(guò)接口調(diào)用,獲取到響應(yīng)的數(shù)據(jù)。
逆地理接口:通過(guò)經(jīng)緯度獲取這個(gè)經(jīng)緯度所在的省市區(qū)縣以及響應(yīng)的code:
- curl-i"http://xxx?latitude=31.08966221524924&channel=amap7a&near=false&longitude=105.13990312814713"
 - {"adcode":"510722"}
 
服務(wù)端執(zhí)行,最簡(jiǎn)單的同步調(diào)用方式:
服務(wù)端響應(yīng)之前,IO會(huì)阻塞在:java.net.SocketInputStream#socketRead0 的native方法上:
通過(guò)jstack日志,可以發(fā)現(xiàn),此時(shí)這個(gè)Thread會(huì)一直在runable的狀態(tài):
- "main"#1 prio=5 os_prio=31 tid=0x00007fed0c810000 nid=0x1003 runnable [0x000070000ce14000] java.lang.Thread.State: RUNNABLE
 - at java.net.SocketInputStream.socketRead0(Native Method)
 - at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
 - at java.net.SocketInputStream.read(SocketInputStream.java:171)
 - at java.net.SocketInputStream.read(SocketInputStream.java:141)
 - at org.apache.http.impl.conn.LoggingInputStream.read(LoggingInputStream.java:84)
 - at org.apache.http.impl.io.SessionInputBufferImpl.streamRead(SessionInputBufferImpl.java:137)
 - at org.apache.http.impl.io.SessionInputBufferImpl.fillBuffer(SessionInputBufferImpl.java:153)
 - at org.apache.http.impl.io.SessionInputBufferImpl.readLine(SessionInputBufferImpl.java:282)
 - at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:138)
 - at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:56)
 - at org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:259)
 - at org.apache.http.impl.DefaultBHttpClientConnection.receiveResponseHeader(DefaultBHttpClientConnection.java:163)
 - at org.apache.http.impl.conn.CPoolProxy.receiveResponseHeader(CPoolProxy.java:165)
 - at org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:273)
 - at org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:125)
 - at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:272)
 - at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:185)
 - at org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:89)
 - at org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:110)
 - at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
 - at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
 - at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:108)
 - at com.amap.aos.async.AsyncIO.blockingIO(AsyncIO.java:207)
 - .......
 
線(xiàn)程模型示例:
同步最大的問(wèn)題是在IO等待的過(guò)程中,線(xiàn)程資源沒(méi)有得到充分的利用,對(duì)于大量IO場(chǎng)景的業(yè)務(wù)吞吐量會(huì)有一定限制。
二 JDK NIO & Future
在JDK 1.5 中,JUC提供了Future抽象:
當(dāng)然并不是所有的Future都是這樣實(shí)現(xiàn)的,如 io.netty.util.concurrent.AbstractFuture 就是通過(guò)線(xiàn)程輪詢(xún)?nèi)ァ?/p>
這樣做的好處是,主線(xiàn)程可以不用等待IO響應(yīng),可以去做點(diǎn)其他的,比如說(shuō)再發(fā)送一個(gè)IO請(qǐng)求,可以等到一起返回:
- "main"#1 prio=5 os_prio=31 tid=0x00007fd7a500b000 nid=0xe03 waiting on condition [0x000070000a95d000] java.lang.Thread.State: WAITING (parking)
 - at sun.misc.Unsafe.park(Native Method)
 - - parking to wait for <0x000000076ee2d768> (a java.util.concurrent.CountDownLatch$Sync)
 - at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
 - at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
 - at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
 - at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
 - at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
 - at org.asynchttpclient.netty.NettyResponseFuture.get(NettyResponseFuture.java:162)
 - at com.amap.aos.async.AsyncIO.futureBlockingGet(AsyncIO.java:201)
 - .....
 - "AsyncHttpClient-2-1"#11 prio=5 os_prio=31 tid=0x00007fd7a7247800 nid=0x340b runnable [0x000070000ba94000] java.lang.Thread.State: RUNNABLE
 - at sun.nio.ch.KQueueArrayWrapper.kevent0(Native Method)
 - at sun.nio.ch.KQueueArrayWrapper.poll(KQueueArrayWrapper.java:198)
 - at sun.nio.ch.KQueueSelectorImpl.doSelect(KQueueSelectorImpl.java:117)
 - at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
 - - locked <0x000000076eb00ef0> (a io.netty.channel.nio.SelectedSelectionKeySet)
 - - locked <0x000000076eb00f10> (a java.util.Collections$UnmodifiableSet)
 - - locked <0x000000076eb00ea0> (a sun.nio.ch.KQueueSelectorImpl)
 - at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
 - at io.netty.channel.nio.NioEventLoop.select(NioEventLoop.java:693)
 - at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:353)
 - at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)
 - at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
 - at java.lang.Thread.run(Thread.java:748)
 
主線(xiàn)程在等待結(jié)果返回過(guò)程中依然需要等待,沒(méi)有根本解決此問(wèn)題。
三 使用Callback回調(diào)方式
第二節(jié)中,依然需要主線(xiàn)程等待,獲取結(jié)果,那么可不可以在主線(xiàn)程完成發(fā)送請(qǐng)求后,再也不用關(guān)心這個(gè)邏輯,去執(zhí)行其他的邏輯?那就可以使用Callback機(jī)制。
如此一來(lái),主線(xiàn)程再也不需要關(guān)心發(fā)起IO后的業(yè)務(wù)邏輯,發(fā)送完請(qǐng)求后,就可以徹底去干其他事情,或者回到線(xiàn)程池中再供調(diào)度。如果是HttpServer,那么需要結(jié)合Servlet 3.1的異步Servlet。
異步Servelt參考資料
https://www.cnblogs.com/davenkin/p/async-servlet.html
使用Callback方式,從線(xiàn)程模型中看,發(fā)現(xiàn)線(xiàn)程資源已經(jīng)得到了比較充分的利用,整個(gè)過(guò)程中已經(jīng)沒(méi)有線(xiàn)程阻塞。
四 Callback hell
回調(diào)地獄,當(dāng)Callback的線(xiàn)程還需要執(zhí)行下一個(gè)IO調(diào)用的時(shí)候,這個(gè)時(shí)候進(jìn)入回調(diào)地獄模式。
典型的應(yīng)用場(chǎng)景如,通過(guò)經(jīng)緯度獲取行政區(qū)域adcode(逆地理接口),然后再根據(jù)獲得的adcode,獲取當(dāng)?shù)氐奶鞖庑畔?天氣接口)。
在同步的編程模型中,幾乎不會(huì)涉及到此類(lèi)問(wèn)題。
Callback方式的核心缺陷
五 JDK 1.8 CompletableFuture
那么有沒(méi)有辦法解決Callback Hell的問(wèn)題?當(dāng)然有,JDK 1.8中提供了CompletableFuture,先看看它是怎么解決這個(gè)問(wèn)題的。
將逆地理的Callback邏輯,封裝成一個(gè)獨(dú)立的CompletableFuture,當(dāng)異步線(xiàn)程回調(diào)時(shí),調(diào)用future.complete(T) ,將結(jié)果封裝。
將天氣執(zhí)行的Call邏輯,也封裝成為一個(gè)獨(dú)立的CompletableFuture ,完成之后,邏輯同上。
compose銜接,whenComplete輸出:
每一個(gè)IO操作,均可以封裝為獨(dú)立的CompletableFuture,從而避免回調(diào)地獄。
CompletableFuture,只有兩個(gè)屬性:
- result:Future的執(zhí)行結(jié)果 (Either the result or boxed AltResult)。
 - stack:操作棧,用于定義這個(gè)Future接下來(lái)操作的行為 (Top of Treiber stack of dependent actions)。
 
weatherFuture這個(gè)方法是如何被調(diào)用的呢?
通過(guò)堆??梢园l(fā)現(xiàn),是在 reverseCodeFuture.complete(result) 的時(shí)候,并且也將獲得的adcode作為參數(shù)執(zhí)行接下來(lái)的邏輯。
這樣一來(lái),就完美解決回調(diào)地獄問(wèn)題,在主的邏輯中,看起來(lái)像是在同步的進(jìn)行編碼。
六 Vert.x Future
Info-Service中,大量使用的 Vert.x Future 也是類(lèi)似的解決的方案,不過(guò)設(shè)計(jì)上使用Handler的概念。
核心執(zhí)行的邏輯差不多:
這當(dāng)然不是Vertx的全部,當(dāng)然這是題外話(huà)了。
七 Reactive Streams
異步編程對(duì)吞吐量以及資源有好處,但是有沒(méi)有統(tǒng)一的抽象去解決此類(lèi)問(wèn)題內(nèi),答案是 Reactive Streams。
核心抽象:Publisher Subscriber Processor Subscription ,整個(gè)包里面,只有這四個(gè)接口,沒(méi)有實(shí)現(xiàn)類(lèi)。
在JDK 9里面,已經(jīng)被作為一種規(guī)范封裝到 java.util.concurrent.Flow :
參考資料
https://www.baeldung.com/java-9-reactive-streams
http://ypk1226.com/2019/07/01/reactive/reactive-streams/
https://www.reactivemanifesto.org/
https://projectreactor.io/learn
一個(gè)簡(jiǎn)單的例子:

八 Reactor & Spring 5 & Spring WebFlux
Flux & Mono
參考資料
https://projectreactor.io/docs/core/3.1.0.M3/reference/index.html
https://speakerdeck.com/simonbasle/projectreactor-dot-io-reactor3-intro





































 
 
 








 
 
 
 