偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

解決MQ消息丟失問題的五種方案

開發(fā) 前端
有些小伙伴在工作中,一提到消息隊(duì)列就覺得很簡單,但真正遇到線上消息丟失時(shí),排查起來卻讓人抓狂。其實(shí),我在實(shí)際工作中,也遇到過MQ消息丟失的情況。今天這篇文章,專門跟大家一起聊聊這個(gè)話題,希望對(duì)你會(huì)有所幫助。

前言

今天我們來聊聊一個(gè)讓很多開發(fā)者頭疼的話題——MQ消息丟失問題。

有些小伙伴在工作中,一提到消息隊(duì)列就覺得很簡單,但真正遇到線上消息丟失時(shí),排查起來卻讓人抓狂。

其實(shí),我在實(shí)際工作中,也遇到過MQ消息丟失的情況。

今天這篇文章,專門跟大家一起聊聊這個(gè)話題,希望對(duì)你會(huì)有所幫助。

一、消息丟失的三大環(huán)節(jié)

在深入解決方案之前,我們先搞清楚消息在哪幾個(gè)環(huán)節(jié)可能丟失:

圖片圖片

1. 生產(chǎn)者發(fā)送階段

  • 網(wǎng)絡(luò)抖動(dòng)導(dǎo)致發(fā)送失敗
  • 生產(chǎn)者宕機(jī)未發(fā)送
  • Broker處理失敗未返回確認(rèn)

2. Broker存儲(chǔ)階段

  • 內(nèi)存消息未持久化,重啟丟失
  • 磁盤故障導(dǎo)致數(shù)據(jù)丟失
  • 集群切換時(shí)消息丟失

3. 消費(fèi)者處理階段

  • 自動(dòng)確認(rèn)模式下處理異常
  • 消費(fèi)者宕機(jī)處理中斷
  • 手動(dòng)確認(rèn)但忘記確認(rèn)

理解了問題根源,接下來我們看5種實(shí)用的解決方案。

二、方案一:生產(chǎn)者確認(rèn)機(jī)制

核心原理

生產(chǎn)者發(fā)送消息后等待Broker確認(rèn),確保消息成功到達(dá)。

這是防止消息丟失的第一道防線。

圖片圖片

關(guān)鍵實(shí)現(xiàn)

// RabbitMQ生產(chǎn)者確認(rèn)配置
@Bean
public RabbitTemplate rabbitTemplate() {
    RabbitTemplate template = new RabbitTemplate(connectionFactory);
    template.setConfirmCallback((correlationData, ack, cause) -> {
        if (ack) {
            // 消息成功到達(dá)Broker
            messageStatusService.markConfirmed(correlationData.getId());
        } else {
            // 發(fā)送失敗,觸發(fā)重試
            retryService.scheduleRetry(correlationData.getId());
        }
    });
    return template;
}

// 可靠發(fā)送方法
public void sendReliable(String exchange, String routingKey, Object message) {
    String messageId = generateId();
    // 先落庫保存發(fā)送狀態(tài)
    messageStatusService.saveSendingStatus(messageId, message);
    
    // 發(fā)送持久化消息
    rabbitTemplate.convertAndSend(exchange, routingKey, message, msg -> {
        msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
        msg.getMessageProperties().setMessageId(messageId);
        return msg;
    }, new CorrelationData(messageId));
}

適用場景

  • 對(duì)消息可靠性要求高的業(yè)務(wù)
  • 金融交易、訂單處理等關(guān)鍵業(yè)務(wù)
  • 需要精確知道消息發(fā)送結(jié)果的場景

三、方案二:消息持久化機(jī)制

核心原理

將消息保存到磁盤,確保Broker重啟后消息不丟失。

這是防止Broker端消息丟失的關(guān)鍵。

圖片圖片

關(guān)鍵實(shí)現(xiàn)

// 持久化隊(duì)列配置
@Bean
public Queue orderQueue() {
    return QueueBuilder.durable("order.queue")  // 隊(duì)列持久化
            .deadLetterExchange("order.dlx")    // 死信交換機(jī)
            .build();
}

// 發(fā)送持久化消息
public void sendPersistentMessage(Object message) {
    rabbitTemplate.convertAndSend("order.exchange", "order.create", message, msg -> {
        msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); // 消息持久化
        return msg;
    });
}

// Kafka持久化配置
@Bean
public ProducerFactory<String, Object> producerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.ACKS_CONFIG, "all"); // 所有副本確認(rèn)
    props.put(ProducerConfig.RETRIES_CONFIG, 3);   // 重試次數(shù)
    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // 冪等性
    returnnew DefaultKafkaProducerFactory<>(props);
}

優(yōu)缺點(diǎn)

優(yōu)點(diǎn):

  • 有效防止Broker重啟導(dǎo)致的消息丟失
  • 配置簡單,效果明顯

缺點(diǎn):

  • 磁盤IO影響性能
  • 需要足夠的磁盤空間

