我向《RocketMQ技術(shù)內(nèi)幕》一書的創(chuàng)始人請教了一個問題
是這樣的,我在學(xué)習(xí)rocketmq的時候遇到了一個奇怪的問題,就是同一個消費者組內(nèi)的消費者訂閱同一個主題topic,不同的tag的時候看到一個消息丟失的問題
這個問題我也是向《RocketMQ技術(shù)內(nèi)幕》一書的作者丁威大哥,然后他給我解釋了我對于這個問題的困惑,我來給大家解釋一下
先給大家描述一下這個具體的內(nèi)容
兩個一樣的Consumer Group的Consumer訂閱同一個Topic,但是是不同的tag,Consumer1訂閱Topic的tag1,Consumer2訂閱Topic的tag2,然后分別啟動。
這時候往Topic的tag1里發(fā)送10條數(shù)據(jù),Topic的tag2里發(fā)送10條。目測應(yīng)該是Consumer1和Consumer2分別收到對應(yīng)的10條消息。結(jié)果卻是只有Consumer2收到了消息,而且只收到了4-6條消息,不固定。
MQ底層數(shù)據(jù)結(jié)構(gòu)之精妙
RocketMQ專門按照Topic為每一個topic建立索引,方便消費端按照topic進(jìn)行消費,其具體實現(xiàn)為消息隊列。
在RocketMQ中,ConsumeQueue的引入并不是為了提高消息寫入的性能,而是為消費服務(wù)的。
消息消費隊列中的每一個條目是一個定長的,設(shè)計極具技巧性,其每個條目使用固定長度(8字節(jié)commitlog物理偏移量、4字節(jié)消息長度、8字節(jié)tag hashcode),這里不是存儲tag的原始字符串,而是存儲hashcode。
目的就是確保每個條目的長度固定,可以使用訪問類似數(shù)組下標(biāo)的方式來快速定位條目,極大的提高了ConsumeQueue文件的讀取性能,這樣根據(jù)消費進(jìn)度去訪問消息的方法為使用邏輯偏移量logicOffset * 20即可找到該條目的起始偏移量(consumequeue文件中的偏移量),然后讀取該偏移量后20個字節(jié)即得到了一個條目,無需遍歷consumequeue文件。
關(guān)于RocketMQ中的三個文件,來幫助RocketMQ完成如此高效率的偉業(yè),我也寫了一個文章來介紹這三個文件,大家可以看一下通過這三個文件徹底搞懂rocketmq的存儲原理
消息過濾實現(xiàn)機(jī)制
消費端隊列存儲的是 tag 的 hashcode,眾所周知,不同的字符串得到的hashcode值可能一樣,故在服務(wù)端是無法精確對消息進(jìn)行過濾的,所以在RocketMQ中會進(jìn)行兩次消息過濾。
當(dāng)客戶端向服務(wù)端拉取消息時,服務(wù)端在返回消息之前,會先根據(jù)hashcode進(jìn)行過濾,然后客戶端收到服務(wù)端的消息后,再根據(jù)消息的tag字符串進(jìn)行精確過濾。
上面的原理很好理解呀,那為什么會丟失消息呢?這其實和消息隊列負(fù)載機(jī)制有關(guān)。
在RocketMQ中使用集群模式消費時,同一個消費組中的多個消費者共同完成主題中的隊列的消費,即一個消費者只會分配到其中某幾個隊列,并且同一時間,一個隊列只會分配給一個消費者,這樣結(jié)合上面的的過濾機(jī)制,就會明顯有問題,請看示例圖:
其問題的核心關(guān)鍵是,同一個tag會分布在不同的隊列中,但消費者C1分配到的隊列為q0,q1,q0,q1中有taga和tagb的消息,但tagb的消息會被消費者C1過濾,但這部分消息卻不會被C2消費,造成了消息丟失。
所以在RocketMQ中,一個消費組內(nèi)的所有消費這,其訂閱關(guān)系必須保持一致。
我們再來回過頭看這個問題
首先這是Broker決定的,而不是Consumer端決定的
Consumer端發(fā)心跳給Broker,Broker收到后存到consumerTable里(就是個Map),key是GroupName,value是ConsumerGroupInfo。
ConsumerGroupInfo里面是包含topic等信息的,但是問題就出在上一步驟,key是groupName,你同GroupName的話Broker心跳最后收到的Consumer會覆蓋前者的。相當(dāng)于如下代碼:
map.put(groupName, ConsumerGroupInfo);
這樣同key,肯定產(chǎn)生了覆蓋。所以Consumer1不會收到任何消息,但是Consumer2為什么只收到了一半(不固定)消息呢?
那是因為:你是集群模式消費,它會負(fù)載均衡分配到各個節(jié)點去消費,所以一半消息(不固定個數(shù))跑到了Consumer1上,結(jié)果Consumer1訂閱的是tag1,所以不會任何輸出。
如果換成BROADCASTING,那絕逼后者會收到全部消息,而不是一半,因為廣播是廣播全部Consumer。
/**
* Consumer信息
*/
public class ConsumerGroupInfo {
// 組名
private final String groupName;
// topic信息,比如topic、tag等
private final ConcurrentMap<String/* Topic */, SubscriptionData> subscriptionTable =
new ConcurrentHashMap<String, SubscriptionData>();
// 客戶端信息,比如clientId等
private final ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable =
new ConcurrentHashMap<Channel, ClientChannelInfo>(16);
// PULL/PUSH
private volatile ConsumeType consumeType;
// 消費模式:BROADCASTING/CLUSTERING
private volatile MessageModel messageModel;
// 消費到哪了
private volatile ConsumeFromWhere consumeFromWhere;
}
/**
* 通過心跳將Consumer信息注冊到Broker端。
*/
public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {
// consumerTable:維護(hù)所有的Consumer
ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
// 如果沒有Consumer,則put到map里
if (null == consumerGroupInfo) {
ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
// put到map里
ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
consumerGroupInfo = prev != null ? prev : tmp;
}
// 更新Consumer信息,客戶端信息
boolean r1 =
consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
consumeFromWhere);
// 更新訂閱Topic信息
boolean r2 = consumerGroupInfo.updateSubscription(subList);
if (r1 || r2) {
if (isNotifyConsumerIdsChangedEnable) {
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
}
}
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);
return r1 || r2;
}
從這一步可以看出消費者信息是以groupName為key,ConsumerGroupInfo為value存到map(consumerTable)里的,那很明顯了,后者肯定會覆蓋前者的,因為key是一樣的。
而后者的tag是tag2,那肯定覆蓋了前者的tag1,這部分是存到ConsumerGroupInfo的subscriptionTable里面的。
private final ConcurrentMap<String/* Topic */, SubscriptionData> subscriptionTable =
new ConcurrentHashMap<String, SubscriptionData>();
SubscriptionData包含了topic等信息
public class SubscriptionData implements Comparable<SubscriptionData> {
// topic
private String topic;
private String subString;
// tags
private Set<String> tagsSet = new HashSet<String>();
private Set<Integer> codeSet = new HashSet<Integer>();
}
其實到這里,這個問題已經(jīng)算是解決了七八成了,等同于是后來的消費者的注冊信息會把之前的消費者的注冊信息覆蓋掉,這也就導(dǎo)致了上述出現(xiàn)的現(xiàn)象。
先啟動訂閱了tag1的消費者,然后啟動了訂閱了tag2的消費者,這時最新的心跳信息是來源于tag2的這個消費者,這就導(dǎo)致了這個消費者的訂閱信息會覆蓋掉之前的訂閱信息,這是因為在RocketMQ中會認(rèn)為同一個消費者組的消費者的訂閱信息是需要保持一致的,如果不保持一致是不被允許的做法。
如果真有那種,你去新建一個topic不就好了,或者新建一個消費者組不就好了,在使用的過程中一定要保持消費者組的訂閱信息保持一致。
這也就導(dǎo)致了發(fā)送者發(fā)送的tag1的消息壓根不會被這個消費者接收到,而兩個消費者自然不會消費這個的消息。
而為什么只收到tag2的部分消息
這是因為rocketMQ默認(rèn)采用的是集群消費的模式,也就是生產(chǎn)者的消息會通過負(fù)載均衡將消息均勻的發(fā)送到多個consumerqueue隊列中,默認(rèn)是4個,也就是我們啟動的兩個消費者會分別監(jiān)聽兩個consumerqueue隊列
這也就意味著有大約一半的tag2的消息會被運送到消費者1的機(jī)器上消費,而消費者1監(jiān)聽的是tag1,不滿足消息的條件,所以監(jiān)聽不到消息
topic和tag信息是如何覆蓋的
/**
* 其實很簡單,就是以topic為key,SubscriptionData為value。而SubscriptionData里包含了tags信息,所以直接覆蓋掉
*/
public boolean updateSubscription(final Set<SubscriptionData> subList) {
for (SubscriptionData sub : subList) {
SubscriptionData old = this.subscriptionTable.get(sub.getTopic());
if (old == null) {
SubscriptionData prev = this.subscriptionTable.putIfAbsent(sub.getTopic(), sub);
} else if (sub.getSubVersion() > old.getSubVersion()) {
this.subscriptionTable.put(sub.getTopic(), sub);
}
}
}
本文參考文章:
https://codingw.blog.csdn.net/article/details/116299837。
https://dalin.blog.csdn.net/article/details/107241375。