偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

RocketMQ 用法詳解,你學(xué)會(huì)了嗎?

開發(fā) 架構(gòu)
正常情況下生產(chǎn)者組是沒有作用的,但是在發(fā)送事務(wù)消息時(shí),如果producer中途意外宕機(jī),broker會(huì)主動(dòng)回調(diào)producer group 內(nèi)的任意一臺(tái)機(jī)器來(lái)確認(rèn)事務(wù)的狀態(tài)。

大家好,我是指北君。

圖片

消息中間件是我們工作中使用最頻繁的一類中間件,它具有低耦合、可靠投遞、廣播、流量控制、最終一致性等一系列功能,成為異步RPC的主要手段之一。當(dāng)今市面上有很多主流的消息中間件,如老牌的ActiveMQ、RabbitMQ,炙手可熱的Kafka,阿里巴巴自主開發(fā)RocketMQ等。今天,指北君就來(lái)詳細(xì)講講RocketMQ生產(chǎn)者和消費(fèi)者在使用時(shí)的一些注意事項(xiàng)。

一. 生產(chǎn)者

1.1 發(fā)送消息注意事項(xiàng)

1)消息大小

建議消息大小不要超過(guò)512K。

2)異步發(fā)送

默認(rèn)的發(fā)送為同步發(fā)送,send方法會(huì)一直阻塞,等待broker端的響應(yīng)。如果你關(guān)注性能問(wèn)題,可以通過(guò)send(msg, callback)來(lái)發(fā)起異步調(diào)用。

3)生產(chǎn)者組

正常情況下生產(chǎn)者組是沒有作用的,但是在發(fā)送事務(wù)消息時(shí),如果producer中途意外宕機(jī),broker會(huì)主動(dòng)回調(diào)producer group 內(nèi)的任意一臺(tái)機(jī)器來(lái)確認(rèn)事務(wù)的狀態(tài)。(目前開源版本還不支持事務(wù)消息)。

4)線程安全問(wèn)題

生產(chǎn)者實(shí)例是線程安全的,在應(yīng)用中只需要實(shí)例化一次即可。

5)性能問(wèn)題

如果你希望在一個(gè)jvm進(jìn)程內(nèi)使用多個(gè)producer實(shí)例來(lái)提高發(fā)送能,我們建議:

使用異步發(fā)送,并且producer實(shí)例只需要3 ~ 5個(gè)即可 對(duì)每一個(gè)producer 調(diào)用 setInstanceName,區(qū)別不同的生產(chǎn)者。

6)發(fā)送超時(shí)時(shí)間

當(dāng)客戶端向broker發(fā)送請(qǐng)求超時(shí)時(shí),客戶端會(huì)拋出 RemotingTimeoutException,默認(rèn)的超時(shí)時(shí)間是3秒。通過(guò)調(diào)用send(msg, timeout) 可以設(shè)置超時(shí)時(shí)間。超時(shí)時(shí)間建議不要設(shè)置過(guò)小,因?yàn)?broker 可能需要時(shí)間刷盤或向 slave 同步數(shù)據(jù)。

7)對(duì)于同一個(gè)應(yīng)用最好只使用一個(gè)Topic,消息的子類型可以使用 tags 來(lái)標(biāo)識(shí),tags 可以由應(yīng)用自由設(shè)置。當(dāng)發(fā)送的消息設(shè)置了 tags 時(shí),消費(fèi)方在訂閱消息時(shí)可以使用 tags 在 broker 做消息過(guò)濾。注意這里的命名雖然是復(fù)數(shù),但是一條消息只能有一個(gè)tag。

8)消息在業(yè)務(wù)層面的唯一標(biāo)識(shí)可以設(shè)置到 keys 字段,方便根據(jù) keys 來(lái)定位消息。broker 會(huì)為每個(gè)消息創(chuàng)建索引(哈希索引),應(yīng)用可以通過(guò)topic 、key 查詢這條消息的內(nèi)容(MessageExt),以及消息被誰(shuí)消費(fèi)(MessageTrack,精確到consumer group)。由于是哈希索引,請(qǐng)盡量保證key 的唯一,這樣可以避免潛在的哈希沖突。

9)消息發(fā)送不管是成功還是失敗都要打印消息日志,日志內(nèi)容務(wù)必包含 sendResult 和 key 字段。

10)對(duì)于消息不可丟失的應(yīng)用,務(wù)必要有消息重發(fā)機(jī)制。例如如果消息發(fā)送失敗,可以將消息存儲(chǔ)到數(shù)據(jù)庫(kù),然后通過(guò)定時(shí)程序或者人工的方式觸發(fā)重發(fā)。

