偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

從實(shí)戰(zhàn)開(kāi)始,帶你深入了解Netty各個(gè)組件和ByteBuf

網(wǎng)絡(luò) 通信技術(shù)
本篇文章想來(lái)從實(shí)戰(zhàn)開(kāi)始,帶我深入了解Netty各個(gè)組件是做什么?ByteBuf執(zhí)行原理又是怎樣的?

 [[358902]]

上文對(duì)IO模型和Reactor模型進(jìn)行講解,是不是感覺(jué)有點(diǎn)懵懵的。哈哈哈,反正我并沒(méi)有對(duì)其有深入見(jiàn)解。我是這樣安慰自己的,知識(shí)在不斷的反復(fù)學(xué)習(xí)和思考中有新的感悟。不氣餒,繼續(xù)新的征程。本篇文章想來(lái)從實(shí)戰(zhàn)開(kāi)始,帶我深入了解Netty各個(gè)組件是做什么?ByteBuf執(zhí)行原理又是怎樣的?

01一 第一個(gè)Netty實(shí)例

用Netty實(shí)現(xiàn)通信。說(shuō)白了就是客戶端向服務(wù)端發(fā)消息,服務(wù)端接收消息并給客戶端響應(yīng)。所以我來(lái)看看服務(wù)端和客戶端是如何實(shí)現(xiàn)的?

11.1 服務(wù)端

1. 依賴

  1. <?xml version="1.0" encoding="UTF-8"?> 
  2. <project xmlns="http://maven.apache.org/POM/4.0.0" 
  3.          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
  4.          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
  5.     <modelVersion>4.0.0</modelVersion> 
  6.  
  7.     <groupId>com.haopt.iot</groupId> 
  8.     <artifactId>first-netty</artifactId> 
  9.     <packaging>jar</packaging> 
  10.     <version>1.0-SNAPSHOT</version> 
  11.  
  12.     <dependencies> 
  13.         <dependency> 
  14.             <groupId>io.netty</groupId> 
  15.             <artifactId>netty-all</artifactId> 
  16.             <version>4.1.50.Final</version> 
  17.         </dependency> 
  18.  
  19.         <dependency> 
  20.             <groupId>junit</groupId> 
  21.             <artifactId>junit</artifactId> 
  22.             <version>4.12</version> 
  23.         </dependency> 
  24.     </dependencies> 
  25.  
  26.     <build> 
  27.         <plugins> 
  28.              
  29.             <plugin> 
  30.                 <groupId>org.apache.maven.plugins</groupId> 
  31.                 <artifactId>maven-compiler-plugin</artifactId> 
  32.                 <version>3.2</version> 
  33.                 <configuration> 
  34.                     <source>1.8</source> 
  35.                     <target>1.8</target> 
  36.                     <encoding>UTF-8</encoding> 
  37.                 </configuration> 
  38.             </plugin> 
  39.         </plugins> 
  40.     </build> 
  41. </project> 

