Nacos Client服務(wù)訂閱之事件機(jī)制剖析
本文轉(zhuǎn)載自微信公眾號「程序新視」,作者二師兄。轉(zhuǎn)載本文請聯(lián)系程序新視公眾號。
學(xué)習(xí)不用那么功利,二師兄帶你從更高維度輕松閱讀源碼~
上篇文章,我們分析了Nacos客戶端訂閱的核心流程:Nacos客戶端通過一個定時任務(wù),每6秒從注冊中心獲取實例列表,當(dāng)發(fā)現(xiàn)實例發(fā)生變化時,發(fā)布變更事件,訂閱者進(jìn)行業(yè)務(wù)處理,然后更新內(nèi)存中和本地的緩存中的實例。
這篇文章為服務(wù)訂閱的第二篇,我們重點來分析,定時任務(wù)獲取到最新實例列表之后,整個事件機(jī)制是如何處理的。
回顧整個流程
先回顧一下客戶端服務(wù)訂閱的基本流程:
在第一步調(diào)用subscribe方法時,會訂閱一個EventListener事件。而在定時任務(wù)UpdateTask定時獲取實例列表之后,會調(diào)用ServiceInfoHolder#processServiceInfo方法對ServiceInfo進(jìn)行本地處理,這其中就包括和事件處理。
監(jiān)聽事件的注冊
在subscribe方法中,通過如下方式進(jìn)行了監(jiān)聽事件的注冊:
- @Override
 - public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)
 - throws NacosException {
 - if (null == listener) {
 - return;
 - }
 - String clusterString = StringUtils.join(clusters, ",");
 - changeNotifier.registerListener(groupName, serviceName, clusterString, listener);
 - clientProxy.subscribe(serviceName, groupName, clusterString);
 - }
 
這里的changeNotifier.registerListener便是進(jìn)行具體的事件注冊邏輯。追進(jìn)去看一下實現(xiàn)源碼:
- // InstancesChangeNotifier
 - public void registerListener(String groupName, String serviceName, String clusters, EventListener listener) {
 - String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);
 - ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key);
 - if (eventListeners == null) {
 - synchronized (lock) {
 - eventListeners = listenerMap.get(key);
 - if (eventListeners == null) {
 - eventListeners = new ConcurrentHashSet<EventListener>();
 - // 將EventListener緩存到listenerMap
 - listenerMap.put(key, eventListeners);
 - }
 - }
 - }
 - eventListeners.add(listener);
 - }
 
可以看出,事件的注冊便是將EventListener存儲在InstancesChangeNotifier的listenerMap屬性當(dāng)中了。
這里的數(shù)據(jù)結(jié)構(gòu)為Map,key為服務(wù)實例信息的拼接,value為監(jiān)聽事件的集合。
事件注冊流程就這么簡單。這里有一個雙重檢查鎖的實踐案例,不知道你留意到?jīng)]?可以學(xué)習(xí)一下。
ServiceInfo的處理
上面完成了事件的注冊,現(xiàn)在就追溯一下觸發(fā)事件的來源。UpdateTask中獲取到最新實例會進(jìn)行本地化處理,部分代碼如下:
- // 獲取緩存的service信息
 - ServiceInfo serviceObj = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
 - if (serviceObj == null) {
 - // 根據(jù)serviceName從注冊中心服務(wù)端獲取Service信息
 - serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);
 - serviceInfoHolder.processServiceInfo(serviceObj);
 - lastRefTime = serviceObj.getLastRefTime();
 - return;
 - }
 
這部分邏輯在上篇文章中已經(jīng)分析過了,這里重點看serviceInfoHolder#processServiceInfo中的業(yè)務(wù)邏輯處理。先看流程圖,然后看代碼。
上述邏輯簡單說就是:判斷一下新的ServiceInfo數(shù)據(jù)是否正確,是否發(fā)生了變化。如果數(shù)據(jù)格式正確,且發(fā)生的變化,那就發(fā)布一個InstancesChangeEvent事件,同時將ServiceInfo寫入本地緩存。
下面看一下代碼實現(xiàn):
- public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) {
 - String serviceKey = serviceInfo.getKey();
 - if (serviceKey == null) {
 - return null;
 - }
 - ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
 - if (isEmptyOrErrorPush(serviceInfo)) {
 - //empty or error push, just ignore
 - return oldService;
 - }
 - // 緩存服務(wù)信息
 - serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
 - // 判斷注冊的實例信息是否已變更
 - boolean changed = isChangedServiceInfo(oldService, serviceInfo);
 - if (StringUtils.isBlank(serviceInfo.getJsonFromServer())) {
 - serviceInfo.setJsonFromServer(JacksonUtils.toJson(serviceInfo));
 - }
 - // 通過prometheus-simpleclient監(jiān)控服務(wù)緩存Map的大小
 - MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());
 - // 服務(wù)實例已變更
 - if (changed) {
 - NAMING_LOGGER.info("current ips:(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey() + " -> "
 - + JacksonUtils.toJson(serviceInfo.getHosts()));
 - // 添加實例變更事件,會被推動到訂閱者執(zhí)行
 - NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(),
 - serviceInfo.getClusters(), serviceInfo.getHosts()));
 - // 記錄Service本地文件
 - DiskCache.write(serviceInfo, cacheDir);
 - }
 - return serviceInfo;
 - }
 
