Kafka都沒(méi)整明白,還敢去面試?
原創(chuàng)【51CTO.com原創(chuàng)稿件】Apache Kafka 被譽(yù)為時(shí)下熱門(mén)的企業(yè)級(jí)消息傳遞系統(tǒng),其初衷是一個(gè)分布式流系統(tǒng),用于發(fā)布和訂閱記錄流,以其快速,高可擴(kuò)展性以及較完美的容錯(cuò)效果備受業(yè)內(nèi)人士青睞。
圖片來(lái)自 Pexels
放眼當(dāng)下數(shù)據(jù)為王的時(shí)代,深入了解 Apache Kafka 及其常見(jiàn)的部署應(yīng)用,快速實(shí)現(xiàn)數(shù)據(jù)架構(gòu)(Kafka Fast Data Architecture)已是大勢(shì)所趨,刻不容緩。
以下分別 Kafka 架構(gòu),四大核心 API,典型應(yīng)用場(chǎng)景,Kafka 代理與消息主題,集群的創(chuàng)建,流 APIs(Stream APIs)及其處理模式等不同方面展開(kāi)詳細(xì)介紹。
Kafka:分布式流平臺(tái)
Kafka 是一個(gè)分布式流平臺(tái),用于發(fā)布和訂閱消息流(也稱(chēng)記錄流或數(shù)據(jù)流),快速有效地利用 I/O 進(jìn)行數(shù)據(jù)流的批處理,壓縮及解耦,并將數(shù)據(jù)流傳輸?shù)綌?shù)據(jù)池,應(yīng)用程序和實(shí)時(shí)流分析系統(tǒng)中。
Kafka 將主題消息分區(qū)復(fù)制到多個(gè)服務(wù)器中,允許用戶(hù)通過(guò)自己的應(yīng)用程序來(lái)處理這些記錄。
Kafka 四大核心 APIs
Kafka 由記錄(records),主題(topics),使用者(consumers),生產(chǎn)者(producers),代理服務(wù)(brokers),日志(logs),分區(qū)(partitions)和集群(clusters)組成。
Kafka 主題是一個(gè)記錄流,每個(gè)主題都有對(duì)應(yīng)的日志,該日志是該主題在磁盤(pán)上的存儲(chǔ),每個(gè)主題日志又分為多個(gè)分區(qū)和片段。
Kafka Producer API 用于生成數(shù)據(jù)記錄流。Kafka Consumer API 用于消費(fèi)來(lái)自 Kafka 的記錄流。
Broker 是在 Kafka 集群中運(yùn)行的 Kafka 服務(wù)器,Kafka 集群由多個(gè)代理服務(wù)器組成。
①生產(chǎn)者 API(Producer API):消息的生產(chǎn)者,向 Kafka broker 發(fā)消息的客戶(hù)端。
允許客戶(hù)端與集群中運(yùn)行著的 Kafka 服務(wù)器相連接,并將記錄流發(fā)布到一個(gè)或多個(gè) Kafka topics(消息主題)中。
一臺(tái) Kafka 服務(wù)器就是一個(gè) broker,一個(gè)集群由多個(gè) broker 組成,一個(gè) broker 可以容納多個(gè) topic。
②消費(fèi)者 API(Consumer API):消息消費(fèi)者,向 Kafka broker 獲取消息的客戶(hù)端。
允許客戶(hù)端連接集群中運(yùn)行著的 Kafka 服務(wù)器,并消費(fèi)其中一個(gè)或多個(gè) Kafka topics(消息主題)的記錄流。
③流 API(Stream API):充當(dāng)流處理器,用于輸入輸出流的轉(zhuǎn)換。
允許客戶(hù)端充當(dāng)流處理器,從一個(gè)或多個(gè) topics(消息主題)消費(fèi)輸入流,并生產(chǎn)輸出流,輸出到一個(gè)或多個(gè)其他 topics(消息主題)中,從而有效地將輸入流轉(zhuǎn)換至輸出流。
④連接器 API(Connector API):允許編寫(xiě)可重用的生產(chǎn)者和消費(fèi)者代碼。
我們可以從任何關(guān)系型數(shù)據(jù)庫(kù)中讀取數(shù)據(jù),并將其發(fā)布到主題中,同時(shí)也可以“消費(fèi)”這個(gè)主題中的數(shù)據(jù),并將其寫(xiě)入關(guān)系型數(shù)據(jù)庫(kù)。
由此可見(jiàn),Connector API 支持構(gòu)建和運(yùn)行可重復(fù)使用的生產(chǎn)者或消費(fèi)者,并將 topic 連接到現(xiàn)有的應(yīng)用程序或數(shù)據(jù)系統(tǒng)。(例如,就關(guān)系型數(shù)據(jù)庫(kù)而言,其連接器可以捕獲到各個(gè)表中的每個(gè)變化。)
Kafka應(yīng)用場(chǎng)景
消息系統(tǒng)
Kafka 作為企業(yè)消息傳遞系統(tǒng),通過(guò)源系統(tǒng)及目標(biāo)系統(tǒng)間的分離來(lái)實(shí)現(xiàn)數(shù)據(jù)交換。與 JMS 相比,Kafka 兼具高吞吐量分區(qū)及高可靠容錯(cuò)力的復(fù)制功能。
Web 站點(diǎn)活動(dòng)跟蹤
跟蹤記錄用戶(hù)在網(wǎng)站上的所有事件信息,從而進(jìn)行數(shù)據(jù)的分析及脫機(jī)處理。
日志匯總
用于處理來(lái)自不同系統(tǒng)的日志,尤其是那些處于微服務(wù)架構(gòu)分布式環(huán)境中的系統(tǒng),這類(lèi)系統(tǒng)通常部署在不同的主機(jī)上,因此 Kafka 需要匯總來(lái)自不同系統(tǒng)的各類(lèi)日志,進(jìn)而對(duì)這些日志集中進(jìn)行分析處理。
指標(biāo)收集
Kafka 可用于收集來(lái)自各類(lèi)系統(tǒng)/網(wǎng)絡(luò)的指標(biāo),并進(jìn)行監(jiān)控,Kafka 配有專(zhuān)門(mén)的指標(biāo)報(bào)告生成工具,如 Ganglia,Graphite 等。
Kafka Brokers & Kafka Topics
Kafka Broker(代理服務(wù)器)
Kafka 集群中的一個(gè)實(shí)例稱(chēng)之為代理(服務(wù)器),在 Kafka 集群中,只要連接其中任意一個(gè)代理(服務(wù)器)就能訪(fǎng)問(wèn)到整個(gè)集群,每個(gè)代理在集群中通過(guò) ID 進(jìn)行標(biāo)識(shí)。
Kafka Topics(消息主題)
一個(gè)消息主題(Topic)是一個(gè)消息記錄發(fā)布后的邏輯名稱(chēng),在 Kafka 中,Topic 被分為若干個(gè)分區(qū)(Partitions),用于消息的發(fā)布。
這些分區(qū)分布在集群的各個(gè)代理服務(wù)器(Brokers)中,為了實(shí)現(xiàn)可擴(kuò)展性,通常將一個(gè)非常大的 Topic 分布在多個(gè)代理服務(wù)器(Broker)上。
由于一個(gè) Topic 可以分為多個(gè)分區(qū)(Partition),每個(gè)分區(qū)(Partition)都是一個(gè)有序的隊(duì)列。
分區(qū)(Partition)中的每條消息都會(huì)被分配一個(gè)有序的 ID(即偏移量,Offset)。
如下圖所示,假設(shè)當(dāng)前有一個(gè)主題(Topic),該主題(Topic)有三個(gè)分區(qū),集群中有三個(gè)代理(Broker),則每個(gè)代理都有一個(gè)分區(qū)。要發(fā)布到分區(qū)的數(shù)據(jù)以偏移量(Offset)增量的方式追加。
其中“Offset”即偏移量,Kafka 的存儲(chǔ)文件都是按照“offset.kafka”來(lái)命名,用 Offset 方式命名是為了便于查找,如果想找位于 2046 的位置,只需找到 2045.kafka 的文件即可。
以下是分區(qū)(Partitions)使用時(shí)值得注意的要點(diǎn):
- 每個(gè)消息主題(Topic)按名稱(chēng)標(biāo)識(shí),集群中允許有多個(gè)已命名的消息主題。
- 每個(gè)消息前后順序的有效性?xún)H限于當(dāng)前分區(qū)級(jí)別(maintained at the partition level),而非跨主題。
- 數(shù)據(jù)一旦寫(xiě)入分區(qū),則不會(huì)被覆蓋,這就是Kafka中強(qiáng)調(diào)的數(shù)據(jù)不變性(immutability)
- 分區(qū)中的消息通過(guò)鍵(key),值(values),時(shí)間戳(timestamps)的形式一起存儲(chǔ),Kafka 確保每一個(gè)給定密鑰的消息都會(huì)發(fā)布到同一個(gè)分區(qū)中。
- 在 Kafka 集群中,每一個(gè)分區(qū)都有一個(gè)引導(dǎo)程序(leader),該引導(dǎo)程序負(fù)責(zé)對(duì)該分區(qū)執(zhí)行讀/寫(xiě)操作。
上圖是一個(gè)例子,當(dāng)前集群中僅一個(gè)消息主題(Topic),該主題包含三個(gè)分區(qū)(partition0,partition1,partition2),集群中有三個(gè)代理服務(wù)器(broker1,broker2,broker3)。
當(dāng)前每個(gè)分區(qū)的副本都復(fù)制到另外兩個(gè)代理服務(wù)器(Broker)中,即每個(gè)代理服務(wù)器(Broker)上包含了三個(gè)分區(qū)。
因此即便其中某兩個(gè)代理服務(wù)器(Broker)發(fā)生故障,也不用擔(dān)心數(shù)據(jù)會(huì)丟失。
如上,當(dāng)我們?cè)?Kafka 中創(chuàng)建主題時(shí),始終建議確保主題(Topic)的復(fù)制因子大于 1,并且小于/等于集群中的代理服務(wù)器(Broker)數(shù)量,這是非常推薦的做法。
上圖示例中,當(dāng)前主題的復(fù)制因子為 3(即,一份原始數(shù)據(jù),兩份副本數(shù)據(jù)), 不難推算出每個(gè)分區(qū)的引導(dǎo)程序加上其副本數(shù)量總共為“3”。
該示例中,每個(gè)分區(qū)都有一個(gè)引導(dǎo)程序(稱(chēng)之為“leader”),以及其他兩個(gè)同步副本(稱(chēng)之為“follower”)。
對(duì)于分區(qū) partition 0 來(lái)說(shuō),broker1 是“leader”, broker2 和 broker3 都是“follower”,從而分區(qū) partition 0 的所有讀寫(xiě)操作都將在 broker1 中進(jìn)行。
同時(shí),之后更新的內(nèi)容也會(huì)被同步復(fù)制到 broker2 和 broker3 對(duì)應(yīng)的分區(qū)(partition)中。
創(chuàng)建 Kafka 集群——Demo
我們還是以上圖中三個(gè) Broker 組成的 Kafka 集群為例,拆解 Kafka 集群創(chuàng)建的步驟。
①Kafka 集群環(huán)境準(zhǔn)備
首先需要準(zhǔn)備好一臺(tái)安裝有 Zookeeper 的機(jī)器,沒(méi)有 Zookeeper,Kafka 集群將無(wú)法工作。
同時(shí)建議直接從官網(wǎng)下載最新版本的 Apache Kafka,目前版本更新至2.11,直接解壓后將其放置到 bin 目錄下:
- https://archive.apache.org/dist/kafka/1.0.0/kafka_2.11-1.0.0.tgz
然后啟動(dòng) ZooKeeper,為什么需要 Zookeeper?它在這里主要負(fù)責(zé)協(xié)調(diào)服務(wù),管理代理服務(wù) Broker,確定每個(gè)分區(qū)中的引導(dǎo)程序,以及在 Kafka 消息主題或代理服務(wù)發(fā)生變更時(shí)及時(shí)發(fā)出警告。
通過(guò)以下命令可以啟動(dòng)一個(gè)Zookeeper實(shí)例:
②啟動(dòng) Kafka Brokers
成功安裝 Kafka 并啟動(dòng) ZooKeeper 實(shí)例后,接下來(lái)就可以開(kāi)啟 Kafka Broker 了,這里共啟動(dòng)了三個(gè) Kafka Broker。
具體啟動(dòng)方式:先定位到 Kafka 根目錄下的“config”文件夾下,找到“server.properties”文件,將其復(fù)制三次。
然后分別命名為server_1.properties,server_2.properties 以及 server_3.properties,并針對(duì)三個(gè)文件內(nèi)容做如下編輯,直接保存即可:
保存后通過(guò)命令開(kāi)啟這三個(gè)代理服務(wù):
③創(chuàng)建主題
通過(guò)如下命令創(chuàng)建消息主題:
④生成引導(dǎo)服務(wù)
通過(guò) Kafka 控制臺(tái)生成器(Kafka console)指定任意一個(gè)代理服務(wù)地址,并基于之前創(chuàng)建的主題發(fā)布一些消息。
這個(gè)指定的代理服務(wù)就被視作為引導(dǎo)服務(wù)程序,用于訪(fǎng)問(wèn)整個(gè)集群。
⑤“消費(fèi)”消息
通過(guò) Kafka 控制臺(tái)來(lái)使用消息,用戶(hù)(即:消息消費(fèi)者)需要指定任意一個(gè)代理服務(wù)(Broker)地址作為引導(dǎo)服務(wù)器。
在閱讀消息時(shí),用戶(hù)(即:消息消費(fèi)者)是看不到消息順序的,上文中也提到過(guò)消息的先后順序僅在分區(qū)級(jí)別(partition level)進(jìn)行維護(hù),而非主題級(jí)別(topic level)。
通過(guò)以下命令可以描述主題并查看各分區(qū)的分布情況,以及每個(gè)分區(qū)的引導(dǎo)服務(wù)器:
從上面的執(zhí)行結(jié)果可以看出:
- broker-1 是分區(qū) 0 的引導(dǎo)服務(wù)器。
- broker-2 是分區(qū) 1 的引導(dǎo)服務(wù)器。
- broker-3 是分區(qū) 2 的引導(dǎo)服務(wù)器。
- broker-1,broker-2,broker-3 分別具有每個(gè)分區(qū)的副本(同步且相互備份)。
Kafka Streams API
Kafka 常被用作將流數(shù)據(jù)實(shí)時(shí)傳輸?shù)狡渌到y(tǒng)中,此時(shí) Kafka 作為中間層,主要用來(lái)解耦分離實(shí)時(shí)數(shù)據(jù)管道。
Kafka 流是 Kafka 生態(tài)系統(tǒng)的一部分,它提供了實(shí)時(shí)分析的功能,支持將流數(shù)據(jù)傳輸?shù)酱髷?shù)據(jù)平臺(tái)或 RDBMS,Cassandra,Spark 中,以進(jìn)行將來(lái)的數(shù)據(jù)分析。
Kafka Stream API 簡(jiǎn)單易用,通過(guò)其強(qiáng)大的技術(shù)能力可處理所有存儲(chǔ)于其中的數(shù)據(jù),同時(shí)該 API 也為我們提供了一套 Kafka 標(biāo)準(zhǔn)類(lèi)的實(shí)現(xiàn)規(guī)則。
在實(shí)際工作中為了能夠創(chuàng)建支持核心業(yè)務(wù)的實(shí)時(shí)應(yīng)用程序,我們需要 Kafka Stream API 的大力協(xié)助。
Kafka Stream API 獨(dú)特之處在于,通過(guò)其構(gòu)建的應(yīng)用程序都是普通應(yīng)用程序。
所以這些應(yīng)用程序可以像其他任何應(yīng)用程序一樣,進(jìn)行打包,部署和監(jiān)控,而無(wú)需單獨(dú)安裝專(zhuān)門(mén)的處理集群或類(lèi)似基礎(chǔ)架構(gòu),這些額外部署的基礎(chǔ)架構(gòu)往往比較耗錢(qián)。
流(Stream)是 Kafka Streams 提供的最重要的抽象對(duì)象,代表了無(wú)限且持續(xù)更新的數(shù)據(jù)集。
流是一系列不可變數(shù)據(jù)記錄的序列,具備有序,可重復(fù),容錯(cuò)等特性,我們可以簡(jiǎn)單將其視為記錄流(定義為:KStream)或變更日志流(定位為:KTable 或 GlobelKTable)。
流處理器(Stream Processor)是處理器拓?fù)浣Y(jié)構(gòu)中的一個(gè)節(jié)點(diǎn),包含應(yīng)用于流數(shù)據(jù)的處理邏輯,一系列節(jié)點(diǎn)組成了拓?fù)浣Y(jié)構(gòu)中的處理步驟(用于轉(zhuǎn)換數(shù)據(jù))。
Kafka Streams API 處理數(shù)據(jù)——Demo
Kafka Stream API 為實(shí)現(xiàn)流數(shù)據(jù)處理,即消息在 Kafka 中的消費(fèi)及回寫(xiě),提供了兩種選項(xiàng):
- 高級(jí) Kafka Streams DSL(high-level DSL)。
- 低級(jí)處理器 API:用于數(shù)據(jù)基本處理,組合處理,本地狀態(tài)存儲(chǔ)。
①高級(jí) DSL(high-level DSL)
高級(jí) DSL 由記錄流(KStream) 和日志流(KTable/GlobalKTable)兩大主要抽象類(lèi)別組成,包含一系列已實(shí)現(xiàn)的方法可供調(diào)用。
KStream 是記錄流的抽象,其中每個(gè)數(shù)據(jù)都是無(wú)限數(shù)據(jù)集中的簡(jiǎn)單鍵值,KStream 提供了多種處理數(shù)據(jù)流的功能。
例如:map,mapValue,flatMap,flatMapValues,filter;同時(shí)還支持多個(gè)流連接,流數(shù)據(jù)的聚合。
KTable 是變更日志流的抽象,在變更日志中,對(duì)具有相同鍵的行(row)進(jìn)行覆蓋,因而每條數(shù)據(jù)記錄都被視作為插入或更新。
②處理器 API(lower-level processor )
低級(jí)處理器 API 通過(guò)擴(kuò)展抽象類(lèi)(AbstractProcessor),覆蓋含有業(yè)務(wù)邏輯的處理方法,從而實(shí)現(xiàn)客戶(hù)端流數(shù)據(jù)的訪(fǎng)問(wèn),允許基于輸入數(shù)據(jù)流執(zhí)行相應(yīng)的業(yè)務(wù)邏輯,同時(shí)將其結(jié)果作為下游數(shù)據(jù)轉(zhuǎn)發(fā)至客戶(hù)端。
相較于高級(jí) DSL 提供具有功能樣式的即用型方法,低級(jí)處理器API則按需提供處理邏輯。
③Kafka Stream API 應(yīng)用——高級(jí) DSL Demo
前提:必須在當(dāng)前環(huán)境中有以下依賴(lài),版本視當(dāng)前情況而定。
導(dǎo)入以下包:
Kafka 配置屬性:
實(shí)例化 KStreamBuilder,創(chuàng)建一個(gè) KStream 對(duì)象:
KStreamBuilder 有個(gè) Stream 方法,該方法以主題名稱(chēng)(topic name)作為參數(shù),返回一個(gè) KStream 對(duì)象,即,訂閱了指定主題的實(shí)例化對(duì)象。
基于 KStream 對(duì)象,這時(shí)我們就可以使用 Kafka Streams 高級(jí) DSL 提供的眾多方法(例如:map,process,transform,join 等),然后將處理后的數(shù)據(jù)發(fā)送到另一個(gè)主題。
最后,通過(guò)構(gòu)建器(builder)和流配置進(jìn)行流式傳輸:

通過(guò) Kafka Streams API,我們無(wú)需單獨(dú)部署集群即可在 Kafka 中進(jìn)行數(shù)據(jù)流處理。
Kafka Streams API 給我們帶來(lái)的便捷主要包含以下幾個(gè)方面:
- 高可擴(kuò)展性,靈活性,分布式和容錯(cuò)性。
- 支持有狀態(tài)和無(wú)狀態(tài)處理。
- 具有窗口,聯(lián)接和聚合的事件時(shí)間處理。
- 通過(guò) Kafka Streams DSL 或較低級(jí)別的處理器 API 使用已經(jīng)定義的常見(jiàn)轉(zhuǎn)換操作。
- 對(duì)處理沒(méi)有單獨(dú)的群集要求(與 Kafka 集成)。
- 采用一次一個(gè)記錄的處理以實(shí)現(xiàn)毫秒級(jí)的處理延遲。
- 支持 Kafka Connect 連接到不同的應(yīng)用程序和數(shù)據(jù)庫(kù)。
總結(jié)
Kafka 的便捷操作是其備受業(yè)內(nèi)人士廣泛關(guān)注的原因之一,然而更重要的是其出色的穩(wěn)定性,可靠性及耐用性,且具有靈活的發(fā)布/隊(duì)列,可以很好地適應(yīng) N 個(gè)消費(fèi)者組,具有強(qiáng)大的可復(fù)制性,可以為生產(chǎn)者提供一致性保證。
本次分享基于 Kafka 核心要素及其常見(jiàn)部署做了詳情解析,希望給圈內(nèi)感興趣的人士提供技術(shù)普及,交流互補(bǔ)。
作者:羅小羅
簡(jiǎn)介:英國(guó) TOP10 計(jì)算機(jī)專(zhuān)業(yè),計(jì)算機(jī)科學(xué)與技術(shù)碩士,先后就職于匯豐,JPMorgan,HP,交行,阿里等國(guó)內(nèi)外知名企業(yè)。涉及項(xiàng)目領(lǐng)域主要有:互聯(lián)網(wǎng)金融,電商,教育,醫(yī)療等?,F(xiàn)任就職于某世界 500 強(qiáng)公司,擔(dān)任測(cè)試開(kāi)發(fā)團(tuán)隊(duì)負(fù)責(zé)人,帶領(lǐng)團(tuán)隊(duì)構(gòu)建并持續(xù)優(yōu)化自動(dòng)化測(cè)試框架,研發(fā)自動(dòng)化測(cè)試輔助類(lèi)工具;擅長(zhǎng)領(lǐng)域:?jiǎn)卧?接口/性能/安全/自動(dòng)化測(cè)試/CD/CI/DevOps;個(gè)人持續(xù)研究領(lǐng)域:自動(dòng)化測(cè)試模型/數(shù)據(jù)分析/算法/機(jī)器學(xué)習(xí)等。
編輯:陶家龍
征稿:有投稿、尋求報(bào)道意向技術(shù)人請(qǐng)聯(lián)絡(luò) editor@51cto.com
【51CTO原創(chuàng)稿件,合作站點(diǎn)轉(zhuǎn)載請(qǐng)注明原文作者和出處為51CTO.com】