基于 Flink CDC 實(shí)現(xiàn)海量數(shù)據(jù)的實(shí)時(shí)同步和轉(zhuǎn)換
?摘要:本文整理自 Apache Flink Committer、Flink CDC Maintainer、阿里巴巴高級(jí)開發(fā)工程師徐榜江(雪盡)在 5 月 21 日 Flink CDC Meetup 的演講。主要內(nèi)容包括:
- Flink CDC 技術(shù)
- 傳統(tǒng)數(shù)據(jù)集成方案的痛點(diǎn)
- 基于 Flink CDC 的海量數(shù)據(jù)的實(shí)時(shí)同步和轉(zhuǎn)換
- Flink CDC 社區(qū)發(fā)展
01Flink CDC 技術(shù)

CDC 是 Change Data Capture 的縮寫,是一種捕獲變更數(shù)據(jù)的技術(shù),CDC 技術(shù)很早就存在,發(fā)展至今,業(yè)界的 CDC 技術(shù)方案眾多,從原理上可以分為兩大類:
- 一類是基于查詢的 CDC 技術(shù) ,比如 DataX。隨著當(dāng)下場(chǎng)景對(duì)實(shí)時(shí)性要求越來越高,此類技術(shù)的缺陷也逐漸凸顯。離線調(diào)度和批處理的模式導(dǎo)致延遲較高;基于離線調(diào)度做切片,因而無法保障數(shù)據(jù)的一致性;另外,也無法保障實(shí)時(shí)性。
- 一類是基于日志的 CDC 技術(shù),比如 Debezium、Canal、 Flink CDC。這種 CDC 技術(shù)能夠?qū)崟r(shí)消費(fèi)數(shù)據(jù)庫(kù)的日志,流式處理的模式可以保障數(shù)據(jù)的一致性,提供實(shí)時(shí)的數(shù)據(jù),可以滿足當(dāng)下越來越實(shí)時(shí)的業(yè)務(wù)需求。

上圖為常見開源 CDC 的方案對(duì)比。可以看到 Flink CDC 的機(jī)制以及在增量同步、斷點(diǎn)續(xù)傳、全量同步的表現(xiàn)都很好,也支持全增量一體化同步,而很多其他開源方案無法支持全增量一體化同步。Flink CDC 是分布式架構(gòu),可以滿足海量數(shù)據(jù)同步的業(yè)務(wù)場(chǎng)景。依靠 Flink 的生態(tài)優(yōu)勢(shì),它提供了 DataStream API 以及 SQL API,這些 API 提供了非常強(qiáng)大的 transformation 能力。此外,F(xiàn)link CDC 社區(qū)和 Flink 社區(qū)的開源生態(tài)非常完善,吸引了很多社區(qū)用戶和公司在社區(qū)開發(fā)共建。

Flink CDC 支持全增量一體化同步,為用戶提供實(shí)時(shí)一致性快照。比如一張表里有歷史的全量數(shù)據(jù),也有新增的實(shí)時(shí)變更數(shù)據(jù),增量數(shù)據(jù)不斷地往 Binlog 日志文件里寫,F(xiàn)link CDC 會(huì)先同步全量歷史數(shù)據(jù),再無縫切換到同步增量數(shù)據(jù),增量同步時(shí),如果是新增的插入數(shù)據(jù)(上圖中藍(lán)色小塊),會(huì)追加到實(shí)時(shí)一致性快照中;如果是更新的數(shù)據(jù)(上圖中黃色小塊),則會(huì)在已有歷史數(shù)據(jù)里做更新。
Flink CDC 相當(dāng)于提供了實(shí)時(shí)物化視圖,為用戶提供數(shù)據(jù)庫(kù)中表的實(shí)時(shí)一致性快照,用于可以對(duì)這些數(shù)據(jù)做進(jìn)一步加工,比如清洗、聚合、過濾等,然后再寫入下游。
02傳統(tǒng)數(shù)據(jù)集成方案的痛點(diǎn)

