偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

基于Topic的消息發(fā)布與消費(fèi)模式

開(kāi)發(fā)
基于Topic的消息發(fā)布與消費(fèi)模式,能夠?qū)⑾M(fèi)者和生產(chǎn)者完全解耦,相對(duì)RabbitMQ中的所支持的靈活處理消息的方式,更加簡(jiǎn)單且易于理解,這也是Kafka的消息處理機(jī)制。

閑話

朋友們,好久不見(jiàn),不知道你們最近怎樣,但相信你們一定都挺好。已經(jīng)有一段時(shí)間沒(méi)有更新了,個(gè)中原因不好細(xì)說(shuō),但是歸根結(jié)底也許是自己懶。這個(gè)不好,大家不要學(xué)。今天主要就是想分享一下關(guān)于消息處理機(jī)制的一些想法。

基本概念

1.Topic

同一個(gè)topic下消息的格式一致,例如topic為order-update-message消息的格式都是一個(gè)統(tǒng)一的OrderUpdateMessage的結(jié)構(gòu)

2.key主鍵

同一主鍵下的消息列表具有順序性,例如key為訂單號(hào)order-0001的消息列表(Queue)下,可能包含的消息列表(Queue)如下:

OrderUpdateMessage(id="msg-0001", orderId = "order-0001", action="create", ...)
OrderUpdateMessage(id="msg-0002", orderId = "order-0001", action="paid", ...)
OrderUpdateMessage(id="msg-0003", orderId = "order-0001", action="delivering", ...)
OrderUpdateMessage(id="msg-0004", orderId = "order-0001", action="modifying", ...)
OrderUpdateMessage(id="msg-0005", orderId = "order-0001", action="delivered", ...)

3.Group消費(fèi)者組

同一個(gè)topic下同一個(gè)group下的消費(fèi)者,對(duì)這個(gè)group下的消息隊(duì)列進(jìn)行搶占式消費(fèi)。例如同一個(gè)消費(fèi)者組group-1下的消費(fèi)者consumer-1和消費(fèi)者consumer-2,以及另外一個(gè)消費(fèi)者組group-2下的消費(fèi)者consumer-3,消息消費(fèi)的結(jié)果可能如下:

// consumer-1消費(fèi)的消息
OrderUpdateMessage(id="msg-0001", orderId = "order-0001", action="create", ...)
OrderUpdateMessage(id="msg-0002", orderId = "order-0001", action="paid", ...)

// consumer-2消費(fèi)的消息
OrderUpdateMessage(id="msg-0003", orderId = "order-0001", action="modified", ...)
OrderUpdateMessage(id="msg-0004", orderId = "order-0001", action="delivering", ...)
OrderUpdateMessage(id="msg-0005", orderId = "order-0001", action="delivered", ...)

// consumer-3消費(fèi)的消息
OrderUpdateMessage(id="msg-0001", orderId = "order-0001", action="create", ...)
OrderUpdateMessage(id="msg-0002", orderId = "order-0001", action="paid", ...)
OrderUpdateMessage(id="msg-0003", orderId = "order-0001", action="modified", ...)
OrderUpdateMessage(id="msg-0004", orderId = "order-0001", action="delivering", ...)
OrderUpdateMessage(id="msg-0005", orderId = "order-0001", action="delivered", ...)

Kafka的消息處理機(jī)制就是以這樣的形式實(shí)現(xiàn)的。

4.優(yōu)勢(shì) 

生產(chǎn)者和消費(fèi)者完全解耦,生產(chǎn)者無(wú)需關(guān)注是否有消費(fèi)者在消費(fèi),消費(fèi)者也無(wú)需知道生產(chǎn)者是否在生成新的消息。

生產(chǎn)者只關(guān)注消息是否成功的發(fā)送到消息處理中間件,消費(fèi)者只關(guān)注能否從消息處理中間件消費(fèi)到消息。

消費(fèi)者可以按組消費(fèi),同組內(nèi)的消費(fèi)者進(jìn)行搶占式消費(fèi)。

RabbitMq中的優(yōu)秀實(shí)踐

1.RabbitMq消息處理機(jī)制

生產(chǎn)者講帶有指定RoutingKey的消息發(fā)送到對(duì)應(yīng)的Exchange上,Exchange通過(guò)Binding定義的路由規(guī)格,將消息按照BindingKey分發(fā)到不同的Queue上,消費(fèi)者從Queue拉取消息消費(fèi)。

  • Exchange & RoutingKey & Topic:RoutingKey決定了消息會(huì)被發(fā)送到哪個(gè)Exchange上,這和topic是類似的概念。
  • Bind & BindingKey & Group:Exchange根據(jù)Binding定義的路由規(guī)格,將消息按照BindingKey分發(fā)到不同的Queue上,這里可以認(rèn)為是對(duì)應(yīng)了Group的概念。
  • Queue & Group:Queue則是維護(hù)了一個(gè)Group下的某個(gè)隊(duì)列下的所有消息。

