偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

高并發(fā)+海量數(shù)據(jù)下如何實現(xiàn)系統(tǒng)解耦?「下」

數(shù)據(jù)庫 其他數(shù)據(jù)庫
這篇文章,咱們就來看看,假如說基于RabbitMQ作為消息中間件,如何實現(xiàn)一份數(shù)據(jù)被多個系統(tǒng)同時訂閱的“Pub/Sub”模型。

一、前情提示

上一篇文章《?高并發(fā)+海量數(shù)據(jù)下如何實現(xiàn)系統(tǒng)解耦?【中】?》分析了一下如何利用消息中間件對系統(tǒng)進(jìn)行解耦處理。

同時,我們也提到了使用消息中間件還有利于一份數(shù)據(jù)被多個系統(tǒng)同時訂閱,供多個系統(tǒng)來使用于不同的目的。

目前的一個架構(gòu)如下圖所示。

在這個圖里,我們可以清晰的看到,實時計算平臺發(fā)布的一份數(shù)據(jù)到消息中間件里,接著,會進(jìn)行如下步驟:

  1. 數(shù)據(jù)查詢平臺,會訂閱這份數(shù)據(jù),并落入自己本地的數(shù)據(jù)庫集群和緩存集群里,接著對外提供數(shù)據(jù)查詢的服務(wù)
  2. 數(shù)據(jù)質(zhì)量監(jiān)控系統(tǒng),會對計算結(jié)果按照一定的業(yè)務(wù)規(guī)則進(jìn)行監(jiān)控,如果發(fā)現(xiàn)有數(shù)據(jù)計算錯誤,則會立馬進(jìn)行報警
  3. 數(shù)據(jù)鏈路追蹤系統(tǒng),會采集計算結(jié)果作為一個鏈路節(jié)點,同時對一條數(shù)據(jù)的整個完整計算鏈路都進(jìn)行采集并組裝出來一系列的數(shù)據(jù)計算鏈路落地存儲,最后如果某個數(shù)據(jù)計算錯誤了,就可以立馬通過計算鏈路進(jìn)行回溯排查問題

因此上述場景中,使用消息中間件一來可以解耦,二來還可以實現(xiàn)消息“Pub/Sub”模型,實現(xiàn)消息的發(fā)布與訂閱。

這篇文章,咱們就來看看,假如說基于RabbitMQ作為消息中間件,如何實現(xiàn)一份數(shù)據(jù)被多個系統(tǒng)同時訂閱的“Pub/Sub”模型。

二、基于消息中間件的隊列消費模型

上面那個圖,其實就是采用的RabbitMQ最基本的隊列消費模型的支持。

也就是說,你可以理解為RabbitMQ內(nèi)部有一個隊列,生產(chǎn)者不斷的發(fā)送數(shù)據(jù)到隊列里,消息按照先后順序進(jìn)入隊列中排隊。

接著,假設(shè)隊列里有4條數(shù)據(jù),然后我們有2個消費者一起消費這個隊列的數(shù)據(jù)。

此時每個消費者會均勻的被分配到2條數(shù)據(jù),也就是說4條數(shù)據(jù)會均勻的分配給各個消費者,每個消費者只不過是處理一部分?jǐn)?shù)據(jù)罷了,這個就是典型的隊列消費模型。

三、基于消息中間件的“Pub/Sub”模型

但是消息中間件還可以實現(xiàn)一種“Pub/Sub”模型,也就是“發(fā)布/訂閱”模型,Pub就是Publish,Sub就是Subscribe。

這種模型是可以支持多個系統(tǒng)同時消費一份數(shù)據(jù)的。也就是說,你發(fā)布出去的每條數(shù)據(jù),都會廣播給每個系統(tǒng)。

給大家來一張圖,一起來感受一下。

如上圖所示。也就是說,我們想要實現(xiàn)的上圖的效果,實時計算平臺發(fā)布一系列的數(shù)據(jù)到消息中間件里。