上圖為傳統(tǒng)數(shù)據(jù)入倉(cāng)架構(gòu) 1.0,主要使用 DataX 或 Sqoop 全量同步到 HDFS,再圍繞 Hive 做數(shù)倉(cāng)。
此方案存在諸多缺陷:容易影響業(yè)務(wù)穩(wěn)定性,因?yàn)槊刻於夹枰獜臉I(yè)務(wù)表里查詢數(shù)據(jù);天級(jí)別的產(chǎn)出導(dǎo)致時(shí)效性差,延遲高;如果將調(diào)度間隔調(diào)成幾分鐘一次,則會(huì)對(duì)源庫(kù)造成非常大的壓力;擴(kuò)展性差,業(yè)務(wù)規(guī)模擴(kuò)大后極易出現(xiàn)性能瓶頸。

上圖為傳統(tǒng)數(shù)據(jù)入倉(cāng) 2.0 架構(gòu)。分為實(shí)時(shí)和離線兩條鏈路,實(shí)時(shí)鏈路做增量同步,比如通過 Canal 同步到 Kafka 后再做實(shí)時(shí)回流;全量同步一般只做一次,與每天的增量在 HDFS 上做定時(shí)合并,最后導(dǎo)入到 Hive 數(shù)倉(cāng)里。
此方式只做一次全量同步,因此基本不影響業(yè)務(wù)穩(wěn)定性,但是增量同步有定時(shí)回流,一般只能保持在小時(shí)和天級(jí)別,因此它的時(shí)效性也比較低。同時(shí),全量與增量?jī)蓷l鏈路是割裂的,意味著鏈路多,需要維護(hù)的組件也多,系統(tǒng)的可維護(hù)性會(huì)比較差。

上圖為傳統(tǒng) CDC ETL 分析架構(gòu)。通過 Debezium、Canal 等工具采集 CDC 數(shù)據(jù)后,寫入消息隊(duì)列,再使用計(jì)算引擎做計(jì)算清洗,最終傳輸?shù)较掠未鎯?chǔ),完成實(shí)時(shí)數(shù)倉(cāng)、數(shù)據(jù)湖的構(gòu)建。

傳統(tǒng) CDC ETL 分析里引入了很多組件比如 Debezium、Canal,都需要部署和維護(hù), Kafka 消息隊(duì)列集群也需要維護(hù)。Debezium 的缺陷在于它雖然支持全量加增量,但它的單并發(fā)模型無法很好地應(yīng)對(duì)海量數(shù)據(jù)場(chǎng)景。而 Canal 只能讀增量,需要 DataX 與 Sqoop 配合才能讀取全量,相當(dāng)于需要兩條鏈路,需要維護(hù)的組件也增加。因此,傳統(tǒng) CDC ETL 分析的痛點(diǎn)是單并發(fā)性能差,全量增量割裂,依賴的組件較多。
03基于 Flink CDC 的海量數(shù)據(jù)的實(shí)時(shí)同步和轉(zhuǎn)換
Flink CDC 的方案能夠給海量數(shù)據(jù)的實(shí)時(shí)同步和轉(zhuǎn)換帶來什么改善?

Flink CDC 2.0 在 MySQL CDC 上實(shí)現(xiàn)了增量快照讀取算法,在最新的 2.2 版本里 Flink CDC 社區(qū) 將增量快照算法抽象成框架,使得其他數(shù)據(jù)源也能復(fù)用增量快照算法。
增量快照算法解決了全增量一體化同步里的一些痛點(diǎn)。比如 Debezium 早期版本在實(shí)現(xiàn)全增量一體化同步時(shí)會(huì)使用鎖,并且且是單并發(fā)模型,失敗重做機(jī)制,無法在全量階段實(shí)現(xiàn)斷點(diǎn)續(xù)傳。增量快照算法使用了無鎖算法,對(duì)業(yè)務(wù)庫(kù)非常友好;支持了并發(fā)讀取,解決了海量數(shù)據(jù)的處理問題;支持了斷點(diǎn)續(xù)傳,避免失敗重做,能夠極大地提高數(shù)據(jù)同步的效率與用戶體驗(yàn)。

