偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

如果有人再問(wèn)你怎么實(shí)現(xiàn)分布式延時(shí)消息,這篇文章丟給他

開(kāi)源 分布式
我們實(shí)現(xiàn)本地延時(shí)比較簡(jiǎn)單,直接使用Java中現(xiàn)成的即可,那我們分布式消息隊(duì)列的實(shí)現(xiàn)有哪些難點(diǎn)呢?

 背景

開(kāi)源版的RocketMQ只提供了18個(gè)層級(jí)的消息隊(duì)列延時(shí),這個(gè)功能在開(kāi)源版中顯得特別雞肋,但是在阿里云中的RocketMQ卻提供了支持40天之內(nèi)任意秒級(jí)延時(shí)隊(duì)列,果然有些功能你只能充錢(qián)才能擁有。當(dāng)然你或許想換一個(gè)開(kāi)源的消息隊(duì)列,在開(kāi)源社區(qū)中消息隊(duì)列延時(shí)消息很多都沒(méi)有被支持比如:RabbitMQ,Kafka等,都只能通過(guò)一些特殊方法才能完成延時(shí)的功能。為什么這么多都沒(méi)有實(shí)現(xiàn)這個(gè)功能呢?是因?yàn)榧夹g(shù)難度比較復(fù)雜嗎?接下來(lái)我們分析一下如何才能實(shí)現(xiàn)一個(gè)延時(shí)消息。

[[285062]]

本地延時(shí)

在實(shí)現(xiàn)分布式消息隊(duì)列的延時(shí)消息之前,我們想想我們平時(shí)是如何在自己的應(yīng)用程序上實(shí)現(xiàn)一些延時(shí)功能的?在Java中可以通過(guò)下面的方式來(lái)完成我們延時(shí)功能:

  • ScheduledThreadPoolExecutor:ScheduledThreadPoolExecutor繼承了ThreadPoolExecutor,我們提交任務(wù)的時(shí)候,會(huì)將任務(wù)首先提交到DelayedWorkQueue一個(gè)優(yōu)先級(jí)隊(duì)列中,按照過(guò)期時(shí)間進(jìn)行排序,這個(gè)優(yōu)先級(jí)隊(duì)列也就是我們堆結(jié)構(gòu),每次提交任務(wù)排序的復(fù)雜度是O(logN)。然后取任務(wù)的時(shí)候就會(huì)從堆頂取出我們的任務(wù),也就是我們延遲時(shí)間最小的任務(wù)。ScheduledThreadPoolExecutor有個(gè)好處是執(zhí)行延時(shí)任務(wù)可以支持多線程并行執(zhí)行,因?yàn)樗^承的是ThreadPoolExecutor。
  • Timer:Timer也是利用優(yōu)先級(jí)隊(duì)列結(jié)構(gòu)做的,但是其沒(méi)有繼承線程池,相對(duì)來(lái)說(shuō)比較獨(dú)立,不支持多線程,只能使用單獨(dú)的一個(gè)線程。

分布式消息隊(duì)列延時(shí)

我們實(shí)現(xiàn)本地延時(shí)比較簡(jiǎn)單,直接使用Java中現(xiàn)成的即可,那我們分布式消息隊(duì)列的實(shí)現(xiàn)有哪些難點(diǎn)呢?

有很多同學(xué)首先會(huì)想到我們實(shí)現(xiàn)分布式消息隊(duì)列的延時(shí)任務(wù),可不可以直接使用本地的那一套,用ScheduledThreadPoolExecutor,Timer,當(dāng)然這是可以的,前提是你的消息量很小,但是我們分布式消息隊(duì)列往往都是企業(yè)級(jí)別的中間件,數(shù)據(jù)量都是非常的大,那么我們純內(nèi)存的方案肯定是行不通的。所以我們就有了下面這幾個(gè)方案來(lái)解決我們這個(gè)問(wèn)題。

數(shù)據(jù)庫(kù)

