偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

消息的存儲-RocketMQ知識體系之三

存儲 存儲軟件
CommitLog是消息存儲文件,所有消息主題的消息都存儲在CommitLog文件中;該文件默認最大為1GB,超過1GB后會輪到下一個CommitLog文件。

[[409969]]

RocketMQ存儲概要設計

RocketMQ主要存儲的文件包括commitlog文件、consumeQueue文件、IndexFile文件。

CommitLog是消息存儲文件,所有消息主題的消息都存儲在CommitLog文件中;該文件默認最大為1GB,超過1GB后會輪到下一個CommitLog文件。通過CommitLog,RocketMQ將所有消息存儲在一起,以順序IO的方式寫入磁盤,充分利用了磁盤順序?qū)憸p少了IO爭用提高數(shù)據(jù)存儲的性能。

RocketMQ的Broker機器磁盤上的文件存儲結(jié)構(gòu)

【CommitLog】

消息在CommitLog中的存儲格式如下:

存儲所有消息內(nèi)容,寫滿一個文件后生成新的 commitlog 文件。所有 topic 的數(shù)據(jù)存儲在一起,邏輯視圖如下:

CommitLog代碼

  1. private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); 
  2.     /** 
  3.      * MAGIC_CODE - MESSAGE 
  4.      * Message's MAGIC CODE daa320a7 
  5.      * 標記某一段為消息,即:[msgId, MESSAGE_MAGIC_CODE, 消息] 
  6.      */ 
  7.     public final static int MESSAGE_MAGIC_CODE = 0xAABBCCDD ^ 1880681586 + 8; 
  8.     /** 
  9.      * MAGIC_CODE - BLANK 
  10.      * End of file empty MAGIC CODE cbd43194 
  11.      * 標記某一段為空白,即:[msgId, BLANK_MAGIC_CODE, 空白] 
  12.      * 當CommitLog無法容納消息時,使用該類型結(jié)尾 
  13.      */ 
  14.     private final static int BLANK_MAGIC_CODE = 0xBBCCDDEE ^ 1880681586 + 8; 
  15.     /** 
  16.      * 映射文件隊列 
  17.      */ 
  18.     private final MappedFileQueue mappedFileQueue; 
  19.     /** 
  20.      * 消息存儲 
  21.      */ 
  22.     private final DefaultMessageStore defaultMessageStore; 
  23.     /** 
  24.      * flush commitLog 線程服務 
  25.      */ 
  26.     private final FlushCommitLogService flushCommitLogService; 
  27.     /** 
  28.      * If TransientStorePool enabled, we must flush message to FileChannel at fixed periods 
  29.      * commit commitLog 線程服務 
  30.      */ 
  31.     private final FlushCommitLogService commitLogService; 
  32.     /** 
  33.      * 寫入消息到Buffer Callback 
  34.      */ 
  35.     private final AppendMessageCallback appendMessageCallback; 
  36.     /** 
  37.      * topic消息隊列 與 offset 的Map 
  38.      */ 
  39.     private HashMap<String/* topic-queue_id */, Long/* offset */> topicQueueTable = new HashMap<>(1024); 
  40.     /** 
  41.      * TODO 
  42.      */ 
  43.     private volatile long confirmOffset = -1L; 
  44.     /** 
  45.      * 當前獲取lock時間。 
  46.      * 如果當前解鎖,則為0 
  47.      */ 
  48.     private volatile long beginTimeInLock = 0; 
  49.     /** 
  50.      * true: Can lock, false : in lock. 
  51.      * 添加消息 螺旋鎖(通過while循環(huán)實現(xiàn)) 
  52.      */ 
  53.     private AtomicBoolean putMessageSpinLock = new AtomicBoolean(true); 
  54.     /** 
  55.      * 添加消息重入鎖 
  56.      */ 
  57.     private ReentrantLock putMessageNormalLock = new ReentrantLock(); // Non fair Sync 

【ConsumeQueue】