上圖為全增量一體化的框架。整個(gè)框架簡(jiǎn)單來講就是將數(shù)據(jù)庫(kù)里的表按 PK 或 UK 切分成 一個(gè)個(gè) chunk ,然后分給多個(gè) task 做并行讀取,即在全量階段實(shí)現(xiàn)了并行讀取。全量和增量能夠自動(dòng)切換,切換時(shí)通過無鎖算法來做無鎖一致性的切換。切換到增量階段后,只需要單獨(dú)的 task 去負(fù)責(zé)增量部分的數(shù)據(jù)解析,以此實(shí)現(xiàn)了全增量一體化讀取。進(jìn)入增量階段后,作業(yè)不再需要的資源,用戶可以修改作業(yè)并發(fā)將其釋放。

我們將全增量一體化框架與 Debezium 1.6 版本做 簡(jiǎn)單的 TPC-DS 讀取測(cè)試對(duì)比,customer 單表數(shù)據(jù)量 6500 萬,在 Flink CDC 用 8 個(gè)并發(fā)的情況下,吞吐提升了 6.8 倍,耗時(shí)僅 13 分鐘,得益于并發(fā)讀取的支持,如果用戶需要更快的讀取速度,用戶可以增加并發(fā)實(shí)現(xiàn)。

Flink CDC 在設(shè)計(jì)時(shí),也考慮了面向存儲(chǔ)友好的寫入設(shè)計(jì)。在 Flink CDC 1.x 版本中,如果想實(shí)現(xiàn) exactly-once 同步,需要配合 Flink 提供的 checkpoint 機(jī)制,全量階段沒有做切片,則只能在一個(gè) checkpoint 里完成,這會(huì)導(dǎo)致一個(gè)問題:每個(gè) checkpoint 中間要將這張表的全量數(shù)據(jù)吐給下游的 writer,writer 會(huì)將這張表的全量數(shù)據(jù)混存在內(nèi)存中,會(huì)對(duì)其內(nèi)存造成非常大的壓力,作業(yè)穩(wěn)定性也特別差。
Flink CDC 2.0 提出了增量快照算法后,通過切片能夠?qū)?checkpoint 粒度降至 chunk, 并且 chunk 大小是用戶可配置的,默認(rèn)是 8096 條,用戶可以將其調(diào)至更小,減輕 writer 的壓力,減少內(nèi)存資源的使用,提升下游寫入存儲(chǔ)時(shí)的穩(wěn)定性。

全增量一體化之后, Flink CDC 的入湖架構(gòu)變得非常簡(jiǎn)單,且不會(huì)影響業(yè)務(wù)的穩(wěn)定性;能夠做到分鐘級(jí)的產(chǎn)出,也就意味著可以實(shí)現(xiàn)近實(shí)時(shí)或?qū)崟r(shí)分析;并發(fā)讀取實(shí)現(xiàn)了更高的吞吐,在海量數(shù)據(jù)場(chǎng)景下有著不俗的表現(xiàn);鏈路短,組件少,運(yùn)維友好。

有了 Flink CDC 之后,傳統(tǒng) CDC ETL 分析的痛點(diǎn)也得到了極大改善,不再需要 Canal、Kafka 消息隊(duì)列等組件,只需要依賴 Flink,實(shí)現(xiàn)了全增量一體化同步和實(shí)時(shí) ETL 加工的能力,且支持并發(fā)讀取,整個(gè)架構(gòu)鏈路短,組件少,易于維護(hù)。

依托于 Flink DataStream API 以及易用的 SQL API ,F(xiàn)link CDC 還提供了非常強(qiáng)大完善的 transformation 能力,且在 transformation 過程中能夠保證 changelog 語義。在傳統(tǒng)方案里,在 changelog 上做 transformation 并保證 changelog 語義是非常難以實(shí)現(xiàn)的。

