偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

RocketMQ消息回溯實(shí)踐與解析

開(kāi)發(fā) 前端
消息回溯功能是 RocketMQ 提供給業(yè)務(wù)方的定心丸,業(yè)務(wù)在出現(xiàn)任何無(wú)法恢復(fù)的問(wèn)題后,都可以及時(shí)通過(guò)消息回溯來(lái)恢復(fù)業(yè)務(wù)或者訂正數(shù)據(jù)。特別是在流或者批計(jì)算的場(chǎng)景,重跑數(shù)據(jù)往往是常態(tài)。

1 問(wèn)題背景

前段時(shí)間,小A公司的短信服務(wù)出現(xiàn)了問(wèn)題,導(dǎo)致一段時(shí)間內(nèi)的短信沒(méi)有發(fā)出去,等服務(wù)修復(fù)后,需要重新補(bǔ)發(fā)這批數(shù)據(jù)。

由于短信服務(wù)是直接通過(guò)RocketMQ觸發(fā),因此在修復(fù)這些數(shù)據(jù)的時(shí)候,小A犯了難,于是就有了以下對(duì)話

領(lǐng)導(dǎo):小A呀,這數(shù)據(jù)這么多,你準(zhǔn)備怎么修呀?

小A:頭大呀領(lǐng)導(dǎo),一般業(yè)務(wù)我們都有一個(gè)本地消息表來(lái)做冪等,我只需要把數(shù)據(jù)庫(kù)表的狀態(tài)重置,然后把數(shù)據(jù)撈出來(lái)重新循環(huán)執(zhí)行就可以啦,但是短信服務(wù)我們沒(méi)有本地表呀!

領(lǐng)導(dǎo):那你有什么想法嗎?

小A:簡(jiǎn)單的話,那就讓上游重發(fā)吧,我們?cè)傧M(fèi)一遍就好了。

領(lǐng)導(dǎo):這樣問(wèn)題就更嚴(yán)重了呀,你想,上游重發(fā)一遍,那是不是所有的消費(fèi)者組都要重新消費(fèi)一遍,到時(shí)候其他業(yè)務(wù)同學(xué)就要來(lái)找你了。

小A:那就不好辦了。。。

領(lǐng)導(dǎo):其實(shí)RocketMQ有專門(mén)的消息回溯的能力,你可以試試

小A:這么神奇?我研究研究。。。

2 驗(yàn)證

2.1 生產(chǎn)者啟動(dòng)

準(zhǔn)備一個(gè)新的topic,并發(fā)送1W條消息

public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        for (int i = 0; i < 10000; i++) {
            try {
                Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }
        producer.shutdown();
    }

2.2 消費(fèi)者啟動(dòng)

準(zhǔn)備一個(gè)新的消費(fèi)者組,消費(fèi)topic下數(shù)據(jù)并記錄總條數(shù)

public static void main(String[] args) throws InterruptedException, MQClientException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
    consumer.setNamesrvAddr("127.0.0.1:9876");
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    consumer.subscribe("TopicTest", "*");
    
    final AtomicInteger count = new AtomicInteger();
    
    consumer.registerMessageListener(new MessageListenerConcurrently() {

        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
            ConsumeConcurrentlyContext context) {
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
            count.incrementAndGet();
            System.out.printf("%s Receive New Messages End: %s %n", Thread.currentThread().getName(), msgs);
            System.out.println(count.get());
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });

    consumer.start();
}

消費(fèi)者消息記錄消費(fèi)者消息記錄

2.3 執(zhí)行回溯

命令行執(zhí)行

mqadmin.cmd resetOffsetByTime -n 127.0.0.1:9876 -t TopicTest -g please_rename_unique_group_name_4 -s 1722240069000

以下為mqadmin.cmd的內(nèi)容,因此也可以直接通過(guò)調(diào)用MQAdminStartup的main方法執(zhí)行

MQAdminStartup手動(dòng)執(zhí)行MQAdminStartup手動(dòng)執(zhí)行

代碼執(zhí)行:

public static void main(String[] args) {
    String[] params = new String[]{"resetOffsetByTime","-n","127.0.0.1:9876","-t", "TopicTest", "-g", "please_rename_unique_group_name_4", "-s", "1722240069000"};
    MQAdminStartup.main(params);
}

2.4 結(jié)果驗(yàn)證

客戶端重置成功記錄客戶端重置成功記錄