2. 服務(wù)端-MyRPCServer

  1. package com.haopt.netty.server; 
  2.  
  3. import io.netty.bootstrap.ServerBootstrap; 
  4. import io.netty.buffer.UnpooledByteBufAllocator; 
  5. import io.netty.channel.ChannelFuture; 
  6. import io.netty.channel.ChannelOption; 
  7. import io.netty.channel.EventLoopGroup; 
  8. import io.netty.channel.nio.NioEventLoopGroup; 
  9. import io.netty.channel.socket.nio.NioServerSocketChannel; 
  10.  
  11. public class MyRPCServer { 
  12.     public void start(int port) throws Exception { 
  13.         // 主線程,不處理任何業(yè)務(wù)邏輯,只是接收客戶的連接請(qǐng)求 
  14.         EventLoopGroup boss = new NioEventLoopGroup(1); 
  15.         // 工作線程,線程數(shù)默認(rèn)是:cpu核數(shù)*2 
  16.         EventLoopGroup worker = new NioEventLoopGroup(); 
  17.         try { 
  18.             // 服務(wù)器啟動(dòng)類 
  19.             ServerBootstrap serverBootstrap = new ServerBootstrap(); 
  20.             serverBootstrap.group(boss, worker) //設(shè)置線程組 
  21.                     .channel(NioServerSocketChannel.class)  //配置server通道 
  22.                     .childHandler(new MyChannelInitializer()); //worker線程的處理器 
  23.             //ByteBuf 的分配要設(shè)置為非池化,否則不能切換到堆緩沖區(qū)模式 
  24.             serverBootstrap.childOption(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT); 
  25.             ChannelFuture future = serverBootstrap.bind(port).sync(); 
  26.             System.out.println("服務(wù)器啟動(dòng)完成,端口為:" + port); 
  27.             //等待服務(wù)端監(jiān)聽(tīng)端口關(guān)閉 
  28.             future.channel().closeFuture().sync(); 
  29.         } finally { 
  30.             //優(yōu)雅關(guān)閉 
  31.             boss.shutdownGracefully(); 
  32.             worker.shutdownGracefully(); 
  33.         } 
  34.     } 
  35.  

3. 服務(wù)端-ChannelHandler

  1. package com.haopt.netty.server.handler; 
  2. import io.netty.buffer.ByteBuf; 
  3. import io.netty.buffer.Unpooled; 
  4. import io.netty.channel.ChannelHandlerContext; 
  5. import io.netty.channel.ChannelInboundHandlerAdapter; 
  6. import io.netty.util.CharsetUtil; 
  7.  
  8. public class MyChannelHandler extends ChannelInboundHandlerAdapter { 
  9.     /** 
  10.     * 獲取客戶端發(fā)來(lái)的數(shù)據(jù) 
  11.     * @param ctx 
  12.     * @param msg 
  13.     * @throws Exception 
  14.     */ 
  15.     @Override 
  16.     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 
  17.         ByteBuf byteBuf = (ByteBuf) msg; 
  18.         String msgStr = byteBuf.toString(CharsetUtil.UTF_8); 
  19.         System.out.println("客戶端發(fā)來(lái)數(shù)據(jù):" + msgStr); 
  20.         //向客戶端發(fā)送數(shù)據(jù) 
  21.         ctx.writeAndFlush(Unpooled.copiedBuffer("ok", CharsetUtil.UTF_8)); 
  22.     } 
  23.      
  24.     /** 
  25.     * 異常處理 
  26.     * @param ctx 
  27.     * @param cause 
  28.     * @throws Exception 
  29.     */ 
  30.     @Override 
  31.     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 
  32.         cause.printStackTrace(); 
  33.         ctx.close(); 
  34.     } 

