偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

Nacos 服務(wù)變更推送流程全解析

開(kāi)發(fā)
本文將從一次服務(wù)下線的請(qǐng)求結(jié)合并結(jié)合源碼分析的方式來(lái)講解一下Nacos服務(wù)實(shí)例狀態(tài)變更時(shí)是如何實(shí)時(shí)推送的服務(wù)消費(fèi)者的。

我們以Nacos 2.3.x版本為例,作為服務(wù)注冊(cè)中心,它的服務(wù)注冊(cè)和服務(wù)下線實(shí)時(shí)性感知相較于Consul的Raft一致性確認(rèn),亦或者Eureka定時(shí)拉取同步的機(jī)制來(lái)說(shuō)做了很好的折中,既避免了實(shí)現(xiàn)的復(fù)雜性又能保證較好的實(shí)時(shí)性,所以本文將從一次服務(wù)下線的請(qǐng)求結(jié)合并結(jié)合源碼分析的方式來(lái)講解一下Nacos服務(wù)實(shí)例狀態(tài)變更時(shí)是如何實(shí)時(shí)推送的服務(wù)消費(fèi)者的。

一、項(xiàng)目架構(gòu)介紹

1. 服務(wù)通信流程

這里我們先簡(jiǎn)單介紹的一下項(xiàng)目的架構(gòu),筆者將基于Nacos的源碼搭建一套基礎(chǔ)的服務(wù)注冊(cè)中心,然后提供兩個(gè)nacos-provider服務(wù)作為服務(wù)提供者(端口號(hào)分別是9001和9002),而nacos-consumer作為服務(wù)消費(fèi)者通過(guò)nacos或者nacos-provider的信息發(fā)起服務(wù)調(diào)用。

基于這個(gè)架構(gòu),我們會(huì)嚴(yán)格按照如下步驟完成實(shí)驗(yàn)和源碼分析:

  • 將9002端口的服務(wù)提供者下線,查看服務(wù)提供者如何完成服務(wù)下線通知。
  • 查看nacos收到該請(qǐng)求后,內(nèi)部如何處理該消息,并將消息通知給消費(fèi)者。
  • 消費(fèi)者收到該請(qǐng)求后,如何更新本地緩存。

2. 服務(wù)提供者查詢接口

這里我們也給出服務(wù)提供者nacos-provider的http接口,可以看到該接口會(huì)返回當(dāng)前服務(wù)的名稱和端口號(hào):

@GetMapping("/provide")
    public String provide() {
        log.info("請(qǐng)求打到服務(wù)提供者provide上");
        Map<String, String> map = new HashMap<>();
        map.put("provider", env.getProperty("spring.application.name"));
        map.put("port", env.getProperty("server.port"));
        return JSONUtil.toJsonStr(map);
    }

3. 服務(wù)消費(fèi)者

而服務(wù)消費(fèi)者也通過(guò)feign聲明引入該調(diào)用:

@FeignClient(name = "nacos-provider")
public interface NacosProvider {


    @GetMapping("/provide")
    String provide();
}

后續(xù)我們就可以通過(guò)服務(wù)提供者的test接口調(diào)用到nacos-provider的provide接口:

@Resource
    private NacosProvider nacosProvider;

    @GetMapping("/test")
    public String test() {
        return nacosProvider.provide();
    }

需要注意的是,nacos-consumer如果沒(méi)有顯示調(diào)用nacos-provider是不會(huì)訂閱該提供者的所有實(shí)例信息,所以我們?yōu)榱朔奖闼餍栽诜?wù)啟動(dòng)時(shí)主動(dòng)發(fā)起訂閱:

@Component
publicclass TestRunner implements CommandLineRunner {

    privatefinalstatic Logger log = LoggerFactory
            .getLogger(TestRunner.class);
    @Override
    public void run(String... args) throws Exception {
        //主動(dòng)向nacos發(fā)起服務(wù)訂閱請(qǐng)求
        NamingService naming = NamingFactory.createNamingService("127.0.0.1:8848");
        naming.subscribe("nacos-provider", event -> {
            if (event instanceof NamingEvent) {
                //日志打印監(jiān)聽(tīng)到的服務(wù)名稱和結(jié)果
                log.info("監(jiān)聽(tīng)到服務(wù)名稱:{},實(shí)例信息:{}", ((NamingEvent) event).getServiceName(),
                        ((NamingEvent) event).getInstances());
            }

        });
    }
}

