偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

阿里二面:RocketMQ 消費(fèi)者拉取一批消息,其中部分消費(fèi)失敗了,偏移量怎樣更新?

人工智能 機(jī)器學(xué)習(xí)
如果一批消息按照順序消費(fèi),是不可能出現(xiàn)第 100 條消息消費(fèi)成功了,但第 50 條消費(fèi)失敗的情況,因?yàn)榈?50 條消息失敗的時(shí)候,應(yīng)該退出循環(huán),不再繼續(xù)進(jìn)行消費(fèi)。

大家好,我是君哥。

最近有讀者參加面試時(shí)被問(wèn)了一個(gè)問(wèn)題,如果消費(fèi)者拉取了一批消息,比如 100 條,第 100 條消息消費(fèi)成功了,但是第 50 條消費(fèi)失敗,偏移量會(huì)怎樣更新?就著這個(gè)問(wèn)題,今天來(lái)聊一下,如果一批消息有消費(fèi)失敗的情況時(shí),偏移量怎么保存。

1 拉取消息

1.1 封裝拉取請(qǐng)求

以 RocketMQ 推模式為例,RocketMQ 消費(fèi)者啟動(dòng)代碼如下:

public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");

consumer.subscribe("TopicTest", "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

consumer.setConsumeTimestamp("20181109221800");
consumer.registerMessageListener(new MessageListenerConcurrently() {

@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){
try{
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
}catch (Exception e){
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}

上面的 DefaultMQPushConsumer 是一個(gè)推模式的消費(fèi)者,啟動(dòng)方法是 start。消費(fèi)者啟動(dòng)后會(huì)觸發(fā)重平衡線程(RebalanceService),這個(gè)線程的任務(wù)是在死循環(huán)中不停地進(jìn)行重平衡,最終封裝拉取消息的請(qǐng)求到 pullRequestQueue。這個(gè)過(guò)程涉及到的 UML 類圖如下:

圖片

1.2 處理拉取請(qǐng)求

封裝好拉取消息的請(qǐng)求 PullRequest 后,RocketMQ 就會(huì)不停地從 pullRequestQueue 獲取消息拉取請(qǐng)求進(jìn)行處理。UML 類圖如下:

圖片

拉取消息的入口方法是一個(gè)死循環(huán),代碼如下:

//PullMessageService
public void run(){
log.info(this.getServiceName() + " service started");

while (!this.isStopped()) {
try {
PullRequest pullRequest = this.pullRequestQueue.take();
this.pullMessage(pullRequest);
} catch (InterruptedException ignored) {
} catch (Exception e) {
log.error("Pull Message Service Run Method exception", e);
}
}

log.info(this.getServiceName() + " service end");
}

這里拉取到消息后,提交給 PullCallback 這個(gè)回調(diào)函數(shù)進(jìn)行處理。

拉取到的消息首先被 put 到 ProcessQueue 中的 msgTreeMap 上,然后被封裝到 ConsumeRequest 這個(gè)線程類來(lái)處理。把代碼精簡(jiǎn)后,ConsumeRequest 處理邏輯如下:

//ConsumeMessageConcurrentlyService.java
public void run(){
MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
ConsumeConcurrentlyStatus status = null;
try {
//1.執(zhí)行消費(fèi)邏輯,這里的邏輯是在文章開(kāi)頭的代碼中定義的
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
}
if (!processQueue.isDropped()) {
//2.處理消費(fèi)結(jié)果
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
} else {
log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
}
}

2 處理消費(fèi)結(jié)果

2.1 并發(fā)消息

并發(fā)消息處理消費(fèi)結(jié)果的代碼做精簡(jiǎn)后如下:

//ConsumeMessageConcurrentlyService.java
public void processConsumeResult(
final ConsumeConcurrentlyStatus status,
final ConsumeConcurrentlyContext context,
final ConsumeRequest consumeRequest
){
int ackIndex = context.getAckIndex();
switch (status) {
case CONSUME_SUCCESS:
if (ackIndex >= consumeRequest.getMsgs().size()) {
ackIndex = consumeRequest.getMsgs().size() - 1;
}
int ok = ackIndex + 1;
int failed = consumeRequest.getMsgs().size() - ok;
break;
case RECONSUME_LATER:
break;
default:
break;
}

switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
}
break;
case CLUSTERING:
List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
boolean result = this.sendMessageBack(msg, context);
if (!result) {
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
msgBackFailed.add(msg);
}
}

if (!msgBackFailed.isEmpty()) {
consumeRequest.getMsgs().removeAll(msgBackFailed);
}
break;
default:
break;
}

long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}
}