消費(fèi)者重新消費(fèi)記錄消費(fèi)者重新消費(fèi)記錄

2.5 驗(yàn)證小結(jié)

從結(jié)果上來(lái)看,消費(fèi)者offset被重置到了指定的時(shí)間戳位置,由于指定時(shí)間戳早于最早消息的創(chuàng)建時(shí)間,因此重新消費(fèi)了所有未被刪除的消息。

那rocketmq究竟做了什么呢?

2.5.1 分析參數(shù)

動(dòng)作標(biāo)識(shí):resetOffsetByTime

額外參數(shù):

-n nameserver的地址

-t 指定topic名稱

-g 指定消費(fèi)者組名稱

-s 指定回溯時(shí)間

2.5.2 思考

消息回溯思考消息回溯思考

3 分析

以下源碼部分均出自4.2.0版本,展示代碼有所精簡(jiǎn)。

3.1 策略模式,解析命令行

org.apache.rocketmq.tools.command.MQAdminStartup#main

/*根據(jù)動(dòng)作標(biāo)識(shí)解析除對(duì)應(yīng)的處理類,我們本次請(qǐng)求實(shí)際處理策略類:ResetOffsetByTimeCommand*/
SubCommand cmd = findSubCommand(args[0]);
/*解析命令行*/
Options options = ServerUtil.buildCommandlineOptions(new Options());
CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options),
                            new PosixParser());
                            
/*提交請(qǐng)求執(zhí)行*/
cmd.execute(commandLine, options, rpcHook);

3.2 創(chuàng)建客戶端,與服務(wù)端交互

org.apache.rocketmq.tools.command.offset.ResetOffsetByTimeCommand#execute

public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
    DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
    
    String group = commandLine.getOptionValue("g").trim();//消費(fèi)者組
    String topic = commandLine.getOptionValue("t").trim();//主題
    String timeStampStr = commandLine.getOptionValue("s").trim();//重置時(shí)間戳
    long timestamp = timeStampStr.equals("now") ? System.currentTimeMillis() : Long.parseLong(timeStampStr);//重置時(shí)間戳
    boolean isC = false;//是否C客戶端
    boolean force = true;//是否強(qiáng)制重置,這里提前解釋一下,有可能時(shí)間戳對(duì)應(yīng)的offset比當(dāng)前消費(fèi)進(jìn)度要大,強(qiáng)制的話會(huì)出現(xiàn)部分消息消費(fèi)不到
    if (commandLine.hasOption('f')) {
        force = Boolean.valueOf(commandLine.getOptionValue("f").trim());
    }

    /*與nameserver以及broker交互的客戶端啟動(dòng)*/
    defaultMQAdminExt.start();
    /*正式執(zhí)行命令*/
    Map<MessageQueue, Long> offsetTable = defaultMQAdminExt.resetOffsetByTimestamp(topic, group, timestamp, force, isC);
}

3.3 獲取topic對(duì)應(yīng)的broker地址,提交重置請(qǐng)求

org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl#resetOffsetByTimestamp

public Map<MessageQueue, Long> resetOffsetByTimestamp(String topic, String group, long timestamp, boolean isForce,
    boolean isC)
    throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
    /*從nameserver處獲取broker地址*/
    TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic);
    /*由于消息數(shù)據(jù)分區(qū)分片,topic下的messagequeue可能存在多個(gè)broker上,因此這是個(gè)列表*/
    List<BrokerData> brokerDatas = topicRouteData.getBrokerDatas();
    Map<MessageQueue, Long> allOffsetTable = new HashMap<MessageQueue, Long>();
    if (brokerDatas != null) {
        for (BrokerData brokerData : brokerDatas) {
            String addr = brokerData.selectBrokerAddr();
            if (addr != null) {
                /*循環(huán)與各個(gè)broker交互,執(zhí)行重置操作*/
                Map<MessageQueue, Long> offsetTable =
                    this.mqClientInstance.getMQClientAPIImpl().invokeBrokerToResetOffset(addr, topic, group, timestamp, isForce,
                        timeoutMillis, isC);
                if (offsetTable != null) {
                    allOffsetTable.putAll(offsetTable);
                }
            }
        }
    }
    return allOffsetTable;
}

3.4 與 nameserver交互獲取broker地址

org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl#examineTopicRouteInfo

