Nacos 服務(wù)訂閱流程全解析
本文將基于之前源碼搭建系列的前置步驟針對nacos服務(wù)訂閱流程源碼進(jìn)行深入梳理和分析,希望對你有幫助。

客戶端發(fā)起服務(wù)訂閱
為了方便講解,筆者基于CommandLineRunner 這個(gè)擴(kuò)展點(diǎn)主動(dòng)在服務(wù)完成初始化之后通過NamingService 的subscribe發(fā)起服務(wù)訂閱請求:
@Component
public class TestRunner implements CommandLineRunner {
    private final static Logger log = LoggerFactory
            .getLogger(TestRunner.class);
    @Override
    public void run(String... args) throws Exception {
        //主動(dòng)向nacos發(fā)起服務(wù)訂閱請求
        NamingService naming = NamingFactory.createNamingService("127.0.0.1:8848");
        //主動(dòng)訂閱nacos-provider這個(gè)服務(wù)的實(shí)例信息
        naming.subscribe("nacos-provider", event -> {
            if (event instanceof NamingEvent) {
                //日志打印監(jiān)聽到的服務(wù)名稱和結(jié)果
                log.info("監(jiān)聽到服務(wù)名稱:{},實(shí)例信息:{}", ((NamingEvent) event).getServiceName(),
                        ((NamingEvent) event).getInstances());
            }
        });
    }
}查看NamingService 的subscribe源碼可知,該方法會(huì)基于我們給定的服務(wù)名稱以及分組等信息主動(dòng)發(fā)起RPC主動(dòng)向nacos獲取nacos-provider的實(shí)例信息:
@Override
    public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)
            throws NacosException {
       //......
       //基于RPC代理發(fā)起服務(wù)訂閱
        clientProxy.subscribe(serviceName, groupName, clusterString);
    }而clientProxy(底層就是NamingClientProxyDelegate)的執(zhí)行邏輯也比較簡單:
- 從緩存管理器serviceInfoHolder中嘗試獲取需要訂閱的服務(wù)信息。
 - 如果不存在或者未訂閱則向nacos發(fā)起RPC請求。
 - 通過serviceInfoHolder緩存訂閱的服務(wù)實(shí)例信息。
 

對應(yīng)的我們給出NamingClientProxyDelegate獲取服務(wù)實(shí)例的源碼段,和上述語義一致,讀者可以參考注釋了解一下過程:
@Override
    public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {
       //......
       
        //查看緩存中是否存在訂閱的服務(wù)實(shí)例
       //......
        ServiceInfo result = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
        //如果不存在或者未訂閱則發(fā)起RPC請求
        if (null == result || !isSubscribed(serviceName, groupName, clusters)) {
            result = grpcClientProxy.subscribe(serviceName, groupName, clusters);
        }
        //將查詢或者請求結(jié)果緩存到本地serviceInfoMap中
        serviceInfoHolder.processServiceInfo(result);
        return result;
    }默認(rèn)情況下,我們的緩存是沒有訂閱服務(wù)的信息的,所以會(huì)觸發(fā)RPC請求,然后nacos就會(huì)返回當(dāng)前服務(wù)的元信息,如下便是從nacos服務(wù)端返回的nacos-provider實(shí)例節(jié)點(diǎn)信息,后續(xù)如果需要調(diào)用,都會(huì)基于這份元信息發(fā)起請求:

我們也可以通過抓包工具看到這個(gè)請求的詳細(xì)內(nèi)容,如下圖所示,可以看到筆者基于nacos客戶端端口64393作為源端口號,nacos服務(wù)端端口號即9848作為目的端口進(jìn)行抓取捕獲到的TCP網(wǎng)絡(luò)包,可以看到這個(gè)訂閱的RPC接口請求參數(shù)詳情:

解碼后即可看到這個(gè)請求的數(shù)據(jù)就是源碼調(diào)試時(shí)看到的請求參數(shù),也就是對于nacos-provider的訂閱:

基于服務(wù)端的返回結(jié)果,客戶端會(huì)進(jìn)行如下操作:
- 將實(shí)例信息更新到本地緩存。
 - 查看對應(yīng)服務(wù)實(shí)例在本地緩存中的數(shù)據(jù),并和服務(wù)端響應(yīng)的數(shù)據(jù)進(jìn)行比對,若一致則說明實(shí)例信息沒有更新,直接返回,如果發(fā)現(xiàn)不一致,進(jìn)入步驟3。
 - 則說明服務(wù)發(fā)生變化,則基于零拷貝將響應(yīng)結(jié)果寫入本地持久化,便于后續(xù)服務(wù)重啟恢復(fù)數(shù)據(jù)。
 

