偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

一篇文章幫你徹底搞清楚“I/O多路復(fù)用”和“異步I/O”的前世今生

存儲 存儲軟件
在網(wǎng)絡(luò)的初期,網(wǎng)民很少,服務(wù)器完全無壓力,那時的技術(shù)也沒有現(xiàn)在先進,通常用一個線程來全程跟蹤處理一個請求。因為這樣最簡單。

 [[286434]]

曾經(jīng)的VIP服務(wù)

在網(wǎng)絡(luò)的初期,網(wǎng)民很少,服務(wù)器完全無壓力,那時的技術(shù)也沒有現(xiàn)在先進,通常用一個線程來全程跟蹤處理一個請求。因為這樣最簡單。

其實代碼實現(xiàn)大家都知道,就是服務(wù)器上有個ServerSocket在某個端口監(jiān)聽,接收到客戶端的連接后,會創(chuàng)建一個Socket,并把它交給一個線程進行后續(xù)處理。

線程主要從Socket讀取客戶端傳過來的數(shù)據(jù),然后進行業(yè)務(wù)處理,并把結(jié)果再寫入Socket傳回客戶端。

由于網(wǎng)絡(luò)的原因,Socket創(chuàng)建后并不一定能立刻從它上面讀取數(shù)據(jù),可能需要等一段時間,此時線程也必須一直阻塞著。在向Socket寫入數(shù)據(jù)時,也可能會使線程阻塞。

這里準(zhǔn)備了一個示例,主要邏輯如下:

客戶端:創(chuàng)建20個Socket并連接到服務(wù)器上,再創(chuàng)建20個線程,每個線程負(fù)責(zé)一個Socket。

服務(wù)器端:接收到這20個連接,創(chuàng)建20個Socket,接著創(chuàng)建20個線程,每個線程負(fù)責(zé)一個Socket。

為了模擬服務(wù)器端的Socket在創(chuàng)建后不能立馬讀取數(shù)據(jù),讓客戶端的20個線程分別休眠5-10之間的一個隨機秒數(shù)。