public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis,
    boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
    GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
    requestHeader.setTopic(topic);
 /*同樣的組裝參數(shù),請(qǐng)求碼:105*/
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader);

    /*創(chuàng)建請(qǐng)求與nameserver交互*/
    RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
    byte[] body = response.getBody();
    if (body != null) {
        return TopicRouteData.decode(body, TopicRouteData.class);
    }
}

3.4.1 nameserver收到請(qǐng)求,獲取路由信息并返回

org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#getRouteInfoByTopic

public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    final GetRouteInfoRequestHeader requestHeader =
        (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);

    /*nameserver內(nèi)部存儲(chǔ)topic的路由信息*/
    TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
 byte[] content = topicRouteData.encode();
    response.setBody(content);
    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
    return response;
}

3.4.2 RouteInfoManager的核心屬性

//topic路由信息,根據(jù)這個(gè)做負(fù)載均衡,QueueData里面記錄brokerName
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
//broke基本信息 名稱  所在集群信息   主備broke地址  brokerId=0表示master   >0表示slave
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
//集群信息,包含集群所有的broke信息
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
//存活的broke信息,以及對(duì)應(yīng)的channel
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
//broke的過(guò)濾類信息
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

3.5 與broker交互,執(zhí)行重置操作

org.apache.rocketmq.client.impl.MQClientAPIImpl#invokeBrokerToResetOffset

public Map<MessageQueue, Long> invokeBrokerToResetOffset(final String addr, final String topic, final String group,
    final long timestamp, final boolean isForce, final long timeoutMillis, boolean isC)
    throws RemotingException, MQClientException, InterruptedException {
    
    ResetOffsetRequestHeader requestHeader = new ResetOffsetRequestHeader();
    requestHeader.setTopic(topic);
    requestHeader.setGroup(group);
    requestHeader.setTimestamp(timestamp);
    requestHeader.setForce(isForce);

    /*同樣的組裝參數(shù),請(qǐng)求碼:222*/
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.INVOKE_BROKER_TO_RESET_OFFSET, requestHeader);
    if (isC) {
        request.setLanguage(LanguageCode.CPP);
    }
 /*創(chuàng)建請(qǐng)求與broker交互,注意這里是同步invokeSync*/
    RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
    if (response.getBody() != null) {
        ResetOffsetBody body = ResetOffsetBody.decode(response.getBody(), ResetOffsetBody.class);
        return body.getOffsetTable();
    }
}

broker收到請(qǐng)求,開(kāi)始處理;

org.apache.rocketmq.broker.client.net.Broker2Client#resetOffset

