偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

實(shí)時(shí)計(jì)算和數(shù)據(jù)轉(zhuǎn)換,為何Yelp棄用Storm和Heron,自建流處理器PaaStorm?

大數(shù)據(jù) PaaS
Yelp開源了一個(gè)名叫MRJob的框架,是用來(lái)在AWS基礎(chǔ)設(shè)施上運(yùn)行大MapReduce Job的。不幸的是,隨著使用MRJob的服務(wù)數(shù)量巨增,運(yùn)行和調(diào)度任務(wù)開始變得越來(lái)越復(fù)雜。

[[172563]]

美中不足

在2010年時(shí),Yelp開源了一個(gè)名叫MRJob的框架,是用來(lái)在AWS基礎(chǔ)設(shè)施上運(yùn)行大MapReduce Job的。Yelp的工程師們用MRJob實(shí)現(xiàn)了很多功能,從廣告推送到翻譯,比比皆是。事實(shí)證明,MRJob是一個(gè)非常強(qiáng)大的工具,可以在我們當(dāng)時(shí)豐富的數(shù)據(jù)集合上完成計(jì)算和聚集操作。

不幸的是,隨著使用MRJob的服務(wù)數(shù)量巨增,運(yùn)行和調(diào)度任務(wù)開始變得越來(lái)越復(fù)雜。由于很多任務(wù)都是要依賴上游任務(wù)的,所以就要好好地安排整個(gè)系統(tǒng)的拓?fù)?。MapReduce任務(wù)并不是用于實(shí)時(shí)處理的,所以任務(wù)的拓?fù)湟刻煺{(diào)度一次。更糟的是,萬(wàn)一上游的任務(wù)失敗了,下游的也會(huì)失敗,最終會(huì)輸出錯(cuò)誤的結(jié)果。因此就要有非常專業(yè)的能力來(lái)判斷應(yīng)該從哪個(gè)任務(wù)開始、以什么順序重新運(yùn)行,最終輸出正確的結(jié)果。

愛思考的人就會(huì)問(wèn)了:我們有沒有什么辦法來(lái)更高效地完成計(jì)算和轉(zhuǎn)換任務(wù)呢?我們還想支持一個(gè)復(fù)雜的數(shù)據(jù)流中不同數(shù)據(jù)轉(zhuǎn)換操作之間的依賴關(guān)系,尤其是要能優(yōu)雅地處理模式改變及上游的故障。我們還希望系統(tǒng)能實(shí)時(shí)或者近實(shí)時(shí)地運(yùn)行。這樣,系統(tǒng)就可以用于業(yè)務(wù)分析及指標(biāo)監(jiān)控。換句話說(shuō),我們需要的是一個(gè)流處理器。

Storm之類現(xiàn)成的計(jì)算系統(tǒng)本來(lái)也是非常不錯(cuò)的。但由于許多主流的流處理框架對(duì)Python的支持都不太好,因此要把我們的其他后臺(tái)程序與Storm或者其他現(xiàn)有流處理系統(tǒng)結(jié)合起來(lái)就會(huì)非常痛苦。

我們***用的是Pyleus,這是一個(gè)讓開發(fā)者可以用Python處理和轉(zhuǎn)換數(shù)據(jù)的開源框架。Pyleus的底層仍然是使用Storm的,構(gòu)建耗時(shí)比較久,運(yùn)行得也慢。Twitter Heron宣布開源后,我們發(fā)現(xiàn)我們也碰上了許多他們碰到過(guò)的問(wèn)題。Yelp自己有功能非常強(qiáng)大的用于部署服務(wù)的Platform-as-a-Service平臺(tái)PaasTA,相比之下我們更喜歡使用PaaSTA,而不是運(yùn)行專用的Storm集群。

從2015年7月開始,有一幫工程師們開始研發(fā)一種新型的數(shù)據(jù)倉(cāng)庫(kù),也碰上了典型的擴(kuò)展和性能問(wèn)題。最開始時(shí)他們想用Pyleus來(lái)先清洗數(shù)據(jù),再拷貝到Redshift上。后來(lái)他們意識(shí)到部署一整套Storm集群來(lái)運(yùn)行些簡(jiǎn)單的Python邏輯實(shí)在太沒必要了:用Yelp自己的運(yùn)行服務(wù)的平臺(tái)去部署一套基于Python的流處理器就足夠了。我們的流處理器是基于Samza設(shè)計(jì)的,目的是提供一些簡(jiǎn)單的接口,用一種“處理消息”的方法來(lái)做數(shù)據(jù)轉(zhuǎn)換。

工程師們?cè)贖ackathon 17上構(gòu)建了運(yùn)行在PyPy上的流處理器的原型,這樣PassStorm就誕生了。

