偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

我工作中用MQ的十種場景

開發(fā) 項目管理
記得剛工作那會兒,我總是想不明白:為什么明明直接調(diào)用接口就能完成的功能,非要引入MQ這么個"中間商"?

前言

最近有球友問我:MQ的使用場景有哪些?工作中一定要使用MQ嗎?

記得剛工作那會兒,我總是想不明白:為什么明明直接調(diào)用接口就能完成的功能,非要引入MQ這么個"中間商"?

直到經(jīng)歷了系統(tǒng)崩潰、數(shù)據(jù)丟失、性能瓶頸等一系列問題后,我才真正理解了MQ的價值。

今天我想和大家分享我在實際工作中使用消息隊列(MQ)的10種典型場景,希望對你會有所幫助。

一、為什么需要消息隊列(MQ)?

在深入具體場景之前,我們先來思考一個基本問題:為什么要使用消息隊列?

系統(tǒng)間的直接調(diào)用:

圖片圖片

引入消息隊列后:

圖片圖片

接下來我們將通過10個具體場景,帶大家來深入理解MQ的價值。

場景一:系統(tǒng)解耦

背景描述

在我早期參與的一個電商項目中,訂單創(chuàng)建后需要通知多個系統(tǒng):

// 早期的緊耦合設(shè)計
public class OrderService {
    private InventoryService inventoryService;
    private PointsService pointsService;
    private EmailService emailService;
    private AnalyticsService analyticsService;
    
    public void createOrder(Order order) {
        // 1. 保存訂單
        orderDao.save(order);
        
        // 2. 調(diào)用庫存服務(wù)
        inventoryService.updateInventory(order);
        
        // 3. 調(diào)用積分服務(wù)
        pointsService.addPoints(order.getUserId(), order.getAmount());
        
        // 4. 發(fā)送郵件通知
        emailService.sendOrderConfirmation(order);
        
        // 5. 記錄分析數(shù)據(jù)
        analyticsService.trackOrderCreated(order);
        
        // 更多服務(wù)...
    }
}

這種架構(gòu)存在嚴(yán)重問題:

  • 緊耦合:訂單服務(wù)需要知道所有下游服務(wù)
  • 單點故障:任何一個下游服務(wù)掛掉都會導(dǎo)致訂單創(chuàng)建失敗
  • 性能瓶頸:同步調(diào)用導(dǎo)致響應(yīng)時間慢

MQ解決方案

引入MQ后,架構(gòu)變?yōu)椋?/span>

圖片圖片

代碼實現(xiàn)

// 訂單服務(wù) - 生產(chǎn)者
@Service
public class OrderService {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    public void createOrder(Order order) {
        // 1. 保存訂單
        orderDao.save(order);
        
        // 2. 發(fā)送消息到MQ
        rabbitTemplate.convertAndSend(
            "order.exchange",
            "order.created",
            new OrderCreatedEvent(order.getId(), order.getUserId(), order.getAmount())
        );
    }
}

// 庫存服務(wù) - 消費者
@Component
@RabbitListener(queues = "inventory.queue")
public class InventoryConsumer {
    @Autowired
    private InventoryService inventoryService;
    
    @RabbitHandler
    public void handleOrderCreated(OrderCreatedEvent event) {
        inventoryService.updateInventory(event.getOrderId());
    }
}

技術(shù)要點

  1. 消息協(xié)議選擇:根據(jù)業(yè)務(wù)需求選擇RabbitMQ、Kafka或RocketMQ
  2. 消息格式:使用JSON或Protobuf等跨語言格式
  3. 錯誤處理:實現(xiàn)重試機制和死信隊列

場景二:異步處理

背景描述

用戶上傳視頻后需要執(zhí)行轉(zhuǎn)碼、生成縮略圖、內(nèi)容審核等耗時操作,如果同步處理,用戶需要等待很長時間。

MQ解決方案

// 視頻服務(wù) - 生產(chǎn)者
@Service
public class VideoService {
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
    
