偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

解鎖C++消息隊(duì)列:多線程通信的關(guān)鍵紐帶

開發(fā) 前端
在內(nèi)存資源有限的嵌入式系統(tǒng)中,如果消息隊(duì)列設(shè)置過(guò)大,可能會(huì)導(dǎo)致其他關(guān)鍵任務(wù)因?yàn)閮?nèi)存不足而無(wú)法正常運(yùn)行。為了優(yōu)化性能,可以根據(jù)系統(tǒng)的實(shí)際需求和負(fù)載情況,動(dòng)態(tài)調(diào)整隊(duì)列大小。

在C++編程的廣袤天地里,當(dāng)涉及多線程或多進(jìn)程的程序開發(fā)時(shí),不同線程或進(jìn)程間的有效通信至關(guān)重要。想象一下,一個(gè)大型的游戲開發(fā)項(xiàng)目,其中渲染線程需要不斷將最新的圖像數(shù)據(jù)傳遞給顯示線程,以便在屏幕上呈現(xiàn)出精美的畫面;又或者在一個(gè)分布式系統(tǒng)中,各個(gè)節(jié)點(diǎn)進(jìn)程需要相互傳遞任務(wù)指令和數(shù)據(jù)結(jié)果 。而 C++ 消息隊(duì)列,正是解決這類線程或進(jìn)程間通信問(wèn)題的得力工具。它就像是一座橋梁,搭建起不同執(zhí)行單元之間的溝通渠道,讓數(shù)據(jù)和信息能夠順暢地流通,確保整個(gè)系統(tǒng)的高效運(yùn)行。

消息隊(duì)列一般簡(jiǎn)稱為 MQ (Messges Queue),是指利用高效可靠的消息傳遞機(jī)制進(jìn)行與平臺(tái)無(wú)關(guān)的數(shù)據(jù)交流,并基于數(shù)據(jù)通信來(lái)進(jìn)行分布式系統(tǒng)的集成,是在消息的傳輸過(guò)程中保存消息的容器。消息隊(duì)列本質(zhì)上是一個(gè)隊(duì)列,而隊(duì)列中存放的是一個(gè)個(gè)消息。消息隊(duì)列是一種在分布式系統(tǒng)中進(jìn)行異步通信的機(jī)制。它允許發(fā)送者將消息放入隊(duì)列中,而接收者可以從隊(duì)列中獲取并處理這些消息。消息隊(duì)列常用于解耦、異步處理和削峰填谷等場(chǎng)景。

一、 消息隊(duì)列簡(jiǎn)介

消息隊(duì)列中間件是分布式系統(tǒng)中重要的組件,主要解決應(yīng)用耦合、異步消息、流量削峰等問(wèn)題。實(shí)現(xiàn)高性能、高可用、可伸縮和最終一致性架構(gòu)。是大型分布式系統(tǒng)不可缺少的中間件。

消息隊(duì)列是一種用于進(jìn)程間通信(也稱為 IPC),或者用于應(yīng)用程序的各個(gè)組件之間或應(yīng)用程序之間的技術(shù)。消息隊(duì)列提供用于啟用消息傳遞的協(xié)議或接口。

通過(guò) UNIX<sys.msg.h> 和 POSIX mqueue.h 系統(tǒng)庫(kù)提供了用于單個(gè)計(jì)算機(jī)內(nèi)的 IPC 的消息隊(duì)列,其中這些消息隊(duì)列可在同步和異步模式下使用。同步消息要求在發(fā)送消息時(shí)阻止發(fā)送或接收進(jìn)程,直到接收方確認(rèn)消息為止。異步消息不會(huì)阻止發(fā)送或接收進(jìn)程,并且可以通過(guò)緩沖來(lái)延遲送達(dá)。

針對(duì)跨多個(gè)計(jì)算機(jī)的應(yīng)用程序,已開發(fā)了許多專有的開源消息隊(duì)列系統(tǒng),如 IBM 的 WebSphere MQ、Java 消息服務(wù) (JMS) 和 RabbitMQ。所有這些系統(tǒng)都提供發(fā)送、管理和接收消息的能力。

與傳統(tǒng)的請(qǐng)求和響應(yīng)消息相比,使用消息隊(duì)列可以增加應(yīng)用程序設(shè)計(jì)的復(fù)雜性,并增加需要管理的服務(wù)的數(shù)量。但在一定程度上,對(duì)于流處理等應(yīng)用,需要分布式消息隊(duì)列系統(tǒng)來(lái)管理增加的復(fù)雜性和規(guī)模。

目前在生產(chǎn)環(huán)境,使用較多的消息隊(duì)列有ActiveMQ、RabbitMQ、ZeroMQ、Kafka、MetaMQ、RocketMQ等。

圖片

1.1消息隊(duì)列特征

通過(guò)與傳統(tǒng)的 RPC 和請(qǐng)求-響應(yīng)機(jī)制相比較,就可以理解和體會(huì)到消息隊(duì)列及其新穎性了。消息隊(duì)列的主要功能如下:

  • 存儲(chǔ):與依賴于使用套接字的基本 TCP 和 UDP 協(xié)議的傳統(tǒng)請(qǐng)求和響應(yīng)系統(tǒng)不同,消息隊(duì)列通常將消息存儲(chǔ)在某種類型的緩沖區(qū)中,直到目標(biāo)進(jìn)程讀取這些消息或?qū)⑵鋸南㈥?duì)列中顯式移除為止。
  • 異步:與請(qǐng)求和響應(yīng)系統(tǒng)不同,消息隊(duì)列通過(guò)緩沖消息可以在應(yīng)用程序中公開一定程度的異步性,允許源進(jìn)程發(fā)送消息并在隊(duì)列中累積消息,而目標(biāo)進(jìn)程則可以挑選消息進(jìn)行處理。這樣,應(yīng)用程序就可以在某些故障情況下運(yùn)行,例如連接斷斷續(xù)續(xù)或源進(jìn)程或目標(biāo)進(jìn)程故障。
  • 路由:消息隊(duì)列還可以提供路由功能,其中多個(gè)進(jìn)程可以在同一隊(duì)列中讀取或?qū)懭胂ⅲ瑥亩鴮?shí)現(xiàn)廣播或單播通信模式。

消息隊(duì)列特點(diǎn):消息隊(duì)列有三個(gè)作用,分別是削峰、解耦和異步

  • 流量削峰:主要用于在高并發(fā)情況下,業(yè)務(wù)異步處理,提供高峰期業(yè)務(wù)處理能力,避免系統(tǒng)癱瘓。假設(shè)系統(tǒng)只能處理1000個(gè)請(qǐng)求,但這時(shí)突然來(lái)了3000個(gè)請(qǐng)求,如果不加以限制就會(huì)造成系統(tǒng)癱瘓。使用消息隊(duì)列做緩沖,將多余的請(qǐng)求存放在消息隊(duì)列中,等系統(tǒng)根據(jù)自己處理請(qǐng)求的能力去消息隊(duì)列去。
  • 應(yīng)用解耦:主要用于當(dāng)一個(gè)業(yè)務(wù)需要多個(gè)模塊共同實(shí)現(xiàn),或者一條消息有多個(gè)系統(tǒng)需要對(duì)應(yīng)處理時(shí),只需要主業(yè)務(wù)完成以后,發(fā)送一條MQ,其余模塊消費(fèi)MQ消息,即可實(shí)現(xiàn)業(yè)務(wù),降低模塊之間的耦合。假設(shè)某個(gè)服務(wù) A 需要調(diào)用服務(wù) B,但是服務(wù) B 突然出現(xiàn)問(wèn)題,這樣會(huì)導(dǎo)致服務(wù) A 也會(huì)出現(xiàn)問(wèn)題。如果使用消息隊(duì)列,當(dāng)服務(wù) A 執(zhí)行完成之后,發(fā)送一條消息到隊(duì)列中,服務(wù) B 讀取到這條消息,那么它立刻開始進(jìn)行業(yè)務(wù)的執(zhí)行。
  • 異步通信:主業(yè)務(wù)執(zhí)行結(jié)束后從屬業(yè)務(wù)通過(guò)MQ,異步執(zhí)行,減低業(yè)務(wù)的響應(yīng)時(shí)間,提高用戶體驗(yàn)。假設(shè)有一個(gè)業(yè)務(wù),要先執(zhí)行服務(wù) A ,然后服務(wù) A 去調(diào)用服務(wù) B ,當(dāng)服務(wù) B 完成之后,服務(wù) A 調(diào)用服務(wù) C,這個(gè)業(yè)務(wù)需要一步步走下去。當(dāng)使用了消息隊(duì)列之后,服務(wù) A 完成之后,可以同時(shí)執(zhí)行服務(wù) B 和 服務(wù) C ,這樣就減低業(yè)務(wù)的響應(yīng)時(shí)間,提高用戶體驗(yàn)。