這名字中有什么含義?

PaaStorm的名字其實(shí)是PaaSTA和Storm的組合。那PaaStorm到底是干什么的呢?要回答這個(gè)問(wèn)題,咱們先看看數(shù)據(jù)管道的基本架構(gòu):

主要看看“Transformer”那一步,就會(huì)知道大多數(shù)存儲(chǔ)在Kafka中的消息都并不能直接被導(dǎo)入目標(biāo)系統(tǒng)。設(shè)想有一套R(shí)edshift集群是用來(lái)存儲(chǔ)廣告推送數(shù)據(jù)的。廣告推送集群想存儲(chǔ)的只是上游系統(tǒng)的某一個(gè)字段(比如某個(gè)業(yè)務(wù)的平均權(quán)重),否則它就要保存原始數(shù)據(jù)并對(duì)其進(jìn)行聚合計(jì)算。如果Redhift廣告推送集群要存儲(chǔ)所有上游數(shù)據(jù)的話,就會(huì)浪費(fèi)存儲(chǔ)空間,導(dǎo)致系統(tǒng)性能降低。

在過(guò)去,各個(gè)服務(wù)都會(huì)寫復(fù)雜的MapReduce任務(wù),在把數(shù)據(jù)寫到目標(biāo)數(shù)據(jù)存儲(chǔ)之前先進(jìn)行數(shù)據(jù)處理??墒牵@些MapReduce任務(wù)都碰到了上文所述的性能和擴(kuò)展問(wèn)題。數(shù)據(jù)管道給大家提供的好處之一是消費(fèi)者程序可以拿到它所需要的數(shù)據(jù)的形式,不管上游數(shù)據(jù)本來(lái)是什么樣。

減少示例代碼

本來(lái)我們是可以讓每個(gè)消費(fèi)者程序自己按自己需要的方式做數(shù)據(jù)轉(zhuǎn)換的。比如,廣告推送系統(tǒng)可以自己寫一個(gè)轉(zhuǎn)換服務(wù),從Kafka中的業(yè)務(wù)數(shù)據(jù)中提取出查看統(tǒng)計(jì)量,并自己維護(hù)這個(gè)轉(zhuǎn)換服務(wù)的。這種辦法最初工作得很好,但最終系統(tǒng)上規(guī)模時(shí)我們就碰上問(wèn)題了。

我們想提供一個(gè)轉(zhuǎn)換框架是基于以下考慮:

  • 很多轉(zhuǎn)換邏輯是通用的,可以在多個(gè)團(tuán)隊(duì)之間共享。比如把標(biāo)志位轉(zhuǎn)換成有意義的字段。
  • 這樣的轉(zhuǎn)換邏輯通常會(huì)需要很多示例代碼。比如連接數(shù)據(jù)源或數(shù)據(jù)目的、保存狀態(tài)、監(jiān)控吞吐量、故障恢復(fù)等。這樣的代碼本來(lái)并不需要在各種服務(wù)之間拷來(lái)拷去。
  • 要保證能對(duì)數(shù)據(jù)進(jìn)行實(shí)時(shí)處理的話,數(shù)據(jù)轉(zhuǎn)換操作要盡可能地快,要基于流。
  • 減少示例代碼最自然的方式就是提供一個(gè)轉(zhuǎn)換接口。大家的服務(wù)實(shí)現(xiàn)接口中完成一次轉(zhuǎn)換操作的具體邏輯,然后,剩下的工作就由我們的流處理框架完成。

把Kafka作為消息總線