public RemotingCommand resetOffset(String topic, String group, long timeStamp, boolean isForce,
    boolean isC) {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);

    TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);

    /*記錄下該消費(fèi)者組消費(fèi)topic下的隊(duì)列要重置到哪條offset*/
    Map<MessageQueue/*隊(duì)列*/, Long/*offser*/> offsetTable = new HashMap<MessageQueue, Long>();

    for (int i = 0; i < topicConfig.getWriteQueueNums(); i++) {
        MessageQueue mq = new MessageQueue();
        mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
        mq.setTopic(topic);
        mq.setQueueId(i);

        /*broker可以獲取該topic下的consumergroup下的某個(gè)隊(duì)列的offset*/
        long consumerOffset =
            this.brokerController.getConsumerOffsetManager().queryOffset(group, topic, i);//消費(fèi)者組當(dāng)前已經(jīng)消費(fèi)的offset
        if (-1 == consumerOffset) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark(String.format("THe consumer group <%s> not exist", group));
            return response;
        }

        long timeStampOffset;
        if (timeStamp == -1) {
   //沒(méi)有指定表示當(dāng)前隊(duì)列最大的offset
            timeStampOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, i);
        } else {
            //根據(jù)時(shí)間戳查到隊(duì)列下對(duì)應(yīng)的offset
            timeStampOffset = this.brokerController.getMessageStore().getOffsetInQueueByTime(topic, i, timeStamp);
        }

        if (timeStampOffset < 0) {
            //<0表示消息已經(jīng)被刪掉了
            log.warn("reset offset is invalid. topic={}, queueId={}, timeStampOffset={}", topic, i, timeStampOffset);
            timeStampOffset = 0;
        }

        /*如果isForce=false,則要重置的offset<當(dāng)前正在消費(fèi)的offset才會(huì)重置。也過(guò)來(lái),也就是說(shuō)重置不僅會(huì)回溯,消費(fèi)進(jìn)度過(guò)慢也可以往后撥,加快消費(fèi)進(jìn)度*/
        if (isForce || timeStampOffset < consumerOffset) {
            offsetTable.put(mq, timeStampOffset);
        } else {
            offsetTable.put(mq, consumerOffset);
        }
    }

    /*確定了要先重置的offset之后開(kāi)始與客戶端交互,準(zhǔn)備客戶端重置,請(qǐng)求碼220*/
    ResetOffsetRequestHeader requestHeader = new ResetOffsetRequestHeader();
    requestHeader.setTopic(topic);
    requestHeader.setGroup(group);
    requestHeader.setTimestamp(timeStamp);
    RemotingCommand request =
        RemotingCommand.createRequestCommand(RequestCode.RESET_CONSUMER_CLIENT_OFFSET, requestHeader);
    if (isC) {
        // c++ language
        ResetOffsetBodyForC body = new ResetOffsetBodyForC();
        List<MessageQueueForC> offsetList = convertOffsetTable2OffsetList(offsetTable);
        body.setOffsetTable(offsetList);
        request.setBody(body.encode());
    } else {
        // other language
        ResetOffsetBody body = new ResetOffsetBody();
        body.setOffsetTable(offsetTable);
        request.setBody(body.encode());
    }

    /*拿到與當(dāng)前broker建立連接的消費(fèi)者組客戶端信息*/
    ConsumerGroupInfo consumerGroupInfo =
        this.brokerController.getConsumerManager().getConsumerGroupInfo(group);

    if (consumerGroupInfo != null && !consumerGroupInfo.getAllChannel().isEmpty()) {
        //獲取長(zhǎng)連接channel
        ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable =
            consumerGroupInfo.getChannelInfoTable();
        for (Map.Entry<Channel, ClientChannelInfo> entry : channelInfoTable.entrySet()) {
            int version = entry.getValue().getVersion();
            /*這里版本可以判斷,只有客戶端版本>3.0.7才支持重置*/
            if (version >= MQVersion.Version.V3_0_7_SNAPSHOT.ordinal()) {
                try {
                    /*注意這里是只管發(fā)不管收,可以簡(jiǎn)單理解為異步了invokeOneway*/
                    this.brokerController.getRemotingServer().invokeOneway(entry.getKey(), request, 5000);
                    log.info("[reset-offset] reset offset success. topic={}, group={}, clientId={}",
                        topic, group, entry.getValue().getClientId());
                } catch (Exception e) {
                    log.error("[reset-offset] reset offset exception. topic={}, group={}",
                        new Object[] {topic, group}, e);
                }
            } else {
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("the client does not support this feature. versinotallow="
                    + MQVersion.getVersionDesc(version));
                log.warn("[reset-offset] the client does not support this feature. versinotallow={}",
                    RemotingHelper.parseChannelRemoteAddr(entry.getKey()), MQVersion.getVersionDesc(version));
                return response;
            }
        }
    } else {
        String errorInfo =
            String.format("Consumer not online, so can not reset offset, Group: %s Topic: %s Timestamp: %d",
                requestHeader.getGroup(),
                requestHeader.getTopic(),
                requestHeader.getTimestamp());
        log.error(errorInfo);
        response.setCode(ResponseCode.CONSUMER_NOT_ONLINE);
        response.setRemark(errorInfo);
        return response;
    }
    response.setCode(ResponseCode.SUCCESS);
    ResetOffsetBody resBody = new ResetOffsetBody();
    resBody.setOffsetTable(offsetTable);
    response.setBody(resBody.encode());
    return response;
}

3.6 消費(fèi)客戶端收到請(qǐng)求,開(kāi)始處理

org.apache.rocketmq.client.impl.factory.MQClientInstance#resetOffset

