RabbitMQ如何保證消息的可靠投遞?
Spring Boot整合RabbitMQ
github地址:
https://github.com/erlieStar/rabbitmq-examples
Spring有三種配置方式
- 基于XML
 - 基于JavaConfig
 - 基于注解
 
當(dāng)然現(xiàn)在已經(jīng)很少使用XML來(lái)做配置了,只介紹一下用JavaConfig和注解的配置方式
RabbitMQ整合Spring Boot,我們只需要增加對(duì)應(yīng)的starter即可
- <dependency>
 - <groupId>org.springframework.boot</groupId>
 - <artifactId>spring-boot-starter-amqp</artifactId>
 - </dependency>
 
基于注解
在application.yaml的配置如下
- spring:
 - rabbitmq:
 - host: myhost
 - port: 5672
 - username: guest
 - password: guest
 - virtual-host: /
 - log:
 - exchange: log.exchange
 - info:
 - queue: info.log.queue
 - binding-key: info.log.key
 - error:
 - queue: error.log.queue
 - binding-key: error.log.key
 - all:
 - queue: all.log.queue
 - binding-key: '*.log.key'
 
消費(fèi)者代碼如下
- @Slf4j
 - @Component
 - public class LogReceiverListener {
 - /**
 - * 接收info級(jí)別的日志
 - */
 - @RabbitListener(
 - bindings = @QueueBinding(
 - value = @Queue(value = "${log.info.queue}", durable = "true"),
 - exchange = @Exchange(value = "${log.exchange}", type = ExchangeTypes.TOPIC),
 - key = "${log.info.binding-key}"
 - )
 - )
 - public void infoLog(Message message) {
 - String msg = new String(message.getBody());
 - log.info("infoLogQueue 收到的消息為: {}", msg);
 - }
 - /**
 - * 接收所有的日志
 - */
 - @RabbitListener(
 - bindings = @QueueBinding(
 - value = @Queue(value = "${log.all.queue}", durable = "true"),
 - exchange = @Exchange(value = "${log.exchange}", type = ExchangeTypes.TOPIC),
 - key = "${log.all.binding-key}"
 - )
 - )
 - public void allLog(Message message) {
 - String msg = new String(message.getBody());
 - log.info("allLogQueue 收到的消息為: {}", msg);
 - }
 - }
 
生產(chǎn)者如下
- @RunWith(SpringRunner.class)
 - @SpringBootTest
 - public class MsgProducerTest {
 - @Autowired
 - private AmqpTemplate amqpTemplate;
 - @Value("${log.exchange}")
 - private String exchange;
 - @Value("${log.info.binding-key}")
 - private String routingKey;
 - @SneakyThrows
 - @Test
 - public void sendMsg() {
 - for (int i = 0; i < 5; i++) {
 - String message = "this is info message " + i;
 - amqpTemplate.convertAndSend(exchange, routingKey, message);
 - }
 - System.in.read();
 - }
 - }
 
