Nacos 的長(zhǎng)輪詢定時(shí)機(jī)制,太好用了!
今天這篇文章來(lái)介紹一下Nacos配置中心的原理之一:長(zhǎng)輪詢機(jī)制的應(yīng)用
為方便理解與表達(dá),這里把 Nacos 控制臺(tái)和 Nacos 注冊(cè)中心稱為 Nacos 服務(wù)器(就是 web 界面那個(gè)),我們編寫(xiě)的業(yè)務(wù)服務(wù)稱為 Nacso 客戶端;
Nacos 動(dòng)態(tài)監(jiān)聽(tīng)的長(zhǎng)輪詢機(jī)制原理圖,本篇將圍繞這張圖剖析長(zhǎng)輪詢定時(shí)機(jī)制的原理:
圖片
ConfigService 是 Nacos 客戶端提供的用于訪問(wèn)實(shí)現(xiàn)配置中心基本操作的類,我們將從 ConfigService 的實(shí)例化開(kāi)始長(zhǎng)輪詢定時(shí)機(jī)制的源碼之旅;
1. 客戶端的長(zhǎng)輪詢定時(shí)機(jī)制
我們從NacosPropertySourceLocator.locate()開(kāi)始【斷點(diǎn)步入】:
圖片
1.1 利用反射機(jī)制實(shí)例化 NacosConfigService 對(duì)象
客戶端的長(zhǎng)輪詢定時(shí)任務(wù)是在 NacosFactory.createConfigService() 方法中,構(gòu)建 ConfigService 對(duì)象實(shí)例時(shí)啟動(dòng)的,我們接著 1.1 處的源碼;
進(jìn)入 NacosFactory.createConfigService():
public static ConfigService createConfigService(Properties properties) throws NacosException {
    //【斷點(diǎn)步入】創(chuàng)建 ConfigService
    return ConfigFactory.createConfigService(properties);
}進(jìn)入 ConfigFactory.createConfigService(),發(fā)現(xiàn)其使用反射機(jī)制實(shí)例化 NacosConfigService 對(duì)象;
圖片
1.2 NacosConfigService 的構(gòu)造方法里啟動(dòng)長(zhǎng)輪詢定時(shí)任務(wù)
進(jìn)入 NacosConfigService.NacosConfigService() 構(gòu)造方法,里面設(shè)置了一些更遠(yuǎn)程任務(wù)相關(guān)的屬性;
圖片
1.2.1 初始化 HttpAgent
MetricsHttpAgent 類的設(shè)計(jì)如下:
圖片
ServerHttpAgent 類的設(shè)計(jì)如下:
圖片
1.2.2 初始化 ClientWorker
進(jìn)入 ClientWorker.ClientWorker() 構(gòu)造方法,主要是創(chuàng)建了兩個(gè)定時(shí)調(diào)度的線程池,并啟動(dòng)一個(gè)定時(shí)任務(wù);
圖片
進(jìn)入 ClientWorker.checkConfigInfo(),每隔 10s 檢查一次配置是否發(fā)生變化;
- cacheMap:是一個(gè) AtomicReference<Map<String, CacheData>> 對(duì)象,用來(lái)存儲(chǔ)監(jiān)聽(tīng)變更的緩存集合,key 是根據(jù) datalD/group/tenant(租戶)拼接的值。Value 是對(duì)應(yīng)的存儲(chǔ)在 Nacos 服務(wù)器上的配置文件的內(nèi)容;
 - 長(zhǎng)輪詢?nèi)蝿?wù)拆分:默認(rèn)情況下,每個(gè)長(zhǎng)輪詢 LongPollingRunnable 任務(wù)處理3000個(gè)監(jiān)聽(tīng)配置集。如果超過(guò)3000個(gè),則需要啟動(dòng)多個(gè) LongPollingRunnable 去執(zhí)行;
 
圖片
1.3 檢查配置變更,讀取變更配置 LongPollingRunnable.run()
因?yàn)槲覀儧](méi)有這么多配置項(xiàng),debug 不進(jìn)去,所以直接找到 LongPollingRunnable.run() 方法,該方法的主要邏輯是:
- 根據(jù) taskld 對(duì) cacheMap 進(jìn)行數(shù)據(jù)分割;
 - 再通過(guò) 
