偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

Storm數(shù)據(jù)流模型的分析及討論

開發(fā) 架構(gòu)
本文首先介紹了Storm的基本概念和數(shù)據(jù)流模型,然后結(jié)合一個典型應(yīng)用場景來說明Storm支持Topology之間數(shù)據(jù)流訂閱的必要性,最后對比了Storm與另一個流處理系統(tǒng)在數(shù)據(jù)流模型上的區(qū)別之處。

Storm基本概念

Storm是一個開源的實(shí)時計(jì)算系統(tǒng),它提供了一系列的基本元素用于進(jìn)行計(jì)算:Topology、Stream、Spout、Bolt等等。

在Storm中,一個實(shí)時應(yīng)用的計(jì)算任務(wù)被打包作為Topology發(fā)布,這同Hadoop的MapReduce任務(wù)相似。但是有一點(diǎn)不同的是:在Hadoop中,MapReduce任務(wù)最終會執(zhí)行完成后結(jié)束;而在Storm中,Topology任務(wù)一旦提交后永遠(yuǎn)不會結(jié)束,除非你顯示去停止任務(wù)。

計(jì)算任務(wù)Topology是由不同的Spouts和Bolts,通過數(shù)據(jù)流(Stream)連接起來的圖。下面是一個Topology的結(jié)構(gòu)示意圖:

其中包含有:

Spout:Storm中的消息源,用于為Topology生產(chǎn)消息(數(shù)據(jù)),一般是從外部數(shù)據(jù)源(如Message Queue、RDBMS、NoSQL、Realtime Log)不間斷地讀取數(shù)據(jù)并發(fā)送給Topology消息(tuple元組)。

Bolt:Storm中的消息處理者,用于為Topology進(jìn)行消息的處理,Bolt可以執(zhí)行過濾, 聚合, 查詢數(shù)據(jù)庫等操作,而且可以一級一級的進(jìn)行處理。

最終,Topology會被提交到storm集群中運(yùn)行;也可以通過命令停止Topology的運(yùn)行,將Topology占用的計(jì)算資源歸還給Storm集群。

Storm數(shù)據(jù)流模型

數(shù)據(jù)流(Stream)是Storm中對數(shù)據(jù)進(jìn)行的抽象,它是時間上無界的tuple元組序列。在Topology中,Spout是Stream的源頭,負(fù)責(zé)為Topology從特定數(shù)據(jù)源發(fā)射Stream;Bolt可以接收任意多個Stream作為輸入,然后進(jìn)行數(shù)據(jù)的加工處理過程,如果需要,Bolt還可以發(fā)射出新的Stream給下級Bolt進(jìn)行處理。

下面是一個Topology內(nèi)部Spout和Bolt之間的數(shù)據(jù)流關(guān)系:

Topology中每一個計(jì)算組件(Spout和Bolt)都有一個并行執(zhí)行度,在創(chuàng)建Topology時可以進(jìn)行指定,Storm會在集群內(nèi)分配對應(yīng)并行度個數(shù)的線程來同時執(zhí)行這一組件。

那么,有一個問題:既然對于一個Spout或Bolt,都會有多個task線程來運(yùn)行,那么如何在兩個組件(Spout和Bolt)之間發(fā)送tuple元組呢?

Storm提供了若干種數(shù)據(jù)流分發(fā)(Stream Grouping)策略用來解決這一問題。在Topology定義時,需要為每個Bolt指定接收什么樣的Stream作為其輸入(注:Spout并不需要接收Stream,只會發(fā)射Stream)。

目前Storm中提供了以下7種Stream Grouping策略:Shuffle Grouping、Fields Grouping、All Grouping、Global Grouping、Non Grouping、Direct Grouping、Local or shuffle grouping,具體策略可以參考這里。

一種Storm不能支持的場景

以上介紹了一些Storm中的基本概念,可以看出,Storm中Stream的概念是Topology內(nèi)唯一的,只能在Topology內(nèi)按照“發(fā)布-訂閱”方式在不同的計(jì)算組件(Spout和Bolt)之間進(jìn)行數(shù)據(jù)的流動,而Stream在Topology之間是無法流動的。