    public UploadResponse uploadVideo(MultipartFile file, String userId) {
        // 1. 保存原始視頻
        String videoId = saveOriginalVideo(file);
        
        // 2. 發(fā)送處理消息
        kafkaTemplate.send("video-processing", new VideoProcessingEvent(videoId, userId));
        
        // 3. 立即返回響應(yīng)
        return new UploadResponse(videoId, "upload_success");
    }
}

// 視頻處理服務(wù) - 消費者
@Service
public class VideoProcessingConsumer {
    @KafkaListener(topics = "video-processing")
    public void processVideo(VideoProcessingEvent event) {
        // 異步執(zhí)行耗時操作
        videoProcessor.transcode(event.getVideoId());
        videoProcessor.generateThumbnails(event.getVideoId());
        contentModerationService.checkContent(event.getVideoId());
        
        // 發(fā)送處理完成通知
        notificationService.notifyUser(event.getUserId(), event.getVideoId());
    }
}

架構(gòu)優(yōu)勢

  1. 快速響應(yīng):用戶上傳后立即得到響應(yīng)
  2. 彈性擴展:可以根據(jù)處理壓力動態(tài)調(diào)整消費者數(shù)量
  3. 故障隔離:處理服務(wù)故障不會影響上傳功能

場景三:流量削峰

背景描述

電商秒殺活動時,瞬時流量可能是平時的百倍以上,直接沖擊數(shù)據(jù)庫和服務(wù)。

MQ解決方案

圖片圖片

代碼實現(xiàn)

// 秒殺服務(wù)
@Service
public class SecKillService {
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    public SecKillResponse secKill(SecKillRequest request) {
        // 1. 校驗用戶資格
        if (!checkUserQualification(request.getUserId())) {
            return SecKillResponse.failed("用戶無資格");
        }
        
        // 2. 預(yù)減庫存(Redis原子操作)
        Long remaining = redisTemplate.opsForValue().decrement(
            "sec_kill_stock:" + request.getItemId());
        
        if (remaining == null || remaining < 0) {
            // 庫存不足,恢復(fù)庫存
            redisTemplate.opsForValue().increment("sec_kill_stock:" + request.getItemId());
            return SecKillResponse.failed("庫存不足");
        }
        
        // 3. 發(fā)送秒殺成功消息到MQ
        rabbitTemplate.convertAndSend(
            "sec_kill.exchange",
            "sec_kill.success",
            new SecKillSuccessEvent(request.getUserId(), request.getItemId())
        );
        
        return SecKillResponse.success("秒殺成功");
    }
}

// 訂單處理消費者
@Component
@RabbitListener(queues = "sec_kill.order.queue")
public class SecKillOrderConsumer {
    @RabbitHandler
    public void handleSecKillSuccess(SecKillSuccessEvent event) {
        // 異步創(chuàng)建訂單
        orderService.createSecKillOrder(event.getUserId(), event.getItemId());
    }
}

技術(shù)要點

  1. 庫存預(yù)扣:使用Redis原子操作避免超賣
  2. 隊列緩沖:MQ緩沖請求,避免直接沖擊數(shù)據(jù)庫
  3. 限流控制:在網(wǎng)關(guān)層進(jìn)行限流,拒絕過多請求

場景四:數(shù)據(jù)同步

背景描述

在微服務(wù)架構(gòu)中,不同服務(wù)有自己的數(shù)據(jù)庫,需要保證數(shù)據(jù)一致性。

MQ解決方案

// 用戶服務(wù) - 數(shù)據(jù)變更時發(fā)送消息
@Service
public class UserService {
    @Transactional
    public User updateUser(User user) {
        // 1. 更新數(shù)據(jù)庫
        userDao.update(user);
        
        // 2. 發(fā)送消息(在事務(wù)內(nèi))
        rocketMQTemplate.sendMessageInTransaction(
            "user-update-topic",
            MessageBuilder.withPayload(new UserUpdateEvent(user.getId(), user.getStatus()))
                .build(),
            null
        );
        
        return user;
    }
}

