偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

阿里二面:RocketMQ 集群 Broker 掛了,會(huì)造成什么影響?

開發(fā) 架構(gòu)
今天分享 RocketMQ 的 Broker 掛了,會(huì)帶來什么影響。

大家好,我是君哥。今天分享 RocketMQ 的 Broker 掛了,會(huì)帶來什么影響。

面試官:你好,如果 RocketMQ 集群中的一個(gè) Broker 掛了,會(huì)造成什么影響呢? 

:Broker 掛了,首先會(huì)導(dǎo)致 Producer 發(fā)送消息失敗。對(duì)于普通消息,Producer 同步發(fā)送的情況下會(huì)有重試機(jī)制,重試時(shí)把消息發(fā)送到其他 Broker。如下圖,Broker1 宕機(jī)了,把消息發(fā)送到了 Broker2:

圖片

發(fā)送消息的邏輯其實(shí)是是一個(gè)循環(huán),發(fā)送失敗后會(huì)不斷嘗試重新發(fā)送,代碼如下:

int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
for (; times < timesTotal; times++) {
String lastBrokerName = null == mq ? null : mq.getBrokerName();
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
if (mqSelected != null) {
mq = mqSelected;
try {
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
switch (communicationMode) {
case ASYNC:
return null;
case ONEWAY:
return null;
case SYNC:
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
//如果發(fā)送失敗了,這里會(huì)進(jìn)行重試
if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
continue;
}
}
return sendResult;
default:
break;
}
} catch (RemotingException e) {
}//省略其他 catch
} else {
break;
}
}

對(duì)于異步發(fā)送和單邊消息是不會(huì)重試的,因此對(duì)于異步和單邊消息,就只能發(fā)送失敗了。而對(duì)于同步消息,可以通過重試的方式發(fā)送到其他的 Broker 上。

面試官:在同步的情況下,Producer 重試時(shí)怎么保證不把消息發(fā)送到掛掉的 Broker 上呢? 

:Producer 默認(rèn)采用 round-robin 的方式,重試前會(huì)記錄上一次發(fā)送消息的 Broker,然后選擇下一個(gè) Broker。代碼如下:

//lastBrokerName 記錄了上一次發(fā)送的 Broker Name
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
if (lastBrokerName == null) {
return selectOneMessageQueue();
} else {
for (int i = 0; i < this.messageQueueList.size(); i++) {
int index = this.sendWhichQueue.incrementAndGet();
int pos = Math.abs(index) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
MessageQueue mq = this.messageQueueList.get(pos);
//Broker Name 不等于上次的,才會(huì)返回
if (!mq.getBrokerName().equals(lastBrokerName)) {
return mq;
}
}
return selectOneMessageQueue();
}
}

面試官:在大流量的場(chǎng)景下,可能會(huì)有大量消費(fèi)發(fā)送到失敗的 Broker,這樣導(dǎo)致大量的消息需要重試,對(duì)性能影響會(huì)很大,有什么解決方法嗎?

:RocketMQ 有延遲隔離策略,如果發(fā)送某一個(gè) Broker 失敗了,會(huì)將其隔離,優(yōu)先選擇正常的 Broker 發(fā)送消息。需要注意的是,這個(gè)策略默認(rèn)是不開啟的。

面試官:怎么開啟延遲隔離策略呢? 

:需要在初始化 Producer 的時(shí)候定義,見下面代碼第二行:

DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.setSendLatencyFaultEnable(true);
producer.start();

開啟之后,發(fā)送消息時(shí)會(huì)記錄發(fā)送消息花費(fèi)的時(shí)間下面 latencyMax 變量,超過一定時(shí)間,這個(gè) Broker 就會(huì)在一段時(shí)間內(nèi)不允許發(fā)送(下面 notAvailableDuration 變量)。

private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

具體邏輯可以參考類 MQFaultStrategy。

面試官:剛剛聊的是對(duì)普通消息的影響,那對(duì)順序消息有什么影響呢?

:對(duì)于全局順序消息,如果設(shè)置了所有消息要發(fā)送到同一個(gè) Broker 的同一個(gè) MessageQueue 中的情況,恰好是這個(gè) Broker 掛了,那就只能等 Broker 重啟后再發(fā)送了。而對(duì)于局部順序消息,比如同一個(gè)訂單相關(guān)的消息要發(fā)送到同一個(gè) Broker 的同一個(gè) MessageQueue 中的情況,如果這個(gè) Broker 掛了,那 MessageQueueSelector 會(huì)選擇其他 Broker 上的 MessageQueue 進(jìn)行發(fā)送,這會(huì)影響當(dāng)前這筆訂單消費(fèi)的順序性。而其他訂單可以被 Producer 發(fā)送到其他的隊(duì)列中,不受影響。如下圖:

圖片

Broker1 掛之前,Order1 的消息發(fā)送到了 Broker1,Broker1 掛之后,Order1 的消息被發(fā)送到了 Broker2。在 Broker1 恢復(fù)前,消費(fèi)者只能消費(fèi) Broker2 上拉取 Order1 的消息,Broker1 恢復(fù)后消費(fèi)者線程再從 Broker1 拉取,因此 Order1 的消息產(chǎn)生亂序。這里假設(shè)沒有從節(jié)點(diǎn)。

面試官:Broker 掛了,對(duì) 消費(fèi)者有影響嗎? 

:如果 Broker 沒有設(shè)置主從集群,消費(fèi)者會(huì)繼續(xù)從掛掉的 Broker 上拉取,這會(huì)導(dǎo)致拉取失敗,直到 NameServer 更新了 Broker 列表。

面試官:NameServer 什么時(shí)候會(huì)更新 Broker 列表呢? 

:NameServer 會(huì)有每 10s 一次的定時(shí)任務(wù)檢查 Broker 是否下線了,如果 120s 內(nèi)有沒有收到 Broker 心跳,則關(guān)閉 channel,把 Broker 信息從本地緩存移除。消費(fèi)者則默認(rèn)每隔 30s 向 NameServer 拉取路由信息來刷新本地緩存的 Broker 列表。也就是說可能會(huì)有最多 150s 的時(shí)間消費(fèi)者拉取消息失敗。如下圖:

圖片

面試官:如果 Broker 集群配置了從節(jié)點(diǎn),還會(huì)有上面的影響嗎? 

:如果有從節(jié)點(diǎn),在 Broker 主節(jié)點(diǎn)恢復(fù)前,生產(chǎn)者是不能往從節(jié)點(diǎn)發(fā)送消息的,但是消費(fèi)者可以去從節(jié)點(diǎn)拉取消息。

面試官:消費(fèi)者什么時(shí)候會(huì)去 Broker 從節(jié)點(diǎn)拉取消息呢? 

:Broker 掛了以后,消費(fèi)組會(huì)通過向 Name Server 拉取訂閱關(guān)系來更新本地緩存的 Broker 列表,因?yàn)橹鞴?jié)點(diǎn)已經(jīng)不在列表中了,所以會(huì)從從節(jié)點(diǎn)列表中選擇一個(gè) Broker 進(jìn)項(xiàng)消息拉取。

面試官:如果主節(jié)點(diǎn)沒有掛,消費(fèi)者會(huì)去從節(jié)點(diǎn)拉取消息嗎? 

:在主節(jié)點(diǎn)系統(tǒng)壓力較大的時(shí)候,消費(fèi)者也會(huì)去從節(jié)點(diǎn)拉取消息??梢詤⒖枷旅娴拇a:

//DefaultMessageStore 類
//maxOffsetPy:最大物理偏移量
//maxPhyOffsetPulling:這次消息拉取的最大偏移量
//diff:還沒有被拉取的消息總長度
long diff = maxOffsetPy - maxPhyOffsetPulling;
//TOTAL_PHYSICAL_MEMORY_SIZE:系統(tǒng)總的物理內(nèi)存大小
//getAccessMessageInMemoryMaxRatio 默認(rèn)是 40
long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE
* (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
getResult.setSuggestPullingFromSlave(diff > memory);

從上面的代碼可以看出,當(dāng)未處理的消息超出物理內(nèi)存 40% 時(shí)就會(huì)去從節(jié)點(diǎn)拉取。需要注意兩點(diǎn):

  1. 需要設(shè)置 slaveReadEnable 參數(shù)為 true,才能去從節(jié)點(diǎn)讀取數(shù)據(jù)。
  2. 需要配置 whichBrokerWhenConsumeSlowly 參數(shù)來決定從哪個(gè)從 brokerId 讀取。參考下面這段代碼:
if (this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
// consume too slow ,redirect to another machine
if (getMessageResult.isSuggestPullingFromSlave()) {
//這里配置從哪個(gè)從節(jié)點(diǎn)拉取
responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
}
//...
}
  1. brokerId 默認(rèn)是 0,也就是主節(jié)點(diǎn),如果主節(jié)點(diǎn)掛了并且長期啟動(dòng)失敗,這個(gè)參數(shù)也是需要改成可以長期拉取的一個(gè)從節(jié)點(diǎn)。

面試官:Broker 主節(jié)點(diǎn)掛了,如果成功從節(jié)點(diǎn)拉取消息,可能會(huì)重復(fù)消費(fèi)嗎?

:對(duì)于廣播模式,消息偏移量是保存在消費(fèi)者本地的,只要消費(fèi)者不掛,按照內(nèi)存中的偏移量去從節(jié)點(diǎn)拉取就行了,不會(huì)有問題。對(duì)于集群模式,消息偏移量保存在 Broker,路徑如下:

/${rocketmq.client.localOffsetStoreDir}/.rocketmq_offsets/${clientId}/${groupName}/offsets.json

消費(fèi)者消費(fèi)完一批消息后,會(huì)向 Broker 發(fā)送請(qǐng)求更新 Broker 內(nèi)存中保存的偏移量,內(nèi)存中的偏移量會(huì)定時(shí)(每 5s 一次)更新到上面文件中。如果 Broker 主節(jié)點(diǎn)不掛,無論消費(fèi)者從主節(jié)點(diǎn)還是從節(jié)點(diǎn)拉取消息,更新偏移量的請(qǐng)求都會(huì)發(fā)送到主節(jié)點(diǎn),從節(jié)點(diǎn)會(huì)每隔 10s 從主節(jié)點(diǎn)同步偏移量,如下圖:

圖片

代碼如下:

//BrokerController 類 handleSlaveSynchronize
if (role == BrokerRole.SLAVE) {
slaveSyncFuture = this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.slaveSynchronize.syncAll();
}
}
}, 1000 * 3, 1000 * 10, TimeUnit.MILLISECONDS);
}

也就是說,如果主節(jié)點(diǎn)掛了,去從節(jié)點(diǎn)拉取消息,可能因?yàn)槠屏繘]有同步到主節(jié)點(diǎn),從節(jié)點(diǎn)保存的偏移量不正確。不過只要消費(fèi)者不宕機(jī),就會(huì)根據(jù)消費(fèi)者本地保存的偏移量去拉取,并不會(huì)拉取到重復(fù)消息。

面試官:如果 Broker 主節(jié)點(diǎn)重啟了,主節(jié)點(diǎn)并不能同步從節(jié)點(diǎn)的最新偏移量,那消費(fèi)者從主節(jié)點(diǎn)讀取會(huì)讀到重復(fù)消息嗎? 

:如果主節(jié)點(diǎn)重啟了,如果消費(fèi)者會(huì)用本地保存的偏移量去主節(jié)點(diǎn)拉取消息,主節(jié)點(diǎn)會(huì)更新本地的偏移量,同時(shí)從節(jié)點(diǎn)也會(huì)去主節(jié)點(diǎn)同步偏移量,所以并不會(huì)拉取到重復(fù)消息。如果消費(fèi)者也掛了,消費(fèi)者重啟后 Broker 主節(jié)點(diǎn)的偏移量還沒有被其他消費(fèi)者更新過,那確實(shí)會(huì)拉取到重復(fù)消息。

面試官:恭喜你,通過了。

責(zé)任編輯:姜華 來源: 君哥聊技術(shù)
相關(guān)推薦

2022-10-18 08:38:16

內(nèi)存泄漏線程

2022-06-02 10:54:16

BrokerRocketMQ

2021-03-17 15:54:32

IO零拷貝方式

2021-04-25 09:58:48

mmapJava面試

2025-07-01 07:21:15

2021-10-27 20:54:24

分庫分表高并發(fā)

2022-05-02 16:18:22

RocketMQBrokertopic

2022-03-14 11:05:01

RocketMQRedis緩存

2024-05-24 10:15:36

2022-09-13 14:42:35

Redis內(nèi)存函數(shù)

2021-12-28 14:53:47

Java編程語言

2024-12-16 09:11:57

2022-04-15 11:26:14

緩存功能

2024-03-22 13:31:00

線程策略線程池

2013-04-25 11:45:39

Google Glas

2025-02-26 07:53:21

2023-10-30 01:02:56

Java類類加載器雙親委派

2020-10-27 08:58:47

設(shè)計(jì)NUMA內(nèi)存

2021-06-17 09:16:34

MySQL數(shù)據(jù)庫隔離級(jí)別

2019-08-07 15:51:15

5G網(wǎng)絡(luò)運(yùn)營商
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)