Webman使用RabbitMQ消息中間件實(shí)現(xiàn)系統(tǒng)異步解耦實(shí)戰(zhàn)教程
簡(jiǎn)介
RabbitMQ是一個(gè)開(kāi)源的消息代理軟件,它使用高級(jí)消息隊(duì)列協(xié)議(AMQP)來(lái)實(shí)現(xiàn)消息的發(fā)送和接收。RabbitMQ支持多種消息協(xié)議,包括STOMP、MQTT等,并且能夠與多種編程語(yǔ)言和平臺(tái)集成,如Java、.NET、Python等。
AMQP 即 Advanced Message Queuing Protocol(高級(jí)消息隊(duì)列協(xié)議),是一個(gè)網(wǎng)絡(luò)協(xié)議,是應(yīng)用層協(xié)議的一個(gè)開(kāi)放標(biāo)準(zhǔn),為面向消息的中間件設(shè)計(jì)?;诖藚f(xié)議的客戶端與消息中間件可傳遞消息,并不受客戶端/中間件不同產(chǎn)品,不同的開(kāi)發(fā)語(yǔ)言等條件的限制。
基本架構(gòu)設(shè)計(jì)

圖片來(lái)源:https://blog.csdn.net/cuierdan/article/details/123824300
基本概念
- Message Broker:(消息代理服務(wù)器)是一個(gè)虛擬的概念,而RabbitMQ是Message Broker的一個(gè)實(shí)例。
- Producer:(生產(chǎn)者)產(chǎn)生數(shù)據(jù)并將數(shù)據(jù)發(fā)送到消息代理服務(wù)器(Message Broker)的程序被稱作消息的生產(chǎn)者。
- Connection:(連接)生產(chǎn)者與消費(fèi)者通過(guò)TCP協(xié)議與消息代理服務(wù)器(Message Broker)創(chuàng)建的連接。
- Channel:(信道)創(chuàng)建在Connection中的虛擬連接,類似于連接數(shù)據(jù)庫(kù)時(shí)的連接池的概念,生產(chǎn)者和消費(fèi)者并不是直接與MQ通過(guò)Connection進(jìn)行通訊的,而是通過(guò)Channel進(jìn)行連接通訊的,數(shù)據(jù)的流動(dòng)是在Channel中進(jìn)行的。
- VirtualHost:(虛擬消息服務(wù)器)就像mysql數(shù)據(jù)庫(kù)中有數(shù)據(jù)庫(kù)實(shí)例的概念,并且可以指定用戶對(duì)庫(kù)和表等操作的設(shè)置權(quán)限。也可以類別成LINUX系統(tǒng)中的不同用戶,不同用戶之間是相互獨(dú)立的。每個(gè)VirtualHost相當(dāng)于一個(gè)相對(duì)獨(dú)立mini的RabbitMQ服務(wù)器。每個(gè)VirtutalHost之間是相互隔離的,exchange,queue,message等不能互通。
- Exchange:(交換機(jī))交換機(jī)直接與Channel(信道)連接,接收來(lái)自于消息生產(chǎn)者產(chǎn)生的數(shù)據(jù),在由Exchange將消息路由到一個(gè)或多個(gè)Queue中(或者丟棄)。Exchange并不存儲(chǔ)消息。RabbitMQ的交換機(jī)有fanout(扇出),direct(直接),topic(主題),headers(標(biāo)題)四種類型,每種交換機(jī)類型都對(duì)應(yīng)著不同的路由規(guī)則,根據(jù)不同的路由規(guī)則,交換機(jī)會(huì)將消息路由到不同的隊(duì)列中。
- Binding: (綁定)交換機(jī)與隊(duì)列之間的虛擬連接,在這個(gè)綁定中可以設(shè)置Binding Key,一個(gè)綁定就是用一個(gè)Binding Key將交換器和隊(duì)列連接起來(lái),設(shè)置的Binding Key存在著一定的規(guī)則,Exchange會(huì)將消息中攜帶的Routing Key與Binding Key 中設(shè)置的規(guī)則進(jìn)行匹配,將消息發(fā)送到相應(yīng)的隊(duì)列中。Binding信息被保存到Exchange中的查詢表中,用于Exchange將消息分發(fā)到隊(duì)列的依據(jù)。
- Routing Key:(路由鍵)用于匹配路由規(guī)則的依據(jù),生產(chǎn)者在將消息發(fā)送到Exchange時(shí),一般會(huì)指定一個(gè)Routing Key,交換機(jī)會(huì)根據(jù)Routing Key 來(lái)匹配Binding中設(shè)置的路由規(guī)則,將符合規(guī)則的消息發(fā)送到指定的隊(duì)列中。
- Queue:(消息隊(duì)列)RabbitMQ中的內(nèi)部對(duì)象用于存放消息的容器,RabbitMQ會(huì)將消息按照RabbitMQ的六大模式中的一種將隊(duì)列中的消息發(fā)送給消費(fèi)者,RabbitMQ會(huì)根據(jù)選擇模式的不同將隊(duì)列中的消息發(fā)送給一個(gè)或多個(gè)消費(fèi)者,在連接到消費(fèi)者之前,消息一直在等待消費(fèi)者到隊(duì)列中將消息取走。
- Consumer:(消費(fèi)者)消息的消費(fèi)者,表示一個(gè)從隊(duì)列中取消息的應(yīng)用程序。
特點(diǎn)
- 可靠性:RabbitMQ使用一些機(jī)制來(lái)保證可靠性, 如持久化、傳輸確認(rèn)及發(fā)布確認(rèn)等。
- 靈活的路由:在消息進(jìn)入隊(duì)列之前,通過(guò)交換器來(lái)路由消息。
- 擴(kuò)展性:多個(gè)RabbitMQ節(jié)點(diǎn)可以組成一個(gè)集群,也可以根據(jù)實(shí)際業(yè)務(wù)情況動(dòng)態(tài)地?cái)U(kuò)展 集群中節(jié)點(diǎn)。
- 高可用性:隊(duì)列可以在集群中的機(jī)器上設(shè)置鏡像,使得在部分節(jié)點(diǎn)出現(xiàn)問(wèn)題的情況下隊(duì) 列仍然可用。
- 多種協(xié)議:RabbitMQ除了原生支持AMQP協(xié)議,還支持STOMP, MQTT等多種消息 中間件協(xié)議。
- 支持多語(yǔ)言客戶端:RabbitMQ 幾乎支持所有常用語(yǔ)言,比如 Java、 Python、 Ruby、 PHP、 C#、 JavaScript 等。
主要功能
- 消息隊(duì)列:允許應(yīng)用程序?qū)⑾l(fā)送到隊(duì)列中,然后由另一個(gè)應(yīng)用程序從隊(duì)列中取出并處理。
- 消息路由:支持將消息從發(fā)送者路由到一個(gè)或多個(gè)接收者。
- 消息持久化:確保消息在系統(tǒng)故障后不會(huì)丟失。
- 消息確認(rèn):確保消息被正確處理,如果處理失敗,可以重新發(fā)送。
- 集群:支持在多個(gè)節(jié)點(diǎn)上運(yùn)行,以提供高可用性和負(fù)載均衡。
安裝
這里使用1Panel安裝,1Panel 是一個(gè)現(xiàn)代化、開(kāi)源的 Linux 服務(wù)器運(yùn)維管理面板
應(yīng)用商店
圖片
安裝鏡像
圖片
注:這里需要勾選【端口外部訪問(wèn)】,方便本地調(diào)試
鏡像
安裝成功后,就可以在鏡像中找到已安裝好的RabbitMQ鏡像容器
圖片
外網(wǎng)可訪問(wèn)地址:http://{{你的公網(wǎng)ip}}:15672,如果是在云服務(wù)器,記得安全策略放開(kāi)端口15672
圖片
- RabbitMQ管理界面端口:15672。是一個(gè)Web應(yīng)用程序,用于管理和監(jiān)控RabbitMQ消息代理
- AMQP默認(rèn)端口:5672。是一種網(wǎng)絡(luò)協(xié)議,用于在應(yīng)用程序之間傳遞消息,通常用于消息隊(duì)列系統(tǒng)。
控制臺(tái)
登陸控制臺(tái),賬號(hào)和密碼都是rabbitmq
圖片
使用
這里使用webman插件RabbitMQ客戶端,插件地址:https://www.workerman.net/plugin/67,
非常感謝兔子大佬的插件貢獻(xiàn)!??
非常感謝兔子大佬的插件貢獻(xiàn)!??
非常感謝兔子大佬的插件貢獻(xiàn)!??
插件特點(diǎn)
- 支持5種消費(fèi)模式:簡(jiǎn)單隊(duì)列、workQueue、routing、pub/sub、exchange;
- 支持延遲隊(duì)列(RabbitMQ須安裝插件);
- 異步無(wú)阻塞消費(fèi)、異步無(wú)阻塞生產(chǎn)、同步阻塞生產(chǎn)
安裝插件
通過(guò)composer包管理安裝:
composer require workbunny/webman-rabbitmq注:安裝該插件前,請(qǐng)確保你已經(jīng)安裝好webman框架,相關(guān)安裝文檔:https://www.workerman.net/doc/webman/install.html
配置
插件所有配置文件路徑:config/plugin/workbunny/webman-rabbitmq/app.php
<?php
return [
'enable' => true,
'host' => '120.120.120.74',
'vhost' => '/',
'port' => 5672,
'username' => 'rabbitmq',
'password' => 'rabbitmq',
'mechanisms' => 'AMQPLAIN',
...
];- host 修改為服務(wù)器公網(wǎng)ip
- port 修改為15672
消費(fèi)者
這里使用命令創(chuàng)建一個(gè)擁有單進(jìn)程消費(fèi)者的RestyBuilder
./webman workbunny:rabbitmq-builder resty --mode=queue
?? Config updated.
?? Builder created.
? Builder RestyBuilder created successfully.創(chuàng)建完成后完整的消費(fèi)者文件位置process/workbunny/rabbitmq/RestyBuilder.php
<?php declare(strict_types=1);
namespace process\workbunny\rabbitmq;
use Bunny\Channel as BunnyChannel;
use Bunny\Async\Client as BunnyClient;
use Bunny\Message as BunnyMessage;
use Workbunny\WebmanRabbitMQ\Constants;
use Workbunny\WebmanRabbitMQ\Builders\QueueBuilder;
class RestyBuilder extends QueueBuilder
{
/**
* @var array = [
* 'name' => 'example',
* 'delayed' => false,
* 'prefetch_count' => 1,
* 'prefetch_size' => 0,
* 'is_global' => false,
* 'routing_key' => '',
* ]
*/
protected array $queueConfig = [
// 隊(duì)列名稱 ,默認(rèn)由類名自動(dòng)生成
'name' => 'process.workbunny.rabbitmq.RestyBuilder',
// 是否延遲
'delayed' => false,
// QOS 數(shù)量
'prefetch_count' => 0,
// QOS size
'prefetch_size' => 0,
// QOS 全局
'is_global' => false,
// 路由鍵
'routing_key' => '',
];
/** @var string 交換機(jī)類型 */
protected string $exchangeType = Constants::DIRECT;
/** @var string|null 交換機(jī)名稱,默認(rèn)由類名自動(dòng)生成 */
protected ?string $exchangeName = 'process.workbunny.rabbitmq.RestyBuilder';
/** @inheritDoc */
public function handler(BunnyMessage $message, BunnyChannel $channel, BunnyClient $client): string
{
echo '[RabbitMQ][隊(duì)列消費(fèi)] Tag:' .$message->consumerTag. PHP_EOL;
echo '[RabbitMQ][隊(duì)列消費(fèi)著] Content:' .$message->content. PHP_EOL;
echo '[RabbitMQ][隊(duì)列消費(fèi)著] Exchange:' .$message->exchange. PHP_EOL;
return Constants::ACK;
}
}啟動(dòng)webman
php start.php start
Workerman[start.php] start in DEBUG mode
----------------------------------------------------------------------- WORKERMAN -----------------------------------------------------------------------
Workerman version:4.1.15 PHP version:8.2.18 Event-Loop:\Workerman\Events\Event
------------------------------------------------------------------------ WORKERS ------------------------------------------------------------------------
proto user worker listen processes status
tcp root webman http://0.0.0.0:8217 2 [OK]
tcp root monitor none 1 [OK]
tcp root plugin.workbunny.webman-rabbitmq.process.workbunny.rabbitmq.RestyBuilder none 1 [OK]
---------------------------------------------------------------------------------------------------------------------------------------------------------生產(chǎn)者
use function Workbunny\WebmanRabbitMQ\sync_publish;
use process\workbunny\rabbitmq\RestyBuilder;
sync_publish(RestyBuilder::instance(), '兔子大佬你好呀!'); # return bool消費(fèi)結(jié)果
[RabbitMQ][隊(duì)列消費(fèi)] Tag:process.workbunny.rabbitmq.RestyBuilder
[RabbitMQ][隊(duì)列消費(fèi)著] Content:開(kāi)源技術(shù)小棧你好呀!
[RabbitMQ][隊(duì)列消費(fèi)著] Exchange:process.workbunny.rabbitmq.RestyBuilder
...
[RabbitMQ][隊(duì)列消費(fèi)] Tag:process.workbunny.rabbitmq.RestyBuilder
[RabbitMQ][隊(duì)列消費(fèi)著] Content:兔子大佬你好呀!
[RabbitMQ][隊(duì)列消費(fèi)著] Exchange:process.workbunny.rabbitmq.RestyBuilder
圖片
RabbitMQ管理界面

通過(guò)RabbitMQ管理界面端發(fā)送消息

消費(fèi)者消費(fèi)情況
