二、基于服務(wù)提供者下線詳解Nacos實(shí)例狀態(tài)推送

1. 服務(wù)提供者優(yōu)雅關(guān)閉并推送服務(wù)下線消息

基于上述架構(gòu),我們通過(guò)IDEA將9002的服務(wù)提供者關(guān)閉,注意如果用IDEA停止按鈕操作就會(huì)斷開(kāi)調(diào)試的連接,我們就無(wú)法調(diào)試服務(wù)下線的源碼,正確是做法是如下代碼的方式主動(dòng)獲取啟動(dòng)時(shí)的上下文通過(guò)close方法顯示關(guān)閉:

@SpringBootApplication
@EnableDiscoveryClient
public class NacosProviderApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext context = SpringApplication.run(NacosProviderApplication.class, args);
        //主動(dòng)將springboot容器關(guān)閉
        context.close();
    }

}

Spring上下文close方法執(zhí)行關(guān)閉操作,此時(shí)spring就會(huì)遍歷所有的虛擬機(jī)鉤子即shutdown hook,對(duì)應(yīng)我們的服務(wù)提供者在啟動(dòng)時(shí)注冊(cè)的shutdown Hook即NacosAutoServiceRegistration的close方法就會(huì)發(fā)起服務(wù)下線請(qǐng)求,一旦完成服務(wù)下線請(qǐng)求通知之后,服務(wù)提供者就會(huì)銷毀RPC連接以及所有工作線程:

對(duì)應(yīng)的我給出這個(gè)close方法的入口,因?yàn)镹acosAutoServiceRegistration繼承自AbstractAutoServiceRegistration,所以它繼承了這個(gè)抽象類的shutdown hook方法destroy,這就使得spring boot容器關(guān)閉后,就會(huì)觸發(fā)下面這個(gè)方法:

@PreDestroy
 public void destroy() {
  stop();
 }

此時(shí),這個(gè)stop方法在進(jìn)行CAS樂(lè)觀鎖狀態(tài)修改后,執(zhí)行如下兩件事:

  • 發(fā)起RPC下線請(qǐng)求。
  • 銷毀相關(guān)工作線程和nacos維護(hù)的RPC連接。
public void stop() {
  if (this.getRunning().compareAndSet(true, false) && isEnabled()) {
  //發(fā)起RPC下線請(qǐng)求
   deregister();
   //......
   //銷毀相關(guān)工作線程和nacos維護(hù)的RPC連接
   this.serviceRegistry.close();
  }
 }

此時(shí)deregister會(huì)通過(guò)getRegistration拿到nacos的元信息,再通過(guò)NacosServiceRegistry的deregister發(fā)起服務(wù)下線請(qǐng)求:

protected void deregister() {
  this.serviceRegistry.deregister(getRegistration());
 }

最終就會(huì)走到NacosNamingService的deregisterInstance,很直觀的看到,它通過(guò)RPC代理clientProxy傳入服務(wù)名、分組、和實(shí)例信息并調(diào)用deregisterService發(fā)起服務(wù)下線請(qǐng)求:

@Override
    public void deregisterInstance(String serviceName, String groupName, Instance instance) throws NacosException {
        clientProxy.deregisterService(serviceName, groupName, instance);
    }

對(duì)應(yīng)我們給出發(fā)起調(diào)用時(shí)傳入的參數(shù)信息:

2. Nacos服務(wù)端基于RPC推送服務(wù)下線