1.2消息隊(duì)列優(yōu)勢(shì)

消息隊(duì)列的主要優(yōu)點(diǎn)是它可以在分散式應(yīng)用程序中的各個(gè)實(shí)體之間提供松散耦合。這允許進(jìn)行異步非阻止通信,從而提供更高級(jí)別的進(jìn)程故障容錯(cuò)能力。

圖片

消息隊(duì)列提供了一個(gè)允許程序相互通信的接口。

程序之間無(wú)直接連接:消息隊(duì)列允許程序間進(jìn)行間接通信。發(fā)送方不一定需要知道消息的接收方,反之亦然。這樣,消息隊(duì)列系統(tǒng)就可以決定路由消息(和潛在工作)的邏輯。

程序之間的通信可以不受時(shí)間影響:消息隊(duì)列通常會(huì)緩沖程序之間的消息,因此它們不必為了發(fā)送或接收消息而阻止或中斷執(zhí)行。消息隊(duì)列可以執(zhí)行其他任務(wù),并在消息到達(dá)時(shí)或稍后處理回復(fù)。在編寫消息傳遞應(yīng)用程序時(shí),無(wú)需知道(或關(guān)心)程序何時(shí)發(fā)送消息,或者目標(biāo)何時(shí)能夠接收消息。消息不會(huì)丟失;隊(duì)列管理器會(huì)將它保留到目標(biāo)準(zhǔn)備開始處理它為止。消息將一直保留在隊(duì)列中,直到程序?qū)⑵湟瞥?。這意味著發(fā)送和接收程序是分離的;發(fā)送方可以繼續(xù)處理,而無(wú)需等待接收方確認(rèn)收到消息。發(fā)送消息時(shí),目標(biāo)應(yīng)用程序甚至不必運(yùn)行。它可以檢索啟動(dòng)后的消息。

可以通過(guò)小型獨(dú)立程序執(zhí)行工作:利用消息隊(duì)列可以發(fā)揮使用小型獨(dú)立程序的優(yōu)點(diǎn)。你可以將作業(yè)分散到幾個(gè)較小的獨(dú)立程序上,而不是使用單個(gè)大型程序按順序執(zhí)行作業(yè)的各個(gè)部分。請(qǐng)求程序會(huì)將消息發(fā)送到每個(gè)單獨(dú)的程序,要求它們執(zhí)行其功能。當(dāng)每個(gè)程序完成時(shí),將以一條或多條消息的形式發(fā)送回結(jié)果。

通信可以受事件影響:可以根據(jù)隊(duì)列的狀態(tài)控制程序。例如,可以安排程序在消息到達(dá)隊(duì)列時(shí)立即啟動(dòng)?;蛘?,可以指定只有在隊(duì)列上出現(xiàn) 10 條高于某個(gè)優(yōu)先級(jí)的消息,或者隊(duì)列上出現(xiàn) 10 條任意優(yōu)先級(jí)的消息時(shí),程序才啟動(dòng)。

應(yīng)用程序可以為消息分配優(yōu)先級(jí):當(dāng)程序?qū)⑾⒎湃腙?duì)列時(shí),它可以為消息分配優(yōu)先級(jí)。這將確定在隊(duì)列中添加新消息的位置。程序可以按照消息在隊(duì)列中的順序或通過(guò)獲取特定消息從隊(duì)列中獲取消息。(如果程序正在尋找對(duì)先前發(fā)送的請(qǐng)求的答復(fù),則可能需要獲取特定消息。)

  • 恢復(fù)支持:許多消息隊(duì)列提供暫留存儲(chǔ)和日志記錄,因此可以在故障期間恢復(fù)隊(duì)列中的狀態(tài)和消息。
  • 多個(gè)隊(duì)列:某些系統(tǒng)允許應(yīng)用程序開發(fā)人員定義和配置多個(gè)隊(duì)列。這樣就可以根據(jù)發(fā)布者或訂閱者關(guān)系將消息路由到必要的實(shí)體。例如,Apache Kafka。
  • 易于伸縮:消息隊(duì)列可以橫向擴(kuò)展以應(yīng)對(duì)消息負(fù)載的增加,而緊密耦合系統(tǒng)則不同,在其中擴(kuò)展和管理通信流和終結(jié)點(diǎn)更為困難。

(1)生產(chǎn)者-消費(fèi)者模型

接下來(lái)這種數(shù)據(jù)結(jié)構(gòu)的收發(fā)方式,選用 生產(chǎn)者消費(fèi)者模型:由生產(chǎn)者發(fā)布消息隊(duì)列至消息服務(wù)器,再由消費(fèi)者訂閱消息。

圖片

生產(chǎn)者(Producer)業(yè)務(wù)的發(fā)起方,負(fù)責(zé)生產(chǎn)消息發(fā)布給Broker。

消費(fèi)者(Consumer)業(yè)務(wù)的處理方,負(fù)責(zé)從Broker訂閱消息并進(jìn)行業(yè)務(wù)邏輯處理。

消息服務(wù)器(Broker)MQ的服務(wù)器。包括接收 Producer 發(fā)過(guò)來(lái)的消息、處理 Consumer 的消費(fèi)消息請(qǐng)求、消息的持久化存儲(chǔ)、以及服務(wù)端過(guò)濾功能等。

注意消息服務(wù)器可以是,分布式,或多節(jié)點(diǎn)的集群,且每個(gè)節(jié)點(diǎn)里可能不止一個(gè)隊(duì)列。

圖片

當(dāng)然除了消息服務(wù)器外,生產(chǎn)者、消費(fèi)者和消息本身也可以擁有集群的概念,我們可以對(duì)這三者進(jìn)行分組,形成主題的概念,并進(jìn)一步細(xì)化出二級(jí)標(biāo)簽,實(shí)現(xiàn)特定的集群收發(fā)特定消息的功能。

圖片

主題(topic)一級(jí)消息類型,不同生產(chǎn)者向特定的topic發(fā)送消息,再由MQ分發(fā)至特定的訂閱者,實(shí)現(xiàn)消息的傳遞,標(biāo)簽(tag)二級(jí)消息類型,用來(lái)進(jìn)一步區(qū)分某個(gè)Topic下的消息子類。集群(group)一組生產(chǎn)者或消費(fèi)者,這組生產(chǎn)者或消費(fèi)者通常生產(chǎn)或消費(fèi)同一類消息,且消息發(fā)布或訂閱的邏輯一致,到這一步,就構(gòu)造了消息隊(duì)列服務(wù)器的雛形。

(2)生產(chǎn)者-消費(fèi)者模型

回過(guò)頭來(lái)分析一下剛才構(gòu)建的模型,任何一項(xiàng)技術(shù)都有他的利弊。拋去額外的維護(hù)成本不說(shuō),這個(gè)模型的弊端在于,過(guò)渡依賴于消息中間件,一旦中間件宕機(jī)了,整個(gè)消息體系就瓦解了。

此外,在設(shè)計(jì)時(shí)需要考慮更多因素。生產(chǎn)過(guò)程中難免會(huì)出現(xiàn)生產(chǎn)者,消費(fèi)者或中間件服務(wù)器不可用的情況,隨之帶來(lái)的問(wèn)題就是消息重復(fù)、消息堆積等等,所以實(shí)際運(yùn)用的時(shí)候呢,往往會(huì)給出一些補(bǔ)償措施。

圖片

弊端:

  1. 消息的收發(fā)依賴于中間件,且中間件的穩(wěn)定運(yùn)行需要維護(hù)成本。
  2. 提高開發(fā)復(fù)雜度。需要考慮消息的處理,包括消息冪等性(重復(fù)消費(fèi)問(wèn)題)、消息中間件的持久化和穩(wěn)定性、可靠性等。