checkLocalConfig()方法比較本地配置文件(在${user}\nacos\config\里)的數(shù)據(jù)是否存在變更,如果有變更則直接觸發(fā)通知; 
public void run() {
    List<CacheData> cacheDatas = new ArrayList();
    ArrayList inInitializingCacheList = new ArrayList();
    try {
        //遍歷 CacheData,檢查本地配置
        Iterator var3 = ((Map)ClientWorker.this.cacheMap.get()).values().iterator();
        while(var3.hasNext()) {
            CacheData cacheData = (CacheData)var3.next();
            if (cacheData.getTaskId() == this.taskId) {
                cacheDatas.add(cacheData);
                try {
                    //檢查本地配置
                    ClientWorker.this.checkLocalConfig(cacheData);
                    if (cacheData.isUseLocalConfigInfo()) {
                        cacheData.checkListenerMd5();
                    }
                } catch (Exception var13) {
                    ClientWorker.LOGGER.error("get local config info error", var13);
                }
            }
        }
        //【斷點(diǎn)步入 1.3.1】通過(guò)長(zhǎng)輪詢請(qǐng)求檢查服務(wù)端對(duì)應(yīng)的配置是否發(fā)生變更
        List<String> changedGroupKeys = ClientWorker.this.checkUpdateDataIds(cacheDatas, inInitializingCacheList);
        //遍歷存在變更的 groupKey,重新加載最新數(shù)據(jù)
        Iterator var16 = changedGroupKeys.iterator();
        while(var16.hasNext()) {
            String groupKey = (String)var16.next();
            String[] key = GroupKey.parseKey(groupKey);
            String dataId = key[0];
            String group = key[1];
            String tenant = null;
            if (key.length == 3) {
                tenant = key[2];
            }
            try {
                //【斷點(diǎn)步入 1.3.2】讀取變更配置,這里的 dataId、group 和 tenant 是【1.3.1】里獲取的
                String content = ClientWorker.this.getServerConfig(dataId, group, tenant, 3000L);
                CacheData cache = (CacheData)((Map)ClientWorker.this.cacheMap.get()).get(GroupKey.getKeyTenant(dataId, group, tenant));
                cache.setContent(content);
                ClientWorker.LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, cnotallow={}", new Object[]{ClientWorker.this.agent.getName(), dataId, group, tenant, cache.getMd5(), ContentUtils.truncateContent(content)});
            } catch (NacosException var12) {
                String message = String.format("[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s", ClientWorker.this.agent.getName(), dataId, group, tenant);
                ClientWorker.LOGGER.error(message, var12);
            }
        }
        //觸發(fā)事件通知
        var16 = cacheDatas.iterator();
        while(true) {
            CacheData cacheDatax;
            do {
                if (!var16.hasNext()) {
                    inInitializingCacheList.clear();
                    //繼續(xù)定時(shí)執(zhí)行當(dāng)前線程
                    ClientWorker.this.executorService.execute(this);
                    return;
                }
                cacheDatax = (CacheData)var16.next();
            } while(cacheDatax.isInitializing() && !inInitializingCacheList.contains(GroupKey.getKeyTenant(cacheDatax.dataId, cacheDatax.group, cacheDatax.tenant)));
            cacheDatax.checkListenerMd5();
            cacheDatax.setInitializing(false);
        }
    } catch (Throwable var14) {
        ClientWorker.LOGGER.error("longPolling error : ", var14);
        ClientWorker.this.executorService.schedule(this, (long)ClientWorker.this.taskPenaltyTime, TimeUnit.MILLISECONDS);
    }
}注意:這里的斷點(diǎn)需要在 Nacos 服務(wù)器上修改配置(間隔大于 30s),進(jìn)入后才好理解;
1.3.1 檢查配置變更 ClientWorker.checkUpdateDataIds()
我們點(diǎn)進(jìn) ClientWorker.checkUpdateDataIds() 方法,發(fā)現(xiàn)其最終調(diào)用的是 ClientWorker.checkUpdateConfigStr() 方法,其實(shí)現(xiàn)邏輯與源碼如下:
- 通過(guò) 
MetricsHttpAgent.httpPost()方法(上面 1.2.1 有提到)調(diào)用/v1/cs/configs/listener接口實(shí)現(xiàn)長(zhǎng)輪詢請(qǐng)求; - 長(zhǎng)輪詢請(qǐng)求在實(shí)現(xiàn)層面只是設(shè)置了一個(gè)比較長(zhǎng)的超時(shí)時(shí)間,默認(rèn)是 30s;
 - 如果服務(wù)端的數(shù)據(jù)發(fā)生了變更,客戶端會(huì)收到一個(gè) HttpResult ,服務(wù)端返回的是存在數(shù)據(jù)變更的 Data ID、Group、Tenant;
 - 獲得這些信息之后,在 