四、方案三:消費(fèi)者確認(rèn)機(jī)制

核心原理

消費(fèi)者處理完消息后手動(dòng)向Broker發(fā)送確認(rèn),Broker收到確認(rèn)后才刪除消息。

這是保證消息不丟失的最后一道防線。

圖片圖片

關(guān)鍵實(shí)現(xiàn)

// 手動(dòng)確認(rèn)消費(fèi)者
@RabbitListener(queues = "order.queue")
public void handleMessage(Order order, Message message, Channel channel) {
    long deliveryTag = message.getMessageProperties().getDeliveryTag();
    
    try {
        // 業(yè)務(wù)處理
        orderService.processOrder(order);
        
        // 手動(dòng)確認(rèn)
        channel.basicAck(deliveryTag, false);
        log.info("消息處理完成: {}", order.getOrderId());
        
    } catch (Exception e) {
        log.error("消息處理失敗: {}", order.getOrderId(), e);
        
        // 處理失敗,重新入隊(duì)
        channel.basicNack(deliveryTag, false, true);
    }
}

// 消費(fèi)者容器配置
@Bean
public SimpleRabbitListenerContainerFactory containerFactory() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 手動(dòng)確認(rèn)
    factory.setPrefetchCount(10); // 預(yù)取數(shù)量
    factory.setConcurrentConsumers(3); // 并發(fā)消費(fèi)者
    return factory;
}

注意事項(xiàng)

  • 確保業(yè)務(wù)處理完成后再確認(rèn)
  • 合理設(shè)置預(yù)取數(shù)量,避免內(nèi)存溢出
  • 處理異常時(shí)要正確使用NACK

五、方案四:事務(wù)消息機(jī)制

核心原理

通過事務(wù)保證本地業(yè)務(wù)操作和消息發(fā)送的原子性,要么都成功,要么都失敗。

圖片圖片

關(guān)鍵實(shí)現(xiàn)

// 本地事務(wù)表方案
@Transactional
public void createOrder(Order order) {
    // 1. 保存訂單到數(shù)據(jù)庫
    orderRepository.save(order);
    
    // 2. 保存消息到本地消息表
    LocalMessage localMessage = new LocalMessage();
    localMessage.setBusinessId(order.getOrderId());
    localMessage.setContent(JSON.toJSONString(order));
    localMessage.setStatus(MessageStatus.PENDING);
    localMessageRepository.save(localMessage);
    
    // 3. 事務(wù)提交,本地業(yè)務(wù)和消息存儲(chǔ)保持一致性
}

// 定時(shí)任務(wù)掃描并發(fā)送消息
@Scheduled(fixedDelay = 5000)
public void sendPendingMessages() {
    List<LocalMessage> pendingMessages = localMessageRepository.findByStatus(MessageStatus.PENDING);
    
    for (LocalMessage message : pendingMessages) {
        try {
            // 發(fā)送消息到MQ
            rabbitTemplate.convertAndSend("order.exchange", "order.create", message.getContent());
            
            // 更新消息狀態(tài)為已發(fā)送
            message.setStatus(MessageStatus.SENT);
            localMessageRepository.save(message);
            
        } catch (Exception e) {
            log.error("發(fā)送消息失敗: {}", message.getId(), e);
        }
    }
}

// RocketMQ事務(wù)消息
public void sendTransactionMessage(Order order) {
    TransactionMQProducer producer = new TransactionMQProducer("order_producer");
    
    // 發(fā)送事務(wù)消息
    Message msg = new Message("order_topic", "create", 
                             JSON.toJSONBytes(order));
    
    TransactionSendResult result = producer.sendMessageInTransaction(msg, null);
    
    if (result.getLocalTransactionState() == LocalTransactionState.COMMIT_MESSAGE) {
        log.info("事務(wù)消息提交成功");
    }
}

適用場景

  • 需要嚴(yán)格保證業(yè)務(wù)和消息一致性的場景
  • 分布式事務(wù)場景
  • 金融、電商等對(duì)數(shù)據(jù)一致性要求高的業(yè)務(wù)

六、方案五:消息重試與死信隊(duì)列

核心原理

通過重試機(jī)制處理臨時(shí)故障,通過死信隊(duì)列處理最終無法消費(fèi)的消息。

圖片圖片

關(guān)鍵實(shí)現(xiàn)

// 重試隊(duì)列配置
@Bean
public Queue orderQueue() {
    return QueueBuilder.durable("order.queue")
            .withArgument("x-dead-letter-exchange", "order.dlx") // 死信交換機(jī)
            .withArgument("x-dead-letter-routing-key", "order.dead")
            .withArgument("x-message-ttl", 60000) // 60秒后進(jìn)入死信
            .build();
}

// 死信隊(duì)列配置
@Bean
public Queue orderDeadLetterQueue() {
    return QueueBuilder.durable("order.dead.queue").build();
}