PS:這里說(shuō)一下持久化的問(wèn)題簡(jiǎn)單來(lái)說(shuō)就是將數(shù)據(jù)存入磁盤,而不是存在內(nèi)存中隨服務(wù)器重啟斷開而消失,使數(shù)據(jù)能夠永久保存,重啟后數(shù)據(jù)能夠從磁盤中讀取恢復(fù)。MQ會(huì)將你的持久化消息寫入磁盤上的持久化日志文件,等消息被消費(fèi)之后,RabbitMQ會(huì)把這條消息標(biāo)識(shí)為等待垃圾回收。缺點(diǎn):性能低,寫入硬盤要比寫入內(nèi)存性能較低很多,從而降低了服務(wù)器的吞吐量。

(3)重復(fù)消費(fèi)問(wèn)題 - 消息冪等性

這里可以講一下其中的一種非常常見的問(wèn)題及其補(bǔ)償措施,就是重復(fù)消費(fèi)的問(wèn)題:

重復(fù)消費(fèi):生產(chǎn)者多發(fā)、消費(fèi)者多次消費(fèi)等。

生產(chǎn)者多發(fā)、消費(fèi)者多次消費(fèi),都會(huì)造成重復(fù)消費(fèi)的問(wèn)題。

解決這種問(wèn)題的常用辦法,就是保證操作的冪等性

冪等操作(Idempotent Operation):執(zhí)行任意多次冪等操作所產(chǎn)生的影響均與一次執(zhí)行的效果相同。

冪等操作有一個(gè)特點(diǎn),甚至還有公式:

f(x)=f(f(x)).

舉個(gè)例子,數(shù)據(jù)庫(kù)腳本insert前都會(huì)先delete,這么一組數(shù)據(jù)庫(kù)操作無(wú)論執(zhí)行多少次結(jié)果都是一樣的,重復(fù)消費(fèi)的問(wèn)題也可以用這個(gè)思想去解決。

實(shí)現(xiàn):

  • 消息中間件端根據(jù) Message Id 去重。
  • 消費(fèi)端:數(shù)據(jù)庫(kù):新增/修改。組件如redis進(jìn)行自身去重。

(4)主流MQ對(duì)比

目前主流的MQ有以下幾種:

圖片圖片

在流量和大數(shù)據(jù)的時(shí)代,ActiveMQ和RabbitMQ這兩者因?yàn)橥掏铝恳约癎itHub的社區(qū)活躍度的原因,在各大互聯(lián)網(wǎng)公司基本上銷聲匿跡了,越來(lái)越多的公司開始青睞于后兩者。其中RocketMQ是阿里開源的,這和同樣是阿里開源的rpc框dubbo設(shè)計(jì)風(fēng)格比較類似。Kafka則更多應(yīng)用在大數(shù)據(jù)業(yè)務(wù)場(chǎng)景中。

1.3常用的消息隊(duì)列

(1)ActiveMQ:是Apache下的一個(gè)子項(xiàng)目。

  • 優(yōu)點(diǎn):?jiǎn)螜C(jī)吞吐量每秒萬(wàn)級(jí),時(shí)效性毫秒級(jí),可用性高,基于主從架構(gòu)實(shí)現(xiàn)高可用性,消息可靠性較低的概率丟失數(shù)據(jù)。支持多種語(yǔ)言、支持Spring2.0的特性、支持多種傳送協(xié)議、支持通過(guò)JDBC和journal提供高速的消息持久化。
  • 缺點(diǎn):官方社區(qū)現(xiàn)在的維護(hù)越來(lái)越少;社區(qū)活躍度不高。

(2)Kafka:是一個(gè)分布式消息發(fā)布訂閱系統(tǒng)。為大數(shù)據(jù)而生的消息中間件,大數(shù)據(jù)的殺手锏

  • 優(yōu)點(diǎn):?jiǎn)螜C(jī)吞吐量每秒百萬(wàn)級(jí),時(shí)效性毫秒級(jí),不會(huì)丟失數(shù)據(jù),不會(huì)導(dǎo)致不可用
  • 缺點(diǎn):支持消息順序,但是一臺(tái)代理宕機(jī)后,就會(huì)產(chǎn)生消息亂序;消費(fèi)失敗不支持重試;社區(qū)更新較慢

(3)RocketMQ:阿里系下開源的一款分布式、隊(duì)列模型的消息中間件,3.0版本名稱改為RocketMQ,是阿里參照 kafka 設(shè)計(jì)思想使用 java 實(shí)現(xiàn)的一套消息隊(duì)列。

  • 優(yōu)點(diǎn):?jiǎn)螜C(jī)吞吐量十萬(wàn)級(jí),時(shí)效性毫秒級(jí),消息可以做到 0 丟失,支持 10 億級(jí)別的消息堆積
  • 缺點(diǎn):支持的客戶端語(yǔ)言不多,目前是 java 及 c++;社區(qū)活躍度一般;

(4)RabbitMQ:是使用Erlang編寫的一個(gè)開源的消息隊(duì)列

優(yōu)點(diǎn):?jiǎn)螜C(jī)吞吐量萬(wàn)級(jí),時(shí)效性微秒級(jí),支持多種語(yǔ)言

二、消息隊(duì)列工作原理

C++ 消息隊(duì)列遵循先進(jìn)先出(FIFO)的原則 ,就像我們?nèi)粘I钪信抨?duì)買東西一樣,先到的人先接受服務(wù)。在消息隊(duì)列中,先發(fā)送的消息會(huì)被先接收和處理。它為進(jìn)程間或線程間提供了一種可靠的數(shù)據(jù)傳遞方式。在一個(gè)多線程的圖形渲染程序中,主線程負(fù)責(zé)生成各種圖形繪制指令,然后將這些指令作為消息發(fā)送到消息隊(duì)列中,而渲染線程則從消息隊(duì)列中依次取出這些指令,按照順序進(jìn)行圖形渲染操作 ,這樣就能確保圖形的繪制順序與指令生成順序一致,從而保證畫面的準(zhǔn)確性和流暢性。

2.1核心操作流程

①創(chuàng)建隊(duì)列:在 C++ 中創(chuàng)建消息隊(duì)列,通常會(huì)使用操作系統(tǒng)提供的相關(guān)函數(shù),以 Linux 系統(tǒng)為例,使用msgget()函數(shù)來(lái)創(chuàng)建一個(gè)消息隊(duì)列。msgget()函數(shù)需要兩個(gè)參數(shù),第一個(gè)參數(shù)key是一個(gè)鍵值,它可以通過(guò)ftok()函數(shù)根據(jù)一個(gè)已存在的文件路徑和一個(gè)項(xiàng)目標(biāo)識(shí)符生成,用于唯一標(biāo)識(shí)這個(gè)消息隊(duì)列,不同的進(jìn)程只要使用相同的key值,就能訪問(wèn)同一個(gè)消息隊(duì)列;

第二個(gè)參數(shù)msgflg用于指定創(chuàng)建消息隊(duì)列的方式和權(quán)限 ,比如IPC_CREAT表示如果消息隊(duì)列不存在就創(chuàng)建它,如果存在則返回其標(biāo)識(shí)符,還可以與文件權(quán)限位(如0666表示所有用戶可讀可寫)進(jìn)行按位或運(yùn)算來(lái)設(shè)置消息隊(duì)列的訪問(wèn)權(quán)限。通過(guò)msgget()函數(shù)創(chuàng)建成功后,會(huì)返回一個(gè)唯一的標(biāo)識(shí)符來(lái)表示這個(gè)消息隊(duì)列,后續(xù)對(duì)該消息隊(duì)列的操作都將使用這個(gè)標(biāo)識(shí)符。

②發(fā)送消息:發(fā)送進(jìn)程需要將消息內(nèi)容封裝到一個(gè)特定的結(jié)構(gòu)體中,這個(gè)結(jié)構(gòu)體至少要包含一個(gè)long類型的成員來(lái)表示消息類型。以一個(gè)簡(jiǎn)單的消息結(jié)構(gòu)體為例:

struct MsgStruct {
    long mtype; // 消息類型
    char mtext[1024]; // 消息內(nèi)容
};

假設(shè)我們要發(fā)送一條消息,首先創(chuàng)建一個(gè)MsgStruct結(jié)構(gòu)體變量,設(shè)置好mtype和mtext的值,然后使用msgsnd()函數(shù)來(lái)發(fā)送消息。msgsnd()函數(shù)有四個(gè)參數(shù),第一個(gè)參數(shù)是前面創(chuàng)建消息隊(duì)列時(shí)返回的標(biāo)識(shí)符msgid,用于指定要發(fā)送到哪個(gè)消息隊(duì)列;第二個(gè)參數(shù)是指向要發(fā)送的消息結(jié)構(gòu)體的指針&msg;第三個(gè)參數(shù)msgsz是要發(fā)送消息的大?。ú话ㄏ㈩愋蚼type占用的字節(jié)數(shù) ),例如strlen(msg.mtext);第四個(gè)參數(shù)msgflg用于指定發(fā)送消息的方式 ,如果為0,表示當(dāng)消息隊(duì)列滿時(shí),msgsnd()函數(shù)將會(huì)阻塞,直到消息能寫進(jìn)消息隊(duì)列,如果設(shè)置為IPC_NOWAIT,當(dāng)消息隊(duì)列已滿的時(shí)候,msgsnd()函數(shù)不等待立即返回。