LongPollingRunnable.run()方法中調(diào)用 getServerConfig() 去 Nacos 服務(wù)器上讀取具體的配置內(nèi)容; 
List<String> checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws IOException {
    List<String> params = Arrays.asList("Listening-Configs", probeUpdateString);
    List<String> headers = new ArrayList(2);
    headers.add("Long-Pulling-Timeout");
    headers.add("" + this.timeout);
    if (isInitializingCacheList) {
        headers.add("Long-Pulling-Timeout-No-Hangup");
        headers.add("true");
    }
    if (StringUtils.isBlank(probeUpdateString)) {
        return Collections.emptyList();
    } else {
        try {
            //調(diào)用 /v1/cs/configs/listener 接口實(shí)現(xiàn)長(zhǎng)輪詢請(qǐng)求,返回的 HttpResult 里包含存在數(shù)據(jù)變更的 Data ID、Group、Tenant
            HttpResult result = this.agent.httpPost("/v1/cs/configs/listener", headers, params, this.agent.getEncode(), this.timeout);
            if (200 == result.code) {
                this.setHealthServer(true);
                //
                returnthis.parseUpdateDataIdResponse(result.content);
            }
            this.setHealthServer(false);
            LOGGER.error("[{}] [check-update] get changed dataId error, code: {}", this.agent.getName(), result.code);
        } catch (IOException var6) {
            this.setHealthServer(false);
            LOGGER.error("[" + this.agent.getName() + "] [check-update] get changed dataId exception", var6);
            throw var6;
        }
        return Collections.emptyList();
    }
}1.3.2 讀取變更配置 ClientWorker.getServerConfig()
進(jìn)入 ClientWorker.getServerConfig() 方法;讀取服務(wù)器上的變更配置;最終調(diào)用的是 MetricsHttpAgent.httpGet() 方法(上面 1.2.1 有提到),調(diào)用 /v1/cs/configs 接口獲取配置;然后通過(guò)調(diào)用 LocalConfigInfoProcessor.saveSnapshot() 將變更的配置保存到本地;
圖片
圖片
2. 服務(wù)端的長(zhǎng)輪詢定時(shí)機(jī)制
2.1 服務(wù)器接收請(qǐng)求 ConfigController.listener()
Nacos客戶端 通過(guò) HTTP 協(xié)議與服務(wù)器通信,那么在服務(wù)器源碼里必然有對(duì)應(yīng)接口的實(shí)現(xiàn);
在 nacos-config 模塊下的 controller 包,提供了個(gè) ConfigController 類來(lái)處理請(qǐng)求,其中有個(gè) /listener 接口,是客戶端發(fā)起數(shù)據(jù)監(jiān)聽(tīng)的接口,其主要邏輯和源碼如下:
- 獲取客戶端需要監(jiān)聽(tīng)的可能發(fā)生變化的配置,并計(jì)算 MD5 值;
 ConfigServletInner.doPollingConfig()開(kāi)始執(zhí)行長(zhǎng)輪詢請(qǐng)求;
2.2 執(zhí)行長(zhǎng)輪詢請(qǐng)求 ConfigSer
圖片
vletInner.doPollingConfig()
進(jìn)入 ConfigServletInner.doPollingConfig() 方法,該方法封裝了長(zhǎng)輪詢的實(shí)現(xiàn)邏輯,同時(shí)兼容短輪詢邏輯;
圖片
進(jìn)入 LongPollingService.addLongPollingClient() 方法,里面是長(zhǎng)輪詢的核心處理邏輯,主要作用是把客戶端的長(zhǎng)輪詢請(qǐng)求封裝成 ClientPolling 交給 scheduler 執(zhí)行;
圖片
2.3 創(chuàng)建線程執(zhí)行定時(shí)任務(wù) ClientLongPolling.run()
我們找到 ClientLongPolling.run() 方法,這里可以體現(xiàn)長(zhǎng)輪詢定時(shí)機(jī)制的核心原理,通俗來(lái)說(shuō),就是:
- 服務(wù)端收到請(qǐng)求之后,不立即返回,沒(méi)有變更則在延后 (30-0.5)s 把請(qǐng)求結(jié)果返回給客戶端;
 - 這就使得客戶端和服務(wù)端之間在 30s 之內(nèi)數(shù)據(jù)沒(méi)有發(fā)生變化的情況下一直處于連接狀態(tài);
 