客戶端的20個線程會在第5秒到第10秒這段時間內(nèi)陸陸續(xù)續(xù)的向服務(wù)器端發(fā)送數(shù)據(jù),服務(wù)器端的20個線程也會陸陸續(xù)續(xù)接收到數(shù)據(jù)。

  1. /** 
  2.  * @author lixinjie 
  3.  * @since 2019-05-07 
  4.  */ 
  5. public class BioServer { 
  6.  
  7.   static AtomicInteger counter = new AtomicInteger(0); 
  8.   static SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");  
  9.    
  10.   public static void main(String[] args) { 
  11.     try { 
  12.       ServerSocket ss = new ServerSocket(); 
  13.       ss.bind(new InetSocketAddress("localhost", 8080)); 
  14.       while (true) { 
  15.         Socket s = ss.accept(); 
  16.         processWithNewThread(s); 
  17.       } 
  18.     } catch (Exception e) { 
  19.       e.printStackTrace(); 
  20.     } 
  21.   } 
  22.    
  23.   static void processWithNewThread(Socket s) { 
  24.     Runnable run = () -> { 
  25.       InetSocketAddress rsa = (InetSocketAddress)s.getRemoteSocketAddress(); 
  26.       System.out.println(time() + "->" + rsa.getHostName() + ":" + rsa.getPort() + "->" + Thread.currentThread().getId() + ":" + counter.incrementAndGet()); 
  27.       try { 
  28.         String result = readBytes(s.getInputStream()); 
  29.         System.out.println(time() + "->" + result + "->" + Thread.currentThread().getId() + ":" + counter.getAndDecrement()); 
  30.         s.close(); 
  31.       } catch (Exception e) { 
  32.         e.printStackTrace(); 
  33.       } 
  34.     }; 
  35.     new Thread(run).start(); 
  36.   } 
  37.    
  38.   static String readBytes(InputStream is) throws Exception { 
  39.     long start = 0; 
  40.     int total = 0; 
  41.     int count = 0; 
  42.     byte[] bytes = new byte[1024]; 
  43.     //開始讀數(shù)據(jù)的時間 
  44.     long begin = System.currentTimeMillis(); 
  45.     while ((count = is.read(bytes)) > -1) { 
  46.       if (start < 1) { 
  47.         //第一次讀到數(shù)據(jù)的時間 
  48.         start = System.currentTimeMillis(); 
  49.       } 
  50.       total += count
  51.     } 
  52.     //讀完數(shù)據(jù)的時間 
  53.     long end = System.currentTimeMillis(); 
  54.     return "wait=" + (start - begin) + "ms,read=" + (end - start) + "ms,total=" + total + "bs"
  55.   } 
  56.  
  57.   static String time() { 
  58.     return sdf.format(new Date()); 
  59.   } 
  1. /** 
  2.  * @author lixinjie 
  3.  * @since 2019-05-07 
  4.  */ 
  5. public class Client { 
  6.  
  7.   public static void main(String[] args) { 
  8.     try { 
  9.       for (int i = 0; i < 20; i++) { 
  10.         Socket s = new Socket(); 
  11.         s.connect(new InetSocketAddress("localhost", 8080)); 
  12.         processWithNewThread(s, i); 
  13.       } 
  14.     } catch (IOException e) { 
  15.       e.printStackTrace(); 
  16.     } 
  17.   } 
  18.  
  19.   static void processWithNewThread(Socket s, int i) { 
  20.     Runnable run = () -> { 
  21.       try { 
  22.         //睡眠隨機的5-10秒,模擬數(shù)據(jù)尚未就緒 
  23.         Thread.sleep((new Random().nextInt(6) + 5) * 1000); 
  24.         //寫1M數(shù)據(jù),為了拉長服務(wù)器端讀數(shù)據(jù)的過程 
  25.         s.getOutputStream().write(prepareBytes()); 
  26.         //睡眠1秒,讓服務(wù)器端把數(shù)據(jù)讀完 
  27.         Thread.sleep(1000); 
  28.         s.close(); 
  29.       } catch (Exception e) { 
  30.         e.printStackTrace(); 
  31.       } 
  32.     }; 
  33.     new Thread(run).start(); 
  34.   } 
  35.    
  36.   static byte[] prepareBytes() { 
  37.     byte[] bytes = new byte[1024*1024*1]; 
  38.     for (int i = 0; i < bytes.length; i++) { 
  39.       bytes[i] = 1; 
  40.     } 
  41.     return bytes; 
  42.   } 

執(zhí)行結(jié)果如下:

  1. 時間->IP:Port->線程Id:當(dāng)前線程數(shù) 
  2. 15:11:52->127.0.0.1:55201->10:1 
  3. 15:11:52->127.0.0.1:55203->12:2 
  4. 15:11:52->127.0.0.1:55204->13:3 
  5. 15:11:52->127.0.0.1:55207->16:4 
  6. 15:11:52->127.0.0.1:55208->17:5 
  7. 15:11:52->127.0.0.1:55202->11:6 
  8. 15:11:52->127.0.0.1:55205->14:7 
  9. 15:11:52->127.0.0.1:55206->15:8 
  10. 15:11:52->127.0.0.1:55209->18:9 
  11. 15:11:52->127.0.0.1:55210->19:10 
  12. 15:11:52->127.0.0.1:55213->22:11 
  13. 15:11:52->127.0.0.1:55214->23:12 
  14. 15:11:52->127.0.0.1:55217->26:13 
  15. 15:11:52->127.0.0.1:55211->20:14 
  16. 15:11:52->127.0.0.1:55218->27:15 
  17. 15:11:52->127.0.0.1:55212->21:16 
  18. 15:11:52->127.0.0.1:55215->24:17 
  19. 15:11:52->127.0.0.1:55216->25:18 
  20. 15:11:52->127.0.0.1:55219->28:19 
  21. 15:11:52->127.0.0.1:55220->29:20 
  22.  
  23. 時間->等待數(shù)據(jù)的時間,讀取數(shù)據(jù)的時間,總共讀取的字節(jié)數(shù)->線程Id:當(dāng)前線程數(shù) 
  24. 15:11:58->wait=5012ms,read=1022ms,total=1048576bs->17:20 
  25. 15:11:58->wait=5021ms,read=1022ms,total=1048576bs->13:19 
  26. 15:11:58->wait=5034ms,read=1008ms,total=1048576bs->11:18 
  27. 15:11:58->wait=5046ms,read=1003ms,total=1048576bs->12:17 
  28. 15:11:58->wait=5038ms,read=1005ms,total=1048576bs->23:16 
  29. 15:11:58->wait=5037ms,read=1010ms,total=1048576bs->22:15 
  30. 15:11:59->wait=6001ms,read=1017ms,total=1048576bs->15:14 
  31. 15:11:59->wait=6016ms,read=1013ms,total=1048576bs->27:13 
  32. 15:11:59->wait=6011ms,read=1018ms,total=1048576bs->24:12 
  33. 15:12:00->wait=7005ms,read=1008ms,total=1048576bs->20:11 
  34. 15:12:00->wait=6999ms,read=1020ms,total=1048576bs->14:10 
  35. 15:12:00->wait=7019ms,read=1007ms,total=1048576bs->26:9 
  36. 15:12:00->wait=7012ms,read=1015ms,total=1048576bs->21:8 
  37. 15:12:00->wait=7023ms,read=1008ms,total=1048576bs->25:7 
  38. 15:12:01->wait=7999ms,read=1011ms,total=1048576bs->18:6 
  39. 15:12:02->wait=9026ms,read=1014ms,total=1048576bs->10:5 
  40. 15:12:02->wait=9005ms,read=1031ms,total=1048576bs->19:4 
  41. 15:12:03->wait=10007ms,read=1011ms,total=1048576bs->16:3 
  42. 15:12:03->wait=10006ms,read=1017ms,total=1048576bs->29:2 
  43. 15:12:03->wait=10010ms,read=1022ms,total=1048576bs->28:1 

可以看到服務(wù)器端確實為每個連接創(chuàng)建一個線程,共創(chuàng)建了20個線程。

客戶端進入休眠約5-10秒,模擬連接上數(shù)據(jù)不就緒,服務(wù)器端線程在等待,等待時間約5-10秒。

客戶端陸續(xù)結(jié)束休眠,往連接上寫入1M數(shù)據(jù),服務(wù)器端開始讀取數(shù)據(jù),整個讀取過程約1秒。

可以看到,服務(wù)器端的工作線程會把時間花在“等待數(shù)據(jù)”和“讀取數(shù)據(jù)”這兩個過程上。

這有兩個不好的地方:

一是有很多客戶端同時發(fā)起請求的話,服務(wù)器端要創(chuàng)建很多的線程,可能會因為超過了上限而造成崩潰。

二是每個線程的大部分時光中都是在阻塞著,無事可干,造成極大的資源浪費。

開頭已經(jīng)說了那個年代網(wǎng)民很少,所以,不可能會有大量請求同時過來。至于資源浪費就浪費吧,反正閑著也是閑著。

來個簡單的小例子:

飯店共有10張桌子,且配備了10位服務(wù)員。只要有客人來了,大堂經(jīng)理就把客人帶到一張桌子,并安排一位服務(wù)員全程陪同。

即使客人暫時不需要服務(wù),服務(wù)員也一直在旁邊站著。可能覺著是一種浪費,其實非也,這就是尊貴的VIP服務(wù)。

其實,VIP映射的是一對一的模型,主要體現(xiàn)在“專用”上或“私有”上。

真正的多路復(fù)用技術(shù)

多路復(fù)用技術(shù)原本指的是,在通信方面,多種信號或數(shù)據(jù)(從宏觀上看)交織在一起,使用同一條傳輸通道進行傳輸。

這樣做的目的,一方面可以充分利用通道的傳輸能力,另一方面自然是省時省力省錢啦。

其實這個概念非常的“生活化”,隨手就可以舉個例子:

一條小水渠里水在流,在一端往里倒入大量乒乓球,在另一端用網(wǎng)進行過濾,把乒乓球和水流分開。

這就是一個比較“土”的多路復(fù)用,首先在發(fā)射端把多種信號或數(shù)據(jù)進行“混合”,接著是在通道上進行傳輸,最后在接收端“分離”出自己需要的信號或數(shù)據(jù)。

相信大家都看出來了,這里的重點其實就是處理好“混合”和“分離”,對于不同的信號或數(shù)據(jù),有不同的處理方法。

比如以前的有線電視是模擬信號,即電磁波。一家一般只有一根信號線,但可以同時接多個電視,每個電視任意換臺,互不影響。

這是由于不同頻率的波可以混合和分離。(當(dāng)然,可能不是十分準(zhǔn)確,明白意思就行了。)

再比如城市的高鐵站一般都有數(shù)個站臺供高鐵(同時)??浚鞘虚g的高鐵軌道單方向只有一條,如何保證那么多趟高鐵安全運行呢?

很明顯是分時使用,每趟高鐵都有自己的時刻。多趟高鐵按不同的時刻出站相當(dāng)于混合,按不同的時刻進站相當(dāng)于分離。

總結(jié)一下,多路指的是多種不同的信號或數(shù)據(jù)或其它事物,復(fù)用指的是共用同一個物理鏈路或通道或載體。

可見,多路復(fù)用技術(shù)是一種一對多的模型,“多”的這一方復(fù)用了“一”的這一方。

其實,一對多的模型主要體現(xiàn)在“公用”上或“共享”上。

您先看著,我一會再過來

一對一服務(wù)是典型的有錢任性,雖然響應(yīng)及時、服務(wù)周到,但不是每個人都能享受的,畢竟還是“屌絲”多嘛,那就來個共享服務(wù)吧。

所以實際當(dāng)中更多的情況是,客人坐下后,會給他一個菜單,讓他先看著,反正也不可能立馬點餐,服務(wù)員就去忙別的了。

可能不時的會有服務(wù)員從客人身旁經(jīng)過,發(fā)現(xiàn)客人還沒有點餐,就會主動去詢問現(xiàn)在需要點餐嗎?

如果需要,服務(wù)員就給你寫菜單,如果不需要,服務(wù)員就繼續(xù)往前走了。

這種情況飯店整體運行的也很好,但是服務(wù)員人數(shù)少多了?,F(xiàn)在服務(wù)10桌客人,4個服務(wù)員綽綽有余。(這節(jié)省的可都是純利潤呀。)

因為10桌客人同時需要服務(wù)的情況幾乎是不會發(fā)生的,絕大部分情況都是錯開的。如果真有的話,那就等會好了,又不是120/119,人命關(guān)天的。

回到代碼里,情況與之非常相似,完全可以采用相同的理論去處理。

連接建立后,找個地方把它放到那里,可以暫時先不管它,反正此時也沒有數(shù)據(jù)可讀。

但是數(shù)據(jù)早晚會到來的,所以,要不時的去詢問每個連接有數(shù)據(jù)沒有,有的話就讀取數(shù)據(jù),沒有的話就繼續(xù)不管它。

其實這個模式在Java里早就有了,就是Java NIO,這里的大寫字母“N”是單詞“New”,即“新”的意思,主要是為了和上面的“一對一”進行區(qū)分。

先鋪墊一下吧

現(xiàn)在需要把Socket交互的過程再稍微細(xì)化一些??蛻舳讼日埱筮B接,connect,服務(wù)器端然后接受連接,accept,然后客戶端再向連接寫入數(shù)據(jù),write,接著服務(wù)器端從連接上讀出數(shù)據(jù),read。

和打電話的場景一樣,主叫撥號,connect,被叫接聽,accept,主叫說話,speak,被叫聆聽,listen。主叫給被叫打電話,說明主叫找被叫有事,所以被叫關(guān)注的是接通電話,聽對方說。

客戶端主動向服務(wù)器端發(fā)起請求,說明客戶端找服務(wù)器端有事,所以服務(wù)器端關(guān)注的是接受請求,讀取對方傳來的數(shù)據(jù)。這里把接受請求,讀取數(shù)據(jù)稱為服務(wù)器端感興趣的操作。

在Java NIO中,接受請求的操作,用OP_ACCEPT表示,讀取數(shù)據(jù)的操作,用OP_READ表示。

我決定先過一遍飯店的場景,讓首次接觸Java NIO的同學(xué)不那么迷茫。就是把常規(guī)的場景進行了定向整理,稍微有點刻意,明白意思就行了。

1、專門設(shè)立一個“跑腿”服務(wù)員,工作職責(zé)單一,就是問問客人是否需要服務(wù)。

2、站在門口接待客人,本來是大堂經(jīng)理的工作,但是他不愿意在門口盯著,于是就委托給跑腿服務(wù)員,你幫我盯著,有人來了告訴我。

于是跑腿服務(wù)員就有了一個任務(wù),替大堂經(jīng)理盯梢。終于來客人了,跑腿服務(wù)員趕緊告訴了大堂經(jīng)理。

3、大堂經(jīng)理把客人帶到座位上,對跑腿服務(wù)員說,客人接下來肯定是要點餐的,但是現(xiàn)在在看菜單,不知道什么時候能看好,所以你不時的過來問問,看需不需要點餐,需要的話就再喊來一個“點餐”服務(wù)員給客人寫菜單。

于是跑腿服務(wù)員就又多了一個任務(wù),就是盯著這桌客人,不時來問問,如果需要服務(wù)的話,就叫點餐服務(wù)員過來服務(wù)。

4、跑腿服務(wù)員在某次詢問中,客人終于決定點餐了,跑題服務(wù)員趕緊找來一個點餐服務(wù)員為客人寫菜單。

5、就這樣,跑腿服務(wù)員既要盯著門外新過來的客人,也要盯著門內(nèi)已經(jīng)就坐的客人。新客人來了,通知大堂經(jīng)理去接待。就坐的客人決定點餐了,通知點餐服務(wù)員去寫菜單。

事情就這樣一直循環(huán)的持續(xù)下去,一切,都挺好。角色明確,職責(zé)單一,配合很好。

大堂經(jīng)理和點餐服務(wù)員是需求的提供者或?qū)崿F(xiàn)者,跑腿服務(wù)員是需求的發(fā)現(xiàn)者,并識別出需求的種類,需要接待的交給大堂經(jīng)理,需要點餐的交給點餐服務(wù)員。

哈哈,Java NIO來啦

代碼的寫法非常的固定,可以配合著后面的解說來看,這樣就好理解了,如下:

  1. /** 
  2.  * @author lixinjie 
  3.  * @since 2019-05-07 
  4.  */ 
  5. public class NioServer { 
  6.  
  7.   static int clientCount = 0; 
  8.   static AtomicInteger counter = new AtomicInteger(0); 
  9.   static SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");  
  10.    
  11.   public static void main(String[] args) { 
  12.     try { 
  13.       Selector selector = Selector.open(); 
  14.       ServerSocketChannel ssc = ServerSocketChannel.open(); 
  15.       ssc.configureBlocking(false); 
  16.       ssc.register(selector, SelectionKey.OP_ACCEPT); 
  17.       ssc.bind(new InetSocketAddress("localhost", 8080)); 
  18.       while (true) { 
  19.         selector.select(); 
  20.         Set<SelectionKey> keys = selector.selectedKeys(); 
  21.         Iterator<SelectionKey> iterator = keys.iterator(); 
  22.         while (iterator.hasNext()) { 
  23.           SelectionKey key = iterator.next(); 
  24.           iterator.remove(); 
  25.           if (key.isAcceptable()) { 
  26.             ServerSocketChannel ssc1 = (ServerSocketChannel)key.channel(); 
  27.             SocketChannel sc = null
  28.             while ((sc = ssc1.accept()) != null) { 
  29.               sc.configureBlocking(false); 
  30.               sc.register(selector, SelectionKey.OP_READ); 
  31.               InetSocketAddress rsa = (InetSocketAddress)sc.socket().getRemoteSocketAddress(); 
  32.               System.out.println(time() + "->" + rsa.getHostName() + ":" + rsa.getPort() + "->" + Thread.currentThread().getId() + ":" + (++clientCount)); 
  33.             } 
  34.           } else if (key.isReadable()) { 
  35.             //先將“讀”從感興趣操作移出,待把數(shù)據(jù)從通道中讀完后,再把“讀”添加到感興趣操作中 
  36.             //否則,該通道會一直被選出來 
  37.             key.interestOps(key.interestOps() & (~ SelectionKey.OP_READ)); 
  38.             processWithNewThread((SocketChannel)key.channel(), key); 
  39.           } 
  40.         } 
  41.       } 
  42.     } catch (Exception e) { 
  43.       e.printStackTrace(); 
  44.     } 
  45.   } 
  46.  
  47.   static void processWithNewThread(SocketChannel sc, SelectionKey key) { 
  48.     Runnable run = () -> { 
  49.       counter.incrementAndGet(); 
  50.       try { 
  51.         String result = readBytes(sc); 
  52.         //把“讀”加進去 
  53.         key.interestOps(key.interestOps() | SelectionKey.OP_READ); 
  54.         System.out.println(time() + "->" + result + "->" + Thread.currentThread().getId() + ":" + counter.get()); 
  55.         sc.close(); 
  56.       } catch (Exception e) { 
  57.         e.printStackTrace(); 
  58.       } 
  59.       counter.decrementAndGet(); 
  60.     }; 
  61.     new Thread(run).start(); 
  62.   } 
  63.    
  64.   static String readBytes(SocketChannel sc) throws Exception { 
  65.     long start = 0; 
  66.     int total = 0; 
  67.     int count = 0; 
  68.     ByteBuffer bb = ByteBuffer.allocate(1024); 
  69.     //開始讀數(shù)據(jù)的時間 
  70.     long begin = System.currentTimeMillis(); 
  71.     while ((count = sc.read(bb)) > -1) { 
  72.       if (start < 1) { 
  73.         //第一次讀到數(shù)據(jù)的時間 
  74.         start = System.currentTimeMillis(); 
  75.       } 
  76.       total += count
  77.       bb.clear(); 
  78.     } 
  79.     //讀完數(shù)據(jù)的時間 
  80.     long end = System.currentTimeMillis(); 
  81.     return "wait=" + (start - begin) + "ms,read=" + (end - start) + "ms,total=" + total + "bs"
  82.   } 
  83.    
  84.   static String time() { 
  85.     return sdf.format(new Date()); 
  86.   } 

它的大致處理過程如下:

1、定義一個選擇器,Selector。

相當(dāng)于設(shè)立一個跑腿服務(wù)員。

2、定義一個服務(wù)器端套接字通道,ServerSocketChannel,并配置為非阻塞的。

相等于聘請了一位大堂經(jīng)理。

3、將套接字通道注冊到選擇器上,并把感興趣的操作設(shè)置為OP_ACCEPT。

相當(dāng)于大堂經(jīng)理給跑腿服務(wù)員說,幫我盯著門外,有客人來了告訴我。

4、進入死循環(huán),選擇器不時的進行選擇。

相當(dāng)于跑腿服務(wù)員一遍又一遍的去詢問、去轉(zhuǎn)悠。

5、選擇器終于選擇出了通道,發(fā)現(xiàn)通道是需要Acceptable的。

相當(dāng)于跑腿服務(wù)員終于發(fā)現(xiàn)門外來客人了,客人是需要接待的。

6、于是服務(wù)器端套接字接受了這個通道,開始處理。

相當(dāng)于跑腿服務(wù)員把大堂經(jīng)理叫來了,大堂經(jīng)理開始著手接待。

7、把新接受的通道配置為非阻塞的,并把它也注冊到了選擇器上,該通道感興趣的操作為OP_READ。

相當(dāng)于大堂經(jīng)理把客人帶到座位上,給了客人菜單,并又把客人委托給跑腿服務(wù)員,說客人接下來肯定是要點餐的,你不時的來問問。

8、選擇器繼續(xù)不時的進行選擇著。

相當(dāng)于跑腿服務(wù)員繼續(xù)不時的詢問著、轉(zhuǎn)悠著。

9、選擇器終于又選擇出了通道,這次發(fā)現(xiàn)通道是需要Readable的。

相當(dāng)于跑腿服務(wù)員終于發(fā)現(xiàn)了一桌客人有了需求,是需要點餐的。

10、把這個通道交給了一個新的工作線程去處理。

相當(dāng)于跑腿服務(wù)員叫來了點餐服務(wù)員,點餐服務(wù)員開始為客人寫菜單。

11、這個工作線程處理完后,就被回收了,可以再去處理其它通道。

相當(dāng)于點餐服務(wù)員寫好菜單后,就走了,可以再去為其他客人寫菜單。

12、選擇器繼續(xù)著重復(fù)的選擇工作,不知道什么時候是個頭。

相當(dāng)于跑腿服務(wù)員繼續(xù)著重復(fù)的詢問、轉(zhuǎn)悠,不知道未來在何方。

相信你已經(jīng)看出來了,大堂經(jīng)理相當(dāng)于服務(wù)器端套接字,跑腿服務(wù)員相當(dāng)于選擇器,點餐服務(wù)員相當(dāng)于Worker線程。

啟動服務(wù)器端代碼,使用同一個客戶端代碼,按相同的套路發(fā)20個請求,結(jié)果如下:

  1. 時間->IP:Port->主線程Id:當(dāng)前連接數(shù) 
  2. 16:34:39->127.0.0.1:56105->1:1 
  3. 16:34:39->127.0.0.1:56106->1:2 
  4. 16:34:39->127.0.0.1:56107->1:3 
  5. 16:34:39->127.0.0.1:56108->1:4 
  6. 16:34:39->127.0.0.1:56109->1:5 
  7. 16:34:39->127.0.0.1:56110->1:6 
  8. 16:34:39->127.0.0.1:56111->1:7 
  9. 16:34:39->127.0.0.1:56112->1:8 
  10. 16:34:39->127.0.0.1:56113->1:9 
  11. 16:34:39->127.0.0.1:56114->1:10 
  12. 16:34:39->127.0.0.1:56115->1:11 
  13. 16:34:39->127.0.0.1:56116->1:12 
  14. 16:34:39->127.0.0.1:56117->1:13 
  15. 16:34:39->127.0.0.1:56118->1:14 
  16. 16:34:39->127.0.0.1:56119->1:15 
  17. 16:34:39->127.0.0.1:56120->1:16 
  18. 16:34:39->127.0.0.1:56121->1:17 
  19. 16:34:39->127.0.0.1:56122->1:18 
  20. 16:34:39->127.0.0.1:56123->1:19 
  21. 16:34:39->127.0.0.1:56124->1:20 
  22.  
  23. 時間->等待數(shù)據(jù)的時間,讀取數(shù)據(jù)的時間,總共讀取的字節(jié)數(shù)->線程Id:當(dāng)前線程數(shù) 
  24. 16:34:45->wait=1ms,read=1018ms,total=1048576bs->11:5 
  25. 16:34:45->wait=0ms,read=1054ms,total=1048576bs->10:5 
  26. 16:34:45->wait=0ms,read=1072ms,total=1048576bs->13:6 
  27. 16:34:45->wait=0ms,read=1061ms,total=1048576bs->14:5 
  28. 16:34:45->wait=0ms,read=1140ms,total=1048576bs->12:4 
  29. 16:34:46->wait=0ms,read=1001ms,total=1048576bs->15:5 
  30. 16:34:46->wait=0ms,read=1062ms,total=1048576bs->17:6 
  31. 16:34:46->wait=0ms,read=1059ms,total=1048576bs->16:5 
  32. 16:34:47->wait=0ms,read=1001ms,total=1048576bs->19:4 
  33. 16:34:47->wait=0ms,read=1001ms,total=1048576bs->20:4 
  34. 16:34:47->wait=0ms,read=1015ms,total=1048576bs->18:3 
  35. 16:34:47->wait=0ms,read=1001ms,total=1048576bs->21:2 
  36. 16:34:48->wait=0ms,read=1032ms,total=1048576bs->22:4 
  37. 16:34:49->wait=0ms,read=1002ms,total=1048576bs->23:3 
  38. 16:34:49->wait=0ms,read=1001ms,total=1048576bs->25:2 
  39. 16:34:49->wait=0ms,read=1028ms,total=1048576bs->24:4 
  40. 16:34:50->wait=0ms,read=1008ms,total=1048576bs->28:4 
  41. 16:34:50->wait=0ms,read=1033ms,total=1048576bs->27:3 
  42. 16:34:50->wait=1ms,read=1002ms,total=1048576bs->29:2 
  43. 16:34:50->wait=0ms,read=1001ms,total=1048576bs->26:2 

服務(wù)器端接受20個連接,創(chuàng)建20個通道,并把它們注冊到選擇器上,此時不需要額外線程。

當(dāng)某個通道已經(jīng)有數(shù)據(jù)時,才會用一個線程來處理它,所以,線程“等待數(shù)據(jù)”的時間是0,“讀取數(shù)據(jù)”的時間還是約1秒。

因為20個通道是陸陸續(xù)續(xù)有數(shù)據(jù)的,所以服務(wù)器端最多時是6個線程在同時運行的,換句話說,用包含6個線程的線程池就可以了。

對比與結(jié)論:

處理同樣的20個請求,一個需要用20個線程,一個需要用6個線程,節(jié)省了70%線程數(shù)。

在本例中,兩種感興趣的操作共用一個選擇器,且選擇器運行在主線程里,Worker線程是新的線程。

其實對于選擇器的個數(shù)、選擇器運行在哪個線程里、是否使用新的線程來處理請求都沒有要求,要根據(jù)實際情況來定。

比如說redis,和處理請求相關(guān)的就一個線程,選擇器運行在里面,處理請求的程序也運行在里面,所以這個線程既是I/O線程,也是Worker線程。

當(dāng)然,也可以使用兩個選擇器,一個處理OP_ACCEPT,一個處理OP_READ,讓它們分別運行在兩個單獨的I/O線程里。對于能快速完成的操作可以直接在I/O線程里做了,對于非常耗時的操作一定要使用Worker線程池來處理。

這種處理模式就是被稱為的多路復(fù)用I/O,多路指的是多個Socket通道,復(fù)用指的是只用一個線程來管理它們。

再稍微分析一下

一對一的形式,一個桌子配一個服務(wù)員,一個Socket分配一個線程,響應(yīng)速度最快,畢竟是VIP嘛,但是效率很低,服務(wù)員大部分時間都是在站著,線程大部分時間都是在等待。

多路復(fù)用的形式,所有桌子共用一個跑腿服務(wù)員,所有Socket共用一個選擇器線程,響應(yīng)速度肯定變慢了,畢竟是一對多嘛。但是效率提高了,點餐服務(wù)員在需要點餐時才會過去,工作線程在數(shù)據(jù)就緒時才會開始工作。

從VIP到多路復(fù)用,形式上確實有很大的不同,其本質(zhì)是從一對一到一對多的轉(zhuǎn)變,其實就是犧牲了響應(yīng)速度,換來了效率的提升,不過綜合性能還是得到了極大的改進。

就飯店而言,究竟幾張桌子配一個跑腿服務(wù)員,幾張桌子配一個點餐服務(wù)員,經(jīng)過一段時間運行,一定會有一個最優(yōu)解。

就程序而言,究竟需要幾個選擇器線程,幾個工作線程,經(jīng)過評估測試后,也會有一個最優(yōu)解。

一旦達到最優(yōu)解后,就不可能再提升了,這同樣是由多路復(fù)用這種一對多的形式所限制的。就像一對一的形式限制一樣。

人們的追求是無止境的,如何對多路復(fù)用繼續(xù)提升呢?答案一定是具有顛覆性的,即拋棄多路復(fù)用,采用全新的形式。

還以飯店為例,如何在最優(yōu)解的情況下,既要繼續(xù)減少服務(wù)員數(shù)量,還要使效率提升呢?可能有些朋友已經(jīng)猜到了,那就是拋棄服務(wù)員服務(wù)客人這種模式,把飯店改成自助餐廳。

在客人進門時,把餐具給他,并告訴他就餐時長、不準(zhǔn)浪費等這些規(guī)則,然后就不用管了。客人自己選餐,自己吃完,自己走人,不用再等服務(wù)員了,因此也不再需要服務(wù)員了。(收拾桌子的除外。)

這種模式對應(yīng)到程序里,其實就是AIO,在Java里也早就有了。

嘻嘻,Java AIO來啦

代碼的寫法非常的固定,可以配合著后面的解說來看,這樣就好理解了,如下:

  1. /** 
  2.  * @author lixinjie 
  3.  * @since 2019-05-13 
  4.  */ 
  5. public class AioServer { 
  6.  
  7.   static int clientCount = 0; 
  8.   static AtomicInteger counter = new AtomicInteger(0); 
  9.   static SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");  
  10.    
  11.   public static void main(String[] args) { 
  12.     try { 
  13.       AsynchronousServerSocketChannel assc = AsynchronousServerSocketChannel.open(); 
  14.       assc.bind(new InetSocketAddress("localhost", 8080)); 
  15.       //非阻塞方法,其實就是注冊了個回調(diào),而且只能接受一個連接 
  16.       assc.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() { 
  17.  
  18.         @Override 
  19.         public void completed(AsynchronousSocketChannel asc, Object attachment) { 
  20.           //再次注冊,接受下一個連接 
  21.           assc.accept(null, this); 
  22.           try { 
  23.             InetSocketAddress rsa = (InetSocketAddress)asc.getRemoteAddress(); 
  24.             System.out.println(time() + "->" + rsa.getHostName() + ":" + rsa.getPort() + "->" + Thread.currentThread().getId() + ":" + (++clientCount)); 
  25.           } catch (Exception e) { 
  26.           } 
  27.           readFromChannelAsync(asc); 
  28.         } 
  29.  
  30.         @Override 
  31.         public void failed(Throwable exc, Object attachment) { 
  32.            
  33.         } 
  34.       }); 
  35.       //不讓主線程退出 
  36.       synchronized (AioServer.class) { 
  37.         AioServer.class.wait(); 
  38.       } 
  39.     } catch (Exception e) { 
  40.       e.printStackTrace(); 
  41.     } 
  42.   } 
  43.  
  44.   static void readFromChannelAsync(AsynchronousSocketChannel asc) { 
  45.     //會把數(shù)據(jù)讀入到該buffer之后,再觸發(fā)工作線程來執(zhí)行回調(diào) 
  46.     ByteBuffer bb = ByteBuffer.allocate(1024*1024*1 + 1); 
  47.     long begin = System.currentTimeMillis(); 
  48.     //非阻塞方法,其實就是注冊了個回調(diào),而且只能接受一次讀取 
  49.     asc.read(bb, null, new CompletionHandler<Integer, Object>() { 
  50.       //從該連接上一共讀到的字節(jié)數(shù) 
  51.       int total = 0; 
  52.       /** 
  53.        * @param count 表示本次讀取到的字節(jié)數(shù),-1表示數(shù)據(jù)已讀完 
  54.        */ 
  55.       @Override 
  56.       public void completed(Integer count, Object attachment) { 
  57.         counter.incrementAndGet(); 
  58.         if (count > -1) { 
  59.           total += count
  60.         } 
  61.         int size = bb.position(); 
  62.         System.out.println(time() + "->count=" + count + ",total=" + total + "bs,buffer=" + size + "bs->" + Thread.currentThread().getId() + ":" + counter.get()); 
  63.         if (count > -1) {//數(shù)據(jù)還沒有讀完 
  64.           //再次注冊回調(diào),接受下一次讀取 
  65.           asc.read(bb, null, this); 
  66.         } else {//數(shù)據(jù)已讀完 
  67.           try { 
  68.             asc.close(); 
  69.           } catch (Exception e) { 
  70.             e.printStackTrace(); 
  71.           } 
  72.         } 
  73.         counter.decrementAndGet(); 
  74.       } 
  75.  
  76.       @Override 
  77.       public void failed(Throwable exc, Object attachment) { 
  78.          
  79.       } 
  80.     }); 
  81.     long end = System.currentTimeMillis(); 
  82.     System.out.println(time() + "->exe read req,use=" + (end -begin) + "ms" + "->" + Thread.currentThread().getId()); 
  83.   } 
  84.    
  85.   static String time() { 
  86.     return sdf.format(new Date()); 
  87.   } 

它的大致處理過程如下:

1、初始化一個AsynchronousServerSocketChannel對象,并開始監(jiān)聽

2、通過accept方法注冊一個“完成處理器”的接受連接回調(diào),即CompletionHandler,用于在接受到連接后的相關(guān)操作。

3、當(dāng)客戶端連接過來后,由系統(tǒng)來接受,并創(chuàng)建好AsynchronousSocketChannel對象,然后觸發(fā)該回調(diào),并把該對象傳進該回調(diào),該回調(diào)會在Worker線程中執(zhí)行。

4、在接受連接回調(diào)里,再次使用accept方法注冊一次相同的完成處理器對象,用于讓系統(tǒng)接受下一個連接。就是這種注冊只能使用一次,所以要不停的連續(xù)注冊,人家就是這樣設(shè)計的。

5、在接受連接回調(diào)里,使用AsynchronousSocketChannel對象的read方法注冊另一個接受數(shù)據(jù)回調(diào),用于在接受到數(shù)據(jù)后的相關(guān)操作。

6、當(dāng)客戶端數(shù)據(jù)過來后,由系統(tǒng)接受,并放入指定好的ByteBuffer中,然后觸發(fā)該回調(diào),并把本次接受到的數(shù)據(jù)字節(jié)數(shù)傳入該回調(diào),該回調(diào)會在Worker線程中執(zhí)行。

7、在接受數(shù)據(jù)回調(diào)里,如果數(shù)據(jù)沒有接受完,需要再次使用read方法把同一個對象注冊一次,用于讓系統(tǒng)接受下一次數(shù)據(jù)。這和上面的套路是一樣的。

8、客戶端的數(shù)據(jù)可能是分多次傳到服務(wù)器端的,所以接受數(shù)據(jù)回調(diào)會被執(zhí)行多次,直到數(shù)據(jù)接受完為止。多次接受到的數(shù)據(jù)合起來才是完整的數(shù)據(jù),這個一定要處理好。

9、關(guān)于ByteBuffer,要么足夠的大,能夠裝得下完整的客戶端數(shù)據(jù),這樣多次接受的數(shù)據(jù)直接往里追加即可。要么每次把ByteBuffer中的數(shù)據(jù)移到別的地方存儲起來,然后清空ByteBuffer,用于讓系統(tǒng)往里裝入下一次接受的數(shù)據(jù)。

注:如果出現(xiàn)ByteBuffer空間不足,則系統(tǒng)不會裝入數(shù)據(jù),就會導(dǎo)致客戶端數(shù)據(jù)總是讀不完,極有可能進入死循環(huán)。

啟動服務(wù)器端代碼,使用同一個客戶端代碼,按相同的套路發(fā)20個請求,結(jié)果如下:

  1. 時間->IP:Port->回調(diào)線程Id:當(dāng)前連接數(shù) 
  2. 17:20:47->127.0.0.1:56454->15:1 
  3. 時間->發(fā)起一個讀請求,耗時->回調(diào)線程Id 
  4. 17:20:47->exe read req,use=3ms->15 
  5. 17:20:47->127.0.0.1:56455->15:2 
  6. 17:20:47->exe read req,use=1ms->15 
  7. 17:20:47->127.0.0.1:56456->15:3 
  8. 17:20:47->exe read req,use=0ms->15 
  9. 17:20:47->127.0.0.1:56457->16:4 
  10. 17:20:47->127.0.0.1:56458->15:5 
  11. 17:20:47->exe read req,use=1ms->16 
  12. 17:20:47->exe read req,use=1ms->15 
  13. 17:20:47->127.0.0.1:56460->15:6 
  14. 17:20:47->127.0.0.1:56459->17:7 
  15. 17:20:47->exe read req,use=0ms->15 
  16. 17:20:47->127.0.0.1:56462->15:8 
  17. 17:20:47->127.0.0.1:56461->16:9 
  18. 17:20:47->exe read req,use=1ms->15 
  19. 17:20:47->exe read req,use=0ms->16 
  20. 17:20:47->exe read req,use=0ms->17 
  21. 17:20:47->127.0.0.1:56465->16:10 
  22. 17:20:47->127.0.0.1:56463->18:11 
  23. 17:20:47->exe read req,use=0ms->18 
  24. 17:20:47->127.0.0.1:56466->15:12 
  25. 17:20:47->exe read req,use=1ms->16 
  26. 17:20:47->127.0.0.1:56464->17:13 
  27. 17:20:47->exe read req,use=1ms->15 
  28. 17:20:47->127.0.0.1:56467->18:14 
  29. 17:20:47->exe read req,use=2ms->17 
  30. 17:20:47->exe read req,use=1ms->18 
  31. 17:20:47->127.0.0.1:56468->15:15 
  32. 17:20:47->exe read req,use=1ms->15 
  33. 17:20:47->127.0.0.1:56469->16:16 
  34. 17:20:47->127.0.0.1:56470->18:17 
  35. 17:20:47->exe read req,use=1ms->18 
  36. 17:20:47->exe read req,use=1ms->16 
  37. 17:20:47->127.0.0.1:56472->15:18 
  38. 17:20:47->127.0.0.1:56473->19:19 
  39. 17:20:47->exe read req,use=2ms->15 
  40. 17:20:47->127.0.0.1:56471->17:20 
  41. 17:20:47->exe read req,use=1ms->19 
  42. 17:20:47->exe read req,use=1ms->17 
  43.  
  44. 時間->本次接受到的字節(jié)數(shù),截至到目前接受到的字節(jié)總數(shù),buffer中的字節(jié)總數(shù)->回調(diào)線程Id:當(dāng)前線程數(shù) 
  45. 17:20:52->count=65536,total=65536bs,buffer=65536bs->14:1 
  46. 17:20:52->count=65536,total=65536bs,buffer=65536bs->14:1 
  47. 17:20:52->count=65536,total=65536bs,buffer=65536bs->14:1 
  48. 17:20:52->count=230188,total=295724bs,buffer=295724bs->12:1 
  49. 17:20:52->count=752852,total=1048576bs,buffer=1048576bs->14:3 
  50. 17:20:52->count=131072,total=196608bs,buffer=196608bs->17:2 
  51.  
  52. 。。。。。。。。。。。。。。。。。。。。。。 
  53.  
  54. 17:20:57->count=-1,total=1048576bs,buffer=1048576bs->15:1 
  55. 17:20:57->count=-1,total=1048576bs,buffer=1048576bs->15:1 
  56. 17:20:57->count=-1,total=1048576bs,buffer=1048576bs->15:1 
  57. 17:20:57->count=-1,total=1048576bs,buffer=1048576bs->15:1 
  58. 17:20:58->count=-1,total=1048576bs,buffer=1048576bs->15:1 
  59. 17:20:58->count=-1,total=1048576bs,buffer=1048576bs->15:1 
  60. 17:20:58->count=-1,total=1048576bs,buffer=1048576bs->15:1 

系統(tǒng)接受到連接后,在工作線程中執(zhí)行了回調(diào)。并且在回調(diào)中執(zhí)行了read方法,耗時是0,因為只是注冊了個接受數(shù)據(jù)的回調(diào)而已。

系統(tǒng)接受到數(shù)據(jù)后,把數(shù)據(jù)放入ByteBuffer,在工作線程中執(zhí)行了回調(diào)。并且回調(diào)中可以直接使用ByteBuffer中的數(shù)據(jù)。

接受數(shù)據(jù)的回調(diào)被執(zhí)行了多次,多次接受到的數(shù)據(jù)加起來正好等于客戶端傳來的數(shù)據(jù)。

因為系統(tǒng)是接受到數(shù)據(jù)后才觸發(fā)的回調(diào),所以服務(wù)器端最多時是3個線程在同時運行回調(diào)的,換句話說,線程池包含3個線程就可以了。

對比與結(jié)論:

處理同樣的20個請求,一個需要用20個線程,一個需要用6個線程,一個需要3個線程,又節(jié)省了50%線程數(shù)。

注:不用特別較真這個比較結(jié)果,這里只是為了說明問題而已。哈哈。

三種處理方式的對比

第一種是阻塞IO,阻塞點有兩個,等待數(shù)據(jù)就緒的過程和讀取數(shù)據(jù)的過程。

第二種是阻塞IO,阻塞點有一個,讀取數(shù)據(jù)的過程。

第三種是非阻塞IO,沒有阻塞點,當(dāng)工作線程啟動時,數(shù)據(jù)已經(jīng)(被系統(tǒng))準(zhǔn)備好可以直接用了。

可見,這是一個逐步消除阻塞點的過程。

再次來談?wù)劯鞣NIO:

只有一個線程,接受一個連接,讀取數(shù)據(jù),處理業(yè)務(wù),寫回結(jié)果,再接受下一個連接,這是同步阻塞。這種用法幾乎沒有。

一個線程和一個線程池,線程接受到連接后,把它丟給線程池中的線程,再接受下一個連接,這是異步阻塞。對應(yīng)示例一。

一個線程和一個線程池,線程運行selector,執(zhí)行select操作,把就緒的連接拿出來丟給線程池中的線程,再執(zhí)行下一次的select操作,就是多路復(fù)用,這是異步阻塞。對應(yīng)示例二。

一個線程和一個線程池,線程注冊一個accept回調(diào),系統(tǒng)幫我們接受好連接后,才觸發(fā)回調(diào)在線程池中執(zhí)行,執(zhí)行時再注冊read回調(diào),系統(tǒng)幫我們接受好數(shù)據(jù)后,才觸發(fā)回調(diào)在線程池中執(zhí)行,就是AIO,這是異步非阻塞。對應(yīng)示例三。

redis也是多路復(fù)用,但它只有一個線程在執(zhí)行select操作,處理就緒的連接,整個是串行化的,所以天然不存在并發(fā)問題。只能把它歸為同步阻塞了。

BIO是阻塞IO,可以是同步阻塞,也可以是異步阻塞。AIO是異步IO,只有異步非阻塞這一種。因此沒有同步非阻塞這種說法,因為同步一定是阻塞的。

注:以上的說法是站在用戶程序/線程的立場上來說的。

建議把代碼下載下來,自己運行一下,體會體會:

https://github.com/coding-new-talking/java-code-demo.git

責(zé)任編輯:武曉燕 來源: 編程新說
相關(guān)推薦

2023-05-08 00:06:45

Go語言機制

2020-10-13 07:51:03

五種IO模型

2021-03-17 16:53:51

IO多路

2021-02-10 08:09:48

Netty網(wǎng)絡(luò)多路復(fù)用

2021-01-19 06:43:10

Netty框架網(wǎng)絡(luò)技術(shù)

2021-06-09 19:25:13

IODubbo

2024-12-30 00:00:05

2021-03-24 08:03:38

NettyJava NIO網(wǎng)絡(luò)技術(shù)

2025-05-08 08:01:05

2019-07-11 10:29:28

操作系統(tǒng)虛擬機Linux

2023-11-08 09:22:14

I/ORedis阻塞

2021-05-17 11:14:36

網(wǎng)絡(luò)請求ajaxpromise

2019-11-23 17:27:54

IO開源

2024-06-25 08:18:55

2024-04-29 08:15:07

I/OCPU密集型

2023-08-07 08:52:03

Java多路復(fù)用機制

2015-02-09 16:01:18

服務(wù)器

2015-10-15 10:21:37

O2O云計算科技名詞

2013-04-15 10:59:08

iOS開發(fā)ARC版本說明

2021-02-15 13:38:38

多線程異步模型
點贊
收藏

51CTO技術(shù)棧公眾號