偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

Redis + MQ:高并發(fā)秒殺的技術(shù)方案與實現(xiàn)

開發(fā) 架構(gòu)
系統(tǒng)可通過定時任務(wù)對比??Redis??流水、??MySQL??庫存流水與訂單表數(shù)據(jù):若??Redis??流水存在但訂單表無對應(yīng)記錄,說明訂單生成失敗,需人工介入補單或回滾??Redis??庫存,避免少賣;若訂單表有記錄但??MySQL??庫存未扣減,則需觸發(fā)庫存補扣,避免多賣。

前言

在電商秒殺場景中,瞬間爆發(fā)的海量請求往往成為系統(tǒng)的生死考驗。當(dāng)并發(fā)量達到數(shù)萬甚至數(shù)十萬QPS時,傳統(tǒng)數(shù)據(jù)庫單表架構(gòu)難以支撐,而Redis與消息隊列(MQ)的組合憑借其高性能與可靠性,成為應(yīng)對高并發(fā)秒殺的黃金方案。

方案總覽

用戶請求 → 前端生成Token → Redis執(zhí)行Lua腳本(預(yù)扣減+防重+流水)→ 發(fā)送RocketMQ事務(wù)消息 → 
[本地事務(wù)校驗Redis結(jié)果] → MQ消息確認(COMMIT/ROLLBACK)→ 消費者消費消息 → MySQL扣減庫存+記錄訂單

秒殺系統(tǒng)的核心訴求是抗并發(fā)、防超賣、保一致。Redis+MQ 方案通過 “前端攔截 - 中間緩沖 - 后端落地” 的三層架構(gòu)實現(xiàn)這一目標(biāo):

  • 前端攔截:Redis通過Lua腳本原子性處理庫存預(yù)扣減,過濾無效請求;
  • 中間緩沖:MQ(如 RocketMQ)通過事務(wù)消息削峰填谷,確保流量平穩(wěn)進入數(shù)據(jù)庫;
  • 后端落地:MySQL最終存儲庫存與訂單數(shù)據(jù),通過事務(wù)消息保障與Redis的一致性。

流程拆解(示例代碼)

Redis 庫存預(yù)扣減

預(yù)扣減流程

