你心心念念的RabbitMQ個(gè)人實(shí)踐來(lái)了來(lái)了它來(lái)了
前言
MQ(Message Queue)就是消息隊(duì)列,其有點(diǎn)有很多:解耦、異步、削峰等等,本文來(lái)聊一下RabbitMQ的一些概念以及使用。
RabbitMq
案例
Springboot整合RabbitMQ簡(jiǎn)單案例
基本概念
- Exchange:消息交換機(jī),它指定消息按什么規(guī)則,路由到哪個(gè)隊(duì)列。
- Queue:消息隊(duì)列載體,每個(gè)消息都會(huì)被投入到一個(gè)或多個(gè)隊(duì)列。
- Binding:綁定,它的作用就是把exchange和queue按照路由規(guī)則綁定起來(lái)。
- Routing Key:路由關(guān)鍵字,exchange根據(jù)這個(gè)關(guān)鍵字進(jìn)行消息投遞。
- Producer:消息生產(chǎn)者,就是投遞消息的程序。
- Consumer:消息消費(fèi)者,就是接受消息的程序。
發(fā)布消息到RabbitMQ需要經(jīng)過(guò)兩步:
- producer → exchange
- exchange 根據(jù) exchange 的類(lèi)型和 routing key 確定將消息投遞到哪個(gè)隊(duì)列
工作流程
了解了RabbitMQ的一些概念,我們來(lái)捋捋使用RabbitMQ的流程:
- 創(chuàng)建Exchange
- 創(chuàng)建Queue
- 將Queue綁定進(jìn)Exchange中(此處會(huì)設(shè)置routing key)
- 生產(chǎn)者發(fā)布消息
- 消費(fèi)者訂閱消息
交換機(jī)(Exchange)
交換機(jī)可以綁定隊(duì)列,綁定時(shí)可以給隊(duì)列指定路由(Routing key)和參數(shù)(Arguments)
所有的消息發(fā)送都是經(jīng)過(guò)交換機(jī)轉(zhuǎn)發(fā)到隊(duì)列的,而不是直接到隊(duì)列中
交換機(jī)類(lèi)型:
- direct
- 根據(jù)確定的路由(routing key)轉(zhuǎn)發(fā)消息到隊(duì)列中(一條消息可以發(fā)到多個(gè)隊(duì)列,只要路由相同)
- fanout
- 路由無(wú)效,只要和該交換機(jī)綁定的隊(duì)列,都能接收到消息
- topic
- 允許路由使用*和#來(lái)進(jìn)行模糊匹配
- *表示一個(gè)單詞
- 表示任意數(shù)量(零個(gè)或多個(gè))單詞
- 例如:如果隊(duì)列的路由為com.# 那么往交換機(jī)發(fā)消息是,路由填com.ccc 隊(duì)列就可以收到消息
- headers
- 忽略路由,由參數(shù)(Arguments)來(lái)確定轉(zhuǎn)發(fā)的隊(duì)列
消息過(guò)期時(shí)間TTL
有兩種方式設(shè)置TTL,創(chuàng)建隊(duì)列時(shí)設(shè)置整個(gè)隊(duì)列的TTL或者在發(fā)送消息時(shí)單獨(dú)設(shè)置每條消息的TTL,消息存活時(shí)間取兩者的最小值。
- 創(chuàng)建隊(duì)列時(shí)設(shè)置
- 是消息的存活時(shí)間,不是隊(duì)列的存活時(shí)間,別搞混了。
- @Beanpublic Queue queue(){ Map<String, Object> args = new HashMap<>(); args.put("x-message-ttl", 5000); // 設(shè)置隊(duì)列中的消息5秒過(guò)期 return new Queue("queueName",true, false, false, args);}
- 發(fā)送消息時(shí)設(shè)置
- public void makeOrder(String userid,String productid,int num){ String exchangeName = "ttl_exchange"; String routingKey = "ttlmessage"; //給消息設(shè)置過(guò)期時(shí)間 MessagePostProcessor messagePostProcessor = new MessagePostProcessor(){ public Message postProcessMessage(Message message){ // 設(shè)置消息5秒過(guò)期 message.getMessageProperties().setExpiration("5000"); return message; } } rabbitTemplate.convertAndSend(exchangeName,routingKey,"message",messagePostProcessor);}
死信隊(duì)列
死信隊(duì)列也是一個(gè)正常隊(duì)列,只是當(dāng)綁定了死信隊(duì)列的隊(duì)列滿(mǎn)足相應(yīng)條件,就會(huì)將滿(mǎn)足條件的消息轉(zhuǎn)移到死信隊(duì)列中。
進(jìn)入死信隊(duì)列的條件:
- 消息被拒絕
- 消息過(guò)期(超時(shí))
- 隊(duì)列達(dá)到最大長(zhǎng)度
死信隊(duì)列的配置:
- 按照正常步驟定義一個(gè)隊(duì)列(交換機(jī)、隊(duì)列、綁定)
- 給需要綁定死信隊(duì)列的隊(duì)列添加x-dead-letter-exchange(死信隊(duì)列的交換機(jī))和x-dead-letter-routing-key(死信隊(duì)列的路由)參數(shù)
- @Beanpublic Queue queue(){ Map<String, Object> args = new HashMap<>(); args.put("x-dead-letter-exchange", "死信隊(duì)列交換機(jī)名稱(chēng)"); args.put("x-dead-letter-routing-key", "死信隊(duì)列路由"); return new Queue("queueName",true, false, false, args);}
如何保證MQ消息正確送達(dá)與消費(fèi)
可靠性生產(chǎn)和推送
步驟:
- 發(fā)送消息前數(shù)據(jù)庫(kù)保存MQ消息發(fā)送日志
- MQ消息發(fā)送后使用回調(diào)更新日志狀態(tài)
實(shí)現(xiàn):
上面我們講了,發(fā)布消息到RabbitMQ需要經(jīng)過(guò)兩步:
producer → exchange
exchange 根據(jù) exchange 的類(lèi)型和 routing key 確定將消息投遞到哪個(gè)隊(duì)列
所以,發(fā)布消息的確認(rèn)也分兩個(gè)部分,以下是確認(rèn)步驟:
- 修改MQ應(yīng)答機(jī)制(yml)
- spring: rabbitmq: username: rmq password: 123456 virtual-host: / host: localhost port: 5672 # 發(fā)送消息確認(rèn),producer -> exchange publisher-confirm-type: correlated # 發(fā)送消息確認(rèn),exchange -> queue publisher-returns: true
- 新增mq的回調(diào)方法
- /** * PostConstruct注解好多人以為是Spring提供的。其實(shí)是Java自己的注解。 * Java中該注解的說(shuō)明:@PostConstruct該注解被用來(lái)修飾一個(gè)非靜態(tài)的void()方法。 * 被@PostConstruct修飾的方法會(huì)在服務(wù)器加載Servlet的時(shí)候運(yùn)行,并且只會(huì)被服務(wù)器執(zhí)行一次。 * PostConstruct在構(gòu)造函數(shù)之后執(zhí)行,init()方法之前執(zhí)行。 * Constructor(構(gòu)造方法) -> @Autowired(依賴(lài)注入) -> @PostConstruct(注釋的方法) */@PostConstructprivate void regCallBack() { // producer -> exchange 成功或失敗都會(huì)觸發(fā)此回調(diào) rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { // 這個(gè)id是在消息發(fā)送的時(shí)候傳入的 String id = correlationData.getId(); // 如果ack為true代表消息被mq成功接收 if (!ack) { // 應(yīng)答失敗,修改日志狀態(tài) System.out.println("exchange 應(yīng)答失敗,做失敗處理!"); } else { // 應(yīng)答成功,修改日志狀態(tài) System.out.println("exchange 成功處理"); } } }); // 這個(gè)回調(diào)只有exchange -> queue 失敗時(shí)才會(huì)觸發(fā) rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println("exchange -> queue 發(fā)送失敗"); } });}
- 修改MQ發(fā)送消息的方法,增加日志id的傳遞
- String correlationId = "這是日志id";rabbitTemplate.convertAndSend(exchange, routeKey, message, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { // 消費(fèi)者需要correlationId才做這個(gè)處理 message.getMessageProperties().setCorrelationId(correlationId); return message; }}, new CorrelationData(correlationId));// 如果消費(fèi)者不需要獲取correlationId,則用下面這種即可rabbitTemplate.convertAndSend(exchange, routeKey, msg, new CorrelationData(correlationId));
可靠性消費(fèi)
步驟:
- 開(kāi)啟手動(dòng)應(yīng)答
- 監(jiān)聽(tīng)器增加手動(dòng)應(yīng)答邏輯
實(shí)現(xiàn):
- 開(kāi)啟手動(dòng)應(yīng)答
- spring: rabbitmq: username: rmq password: 123456 virtual-host: / host: localhost port: 5672 listener: simple: acknowledge-mode: manual # 將自動(dòng)應(yīng)答ack模式改成手動(dòng)應(yīng)答
- acknowledge-mode有三種類(lèi)型:
- nome:不進(jìn)行ack,rabbitmq默認(rèn)消費(fèi)者正確處理所有請(qǐng)求
- munual:手動(dòng)確認(rèn)
- auto:自動(dòng)確認(rèn)消息(默認(rèn)類(lèi)型)。如果消費(fèi)者拋出異常,則消息重回隊(duì)列。
- 監(jiān)聽(tīng)器增加手動(dòng)應(yīng)答邏輯
- @RabbitListener(queues = {"隊(duì)列名字"})public void messageConsumer(String orderMsg, Channel channel, @Headers Map<String,Object> headers) throws Exception{ // 需要producer做相應(yīng)處理,consumer才能拿到correlationId String correlationId = messages.getMessageProperties().getCorrelationId(); System.out.println("消息為:" + orderMsg); long tag = Long.parseLong(headers.get(AmqpHeaders.DELIVERY_TAG).toString()); try { // 消費(fèi)成功,進(jìn)行確認(rèn) channel.basicAck(tag, false); } catch (IOException e) { // 消費(fèi)失敗,重發(fā) // requeue代表是否重發(fā),為false則直接將消息丟棄,有死信就進(jìn)入死信隊(duì)列 channel.basicNack(tag, false, true); }}
總結(jié)
本文介紹了RabbitMQ的一些概念和簡(jiǎn)單使用,有不少東西其實(shí)是沒(méi)有講清楚的,比如publisher-confirm-type和acknowledge-mode的幾種類(lèi)型的區(qū)別等等。主要是在官方文檔找不到相關(guān)的細(xì)致描述,查文檔的能力還是有待提高。。。