騰訊二面:如何保證MQ消息不丟失?重復消費如何保證冪等?
面試官在面試候選人時,如果發(fā)現(xiàn)候選人的簡歷中寫了在項目中使用了 MQ 技術(如 Kafka、RabbitMQ、RocketMQ),基本都會拋出一個問題:在使用 MQ 的時候,怎么確保消息 100% 不丟失?重復消費如何保證冪等?
這兩個問題在實際工作中也很常見,既能考察你對 MQ 的掌握程度又能很好的判斷是否有對應的實戰(zhàn)經驗。
本文將深入剖析消息丟失的本質原因,揭示 MQ 核心實現(xiàn)原理,并提供一套完整的 Java 實戰(zhàn)解決方案。
消息傳遞的生命周期
如下圖所示,阿斗被邀請去休閑養(yǎng)生 SPA 享受,服務包含泡腳、按摩、吃水果、看電視,玩真人 CS。
圖片
- 生產者:休閑養(yǎng)生 SPA 系統(tǒng),發(fā)送一條消息到 MQ。
 - MQ 消息隊列:存儲消息。
 - 消息消費者:享受泡腳技師幫泡腳、按摩技師肩背按摩、推油技師推背,同時吃水果看電視(估計是不會看電視了)。
 
此間樂不思蜀也……
消息的生命周期如下圖所示。
圖片
你可以發(fā)現(xiàn),從生產者發(fā)送消息,MQ 保存消息,消費者消費消息,每一個環(huán)節(jié)都有可能丟失消息。
圖片
各環(huán)節(jié)丟失概率統(tǒng)計
環(huán)節(jié)  | 故障概率  | 平均恢復時間  | 
網絡傳輸  | 0.1%-1%  | 秒級  | 
內存存儲  | 0.01%-0.1%  | 分鐘級  | 
磁盤故障  | 0.001%-0.01%  | 小時級  | 
程序異常  | 0.1%-5%  | 分鐘級  | 
典型業(yè)務場景代價
- 支付系統(tǒng):單條消息丟失 ≈ 平均訂單金額(如 1000 元)
 - 庫存系統(tǒng):1%消息丟失率 ≈10 倍超賣風險
 - 物流追蹤:消息丟失率>0.1%≈ 客戶投訴率提升 300%
 
消息生產者
當生產者往 MQ 中寫數(shù)據時,以下場景會導致消息丟失:
- 網絡閃斷:發(fā)送過程中網絡中斷
 - ACK 丟失:MQ 成功處理但確認丟失
 - 發(fā)送超時:網絡延遲導致超時誤判
 - 程序崩潰:處理中進程意外退出
 
