RabbitMQ代碼篇之過(guò)期時(shí)間,死信隊(duì)列,延遲隊(duì)列,優(yōu)先級(jí)隊(duì)列的基本使用
這篇是 代碼篇 ,略過(guò) Direct,F(xiàn)anout,Topic 這幾種基本隊(duì)列的使用介紹,但是 Github 倉(cāng)庫(kù)上可以查看到的。
圖片
<( ̄︶ ̄)↗[GO!]
沖沖沖~
死信隊(duì)列
死信是指有這三個(gè)特點(diǎn)的消息
- 消息被拒絕,且沒(méi)有重新入隊(duì)(投遞)
- 消息過(guò)期
- 消息隊(duì)列滿(mǎn)了
// nack 返回 false,并放棄重新回到隊(duì)列
channel.basicNack(deliveryTag, false, false);
//拒絕,不重新入隊(duì)列
channel.basicReject(deliveryTag, false);
死信交換機(jī) —— DLX:Dead-Letter-Exchange
@Bean
public DirectExchange directExchange2() {
/**
* 交換機(jī)名,后面兩個(gè)是默認(rèn)值就:持久化,不自動(dòng)刪除
*/
return new DirectExchange(RabbitMQConstants.DIRECT_EXCHANGE2, true, false);
}
@Bean
public Queue directQueue2() {
return QueueBuilder
.durable(RabbitMQConstants.DIRECT_QUEUE2)
.deadLetterExchange(RabbitMQConstants.DLX_EXCHANGE)
.deadLetterRoutingKey(RabbitMQConstants.DLX_ROUTING_KEY)
.build();
}
/**
* 將隊(duì)列綁定到交換機(jī)上
*
* @return
*/
@Bean
public Binding directBinding2() {
return BindingBuilder.
bind(directQueue2()).
to(directExchange2()).
with(RabbitMQConstants.DIRECT_BINDING_KEY2);
}
過(guò)期時(shí)間
兩種設(shè)置方式
- 創(chuàng)建隊(duì)列時(shí)設(shè)置,消息會(huì)被排序加入到隊(duì)列頭部,短的在前
- 發(fā)送消息時(shí)設(shè)置,時(shí)間到期不會(huì)立刻刪除,而是在推送消息時(shí)刪除
同時(shí)設(shè)置的話(huà),過(guò)期時(shí)間已短的為準(zhǔn)
/**
* 創(chuàng)建隊(duì)列時(shí)設(shè)置
* @return
*/
@Bean
public Queue ttlQueue() {
Map<String, Object> args = new HashMap<>();
//設(shè)置消息過(guò)期時(shí)間
args.put("x-message-ttl", 5000);
//設(shè)置死信交換機(jī)
args.put("x-dead-letter-exchange", RabbitMQConstants.DLX_EXCHANGE);
//設(shè)置死信 routing_key
args.put("x-dead-letter-routing-key", RabbitMQConstants.DLX_ROUTING_KEY);
return new Queue(RabbitMQConstants.TTL_QUEUE, true, false, false, args);
}
/**
* 發(fā)送消息時(shí)設(shè)置
* @return
*/
public void sendMessage2() throws JsonProcessingException {
User user = new User();
ObjectMapper objectMapper = new ObjectMapper();
byte[] bytes = objectMapper.writeValueAsBytes(user);
// 10 s 后過(guò)期
Message message =
MessageBuilder.withBody(bytes)
.setExpiration("10000").build();
// 交換機(jī),路由鍵,信息
rabbitTemplate.convertAndSend(
RabbitMQConstants.DIRECT_EXCHANGE2,
RabbitMQConstants.DIRECT_ROUTING_KEY2,
message
);
}
效果演示
圖片
隊(duì)列 5 秒延遲的效果
圖片
延遲隊(duì)列
兩種方案實(shí)現(xiàn)
- 利用 死信隊(duì)列+過(guò)期時(shí)間 去處理,消息過(guò)期被轉(zhuǎn)發(fā)到死信交換機(jī),死信交換機(jī)路由到死信隊(duì)列進(jìn)行處理
- 使用插件 rabbitmq_delayed_message_exchange
插件可以在這里找到 ??
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
圖片
其他插件:https://www.rabbitmq.com/community-plugins.html
圖片
安裝插件步驟
拷貝文件
docker cp . rabbitmq:/plugins
進(jìn)入容器
docker exec -it rabbitmq /bin/bash
開(kāi)啟插件支持
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
// 延遲交換機(jī)
@Bean
public CustomExchange delayedExchange() {
Map<String, Object> args = new HashMap<>();
// 交換機(jī)類(lèi)型
args.put("x-delayed-type", "direct");
return new CustomExchange(RabbitMQConstants.DELAY_EXCHANGE, "x-delayed-message", true, false, args);
}
public void sendMessage() throws JsonProcessingException {
User user = new User();
ObjectMapper objectMapper = new ObjectMapper();
byte[] bytes = objectMapper.writeValueAsBytes(user);
//延遲 10 s
Message message = MessageBuilder.withBody(bytes).setHeader("x-delay", 10_000).build();
// 交換機(jī),路由鍵,信息
rabbitTemplate.convertAndSend(
RabbitMQConstants.DELAY_EXCHANGE,
RabbitMQConstants.DELAY_ROUTING_KEY,
message
);
}
效果演示
圖片
10 秒的延遲
圖片
優(yōu)先級(jí)隊(duì)列
x-max-priority 可以設(shè)置在隊(duì)列和消息上
- 設(shè)置在隊(duì)列上,表示該隊(duì)列是個(gè) 優(yōu)先級(jí)隊(duì)列,同時(shí),消息的最大優(yōu)先級(jí)無(wú)法超過(guò)隊(duì)列設(shè)置的上限
- 發(fā)送消息時(shí),帶上優(yōu)先級(jí),在 消息堆積的情況下,優(yōu)先級(jí) 高的 會(huì)先被消費(fèi) ;
@Bean
public Queue directPriorityQueue() {
Map<String, Object> args = new HashMap<>();
// 最大優(yōu)先級(jí)
args.put("x-max-priority", 10);
return new Queue(RabbitMQConstants.PRIORITY_QUEUE, true, false, false, args);
}
public void sendMessage() throws Exception {
User user = new User();
ObjectMapper objectMapper = new ObjectMapper();
// 交換機(jī),路由鍵,信息, 優(yōu)先級(jí) 5
retryRabbitTemplate.convertAndSend(
RabbitMQConstants.PRIORITY_EXCHANGE,
RabbitMQConstants.PRIORITY_ROUTING_KEY,
MessageBuilder.withBody( objectMapper.writeValueAsBytes(user.setPriority(5))).setPriority(5).build()
);
}
效果演示
圖片
先營(yíng)造消息堆積的場(chǎng)景 ??
圖片
開(kāi)始消費(fèi) ??
圖片
那么,就簡(jiǎn)單介紹到這里了。
我的 GitHub 地址:https://github.com/Java4ye/springboot-demo-4ye
圖片