我工作中用MQ的十種場景
前言
最近有球友問我:MQ的使用場景有哪些?工作中一定要使用MQ嗎?
記得剛工作那會兒,我總是想不明白:為什么明明直接調(diào)用接口就能完成的功能,非要引入MQ這么個"中間商"?
直到經(jīng)歷了系統(tǒng)崩潰、數(shù)據(jù)丟失、性能瓶頸等一系列問題后,我才真正理解了MQ的價值。
今天我想和大家分享我在實際工作中使用消息隊列(MQ)的10種典型場景,希望對你會有所幫助。
一、為什么需要消息隊列(MQ)?
在深入具體場景之前,我們先來思考一個基本問題:為什么要使用消息隊列?
系統(tǒng)間的直接調(diào)用:
圖片
引入消息隊列后:
圖片
接下來我們將通過10個具體場景,帶大家來深入理解MQ的價值。
場景一:系統(tǒng)解耦
背景描述
在我早期參與的一個電商項目中,訂單創(chuàng)建后需要通知多個系統(tǒng):
// 早期的緊耦合設(shè)計
public class OrderService {
private InventoryService inventoryService;
private PointsService pointsService;
private EmailService emailService;
private AnalyticsService analyticsService;
public void createOrder(Order order) {
// 1. 保存訂單
orderDao.save(order);
// 2. 調(diào)用庫存服務(wù)
inventoryService.updateInventory(order);
// 3. 調(diào)用積分服務(wù)
pointsService.addPoints(order.getUserId(), order.getAmount());
// 4. 發(fā)送郵件通知
emailService.sendOrderConfirmation(order);
// 5. 記錄分析數(shù)據(jù)
analyticsService.trackOrderCreated(order);
// 更多服務(wù)...
}
}這種架構(gòu)存在嚴(yán)重問題:
- 緊耦合:訂單服務(wù)需要知道所有下游服務(wù)
- 單點故障:任何一個下游服務(wù)掛掉都會導(dǎo)致訂單創(chuàng)建失敗
- 性能瓶頸:同步調(diào)用導(dǎo)致響應(yīng)時間慢
MQ解決方案
引入MQ后,架構(gòu)變?yōu)椋?/span>
圖片
代碼實現(xiàn):
// 訂單服務(wù) - 生產(chǎn)者
@Service
public class OrderService {
@Autowired
private RabbitTemplate rabbitTemplate;
public void createOrder(Order order) {
// 1. 保存訂單
orderDao.save(order);
// 2. 發(fā)送消息到MQ
rabbitTemplate.convertAndSend(
"order.exchange",
"order.created",
new OrderCreatedEvent(order.getId(), order.getUserId(), order.getAmount())
);
}
}
// 庫存服務(wù) - 消費者
@Component
@RabbitListener(queues = "inventory.queue")
public class InventoryConsumer {
@Autowired
private InventoryService inventoryService;
@RabbitHandler
public void handleOrderCreated(OrderCreatedEvent event) {
inventoryService.updateInventory(event.getOrderId());
}
}技術(shù)要點
- 消息協(xié)議選擇:根據(jù)業(yè)務(wù)需求選擇RabbitMQ、Kafka或RocketMQ
- 消息格式:使用JSON或Protobuf等跨語言格式
- 錯誤處理:實現(xiàn)重試機制和死信隊列
場景二:異步處理
背景描述
用戶上傳視頻后需要執(zhí)行轉(zhuǎn)碼、生成縮略圖、內(nèi)容審核等耗時操作,如果同步處理,用戶需要等待很長時間。
MQ解決方案
// 視頻服務(wù) - 生產(chǎn)者
@Service
public class VideoService {
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
public UploadResponse uploadVideo(MultipartFile file, String userId) {
// 1. 保存原始視頻
String videoId = saveOriginalVideo(file);
// 2. 發(fā)送處理消息
kafkaTemplate.send("video-processing", new VideoProcessingEvent(videoId, userId));
// 3. 立即返回響應(yīng)
return new UploadResponse(videoId, "upload_success");
}
}
// 視頻處理服務(wù) - 消費者
@Service
public class VideoProcessingConsumer {
@KafkaListener(topics = "video-processing")
public void processVideo(VideoProcessingEvent event) {
// 異步執(zhí)行耗時操作
videoProcessor.transcode(event.getVideoId());
videoProcessor.generateThumbnails(event.getVideoId());
contentModerationService.checkContent(event.getVideoId());
// 發(fā)送處理完成通知
notificationService.notifyUser(event.getUserId(), event.getVideoId());
}
}架構(gòu)優(yōu)勢
- 快速響應(yīng):用戶上傳后立即得到響應(yīng)
- 彈性擴展:可以根據(jù)處理壓力動態(tài)調(diào)整消費者數(shù)量
- 故障隔離:處理服務(wù)故障不會影響上傳功能
場景三:流量削峰
背景描述
電商秒殺活動時,瞬時流量可能是平時的百倍以上,直接沖擊數(shù)據(jù)庫和服務(wù)。
MQ解決方案
圖片
代碼實現(xiàn):
// 秒殺服務(wù)
@Service
public class SecKillService {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private RabbitTemplate rabbitTemplate;
public SecKillResponse secKill(SecKillRequest request) {
// 1. 校驗用戶資格
if (!checkUserQualification(request.getUserId())) {
return SecKillResponse.failed("用戶無資格");
}
// 2. 預(yù)減庫存(Redis原子操作)
Long remaining = redisTemplate.opsForValue().decrement(
"sec_kill_stock:" + request.getItemId());
if (remaining == null || remaining < 0) {
// 庫存不足,恢復(fù)庫存
redisTemplate.opsForValue().increment("sec_kill_stock:" + request.getItemId());
return SecKillResponse.failed("庫存不足");
}
// 3. 發(fā)送秒殺成功消息到MQ
rabbitTemplate.convertAndSend(
"sec_kill.exchange",
"sec_kill.success",
new SecKillSuccessEvent(request.getUserId(), request.getItemId())
);
return SecKillResponse.success("秒殺成功");
}
}
// 訂單處理消費者
@Component
@RabbitListener(queues = "sec_kill.order.queue")
public class SecKillOrderConsumer {
@RabbitHandler
public void handleSecKillSuccess(SecKillSuccessEvent event) {
// 異步創(chuàng)建訂單
orderService.createSecKillOrder(event.getUserId(), event.getItemId());
}
}技術(shù)要點
- 庫存預(yù)扣:使用Redis原子操作避免超賣
- 隊列緩沖:MQ緩沖請求,避免直接沖擊數(shù)據(jù)庫
- 限流控制:在網(wǎng)關(guān)層進(jìn)行限流,拒絕過多請求
場景四:數(shù)據(jù)同步
背景描述
在微服務(wù)架構(gòu)中,不同服務(wù)有自己的數(shù)據(jù)庫,需要保證數(shù)據(jù)一致性。
MQ解決方案
// 用戶服務(wù) - 數(shù)據(jù)變更時發(fā)送消息
@Service
public class UserService {
@Transactional
public User updateUser(User user) {
// 1. 更新數(shù)據(jù)庫
userDao.update(user);
// 2. 發(fā)送消息(在事務(wù)內(nèi))
rocketMQTemplate.sendMessageInTransaction(
"user-update-topic",
MessageBuilder.withPayload(new UserUpdateEvent(user.getId(), user.getStatus()))
.build(),
null
);
return user;
}
}
// 其他服務(wù) - 消費用戶更新消息
@Service
@RocketMQMessageListener(topic = "user-update-topic", consumerGroup = "order-group")
public class UserUpdateConsumer implements RocketMQListener<UserUpdateEvent> {
@Override
public void onMessage(UserUpdateEvent event) {
// 更新本地用戶信息緩存
orderService.updateUserCache(event.getUserId(), event.getStatus());
}
}一致性保證
- 本地事務(wù)表:將消息和業(yè)務(wù)數(shù)據(jù)放在同一個數(shù)據(jù)庫事務(wù)中
- 事務(wù)消息:使用RocketMQ的事務(wù)消息機制
- 冪等消費:消費者實現(xiàn)冪等性,避免重復(fù)處理
場景五:日志收集
背景描述
分布式系統(tǒng)中,日志分散在各個節(jié)點,需要集中收集和分析。
MQ解決方案
圖片
代碼實現(xiàn):
// 日志收集組件
@Component
public class LogCollector {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void collectLog(String appId, String level, String message, Map<String, Object> context) {
LogEvent logEvent = new LogEvent(appId, level, message, context, System.currentTimeMillis());
// 發(fā)送到Kafka
kafkaTemplate.send("app-logs", appId, JsonUtils.toJson(logEvent));
}
}
// 日志消費者
@Service
public class LogConsumer {
@KafkaListener(topics = "app-logs", groupId = "log-es")
public void consumeLog(String message) {
LogEvent logEvent = JsonUtils.fromJson(message, LogEvent.class);
// 存儲到Elasticsearch
elasticsearchService.indexLog(logEvent);
// 實時監(jiān)控檢查
if ("ERROR".equals(logEvent.getLevel())) {
alertService.checkAndAlert(logEvent);
}
}
}技術(shù)優(yōu)勢
- 解耦:應(yīng)用節(jié)點無需關(guān)心日志如何處理
- 緩沖:應(yīng)對日志產(chǎn)生速率波動
- 多消費:同一份日志可以被多個消費者處理
場景六:消息廣播
背景描述
系統(tǒng)配置更新后,需要通知所有服務(wù)節(jié)點更新本地配置。
MQ解決方案
// 配置服務(wù) - 廣播配置更新
@Service
public class ConfigService {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
public void updateConfig(String configKey, String configValue) {
// 1. 更新配置存儲
configDao.updateConfig(configKey, configValue);
// 2. 廣播配置更新消息
redisTemplate.convertAndSend("config-update-channel",
new ConfigUpdateEvent(configKey, configValue));
}
}
// 服務(wù)節(jié)點 - 訂閱配置更新
@Component
public class ConfigUpdateListener {
@Autowired
private LocalConfigCache localConfigCache;
@RedisListener(channel = "config-update-channel")
public void handleConfigUpdate(ConfigUpdateEvent event) {
// 更新本地配置緩存
localConfigCache.updateConfig(event.getKey(), event.getValue());
}
}應(yīng)用場景
- 功能開關(guān):動態(tài)開啟或關(guān)閉功能
- 參數(shù)調(diào)整:調(diào)整超時時間、限流閾值等
- 黑白名單:更新黑白名單配置
場景七:順序消息
背景描述
在某些業(yè)務(wù)場景中,消息的處理順序很重要,如訂單狀態(tài)變更。
MQ解決方案
// 訂單狀態(tài)變更服務(wù)
@Service
public class OrderStateService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public void changeOrderState(String orderId, String oldState, String newState) {
OrderStateEvent event = new OrderStateEvent(orderId, oldState, newState);
// 發(fā)送順序消息,使用orderId作為sharding key
rocketMQTemplate.syncSendOrderly(
"order-state-topic",
event,
orderId // 保證同一訂單的消息按順序處理
);
}
}
// 訂單狀態(tài)消費者
@Service
@RocketMQMessageListener(
topic = "order-state-topic",
consumerGroup = "order-state-group",
consumeMode = ConsumeMode.ORDERLY // 順序消費
)
public class OrderStateConsumer implements RocketMQListener<OrderStateEvent> {
@Override
public void onMessage(OrderStateEvent event) {
// 按順序處理訂單狀態(tài)變更
orderService.processStateChange(event);
}
}順序保證機制
- 分區(qū)順序:同一分區(qū)內(nèi)的消息保證順序
- 順序投遞:MQ保證消息按發(fā)送順序投遞
- 順序處理:消費者順序處理消息
場景八:延遲消息
背景描述
需要實現(xiàn)定時任務(wù),如訂單超時未支付自動取消。
MQ解決方案
// 訂單服務(wù) - 發(fā)送延遲消息
@Service
public class OrderService {
@Autowired
private RabbitTemplate rabbitTemplate;
public void createOrder(Order order) {
// 保存訂單
orderDao.save(order);
// 發(fā)送延遲消息,30分鐘后檢查支付狀態(tài)
rabbitTemplate.convertAndSend(
"order.delay.exchange",
"order.create",
new OrderCreateEvent(order.getId()),
message -> {
message.getMessageProperties().setDelay(30 * 60 * 1000); // 30分鐘
return message;
}
);
}
}
// 訂單超時檢查消費者
@Component
@RabbitListener(queues = "order.delay.queue")
public class OrderTimeoutConsumer {
@RabbitHandler
public void checkOrderPayment(OrderCreateEvent event) {
Order order = orderDao.findById(event.getOrderId());
if ("UNPAID".equals(order.getStatus())) {
// 超時未支付,取消訂單
orderService.cancelOrder(order.getId(), "超時未支付");
}
}
}替代方案對比
方案 | 優(yōu)點 | 缺點 |
數(shù)據(jù)庫輪詢 | 實現(xiàn)簡單 | 實時性差,數(shù)據(jù)庫壓力大 |
延時隊列 | 實時性好 | 實現(xiàn)復(fù)雜,消息堆積問題 |
定時任務(wù) | 可控性強 | 分布式協(xié)調(diào)復(fù)雜 |
場景九:消息重試
背景描述
處理消息時可能遇到臨時故障,需要重試機制保證最終處理成功。
MQ解決方案
// 消息消費者 with 重試機制
@Service
@Slf4j
public class RetryableConsumer {
@Autowired
private RabbitTemplate rabbitTemplate;
@RabbitListener(queues = "business.queue")
public void processMessage(Message message, Channel channel) {
try {
// 業(yè)務(wù)處理
businessService.process(message);
// 確認(rèn)消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (TemporaryException e) {
// 臨時異常,重試
log.warn("處理失敗,準(zhǔn)備重試", e);
// 拒絕消息,requeue=true
channel.basicNack(
message.getMessageProperties().getDeliveryTag(),
false,
true// 重新入隊
);
} catch (PermanentException e) {
// 永久異常,進(jìn)入死信隊列
log.error("處理失敗,進(jìn)入死信隊列", e);
channel.basicNack(
message.getMessageProperties().getDeliveryTag(),
false,
false// 不重新入隊
);
}
}
}重試策略
- 立即重試:臨時故障立即重試
- 延遲重試:逐步增加重試間隔
- 死信隊列:最終無法處理的消息進(jìn)入死信隊列
場景十:事務(wù)消息
背景描述
分布式系統(tǒng)中,需要保證多個服務(wù)的數(shù)據(jù)一致性。
MQ解決方案
// 事務(wù)消息生產(chǎn)者
@Service
public class TransactionalMessageService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Transactional
public void createOrderWithTransaction(Order order) {
// 1. 保存訂單(數(shù)據(jù)庫事務(wù))
orderDao.save(order);
// 2. 發(fā)送事務(wù)消息
TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(
"order-tx-topic",
MessageBuilder.withPayload(new OrderCreatedEvent(order.getId()))
.build(),
order // 事務(wù)參數(shù)
);
if (!result.getLocalTransactionState().equals(LocalTransactionState.COMMIT_MESSAGE)) {
thrownew RuntimeException("事務(wù)消息發(fā)送失敗");
}
}
}
// 事務(wù)消息監(jiān)聽器
@Component
@RocketMQTransactionListener
public class OrderTransactionListener implements RocketMQLocalTransactionListener {
@Autowired
private OrderDao orderDao;
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
// 檢查本地事務(wù)狀態(tài)
Order order = (Order) arg;
Order existOrder = orderDao.findById(order.getId());
if (existOrder != null && "CREATED".equals(existOrder.getStatus())) {
return RocketMQLocalTransactionState.COMMIT_MESSAGE;
} else {
return RocketMQLocalTransactionState.ROLLBACK_MESSAGE;
}
} catch (Exception e) {
return RocketMQLocalTransactionState.UNKNOWN;
}
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
// 回查本地事務(wù)狀態(tài)
String orderId = (String) msg.getHeaders().get("order_id");
Order order = orderDao.findById(orderId);
if (order != null && "CREATED".equals(order.getStatus())) {
return RocketMQLocalTransactionState.COMMIT_MESSAGE;
} else {
return RocketMQLocalTransactionState.ROLLBACK_MESSAGE;
}
}
}事務(wù)消息流程
圖片
總結(jié)
通過以上10個場景,我們可以總結(jié)出MQ使用的核心原則:
適用場景
- 異步處理:提升系統(tǒng)響應(yīng)速度
- 系統(tǒng)解耦:降低系統(tǒng)間依賴
- 流量削峰:應(yīng)對突發(fā)流量
- 數(shù)據(jù)同步:保證最終一致性
- 分布式事務(wù):解決數(shù)據(jù)一致性問題
技術(shù)選型建議
場景 | 推薦MQ | 原因 |
高吞吐 | Kafka | 高吞吐量,持久化存儲 |
事務(wù)消息 | RocketMQ | 完整的事務(wù)消息機制 |
復(fù)雜路由 | RabbitMQ | 靈活的路由配置 |
延遲消息 | RabbitMQ | 原生支持延遲隊列 |
最佳實踐
- 消息冪等性:消費者必須實現(xiàn)冪等處理
- 死信隊列:處理失敗的消息要有兜底方案
- 監(jiān)控告警:完善的消息堆積監(jiān)控和告警
- 性能優(yōu)化:根據(jù)業(yè)務(wù)特點調(diào)整MQ參數(shù)































