一套 SQL 搞定數(shù)據(jù)倉庫?Flink有了新嘗試
數(shù)據(jù)倉庫是公司數(shù)據(jù)發(fā)展到一定規(guī)模后必然需要提供的一種基礎(chǔ)服務(wù),也是“數(shù)據(jù)智能”建設(shè)的基礎(chǔ)環(huán)節(jié)。迅速獲取數(shù)據(jù)反饋不僅有利于改善產(chǎn)品及用戶體驗(yàn),更有利于公司的科學(xué)決策,因此獲取數(shù)據(jù)的實(shí)時(shí)性尤為重要。目前企業(yè)的數(shù)倉建設(shè)大多是離線一套,實(shí)時(shí)一套。業(yè)務(wù)要求低延時(shí)的使用實(shí)時(shí)數(shù)倉;業(yè)務(wù)復(fù)雜的使用離線數(shù)倉。架構(gòu)十分復(fù)雜,需要使用很多系統(tǒng)和計(jì)算框架,這就要求企業(yè)儲(chǔ)備多方面的人才,導(dǎo)致人才成本較高,且出了問題難以排查,終端用戶也需要熟悉多種語法。本文分析目前的數(shù)倉架構(gòu),探索離線和實(shí)時(shí)數(shù)倉是否能放在一起考慮,探索Flink的統(tǒng)一架構(gòu)是否能解決大部分問題。
數(shù)倉架構(gòu)
?? 
數(shù)據(jù)倉庫可以分為三層:ODS(原始數(shù)據(jù)層)、DW(數(shù)據(jù)倉庫層)、ADS(應(yīng)用數(shù)據(jù)層)。
1. ODS (Operation Data Store) 層
從日志或者業(yè)務(wù)DB傳輸過來的原始數(shù)據(jù),傳統(tǒng)的離線數(shù)倉做法也有直接用CDC (Change Data Capture) 工具周期同步到數(shù)倉里面。用一套統(tǒng)一的Kafka來承接這個(gè)角色,可以讓數(shù)據(jù)更實(shí)時(shí)的落入數(shù)倉,也可以在這一層統(tǒng)一實(shí)時(shí)和離線的。
2. DW (Data warehouse) 層
DW層一般也分為DWD層和DWS層:
- DWD (Data warehouse detail) 層:明細(xì)數(shù)據(jù)層,這一層的數(shù)據(jù)應(yīng)該是經(jīng)過清洗的,干凈的、準(zhǔn)確的數(shù)據(jù),它包含的信息和ODS層相同,但是它遵循數(shù)倉和數(shù)據(jù)庫的標(biāo)準(zhǔn)Schema定義。
- DWS (Data warehouse service) 層:匯總數(shù)據(jù)層,這一層可能經(jīng)過了輕度的聚合,可能是星型或雪花模型的結(jié)構(gòu)數(shù)據(jù),這一層已經(jīng)做了一些業(yè)務(wù)層的計(jì)算,用戶可以基于這一層,計(jì)算出數(shù)據(jù)服務(wù)所需數(shù)據(jù)。
3. ADS (Application Data Store) 層
和DWS不同的是,這一層直接面向用戶的數(shù)據(jù)服務(wù),不需要再次計(jì)算,已經(jīng)是最終需要的數(shù)據(jù)。
主要分為兩條鏈路:
- 業(yè)務(wù)DB和日志 -> Kafka -> 實(shí)時(shí)數(shù)倉 (Kafka + Dim維表) -> BI DB -> 數(shù)據(jù)服務(wù)
- 業(yè)務(wù)DB和日志 -> Kafka -> 離線數(shù)倉 (Hive metastore + HDFS) -> BI DB -> 數(shù)據(jù)服務(wù)
主流的數(shù)倉架構(gòu)仍然是Lambda架構(gòu),Lambda架構(gòu)雖然復(fù)雜,但是它能覆蓋業(yè)務(wù)上需要的場(chǎng)景,對(duì)業(yè)務(wù)來說,是最靈活的方式。
Lambda架構(gòu)分為兩條鏈路:
- 傳統(tǒng)離線數(shù)據(jù)具有穩(wěn)定、計(jì)算復(fù)雜、靈活的優(yōu)點(diǎn),運(yùn)行批計(jì)算,保證T+1的報(bào)表產(chǎn)生和靈活的Ad-hoc查詢。
- 實(shí)時(shí)數(shù)倉提供低延時(shí)的數(shù)據(jù)服務(wù),傳統(tǒng)的離線數(shù)倉往往都是T+1的延時(shí),這導(dǎo)致分析人員沒法做一些實(shí)時(shí)化的決策,而實(shí)時(shí)數(shù)倉整條鏈路的延遲最低甚至可以做到秒級(jí),這不但加快了分析和決策,而且也給更多的業(yè)務(wù)帶來了可能,比如實(shí)時(shí)化的監(jiān)控報(bào)警。Flink的強(qiáng)項(xiàng)是實(shí)時(shí)計(jì)算、流計(jì)算,而Kafka是實(shí)時(shí)數(shù)倉存儲(chǔ)的核心。
上圖標(biāo)出了1-9條邊,每條邊代表數(shù)據(jù)的轉(zhuǎn)換,就是大數(shù)據(jù)的計(jì)算,本文后續(xù)將分析這些邊,探索Flink在其中可以發(fā)揮的作用。
Flink一棧式計(jì)算
元數(shù)據(jù)
先說下元數(shù)據(jù)的管理,離線數(shù)倉有Hive metastore來管理元數(shù)據(jù),但是單純的Kafka不具備元數(shù)據(jù)管理的能力,這里推薦兩種做法:
1. Confluent schema registry
搭建起schema registry服務(wù)后,通過confluent的url即可獲取到表的schema信息,對(duì)于上百個(gè)字段的表,它可以省編寫Flink作業(yè)時(shí)的很多事,后續(xù)Flink也正在把它的schema推斷功能結(jié)合Confluent schema registry。但是它仍然省不掉創(chuàng)建表的過程,用戶也需要填寫Confluent對(duì)應(yīng)的URL。
2. Catalog
目前Flink內(nèi)置已提供了HiveCatalog,Kafka的表可以直接集成到Hive metastore中,用戶在SQL中可以直接使用這些表。但是Kafka的start-offset一些場(chǎng)景需要靈活的配置,為此,F(xiàn)link也正在提供 LIKE [1] 和 Table Hints [2] 等手段來解決。
Flink中離線數(shù)倉和實(shí)時(shí)數(shù)倉都使用Hive Catalog:
使用Catalog,后續(xù)的計(jì)算可以完全復(fù)用批和流,提供相同的體驗(yàn)。
數(shù)倉導(dǎo)入
計(jì)算①和⑤分別是實(shí)時(shí)數(shù)倉的導(dǎo)入和離線數(shù)倉的導(dǎo)入,近來,更加實(shí)時(shí)的離線數(shù)倉導(dǎo)入越來越成為數(shù)據(jù)倉庫的常規(guī)做法,F(xiàn)link的導(dǎo)入可以讓離線數(shù)倉的數(shù)據(jù)更實(shí)時(shí)化。
以前主要通過DataStream + StreamingFileSink的方式進(jìn)行導(dǎo)入,但是不支持ORC和無法更新HMS。
Flink streaming integrate Hive后,提供Hive的streaming sink [3],用SQL的方式會(huì)更方便靈活,使用SQL的內(nèi)置函數(shù)和UDF,而且流和批可以復(fù)用,運(yùn)行兩個(gè)流計(jì)算作業(yè)。
數(shù)據(jù)處理
計(jì)算②和⑥分別是實(shí)時(shí)數(shù)倉和離線數(shù)倉的中間數(shù)據(jù)處理,這里面主要有三種計(jì)算:
- ETL:和數(shù)據(jù)導(dǎo)入一樣,批流沒有區(qū)別。
- 維表Join:維表補(bǔ)字段是很常見的數(shù)倉操作,離線數(shù)倉中基本都是直接Join Hive表即可,但是Streaming作業(yè)卻有些不同,下文將詳細(xì)描述。
- Aggregation:Streaming作業(yè)在這些有狀態(tài)的計(jì)算中,產(chǎn)生的不是一次確定的值,而可能是不斷變化的值。
維表Join
與離線計(jì)算不同,離線計(jì)算只用關(guān)心某個(gè)時(shí)間點(diǎn)的維表數(shù)據(jù),而Streaming的作業(yè)持續(xù)運(yùn)行,所以它關(guān)注的不能只是靜態(tài)數(shù)據(jù),需要是動(dòng)態(tài)的維表。
另外為了Join的效率,streaming作業(yè)往往是join一個(gè)數(shù)據(jù)庫表,而不僅僅是Hive表。
例子:
這里有個(gè)非常麻煩的事情,那就是在實(shí)時(shí)數(shù)倉中,需要按時(shí)周期調(diào)度更新維表到實(shí)時(shí)維表數(shù)據(jù)庫中,那能不能直接Join離線數(shù)倉的Hive維表呢?目前社區(qū)也正在開發(fā)Hive維表,它有哪些挑戰(zhàn):
Hive維表太大,放不進(jìn)Cache中:
- 考慮Shuffle by key,分布式的維表Join,減少單并發(fā)Cache的數(shù)據(jù)量
- 考慮將維表數(shù)據(jù)放入State中
維表更新問題:
- 簡(jiǎn)單的方案是TTL過期
- 復(fù)雜一些的方案是實(shí)現(xiàn)Hive streaming source,并結(jié)合Flink的watermark機(jī)制
有狀態(tài)計(jì)算和數(shù)據(jù)導(dǎo)出
例子:
一句簡(jiǎn)單的聚合SQL,它在批計(jì)算和流計(jì)算的執(zhí)行模式是完全不同的。
Streaming的聚合和離線計(jì)算的聚合最大的不同在于它是一個(gè)動(dòng)態(tài)表[4],它的輸出是在持續(xù)變化的。動(dòng)態(tài)表的概念簡(jiǎn)單來說,一個(gè)streaming的count,它的輸出是由輸入來驅(qū)動(dòng)的,而不是像batch一樣,獲取全部輸入后才會(huì)輸出,所以,它的結(jié)果是動(dòng)態(tài)變化的:
- 如果在SQL內(nèi)部,F(xiàn)link內(nèi)部的retract機(jī)制會(huì)保證SQL 的結(jié)果的與批一樣。
- 如果是外部的存儲(chǔ),這給sink帶來了挑戰(zhàn)。
有狀態(tài)計(jì)算后的輸出:
- 如果sink是一個(gè)可更新的數(shù)據(jù)庫,比如HBase/Redis/JDBC,那這看起來不是問題,我們只需要不斷的去更新就好了。
- 但是如果是不可更新的存儲(chǔ)呢,我們沒有辦法去更新原本的數(shù)據(jù)。為此,F(xiàn)link提出了Changelog的支持[5],想內(nèi)置支持這種sink,輸出特定Schema的數(shù)據(jù),讓下游消費(fèi)者也能很好的work起來。
例子:
-- batch:計(jì)算完成后,一次性輸出到mysql中,同key只有一個(gè)數(shù)據(jù)-- streaming:mysql里面的數(shù)據(jù)不斷更新,不斷變化insert into mysql_table select age, avg(amount) from order_with_user_age group by age;-- batch: 同key只有一個(gè)數(shù)據(jù),append即可insert into hive_table select age, avg(amount) from order_with_user_age group by age;-- streaming: kafka里面的數(shù)據(jù)不斷append,并且多出一列,來表示這是upsert的消息,后續(xù)的Flink消費(fèi)會(huì)自動(dòng)做出機(jī)制來處理upsertinsert into kafka_table select age, avg(amount) from order_with_user_age group by age;
AD-HOC與OLAP
離線數(shù)倉可以進(jìn)行計(jì)算⑨,對(duì)明細(xì)數(shù)據(jù)或者匯總數(shù)據(jù)都可以進(jìn)行ad-hoc的查詢,可以讓數(shù)據(jù)分析師進(jìn)行靈活的查詢。
目前實(shí)時(shí)數(shù)倉一個(gè)比較大的缺點(diǎn)是不能Ad-hoc查詢,因?yàn)樗旧頉]有保存歷史數(shù)據(jù),Kafka可能可以保存3天以上的數(shù)據(jù),但是一是存儲(chǔ)成本高、二是查詢效率也不好。
一個(gè)思路是提供OLAP數(shù)據(jù)庫的批流統(tǒng)一Sink組件:
- Druid sink
- Doris sink
- Clickhouse sink
- HBase/Phoenix sink
總結(jié)
本文從目前的Lambda架構(gòu)出發(fā),分析了Flink一棧式數(shù)倉計(jì)算方案的能力,本文中一些Flink新功能還在快速迭代演進(jìn)中,隨著不斷的探索和實(shí)踐,希望朝著計(jì)算一體化的方向逐漸推進(jìn),將來的數(shù)倉架構(gòu)希望能真正統(tǒng)一用戶的離線和實(shí)時(shí),提供統(tǒng)一的體驗(yàn):
- 統(tǒng)一元數(shù)據(jù)
- 統(tǒng)一SQL開發(fā)
- 統(tǒng)一數(shù)據(jù)導(dǎo)入與導(dǎo)出
- 將來考慮統(tǒng)一存儲(chǔ)
參考
[1]https://cwiki.apache.org/confluence/display/FLINK/FLIP-110%3A+Support+LIKE+clause+in+CREATE+TABLE
[2]https://cwiki.apache.org/confluence/display/FLINK/FLIP-113%3A+Supports+Table+Hints
[3]https://cwiki.apache.org/confluence/display/FLINK/FLIP-115%3A+Filesystem+connector+in+Table
[4]https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/dynamic_tables.html
[5]https://cwiki.apache.org/confluence/display/FLINK/FLIP-105%3A+Support+to+Interpret+and+Emit+Changelog+in+Flink+SQL

























