阿里二面:要保證消息不丟失,又不重復(fù),消息隊(duì)列怎么選型?
大家好,我是君哥。
在使用消息隊(duì)列時(shí),有兩個(gè)經(jīng)常讓我們煩惱的問題,消息丟失和消息重復(fù)。那我們?cè)谧黾夹g(shù)選型時(shí),有沒有一個(gè)消息隊(duì)列能解決消息丟失和消息重復(fù)這兩個(gè)問題呢?
消息丟失
如上圖,從生產(chǎn)者發(fā)送消息,Broker 保存消息,消費(fèi)者消費(fèi)消息,每一個(gè)環(huán)節(jié)都有可能丟失消息。
發(fā)送丟失
生產(chǎn)者發(fā)送消息時(shí),如果處理不當(dāng),很可能會(huì)造成消息丟失。
生產(chǎn)者發(fā)送消息,主流消息隊(duì)列都支持同步發(fā)送和異步發(fā)送。如果使用同步發(fā)送,生產(chǎn)者發(fā)送消息后,會(huì)同步等待 Broker 返回的 ACK,收到 ACK 消息,就認(rèn)為消息發(fā)送成功。如果長(zhǎng)時(shí)間沒有收到,則會(huì)認(rèn)為消息發(fā)送失敗,需要進(jìn)行重試。
同步發(fā)送可以保證消息不丟失,但是會(huì)有性能問題,所以多數(shù)情況會(huì)選擇異步發(fā)送。異步發(fā)送如何保證消息不丟失呢?主流消息隊(duì)列(比如 Kafka 和 RocketMQ)實(shí)現(xiàn)方法基本類似,使用回調(diào)函數(shù)來實(shí)現(xiàn)。下面看一下 Kafka 的異步發(fā)送代碼:
producer.send(record, new Callback() {
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
logger.error("發(fā)送消息失敗:", exception);
}
if (metadata != null) {
logger.info("消息發(fā)送成功");
}
}
});
消息存儲(chǔ)
生產(chǎn)者發(fā)送消息成功,也不能保證消息絕對(duì)不丟失。因?yàn)榧词瓜l(fā)送到 Broker,如果在消費(fèi)者拉取到消息之前,Broker 宕機(jī)了,消息還沒有落盤,也會(huì)導(dǎo)致消息丟失。
在存儲(chǔ)階段要保證消息不丟失,可以考慮幾個(gè)方面:
同步刷盤
采用異步刷盤,如果在消息落盤之前 Broker 宕機(jī)了,就會(huì)造成消息丟失。而采用同步刷盤,等待消息落盤之后,再給 Sender 返回發(fā)送成功,可以從消息發(fā)送環(huán)節(jié)保證消息不丟失。
在 RocketMQ 中,把 flushDiskType 參數(shù)配置為 SYNC_FLUSH 就可以開啟同步刷盤。
Broker 集群
如果 Broker 集群中只有一個(gè)節(jié)點(diǎn),即使消息落盤成功了,Broker 發(fā)送故障,在 Broker 恢復(fù)以前消費(fèi)者也會(huì)拉取不到消息。而且如果 Broker 磁盤故障不可恢復(fù),消息也會(huì)丟失。
采用 Broker 集群可以很好地解決這個(gè)問題。見下圖:
在 Broker 集群時(shí),可以等待 2 個(gè)以上的節(jié)點(diǎn)同步消息完成后再給 Producer 返回成功。這樣即使一個(gè) Broker 掛了,也可以很容易找到替代的 Broker。
消息消費(fèi)
消費(fèi)者保證不丟失消息,需要消費(fèi)完成后再給 Broker 返回 ACK。在主流的消息隊(duì)列中,如果 Broker 收不到 ACK,都會(huì)給消費(fèi)者再次發(fā)送這條消息。
有時(shí)候?yàn)榱私鉀Q消息積壓的問題,消費(fèi)者拉取到消息后會(huì)直接返回 ACK,然后再異步執(zhí)行消息處理邏輯。這樣要保證消息不丟失,需要在返回 ACK 之前把消息保存到本地,比如持久化到數(shù)據(jù)庫,后面可以取數(shù)據(jù)庫保存的消息進(jìn)行處理。
消息重復(fù)
消息重復(fù)一般有兩個(gè)原因,一個(gè)是生產(chǎn)者發(fā)送消息后沒有收到 ACK,然后進(jìn)行重復(fù)發(fā)送,另一個(gè)原因是消費(fèi)者消費(fèi)完成后 Broker 沒有收到 ACK,導(dǎo)致消息重復(fù)推送給消費(fèi)者。
重復(fù)消息會(huì)對(duì)業(yè)務(wù)造成影響,比如電商場(chǎng)景中的重復(fù)支付、賬務(wù)場(chǎng)景中的重復(fù)記賬,對(duì)業(yè)務(wù)造成的影響都比較嚴(yán)重。
從目前主流的消息隊(duì)列來看,并沒有一個(gè)消息隊(duì)列能解決消息重復(fù)消費(fèi)的問題,只能在消費(fèi)端做冪等處理。下面提供幾個(gè)思路作為參考。
數(shù)據(jù)庫唯一鍵約束
如果消息會(huì)落本地?cái)?shù)據(jù)庫,可以采用消息 ID 作為唯一鍵。如果消息不落數(shù)據(jù)庫,可以將消息 ID 或者消息中其他唯一能標(biāo)識(shí)消息的屬性作為唯一鍵落業(yè)務(wù)數(shù)據(jù)表。
保存消費(fèi)記錄
我們也可以將消息 ID 保存 Redis,消費(fèi)消息前判斷消息 ID 是否已存在。
ValueOperations<String, String> valueOperations = redisTemplate.opsForValue();
Boolean result = valueOperations.setIfAbsent(messageId, messageId);
if (result) {
//消費(fèi)邏輯;
} else {
logger.error("這條消息已經(jīng)消費(fèi),跳過,消息ID:{}", messageId);
}
這里有一個(gè)注意點(diǎn),如果消費(fèi)失敗了,需要?jiǎng)h除 Redis 中保存的消息 ID。
總結(jié)
消息不丟失、不重復(fù)是消息隊(duì)列的基本要求,但這個(gè)基本要求還是很難滿足的。
消息丟失這個(gè)要求,主流消息隊(duì)列通過消息重試和消息持久化的方式可以滿足。
但消息重試也同時(shí)帶來了消息重復(fù)的可能性,主流消息隊(duì)列在解決重復(fù)消息的問題上并沒有現(xiàn)成的方案,對(duì)不允許重復(fù)消費(fèi)的場(chǎng)景,需要開發(fā)人員在消費(fèi)端做冪等處理。