偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

Java 從零開始手寫 RPC -序列化

開發(fā) 后端
序列化 (Serialization)是將對象的狀態(tài)信息轉換為可以存儲或傳輸的形式的過程。在序列化期間,對象將其當前狀態(tài)寫入到臨時或持久性存儲區(qū)。

[[429947]]

前面幾節(jié)我們實現了最基礎的客戶端調用服務端,這一節(jié)來學習一下通訊中的對象序列化。

為什么需要序列化

netty 底層都是基于 ByteBuf 進行通訊的。

前面我們通過編碼器/解碼器專門為計算的入參/出參進行處理,這樣方便我們直接使用 pojo。

但是有一個問題,如果想把我們的項目抽象為框架,那就需要為所有的對象編寫編碼器/解碼器。

顯然,直接通過每一個對象寫一對的方式是不現實的,而且用戶如何使用,也是未知的。

序列化的方式

基于字節(jié)的實現,性能好,可讀性不高。

基于字符串的實現,比如 json 序列化,可讀性好,性能相對較差。

ps: 可以根據個人還好選擇,相關序列化可參考下文,此處不做展開。

json 序列化框架簡介[1]

實現思路

可以將我們的 Pojo 全部轉化為 byte,然后 Byte 轉換為 ByteBuf 即可。

反之亦然。

代碼實現

maven

引入序列化包:

  1. <dependency> 
  2.     <groupId>com.github.houbb</groupId> 
  3.     <artifactId>json</artifactId> 
  4.     <version>0.1.1</version> 
  5. </dependency> 

服務端

核心

服務端的代碼可以大大簡化:

  1. serverBootstrap.group(workerGroup, bossGroup) 
  2.     .channel(NioServerSocketChannel.class) 
  3.     // 打印日志 
  4.     .handler(new LoggingHandler(LogLevel.INFO)) 
  5.     .childHandler(new ChannelInitializer<Channel>() { 
  6.         @Override 
  7.         protected void initChannel(Channel ch) throws Exception { 
  8.             ch.pipeline() 
  9.                     .addLast(new RpcServerHandler()); 
  10.         } 
  11.     }) 
  12.     // 這個參數影響的是還沒有被accept 取出的連接 
  13.     .option(ChannelOption.SO_BACKLOG, 128) 
  14.     // 這個參數只是過一段時間內客戶端沒有響應,服務端會發(fā)送一個 ack 包,以判斷客戶端是否還活著。 
  15.     .childOption(ChannelOption.SO_KEEPALIVE, true); 

這里只需要一個實現類即可。

RpcServerHandler

