代碼很少,卻很優(yōu)秀!RocketMQ的NameServer是如何做到的?
今天我們來一起深入分析 RocketMQ的注冊中心 NameServer。
本文基于 RocketMQ release-5.2.0。
首先,我們回顧下 RocketMQ的內(nèi)核原理鳥瞰圖:

從上面的鳥瞰圖,我們可以看出:Nameserver既和 Broker交互,也和 Producer和 Consumer交互,因此,在 RocketMQ中,Nameserver起到了一個紐帶性的作用。
接著,我們再看看 NameServer的工程結(jié)構(gòu),如下圖:

整個工程只有 11個類(老版本好像只有不到 10個類),為什么 RocketMQ可以用如此少的代碼,設(shè)計出如此高性能且輕量的注冊中心?
我覺得最核心的 3個點是:
- AP設(shè)計思想
 - 簡單的數(shù)據(jù)結(jié)構(gòu)
 - 心跳機(jī)制
 
一、AP設(shè)計思想
像 ZooKeeper,采用了 Zab (Zookeeper Atomic Broadcast) 這種比較重的協(xié)議,必須大多數(shù)節(jié)點(過半數(shù))可用,才能確保了數(shù)據(jù)的一致性和高可用,大大增加了網(wǎng)絡(luò)開銷和復(fù)雜度。
而 NameServer遵守了 CAP理論中 AP,在一個 NameServer集群中,NameServer節(jié)點之間是P2P(Peer to Peer)的對等關(guān)系,并且 NameServer之間并沒有通信,減少很多不必要的網(wǎng)絡(luò)開銷,即便只剩一個 NameServer節(jié)點也能繼續(xù)工作,足以保證高可用。
二、數(shù)據(jù)結(jié)構(gòu)
NameServer維護(hù)了一套比較簡單的數(shù)據(jù)結(jié)構(gòu),內(nèi)部維護(hù)了一個路由表,該路由表包含以下幾個核心元數(shù)據(jù),對應(yīng)的源碼類RouteInfoManager如下:
public class RouteInfoManager {
    private final static long DEFAULT_BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2; // broker失效時間 120s
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final Map<String/* topic */, Map<String, QueueData>> topicQueueTable;
    private final Map<String/* brokerName */, BrokerData> brokerAddrTable;
    private final Map<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
    private final Map<BrokerAddrInfo/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    private final Map<BrokerAddrInfo/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
}- topicQueueTable:Topic消息隊列路由信息,消息發(fā)送時根據(jù)路由表進(jìn)行負(fù)載均衡
 - brokerAddrTable:Broker基礎(chǔ)信息,包括brokerName、所屬集群名稱、主備Broker地址
 - clusterAddrTable:Broker集群信息,存儲集群中所有Broker名稱
 - brokerLiveTable:Broker狀態(tài)信息,NameServer每次收到心跳包會替換該信息
 - filterServerTable:Broker上的FilterServer列表,用于過濾標(biāo)簽(Tag)或 SQL表達(dá)式,以減輕 Consumer的負(fù)擔(dān),提高消息消費的效率。
 
1.TopicRouteData
TopicRouteData是 NameServer中最重要的數(shù)據(jù)結(jié)構(gòu)之一,它包括了 Topic對應(yīng)的所有 Broker信息以及每個 Broker上的隊列信息,filter服務(wù)器列表,其源碼如下:
public class TopicRouteData {
    private List<QueueData> queueDatas;
    private List<BrokerData> brokerDatas;
    private HashMap<String, List<String>> filterServerTable;
    //It could be null or empty
    private Map<String/*brokerName*/, TopicQueueMappingInfo> topicQueueMappingByBroker;
}2.BrokerData
BrokerData包含了 Broker的基本屬性,狀態(tài),所在集群以及 Broker服務(wù)器的 IP地址,其源碼如下:
public class BrokerData {
    private String cluster;//所在的集群
    private String brokerName;//所在的brokerName
    private HashMap<Long, String> brokerAddrs;//該broker對應(yīng)的機(jī)器IP列表
    private String zoneName; // 區(qū)域名稱
}3.QueueData
QueueData包含了 BrokerName,readQueue的數(shù)量,writeQueue的數(shù)量等信息,對應(yīng)的源碼類是QueueData,其源碼如下:
public class QueueData {
    private String brokerName;//所在的brokerName
    private int readQueueNums;// 讀隊列數(shù)量
    private int writeQueueNums;// 寫隊列數(shù)量
    private int perm; // 讀寫權(quán)限,參考PermName 類
    private int topicSysFlag; // topic同步標(biāo)記,參考TopicSysFlag 類
}4.元數(shù)據(jù)舉例
為了更好地理解元數(shù)據(jù),這里對每一種元數(shù)據(jù)都給出一個數(shù)據(jù)實例:
topicQueueTable:{
    "topicA":[
        {
            "brokeName":"broker-a",
            "readQueueNums":4,
            "writeQueueNums":4,
            "perm":6, 
            "topicSyncFlag":0 
        },
        {
            "brokeName":"broker-b",
            "readQueueNums":4,
            "writeQueueNums":4,
            "perm":6, 
            "topicSyncFlag":0
        }
    ],
    "topicB":[]
}brokeAddrTable:{
    "broker-a":{
        "cluster":"cluster-1",
        "brokerName":"broker-a",
        "brokerAddrs":{
            0:"192.168.0.1:8000",
            1:"192.168.0.2:8000"
        }
    },
    "broker-b":{
        "cluster":"cluster-1",
        "brokerName":"broker-b",
        "brokerAddrs":{
            0:"192.168.0.3:8000",
            1:"192.168.0.4:8000"
        }
    }
}