最初PaaStorm是一個(gè)Kafka-to-Kafka的轉(zhuǎn)換框架,慢慢地才演進(jìn)成也支持了其他類型的終端節(jié)點(diǎn)。把Kafka做為PaaStorm的終端節(jié)點(diǎn)簡(jiǎn)化了很多東西:每個(gè)對(duì)數(shù)據(jù)感興趣的服務(wù)都可以注冊(cè)到Topic上,關(guān)注任意轉(zhuǎn)換過(guò)的數(shù)據(jù)或者原始數(shù)據(jù),有新消息到來(lái)就處理就好了,完全不必在意是誰(shuí)創(chuàng)建了這個(gè)Topic。轉(zhuǎn)換過(guò)的數(shù)據(jù)按Kafka的保留策略持久化。因?yàn)镵afka是一個(gè)發(fā)布-訂閱系統(tǒng),下游系統(tǒng)也可以在任何它想的時(shí)候消費(fèi)數(shù)據(jù)。

用Storm處理一切

當(dāng)采用了PaaStorm之后,我們?cè)撛鯓影盐覀兊腒afka Topic之間的關(guān)系可視化呢?因?yàn)橛行㏕opic中的數(shù)據(jù)會(huì)按照源到端的方式流向別的Topic,我們可以把我們的拓?fù)浣Y(jié)構(gòu)當(dāng)成一個(gè)有向無(wú)環(huán)圖:

每個(gè)節(jié)點(diǎn)都是一個(gè)Kafka Topic,箭頭表示PaaStorm提供的轉(zhuǎn)換操作。這時(shí)候“PaaStorm”這個(gè)名字就變得更有意義了:象Storm一樣,PaaStorm通過(guò)轉(zhuǎn)換模塊(象Bolt一樣)提供對(duì)數(shù)據(jù)流的源(象Spout一樣)的實(shí)時(shí)轉(zhuǎn)換。

PaaStorm內(nèi)部機(jī)制

PaaStorm的核心抽象叫做Spolt(Spout和Bolt的結(jié)合物)。象名字表示的一樣,Spolt接口也定義了兩個(gè)重要的東西:一個(gè)輸入數(shù)據(jù)源,一種對(duì)那個(gè)源的消息數(shù)據(jù)進(jìn)行的某種處理。

下面例子定義了一個(gè)最簡(jiǎn)單的Spolt:

這個(gè)Spolt會(huì)處理“refresh_primary.business.abc123efg456”這個(gè)Topic中的每一條消息,增加一個(gè)字段,保存原始消息中的‘name’字段的大寫的值,然后再把這條處理過(guò)的新版本的消息發(fā)送出去。

值得一提的是數(shù)據(jù)管道中的所有消息都是不可修改的。要得到一條修改過(guò)的消息,就要?jiǎng)?chuàng)建一個(gè)新的對(duì)象。而且,因?yàn)槲覀冊(cè)跒橄Ⅲw中增加一個(gè)新字段(就是那個(gè)增加的“大寫字母的name”字段),新消息的模式已經(jīng)改變了。在生產(chǎn)環(huán)境中,消息的模式ID是從來(lái)都不能寫死的。我們要依靠Schematizer服務(wù)來(lái)為一條修改過(guò)的消息注冊(cè)并提供合適的模式。

***提一句,數(shù)據(jù)管道的客戶端庫(kù)提供了好幾種非常相似的用名字空間、Topic名、源名和模式ID的組合來(lái)生成“spolt_source”的方法。這樣就可以很容易地讓某個(gè)Spolt去找到它需要的所有源并從中讀取數(shù)據(jù)。要了解更多信息,請(qǐng)參考Schematizer的文章。

與Kafka相關(guān)的處理是怎樣的?

也許你已經(jīng)發(fā)現(xiàn)上面的Spolt中沒有什么代碼是與Kafka Topic相交互的。這是因?yàn)樵赑aaStorm中,所有真正的Kafka接口相關(guān)處理都是由一個(gè)內(nèi)部實(shí)例(恰好也叫PaaStorm)完成的。PaaStorm實(shí)例會(huì)把一個(gè)特定的Spolt與對(duì)應(yīng)的源和目的關(guān)聯(lián)起來(lái),并把消息送給Spolt處理,再把Spolt輸出的消息發(fā)布到正確的Topic上去。