服務端的序列化/反序列化調整為直接使用 JsonBs 實現。

  1. package com.github.houbb.rpc.server.handler; 
  2.  
  3.  
  4. import com.github.houbb.json.bs.JsonBs; 
  5. import com.github.houbb.log.integration.core.Log; 
  6. import com.github.houbb.log.integration.core.LogFactory; 
  7. import com.github.houbb.rpc.common.model.CalculateRequest; 
  8. import com.github.houbb.rpc.common.model.CalculateResponse; 
  9. import com.github.houbb.rpc.common.service.Calculator; 
  10. import com.github.houbb.rpc.server.service.CalculatorService; 
  11.  
  12.  
  13. import io.netty.buffer.ByteBuf; 
  14. import io.netty.buffer.Unpooled; 
  15. import io.netty.channel.ChannelHandlerContext; 
  16. import io.netty.channel.SimpleChannelInboundHandler; 
  17.  
  18.  
  19. /** 
  20.  * @author binbin.hou 
  21.  * @since 0.0.1 
  22.  */ 
  23. public class RpcServerHandler extends SimpleChannelInboundHandler { 
  24.  
  25.  
  26.     private static final Log log = LogFactory.getLog(RpcServerHandler.class); 
  27.  
  28.  
  29.     @Override 
  30.     public void channelActive(ChannelHandlerContext ctx) throws Exception { 
  31.         final String id = ctx.channel().id().asLongText(); 
  32.         log.info("[Server] channel {} connected " + id); 
  33.     } 
  34.  
  35.  
  36.     @Override 
  37.     protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { 
  38.         final String id = ctx.channel().id().asLongText(); 
  39.  
  40.  
  41.         ByteBuf byteBuf = (ByteBuf)msg; 
  42.         byte[] bytes = new byte[byteBuf.readableBytes()]; 
  43.         byteBuf.readBytes(bytes); 
  44.         CalculateRequest request = JsonBs.deserializeBytes(bytes, CalculateRequest.class); 
  45.         log.info("[Server] receive channel {} request: {} from ", id, request); 
  46.  
  47.  
  48.         Calculator calculator = new CalculatorService(); 
  49.         CalculateResponse response = calculator.sum(request); 
  50.  
  51.  
  52.         // 回寫到 client 端 
  53.         byte[] responseBytes = JsonBs.serializeBytes(response); 
  54.         ByteBuf responseBuffer = Unpooled.copiedBuffer(responseBytes); 
  55.         ctx.writeAndFlush(responseBuffer); 
  56.         log.info("[Server] channel {} response {}", id, response); 
  57.     } 
  58.  
  59.  

客戶端

核心

客戶端可以簡化如下:

  1. channelFuture = bootstrap.group(workerGroup) 
  2.     .channel(NioSocketChannel.class) 
  3.     .option(ChannelOption.SO_KEEPALIVE, true
  4.     .handler(new ChannelInitializer<Channel>(){ 
  5.         @Override 
  6.         protected void initChannel(Channel ch) throws Exception { 
  7.             channelHandler = new RpcClientHandler(); 
  8.             ch.pipeline() 
  9.                     .addLast(new LoggingHandler(LogLevel.INFO)) 
  10.                     .addLast(channelHandler); 
  11.         } 
  12.     }) 
  13.     .connect(RpcConstant.ADDRESS, port) 
  14.     .syncUninterruptibly(); 

RpcClientHandler

客戶端的序列化/反序列化調整為直接使用 JsonBs 實現。

  1. package com.github.houbb.rpc.client.handler; 
  2.  
  3.  
  4. import com.github.houbb.json.bs.JsonBs; 
  5. import com.github.houbb.log.integration.core.Log; 
  6. import com.github.houbb.log.integration.core.LogFactory; 
  7. import com.github.houbb.rpc.client.core.RpcClient; 
  8. import com.github.houbb.rpc.common.model.CalculateResponse; 
  9.  
  10.  
  11. import io.netty.buffer.ByteBuf; 
  12. import io.netty.channel.ChannelHandlerContext; 
  13. import io.netty.channel.SimpleChannelInboundHandler; 
  14.  
  15.  
  16. /** 
  17.  * <p> 客戶端處理類 </p> 
  18.  * 
  19.  * <pre> Created: 2019/10/16 11:30 下午  </pre> 
  20.  * <pre> Project: rpc  </pre> 
  21.  * 
  22.  * @author houbinbin 
  23.  * @since 0.0.2 
  24.  */ 
  25. public class RpcClientHandler extends SimpleChannelInboundHandler { 
  26.  
  27.  
  28.     private static final Log log = LogFactory.getLog(RpcClient.class); 
  29.  
  30.  
  31.     /** 
  32.      * 響應信息 
  33.      * @since 0.0.4 
  34.      */ 
  35.     private CalculateResponse response; 
  36.  
  37.  
  38.     @Override 
  39.     protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { 
  40.         ByteBuf byteBuf = (ByteBuf)msg; 
  41.         byte[] bytes = new byte[byteBuf.readableBytes()]; 
  42.         byteBuf.readBytes(bytes); 
  43.  
  44.  
  45.         this.response = JsonBs.deserializeBytes(bytes, CalculateResponse.class); 
  46.         log.info("[Client] response is :{}", response); 
  47.     } 
  48.  
  49.  
  50.     @Override 
  51.     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { 
  52.         // 每次用完要關閉,不然拿不到response,我也不知道為啥(目測得了解netty才行) 
  53.         // 個人理解:如果不關閉,則永遠會被阻塞。 
  54.         ctx.flush(); 
  55.         ctx.close(); 
  56.     } 
  57.  
  58.  
  59.     public CalculateResponse getResponse() { 
  60.         return response; 
  61.     } 
  62.  
  63.  

 

責任編輯:姜華 來源: 今日頭條
相關推薦

2021-10-13 08:21:52

Java websocket Java 基礎

2021-10-29 08:07:30

Java timeout Java 基礎

2021-10-21 08:21:10

Java Reflect Java 基礎

2021-10-19 08:58:48

Java 語言 Java 基礎

2019-09-23 19:30:27

reduxreact.js前端

2021-10-14 08:39:17

Java Netty Java 基礎

2017-02-10 09:30:33

數據化運營流量

2024-12-06 17:02:26

2020-07-02 15:32:23

Kubernetes容器架構

2015-11-17 16:11:07

Code Review

2019-01-18 12:39:45

云計算PaaS公有云

2018-04-18 07:01:59

Docker容器虛擬機

2022-08-06 08:41:18

序列化反序列化Hessian

2018-03-19 10:20:23

Java序列化反序列化

2023-03-06 07:28:57

RPC框架序列化

2021-10-27 08:10:15

Java 客戶端 Java 基礎

2024-05-15 14:29:45

2010-05-26 17:35:08

配置Xcode SVN

2018-09-14 17:16:22

云計算軟件計算機網絡

2009-06-14 22:01:27

Java對象序列化反序列化
點贊
收藏

51CTO技術棧公眾號