Java 從零開始手寫 RPC—Reflect 反射實現(xiàn)通用調(diào)用之服務(wù)端
前面我們的例子是一個固定的出參和入?yún)?,固定的方法實現(xiàn)。
本節(jié)將實現(xiàn)通用的調(diào)用,讓框架具有更廣泛的實用性。
基本思路
所有的方法調(diào)用,基于反射進行相關(guān)處理實現(xiàn)。
服務(wù)端
核心類
- RpcServer
調(diào)整如下:
- serverBootstrap.group(workerGroup, bossGroup)
- .channel(NioServerSocketChannel.class)
- // 打印日志
- .handler(new LoggingHandler(LogLevel.INFO))
- .childHandler(new ChannelInitializer<Channel>() {
- @Override
- protected void initChannel(Channel ch) throws Exception {
- ch.pipeline()
- // 解碼 bytes=>resp
- .addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)))
- // request=>bytes
- .addLast(new ObjectEncoder())
- .addLast(new RpcServerHandler());
- }
- })
- // 這個參數(shù)影響的是還沒有被accept 取出的連接
- .option(ChannelOption.SO_BACKLOG, 128)
- // 這個參數(shù)只是過一段時間內(nèi)客戶端沒有響應(yīng),服務(wù)端會發(fā)送一個 ack 包,以判斷客戶端是否還活著。
- .childOption(ChannelOption.SO_KEEPALIVE, true);
其中 ObjectDecoder 和 ObjectEncoder 都是 netty 內(nèi)置的實現(xiàn)。
RpcServerHandler
- package com.github.houbb.rpc.server.handler;
- import com.github.houbb.log.integration.core.Log;
- import com.github.houbb.log.integration.core.LogFactory;
- import com.github.houbb.rpc.common.rpc.domain.RpcRequest;
- import com.github.houbb.rpc.common.rpc.domain.impl.DefaultRpcResponse;
- import com.github.houbb.rpc.server.service.impl.DefaultServiceFactory;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.SimpleChannelInboundHandler;
- /**
- * @author binbin.hou
- * @since 0.0.1
- */
- public class RpcServerHandler extends SimpleChannelInboundHandler {
- private static final Log log = LogFactory.getLog(RpcServerHandler.class);
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- final String id = ctx.channel().id().asLongText();
- log.info("[Server] channel {} connected " + id);
- }
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
- final String id = ctx.channel().id().asLongText();
- log.info("[Server] channel read start: {}", id);
- // 接受客戶端請求
- RpcRequest rpcRequest = (RpcRequest)msg;
- log.info("[Server] receive channel {} request: {}", id, rpcRequest);
- // 回寫到 client 端
- DefaultRpcResponse rpcResponse = handleRpcRequest(rpcRequest);
- ctx.writeAndFlush(rpcResponse);
- log.info("[Server] channel {} response {}", id, rpcResponse);
- }
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- cause.printStackTrace();
- ctx.close();
- }
- /**
- * 處理請求信息
- * @param rpcRequest 請求信息
- * @return 結(jié)果信息
- * @since 0.0.6
- */
- private DefaultRpcResponse handleRpcRequest(final RpcRequest rpcRequest) {
- DefaultRpcResponse rpcResponse = new DefaultRpcResponse();
- rpcResponse.seqId(rpcRequest.seqId());
- try {
- // 獲取對應(yīng)的 service 實現(xiàn)類
- // rpcRequest=>invocationRequest
- // 執(zhí)行 invoke
- Object result = DefaultServiceFactory.getInstance()
- .invoke(rpcRequest.serviceId(),
- rpcRequest.methodName(),
- rpcRequest.paramTypeNames(),
- rpcRequest.paramValues());
- rpcResponse.result(result);
- } catch (Exception e) {
- rpcResponse.error(e);
- log.error("[Server] execute meet ex for request", rpcRequest, e);
- }
- // 構(gòu)建結(jié)果值
- return rpcResponse;
- }
- }
和以前類似,不過 handleRpcRequest 要稍微麻煩一點。
這里需要根據(jù)發(fā)射,調(diào)用對應(yīng)的方法。
pojo
其中使用的出參、入?yún)崿F(xiàn)如下:
RpcRequest
- package com.github.houbb.rpc.common.rpc.domain;
- import java.util.List;
- /**
- * 序列化相關(guān)處理
- * (1)調(diào)用創(chuàng)建時間-createTime
- * (2)調(diào)用方式 callType
- * (3)超時時間 timeOut
- *
- * 額外信息:
- * (1)上下文信息
- *
- * @author binbin.hou
- * @since 0.0.6
- */
- public interface RpcRequest extends BaseRpc {
- /**
- * 創(chuàng)建時間
- * @return 創(chuàng)建時間
- * @since 0.0.6
- */
- long createTime();
- /**
- * 服務(wù)唯一標識
- * @return 服務(wù)唯一標識
- * @since 0.0.6
- */
- String serviceId();
- /**
- * 方法名稱
- * @return 方法名稱
- * @since 0.0.6
- */
- String methodName();
- /**
- * 方法類型名稱列表
- * @return 名稱列表
- * @since 0.0.6
- */
- List<String> paramTypeNames();
- // 調(diào)用參數(shù)信息列表
- /**
- * 調(diào)用參數(shù)值
- * @return 參數(shù)值數(shù)組
- * @since 0.0.6
- */
- Object[] paramValues();
- }
RpcResponse
- package com.github.houbb.rpc.common.rpc.domain;
- /**
- * 序列化相關(guān)處理
- * @author binbin.hou
- * @since 0.0.6
- */
- public interface RpcResponse extends BaseRpc {
- /**
- * 異常信息
- * @return 異常信息
- * @since 0.0.6
- */
- Throwable error();
- /**
- * 請求結(jié)果
- * @return 請求結(jié)果
- * @since 0.0.6
- */
- Object result();
- }
BaseRpc
- package com.github.houbb.rpc.common.rpc.domain;
- import java.io.Serializable;
- /**
- * 序列化相關(guān)處理
- * @author binbin.hou
- * @since 0.0.6
- */
- public interface BaseRpc extends Serializable {
- /**
- * 獲取唯一標識號
- * (1)用來唯一標識一次調(diào)用,便于獲取該調(diào)用對應(yīng)的響應(yīng)信息。
- * @return 唯一標識號
- */
- String seqId();
- /**
- * 設(shè)置唯一標識號
- * @param traceId 唯一標識號
- * @return this
- */
- BaseRpc seqId(final String traceId);
- }
ServiceFactory-服務(wù)工廠
為了便于對所有的 service 實現(xiàn)類統(tǒng)一管理,這里定義 service 工廠類。
ServiceFactory
- package com.github.houbb.rpc.server.service;
- import com.github.houbb.rpc.server.config.service.ServiceConfig;
- import com.github.houbb.rpc.server.registry.ServiceRegistry;
- import java.util.List;
- /**
- * 服務(wù)方法類倉庫管理類-接口
- *
- *
- * (1)對外暴露的方法,應(yīng)該盡可能的少。
- * (2)對于外部的調(diào)用,后期比如 telnet 治理,可以使用比如有哪些服務(wù)列表?
- * 單個服務(wù)有哪些方法名稱?
- *
- * 等等基礎(chǔ)信息的查詢,本期暫時全部隱藏掉。
- *
- * (3)前期盡可能的少暴露方法。
- * @author binbin.hou
- * @since 0.0.6
- * @see ServiceRegistry 服務(wù)注冊,將服務(wù)信息放在這個類中,進行統(tǒng)一的管理。
- * @see ServiceMethod 方法信息
- */
- public interface ServiceFactory {
- /**
- * 注冊服務(wù)列表信息
- * @param serviceConfigList 服務(wù)配置列表
- * @return this
- * @since 0.0.6
- */
- ServiceFactory registerServices(final List<ServiceConfig> serviceConfigList);
- /**
- * 直接反射調(diào)用
- * (1)此處對于方法反射,為了提升性能,所有的 class.getFullName() 進行拼接然后放進 key 中。
- *
- * @param serviceId 服務(wù)名稱
- * @param methodName 方法名稱
- * @param paramTypeNames 參數(shù)類型名稱列表
- * @param paramValues 參數(shù)值
- * @return 方法調(diào)用返回值
- * @since 0.0.6
- */
- Object invoke(final String serviceId, final String methodName,
- List<String> paramTypeNames, final Object[] paramValues);
- }
DefaultServiceFactory
作為默認實現(xiàn),如下:
- package com.github.houbb.rpc.server.service.impl;
- import com.github.houbb.heaven.constant.PunctuationConst;
- import com.github.houbb.heaven.util.common.ArgUtil;
- import com.github.houbb.heaven.util.lang.reflect.ReflectMethodUtil;
- import com.github.houbb.heaven.util.util.CollectionUtil;
- import com.github.houbb.rpc.common.exception.RpcRuntimeException;
- import com.github.houbb.rpc.server.config.service.ServiceConfig;
- import com.github.houbb.rpc.server.service.ServiceFactory;
- import java.lang.reflect.InvocationTargetException;
- import java.lang.reflect.Method;
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
- /**
- * 默認服務(wù)倉庫實現(xiàn)
- * @author binbin.hou
- * @since 0.0.6
- */
- public class DefaultServiceFactory implements ServiceFactory {
- /**
- * 服務(wù) map
- * @since 0.0.6
- */
- private Map<String, Object> serviceMap;
- /**
- * 直接獲取對應(yīng)的 method 信息
- * (1)key: serviceId:methodName:param1@param2@param3
- * (2)value: 對應(yīng)的 method 信息
- */
- private Map<String, Method> methodMap;
- private static final DefaultServiceFactory INSTANCE = new DefaultServiceFactory();
- private DefaultServiceFactory(){}
- public static DefaultServiceFactory getInstance() {
- return INSTANCE;
- }
- /**
- * 服務(wù)注冊一般在項目啟動的時候,進行處理。
- * 屬于比較重的操作,而且一個服務(wù)按理說只應(yīng)該初始化一次。
- * 此處加鎖為了保證線程安全。
- * @param serviceConfigList 服務(wù)配置列表
- * @return this
- */
- @Override
- public synchronized ServiceFactory registerServices(List<ServiceConfig> serviceConfigList) {
- ArgUtil.notEmpty(serviceConfigList, "serviceConfigList");
- // 集合初始化
- serviceMap = new HashMap<>(serviceConfigList.size());
- // 這里只是預(yù)估,一般為2個服務(wù)。
- methodMap = new HashMap<>(serviceConfigList.size()*2);
- for(ServiceConfig serviceConfig : serviceConfigList) {
- serviceMap.put(serviceConfig.id(), serviceConfig.reference());
- }
- // 存放方法名稱
- for(Map.Entry<String, Object> entry : serviceMap.entrySet()) {
- String serviceId = entry.getKey();
- Object reference = entry.getValue();
- //獲取所有方法列表
- Method[] methods = reference.getClass().getMethods();
- for(Method method : methods) {
- String methodName = method.getName();
- if(ReflectMethodUtil.isIgnoreMethod(methodName)) {
- continue;
- }
- List<String> paramTypeNames = ReflectMethodUtil.getParamTypeNames(method);
- String key = buildMethodKey(serviceId, methodName, paramTypeNames);
- methodMap.put(key, method);
- }
- }
- return this;
- }
- @Override
- public Object invoke(String serviceId, String methodName, List<String> paramTypeNames, Object[] paramValues) {
- //參數(shù)校驗
- ArgUtil.notEmpty(serviceId, "serviceId");
- ArgUtil.notEmpty(methodName, "methodName");
- // 提供 cache,可以根據(jù)前三個值快速定位對應(yīng)的 method
- // 根據(jù) method 進行反射處理。
- // 對于 paramTypes 進行 string 連接處理。
- final Object reference = serviceMap.get(serviceId);
- final String methodKey = buildMethodKey(serviceId, methodName, paramTypeNames);
- final Method method = methodMap.get(methodKey);
- try {
- return method.invoke(reference, paramValues);
- } catch (IllegalAccessException | InvocationTargetException e) {
- throw new RpcRuntimeException(e);
- }
- }
- /**
- * (1)多個之間才用 : 分隔
- * (2)參數(shù)之間采用 @ 分隔
- * @param serviceId 服務(wù)標識
- * @param methodName 方法名稱
- * @param paramTypeNames 參數(shù)類型名稱
- * @return 構(gòu)建完整的 key
- * @since 0.0.6
- */
- private String buildMethodKey(String serviceId, String methodName, List<String> paramTypeNames) {
- String param = CollectionUtil.join(paramTypeNames, PunctuationConst.AT);
- return serviceId+PunctuationConst.COLON+methodName+PunctuationConst.COLON
- +param;
- }
- }
ServiceRegistry-服務(wù)注冊類
接口
- package com.github.houbb.rpc.server.registry;
- /**
- * 服務(wù)注冊類
- * (1)每個應(yīng)用唯一
- * (2)每個服務(wù)的暴露協(xié)議應(yīng)該保持一致
- * 暫時不提供單個服務(wù)的特殊處理,后期可以考慮添加
- *
- * @author binbin.hou
- * @since 0.0.6
- */
- public interface ServiceRegistry {
- /**
- * 暴露的 rpc 服務(wù)端口信息
- * @param port 端口信息
- * @return this
- * @since 0.0.6
- */
- ServiceRegistry port(final int port);
- /**
- * 注冊服務(wù)實現(xiàn)
- * @param serviceId 服務(wù)標識
- * @param serviceImpl 服務(wù)實現(xiàn)
- * @return this
- * @since 0.0.6
- */
- ServiceRegistry register(final String serviceId, final Object serviceImpl);
- /**
- * 暴露所有服務(wù)信息
- * (1)啟動服務(wù)端
- * @return this
- * @since 0.0.6
- */
- ServiceRegistry expose();
- }
實現(xiàn)
- package com.github.houbb.rpc.server.registry.impl;
- import com.github.houbb.heaven.util.common.ArgUtil;
- import com.github.houbb.rpc.common.config.protocol.ProtocolConfig;
- import com.github.houbb.rpc.server.config.service.DefaultServiceConfig;
- import com.github.houbb.rpc.server.config.service.ServiceConfig;
- import com.github.houbb.rpc.server.core.RpcServer;
- import com.github.houbb.rpc.server.registry.ServiceRegistry;
- import com.github.houbb.rpc.server.service.impl.DefaultServiceFactory;
- import java.util.ArrayList;
- import java.util.List;
- /**
- * 默認服務(wù)端注冊類
- * @author binbin.hou
- * @since 0.0.6
- */
- public class DefaultServiceRegistry implements ServiceRegistry {
- /**
- * 單例信息
- * @since 0.0.6
- */
- private static final DefaultServiceRegistry INSTANCE = new DefaultServiceRegistry();
- /**
- * rpc 服務(wù)端端口號
- * @since 0.0.6
- */
- private int rpcPort;
- /**
- * 協(xié)議配置
- * (1)默認只實現(xiàn) tcp
- * (2)后期可以拓展實現(xiàn) web-service/http/https 等等。
- * @since 0.0.6
- */
- private ProtocolConfig protocolConfig;
- /**
- * 服務(wù)配置列表
- * @since 0.0.6
- */
- private List<ServiceConfig> serviceConfigList;
- private DefaultServiceRegistry(){
- // 初始化默認參數(shù)
- this.serviceConfigList = new ArrayList<>();
- this.rpcPort = 9527;
- }
- public static DefaultServiceRegistry getInstance() {
- return INSTANCE;
- }
- @Override
- public ServiceRegistry port(int port) {
- ArgUtil.positive(port, "port");
- this.rpcPort = port;
- return this;
- }
- /**
- * 注冊服務(wù)實現(xiàn)
- * (1)主要用于后期服務(wù)調(diào)用
- * (2)如何根據(jù) id 獲取實現(xiàn)?非常簡單,id 是唯一的。
- * 有就是有,沒有就拋出異常,直接返回。
- * (3)如果根據(jù) {@link com.github.houbb.rpc.common.rpc.domain.RpcRequest} 獲取對應(yīng)的方法。
- *
- * 3.1 根據(jù) serviceId 獲取唯一的實現(xiàn)
- * 3.2 根據(jù) {@link Class#getMethod(String, Class[])} 方法名稱+參數(shù)類型唯一獲取方法
- * 3.3 根據(jù) {@link java.lang.reflect.Method#invoke(Object, Object...)} 執(zhí)行方法
- *
- * @param serviceId 服務(wù)標識
- * @param serviceImpl 服務(wù)實現(xiàn)
- * @return this
- * @since 0.0.6
- */
- @Override
- @SuppressWarnings("unchecked")
- public synchronized DefaultServiceRegistry register(final String serviceId, final Object serviceImpl) {
- ArgUtil.notEmpty(serviceId, "serviceId");
- ArgUtil.notNull(serviceImpl, "serviceImpl");
- // 構(gòu)建對應(yīng)的其他信息
- ServiceConfig serviceConfig = new DefaultServiceConfig();
- serviceConfig.id(serviceId).reference(serviceImpl);
- serviceConfigList.add(serviceConfig);
- return this;
- }
- @Override
- public ServiceRegistry expose() {
- // 注冊所有服務(wù)信息
- DefaultServiceFactory.getInstance()
- .registerServices(serviceConfigList);
- // 暴露 netty server 信息
- new RpcServer(rpcPort).start();
- return this;
- }
- }
ServiceConfig 是一些服務(wù)的配置信息,接口定義如下:
- package com.github.houbb.rpc.server.config.service;
- /**
- * 單個服務(wù)配置類
- *
- * 簡化用戶使用:
- * 在用戶使用的時候,這個類應(yīng)該是不可見的。
- * 直接提供對應(yīng)的服務(wù)注冊類即可。
- *
- * 后續(xù)拓展
- * (1)版本信息
- * (2)服務(wù)端超時時間
- *
- * @author binbin.hou
- * @since 0.0.6
- * @param <T> 實現(xiàn)類泛型
- */
- public interface ServiceConfig<T> {
- /**
- * 獲取唯一標識
- * @return 獲取唯一標識
- * @since 0.0.6
- */
- String id();
- /**
- * 設(shè)置唯一標識
- * @param id 標識信息
- * @return this
- * @since 0.0.6
- */
- ServiceConfig<T> id(String id);
- /**
- * 獲取引用實體實現(xiàn)
- * @return 實體實現(xiàn)
- * @since 0.0.6
- */
- T reference();
- /**
- * 設(shè)置引用實體實現(xiàn)
- * @param reference 引用實現(xiàn)
- * @return this
- * @since 0.0.6
- */
- ServiceConfig<T> reference(T reference);
- }
測試
maven 引入
引入服務(wù)端的對應(yīng) maven 包:
- <dependency>
- <groupId>com.github.houbb</groupId>
- <artifactId>rpc-server</artifactId>
- <version>0.0.6</version>
- </dependency>
服務(wù)端啟動
- // 啟動服務(wù)
- DefaultServiceRegistry.getInstance()
- .register(ServiceIdConst.CALC, new CalculatorServiceImpl())
- .expose();
這里注冊了一個計算服務(wù),并且設(shè)置對應(yīng)的實現(xiàn)。
和以前實現(xiàn)類似,此處不再贅述。
啟動日志:
- [DEBUG] [2021-10-05 13:39:42.638] [main] [c.g.h.l.i.c.LogFactory.setImplementation] - Logging initialized using 'class com.github.houbb.log.integration.adaptors.stdout.StdOutExImpl' adapter.
- [INFO] [2021-10-05 13:39:42.645] [Thread-0] [c.g.h.r.s.c.RpcServer.run] - RPC 服務(wù)開始啟動服務(wù)端
- 十月 05, 2021 1:39:43 下午 io.netty.handler.logging.LoggingHandler channelRegistered
- 信息: [id: 0xec4dc74f] REGISTERED
- 十月 05, 2021 1:39:43 下午 io.netty.handler.logging.LoggingHandler bind
- 信息: [id: 0xec4dc74f] BIND: 0.0.0.0/0.0.0.0:9527
- 十月 05, 2021 1:39:43 下午 io.netty.handler.logging.LoggingHandler channelActive
- 信息: [id: 0xec4dc74f, L:/0:0:0:0:0:0:0:0:9527] ACTIVE
- [INFO] [2021-10-05 13:39:43.893] [Thread-0] [c.g.h.r.s.c.RpcServer.run] - RPC 服務(wù)端啟動完成,監(jiān)聽【9527】端口
ps: 寫到這里忽然發(fā)現(xiàn)忘記添加對應(yīng)的 register 日志了,這里可以添加對應(yīng)的 registerListener 拓展。