生產者發(fā)送消息,主流消息隊列都支持同步發(fā)送和異步發(fā)送。
如果使用同步發(fā)送,生產者發(fā)送消息后,會同步等待 Broker 返回的 ACK,收到 ACK 消息,就認為消息發(fā)送成功。如果長時間沒有收到,則會認為消息發(fā)送失敗,需要進行重試。
本地消息表 + 異步重試
消息發(fā)送的流程如下圖所示,基于本地消息表 + 業(yè)務數(shù)據表構成本地事務。
通過消息一步發(fā)送并接受消息隊列的 ACK 來更新消息表狀態(tài),若果未發(fā)送則繼續(xù)重試發(fā)送,保證消息一定發(fā)送出去。
圖片
代碼案例如下所示:
@Service
publicclass ReliableProducer {
    @Autowired
    private JdbcTemplate jdbcTemplate;
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    @Transactional
    public void createOrder(Order order) {
        // 1. 業(yè)務數(shù)據入庫
        jdbcTemplate.update(
            "INSERT INTO orders(id, amount) VALUES(?, ?)",
            order.getId(), order.getAmount());
        // 2. 消息記錄入庫
        String msgId = UUID.randomUUID().toString();
        jdbcTemplate.update(
            "INSERT INTO message_log(msg_id, topic, message, status) VALUES(?, ?, ?, ?)",
            msgId, "orders", JsonUtil.toJson(order), 0); // 0-待發(fā)送
        // 事務提交后觸發(fā)異步發(fā)送
        CompletableFuture.runAsync(() -> sendWithRetry(msgId));
    }
    // 這里其實可以使用 xxl-job 等分布式調度框架查詢未發(fā)送成功的消息發(fā)送。
    private void sendWithRetry(String msgId) {
        MessageRecord msg = jdbcTemplate.queryForObject(
            "SELECT * FROM message_log WHERE msg_id = ?",
            new MessageRecordRowMapper(), msgId);
        int attempt = 0;
        while (attempt < MAX_RETRIES) {
            try {
                ListenableFuture<SendResult<String, String>> future =
                    kafkaTemplate.send(msg.getTopic(), msg.getMessage());
                future.addCallback(result -> {
                    // 更新發(fā)送狀態(tài)
                    jdbcTemplate.update("UPDATE message_log SET status = 1 WHERE msg_id = ?", msgId);
                }, ex -> {
                    scheduleRetry(msgId, attempt); // 失敗重試
                });
                return;
            } catch (Exception e) {
                scheduleRetry(msgId, attempt);
                attempt++;
            }
        }
    }
    private void scheduleRetry(String msgId, int attempt) {
        long delay = (long) Math.pow(2, attempt) * 1000; // 指數(shù)退避
        scheduler.schedule(() -> sendWithRetry(msgId), delay, TimeUnit.MILLISECONDS);
    }
}ACK 機制原理對比
MQ 類型  | ACK 機制  | 可靠性  | 性能影響  | 
Kafka  | acks=0  | 最低  | 無  | 
Kafka  | acks=1  | 中等  | 低  | 
Kafka  | acks=all  | 最高  | 高  | 
RabbitMQ  | 無確認  | 低  | 無  | 
RabbitMQ  | 生產者確認  | 高  | 中等  | 
RocketMQ  | 同步刷盤  | 最高  | 高  | 
MQ 服務端:消息 100%存儲原理
生產者發(fā)送消息成功,也不能保證消息絕對不丟失。因為即使消息發(fā)送到 Broker,如果在消費者拉取到消息之前,Broker 宕機了,消息還沒有落盤,也會導致消息丟失。
kafka 存儲架構剖析
圖片
- Producer(生產者):發(fā)送消息的一方,負責發(fā)布消息到 Kafka 主題(Topic)。
 - Consumer(消費者):接受消息的一方,訂閱主題并處理消息。Kafka 有 ConsumerGroup 的概念,每個 Consumer 只能消費所分配到的 Partition 的消息,每一個 Partition 只能被一個 ConsumerGroup 中的一個 Consumer 所消費,所以同一個 ConsumerGroup 中 Consumer 的數(shù)量如果超過了 Partiton 的數(shù)量,將會出現(xiàn)有些 Consumer 分配不到 partition 消費。
 - Broker(代理):服務代理節(jié)點,Kafka 集群中的一臺服務器就是一個 broker,可以水平無限擴展,同一個 Topic 的消息可以分布在多個 broker 中。
 - Topic(主題)與 Partition(分區(qū)) :Kafka 中的消息以 Topic 為單位進行劃分,生產者將消息發(fā)送到特定的 Topic,而消費者負責訂閱 Topic 的消息并進行消費。圖中 TopicA 有三個 Partiton(TopicA-par0、TopicA-par1、TopicA-par2)
為了提升整個集群的吞吐量,Topic 在物理上還可以細分多個 Partition,一個 Partition 在磁盤上對應一個文件夾。 - Replica(副本):副本,是 Kafka 保證數(shù)據高可用的方式,Kafka 同一 Partition 的數(shù)據可以在多 Broker 上存在多個副本,通常只有 leader 副本對外提供讀寫服務,當 leader 副本所在 broker 崩潰或發(fā)生網絡一場,Kafka 會在 Controller 的管理下會重新選擇新的 Leader 副本對外提供讀寫服務。
 - ZooKeeper:管理 Kafka 集群的元數(shù)據和分布式協(xié)調。
 