public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) {
  //獲取原有服務(wù)信息
        String serviceKey = serviceInfo.getKey();
         ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
        //......
        //以服務(wù)名作為key,實(shí)例信息作為value寫入緩存
        serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
        
        //比對實(shí)例,如果不一致則說明服務(wù)發(fā)生變化
        boolean changed = isChangedServiceInfo(oldService, serviceInfo);
        //......
        if (changed) {
            //......
            //如果發(fā)生變化則基于零拷貝刷盤技術(shù)將訂閱的服務(wù)信息寫入本地        
            DiskCache.write(serviceInfo, cacheDir);
        }
        return serviceInfo;
    }Nacos基于緩存發(fā)起回復(fù)并注冊該客戶端
了解了客戶端的訂閱請求之后,我們再來聊聊nacos是如何處理該請求的,在之前的文章中我們說過,nacos是通過grpcCommonRequestAcceptor處理客戶端的RPC請求,對應(yīng)服務(wù)訂閱請求,服務(wù)端收到的將會(huì)收到一個(gè)類型為SubscribeServiceRequest 的服務(wù)訂閱請求:

找到對應(yīng)的處理器SubscribeServiceRequestHandler之后,grpcCommonRequestAcceptor會(huì)從RPC請求報(bào)文中拿到參數(shù)交由該處理器進(jìn)行處理:

對應(yīng)的我們也給出nacos服務(wù)端處理客戶端服務(wù)訂閱請求的代碼段,即位于GrpcRequestAcceptor的request方法:
@Override
    public void request(Payload grpcRequest, StreamObserver<Payload> responseObserver) {
        //......
        traceIfNecessary(grpcRequest, true);
        //需要使用的服務(wù)器類型,例如服務(wù)注冊就是 InstanceRequest
        String type = grpcRequest.getMetadata().getType();
        long startTime = System.nanoTime();
        
        //......
        //基于type到找到對應(yīng)的請求處理器
        RequestHandler requestHandler = requestHandlerRegistry.getByRequestType(type);
        //......
        
        //......
        
        Request request = (Request) parseObj;
        try {
            //......
            //解析參數(shù)并處理該請求
            Response response = requestHandler.handleRequest(request, requestMeta);
            //......
        } catch (Throwable e) {
           //......
        }
        
    }隨后SubscribeServiceRequestHandler的handle就會(huì)執(zhí)行如下步驟:
- 基于參數(shù)生成要獲取的服務(wù)請求參數(shù)service,service包含要請求的服務(wù)名稱、命名空間、分組等信息。
 - 從緩存serviceStorage管理的緩存serviceDataIndexes中查詢出對應(yīng)服務(wù)實(shí)例即nacos-provider的所有實(shí)例信息。
 - 過濾出有效即健康可用的實(shí)例響應(yīng)給客戶端。
 

對應(yīng)SubscribeServiceRequestHandler的源碼如下,讀者可參考注釋查閱:
public SubscribeServiceResponse handle(SubscribeServiceRequest request, RequestMeta meta) throws NacosException {
        //解析參數(shù)信息
        String namespaceId = request.getNamespace();
        String serviceName = request.getServiceName();
        String groupName = request.getGroupName();
        String app = request.getHeader("app", "unknown");
        String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
        //基于參數(shù)生成服務(wù)元信息,即要獲取的服務(wù)信息
        Service service = Service.newService(namespaceId, groupName, serviceName, true);
      //......
        //基于serviceStorage從緩存中拿到訂閱的服務(wù)信息,再通過selectInstancesWithHealthyProtection篩選出健康的示例
        ServiceInfo serviceInfo = ServiceUtil.selectInstancesWithHealthyProtection(serviceStorage.getData(service),
                metadataManager.getServiceMetadata(service).orElse(null), subscriber.getCluster(), false,
                true, subscriber.getIp());
        //......
        //返回結(jié)果
        return new SubscribeServiceResponse(ResponseCode.SUCCESS.getCode(), "success", serviceInfo);
    }小結(jié)
自此我們通過源碼閱讀、網(wǎng)絡(luò)抓包、斷點(diǎn)調(diào)測查看等方式對nacos服務(wù)訂閱的源碼進(jìn)行的較為詳細(xì)的分析,希望對你有幫助。















 
 
 












 
 
 
 