這一點(diǎn)限制了Storm在一些場景下的應(yīng)用,下面通過一個簡單的實(shí)例來說明。

假設(shè)現(xiàn)在有一個Topology1的結(jié)構(gòu)如下:通過Spout產(chǎn)生數(shù)據(jù)流后,依次需要經(jīng)過Filter Bolt,Join Bolt,Business1 Bolt。其中,F(xiàn)ilter Bolt用于對數(shù)據(jù)進(jìn)行過濾,Join Bolt用于數(shù)據(jù)流的聚合,Business1 Bolt用于進(jìn)行一個實(shí)際業(yè)務(wù)的計(jì)算邏輯。

目前這個Topology1已經(jīng)被提交到Storm集群運(yùn)行,而現(xiàn)在我們又有了新的需求,需要計(jì)算一個新的業(yè)務(wù)邏輯,而這個Topology的特點(diǎn)是和Topology1公用同樣的數(shù)據(jù)源,而且前期的預(yù)處理過程完全一樣(依次經(jīng)歷Filter Bolt和Join Bolt),那么這時候Storm怎么來滿足這一需求?據(jù)個人了解,有以下幾種“曲折”的實(shí)現(xiàn)方式:

1)  第一種方式:首先kill掉已經(jīng)在集群中運(yùn)行的Topology1計(jì)算任務(wù),然后實(shí)現(xiàn)Business2 Bolt的計(jì)算邏輯,并重新打包形成一個新的Topology計(jì)算任務(wù)jar包后,提交到Storm集群中重新運(yùn)行,這時候Storm內(nèi)的整體Topology結(jié)構(gòu)如下:

這種方式的缺點(diǎn)在于:由于要重啟Topology,所以如果Spout或Bolt有狀態(tài)則會丟失掉;同時由于Topology結(jié)構(gòu)發(fā)生了變化,因此重新運(yùn)行Topology前需要對程序的穩(wěn)定性、正確性進(jìn)行驗(yàn)證;另外Topology結(jié)構(gòu)的變化也會帶來額外的運(yùn)維開銷。

2)  第二種方式:完全開發(fā)部署一套新的Topology,其中前面的公共部分的Spout和Bolt可以直接復(fù)用,只需要重新開發(fā)新的計(jì)算邏輯Business2 Bolt來替換原有的Business1 Bolt即可。然后重新提交新的Topology運(yùn)行。這時候Storm內(nèi)的整體Topology結(jié)構(gòu)如下:

這種方式的缺點(diǎn)在于:由于兩個Topology都會從External Data Source讀取同一份數(shù)據(jù),無疑增加了External Data Source的負(fù)載壓力;而且會導(dǎo)致同樣的數(shù)據(jù)在Storm集群內(nèi)被傳輸相同的兩份,被同樣的計(jì)算單元Bolt進(jìn)行處理,浪費(fèi)了Storm的計(jì)算資源和網(wǎng)絡(luò)傳輸帶寬。假設(shè)現(xiàn)在不止有兩個這樣的Topology計(jì)算任務(wù),而是有N個,那么對Storm的計(jì)算Slot的浪費(fèi)很嚴(yán)重。

注意:上述兩種方式還有一個公共的缺點(diǎn)——系統(tǒng)可擴(kuò)展性不好,這意味著不管哪種方式,只要以后有這種新增業(yè)務(wù)邏輯的需求,都需要進(jìn)行復(fù)雜的人工操作或線性的資源浪費(fèi)現(xiàn)象。

3) 第三種方式:OK,看了以上兩種方式后,也許你會提出下面的解決方案:通過Kafka這樣的消息中間件,實(shí)現(xiàn)不同Topology的Spout共享數(shù)據(jù)源,而且這樣可以做到消息可靠傳輸、消息rewind回傳等,好處是對于Storm來說,已經(jīng)有了storm-kafka插件的支持。這時候Storm內(nèi)的整體Topology結(jié)構(gòu)如下:

這種實(shí)現(xiàn)方式可以通過引入一層消息中間件減少對External Data Source的重復(fù)訪問的壓力,而且可以通過消息中間件層,屏蔽掉External Data Source的細(xì)節(jié),如果需要擴(kuò)展新的業(yè)務(wù)邏輯,只需要重新部署運(yùn)行新的Topology,應(yīng)該說是現(xiàn)有Storm版本下很好的實(shí)現(xiàn)方式了。不過消息中間件的引入,無疑將給系統(tǒng)帶來了一定的復(fù)雜性,這對于Storm上的應(yīng)用開發(fā)來說提高了門檻。

值得注意的是,方案三中仍遺留有一點(diǎn)問題沒有解決:對于Storm集群來說,這種方式還是沒有能夠從根本上避免數(shù)據(jù)在Storm不同Topology內(nèi)的重復(fù)發(fā)送與處理。這是由于Storm的數(shù)據(jù)流模型上的限制所導(dǎo)致的,如果Storm實(shí)現(xiàn)了不同Topology之間Stream的共享,那么這一問題也就迎刃而解了。

一個流處理系統(tǒng)的數(shù)據(jù)流模型

個人工作中有幸參與過一個流處理框架的開發(fā)與應(yīng)用。下面我們來簡單看看其中所采用的數(shù)據(jù)流模型。

首先,先來看一下該流處理系統(tǒng)內(nèi)的幾個基本概念:

1)數(shù)據(jù)流(data stream):時間分布和數(shù)量上無限的一系列數(shù)據(jù)記錄的集合體;

2)數(shù)據(jù)記錄(data record):數(shù)據(jù)流的最小組成單元,每條數(shù)據(jù)記錄包括 3 類數(shù)據(jù):所屬數(shù)據(jù)流名稱(stream name)、用于路由的數(shù)據(jù)(keys)和具體數(shù)據(jù)處理邏輯所需的數(shù)據(jù)(value);

3)數(shù)據(jù)處理任務(wù)定義(task definition):定義一個數(shù)據(jù)處理任務(wù)的基本屬性,無法直接被執(zhí)行,必須特化為具體的任務(wù)實(shí)例。其基本屬性包括:

(可選)輸入流(input stream):描述該任務(wù)依賴哪些數(shù)據(jù)流作為輸入,是一個數(shù)據(jù)流名稱列表;數(shù)據(jù)流產(chǎn)生源不會依賴其他數(shù)據(jù)流,可忽略該配置;

數(shù)據(jù)處理邏輯(process logic):描述該任務(wù)具體的處理邏輯,例如由獨(dú)立進(jìn)程進(jìn)行的外部處理邏輯;

(可選)輸出流(output stream):描述該任務(wù)產(chǎn)生哪個數(shù)據(jù)流,是一個數(shù)據(jù)流名稱;數(shù)據(jù)流處理鏈末級任務(wù)不會產(chǎn)生新的數(shù)據(jù)流,可忽略該配置;

4)數(shù)據(jù)處理任務(wù)實(shí)例(task instance):對一個數(shù)據(jù)處理任務(wù)定義進(jìn)行具體約束后,可推送到某個處理結(jié)點(diǎn)上運(yùn)行的邏輯實(shí)體。附加下列屬性:

數(shù)據(jù)處理任務(wù)定義:指向該任務(wù)實(shí)例對應(yīng)的數(shù)據(jù)處理任務(wù)定義實(shí)體;

輸入流過濾條件(input filting condition):一個 boolean 表達(dá)式列表,描述每個輸入流中符合什么條件的數(shù)據(jù)記錄可以作為有效數(shù)據(jù)交給處理邏輯;若某個輸入流中所有數(shù)據(jù)記錄都是有效數(shù)據(jù),則可直接用 true 表示;

(可選)強(qiáng)制輸出周期(output interval):描述以什么頻率強(qiáng)制該任務(wù)實(shí)例產(chǎn)生輸出流記錄,可以用輸入流記錄個數(shù)或間隔時間作為周期;忽略該配置時,輸出流記錄產(chǎn)生周期完全由處理邏輯自身決定,不受框架約束;