// 其他服務(wù) - 消費用戶更新消息
@Service
@RocketMQMessageListener(topic = "user-update-topic", consumerGroup = "order-group")
public class UserUpdateConsumer implements RocketMQListener<UserUpdateEvent> {
    @Override
    public void onMessage(UserUpdateEvent event) {
        // 更新本地用戶信息緩存
        orderService.updateUserCache(event.getUserId(), event.getStatus());
    }
}

一致性保證

  1. 本地事務(wù)表:將消息和業(yè)務(wù)數(shù)據(jù)放在同一個數(shù)據(jù)庫事務(wù)中
  2. 事務(wù)消息:使用RocketMQ的事務(wù)消息機制
  3. 冪等消費:消費者實現(xiàn)冪等性,避免重復(fù)處理

場景五:日志收集

背景描述

分布式系統(tǒng)中,日志分散在各個節(jié)點,需要集中收集和分析。

MQ解決方案

圖片圖片

代碼實現(xiàn)

// 日志收集組件
@Component
public class LogCollector {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    
    public void collectLog(String appId, String level, String message, Map<String, Object> context) {
        LogEvent logEvent = new LogEvent(appId, level, message, context, System.currentTimeMillis());
        
        // 發(fā)送到Kafka
        kafkaTemplate.send("app-logs", appId, JsonUtils.toJson(logEvent));
    }
}

// 日志消費者
@Service
public class LogConsumer {
    @KafkaListener(topics = "app-logs", groupId = "log-es")
    public void consumeLog(String message) {
        LogEvent logEvent = JsonUtils.fromJson(message, LogEvent.class);
        
        // 存儲到Elasticsearch
        elasticsearchService.indexLog(logEvent);
        
        // 實時監(jiān)控檢查
        if ("ERROR".equals(logEvent.getLevel())) {
            alertService.checkAndAlert(logEvent);
        }
    }
}

技術(shù)優(yōu)勢

  1. 解耦:應(yīng)用節(jié)點無需關(guān)心日志如何處理
  2. 緩沖:應(yīng)對日志產(chǎn)生速率波動
  3. 多消費:同一份日志可以被多個消費者處理

場景六:消息廣播

背景描述

系統(tǒng)配置更新后,需要通知所有服務(wù)節(jié)點更新本地配置。

MQ解決方案

// 配置服務(wù) - 廣播配置更新
@Service
public class ConfigService {
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    public void updateConfig(String configKey, String configValue) {
        // 1. 更新配置存儲
        configDao.updateConfig(configKey, configValue);
        
        // 2. 廣播配置更新消息
        redisTemplate.convertAndSend("config-update-channel", 
            new ConfigUpdateEvent(configKey, configValue));
    }
}

// 服務(wù)節(jié)點 - 訂閱配置更新
@Component
public class ConfigUpdateListener {
    @Autowired
    private LocalConfigCache localConfigCache;
    
    @RedisListener(channel = "config-update-channel")
    public void handleConfigUpdate(ConfigUpdateEvent event) {
        // 更新本地配置緩存
        localConfigCache.updateConfig(event.getKey(), event.getValue());
    }
}

應(yīng)用場景

  1. 功能開關(guān):動態(tài)開啟或關(guān)閉功能
  2. 參數(shù)調(diào)整:調(diào)整超時時間、限流閾值等
  3. 黑白名單:更新黑白名單配置

場景七:順序消息

背景描述

在某些業(yè)務(wù)場景中,消息的處理順序很重要,如訂單狀態(tài)變更。

MQ解決方案

// 訂單狀態(tài)變更服務(wù)
@Service
public class OrderStateService {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    public void changeOrderState(String orderId, String oldState, String newState) {
        OrderStateEvent event = new OrderStateEvent(orderId, oldState, newState);
        
        // 發(fā)送順序消息,使用orderId作為sharding key
        rocketMQTemplate.syncSendOrderly(
            "order-state-topic", 
            event, 
            orderId  // 保證同一訂單的消息按順序處理
        );
    }
}

