Writing an RPC Framework in Java from Scratch: How Does the Client Call the Server?
With the client and the server written, how do we get the client to actually call the server?
Let's take a look.
Interface definition
Calculation method
- package com.github.houbb.rpc.common.service;
 - import com.github.houbb.rpc.common.model.CalculateRequest;
 - import com.github.houbb.rpc.common.model.CalculateResponse;
 - /**
 - * <p> Calculation service interface </p>
 - *
 - * <pre> Created: 2018/8/24 4:47 PM </pre>
 - * <pre> Project: fake </pre>
 - *
 - * @author houbinbin
 - * @since 0.0.1
 - */
 - public interface Calculator {
 - /**
 - * Computes the sum.
 - * @param request the request parameters
 - * @return the result
 - */
 - CalculateResponse sum(final CalculateRequest request);
 - }
 
POJOs
The corresponding parameter objects:
- CalculateRequest
 
- package com.github.houbb.rpc.common.model;
 - import java.io.Serializable;
 - /**
 - * <p> Request parameters </p>
 - *
 - * <pre> Created: 2018/8/24 5:05 PM </pre>
 - * <pre> Project: fake </pre>
 - *
 - * @author houbinbin
 - * @since 0.0.3
 - */
 - public class CalculateRequest implements Serializable {
 - private static final long serialVersionUID = 6420751004355300996L;
 - /**
 - * Parameter one
 - */
 - private int one;
 - /**
 - * Parameter two
 - */
 - private int two;
 - public CalculateRequest() {
 - }
 - public CalculateRequest(int one, int two) {
 - this.one = one;
 - this.two = two;
 - }
 - // getters, setters, and toString omitted
 - }
 
- CalculateResponse
 
- package com.github.houbb.rpc.common.model;
 - import java.io.Serializable;
 - /**
 - * <p> Response result </p>
 - *
 - * <pre> Created: 2018/8/24 5:05 PM </pre>
 - * <pre> Project: fake </pre>
 - *
 - * @author houbinbin
 - * @since 0.0.3
 - */
 - public class CalculateResponse implements Serializable {
 - private static final long serialVersionUID = -1972014736222511341L;
 - /**
 - * Whether the call succeeded
 - */
 - private boolean success;
 - /**
 - * Sum of the two parameters
 - */
 - private int sum;
 - public CalculateResponse() {
 - }
 - public CalculateResponse(boolean success, int sum) {
 - this.success = success;
 - this.sum = sum;
 - }
 - // getters, setters, and toString omitted
 - }
 
Client
Core part
RpcClient needs the corresponding handlers added. Adjust it as follows:
- Bootstrap bootstrap = new Bootstrap();
 - ChannelFuture channelFuture = bootstrap.group(workerGroup)
 - .channel(NioSocketChannel.class)
 - .option(ChannelOption.SO_KEEPALIVE, true)
 - .handler(new ChannelInitializer<Channel>(){
 - @Override
 - protected void initChannel(Channel ch) throws Exception {
 - ch.pipeline()
 - .addLast(new LoggingHandler(LogLevel.INFO))
 - .addLast(new CalculateRequestEncoder())
 - .addLast(new CalculateResponseDecoder())
 - .addLast(new RpcClientHandler());
 - }
 - })
 - .connect(RpcConstant.ADDRESS, port)
 - .syncUninterruptibly();
 
Netty's handler pipeline is elegantly designed: each concern lives in its own handler, so the code can be extended very flexibly.
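To make the pipeline idea concrete, here is a minimal stand-alone sketch that uses only the JDK. It is not how Netty is implemented: real Netty separates inbound and outbound handlers and passes a ChannelHandlerContext between them. The class name PipelineSketch and its methods are hypothetical, chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical, simplified model of a handler chain; not Netty's actual design. */
class PipelineSketch {

    // Each stage receives the previous stage's output and returns a new message.
    private final List<Function<Object, Object>> stages = new ArrayList<>();

    /** Appends a stage to the end of the chain and returns this for fluent use. */
    PipelineSketch addLast(Function<Object, Object> stage) {
        stages.add(stage);
        return this;
    }

    /** Passes a message through every stage in insertion order. */
    Object fire(Object msg) {
        Object current = msg;
        for (Function<Object, Object> stage : stages) {
            current = stage.apply(current);
        }
        return current;
    }

    public static void main(String[] args) {
        PipelineSketch pipeline = new PipelineSketch()
                .addLast(msg -> ((String) msg).trim())             // "decoder": clean raw input
                .addLast(msg -> Integer.parseInt((String) msg))    // "decoder": text to number
                .addLast(msg -> (Integer) msg + 1);                // "business handler"
        System.out.println(pipeline.fire(" 41 "));
    }
}
```

Adding a new concern, such as logging, then only means inserting one more stage, which is the property the real pipeline gives us.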
Next, let's look at the corresponding implementations.
RpcClientHandler
- package com.github.houbb.rpc.client.handler;
 - import com.github.houbb.log.integration.core.Log;
 - import com.github.houbb.log.integration.core.LogFactory;
 - import com.github.houbb.rpc.client.core.RpcClient;
 - import com.github.houbb.rpc.common.model.CalculateRequest;
 - import com.github.houbb.rpc.common.model.CalculateResponse;
 - import io.netty.channel.ChannelHandlerContext;
 - import io.netty.channel.SimpleChannelInboundHandler;
 - /**
 - * <p> Client handler </p>
 - *
 - * <pre> Created: 2019/10/16 11:30 PM </pre>
 - * <pre> Project: rpc </pre>
 - *
 - * @author houbinbin
 - * @since 0.0.2
 - */
 - public class RpcClientHandler extends SimpleChannelInboundHandler {
 - private static final Log log = LogFactory.getLog(RpcClient.class);
 - @Override
 - public void channelActive(ChannelHandlerContext ctx) throws Exception {
 - CalculateRequest request = new CalculateRequest(1, 2);
 - ctx.writeAndFlush(request);
 - log.info("[Client] request is :{}", request);
 - }
 - @Override
 - protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
 - CalculateResponse response = (CalculateResponse)msg;
 - log.info("[Client] response is :{}", response);
 - }
 - }
 
This part is simple: in channelActive we send the request directly, and to keep things simple the request object is hard-coded.
channelRead0 listens for the server's response and logs it.
CalculateRequestEncoder
The request parameter is an object, which Netty cannot transmit directly, so we convert it into primitive values:
- package com.github.houbb.rpc.client.encoder;
 - import com.github.houbb.rpc.common.model.CalculateRequest;
 - import io.netty.buffer.ByteBuf;
 - import io.netty.channel.ChannelHandlerContext;
 - import io.netty.handler.codec.MessageToByteEncoder;
 - /**
 - * @author binbin.hou
 - * @since 0.0.3
 - */
 - public class CalculateRequestEncoder extends MessageToByteEncoder<CalculateRequest> {
 - @Override
 - protected void encode(ChannelHandlerContext ctx, CalculateRequest msg, ByteBuf out) throws Exception {
 - int one = msg.getOne();
 - int two = msg.getTwo();
 - out.writeInt(one);
 - out.writeInt(two);
 - }
 - }
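The byte layout this encoder produces can be reproduced with the JDK alone. The sketch below assumes the same layout (two 4-byte big-endian ints) and uses java.nio.ByteBuffer in place of Netty's ByteBuf. The class and method names are hypothetical.

```java
import java.nio.ByteBuffer;

/** Hypothetical stand-alone sketch of the request wire format; not part of the project. */
class RequestWireFormatSketch {

    /** Encodes the two operands as two 4-byte big-endian ints (8 bytes in total). */
    static byte[] encodeRequest(int one, int two) {
        ByteBuffer buffer = ByteBuffer.allocate(2 * Integer.BYTES); // big-endian by default
        buffer.putInt(one);
        buffer.putInt(two);
        return buffer.array();
    }

    /** Renders bytes as space-separated two-digit hex values. */
    static String toHex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            if (sb.length() > 0) {
                sb.append(' ');
            }
            sb.append(String.format("%02x", b));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // prints "00 00 00 01 00 00 00 02"
        System.out.println(toHex(encodeRequest(1, 2)));
    }
}
```

These are the same 8 bytes that appear in the client's WRITE hex dump in the test section.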
 
CalculateResponseDecoder
The same applies to the server's response.
We need to wrap the primitive values back into the object we need.
- package com.github.houbb.rpc.client.decoder;
 - import com.github.houbb.rpc.common.model.CalculateResponse;
 - import io.netty.buffer.ByteBuf;
 - import io.netty.channel.ChannelHandlerContext;
 - import io.netty.handler.codec.ByteToMessageDecoder;
 - import java.util.List;
 - /**
 - * Response decoder
 - * @author binbin.hou
 - * @since 0.0.3
 - */
 - public class CalculateResponseDecoder extends ByteToMessageDecoder {
 - @Override
 - protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
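 - // Wait until a complete response (1-byte boolean + 4-byte int) is readable
 - if (in.readableBytes() < 5) {
 - return;
 - }
 - boolean success = in.readBoolean();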
 - int sum = in.readInt();
 - CalculateResponse response = new CalculateResponse(success, sum);
 - out.add(response);
 - }
 - }
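One thing to keep in mind with a decoder like this: TCP is a byte stream, so a 5-byte response may arrive in more than one chunk, and the decoder should only read once a full message is available. The following JDK-only sketch illustrates that idea under the same assumed layout (1-byte boolean followed by a 4-byte int). The class ResponseFramingSketch and its fixed 1024-byte buffer are hypothetical simplifications, not the project's code.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: accumulate bytes and decode only complete 5-byte responses. */
class ResponseFramingSketch {

    /** Minimal stand-in for CalculateResponse. */
    static final class Response {
        final boolean success;
        final int sum;

        Response(boolean success, int sum) {
            this.success = success;
            this.sum = sum;
        }
    }

    static final int FRAME_LENGTH = 1 + Integer.BYTES; // boolean + int = 5 bytes

    // Holds bytes received so far; a sketch-sized buffer, not tuned for real traffic.
    private final ByteBuffer pending = ByteBuffer.allocate(1024);

    /** Feeds one received chunk and returns every response that is now complete. */
    List<Response> feed(byte[] chunk) {
        pending.put(chunk);
        pending.flip(); // switch from writing to reading the accumulated bytes
        List<Response> decoded = new ArrayList<>();
        while (pending.remaining() >= FRAME_LENGTH) {
            boolean success = pending.get() != 0;
            int sum = pending.getInt();
            decoded.add(new Response(success, sum));
        }
        pending.compact(); // keep any leftover bytes for the next chunk
        return decoded;
    }

    public static void main(String[] args) {
        ResponseFramingSketch decoder = new ResponseFramingSketch();
        // Only 2 of the 5 bytes have arrived, so nothing is decoded yet.
        System.out.println(decoder.feed(new byte[] {1, 0}).size());
        // The remaining 3 bytes complete the message.
        List<Response> done = decoder.feed(new byte[] {0, 0, 3});
        System.out.println(done.get(0).success + " " + done.get(0).sum);
    }
}
```

In Netty, ByteToMessageDecoder does the accumulation for us; the decode method only has to check the readable byte count before reading.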
 
Server
Setting the handlers
The handlers in RpcServer need a small adjustment; everything else stays the same.
- ServerBootstrap serverBootstrap = new ServerBootstrap();
 - serverBootstrap.group(bossGroup, workerGroup)
 - .channel(NioServerSocketChannel.class)
 - // Log events on the server channel
 - .handler(new LoggingHandler(LogLevel.INFO))
 - .childHandler(new ChannelInitializer<Channel>() {
 - @Override
 - protected void initChannel(Channel ch) throws Exception {
 - ch.pipeline()
 - .addLast(new CalculateRequestDecoder())
 - .addLast(new CalculateResponseEncoder())
 - .addLast(new RpcServerHandler());
 - }
 - })
 - // This option affects connections that have not yet been taken off the queue by accept
 - .option(ChannelOption.SO_BACKLOG, 128)
 - // With this option, if the client has been silent for some time, the server sends a probe packet to check whether the client is still alive.
 - .childOption(ChannelOption.SO_KEEPALIVE, true);
 
RpcServerHandler
This started out as an empty implementation; let's add the actual logic.
- package com.github.houbb.rpc.server.handler;
 - import com.github.houbb.log.integration.core.Log;
 - import com.github.houbb.log.integration.core.LogFactory;
 - import com.github.houbb.rpc.common.model.CalculateRequest;
 - import com.github.houbb.rpc.common.model.CalculateResponse;
 - import com.github.houbb.rpc.common.service.Calculator;
 - import com.github.houbb.rpc.server.service.CalculatorService;
 - import io.netty.channel.ChannelHandlerContext;
 - import io.netty.channel.SimpleChannelInboundHandler;
 - /**
 - * @author binbin.hou
 - * @since 0.0.1
 - */
 - public class RpcServerHandler extends SimpleChannelInboundHandler {
 - private static final Log log = LogFactory.getLog(RpcServerHandler.class);
 - @Override
 - public void channelActive(ChannelHandlerContext ctx) throws Exception {
 - final String id = ctx.channel().id().asLongText();
 - log.info("[Server] channel {} connected", id);
 - }
 - @Override
 - protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
 - final String id = ctx.channel().id().asLongText();
 - CalculateRequest request = (CalculateRequest)msg;
 - log.info("[Server] receive channel {} request: {} from ", id, request);
 - Calculator calculator = new CalculatorService();
 - CalculateResponse response = calculator.sum(request);
 - // Write the response back to the client
 - ctx.writeAndFlush(response);
 - log.info("[Server] channel {} response {}", id, response);
 - }
 - }
 
After reading the client's request, we obtain the CalculateRequest, call the sum method to get the corresponding CalculateResponse, and send the result back to the client.
CalculateRequestDecoder
This mirrors the client side one-to-one: we first convert the primitive values delivered by Netty into a CalculateRequest object.
- package com.github.houbb.rpc.server.decoder;
 - import com.github.houbb.rpc.common.model.CalculateRequest;
 - import io.netty.buffer.ByteBuf;
 - import io.netty.channel.ChannelHandlerContext;
 - import io.netty.handler.codec.ByteToMessageDecoder;
 - import java.util.List;
 - /**
 - * Request decoder
 - * @author binbin.hou
 - * @since 0.0.3
 - */
 - public class CalculateRequestDecoder extends ByteToMessageDecoder {
 - @Override
 - protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
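 - // Wait until a complete request (two 4-byte ints) is readable
 - if (in.readableBytes() < 8) {
 - return;
 - }
 - int one = in.readInt();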
 - int two = in.readInt();
 - CalculateRequest request = new CalculateRequest(one, two);
 - out.add(request);
 - }
 - }
 
CalculateResponseEncoder
As on the client side, we need to convert the response into primitive values for network transmission.
- package com.github.houbb.rpc.server.encoder;
 - import com.github.houbb.rpc.common.model.CalculateResponse;
 - import io.netty.buffer.ByteBuf;
 - import io.netty.channel.ChannelHandlerContext;
 - import io.netty.handler.codec.MessageToByteEncoder;
 - /**
 - * @author binbin.hou
 - * @since 0.0.3
 - */
 - public class CalculateResponseEncoder extends MessageToByteEncoder<CalculateResponse> {
 - @Override
 - protected void encode(ChannelHandlerContext ctx, CalculateResponse msg, ByteBuf out) throws Exception {
 - boolean success = msg.isSuccess();
 - int result = msg.getSum();
 - out.writeBoolean(success);
 - out.writeInt(result);
 - }
 - }
 
CalculatorService
The server-side implementation class.
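- package com.github.houbb.rpc.server.service;
 - import com.github.houbb.rpc.common.model.CalculateRequest;
 - import com.github.houbb.rpc.common.model.CalculateResponse;
 - import com.github.houbb.rpc.common.service.Calculator;
 - public class CalculatorService implements Calculator {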
 - @Override
 - public CalculateResponse sum(CalculateRequest request) {
 - int sum = request.getOne()+request.getTwo();
 - return new CalculateResponse(true, sum);
 - }
 - }
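With all the pieces in place, the whole call path can be sketched without any network: encode the request, decode and compute on the "server" side, encode the response, and decode it on the "client" side. The sketch below uses java.nio.ByteBuffer as a stand-in for Netty's ByteBuf and assumes the same byte layout as the encoders and decoders above. All names in it are hypothetical.

```java
import java.nio.ByteBuffer;

/** Hypothetical in-memory round trip; a sketch of the data flow, not the real transport. */
class InMemoryRoundTripSketch {

    /** Client side: two operands become two 4-byte big-endian ints. */
    static byte[] encodeRequest(int one, int two) {
        ByteBuffer out = ByteBuffer.allocate(2 * Integer.BYTES);
        out.putInt(one);
        out.putInt(two);
        return out.array();
    }

    /** Server side: decode the request, compute the sum, encode the response. */
    static byte[] handle(byte[] requestBytes) {
        ByteBuffer in = ByteBuffer.wrap(requestBytes);
        int one = in.getInt();
        int two = in.getInt();
        int sum = one + two;

        ByteBuffer out = ByteBuffer.allocate(1 + Integer.BYTES);
        out.put((byte) 1); // success flag, written as a single byte
        out.putInt(sum);
        return out.array();
    }

    /** Client side: decode the response and return the sum if the call succeeded. */
    static int decodeSum(byte[] responseBytes) {
        ByteBuffer in = ByteBuffer.wrap(responseBytes);
        boolean success = in.get() != 0;
        if (!success) {
            throw new IllegalStateException("remote call reported failure");
        }
        return in.getInt();
    }

    public static void main(String[] args) {
        byte[] request = encodeRequest(1, 2);
        byte[] response = handle(request);
        System.out.println(decodeSum(response)); // prints 3
    }
}
```

The real client and server do the same steps, with Netty moving the bytes between the two sides.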
 
Test
Server
Start the server:
- new RpcServer().start();
 
Server startup log:
- [DEBUG] [2021-10-05 11:53:11.795] [main] [c.g.h.l.i.c.LogFactory.setImplementation] - Logging initialized using 'class com.github.houbb.log.integration.adaptors.stdout.StdOutExImpl' adapter.
 - [INFO] [2021-10-05 11:53:11.807] [Thread-0] [c.g.h.r.s.c.RpcServer.run] - RPC service is starting the server
 - Oct 05, 2021 11:53:13 AM io.netty.handler.logging.LoggingHandler channelRegistered
 - INFO: [id: 0xd399474f] REGISTERED
 - Oct 05, 2021 11:53:13 AM io.netty.handler.logging.LoggingHandler bind
 - INFO: [id: 0xd399474f] BIND: 0.0.0.0/0.0.0.0:9527
 - Oct 05, 2021 11:53:13 AM io.netty.handler.logging.LoggingHandler channelActive
 - INFO: [id: 0xd399474f, L:/0:0:0:0:0:0:0:0:9527] ACTIVE
 - [INFO] [2021-10-05 11:53:13.101] [Thread-0] [c.g.h.r.s.c.RpcServer.run] - RPC server started, listening on port [9527]
 
Client
Start the client:
- new RpcClient().start();
 
The log is as follows:
- [DEBUG] [2021-10-05 11:54:12.158] [main] [c.g.h.l.i.c.LogFactory.setImplementation] - Logging initialized using 'class com.github.houbb.log.integration.adaptors.stdout.StdOutExImpl' adapter.
 - [INFO] [2021-10-05 11:54:12.164] [Thread-0] [c.g.h.r.c.c.RpcClient.run] - RPC service is starting the client
 - Oct 05, 2021 11:54:13 AM io.netty.handler.logging.LoggingHandler channelRegistered
 - INFO: [id: 0x4d75c580] REGISTERED
 - Oct 05, 2021 11:54:13 AM io.netty.handler.logging.LoggingHandler connect
 - INFO: [id: 0x4d75c580] CONNECT: /127.0.0.1:9527
 - Oct 05, 2021 11:54:13 AM io.netty.handler.logging.LoggingHandler channelActive
 - INFO: [id: 0x4d75c580, L:/127.0.0.1:54030 - R:/127.0.0.1:9527] ACTIVE
 - [INFO] [2021-10-05 11:54:13.403] [Thread-0] [c.g.h.r.c.c.RpcClient.run] - RPC client started, listening on port: 9527
 - Oct 05, 2021 11:54:13 AM io.netty.handler.logging.LoggingHandler write
 - INFO: [id: 0x4d75c580, L:/127.0.0.1:54030 - R:/127.0.0.1:9527] WRITE: 8B
 -          +-------------------------------------------------+
 -          |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
 - +--------+-------------------------------------------------+----------------+
 - |00000000| 00 00 00 01 00 00 00 02                         |........        |
 - +--------+-------------------------------------------------+----------------+
 - Oct 05, 2021 11:54:13 AM io.netty.handler.logging.LoggingHandler flush
 - INFO: [id: 0x4d75c580, L:/127.0.0.1:54030 - R:/127.0.0.1:9527] FLUSH
 - [INFO] [2021-10-05 11:54:13.450] [nioEventLoopGroup-2-1] [c.g.h.r.c.c.RpcClient.channelActive] - [Client] request is :CalculateRequest{one=1, two=2}
 - Oct 05, 2021 11:54:13 AM io.netty.handler.logging.LoggingHandler channelRead
 - INFO: [id: 0x4d75c580, L:/127.0.0.1:54030 - R:/127.0.0.1:9527] READ: 5B
 -          +-------------------------------------------------+
 -          |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
 - +--------+-------------------------------------------------+----------------+
 - |00000000| 01 00 00 00 03                                  |.....           |
 - +--------+-------------------------------------------------+----------------+
 - Oct 05, 2021 11:54:13 AM io.netty.handler.logging.LoggingHandler channelReadComplete
 - INFO: [id: 0x4d75c580, L:/127.0.0.1:54030 - R:/127.0.0.1:9527] READ COMPLETE
 - [INFO] [2021-10-05 11:54:13.508] [nioEventLoopGroup-2-1] [c.g.h.r.c.c.RpcClient.channelRead0] - [Client] response is :CalculateResponse{success=true, sum=3}
 
As you can see, both the request parameters and the response result are printed.
The server, of course, now has corresponding new log entries as well:
- Oct 05, 2021 11:54:13 AM io.netty.handler.logging.LoggingHandler channelRead
 - INFO: [id: 0xd399474f, L:/0:0:0:0:0:0:0:0:9527] READ: [id: 0xbc9f5927, L:/127.0.0.1:9527 - R:/127.0.0.1:54030]
 - Oct 05, 2021 11:54:13 AM io.netty.handler.logging.LoggingHandler channelReadComplete
 - INFO: [id: 0xd399474f, L:/0:0:0:0:0:0:0:0:9527] READ COMPLETE
 - [INFO] [2021-10-05 11:54:13.432] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelActive] - [Server] channel {} connected 00e04cfffe360988-00001d34-00000001-2a80d950d8166c0c-bc9f5927
 - [INFO] [2021-10-05 11:54:13.495] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelRead0] - [Server] receive channel 00e04cfffe360988-00001d34-00000001-2a80d950d8166c0c-bc9f5927 request: CalculateRequest{one=1, two=2} from
 - [INFO] [2021-10-05 11:54:13.505] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelRead0] - [Server] channel 00e04cfffe360988-00001d34-00000001-2a80d950d8166c0c-bc9f5927 response CalculateResponse{success=true, sum=3}
 