本文介紹了 RabbitMQ 通信模型中的路由模型的使用,通過(guò)交換機(jī)和路由鍵實(shí)現(xiàn)點(diǎn)對(duì)點(diǎn)通信,適合于需要點(diǎn)對(duì)點(diǎn)通信的場(chǎng)景。
大家好,我是了不起。
今天了不起帶領(lǐng)大家接著學(xué)習(xí)RabbitMQ,了解RabbitMQ的五大通信模型之一的路由模型;接下來(lái)還會(huì)有關(guān)于RabbitMQ的系列教程,對(duì)你有幫助的話記得關(guān)注哦~
往期傳送門(mén)
??RabbitMQ(一)hello world??
??RabbitMQ(二)通信模型之work模型??
??RabbitMQ(三)通信模型之發(fā)布訂閱模型??
路由模型
RabbitMQ 提供了五種不同的通信模型,上一篇文章中,簡(jiǎn)單的介紹了一下RabbitMQ的發(fā)布訂閱模型模型。這篇文章來(lái)學(xué)習(xí)一下RabbitMQ中的路由模型(direct)。
路由模型(direct):路由模式相當(dāng)于是分布訂閱模式的升級(jí)版,多了一個(gè) 路由key來(lái)約束隊(duì)列與交換機(jī)的綁定。
在路由模型中,生產(chǎn)者將消息發(fā)送到交換機(jī),交換機(jī)根據(jù)消息的路由鍵將消息轉(zhuǎn)發(fā)到對(duì)應(yīng)的隊(duì)列中。每個(gè)隊(duì)列可以綁定多個(gè)路由鍵,每個(gè)路由鍵可以綁定到多個(gè)隊(duì)列中。消費(fèi)者從隊(duì)列中接收消息并處理。當(dāng)一個(gè)路由鍵被多個(gè)隊(duì)列綁定時(shí),交換機(jī)會(huì)將消息發(fā)送到所有綁定的隊(duì)列中。當(dāng)一個(gè)隊(duì)列綁定多個(gè)路由鍵時(shí),該隊(duì)列將能夠接收到所有路由鍵對(duì)應(yīng)的消息。
適用場(chǎng)景
路由模型適用于需要點(diǎn)對(duì)點(diǎn)通信的場(chǎng)景,例如:
- 系統(tǒng)監(jiān)控告警通知;
- 任務(wù)分發(fā);
- 用戶私信系統(tǒng);
- 訂單確認(rèn)通知等。
演示
- 生產(chǎn)者
// 生產(chǎn)者
public class Producer {
private static final String EXCHANGE_NAME = "exchange_direct_1";
// 定義路由的key,key值是可以隨意定義的
private static final String EXCHANGE_ROUTING_KEY1 = "direct_km1";
private static final String EXCHANGE_ROUTING_KEY2 = "direct_km2";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
for (int i = 0; i < 100; i++) {
if (i % 2 == 0) {
channel.basicPublish(EXCHANGE_NAME, EXCHANGE_ROUTING_KEY1, null, ("路由模型發(fā)送的第 " + i + " 條信息").getBytes());
} else {
channel.basicPublish(EXCHANGE_NAME, EXCHANGE_ROUTING_KEY2, null, ("路由模型發(fā)送的第 " + i + " 條信息").getBytes());
}
}
channel.close();
connection.close();
}
}
- 消費(fèi)者
// 消費(fèi)者1
public class Consumer {
private static final String QUEUE_NAME = "queue_direct_1";
private static final String EXCHANGE_NAME = "exchange_direct_1";
private static final String EXCHANGE_ROUTING_KEY1 = "direct_km1";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROUTING_KEY1);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費(fèi)者1接收到的消息是:" + new String(body));
}
};
channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
}
}
// 消費(fèi)者2
public class Consumer2 {
private static final String QUEUE_NAME = "queue_direct_2";
private static final String EXCHANGE_NAME = "exchange_direct_1";
private static final String EXCHANGE_ROUTING_KEY2 = "direct_km2";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROUTING_KEY2);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費(fèi)者2接收到的消息是:" + new String(body));
}
};
channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
}
}
- 測(cè)試
先啟動(dòng)2個(gè)消費(fèi)者,再啟動(dòng)生產(chǎn)者
可以得到結(jié)果是消費(fèi)者1得到了序號(hào)是偶數(shù)的消息
消費(fèi)者2得到了序號(hào)是奇數(shù)的消息
小結(jié)
本文介紹了 RabbitMQ 通信模型中的路由模型的使用,通過(guò)交換機(jī)和路由鍵實(shí)現(xiàn)點(diǎn)對(duì)點(diǎn)通信,適合于需要點(diǎn)對(duì)點(diǎn)通信的場(chǎng)景。在實(shí)際使用過(guò)程中,需要注意以下幾點(diǎn):
- 路由鍵必須要與消費(fèi)者綁定隊(duì)列時(shí)的路由鍵相同,否則無(wú)法接收到消息;
- 可以通過(guò)多個(gè)交換機(jī)和路由鍵來(lái)實(shí)現(xiàn)更靈活的消息路由。
后續(xù)了不起還會(huì)繼續(xù)更新RabbitMQ的系列文章,感興趣的小伙伴持續(xù)關(guān)注哦~~~