Spring Boot針對(duì)消息ack的方式和原生api針對(duì)消息ack的方式有點(diǎn)不同
原生api消息ack的方式
消息的確認(rèn)方式有2種
自動(dòng)確認(rèn)(autoAck=true)
手動(dòng)確認(rèn)(autoAck=false)
消費(fèi)者在消費(fèi)消息的時(shí)候,可以指定autoAck參數(shù)
String basicConsume(String queue, boolean autoAck, Consumer callback)
autoAck=false: RabbitMQ會(huì)等待消費(fèi)者顯示回復(fù)確認(rèn)消息后才從內(nèi)存(或者磁盤)中移出消息
autoAck=true: RabbitMQ會(huì)自動(dòng)把發(fā)送出去的消息置為確認(rèn),然后從內(nèi)存(或者磁盤)中刪除,而不管消費(fèi)者是否真正的消費(fèi)了這些消息
手動(dòng)確認(rèn)的方法如下,有2個(gè)參數(shù)
basicAck(long deliveryTag, boolean multiple)
deliveryTag: 用來(lái)標(biāo)識(shí)信道中投遞的消息。RabbitMQ 推送消息給Consumer時(shí),會(huì)附帶一個(gè)deliveryTag,以便Consumer可以在消息確認(rèn)時(shí)告訴RabbitMQ到底是哪條消息被確認(rèn)了。
RabbitMQ保證在每個(gè)信道中,每條消息的deliveryTag從1開始遞增
multiple=true: 消息id<=deliveryTag的消息,都會(huì)被確認(rèn)
myltiple=false: 消息id=deliveryTag的消息,都會(huì)被確認(rèn)
消息一直不確認(rèn)會(huì)發(fā)生啥?
如果隊(duì)列中的消息發(fā)送到消費(fèi)者后,消費(fèi)者不對(duì)消息進(jìn)行確認(rèn),那么消息會(huì)一直留在隊(duì)列中,直到確認(rèn)才會(huì)刪除。
如果發(fā)送到A消費(fèi)者的消息一直不確認(rèn),只有等到A消費(fèi)者與rabbitmq的連接中斷,rabbitmq才會(huì)考慮將A消費(fèi)者未確認(rèn)的消息重新投遞給另一個(gè)消費(fèi)者
Spring Boot中針對(duì)消息ack的方式
有三種方式,定義在AcknowledgeMode枚舉類中
| 方式 | 解釋 | 
|---|---|
| NONE | 沒(méi)有ack,等價(jià)于原生api中的autoAck=true | 
| MANUAL | 用戶需要手動(dòng)發(fā)送ack或者nack | 
| AUTO | 方法正常結(jié)束,spring boot 框架返回ack,發(fā)生異常spring boot框架返回nack | 
spring boot針對(duì)消息默認(rèn)的ack的方式為AUTO。
在實(shí)際場(chǎng)景中,我們一般都是手動(dòng)ack。
application.yaml的配置改為如下
- spring:
 - rabbitmq:
 - host: myhost
 - port: 5672
 - username: guest
 - password: guest
 - virtual-host: /
 - listener:
 - simple:
 - acknowledge-mode: manual # 手動(dòng)ack,默認(rèn)為auto
 
相應(yīng)的消費(fèi)者代碼改為
- @Slf4j
 - @Component
 - public class LogListenerManual {
 - /**
 - * 接收info級(jí)別的日志
 - */
 - @RabbitListener(
 - bindings = @QueueBinding(
 - value = @Queue(value = "${log.info.queue}", durable = "true"),
 - exchange = @Exchange(value = "${log.exchange}", type = ExchangeTypes.TOPIC),
 - key = "${log.info.binding-key}"
 - )
 - )
 - public void infoLog(Message message, Channel channel) throws Exception {
 - String msg = new String(message.getBody());
 - log.info("infoLogQueue 收到的消息為: {}", msg);
 - try {
 - // 這里寫各種業(yè)務(wù)邏輯
 - channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
 - } catch (Exception e) {
 - channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
 - }
 - }
 - }
 
我們上面用到的注解,作用如下
| 注解 | 作用 | 
|---|---|
| RabbitListener | 消費(fèi)消息,可以定義在類上,方法上,當(dāng)定義在類上時(shí)需要和RabbitHandler配合使用 | 
| QueueBinding | 定義綁定關(guān)系 | 
| Queue | 定義隊(duì)列 | 
| Exchange | 定義交換機(jī) | 
| RabbitHandler | RabbitListener定義在類上時(shí),需要用RabbitHandler指定處理的方法 | 
基于JavaConfig
既然用注解這么方便,為啥還需要JavaConfig的方式呢?
JavaConfig方便自定義各種屬性,比如同時(shí)配置多個(gè)virtual host等
具體代碼看GitHub把
RabbitMQ如何保證消息的可靠投遞
一個(gè)消息往往會(huì)經(jīng)歷如下幾個(gè)階段
在這里插入圖片描述
所以要保證消息的可靠投遞,只需要保證這3個(gè)階段的可靠投遞即可
生產(chǎn)階段
這個(gè)階段的可靠投遞主要靠ConfirmListener(發(fā)布者確認(rèn))和ReturnListener(失敗通知)
前面已經(jīng)介紹過(guò)了,一條消息在RabbitMQ中的流轉(zhuǎn)過(guò)程為
producer -> rabbitmq broker cluster -> exchange -> queue -> consumer
ConfirmListener可以獲取消息是否從producer發(fā)送到broker
ReturnListener可以獲取從exchange路由不到queue的消息
我用Spring Boot Starter 的api來(lái)演示一下效果
application.yaml
- spring:
 - rabbitmq:
 - host: myhost
 - port: 5672
 - username: guest
 - password: guest
 - virtual-host: /
 - listener:
 - simple:
 - acknowledge-mode: manual # 手動(dòng)ack,默認(rèn)為auto
 - log:
 - exchange: log.exchange
 - info:
 - queue: info.log.queue
 - binding-key: info.log.key
 