ConsumeQueue是消息消費隊列文件,消息達到commitlog文件后將被異步轉(zhuǎn)發(fā)到消息消費隊列,供消息消費者消費;一個ConsumeQueue表示一個topic的一個queue,類似于kafka的一個partition,但是rocketmq在消息存儲上與kafka有著非常大的不同,RocketMQ的ConsumeQueue中不存儲具體的消息,具體的消息由CommitLog存儲,ConsumeQueue中只存儲路由到該queue中的消息在CommitLog中的offset,消息的大小以及消息所屬的tag的hash(tagCode),一共只占20個字節(jié),整個數(shù)據(jù)包如下:

ConsumeQueue代碼

  1. public static final int CQ_STORE_UNIT_SIZE = 20; 
  2.     private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); 
  3.     private static final Logger LOG_ERROR = LoggerFactory.getLogger(LoggerName.STORE_ERROR_LOGGER_NAME); 
  4.  
  5.     private final DefaultMessageStore defaultMessageStore; 
  6.     /** 
  7.      * 映射文件隊列 
  8.      */ 
  9.     private final MappedFileQueue mappedFileQueue; 
  10.     /** 
  11.      * Topic 
  12.      */ 
  13.     private final String topic; 
  14.     /** 
  15.      * 隊列編號 
  16.      */ 
  17.     private final int queueId; 
  18.     /** 
  19.      * 消息位置信息ByteBuffer 
  20.      */ 
  21.     private final ByteBuffer byteBufferIndex; 
  22.     /** 
  23.      * 文件存儲地址 
  24.      */ 
  25.     private final String storePath; 
  26.     /** 
  27.      * 每個映射文件大小 
  28.      */ 
  29.     private final int mappedFileSize; 
  30.     /** 
  31.      * 最大重放消息commitLog存儲位置 
  32.      */ 
  33.     private long maxPhysicOffset = -1; 
  34.     private volatile long minLogicOffset = 0; 

Consume Queue文件組織,如圖所示:

Consume Queue文件組織示意圖

  • 根據(jù)topic和queueId來組織文件,圖中TopicA有兩個隊列0,1,那么TopicA和QueueId=0組成一個ConsumeQueue,TopicA和QueueId=1組成另一個ConsumeQueue。
  • 按照消費端的GroupName來分組重試隊列,如果消費端消費失敗,消息將被發(fā)往重試隊列中,比如圖中的%RETRY%ConsumerGroupA。
  • 按照消費端的GroupName來分組死信隊列,如果消費端消費失敗,并重試指定次數(shù)后,仍然失敗,則發(fā)往死信隊列,比如圖中的%DLQ%ConsumerGroupA。

死信隊列(Dead Letter Queue)一般用于存放由于某種原因無法傳遞的消息,比如處理失敗或者已經(jīng)過期的消息。

【IndexFile】

IndexFile是消息索引文件,主要存儲的是key和offset的對應關系。

IndexFile(索引文件)提供了一種可以通過key或時間區(qū)間來查詢消息的方法。

文件名fileName是以創(chuàng)建時的時間戳命名的,固定的單個IndexFile文件大小約為400M,一個IndexFile可以保存 2000W個索引,IndexFile的底層存儲設計為在文件系統(tǒng)中實現(xiàn)HashMap結(jié)構(gòu),故rocketmq的索引文件其底層實現(xiàn)為hash索引。

  1. private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); 
  2.     private static int hashSlotSize = 4; 
  3.     private static int indexSize = 20; 
  4.     private static int invalidIndex = 0; 
  5.     private final int hashSlotNum; 
  6.     private final int indexNum; 
  7.     private final MappedFile mappedFile; 
  8.     private final FileChannel fileChannel; 
  9.     private final MappedByteBuffer mappedByteBuffer; 
  10.     private final IndexHeader indexHeader; 

IndexFile的存儲結(jié)構(gòu):