// 消費(fèi)者重試邏輯
@RabbitListener(queues = "order.queue")
public void handleMessageWithRetry(Order order, Message message, Channel channel) {
    long deliveryTag = message.getMessageProperties().getDeliveryTag();
    
    try {
        orderService.processOrder(order);
        channel.basicAck(deliveryTag, false);
        
    } catch (TemporaryException e) {
        // 臨時(shí)異常,重新入隊(duì)重試
        channel.basicNack(deliveryTag, false, true);
        
    } catch (PermanentException e) {
        // 永久異常,直接確認(rèn)進(jìn)入死信隊(duì)列
        channel.basicAck(deliveryTag, false);
        log.error("消息進(jìn)入死信隊(duì)列: {}", order.getOrderId(), e);
    }
}

// 死信隊(duì)列消費(fèi)者
@RabbitListener(queues = "order.dead.queue")
public void handleDeadLetterMessage(Order order) {
    log.warn("處理死信消息: {}", order.getOrderId());
    // 發(fā)送告警、記錄日志、人工處理等
    alertService.sendAlert("死信消息告警", order.toString());
}

重試策略建議

  1. 指數(shù)退避:1s, 5s, 15s, 30s
  2. 最大重試次數(shù):3-5次
  3. 死信處理:人工介入或特殊處理流程

七、方案對(duì)比與選型指南

為了幫助大家選擇合適的方案,我整理了詳細(xì)的對(duì)比表:

方案

可靠性

性能影響

復(fù)雜度

適用場景

生產(chǎn)者確認(rèn)

所有需要可靠發(fā)送的場景

消息持久化

Broker重啟保護(hù)

消費(fèi)者確認(rèn)

確保消息被成功處理

事務(wù)消息

最高

強(qiáng)一致性要求的業(yè)務(wù)

重試+死信

處理臨時(shí)故障和最終死信

選型建議

初創(chuàng)項(xiàng)目/簡單業(yè)務(wù):

  • 生產(chǎn)者確認(rèn) + 消息持久化 + 消費(fèi)者確認(rèn)
  • 滿足大部分場景,實(shí)現(xiàn)簡單

電商/交易系統(tǒng):

  • 生產(chǎn)者確認(rèn) + 事務(wù)消息 + 重試機(jī)制
  • 保證數(shù)據(jù)一致性,處理復(fù)雜業(yè)務(wù)

大數(shù)據(jù)/日志處理:

  • 消息持久化 + 消費(fèi)者確認(rèn)
  • 允許少量丟失,追求吞吐量

金融/支付系統(tǒng):

  • 全方案組合使用
  • 最高可靠性要求

總結(jié)

消息丟失問題是消息隊(duì)列使用中的常見挑戰(zhàn),通過今天介紹的5種方案,我們可以構(gòu)建一個(gè)可靠的消息系統(tǒng):

  1. 生產(chǎn)者確認(rèn)機(jī)制 - 保證消息成功發(fā)送到Broker
  2. 消息持久化機(jī)制 - 防止Broker重啟導(dǎo)致消息丟失
  3. 消費(fèi)者確認(rèn)機(jī)制 - 確保消息被成功處理
  4. 事務(wù)消息機(jī)制 - 保證業(yè)務(wù)和消息的一致性
  5. 重試與死信隊(duì)列 - 處理異常情況和最終死信

有些小伙伴可能會(huì)問:"我需要全部使用這些方案嗎?

"我的建議是:根據(jù)業(yè)務(wù)需求選擇合適的組合

對(duì)于關(guān)鍵業(yè)務(wù),建議至少使用前三種方案;對(duì)于普通業(yè)務(wù),可以根據(jù)實(shí)際情況適當(dāng)簡化。

記住,沒有完美的方案,只有最適合的方案。

責(zé)任編輯:武曉燕 來源: 蘇三說技術(shù)
相關(guān)推薦

2012-03-29 09:57:06

jQuery

2024-04-23 08:46:45

消息積壓KafkaMQ

2021-03-08 10:19:59

MQ消息磁盤

2023-04-14 14:54:29

2018-05-04 07:36:35

醫(yī)療行業(yè)物聯(lián)網(wǎng)IoT

2024-06-05 06:37:19

2023-10-17 08:01:46

MQ消息重試

2025-08-18 08:26:14

2025-09-29 01:55:00

2021-11-29 09:15:57

Github網(wǎng)絡(luò)Python

2016-11-27 19:21:05

2021-08-04 07:47:18

Kafka消息框架

2022-08-29 18:14:55

MQ數(shù)據(jù)不丟失

2023-05-26 07:19:49

Spring聲明式事務(wù)

2009-07-22 17:37:06

ASP.NET Ses

2024-06-06 11:38:55

2024-12-02 14:30:20

2025-09-16 00:00:25

2025-05-07 08:21:01

2025-03-31 08:39:55

點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)