可以對照流程圖和代碼中的注釋部分進(jìn)行理解這個過程。
我們要講的重點是服務(wù)信息變更之后,發(fā)布的InstancesChangeEvent,也就是流程圖中標(biāo)紅的部分。
事件追蹤
上面的事件是通過NotifyCenter進(jìn)行發(fā)布的,NotifyCenter中的核心流程如下:
NotifyCenter中進(jìn)行事件發(fā)布,發(fā)布的核心邏輯是:
- 根據(jù)InstancesChangeEvent事件類型,獲得對應(yīng)的CanonicalName;
 - 將CanonicalName作為Key,從NotifyCenter#publisherMap中獲取對應(yīng)的事件發(fā)布者(EventPublisher);
 - EventPublisher將InstancesChangeEvent事件進(jìn)行發(fā)布。
 
NotifyCenter中的核心代碼實現(xiàn)如下:
- private static boolean publishEvent(final Class<? extends Event> eventType, final Event event) {
 - if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {
 - return INSTANCE.sharePublisher.publish(event);
 - }
 - // 根據(jù)InstancesChangeEvent事件類型,獲得對應(yīng)的CanonicalName;
 - final String topic = ClassUtils.getCanonicalName(eventType);
 - // 將CanonicalName作為Key,從NotifyCenter#publisherMap中獲取對應(yīng)的事件發(fā)布者(EventPublisher);
 - EventPublisher publisher = INSTANCE.publisherMap.get(topic);
 - if (publisher != null) {
 - // EventPublisher將InstancesChangeEvent事件進(jìn)行發(fā)布。
 - return publisher.publish(event);
 - }
 - LOGGER.warn("There are no [{}] publishers for this event, please register", topic);
 - return false;
 - }
 
上述代碼中的INSTANCE為NotifyCenter的單例模式實現(xiàn)。那么,這里的publisherMap中key(CanonicalName)和value(EventPublisher)之間的關(guān)系是什么時候建立的呢?
這個是在NacosNamingService實例化時調(diào)用init方法中進(jìn)行綁定的:
- // Publisher的注冊過程在于建立InstancesChangeEvent.class與EventPublisher的關(guān)系。
 - NotifyCenter.registerToPublisher(InstancesChangeEvent.class, 16384);
 
registerToPublisher方法默認(rèn)采用了DEFAULT_PUBLISHER_FACTORY來進(jìn)行構(gòu)建。
- public static EventPublisher registerToPublisher(final Class<? extends Event> eventType, final int queueMaxSize) {
 - return registerToPublisher(eventType, DEFAULT_PUBLISHER_FACTORY, queueMaxSize);
 - }
 
如果查看NotifyCenter中靜態(tài)代碼塊,會發(fā)現(xiàn)DEFAULT_PUBLISHER_FACTORY默認(rèn)構(gòu)建的EventPublisher為DefaultPublisher。
至此,我們得知,在NotifyCenter中它維護(hù)了事件名稱和事件發(fā)布者的關(guān)系,而默認(rèn)的事件發(fā)布者為DefaultPublisher。
DefaultPublisher的事件發(fā)布
查看DefaultPublisher的源碼,會發(fā)現(xiàn)它繼承自Thread,也就是說它是一個線程類。同時,它又實現(xiàn)了EventPublisher,也就是我們前面提到的發(fā)布者。
- public class DefaultPublisher extends Thread implements EventPublisher {}
 
在DefaultPublisher的init方法實現(xiàn)如下:
- @Override
 - public void init(Class<? extends Event> type, int bufferSize) {
 - // 守護(hù)線程
 - setDaemon(true);
 - // 設(shè)置線程名字
 - setName("nacos.publisher-" + type.getName());
 - this.eventType = type;
 - this.queueMaxSize = bufferSize;
 - // 阻塞隊列初始化
 - this.queue = new ArrayBlockingQueue<>(bufferSize);
 - start();
 - }
 