5)數(shù)據(jù)處理結(jié)點(diǎn)(node):可容納多個數(shù)據(jù)處理任務(wù)實(shí)例運(yùn)行的實(shí)體機(jī)器,每個數(shù)據(jù)處理結(jié)點(diǎn)的IPv4地址必須保證唯一。

該流處理系統(tǒng),采用分布式策略,由多個數(shù)據(jù)處理結(jié)點(diǎn)進(jìn)行數(shù)據(jù)的處理過程;將流式數(shù)據(jù)的處理過程劃分為不同的階段,每個階段伴隨數(shù)據(jù)流的流入、任務(wù)的處理及數(shù)據(jù)流的流出;各個階段會有若干個處理結(jié)點(diǎn)參與完成,其中,每個處理結(jié)點(diǎn)上會有若干個數(shù)據(jù)處理任務(wù)實(shí)例運(yùn)行,每個數(shù)據(jù)處理任務(wù)實(shí)例則是對一個數(shù)據(jù)處理任務(wù)定義進(jìn)行具體約束后,可推送到某個處理結(jié)點(diǎn)上運(yùn)行的邏輯實(shí)體。在不同的處理結(jié)點(diǎn)之間,數(shù)據(jù)流根據(jù)配置信息進(jìn)行傳輸;在處理結(jié)點(diǎn)內(nèi)部,結(jié)點(diǎn)根據(jù)配置信息對流經(jīng)該結(jié)點(diǎn)的數(shù)據(jù)進(jìn)行處理。

下圖為系統(tǒng)對于流式數(shù)據(jù)的基本處理流程:

1)定義數(shù)據(jù)流:將流式數(shù)據(jù)的處理過程劃分成不同的階段,定義出不同的數(shù)據(jù)流名稱;

2)定義數(shù)據(jù)處理任務(wù):為數(shù)據(jù)流的處理過程定義相應(yīng)的數(shù)據(jù)處理任務(wù),其中,各個處理任務(wù)定義了外部處理邏輯,且其輸入/輸出數(shù)據(jù)流須從1)中預(yù)定義的數(shù)據(jù)流列表中選?。?/p>

3)定義數(shù)據(jù)處理結(jié)點(diǎn):定義各個數(shù)據(jù)處理結(jié)點(diǎn)的名稱及其IPv4地址信息;

4)定義數(shù)據(jù)處理任務(wù)實(shí)例:為3)中定義好的每個處理結(jié)點(diǎn),分別定義運(yùn)行在其上的數(shù)據(jù)處理任務(wù)實(shí)例,其中,每個任務(wù)實(shí)例所對應(yīng)的數(shù)據(jù)處理任務(wù)實(shí)體須從2)中預(yù)定義的處理任務(wù)列表中選??;

5)加載數(shù)據(jù)流的相關(guān)配置信息及訂閱信息(具體格式見“附錄:配置信息格式”),然后開始從數(shù)據(jù)流產(chǎn)生源讀取數(shù)據(jù);

6)運(yùn)行數(shù)據(jù)流源結(jié)點(diǎn)上的任務(wù)實(shí)例:數(shù)據(jù)流處理鏈源結(jié)點(diǎn)上的處理任務(wù)實(shí)例直接對數(shù)據(jù)流產(chǎn)生源的數(shù)據(jù)進(jìn)行處理,然后產(chǎn)生新的輸出數(shù)據(jù)流;

7)運(yùn)行下一級結(jié)點(diǎn)上的任務(wù)實(shí)例:中間的處理結(jié)點(diǎn)上的處理任務(wù)實(shí)例依賴于上一級處理結(jié)點(diǎn)的輸出數(shù)據(jù)流作為輸入數(shù)據(jù)流,從中讀取數(shù)據(jù),進(jìn)行處理,產(chǎn)生輸出數(shù)據(jù)流,并傳遞到下一級處理結(jié)點(diǎn);

8)判斷是否到達(dá)數(shù)據(jù)流末級結(jié)點(diǎn):

  • 如果不是,則繼續(xù)返回步驟7),按照數(shù)據(jù)流的流動關(guān)系,繼續(xù)運(yùn)行下一級結(jié)點(diǎn)上的任務(wù)實(shí)例;

  • 否則為數(shù)據(jù)流末級結(jié)點(diǎn),則進(jìn)行步驟9)。

