偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

面試題:一個(gè)Consumer訂閱兩個(gè)Topic,其中一個(gè)Topic消息過(guò)多堆積了,會(huì)影響另一個(gè)Topic消費(fèi)嗎?

開(kāi)發(fā) 架構(gòu)
一個(gè)consumer訂閱兩個(gè)topic,其中一個(gè)Topic消息過(guò)多堆積了,會(huì)影響另一個(gè)Topic消費(fèi)嗎?對(duì)于RocketMQ這種,看源碼如何方便,于是乎我就開(kāi)始找相應(yīng)的源碼,然后一頓思考。

?無(wú)意中在網(wǎng)上看到這么一個(gè)問(wèn)題,一個(gè)consumer訂閱兩個(gè)topic,其中一個(gè)topic消息過(guò)多堆積了,會(huì)影響另一個(gè)topic消費(fèi)嗎?

對(duì)于RocketMQ這種,看源碼如何方便,于是乎我就開(kāi)始找相應(yīng)的源碼,然后一頓思考。

先給大家上結(jié)論,看堵塞的原因,如果原因是生產(chǎn)者瞬時(shí)產(chǎn)生大量的消息,比如秒殺,導(dǎo)致的消息堆積,基本不會(huì)影響;如果是消費(fèi)者出現(xiàn)故障,消費(fèi)速度變得奇慢無(wú)比,那就會(huì)影響,不過(guò)并不會(huì)阻塞,只是會(huì)影響速率?。

接下來(lái)帶著大家一起看源碼。

/**
* Rebalance Service
* consumer負(fù)載均衡線程服務(wù)
*/
public class RebalanceService extends ServiceThread {}

大家先把目光聚焦到這個(gè)負(fù)載均衡的線程服務(wù)上來(lái),這個(gè)大家也看到了,每個(gè)20秒執(zhí)行一次,這個(gè)主要負(fù)載均衡的邏輯在doRebalance方法中。

圖片

我們進(jìn)去這個(gè)方法看。

圖片

進(jìn)來(lái)之后可以看到,對(duì)consumerTable的對(duì)象進(jìn)行循環(huán),這個(gè)存儲(chǔ)的是所有的消費(fèi)者,然后循環(huán)調(diào)用doRebalance,繼續(xù)進(jìn)去看。

圖片

繼續(xù)往里沖。

圖片

線程的創(chuàng)建來(lái)到這里,我們可以看到核心處理是這個(gè)rebalanceByTopic,傳入的參數(shù)就是我們這個(gè)消費(fèi)者監(jiān)聽(tīng)的topic。

圖片

這里的mqSet是該topic的所有consumerqueue,也就是默認(rèn)創(chuàng)建的那4個(gè)隊(duì)列,當(dāng)然,這個(gè)數(shù)量可以改變  。

然后我們可以看到allocateMessageQueueStrategy,這個(gè)是一個(gè)分配策略對(duì)象,調(diào)用其中的allocate來(lái)進(jìn)行分配該topic的消息隊(duì)列。

    圖片

這個(gè)分配策略也有幾種實(shí)現(xiàn)方式,大家看一看,根據(jù)名字其實(shí)大家也可以猜個(gè)八九不離十了,感興趣的可以點(diǎn)進(jìn)去看看詳細(xì)的處理機(jī)制。

圖片

分配好之后,將隊(duì)列賦值給allocateResultSet這個(gè)對(duì)象,這里為啥要用set集合存儲(chǔ)呢?

我的個(gè)人猜測(cè)是,防止出現(xiàn)queue數(shù)量的重新改變的情況下,可能導(dǎo)致這里出現(xiàn)重復(fù),這里增加一層set防止這種極端情況的出現(xiàn)。

接下來(lái)分配好隊(duì)列之后,主要的處理就是updateProcessQueueTableInRebalance,這個(gè)就是負(fù)責(zé)更新消息隊(duì)列,其實(shí)呢,也可以認(rèn)為成把這個(gè)消費(fèi)者需要負(fù)責(zé)的這些隊(duì)列賦值給它,也就是這是你的責(zé)任了,你這個(gè)消費(fèi)者需要處理這些隊(duì)列。

