Apache Beam 是什么,它為什么比其他選擇更受歡迎?
1. 概述
在本教程中,我們將介紹 Apache Beam 并探討其基本概念。我們將首先演示使用 Apache Beam 的用例和好處,然后介紹基本概念和術(shù)語。之后,我們將通過一個(gè)簡(jiǎn)單的例子來說明 Apache Beam 的所有重要方面。
2. Apache Beam是個(gè)啥?
Apache Beam(Batch+strEAM)是一個(gè)用于批處理和流式數(shù)據(jù)處理作業(yè)的統(tǒng)一編程模型。它提供了一個(gè)軟件開發(fā)工具包,用于定義和構(gòu)建數(shù)據(jù)處理管道以及執(zhí)行這些管道的運(yùn)行程序。
Apache Beam旨在提供一個(gè)可移植的編程層。事實(shí)上,Beam管道運(yùn)行程序?qū)?shù)據(jù)處理管道轉(zhuǎn)換為與用戶選擇的后端兼容的API。目前,支持這些分布式處理后端有:
- Apache Apex
- Apache Flink
- Apache Gearpump (incubating)
- Apache Samza
- Apache Spark
- Google Cloud Dataflow
- Hazelcast Jet
3. 為啥選擇 Apache Beam
Apache Beam 將批處理和流式數(shù)據(jù)處理融合在一起,而其他組件通常通過單獨(dú)的 API 來實(shí)現(xiàn)這一點(diǎn) 。因此,很容易將流式處理更改為批處理,反之亦然,例如,隨著需求的變化。
Apache Beam 提高了可移植性和靈活性。我們關(guān)注的是邏輯,而不是底層的細(xì)節(jié)。此外,我們可以隨時(shí)更改數(shù)據(jù)處理后端。
Apache Beam 可以使用 Java、Python、Go和 Scala等SDK。事實(shí)上,團(tuán)隊(duì)中的每個(gè)人都可以使用他們選擇的語言。
4. 基本概念
使用 Apache Beam,我們可以構(gòu)建工作流圖(管道)并執(zhí)行它們。編程模型中的關(guān)鍵概念是:
- PCollection–表示可以是固定批處理或數(shù)據(jù)流的數(shù)據(jù)集
- PTransform–一種數(shù)據(jù)處理操作,它接受一個(gè)或多個(gè) PCollections 并輸出零個(gè)或多個(gè) PCollections。
- Pipeline–表示 PCollection 和 PTransform 的有向無環(huán)圖,因此封裝了整個(gè)數(shù)據(jù)處理作業(yè)。
- PipelineRunner–在指定的分布式處理后端上執(zhí)行管道。
簡(jiǎn)單地說,PipelineRunner 執(zhí)行一個(gè)管道,管道由 PCollection 和 PTransform 組成。
5. 字?jǐn)?shù)統(tǒng)計(jì)示例
現(xiàn)在我們已經(jīng)學(xué)習(xí)了 Apache Beam 的基本概念,讓我們?cè)O(shè)計(jì)并測(cè)試一個(gè)單詞計(jì)數(shù)任務(wù)。
5.1 建造梁式管道
設(shè)計(jì)工作流圖是每個(gè) Apache Beam 作業(yè)的第一步,單詞計(jì)數(shù)任務(wù)的步驟定義如下:
- 1.從原文中讀課文。
- 2.把課文分成單詞表。
- 3.所有單詞都小寫。
- 4.刪去標(biāo)點(diǎn)符號(hào)。
- 5.過濾停止語。
- 6.統(tǒng)計(jì)唯一單詞數(shù)量。
為了實(shí)現(xiàn)這一點(diǎn),我們需要使用 PCollection 和 PTransform 抽象將上述步驟轉(zhuǎn)換為 管道 。
5.2. 依賴
在實(shí)現(xiàn)工作流圖之前,先添加 Apache Beam的依賴項(xiàng) 到我們的項(xiàng)目:
- <dependency>
- <groupId>org.apache.beam</groupId>
- <artifactId>beam-sdks-java-core</artifactId>
- <version>${beam.version}</version>
- </dependency>
Beam管道運(yùn)行程序依賴于分布式處理后端來執(zhí)行任務(wù)。我們添加 DirectRunner 作為運(yùn)行時(shí)依賴項(xiàng):
- <dependency>
- <groupId>org.apache.beam</groupId>
- <artifactId>beam-runners-direct-java</artifactId>
- <version>${beam.version}</version>
- <scope>runtime</scope>
- </dependency>
與其他管道運(yùn)行程序不同,DirectRunner 不需要任何額外的設(shè)置,這對(duì)初學(xué)者來說是個(gè)不錯(cuò)的選擇。
5.3. 實(shí)現(xiàn)
Apache Beam 使用 Map-Reduce 編程范式 ( 類似 Java Stream)。講下面內(nèi)容之前,最好 對(duì) reduce(), filter(), count(), map(), 和 flatMap() 有個(gè)基礎(chǔ)概念和認(rèn)識(shí)。
首先要做的事情就是 創(chuàng)建管道:
- PipelineOptions options = PipelineOptionsFactory.create();
- Pipeline p = Pipeline.create(options);
六步單詞計(jì)數(shù)任務(wù):
- PCollection<KV<String, Long>> wordCount = p
- .apply("(1) Read all lines",
- TextIO.read().from(inputFilePath))
- .apply("(2) Flatmap to a list of words",
- FlatMapElements.into(TypeDescriptors.strings())
- .via(line -> Arrays.asList(line.split("\\s"))))
- .apply("(3) Lowercase all",
- MapElements.into(TypeDescriptors.strings())
- .via(word -> word.toLowerCase()))
- .apply("(4) Trim punctuations",
- MapElements.into(TypeDescriptors.strings())
- .via(word -> trim(word)))
- .apply("(5) Filter stopwords",
- Filter.by(word -> !isStopWord(word)))
- .apply("(6) Count words",
- Count.perElement());
apply() 的第一個(gè)(可選)參數(shù)是一個(gè)String,它只是為了提高代碼的可讀性。下面是上述代碼中每個(gè) apply() 的作用:
首先,我們使用 TextIO 逐行讀取輸入文本文件。
將每一行按空格分開,把它映射到一個(gè)單詞表上。
單詞計(jì)數(shù)不區(qū)分大小寫,所以我們將所有單詞都小寫。
之前,我們用空格分隔行,但是像“word!“和”word?"這樣的,就需要?jiǎng)h除標(biāo)點(diǎn)符號(hào)。
像“is”和“by”這樣的停止詞在幾乎每一篇英語文章中都很常見,所以我們將它們刪除。
最后,我們使用內(nèi)置函數(shù) Count.perElement() 計(jì)算唯一單詞數(shù)量。
如前所述,管道是在分布式后端處理的。不可能在內(nèi)存中的PCollection上迭代,因?yàn)樗植荚诙鄠€(gè)后端。相反,我們將結(jié)果寫入外部數(shù)據(jù)庫(kù)或文件。
首先,我們將PCollection轉(zhuǎn)換為String。然后,使用TextIO編寫輸出:
- wordCount.apply(MapElements.into(TypeDescriptors.strings())
- .via(count -> count.getKey() + " --> " + count.getValue()))
- .apply(TextIO.write().to(outputFilePath));
現(xiàn)在管道 已經(jīng)定義好了,接下來做個(gè)簡(jiǎn)單的測(cè)試。
5.4. 運(yùn)行測(cè)試
到目前為止,我們已為單詞計(jì)數(shù)任務(wù)定義了管道,現(xiàn)在運(yùn)行管道:
- p.run().waitUntilFinish();
在這行代碼中,Apache Beam 將把我們的任務(wù)發(fā)送到多個(gè) DirectRunner 實(shí)例。因此,最后將生成幾個(gè)輸出文件。它們將包含以下內(nèi)容:
- ...
- apache --> 3
- beam --> 5
- rocks --> 2
- ...
在 Apache Beam 中定義和運(yùn)行分布式作業(yè)是如此地簡(jiǎn)單。為了進(jìn)行比較,單詞計(jì)數(shù)實(shí)現(xiàn)在 Apache Spark, Apache Flink 和 Hazelcast-Jet 上也有
6. 結(jié)語
在本教程中,我們了解了 Apache Beam 是什么,以及它為什么比其他選擇更受歡迎。我們還通過一個(gè)單詞計(jì)數(shù)示例演示了 Apache Beam 的基本概念。
本文轉(zhuǎn)載自微信公眾號(hào)「鍋外的大佬」,可以通過以下二維碼關(guān)注。轉(zhuǎn)載本文請(qǐng)聯(lián)系鍋外的大佬公眾號(hào)。