也就是說,當(dāng)DefaultPublisher被初始化時,是以守護(hù)線程的方式運作的,其中還初始化了一個阻塞隊列,隊列的默認(rèn)大小為16384。
最后調(diào)用了start方法:
- @Override
 - public synchronized void start() {
 - if (!initialized) {
 - // start just called once
 - super.start();
 - if (queueMaxSize == -1) {
 - queueMaxSize = ringBufferSize;
 - }
 - initialized = true;
 - }
 - }
 
start方法中調(diào)用了super.start,此時等于啟動了線程,會執(zhí)行對應(yīng)的run方法。
run方法中只調(diào)用了如下方法:
- void openEventHandler() {
 - try {
 - // This variable is defined to resolve the problem which message overstock in the queue.
 - int waitTimes = 60;
 - // for死循環(huán)不斷的從隊列中取出Event,并通知訂閱者Subscriber執(zhí)行Event
 - // To ensure that messages are not lost, enable EventHandler when
 - // waiting for the first Subscriber to register
 - for (; ; ) {
 - if (shutdown || hasSubscriber() || waitTimes <= 0) {
 - break;
 - }
 - ThreadUtils.sleep(1000L);
 - waitTimes--;
 - }
 - for (; ; ) {
 - if (shutdown) {
 - break;
 - }
 - // // 從隊列取出Event
 - final Event event = queue.take();
 - receiveEvent(event);
 - UPDATER.compareAndSet(this, lastEventSequence, Math.max(lastEventSequence, event.sequence()));
 - }
 - } catch (Throwable ex) {
 - LOGGER.error("Event listener exception : ", ex);
 - }
 - }
 
這里寫了兩個死循環(huán),第一個死循環(huán)可以理解為延時效果,也就是說線程啟動時最大延時60秒,在這60秒中每隔1秒判斷一下當(dāng)前線程是否關(guān)閉,是否有訂閱者,是否超過60秒。如果滿足一個條件,就可以提前跳出死循環(huán)。
而第二個死循環(huán)才是真正的業(yè)務(wù)邏輯處理,會從阻塞隊列中取出一個事件,然后通過receiveEvent方法進(jìn)行執(zhí)行。
那么,隊列中的事件哪兒來的呢?此時,你可能已經(jīng)想到剛才DefaultPublisher的發(fā)布事件方法被調(diào)用了。來看看它的publish方法實現(xiàn):
- @Override
 - public boolean publish(Event event) {
 - checkIsStart();
 - boolean success = this.queue.offer(event);
 - if (!success) {
 - LOGGER.warn("Unable to plug in due to interruption, synchronize sending time, event : {}", event);
 - receiveEvent(event);
 - return true;
 - }
 - return true;
 - }
 
可以看到,DefaultPublisher的publish方法的確就是往阻塞隊列中存入事件。這里有個分支邏輯,如果存入失敗,會直接調(diào)用receiveEvent,和從隊列中取出事件執(zhí)行的方法一樣??梢岳斫鉃?,如果向隊列中存入失敗,則立即執(zhí)行,不走隊列了。
最后,再來看看receiveEvent方法的實現(xiàn):
- void receiveEvent(Event event) {
 - final long currentEventSequence = event.sequence();
 - if (!hasSubscriber()) {
 - LOGGER.warn("[NotifyCenter] the {} is lost, because there is no subscriber.");
 - return;
 - }
 - // 通知訂閱者執(zhí)行Event
 - // Notification single event listener
 - for (Subscriber subscriber : subscribers) {
 - // Whether to ignore expiration events
 - if (subscriber.ignoreExpireEvent() && lastEventSequence > currentEventSequence) {
 - LOGGER.debug("[NotifyCenter] the {} is unacceptable to this subscriber, because had expire",
 - event.getClass());
 - continue;
 - }
 - // Because unifying smartSubscriber and subscriber, so here need to think of compatibility.
 - // Remove original judge part of codes.
 - notifySubscriber(subscriber, event);
 - }
 - }
 
這里最主要的邏輯就是遍歷DefaultPublisher的subscribers(訂閱者集合),然后執(zhí)行通知訂閱者的方法。
那么有朋友要問了這subscribers中的訂閱者哪里來的呢?這個還要回到NacosNamingService的init方法中:
- // 將Subscribe注冊到Publisher
 - NotifyCenter.registerSubscriber(changeNotifier);
 
該方法最終會調(diào)用NotifyCenter的addSubscriber方法:
- private static void addSubscriber(final Subscriber consumer, Class<? extends Event> subscribeType,
 - EventPublisherFactory factory) {
 - final String topic = ClassUtils.getCanonicalName(subscribeType);
 - synchronized (NotifyCenter.class) {
 - // MapUtils.computeIfAbsent is a unsafe method.
 - MapUtil.computeIfAbsent(INSTANCE.publisherMap, topic, factory, subscribeType, ringBufferSize);
 - }
 - // 獲取時間對應(yīng)的Publisher
 - EventPublisher publisher = INSTANCE.publisherMap.get(topic);
 - if (publisher instanceof ShardedEventPublisher) {
 - ((ShardedEventPublisher) publisher).addSubscriber(consumer, subscribeType);
 - } else {
 - // 添加到subscribers集合
 - publisher.addSubscriber(consumer);
 - }
 - }
 