數(shù)據(jù)庫(kù)一般來(lái)說(shuō)是我們很容易想到的一個(gè)辦法,我們通??梢越⑾旅孢@樣一個(gè)表:

  1. CREATE TABLE `delay_message` ( 
  2.  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT, 
  3.  `excute_time` bigint(16) DEFAULT NULL COMMENT '執(zhí)行時(shí)間,ms級(jí)別'
  4.  `body` varchar(4096) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '消息體'
  5.  PRIMARY KEY (`id`), 
  6.  KEY `time_index` (`excute_time`) 
  7. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci; 

這個(gè)表中我們使用excute_time代表我們真實(shí)的執(zhí)行時(shí)間,并且對(duì)其建立索引,然后在我們的消息服務(wù)中,啟動(dòng)一個(gè)定時(shí)任務(wù),定時(shí)從數(shù)據(jù)庫(kù)中掃描已經(jīng)可以執(zhí)行的消息,然后開(kāi)始執(zhí)行,具體流程如下面所示:

 

如果有人再問(wèn)你怎么實(shí)現(xiàn)分布式延時(shí)消息,這篇文章丟給他

 

使用數(shù)據(jù)庫(kù)的方法是一個(gè)比較原始的方法,在沒(méi)有延時(shí)消息這個(gè)概念之前,要做一個(gè)訂單多少分鐘過(guò)期的這種功能,通常使用這個(gè)方法去完成。而這個(gè)方法通常也比較局限于我們單個(gè)業(yè)務(wù),如果想擴(kuò)展為我們企業(yè)級(jí)的一個(gè)中間件的話是不行的,因?yàn)閙ysql由于BTree的特性,會(huì)隨著維護(hù)二級(jí)索引的開(kāi)銷(xiāo)越來(lái)越大,導(dǎo)致寫(xiě)入會(huì)越來(lái)越慢,所以這個(gè)方案通常不會(huì)被考慮。

RocksDB/LevelDB

我們之前介紹RocketMQ在開(kāi)源版本中只實(shí)現(xiàn)了18個(gè)Level的延時(shí)消息,但是有很多公司基于RocketMQ做了自己的一套支持任意時(shí)間的延時(shí)消息,在美團(tuán)內(nèi)部封裝了RocketMQ使用LevelDB做了對(duì)延時(shí)消息的封裝,在滴滴開(kāi)源的DDMQ中,使用了RocksDB對(duì)RocketMQ的延時(shí)消息部分進(jìn)行了封裝。

其原理基本和Mysql類(lèi)似,如下圖所示:

 

如果有人再問(wèn)你怎么實(shí)現(xiàn)分布式延時(shí)消息,這篇文章丟給他

 

  • Step1: DDMQ發(fā)送消息的時(shí)候會(huì)有一個(gè)代理層,用于將消息做分發(fā),因?yàn)槠鋬?nèi)部有多種消息隊(duì)列,kafka,rocketMQ等等,如果是延時(shí)消息會(huì)將消息發(fā)送到RockesDB的存儲(chǔ)。
  • Step2: 通過(guò)定時(shí)任務(wù)輪訓(xùn)掃描將數(shù)據(jù)轉(zhuǎn)發(fā)投遞至RocketMQ集群。
  • Step3: 消費(fèi)者進(jìn)行消費(fèi)。

為什么同樣是數(shù)據(jù)庫(kù)RocksDB會(huì)比Mysql更加合適呢?因?yàn)镽ocksDB的特性是LSM樹(shù),其使用場(chǎng)景適用于大量寫(xiě)入,和消息隊(duì)列的場(chǎng)景更加契合,所以這個(gè)也是滴滴和美團(tuán)選擇其作為延時(shí)消息封裝的存儲(chǔ)介質(zhì)。

3.2 時(shí)間輪+磁盤(pán)存儲(chǔ)

再說(shuō)時(shí)間輪之前,讓我們?cè)俅位氐轿覀兊膶?shí)現(xiàn)本地延時(shí)的時(shí)候使用的ScheduledThreadPoolExecutor還有Timer,他們都是使用的優(yōu)先級(jí)隊(duì)列完成的,優(yōu)先級(jí)隊(duì)列本質(zhì)上也就是堆結(jié)構(gòu),堆結(jié)構(gòu)的插入的時(shí)間復(fù)雜度是O(LogN),如果未來(lái)我們的內(nèi)存可以做到無(wú)限,我們使用使用優(yōu)先級(jí)隊(duì)列去做延時(shí)消息的存儲(chǔ),但是隨著消息的增多,我們的插入消息的效率也會(huì)越來(lái)越低,那么怎么才能讓我們的插入消息的效率不隨著消息的增多而變低呢?答案就是時(shí)間輪。

什么是時(shí)間輪呢?其實(shí)我們可以簡(jiǎn)單的將其看做是一個(gè)多維數(shù)組。在很多框架中都使用了時(shí)間輪來(lái)做一些定時(shí)的任務(wù),用來(lái)替代我們的Timer,比如我之前講過(guò)的有關(guān)本地緩存Caffeine一篇文章,在Caffeine中是一個(gè)二層時(shí)間輪,也就是二維數(shù)組,其一維的數(shù)據(jù)表示較大的時(shí)間維度比如,秒,分,時(shí),天等,其二維的數(shù)據(jù)表示該時(shí)間維度較小的時(shí)間維度,比如秒內(nèi)的某個(gè)區(qū)間段。當(dāng)定位到一個(gè)TimeWhile[i][j]之后,其數(shù)據(jù)結(jié)構(gòu)其實(shí)是一個(gè)鏈表,記錄著我們的Node。在Caffeine利用時(shí)間輪記錄我們?cè)谀硞€(gè)時(shí)間過(guò)期的數(shù)據(jù),然后去處理。

 