// 訂單狀態(tài)消費者
@Service
@RocketMQMessageListener(
    topic = "order-state-topic",
    consumerGroup = "order-state-group",
    consumeMode = ConsumeMode.ORDERLY  // 順序消費
)
public class OrderStateConsumer implements RocketMQListener<OrderStateEvent> {
    @Override
    public void onMessage(OrderStateEvent event) {
        // 按順序處理訂單狀態(tài)變更
        orderService.processStateChange(event);
    }
}

順序保證機制

  1. 分區(qū)順序:同一分區(qū)內(nèi)的消息保證順序
  2. 順序投遞:MQ保證消息按發(fā)送順序投遞
  3. 順序處理:消費者順序處理消息

場景八:延遲消息

背景描述

需要實現(xiàn)定時任務(wù),如訂單超時未支付自動取消。

MQ解決方案

// 訂單服務(wù) - 發(fā)送延遲消息
@Service
public class OrderService {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    public void createOrder(Order order) {
        // 保存訂單
        orderDao.save(order);
        
        // 發(fā)送延遲消息,30分鐘后檢查支付狀態(tài)
        rabbitTemplate.convertAndSend(
            "order.delay.exchange",
            "order.create",
            new OrderCreateEvent(order.getId()),
            message -> {
                message.getMessageProperties().setDelay(30 * 60 * 1000); // 30分鐘
                return message;
            }
        );
    }
}

// 訂單超時檢查消費者
@Component
@RabbitListener(queues = "order.delay.queue")
public class OrderTimeoutConsumer {
    @RabbitHandler
    public void checkOrderPayment(OrderCreateEvent event) {
        Order order = orderDao.findById(event.getOrderId());
        if ("UNPAID".equals(order.getStatus())) {
            // 超時未支付,取消訂單
            orderService.cancelOrder(order.getId(), "超時未支付");
        }
    }
}

替代方案對比

方案

優(yōu)點

缺點

數(shù)據(jù)庫輪詢

實現(xiàn)簡單

實時性差,數(shù)據(jù)庫壓力大

延時隊列

實時性好

實現(xiàn)復(fù)雜,消息堆積問題

定時任務(wù)

可控性強

分布式協(xié)調(diào)復(fù)雜

場景九:消息重試

背景描述

處理消息時可能遇到臨時故障,需要重試機制保證最終處理成功。

MQ解決方案

// 消息消費者 with 重試機制
@Service
@Slf4j
public class RetryableConsumer {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @RabbitListener(queues = "business.queue")
    public void processMessage(Message message, Channel channel) {
        try {
            // 業(yè)務(wù)處理
            businessService.process(message);
            
            // 確認(rèn)消息
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            
        } catch (TemporaryException e) {
            // 臨時異常,重試
            log.warn("處理失敗,準(zhǔn)備重試", e);
            
            // 拒絕消息,requeue=true
            channel.basicNack(
                message.getMessageProperties().getDeliveryTag(),
                false,
                true// 重新入隊
            );
            
        } catch (PermanentException e) {
            // 永久異常,進(jìn)入死信隊列
            log.error("處理失敗,進(jìn)入死信隊列", e);
            
            channel.basicNack(
                message.getMessageProperties().getDeliveryTag(),
                false,
                false// 不重新入隊
            );
        }
    }
}

重試策略

  1. 立即重試:臨時故障立即重試
  2. 延遲重試:逐步增加重試間隔
  3. 死信隊列:最終無法處理的消息進(jìn)入死信隊列

場景十:事務(wù)消息

背景描述

分布式系統(tǒng)中,需要保證多個服務(wù)的數(shù)據(jù)一致性。

MQ解決方案