public void resetOffset(String topic, String group, Map<MessageQueue, Long> offsetTable) {
    DefaultMQPushConsumerImpl consumer = null;
    try {
        /*根據(jù)消費(fèi)者組找到對(duì)應(yīng)的消費(fèi)實(shí)現(xiàn),即我們熟悉的DefaultMQPushConsumerImpl或者DefaultMQPullConsumerImpl*/
        MQConsumerInner impl = this.consumerTable.get(group);
        if (impl != null && impl instanceof DefaultMQPushConsumerImpl) {
            consumer = (DefaultMQPushConsumerImpl) impl;
        } else {
            //由于PullConsumer消費(fèi)進(jìn)度自己控制,因此直接返回
            log.info("[reset-offset] consumer dose not exist. group={}", group);
            return;
        }
        
        consumer.suspend();//暫停消費(fèi)

        /*暫停消息拉取,以及待處理的消息緩存都清掉*/
        ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = consumer.getRebalanceImpl().getProcessQueueTable();
        for (Map.Entry<MessageQueue, ProcessQueue> entry : processQueueTable.entrySet()) {
            MessageQueue mq = entry.getKey();
            if (topic.equals(mq.getTopic()) && offsetTable.containsKey(mq)) {
                ProcessQueue pq = entry.getValue();
                pq.setDropped(true);
                pq.clear();
            }
        }
  
        /*這里的等待實(shí)現(xiàn)比較簡(jiǎn)單,與broker交互是同步,broker與consumer交互是異步,因此這里阻塞10秒是為了保證所有的consumer都在這里存儲(chǔ)offset并觸發(fā)reblance*/
        try {
            TimeUnit.SECONDS.sleep(10);
        } catch (InterruptedException e) {
        }

        Iterator<MessageQueue> iterator = processQueueTable.keySet().iterator();
        while (iterator.hasNext()) {
            MessageQueue mq = iterator.next();
            //獲取messagequeue應(yīng)該被重置的offset
            Long offset = offsetTable.get(mq);
            if (topic.equals(mq.getTopic()) && offset != null) {
                try {
                    /*更新更新本地offset,這里注意集群模式是先修改本地,然后定時(shí)任務(wù)每五秒上報(bào)broker,而廣播模式offset在本地存儲(chǔ),因此只需要修改消費(fèi)者本地的offset即可*/
                    consumer.updateConsumeOffset(mq, offset);
                    consumer.getRebalanceImpl().removeUnnecessaryMessageQueue(mq, processQueueTable.get(mq));
                    iterator.remove();
                } catch (Exception e) {
                    log.warn("reset offset failed. group={}, {}", group, mq, e);
                }
            }
        }
    } finally {
        if (consumer != null) {
            /*重新觸發(fā)reblance,由于broker已經(jīng)重置的該消費(fèi)者組的offset,因此重分配后以broker為準(zhǔn)*/
            consumer.resume();
        }
    }
}

4 核心流程

消息回溯全流程消息回溯全流程

5 總結(jié)

消息回溯功能是 RocketMQ 提供給業(yè)務(wù)方的定心丸,業(yè)務(wù)在出現(xiàn)任何無(wú)法恢復(fù)的問(wèn)題后,都可以及時(shí)通過(guò)消息回溯來(lái)恢復(fù)業(yè)務(wù)或者訂正數(shù)據(jù)。特別是在流或者批計(jì)算的場(chǎng)景,重跑數(shù)據(jù)往往是常態(tài)。

RocketMQ 能實(shí)現(xiàn)消息回溯功能得益于其簡(jiǎn)單的位點(diǎn)管理機(jī)制,可以很容易通過(guò) mqadmin 工具重置位點(diǎn)。但要注意,由于topic的消息實(shí)際都是存儲(chǔ)在broker上,且有一定的刪除機(jī)制,因此首先要確認(rèn)需要消息回溯的集群broker不能下線節(jié)點(diǎn)或者回溯數(shù)據(jù)被刪除之前的時(shí)間點(diǎn),確保消息不會(huì)丟失。

6 延申

通過(guò)消息回溯的功能,我們可以任意向前或者向后撥動(dòng)offset,那當(dāng)我們想要指定一個(gè)區(qū)間進(jìn)行消費(fèi),這個(gè)時(shí)候怎么辦呢。比如當(dāng)消費(fèi)進(jìn)度過(guò)慢,我們選擇向后撥動(dòng)offset,那就會(huì)有一部分未消費(fèi)的消息出現(xiàn),針對(duì)這部分消息,我們應(yīng)該在空余時(shí)間把他消費(fèi)完成,就需要指定區(qū)間來(lái)消費(fèi)了。

其實(shí)通過(guò)上面代碼org.apache.rocketmq.client.impl.factory.MQClientInstance#resetOffset中我們可以看到,對(duì)于DefaultMQPullConsumerImpl類型的消費(fèi)者,消息重置是不生效的,這是因?yàn)镈efaultMQPullConsumerImpl的消費(fèi)進(jìn)度完全由消費(fèi)者來(lái)控制,那我們就可以采用拉模式來(lái)進(jìn)行消費(fèi)。