隨后Nacos服務(wù)端GrpcRequestAcceptor收到該請(qǐng)求后,流程比較長(zhǎng),執(zhí)行如下步驟:

  • 基于請(qǐng)求定位到處理器RequestHandler以服務(wù)下線為例就是InstanceRequestHandler。
  • InstanceRequestHandler發(fā)布一個(gè)ClientDeregisterServiceEvent事件,交由NotifyCenter投遞到任務(wù)隊(duì)列中。
  • NamingEventPublisher從隊(duì)列獲取到這個(gè)任務(wù)之后,找到ClientServiceIndexesManager處理該事件。
  • ClientServiceIndexesManager還是發(fā)布一個(gè)ServiceChangedEvent到上述的阻塞隊(duì)列中。
  • NamingSubscriberServiceV2Impl將其封成一個(gè)延遲任務(wù)提交到tasks中。

此時(shí)有個(gè)100ms執(zhí)行一次的定時(shí)器也就是PushDelayTaskExecuteEngine,將任務(wù)取出分發(fā)給TaskExecuteWorker,這個(gè)執(zhí)行者就會(huì)生成RPC請(qǐng)求將服務(wù)狀態(tài)變更通知給所有服務(wù)消費(fèi)者。

總體來(lái)說(shuō),Nacos服務(wù)端收到下線請(qǐng)求后,為避免下線通知影響服務(wù)端整體性能,其內(nèi)部設(shè)計(jì)了一套非常好的事件通知訂閱模型,當(dāng)服務(wù)端收到請(qǐng)求后,其內(nèi)部會(huì)根據(jù)請(qǐng)求類型找到相應(yīng)的處理器發(fā)布事件,讓對(duì)應(yīng)的訂閱者異步處理該消息?;谠撓⒆罱K會(huì)封裝成指定類型的任務(wù),提交到工作線程池中的某個(gè)worker的隊(duì)列中讓其異步消費(fèi),由此種大量解耦結(jié)合線程池的方式基于了nacos服務(wù)端最大的吞吐量和調(diào)優(yōu)空間。

對(duì)應(yīng)的我們也給出整體的業(yè)務(wù)流程圖,讀者可以參考該圖了解一下全過(guò)程:

對(duì)應(yīng)我們找到GrpcRequestAcceptor的request方法,可以看到它會(huì)基于該請(qǐng)求找到對(duì)應(yīng)的處理器,然后調(diào)用處理器的handleRequest方法處理該請(qǐng)求:

@Override
    public void request(Payload grpcRequest, StreamObserver<Payload> responseObserver) {
        
        traceIfNecessary(grpcRequest, true);
        //需要使用的服務(wù)器類型,例如服務(wù)下線就是 InstanceRequest
        String type = grpcRequest.getMetadata().getType();
        long startTime = System.nanoTime();
        
        //......
        //基于type到容器中獲取到響應(yīng)的處理器
        RequestHandler requestHandler = requestHandlerRegistry.getByRequestType(type);
        //......
        Request request = (Request) parseObj;
        try {
         //組裝連接信息
            Connection connection = connectionManager.getConnection(GrpcServerConstants.CONTEXT_KEY_CONN_ID.get());
            RequestMeta requestMeta = new RequestMeta();
            requestMeta.setClientIp(connection.getMetaInfo().getClientIp());
            requestMeta.setConnectionId(GrpcServerConstants.CONTEXT_KEY_CONN_ID.get());
            requestMeta.setClientVersion(connection.getMetaInfo().getVersion());
            requestMeta.setLabels(connection.getMetaInfo().getLabels());
            requestMeta.setAbilityTable(connection.getAbilityTable());
            connectionManager.refreshActiveTime(requestMeta.getConnectionId());
            //實(shí)際處理rpc請(qǐng)求的方法
            Response response = requestHandler.handleRequest(request, requestMeta);
           //......
        } catch (Throwable e) {
          //......
        }
        
    }

于是就找到了InstanceRequestHandler,該方法就會(huì)通過(guò)clientOperationService(也就是EphemeralClientOperationServiceImpl)發(fā)布ClientDeregisterServiceEvent事件:

private InstanceResponse deregisterInstance(Service service, InstanceRequest request, RequestMeta meta) {
//基于ClientDeregisterServiceEvent發(fā)布服務(wù)下線事件
        clientOperationService.deregisterInstance(service, request.getInstance(), meta.getConnectionId());
        //.....
        
    }

隨后NamingEventPublisher收到該事件后調(diào)用handleEvent找到對(duì)應(yīng)的事件處理器處理器該事件:

private void handleEvents() {
        while (!shutdown) {
            try {
            //取出上述的任務(wù)
                final Event event = queue.take();
                handleEvent(event);//處理發(fā)布的事件
            } catch (InterruptedException e) {
              //......
            }
        }
    }

如下便是筆者的調(diào)試記錄,可以看到服務(wù)下線事件定位到了ClientServiceIndexesManager這個(gè)管理器進(jìn)行處理:

于是就來(lái)到了ClientServiceIndexesManager的onEvent方法,再次發(fā)布一個(gè)ServiceChangedEvent事件到上述提到的同一個(gè)阻塞隊(duì)列中:

@Override
    public void onEvent(Event event) {
        if (event instanceof ClientOperationEvent.ClientReleaseEvent) {
            handleClientDisconnect((ClientOperationEvent.ClientReleaseEvent) event);
        } elseif (event instanceof ClientOperationEvent) {//處理服務(wù)注冊(cè)或者下線后的事件
            handleClientOperation((ClientOperationEvent) event);
        }
    }
    
    
    private void handleClientOperation(ClientOperationEvent event) {
        Service service = event.getService();
        String clientId = event.getClientId();
        if (event instanceof ClientOperationEvent.ClientRegisterServiceEvent) {//處理服務(wù)注冊(cè)事件,實(shí)際上就是發(fā)布一個(gè)ServiceChangedEvent事件
            addPublisherIndexes(service, clientId);
        } elseif (event instanceof ClientOperationEvent.ClientDeregisterServiceEvent) { //......
        } elseif (event instanceof ClientOperationEvent.ClientSubscribeServiceEvent) {
        //......
        } elseif (event instanceof ClientOperationEvent.ClientUnsubscribeServiceEvent) {
            //......
        }
    }

隊(duì)列會(huì)通知NamingSubscriberServiceV2Impl進(jìn)行處理,它會(huì)將事件推送到延遲隊(duì)列中,這個(gè)隊(duì)列內(nèi)部是采用并發(fā)安全的ConcurrentHashMap進(jìn)行管理。

@Override
    public void onEvent(Event event) {
        if (event instanceof ServiceEvent.ServiceChangedEvent) {//給客戶端的服務(wù)改變事件
            // If service changed, push to all subscribers.
            ServiceEvent.ServiceChangedEvent serviceChangedEvent = (ServiceEvent.ServiceChangedEvent) event;
            Service service = serviceChangedEvent.getService();
            //將處理事件推送到隊(duì)列中
            delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay()));
            MetricsMonitor.incrementServiceChangeCount(service);
        } else if (event instanceof ServiceEvent.ServiceSubscribedEvent) {
            //......
        }
    }

任務(wù)提交之后就會(huì)另外一個(gè)線程processingExecutor(100ms處理一次)會(huì)將其取出后找到任務(wù)處理器處理PushDelayTaskExecuteEngine,隨后,這個(gè)任務(wù)處理引擎將任務(wù)交給NacosExecuteTaskExecuteEngine這個(gè)任務(wù)處理引擎:

protected void processTasks() {//通過(guò)remove拿出隊(duì)列的數(shù)據(jù)
        Collection<Object> keys = getAllTaskKeys();
        for (Object taskKey : keys) {
            AbstractDelayTask task = removeTask(taskKey);
             //......
            //找到相應(yīng)處理器即PushDelayTaskExecuteEngine
            NacosTaskProcessor processor = getProcessor(taskKey);
            try {
                // ReAdd task if process failed
                if (!processor.process(task)) {//PushDelayTaskExecuteEngine將任務(wù)交給NacosExecuteTaskExecuteEngine這個(gè)任務(wù)處理引擎
                    retryFailedTask(taskKey, task);
                }
            } catch (Throwable e) {
               //......
            }
        }
    }

最后PushDelayTaskExecuteEngine會(huì)將該任務(wù)交給NacosExecuteTaskExecuteEngine中的某個(gè)工作線程TaskExecuteWorker的阻塞隊(duì)列中,最后TaskExecuteWorker就會(huì)取出該任務(wù)并消費(fèi):