優(yōu)秀實(shí)踐

因此如果要以RabbitMq實(shí)現(xiàn)基于Topic和Group實(shí)現(xiàn)的消息生產(chǎn)和消費(fèi)的機(jī)制,可以將消息定義成以下類似的結(jié)構(gòu):

// Exchange: {value="order-update", type="fanout"}
// binding1: {value="promotion-service", bindingKey="order.*.paid"}
// binding2: {value='inventory-service', bindingKey="order.*"}
OrderUpdateMessage(id="msg-0001", orderId = "order-0001", action="create", ...)
OrderUpdateMessage(id="msg-0002", orderId = "order-0001", action="paid", ...)
OrderUpdateMessage(id="msg-0003", orderId = "order-0001", action="modified", ...)
OrderUpdateMessage(id="msg-0004", orderId = "order-0001", action="delivering", ...)
OrderUpdateMessage(id="msg-0005", orderId = "order-0001", action="delivered", ...)
OrderUpdateMessage(id="msg-0006", orderId = "order-0002", action="paid", ...)

假設(shè)此時(shí)有promotion-service(1個(gè)實(shí)例)和inventory-service(2個(gè)實(shí)例)兩個(gè)消費(fèi)者消費(fèi)消息,則對(duì)應(yīng)的消息消費(fèi)的結(jié)果可能是:

// inventory-service
// Exchange: {value="order-update", type="fanout"}
// QueueBinding: {value=Queue('inventory-service'), bindingKey="order.*"}
// inventory-service實(shí)例1消費(fèi)到的消息
OrderUpdateMessage(id="msg-0001", orderId = "order-0001", action="create", ...)
OrderUpdateMessage(id="msg-0002", orderId = "order-0001", action="paid", ...)
OrderUpdateMessage(id="msg-0006", orderId = "order-0002", action="paid", ...)
// inventory-service實(shí)例2消費(fèi)到的消息
OrderUpdateMessage(id="msg-0003", orderId = "order-0001", action="modified", ...)
OrderUpdateMessage(id="msg-0004", orderId = "order-0001", action="delivering", ...)
OrderUpdateMessage(id="msg-0005", orderId = "order-0001", action="delivered", ...)

// promotion-service
// Exchange: {value="order-update", type="fanout"}
// QueueBinding: {value="promotion-service", bindingKey="order.*.paid"}
// promotion-service實(shí)例1消費(fèi)到的消息
OrderUpdateMessage(id="msg-0002", orderId = "order-0001", action="paid", ...)
OrderUpdateMessage(id="msg-0006", orderId = "order-0002", action="paid", ...)

總結(jié)

RabbitMQ的Exchange支持不同類型(Direct, Fanout, Topic, Headers),以及Binding可以對(duì)消息以更靈活的通配符的方式將消息分發(fā)到對(duì)應(yīng)的Queue上,因此其消息處理機(jī)制更加靈活。

基于Topic的消息發(fā)布與消費(fèi)模式,能夠?qū)⑾M(fèi)者和生產(chǎn)者完全解耦,相對(duì)RabbitMQ中的所支持的靈活處理消息的方式,更加簡(jiǎn)單且易于理解,這也是Kafka的消息處理機(jī)制。

通過(guò)對(duì)比不同的中間件的消息處理機(jī)制也許能找到更好的實(shí)踐方式。

責(zé)任編輯:趙寧寧 來(lái)源: FrenziedJavaLand
相關(guān)推薦

2022-10-28 13:33:05

Push模式互聯(lián)網(wǎng)高并發(fā)

2024-06-05 06:37:19

2022-12-13 08:39:53

Kafka存儲(chǔ)檢索

2023-08-24 09:01:25

消息拉取RocketMQ

2020-07-27 08:44:22

存儲(chǔ)Kafka 流程

2024-11-11 13:28:11

RocketMQ消息類型FIFO

2023-11-10 09:22:06

2022-09-25 12:53:36

RocketMQtopic

2023-06-29 10:10:06

Rocket MQ消息中間件

2024-07-29 08:34:18

C++訂閱者模式線程

2023-12-04 08:24:23

2021-06-04 11:33:50

消息技巧排查

2021-03-01 07:31:53

消息支付高可用

2021-06-01 06:59:58

運(yùn)維Jira設(shè)計(jì)

2023-11-27 17:29:43

Kafka全局順序性

2019-12-19 15:09:19

微信深色模式消息引用

2009-01-15 10:43:21

SCDMA通信產(chǎn)業(yè)McWiLL

2013-06-04 17:14:42

戴爾

2024-03-20 08:33:00

Kafka線程安全Rebalance

2024-12-18 07:43:49

點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)