從上面的分析可以看出,RocketMQ采用的是混合型的存儲結(jié)構(gòu),即為Broker單個實例下所有的隊列共用一個日志數(shù)據(jù)文件(即為CommitLog)來存儲。RocketMQ的混合型存儲結(jié)構(gòu)(多個Topic的消息實體內(nèi)容都存儲于一個CommitLog中)針對Producer和Consumer分別采用了數(shù)據(jù)和索引部分相分離的存儲結(jié)構(gòu),Producer發(fā)送消息至Broker端,然后Broker端使用同步或者異步的方式對消息刷盤持久化,保存至CommitLog中。

只要消息被刷盤持久化至磁盤文件CommitLog中,那么Producer發(fā)送的消息就不會丟失。

正因為如此,Consumer也就肯定有機會去消費這條消息。當無法拉取到消息后,可以等下一次消息拉取,同時服務端也支持長輪詢模式,如果一個消息拉取請求未拉取到消息,Broker允許等待30s的時間,只要這段時間內(nèi)有新消息到達,將直接返回給消費端。這里,RocketMQ的具體做法是,使用Broker端的后臺服務線程—ReputMessageService不停地分發(fā)請求并異步構(gòu)建ConsumeQueue(邏輯消費隊列)和IndexFile(索引文件)數(shù)據(jù)。

【全局的角度來看消息的存儲】

【消息存儲流程】

Broker端收到消息后,將消息原始信息保存在CommitLog文件對應的MappedFile中,然后異步刷新到磁盤

ReputMessageServie線程異步的將CommitLog中MappedFile中的消息保存到ConsumerQueue和IndexFile中

ConsumerQueue和IndexFile只是原始文件的索引信息

內(nèi)存映射和數(shù)據(jù)刷盤

【內(nèi)存映射流程】

  • 內(nèi)存映射文件MappedFile通過AllocateMappedFileService創(chuàng)建
  • MappedFile的創(chuàng)建是典型的生產(chǎn)者-消費者模型
  • MappedFileQueue調(diào)用getLastMappedFile獲取MappedFile時,將請求放入隊列中
  • AllocateMappedFileService線程持續(xù)監(jiān)聽隊列,隊列有請求時,創(chuàng)建出MappedFile對象
  • 最后將MappedFile對象預熱,底層調(diào)用force方法和mlock方法。

【刷盤機制】

  1. 異步刷盤:消息被寫入內(nèi)存的PAGECACHE,返回寫成功狀態(tài),當內(nèi)存里的消息量積累到一定程度時,統(tǒng)一觸發(fā)寫磁盤操作,快速寫入 。吞吐量高,當磁盤損壞時,會丟失消息
  2. 同步刷盤:消息寫入內(nèi)存的PAGECACHE后,立刻通知刷盤線程刷盤,然后等待刷盤完成,刷盤線程執(zhí)行完成后喚醒等待的線程,給應用返回消息寫成功的狀態(tài)。吞吐量低,但不會造成消息丟失。

【刷盤流程】

producer發(fā)送給broker的消息保存在MappedFile中,然后通過刷盤機制同步到磁盤中。

刷盤分為同步刷盤和異步刷盤。

異步刷盤后臺線程按一定時間間隔執(zhí)行。

同步刷盤也是生產(chǎn)者-消費者模型。broker保存消息到MappedFile后,創(chuàng)建GroupCommitRequest請求放入列表,并阻塞等待。后臺線程從列表中獲取請求并刷新磁盤,成功刷盤后通知等待線程。

RocketMQ 文件存儲模型層次結(jié)構(gòu)

文件存儲模型層次結(jié)構(gòu)圖

RocketMQ文件存儲模型層次結(jié)構(gòu)如上圖所示,根據(jù)類別和作用從概念模型上大致可以劃分為5層,下面將從各個層次分別進行分析和闡述:

RocketMQ業(yè)務處理器層:Broker端對消息進行讀取和寫入的業(yè)務邏輯入口,比如前置的檢查和校驗步驟、構(gòu)造MessageExtBrokerInner對象、decode反序列化、構(gòu)造Response返回對象等;