MsgStruct msg;
msg.mtype = 1;
strcpy(msg.mtext, "Hello, message queue!");
if (msgsnd(msgid, &msg, strlen(msg.mtext), 0) == -1) {
    perror("msgsnd");
    // 處理發(fā)送失敗的情況
}

③接收消息:接收進(jìn)程使用msgrcv()函數(shù)從消息隊(duì)列中接收消息。msgrcv()函數(shù)有五個(gè)參數(shù),第一個(gè)參數(shù)同樣是消息隊(duì)列標(biāo)識(shí)符msgid;第二個(gè)參數(shù)msgp是指向用于存放接收到消息的結(jié)構(gòu)體指針;第三個(gè)參數(shù)msgsz是要接收消息的大?。ú话ㄏ㈩愋驼加玫淖止?jié)數(shù) );第四個(gè)參數(shù)msgtyp用于指定接收消息的類型 ,如果msgtyp為0,則接收消息隊(duì)列中的第一個(gè)消息,如果msgtyp大于0,則接收類型等于msgtyp的第一個(gè)消息,如果msgtyp小于0,則接收類型小于等于msgtyp絕對(duì)值的第一個(gè)消息;第五個(gè)參數(shù)msgflg用于指定接收消息的方式 ,若為0,表示阻塞式接收消息,沒(méi)有該類型的消息msgrcv()函數(shù)一直阻塞等待,如果設(shè)置為IPC_NOWAIT,如果沒(méi)有返回條件的消息調(diào)用立即返回,此時(shí)錯(cuò)誤碼為ENOMSG。

MsgStruct receivedMsg;
if (msgrcv(msgid, &receivedMsg, sizeof(receivedMsg.mtext), 1, 0) == -1) {
    perror("msgrcv");
    // 處理接收失敗的情況
}
printf("Received message: %s\n", receivedMsg.mtext);

④刪除隊(duì)列:當(dāng)消息隊(duì)列不再使用時(shí),需要將其刪除以釋放系統(tǒng)資源。在 C++ 中,使用msgctl()函數(shù)來(lái)刪除消息隊(duì)列。msgctl()函數(shù)有三個(gè)參數(shù),第一個(gè)參數(shù)msqid是要?jiǎng)h除的消息隊(duì)列的標(biāo)識(shí)符;第二個(gè)參數(shù)cmd設(shè)置為IPC_RMID表示執(zhí)行刪除操作;第三個(gè)參數(shù)buf在刪除操作時(shí)設(shè)置為NULL。

if (msgctl(msgid, IPC_RMID, NULL) == -1) {
    perror("msgctl");
    // 處理刪除失敗的情況
}

2.2不同操作系統(tǒng)實(shí)現(xiàn)差異

消息隊(duì)列是一種基于內(nèi)核的通信機(jī)制,這意味著它依賴于操作系統(tǒng)內(nèi)核來(lái)實(shí)現(xiàn)其功能。不同的操作系統(tǒng),如 Windows、Linux、macOS 等,在實(shí)現(xiàn)消息隊(duì)列時(shí)可能會(huì)有不同的方式和特點(diǎn)。在 Windows 系統(tǒng)中,消息隊(duì)列與 Windows 的消息機(jī)制緊密相關(guān),它為每個(gè)線程維護(hù)一個(gè)消息隊(duì)列,用于處理各種窗口消息和用戶輸入事件等,并且有隊(duì)列消息(通過(guò)PostMessage和PostThreadMessage發(fā)送 )和非隊(duì)列消息(通過(guò)SendMessage發(fā)送 )之分。

而 Linux 系統(tǒng)提供了 System V 消息隊(duì)列和 POSIX 消息隊(duì)列兩種標(biāo)準(zhǔn)實(shí)現(xiàn),System V 消息隊(duì)列出現(xiàn)較早,在許多現(xiàn)有應(yīng)用中廣泛使用 ,POSIX 消息隊(duì)列則更注重可移植性。不同操作系統(tǒng)在消息隊(duì)列的創(chuàng)建、操作函數(shù)接口、消息格式、隊(duì)列管理等方面都可能存在差異,開發(fā)者在編寫跨平臺(tái)程序時(shí),需要充分考慮這些差異,選擇合適的方式來(lái)實(shí)現(xiàn)消息隊(duì)列功能,以確保程序在不同操作系統(tǒng)上都能正確運(yùn)行。

三、C++ 實(shí)現(xiàn)消息隊(duì)列的實(shí)戰(zhàn)演練

3.1基于標(biāo)準(zhǔn)庫(kù)的實(shí)現(xiàn)方式

(1)數(shù)據(jù)結(jié)構(gòu)選擇

在 C++ 中,我們選用std::queue作為底層數(shù)據(jù)結(jié)構(gòu)來(lái)存儲(chǔ)消息。std::queue是一個(gè)容器適配器,它默認(rèn)基于std::deque實(shí)現(xiàn),具有典型的先進(jìn)先出(FIFO)特性,這與消息隊(duì)列的基本特性完美契合 。在一個(gè)網(wǎng)絡(luò)通信程序中,接收到的網(wǎng)絡(luò)數(shù)據(jù)包可以作為消息,按照到達(dá)的先后順序依次存入std::queue中,這樣后續(xù)處理線程就能按照數(shù)據(jù)包接收的順序進(jìn)行處理,保證了數(shù)據(jù)處理的順序性和正確性。同時(shí),std::queue提供了簡(jiǎn)潔易用的接口,如push用于將元素添加到隊(duì)列末尾 ,pop用于移除隊(duì)列頭部的元素 ,front用于訪問(wèn)隊(duì)首元素等,方便我們進(jìn)行消息的插入和提取操作 。

(2)同步機(jī)制構(gòu)建

為了保證多線程環(huán)境下對(duì)std::queue的安全訪問(wèn),我們使用互斥量(std::mutex)和std::unique_lock?;コ饬烤拖袷且话焰i,同一時(shí)間只有一個(gè)線程能夠獲取到這把鎖,從而訪問(wèn)被保護(hù)的資源,避免了多個(gè)線程同時(shí)對(duì)隊(duì)列進(jìn)行操作導(dǎo)致的數(shù)據(jù)競(jìng)爭(zhēng)問(wèn)題 。在多線程的游戲開發(fā)場(chǎng)景中,如果多個(gè)線程同時(shí)嘗試向存儲(chǔ)游戲事件消息的隊(duì)列中添加消息,就可能會(huì)出現(xiàn)數(shù)據(jù)不一致的情況,而互斥量可以有效避免這種問(wèn)題。

std::unique_lock則是一個(gè)智能鎖,它基于 RAII(Resource Acquisition Is Initialization)機(jī)制,在構(gòu)造時(shí)自動(dòng)鎖定互斥量,在析構(gòu)時(shí)自動(dòng)解鎖互斥量 ,這樣即使在操作過(guò)程中發(fā)生異常,也能保證互斥量被正確釋放,避免了死鎖的發(fā)生,極大地提高了代碼的安全性和可靠性。

(3)條件變量運(yùn)用

條件變量(std::condition_variable)在生產(chǎn)者 - 消費(fèi)者模型中起著至關(guān)重要的協(xié)調(diào)作用。生產(chǎn)者線程在向隊(duì)列中添加消息后,通過(guò)調(diào)用條件變量的notify_one或notify_all方法通知消費(fèi)者線程隊(duì)列中有新消息可供消費(fèi);消費(fèi)者線程在嘗試從隊(duì)列中獲取消息時(shí),如果發(fā)現(xiàn)隊(duì)列為空,就調(diào)用條件變量的wait方法進(jìn)入等待狀態(tài),同時(shí)自動(dòng)釋放之前獲取的互斥鎖,避免了資源的浪費(fèi)和死鎖的產(chǎn)生 。當(dāng)生產(chǎn)者線程添加消息并通知后,消費(fèi)者線程被喚醒,重新獲取互斥鎖,然后從隊(duì)列中取出消息進(jìn)行處理 。