然后數(shù)據(jù)查詢平臺、數(shù)據(jù)質(zhì)量監(jiān)控系統(tǒng)、數(shù)據(jù)鏈路追蹤系統(tǒng),都會訂閱數(shù)據(jù),都會消費到同一份完整的數(shù)據(jù),每個系統(tǒng)都可以根據(jù)自己的需要使用數(shù)據(jù)。

這,就是所謂的“Pub/Sub”模型,一個系統(tǒng)發(fā)布一份數(shù)據(jù)出去,多個系統(tǒng)訂閱和消費到一模一樣的一份數(shù)據(jù)。

那如果要實現(xiàn)上述的效果,基于RabbitMQ應(yīng)該怎么來處理呢?

四、RabbitMQ中的exchange到底是個什么東西?

實際上來說,在RabbitMQ里面是不允許生產(chǎn)者直接投遞消息到某個queue(隊列)里的,而是只能讓生產(chǎn)者投遞消息給RabbitMQ內(nèi)部的一個特殊組件,叫做“exchange”。

關(guān)于這個exchange,大概你可以把這個組件理解為一種消息路由的組件。

也就是說,實時計算平臺發(fā)送出去的message到RabbitMQ中都是由一個exchange來接收的。

然后這個exchange會根據(jù)一定的規(guī)則決定要將這個message路由轉(zhuǎn)發(fā)到哪個queue里去,這個實際上就是RabbitMQ中的一個核心的消息模型。

大家看下面的圖,一起來理解一下。

五、默認(rèn)的exchange

在之前的文章里,我們投遞消息到RabbitMQ的時候,也沒有用什么exchange,但是為什么就還是把消息投遞到了queue里去呢?

那是因為我們用了默認(rèn)的exchange,他會直接把消息路由到你指定的那個queue里去,所以如果簡單用隊列消費模型,不就省去了exchange的概念了嗎。

上面這段就是之前我們給大家展示的,讓消息持久化的一種投遞消息的方式。

大家注意里面的第一個參數(shù),是一個空的字符串,這個空字符串的意思,就是說投遞消息到默認(rèn)的exchange里去,然后他就會路由消息到我們指定的queue里去。

六、將消息投遞到fanout exchange

?在RabbitMQ里,exchange這種組件有很多種類型,比如說:direct、topic、headers以及fanout。這里咱們就來看看最后一種,fanout這種類型的exchange組件。

這種exchange組件其實非常的簡單,你可以創(chuàng)建一個fanout類型的exchange,然后給這個exchange綁定多個queue。

接著只要你投遞一條消息到這個exchange,他就會把消息路由給他綁定的所有queue。

使用下面的代碼就可以創(chuàng)建一個exchange,比如說在實時計算平臺(生產(chǎn)者)的代碼里,可以加入下面的一段,創(chuàng)建一個fanout類型的exchange。

第一個參數(shù)我們叫做“rt_compute_data”,這個就是exchange的名字,rt就是“RealTime”的縮寫,意思就是實時計算系統(tǒng)的計算結(jié)果數(shù)據(jù)。

第二個參數(shù)就是定義了這個exchange的類型是“fanout”。?

channel.exchangeDeclare(
"rt_compute_data",
"fanout");

接著我們就采用下面的代碼來投遞消息到我們創(chuàng)建好的exchange組件里去:

大家會注意到,此時消息就是投遞到指定的exchange里去了,但是路由到哪個queue里去呢?此時我們暫時還沒確定,要讓消費者自己來把自己的queue綁定到這個exchange上去才可以。

七、綁定自己的隊列到exchange上去消費

我們對消費者的代碼也進(jìn)行修改,之前我們在這里關(guān)閉了autoAck機制,然后每次都是自己手動ack。