RocketMQ數(shù)據(jù)存儲組件層;該層主要是RocketMQ的存儲核心類—DefaultMessageStore,其為RocketMQ消息數(shù)據(jù)文件的訪問入口,通過該類的“putMessage()”和“getMessage()”方法完成對CommitLog消息存儲的日志數(shù)據(jù)文件進行讀寫操作(具體的讀寫訪問操作還是依賴下一層中CommitLog對象模型提供的方法);另外,在該組件初始化時候,還會啟動很多存儲相關的后臺服務線程,包括AllocateMappedFileService(MappedFile預分配服務線程)、ReputMessageService(回放存儲消息服務線程)、HAService(Broker主從同步高可用服務線程)、StoreStatsService(消息存儲統(tǒng)計服務線程)、IndexService(索引文件服務線程)等;

RocketMQ存儲邏輯對象層:該層主要包含了RocketMQ數(shù)據(jù)文件存儲直接相關的三個模型類IndexFile、ConsumerQueue和CommitLog。IndexFile為索引數(shù)據(jù)文件提供訪問服務,ConsumerQueue為邏輯消息隊列提供訪問服務,CommitLog則為消息存儲的日志數(shù)據(jù)文件提供訪問服務。這三個模型類也是構(gòu)成了RocketMQ存儲層的整體結(jié)構(gòu)(對于這三個模型類的深入分析將放在后續(xù)篇幅中);

封裝的文件內(nèi)存映射層:RocketMQ主要采用JDK NIO中的MappedByteBuffer和FileChannel兩種方式完成數(shù)據(jù)文件的讀寫。其中,采用MappedByteBuffer這種內(nèi)存映射磁盤文件的方式完成對大文件的讀寫,在RocketMQ中將該類封裝成MappedFile類。這里限制的問題在上面已經(jīng)講過;對于每類大文件(IndexFile/ConsumerQueue/CommitLog),在存儲時分隔成多個固定大小的文件(單個IndexFile文件大小約為400M、單個ConsumerQueue文件大小約5.72M、單個CommitLog文件大小為1G),其中每個分隔文件的文件名為前面所有文件的字節(jié)大小數(shù)+1,即為文件的起始偏移量,從而實現(xiàn)了整個大文件的串聯(lián)。這里,每一種類的單個文件均由MappedFile類提供讀寫操作服務(其中,MappedFile類提供了順序?qū)?隨機讀、內(nèi)存數(shù)據(jù)刷盤、內(nèi)存清理等和文件相關的服務);

磁盤存儲層:主要指的是部署RocketMQ服務器所用的磁盤。這里,需要考慮不同磁盤類型(如SSD或者普通的HDD)特性以及磁盤的性能參數(shù)(如IOPS、吞吐量和訪問時延等指標)對順序?qū)?隨機讀操作帶來的影響;

文件存儲的高可用

【分布式存儲】

同一個topic 上的數(shù)據(jù)會分成多個queue 分布在不同的 broker 上,而且每個queue 都有副本機制。

【副本的主從同步(HA)】

RocketMQ 的主從同步機制如下:

1.首先啟動Master并在指定端口監(jiān)聽;

2.客戶端啟動,主動連接Master,建立TCP連接;

3.客戶端以每隔5s的間隔時間向服務端拉取消息,如果是第一次拉取的話,先獲取本地commitlog文件中最大的偏移量,以該偏移量向服務端拉取消息;

4.服務端解析請求,并返回一批數(shù)據(jù)給客戶端;

5.客戶端收到一批消息后,將消息寫入本地commitlog文件中,然后向Master匯報拉取進度,并更新下一次待拉取偏移量;

6.然后重復第3步;

文件存儲的優(yōu)化技術

RocketMQ存儲層采用的幾項優(yōu)化技術方案在一定程度上可以減少PageCache的缺點帶來的影響,主要包括內(nèi)存預分配,文件預熱和mlock系統(tǒng)調(diào)用。

【預先分配MappedFile】