開始
  │
  ├─ 生成Token(前端)
  │
  ├─ 前端攜帶Token請求秒殺
  │
  ├─ 執(zhí)行Lua腳本
  │   │
  │   ├─ 檢查Token是否存在(Hash結(jié)構(gòu))
  │   │   ├─ 存在 → 返回“重復(fù)提交”
  │   │   └─ 不存在 → 繼續(xù)
  │   │
  │   ├─ 獲取Redis庫存(String結(jié)構(gòu))
  │   │   ├─ 庫存不足 → 返回“庫存不足”
  │   │   └─ 庫存充足 → 繼續(xù)
  │   │
  │   ├─ 扣減Redis庫存并更新
  │   │
  │   └─ 記錄流水到Hash結(jié)構(gòu)
  │
  ├─ 返回扣減結(jié)果(成功/失?。?  │
結(jié)束

Lua 腳本

-- 啟用Redis命令復(fù)制,確保腳本在集群環(huán)境中正確同步
redis.replicate_commands()

-- 1. 防重提交校驗:通過用戶ID+Token判斷是否重復(fù)提交
-- KEYS[2]為用戶ID(uid),ARGV[2]為本次請求的Token
if redis.call('hexists', KEYS[2], ARGV[2]) == 1 then
    return redis.error_reply('repeat submit')  -- 重復(fù)提交,返回錯誤
end 

-- 2. 庫存充足性校驗
local product_id = KEYS[1]  -- 商品ID
local stock = redis.call('get', KEYS[1])  -- 獲取當(dāng)前庫存
if not stock then  -- 庫存不存在(如商品未上架)
    return redis.error_reply('product not found')
end
if tonumber(stock) < tonumber(ARGV[1]) then  -- 庫存不足
    return redis.error_reply('stock is not enough')
end 

-- 3. 執(zhí)行庫存扣減
local remaining_stock = tonumber(stock) - tonumber(ARGV[1])
redis.call('set', KEYS[1], tostring(remaining_stock))  -- 更新庫存

-- 4. 記錄交易流水(用于后續(xù)一致性校驗)
local time = redis.call('time')  -- 獲取當(dāng)前時間(秒+微秒)
local currentTimeMillis = (time[1] * 1000) + math.floor(time[2] / 1000)  -- 轉(zhuǎn)換為毫秒時間戳
-- 存儲流水到Hash結(jié)構(gòu):用戶ID → Token → 流水詳情
redis.call('hset', KEYS[2], ARGV[2], 
    cjson.encode({
        action = '扣減庫存',
        product = product_id,
        from = stock,  -- 扣減前庫存
        to = remaining_stock,  -- 扣減后庫存
        change = ARGV[1],  -- 扣減數(shù)量
        token = ARGV[2],
        timestamp = currentTimeMillis
    })
)

return remaining_stock  -- 返回扣減后庫存

Java 調(diào)用 Lua

@Service
public class SeckillService {

    @Autowired
    private StringRedisTemplate redisTemplate;

    // 加載Lua腳本
    private DefaultRedisScript<Long> stockScript;

    @PostConstruct
    public void init() {
        stockScript = new DefaultRedisScript<>();
        stockScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("seckill.lua")));
        stockScript.setResultType(Long.class);
    }

    /**
     * 執(zhí)行Redis庫存預(yù)扣減
     * @param productId 商品ID
     * @param uid 用戶ID
     * @param quantity 購買數(shù)量
     * @param token 防重Token
     * @return 扣減后庫存(-1表示失?。?     */
    public Long preDeductStock(String productId, String uid, Integer quantity, String token) {
        try {
            // 執(zhí)行Lua腳本:KEYS = [商品ID, 用戶ID],ARGV = [數(shù)量, Token]
            return redisTemplate.execute(
                stockScript,
                Arrays.asList(productId, uid),
                quantity.toString(),
                token
            );
        } catch (Exception e) {
            log.error("Redis預(yù)扣減失敗", e);
            return -1L;
        }
    }
}

MySQL 庫存扣減

扣減流程圖

開始
  │
  ├─ 發(fā)送半消息到RocketMQ
  │
  ├─ 執(zhí)行本地事務(wù)
  │   │
  │   ├─ 檢查Redis流水是否存在
  │   │   ├─ 存在 → 提交消息(COMMIT)
  │   │   └─ 不存在 → 回滾消息(ROLLBACK)
  │   │
  │   └─ 未知狀態(tài) → 等待回查
  │
  ├─ RocketMQ回查機制
  │   ├─ 有流水 → 提交消息
  │   └─ 無流水 → 回滾消息
  │
  ├─ 消息被消費
  │   │
  │   ├─ 查詢數(shù)據(jù)庫當(dāng)前版本號(樂觀鎖)
  │   │
  │   ├─ 執(zhí)行庫存扣減(WHERE version = 當(dāng)前版本)
  │   │   ├─ 扣減成功 → 記錄數(shù)據(jù)庫流水
  │   │   └─ 扣減失敗 → 拋出異常(觸發(fā)重試)
  │   │
  ├─ 結(jié)束

發(fā)送半消息

系統(tǒng)首先向RocketMQ發(fā)送一條半消息Half Message)。此時消息處于不可消費狀態(tài),需等待生產(chǎn)者確認本地事務(wù)執(zhí)行結(jié)果后,才會被消費者處理。

// 發(fā)送半消息
public void sendHalfMessage(String productId, String uid, String token, Integer quantity) {
    // 構(gòu)建消息
    Message message = new Message(
        "seckill_topic",  // 主題
        "stock_deduct",   // 標(biāo)簽
        JSON.toJSONString(new SeckillMessage(productId, uid, token, quantity)).getBytes()
    );
    // 發(fā)送事務(wù)消息
    TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(
        "seckill_producer_group",  // 生產(chǎn)者組
        message,
        null  // 本地事務(wù)參數(shù)(可傳遞上下文)
    );
    log.info("半消息發(fā)送結(jié)果:{}", result.getSendStatus());
}

