偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

老弟問我,RocketMQ 中的 ProcessQueue 怎么理解?

開發(fā) 前端
ProcessQueue 是 MessageQueue 的消費(fèi)快照,可以協(xié)助消費(fèi)者進(jìn)行消息拉取、消息消費(fèi)、更新偏移量、限流。

大家好,我是君哥。

今天來分享 RocketMQ 中一個(gè)非常重要又不太好理解的知識(shí)點(diǎn)-ProcessQueue。

一句話概括,ProcessQueue 就是 MessageQueue 的消費(fèi)快照。看下面這張圖:

圖片

1 ProcessQueue 構(gòu)建

RocketMQ 客戶端啟動(dòng)時(shí),會(huì)開啟一個(gè) rebalance 線程,代碼如下:

//MQClientInstance.java
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
//...
// Start rebalance service
this.rebalanceService.start();
//...
}
}
}

這個(gè)線程會(huì)不停的做重平衡操作,對(duì) ProcessQueue 進(jìn)行維護(hù)。在重平衡線程類 RebalanceImpl 定義了一個(gè)變量 processQueueTable,數(shù)據(jù)結(jié)構(gòu)如下:

圖片

可以看到,在 processQueueTable 這個(gè)數(shù)據(jù)結(jié)構(gòu)上維護(hù)了 MessageQueue 和 ProcessQueue 的映射。

下面看一下維護(hù) processQueueTable 的代碼:

private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
final boolean isOrder) {
boolean changed = false;

Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
while (it.hasNext()) {
Entry<MessageQueue, ProcessQueue> next = it.next();
MessageQueue mq = next.getKey();
ProcessQueue pq = next.getValue();

if (mq.getTopic().equals(topic)) {
if (!mqSet.contains(mq)) {
//從processQueueTable上移除
} else if (pq.isPullExpired()) {
switch (this.consumeType()) {
case CONSUME_ACTIVELY://拉模式
break;
case CONSUME_PASSIVELY://推模式
//從processQueueTable上移除
break;
default:
break;
}
}
}
}
//創(chuàng)建ProcessQueue并放到processQueueTable
List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
for (MessageQueue mq : mqSet) {
if (!this.processQueueTable.containsKey(mq)) {
//...
ProcessQueue pq = new ProcessQueue();

long nextOffset = -1L;
try {
nextOffset = this.computePullFromWhereWithException(mq);
} catch (Exception e) {
log.info("doRebalance, {}, compute offset failed, {}", consumerGroup, mq);
continue;
}

if (nextOffset >= 0) {
ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
if (pre != null) {
log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
} else {
//封裝好processQueueTable后再創(chuàng)建一個(gè)PullRequest進(jìn)行消息拉取
log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setNextOffset(nextOffset);
pullRequest.setMessageQueue(mq);
pullRequest.setProcessQueue(pq);
pullRequestList.add(pullRequest);
changed = true;
}
} else {
log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
}
}
}

this.dispatchPullRequest(pullRequestList);

return changed;
}

2 拉取消息

上一節(jié)中構(gòu)建 ProcessQueue 后,會(huì)再創(chuàng)建一個(gè) PullRequest,這個(gè) PullRequest 封裝了 MessageQueue 和 ProcessQueue,創(chuàng)建成功后被放到了 PullMessageService 中的 pullRequestQueue 變量:

//PullMessageService.java
private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();

public void executePullRequestImmediately(final PullRequest pullRequest) {
try {
this.pullRequestQueue.put(pullRequest);
} catch (InterruptedException e) {
log.error("executePullRequestImmediately pullRequestQueue.put", e);
}
}

這里以 RocketMQ 的推模式為例,Consumer 拉取到消息后,會(huì)進(jìn)行如下處理:

  1. 對(duì)拉取到的消息根據(jù) TAG 再次
    進(jìn)行過濾;
  2. 更新 PullRequest 下次拉取的偏移量 nextOffset;
  3. 把拉取的消息封裝到 ProcessQueue 的 msgTreeMap(
    ?
    放到 msgTreeMap 之前首先要獲取到寫鎖 treeMapLock
    ?
    );
  4. 封裝 ConsumeRequest 進(jìn)行消息消費(fèi);
  5. 封裝消息拉取請(qǐng)求再次進(jìn)行拉取。

代碼如下:

//DefaultMQPushConsumerImpl.java
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
//1. 對(duì)拉取到的消息根據(jù) TAG 再次進(jìn)行過濾
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
subscriptionData);

switch (pullResult.getPullStatus()) {
case FOUND:
//2. 更新 PullRequest 下次拉取的偏移量 nextOffset
pullRequest.setNextOffset(pullResult.getNextBeginOffset());

if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
} else {
//3. 把拉取的消息封裝到 ProcessQueue 的 msgTreeMap
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
//4. 封裝 ConsumeRequest 進(jìn)行消息消費(fèi)
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
//5. 封裝消息拉取請(qǐng)求
if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
} else {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}
}
break;
//...
}
}
}

3 消費(fèi)消息

在上一節(jié)提到過,拉取到消息后,會(huì)把消息封裝成一個(gè) ConsumeRequest,這個(gè)線程類會(huì)調(diào)用消費(fèi)者定義的 MessageListener 進(jìn)行消費(fèi)處理??匆幌略创a:

//ConsumeMessageConcurrentlyService.ConsumeRequest
public void run() {
if (this.processQueue.isDropped()) {
log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
return;
}

MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
ConsumeConcurrentlyStatus status = null;

try {
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
}//...

if (!processQueue.isDropped()) {
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
}
}

消息消費(fèi)成功后,會(huì)調(diào)用 processConsumeResult 方法進(jìn)行結(jié)果處理。對(duì)于廣播模式,發(fā)送失敗后不會(huì)做重試,相當(dāng)于把消息丟棄,而對(duì)于集群模式,消費(fèi)失敗的消息會(huì)發(fā)送到 Broker 端等待消費(fèi)者重新拉取進(jìn)行重試。

消費(fèi)結(jié)果處理完后,消費(fèi)成功的消息會(huì)從 ProcessQueue 的 msgTreeMap 中移除(需要獲取到寫鎖 treeMapLock),同時(shí)從 msgTreeMap 中獲取最小的 Offset 來更新對(duì)應(yīng) MessageQueue 的偏移量。這個(gè)邏輯可以參考下面代碼:

public void processConsumeResult(
final ConsumeConcurrentlyStatus status,
final ConsumeConcurrentlyContext context,
final ConsumeRequest consumeRequest
) {
int ackIndex = context.getAckIndex();

switch (status) {
case CONSUME_SUCCESS:
if (ackIndex >= consumeRequest.getMsgs().size()) {
ackIndex = consumeRequest.getMsgs().size() - 1;
}
int ok = ackIndex + 1;
break;
//...
}
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
//...
break;
case CLUSTERING:
List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
//消費(fèi)失敗的,發(fā)送回Broker
boolean result = this.sendMessageBack(msg, context);
//...
}

break;
default:
break;
}
//從msgTreeMap中移除并返回msgTreeMap第一條消息的offset
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}
}

4 消費(fèi)者限流

4.1 緩存消息數(shù)量

如果消費(fèi)者緩存的消息數(shù)量大于 RocketMQ 配置的閾值(默認(rèn) 1000),就會(huì)觸發(fā)延遲拉取,而消費(fèi)者緩存的消息數(shù)量就來自 ProcessQueue,看下面代碼:

long cachedMessageCount = processQueue.getMsgCount().get();
if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
return;
}

4.2 緩存的消息大小

如果消費(fèi)者緩存的消息大小大于 RocketMQ 配置的閾值(默認(rèn) 100M),就會(huì)觸發(fā)延遲拉取,而消費(fèi)者緩存的消息大小就來自 ProcessQueue,看下面代碼:

long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
return;
}

4.3 消息間隔

對(duì)于普通消息,如果消費(fèi)偏移量間隔大于配置的閾值(默認(rèn) 2000),就會(huì)觸發(fā)延遲拉取,而消息間隔就來自 ProcessQueue,看下面代碼:

if (!this.consumeOrderly) {
if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
return;
}
}

4.4 獲取鎖失敗

對(duì)于順序消息,如果獲取鎖失敗,也會(huì)觸發(fā)延遲拉取,而判斷獲取鎖是否成功,也是在 ProcessQueue,看下面代碼:

if (processQueue.isLocked()) {
//...
} else {
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
}

5 總結(jié)

ProcessQueue 是 MessageQueue 的消費(fèi)快照,可以協(xié)助消費(fèi)者進(jìn)行消息拉取、消息消費(fèi)、更新偏移量、限流。最后,看一下 ProcessQueue 的數(shù)據(jù)結(jié)構(gòu):

圖片

責(zé)任編輯:武曉燕 來源: 君哥聊技術(shù)
相關(guān)推薦

2023-09-26 08:01:46

消費(fèi)者TopicRocketMQ

2020-11-13 16:40:05

RocketMQ延遲消息架構(gòu)

2019-11-15 13:52:06

機(jī)器學(xué)習(xí)Shapley計(jì)算

2021-01-13 10:28:16

Maven插件Mojo

2022-08-26 00:02:03

RocketMQ單體架構(gòu)MQ

2023-04-11 08:35:22

RocketMQ云原生

2022-06-13 11:05:35

RocketMQ消費(fèi)者線程

2022-07-11 11:06:11

RocketMQ函數(shù).消費(fèi)端

2020-09-24 14:40:55

Python 開發(fā)編程語言

2022-04-19 07:31:28

事務(wù)隔離機(jī)制數(shù)據(jù)庫

2020-04-24 09:26:15

RocketMQ分布式MetaQ

2021-10-08 07:53:01

Go 尋址元素

2024-04-29 07:42:20

數(shù)據(jù)庫Mybatis事務(wù)

2023-01-16 08:49:20

RocketMQ定時(shí)任務(wù)源代

2022-07-04 11:06:02

RocketMQ事務(wù)消息實(shí)現(xiàn)

2022-06-27 11:04:24

RocketMQ順序消息

2022-07-18 21:53:46

RocketMQ廣播消息

2023-12-25 19:28:59

RocketMQ大數(shù)據(jù)

2020-07-31 08:06:39

MySQL遞歸查詢

2014-11-11 15:25:30

PHPWeb
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)