基于HBase構(gòu)建可伸縮的分布式事務(wù)隊(duì)列
一個(gè)實(shí)時(shí)流處理框架通常需要兩個(gè)基礎(chǔ)架構(gòu):處理器和隊(duì)列。處理器從隊(duì)列中讀取事件,執(zhí)行用戶的處理代碼,如果要繼續(xù)對(duì)結(jié)果進(jìn)行處理,處理器還會(huì)把事件寫(xiě)到另外一個(gè)隊(duì)列。隊(duì)列由框架提供并管理。隊(duì)列做為處理器之間的緩沖,傳輸數(shù)據(jù)和事件,這樣處理器可以單獨(dú)操作和擴(kuò)展。例如,一個(gè)web 服務(wù)訪問(wèn)日志處理應(yīng)用,可能是這樣的:
框架之間的主要區(qū)別在于隊(duì)列語(yǔ)義,通常不同之處有以下幾點(diǎn):
-
調(diào)度保障機(jī)制: 至少一次,至多一次,只有一次。
-
容災(zāi)機(jī)制: 失敗對(duì)用戶和自動(dòng)恢復(fù)是透明的。
-
可用性: 數(shù)據(jù)在出現(xiàn)錯(cuò)誤后可以保存并重啟。
-
可擴(kuò)展性:產(chǎn)品/用戶增加時(shí)的局限性。
-
性能Performance: 隊(duì)列操作的吞吐量和延遲。
我們想在開(kāi)源的 Cask Data Application Platform (CDAP)上提供一個(gè)動(dòng)態(tài)可擴(kuò)展,強(qiáng)一致性并且有一次性交易機(jī)制的實(shí)時(shí)流處理框架,在這個(gè)強(qiáng)大的機(jī)制保護(hù)下,開(kāi)發(fā)者可以自由操作任何形式的數(shù)據(jù)操作而不用擔(dān)心不一致性,潛在的返工和失敗。它可以幫助開(kāi)發(fā)者在沒(méi)有分布式系統(tǒng)背景的情況下建立他們的大數(shù)據(jù)應(yīng)用。此外,如果需要可以關(guān)閉這種強(qiáng)大的保護(hù)機(jī)制換取高性能。它總是比其他方式更容易使用。
可擴(kuò)展隊(duì)列
隊(duì)列有兩種基本操作:入隊(duì)和出隊(duì)。生產(chǎn)者將消息寫(xiě)到隊(duì)頭(入隊(duì)),消費(fèi)者從隊(duì)尾讀取數(shù)據(jù)(出隊(duì))。如果做為一個(gè)整體你添加更多生成者時(shí)的入隊(duì)速度和添加更多消費(fèi)者時(shí)的出隊(duì)速度足夠快,我們說(shuō)這個(gè)隊(duì)列是可擴(kuò)展的。
理想狀態(tài)下,擴(kuò)展是線性的,這意味著兩倍的生產(chǎn)者 /消費(fèi)者,會(huì)產(chǎn)生兩位速度的出隊(duì)/入隊(duì),增長(zhǎng)只受集群的規(guī)模限制。為了支持生產(chǎn)者的線性擴(kuò)展,隊(duì)列需要一個(gè)存儲(chǔ)系統(tǒng)并且需要當(dāng)前寫(xiě)入者的數(shù)量線性擴(kuò)展。為了應(yīng)對(duì)消費(fèi)者的線性擴(kuò)展,隊(duì)列可以分區(qū),例如一個(gè)消費(fèi)者只處理隊(duì)列中的一段數(shù)據(jù)。
隊(duì)列擴(kuò)展的另一個(gè)方面是它應(yīng)該可以橫向擴(kuò)展。這意味著隊(duì)列性能的上限可以通過(guò)增加集群結(jié)點(diǎn)的方式來(lái)提升。這是很重要的,它可以保證隊(duì)列不受當(dāng)前集群大小限制根據(jù)數(shù)據(jù)的增長(zhǎng)而擴(kuò)展。
分區(qū)的 HBase 隊(duì)列
我們選擇 Apache HBase做為隊(duì)列的存儲(chǔ)層。它為存儲(chǔ)強(qiáng)一致性,可橫向擴(kuò)展的行數(shù)據(jù)做了設(shè)計(jì)和優(yōu)化。它的并發(fā)寫(xiě)操作性能非常好,并提供了有序掃描以支持分區(qū)消費(fèi)者。我們使用 HBase Coprocessors 的高效掃描濾波和隊(duì)列清洗。為了在隊(duì)列上使用一次性語(yǔ)義,我們用 Tephra’s 為 HBase 提供傳輸支持。
生產(chǎn)者和消費(fèi)者具有操作獨(dú)立性。每個(gè)生產(chǎn)者通過(guò) Hbase puts 批處理執(zhí)行入隊(duì)操作,消費(fèi)者通過(guò)執(zhí)行 Hbase Scans 執(zhí)行出隊(duì)操作。生產(chǎn)者和消費(fèi)者的數(shù)量之間沒(méi)有關(guān)聯(lián),他們可以分離。
此隊(duì)列存在一個(gè)消費(fèi)者組的概念。一個(gè)消費(fèi)者組,是由相同的關(guān)鍵字劃分的消費(fèi)者集合,這樣,每個(gè)發(fā)布到隊(duì)列的事件,就會(huì)由此消費(fèi)者組中的消費(fèi)者去消費(fèi)。使用消費(fèi)者組,可以通過(guò)不同的關(guān)鍵字劃分同一個(gè)隊(duì)列,同時(shí),也可以通過(guò)數(shù)據(jù)的操作性特點(diǎn)來(lái)拓展。按照上面訪問(wèn)日志分析的例子,生產(chǎn)者和消費(fèi)者組可能看起來(lái)像這樣:
對(duì)于Log Parser,這里有兩個(gè)生產(chǎn)者在運(yùn)行,它們并發(fā)的向隊(duì)列寫(xiě)數(shù)據(jù)。在消費(fèi)者這邊,這里存在兩個(gè)消費(fèi)者組。 Unique User Counter組有兩個(gè)消費(fèi)者,使用UserID作為劃分(隊(duì)列的)關(guān)鍵字。Page View Counter組則有三個(gè)消費(fèi)者,使用 PageID 作為劃分(隊(duì)列的)關(guān)鍵字。 |
隊(duì)列行值格式
當(dāng)一個(gè)事件通過(guò)一個(gè)生產(chǎn)者被發(fā)布出去,一個(gè)或多個(gè)消費(fèi)者組合將收到消息,我們把事件寫(xiě)入 HBase 表的一個(gè)或多個(gè)行上,那么這條記錄就被設(shè)計(jì)成適用于每個(gè)消費(fèi)者組。事件的有效負(fù)荷和元數(shù)據(jù)被存儲(chǔ)在獨(dú)立的列上,那么行的值就是下面這樣的格式:
兩個(gè)有趣的部分是行的值是分區(qū) ID 和整個(gè) ID。分區(qū) ID 通過(guò)限定行值前綴再提供給消費(fèi)者。消費(fèi)者只被允許讀數(shù)據(jù),并在出隊(duì)的時(shí)候使用前綴掃描。分區(qū) ID 由兩部分組成:一個(gè)消費(fèi)者組 ID 和一個(gè)消費(fèi)者 ID。生產(chǎn)者計(jì)算出每個(gè)消費(fèi)者組的分區(qū) ID,并通過(guò)入隊(duì)寫(xiě)到那些行。
行關(guān)鍵字中的入口 ID(Entry ID)包含了事務(wù)信息。它由 Tephra 觸發(fā)的生產(chǎn)者事務(wù)寫(xiě)指針和單向增長(zhǎng)的計(jì)數(shù)器組成。這個(gè)計(jì)數(shù)器由本地的生產(chǎn)者生成,同時(shí),針對(duì)事件,計(jì)數(shù)器需要讓行關(guān)鍵字唯一,因?yàn)樯a(chǎn)者可以在同一個(gè)事務(wù)中將多個(gè)事件加入隊(duì)列。
出隊(duì)列的時(shí)候,計(jì)數(shù)器會(huì)使用事務(wù)寫(xiě)指針來(lái)決定,隊(duì)列入口是否已經(jīng)提交,以及是否可以消費(fèi)了。事務(wù)寫(xiě)指針和計(jì)數(shù)器的組合,使得行關(guān)鍵字總是唯一的。這讓生產(chǎn)者可以獨(dú)立的操作,而不會(huì)有寫(xiě)沖突。
為了生成分區(qū) ID(Partition ID),生產(chǎn)者需要知道大小和每個(gè)消費(fèi)者組的分區(qū)關(guān)鍵字。當(dāng)應(yīng)用程序啟動(dòng),以及組大小發(fā)生任何變化的時(shí)候,消費(fèi)者組信息都會(huì)被記錄下來(lái)。
改變生產(chǎn)者和消費(fèi)者
增加或減少生產(chǎn)者是很直接的,因?yàn)槊總€(gè)生產(chǎn)者都是獨(dú)立操作的。增加或減少生產(chǎn)者進(jìn)程就可以滿足這個(gè)要求。然而,當(dāng)消費(fèi)者組的大小需要改變的時(shí)候,就需要協(xié)調(diào)來(lái)正確更新消費(fèi)者組的信息。可以用下面的圖表來(lái)概括所需的步驟:
由于暫停和恢復(fù)是由 Apache ZooKeeper 來(lái)協(xié)調(diào)的,同時(shí)它們也是并行執(zhí)行的,所以它們是兩個(gè)非常快速的操作。例如,之前我們提到的 Web 訪問(wèn)日志分析應(yīng)用程序,改變消費(fèi)者組信息的過(guò)程可能看起來(lái)像這樣:
基于這個(gè)隊(duì)列的設(shè)計(jì),入隊(duì)列和出隊(duì)列的性能,與單獨(dú)的批量 HBase Puts 和 HBase Scans 不相上下,這樣也帶來(lái)與 Tephra 服務(wù)器進(jìn)行通訊的開(kāi)銷。通過(guò)在同一個(gè)業(yè)務(wù)處理中將多個(gè)事件批量處理,可以大大降低這個(gè)開(kāi)銷。
最后,為了避免“熱點(diǎn)聚焦(hotspotting)“,我們基于簇的大小提前分割了 HBase 表,同時(shí),在行關(guān)鍵字(row key)上采用 加鹽(salting) 的方式來(lái)更好的分配寫(xiě)。否則,由于是單調(diào)的增加業(yè)務(wù)處理寫(xiě)指針,行關(guān)鍵字就會(huì)是連續(xù)的。
性能值
我們?cè)谛⌒偷?10 節(jié)點(diǎn)的 HBase 集群上已經(jīng)測(cè)試過(guò)性能,結(jié)果令人印象深刻。使用 1K 字節(jié)負(fù)載,以 500 個(gè)事件為一個(gè)批次大小,我們完成了生產(chǎn)和消費(fèi) 100K 個(gè)事件/秒的吞吐量,其中運(yùn)行了 3 個(gè)生產(chǎn)者和 10 個(gè)消費(fèi)者。我們也觀察到當(dāng)我們?cè)黾酉M(fèi)者和消費(fèi)者的時(shí)候,吞吐量線性增加:例如,當(dāng)我們將生產(chǎn)者和消費(fèi)者數(shù)量加倍的時(shí)候,吞吐量增加到 200K 個(gè)事件/秒。
在 HBase 的幫助下,結(jié)合最佳實(shí)踐,我們成功的創(chuàng)建了一個(gè)線性可伸縮的,分布式事務(wù)隊(duì)列系統(tǒng)。同時(shí),在 CDAP 中使用這個(gè)系統(tǒng)提供實(shí)時(shí)流處理框架:動(dòng)態(tài)可伸縮,強(qiáng)一致性,以及一次交付的傳輸保證。