本地事務(wù)校驗

本地事務(wù)的核心是判斷Redis預(yù)扣減是否成功:

  • RedisLua腳本執(zhí)行成功(即庫存預(yù)扣減完成且流水已記錄),則向RocketMQ返回 提交COMMIT)指令,消息變?yōu)榭上M狀態(tài);
  • Redis預(yù)扣減失?。ㄈ鐜齑娌蛔慊蛑貜?fù)提交),則返回回滾ROLLBACK)指令,消息被丟棄。
  • RocketMQ長時間未收到本地事務(wù)結(jié)果(如生產(chǎn)者宕機),會觸發(fā)消息回查。此時系統(tǒng)通過檢查Redis中是否存在對應(yīng)交易流水,判斷是否需要提交消息:若流水存在,則提交;否則回滾。
@Component
public class SeckillTransactionListener implements TransactionListener {

    @Autowired
    private StringRedisTemplate redisTemplate;

    // 執(zhí)行本地事務(wù)
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
            SeckillMessage message = JSON.parseObject(new String(msg.getBody()), SeckillMessage.class);
            // 檢查Redis中是否存在對應(yīng)流水(驗證預(yù)扣減成功)
            Boolean flag = redisTemplate.opsForHash().hasKey(
                message.getUid(),  // Hash key:用戶ID
                message.getToken()  // Hash field:Token
            );
            return flag ? RocketMQLocalTransactionState.COMMIT : RocketMQLocalTransactionState.ROLLBACK;
        } catch (Exception e) {
            return RocketMQLocalTransactionState.UNKNOWN;  // 未知狀態(tài),觸發(fā)回查
        }
    }

    // 消息回查(解決超時未確認問題)
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        SeckillMessage message = JSON.parseObject(new String(msg.getBody()), SeckillMessage.class);
        // 回查邏輯:再次檢查流水是否存在
        Boolean flag = redisTemplate.opsForHash().hasKey(message.getUid(), message.getToken());
        return flag ? RocketMQLocalTransactionState.COMMIT : RocketMQLocalTransactionState.ROLLBACK;
    }
}

消費消息并扣減 MySQL 庫存

消費者監(jiān)聽消息,執(zhí)行數(shù)據(jù)庫扣減(需保證冪等性): 消費者接收到可消費的消息后,執(zhí)行MySQL庫存扣減操作,并同步記錄數(shù)據(jù)庫中的交易流水。為確保消費成功,需利用MQ的重試機制:若消費失?。ㄈ鐢?shù)據(jù)庫暫時不可用),MQ會自動重試,直至消費成功或達到最大重試次數(shù)(此時需人工介入處理)。

@Component
@RocketMQMessageListener(
    topic = "seckill_topic",
    consumerGroup = "seckill_consumer_group",
    messageModel = MessageModel.CLUSTERING
)
public class SeckillConsumer implements RocketMQListener<MessageExt> {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Override
    public void onMessage(MessageExt message) {
        SeckillMessage msg = JSON.parseObject(new String(message.getBody()), SeckillMessage.class);
        String productId = msg.getProductId();
        int quantity = msg.getQuantity();

        // 數(shù)據(jù)庫扣減(使用樂觀鎖防超賣)
        String sql = "UPDATE product_stock " +
                    "SET stock = stock - ?, version = version + 1 " +
                    "WHERE product_id = ? AND stock >= ? AND version = ?";

        // 1. 查詢當(dāng)前版本號
        Integer version = jdbcTemplate.queryForObject(
            "SELECT version FROM product_stock WHERE product_id = ?",
            Integer.class,
            productId
        );

        // 2. 執(zhí)行扣減(樂觀鎖保證原子性)
        int rows = jdbcTemplate.update(sql, quantity, productId, quantity, version);
        if (rows > 0) {
            // 扣減成功:記錄數(shù)據(jù)庫流水
            jdbcTemplate.update(
                "INSERT INTO stock_flow (product_id, quantity, op_type, create_time) " +
                "VALUES (?, ?, 'SECKILL', NOW())",
                productId, quantity
            );
            // 確認消費成功(返回ACK)
        } else {
            // 扣減失?。河|發(fā)重試(MQ默認重試機制)
            throw new RuntimeException("數(shù)據(jù)庫扣減失敗,觸發(fā)重試");
        }
    }
}

