Nacos 服務訂閱流程全解析
本文將基于之前源碼搭建系列的前置步驟針對nacos服務訂閱流程源碼進行深入梳理和分析,希望對你有幫助。
客戶端發(fā)起服務訂閱
為了方便講解,筆者基于CommandLineRunner 這個擴展點主動在服務完成初始化之后通過NamingService 的subscribe發(fā)起服務訂閱請求:
@Component
public class TestRunner implements CommandLineRunner {
private final static Logger log = LoggerFactory
.getLogger(TestRunner.class);
@Override
public void run(String... args) throws Exception {
//主動向nacos發(fā)起服務訂閱請求
NamingService naming = NamingFactory.createNamingService("127.0.0.1:8848");
//主動訂閱nacos-provider這個服務的實例信息
naming.subscribe("nacos-provider", event -> {
if (event instanceof NamingEvent) {
//日志打印監(jiān)聽到的服務名稱和結果
log.info("監(jiān)聽到服務名稱:{},實例信息:{}", ((NamingEvent) event).getServiceName(),
((NamingEvent) event).getInstances());
}
});
}
}
查看NamingService 的subscribe源碼可知,該方法會基于我們給定的服務名稱以及分組等信息主動發(fā)起RPC主動向nacos獲取nacos-provider的實例信息:
@Override
public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)
throws NacosException {
//......
//基于RPC代理發(fā)起服務訂閱
clientProxy.subscribe(serviceName, groupName, clusterString);
}
而clientProxy(底層就是NamingClientProxyDelegate)的執(zhí)行邏輯也比較簡單:
- 從緩存管理器serviceInfoHolder中嘗試獲取需要訂閱的服務信息。
- 如果不存在或者未訂閱則向nacos發(fā)起RPC請求。
- 通過serviceInfoHolder緩存訂閱的服務實例信息。
對應的我們給出NamingClientProxyDelegate獲取服務實例的源碼段,和上述語義一致,讀者可以參考注釋了解一下過程:
@Override
public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {
//......
//查看緩存中是否存在訂閱的服務實例
//......
ServiceInfo result = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
//如果不存在或者未訂閱則發(fā)起RPC請求
if (null == result || !isSubscribed(serviceName, groupName, clusters)) {
result = grpcClientProxy.subscribe(serviceName, groupName, clusters);
}
//將查詢或者請求結果緩存到本地serviceInfoMap中
serviceInfoHolder.processServiceInfo(result);
return result;
}
默認情況下,我們的緩存是沒有訂閱服務的信息的,所以會觸發(fā)RPC請求,然后nacos就會返回當前服務的元信息,如下便是從nacos服務端返回的nacos-provider實例節(jié)點信息,后續(xù)如果需要調(diào)用,都會基于這份元信息發(fā)起請求:
我們也可以通過抓包工具看到這個請求的詳細內(nèi)容,如下圖所示,可以看到筆者基于nacos客戶端端口64393作為源端口號,nacos服務端端口號即9848作為目的端口進行抓取捕獲到的TCP網(wǎng)絡包,可以看到這個訂閱的RPC接口請求參數(shù)詳情:
解碼后即可看到這個請求的數(shù)據(jù)就是源碼調(diào)試時看到的請求參數(shù),也就是對于nacos-provider的訂閱:
基于服務端的返回結果,客戶端會進行如下操作:
- 將實例信息更新到本地緩存。
- 查看對應服務實例在本地緩存中的數(shù)據(jù),并和服務端響應的數(shù)據(jù)進行比對,若一致則說明實例信息沒有更新,直接返回,如果發(fā)現(xiàn)不一致,進入步驟3。
- 則說明服務發(fā)生變化,則基于零拷貝將響應結果寫入本地持久化,便于后續(xù)服務重啟恢復數(shù)據(jù)。
public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) {
//獲取原有服務信息
String serviceKey = serviceInfo.getKey();
ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
//......
//以服務名作為key,實例信息作為value寫入緩存
serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
//比對實例,如果不一致則說明服務發(fā)生變化
boolean changed = isChangedServiceInfo(oldService, serviceInfo);
//......
if (changed) {
//......
//如果發(fā)生變化則基于零拷貝刷盤技術將訂閱的服務信息寫入本地
DiskCache.write(serviceInfo, cacheDir);
}
return serviceInfo;
}
Nacos基于緩存發(fā)起回復并注冊該客戶端
了解了客戶端的訂閱請求之后,我們再來聊聊nacos是如何處理該請求的,在之前的文章中我們說過,nacos是通過grpcCommonRequestAcceptor處理客戶端的RPC請求,對應服務訂閱請求,服務端收到的將會收到一個類型為SubscribeServiceRequest 的服務訂閱請求:
找到對應的處理器SubscribeServiceRequestHandler之后,grpcCommonRequestAcceptor會從RPC請求報文中拿到參數(shù)交由該處理器進行處理:
對應的我們也給出nacos服務端處理客戶端服務訂閱請求的代碼段,即位于GrpcRequestAcceptor的request方法:
@Override
public void request(Payload grpcRequest, StreamObserver<Payload> responseObserver) {
//......
traceIfNecessary(grpcRequest, true);
//需要使用的服務器類型,例如服務注冊就是 InstanceRequest
String type = grpcRequest.getMetadata().getType();
long startTime = System.nanoTime();
//......
//基于type到找到對應的請求處理器
RequestHandler requestHandler = requestHandlerRegistry.getByRequestType(type);
//......
//......
Request request = (Request) parseObj;
try {
//......
//解析參數(shù)并處理該請求
Response response = requestHandler.handleRequest(request, requestMeta);
//......
} catch (Throwable e) {
//......
}
}
隨后SubscribeServiceRequestHandler的handle就會執(zhí)行如下步驟:
- 基于參數(shù)生成要獲取的服務請求參數(shù)service,service包含要請求的服務名稱、命名空間、分組等信息。
- 從緩存serviceStorage管理的緩存serviceDataIndexes中查詢出對應服務實例即nacos-provider的所有實例信息。
- 過濾出有效即健康可用的實例響應給客戶端。
對應SubscribeServiceRequestHandler的源碼如下,讀者可參考注釋查閱:
public SubscribeServiceResponse handle(SubscribeServiceRequest request, RequestMeta meta) throws NacosException {
//解析參數(shù)信息
String namespaceId = request.getNamespace();
String serviceName = request.getServiceName();
String groupName = request.getGroupName();
String app = request.getHeader("app", "unknown");
String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
//基于參數(shù)生成服務元信息,即要獲取的服務信息
Service service = Service.newService(namespaceId, groupName, serviceName, true);
//......
//基于serviceStorage從緩存中拿到訂閱的服務信息,再通過selectInstancesWithHealthyProtection篩選出健康的示例
ServiceInfo serviceInfo = ServiceUtil.selectInstancesWithHealthyProtection(serviceStorage.getData(service),
metadataManager.getServiceMetadata(service).orElse(null), subscriber.getCluster(), false,
true, subscriber.getIp());
//......
//返回結果
return new SubscribeServiceResponse(ResponseCode.SUCCESS.getCode(), "success", serviceInfo);
}
小結
自此我們通過源碼閱讀、網(wǎng)絡抓包、斷點調(diào)測查看等方式對nacos服務訂閱的源碼進行的較為詳細的分析,希望對你有幫助。