每個(gè)PaaStorm實(shí)例都用一個(gè)Spolt初始化。比如,下面的命令就用上文中定義的UppercaseNameSpolt開啟了一次處理:

  1. PaaStorm(UppercaseNameSpolt()).start() 

這就意味著所有有意寫一個(gè)新轉(zhuǎn)換器的人都可以簡(jiǎn)單地定義一個(gè)新的Spolt子類,壓根不用修改任何PaaStorm運(yùn)行體相關(guān)的東西。

從內(nèi)部來(lái)看,PaaStorm運(yùn)行體的主方法也是驚人的簡(jiǎn)單,偽碼如下:

這個(gè)運(yùn)行體先做了一些設(shè)置:初始化了生產(chǎn)者和消費(fèi)者,以及消息計(jì)數(shù)器。然后,它一直等待上游Topic中的新數(shù)據(jù)。如果有新數(shù)據(jù)到來(lái),就用Spolt處理它。Spolt處理之后會(huì)輸出一條或多條消息,生產(chǎn)者再把它發(fā)布到下游的Topic。

另外簡(jiǎn)單提一下,PaaStorm運(yùn)行體也提供了比如消費(fèi)者注冊(cè)、心跳機(jī)制(名叫“tick”)等。比如某個(gè)Spolt要經(jīng)常性地清空它的內(nèi)容,那就可以用tick來(lái)觸發(fā)。

關(guān)于狀態(tài)保存

PaaStorm保證可以可靠地從故障中恢復(fù)。萬(wàn)一發(fā)生了崩潰,我們就該從正確的偏移位置開始重新消費(fèi)。但不幸的是,這個(gè)正確的偏移量一般情況下都并不是我們從上游的Topic中消費(fèi)的***那一條消息。原因是雖然我們已經(jīng)消費(fèi)了它,但事實(shí)上我們還沒來(lái)得及把轉(zhuǎn)換后的版本發(fā)布出去。

所以重新啟動(dòng)時(shí)正確的位置應(yīng)該是上游Topic與已經(jīng)成功發(fā)布到下游的***一條消息對(duì)應(yīng)的位置。在知道發(fā)到下游的***一條消息的情況之后,我們需要知道它對(duì)應(yīng)的上游的消息是哪一條,這樣就可以從那里恢復(fù)了。

