Java 從零開始手寫 RPC-Netty4 實現(xiàn)客戶端和服務(wù)端
說明
上一篇代碼基于 socket 的實現(xiàn)非常簡單,但是對于實際生產(chǎn),一般使用 netty。
至于 netty 的優(yōu)點可以參考:
為什么選擇 netty?[1]
http://houbb.github.io/2019/05/10/netty-definitive-gudie-04-why-netty

代碼實現(xiàn)
maven 引入
- <dependency>
 - <groupId>io.netty</groupId>
 - <artifactId>netty-all</artifactId>
 - <version>${netty.version}</version>
 - </dependency>
 
引入 netty 對應(yīng)的 maven 包,此處為 4.1.17.Final。
服務(wù)端代碼實現(xiàn)
netty 的服務(wù)端啟動代碼是比較固定的。
- package com.github.houbb.rpc.server.core;
 - import com.github.houbb.log.integration.core.Log;
 - import com.github.houbb.log.integration.core.LogFactory;
 - import com.github.houbb.rpc.server.constant.RpcServerConst;
 - import com.github.houbb.rpc.server.handler.RpcServerHandler;
 - import io.netty.bootstrap.ServerBootstrap;
 - import io.netty.channel.*;
 - import io.netty.channel.nio.NioEventLoopGroup;
 - import io.netty.channel.socket.nio.NioServerSocketChannel;
 - /**
 - * rpc 服務(wù)端
 - * @author binbin.hou
 - * @since 0.0.1
 - */
 - public class RpcServer extends Thread {
 - private static final Log log = LogFactory.getLog(RpcServer.class);
 - /**
 - * 端口號
 - */
 - private final int port;
 - public RpcServer() {
 - this.port = RpcServerConst.DEFAULT_PORT;
 - }
 - public RpcServer(int port) {
 - this.port = port;
 - }
 - @Override
 - public void run() {
 - // 啟動服務(wù)端
 - log.info("RPC 服務(wù)開始啟動服務(wù)端");
 - EventLoopGroup bossGroup = new NioEventLoopGroup();
 - EventLoopGroup workerGroup = new NioEventLoopGroup();
 - try {
 - ServerBootstrap serverBootstrap = new ServerBootstrap();
 - serverBootstrap.group(workerGroup, bossGroup)
 - .channel(NioServerSocketChannel.class)
 - .childHandler(new ChannelInitializer<Channel>() {
 - @Override
 - protected void initChannel(Channel ch) throws Exception {
 - ch.pipeline().addLast(new RpcServerHandler());
 - }
 - })
 - // 這個參數(shù)影響的是還沒有被accept 取出的連接
 - .option(ChannelOption.SO_BACKLOG, 128)
 - // 這個參數(shù)只是過一段時間內(nèi)客戶端沒有響應(yīng),服務(wù)端會發(fā)送一個 ack 包,以判斷客戶端是否還活著。
 - .childOption(ChannelOption.SO_KEEPALIVE, true);
 - // 綁定端口,開始接收進來的鏈接
 - ChannelFuture channelFuture = serverBootstrap.bind(port).syncUninterruptibly();
 - log.info("RPC 服務(wù)端啟動完成,監(jiān)聽【" + port + "】端口");
 - channelFuture.channel().closeFuture().syncUninterruptibly();
 - log.info("RPC 服務(wù)端關(guān)閉完成");
 - } catch (Exception e) {
 - log.error("RPC 服務(wù)異常", e);
 - } finally {
 - workerGroup.shutdownGracefully();
 - bossGroup.shutdownGracefully();
 - }
 - }
 - }
 
為了簡單,服務(wù)端啟動端口號固定,RpcServerConst 常量類內(nèi)容如下:
- public final class RpcServerConst {
 - private RpcServerConst(){}
 - /**
 - * 默認(rèn)端口
 - * @since 0.0.1
 - */
 - public static final int DEFAULT_PORT = 9627;
 - }
 
RpcServerHandler
當(dāng)然,還有一個比較核心的類就是 RpcServerHandler
- public class RpcServerHandler extends SimpleChannelInboundHandler {
 - @Override
 - protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
 - // do nothing now
 - }
 - }
 
目前是空實現(xiàn),后續(xù)可以添加對應(yīng)的日志輸出及邏輯處理。
測試
啟動測試的代碼非常簡單:
- /**
 - * 服務(wù)啟動代碼測試
 - * @param args 參數(shù)
 - */
 - public static void main(String[] args) {
 - new RpcServer().start();
 - }
 