4. 測(cè)試用例

  1. package com.haopt.netty.myrpc; 
  2. import com.haopt.netty.server.MyRPCServer; 
  3. import org.junit.Test; 
  4. public class TestServer { 
  5.     @Test 
  6.     public void testServer() throws Exception{ 
  7.         MyRPCServer myRPCServer = new MyRPCServer(); 
  8.         myRPCServer.start(5566); 
  9.     } 

21.2 客戶端

1. 客戶端-client

  1. package com.haopt.netty.client; 
  2. import com.haopt.netty.client.handler.MyClientHandler; 
  3. import io.netty.bootstrap.Bootstrap; 
  4. import io.netty.channel.ChannelFuture; 
  5. import io.netty.channel.EventLoopGroup; 
  6. import io.netty.channel.nio.NioEventLoopGroup; 
  7. import io.netty.channel.socket.nio.NioSocketChannel; 
  8. public class MyRPCClient { 
  9.     public void start(String host, int port) throws Exception { 
  10.         //定義⼯作線程組 
  11.         EventLoopGroup worker = new NioEventLoopGroup(); 
  12.         try { 
  13.             //注意:client使⽤的是Bootstrap 
  14.             Bootstrap bootstrap = new Bootstrap(); 
  15.             bootstrap.group(worker) 
  16.             .channel(NioSocketChannel.class) //注意:client使⽤的是NioSocketChannel 
  17.             .handler(new MyClientHandler()); 
  18.             //連接到遠(yuǎn)程服務(wù) 
  19.             ChannelFuture future = bootstrap.connect(host, port).sync(); 
  20.             future.channel().closeFuture().sync(); 
  21.         } finally { 
  22.              worker.shutdownGracefully(); 
  23.         } 
  24.     } 

2. 客戶端-(ClientHandler)

  1. package com.haopt.netty.client.handler; 
  2. import io.netty.buffer.ByteBuf; 
  3. import io.netty.buffer.Unpooled; 
  4. import io.netty.channel.ChannelHandlerContext; 
  5. import io.netty.channel.SimpleChannelInboundHandler; 
  6. import io.netty.util.CharsetUtil; 
  7. public class MyClientHandler extends SimpleChannelInboundHandler<ByteBuf> { 
  8.     @Override 
  9.     protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { 
  10.         System.out.println("接收到服務(wù)端的消息:" + 
  11.         msg.toString(CharsetUtil.UTF_8)); 
  12.     } 
  13.     @Override 
  14.     public void channelActive(ChannelHandlerContext ctx) throws Exception { 
  15.         // 向服務(wù)端發(fā)送數(shù)據(jù) 
  16.         String msg = "hello"
  17.         ctx.writeAndFlush(Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8)); 
  18.     } 
  19.     @Override 
  20.     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 
  21.         cause.printStackTrace(); 
  22.         ctx.close(); 
  23.     } 

相信代碼執(zhí)行起來(lái)沒(méi)有任何問(wèn)題(如果有任何問(wèn)題反應(yīng)交流)。但是對(duì)上面代碼為何這樣實(shí)現(xiàn)有很多疑。嘿嘿,我也是奧。接下來(lái)我們對(duì)這些代碼中用到的組件進(jìn)行介紹,希望能消除之前疑慮。如果還是不能,可以把疑問(wèn)寫(xiě)于留言處,嘿嘿,我也不一定會(huì)有個(gè)好的解答,但是大佬總會(huì)有的。

02二 Netty核心組件

我們都知道Netty是基于事件驅(qū)動(dòng)。但是事件發(fā)生后,Netty的各個(gè)組件都做了什么?來(lái)看看下面內(nèi)容!

3 2.1 Channel

1. 初識(shí)Channel

  1. a 可以理解為socket連接,客戶端和服務(wù)端連接的時(shí)候會(huì)創(chuàng)建一個(gè)channel。 
  2. 負(fù)責(zé)基本的IO操作,例如:bind()、connect()、read()、write()。 
  3. b Netty的Channel接口所提供的API,大大減少了Socket類復(fù)雜性 

2. 常見(jiàn)Channel(不同的協(xié)議和阻塞類型的連接會(huì)有不同的Channel類型與之對(duì)應(yīng))

  1. a NioSocketChannel,NIO的客戶端 TCP Socket 連接。 
  2.  
  3. b NioServerSocketChannel,NIO的服務(wù)器端 TCP Socket 連接。 
  4.  
  5. c NioDatagramChannel, UDP 連接。 
  6.  
  7. d NioSctpChannel,客戶端 Sctp 連接。 
  8.  
  9. e NioSctpServerChannel,Sctp 服務(wù)器端連接,這些通道涵蓋了UDP和TCP⽹絡(luò)IO以及⽂件IO。 

4 2.2 EventLoopGroup、EventLoop

1. 概述

  1. 有了Channel連接服務(wù),連接之間消息流動(dòng)。服務(wù)器發(fā)出消息稱為出站,服務(wù)器接受消息稱為入站。 
  2. 那么消息出站和入站就產(chǎn)生了事件例如:連接已激活;數(shù)據(jù)讀取;用戶事件;異常事件;打開(kāi)連接; 
  3. 關(guān)閉連接等等。有了事件,有了事件就需要機(jī)制來(lái)監(jiān)控和協(xié)調(diào)事件,這個(gè)機(jī)制就是EventLoop。 

2. 初識(shí)EventLoopGroup、EventLoop


對(duì)上圖解釋

  1. a 一個(gè)EventLoopGroup包含一個(gè)或者多個(gè)EventLoop 
  2. b 一個(gè)EventLoop在生命周期內(nèi)之和一個(gè)Thread綁定 
  3. c EventLoop上所有的IO事件在它專有的Thread上被處理。 
  4. d Channel在它生命周期只注冊(cè)于一個(gè)Event Loop 
  5. e 一個(gè)Event Loop可能被分配給一個(gè)或者多個(gè)Channel 

3. 代碼實(shí)現(xiàn)

  1. // 主線程,不處理任何業(yè)務(wù)邏輯,只是接收客戶的連接請(qǐng)求 
  2. EventLoopGroup boss = new NioEventLoopGroup(1); 
  3. // ⼯作線程,線程數(shù)默認(rèn)是:cpu*2 
  4. EventLoopGroup worker = new NioEventLoopGroup(); 

5 2.3 ChannelHandler

1. 初識(shí)ChannelHandler

  1. 對(duì)于數(shù)據(jù)的出站和入棧的業(yè)務(wù)邏輯都是在ChannelHandler中。 

2. 對(duì)于出站和入站對(duì)應(yīng)的ChannelHandler


  1. ChannelInboundHandler ⼊站事件處理器 
  2. ChannelOutBoundHandler 出站事件處理器 

3. 開(kāi)發(fā)中常用的ChannelHandler(ChannelInboundHandlerAdapter、SimpleChannelInboundHandler)

a 源碼

b SimpleChannelInboundHandler的源碼(是ChannelInboundHandlerAdapter子類)


注意:

兩者的區(qū)別在于,前者不會(huì)釋放消息數(shù)據(jù)的引⽤,⽽后者會(huì)釋放消息數(shù)據(jù)的引⽤。

6 2.4 ChannelPipeline 

1. 初識(shí)ChannelPipeline

  1. 將ChannelHandler串起來(lái)。一個(gè)Channel包含一個(gè)ChannelPipeline,而ChannelPipeline維護(hù)者一個(gè)ChannelHandler列表。 
  2. ChannelHandler與Channel和ChannelPipeline之間的映射關(guān)系,由ChannelHandlerContext進(jìn)⾏維護(hù)。 

 

如上圖解釋

  1. ChannelHandler按照加⼊的順序會(huì)組成⼀個(gè)雙向鏈表,⼊站事件從鏈表的head往后傳遞到最后⼀個(gè)ChannelHandler。 
  2. 出站事件從鏈表的tail向前傳遞,直到最后⼀個(gè)ChannelHandler,兩種類型的ChannelHandler相互不會(huì)影響。 

7 2.5 Bootstrap

1. 初識(shí)Bootstrap

  1. 是引導(dǎo)作用,配置整個(gè)netty程序,將各個(gè)組件串起來(lái),最后綁定接口,啟動(dòng)服務(wù)。 

2. Bootstrap兩種類型(Bootstrap、ServerBootstrap)

  1. 客戶端只需要一個(gè)EventLoopGroup,服務(wù)端需要兩個(gè)EventLoopGroup。 

 

上圖解釋

  1. 與ServerChannel相關(guān)聯(lián)的EventLoopGroup 將分配⼀個(gè)負(fù)責(zé)為傳⼊連接請(qǐng)求創(chuàng)建 Channel 的EventLoop。 
  2. ⼀旦連接被接受,第⼆個(gè) EventLoopGroup 就會(huì)給它的 Channel 分配⼀個(gè) EventLoop。 

8 2.6 Future

1. 初識(shí)

  1. 操作完成時(shí)通知應(yīng)用程序的方式。這個(gè)對(duì)象可以看做異步操作執(zhí)行結(jié)果占位符,它在將來(lái)某個(gè)時(shí)刻完成,并提供對(duì)其結(jié)果的訪問(wèn)。 

2. ChannelFuture的由來(lái)

  1. JDK 預(yù)置了 interface java.util.concurrent.Future,但是其所提供的實(shí)現(xiàn), 
  2. 只允許⼿動(dòng)檢查對(duì)應(yīng)的操作是否已經(jīng)完成,或者⼀直阻塞直到它完成。這是⾮常 
  3. 繁瑣的,所以 Netty 提供了它⾃⼰的實(shí)現(xiàn)--ChannelFuture,⽤于在執(zhí)⾏異步 
  4. 操作的時(shí)候使⽤。 

3. Netty為什么完全是異步?

  1. a ChannelFuture提供了⼏種額外的⽅法,這些⽅法使得我們能夠注冊(cè)⼀個(gè)或者多個(gè)  ChannelFutureListener實(shí)例。 
  2.  
  3. b 監(jiān)聽(tīng)器的回調(diào)⽅法operationComplete(),將會(huì)在對(duì)應(yīng)的操作完成時(shí)被調(diào)⽤。 
  4.  然后監(jiān)聽(tīng)器可以判斷該操作是成功地完成了還是出錯(cuò)了。 
  5.    
  6. c 每個(gè) Netty 的出站 I/O 操作都將返回⼀個(gè) ChannelFuture,也就是說(shuō), 
  7.  它們都不會(huì)阻塞。所以說(shuō),Netty完全是異步和事件驅(qū)動(dòng)的。 

9 2.7 組件小結(jié)


上圖解釋

  1. 將組件串起來(lái) 

03三 緩存區(qū)-ByteBuf

ByteBuf是我們開(kāi)發(fā)中代碼操作最多部分和出現(xiàn)問(wèn)題最多的一部分。比如常見(jiàn)的TCP協(xié)議通信的粘包和拆包解決,和ByteBuf密切相關(guān)。后面文章會(huì)詳細(xì)分析,先不展開(kāi)。我們這里先了解ByteBuf的常用API和執(zhí)行內(nèi)幕。

10 3.1 ByteBuf概述

1. 初識(shí)ByteBuf

  1. JavaNIO提供了緩存容器(ByteBuffer),但是使用復(fù)雜。因此netty引入緩存ButeBuf, 
  2. 一串字節(jié)數(shù)組構(gòu)成。 

2. ByteBuf兩個(gè)索引(readerIndex,writerIndex)

  1. a readerIndex 將會(huì)根據(jù)讀取的字節(jié)數(shù)遞增 
  2. b writerIndex 也會(huì)根據(jù)寫(xiě)⼊的字節(jié)數(shù)進(jìn)⾏遞增 
  3.  
  4. 注意:如果readerIndex超過(guò)了writerIndex的時(shí)候,Netty會(huì)拋出IndexOutOf-BoundsException異常。 

 

11 3.2 ByteBuf基本使用

1. 讀取

  1. package com.haopt.netty.myrpc.test; 
  2. import io.netty.buffer.ByteBuf; 
  3. import io.netty.buffer.Unpooled; 
  4. import io.netty.util.CharsetUtil; 
  5. public class TestByteBuf01 { 
  6.     public static void main(String[] args) { 
  7.         //構(gòu)造 
  8.         ByteBuf byteBuf = Unpooled.copiedBuffer("hello world"
  9.         CharsetUtil.UTF_8); 
  10.         System.out.println("byteBuf的容量為:" + byteBuf.capacity()); 
  11.         System.out.println("byteBuf的可讀容量為:" + byteBuf.readableBytes()); 
  12.         System.out.println("byteBuf的可寫(xiě)容量為:" + byteBuf.writableBytes()); 
  13.         while (byteBuf.isReadable()){ //⽅法⼀:內(nèi)部通過(guò)移動(dòng)readerIndex進(jìn)⾏讀取 
  14.          System.out.println((char)byteBuf.readByte()); 
  15.         } 
  16.         //⽅法⼆:通過(guò)下標(biāo)直接讀取 
  17.         for (int i = 0; i < byteBuf.readableBytes(); i++) { 
  18.          System.out.println((char)byteBuf.getByte(i)); 
  19.         } 
  20.         //⽅法三:轉(zhuǎn)化為byte[]進(jìn)⾏讀取 
  21.         byte[] bytes = byteBuf.array(); 
  22.         for (byte b : bytes) { 
  23.         System.out.println((char)b); 
  24.         } 
  25.     } 

2. 寫(xiě)入

  1. package com.haopt.netty.myrpc.test; 
  2. import io.netty.buffer.ByteBuf; 
  3. import io.netty.buffer.Unpooled; 
  4. import io.netty.util.CharsetUtil; 
  5. public class TestByteBuf02 { 
  6.     public static void main(String[] args) { 
  7.         //構(gòu)造空的字節(jié)緩沖區(qū),初始⼤⼩為10,最⼤為20 
  8.         ByteBuf byteBuf = Unpooled.buffer(10,20); 
  9.         System.out.println("byteBuf的容量為:" + byteBuf.capacity()); 
  10.         System.out.println("byteBuf的可讀容量為:" + byteBuf.readableBytes()); 
  11.         System.out.println("byteBuf的可寫(xiě)容量為:" + byteBuf.writableBytes()); 
  12.         for (int i = 0; i < 5; i++) { 
  13.          byteBuf.writeInt(i); //寫(xiě)⼊int類型,⼀個(gè)int占4個(gè)字節(jié) 
  14.         } 
  15.         System.out.println("ok"); 
  16.         System.out.println("byteBuf的容量為:" + byteBuf.capacity()); 
  17.         System.out.println("byteBuf的可讀容量為:" + byteBuf.readableBytes()); 
  18.         System.out.println("byteBuf的可寫(xiě)容量為:" + byteBuf.writableBytes()); 
  19.         while (byteBuf.isReadable()){ 
  20.          System.out.println(byteBuf.readInt()); 
  21.         } 
  22.     } 

3. 丟棄已讀字節(jié)


  1. package com.haopt.netty.myrpc.test; 
  2. import io.netty.buffer.ByteBuf; 
  3. import io.netty.buffer.Unpooled; 
  4. import io.netty.util.CharsetUtil; 
  5. public class TestByteBuf03 { 
  6.     public static void main(String[] args) { 
  7.         ByteBuf byteBuf = Unpooled.copiedBuffer("hello world",CharsetUtil.UTF_8); 
  8.         System.out.println("byteBuf的容量為:" + byteBuf.capacity()); 
  9.         System.out.println("byteBuf的可讀容量為:" + byteBuf.readableBytes()); 
  10.         System.out.println("byteBuf的可寫(xiě)容量為:" + byteBuf.writableBytes()); 
  11.         while (byteBuf.isReadable()){ 
  12.          System.out.println((char)byteBuf.readByte()); 
  13.         } 
  14.         byteBuf.discardReadBytes(); //丟棄已讀的字節(jié)空間 
  15.         System.out.println("byteBuf的容量為:" + byteBuf.capacity()); 
  16.         System.out.println("byteBuf的可讀容量為:" + byteBuf.readableBytes()); 
  17.         System.out.println("byteBuf的可寫(xiě)容量為:" + byteBuf.writableBytes()); 
  18.     } 

4. clear()


  1. package com.haopt.netty.myrpc.test; 
  2. import io.netty.buffer.ByteBuf; 
  3. import io.netty.buffer.Unpooled; 
  4. import io.netty.util.CharsetUtil; 
  5. public class TestByteBuf04 { 
  6.     public static void main(String[] args) { 
  7.         ByteBuf byteBuf = Unpooled.copiedBuffer("hello world",CharsetUtil.UTF_8); 
  8.         System.out.println("byteBuf的容量為:" + byteBuf.capacity()); 
  9.         System.out.println("byteBuf的可讀容量為:" + byteBuf.readableBytes()); 
  10.         System.out.println("byteBuf的可寫(xiě)容量為:" + byteBuf.writableBytes()); 
  11.         byteBuf.clear(); //重置readerIndex 、 writerIndex 為0 
  12.         System.out.println("byteBuf的容量為:" + byteBuf.capacity()); 
  13.         System.out.println("byteBuf的可讀容量為:" + byteBuf.readableBytes()); 
  14.         System.out.println("byteBuf的可寫(xiě)容量為:" + byteBuf.writableBytes()); 
  15.     } 

12 3.3 ByteBuf 使⽤模式

3.3.1 根據(jù)存放緩沖區(qū),分為三類

1. 堆緩存區(qū)(HeapByteBuf)

  1. 內(nèi)存的分配和回收速度⽐較快,可以被JVM⾃動(dòng)回收,缺點(diǎn)是,如果進(jìn)⾏socket的IO讀寫(xiě),需要額外做⼀次內(nèi)存復(fù)制,將堆內(nèi)存對(duì)應(yīng)的緩沖區(qū)復(fù)制到內(nèi)核Channel中,性能會(huì)有⼀定程度的下降。 
  2. 由于在堆上被 JVM 管理,在不被使⽤時(shí)可以快速釋放??梢酝ㄟ^(guò) ByteBuf.array() 來(lái)獲取 byte[] 數(shù) 
  3. 據(jù)。 

2. 直接緩存區(qū)(DirectByteBuf)

  1. ⾮堆內(nèi)存,它在對(duì)外進(jìn)⾏內(nèi)存分配,相⽐堆內(nèi)存,它的分配和回收速度會(huì)慢⼀些,但是 
  2. 將它寫(xiě)⼊或從Socket Channel中讀取時(shí),由于減少了⼀次內(nèi)存拷⻉,速度⽐堆內(nèi)存塊。 

3. 復(fù)合緩存區(qū)

  1. 顧名思義就是將上述兩類緩沖區(qū)聚合在⼀起。Netty 提供了⼀個(gè) CompsiteByteBuf, 
  2. 可以將堆緩沖區(qū)和直接緩沖區(qū)的數(shù)據(jù)放在⼀起,讓使⽤更加⽅便。 

3.3.2 緩存區(qū)選擇

Netty默認(rèn)使⽤的是直接緩沖區(qū)(DirectByteBuf),如果需要使⽤堆緩沖區(qū)(HeapByteBuf)模式,則需要進(jìn)⾏系統(tǒng)參數(shù)的設(shè)置。

  1. //netty中IO操作都是基于Unsafe完成的 
  2. System.setProperty("io.netty.noUnsafe""true");  
  3. //ByteBuf的分配要設(shè)置為⾮池化,否則不能切換到堆緩沖器模式 
  4. serverBootstrap.childOption(ChannelOption.ALLOCATOR,UnpooledByteBufAllocator.DEFAULT); 

3.3.3 ByteBuf對(duì)象是否池化(Netty是默認(rèn)池化的)

1. 池化化和非池化的實(shí)現(xiàn)

  1. PooledByteBufAllocator,實(shí)現(xiàn)了ByteBuf的對(duì)象的池化,提⾼性能減少并最⼤限度地減少內(nèi)存碎⽚。 
  2. UnpooledByteBufAllocator,沒(méi)有實(shí)現(xiàn)對(duì)象的池化,每次會(huì)⽣成新的對(duì)象實(shí)例。 

2. 代碼實(shí)現(xiàn)(讓Netty中ByteBuf對(duì)象不池化)

  1. //通過(guò)ChannelHandlerContext獲取ByteBufAllocator實(shí)例 
  2. ctx.alloc(); 
  3. //通過(guò)channel也可以獲取 
  4. channel.alloc(); 
  5.  
  6. //Netty默認(rèn)使⽤了PooledByteBufAllocator 
  7.  
  8. //可以在引導(dǎo)類中設(shè)置⾮池化模式 
  9. serverBootstrap.childOption(ChannelOption.ALLOCATOR,UnpooledByteBufAllocator.DEFAULT); 
  10. //或通過(guò)系統(tǒng)參數(shù)設(shè)置 
  11. System.setProperty("io.netty.allocator.type""pooled"); 
  12. System.setProperty("io.netty.allocator.type""unpooled"); 

我在開(kāi)發(fā)項(xiàng)目中,我一般不進(jìn)行更改。因?yàn)槲矣X(jué)得池化效率更高。有其他高見(jiàn),歡迎留言。

13 3.5 ByteBuf的釋放

ByteBuf如果采⽤的是堆緩沖區(qū)模式的話,可以由GC回收,但是如果采⽤的是直接緩沖區(qū),就不受GC的 管理,就得⼿動(dòng)釋放,否則會(huì)發(fā)⽣內(nèi)存泄露。

3.5.1 ByteBuf的手動(dòng)釋放(一般不推薦使用,了解)

1. 實(shí)現(xiàn)邏輯

  1. ⼿動(dòng)釋放,就是在使⽤完成后,調(diào)⽤ReferenceCountUtil.release(byteBuf); 進(jìn)⾏釋放。 
  2. 通過(guò)release⽅法減去 byteBuf的使⽤計(jì)數(shù),Netty 會(huì)⾃動(dòng)回收 byteBuf。 

2. 代碼

  1. /** 
  2. * 獲取客戶端發(fā)來(lái)的數(shù)據(jù) 
  3. * @param ctx 
  4. * @param msg 
  5. * @throws Exception 
  6. */ 
  7. @Override 
  8. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 
  9.     ByteBuf byteBuf = (ByteBuf) msg; 
  10.     String msgStr = byteBuf.toString(CharsetUtil.UTF_8); 
  11.     System.out.println("客戶端發(fā)來(lái)數(shù)據(jù):" + msgStr); 
  12.     //釋放資源 
  13.     ReferenceCountUtil.release(byteBuf); 

注意:

⼿動(dòng)釋放可以達(dá)到⽬的,但是這種⽅式會(huì)⽐較繁瑣,如果⼀旦忘記釋放就可能會(huì)造成內(nèi)存泄露。

3.5.1 ByteBuf的自動(dòng)釋放

⾃動(dòng)釋放有三種⽅式,分別是:⼊站的TailHandler、繼承SimpleChannelInboundHandler、 HeadHandler的出站釋放。

1. TailHandler

Netty的ChannelPipleline的流⽔線的末端是TailHandler,默認(rèn)情況下如果每個(gè)⼊站處理器Handler都把消息往下傳,TailHandler會(huì)釋放掉ReferenceCounted類型的消息。

  1. /** 
  2. * 獲取客戶端發(fā)來(lái)的數(shù)據(jù) 
  3. * @param ctx 
  4. * @param msg 
  5. * @throws Exception 
  6. */ 
  7. @Override 
  8. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 
  9.     ByteBuf byteBuf = (ByteBuf) msg; 
  10.     String msgStr = byteBuf.toString(CharsetUtil.UTF_8); 
  11.     System.out.println("客戶端發(fā)來(lái)數(shù)據(jù):" + msgStr); 
  12.     //向客戶端發(fā)送數(shù)據(jù) 
  13.     ctx.writeAndFlush(Unpooled.copiedBuffer("ok", CharsetUtil.UTF_8)); 
  14.     ctx.fireChannelRead(msg); //將ByteBuf向下傳遞 

源碼:

在DefaultChannelPipeline中的TailContext內(nèi)部類會(huì)在最后執(zhí)⾏

  1. @Override 
  2. public void channelRead(ChannelHandlerContext ctx, Object msg) { 
  3.  onUnhandledInboundMessage(ctx, msg); 
  4. //最后會(huì)執(zhí)⾏ 
  5. protected void onUnhandledInboundMessage(Object msg) { 
  6.   try { 
  7.       logger.debug( 
  8.       "Discarded inbound message {} that reached at the tail of the 
  9.       pipeline. " + "Please check your pipeline configuration.", msg); 
  10.   } finally { 
  11.     ReferenceCountUtil.release(msg); //釋放資源 
  12.   } 

2. SimpleChannelInboundHandler

當(dāng)ChannelHandler繼承了SimpleChannelInboundHandler后,在SimpleChannelInboundHandler的channelRead()⽅法中,將會(huì)進(jìn)⾏資源的釋放。

SimpleChannelInboundHandler的源碼

  1. //SimpleChannelInboundHandler中的channelRead() 
  2. @Override 
  3. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 
  4.     boolean release = true
  5.     try { 
  6.       if (acceptInboundMessage(msg)) { 
  7.         @SuppressWarnings("unchecked"
  8.         I imsg = (I) msg; 
  9.         channelRead0(ctx, imsg); 
  10.       } else { 
  11.         release = false
  12.         ctx.fireChannelRead(msg); 
  13.       } 
  14.     } finally { 
  15.       if (autoRelease && release) { 
  16.        ReferenceCountUtil.release(msg); //在這⾥釋放 
  17.       } 
  18.     } 

我們handler代碼編寫(xiě):

  1. package com.haopt.myrpc.client.handler; 
  2. import io.netty.buffer.ByteBuf; 
  3. import io.netty.buffer.Unpooled; 
  4. import io.netty.channel.ChannelHandlerContext; 
  5. import io.netty.channel.SimpleChannelInboundHandler; 
  6. import io.netty.util.CharsetUtil; 
  7. public class MyClientHandler extends SimpleChannelInboundHandler<ByteBuf> { 
  8.     @Override 
  9.     protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { 
  10.       System.out.println("接收到服務(wù)端的消息:" + 
  11.       msg.toString(CharsetUtil.UTF_8)); 
  12.     } 
  13.     @Override 
  14.     public void channelActive(ChannelHandlerContext ctx) throws Exception { 
  15.       // 向服務(wù)端發(fā)送數(shù)據(jù) 
  16.       String msg = "hello"
  17.       ctx.writeAndFlush(Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8)); 
  18.     } 
  19.     @Override 
  20.     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 
  21.       cause.printStackTrace(); 
  22.       ctx.close(); 
  23.     } 

3. 堆緩沖區(qū)(HeadHandler)

出站處理流程中,申請(qǐng)分配到的ByteBuf,通過(guò)HeadHandler完成⾃動(dòng)釋放。

在出站流程開(kāi)始的時(shí)候,通過(guò)調(diào)⽤ctx.writeAndFlush(msg),Bytebuf緩沖區(qū)開(kāi)始進(jìn)⼊出站處理的pipeline流⽔線。

在每⼀個(gè)出站Handler中的處理完成后,最后消息會(huì)來(lái)到出站的最后⼀棒HeadHandler,再經(jīng)過(guò)⼀輪復(fù)雜的調(diào)⽤,在flush完成后終將被release掉。

  1. package com.haopt.myrpc.client.handler; 
  2. import io.netty.buffer.ByteBuf; 
  3. import io.netty.buffer.Unpooled; 
  4. import io.netty.channel.ChannelHandlerContext; 
  5. import io.netty.channel.SimpleChannelInboundHandler; 
  6. import io.netty.util.CharsetUtil; 
  7. public class MyClientHandler extends SimpleChannelInboundHandler<ByteBuf> { 
  8. @Override 
  9. protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws 
  10. Exception { 
  11. System.out.println("接收到服務(wù)端的消息:" + 
  12. msg.toString(CharsetUtil.UTF_8)); 
  13. @Override 
  14. public void channelActive(ChannelHandlerContext ctx) throws Exception { 
  15. // 向服務(wù)端發(fā)送數(shù)據(jù) 
  16. String msg = "hello"
  17. ctx.writeAndFlush(Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8)); 
  18. @Override 
  19. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 
  20. throws Exception { 
  21. cause.printStackTrace(); 
  22. ctx.close(); 

 

14 3.6 ByteBuf小結(jié)

a ⼊站流程中,如果對(duì)原消息不做處理,調(diào)ctx.fireChannelRead(msg) 把

原消息往下傳,由流⽔線最后⼀棒 TailHandler 完成⾃動(dòng)釋放。

b 如果截?cái)嗔?#12042;站處理流⽔線,則繼承SimpleChannelInboundHandler ,完成⼊站ByteBuf ⾃動(dòng)釋放。

c 出站處理過(guò)程中,申請(qǐng)分配到的 ByteBuf,通過(guò) HeadHandler 完成⾃動(dòng)釋放。

d ⼊站處理中,如果將原消息轉(zhuǎn)化為新的消息ctx.fireChannelRead(newMsg)往下傳,那必須把原消息release掉。

e ⼊站處理中,如果已經(jīng)不再調(diào)⽤ ctx.fireChannelRead(msg) 傳遞任何消息,也沒(méi)有繼承SimpleChannelInboundHandler 完成⾃動(dòng)釋放,那更要把原消息release掉。

 

責(zé)任編輯:姜華 來(lái)源: 花花和Java
相關(guān)推薦

2018-09-04 16:20:46

MySQ索引數(shù)據(jù)結(jié)構(gòu)

2018-11-21 08:00:05

Dubbo分布式系統(tǒng)

2020-11-06 16:50:43

工具GitLab CICD

2010-06-23 20:31:54

2010-07-13 09:36:25

2010-11-19 16:22:14

Oracle事務(wù)

2009-08-25 16:27:10

Mscomm控件

2020-09-21 09:53:04

FlexCSS開(kāi)發(fā)

2022-08-26 13:48:40

EPUBLinux

2021-01-27 11:10:49

JVM性能調(diào)優(yōu)

2020-07-20 06:35:55

BashLinux

2011-11-07 09:37:42

Hpyer-V虛擬化云計(jì)算

2023-10-06 00:04:02

2011-07-18 15:08:34

2022-06-03 10:09:32

威脅檢測(cè)軟件

2010-11-15 11:40:44

Oracle表空間

2018-06-22 13:05:02

前端JavaScript引擎

2021-01-19 12:00:39

前端監(jiān)控代碼

2010-09-27 09:31:42

JVM內(nèi)存結(jié)構(gòu)

2021-04-28 10:13:58

zookeeperZNode核心原理
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)