在一個(gè)文件處理系統(tǒng)中,生產(chǎn)者線程負(fù)責(zé)讀取文件內(nèi)容并將數(shù)據(jù)塊作為消息放入隊(duì)列,消費(fèi)者線程則從隊(duì)列中取出數(shù)據(jù)塊進(jìn)行解析和處理,如果沒(méi)有條件變量的協(xié)調(diào),消費(fèi)者線程可能會(huì)不斷地?zé)o效嘗試從空隊(duì)列中取數(shù)據(jù),而有了條件變量,消費(fèi)者線程就能在隊(duì)列為空時(shí)等待,直到生產(chǎn)者線程添加新數(shù)據(jù)并通知它,從而實(shí)現(xiàn)了高效的線程間協(xié)作。

3.2代碼示例解析

消息隊(duì)列類定義:下面是一個(gè)簡(jiǎn)單的MessageQueue類的實(shí)現(xiàn):

class MessageQueue {
private:
    std::queue<std::string> queue;  // 存儲(chǔ)消息的隊(duì)列
    std::mutex mtx;                 // 保護(hù)隊(duì)列的互斥量
    std::condition_variable cv;     // 用于線程同步的條件變量
public:
    // 向隊(duì)列中添加消息
    void push(const std::string& message) {
        std::lock_guard<std::mutex> lock(mtx); // 利用lock_guard自動(dòng)管理鎖的生命周期,構(gòu)造時(shí)上鎖,析構(gòu)時(shí)解鎖
        queue.push(message);                    // 添加消息到隊(duì)列
        cv.notify_one();                        // 通知等待的消費(fèi)者線程
    }
    // 從隊(duì)列中獲取消息
    std::string pop() {
        std::unique_lock<std::mutex> lock(mtx); // 使用unique_lock更靈活地管理鎖
        cv.wait(lock, [this] { return!queue.empty(); }); // 等待直到隊(duì)列不為空,lambda表達(dá)式用于判斷條件
        std::string msg = queue.front();         // 獲取隊(duì)列頭部的消息
        queue.pop();                             // 彈出該消息
        return msg;
    }
};

在這個(gè)類中,queue成員變量用于存儲(chǔ)消息,mtx用于保護(hù)隊(duì)列的線程安全訪問(wèn),cv用于協(xié)調(diào)生產(chǎn)者和消費(fèi)者線程。push函數(shù)在向隊(duì)列中添加消息后,通過(guò)cv.notify_one通知等待的消費(fèi)者線程;pop函數(shù)在隊(duì)列為空時(shí),通過(guò)cv.wait等待,直到隊(duì)列中有消息可供消費(fèi)。

生產(chǎn)者與消費(fèi)者線程函數(shù)

// 生產(chǎn)者線程函數(shù)
void producer(MessageQueue& mq) {
    const std::string messages[] = { "Message 1", "Message 2", "Message 3", "Message 4" };
    for (const auto& msg : messages) {
        std::this_thread::sleep_for(std::chrono::seconds(1));  // 模擬生產(chǎn)延遲
        std::cout << "Produced: " << msg << std::endl;
        mq.push(msg);  // 將消息放入隊(duì)列
    }
}
// 消費(fèi)者線程函數(shù)
void consumer(MessageQueue& mq) {
    while (true) {
        std::string msg = mq.pop();  // 從隊(duì)列中獲取消息
        std::cout << "Consumed: " << msg << std::endl;
        std::this_thread::sleep_for(std::chrono::seconds(2));  // 模擬消費(fèi)延遲
    }
}

在producer函數(shù)中,生產(chǎn)者每隔 1 秒生成一個(gè)消息,并通過(guò)mq.push將消息放入隊(duì)列;在consumer函數(shù)中,消費(fèi)者不斷從隊(duì)列中獲取消息并消費(fèi),每隔 2 秒處理一個(gè)消息,如果隊(duì)列為空,會(huì)等待生產(chǎn)者添加新消息。

主函數(shù)邏輯

int main() {
    MessageQueue mq;  // 創(chuàng)建消息隊(duì)列
    // 啟動(dòng)生產(chǎn)者和消費(fèi)者線程
    std::thread producerThread(producer, std::ref(mq));
    std::thread consumerThread(consumer, std::ref(mq));
    // 等待生產(chǎn)者和消費(fèi)者線程完成
    producerThread.join();
    consumerThread.join();
    return 0;
}

在main函數(shù)中,首先創(chuàng)建了一個(gè)MessageQueue對(duì)象mq,然后分別啟動(dòng)生產(chǎn)者線程和消費(fèi)者線程,通過(guò)std::thread的構(gòu)造函數(shù)將producer和consumer函數(shù)與mq對(duì)象關(guān)聯(lián)起來(lái)。最后,通過(guò)調(diào)用join方法等待兩個(gè)線程執(zhí)行完畢,確保程序在所有任務(wù)完成后才結(jié)束。

四、消息隊(duì)列應(yīng)用場(chǎng)景

4.1 異步處理

場(chǎng)景說(shuō)明:用戶注冊(cè)后,需要發(fā)送注冊(cè)郵件和發(fā)送注冊(cè)信息,傳統(tǒng)的做法有兩種:串行方式、并行方式

(1)串行方式

將注冊(cè)信息寫入數(shù)據(jù)庫(kù)成功后,發(fā)送注冊(cè)郵件,然后發(fā)送注冊(cè)短信,而所有任務(wù)執(zhí)行完成后,返回信息給客戶端

圖片圖片

(2)并行方式

將注冊(cè)信息寫入數(shù)據(jù)庫(kù)成功后,同時(shí)進(jìn)行發(fā)送注冊(cè)郵件和發(fā)送注冊(cè)短信的操作。而所有任務(wù)執(zhí)行完成后,返回信息給客戶端。同串行方式相比,并行方式可以提高執(zhí)行效率,減少執(zhí)行時(shí)間。

圖片圖片

上面的比較可以發(fā)現(xiàn),假設(shè)三個(gè)操作均需要50ms的執(zhí)行時(shí)間,排除網(wǎng)絡(luò)因素,則最終執(zhí)行完成,串行方式需要150ms,而并行方式需要100ms。

因?yàn)閏pu在單位時(shí)間內(nèi)處理的請(qǐng)求數(shù)量是一致的,假設(shè):CPU每1秒吞吐量是100此,則串行方式1秒內(nèi)可執(zhí)行的請(qǐng)求量為1000/150,不到7次;并行方式1秒內(nèi)可執(zhí)行的請(qǐng)求量為1000/100,為10次。

由上可以看出,傳統(tǒng)串行和并行的方式會(huì)受到系統(tǒng)性能的局限,那么如何解決這個(gè)問(wèn)題?我們需要引入消息隊(duì)列,將不是必須的業(yè)務(wù)邏輯,異步進(jìn)行處理,由此改造出來(lái)的流程為:

圖片圖片

根據(jù)上述的流程,用戶的響應(yīng)時(shí)間基本相當(dāng)于將用戶數(shù)據(jù)寫入數(shù)據(jù)庫(kù)的時(shí)間,發(fā)送注冊(cè)郵件、發(fā)送注冊(cè)短信的消息在寫入消息隊(duì)列后,即可返回執(zhí)行結(jié)果,寫入消息隊(duì)列的時(shí)間很快,幾乎可以忽略,也有此可以將系統(tǒng)吞吐量提升至20QPS,比串行方式提升近3倍,比并行方式提升2倍。

通俗易懂案例:

圖片圖片

這里以我以前接觸過(guò)的一個(gè)智能外呼系統(tǒng)為例:智能外呼:客服中心以電話的方式,主動(dòng)發(fā)起的對(duì)客戶的呼叫問(wèn)答活動(dòng)。廣泛應(yīng)用在產(chǎn)品營(yíng)銷、貸款催繳、投資理財(cái)?shù)确矫妗?/span>

說(shuō)白了就是機(jī)器人給您打電話,不斷問(wèn)問(wèn)題,然后將您的問(wèn)題轉(zhuǎn)成文字存儲(chǔ)在數(shù)據(jù)庫(kù)里的過(guò)程。

圖片圖片

業(yè)務(wù)邏輯:該系統(tǒng)的上游是外呼請(qǐng)求的發(fā)起方,下游是外呼動(dòng)作的執(zhí)行機(jī)構(gòu)。語(yǔ)音識(shí)別,自然語(yǔ)言處理的模塊集成在這里面。