說明
上面我們實現(xiàn)了服務(wù)端的實現(xiàn),這一節(jié)來一起看一下 client 客戶端代碼實現(xiàn)。
代碼實現(xiàn)
RpcClient
- /*
 - * Copyright (c) 2019. houbinbin Inc.
 - * rpc All rights reserved.
 - */
 - package com.github.houbb.rpc.client.core;
 - import com.github.houbb.log.integration.core.Log;
 - import com.github.houbb.log.integration.core.LogFactory;
 - import com.github.houbb.rpc.client.handler.RpcClientHandler;
 - import io.netty.bootstrap.Bootstrap;
 - import io.netty.channel.Channel;
 - import io.netty.channel.ChannelFuture;
 - import io.netty.channel.ChannelInitializer;
 - import io.netty.channel.ChannelOption;
 - import io.netty.channel.EventLoopGroup;
 - import io.netty.channel.nio.NioEventLoopGroup;
 - import io.netty.channel.socket.nio.NioSocketChannel;
 - import io.netty.handler.logging.LogLevel;
 - import io.netty.handler.logging.LoggingHandler;
 - /**
 - * <p> rpc 客戶端 </p>
 - *
 - * <pre> Created: 2019/10/16 11:21 下午 </pre>
 - * <pre> Project: rpc </pre>
 - *
 - * @author houbinbin
 - * @since 0.0.2
 - */
 - public class RpcClient extends Thread {
 - private static final Log log = LogFactory.getLog(RpcClient.class);
 - /**
 - * 監(jiān)聽端口號
 - */
 - private final int port;
 - public RpcClient(int port) {
 - this.port = port;
 - }
 - public RpcClient() {
 - this(9527);
 - }
 - @Override
 - public void run() {
 - // 啟動服務(wù)端
 - log.info("RPC 服務(wù)開始啟動客戶端");
 - EventLoopGroup workerGroup = new NioEventLoopGroup();
 - try {
 - Bootstrap bootstrap = new Bootstrap();
 - ChannelFuture channelFuture = bootstrap.group(workerGroup)
 - .channel(NioSocketChannel.class)
 - .option(ChannelOption.SO_KEEPALIVE, true)
 - .handler(new ChannelInitializer<Channel>(){
 - @Override
 - protected void initChannel(Channel ch) throws Exception {
 - ch.pipeline()
 - .addLast(new LoggingHandler(LogLevel.INFO))
 - .addLast(new RpcClientHandler());
 - }
 - })
 - .connect("localhost", port)
 - .syncUninterruptibly();
 - log.info("RPC 服務(wù)啟動客戶端完成,監(jiān)聽端口:" + port);
 - channelFuture.channel().closeFuture().syncUninterruptibly();
 - log.info("RPC 服務(wù)開始客戶端已關(guān)閉");
 - } catch (Exception e) {
 - log.error("RPC 客戶端遇到異常", e);
 - } finally {
 - workerGroup.shutdownGracefully();
 - }
 - }
 - }
 
.connect("localhost", port) 聲明了客戶端需要連接的服務(wù)端,此處和服務(wù)端的端口保持一致。
RpcClientHandler
客戶端處理類也比較簡單,暫時留空。
- /*
 - * Copyright (c) 2019. houbinbin Inc.
 - * rpc All rights reserved.
 - */
 - package com.github.houbb.rpc.client.handler;
 - import io.netty.channel.ChannelHandlerContext;
 - import io.netty.channel.SimpleChannelInboundHandler;
 - /**
 - * <p> 客戶端處理類 </p>
 - *
 - * <pre> Created: 2019/10/16 11:30 下午 </pre>
 - * <pre> Project: rpc </pre>
 - *
 - * @author houbinbin
 - * @since 0.0.2
 - */
 - public class RpcClientHandler extends SimpleChannelInboundHandler {
 - @Override
 - protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
 - // do nothing.
 - }
 - }
 
啟動測試
服務(wù)端
首先啟動服務(wù)端。
客戶端
然后啟動客戶端連接服務(wù)端,實現(xiàn)如下:
- /**
 - * 服務(wù)啟動代碼測試
 - * @param args 參數(shù)
 - */
 - public static void main(String[] args) {
 - new RpcClient().start();
 - }
 
小結(jié)
為了便于大家學(xué)習(xí),以上源碼已經(jīng)開源:
https://github.com/houbb/rpc
我是老馬,期待與你的下次重逢。
References
[1] 為什么選擇 netty?: http://houbb.github.io/2019/05/10/netty-definitive-gudie-04-why-netty















 
 
 





 
 
 
 