發(fā)布者確認(rèn)回調(diào)
- @Component
 - public class ConfirmCallback implements RabbitTemplate.ConfirmCallback {
 - @Autowired
 - private MessageSender messageSender;
 - @Override
 - public void confirm(CorrelationData correlationData, boolean ack, String cause) {
 - String msgId = correlationData.getId();
 - String msg = messageSender.dequeueUnAckMsg(msgId);
 - if (ack) {
 - System.out.println(String.format("消息 {%s} 成功發(fā)送給mq", msg));
 - } else {
 - // 可以加一些重試的邏輯
 - System.out.println(String.format("消息 {%s} 發(fā)送mq失敗", msg));
 - }
 - }
 - }
 
失敗通知回調(diào)
- @Component
 - public class ReturnCallback implements RabbitTemplate.ReturnCallback {
 - @Override
 - public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
 - String msg = new String(message.getBody());
 - System.out.println(String.format("消息 {%s} 不能被正確路由,routingKey為 {%s}", msg, routingKey));
 - }
 - }
 
- @Configuration
 - public class RabbitMqConfig {
 - @Bean
 - public ConnectionFactory connectionFactory(
 - @Value("${spring.rabbitmq.host}") String host,
 - @Value("${spring.rabbitmq.port}") int port,
 - @Value("${spring.rabbitmq.username}") String username,
 - @Value("${spring.rabbitmq.password}") String password,
 - @Value("${spring.rabbitmq.virtual-host}") String vhost) {
 - CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host);
 - connectionFactory.setPort(port);
 - connectionFactory.setUsername(username);
 - connectionFactory.setPassword(password);
 - connectionFactory.setVirtualHost(vhost);
 - connectionFactory.setPublisherConfirms(true);
 - connectionFactory.setPublisherReturns(true);
 - return connectionFactory;
 - }
 - @Bean
 - public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,
 - ReturnCallback returnCallback, ConfirmCallback confirmCallback) {
 - RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
 - rabbitTemplate.setReturnCallback(returnCallback);
 - rabbitTemplate.setConfirmCallback(confirmCallback);
 - // 要想使 returnCallback 生效,必須設(shè)置為true
 - rabbitTemplate.setMandatory(true);
 - return rabbitTemplate;
 - }
 - }
 
這里我對(duì)RabbitTemplate做了一下包裝,主要就是發(fā)送的時(shí)候增加消息id,并且保存消息id和消息的對(duì)應(yīng)關(guān)系,因?yàn)镽abbitTemplate.ConfirmCallback只能拿到消息id,并不能拿到消息內(nèi)容,所以需要我們自己保存這種映射關(guān)系。在一些可靠性要求比較高的系統(tǒng)中,你可以將這種映射關(guān)系存到數(shù)據(jù)庫(kù)中,成功發(fā)送刪除映射關(guān)系,失敗則一直發(fā)送
- @Component
 - public class MessageSender {
 - @Autowired
 - private RabbitTemplate rabbitTemplate;
 - public final Map<String, String> unAckMsgQueue = new ConcurrentHashMap<>();
 - public void convertAndSend(String exchange, String routingKey, String message) {
 - String msgId = UUID.randomUUID().toString();
 - CorrelationData correlationData = new CorrelationData();
 - correlationData.setId(msgId);
 - rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);
 - unAckMsgQueue.put(msgId, message);
 - }
 - public String dequeueUnAckMsg(String msgId) {
 - return unAckMsgQueue.remove(msgId);
 - }
 - }
 