由于打電話是個(gè)耗時(shí)的過(guò)程,整個(gè)系統(tǒng)異步實(shí)現(xiàn),具體來(lái)說(shuō)有2步:

當(dāng)有外呼請(qǐng)求發(fā)起時(shí),中間件解析上游發(fā)來(lái)的請(qǐng)求報(bào)文,推送至執(zhí)行機(jī)構(gòu)并且即時(shí)回復(fù)響應(yīng)報(bào)文,實(shí)現(xiàn)異步第一步。

中間件輪詢下游處理結(jié)果(如性別的聲紋檢驗(yàn)),封裝結(jié)果返回,異步第二步,實(shí)現(xiàn)閉環(huán)。注意打電話是個(gè)耗時(shí)的操作,在這個(gè)過(guò)程中,如果用傳統(tǒng)的基于請(qǐng)求/響應(yīng)的同步通訊方式,在上游發(fā)起請(qǐng)求后,監(jiān)聽過(guò)程中生產(chǎn)者線程會(huì)一直阻塞。如果這條流水線上有其他業(yè)務(wù)處理,會(huì)造成時(shí)間和資源的浪費(fèi)。

但如果使用如果使用異步消息處理,立即返回消息發(fā)送成功或失敗的回調(diào)方法,就能實(shí)現(xiàn)生產(chǎn)者線程不阻塞,從而達(dá)到異步執(zhí)行的效果。

4.2應(yīng)用解耦

場(chǎng)景說(shuō)明:用戶下單后,訂單系統(tǒng)需要通知庫(kù)存系統(tǒng)。

傳統(tǒng)的做法為:訂單系統(tǒng)調(diào)用庫(kù)存系統(tǒng)的接口。如下圖所示:

圖片圖片

傳統(tǒng)方式具有如下缺點(diǎn):

  • 假設(shè)庫(kù)存系統(tǒng)訪問(wèn)失敗,則訂單減少庫(kù)存失敗,導(dǎo)致訂單創(chuàng)建失敗
  • 訂單系統(tǒng)同庫(kù)存系統(tǒng)過(guò)度耦合

如何解決上述的缺點(diǎn)呢?需要引入消息隊(duì)列,引入消息隊(duì)列后的架構(gòu)如下圖所示:

圖片圖片

  • 訂單系統(tǒng):用戶下單后,訂單系統(tǒng)進(jìn)行數(shù)據(jù)持久化處理,然后將消息寫入消息隊(duì)列,返回訂單創(chuàng)建成功
  • 庫(kù)存系統(tǒng):使用拉/推的方式,獲取下單信息,庫(kù)存系統(tǒng)根據(jù)訂單信息,進(jìn)行庫(kù)存操作。

假如在下單時(shí)庫(kù)存系統(tǒng)不能正常使用。也不影響正常下單,因?yàn)橄聠魏?,訂單系統(tǒng)寫入消息隊(duì)列就不再關(guān)心其后續(xù)操作了。由此實(shí)現(xiàn)了訂單系統(tǒng)與庫(kù)存系統(tǒng)的應(yīng)用解耦。

4.3流量削鋒

流量削鋒也是消息隊(duì)列中的常用場(chǎng)景,一般在秒殺或團(tuán)搶活動(dòng)中使用廣泛。

應(yīng)用場(chǎng)景:秒殺活動(dòng),一般會(huì)因?yàn)榱髁窟^(guò)大,導(dǎo)致流量暴增,應(yīng)用掛掉。為解決這個(gè)問(wèn)題,一般需要在應(yīng)用前端加入消息隊(duì)列。可以控制參與活動(dòng)的人數(shù);

可以緩解短時(shí)間內(nèi)高流量對(duì)應(yīng)用的巨大壓力;

流量削鋒處理方式系統(tǒng)圖如下:

圖片圖片

  • 服務(wù)器在接收到用戶請(qǐng)求后,首先寫入消息隊(duì)列。這時(shí)如果消息隊(duì)列中消息數(shù)量超過(guò)最大數(shù)量,則直接拒絕用戶請(qǐng)求或返回跳轉(zhuǎn)到錯(cuò)誤頁(yè)面;
  • 秒殺業(yè)務(wù)根據(jù)秒殺規(guī)則讀取消息隊(duì)列中的請(qǐng)求信息,進(jìn)行后續(xù)處理。

4.4 日志處理

日志處理是指將消息隊(duì)列用在日志處理中,比如Kafka的應(yīng)用,解決大量日志傳輸?shù)膯?wèn)題。架構(gòu)簡(jiǎn)化如下:

圖片圖片

  • 日志采集客戶端:負(fù)責(zé)日志數(shù)據(jù)采集,定時(shí)寫受寫入Kafka隊(duì)列;
  • Kafka消息隊(duì)列:負(fù)責(zé)日志數(shù)據(jù)的接收,存儲(chǔ)和轉(zhuǎn)發(fā);
  • 日志處理應(yīng)用:訂閱并消費(fèi)kafka隊(duì)列中的日志數(shù)據(jù);

圖片

  • Kafka:接收用戶日志的消息隊(duì)列。
  • Logstash:做日志解析,統(tǒng)一成JSON輸出給Elasticsearch。
  • Elasticsearch:實(shí)時(shí)日志分析服務(wù)的核心技術(shù),一個(gè)schemaless,實(shí)時(shí)的數(shù)據(jù)存儲(chǔ)服務(wù),通過(guò)index組織數(shù)據(jù),兼具強(qiáng)大的搜索和統(tǒng)計(jì)功能。
  • Kibana:基于Elasticsearch的數(shù)據(jù)可視化組件,超強(qiáng)的數(shù)據(jù)可視化能力是眾多公司選擇ELK stack的重要原因。

4.5 消息通訊

消息通訊是指,消息隊(duì)列一般都內(nèi)置了高效的通信機(jī)制,因此也可以用在純的消息通訊。比如實(shí)現(xiàn)點(diǎn)對(duì)點(diǎn)消息隊(duì)列、聊天室等。

點(diǎn)對(duì)點(diǎn)通訊

圖片圖片

在點(diǎn)對(duì)點(diǎn)通訊架構(gòu)設(shè)計(jì)中,客戶端A和客戶端B共用一個(gè)消息隊(duì)列,即可實(shí)現(xiàn)消息通訊功能。

聊天室通訊

圖片

客戶端A、客戶端B、直至客戶端N訂閱同一消息隊(duì)列,進(jìn)行消息的發(fā)布與接收,即可實(shí)現(xiàn)聊天通訊方案架構(gòu)設(shè)計(jì)。

五、 消息中間件示例

4.1 電商系統(tǒng)

圖片圖片

消息隊(duì)列采用高可用、可持久化的消息中間件。比如Active MQ,Rabbit MQ,Rocket MQ。

  • 應(yīng)用將主干邏輯處理完成后,寫入消息隊(duì)列。消息發(fā)送是否成功可以開啟消息的確認(rèn)模式。(消息隊(duì)列返回消息接收成功狀態(tài)后,應(yīng)用再返回,這樣保障消息的完整性)
  • 擴(kuò)展流程(發(fā)短信、配送處理)訂閱隊(duì)列消息。采用推或拉的方式獲取消息并處理。
  • 消息將應(yīng)用解耦的同時(shí),帶來(lái)了數(shù)據(jù)一致性問(wèn)題,可以采用最終一致性方式解決。比如主數(shù)據(jù)寫入數(shù)據(jù)庫(kù),擴(kuò)展應(yīng)用根據(jù)消息隊(duì)列,并結(jié)合數(shù)據(jù)庫(kù)方式實(shí)現(xiàn)基于消息隊(duì)列的后續(xù)處理。

4.2 日志收集系統(tǒng)

圖片圖片

分為Zookeeper注冊(cè)中心,日志收集客戶端,Kafka集群和Storm集群(OtherApp)四部分組成。

  • Zookeeper注冊(cè)中心,提出負(fù)載均衡和地址查找服務(wù);
  • 日志收集客戶端,用于采集應(yīng)用系統(tǒng)的日志,并將數(shù)據(jù)推送到kafka隊(duì)列;
  • Kafka集群:接收,路由,存儲(chǔ),轉(zhuǎn)發(fā)等消息處理;
  • Storm集群:與OtherApp處于同一級(jí)別,采用拉的方式消費(fèi)隊(duì)列中的數(shù)據(jù);

