Nacos 服務(wù)變更推送流程全解析
我們以Nacos 2.3.x版本為例,作為服務(wù)注冊(cè)中心,它的服務(wù)注冊(cè)和服務(wù)下線實(shí)時(shí)性感知相較于Consul的Raft一致性確認(rèn),亦或者Eureka定時(shí)拉取同步的機(jī)制來(lái)說(shuō)做了很好的折中,既避免了實(shí)現(xiàn)的復(fù)雜性又能保證較好的實(shí)時(shí)性,所以本文將從一次服務(wù)下線的請(qǐng)求結(jié)合并結(jié)合源碼分析的方式來(lái)講解一下Nacos服務(wù)實(shí)例狀態(tài)變更時(shí)是如何實(shí)時(shí)推送的服務(wù)消費(fèi)者的。
一、項(xiàng)目架構(gòu)介紹
1. 服務(wù)通信流程
這里我們先簡(jiǎn)單介紹的一下項(xiàng)目的架構(gòu),筆者將基于Nacos的源碼搭建一套基礎(chǔ)的服務(wù)注冊(cè)中心,然后提供兩個(gè)nacos-provider服務(wù)作為服務(wù)提供者(端口號(hào)分別是9001和9002),而nacos-consumer作為服務(wù)消費(fèi)者通過(guò)nacos或者nacos-provider的信息發(fā)起服務(wù)調(diào)用。
基于這個(gè)架構(gòu),我們會(huì)嚴(yán)格按照如下步驟完成實(shí)驗(yàn)和源碼分析:
- 將9002端口的服務(wù)提供者下線,查看服務(wù)提供者如何完成服務(wù)下線通知。
- 查看nacos收到該請(qǐng)求后,內(nèi)部如何處理該消息,并將消息通知給消費(fèi)者。
- 消費(fèi)者收到該請(qǐng)求后,如何更新本地緩存。
2. 服務(wù)提供者查詢接口
這里我們也給出服務(wù)提供者nacos-provider的http接口,可以看到該接口會(huì)返回當(dāng)前服務(wù)的名稱和端口號(hào):
@GetMapping("/provide")
public String provide() {
log.info("請(qǐng)求打到服務(wù)提供者provide上");
Map<String, String> map = new HashMap<>();
map.put("provider", env.getProperty("spring.application.name"));
map.put("port", env.getProperty("server.port"));
return JSONUtil.toJsonStr(map);
}
3. 服務(wù)消費(fèi)者
而服務(wù)消費(fèi)者也通過(guò)feign聲明引入該調(diào)用:
@FeignClient(name = "nacos-provider")
public interface NacosProvider {
@GetMapping("/provide")
String provide();
}
后續(xù)我們就可以通過(guò)服務(wù)提供者的test接口調(diào)用到nacos-provider的provide接口:
@Resource
private NacosProvider nacosProvider;
@GetMapping("/test")
public String test() {
return nacosProvider.provide();
}
需要注意的是,nacos-consumer如果沒(méi)有顯示調(diào)用nacos-provider是不會(huì)訂閱該提供者的所有實(shí)例信息,所以我們?yōu)榱朔奖闼餍栽诜?wù)啟動(dòng)時(shí)主動(dòng)發(fā)起訂閱:
@Component
publicclass TestRunner implements CommandLineRunner {
privatefinalstatic Logger log = LoggerFactory
.getLogger(TestRunner.class);
@Override
public void run(String... args) throws Exception {
//主動(dòng)向nacos發(fā)起服務(wù)訂閱請(qǐng)求
NamingService naming = NamingFactory.createNamingService("127.0.0.1:8848");
naming.subscribe("nacos-provider", event -> {
if (event instanceof NamingEvent) {
//日志打印監(jiān)聽(tīng)到的服務(wù)名稱和結(jié)果
log.info("監(jiān)聽(tīng)到服務(wù)名稱:{},實(shí)例信息:{}", ((NamingEvent) event).getServiceName(),
((NamingEvent) event).getInstances());
}
});
}
}
二、基于服務(wù)提供者下線詳解Nacos實(shí)例狀態(tài)推送
1. 服務(wù)提供者優(yōu)雅關(guān)閉并推送服務(wù)下線消息
基于上述架構(gòu),我們通過(guò)IDEA將9002的服務(wù)提供者關(guān)閉,注意如果用IDEA停止按鈕操作就會(huì)斷開(kāi)調(diào)試的連接,我們就無(wú)法調(diào)試服務(wù)下線的源碼,正確是做法是如下代碼的方式主動(dòng)獲取啟動(dòng)時(shí)的上下文通過(guò)close方法顯示關(guān)閉:
@SpringBootApplication
@EnableDiscoveryClient
public class NacosProviderApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context = SpringApplication.run(NacosProviderApplication.class, args);
//主動(dòng)將springboot容器關(guān)閉
context.close();
}
}
Spring上下文close方法執(zhí)行關(guān)閉操作,此時(shí)spring就會(huì)遍歷所有的虛擬機(jī)鉤子即shutdown hook,對(duì)應(yīng)我們的服務(wù)提供者在啟動(dòng)時(shí)注冊(cè)的shutdown Hook即NacosAutoServiceRegistration的close方法就會(huì)發(fā)起服務(wù)下線請(qǐng)求,一旦完成服務(wù)下線請(qǐng)求通知之后,服務(wù)提供者就會(huì)銷毀RPC連接以及所有工作線程:
對(duì)應(yīng)的我給出這個(gè)close方法的入口,因?yàn)镹acosAutoServiceRegistration繼承自AbstractAutoServiceRegistration,所以它繼承了這個(gè)抽象類的shutdown hook方法destroy,這就使得spring boot容器關(guān)閉后,就會(huì)觸發(fā)下面這個(gè)方法:
@PreDestroy
public void destroy() {
stop();
}
此時(shí),這個(gè)stop方法在進(jìn)行CAS樂(lè)觀鎖狀態(tài)修改后,執(zhí)行如下兩件事:
- 發(fā)起RPC下線請(qǐng)求。
- 銷毀相關(guān)工作線程和nacos維護(hù)的RPC連接。
public void stop() {
if (this.getRunning().compareAndSet(true, false) && isEnabled()) {
//發(fā)起RPC下線請(qǐng)求
deregister();
//......
//銷毀相關(guān)工作線程和nacos維護(hù)的RPC連接
this.serviceRegistry.close();
}
}
此時(shí)deregister會(huì)通過(guò)getRegistration拿到nacos的元信息,再通過(guò)NacosServiceRegistry的deregister發(fā)起服務(wù)下線請(qǐng)求:
protected void deregister() {
this.serviceRegistry.deregister(getRegistration());
}
最終就會(huì)走到NacosNamingService的deregisterInstance,很直觀的看到,它通過(guò)RPC代理clientProxy傳入服務(wù)名、分組、和實(shí)例信息并調(diào)用deregisterService發(fā)起服務(wù)下線請(qǐng)求:
@Override
public void deregisterInstance(String serviceName, String groupName, Instance instance) throws NacosException {
clientProxy.deregisterService(serviceName, groupName, instance);
}
對(duì)應(yīng)我們給出發(fā)起調(diào)用時(shí)傳入的參數(shù)信息:
2. Nacos服務(wù)端基于RPC推送服務(wù)下線
隨后Nacos服務(wù)端GrpcRequestAcceptor收到該請(qǐng)求后,流程比較長(zhǎng),執(zhí)行如下步驟:
- 基于請(qǐng)求定位到處理器RequestHandler以服務(wù)下線為例就是InstanceRequestHandler。
- InstanceRequestHandler發(fā)布一個(gè)ClientDeregisterServiceEvent事件,交由NotifyCenter投遞到任務(wù)隊(duì)列中。
- NamingEventPublisher從隊(duì)列獲取到這個(gè)任務(wù)之后,找到ClientServiceIndexesManager處理該事件。
- ClientServiceIndexesManager還是發(fā)布一個(gè)ServiceChangedEvent到上述的阻塞隊(duì)列中。
- NamingSubscriberServiceV2Impl將其封成一個(gè)延遲任務(wù)提交到tasks中。
此時(shí)有個(gè)100ms執(zhí)行一次的定時(shí)器也就是PushDelayTaskExecuteEngine,將任務(wù)取出分發(fā)給TaskExecuteWorker,這個(gè)執(zhí)行者就會(huì)生成RPC請(qǐng)求將服務(wù)狀態(tài)變更通知給所有服務(wù)消費(fèi)者。
總體來(lái)說(shuō),Nacos服務(wù)端收到下線請(qǐng)求后,為避免下線通知影響服務(wù)端整體性能,其內(nèi)部設(shè)計(jì)了一套非常好的事件通知訂閱模型,當(dāng)服務(wù)端收到請(qǐng)求后,其內(nèi)部會(huì)根據(jù)請(qǐng)求類型找到相應(yīng)的處理器發(fā)布事件,讓對(duì)應(yīng)的訂閱者異步處理該消息?;谠撓⒆罱K會(huì)封裝成指定類型的任務(wù),提交到工作線程池中的某個(gè)worker的隊(duì)列中讓其異步消費(fèi),由此種大量解耦結(jié)合線程池的方式基于了nacos服務(wù)端最大的吞吐量和調(diào)優(yōu)空間。
對(duì)應(yīng)的我們也給出整體的業(yè)務(wù)流程圖,讀者可以參考該圖了解一下全過(guò)程:
對(duì)應(yīng)我們找到GrpcRequestAcceptor的request方法,可以看到它會(huì)基于該請(qǐng)求找到對(duì)應(yīng)的處理器,然后調(diào)用處理器的handleRequest方法處理該請(qǐng)求:
@Override
public void request(Payload grpcRequest, StreamObserver<Payload> responseObserver) {
traceIfNecessary(grpcRequest, true);
//需要使用的服務(wù)器類型,例如服務(wù)下線就是 InstanceRequest
String type = grpcRequest.getMetadata().getType();
long startTime = System.nanoTime();
//......
//基于type到容器中獲取到響應(yīng)的處理器
RequestHandler requestHandler = requestHandlerRegistry.getByRequestType(type);
//......
Request request = (Request) parseObj;
try {
//組裝連接信息
Connection connection = connectionManager.getConnection(GrpcServerConstants.CONTEXT_KEY_CONN_ID.get());
RequestMeta requestMeta = new RequestMeta();
requestMeta.setClientIp(connection.getMetaInfo().getClientIp());
requestMeta.setConnectionId(GrpcServerConstants.CONTEXT_KEY_CONN_ID.get());
requestMeta.setClientVersion(connection.getMetaInfo().getVersion());
requestMeta.setLabels(connection.getMetaInfo().getLabels());
requestMeta.setAbilityTable(connection.getAbilityTable());
connectionManager.refreshActiveTime(requestMeta.getConnectionId());
//實(shí)際處理rpc請(qǐng)求的方法
Response response = requestHandler.handleRequest(request, requestMeta);
//......
} catch (Throwable e) {
//......
}
}
于是就找到了InstanceRequestHandler,該方法就會(huì)通過(guò)clientOperationService(也就是EphemeralClientOperationServiceImpl)發(fā)布ClientDeregisterServiceEvent事件:
private InstanceResponse deregisterInstance(Service service, InstanceRequest request, RequestMeta meta) {
//基于ClientDeregisterServiceEvent發(fā)布服務(wù)下線事件
clientOperationService.deregisterInstance(service, request.getInstance(), meta.getConnectionId());
//.....
}
隨后NamingEventPublisher收到該事件后調(diào)用handleEvent找到對(duì)應(yīng)的事件處理器處理器該事件:
private void handleEvents() {
while (!shutdown) {
try {
//取出上述的任務(wù)
final Event event = queue.take();
handleEvent(event);//處理發(fā)布的事件
} catch (InterruptedException e) {
//......
}
}
}
如下便是筆者的調(diào)試記錄,可以看到服務(wù)下線事件定位到了ClientServiceIndexesManager這個(gè)管理器進(jìn)行處理:
于是就來(lái)到了ClientServiceIndexesManager的onEvent方法,再次發(fā)布一個(gè)ServiceChangedEvent事件到上述提到的同一個(gè)阻塞隊(duì)列中:
@Override
public void onEvent(Event event) {
if (event instanceof ClientOperationEvent.ClientReleaseEvent) {
handleClientDisconnect((ClientOperationEvent.ClientReleaseEvent) event);
} elseif (event instanceof ClientOperationEvent) {//處理服務(wù)注冊(cè)或者下線后的事件
handleClientOperation((ClientOperationEvent) event);
}
}
private void handleClientOperation(ClientOperationEvent event) {
Service service = event.getService();
String clientId = event.getClientId();
if (event instanceof ClientOperationEvent.ClientRegisterServiceEvent) {//處理服務(wù)注冊(cè)事件,實(shí)際上就是發(fā)布一個(gè)ServiceChangedEvent事件
addPublisherIndexes(service, clientId);
} elseif (event instanceof ClientOperationEvent.ClientDeregisterServiceEvent) { //......
} elseif (event instanceof ClientOperationEvent.ClientSubscribeServiceEvent) {
//......
} elseif (event instanceof ClientOperationEvent.ClientUnsubscribeServiceEvent) {
//......
}
}
隊(duì)列會(huì)通知NamingSubscriberServiceV2Impl進(jìn)行處理,它會(huì)將事件推送到延遲隊(duì)列中,這個(gè)隊(duì)列內(nèi)部是采用并發(fā)安全的ConcurrentHashMap進(jìn)行管理。
@Override
public void onEvent(Event event) {
if (event instanceof ServiceEvent.ServiceChangedEvent) {//給客戶端的服務(wù)改變事件
// If service changed, push to all subscribers.
ServiceEvent.ServiceChangedEvent serviceChangedEvent = (ServiceEvent.ServiceChangedEvent) event;
Service service = serviceChangedEvent.getService();
//將處理事件推送到隊(duì)列中
delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay()));
MetricsMonitor.incrementServiceChangeCount(service);
} else if (event instanceof ServiceEvent.ServiceSubscribedEvent) {
//......
}
}
任務(wù)提交之后就會(huì)另外一個(gè)線程processingExecutor(100ms處理一次)會(huì)將其取出后找到任務(wù)處理器處理PushDelayTaskExecuteEngine,隨后,這個(gè)任務(wù)處理引擎將任務(wù)交給NacosExecuteTaskExecuteEngine這個(gè)任務(wù)處理引擎:
protected void processTasks() {//通過(guò)remove拿出隊(duì)列的數(shù)據(jù)
Collection<Object> keys = getAllTaskKeys();
for (Object taskKey : keys) {
AbstractDelayTask task = removeTask(taskKey);
//......
//找到相應(yīng)處理器即PushDelayTaskExecuteEngine
NacosTaskProcessor processor = getProcessor(taskKey);
try {
// ReAdd task if process failed
if (!processor.process(task)) {//PushDelayTaskExecuteEngine將任務(wù)交給NacosExecuteTaskExecuteEngine這個(gè)任務(wù)處理引擎
retryFailedTask(taskKey, task);
}
} catch (Throwable e) {
//......
}
}
}
最后PushDelayTaskExecuteEngine會(huì)將該任務(wù)交給NacosExecuteTaskExecuteEngine中的某個(gè)工作線程TaskExecuteWorker的阻塞隊(duì)列中,最后TaskExecuteWorker就會(huì)取出該任務(wù)并消費(fèi):
@Override
public void run() {
while (!closed.get()) {
try {
//取出任務(wù)并處理
Runnable task = queue.take();
long begin = System.currentTimeMillis();
task.run();
//......
} catch (Throwable e) {
log.error("[TASK-FAILED] " + e, e);
}
}
}
}
最后這個(gè)服務(wù)下線的任務(wù)即PushExecuteTask就會(huì)遍歷所有客戶端并通知它們nacos-provider下線:
@Override
public void run() {
try {
PushDataWrapper wrapper = generatePushData();
ClientManager clientManager = delayTaskEngine.getClientManager();
for (String each : getTargetClientIds()) {//逐個(gè)遍歷客戶端,然后事件推送
Client client = clientManager.getClient(each);
//......
//通過(guò)RPC接口推送服務(wù)下線通知
delayTaskEngine.getPushExecutor().doPushWithCallback(each, subscriber, wrapper,
new ServicePushCallback(each, subscriber, wrapper.getOriginalData(), delayTask.isPushToAll()));//發(fā)起RPC通知消費(fèi)者
}
} catch (Exception e) {
//......
}
}
3. 消費(fèi)者更新實(shí)例緩存
服務(wù)消費(fèi)者RpcClient收到該請(qǐng)求后,會(huì)基于請(qǐng)求類型定位到服務(wù)端請(qǐng)求處理器,以我們下線通知為例就是NamingPushRequestHandler,由該處理器更新客戶端中記錄9002端口號(hào)的服務(wù)提供者nacos-provider狀態(tài)更新為下線,后續(xù)服務(wù)消費(fèi)者看到緩存中記錄的提供不可用時(shí)就會(huì)調(diào)用9001端口號(hào)的nacos-provider:
對(duì)應(yīng)的我們給出RpcClient的處理服務(wù)端請(qǐng)求的方法handleServerRequest,該方法會(huì)遍歷所有的服務(wù)端請(qǐng)求處理器,只要有一個(gè)處理器處理結(jié)果非空,就說(shuō)明找到相應(yīng)的處理器處理了,直接將響應(yīng)結(jié)果返回:
protected Response handleServerRequest(final Request request) {
//.....
//遍歷所有的服務(wù)端請(qǐng)求處理器
for (ServerRequestHandler serverRequestHandler : serverRequestHandlers) {
try {
//交給該處理器看看能否處理,若能處理則返回值非空
Response response = serverRequestHandler.requestReply(request);
//若非空說(shuō)明處理完成,直接返回結(jié)果
if (response != null) {
//.....
return response;
}
} catch (Exception e) {
//.....
}
}
returnnull;
}
實(shí)際上上述的步驟會(huì)走到NamingPushRequestHandler處理服務(wù)端下線請(qǐng)求,該處理器會(huì)調(diào)用serviceInfoHolder將請(qǐng)求中的實(shí)例信息更新到緩存中,由此保證客戶端可以完成正確的服務(wù)調(diào)用:
@Override
public Response requestReply(Request request) {
if (request instanceof NotifySubscriberRequest) {
NotifySubscriberRequest notifyRequest = (NotifySubscriberRequest) request;
//從請(qǐng)求中拿到服務(wù)實(shí)例信息,并調(diào)用processServiceInfo更新緩存
serviceInfoHolder.processServiceInfo(notifyRequest.getServiceInfo());
return new NotifySubscriberResponse();
}
return null;
}
最終ServiceInfoHolder的processServiceInfo就會(huì)基于入?yún)⒛玫椒?wù)示例信息,并將緩存更新,然后發(fā)布一個(gè)實(shí)例更新的事件并將更新結(jié)果持久化到磁盤中:
public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) {
//拿到服務(wù)實(shí)例緩存
String serviceKey = serviceInfo.getKey();
//若為空直接返回
if (serviceKey == null) {
returnnull;
}
//取出緩存中原有緩存信息
ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
//......
//基于請(qǐng)求更新緩存
serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
//比對(duì)新舊緩存變化
boolean changed = isChangedServiceInfo(oldService, serviceInfo);
//......
//如果緩存發(fā)生變化,則發(fā)布一個(gè)實(shí)例更新的事件InstancesChangeEvent,并將更新結(jié)果采用零拷貝的方式持久化到磁盤中
if (changed) {
//.....
//發(fā)布實(shí)例更新事件
NotifyCenter.publishEvent(new InstancesChangeEvent(notifierEventScope, serviceInfo.getName(), serviceInfo.getGroupName(),
serviceInfo.getClusters(), serviceInfo.getHosts()));
//零拷貝持久化
DiskCache.write(serviceInfo, cacheDir);
}
return serviceInfo;
}
對(duì)應(yīng)的我們給出下線9002端口的nacos-provider下線的請(qǐng)求值,可以看到基于這個(gè)結(jié)果,服務(wù)端給出的可用服務(wù)實(shí)例值僅有9001號(hào)端口的nacos-provider,服務(wù)消費(fèi)者基于此信息更新緩存,保證了服務(wù)消費(fèi)的正確性:
三、小結(jié)
本文以代碼示例為導(dǎo)向通過(guò)源碼的方式完成了Nacos服務(wù)實(shí)例狀態(tài)變更推送的講解,這里筆者也簡(jiǎn)單的補(bǔ)充一下個(gè)人對(duì)于源碼閱讀的一些技巧:
- 在閱讀源碼前,明確了解項(xiàng)目的設(shè)計(jì)理念和原理,即對(duì)項(xiàng)目有個(gè)基礎(chǔ)的認(rèn)知。
- 以問(wèn)題為導(dǎo)向針對(duì)性的進(jìn)行調(diào)試?yán)斫狻?/li>
- 適當(dāng)查找一些高質(zhì)量的源碼分析文章,針對(duì)性的梳理源碼結(jié)構(gòu)。
- 如果能夠明確源碼的最終斷點(diǎn),我們可以采用以終為始的方式,在目標(biāo)斷點(diǎn)上打住,結(jié)合調(diào)試的棧幀了解整體調(diào)用過(guò)程。
- 調(diào)試過(guò)程中注意觀察各個(gè)類之間的繼承、聚合等關(guān)系,以便梳理設(shè)計(jì)架構(gòu)和理念。
- 最后一點(diǎn),建議直接拉取源碼進(jìn)行調(diào)試,方便注釋。