同步刷盤
kafka 為了得到更高的性能和吞吐量,將數(shù)據異步批量的存儲在磁盤中。
消息的刷盤過程,為了提高性能,減少刷盤次數(shù),kafka 采用了批量刷盤的做法。即,按照一定的消息量,和時間間隔進行刷盤。
這種機制也是由于 linux 操作系統(tǒng)決定的。
將數(shù)據存儲到 linux 操作系統(tǒng)種,會先存儲到頁緩存(Page cache)中,按照時間或者其他條件進行刷盤(從 page cache 到 file),或者通過 fsync 命令強制刷盤。
圖片
數(shù)據在 page cache 中時,如果系統(tǒng)掛掉,數(shù)據會丟失。
kafka 可靠性黃金配置
如圖所示的 kafka 集群,一個 Broker 的 Topic 其中一個 partition 一共有三 副本(包含 Leader)。

試想一種情況:假如 leader 副本所在的 broker 突然掛掉,那么就要從 follower 副本重新選出一個 leader ,但是 leader 的數(shù)據還有一些沒有被 follower 副本的同步的話,就會造成消息丟失。
解決辦法就是我們設置 acks = all。acks 是 Kafka 生產者(Producer) 很重要的一個參數(shù)。
acks 的默認值即為 1,代表我們的消息被 leader 副本接收之后就算被成功發(fā)送。當我們配置 acks = all 代表則所有副本都要接收到該消息之后該消息才算真正成功被發(fā)送。
該場景的 Kafka Broker 黃金高可靠配置如下:
# Kafka配置示例
acks=all
min.insync.replicas=2 // 最小同步副本數(shù)
replication.factor=3  // 每個分區(qū)的 總副本數(shù)量(含 Leader)
unclean.leader.election.enable=false
log.flush.interval.messages=10000
log.flush.interval.ms=1000- acks=all:生產者要求所有 ISR(In-Sync Replicas)副本 都成功寫入消息后才返回確認。
 - min.insync.replicas:定義 最小同步副本數(shù),必須至少有 2 個副本處于同步狀態(tài)(含 Leader)。
 
- 當 
replication.factor=3且min.insync.replicas=2時:允許 1 個副本宕機(如 Broker 故障)、若 2 個副本不可用,則生產會被阻塞 
- replication.factor=3:每個分區(qū)的 總副本數(shù)量(含 Leader),為了保證整個 Kafka 服務的高可用性,你還需要確保 replication.factor > min.insync.replicas ,一般推薦設置成 replication.factor = min.insync.replicas + 1。
 - unclean.leader.election.enable=false:禁止 非同步副本(Out-of-Sync) 成為 Leader。若允許非同步副本成為 Leader,可能導致已提交數(shù)據被覆蓋,金融場景必須設為 
false。 - 我們最開始也說了我們發(fā)送的消息會被發(fā)送到 leader 副本,然后 follower 副本才能從 leader 副本中拉取消息進行同步。
 - 多個 follower 副本之間的消息同步情況不一樣,當我們配置了 unclean.leader.election.enable = false 的話,當 leader 副本發(fā)生故障時就不會從 follower 副本中和 leader 同步程度達不到要求的副本中選擇出 leader ,即只從 ISR 中選擇 leader,這樣降低了消息丟失的可能性。
 - log.flush.interval.messages=10000:每累積 10000 條消息 強制刷盤一次。
 - log.flush.interval.ms=1000:每 1000 毫秒(1 秒) 強制刷盤一次。
 