海量數(shù)據(jù)的實(shí)時(shí)同步和轉(zhuǎn)換示例 1:Flink CDC 實(shí)現(xiàn)異構(gòu)數(shù)據(jù)源的集成
這個(gè)業(yè)務(wù)場(chǎng)景是業(yè)務(wù)表比如產(chǎn)品表和訂單表在 MySQL 數(shù)據(jù)庫(kù)里,物流表存在 PG 數(shù)據(jù)庫(kù)里,要實(shí)現(xiàn)異構(gòu)數(shù)據(jù)源的集成,并且在集成過程做打?qū)?。需要將產(chǎn)品表、訂單表與物流表做 Streaming Join 之后再將結(jié)果表寫入庫(kù)里。借助 Flink CDC,整個(gè)過程只需要用 5 行 Flink SQL 就能夠?qū)崿F(xiàn)。這里使用的下游存儲(chǔ)是 Hudi,整個(gè)鏈路可以得到分鐘級(jí)甚至更低的產(chǎn)出,使圍繞 Hudi 做近實(shí)時(shí)的分析成為了可能。

海量數(shù)據(jù)的實(shí)時(shí)同步和轉(zhuǎn)換示例 2:Flink CDC 實(shí)現(xiàn)分庫(kù)分表集成
Flink CDC 對(duì)分庫(kù)分表做了非常完善的支持,在聲明 CDC 表時(shí)支持使用正則表達(dá)式匹配庫(kù)名和表名,正則表達(dá)式意味著可以匹配多個(gè)庫(kù)以及這多個(gè)庫(kù)下的多張表。同時(shí)提供了 metadata column 的支持,可以知道數(shù)據(jù)來自于哪個(gè) 數(shù)據(jù)庫(kù)、來自于哪張表,寫入下游 Hudi 時(shí),可以帶上 metadata 聲明的兩個(gè)列,將 database_name、table_name 以及原始表中的 主鍵(例子中為 id 列)作為新的主鍵,只需三行 Flink SQL 即可實(shí)現(xiàn)分庫(kù)分表數(shù)據(jù)的實(shí)時(shí)集成,非常簡(jiǎn)單。

依托于 Flink 豐富的生態(tài),能夠?qū)崿F(xiàn)很多上下游的擴(kuò)展,F(xiàn)link 自身就有豐富的 connector 生態(tài)。Flink CDC 加入之后,上游有了更豐富的源可以攝取,下游也有豐富的目的端可以寫入。

海量數(shù)據(jù)的實(shí)時(shí)同步和轉(zhuǎn)換示例 3:三行 SQL 實(shí)現(xiàn)單品累計(jì)銷量實(shí)時(shí)排行榜
這個(gè) Demo 演示在無需任何依賴的前提下,通過 3 行 SQL 實(shí)現(xiàn)商品的實(shí)時(shí)排行榜。首先在 Docker 里添加 MySQL 和 ElasticSearch 鏡像, ElasticSearch 是目的端。將 Docker 拉起后,下載 Flink 包以及 MySQL CDC 和 ElasticSearch 的兩個(gè) SQL Connector jar。拉起 Flink 集群和 SQL Client。在 MySQL 內(nèi)建庫(kù)建表,灌入數(shù)據(jù),更新后再用 Flink SQL 做一些實(shí)時(shí)加工和分析,寫入 ES。在 MySQL 的數(shù)據(jù)庫(kù)里構(gòu)造一張訂單表并插入數(shù)據(jù)。

上圖第一行 SQL 是創(chuàng)建訂單表,第二行是創(chuàng)建結(jié)果表,第三行是做 group by 的查詢實(shí)現(xiàn)實(shí)時(shí)排行榜功能,再寫入到第二行 SQL 創(chuàng)建的 ElasticSearch 表中。

我們?cè)?ElasticSearch 里做了可視化呈現(xiàn),可以查看到隨著 MySQL 中訂單源源不斷地更新,ElasticSearch 的排行榜會(huì)實(shí)時(shí)刷新。
04Flink CDC 社區(qū)發(fā)展

在過去的一年多時(shí)間,社區(qū)發(fā)了 4 個(gè)大版本, contributor 和 commits數(shù)量在不斷增長(zhǎng),社區(qū)也越來越活躍。我們一直堅(jiān)持將核心的 feature 全部提供給社區(qū)版,比如 MySQL 的百億級(jí)超大表、增量快照框架、MySQL 動(dòng)態(tài)加表等高級(jí)功能。

