偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

警惕!這八個(gè)場(chǎng)景下 RocketMQ 會(huì)發(fā)生流量控制

開(kāi)發(fā) 前端
本文介紹了 RocketMQ 發(fā)生流量控制的 8 個(gè)場(chǎng)景,其中 Broker 4 個(gè)場(chǎng)景,Consumer 4 個(gè)場(chǎng)景。Broker 的流量控制,本質(zhì)是對(duì) Producer 的流量控制,最好的解決方法就是給 Broker 擴(kuò)容,增加 Broker 寫(xiě)入能力。

大家好,我是君哥。

在使用 RocketMQ 的過(guò)程中,有時(shí)候我們會(huì)看到下面的日志:

[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: 206ms, size of queue: 5

這是因?yàn)?RocketMQ 觸發(fā)了流量控制。今天我們來(lái)聊一聊哪些場(chǎng)景下 RocketMQ 會(huì)觸發(fā)流量控制。

如上圖,生產(chǎn)者把消息寫(xiě)入 Broker,Consumer 從 Broker 拉取消息。Broker 是 RocketMQ 的核心 ,觸發(fā)流量控制主要就是為了防止 Broker 壓力過(guò)大而宕機(jī)。

一、 Broker 流控

1、 broker busy

RockerMQ 默認(rèn)采用異步刷盤(pán)策略,Producer 把消息發(fā)送到 Broker 后,Broker 會(huì)先把消息寫(xiě)入 Page Cache,刷盤(pán)線程定時(shí)地把數(shù)據(jù)從 Page Cache 刷到磁盤(pán)上,如下圖:

那 broker busy 是怎么導(dǎo)致的呢?

Broker 默認(rèn)是開(kāi)啟快速失敗的,處理邏輯類(lèi)是 BrokerFastFailure,這個(gè)類(lèi)中有一個(gè)定時(shí)任務(wù)用來(lái)清理過(guò)期的請(qǐng)求,每 10 ms 執(zhí)行一次,代碼如下:

public void start() {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
if (brokerController.getBrokerConfig().isBrokerFastFailureEnable()) {
cleanExpiredRequest();
}
}
}, 1000, 10, TimeUnit.MILLISECONDS);
}

(1)Page Cache 繁忙

清理過(guò)期請(qǐng)求之前首先會(huì)判斷 Page Cache 是否繁忙,如果繁忙,就會(huì)給 Producer 返回一個(gè)系統(tǒng)繁忙的狀態(tài)碼(code=2,remark="[PCBUSY_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d"),也就是本文開(kāi)頭的異常日志。那怎么判斷 Page Cache 繁忙呢?Broker 收到一條消息后會(huì)追加到 Page Cache 或者內(nèi)存映射文件,這個(gè)過(guò)程首先獲取一個(gè) CommitLog 寫(xiě)入鎖,如果持有鎖的時(shí)間大于 osPageCacheBusyTimeOutMills(默認(rèn) 1s,可以配置),就認(rèn)為 Page Cache 繁忙。具體代碼見(jiàn) DefaultMessageStore 類(lèi) isOSPageCacheBusy 方法。

(2)清理過(guò)期請(qǐng)求

清理過(guò)期請(qǐng)求時(shí),如果請(qǐng)求線程的創(chuàng)建時(shí)間到當(dāng)前系統(tǒng)時(shí)間間隔大于 waitTimeMillsInSendQueue(默認(rèn) 200ms,可以配置)就會(huì)清理這個(gè)請(qǐng)求,然后給 Producer 返回一個(gè)系統(tǒng)繁忙的狀態(tài)碼(code=2,remark="[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d")。

system busy

這個(gè)異常在 NettyRemotingAbstract#processRequestCommand 方法。

拒絕請(qǐng)求

如果 NettyRequestProcessor 拒絕了請(qǐng)求,就會(huì)給 Producer 返回一個(gè)系統(tǒng)繁忙的狀態(tài)碼(code=2,remark="[REJECTREQUEST]system busy, start flow control for a while")。那什么情況下請(qǐng)求會(huì)被拒絕呢?看下面這段代碼:

//SendMessageProcessor類(lèi)
public boolean rejectRequest() {
return this.brokerController.getMessageStore().isOSPageCacheBusy() ||
this.brokerController.getMessageStore().isTransientStorePoolDeficient();
}

從代碼中可以看到,請(qǐng)求被拒絕的情況有兩種可能,一個(gè)是 Page Cache 繁忙,另一個(gè)是 TransientStorePoolDeficient。

跟蹤 isTransientStorePoolDeficient 方法,發(fā)現(xiàn)判斷依據(jù)是在開(kāi)啟 transientStorePoolEnable 配置的情況下,是否還有可用的 ByteBuffer。

注意:在開(kāi)啟 transientStorePoolEnable 的情況下,寫(xiě)入消息時(shí)會(huì)先寫(xiě)入堆外內(nèi)存(DirectByteBuffer),然后刷入 Page Cache,最后刷入磁盤(pán)。而讀取消息是從 Page Cache,這樣可以實(shí)現(xiàn)讀寫(xiě)分離,避免讀寫(xiě)都在 Page Cache 帶來(lái)的問(wèn)題。如下圖:

線程池拒絕