如果有人再問(wèn)你怎么實(shí)現(xiàn)分布式延時(shí)消息,這篇文章丟給他

 

由于時(shí)間輪是一個(gè)數(shù)組的結(jié)構(gòu),那么其插入復(fù)雜度是O(1)。我們解決了效率之后,但是我們的內(nèi)存依舊不是無(wú)限的,我們時(shí)間輪如何使用呢?答案當(dāng)然就是磁盤(pán),在去哪兒開(kāi)源的QMQ中已經(jīng)實(shí)現(xiàn)了時(shí)間輪+磁盤(pán)存儲(chǔ),這里為了方便描述我將其轉(zhuǎn)化為RocketMQ中的結(jié)構(gòu)來(lái)進(jìn)行講解,實(shí)現(xiàn)圖如下:

 

如果有人再問(wèn)你怎么實(shí)現(xiàn)分布式延時(shí)消息,這篇文章丟給他

 

  • Step 1: 生產(chǎn)者投遞延時(shí)消息到CommitLog,這個(gè)時(shí)候使用了偷換Topic的那招,來(lái)達(dá)到后面的效果。
  • Step 2: 后臺(tái)有一個(gè)Reput的任務(wù)定時(shí)拉取,延時(shí)Topic相關(guān)的Message。
  • Step 3: 判斷這個(gè)Message是否在當(dāng)前時(shí)間輪范圍中,如果不在則來(lái)到Step4,如果在的話就直接將消息投遞進(jìn)入時(shí)間輪。
  • Step 4: 找到當(dāng)前消息所屬的scheduleLog,然后寫(xiě)入進(jìn)去,去哪兒默認(rèn)劃分是一個(gè)小時(shí)為一段,這里可以根據(jù)業(yè)務(wù)自行調(diào)整。
  • Step 5:時(shí)間輪會(huì)定時(shí)預(yù)加載下個(gè)時(shí)間段的scheduleLog到內(nèi)存。
  • Step 6: 到點(diǎn)的消息會(huì)還原topic再次投遞到CommitLog,如果投遞成功這里會(huì)記錄dispatchLog。記錄的原因是因?yàn)闀r(shí)間輪是內(nèi)存的,你不知道已經(jīng)執(zhí)行到哪個(gè)位置了,如果執(zhí)行到最后最后1s鐘的時(shí)候掛了,這段時(shí)間輪之前的所有數(shù)據(jù)又得重新加載,這里是用來(lái)過(guò)濾已經(jīng)投遞過(guò)的消息。

時(shí)間輪+磁盤(pán)存儲(chǔ)我個(gè)人覺(jué)得比上面的RocksDB要更加正統(tǒng)一點(diǎn),不依賴其他的中間件就可以完成,可用性自然也就更高,當(dāng)然阿里云的RocketMQ具體怎么實(shí)現(xiàn)的這個(gè)兩種方案都有可能。

3.3 redis

在社區(qū)中也有很多公司使用的Redis做的延時(shí)消息,在Redis中有一個(gè)數(shù)據(jù)結(jié)構(gòu)是Zest,也就是有序集合,他可以實(shí)現(xiàn)類(lèi)似我們的優(yōu)先級(jí)隊(duì)列的功能,同樣的他也是堆結(jié)構(gòu),所以插入算法復(fù)雜度依然是O(logN),但是由于Redis足夠快,所以這一塊可以忽略。(這塊沒(méi)有做對(duì)比的基準(zhǔn)測(cè)試,只是猜測(cè))。有同學(xué)會(huì)問(wèn),redis不是純內(nèi)存的k,v嗎,同樣的應(yīng)該也會(huì)受到內(nèi)存限制啊,為什么還會(huì)選擇他呢?

