面試官:RocketMQ 長輪詢是怎么實(shí)現(xiàn)的?
大家好,我是君哥。
我們知道,消息隊(duì)列消費(fèi)端獲取消息的方式包括推模式和拉模式,RocketMQ 并沒有實(shí)現(xiàn)推模式,RocketMQ 的推模式本質(zhì)上也是拉模式。他們在實(shí)現(xiàn)上有下面的不同:
- 拉模式需要開發(fā)在代碼里調(diào)用拉取消息的方法,拉取到消息后直接進(jìn)行消息處理;
- 推模式是消費(fèi)者客戶端初始化時(shí)利用重平衡線程去拉取消息,拉取消息的方法會注冊回調(diào)函數(shù),拉取到消息后,由回調(diào)函數(shù)觸發(fā)監(jiān)聽器(定義處理邏輯)進(jìn)行消息處理。
RocketMQ 為了提供拉取消息的效率,采用了長輪詢機(jī)制,避免消費(fèi)端無效的輪詢請求。當(dāng)消費(fèi)者發(fā)送長輪詢請求后,如果 Broker 上沒有新消息,則不會立刻返回,而是掛起請求,等待新消息到來或者請求超時(shí)。
今天來聊一聊 RocketMQ 的長輪詢是怎么實(shí)現(xiàn)的。
1 長輪詢
長輪詢的流程如下圖:
圖片
客戶端建立連接后,發(fā)送消息拉取請求,如果服務(wù)端有新消息,則返回消息。如果服務(wù)端沒有新消息,則掛起連接,等待新消息到來后給客戶端返回。客戶端如果連接超時(shí),則斷開連接。
2 RocketMQ 實(shí)現(xiàn)
2.1 消費(fèi)端
RocketMQ 消費(fèi)端長輪詢有 2 個(gè)超時(shí)設(shè)置:
- brokerSuspendMaxTimeMillis:長輪詢,Consumer 拉消息請求在 Broker 掛起超過這個(gè)時(shí)間,就會給消費(fèi)端返回響應(yīng),無論有沒有新消息,單位毫秒。這個(gè)參數(shù)消費(fèi)端發(fā)送拉取請求時(shí)會發(fā)給 Broker,Broker 用來判斷這個(gè)長連接是否超時(shí)。
- consumerTimeoutMillisWhenSuspend:消費(fèi)端發(fā)送拉取請求的超時(shí)時(shí)間,這個(gè)時(shí)間要大于 brokerSuspendMaxTimeMillis,客戶端初始化時(shí)會有校驗(yàn)。
注意,這 2 個(gè)超時(shí)時(shí)間官方都不推薦修改。
if (this.defaultLitePullConsumer.getConsumerTimeoutMillisWhenSuspend() < this.defaultLitePullConsumer.getBrokerSuspendMaxTimeMillis()) {
throw new MQClientException(
"Long polling mode, the consumer consumerTimeoutMillisWhenSuspend must greater than brokerSuspendMaxTimeMillis"
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
null);
}
2.2 Broker
RocketMQ 在 Broker 端通過設(shè)置 longPollingEnable 來開啟長輪詢,默認(rèn)是開啟。
Broker 長輪詢掛起時(shí)間使用 suspendTimeoutMillis 來進(jìn)行控制,前面提到過,這個(gè)時(shí)間由消費(fèi)者發(fā)送的 brokerSuspendMaxTimeMillis 參數(shù)來賦值。
2.2.1 掛起消息
Broker 收到客戶端拉取消息請求后,如果沒有新消息,則將請求掛起,也就是將請求放到 pullRequestTable。
//PullMessageProcessor#processRequest
case ResponseCode.PULL_NOT_FOUND:
if (brokerAllowSuspend && hasSuspendFlag) {
//suspendTimeoutMillisLong 這個(gè)參數(shù)就是消費(fèi)端發(fā)來的 consumerTimeoutMillisWhenSuspend
long pollingTimeMills = suspendTimeoutMillisLong;
if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
}
String topic = requestHeader.getTopic();
long offset = requestHeader.getQueueOffset();
int queueId = requestHeader.getQueueId();
PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
//這里掛起消息
this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
response = null;
break;
}
上面的 suspendPullRequest 調(diào)用了 PullRequestHoldService#suspendPullRequest,將請求保存在 pullRequestTable。
2.2.2 處理掛起
消息掛起后,后面怎么恢復(fù)呢?這里總需要一個(gè)線程去循環(huán)處理掛起的消息,這個(gè)處理邏輯也在 PullRequestHoldService,看下面代碼:
public void run() {
log.info("{} service started", this.getServiceName());
while (!this.isStopped()) {
try {
//長輪詢模式,等待 5s 后處理
if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
this.waitForRunning(5 * 1000);
} //...
//這里處理被掛起的請求
this.checkHoldRequest();
} catch (Throwable e) {
log.warn(this.getServiceName() + " service has exception. ", e);
}
}//...
}
處理請求的邏輯參考下面代碼:
protected void checkHoldRequest() {
for (String key : this.pullRequestTable.keySet()) {
String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR);
if (2 == kArray.length) {
String topic = kArray[0];
int queueId = Integer.parseInt(kArray[1]);
finallong offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
try {
this.notifyMessageArriving(topic, queueId, offset);
} catch (Throwable e) {
log.error("check hold request failed. topic={}, queueId={}", topic, queueId, e);
}
}
}
}
notifyMessageArriving 方法邏輯如下:
- 如果當(dāng)前請求有新消息到來,則給消費(fèi)者返回響應(yīng);
- 如果當(dāng)前請求沒有新消息,但是掛起請求已經(jīng)超時(shí),則給消費(fèi)者返回響應(yīng);
- 否則, 繼續(xù)掛起,等待 5s 后重復(fù)執(zhí)行上面邏輯。
3 總結(jié)
長輪詢可以降低無效的輪詢請求,提升請求效率。RocketMQ 消費(fèi)者長輪詢支持配置,當(dāng)消息量不太大,消費(fèi)者沒有必要頻繁地請求,這時(shí)可以設(shè)置成長輪詢機(jī)制。需要注意的是,消費(fèi)端設(shè)置的請求超時(shí)時(shí)間必須大于 Broker 輪詢時(shí)間。