Kafka放棄Zookeeper后如何存儲主題與消費組呢?
由于筆者公司目前使用的kafka版本是2.2.1,故當(dāng)下關(guān)于kafka的內(nèi)核研究目前主要是基于該版本,當(dāng)然該專欄還會繼續(xù)關(guān)注Kafka3.0。
我在使用kafka時發(fā)現(xiàn)客戶端可以不依賴Zookeeper的情況下完成消息發(fā)送、消息消費,眾所周知早期的Kafka,所有的元信息(topic、消費組、集群)等信息都存儲在Zookeeper中,原先的消息發(fā)送客戶端、消息消費客戶端都需要依賴Zookeeper。
溫馨提示:Kafka逐步開啟了去zookeeper化,到kafka2.8之前實現(xiàn)了消息發(fā)送者、消息消費者的去zookeeper化,從2.8版之后broker也支持去zookeeper。
那kafka2.2.1版本中,主題的路由信息、消費組信息分別是存儲在什么地方呢?消息發(fā)送端、消息消費端是如何感知的呢?
溫馨提示:如果大家對Kafka有基本的了解,不防停留片刻,稍作思考。
1.主題元數(shù)據(jù)存儲在Zookeeper中
進(jìn)入到Kafka Broker連接的Zookeeper集群,我們不難發(fā)現(xiàn)在 /{namespace}/brokers/topics節(jié)點下存在該集群中所有的主題信息,展開某一個具體的主題,如下圖所示:

關(guān)于主題的元信息,其實主要包括如下信息:
- 分區(qū)數(shù)量 每一個具體topic下會有一個partitions節(jié)點,該節(jié)點下的每一個子節(jié)點代表一個分區(qū)。
- 分區(qū)狀態(tài)信息 每一個分區(qū)的的狀態(tài)由葉子節(jié)點 /{namespace}/brokers/topics/{topicName}/parttions/{partNO}/state表示,存儲的內(nèi)容如下:
controller_epoch 控制器當(dāng)前的選舉版本。
leader 該分區(qū)的Leader所在的Broker節(jié)點ID。
version 當(dāng)前的存儲格式版本,默認(rèn)為1。
leader_epoch 分區(qū)Leader的選舉版本。
isr 分區(qū)的ISR集合。
主題的路由信息是存儲在Zookeeper中,那為什么客戶端只需要Broker的地址,就可以獲取到主題的路由信息呢?
1.1 主題路由尋址
查找路由信息在Kafka2.1版本中是發(fā)送ApiKeys.METADATA請求,該請求的響應(yīng)邏輯定義在Broker中,那客戶端是如何對Broker進(jìn)行路由,Broker中的路由信息又是從何而來呢?
消息發(fā)送者首次發(fā)送METADATA定位Broker機(jī)制:首次發(fā)送請求會從KafkaProducer的bootstrap.servers中設(shè)置的broker列表中選擇當(dāng)前最空閑的Broker,后續(xù)能感知所有的Broker。
消息消費者發(fā)送METADATA定位Broker機(jī)制:發(fā)送到當(dāng)前消費組的組協(xié)調(diào)所在的Broker。
根據(jù)查閱KafkaApis的handleTopicMetadataRequest方法,進(jìn)行一些ACL校驗后進(jìn)入其核心方法:

關(guān)鍵點:
- 從MetadataCache中獲取topic到路由信息。
- 如果MetadataCache中不存在指定topic的路由信息,如果Broker允許自動創(chuàng)建主題(auto.create.topics.enable),默認(rèn)為true,則自動創(chuàng)建該主題的信息,并將主題信息寫入到zookeeper,具體操作:
在/brokers/topics節(jié)點下創(chuàng)建子節(jié)點,子節(jié)點名稱為topic的名稱。
根據(jù)當(dāng)前kafka分區(qū)的機(jī)架信息,分區(qū)數(shù)、副本數(shù),broker節(jié)點數(shù),進(jìn)行分配,主要盡量將主分區(qū)不放在同一個機(jī)架、存儲在主題的節(jié)點信息中,例如{"version":1,"partitions":{"4":[2,0,1],"5":[0,1,2],"1":[2,1,0],"0":[1,0,2],"2":[0,2,1],"3":[1,2,0]}},其中key為分區(qū)名稱,值為副本所在的brokerId,其中排在第一位是傾向性Leader,主題中存儲的值是靜態(tài)數(shù)據(jù),具體還會觸發(fā)選舉,選舉算法會參考這個分配。
控制器還會注冊調(diào)用registerPartitionModificationsHandlers方法,監(jiān)聽主題信息的變化,從而觸發(fā)后續(xù)流程,啟動分區(qū)的真正創(chuàng)建(各個分區(qū)的Leader選舉等)。
溫馨提示:Kafka開啟自動創(chuàng)建主題,分區(qū)數(shù)量取自kafka broker中的num.partitions參數(shù),默認(rèn)為1,副本因子則取決于default.replication.factor參數(shù),默認(rèn)為1。
1.2 路由信息同步機(jī)制
MetadataCache,元信息緩存,那這里的數(shù)據(jù)又是從何而來呢?MetadataCache中路由信息的更新調(diào)用鏈如下圖所示:

Kafka的KafkaController(后續(xù)統(tǒng)稱控制器)首先會聽/brokers/topics/{topicName}節(jié)點內(nèi)容的變化,一旦有新主題創(chuàng)建或主題信息變更,topic變更事件就會觸發(fā),此時TopicChange的process方法會調(diào)用,最終調(diào)用updatePartitionReplicaAssignment,也就是一旦主題的信息發(fā)生變更,控制器會向所有Broker節(jié)點發(fā)送ApiKeys.UPDATE_METADATA,各個Broker在到該請求后,會更新各個Broker中的內(nèi)存緩存,供消息發(fā)送者查找topic路由信息。
即Kafka2.2版本中,topic的元信息存儲在Zookeeper中,同時Kafka Controller會監(jiān)聽zookeeper中相關(guān)節(jié)點,從而感知信息變更,從而將路由信息通過RPC發(fā)送到集群內(nèi)所有的Broker中,故每一個Broker的內(nèi)存中都存儲一份相同的路由信息。
Kafka2.8版本開始嘗試去Zookeeper化。
思考題:為什么各個Broker不都監(jiān)聽zookeeper,從而感知topic變化,更新本地內(nèi)存呢?歡迎各位留言討論或私信dingwpmz,共同交流。
2.消費組存儲在位點主題中
在較低版本中,啟動Kafka消費組需要指定zookeeper集群的地址,因為在低版本中消費組的元信息存儲在zookeeper中,具體路徑為/consumers,但后續(xù)版本中消費端的啟動已經(jīng)不需指定zookeeper,而是指定broker的地址列表即可,那這個時候,消費組的信息是存儲在哪呢?
在前面介紹Kafka故障解決相關(guān)的文章中我們常??吹较M組組協(xié)調(diào)器,內(nèi)部持有一個消費組元數(shù)據(jù)管理器GroupMetadataManager,相關(guān)的代碼截圖如下所示:

在GroupMetadataManager對象中持有一個Map結(jié)構(gòu)的緩存,其鍵為消費組的名稱,值為GroupMetadata對象,內(nèi)部記錄消費組的狀態(tài),消費組的成員列表,位點信息。
內(nèi)存的特點:訪問高效,但隨著Broker進(jìn)程的退出而丟失,消費組存儲在內(nèi)存中顯然不行,但又不在zookeeper中,那消費組的定義信息存儲在什么地方呢?
2.1消費組元信息存儲
消費組的定義信息存儲在系統(tǒng)主題__consumer_offsets中,什么,這個主題不是用來存儲消費位點的嗎?
原來__consumer_offsets不僅存儲消費組的位點信息,還存儲消費組的元信息,具體代碼入口:GroupMetadataManager#storeGroup,部分代碼截圖如下所示:

即消費組元信息當(dāng)成一條消息寫入到__consumer_offsets,一條消費組元信息存儲的value值,由GroupMetadataManager的groupMetadataValue方法定義,具體代碼如下:

隨著Kafka的不斷演化,存儲格式進(jìn)行了多次修改,對應(yīng)的版本如下:
- V0:Kafka 0.10級以下版本
- V1:大于 0.10,低于等于2.1版本。
- V2:2.2版本及以后
消費組元信息存儲的格式為Json,具體存儲的內(nèi)容:
- protocol_type 協(xié)議版本,取自AbstractCoordinator的抽象方法protocolType(),消費組的固定為:consumer。
- generation 消費組元信息的版本號,每發(fā)生一次消費組重平衡,該值會加一。
- protocol 協(xié)議內(nèi)容,存儲消費組的隊列負(fù)載算法,在構(gòu)建消費者時可通過partition.assignment.strategy參數(shù)傳遞,可以傳遞多個,消費組具體的負(fù)載算法會選擇每一個消費者都支持的協(xié)議進(jìn)行隊列負(fù)載,默認(rèn)的負(fù)載算法為RangeAssignor。
- leader 當(dāng)前消費組的Leader,通常為第一個加入該消費組的消費者。
- current_state_timestamp 最新狀態(tài)變更的時間戳,該值是從V2版本開始引入。
- members 消費組的成員信息,每一個成員信息存儲的信息如下:
- member_id 成員id,客戶端id(clientId) + uuid。
client_id 客戶端ID。
client_host 客戶端ip地址。
rebalance_timeout 重平衡時間,默認(rèn)為300000,5分鐘。
session_timeout 會話超時時間,默認(rèn)為10s。
subscription 元信息,取自AbstractCoordinator的抽象方法metadata(),消費組的實現(xiàn)類為ConsumerCoordinator,主要是遍歷負(fù)載算法,每一個負(fù)載算法根據(jù)訂閱信息計算元信息。
assignment
各個消費者的隊列負(fù)載情況。
溫馨提示:GroupMetadataManager的storeGroup方法的調(diào)用時間是在消費組進(jìn)行重平衡時,具體是重平衡第二階段(SYNC_GROUP)與完成重平衡。
2.2加載消息組元信息
消費組元信息是存儲在 __consumer_offsets主題中,在什么時候會從該主題中加載到內(nèi)存中呢?
在__consumer_offsets的分區(qū)發(fā)生Leader選舉時會觸發(fā)將對應(yīng)分區(qū)中的數(shù)據(jù)加載到內(nèi)存,具體的處理入口在KafkaApis的handleLeaderAndIsrRequest方法,簡易調(diào)用鏈如下圖所示:

3.總結(jié)
本文主要介紹了Kafka 主題與消費組的持久化機(jī)制,在Kafka2.8版本開始,官方逐步去除對Zookeeper的依賴,那kafka3.x之后,又會是如何存儲消費組、主題的信息呢?