11)調(diào)用send 同步發(fā)送消息時(shí),假定此時(shí)設(shè)置了 isWaitStoreMsgOK=true(default is true),只要不拋出異常就代表發(fā)送成功,但當(dāng) isWaitStoreMsgOK = false 時(shí),發(fā)送永遠(yuǎn)返回 SEND_OK。但是對(duì)于發(fā)送“成功”會(huì)有多個(gè)狀態(tài),在 SendStatus 中定義如下:

FLUSH_DISK_TIMEOUT

如果 broker 設(shè)置的 FlushDiskType = SYNC_FLUSH,當(dāng) broker 的在刷盤超時(shí)時(shí)(MessageStoreConfig.syncFlushTimeout,默認(rèn)5秒)會(huì)返回該狀態(tài)。此時(shí)消息任然保存在內(nèi)存中,只有broker 宕機(jī)時(shí)消息才會(huì)丟失。

FLUSH_SLAVE_TIMEOU

如果 broker 的 role 是 SYNC_MASTER,當(dāng) slave 同步數(shù)據(jù)的時(shí)間超過(guò)了 MessageStoreConfig.syncFlushTimeout (默認(rèn)5秒) 時(shí)會(huì)返回此狀態(tài)。此時(shí)只有主從都宕機(jī),并且主也沒有刷盤時(shí),消息才會(huì)丟失。

SLAVE_NOT_AVAILABLE

如果 broker 的 role 是 SYNC_MASTER,并且此時(shí) slave 不可用時(shí)會(huì)返回該狀態(tài)。

SEND_OK

發(fā)送成功。為了保證消息不丟失還需要配置 SYNC_MASTER or SYNC_FLUSH。

12)消息重復(fù)

當(dāng)發(fā)送消息時(shí)返回 FLUSH_DISK_TIMEOUT/FLUSH_SLAVE_TIMEOUT,若非常不幸的 broker 也宕機(jī)了,消息將會(huì)丟失。此時(shí)如果什么都不做,消息可能會(huì)丟失,如果重發(fā)消息,消息可能會(huì)出現(xiàn)重復(fù)。

通常我們建議發(fā)送端重發(fā)消息,由消費(fèi)方來(lái)保證消息消費(fèi)的冪等性。

1.2 消息發(fā)送失敗如何處理

Producer 的 send 方法本生支持內(nèi)部重試,重試邏輯如下:

至多重試3次 如果發(fā)送失敗,則輪轉(zhuǎn)到下一個(gè)broker 這個(gè)方法的總耗時(shí)時(shí)間不超過(guò) sendMsgTimeout,默認(rèn)3秒 所以發(fā)送消息已經(jīng)產(chǎn)生超時(shí)異常的話就不會(huì)再重試。以上策略仍不能保證消息發(fā)送一定成功,為保證消息發(fā)送一定成功,建議應(yīng)用這么做:如果調(diào)用 send 同步發(fā)送失敗,則嘗試將消息存儲(chǔ)到db,由后臺(tái)線程定時(shí)重試,保證消息一定到達(dá) Broker。

1.3 oneway 的發(fā)送形式

對(duì)于可靠性要求不高的應(yīng)用,可以采用 oneway 的發(fā)送形式,oneway 形式不等待應(yīng)答。

1.4 發(fā)送順序消息

順序消息分為分區(qū)有序和全局有序。

分區(qū)有序要求 producer 在send 時(shí)傳入 MessageQueueSelector 的實(shí)現(xiàn)類,最終將某一類消息發(fā)送到同一隊(duì)列。但是一旦發(fā)生通信異常、broker 重啟等,由于隊(duì)列總數(shù)發(fā)生變化,哈希取模后定位的隊(duì)列會(huì)變化,會(huì)產(chǎn)生短暫的順序不一致。如果業(yè)務(wù)能容忍在集群異常情況下(如某個(gè) broker 宕機(jī)或者重啟)消息短暫的亂序,使用分區(qū)有序比較合適。

全局嚴(yán)格有序的消息即便在異常情況下也能保證消息的有序性,但是卻犧牲了分布式的 failover 特性,即 broker 集群中只有要一臺(tái)機(jī)器不可用,則整個(gè)集群都不可用,服務(wù)可用性會(huì)大大降低。

順序消息的缺點(diǎn):

發(fā)送順序消息無(wú)法利用集群的 FailOver 特性 消費(fèi)順序消息的并行度依賴于隊(duì)列數(shù)量 隊(duì)列熱點(diǎn)問(wèn)題,個(gè)別隊(duì)列由于哈希不均導(dǎo)致消息過(guò)多,消費(fèi)速度跟不上,產(chǎn)生消費(fèi)堆積問(wèn)題 遇到消費(fèi)失敗的消息,無(wú)法跳過(guò),當(dāng)前隊(duì)列需要暫停 5.發(fā)送事務(wù)消息 目前暫不支持。