Broker 收到請(qǐng)求后,會(huì)把處理邏輯封裝成到 Runnable 中,由線程池來(lái)提交執(zhí)行,如果線程池滿了就會(huì)拒絕請(qǐng)求(這里線程池中隊(duì)列的大小默認(rèn)是 10000,可以通過(guò)參數(shù) sendThreadPoolQueueCapacity 進(jìn)行配置),線程池拒絕后會(huì)拋出異常 RejectedExecutionException,程序捕獲到異常后,會(huì)判斷是不是單向請(qǐng)求(OnewayRPC),如果不是,就會(huì)給 Producer 返回一個(gè)系統(tǒng)繁忙的狀態(tài)碼(code=2,remark="[OVERLOAD]system busy, start flow control for a while")。

判斷 OnewayRPC 的代碼如下,flag = 2 或者 3 時(shí)是單向請(qǐng)求:

public boolean isOnewayRPC() {
int bits = 1 << RPC_ONEWAY;
return (this.flag & bits) == bits;
}

(3) 消息重試

Broker 發(fā)生流量控制的情況下,返回給 Producer 系統(tǒng)繁忙的狀態(tài)碼(code=2),Producer 收到這個(gè)狀態(tài)碼是不會(huì)進(jìn)行重試的。下面是會(huì)進(jìn)行重試的響應(yīng)碼:

//DefaultMQProducer類(lèi)
private final Set<Integer> retryResponseCodes = new CopyOnWriteArraySet<Integer>(Arrays.asList(
ResponseCode.TOPIC_NOT_EXIST,
ResponseCode.SERVICE_NOT_AVAILABLE,
ResponseCode.SYSTEM_ERROR,
ResponseCode.NO_PERMISSION,
ResponseCode.NO_BUYER_ID,
ResponseCode.NOT_IN_CURRENT_UNIT
));

二、 Consumer 流控

DefaultMQPushConsumerImpl 類(lèi)中有 Consumer 流控的邏輯 。

1、 緩存消息數(shù)量超過(guò)閾值

ProcessQueue 保存的消息數(shù)量超過(guò)閾值(默認(rèn) 1000,可以配置),源碼如下:

if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
"the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
}
return;
}

2、緩存消息大小超過(guò)閾值

ProcessQueue 保存的消息大小超過(guò)閾值(默認(rèn) 100M,可以配置),源碼如下:

if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
"the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
}
return;
}

3、 緩存消息跨度超過(guò)閾值

對(duì)于非順序消費(fèi)的場(chǎng)景,ProcessQueue 中保存的最后一條和第一條消息偏移量之差超過(guò)閾值(默認(rèn) 2000,可以配置)。源代碼如下:

if (!this.consumeOrderly) {
if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
log.warn(
"the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
pullRequest, queueMaxSpanFlowControlTimes);
}
return;
}
}

4、獲取鎖失敗

對(duì)于順序消費(fèi)的情況,ProcessQueue 加鎖失敗,也會(huì)延遲拉取,這個(gè)延遲時(shí)間默認(rèn)是 3s,可以配置。

三、總結(jié)

本文介紹了 RocketMQ 發(fā)生流量控制的 8 個(gè)場(chǎng)景,其中 Broker 4 個(gè)場(chǎng)景,Consumer 4 個(gè)場(chǎng)景。Broker 的流量控制,本質(zhì)是對(duì) Producer 的流量控制,最好的解決方法就是給 Broker 擴(kuò)容,增加 Broker 寫(xiě)入能力。而對(duì)于 Consumer 端的流量控制,需要解決 Consumer 端消費(fèi)慢的問(wèn)題,比如有第三方接口響應(yīng)慢或者有慢 SQL。

在使用的時(shí)候,根據(jù)打印的日志可以分析具體是哪種情況的流量控制,并采用相應(yīng)的措施。

責(zé)任編輯:姜華 來(lái)源: 君哥聊技術(shù)
相關(guān)推薦

2023-08-07 09:12:51

權(quán)限SpringSecurity

2025-02-10 10:38:24

2022-05-06 17:12:35

區(qū)塊鏈元宇宙

2010-02-03 23:04:31

流量控制P2P華夏創(chuàng)新

2022-05-26 00:33:29

權(quán)限TienChin項(xiàng)目

2023-10-08 12:14:42

Sentinel流量控制

2022-05-02 16:18:22

RocketMQBrokertopic

2015-01-06 09:48:34

Docker多租戶(hù)docker應(yīng)用

2018-04-09 12:44:45

Docker使用場(chǎng)景開(kāi)發(fā)

2011-06-23 09:09:37

流量控制

2013-07-22 14:25:29

iOS開(kāi)發(fā)ASIHTTPRequ

2024-05-13 18:33:08

SQL日期函數(shù)

2010-06-17 17:00:07

Linux流量控制

2021-03-09 07:38:15

Percona Xtr流量控制運(yùn)維

2010-06-04 10:49:58

Linux流量控制

2019-10-18 15:16:10

Redis數(shù)據(jù)庫(kù)并發(fā)

2021-08-12 10:05:06

MySQL數(shù)據(jù)庫(kù)MySQL

2021-11-19 10:25:23

MySQL數(shù)據(jù)庫(kù)架構(gòu)

2022-03-02 11:39:53

物聯(lián)網(wǎng)科技

2016-09-09 13:25:01

Linux
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)