在消息寫入過程中(調(diào)用CommitLog的putMessage()方法),CommitLog會先從MappedFileQueue隊列中獲取一個 MappedFile,如果沒有就新建一個。

RocketMQ中預分配MappedFile的設計非常巧妙,下次獲取時候直接返回就可以不用等待MappedFile創(chuàng)建分配所產(chǎn)生的時間延遲。

【文件預熱&&mlock系統(tǒng)調(diào)用】

(1)mlock系統(tǒng)調(diào)用:其可以將進程使用的部分或者全部的地址空間鎖定在物理內(nèi)存中,防止其被交換到swap空間。對于RocketMQ這種的高吞吐量的分布式消息隊列來說,追求的是消息讀寫低延遲,那么肯定希望盡可能地多使用物理內(nèi)存,提高數(shù)據(jù)讀寫訪問的操作效率。

(2)文件預熱:預熱的目的主要有兩點;第一點,由于僅分配內(nèi)存并進行mlock系統(tǒng)調(diào)用后并不會為程序完全鎖定這些內(nèi)存,因為其中的分頁可能是寫時復制的。因此,就有必要對每個內(nèi)存頁面中寫入一個假的值。其中,RocketMQ是在創(chuàng)建并分配MappedFile的過程中,預先寫入一些隨機值至Mmap映射出的內(nèi)存空間里。第二,調(diào)用Mmap進行內(nèi)存映射后,OS只是建立虛擬內(nèi)存地址至物理地址的映射表,而實際并沒有加載任何文件至內(nèi)存中。程序要訪問數(shù)據(jù)時OS會檢查該部分的分頁是否已經(jīng)在內(nèi)存中,如果不在,則發(fā)出一次缺頁中斷。這里,可以想象下1G的CommitLog需要發(fā)生多少次缺頁中斷,才能使得對應的數(shù)據(jù)才能完全加載至物理內(nèi)存中(ps:X86的Linux中一個標準頁面大小是4KB)?RocketMQ的做法是,在做Mmap內(nèi)存映射的同時進行madvise系統(tǒng)調(diào)用,目的是使OS做一次內(nèi)存映射后對應的文件數(shù)據(jù)盡可能多的預加載至內(nèi)存中,從而達到內(nèi)存預熱的效果。

本文轉(zhuǎn)載自微信公眾號「小汪哥寫代碼」,可以通過以下二維碼關注。轉(zhuǎn)載本文請聯(lián)系小汪哥寫代碼公眾號。

 

責任編輯:武曉燕 來源: 小汪哥寫代碼
相關推薦

2021-07-08 07:16:24

RocketMQ數(shù)據(jù)結(jié)構(gòu)Message

2021-07-13 11:52:47

順序消息RocketMQkafka

2021-07-14 17:18:14

RocketMQ消息分布式

2021-07-09 07:15:48

RocketMQ數(shù)據(jù)結(jié)構(gòu)kafka

2021-07-16 18:44:42

RocketMQ知識

2021-07-12 10:25:03

RocketMQ數(shù)據(jù)結(jié)構(gòu)kafka

2021-07-07 07:06:31

Brokerkafka架構(gòu)

2015-07-28 17:52:36

IOS知識體系

2017-02-27 16:42:23

Spark識體系

2017-04-03 15:35:13

知識體系架構(gòu)

2017-06-22 13:07:21

2012-03-08 11:13:23

企業(yè)架構(gòu)

2015-07-16 10:15:44

web前端知識體系

2020-09-09 09:15:58

Nginx體系進程

2020-10-26 08:34:18

知識體系普適性

2020-03-09 10:31:58

vue前端開發(fā)

2021-07-05 06:26:08

生產(chǎn)者kafka架構(gòu)

2017-07-25 17:34:54

大數(shù)據(jù)機器學習數(shù)據(jù)

2017-08-30 17:30:43

大數(shù)據(jù)數(shù)據(jù)化運營

2011-08-18 17:20:43

梭子魚知識體系
點贊
收藏

51CTO技術棧公眾號