二. 消費(fèi)者

2.1 消費(fèi)者組和訂閱

不同的消費(fèi)者組可以獨(dú)立消費(fèi)相同的topic,這點(diǎn)類似于ActiveMQ的虛擬 topic. 另外對(duì)于相同的消費(fèi)者組,需要確保組內(nèi)的消費(fèi)者訂閱消息的規(guī)則是一致的!

MQ 里的一個(gè)Consumer Group 代表一個(gè) Consumer 實(shí)例群組。對(duì)于大多數(shù)分布式應(yīng)用來(lái)說(shuō),一個(gè) Consumer Group 下通常會(huì)掛載多個(gè) Consumer 實(shí)例。訂閱關(guān)系一致指的是同一個(gè) Consumer Group 下所有 Consumer 實(shí)例的處理邏輯必須完全一致。一旦訂閱關(guān)系不一致,消息消費(fèi)的邏輯就會(huì)混亂,甚至導(dǎo)致消息丟失。

由于 MQ 的訂閱關(guān)系主要由 Topic+Tag 共同組成,因此,保持訂閱關(guān)系一致意味著同一個(gè) Consumer Group 下所有的實(shí)例需在以下兩方面均保持一致:

訂閱的 Topic 必須一致;訂閱的 Topic 中的 Tag 必須一致。

技術(shù)架構(gòu) > Consumer 最佳實(shí)踐 > image2017-11-15 15:50:13.png

2.2 MessageListener

1)順序消費(fèi) MessageListenerOrderly

順序消費(fèi)時(shí)消費(fèi)者會(huì)鎖定隊(duì)列,以確保消息被順序消費(fèi),但是這樣也會(huì)造成一定的性能損耗。當(dāng)消費(fèi)出現(xiàn)異常的時(shí)候,建議不要拋出異常,而是返回 ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT,讓消費(fèi)暫停一會(huì),暫停時(shí)間由 context.setSuspendCurrentQueueTimeMillis 方法指定。

2)并發(fā)消費(fèi)

并發(fā)消費(fèi)是推薦的消費(fèi)方式,在此種模式下,消息將被并發(fā)的消費(fèi)。消費(fèi)出現(xiàn)異常時(shí)不建議拋出異常,只需要返回 ConsumeConcurrentlyStatus.RECONSUME_LATER 即可。為了保證消息肯定被至少消費(fèi)一次,消息將會(huì)被重發(fā)回 broker (topic不是原topic而是這個(gè)消費(fèi)組的RETRY topic),在延遲的某個(gè)時(shí)間點(diǎn)(默認(rèn)是10秒,業(yè)務(wù)可設(shè)置,通過(guò) delayLevelWhenNextConsume 和 MessageStoreConfig.messageDelayLevel 設(shè)置)后,再次投遞到這個(gè) ConsumerGroup,而如果一直這樣重復(fù)消費(fèi)都持續(xù)失敗到一定次數(shù)(默認(rèn)是16次,DefaultMQPushConsumer.maxReconsumeTimes),就會(huì)投遞到DLQ隊(duì)列。應(yīng)用可以監(jiān)控死信隊(duì)列來(lái)做人工干預(yù)。

3)返回狀態(tài)

在并行消費(fèi)時(shí)可以通過(guò)返回 RECONSUME_LATER 來(lái)告訴 Consumer 當(dāng)前無(wú)法消費(fèi)該消息,等延時(shí)一段時(shí)間再重新消費(fèi),但是此時(shí)消費(fèi)不會(huì)停止,你可以繼續(xù)消費(fèi)其他消息。但在順序消費(fèi)時(shí),因?yàn)橐WC消費(fèi)的順序性,所以你不能跳過(guò)失敗的消息,此時(shí)你可以通過(guò)返回 SUSPEND_CURRENT_QUEUE_A_MOMENT 來(lái)告訴 Consumer 先暫停一會(huì)。

4)阻塞

不建議阻塞Listener,因?yàn)檫@會(huì)阻塞住線程池,同時(shí)也有可能造成消費(fèi)者線程終止。

2.3 線程數(shù)

consumer 內(nèi)部通過(guò)一個(gè) ThreadPoolExecutor 來(lái)消費(fèi)消息,可以通過(guò) setConsumeThreadMin 和 setConsumeThreadMax 來(lái)改變線程池的大小。

2.4 ConsumeFromWhere

