面試題:一個(gè)Consumer訂閱兩個(gè)Topic,其中一個(gè)Topic消息過(guò)多堆積了,會(huì)影響另一個(gè)Topic消費(fèi)嗎?

?無(wú)意中在網(wǎng)上看到這么一個(gè)問(wèn)題,一個(gè)consumer訂閱兩個(gè)topic,其中一個(gè)topic消息過(guò)多堆積了,會(huì)影響另一個(gè)topic消費(fèi)嗎?
對(duì)于RocketMQ這種,看源碼如何方便,于是乎我就開(kāi)始找相應(yīng)的源碼,然后一頓思考。
先給大家上結(jié)論,看堵塞的原因,如果原因是生產(chǎn)者瞬時(shí)產(chǎn)生大量的消息,比如秒殺,導(dǎo)致的消息堆積,基本不會(huì)影響;如果是消費(fèi)者出現(xiàn)故障,消費(fèi)速度變得奇慢無(wú)比,那就會(huì)影響,不過(guò)并不會(huì)阻塞,只是會(huì)影響速率?。
接下來(lái)帶著大家一起看源碼。
/**
* Rebalance Service
* consumer負(fù)載均衡線程服務(wù)
*/
public class RebalanceService extends ServiceThread {}
大家先把目光聚焦到這個(gè)負(fù)載均衡的線程服務(wù)上來(lái),這個(gè)大家也看到了,每個(gè)20秒執(zhí)行一次,這個(gè)主要負(fù)載均衡的邏輯在doRebalance方法中。

我們進(jìn)去這個(gè)方法看。

進(jìn)來(lái)之后可以看到,對(duì)consumerTable的對(duì)象進(jìn)行循環(huán),這個(gè)存儲(chǔ)的是所有的消費(fèi)者,然后循環(huán)調(diào)用doRebalance,繼續(xù)進(jìn)去看。

繼續(xù)往里沖。

線程的創(chuàng)建來(lái)到這里,我們可以看到核心處理是這個(gè)rebalanceByTopic,傳入的參數(shù)就是我們這個(gè)消費(fèi)者監(jiān)聽(tīng)的topic。

這里的mqSet是該topic的所有consumerqueue,也就是默認(rèn)創(chuàng)建的那4個(gè)隊(duì)列,當(dāng)然,這個(gè)數(shù)量可以改變 。
然后我們可以看到allocateMessageQueueStrategy,這個(gè)是一個(gè)分配策略對(duì)象,調(diào)用其中的allocate來(lái)進(jìn)行分配該topic的消息隊(duì)列。
    
這個(gè)分配策略也有幾種實(shí)現(xiàn)方式,大家看一看,根據(jù)名字其實(shí)大家也可以猜個(gè)八九不離十了,感興趣的可以點(diǎn)進(jìn)去看看詳細(xì)的處理機(jī)制。

分配好之后,將隊(duì)列賦值給allocateResultSet這個(gè)對(duì)象,這里為啥要用set集合存儲(chǔ)呢?
我的個(gè)人猜測(cè)是,防止出現(xiàn)queue數(shù)量的重新改變的情況下,可能導(dǎo)致這里出現(xiàn)重復(fù),這里增加一層set防止這種極端情況的出現(xiàn)。
接下來(lái)分配好隊(duì)列之后,主要的處理就是updateProcessQueueTableInRebalance,這個(gè)就是負(fù)責(zé)更新消息隊(duì)列,其實(shí)呢,也可以認(rèn)為成把這個(gè)消費(fèi)者需要負(fù)責(zé)的這些隊(duì)列賦值給它,也就是這是你的責(zé)任了,你這個(gè)消費(fèi)者需要處理這些隊(duì)列。

我們進(jìn)來(lái)updateProcessQueueTableInRebalance這個(gè)方法之后,上面的那些我就折疊起來(lái)不給大家看了,這里的處理主要也是針對(duì)于某些機(jī)器突然宕機(jī)或者增加一些機(jī)器的情況。
這個(gè)方法的主要處理是在最后這個(gè)拉取請(qǐng)求這里,也就是dispatchPullRequest這個(gè),傳入的參數(shù)是一個(gè)pullRequest的list。

線程的創(chuàng)建進(jìn)來(lái)之后,循環(huán)處理pullRequest,哎,還沒(méi)找到最底層,繼續(xù)點(diǎn)進(jìn)去。

線程的創(chuàng)建哎。終于找到你了,就是你這個(gè)家伙,最后就執(zhí)行了一個(gè)put方法,放進(jìn)去的就是一個(gè)LinkedBlockingQueue隊(duì)列。

這個(gè)是一個(gè)拉取消息的請(qǐng)求隊(duì)列,請(qǐng)求的對(duì)象就是pullRequest。
實(shí)際處理的時(shí)候,也就是拉取消息的時(shí)候,多個(gè)線程會(huì)從LinkedBlockingQueue中去take消息,然后按照放入的順序去進(jìn)行消費(fèi)。
家解釋一下這個(gè)流程,這里就是rocketmq首先對(duì)消息會(huì)進(jìn)行一個(gè)負(fù)載均衡Rebalance的過(guò)程,這個(gè)就是將topic中的consumerqueue隊(duì)列按照consumer進(jìn)行分配,分配策略就是上面看到的那幾種。
將pullRequest放入到這個(gè)LinkedBlockingQueue中,這里放的是topic、brokerName、queueID這些,這個(gè)時(shí)候已經(jīng)排好了后面消費(fèi)的順序了。
比如有10個(gè)request中,大概5個(gè)是topicTest1,另外5個(gè)是topicTest2。
所以呢,這個(gè)時(shí)候假如topicTest1消息堆積了,還是會(huì)照常去消費(fèi)topicTest2的,此時(shí)我們需要看這個(gè)堆積的原因,如果堆積是因?yàn)槊霘⒁活惖膱?chǎng)景導(dǎo)致瞬時(shí)間產(chǎn)生大量消息,這樣消費(fèi)者還是會(huì)正常消費(fèi)topicTest1,所以不會(huì)影響topicTest2,
但是,如果topicTest1消費(fèi)速度很慢,導(dǎo)致所有線程都處理很慢,都被占了,那樣就會(huì)稍微影響topicTest2的速度了,不過(guò)那也只是暫時(shí)的,不會(huì)阻塞topicTest2的,















 
 
 




 
 
 
 