解決MQ消息丟失問題的五種方案
前言
今天我們來聊聊一個(gè)讓很多開發(fā)者頭疼的話題——MQ消息丟失問題。
有些小伙伴在工作中,一提到消息隊(duì)列就覺得很簡單,但真正遇到線上消息丟失時(shí),排查起來卻讓人抓狂。
其實(shí),我在實(shí)際工作中,也遇到過MQ消息丟失的情況。
今天這篇文章,專門跟大家一起聊聊這個(gè)話題,希望對(duì)你會(huì)有所幫助。
一、消息丟失的三大環(huán)節(jié)
在深入解決方案之前,我們先搞清楚消息在哪幾個(gè)環(huán)節(jié)可能丟失:
圖片
1. 生產(chǎn)者發(fā)送階段
- 網(wǎng)絡(luò)抖動(dòng)導(dǎo)致發(fā)送失敗
- 生產(chǎn)者宕機(jī)未發(fā)送
- Broker處理失敗未返回確認(rèn)
2. Broker存儲(chǔ)階段
- 內(nèi)存消息未持久化,重啟丟失
- 磁盤故障導(dǎo)致數(shù)據(jù)丟失
- 集群切換時(shí)消息丟失
3. 消費(fèi)者處理階段
- 自動(dòng)確認(rèn)模式下處理異常
- 消費(fèi)者宕機(jī)處理中斷
- 手動(dòng)確認(rèn)但忘記確認(rèn)
理解了問題根源,接下來我們看5種實(shí)用的解決方案。
二、方案一:生產(chǎn)者確認(rèn)機(jī)制
核心原理
生產(chǎn)者發(fā)送消息后等待Broker確認(rèn),確保消息成功到達(dá)。
這是防止消息丟失的第一道防線。
圖片
關(guān)鍵實(shí)現(xiàn)
// RabbitMQ生產(chǎn)者確認(rèn)配置
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
// 消息成功到達(dá)Broker
messageStatusService.markConfirmed(correlationData.getId());
} else {
// 發(fā)送失敗,觸發(fā)重試
retryService.scheduleRetry(correlationData.getId());
}
});
return template;
}
// 可靠發(fā)送方法
public void sendReliable(String exchange, String routingKey, Object message) {
String messageId = generateId();
// 先落庫保存發(fā)送狀態(tài)
messageStatusService.saveSendingStatus(messageId, message);
// 發(fā)送持久化消息
rabbitTemplate.convertAndSend(exchange, routingKey, message, msg -> {
msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
msg.getMessageProperties().setMessageId(messageId);
return msg;
}, new CorrelationData(messageId));
}適用場景
- 對(duì)消息可靠性要求高的業(yè)務(wù)
- 金融交易、訂單處理等關(guān)鍵業(yè)務(wù)
- 需要精確知道消息發(fā)送結(jié)果的場景
三、方案二:消息持久化機(jī)制
核心原理
將消息保存到磁盤,確保Broker重啟后消息不丟失。
這是防止Broker端消息丟失的關(guān)鍵。
圖片
關(guān)鍵實(shí)現(xiàn)
// 持久化隊(duì)列配置
@Bean
public Queue orderQueue() {
return QueueBuilder.durable("order.queue") // 隊(duì)列持久化
.deadLetterExchange("order.dlx") // 死信交換機(jī)
.build();
}
// 發(fā)送持久化消息
public void sendPersistentMessage(Object message) {
rabbitTemplate.convertAndSend("order.exchange", "order.create", message, msg -> {
msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); // 消息持久化
return msg;
});
}
// Kafka持久化配置
@Bean
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.ACKS_CONFIG, "all"); // 所有副本確認(rèn)
props.put(ProducerConfig.RETRIES_CONFIG, 3); // 重試次數(shù)
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // 冪等性
returnnew DefaultKafkaProducerFactory<>(props);
}優(yōu)缺點(diǎn)
優(yōu)點(diǎn):
- 有效防止Broker重啟導(dǎo)致的消息丟失
- 配置簡單,效果明顯
缺點(diǎn):
- 磁盤IO影響性能
- 需要足夠的磁盤空間
四、方案三:消費(fèi)者確認(rèn)機(jī)制
核心原理
消費(fèi)者處理完消息后手動(dòng)向Broker發(fā)送確認(rèn),Broker收到確認(rèn)后才刪除消息。
這是保證消息不丟失的最后一道防線。
圖片
關(guān)鍵實(shí)現(xiàn)
// 手動(dòng)確認(rèn)消費(fèi)者
@RabbitListener(queues = "order.queue")
public void handleMessage(Order order, Message message, Channel channel) {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
// 業(yè)務(wù)處理
orderService.processOrder(order);
// 手動(dòng)確認(rèn)
channel.basicAck(deliveryTag, false);
log.info("消息處理完成: {}", order.getOrderId());
} catch (Exception e) {
log.error("消息處理失敗: {}", order.getOrderId(), e);
// 處理失敗,重新入隊(duì)
channel.basicNack(deliveryTag, false, true);
}
}
// 消費(fèi)者容器配置
@Bean
public SimpleRabbitListenerContainerFactory containerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 手動(dòng)確認(rèn)
factory.setPrefetchCount(10); // 預(yù)取數(shù)量
factory.setConcurrentConsumers(3); // 并發(fā)消費(fèi)者
return factory;
}注意事項(xiàng)
- 確保業(yè)務(wù)處理完成后再確認(rèn)
- 合理設(shè)置預(yù)取數(shù)量,避免內(nèi)存溢出
- 處理異常時(shí)要正確使用NACK
五、方案四:事務(wù)消息機(jī)制
核心原理
通過事務(wù)保證本地業(yè)務(wù)操作和消息發(fā)送的原子性,要么都成功,要么都失敗。
圖片
關(guān)鍵實(shí)現(xiàn)
// 本地事務(wù)表方案
@Transactional
public void createOrder(Order order) {
// 1. 保存訂單到數(shù)據(jù)庫
orderRepository.save(order);
// 2. 保存消息到本地消息表
LocalMessage localMessage = new LocalMessage();
localMessage.setBusinessId(order.getOrderId());
localMessage.setContent(JSON.toJSONString(order));
localMessage.setStatus(MessageStatus.PENDING);
localMessageRepository.save(localMessage);
// 3. 事務(wù)提交,本地業(yè)務(wù)和消息存儲(chǔ)保持一致性
}
// 定時(shí)任務(wù)掃描并發(fā)送消息
@Scheduled(fixedDelay = 5000)
public void sendPendingMessages() {
List<LocalMessage> pendingMessages = localMessageRepository.findByStatus(MessageStatus.PENDING);
for (LocalMessage message : pendingMessages) {
try {
// 發(fā)送消息到MQ
rabbitTemplate.convertAndSend("order.exchange", "order.create", message.getContent());
// 更新消息狀態(tài)為已發(fā)送
message.setStatus(MessageStatus.SENT);
localMessageRepository.save(message);
} catch (Exception e) {
log.error("發(fā)送消息失敗: {}", message.getId(), e);
}
}
}
// RocketMQ事務(wù)消息
public void sendTransactionMessage(Order order) {
TransactionMQProducer producer = new TransactionMQProducer("order_producer");
// 發(fā)送事務(wù)消息
Message msg = new Message("order_topic", "create",
JSON.toJSONBytes(order));
TransactionSendResult result = producer.sendMessageInTransaction(msg, null);
if (result.getLocalTransactionState() == LocalTransactionState.COMMIT_MESSAGE) {
log.info("事務(wù)消息提交成功");
}
}適用場景
- 需要嚴(yán)格保證業(yè)務(wù)和消息一致性的場景
- 分布式事務(wù)場景
- 金融、電商等對(duì)數(shù)據(jù)一致性要求高的業(yè)務(wù)
六、方案五:消息重試與死信隊(duì)列
核心原理
通過重試機(jī)制處理臨時(shí)故障,通過死信隊(duì)列處理最終無法消費(fèi)的消息。
圖片
關(guān)鍵實(shí)現(xiàn)
// 重試隊(duì)列配置
@Bean
public Queue orderQueue() {
return QueueBuilder.durable("order.queue")
.withArgument("x-dead-letter-exchange", "order.dlx") // 死信交換機(jī)
.withArgument("x-dead-letter-routing-key", "order.dead")
.withArgument("x-message-ttl", 60000) // 60秒后進(jìn)入死信
.build();
}
// 死信隊(duì)列配置
@Bean
public Queue orderDeadLetterQueue() {
return QueueBuilder.durable("order.dead.queue").build();
}
// 消費(fèi)者重試邏輯
@RabbitListener(queues = "order.queue")
public void handleMessageWithRetry(Order order, Message message, Channel channel) {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
orderService.processOrder(order);
channel.basicAck(deliveryTag, false);
} catch (TemporaryException e) {
// 臨時(shí)異常,重新入隊(duì)重試
channel.basicNack(deliveryTag, false, true);
} catch (PermanentException e) {
// 永久異常,直接確認(rèn)進(jìn)入死信隊(duì)列
channel.basicAck(deliveryTag, false);
log.error("消息進(jìn)入死信隊(duì)列: {}", order.getOrderId(), e);
}
}
// 死信隊(duì)列消費(fèi)者
@RabbitListener(queues = "order.dead.queue")
public void handleDeadLetterMessage(Order order) {
log.warn("處理死信消息: {}", order.getOrderId());
// 發(fā)送告警、記錄日志、人工處理等
alertService.sendAlert("死信消息告警", order.toString());
}重試策略建議
- 指數(shù)退避:1s, 5s, 15s, 30s
- 最大重試次數(shù):3-5次
- 死信處理:人工介入或特殊處理流程
七、方案對(duì)比與選型指南
為了幫助大家選擇合適的方案,我整理了詳細(xì)的對(duì)比表:
方案 | 可靠性 | 性能影響 | 復(fù)雜度 | 適用場景 |
生產(chǎn)者確認(rèn) | 高 | 中 | 低 | 所有需要可靠發(fā)送的場景 |
消息持久化 | 中 | 中 | 低 | Broker重啟保護(hù) |
消費(fèi)者確認(rèn) | 高 | 低 | 中 | 確保消息被成功處理 |
事務(wù)消息 | 最高 | 高 | 高 | 強(qiáng)一致性要求的業(yè)務(wù) |
重試+死信 | 高 | 低 | 中 | 處理臨時(shí)故障和最終死信 |
選型建議
初創(chuàng)項(xiàng)目/簡單業(yè)務(wù):
- 生產(chǎn)者確認(rèn) + 消息持久化 + 消費(fèi)者確認(rèn)
- 滿足大部分場景,實(shí)現(xiàn)簡單
電商/交易系統(tǒng):
- 生產(chǎn)者確認(rèn) + 事務(wù)消息 + 重試機(jī)制
- 保證數(shù)據(jù)一致性,處理復(fù)雜業(yè)務(wù)
大數(shù)據(jù)/日志處理:
- 消息持久化 + 消費(fèi)者確認(rèn)
- 允許少量丟失,追求吞吐量
金融/支付系統(tǒng):
- 全方案組合使用
- 最高可靠性要求
總結(jié)
消息丟失問題是消息隊(duì)列使用中的常見挑戰(zhàn),通過今天介紹的5種方案,我們可以構(gòu)建一個(gè)可靠的消息系統(tǒng):
- 生產(chǎn)者確認(rèn)機(jī)制 - 保證消息成功發(fā)送到Broker
- 消息持久化機(jī)制 - 防止Broker重啟導(dǎo)致消息丟失
- 消費(fèi)者確認(rèn)機(jī)制 - 確保消息被成功處理
- 事務(wù)消息機(jī)制 - 保證業(yè)務(wù)和消息的一致性
- 重試與死信隊(duì)列 - 處理異常情況和最終死信
有些小伙伴可能會(huì)問:"我需要全部使用這些方案嗎?
"我的建議是:根據(jù)業(yè)務(wù)需求選擇合適的組合。
對(duì)于關(guān)鍵業(yè)務(wù),建議至少使用前三種方案;對(duì)于普通業(yè)務(wù),可以根據(jù)實(shí)際情況適當(dāng)簡化。
記住,沒有完美的方案,只有最適合的方案。
