從上面的代碼可以看出,如果處理消息的邏輯是串行的,比如文章開(kāi)頭的代碼使用 for 循環(huán)來(lái)處理消息,那如果在某一條消息處理失敗了,直接退出循環(huán),給 ConsumeConcurrentlyContext 的 ackIndex 變量賦值為消息列表中失敗消息的位置,這樣這條失敗消息后面的消息就不再處理了,發(fā)送給 Broker 等待重新拉取。代碼如下:

public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");

consumer.subscribe("TopicTest", "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

consumer.setConsumeTimestamp("20181109221800");
consumer.registerMessageListener(new MessageListenerConcurrently() {

@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){
for (int i = 0; i < msgs.size(); i++) {
try{
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
}catch (Exception e){
context.setAckIndex(i);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}

消費(fèi)成功的消息則從 ProcessQueue 中的 msgTreeMap 中移除,并且返回 msgTreeMap 中最小的偏移量(firstKey)去更新。注意:集群模式偏移量保存在 Broker 端,更新偏移量需要發(fā)送消息到 Broker,而廣播模式偏移量保存在 Consumer 端,只需要更新本地偏移量就可以。

如果處理消息的邏輯是并行的,處理消息失敗后給 ackIndex 賦值是沒(méi)有意義的,因?yàn)榭赡苡卸鄺l消息失敗,給 ackIndex 變量賦值并不準(zhǔn)確。最好的方法就是給 ackIndex 賦值 0,整批消息全部重新消費(fèi),這樣又可能帶來(lái)冥等問(wèn)題。

2.2 順序消息

對(duì)于順序消息,從 msgTreeMap 取出消息后,先要放到 consumingMsgOrderlyTreeMap 上面,更新偏移量時(shí),是從 consumingMsgOrderlyTreeMap 上取最大的消息偏移量(lastKey)。

3 總結(jié)

回到開(kāi)頭的問(wèn)題,如果一批消息按照順序消費(fèi),是不可能出現(xiàn)第 100 條消息消費(fèi)成功了,但第 50 條消費(fèi)失敗的情況,因?yàn)榈?50 條消息失敗的時(shí)候,應(yīng)該退出循環(huán),不再繼續(xù)進(jìn)行消費(fèi)。

如果是并發(fā)消費(fèi),如果出現(xiàn)了這種情況,建議是整批消息全部重新消費(fèi),也就是給 ackIndex 賦值 0,這樣必須考慮冥等問(wèn)題。

責(zé)任編輯:武曉燕 來(lái)源: 君哥聊技術(shù)
相關(guān)推薦

2022-03-14 11:05:01

RocketMQRedis緩存

2022-06-02 10:54:16

BrokerRocketMQ

2023-03-14 08:45:25

RocketMQ消息消費(fèi)

2022-08-15 10:45:34

RocketMQ消息隊(duì)列

2021-12-17 08:17:00

RocketMQ數(shù)據(jù)結(jié)構(gòu)消息中間件

2024-01-24 09:00:31

SSD訂閱關(guān)系內(nèi)存

2024-04-22 00:00:00

RocketMQ優(yōu)化位點(diǎn)

2022-07-07 09:00:49

RocketMQ消費(fèi)者消息消費(fèi)

2022-11-08 07:36:17

RocketMQ消費(fèi)者消息堆積

2009-04-15 11:17:23

2021-07-12 10:25:03

RocketMQ數(shù)據(jù)結(jié)構(gòu)kafka

2011-08-05 16:21:24

2011-07-22 16:25:38

CA TechnoloIT消費(fèi)化

2023-06-01 08:08:38

kafka消費(fèi)者分區(qū)策略

2021-04-20 08:32:51

消息MQ隊(duì)列

2024-03-14 11:58:43

2015-08-26 09:39:30

java消費(fèi)者

2025-02-26 07:53:21

2022-05-09 11:15:05

RocketMQPULL 模式PUSH 模式

2021-03-01 07:31:53

消息支付高可用
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)