一致性保障

為防止RedisMySQL數(shù)據(jù)不一致(如Redis扣減成功但MySQL扣減失?。?,需定期對賬:

@Scheduled(cron = "0 0 */1 * * ?")  // 每小時執(zhí)行一次
public void reconcileStock() {
    // 1. 掃描Redis中未同步到MySQL的流水
    Set<String> uids = redisTemplate.keys("uid:*");  // 假設(shè)用戶ID前綴為uid:
    for (String uid : uids) {
        Map<Object, Object> tokenMap = redisTemplate.opsForHash().entries(uid);
        for (Map.Entry<Object, Object> entry : tokenMap.entrySet()) {
            String token = (String) entry.getKey();
            String flowJson = (String) entry.getValue();
            SeckillFlow flow = JSON.parseObject(flowJson, SeckillFlow.class);

            // 2. 檢查MySQL是否有對應(yīng)訂單
            Integer count = jdbcTemplate.queryForObject(
                "SELECT COUNT(1) FROM orders WHERE product_id = ? AND uid = ? AND token = ?",
                Integer.class,
                flow.getProduct(), flow.getUid(), token
            );

            if (count == 0) {
                // 3. 未找到訂單 → 人工介入或自動回滾Redis庫存
                log.warn("發(fā)現(xiàn)不一致:Redis有流水但MySQL無訂單,product={}, uid={}", flow.getProduct(), uid);
                // redisTemplate.opsForValue().increment(flow.getProduct(), Integer.parseInt(flow.getChange()));
            }
        }
    }
}

系統(tǒng)可通過定時任務(wù)對比Redis流水、MySQL庫存流水與訂單表數(shù)據(jù):若Redis流水存在但訂單表無對應(yīng)記錄,說明訂單生成失敗,需人工介入補單或回滾Redis庫存,避免少賣;若訂單表有記錄但MySQL庫存未扣減,則需觸發(fā)庫存補扣,避免多賣。

總結(jié)

Redis + MQ 方案通過預(yù)扣減 + 事務(wù)消息 + 對賬三重機制,完美解決了高并發(fā)秒殺的核心痛點:

  • Redis承擔(dān)高并發(fā)讀寫,通過Lua腳本確保原子性,防止超賣;
  • MQ事務(wù)消息保障RedisMySQL的最終一致性,避免數(shù)據(jù)斷層;
  • 流水對賬作為最后一道防線,及時發(fā)現(xiàn)并修復(fù)異常。
責(zé)任編輯:武曉燕 來源: 一安未來
相關(guān)推薦

2024-08-01 11:38:40

2019-10-30 16:54:08

golangredis數(shù)據(jù)庫

2018-09-15 04:59:01

2025-04-08 05:00:00

2025-02-20 00:01:00

2021-08-26 08:24:33

高并發(fā)秒殺系統(tǒng)

2022-03-31 17:38:09

高并發(fā)系統(tǒng)架構(gòu)設(shè)計負載均衡

2020-10-14 07:20:53

高并發(fā)

2024-10-08 10:10:00

削峰高并發(fā)流量

2024-07-03 11:01:55

2025-05-28 02:20:00

2025-06-13 07:42:13

2025-01-20 00:00:03

高并發(fā)秒殺業(yè)務(wù)

2025-07-28 02:22:00

2018-08-24 09:26:13

Redis高可用方式

2018-08-21 10:32:43

數(shù)據(jù)庫Redis高可用技術(shù)

2020-04-01 17:31:03

Redis系統(tǒng)秒殺

2020-04-13 08:33:39

高并發(fā)秒殺系統(tǒng)

2014-11-14 09:42:53

VoLTE
點贊
收藏

51CTO技術(shù)棧公眾號