偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

面試官:RocketMQ 長(zhǎng)輪詢是怎么實(shí)現(xiàn)的?

數(shù)據(jù)庫(kù) 其他數(shù)據(jù)庫(kù)
長(zhǎng)輪詢可以降低無(wú)效的輪詢請(qǐng)求,提升請(qǐng)求效率。RocketMQ 消費(fèi)者長(zhǎng)輪詢支持配置,當(dāng)消息量不太大,消費(fèi)者沒(méi)有必要頻繁地請(qǐng)求,這時(shí)可以設(shè)置成長(zhǎng)輪詢機(jī)制。需要注意的是,消費(fèi)端設(shè)置的請(qǐng)求超時(shí)時(shí)間必須大于 Broker 輪詢時(shí)間。

大家好,我是君哥。

我們知道,消息隊(duì)列消費(fèi)端獲取消息的方式包括推模式和拉模式,RocketMQ 并沒(méi)有實(shí)現(xiàn)推模式,RocketMQ 的推模式本質(zhì)上也是拉模式。他們?cè)趯?shí)現(xiàn)上有下面的不同:

  • 拉模式需要開(kāi)發(fā)在代碼里調(diào)用拉取消息的方法,拉取到消息后直接進(jìn)行消息處理;
  • 推模式是消費(fèi)者客戶端初始化時(shí)利用重平衡線程去拉取消息,拉取消息的方法會(huì)注冊(cè)回調(diào)函數(shù),拉取到消息后,由回調(diào)函數(shù)觸發(fā)監(jiān)聽(tīng)器(定義處理邏輯)進(jìn)行消息處理。

RocketMQ 為了提供拉取消息的效率,采用了長(zhǎng)輪詢機(jī)制,避免消費(fèi)端無(wú)效的輪詢請(qǐng)求。當(dāng)消費(fèi)者發(fā)送長(zhǎng)輪詢請(qǐng)求后,如果 Broker 上沒(méi)有新消息,則不會(huì)立刻返回,而是掛起請(qǐng)求,等待新消息到來(lái)或者請(qǐng)求超時(shí)。

今天來(lái)聊一聊 RocketMQ 的長(zhǎng)輪詢是怎么實(shí)現(xiàn)的。

1 長(zhǎng)輪詢

長(zhǎng)輪詢的流程如下圖:

圖片圖片

客戶端建立連接后,發(fā)送消息拉取請(qǐng)求,如果服務(wù)端有新消息,則返回消息。如果服務(wù)端沒(méi)有新消息,則掛起連接,等待新消息到來(lái)后給客戶端返回??蛻舳巳绻B接超時(shí),則斷開(kāi)連接。

2 RocketMQ 實(shí)現(xiàn)

2.1 消費(fèi)端

RocketMQ 消費(fèi)端長(zhǎng)輪詢有 2 個(gè)超時(shí)設(shè)置:

  • brokerSuspendMaxTimeMillis:長(zhǎng)輪詢,Consumer 拉消息請(qǐng)求在 Broker 掛起超過(guò)這個(gè)時(shí)間,就會(huì)給消費(fèi)端返回響應(yīng),無(wú)論有沒(méi)有新消息,單位毫秒。這個(gè)參數(shù)消費(fèi)端發(fā)送拉取請(qǐng)求時(shí)會(huì)發(fā)給 Broker,Broker 用來(lái)判斷這個(gè)長(zhǎng)連接是否超時(shí)。
  • consumerTimeoutMillisWhenSuspend:消費(fèi)端發(fā)送拉取請(qǐng)求的超時(shí)時(shí)間,這個(gè)時(shí)間要大于 brokerSuspendMaxTimeMillis,客戶端初始化時(shí)會(huì)有校驗(yàn)。

注意,這 2 個(gè)超時(shí)時(shí)間官方都不推薦修改。

if (this.defaultLitePullConsumer.getConsumerTimeoutMillisWhenSuspend() < this.defaultLitePullConsumer.getBrokerSuspendMaxTimeMillis()) {
 throw new MQClientException(
  "Long polling mode, the consumer consumerTimeoutMillisWhenSuspend must greater than brokerSuspendMaxTimeMillis"
   + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
  null);
}

2.2 Broker

RocketMQ 在 Broker 端通過(guò)設(shè)置 longPollingEnable 來(lái)開(kāi)啟長(zhǎng)輪詢,默認(rèn)是開(kāi)啟。

Broker 長(zhǎng)輪詢掛起時(shí)間使用 suspendTimeoutMillis 來(lái)進(jìn)行控制,前面提到過(guò),這個(gè)時(shí)間由消費(fèi)者發(fā)送的 brokerSuspendMaxTimeMillis 參數(shù)來(lái)賦值。