9)輸出結(jié)果:數(shù)據(jù)流處理鏈末級結(jié)點(diǎn)上的處理任務(wù)不會產(chǎn)生新的數(shù)據(jù)流,完成最終的數(shù)據(jù)處理任務(wù)后將結(jié)果進(jìn)行輸出。

以上是數(shù)據(jù)流在不同處理結(jié)點(diǎn)之間的處理流程。每個處理結(jié)點(diǎn)作為流處理的一個環(huán)節(jié),其結(jié)點(diǎn)內(nèi)部的處理流程如下圖所示:

1)每個處理結(jié)點(diǎn)啟動流處理過程后,開啟網(wǎng)絡(luò)服務(wù),監(jiān)聽從上一級處理結(jié)點(diǎn)發(fā)出的TCP連接請求,接收從上一級結(jié)點(diǎn)發(fā)來的數(shù)據(jù);

2)處理結(jié)點(diǎn)不間斷地接收從上一級處理結(jié)點(diǎn)發(fā)來的數(shù)據(jù),對于每條數(shù)據(jù)記錄,根據(jù)數(shù)據(jù)流名進(jìn)行篩選,將其分發(fā)到該數(shù)據(jù)流所對應(yīng)的處理進(jìn)程中;

3)將從每個特定數(shù)據(jù)流發(fā)來的數(shù)據(jù),廣播到所有以該數(shù)據(jù)流作為輸入數(shù)據(jù)流的數(shù)據(jù)處理任務(wù)實(shí)例中;

4)數(shù)據(jù)處理任務(wù)實(shí)例從其輸入數(shù)據(jù)流中接收數(shù)據(jù),按照過濾條件進(jìn)行篩選,然后將符合過濾條件的數(shù)據(jù)記錄發(fā)送給外部應(yīng)用程序進(jìn)行處理;

5)外部應(yīng)用程序啟動外部處理進(jìn)程,對數(shù)據(jù)進(jìn)行實(shí)際處理過程,并將每條數(shù)據(jù)記錄的處理結(jié)果返回給相應(yīng)的數(shù)據(jù)處理任務(wù)實(shí)例;

6)數(shù)據(jù)處理任務(wù)實(shí)例從外部應(yīng)用程序收集處理后的結(jié)果數(shù)據(jù),并依次將其轉(zhuǎn)發(fā)到對應(yīng)的輸出數(shù)據(jù)流中;

7)輸出數(shù)據(jù)流進(jìn)程接收發(fā)向該數(shù)據(jù)流的數(shù)據(jù),然后按照數(shù)據(jù)流的訂閱關(guān)系,將數(shù)據(jù)發(fā)送到所有訂閱了該數(shù)據(jù)流的下一級處理結(jié)點(diǎn);

8)根據(jù)下一級處理結(jié)點(diǎn)的IP地址和端口號,通過TCP請求與下一級處理結(jié)點(diǎn)建立網(wǎng)絡(luò)連接,然后將數(shù)據(jù)按序傳輸?shù)较乱患壧幚斫Y(jié)點(diǎn)。

二者在數(shù)據(jù)流模型上的不同之處

至于兩個系統(tǒng)的實(shí)現(xiàn)細(xì)節(jié),我們先不去做具體比較,下面僅列出二者在數(shù)據(jù)流模型上的一些不同之處(這里并不是為了全面對比二者的不同之處,只是列出其中的關(guān)鍵部分):

1)  在Storm中,數(shù)據(jù)流Stream是在Topology內(nèi)進(jìn)行定義,并在Topology內(nèi)進(jìn)行傳輸?shù)?;而在上面提到的流處理系統(tǒng)中,數(shù)據(jù)流Stream是在整個系統(tǒng)內(nèi)全局唯一的,可以在整個集群內(nèi)被訂閱。