當(dāng)新實(shí)例啟動(dòng)的時(shí)候,PushConsumer會(huì)拿到本消費(fèi)組broker已經(jīng)記錄好的消費(fèi)進(jìn)度(consumer offset),按照這個(gè)進(jìn)度發(fā)起自己的第一次Pull請(qǐng)求。

如果這個(gè)消費(fèi)進(jìn)度在Broker并沒有存儲(chǔ)起來(lái),證明這個(gè)是一個(gè)全新的消費(fèi)組,這時(shí)候客戶端有幾個(gè)策略可以選擇:

CONSUME_FROM_LAST_OFFSET //默認(rèn)策略,從該隊(duì)列最尾開始消費(fèi),即跳過(guò)歷史消息。

CONSUME_FROM_FIRST_OFFSET //從隊(duì)列最開始開始消費(fèi),即歷史消息(還儲(chǔ)存在broker的)全部消費(fèi)一遍。

CONSUME_FROM_TIMESTAMP//從某個(gè)時(shí)間點(diǎn)開始消費(fèi),和setConsumeTimestamp()配合使用,默認(rèn)是半個(gè)小時(shí)以前 注意:這些配置只對(duì)全新的消費(fèi)組有效,老的消費(fèi)組都是按已經(jīng)存儲(chǔ)過(guò)的消費(fèi)進(jìn)度繼續(xù)消費(fèi)。

對(duì)于老消費(fèi)組想跳過(guò)歷史消息可以采用以下幾種方法:

1)判斷消息的發(fā)送時(shí)間,太老的消息直接返回 CONSUME_SUCCESS。

2)判斷消息的 offset 和 MAX_OFFSET 的差距,如果落后太多,可以直接。返回 CONSUME_SUCCESS。

3)消費(fèi)者啟動(dòng)前,先調(diào)整該消費(fèi)組的消費(fèi)進(jìn)度,再開始消費(fèi)。可以人工使用命令 resetOffsetByTimeStamp,詳見 ResetOffsetByTimeCommand.java。

2.5 消息冪等

由于 RocketMQ 無(wú)法避免消費(fèi)重復(fù),所以如果業(yè)務(wù)對(duì)消息重復(fù)非常敏感,務(wù)必在業(yè)務(wù)層面去重。

2.6 消費(fèi)速度慢處理方式

1)提高消費(fèi)并行度

大部分消息消費(fèi)行為都屬于 IO 密集型業(yè)務(wù),適當(dāng)?shù)奶岣卟l(fā)度可以顯著的改善消費(fèi)的吞吐量。

2)批量方式消費(fèi)

默認(rèn)情況下 consumer 的 consumeMessageBatchMaxSize 為1,即一次只消費(fèi)一個(gè)消息,如果應(yīng)用可以批量消費(fèi)消息,則可以很大程度上提高消費(fèi)吞吐量。

3)跳過(guò)非重要消息

當(dāng)消堆積嚴(yán)重時(shí)可以丟棄不重要的消息。

4)優(yōu)化消息消費(fèi)過(guò)程

2.7 打印消費(fèi)日志

建議在消費(fèi)入口方法打印消息,方便后續(xù)排查問(wèn)題,消費(fèi)失敗時(shí)也打印失敗日志。

2.8 利用broker過(guò)濾消息,避免多余的消息傳輸

三. 小結(jié)

好了,RocketMQ生產(chǎn)者與消費(fèi)者的使用事項(xiàng)就總結(jié)完畢了,相信大家對(duì)RocketMQ的使用應(yīng)該會(huì)更有信心了。

責(zé)任編輯:武曉燕 來(lái)源: Java技術(shù)指北
相關(guān)推薦

2024-02-04 00:00:00

Effect數(shù)據(jù)組件

2024-01-02 12:05:26

Java并發(fā)編程

2024-10-11 09:15:33

2023-07-03 07:20:50

2022-12-06 07:53:33

MySQL索引B+樹

2023-03-26 22:31:29

2022-12-06 08:37:43

2022-04-26 08:41:54

JDK動(dòng)態(tài)代理方法

2023-08-08 08:23:08

Spring日志?線程池

2022-04-13 09:01:45

SASSCSS處理器

2023-09-06 11:31:24

MERGE用法SQL

2024-09-10 10:34:48

2024-12-31 00:08:37

C#語(yǔ)言dynamic?

2023-03-09 07:38:58

static關(guān)鍵字狀態(tài)

2024-08-12 08:12:38

2023-05-18 09:01:11

MBRGPT分區(qū)

2024-10-12 10:25:15

2024-01-19 08:25:38

死鎖Java通信

2023-07-26 13:11:21

ChatGPT平臺(tái)工具

2023-01-10 08:43:15

定義DDD架構(gòu)
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)