六、JMS消息服務(wù)

講消息隊(duì)列就不得不提JMS 。JMS(Java Message Service,Java消息服務(wù))API是一個(gè)消息服務(wù)的標(biāo)準(zhǔn)/規(guī)范,允許應(yīng)用程序組件基于JavaEE平臺(tái)創(chuàng)建、發(fā)送、接收和讀取消息。它使分布式通信耦合度更低,消息服務(wù)更加可靠以及異步性。

在EJB架構(gòu)中,有消息bean可以無(wú)縫的與JM消息服務(wù)集成。在J2EE架構(gòu)模式中,有消息服務(wù)者模式,用于實(shí)現(xiàn)消息與應(yīng)用直接的解耦。

6.1 消息模型

在JMS標(biāo)準(zhǔn)中,有兩種消息模型P2P(Point to Point),Publish/Subscribe(Pub/Sub)。

(1)P2P模式

圖片圖片

P2P模式包含三個(gè)角色:消息隊(duì)列(Queue),發(fā)送者(Sender),接收者(Receiver)。每個(gè)消息都被發(fā)送到一個(gè)特定的隊(duì)列,接收者從隊(duì)列中獲取消息。隊(duì)列保留著消息,直到他們被消費(fèi)或超時(shí)。

P2P的特點(diǎn)

每個(gè)消息只有一個(gè)消費(fèi)者(Consumer)(即一旦被消費(fèi),消息就不再在消息隊(duì)列中),發(fā)送者和接收者之間在時(shí)間上沒(méi)有依賴性,也就是說(shuō)當(dāng)發(fā)送者發(fā)送了消息之后,不管接收者有沒(méi)有正在運(yùn)行,它不會(huì)影響到消息被發(fā)送到隊(duì)列

接收者在成功接收消息之后需向隊(duì)列應(yīng)答成功,如果希望發(fā)送的每個(gè)消息都會(huì)被成功處理的話,那么需要P2P模式。

(2)Pub/Sub模式

圖片圖片

包含三個(gè)角色:主題(Topic),發(fā)布者(Publisher),訂閱者(Subscriber) 。多個(gè)發(fā)布者將消息發(fā)送到Topic,系統(tǒng)將這些消息傳遞給多個(gè)訂閱者。

Pub/Sub的特點(diǎn)

  • 每個(gè)消息可以有多個(gè)消費(fèi)者
  • 發(fā)布者和訂閱者之間有時(shí)間上的依賴性。針對(duì)某個(gè)主題(Topic)的訂閱者,它必須創(chuàng)建一個(gè)訂閱者之后,才能消費(fèi)發(fā)布者的消息。
  • 為了消費(fèi)消息,訂閱者必須保持運(yùn)行的狀態(tài)。

為了緩和這樣嚴(yán)格的時(shí)間相關(guān)性,JMS允許訂閱者創(chuàng)建一個(gè)可持久化的訂閱。這樣,即使訂閱者沒(méi)有被激活(運(yùn)行),它也能接收到發(fā)布者的消息。如果希望發(fā)送的消息可以不被做任何處理、或者只被一個(gè)消息者處理、或者可以被多個(gè)消費(fèi)者處理的話,那么可以采用Pub/Sub模型。

6.2消息消費(fèi)

在JMS中,消息的產(chǎn)生和消費(fèi)都是異步的。對(duì)于消費(fèi)來(lái)說(shuō),JMS的消息者可以通過(guò)兩種方式來(lái)消費(fèi)消息。

  • 同步:訂閱者或接收者通過(guò)receive方法來(lái)接收消息,receive方法在接收到消息之前(或超時(shí)之前)將一直阻塞;
  • 異步:訂閱者或接收者可以注冊(cè)為一個(gè)消息監(jiān)聽器。當(dāng)消息到達(dá)之后,系統(tǒng)自動(dòng)調(diào)用監(jiān)聽器的onMessage方法。

JNDI:Java命名和目錄接口,是一種標(biāo)準(zhǔn)的Java命名系統(tǒng)接口??梢栽诰W(wǎng)絡(luò)上查找和訪問(wèn)服務(wù)。通過(guò)指定一個(gè)資源名稱,該名稱對(duì)應(yīng)于數(shù)據(jù)庫(kù)或命名服務(wù)中的一個(gè)記錄,同時(shí)返回資源連接建立所必須的信息。JNDI在JMS中起到查找和訪問(wèn)發(fā)送目標(biāo)或消息來(lái)源的作用。

6.3JMS編程模型

  • ConnectionFactory:創(chuàng)建Connection對(duì)象的工廠,針對(duì)兩種不同的JMS消息模型,分別有QueueConnectionFactory和TopicConnectionFactory兩種??梢酝ㄟ^(guò)JNDI來(lái)查找ConnectionFactory對(duì)象。
  • Destination:Destination的意思是消息生產(chǎn)者的消息發(fā)送目標(biāo)或者說(shuō)消息消費(fèi)者的消息來(lái)源。對(duì)于消息生產(chǎn)者來(lái)說(shuō),它的Destination是某個(gè)隊(duì)列(Queue)或某個(gè)主題(Topic);對(duì)于消息消費(fèi)者來(lái)說(shuō),它的Destination也是某個(gè)隊(duì)列或主題(即消息來(lái)源)。所以,Destination實(shí)際上就是兩種類型的對(duì)象:Queue、Topic可以通過(guò)JNDI來(lái)查找Destination。
  • Connection:表示在客戶端和JMS系統(tǒng)之間建立的鏈接(對(duì)TCP/IP Socket的包裝)。Connection可以產(chǎn)生一個(gè)或多個(gè)Session。跟ConnectionFactory一樣,Connection也有兩種類型:QueueConnectionTopicConnection。
  • Session:Session是操作消息的接口??梢酝ㄟ^(guò)session創(chuàng)建生產(chǎn)者、消費(fèi)者、消息等。Session提供了事務(wù)的功能。當(dāng)需要使用session發(fā)送/接收多個(gè)消息時(shí),可以將這些發(fā)送/接收動(dòng)作放到一個(gè)事務(wù)中。同樣,也分QueueSession和TopicSession。

七、案例實(shí)戰(zhàn)

7.1案例分析

假設(shè)我們要實(shí)現(xiàn)一個(gè)簡(jiǎn)單的任務(wù)調(diào)度系統(tǒng),有多個(gè)消費(fèi)者從消息隊(duì)列中獲取任務(wù)并進(jìn)行處理。

首先,你需要安裝 RabbitMQ C++ 客戶端庫(kù)??梢允褂靡韵旅钸M(jìn)行安裝(假設(shè)你已經(jīng)在 Linux 系統(tǒng)上安裝了 RabbitMQ):

sudo apt-get install librabbitmq-dev

然后,你可以使用以下代碼來(lái)發(fā)送和接收消息:

#include <iostream>
#include <string>
#include <SimpleAmqpClient/SimpleAmqpClient.h>

int main() {
    // 連接到 RabbitMQ 服務(wù)器
    AmqpClient::Channel::ptr_t channel = AmqpClient::Channel::Create("localhost");

    // 聲明一個(gè)隊(duì)列用于消息的發(fā)送和接收
    std::string queueName = "test_queue";
    channel->DeclareQueue(queueName, false, true, false, false);

    // 發(fā)送消息
    std::string message = "Hello, RabbitMQ!";
    channel->BasicPublish("", queueName, AmqpClient::BasicMessage::Create(message));

    std::cout << "Sent message: " << message << std::endl;

    // 接收消息
    bool noAck = true;
    AmqpClient::Envelope::ptr_t envelope;
    
    while (true) {
        bool received = channel->BasicConsumeMessage(queueName, envelope, noAck);
        
        if (received) {
            std::string receivedMessage = envelope->Message()->Body();
            
            std::cout << "Received message: " << receivedMessage << std::endl;
            
            break;
        }
        
        // 休眠一段時(shí)間再嘗試接收消息
        usleep(1000);
   }

   return 0;
}

在這個(gè)示例中,我們使用了 SimpleAmqpClient 庫(kù)來(lái)與 RabbitMQ 服務(wù)器進(jìn)行交互。首先,我們創(chuàng)建一個(gè)通道并聲明一個(gè)隊(duì)列用于消息的發(fā)送和接收。然后,我們使用 BasicPublish 方法發(fā)送消息到隊(duì)列中。最后,我們使用 BasicConsumeMessage 方法循環(huán)嘗試接收消息,直到成功接收到一條消息為止。