// 事務(wù)消息生產(chǎn)者
@Service
public class TransactionalMessageService {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    @Transactional
    public void createOrderWithTransaction(Order order) {
        // 1. 保存訂單(數(shù)據(jù)庫事務(wù))
        orderDao.save(order);
        
        // 2. 發(fā)送事務(wù)消息
        TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(
            "order-tx-topic",
            MessageBuilder.withPayload(new OrderCreatedEvent(order.getId()))
                .build(),
            order  // 事務(wù)參數(shù)
        );
        
        if (!result.getLocalTransactionState().equals(LocalTransactionState.COMMIT_MESSAGE)) {
            thrownew RuntimeException("事務(wù)消息發(fā)送失敗");
        }
    }
}

// 事務(wù)消息監(jiān)聽器
@Component
@RocketMQTransactionListener
public class OrderTransactionListener implements RocketMQLocalTransactionListener {
    @Autowired
    private OrderDao orderDao;
    
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
            // 檢查本地事務(wù)狀態(tài)
            Order order = (Order) arg;
            Order existOrder = orderDao.findById(order.getId());
            
            if (existOrder != null && "CREATED".equals(existOrder.getStatus())) {
                return RocketMQLocalTransactionState.COMMIT_MESSAGE;
            } else {
                return RocketMQLocalTransactionState.ROLLBACK_MESSAGE;
            }
        } catch (Exception e) {
            return RocketMQLocalTransactionState.UNKNOWN;
        }
    }
    
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        // 回查本地事務(wù)狀態(tài)
        String orderId = (String) msg.getHeaders().get("order_id");
        Order order = orderDao.findById(orderId);
        
        if (order != null && "CREATED".equals(order.getStatus())) {
            return RocketMQLocalTransactionState.COMMIT_MESSAGE;
        } else {
            return RocketMQLocalTransactionState.ROLLBACK_MESSAGE;
        }
    }
}

事務(wù)消息流程

圖片圖片

總結(jié)

通過以上10個場景,我們可以總結(jié)出MQ使用的核心原則:

適用場景

  1. 異步處理:提升系統(tǒng)響應(yīng)速度
  2. 系統(tǒng)解耦:降低系統(tǒng)間依賴
  3. 流量削峰:應(yīng)對突發(fā)流量
  4. 數(shù)據(jù)同步:保證最終一致性
  5. 分布式事務(wù):解決數(shù)據(jù)一致性問題

技術(shù)選型建議

場景

推薦MQ

原因

高吞吐

Kafka

高吞吐量,持久化存儲

事務(wù)消息

RocketMQ

完整的事務(wù)消息機制

復(fù)雜路由

RabbitMQ

靈活的路由配置

延遲消息

RabbitMQ

原生支持延遲隊列

最佳實踐

  1. 消息冪等性:消費者必須實現(xiàn)冪等處理
  2. 死信隊列:處理失敗的消息要有兜底方案
  3. 監(jiān)控告警:完善的消息堆積監(jiān)控和告警
  4. 性能優(yōu)化:根據(jù)業(yè)務(wù)特點調(diào)整MQ參數(shù)
責(zé)任編輯:武曉燕 來源: 蘇三說技術(shù)
相關(guān)推薦

2024-01-18 08:21:55

2018-09-25 23:21:13

2022-01-09 18:32:03

MySQL SQL 語句數(shù)據(jù)庫

2024-12-17 08:20:50

2024-02-28 07:53:30

Redis數(shù)據(jù)存儲數(shù)據(jù)庫

2025-02-10 08:30:00

JavaScrip開發(fā)設(shè)計模式

2024-04-15 00:10:00

Redis數(shù)據(jù)庫

2024-12-30 08:29:05

2024-11-13 13:20:44

2015-10-26 09:38:23

程序員工作

2023-02-08 13:07:54

2015-12-09 09:41:52

AngularJS開發(fā)錯誤

2024-11-25 09:08:10

Redis高頻應(yīng)用場景

2023-05-15 15:29:13

設(shè)計模式JavaScript

2020-08-13 07:00:00

工具技術(shù)管理

2010-08-30 16:18:05

2024-08-22 08:54:40

2010-07-07 11:30:16

UML十種圖

2010-09-13 17:17:04

2024-01-22 08:15:42

API協(xié)議設(shè)計
點贊
收藏

51CTO技術(shù)棧公眾號