圖片

我們進(jìn)來(lái)updateProcessQueueTableInRebalance這個(gè)方法之后,上面的那些我就折疊起來(lái)不給大家看了,這里的處理主要也是針對(duì)于某些機(jī)器突然宕機(jī)或者增加一些機(jī)器的情況。

這個(gè)方法的主要處理是在最后這個(gè)拉取請(qǐng)求這里,也就是dispatchPullRequest這個(gè),傳入的參數(shù)是一個(gè)pullRequest的list。

圖片

線程的創(chuàng)建進(jìn)來(lái)之后,循環(huán)處理pullRequest,哎,還沒(méi)找到最底層,繼續(xù)點(diǎn)進(jìn)去。

圖片

線程的創(chuàng)建哎。終于找到你了,就是你這個(gè)家伙,最后就執(zhí)行了一個(gè)put方法,放進(jìn)去的就是一個(gè)LinkedBlockingQueue隊(duì)列。

圖片

這個(gè)是一個(gè)拉取消息的請(qǐng)求隊(duì)列,請(qǐng)求的對(duì)象就是pullRequest。

實(shí)際處理的時(shí)候,也就是拉取消息的時(shí)候,多個(gè)線程會(huì)從LinkedBlockingQueue中去take消息,然后按照放入的順序去進(jìn)行消費(fèi)。

 家解釋一下這個(gè)流程,這里就是rocketmq首先對(duì)消息會(huì)進(jìn)行一個(gè)負(fù)載均衡Rebalance的過(guò)程,這個(gè)就是將topic中的consumerqueue隊(duì)列按照consumer進(jìn)行分配,分配策略就是上面看到的那幾種。

將pullRequest放入到這個(gè)LinkedBlockingQueue中,這里放的是topic、brokerName、queueID這些,這個(gè)時(shí)候已經(jīng)排好了后面消費(fèi)的順序了。

比如有10個(gè)request中,大概5個(gè)是topicTest1,另外5個(gè)是topicTest2。

所以呢,這個(gè)時(shí)候假如topicTest1消息堆積了,還是會(huì)照常去消費(fèi)topicTest2的,此時(shí)我們需要看這個(gè)堆積的原因,如果堆積是因?yàn)槊霘⒁活惖膱?chǎng)景導(dǎo)致瞬時(shí)間產(chǎn)生大量消息,這樣消費(fèi)者還是會(huì)正常消費(fèi)topicTest1,所以不會(huì)影響topicTest2,

但是,如果topicTest1消費(fèi)速度很慢,導(dǎo)致所有線程都處理很慢,都被占了,那樣就會(huì)稍微影響topicTest2的速度了,不過(guò)那也只是暫時(shí)的,不會(huì)阻塞topicTest2的,

責(zé)任編輯:姜華 來(lái)源: 左耳君
相關(guān)推薦

2016-12-26 15:23:21

戴爾

2012-01-12 10:09:55

Elementary 思路

2011-07-18 15:08:19

SQL存儲(chǔ)過(guò)程

2024-01-15 00:35:23

JavaScript框架HTML

2012-08-02 09:36:58

fork面試題

2011-03-28 14:02:07

MirahJava對(duì)手

2011-11-14 09:41:10

Linux Mint

2021-05-29 07:13:26

微軟Nobelium網(wǎng)絡(luò)攻擊

2023-06-20 08:25:53

NESTED源碼mybatis

2018-12-05 09:00:46

DevOps持續(xù)交付持續(xù)集成

2021-06-16 12:03:49

WindowsLinux游戲

2011-11-15 10:16:04

Linux操作系統(tǒng)

2020-11-13 07:16:09

線程互斥鎖死循環(huán)

2023-09-19 23:21:48

Python列表

2011-11-22 13:52:38

2016-03-01 14:37:47

華為

2021-04-26 14:02:37

AMD串流硬件

2017-05-26 18:06:47

2011-11-10 09:46:41

云計(jì)算云管理

2020-06-18 15:15:02

物聯(lián)網(wǎng)網(wǎng)關(guān)物聯(lián)網(wǎng)IOT
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)