示例代碼:

public class PullConsumerLocalTest {
    private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<MessageQueue, Long>();
    private static final Map<MessageQueue, Pair<Long/*最小offset*/,Long/*最大offset*/>> QUEUE_OFFSE_SECTION_TABLE = new HashMap<>();
    private static final Long MIN_TIMESTAMP = 1722240069000L;//最小時(shí)間戳
    private static final Long MAX_TIMESTAMP = 1722240160000L;//最大時(shí)間戳

    public static void main(String[] args) throws MQClientException {
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.start();

        /*初始化待處理的offset*/
        String topic = "TopicTest";
        init(consumer, topic);

        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues(topic);
        for (MessageQueue mq : mqs) {
            System.out.printf("Consume from the queue: %s%n", mq);
            SINGLE_MQ:
            while (true) {
                try {
                    PullResult pullResult =
                        consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
                    System.out.printf("%s%n", pullResult);
                    putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            //check max offset and dosomething...
                            break;
                        case NO_MATCHED_MSG:
                            break;
                        case NO_NEW_MSG:
                            break SINGLE_MQ;
                        case OFFSET_ILLEGAL:
                            break;
                        default:
                            break;
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

        consumer.shutdown();
    }

    private static void init(DefaultMQPullConsumer consumer, String topic) throws MQClientException {
        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues(topic);
        for (MessageQueue mq : mqs) {
            long minOffset = consumer.searchOffset(mq, MIN_TIMESTAMP);
            long maxOffset = consumer.searchOffset(mq, MAX_TIMESTAMP);
            //記錄區(qū)間內(nèi)范圍內(nèi)最小以及最大的offset
            QUEUE_OFFSE_SECTION_TABLE.put(mq, new Pair<>(minOffset, maxOffset));
            //將最小offset寫(xiě)為下次消費(fèi)的初始o(jì)ffset
            OFFSE_TABLE.put(mq, minOffset);
        }
    }

    private static long getMessageQueueOffset(MessageQueue mq) {
        Long offset = OFFSE_TABLE.get(mq);
        if (offset != null)
            return offset;

        return 0;
    }

    private static void putMessageQueueOffset(MessageQueue mq, long offset) {
        OFFSE_TABLE.put(mq, offset);
    }

}

7 對(duì)比

方式

優(yōu)點(diǎn)

缺點(diǎn)

消費(fèi)者本地消息表

業(yè)務(wù)完全可控

額外存儲(chǔ)開(kāi)銷,重復(fù)消費(fèi)需要單獨(dú)開(kāi)發(fā)

消息重置

無(wú)需業(yè)務(wù)修改,支持廣播/集群,順序/無(wú)序消息(有冪等操作的需要重置狀態(tài))

低版本3.0.7之前不支持

pull手動(dòng)控制

消費(fèi)進(jìn)度完全可控

需要考慮offset維護(hù),復(fù)雜度較高

責(zé)任編輯:武曉燕 來(lái)源: 轉(zhuǎn)轉(zhuǎn)技術(shù)
相關(guān)推薦

2024-10-29 08:34:27

RocketMQ消息類型事務(wù)消息

2024-11-11 13:28:11

RocketMQ消息類型FIFO

2024-09-25 08:32:05

2024-10-11 09:15:33

2025-02-27 08:50:00

RocketMQ開(kāi)發(fā)代碼

2022-09-07 21:43:34

云原生存儲(chǔ)技術(shù)消息隊(duì)列

2025-04-11 09:57:16

2024-11-18 16:15:00

2022-12-22 10:03:18

消息集成

2023-12-21 08:01:41

RocketMQ消息堆積

2022-03-31 08:26:44

RocketMQ消息排查

2023-07-17 08:34:03

RocketMQ消息初體驗(yàn)

2022-06-02 08:21:07

RocketMQ消息中間件

2023-07-18 09:03:01

RocketMQ場(chǎng)景消息

2025-04-09 08:20:00

RocketMQ消息隊(duì)列開(kāi)發(fā)

2025-02-06 08:24:25

AQS開(kāi)發(fā)Java

2023-05-15 08:24:46

2020-11-13 16:40:05

RocketMQ延遲消息架構(gòu)

2023-09-21 22:02:22

Go語(yǔ)言高級(jí)特性

2025-03-27 04:10:00

點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)