三、心跳機(jī)制
心跳機(jī)制是 NameServer維護(hù) Broker的路由信息最重要的一個抓手,主要分為接收心跳、處理心跳、心跳超時 3部分:
1.接收心跳
Broker每 30s會向所有的 NameServer發(fā)送心跳包,告訴它們自己還存活著,從而更新自己在 NameServer的狀態(tài),整體交互如下圖:

2.處理心跳
NameServer收到心跳包時會更新 brokerLiveTable緩存中 BrokerLiveInfo的 lastUpdateTimeStamp信息,整體交互如下圖:

處理邏輯可以參考源碼:org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#processRequest#brokerHeartbeat:
public RemotingCommand brokerHeartbeat(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    final BrokerHeartbeatRequestHeader requestHeader =
        (BrokerHeartbeatRequestHeader) request.decodeCommandCustomHeader(BrokerHeartbeatRequestHeader.class);
    this.namesrvController.getRouteInfoManager().updateBrokerInfoUpdateTimestamp(requestHeader.getClusterName(), requestHeader.getBrokerAddr());
    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
    return response;
}3.心跳超時
NameServer每隔 10s(每隔5s + 5s延遲)掃描 brokerLiveTable檢查 Broker的狀態(tài),如果在 120s內(nèi)未收到 Broker心跳,則認(rèn)為 Broker異常,會從路由表將該 Broker摘除并關(guān)閉 Socket連接,同時還會更新路由表的其他信息,整體交互如下圖:

private void startScheduleService() {
this.scanExecutorService.scheduleAtFixedRate(NamesrvController.this.routeInfoManager::scanNotActiveBroker,
        5, this.namesrvConfig.getScanNotActiveBrokerInterval(), TimeUnit.MILLISECONDS);
}源碼參考:org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#unRegisterBroker(),核心流程:
- 遍歷brokerAddrTable
 - 遍歷broker地址
 - 根據(jù) broker地址移除 brokerAddr
 - 如果當(dāng)前 Topic只包含待移除的 Broker,則移除該 Topic
 
四、其他核心源碼解讀
NameServer啟動
NameServer的啟動類為:org.apache.rocketmq.namesrv.NamesrvStartup,整個流程如下圖:

NameServer啟動最核心的 3個事情是:
- 加載配置:NameServerConfig、NettyServerConfig主要是映射配置文件,并創(chuàng)建 NamesrvController。
 - 啟動 Netty通信服務(wù):NettyRemotingServer是 NameServer和Broker,Producer,Consumer通信的底層通道 Netty服務(wù)器。
 - 啟動定時器和鉤子程序:NameServerController實例一方面處理 Netty接收到消息后,一方面內(nèi)部有多個定時器和鉤子程序,它是 NameServer的核心控制器。
 
五、總結(jié)
NameServer并沒有采用復(fù)雜的分布式協(xié)議來保持?jǐn)?shù)據(jù)的一致性,而是采用 CAP理論中的 AP,各個節(jié)點之間是Peer to Peer的對等關(guān)系,數(shù)據(jù)的一致性通過心跳機(jī)制,定時器,延時感知來完成。















 
 
 





 
 
 
 