消費者保證 100% 處理原理
消息在被追加到 Partition(分區(qū))的時候都會分配一個特定的偏移(offset)。
偏移量(offset)表示 Consumer 當前消費到的 Partition(分區(qū))的所在的位置。Kafka 通過偏移量(offset)可以保證消息在分區(qū)內的順序性。
當消費者拉取到了分區(qū)的某個消息之后,消費者會自動提交了 offset。
自動提交的話會有一個問題,試想一下,當消費者剛拿到這個消息準備進行真正消費的時候,突然掛掉了,消息實際上并沒有被消費,但是 offset 卻被自動提交了。
解決辦法也比較粗暴,我們手動關閉自動提交 offset,每次在真正消費完消息之后之后再自己手動提交 offset 。
這樣會帶來消息被重新消費的問題。比如你剛剛消費完消息之后,還沒提交 offset,結果自己掛掉了,那么這個消息理論上就會被消費兩次。
開啟手動提交的時候消費端需要去保證冪等性。
圖片
冪等消費 + 死信隊列
@Slf4j
@Component
publicclass ReliableConsumer {
    @Autowired
    private RedisTemplate<String, String> redisTemplate;
    @Autowired
    private OrderService orderService;
    @KafkaListener(topics = "orders")
    public void consume(ConsumerRecord<String, String> record) {
        String msgId = record.key();
        Order order = JsonUtil.fromJson(record.value(), Order.class);
        // 1. 冪等檢查
        if (isProcessed(msgId)) {
            log.info("消息重復消費,已跳過: {}", msgId);
            return;
        }
        // 2. 獲取分布式鎖
        Lock lock = redisLockFactory.getLock("LOCK:" + msgId);
        if (!lock.tryLock(3, TimeUnit.SECONDS)) {
            thrownew ConcurrentAccessException("獲取鎖失敗");
        }
        try {
            // 3. 二次冪等檢查(防并發(fā))
            if (isProcessed(msgId)) {
                return;
            }
            // 4. 業(yè)務處理
            orderService.processOrder(order);
            // 5. 記錄處理狀態(tài)(設置24小時過期)
            markProcessed(msgId);
        } catch (BusinessException e) {
            // 6. 業(yè)務異常處理
            handleFailure(record, e);
        } finally {
            lock.unlock();
        }
    }
    private boolean isProcessed(String msgId) {
        return"PROCESSED".equals(
            redisTemplate.opsForValue().get("MSG:" + msgId));
    }
    private void markProcessed(String msgId) {
        redisTemplate.opsForValue().set(
            "MSG:" + msgId, "PROCESSED", 24, TimeUnit.HOURS);
    }
    private void handleFailure(ConsumerRecord<?, ?> record, Exception e) {
        // 失敗計數(shù)
        int failCount = incrementFailCounter(record.key());
        if (failCount < 3) {
            thrownew RetryableException(e); // 觸發(fā)重試
        } else {
            sendToDlq(record); // 轉移死信隊列
        }
    }
    @Bean
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // 配置批量ACK(性能與可靠性的平衡)
        factory.getContainerProperties().setAckMode(
            AckMode.BATCH);
        // 消費并發(fā)控制
        factory.setConcurrency(3);
        return factory;
    }
}端到端保障:構建全鏈路防御體系
除了對生產者、MQ 中間件、消費端保證不丟失消息的處理手段,還可以對消息軌跡進行監(jiān)控。

自動化對賬系統(tǒng)實現(xiàn)代碼案例。
@Service
@Slf4j
publicclass ReconciliationService {
    @Scheduled(cron = "0 0 2 * * ?") // 每天凌晨2點執(zhí)行
    public void dailyReconciliation() {
        // 1. 生產端計數(shù)
        long produced = countProducerMessages();
        // 2. MQ端計數(shù)
        long stored = countMQMessages();
        // 3. 消費端計數(shù)
        long consumed = countConsumerMessages();
        // 4. 數(shù)據對比
        if (produced != stored) {
            handleLoss(produced - stored, "生產到MQ丟失");
        }
        if (stored != consumed) {
            handleLoss(stored - consumed, "MQ到消費丟失");
        }
        log.info("對賬完成: 生產={}, MQ存儲={}, 消費={}",
            produced, stored, consumed);
    }
    private void handleLoss(long lossCount, String stage) {
        log.error("消息丟失告警: 階段={}, 數(shù)量={}", stage, lossCount);
        // 1. 通知運維團隊
        alertService.notifyStaff(stage, lossCount);
        // 2. 自動恢復機制
        if (lossCount < 1000) {
            recoveryService.recoverFromBackup();
        } else {
            // 重大事故,啟動緊急預案
            emergencyService.handleDisaster();
        }
    }
}總結
消息零丟失的三位一體架構本質上是對不確定性的系統(tǒng)化防御:
- 生產者防御:建立冗余記錄(消息表)對抗網絡不確定性
 - 存儲層防御:通過副本機制抵御物理故障
 - 消費者防御:依靠冪等性消除重試副作用
 - 監(jiān)控層防御:用全局視角捕捉異常情況
 
在 Java 生態(tài)中,我們擁有強大的工具集實現(xiàn)這套防御:
- Spring 事務管理:確保本地事務一致性
 - Kafka/RabbitMQ 客戶端:提供精細化的 ACK 控制
 - Redis 分布式鎖:實現(xiàn)高并發(fā)下的冪等控制
 















 
 
 













 
 
 
 