2.2.1 掛起消息

Broker 收到客戶端拉取消息請(qǐng)求后,如果沒(méi)有新消息,則將請(qǐng)求掛起,也就是將請(qǐng)求放到 pullRequestTable。

//PullMessageProcessor#processRequest
case ResponseCode.PULL_NOT_FOUND:

if (brokerAllowSuspend && hasSuspendFlag) {
//suspendTimeoutMillisLong 這個(gè)參數(shù)就是消費(fèi)端發(fā)來(lái)的 consumerTimeoutMillisWhenSuspend
long pollingTimeMills = suspendTimeoutMillisLong;
if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
   pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
  }

  String topic = requestHeader.getTopic();
long offset = requestHeader.getQueueOffset();
int queueId = requestHeader.getQueueId();
  PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
   this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
//這里掛起消息
this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
  response = null;
break;
 }

上面的 suspendPullRequest 調(diào)用了 PullRequestHoldService#suspendPullRequest,將請(qǐng)求保存在 pullRequestTable。

2.2.2 處理掛起

消息掛起后,后面怎么恢復(fù)呢?這里總需要一個(gè)線程去循環(huán)處理掛起的消息,這個(gè)處理邏輯也在 PullRequestHoldService,看下面代碼:

public void run() {
 log.info("{} service started", this.getServiceName());
while (!this.isStopped()) {
try {
   //長(zhǎng)輪詢模式,等待 5s 后處理
   if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
    this.waitForRunning(5 * 1000);
   } //...
   //這里處理被掛起的請(qǐng)求
   this.checkHoldRequest();
  } catch (Throwable e) {
   log.warn(this.getServiceName() + " service has exception. ", e);
  }
 }//...
}

處理請(qǐng)求的邏輯參考下面代碼:

protected void checkHoldRequest() {
for (String key : this.pullRequestTable.keySet()) {
  String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR);
if (2 == kArray.length) {
   String topic = kArray[0];
   int queueId = Integer.parseInt(kArray[1]);
   finallong offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
   try {
    this.notifyMessageArriving(topic, queueId, offset);
   } catch (Throwable e) {
    log.error("check hold request failed. topic={}, queueId={}", topic, queueId, e);
   }
  }
 }
}

notifyMessageArriving 方法邏輯如下:

  1. 如果當(dāng)前請(qǐng)求有新消息到來(lái),則給消費(fèi)者返回響應(yīng);
  2. 如果當(dāng)前請(qǐng)求沒(méi)有新消息,但是掛起請(qǐng)求已經(jīng)超時(shí),則給消費(fèi)者返回響應(yīng);
  3. 否則, 繼續(xù)掛起,等待 5s 后重復(fù)執(zhí)行上面邏輯。

3 總結(jié)

長(zhǎng)輪詢可以降低無(wú)效的輪詢請(qǐng)求,提升請(qǐng)求效率。RocketMQ 消費(fèi)者長(zhǎng)輪詢支持配置,當(dāng)消息量不太大,消費(fèi)者沒(méi)有必要頻繁地請(qǐng)求,這時(shí)可以設(shè)置成長(zhǎng)輪詢機(jī)制。需要注意的是,消費(fèi)端設(shè)置的請(qǐng)求超時(shí)時(shí)間必須大于 Broker 輪詢時(shí)間。


責(zé)任編輯:武曉燕 來(lái)源: 君哥聊技術(shù)
相關(guān)推薦

2022-12-05 10:47:08

RocketMQ灰度消息

2024-02-04 10:08:34

2024-12-25 15:44:15

2023-02-08 07:04:20

死鎖面試官單元

2024-10-15 10:00:06

2025-02-26 12:19:52

2021-09-27 07:11:18

MySQLACID特性

2021-09-07 10:44:33

Java 注解開(kāi)發(fā)

2021-02-19 10:02:57

HTTPSJava安全

2025-04-08 00:00:00

@AsyncSpring異步

2021-11-02 09:05:25

Redis

2025-03-07 00:00:10

2024-02-20 14:10:55

系統(tǒng)緩存冗余

2024-03-05 10:33:39

AOPSpring編程

2024-08-22 10:39:50

@Async注解代理

2023-02-08 08:32:41

輪詢鎖

2024-09-11 22:51:19

線程通訊Object

2020-12-09 10:29:53

SSH加密數(shù)據(jù)安全

2023-11-20 10:09:59

2024-02-22 15:36:23

Java內(nèi)存模型線程
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)