其中核心邏輯就是將訂閱事件、發(fā)布者、訂閱者三者進(jìn)行綁定。而發(fā)布者與事件通過Map進(jìn)行維護(hù)、發(fā)布者與訂閱者通過關(guān)聯(lián)關(guān)系進(jìn)行維護(hù)。
發(fā)布者找到了,事件也有了,最后看一下notifySubscriber方法:
- @Override
 - public void notifySubscriber(final Subscriber subscriber, final Event event) {
 - LOGGER.debug("[NotifyCenter] the {} will received by {}", event, subscriber);
 - // 執(zhí)行訂閱者Event
 - final Runnable job = () -> subscriber.onEvent(event);
 - final Executor executor = subscriber.executor();
 - if (executor != null) {
 - executor.execute(job);
 - } else {
 - try {
 - job.run();
 - } catch (Throwable e) {
 - LOGGER.error("Event callback exception: ", e);
 - }
 - }
 - }
 
邏輯比較簡單,如果訂閱者定義了Executor,那么使用它定義的Executor進(jìn)行事件的執(zhí)行,如果沒有,那就創(chuàng)建一個線程進(jìn)行執(zhí)行。
至此,整個服務(wù)訂閱的事件機(jī)制完成。
小結(jié)
整體來看,整個服務(wù)訂閱的事件機(jī)制還是比較復(fù)雜的,因為用到了事件的形式,邏輯就比較繞,而且這期間還摻雜了守護(hù)線程,死循環(huán),阻塞隊列等。需要重點理解NotifyCenter對事件發(fā)布者、事件訂閱者和事件之間關(guān)系的維護(hù),而這一關(guān)系的維護(hù)的入口就位于NacosNamingService的init方法當(dāng)中。
下面再梳理一下幾個核心流程:
ServiceInfoHolder中通過NotifyCenter發(fā)布了InstancesChangeEvent事件;
NotifyCenter中進(jìn)行事件發(fā)布,發(fā)布的核心邏輯是:
- 根據(jù)InstancesChangeEvent事件類型,獲得對應(yīng)的CanonicalName;
 - 將CanonicalName作為Key,從NotifyCenter#publisherMap中獲取對應(yīng)的事件發(fā)布者(EventPublisher);
 - EventPublisher將InstancesChangeEvent事件進(jìn)行發(fā)布。
 - InstancesChangeEvent事件發(fā)布:
 
通過EventPublisher的實現(xiàn)類DefaultPublisher進(jìn)行InstancesChangeEvent事件發(fā)布;
- DefaultPublisher本身以守護(hù)線程的方式運作,在執(zhí)行業(yè)務(wù)邏輯前,先判斷該線程是否啟動;
 - 如果啟動,則將事件添加到BlockingQueue中,隊列默認(rèn)大小為16384;
 - 添加到BlockingQueue成功,則整個發(fā)布過程完成;
 - 如果添加失敗,則直接調(diào)用DefaultPublisher#receiveEvent方法,接收事件并通知訂閱者;
 - 通知訂閱者時創(chuàng)建一個Runnable對象,執(zhí)行訂閱者的Event。
 - Event事件便是執(zhí)行訂閱時傳入的事件;
 
如果添加到BlockingQueue成功,則走另外一個業(yè)務(wù)邏輯:
- DefaultPublisher初始化時會創(chuàng)建一個阻塞(BlockingQueue)隊列,并標(biāo)記線程啟動;
 - DefaultPublisher本身是一個Thread,當(dāng)執(zhí)行super.start方法時,會調(diào)用它的run方法;
 - run方法的核心業(yè)務(wù)邏輯是通過openEventHandler方法處理的;
 - openEventHandler方法通過兩個for循環(huán),從阻塞隊列中獲取時間信息;
 - 第一個for循環(huán)用于讓線程啟動時在60s內(nèi)檢查執(zhí)行條件;
 - 第二個for循環(huán)為死循環(huán),從阻塞隊列中獲取Event,并調(diào)用DefaultPublisher#receiveEvent方法,接收事件并通知訂閱者;
 - Event事件便是執(zhí)行訂閱時傳入的事件;
 
關(guān)于Nacos Client服務(wù)定義的事件機(jī)制就將這么多,下篇我們來講講故障轉(zhuǎn)移和緩存的實現(xiàn)。


















 
 
 






 
 
 
 