最新的 2.2 版本中同樣新增了很多功能。首先,數(shù)據(jù)源方面,支持了 OceanBase、PolarDB-X、SqlServer、TiDB。此外,不斷豐富了 Flink CDC 的生態(tài),兼容了 Flink 1.13 和 1.14 集群,提供了增量快照讀取框架。另外,支持了 MySQL CDC 動(dòng)態(tài)加表以及對(duì) MongoDB 做了完善,比如支持指定的集合,通過正則表達(dá)式使其更加靈活友好。

除此之外,文檔也是社區(qū)特別重要的一部分。我們提供了獨(dú)立的版本化社區(qū)網(wǎng)站,在網(wǎng)站里不同版本對(duì)應(yīng)不同版本的文檔,提供了豐富的 demo 以及中英文的 FAQ,幫助新手快速入門。

?在社區(qū)的多個(gè)關(guān)鍵指標(biāo),比如創(chuàng)建的 issue 數(shù),合并的 PR 數(shù),Github Star 數(shù)上,F(xiàn)link CDC 社區(qū)的表現(xiàn)都非常不錯(cuò)。

Flink CDC 社區(qū)的未來規(guī)劃主要包含以下三個(gè)方面:
- 框架完善:增量快照框架目前只支持 MySQL CDC ,Oracle、PG 和 MongoDB 正在對(duì)接中,希望未來所有數(shù)據(jù)庫(kù)都能夠?qū)拥礁玫目蚣苌?;針?duì) Schema Evolution 和整庫(kù)同步做了一些探索性的工作,成熟后將向社區(qū)提供。
- 生態(tài)集成:提供更多 DB 和更多版本;數(shù)據(jù)湖集成方面希望鏈路更通暢;提供一些端到端的方案,用戶無須關(guān)心 Hudi 和 Flink CDC 的參數(shù)。
- 易用性:提供更多開箱即用的體驗(yàn)以及完善文檔教程。
Qustions&Answers
Q1:CDC 什么時(shí)候能夠支持整庫(kù)同步以及 DDL 的同步?
正在設(shè)計(jì)中,因?yàn)樗枰紤]到 Flink 引擎?zhèn)鹊闹С峙c配合,不是單獨(dú)在 Flink CDC 社區(qū)內(nèi)開發(fā)就可以實(shí)現(xiàn)的,需要與 Flink 社區(qū)聯(lián)動(dòng)。
Q2:什么時(shí)候支持 Flink 1.15?
目前生產(chǎn)上的 Flink 集群還是以 1.13、1.14 為主。社區(qū)計(jì)劃在 2.3 版本中支持 Flink 1.15,可以關(guān)注 issue:https://github.com/ververica/flink-cdc-connectors/issues/1363,也歡迎貢獻(xiàn)。
Q3:有 CDC 結(jié)果表寫入 Oracle 的實(shí)踐嗎?
1.14 版本的 Flink 暫不支持,這個(gè)是因?yàn)?Sink 端的 JDBC Connector 不支持 Oracle dialect,F(xiàn)link 1.15 版本的 JDBC Connector 已經(jīng)支持了 Oracle dialect,1.15 版本的 Flink 集群可以支持。
Q4:下個(gè)版本能否支持讀取 ES?
還需要考察 transactional log 機(jī)制以及它是否適合作為 CDC 的數(shù)據(jù)源。
Q5:能做到單 job 監(jiān)控多表 sink 多表嗎?
可以實(shí)現(xiàn)單作業(yè)監(jiān)控多表 sink 到多個(gè)下游表;但如果是 sink 到多表,需要 DataStream 進(jìn)行分流,不同的流寫到不同的表。
Q6:Binlog 日志只有最近兩個(gè)月的數(shù)據(jù),能否支持先全量后增量讀???
默認(rèn)支持的就是先全量后增量,一般 binlog 保存七天或兩三天都可以。
Q7:2.2 版本 MySQL 沒有主鍵,全量如何同步?
可以回退到不用增量快照框架;在增量快照框架上,社區(qū)已有組件的 issue,預(yù)計(jì)將在社區(qū) 2.3 版本提供支持。?


