為了方便實(shí)現(xiàn)這個(gè)功能,PaaStorm的Spolt在處理一條原始消息時(shí),會(huì)把與這條原始消息相對(duì)應(yīng)的在上游Topic中的Kafka偏移量也加到轉(zhuǎn)換后的包里。轉(zhuǎn)換后的消息隨后會(huì)在生產(chǎn)者的回調(diào)函數(shù)中把這個(gè)偏移量傳回來(lái)。這樣,我們就可以知道與下游Topic中***一條消息對(duì)應(yīng)的上游Topic的偏移量了。因?yàn)榛卣{(diào)函數(shù)只有在生產(chǎn)者成功地把轉(zhuǎn)換后的消息發(fā)布出去之后才會(huì)調(diào)用,也就意味著原始消息已經(jīng)被成功處理了,在這種情況下,消費(fèi)者就可以很放心的在那個(gè)回調(diào)函數(shù)中提交這個(gè)偏移量了。萬(wàn)一發(fā)生崩潰,我們可以直接從還沒有被完全處理的上游消息那里開始繼續(xù)處理。

從上面的偽碼中可以看到,PaaStorm也會(huì)統(tǒng)計(jì)消費(fèi)掉的消息數(shù)和發(fā)布的消息數(shù)。這樣,感興趣的用戶可以檢查上游和下游Topic中的吞吐量。這讓我們很輕松地有了對(duì)任意轉(zhuǎn)換操作的監(jiān)控和性能檢查功能。在Yelp,我們是把我們的統(tǒng)計(jì)信息發(fā)給SignalFX的:

SignalFX圖可以顯示出在一個(gè)PaaStorm實(shí)例中生產(chǎn)者和消費(fèi)者的吞吐量。在這個(gè)例子中,輸入輸出消息量并不匹配。

在PaaStorm中對(duì)生產(chǎn)者和消費(fèi)者分開做統(tǒng)計(jì)的好處之一是我們可以把這兩個(gè)吞吐量放在一起,看看瓶頸是在哪里。如果到不了這個(gè)粒度,是很難發(fā)現(xiàn)管道中的性能問(wèn)題的。

PaaStorm的未來(lái)

PaaStorm提供了兩個(gè)東西:一個(gè)接口,并實(shí)現(xiàn)了一套框架來(lái)支持這個(gè)接口。盡管我們并不希望PaaStorm的接口很快就被改動(dòng),但已經(jīng)有一些孵化項(xiàng)目在計(jì)劃解決“轉(zhuǎn)換并連接”的問(wèn)題了。在將來(lái),我們希望能把PaaStorm的內(nèi)部換成Kafka Stream或者Apache Beam,主要的障礙是對(duì)Python的支持程度如何,我們尤其看重的是對(duì)終端節(jié)點(diǎn)的支持??傊谟虚_源的Python流處理項(xiàng)目成熟之前,我們會(huì)一直把PaaStorm用下去。

我們系列的下一篇

我們已經(jīng)討論了PaaStorm是如何從源到目的做數(shù)據(jù)的實(shí)時(shí)轉(zhuǎn)換的。PaaStorm的最初設(shè)計(jì)是做一個(gè)Kafka-to-Kafka的系統(tǒng),可事實(shí)上許多內(nèi)部服務(wù)并不是要把數(shù)據(jù)輸出到Kafka的,它們可能會(huì)把數(shù)據(jù)導(dǎo)入Redshift或MySQL之類的數(shù)據(jù)存儲(chǔ)然后再做業(yè)務(wù)相關(guān)的東西。即使數(shù)據(jù)已經(jīng)被轉(zhuǎn)成了需要的格式,也還需要進(jìn)一步:數(shù)據(jù)要被上傳到目標(biāo)數(shù)據(jù)存儲(chǔ)中。

回顧一下上文的內(nèi)容就會(huì)發(fā)現(xiàn),PaaStorm的Spolt接口其實(shí)并沒有限定必須輸出到Kafka中。事實(shí)上,只需要少量的改動(dòng),Spolt就可以直接把消息發(fā)布到Kafka之外的系統(tǒng)中。在后續(xù)的文章里,我們會(huì)談?wù)刌elp的Salesforce Connector:一個(gè)用PaaStorm來(lái)大量、高效地把數(shù)據(jù)從Kafka導(dǎo)入Salesforce的服務(wù)。

責(zé)任編輯:武曉燕 來(lái)源: 36大數(shù)據(jù)
相關(guān)推薦

2016-12-08 14:41:59

流處理器PaaStormKafka

2015-10-09 13:42:26

hbase實(shí)時(shí)計(jì)算

2019-04-15 15:55:36

微軟SurfaceIntel

2014-02-14 15:49:03

storm安裝部署

2023-11-13 11:01:25

數(shù)據(jù)技術(shù)

2021-06-03 08:10:30

SparkStream項(xiàng)目Uv

2012-12-06 10:59:51

大數(shù)據(jù)

2013-08-29 14:12:52

Storm分布式實(shí)時(shí)計(jì)算

2023-09-26 09:29:08

Java數(shù)據(jù)

2019-06-27 09:12:43

FlinkStorm框架

2015-07-31 10:35:18

實(shí)時(shí)計(jì)算

2021-03-10 08:22:47

FlinktopN計(jì)算

2022-12-29 09:13:02

實(shí)時(shí)計(jì)算平臺(tái)

2021-07-05 10:48:42

大數(shù)據(jù)實(shí)時(shí)計(jì)算

2019-12-13 08:25:26

FlinkSpark Strea流數(shù)據(jù)

2020-11-20 15:04:17

芯片手機(jī)電腦

2021-09-30 12:55:44

數(shù)據(jù)處理流處理引擎

2022-03-01 08:40:34

StormHadoop批處理

2011-04-14 14:43:38

SSISTransformat

2015-08-31 14:27:52

點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)