上面的代碼里,每個消費者系統(tǒng),都會有一些不一樣,就是每個消費者都需要定義自己的隊列,然后綁定到exchange上去。比如:

  • 數(shù)據(jù)查詢平臺的隊列是“rt_compute_data_query”。
  • 數(shù)據(jù)質(zhì)量監(jiān)控平臺的隊列是“rt_compute_data_monitor”。
  • 數(shù)據(jù)鏈路追蹤系統(tǒng)的隊列是“rt_compute_data_link”。

這樣,每個訂閱這份數(shù)據(jù)的系統(tǒng)其實都有一個屬于自己的隊列,然后隊列里被會被exchange路由進(jìn)去實時計算平臺生產(chǎn)的所有數(shù)據(jù)。

而且因為是多個隊列的模式,每個系統(tǒng)都可以部署消費者集群來進(jìn)行數(shù)據(jù)的消費和處理,非常的方便。

八、整體架構(gòu)圖

最后,給大家來一張大圖,我們再跟著圖,來捋一捋整個流程。

如上圖所示,首先,實時計算平臺會投遞消息到“rt_compute_data”這個“exchange”里去,但是他沒指定這個exchange要路由消息到哪個隊列,因為這個他本身是不知道的。

接著數(shù)據(jù)查詢平臺、數(shù)據(jù)質(zhì)量監(jiān)控系統(tǒng)、數(shù)據(jù)鏈路追蹤系統(tǒng),就可以聲明自己的隊列,都綁定到exchange上去。

因為queue和exchange的綁定,在這里是要由訂閱數(shù)據(jù)的平臺自己指定的。而且因為這個exchange是fanout類型的,他只要接收到了數(shù)據(jù),就會路由數(shù)據(jù)到所有綁定到他的隊列里去,這樣每個隊列里都有同樣的一份數(shù)據(jù),供對應(yīng)的平臺來消費。

而且針對每個平臺自己的隊列,自己還可以部署消費服務(wù)集群來消費自己的一個隊列,自己的隊列里的數(shù)據(jù)還是會均勻分發(fā)給各個消費服務(wù)實例來?處理,每個消費服務(wù)實例會獲取到一部分的數(shù)據(jù)。

大家思考一下,這樣是不是就實現(xiàn)了不同的系統(tǒng)訂閱一份數(shù)據(jù)的“Pub/Sub”的模型?

當(dāng)然,其實RabbitMQ還支持各種不同類型的exchange,可以實現(xiàn)各種復(fù)雜的功能。?

責(zé)任編輯:姜華 來源: 今日頭條
相關(guān)推薦

2022-08-08 20:46:26

架構(gòu)高并發(fā)

2022-08-08 20:48:09

MQ消息中間件系統(tǒng)解耦

2022-09-02 08:23:12

軟件開發(fā)解耦架構(gòu)

2025-05-26 02:11:00

2025-02-28 00:03:22

高并發(fā)TPS系統(tǒng)

2025-02-26 03:00:00

2022-12-27 11:06:35

海量接口并發(fā)

2025-06-05 01:22:00

SpringGateway高并發(fā)

2019-07-12 10:20:45

海量數(shù)據(jù)搭建

2019-07-15 16:02:30

大數(shù)據(jù)數(shù)據(jù)分析輿情系統(tǒng)

2020-04-02 11:16:28

Linux進(jìn)程高并發(fā)

2022-05-27 09:25:49

數(shù)據(jù)并發(fā)

2018-09-11 08:37:05

高并發(fā)服務(wù)器優(yōu)化

2019-09-27 11:54:38

RedisMySQL數(shù)據(jù)庫

2022-02-09 18:28:46

多線程Excel代碼

2025-02-14 03:00:00

2021-01-13 05:27:02

服務(wù)器性能高并發(fā)

2022-04-15 11:46:09

輕量系統(tǒng)解耦鴻蒙操作系統(tǒng)

2020-09-18 06:36:21

Linuxkernel高并發(fā)

2024-08-29 09:32:36

點贊
收藏

51CTO技術(shù)棧公眾號