消息的存儲-RocketMQ知識體系之三
RocketMQ存儲概要設計
RocketMQ主要存儲的文件包括commitlog文件、consumeQueue文件、IndexFile文件。
CommitLog是消息存儲文件,所有消息主題的消息都存儲在CommitLog文件中;該文件默認最大為1GB,超過1GB后會輪到下一個CommitLog文件。通過CommitLog,RocketMQ將所有消息存儲在一起,以順序IO的方式寫入磁盤,充分利用了磁盤順序?qū)憸p少了IO爭用提高數(shù)據(jù)存儲的性能。
RocketMQ的Broker機器磁盤上的文件存儲結(jié)構(gòu)
【CommitLog】
消息在CommitLog中的存儲格式如下:
存儲所有消息內(nèi)容,寫滿一個文件后生成新的 commitlog 文件。所有 topic 的數(shù)據(jù)存儲在一起,邏輯視圖如下:
CommitLog代碼
- private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
- /**
- * MAGIC_CODE - MESSAGE
- * Message's MAGIC CODE daa320a7
- * 標記某一段為消息,即:[msgId, MESSAGE_MAGIC_CODE, 消息]
- */
- public final static int MESSAGE_MAGIC_CODE = 0xAABBCCDD ^ 1880681586 + 8;
- /**
- * MAGIC_CODE - BLANK
- * End of file empty MAGIC CODE cbd43194
- * 標記某一段為空白,即:[msgId, BLANK_MAGIC_CODE, 空白]
- * 當CommitLog無法容納消息時,使用該類型結(jié)尾
- */
- private final static int BLANK_MAGIC_CODE = 0xBBCCDDEE ^ 1880681586 + 8;
- /**
- * 映射文件隊列
- */
- private final MappedFileQueue mappedFileQueue;
- /**
- * 消息存儲
- */
- private final DefaultMessageStore defaultMessageStore;
- /**
- * flush commitLog 線程服務
- */
- private final FlushCommitLogService flushCommitLogService;
- /**
- * If TransientStorePool enabled, we must flush message to FileChannel at fixed periods
- * commit commitLog 線程服務
- */
- private final FlushCommitLogService commitLogService;
- /**
- * 寫入消息到Buffer Callback
- */
- private final AppendMessageCallback appendMessageCallback;
- /**
- * topic消息隊列 與 offset 的Map
- */
- private HashMap<String/* topic-queue_id */, Long/* offset */> topicQueueTable = new HashMap<>(1024);
- /**
- * TODO
- */
- private volatile long confirmOffset = -1L;
- /**
- * 當前獲取lock時間。
- * 如果當前解鎖,則為0
- */
- private volatile long beginTimeInLock = 0;
- /**
- * true: Can lock, false : in lock.
- * 添加消息 螺旋鎖(通過while循環(huán)實現(xiàn))
- */
- private AtomicBoolean putMessageSpinLock = new AtomicBoolean(true);
- /**
- * 添加消息重入鎖
- */
- private ReentrantLock putMessageNormalLock = new ReentrantLock(); // Non fair Sync
【ConsumeQueue】
ConsumeQueue是消息消費隊列文件,消息達到commitlog文件后將被異步轉(zhuǎn)發(fā)到消息消費隊列,供消息消費者消費;一個ConsumeQueue表示一個topic的一個queue,類似于kafka的一個partition,但是rocketmq在消息存儲上與kafka有著非常大的不同,RocketMQ的ConsumeQueue中不存儲具體的消息,具體的消息由CommitLog存儲,ConsumeQueue中只存儲路由到該queue中的消息在CommitLog中的offset,消息的大小以及消息所屬的tag的hash(tagCode),一共只占20個字節(jié),整個數(shù)據(jù)包如下:
ConsumeQueue代碼
- public static final int CQ_STORE_UNIT_SIZE = 20;
- private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
- private static final Logger LOG_ERROR = LoggerFactory.getLogger(LoggerName.STORE_ERROR_LOGGER_NAME);
- private final DefaultMessageStore defaultMessageStore;
- /**
- * 映射文件隊列
- */
- private final MappedFileQueue mappedFileQueue;
- /**
- * Topic
- */
- private final String topic;
- /**
- * 隊列編號
- */
- private final int queueId;
- /**
- * 消息位置信息ByteBuffer
- */
- private final ByteBuffer byteBufferIndex;
- /**
- * 文件存儲地址
- */
- private final String storePath;
- /**
- * 每個映射文件大小
- */
- private final int mappedFileSize;
- /**
- * 最大重放消息commitLog存儲位置
- */
- private long maxPhysicOffset = -1;
- private volatile long minLogicOffset = 0;
Consume Queue文件組織,如圖所示:
Consume Queue文件組織示意圖
- 根據(jù)topic和queueId來組織文件,圖中TopicA有兩個隊列0,1,那么TopicA和QueueId=0組成一個ConsumeQueue,TopicA和QueueId=1組成另一個ConsumeQueue。
- 按照消費端的GroupName來分組重試隊列,如果消費端消費失敗,消息將被發(fā)往重試隊列中,比如圖中的%RETRY%ConsumerGroupA。
- 按照消費端的GroupName來分組死信隊列,如果消費端消費失敗,并重試指定次數(shù)后,仍然失敗,則發(fā)往死信隊列,比如圖中的%DLQ%ConsumerGroupA。
死信隊列(Dead Letter Queue)一般用于存放由于某種原因無法傳遞的消息,比如處理失敗或者已經(jīng)過期的消息。
【IndexFile】
IndexFile是消息索引文件,主要存儲的是key和offset的對應關系。
IndexFile(索引文件)提供了一種可以通過key或時間區(qū)間來查詢消息的方法。
文件名fileName是以創(chuàng)建時的時間戳命名的,固定的單個IndexFile文件大小約為400M,一個IndexFile可以保存 2000W個索引,IndexFile的底層存儲設計為在文件系統(tǒng)中實現(xiàn)HashMap結(jié)構(gòu),故rocketmq的索引文件其底層實現(xiàn)為hash索引。
- private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
- private static int hashSlotSize = 4;
- private static int indexSize = 20;
- private static int invalidIndex = 0;
- private final int hashSlotNum;
- private final int indexNum;
- private final MappedFile mappedFile;
- private final FileChannel fileChannel;
- private final MappedByteBuffer mappedByteBuffer;
- private final IndexHeader indexHeader;
IndexFile的存儲結(jié)構(gòu):
從上面的分析可以看出,RocketMQ采用的是混合型的存儲結(jié)構(gòu),即為Broker單個實例下所有的隊列共用一個日志數(shù)據(jù)文件(即為CommitLog)來存儲。RocketMQ的混合型存儲結(jié)構(gòu)(多個Topic的消息實體內(nèi)容都存儲于一個CommitLog中)針對Producer和Consumer分別采用了數(shù)據(jù)和索引部分相分離的存儲結(jié)構(gòu),Producer發(fā)送消息至Broker端,然后Broker端使用同步或者異步的方式對消息刷盤持久化,保存至CommitLog中。
只要消息被刷盤持久化至磁盤文件CommitLog中,那么Producer發(fā)送的消息就不會丟失。
正因為如此,Consumer也就肯定有機會去消費這條消息。當無法拉取到消息后,可以等下一次消息拉取,同時服務端也支持長輪詢模式,如果一個消息拉取請求未拉取到消息,Broker允許等待30s的時間,只要這段時間內(nèi)有新消息到達,將直接返回給消費端。這里,RocketMQ的具體做法是,使用Broker端的后臺服務線程—ReputMessageService不停地分發(fā)請求并異步構(gòu)建ConsumeQueue(邏輯消費隊列)和IndexFile(索引文件)數(shù)據(jù)。
【全局的角度來看消息的存儲】
【消息存儲流程】
Broker端收到消息后,將消息原始信息保存在CommitLog文件對應的MappedFile中,然后異步刷新到磁盤
ReputMessageServie線程異步的將CommitLog中MappedFile中的消息保存到ConsumerQueue和IndexFile中
ConsumerQueue和IndexFile只是原始文件的索引信息
內(nèi)存映射和數(shù)據(jù)刷盤
【內(nèi)存映射流程】
- 內(nèi)存映射文件MappedFile通過AllocateMappedFileService創(chuàng)建
- MappedFile的創(chuàng)建是典型的生產(chǎn)者-消費者模型
- MappedFileQueue調(diào)用getLastMappedFile獲取MappedFile時,將請求放入隊列中
- AllocateMappedFileService線程持續(xù)監(jiān)聽隊列,隊列有請求時,創(chuàng)建出MappedFile對象
- 最后將MappedFile對象預熱,底層調(diào)用force方法和mlock方法。
【刷盤機制】
- 異步刷盤:消息被寫入內(nèi)存的PAGECACHE,返回寫成功狀態(tài),當內(nèi)存里的消息量積累到一定程度時,統(tǒng)一觸發(fā)寫磁盤操作,快速寫入 。吞吐量高,當磁盤損壞時,會丟失消息
- 同步刷盤:消息寫入內(nèi)存的PAGECACHE后,立刻通知刷盤線程刷盤,然后等待刷盤完成,刷盤線程執(zhí)行完成后喚醒等待的線程,給應用返回消息寫成功的狀態(tài)。吞吐量低,但不會造成消息丟失。
【刷盤流程】
producer發(fā)送給broker的消息保存在MappedFile中,然后通過刷盤機制同步到磁盤中。
刷盤分為同步刷盤和異步刷盤。
異步刷盤后臺線程按一定時間間隔執(zhí)行。
同步刷盤也是生產(chǎn)者-消費者模型。broker保存消息到MappedFile后,創(chuàng)建GroupCommitRequest請求放入列表,并阻塞等待。后臺線程從列表中獲取請求并刷新磁盤,成功刷盤后通知等待線程。
RocketMQ 文件存儲模型層次結(jié)構(gòu)
文件存儲模型層次結(jié)構(gòu)圖
RocketMQ文件存儲模型層次結(jié)構(gòu)如上圖所示,根據(jù)類別和作用從概念模型上大致可以劃分為5層,下面將從各個層次分別進行分析和闡述:
RocketMQ業(yè)務處理器層:Broker端對消息進行讀取和寫入的業(yè)務邏輯入口,比如前置的檢查和校驗步驟、構(gòu)造MessageExtBrokerInner對象、decode反序列化、構(gòu)造Response返回對象等;
RocketMQ數(shù)據(jù)存儲組件層;該層主要是RocketMQ的存儲核心類—DefaultMessageStore,其為RocketMQ消息數(shù)據(jù)文件的訪問入口,通過該類的“putMessage()”和“getMessage()”方法完成對CommitLog消息存儲的日志數(shù)據(jù)文件進行讀寫操作(具體的讀寫訪問操作還是依賴下一層中CommitLog對象模型提供的方法);另外,在該組件初始化時候,還會啟動很多存儲相關的后臺服務線程,包括AllocateMappedFileService(MappedFile預分配服務線程)、ReputMessageService(回放存儲消息服務線程)、HAService(Broker主從同步高可用服務線程)、StoreStatsService(消息存儲統(tǒng)計服務線程)、IndexService(索引文件服務線程)等;
RocketMQ存儲邏輯對象層:該層主要包含了RocketMQ數(shù)據(jù)文件存儲直接相關的三個模型類IndexFile、ConsumerQueue和CommitLog。IndexFile為索引數(shù)據(jù)文件提供訪問服務,ConsumerQueue為邏輯消息隊列提供訪問服務,CommitLog則為消息存儲的日志數(shù)據(jù)文件提供訪問服務。這三個模型類也是構(gòu)成了RocketMQ存儲層的整體結(jié)構(gòu)(對于這三個模型類的深入分析將放在后續(xù)篇幅中);
封裝的文件內(nèi)存映射層:RocketMQ主要采用JDK NIO中的MappedByteBuffer和FileChannel兩種方式完成數(shù)據(jù)文件的讀寫。其中,采用MappedByteBuffer這種內(nèi)存映射磁盤文件的方式完成對大文件的讀寫,在RocketMQ中將該類封裝成MappedFile類。這里限制的問題在上面已經(jīng)講過;對于每類大文件(IndexFile/ConsumerQueue/CommitLog),在存儲時分隔成多個固定大小的文件(單個IndexFile文件大小約為400M、單個ConsumerQueue文件大小約5.72M、單個CommitLog文件大小為1G),其中每個分隔文件的文件名為前面所有文件的字節(jié)大小數(shù)+1,即為文件的起始偏移量,從而實現(xiàn)了整個大文件的串聯(lián)。這里,每一種類的單個文件均由MappedFile類提供讀寫操作服務(其中,MappedFile類提供了順序?qū)?隨機讀、內(nèi)存數(shù)據(jù)刷盤、內(nèi)存清理等和文件相關的服務);
磁盤存儲層:主要指的是部署RocketMQ服務器所用的磁盤。這里,需要考慮不同磁盤類型(如SSD或者普通的HDD)特性以及磁盤的性能參數(shù)(如IOPS、吞吐量和訪問時延等指標)對順序?qū)?隨機讀操作帶來的影響;
文件存儲的高可用
【分布式存儲】
同一個topic 上的數(shù)據(jù)會分成多個queue 分布在不同的 broker 上,而且每個queue 都有副本機制。
【副本的主從同步(HA)】
RocketMQ 的主從同步機制如下:
1.首先啟動Master并在指定端口監(jiān)聽;
2.客戶端啟動,主動連接Master,建立TCP連接;
3.客戶端以每隔5s的間隔時間向服務端拉取消息,如果是第一次拉取的話,先獲取本地commitlog文件中最大的偏移量,以該偏移量向服務端拉取消息;
4.服務端解析請求,并返回一批數(shù)據(jù)給客戶端;
5.客戶端收到一批消息后,將消息寫入本地commitlog文件中,然后向Master匯報拉取進度,并更新下一次待拉取偏移量;
6.然后重復第3步;
文件存儲的優(yōu)化技術
RocketMQ存儲層采用的幾項優(yōu)化技術方案在一定程度上可以減少PageCache的缺點帶來的影響,主要包括內(nèi)存預分配,文件預熱和mlock系統(tǒng)調(diào)用。
【預先分配MappedFile】
在消息寫入過程中(調(diào)用CommitLog的putMessage()方法),CommitLog會先從MappedFileQueue隊列中獲取一個 MappedFile,如果沒有就新建一個。
RocketMQ中預分配MappedFile的設計非常巧妙,下次獲取時候直接返回就可以不用等待MappedFile創(chuàng)建分配所產(chǎn)生的時間延遲。
【文件預熱&&mlock系統(tǒng)調(diào)用】
(1)mlock系統(tǒng)調(diào)用:其可以將進程使用的部分或者全部的地址空間鎖定在物理內(nèi)存中,防止其被交換到swap空間。對于RocketMQ這種的高吞吐量的分布式消息隊列來說,追求的是消息讀寫低延遲,那么肯定希望盡可能地多使用物理內(nèi)存,提高數(shù)據(jù)讀寫訪問的操作效率。
(2)文件預熱:預熱的目的主要有兩點;第一點,由于僅分配內(nèi)存并進行mlock系統(tǒng)調(diào)用后并不會為程序完全鎖定這些內(nèi)存,因為其中的分頁可能是寫時復制的。因此,就有必要對每個內(nèi)存頁面中寫入一個假的值。其中,RocketMQ是在創(chuàng)建并分配MappedFile的過程中,預先寫入一些隨機值至Mmap映射出的內(nèi)存空間里。第二,調(diào)用Mmap進行內(nèi)存映射后,OS只是建立虛擬內(nèi)存地址至物理地址的映射表,而實際并沒有加載任何文件至內(nèi)存中。程序要訪問數(shù)據(jù)時OS會檢查該部分的分頁是否已經(jīng)在內(nèi)存中,如果不在,則發(fā)出一次缺頁中斷。這里,可以想象下1G的CommitLog需要發(fā)生多少次缺頁中斷,才能使得對應的數(shù)據(jù)才能完全加載至物理內(nèi)存中(ps:X86的Linux中一個標準頁面大小是4KB)?RocketMQ的做法是,在做Mmap內(nèi)存映射的同時進行madvise系統(tǒng)調(diào)用,目的是使OS做一次內(nèi)存映射后對應的文件數(shù)據(jù)盡可能多的預加載至內(nèi)存中,從而達到內(nèi)存預熱的效果。
本文轉(zhuǎn)載自微信公眾號「小汪哥寫代碼」,可以通過以下二維碼關注。轉(zhuǎn)載本文請聯(lián)系小汪哥寫代碼公眾號。