快手基于 Flink 構建實時數(shù)倉場景化實踐
本文整理自快手數(shù)據(jù)技術專家李天朔在 5 月 22 日北京站 Flink Meetup 分享的議題《快手基于 Flink 構建實時數(shù)倉場景化實踐》,內容包括:
1.快手實時計算場景
2.快手實時數(shù)倉架構及保障措施
3.快手場景問題及解決方案
4.未來規(guī)劃
一、快手實時計算場景
快手業(yè)務中的實時計算場景主要分為四塊:
公司級別的核心數(shù)據(jù):包括公司經(jīng)營大盤,實時核心日報,以及移動版數(shù)據(jù)。相當于團隊會有公司的大盤指標,以及各個業(yè)務線,比如視頻相關、直播相關,都會有一個核心的實時看板;
大型活動實時指標:其中最核心的內容是實時大屏。例如快手的春晚活動,我們會有一個總體的大屏去看總體活動現(xiàn)狀。一個大型的活動會分為 N 個不同的模塊,我們對每一個模塊不同的玩法會有不同的實時數(shù)據(jù)看板;
運營部分的數(shù)據(jù):運營數(shù)據(jù)主要包括兩方面,一個是創(chuàng)作者,另一個是內容。對于創(chuàng)作者和內容,在運營側,比如上線一個大 V 的活動,我們想看到一些信息如直播間的實時現(xiàn)狀,以及直播間對于大盤的牽引情況?;谶@個場景,我們會做各種實時大屏的多維數(shù)據(jù),以及大盤的一些數(shù)據(jù)。此外,這塊還包括運營策略的支撐,比如我們可能會實時發(fā)掘一些熱點內容和熱點創(chuàng)作者,以及目前的一些熱點情況。我們基于這些熱點情況輸出策略,這個也是我們需要提供的一些支撐能力;最后還包括 C 端數(shù)據(jù)展示,比如現(xiàn)在快手里有創(chuàng)作者中心和主播中心,這里會有一些如主播關播的關播頁,關播頁的實時數(shù)據(jù)有一部分也是我們做的。
實時特征:包含搜索推薦特征和廣告實時特征。
二、快手實時數(shù)倉架構及保障措施
1. 目標及難點
1.1 目標
首先由于我們是做數(shù)倉的,因此希望所有的實時指標都有離線指標去對應,要求實時指標和離線指標整體的數(shù)據(jù)差異在 1% 以內,這是最低標準。
其次是數(shù)據(jù)延遲,其 SLA 標準是活動期間所有核心報表場景的數(shù)據(jù)延遲不能超過 5 分鐘,這 5 分鐘包括作業(yè)掛掉之后和恢復時間,如果超過則意味著 SLA 不達標。
最后是穩(wěn)定性,針對一些場景,比如作業(yè)重啟后,我們的曲線是正常的,不會因為作業(yè)重啟導致指標產(chǎn)出一些明顯的異常。
1.2 難點
第一個難點是數(shù)據(jù)量大。每天整體的入口流量數(shù)據(jù)量級大概在萬億級。在活動如春晚的場景,QPS 峰值能達到億 / 秒。
第二個難點是組件依賴比較復雜??赡苓@條鏈路里有的依賴于 Kafka,有的依賴 Flink,還有一些依賴 KV 存儲、RPC 接口、OLAP 引擎等,我們需要思考在這條鏈路里如何分布,才能讓這些組件都能正常工作。
第三個難點是鏈路復雜。目前我們有 200+ 核心業(yè)務作業(yè),50+ 核心數(shù)據(jù)源,整體作業(yè)超過 1000。
2. 實時數(shù)倉 - 分層模型
基于上面三個難點,來看一下數(shù)倉架構:
如上所示:
最下層有三個不同的數(shù)據(jù)源,分別是客戶端日志、服務端日志以及 Binlog 日志;
在公共基礎層分為兩個不同的層次,一個是 DWD 層,做明細數(shù)據(jù),另一個是 DWS 層,做公共聚合數(shù)據(jù),DIM 是我們常說的維度。我們有一個基于離線數(shù)倉的主題預分層,這個主題預分層可能包括流量、用戶、設備、視頻的生產(chǎn)消費、風控、社交等。DWD 層的核心工作是標準化的清洗;DWS 層是把維度的數(shù)據(jù)和 DWD 層進行關聯(lián),關聯(lián)之后生成一些通用粒度的聚合層次。
再往上是應用層,包括一些大盤的數(shù)據(jù),多維分析的模型以及業(yè)務專題數(shù)據(jù);
最上面是場景。
整體過程可以分為三步:
第一步是做業(yè)務數(shù)據(jù)化,相當于把業(yè)務的數(shù)據(jù)接進來;
第二步是數(shù)據(jù)資產(chǎn)化,意思是對數(shù)據(jù)做很多的清洗,然后形成一些規(guī)則有序的數(shù)據(jù);
第三步是數(shù)據(jù)業(yè)務化,可以理解數(shù)據(jù)在實時數(shù)據(jù)層面可以反哺業(yè)務,為業(yè)務數(shù)據(jù)價值建設提供一些賦能。
3. 實時數(shù)倉 - 保障措施
基于上面的分層模型,來看一下整體的保障措施:
保障層面分為三個不同的部分,分別是質量保障,時效保障以及穩(wěn)定保障。
我們先看藍色部分的質量保障。針對質量保障,可以看到在數(shù)據(jù)源階段,做了如數(shù)據(jù)源的亂序監(jiān)控,這是我們基于自己的 SDK 的采集做的,以及數(shù)據(jù)源和離線的一致性校準。研發(fā)階段的計算過程有三個階段,分別是研發(fā)階段、上線階段和服務階段。研發(fā)階段可能會提供一個標準化的模型,基于這個模型會有一些 Benchmark,并且做離線的比對驗證,保證質量是一致的;上線階段更多的是服務監(jiān)控和指標監(jiān)控;在服務階段,如果出現(xiàn)一些異常情況,先做 Flink 狀態(tài)拉起,如果出現(xiàn)了一些不符合預期的場景,我們會做離線的整體數(shù)據(jù)修復。
第二個是時效性保障。針對數(shù)據(jù)源,我們把數(shù)據(jù)源的延遲情況也納入監(jiān)控。在研發(fā)階段其實還有兩個事情:首先是壓測,常規(guī)的任務會拿最近 7 天或者最近 14 天的峰值流量去看它是否存在任務延遲的情況;通過壓測之后,會有一些任務上線和重啟性能評估,相當于按照 CP 恢復之后,重啟的性能是什么樣子。
最后一個是穩(wěn)定保障,這在大型活動中會做得比較多,比如切換演練和分級保障。我們會基于之前的壓測結果做限流,目的是保障作業(yè)在超過極限的情況下,仍然是穩(wěn)定的,不會出現(xiàn)很多的不穩(wěn)定或者 CP 失敗的情況。之后我們會有兩種不同的標準,一種是冷備雙機房,另外一種是熱備雙機房。冷備雙機房是:當一個單機房掛掉,我們會從另一個機房去拉起;熱備雙機房:相當于同樣一份邏輯在兩個機房各部署一次。
以上就是我們整體的保障措施。
三、快手場景問題及解決方案
1. PV/UV 標準化
1.1 場景
第一個問題是 PV/UV 標準化,這里有三個截圖:
第一張圖是春晚活動的預熱場景,相當于是一種玩法,第二和第三張圖是春晚當天的發(fā)紅包活動和直播間截圖。
在活動進行過程中,我們發(fā)現(xiàn) 60~70% 的需求是計算頁面里的信息,如:
這個頁面來了多少人,或者有多少人點擊進入這個頁面;
活動一共來了多少人;
頁面里的某一個掛件,獲得了多少點擊、產(chǎn)生了多少曝光。
1.2 方案
抽象一下這個場景就是下面這種 SQL:
簡單來說,就是從一張表做篩選條件,然后按照維度層面做聚合,接著產(chǎn)生一些 Count 或者 Sum 操作。
基于這種場景,我們最開始的解決方案如上圖右邊所示。
我們用到了 Flink SQL 的 Early Fire 機制,從 Source 數(shù)據(jù)源取數(shù)據(jù),之后做了 DID 的分桶。比如最開始紫色的部分按這個做分桶,先做分桶的原因是防止某一個 DID 存在熱點的問題。分桶之后會有一個叫做 Local Window Agg 的東西,相當于數(shù)據(jù)分完桶之后把相同類型的數(shù)據(jù)相加。Local Window Agg 之后再按照維度進行 Global Window Agg 的合桶,合桶的概念相當于按照維度計算出最終的結果。Early Fire 機制相當于在 Local Window Agg 開一個天級的窗口,然后每分鐘去對外輸出一次。
這個過程中我們遇到了一些問題,如上圖左下角所示。
在代碼正常運行的情況下是沒有問題的,但如果整體數(shù)據(jù)存在延遲或者追溯歷史數(shù)據(jù)的情況,比如一分鐘 Early Fire 一次,因為追溯歷史的時候數(shù)據(jù)量會比較大,所以可能導致 14:00 追溯歷史,直接讀到了 14:02 的數(shù)據(jù),而 14:01 的那個點就被丟掉了,丟掉了以后會發(fā)生什么?
在這種場景下,圖中上方的曲線為 Early Fire 回溯歷史數(shù)據(jù)的結果。橫坐標是分鐘,縱坐標是截止到當前時刻的頁面 UV,我們發(fā)現(xiàn)有些點是橫著的,意味著沒有數(shù)據(jù)結果,然后一個陡增,然后又橫著的,接著又一個陡增,而這個曲線的預期結果其實是圖中下方那種平滑的曲線。
為了解決這個問題,我們用到了 Cumulate Window 的解決方案,這個解決方案在 Flink 1.13 版本里也有涉及,其原理是一樣的。
數(shù)據(jù)開一個大的天級窗口,大窗口下又開了一個小的分鐘級窗口,數(shù)據(jù)按數(shù)據(jù)本身的 Row Time 落到分鐘級窗口。
Watermark 推進過了窗口的 event_time,它會進行一次下發(fā)的觸發(fā),通過這種方式可以解決回溯的問題,數(shù)據(jù)本身落在真實的窗口, Watermark 推進,在窗口結束后觸發(fā)。
此外,這種方式在一定程度上能夠解決亂序的問題。比如它的亂序數(shù)據(jù)本身是一個不丟棄的狀態(tài),會記錄到最新的累計數(shù)據(jù)。
最后是語義一致性,它會基于事件時間,在亂序不嚴重的情況下,和離線計算出來的結果一致性是相當高的。
以上是 PV/UV 一個標準化的解決方案。
2. DAU 計算
2.1 背景介紹
下面介紹一下 DAU 計算:
我們對于整個大盤的活躍設備、新增設備和回流設備有比較多的監(jiān)控。
活躍設備指的是當天來過的設備;
新增設備指的是當天來過且歷史沒有來過的設備;
回流設備指的是當天來過且 N 天內沒有來過的設備。
但是我們計算過程之中可能需要 5~8 個這樣不同的 Topic 去計算這幾個指標。
我們看一下離線過程中,邏輯應該怎么算。
首先我們先算活躍設備,把這些合并到一起,然后做一個維度下的天級別去重,接著再去關聯(lián)維度表,這個維度表包括設備的首末次時間,就是截止到昨天設備首次訪問和末次訪問的時間。
得到這個信息之后,我們就可以進行邏輯計算,然后我們會發(fā)現(xiàn)新增和回流的設備其實是活躍設備里打的一個子標簽。新增設備就是做了一個邏輯處理,回流設備是做了 30 天的邏輯處理,基于這樣的解決方案,我們能否簡單地寫一個 SQL 去解決這個問題?
其實我們最開始是這么做的,但遇到了一些問題:
第一個問題是:數(shù)據(jù)源是 6~8 個,而且我們大盤的口徑經(jīng)常會做微調,如果是單作業(yè)的話,每次微調的過程之中都要改,單作業(yè)的穩(wěn)定性會非常差;
第二個問題是:數(shù)據(jù)量是萬億級,這會導致兩個情況,首先是這個量級的單作業(yè)穩(wěn)定性非常差,其次是實時關聯(lián)維表的時候用的 KV 存儲,任何一個這樣的 RPC 服務接口,都不可能在萬億級數(shù)據(jù)量的場景下保證服務穩(wěn)定性;
第三個問題是:我們對于時延要求比較高,要求時延小于一分鐘。整個鏈路要避免批處理,如果出現(xiàn)了一些任務性能的單點問題,我們還要保證高性能和可擴容。
2.2 技術方案
針對以上問題,介紹一下我們是怎么做的:
如上圖的例子,第一步是對 A B C 這三個數(shù)據(jù)源,先按照維度和 DID 做分鐘級別去重,分別去重之后得到三個分鐘級別去重的數(shù)據(jù)源,接著把它們 Union 到一起,然后再進行同樣的邏輯操作。
這相當于我們數(shù)據(jù)源的入口從萬億變到了百億的級別,分鐘級別去重之后再進行一個天級別的去重,產(chǎn)生的數(shù)據(jù)源就可以從百億變成了幾十億的級別。
在幾十億級別數(shù)據(jù)量的情況下,我們再去關聯(lián)數(shù)據(jù)服務化,這就是一種比較可行的狀態(tài),相當于去關聯(lián)用戶畫像的 RPC 接口,得到 RPC 接口之后,最終寫入到了目標 Topic。這個目標 Topic 會導入到 OLAP 引擎,供給多個不同的服務,包括移動版服務,大屏服務,指標看板服務等。
這個方案有三個方面的優(yōu)勢,分別是穩(wěn)定性、時效性和準確性。
首先是穩(wěn)定性。松耦合可以簡單理解為當數(shù)據(jù)源 A 的邏輯和數(shù)據(jù)源 B 的邏輯需要修改時,可以單獨修改。第二是任務可擴容,因為我們把所有邏輯拆分得非常細粒度,當一些地方出現(xiàn)了如流量問題,不會影響后面的部分,所以它擴容比較簡單,除此之外還有服務化后置和狀態(tài)可控。
其次是時效性,我們做到毫秒延遲,并且維度豐富,整體上有 20+ 的維度做多維聚合。
最后是準確性,我們支持數(shù)據(jù)驗證、實時監(jiān)控、模型出口統(tǒng)一等。
此時我們遇到了另外一個問題 - 亂序。對于上方三個不同的作業(yè),每一個作業(yè)重啟至少會有兩分鐘左右的延遲,延遲會導致下游的數(shù)據(jù)源 Union 到一起就會有亂序。
2.3 延遲計算方案
遇到上面這種有亂序的情況下,我們要怎么處理?
我們總共有三種處理方案:
第一種解決方案是用 “did + 維度 + 分鐘” 進行去重,Value 設為 “是否來過”。比如同一個 did,04:01 來了一條,它會進行結果輸出。同樣的,04:02 和 04:04 也會進行結果輸出。但如果 04:01 再來,它就會丟棄,但如果 04:00 來,依舊會進行結果輸出。
這個解決方案存在一些問題,因為我們按分鐘存,存 20 分鐘的狀態(tài)大小是存 10 分鐘的兩倍,到后面這個狀態(tài)大小有點不太可控,因此我們又換了解決方案 2。
第二種解決方案,我們的做法會涉及到一個假設前提,就是假設不存在數(shù)據(jù)源亂序的情況。在這種情況下,key 存的是 “did + 維度”,Value 為 “時間戳”,它的更新方式如上圖所示。04:01 來了一條數(shù)據(jù),進行結果輸出。04:02 來了一條數(shù)據(jù),如果是同一個 did,那么它會更新時間戳,然后仍然做結果輸出。04:04 也是同樣的邏輯,然后將時間戳更新到 04:04,如果后面來了一條 04:01 的數(shù)據(jù),它發(fā)現(xiàn)時間戳已經(jīng)更新到 04:04,它會丟棄這條數(shù)據(jù)。這樣的做法大幅度減少了本身所需要的一些狀態(tài),但是對亂序是零容忍,不允許發(fā)生任何亂序的情況,由于我們不好解決這個問題,因此我們又想出了解決方案 3。
方案 3 是在方案 2 時間戳的基礎之上,加了一個類似于環(huán)形緩沖區(qū),在緩沖區(qū)之內允許亂序。
比如 04:01 來了一條數(shù)據(jù),進行結果輸出;04:02 來了一條數(shù)據(jù),它會把時間戳更新到 04:02,并且會記錄同一個設備在 04:01 也來過。如果 04:04 再來了一條數(shù)據(jù),就按照相應的時間差做一個位移,最后通過這樣的邏輯去保障它能夠容忍一定的亂序。
綜合來看這三個方案:
方案 1 在容忍 16 分鐘亂序的情況下,單作業(yè)的狀態(tài)大小在 480G 左右。這種情況雖然保證了準確性,但是作業(yè)的恢復和穩(wěn)定性是完全不可控的狀態(tài),因此我們還是放棄了這個方案;
方案 2 是 30G 左右的狀態(tài)大小,對于亂序 0 容忍,但是數(shù)據(jù)不準確,由于我們對準確性的要求非常高,因此也放棄了這個方案;
方案 3 的狀態(tài)跟方案 1 相比,它的狀態(tài)雖然變化了但是增加的不多,而且整體能達到跟方案 1 同樣的效果。方案 3 容忍亂序的時間是 16 分鐘,我們正常更新一個作業(yè)的話,10 分鐘完全足夠重啟,因此最終選擇了方案 3。
3. 運營場景
3.1 背景介紹
運營場景可分為四個部分:
第一個是數(shù)據(jù)大屏支持,包括單直播間的分析數(shù)據(jù)和大盤的分析數(shù)據(jù),需要做到分鐘級延遲,更新要求比較高;
第二個是直播看板支持,直播看板的數(shù)據(jù)會有特定維度的分析,特定人群支持,對維度豐富性要求比較高;
第三個是數(shù)據(jù)策略榜單,這個榜單主要是預測熱門作品、爆款,要求的是小時級別的數(shù)據(jù),更新要求比較低;
第四個是 C 端實時指標展示,查詢量比較大,但是查詢模式比較固定。
下面進行分析這 4 種不同的狀態(tài)產(chǎn)生的一些不同的場景。
前 3 種基本沒有什么差別,只是在查詢模式上,有的是特定業(yè)務場景,有的是通用業(yè)務場景。
針對第 3 種和第 4 種,它對于更新的要求比較低,對于吞吐的要求比較高,過程之中的曲線也不要求有一致性。第 4 種查詢模式更多的是單實體的一些查詢,比如去查詢內容,會有哪些指標,而且對 QPS 要求比較高。
3.2 技術方案
針對上方 4 種不同的場景,我們是如何去做的?
首先看一下基礎明細層 (圖中左方),數(shù)據(jù)源有兩條鏈路,其中一條鏈路是消費的流,比如直播的消費信息,還有觀看 / 點贊 / 評論。經(jīng)過一輪基礎清洗,然后做維度管理。上游的這些維度信息來源于 Kafka,Kafka 寫入了一些內容的維度,放到了 KV 存儲里邊,包括一些用戶的維度。
這些維度關聯(lián)了之后,最終寫入 Kafka 的 DWD 事實層,這里為了做性能的提升,我們做了二級緩存的操作。
如圖中上方,我們讀取 DWD 層的數(shù)據(jù)然后做基礎匯總,核心是窗口維度聚合生成 4 種不同粒度的數(shù)據(jù),分別是大盤多維匯總 topic、直播間多維匯總 topic、作者多維匯總 topic、用戶多維匯總 topic,這些都是通用維度的數(shù)據(jù)。
如圖中下方,基于這些通用維度數(shù)據(jù),我們再去加工個性化維度的數(shù)據(jù),也就是 ADS 層。拿到了這些數(shù)據(jù)之后會有維度擴展,包括內容擴展和運營維度的拓展,然后再去做聚合,比如會有電商實時 topic,機構服務實時 topic 和大 V 直播實時 topic。
分成這樣的兩個鏈路會有一個好處:一個地方處理的是通用維度,另一個地方處理的是個性化的維度。通用維度保障的要求會比較高一些,個性化維度則會做很多個性化的邏輯。如果這兩個耦合在一起的話,會發(fā)現(xiàn)任務經(jīng)常出問題,并且分不清楚哪個任務的職責是什么,構建不出這樣的一個穩(wěn)定層。
如圖中右方,最終我們用到了三種不同的引擎。簡單來說就是 Redis 查詢用到了 C 端的場景,OLAP 查詢用到了大屏、業(yè)務看板的場景。
四、未來規(guī)劃
上文一共講了三個場景,第一個場景是標準化 PU/UV 的計算,第二個場景是 DAU 整體的解決方案,第三個場景是運營側如何解決?;谶@些內容,我們有一些未來規(guī)劃,分為 4 個部分。
第一部分是實時保障體系完善:一方面做一些大型的活動,包括春晚活動以及后續(xù)常態(tài)化的活動。針對這些活動如何去保障,我們有一套規(guī)范去做平臺化的建設;第二個是分級保障標準制定,哪些作業(yè)是什么樣的保障級別 / 標準,會有一個標準化的說明;第三個是引擎平臺能力推動解決,包括 Flink 任務的一些引擎,在這上面我們會有一個平臺,基于這個平臺去做規(guī)范、標準化的推動。
第二部分是實時數(shù)倉內容構建:一方面是場景化方案的輸出,比如針對活動會有一些通用化的方案,而不是每次活動都開發(fā)一套新的解決方案;另一方面是內容數(shù)據(jù)層次沉淀,比如現(xiàn)在的數(shù)據(jù)內容建設,在厚度方面有一些場景的缺失,包括內容如何更好地服務于上游的場景。
第三部分是 Flink SQL 場景化構建,包括 SQL 持續(xù)推廣、SQL 任務穩(wěn)定性和 SQL 任務資源利用率。我們在預估資源的過程中,會考慮比如在什么樣 QPS 的場景下, SQL 用什么樣的解決方案,能支撐到什么情況。Flink SQL 可以大幅減少人效,但是在這個過程中,我們想讓業(yè)務操作更加簡單。
第四部分是批流一體探索。實時數(shù)倉的場景其實就是做離線 ETL 計算加速,我們會有很多小時級別的任務,針對這些任務,每次批處理的時候有一些邏輯可以放到流處理去解決,這對于離線數(shù)倉 SLA 體系的提升十分巨大。