@Override
        public void run() {
            while (!closed.get()) {
                try {
                //取出任務(wù)并處理
                    Runnable task = queue.take();
                    long begin = System.currentTimeMillis();
                    task.run();
                   //......
                } catch (Throwable e) {
                    log.error("[TASK-FAILED] " + e, e);
                }
            }
        }
    }

最后這個(gè)服務(wù)下線的任務(wù)即PushExecuteTask就會(huì)遍歷所有客戶端并通知它們nacos-provider下線:

@Override
    public void run() {
        try {
            PushDataWrapper wrapper = generatePushData();
            ClientManager clientManager = delayTaskEngine.getClientManager();
            for (String each : getTargetClientIds()) {//逐個(gè)遍歷客戶端,然后事件推送
                Client client = clientManager.getClient(each);
           //......
           //通過(guò)RPC接口推送服務(wù)下線通知
                delayTaskEngine.getPushExecutor().doPushWithCallback(each, subscriber, wrapper,
                        new ServicePushCallback(each, subscriber, wrapper.getOriginalData(), delayTask.isPushToAll()));//發(fā)起RPC通知消費(fèi)者
            }
        } catch (Exception e) {
          //......
        }
    }

3. 消費(fèi)者更新實(shí)例緩存

服務(wù)消費(fèi)者RpcClient收到該請(qǐng)求后,會(huì)基于請(qǐng)求類型定位到服務(wù)端請(qǐng)求處理器,以我們下線通知為例就是NamingPushRequestHandler,由該處理器更新客戶端中記錄9002端口號(hào)的服務(wù)提供者nacos-provider狀態(tài)更新為下線,后續(xù)服務(wù)消費(fèi)者看到緩存中記錄的提供不可用時(shí)就會(huì)調(diào)用9001端口號(hào)的nacos-provider:

對(duì)應(yīng)的我們給出RpcClient的處理服務(wù)端請(qǐng)求的方法handleServerRequest,該方法會(huì)遍歷所有的服務(wù)端請(qǐng)求處理器,只要有一個(gè)處理器處理結(jié)果非空,就說(shuō)明找到相應(yīng)的處理器處理了,直接將響應(yīng)結(jié)果返回:

protected Response handleServerRequest(final Request request) {
        
       //.....
       //遍歷所有的服務(wù)端請(qǐng)求處理器
        for (ServerRequestHandler serverRequestHandler : serverRequestHandlers) {
            try {
            //交給該處理器看看能否處理,若能處理則返回值非空
                Response response = serverRequestHandler.requestReply(request);
                //若非空說(shuō)明處理完成,直接返回結(jié)果
                if (response != null) {
                   //.....
                    return response;
                }
            } catch (Exception e) {
                 //.....
            }
            
        }
        returnnull;
    }

實(shí)際上上述的步驟會(huì)走到NamingPushRequestHandler處理服務(wù)端下線請(qǐng)求,該處理器會(huì)調(diào)用serviceInfoHolder將請(qǐng)求中的實(shí)例信息更新到緩存中,由此保證客戶端可以完成正確的服務(wù)調(diào)用:

@Override
    public Response requestReply(Request request) {
        if (request instanceof NotifySubscriberRequest) {
            NotifySubscriberRequest notifyRequest = (NotifySubscriberRequest) request;
            //從請(qǐng)求中拿到服務(wù)實(shí)例信息,并調(diào)用processServiceInfo更新緩存
            serviceInfoHolder.processServiceInfo(notifyRequest.getServiceInfo());
            return new NotifySubscriberResponse();
        }
        return null;
    }

最終ServiceInfoHolder的processServiceInfo就會(huì)基于入?yún)⒛玫椒?wù)示例信息,并將緩存更新,然后發(fā)布一個(gè)實(shí)例更新的事件并將更新結(jié)果持久化到磁盤中:

public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) {
//拿到服務(wù)實(shí)例緩存
        String serviceKey = serviceInfo.getKey();
        //若為空直接返回
        if (serviceKey == null) {
            returnnull;
        }
        //取出緩存中原有緩存信息
        ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
       //......
       //基于請(qǐng)求更新緩存
        serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
        //比對(duì)新舊緩存變化
        boolean changed = isChangedServiceInfo(oldService, serviceInfo);
       //......
       //如果緩存發(fā)生變化,則發(fā)布一個(gè)實(shí)例更新的事件InstancesChangeEvent,并將更新結(jié)果采用零拷貝的方式持久化到磁盤中
        if (changed) {
            //.....
            //發(fā)布實(shí)例更新事件
            NotifyCenter.publishEvent(new InstancesChangeEvent(notifierEventScope, serviceInfo.getName(), serviceInfo.getGroupName(),
                    serviceInfo.getClusters(), serviceInfo.getHosts()));
            //零拷貝持久化        
            DiskCache.write(serviceInfo, cacheDir);
        }
        return serviceInfo;
    }

對(duì)應(yīng)的我們給出下線9002端口的nacos-provider下線的請(qǐng)求值,可以看到基于這個(gè)結(jié)果,服務(wù)端給出的可用服務(wù)實(shí)例值僅有9001號(hào)端口的nacos-provider,服務(wù)消費(fèi)者基于此信息更新緩存,保證了服務(wù)消費(fèi)的正確性:

三、小結(jié)

本文以代碼示例為導(dǎo)向通過(guò)源碼的方式完成了Nacos服務(wù)實(shí)例狀態(tài)變更推送的講解,這里筆者也簡(jiǎn)單的補(bǔ)充一下個(gè)人對(duì)于源碼閱讀的一些技巧:

  • 在閱讀源碼前,明確了解項(xiàng)目的設(shè)計(jì)理念和原理,即對(duì)項(xiàng)目有個(gè)基礎(chǔ)的認(rèn)知。
  • 以問(wèn)題為導(dǎo)向針對(duì)性的進(jìn)行調(diào)試?yán)斫狻?/li>
  • 適當(dāng)查找一些高質(zhì)量的源碼分析文章,針對(duì)性的梳理源碼結(jié)構(gòu)。
  • 如果能夠明確源碼的最終斷點(diǎn),我們可以采用以終為始的方式,在目標(biāo)斷點(diǎn)上打住,結(jié)合調(diào)試的棧幀了解整體調(diào)用過(guò)程。
  • 調(diào)試過(guò)程中注意觀察各個(gè)類之間的繼承、聚合等關(guān)系,以便梳理設(shè)計(jì)架構(gòu)和理念。
  • 最后一點(diǎn),建議直接拉取源碼進(jìn)行調(diào)試,方便注釋。
責(zé)任編輯:趙寧寧 來(lái)源: 寫(xiě)代碼的SharkChili
相關(guān)推薦

2025-05-28 08:35:00

Nacos服務(wù)訂閱流程開(kāi)發(fā)

2025-05-29 08:35:00

Nacos服務(wù)注冊(cè)開(kāi)發(fā)

2021-09-06 09:46:26

Dubbo 服務(wù)端開(kāi)發(fā)

2018-11-28 15:15:52

大數(shù)據(jù)AI安防

2010-02-06 09:38:42

Android調(diào)用服務(wù)

2021-08-12 06:52:01

Nacos服務(wù)機(jī)制

2021-04-19 07:57:23

Spring 源碼GetBean

2024-03-18 07:48:00

大語(yǔ)言模型NVIDIA生成式 AI

2025-07-02 08:10:01

StarRocks物化視圖MV

2022-01-13 17:24:04

SpringBootYml監(jiān)聽(tīng)器

2022-01-14 14:50:14

SpringBootymlJava

2024-11-11 09:51:46

Nginx部署負(fù)載

2012-05-30 09:26:57

服務(wù)器虛擬化

2022-03-11 23:35:53

云計(jì)算云變更IT

2010-12-16 11:03:07

2020-06-05 13:37:17

網(wǎng)絡(luò)安全技術(shù)

2019-07-28 21:05:47

ICMPIP網(wǎng)絡(luò)協(xié)議

2022-07-01 07:41:03

ZookeeperEurekaConsul

2011-02-22 15:51:41

2011-08-29 14:50:08

jQuery插件
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)