請(qǐng)注意,在編譯時(shí)需要鏈接 SimpleAmqpClient 庫(kù)。你可以使用以下命令編譯代碼:

g++ -std=c++11 -I/usr/include/SimpleAmqpClient -o main main.cpp -lamqpcpp

這只是一個(gè)簡(jiǎn)單的示例,用于演示如何在C++中使用RabbitMQ發(fā)送和接收消息。你可以根據(jù)自己的需求進(jìn)行擴(kuò)展和優(yōu)化。

7.2使用 C++ 消息隊(duì)列的注意要點(diǎn)

(1)消息格式與類型一致性

在使用 C++ 消息隊(duì)列進(jìn)行通信時(shí),確保發(fā)送和接收進(jìn)程使用相同的消息格式和類型至關(guān)重要。消息格式就像是一種約定俗成的 “語(yǔ)言規(guī)則”,只有發(fā)送方和接收方都遵循相同的規(guī)則,才能實(shí)現(xiàn)準(zhǔn)確無(wú)誤的信息傳遞。在一個(gè)金融交易系統(tǒng)中,訂單消息可能包含訂單編號(hào)、交易金額、交易時(shí)間、股票代碼等信息,這些信息必須按照固定的順序和數(shù)據(jù)類型進(jìn)行封裝和解析。如果發(fā)送方將交易金額定義為float類型,而接收方卻按照int類型去解析,就會(huì)導(dǎo)致數(shù)據(jù)錯(cuò)誤,可能引發(fā)嚴(yán)重的交易問(wèn)題,如金額計(jì)算錯(cuò)誤、訂單匹配失敗等。

同樣,消息類型也需要嚴(yán)格一致。消息類型通常用于標(biāo)識(shí)消息的用途或類別,不同類型的消息在系統(tǒng)中會(huì)有不同的處理方式。在一個(gè)游戲開發(fā)項(xiàng)目中,可能存在 “玩家移動(dòng)”“怪物生成”“道具拾取” 等不同類型的消息,如果發(fā)送方將 “玩家移動(dòng)” 消息的類型標(biāo)識(shí)錯(cuò)誤,接收方可能會(huì)將其當(dāng)作其他類型的消息進(jìn)行處理,從而導(dǎo)致游戲邏輯混亂,玩家體驗(yàn)受到嚴(yán)重影響。

(2)同步與并發(fā)訪問(wèn)處理

當(dāng)多個(gè)線程或進(jìn)程同時(shí)訪問(wèn)消息隊(duì)列時(shí),同步和并發(fā)訪問(wèn)處理不當(dāng)很容易引發(fā)競(jìng)爭(zhēng)條件和死鎖等問(wèn)題。競(jìng)爭(zhēng)條件是指多個(gè)線程或進(jìn)程在訪問(wèn)共享資源(如消息隊(duì)列)時(shí),由于執(zhí)行順序的不確定性,導(dǎo)致最終結(jié)果出現(xiàn)錯(cuò)誤。死鎖則是指兩個(gè)或多個(gè)線程或進(jìn)程相互等待對(duì)方釋放資源,從而陷入無(wú)限期的阻塞狀態(tài)。為了避免這些問(wèn)題,我們可以采用多種方法。

使用互斥鎖(如std::mutex)是最常見的方式之一,它能夠保證在同一時(shí)刻只有一個(gè)線程或進(jìn)程可以訪問(wèn)消息隊(duì)列,從而避免競(jìng)爭(zhēng)條件。在一個(gè)多線程的日志記錄系統(tǒng)中,多個(gè)線程可能同時(shí)產(chǎn)生日志消息并嘗試將其發(fā)送到消息隊(duì)列中,如果沒(méi)有互斥鎖的保護(hù),就可能出現(xiàn)消息順序混亂、數(shù)據(jù)丟失等問(wèn)題。條件變量(如std::condition_variable)也起著重要作用,它可以協(xié)調(diào)線程之間的同步。

在生產(chǎn)者 - 消費(fèi)者模型中,生產(chǎn)者線程在向消息隊(duì)列中添加消息后,可以通過(guò)條件變量通知消費(fèi)者線程有新消息到來(lái);消費(fèi)者線程在隊(duì)列為空時(shí),可以通過(guò)條件變量等待,避免無(wú)效的輪詢,提高系統(tǒng)效率。同時(shí),合理的加鎖和解鎖策略也至關(guān)重要,要確保在適當(dāng)?shù)臅r(shí)機(jī)獲取和釋放鎖,避免死鎖的發(fā)生。在使用多個(gè)鎖的情況下,按照固定的順序獲取鎖可以有效防止死鎖。如果線程 A 需要獲取鎖 1 和鎖 2,線程 B 需要獲取鎖 2 和鎖 1,若不按照固定順序獲取鎖,就可能出現(xiàn)線程 A 獲取了鎖 1,線程 B 獲取了鎖 2,然后雙方都等待對(duì)方釋放自己需要的鎖,從而導(dǎo)致死鎖的情況 。

(3)隊(duì)列大小與性能優(yōu)化

消息隊(duì)列的大小設(shè)置對(duì)系統(tǒng)性能有著顯著影響。如果隊(duì)列設(shè)置過(guò)小,可能導(dǎo)致消息無(wú)法及時(shí)存儲(chǔ),造成消息丟失。在一個(gè)高并發(fā)的電商訂單處理系統(tǒng)中,促銷活動(dòng)期間短時(shí)間內(nèi)可能會(huì)產(chǎn)生大量的訂單消息,如果消息隊(duì)列過(guò)小,部分訂單消息可能因?yàn)殛?duì)列已滿而無(wú)法入隊(duì),從而導(dǎo)致訂單丟失,給商家和用戶帶來(lái)?yè)p失。另一方面,隊(duì)列過(guò)大也會(huì)占用過(guò)多的系統(tǒng)內(nèi)存資源,影響系統(tǒng)的整體性能。

在內(nèi)存資源有限的嵌入式系統(tǒng)中,如果消息隊(duì)列設(shè)置過(guò)大,可能會(huì)導(dǎo)致其他關(guān)鍵任務(wù)因?yàn)閮?nèi)存不足而無(wú)法正常運(yùn)行。為了優(yōu)化性能,可以根據(jù)系統(tǒng)的實(shí)際需求和負(fù)載情況,動(dòng)態(tài)調(diào)整隊(duì)列大小。在系統(tǒng)負(fù)載較低時(shí),可以適當(dāng)縮小隊(duì)列大小,釋放內(nèi)存資源;當(dāng)系統(tǒng)負(fù)載升高時(shí),及時(shí)擴(kuò)大隊(duì)列大小,以滿足消息存儲(chǔ)的需求。采用消息壓縮技術(shù)可以減少消息在隊(duì)列中占用的空間,提高隊(duì)列的存儲(chǔ)效率。對(duì)于一些包含大量文本信息的日志消息,可以在發(fā)送前進(jìn)行壓縮,接收后再進(jìn)行解壓縮,這樣既能減少隊(duì)列空間的占用,又能降低網(wǎng)絡(luò)傳輸?shù)拈_銷,提升系統(tǒng)的整體性能。

責(zé)任編輯:武曉燕 來(lái)源: 深度Linux
相關(guān)推薦

2024-02-02 18:29:54

C++線程編程

2010-01-21 11:23:49

Linux多線程同步消息隊(duì)列

2012-05-18 10:36:20

CC++編程

2024-06-24 08:10:00

C++互斥鎖

2010-01-18 14:09:58

C++多線程

2010-02-04 10:19:39

C++多線程

2010-02-05 15:30:54

C++多線程測(cè)試

2021-03-05 07:38:52

C++線程編程開發(fā)技術(shù)

2023-12-14 15:05:08

volatile代碼C++

2021-02-25 15:58:46

C++線程編程開發(fā)技術(shù)

2021-03-26 05:54:00

C#數(shù)據(jù)方法

2021-06-10 00:13:43

C#隊(duì)列數(shù)據(jù)

2024-02-21 20:46:48

C++編程volatile

2024-06-24 12:57:09

多線程C++編程語(yǔ)言

2024-01-29 16:55:38

C++引用開發(fā)

2011-06-14 15:25:28

C++多線程

2024-11-05 16:29:57

2010-01-26 14:35:11

C++關(guān)鍵字

2024-04-03 08:25:11

DictionaryC#字典類型

2017-06-19 13:36:12

Linux進(jìn)程消息隊(duì)列
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)