其實(shí)在這個(gè)場(chǎng)景中,Redis是很容易水平擴(kuò)展的當(dāng)一個(gè)Redis內(nèi)存不夠,這里可以使用兩個(gè)甚至更多,來(lái)滿足我們的需要,redis延時(shí)消息的原理圖(原圖出自:https://www.cnblogs.com/lylife/p/7881950.html)如下:

 

如果有人再問(wèn)你怎么實(shí)現(xiàn)分布式延時(shí)消息,這篇文章丟給他

 

  • Delayed Messages Pool: Redis Hash結(jié)構(gòu),key為消息ID,value為具體的message,當(dāng)然這里也可以用磁盤(pán)或者數(shù)據(jù)庫(kù)代替。這里主要存儲(chǔ)我們所有消息的內(nèi)容。
  • Delayed Queue: ZSET數(shù)據(jù)結(jié)構(gòu),value為消息ID,score為執(zhí)行時(shí)間,這里Delayed Queue可以水平擴(kuò)展從而增加我們可以支持的數(shù)據(jù)量。
  • Worker Thread Pool: 其中有多個(gè)Worker,可以部署在多個(gè)機(jī)器上形成一個(gè)集群,集群中的所有Worker通過(guò)ZK進(jìn)行協(xié)調(diào),分配Delayed Queue。

我們?cè)趺床拍苤繢elayed Queue中的消息到期了呢?這里有兩種方法:

  • 每個(gè)Worker定時(shí)掃描,ZSET的最小執(zhí)行時(shí)間,如果到了就取出,這個(gè)方法在消息少的時(shí)候特別浪費(fèi)資源,在消息量多的時(shí)候,由于輪訓(xùn)不及時(shí)導(dǎo)致延時(shí)的時(shí)間不準(zhǔn)確。
  • 因?yàn)榈谝粋€(gè)方法問(wèn)題比較多,所以這里借鑒了Timer中的一些思想,通過(guò)wait-notify可以達(dá)到一個(gè)比較好的延時(shí)效果,并且資源也不會(huì)浪費(fèi),第一次的時(shí)候還是獲取ZSET中最小的時(shí)間,然后wait(執(zhí)行時(shí)間-當(dāng)前時(shí)間),這樣就不需要浪費(fèi)資源到達(dá)時(shí)間時(shí)會(huì)自動(dòng)響應(yīng),如果當(dāng)前ZSET有新的消息進(jìn)入,并且比我們等待的消息還要小,那么直接notify喚醒,重新獲取這個(gè)更小的消息,然后又wait,如此循環(huán)。

總結(jié)

本文介紹了三種方式實(shí)現(xiàn)分布式延時(shí)消息,希望能在你實(shí)現(xiàn)自己的延遲消息的時(shí)候提供一點(diǎn)思路??偟膩?lái)說(shuō)可能前兩種方法來(lái)說(shuō)適用面更加廣一點(diǎn),畢竟在RocketMQ這些大型的消息隊(duì)列中間件,還有一些其他的集成功能,比如順序消息,事務(wù)消息等,延時(shí)消息可能更加傾向于是分布式消息隊(duì)列中的一個(gè)功能,而不是作為一個(gè)獨(dú)立的組件存在。當(dāng)然其中還有一些細(xì)節(jié)并沒(méi)有一一介紹,具體細(xì)節(jié)可以去參考QMQ和DDMQ的源碼。

責(zé)任編輯:武曉燕 來(lái)源: 今日頭條
相關(guān)推薦

2018-12-07 09:31:52

分布式鎖服務(wù)框架分布式系統(tǒng)

2021-02-22 13:32:19

MySQLSQL索引

2022-09-26 10:09:08

MVCC控制并發(fā)

2018-08-07 16:01:32

synchronizevolatilefinal

2022-11-08 09:33:36

訂單系統(tǒng)電商

2020-04-20 13:11:21

HashMap底層存儲(chǔ)

2020-04-28 09:15:58

HashMapJava數(shù)組

2020-12-11 08:23:06

JavaMemory Mode內(nèi)存模型

2018-07-03 14:54:25

Java內(nèi)存模型

2019-09-19 14:03:32

B樹(shù)節(jié)點(diǎn)數(shù)據(jù)結(jié)構(gòu)

2021-03-08 10:25:37

MySQL數(shù)據(jù)庫(kù)索引

2021-03-08 12:47:42

MySQL查詢數(shù)據(jù)

2022-05-23 09:41:27

分庫(kù)分表數(shù)據(jù)庫(kù)算法

2021-05-31 09:42:48

MySQL隔離級(jí)別

2023-12-11 08:32:58

數(shù)據(jù)庫(kù)DruidDBA

2020-12-21 14:58:25

分布式性能系統(tǒng)

2018-10-12 09:42:00

分布式鎖 Java多線

2022-10-21 16:39:56

JDK優(yōu)化

2017-11-02 15:04:15

2019-04-15 14:40:46

消息隊(duì)列Java編程
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)