圖片
2.4 監(jiān)聽(tīng)配置變更事件
2.4.1 監(jiān)聽(tīng) LocalDataChangeEvent 事件的實(shí)現(xiàn)
當(dāng)我們?cè)?Nacos 服務(wù)器或通過(guò) API 方式變更配置后,會(huì)發(fā)布一個(gè) LocalDataChangeEvent 事件,該事件會(huì)被 LongPollingService 監(jiān)聽(tīng);
這里 LongPollingService 為什么具有監(jiān)聽(tīng)功能在 1.3.1 版本后有些變化:
- 1.3.1 前:
LongPollingService.onEvent(); - 1.3.1 后:
Subscriber.onEvent(); 
在 Nacos 1.3.1 版本之前,通過(guò) LongPollingService 繼承 AbstractEventListener 實(shí)現(xiàn)監(jiān)聽(tīng),覆蓋 onEvent() 方法;
圖片
而在 1.3.2 版本之后,通過(guò)構(gòu)造訂閱者實(shí)現(xiàn)
圖片
效果是一樣的,實(shí)現(xiàn)了對(duì) LocalDataChangeEvent 事件的監(jiān)聽(tīng),并通過(guò)通過(guò)線程池執(zhí)行 DataChangeTask 任務(wù);
2.4.2 監(jiān)聽(tīng)事件后的處理邏輯 DataChangeTask.run()
我們找到 DataChangeTask.run() 方法,這個(gè)線程任務(wù)實(shí)現(xiàn)了
圖片
3. 源碼結(jié)構(gòu)圖小結(jié)
3.1 客戶端的長(zhǎng)輪詢定時(shí)機(jī)制
NacosPropertySourceLocator.locate() :初始化 ConfigService 對(duì)象,定位配置;
- NacosConfigService.NacosConfigService() :NacosConfigService 的構(gòu)造方法;
 - Executors.newScheduledThreadPool() :創(chuàng)建 executor 線程池;
 - Executors.newScheduledThreadPool() :創(chuàng)建 executorService 線程池;
 - ClientWorker.checkConfigInfo() :使用 executor 線程池檢查配置是否發(fā)生變化;
 - ClientWorker.checkLocalConfig() :檢查本地配置;
 - ClientWorker.checkUpdateDataIds() :檢查服務(wù)端對(duì)應(yīng)的配置是否發(fā)生變更;
 - ClientWorker.getServerConfig() :讀取變更配置
 - MetricsHttpAgent.httpPost() :調(diào)用 /v1/cs/configs/listener 接口實(shí)現(xiàn)長(zhǎng)輪詢請(qǐng)求;
 - ClientWorker.checkUpdateConfigStr() :檢查服務(wù)端對(duì)應(yīng)的配置是否發(fā)生變更;
 - MetricsHttpAgent.httpGet() :調(diào)用 /v1/cs/configs 接口獲取配置;
 - LongPollingRunnable.run() :運(yùn)行長(zhǎng)輪詢定時(shí)線程;
 - MetricsHttpAgent.MetricsHttpAgent() :初始化 HttpAgent;
 - ClientWorker.ClientWorker() :初始化 ClientWorker;
 - NacosFactory.createConfigService() :創(chuàng)建配置服務(wù)器;
 - ConfigFactory.createConfigService() :利用反射機(jī)制創(chuàng)建配置服務(wù)器;
 
3.2 服務(wù)端的長(zhǎng)輪詢定時(shí)機(jī)制
ConfigController.listener() :服務(wù)器接收請(qǐng)求;
- LongPollingService.addLongPollingClient() :長(zhǎng)輪詢的核心處理邏輯,提前 500ms 返回響應(yīng);
 - ClientLongPolling.run() :長(zhǎng)輪詢定時(shí)機(jī)制的實(shí)現(xiàn)邏輯;
 - Map.put() :將 ClientLongPolling 實(shí)例本身添加到 allSubs 隊(duì)列中;
 - Queue.remove() :把 ClientLongPolling 實(shí)例本身從 allSubs 隊(duì)列中移除;
 - MD5Util.compareMd5() :比較數(shù)據(jù)的 MD5 值;
 - LongPollingService.sendResponse() :將變更的結(jié)果通過(guò) response 返回給客戶端;
 - ConfigExecutor.scheduleLongPolling() :?jiǎn)?dòng)定時(shí)任務(wù),延時(shí)時(shí)間為 29.5s;
 - HttpServletRequest.getHeader() :獲取客戶端設(shè)置的請(qǐng)求超時(shí)時(shí)間;
 - MD5Util.compareMd5() :和服務(wù)端的數(shù)據(jù)進(jìn)行 MD5 對(duì)比;
 - ConfigExecutor.executeLongPolling() :創(chuàng)建 ClientLongPolling 線程執(zhí)行定時(shí)任務(wù);
 - MD5Util.getClientMd5Map() :計(jì)算 MD5 值;
 - ConfigServletInner.doPollingConfig() :執(zhí)行長(zhǎng)輪詢請(qǐng)求;
 
3.3 Nacos 服務(wù)器配置變更的事件監(jiān)聽(tīng)
Nacos 服務(wù)器上的配置發(fā)生變更后,發(fā)布一個(gè) LocalDataChangeEvent 事件;
Subscriber.onEvent() :監(jiān)聽(tīng) LocalDataChangeEvent 事件(1.3.2 版本后);
DataChangeTask.run() :根據(jù) groupKey 返回配置;
ConfigExecutor.executeLongPolling() :通過(guò)線程池執(zhí)行 DataChangeTask 任務(wù)。















 
 
 













 
 
 
 