Redis + MQ:高并發(fā)秒殺的技術(shù)方案與實現(xiàn)
前言
在電商秒殺場景中,瞬間爆發(fā)的海量請求往往成為系統(tǒng)的生死考驗。當(dāng)并發(fā)量達到數(shù)萬甚至數(shù)十萬QPS時,傳統(tǒng)數(shù)據(jù)庫單表架構(gòu)難以支撐,而Redis與消息隊列(MQ)的組合憑借其高性能與可靠性,成為應(yīng)對高并發(fā)秒殺的黃金方案。
方案總覽
用戶請求 → 前端生成Token → Redis執(zhí)行Lua腳本(預(yù)扣減+防重+流水)→ 發(fā)送RocketMQ事務(wù)消息 →
[本地事務(wù)校驗Redis結(jié)果] → MQ消息確認(COMMIT/ROLLBACK)→ 消費者消費消息 → MySQL扣減庫存+記錄訂單秒殺系統(tǒng)的核心訴求是抗并發(fā)、防超賣、保一致。Redis+MQ 方案通過 “前端攔截 - 中間緩沖 - 后端落地” 的三層架構(gòu)實現(xiàn)這一目標(biāo):
- 前端攔截:
Redis通過Lua腳本原子性處理庫存預(yù)扣減,過濾無效請求; - 中間緩沖:
MQ(如RocketMQ)通過事務(wù)消息削峰填谷,確保流量平穩(wěn)進入數(shù)據(jù)庫; - 后端落地:
MySQL最終存儲庫存與訂單數(shù)據(jù),通過事務(wù)消息保障與Redis的一致性。
流程拆解(示例代碼)
Redis 庫存預(yù)扣減
預(yù)扣減流程
開始
│
├─ 生成Token(前端)
│
├─ 前端攜帶Token請求秒殺
│
├─ 執(zhí)行Lua腳本
│ │
│ ├─ 檢查Token是否存在(Hash結(jié)構(gòu))
│ │ ├─ 存在 → 返回“重復(fù)提交”
│ │ └─ 不存在 → 繼續(xù)
│ │
│ ├─ 獲取Redis庫存(String結(jié)構(gòu))
│ │ ├─ 庫存不足 → 返回“庫存不足”
│ │ └─ 庫存充足 → 繼續(xù)
│ │
│ ├─ 扣減Redis庫存并更新
│ │
│ └─ 記錄流水到Hash結(jié)構(gòu)
│
├─ 返回扣減結(jié)果(成功/失?。? │
結(jié)束Lua 腳本
-- 啟用Redis命令復(fù)制,確保腳本在集群環(huán)境中正確同步
redis.replicate_commands()
-- 1. 防重提交校驗:通過用戶ID+Token判斷是否重復(fù)提交
-- KEYS[2]為用戶ID(uid),ARGV[2]為本次請求的Token
if redis.call('hexists', KEYS[2], ARGV[2]) == 1 then
return redis.error_reply('repeat submit') -- 重復(fù)提交,返回錯誤
end
-- 2. 庫存充足性校驗
local product_id = KEYS[1] -- 商品ID
local stock = redis.call('get', KEYS[1]) -- 獲取當(dāng)前庫存
if not stock then -- 庫存不存在(如商品未上架)
return redis.error_reply('product not found')
end
if tonumber(stock) < tonumber(ARGV[1]) then -- 庫存不足
return redis.error_reply('stock is not enough')
end
-- 3. 執(zhí)行庫存扣減
local remaining_stock = tonumber(stock) - tonumber(ARGV[1])
redis.call('set', KEYS[1], tostring(remaining_stock)) -- 更新庫存
-- 4. 記錄交易流水(用于后續(xù)一致性校驗)
local time = redis.call('time') -- 獲取當(dāng)前時間(秒+微秒)
local currentTimeMillis = (time[1] * 1000) + math.floor(time[2] / 1000) -- 轉(zhuǎn)換為毫秒時間戳
-- 存儲流水到Hash結(jié)構(gòu):用戶ID → Token → 流水詳情
redis.call('hset', KEYS[2], ARGV[2],
cjson.encode({
action = '扣減庫存',
product = product_id,
from = stock, -- 扣減前庫存
to = remaining_stock, -- 扣減后庫存
change = ARGV[1], -- 扣減數(shù)量
token = ARGV[2],
timestamp = currentTimeMillis
})
)
return remaining_stock -- 返回扣減后庫存Java 調(diào)用 Lua
@Service
public class SeckillService {
@Autowired
private StringRedisTemplate redisTemplate;
// 加載Lua腳本
private DefaultRedisScript<Long> stockScript;
@PostConstruct
public void init() {
stockScript = new DefaultRedisScript<>();
stockScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("seckill.lua")));
stockScript.setResultType(Long.class);
}
/**
* 執(zhí)行Redis庫存預(yù)扣減
* @param productId 商品ID
* @param uid 用戶ID
* @param quantity 購買數(shù)量
* @param token 防重Token
* @return 扣減后庫存(-1表示失?。? */
public Long preDeductStock(String productId, String uid, Integer quantity, String token) {
try {
// 執(zhí)行Lua腳本:KEYS = [商品ID, 用戶ID],ARGV = [數(shù)量, Token]
return redisTemplate.execute(
stockScript,
Arrays.asList(productId, uid),
quantity.toString(),
token
);
} catch (Exception e) {
log.error("Redis預(yù)扣減失敗", e);
return -1L;
}
}
}MySQL 庫存扣減
扣減流程圖
開始
│
├─ 發(fā)送半消息到RocketMQ
│
├─ 執(zhí)行本地事務(wù)
│ │
│ ├─ 檢查Redis流水是否存在
│ │ ├─ 存在 → 提交消息(COMMIT)
│ │ └─ 不存在 → 回滾消息(ROLLBACK)
│ │
│ └─ 未知狀態(tài) → 等待回查
│
├─ RocketMQ回查機制
│ ├─ 有流水 → 提交消息
│ └─ 無流水 → 回滾消息
│
├─ 消息被消費
│ │
│ ├─ 查詢數(shù)據(jù)庫當(dāng)前版本號(樂觀鎖)
│ │
│ ├─ 執(zhí)行庫存扣減(WHERE version = 當(dāng)前版本)
│ │ ├─ 扣減成功 → 記錄數(shù)據(jù)庫流水
│ │ └─ 扣減失敗 → 拋出異常(觸發(fā)重試)
│ │
├─ 結(jié)束發(fā)送半消息
系統(tǒng)首先向RocketMQ發(fā)送一條半消息(Half Message)。此時消息處于不可消費狀態(tài),需等待生產(chǎn)者確認本地事務(wù)執(zhí)行結(jié)果后,才會被消費者處理。
// 發(fā)送半消息
public void sendHalfMessage(String productId, String uid, String token, Integer quantity) {
// 構(gòu)建消息
Message message = new Message(
"seckill_topic", // 主題
"stock_deduct", // 標(biāo)簽
JSON.toJSONString(new SeckillMessage(productId, uid, token, quantity)).getBytes()
);
// 發(fā)送事務(wù)消息
TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(
"seckill_producer_group", // 生產(chǎn)者組
message,
null // 本地事務(wù)參數(shù)(可傳遞上下文)
);
log.info("半消息發(fā)送結(jié)果:{}", result.getSendStatus());
}本地事務(wù)校驗
本地事務(wù)的核心是判斷Redis預(yù)扣減是否成功:
- 若
Redis的Lua腳本執(zhí)行成功(即庫存預(yù)扣減完成且流水已記錄),則向RocketMQ返回 提交(COMMIT)指令,消息變?yōu)榭上M狀態(tài); - 若
Redis預(yù)扣減失?。ㄈ鐜齑娌蛔慊蛑貜?fù)提交),則返回回滾(ROLLBACK)指令,消息被丟棄。 - 若
RocketMQ長時間未收到本地事務(wù)結(jié)果(如生產(chǎn)者宕機),會觸發(fā)消息回查。此時系統(tǒng)通過檢查Redis中是否存在對應(yīng)交易流水,判斷是否需要提交消息:若流水存在,則提交;否則回滾。
@Component
public class SeckillTransactionListener implements TransactionListener {
@Autowired
private StringRedisTemplate redisTemplate;
// 執(zhí)行本地事務(wù)
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
SeckillMessage message = JSON.parseObject(new String(msg.getBody()), SeckillMessage.class);
// 檢查Redis中是否存在對應(yīng)流水(驗證預(yù)扣減成功)
Boolean flag = redisTemplate.opsForHash().hasKey(
message.getUid(), // Hash key:用戶ID
message.getToken() // Hash field:Token
);
return flag ? RocketMQLocalTransactionState.COMMIT : RocketMQLocalTransactionState.ROLLBACK;
} catch (Exception e) {
return RocketMQLocalTransactionState.UNKNOWN; // 未知狀態(tài),觸發(fā)回查
}
}
// 消息回查(解決超時未確認問題)
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
SeckillMessage message = JSON.parseObject(new String(msg.getBody()), SeckillMessage.class);
// 回查邏輯:再次檢查流水是否存在
Boolean flag = redisTemplate.opsForHash().hasKey(message.getUid(), message.getToken());
return flag ? RocketMQLocalTransactionState.COMMIT : RocketMQLocalTransactionState.ROLLBACK;
}
}消費消息并扣減 MySQL 庫存
消費者監(jiān)聽消息,執(zhí)行數(shù)據(jù)庫扣減(需保證冪等性): 消費者接收到可消費的消息后,執(zhí)行MySQL庫存扣減操作,并同步記錄數(shù)據(jù)庫中的交易流水。為確保消費成功,需利用MQ的重試機制:若消費失?。ㄈ鐢?shù)據(jù)庫暫時不可用),MQ會自動重試,直至消費成功或達到最大重試次數(shù)(此時需人工介入處理)。
@Component
@RocketMQMessageListener(
topic = "seckill_topic",
consumerGroup = "seckill_consumer_group",
messageModel = MessageModel.CLUSTERING
)
public class SeckillConsumer implements RocketMQListener<MessageExt> {
@Autowired
private JdbcTemplate jdbcTemplate;
@Override
public void onMessage(MessageExt message) {
SeckillMessage msg = JSON.parseObject(new String(message.getBody()), SeckillMessage.class);
String productId = msg.getProductId();
int quantity = msg.getQuantity();
// 數(shù)據(jù)庫扣減(使用樂觀鎖防超賣)
String sql = "UPDATE product_stock " +
"SET stock = stock - ?, version = version + 1 " +
"WHERE product_id = ? AND stock >= ? AND version = ?";
// 1. 查詢當(dāng)前版本號
Integer version = jdbcTemplate.queryForObject(
"SELECT version FROM product_stock WHERE product_id = ?",
Integer.class,
productId
);
// 2. 執(zhí)行扣減(樂觀鎖保證原子性)
int rows = jdbcTemplate.update(sql, quantity, productId, quantity, version);
if (rows > 0) {
// 扣減成功:記錄數(shù)據(jù)庫流水
jdbcTemplate.update(
"INSERT INTO stock_flow (product_id, quantity, op_type, create_time) " +
"VALUES (?, ?, 'SECKILL', NOW())",
productId, quantity
);
// 確認消費成功(返回ACK)
} else {
// 扣減失?。河|發(fā)重試(MQ默認重試機制)
throw new RuntimeException("數(shù)據(jù)庫扣減失敗,觸發(fā)重試");
}
}
}一致性保障
為防止Redis與MySQL數(shù)據(jù)不一致(如Redis扣減成功但MySQL扣減失?。?,需定期對賬:
@Scheduled(cron = "0 0 */1 * * ?") // 每小時執(zhí)行一次
public void reconcileStock() {
// 1. 掃描Redis中未同步到MySQL的流水
Set<String> uids = redisTemplate.keys("uid:*"); // 假設(shè)用戶ID前綴為uid:
for (String uid : uids) {
Map<Object, Object> tokenMap = redisTemplate.opsForHash().entries(uid);
for (Map.Entry<Object, Object> entry : tokenMap.entrySet()) {
String token = (String) entry.getKey();
String flowJson = (String) entry.getValue();
SeckillFlow flow = JSON.parseObject(flowJson, SeckillFlow.class);
// 2. 檢查MySQL是否有對應(yīng)訂單
Integer count = jdbcTemplate.queryForObject(
"SELECT COUNT(1) FROM orders WHERE product_id = ? AND uid = ? AND token = ?",
Integer.class,
flow.getProduct(), flow.getUid(), token
);
if (count == 0) {
// 3. 未找到訂單 → 人工介入或自動回滾Redis庫存
log.warn("發(fā)現(xiàn)不一致:Redis有流水但MySQL無訂單,product={}, uid={}", flow.getProduct(), uid);
// redisTemplate.opsForValue().increment(flow.getProduct(), Integer.parseInt(flow.getChange()));
}
}
}
}系統(tǒng)可通過定時任務(wù)對比Redis流水、MySQL庫存流水與訂單表數(shù)據(jù):若Redis流水存在但訂單表無對應(yīng)記錄,說明訂單生成失敗,需人工介入補單或回滾Redis庫存,避免少賣;若訂單表有記錄但MySQL庫存未扣減,則需觸發(fā)庫存補扣,避免多賣。
總結(jié)
Redis + MQ 方案通過預(yù)扣減 + 事務(wù)消息 + 對賬三重機制,完美解決了高并發(fā)秒殺的核心痛點:
Redis承擔(dān)高并發(fā)讀寫,通過Lua腳本確保原子性,防止超賣;MQ事務(wù)消息保障Redis與MySQL的最終一致性,避免數(shù)據(jù)斷層;- 流水對賬作為最后一道防線,及時發(fā)現(xiàn)并修復(fù)異常。





