2)  在Storm中,數(shù)據(jù)流Stream的發(fā)布和訂閱都是靜態(tài)的,所謂靜態(tài)是指數(shù)據(jù)流的發(fā)布與訂閱關(guān)系在向Storm集群提交Topology計(jì)算任務(wù)時,被一次性生成的,這一關(guān)系在Topology的運(yùn)行過程中是不能被改變的;而在上面提到的流處理系統(tǒng)中,數(shù)據(jù)流Stream的發(fā)布和訂閱都是動態(tài)的,即數(shù)據(jù)處理任務(wù)task可以動態(tài)的發(fā)布Stream,也可以動態(tài)的訂閱系統(tǒng)內(nèi)已經(jīng)生成的任意Stream,數(shù)據(jù)流的訂閱關(guān)于通過分布式應(yīng)用程序協(xié)調(diào)服務(wù)ZooKeeper集群的動態(tài)節(jié)點(diǎn)來維護(hù)管理。

好了,有了以上的對比,我們不難發(fā)現(xiàn),對于本文所舉的應(yīng)用場景實(shí)例,Storm的數(shù)據(jù)流模式尚不能很方便的支持,而在這里提到的這個流處理系統(tǒng)的全局?jǐn)?shù)據(jù)流模型下,這一應(yīng)用場景的需求可以很方便的滿足。

總結(jié)的話

個人覺得,Storm有必要實(shí)現(xiàn)不同Topology之間Stream的共享,這個至少可以在不損失Storm現(xiàn)有功能的前提下,使得Storm在處理實(shí)際生產(chǎn)環(huán)境下的一些應(yīng)用場景時更加從容應(yīng)對。

至于如何在現(xiàn)有Storm的基礎(chǔ)上實(shí)現(xiàn)這一需求,可能的方式很多。一種簡單的方式是通過Zookeeper來集中存儲、動態(tài)感知Topology之間Stream的“發(fā)布-訂閱”關(guān)系,同時在Storm的消息分發(fā)過程中對這種情況加以處理。

以上觀點(diǎn),如果不對之處,歡迎大家指出。

原文鏈接:http://www.cnblogs.com/panfeng412/archive/2012/07/29/storm-stream-model-analysis-and-discussion.html

【編輯推薦】

  1. Storm源碼淺析之topology的提交
  2. PHP集成開發(fā)工具PHPStorm 3.0發(fā)布
  3. 漫談Java開源5年:自由但帶著枷鎖
  4. JavaFX2.0網(wǎng)格布局窗格GridPane
  5. BicaVM:基于JavaScript的JVM-為什么呢?
責(zé)任編輯:彭凡 來源: 博客園
相關(guān)推薦

2011-12-14 15:57:13

javanio

2023-02-16 08:00:00

數(shù)據(jù)流客戶端開發(fā)數(shù)據(jù)集

2010-04-30 09:53:34

Unix系統(tǒng)

2024-04-18 09:02:11

數(shù)據(jù)流Mixtral混合模型

2016-11-14 19:01:36

數(shù)據(jù)流聊天系統(tǒng)web

2009-08-19 10:41:12

Java輸入數(shù)據(jù)流

2022-03-18 08:57:17

前端數(shù)據(jù)流選型

2017-11-16 19:26:34

海量數(shù)據(jù)算法計(jì)算機(jī)

2021-10-27 10:43:36

數(shù)據(jù)流中位數(shù)偶數(shù)

2011-04-14 14:43:38

SSISTransformat

2019-12-19 14:38:08

Flink SQL數(shù)據(jù)流Join

2011-04-19 09:18:02

SSIS數(shù)據(jù)轉(zhuǎn)換

2013-10-21 10:58:50

微軟大數(shù)據(jù)SQL Server

2009-07-15 09:06:11

Linux圖形系統(tǒng)X11的CS架構(gòu)

2014-02-11 08:51:15

亞馬遜PaaSAppStream

2020-02-06 19:12:36

Java函數(shù)式編程編程語言

2014-12-02 10:56:47

TCPIP交互數(shù)據(jù)流

2021-06-29 19:24:42

數(shù)據(jù)流數(shù)據(jù)排序

2021-07-12 18:10:10

數(shù)據(jù)流阿里云

2020-10-21 10:51:43

數(shù)據(jù)分析
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號