如何構(gòu)建以事件驅(qū)動型實時信息系統(tǒng)
譯文【51CTO.com快譯】在如今競爭激烈的商業(yè)競爭環(huán)境中,數(shù)據(jù)的處理往往需要具有實時性。如果競爭對手在數(shù)據(jù)管控上事先采取了行動,那么勢必會在業(yè)務(wù)上取得一定的優(yōu)勢。這也正是我們需要構(gòu)建能夠?qū)崟r處理信息(數(shù)據(jù))的企業(yè)級系統(tǒng)的重要原因。在實時處理數(shù)據(jù)時,我們除了可以將系統(tǒng)設(shè)計為以異步的方式,對事件進(jìn)行操作之外,還可以使用同步的請求響應(yīng)消息,來構(gòu)建實時的系統(tǒng)。同時,為了保持此類系統(tǒng)能夠高效地使用資源,我們需要了解如何基于事件驅(qū)動的方法,來構(gòu)建實時的信息系統(tǒng)。
圖:實時信息系統(tǒng)解決方案架構(gòu)
如上圖所示,各類事件通常來源于包括移動和Web應(yīng)用在內(nèi)的不同渠道。其中:
- 接收事件組件,負(fù)責(zé)在任何給定的時間內(nèi)處理大量的數(shù)據(jù),其速率從每秒上千個事件、到每秒數(shù)上百萬個事件不等。在接收到事件之后,該組件會將其傳遞給對應(yīng)的處理組件。
- 事件處理組件,負(fù)責(zé)對事件中帶有的數(shù)據(jù)進(jìn)行操作,包括:過濾,清理,轉(zhuǎn)換和匯總等。根據(jù)組件的行為,事件處理既可以獨(dú)立于事件接收組件、被單獨(dú)地執(zhí)行,也可以作為相關(guān)的操作來完成。事件處理組件可以將原始事件存儲在各種數(shù)據(jù)商店(data store)中,以便進(jìn)行審核(如果未被接收組件處理的話),并且可以根據(jù)用例,將計算結(jié)果存儲在那些單獨(dú)的數(shù)據(jù)商店之中。因此,大多數(shù)事件都能夠被實時處理,并發(fā)布到事件發(fā)布組件中,以傳遞給使用者(consumer)。
- 事件發(fā)布組件,負(fù)責(zé)將處理后的數(shù)據(jù)實時地推送給使用者。這些使用者既可以是移動或Web應(yīng)用,又可以是對已處理事件起作用的其他系統(tǒng)。除了這些實時事件的發(fā)布之外,在某些用例中,我們還需要通過HTTP通道,以同步、或請求-響應(yīng)的方式,將處理后的摘要信息發(fā)布給移動和Web應(yīng)用。
上述參考架構(gòu)可用于那些需要處置現(xiàn)場正在發(fā)生的事件,并將其發(fā)布到后臺應(yīng)用等多種用例中。例如:在出現(xiàn)緊急情況時,現(xiàn)場人員可以實時地發(fā)送有關(guān)需求與狀況的細(xì)節(jié)信息,而后臺團(tuán)隊則能夠毫不拖延地進(jìn)行必要的物資調(diào)配與派送。此外,我們也可以使用此類架構(gòu)來構(gòu)建農(nóng)業(yè)的供應(yīng)鏈。例如:農(nóng)民們通過運(yùn)貨車輛將農(nóng)作物運(yùn)送到連鎖超市。農(nóng)民可以在農(nóng)作物準(zhǔn)備就緒時,更新其詳細(xì)信息。而超市后臺團(tuán)隊則會實時地從各個位置獲取更新,并安排車輛及時地收集農(nóng)作物,以避免延遲。
使用WSO2和Kafka的參考架構(gòu)
消息代理是將消息發(fā)送者與接收者相分離的組件。目前,市場上有很多消息代理類產(chǎn)品,它們各有優(yōu)、缺點(diǎn)。其中最流行的當(dāng)屬Kafka、NATS和RabbitMQ。當(dāng)然,Kafka也可以作為NATS和RabbitMQ的最佳功能性代理。
在此,我們選擇Kafka作為事件消息的代理;選擇功能豐富、簡單且開源的WSO2Streaming Integrator作為事件處理器;使用既支持流媒體、又支持REST風(fēng)格的WSO2 API Manager,作為事件發(fā)布者。當(dāng)然,這些組件也可以被市場上的其他類似工具所替換。下圖展示了構(gòu)成實時事件驅(qū)動型信息系統(tǒng)的各個組件,及其相互連接。
圖:具有代理和WSO2平臺的實時事件驅(qū)動型信息系統(tǒng)
在該架構(gòu)中,事件代理會接收來自移動和Web應(yīng)用等源頭的事件負(fù)載。WSO2 Streaming Integrator會處理這些事件,然后將各種結(jié)果事件通過WebSocket連接,發(fā)布到WSO2 API Manager上。WSO2 API Manager擁有一個公布給網(wǎng)關(guān)的WebSocket API,諸如移動和Web之類的consumer(消費(fèi)者)應(yīng)用會使用該API,實時地接收各種事件。同時,WSO2 Streaming Integrator可以將原始事件和匯總的結(jié)果,通過標(biāo)準(zhǔn)的REST API,從WSO2 API Manager處公布給相關(guān)的consumer。下圖對上述架構(gòu)進(jìn)行了細(xì)化。
圖:具有Kafka和WSO2平臺詳細(xì)信息的實時事件驅(qū)動型信息系統(tǒng)
如上圖所示,事件源通過Kafka客戶端,將事件發(fā)布到Kafka代理中那些可用的topic(主題)處。WSO2 Streaming Integrator不但可以訂閱這些topic,還能通過已配置的Kafka源,實時地使用來自Kafka的各種事件。由Siddhi語言編寫的各項操作將處理這些事件,并傳遞給諸如WebSocket之類的事件sink(接收器)。同時,WSO2 SI會按需通過各種數(shù)據(jù)商店(data stores),將事件存儲到對應(yīng)的數(shù)據(jù)庫表中。
WSO2 API Manager通過WebSocket API來將WebSocket sink的詳細(xì)信息配置到API的端點(diǎn)上。據(jù)此,那些使用WebSocket API的客戶端應(yīng)用將能實時地接收到已經(jīng)處理的各種事件。
同時,那些已處理的信息和原始事件會被存儲到一個通過WSO2 Enterprise Integrator公布了REST數(shù)據(jù)服務(wù)的數(shù)據(jù)庫中。此處的數(shù)據(jù)服務(wù)是通過將WSO2 API Manager作為受保護(hù)的REST API予以公布,并通過客戶端應(yīng)用實現(xiàn)同步通信的服務(wù)。此外,作為一種能夠支持大多數(shù)企業(yè)系統(tǒng)需求的成熟架構(gòu),我們可以通過擴(kuò)展,來支持諸如:混合集成需求、API管理平臺等多種企業(yè)用例。
只有WSO2平臺,沒有Kafka的參考架構(gòu)
如果貴組織剛開始著手構(gòu)建實時的事件驅(qū)動型信息系統(tǒng),而且數(shù)據(jù)負(fù)載量并不大的話,那么就可以僅使用WSO2平臺,來構(gòu)建前文提到的精簡版架構(gòu)。下圖展示了一種沒有消息代理的實現(xiàn)方式。
圖:具有WSO2平臺的實時事件驅(qū)動型信息系統(tǒng)
該架構(gòu)與前文提到的架構(gòu)之間唯一的區(qū)別在于:雖然缺少事件代理,但是客戶端應(yīng)用能夠通過HTTP的調(diào)用,將事件直接發(fā)送到WSO2 Streaming Integrator處。當(dāng)然,由于該架構(gòu)沒有消息代理,因此WSO2 SI需要將原始事件存儲在數(shù)據(jù)庫中,以供各項審核。而它的其余功能則與前文的架構(gòu)相同。下圖展示了該架構(gòu)的詳細(xì)組成結(jié)構(gòu)。
圖:具有WSO2平臺詳細(xì)信息的實時事件驅(qū)動型信息系統(tǒng)
如上圖所示,WSO2 SI被配置為通過HTTP接口來接收事件。而Siddhi應(yīng)用中的HTTP源則被配置為通過不同的操作,來處理各種事件,然后發(fā)布到WebSocket sink中。同時,各種原始事件通過數(shù)據(jù)商店被存儲在數(shù)據(jù)庫中,并將各種聚合的結(jié)果通過不同的數(shù)據(jù)商店存儲到另一個數(shù)據(jù)表里。除此之外,該系統(tǒng)的其余功能與前文提到的基于代理的實現(xiàn)方式基本一致。
從Kafka到Websocket Siddhi應(yīng)用的示例代碼
下面我們將給出一個Siddhi的應(yīng)用示例。它能夠從Kafka的topic中讀取事件,并通過WebSocket服務(wù)器,將各種事件發(fā)布(或輸出)到某個日志sink處。當(dāng)然,在發(fā)布之前,它會對每個事件進(jìn)行簡單地檢查(或篩選),以確保其數(shù)量小于500。具體代碼請參見--https://gist.github.com/chanakaudaya/efe8dfed2558811f0316a7839dbfef57。其中,您可以找到有關(guān)如何使用Streaming Integrator,來設(shè)置Kafka的詳細(xì)示例。同時,您也可以通過文檔鏈接--https://ei.docs.wso2.com/zh_CN/latest/streaming-integrator/examples/working-with-kafka/,來試運(yùn)行該Siddhi應(yīng)用。
如何創(chuàng)建連接到WebSocket端點(diǎn)的WebSocket API
如下圖所示,您可以通過WSO2 API Manager的發(fā)布者(publisher)接口來創(chuàng)建WebSocket API,并使用WS服務(wù)器將這些事件發(fā)布到客戶端。
您可以在下圖的上部菜單中選擇“設(shè)計新的WebSocket API”(或“創(chuàng)建API”),然后在下一個窗口中提供詳細(xì)的信息。
接著,您可以選擇“創(chuàng)建并發(fā)布”選項,將WebSocket API推送到開發(fā)人員的門戶(portal)處,以便用戶在其中使用有效的OAuth2令牌。
通過參考文檔鏈接-- https://apim.docs.wso2.com/zh-CN/latest/learn/tutorials/create-and-publish-websocket-api/#create-and-publish-a-websocket-api,您可以逐步了解到如何創(chuàng)建WebSocket API,并能夠試運(yùn)行其客戶端的示例。
作為拓展,您還可以從如下鏈接處,獲得有關(guān)WSO2的大量代碼示例:
- WSO2 Streaming Integrator教程--https://ei.docs.wso2.com/en/latest/streaming-integrator/guides/use-cases/
- WSO2 API Manager教程--https://apim.docs.wso2.com/en/latest/learn/design-api/create-api/create-a-rest-api/
- WSO2 Enterprise Integrator教程--https://ei.docs.wso2.com/en/latest/micro-integrator/use-cases/learn-overview/
原標(biāo)題:How To Build a Real-Time, Event-Driven Information System ,作者: Chanaka Fernando
【51CTO譯稿,合作站點(diǎn)轉(zhuǎn)載請注明原文譯者和出處為51CTO.com】