測(cè)試代碼為
- @RunWith(SpringRunner.class)
 - @SpringBootTest
 - public class MsgProducerTest {
 - @Autowired
 - private MessageSender messageSender;
 - @Value("${log.exchange}")
 - private String exchange;
 - @Value("${log.info.binding-key}")
 - private String routingKey;
 - /**
 - * 測(cè)試失敗通知
 - */
 - @SneakyThrows
 - @Test
 - public void sendErrorMsg() {
 - for (int i = 0; i < 3; i++) {
 - String message = "this is error message " + i;
 - messageSender.convertAndSend(exchange, "test", message);
 - }
 - System.in.read();
 - }
 - /**
 - * 測(cè)試發(fā)布者確認(rèn)
 - */
 - @SneakyThrows
 - @Test
 - public void sendInfoMsg() {
 - for (int i = 0; i < 3; i++) {
 - String message = "this is info message " + i;
 - messageSender.convertAndSend(exchange, routingKey, message);
 - }
 - System.in.read();
 - }
 - }
 
先來(lái)測(cè)試失敗者通知
輸出為
- 消息 {this is error message 0} 不能被正確路由,routingKey為 {test}
 - 消息 {this is error message 0} 成功發(fā)送給mq
 - 消息 {this is error message 2} 不能被正確路由,routingKey為 {test}
 - 消息 {this is error message 2} 成功發(fā)送給mq
 - 消息 {this is error message 1} 不能被正確路由,routingKey為 {test}
 - 消息 {this is error message 1} 成功發(fā)送給mq
 
消息都成功發(fā)送到broker,但是并沒(méi)有被路由到queue中
再來(lái)測(cè)試發(fā)布者確認(rèn)
輸出為
- 消息 {this is info message 0} 成功發(fā)送給mq
 - infoLogQueue 收到的消息為: {this is info message 0}
 - infoLogQueue 收到的消息為: {this is info message 1}
 - 消息 {this is info message 1} 成功發(fā)送給mq
 - infoLogQueue 收到的消息為: {this is info message 2}
 - 消息 {this is info message 2} 成功發(fā)送給mq
 
消息都成功發(fā)送到broker,也成功被路由到queue中
存儲(chǔ)階段
這個(gè)階段的高可用還真沒(méi)研究過(guò),畢竟集群都是運(yùn)維搭建的,后續(xù)有時(shí)間的話會(huì)把這快的內(nèi)容補(bǔ)充一下
消費(fèi)階段
消費(fèi)階段的可靠投遞主要靠ack來(lái)保證。
總而言之,在生產(chǎn)環(huán)境中,我們一般都是單條手動(dòng)ack,消費(fèi)失敗后不會(huì)重新入隊(duì)(因?yàn)楹艽蟾怕蔬€會(huì)再次失敗),而是將消息重新投遞到死信隊(duì)列,方便以后排查問(wèn)題
總結(jié)一下各種情況
- ack后消息從broker中刪除
 - nack或者reject后,分為如下2種情況
 
(1) reque=true,則消息會(huì)被重新放入隊(duì)列
(2) reque=fasle,消息會(huì)被直接丟棄,如果指定了死信隊(duì)列的話,會(huì)被投遞到死信隊(duì)列
本文轉(zhuǎn)載自微信公眾號(hào)「Java識(shí)堂」,可以通過(guò)以下二維碼關(guān)注。轉(zhuǎn)載本文請(qǐng)聯(lián)系Java識(shí)堂公眾號(hào)。


















 
 
 














 
 
 
 