偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

如何基于日志,同步實(shí)現(xiàn)數(shù)據(jù)的一致性和實(shí)時(shí)抽取?

運(yùn)維 系統(tǒng)運(yùn)維
本次分享的主題是《基于日志的DWS平臺(tái)實(shí)現(xiàn)和應(yīng)用》,主要是分享一下目前我們?cè)谝诵抛龅囊恍┦虑?。這個(gè)主題里面包含到2個(gè)團(tuán)隊(duì)很多兄弟姐妹的努力的結(jié)果(我們團(tuán)隊(duì)和山巍團(tuán)隊(duì)的成果)。這次就由我代為執(zhí)筆,盡我努力給大家介紹一下。

作者:王東

宜信技術(shù)研發(fā)中心架構(gòu)師

目前就職于宜信技術(shù)研發(fā)中心,任架構(gòu)師,負(fù)責(zé)流式計(jì)算和大數(shù)據(jù)業(yè)務(wù)產(chǎn)品解決方案。

曾任職于Naver china(韓國(guó)***搜索引擎公司)中國(guó)研發(fā)中心資深工程師,多年從事CUBRID分布式數(shù)據(jù)庫(kù)集群開(kāi)發(fā)和CUBRID數(shù)據(jù)庫(kù)引擎開(kāi)發(fā)

http://www.cubrid.org/blog/news/cubrid-cluster-introduction/

主題簡(jiǎn)介:

DWS的背景介紹

dbus+wormhole總體架構(gòu)和技術(shù)實(shí)現(xiàn)方案

DWS的實(shí)際運(yùn)用案例

前言

大家好,我是王東,來(lái)自宜信技術(shù)研發(fā)中心,這是我來(lái)社群的***次分享,如果有什么不足,請(qǐng)大家多多指正、包涵。

本次分享的主題是《基于日志的DWS平臺(tái)實(shí)現(xiàn)和應(yīng)用》,主要是分享一下目前我們?cè)谝诵抛龅囊恍┦虑?。這個(gè)主題里面包含到2個(gè)團(tuán)隊(duì)很多兄弟姐妹的努力的結(jié)果(我們團(tuán)隊(duì)和山巍團(tuán)隊(duì)的成果)。這次就由我代為執(zhí)筆,盡我努力給大家介紹一下。

其實(shí)整個(gè)實(shí)現(xiàn)從原理上來(lái)說(shuō)是比較簡(jiǎn)單的,當(dāng)然也涉及到不少技術(shù)。我會(huì)嘗試用盡量簡(jiǎn)單的方式來(lái)表達(dá),讓大家了解這個(gè)事情的原理和意義。在過(guò)程中,大家有問(wèn)題可以隨時(shí)提出,我會(huì)盡力去解答。

DWS是一個(gè)簡(jiǎn)稱(chēng),是由3個(gè)子項(xiàng)目組成,我稍后做解釋。

一、背景

事情是從公司前段時(shí)間的需求說(shuō)起,大家知道宜信是一個(gè)互聯(lián)網(wǎng)金融企業(yè),我們的很多數(shù)據(jù)與標(biāo)準(zhǔn)互聯(lián)網(wǎng)企業(yè)不同,大致來(lái)說(shuō)就是:

玩數(shù)據(jù)的人都知道數(shù)據(jù)是非常有價(jià)值的,然后這些數(shù)據(jù)是保存在各個(gè)系統(tǒng)的數(shù)據(jù)庫(kù)中,如何讓需要數(shù)據(jù)的使用方得到一致性、實(shí)時(shí)的數(shù)據(jù)呢?

過(guò)去的通用做法有幾種是:

  1. DBA開(kāi)放各個(gè)系統(tǒng)的備庫(kù),在業(yè)務(wù)低峰期(比如夜間),使用方各自抽取所需數(shù)據(jù)。由于抽取時(shí)間不同,各個(gè)數(shù)據(jù)使用方數(shù)據(jù)不一致,數(shù)據(jù)發(fā)生沖突,而且重復(fù)抽取,相信不少DBA很頭疼這個(gè)事情。
  2. 公司統(tǒng)一的大數(shù)據(jù)平臺(tái),通過(guò)Sqoop 在業(yè)務(wù)低峰期到各個(gè)系統(tǒng)統(tǒng)一抽取數(shù)據(jù), 并保存到Hive表中, 然后為其他數(shù)據(jù)使用方提供數(shù)據(jù)服務(wù)。這種做法解決了一致性問(wèn)題,但時(shí)效性差,基本是T+1的時(shí)效。
  3. 基于trigger的方式獲取增量變更,主要問(wèn)題是業(yè)務(wù)方侵入性大,而且trigger也帶來(lái)性能損失。

這些方案都不算***。我們?cè)诹私夂涂紤]了不同實(shí)現(xiàn)方式后,***借鑒了 linkedin的思想,認(rèn)為要想同時(shí)解決數(shù)據(jù)一致性和實(shí)時(shí)性,比較合理的方法應(yīng)該是來(lái)自于log。

(此圖來(lái)自:https://www.confluent.io/blog/using-logs-to-build-a-solid-data-infrastructure-or-why-dual-writes-are-a-bad-idea/)

把增量的Log作為一切系統(tǒng)的基礎(chǔ)。后續(xù)的數(shù)據(jù)使用方,通過(guò)訂閱kafka來(lái)消費(fèi)log。

比如:

  • 大數(shù)據(jù)的使用方可以將數(shù)據(jù)保存到Hive表或者Parquet文件給Hive或Spark查詢(xún);
  • 提供搜索服務(wù)的使用方可以保存到Elasticsearch或HBase 中;
  • 提供緩存服務(wù)的使用方可以將日志緩存到Redis或alluxio中;
  • 數(shù)據(jù)同步的使用方可以將數(shù)據(jù)保存到自己的數(shù)據(jù)庫(kù)中;
  • 由于kafka的日志是可以重復(fù)消費(fèi)的,并且緩存一段時(shí)間,各個(gè)使用方可以通過(guò)消費(fèi)kafka的日志來(lái)達(dá)到既能保持與數(shù)據(jù)庫(kù)的一致性,也能保證實(shí)時(shí)性;

為什么使用log和kafka作為基礎(chǔ),而不使用Sqoop進(jìn)行抽取呢? 因?yàn)椋?/p>

為什么不使用dual write(雙寫(xiě))呢?,請(qǐng)參考https://www.confluent.io/blog/using-logs-to-build-a-solid-data-infrastructure-or-why-dual-writes-are-a-bad-idea/

我這里就不多做解釋了。

二、總體架構(gòu)

于是我們提出了構(gòu)建一個(gè)基于log的公司級(jí)的平臺(tái)的想法。

下面解釋一下DWS平臺(tái), DWS平臺(tái)是有3個(gè)子項(xiàng)目組成:

  1. Dbus(數(shù)據(jù)總線):負(fù)責(zé)實(shí)時(shí)將數(shù)據(jù)從源端實(shí)時(shí)抽出,并轉(zhuǎn)換為約定的自帶schema的json格式數(shù)據(jù)(UMS 數(shù)據(jù)),放入kafka中;
  2. Wormhole(數(shù)據(jù)交換平臺(tái)):負(fù)責(zé)從kafka讀出數(shù)據(jù) 將數(shù)據(jù)寫(xiě)入到目標(biāo)中;
  3. Swifts(實(shí)時(shí)計(jì)算平臺(tái)):負(fù)責(zé)從kafka中讀出數(shù)據(jù),實(shí)時(shí)計(jì)算,并將數(shù)據(jù)寫(xiě)回kafka中。

圖中:

  • Log extractor和dbus共同完成數(shù)據(jù)抽取和數(shù)據(jù)轉(zhuǎn)換,抽取包括全量和增量抽取。
  • Wormhole可以將所有日志數(shù)據(jù)保存到HDFS中; 還可以將數(shù)據(jù)落地到所有支持jdbc的數(shù)據(jù)庫(kù),落地到HBash,Elasticsearch,Cassandra等;
  • Swifts支持以配置和SQL的方式實(shí)現(xiàn)對(duì)進(jìn)行流式計(jì)算,包括支持流式j(luò)oin,look up,filter,window aggregation等功能;
  • Dbus web是dbus的配置管理端,rider除了配置管理以外,還包括對(duì)Wormhole和Swifts運(yùn)行時(shí)管理,數(shù)據(jù)質(zhì)量校驗(yàn)等。

由于時(shí)間關(guān)系,我今天主要介紹DWS中的Dbus和Wormhole,在需要的時(shí)候附帶介紹一下Swifts。

三、dbus解決方案

日志解析

如前面所說(shuō),Dbus主要解決的是將日志從源端實(shí)時(shí)的抽出。 這里我們以MySQL為例子,簡(jiǎn)單說(shuō)明如何實(shí)現(xiàn)。

我們知道,雖然MySQL InnoDB有自己的log,MySQL主備同步是通過(guò)binlog來(lái)實(shí)現(xiàn)的。如下圖:

圖片來(lái)自:https://github.com/alibaba/canal

而binlog有三種模式:

Row 模式:日志中會(huì)記錄成每一行數(shù)據(jù)被修改的形式,然后在slave端再對(duì)相同的數(shù)據(jù)進(jìn)行修改。

Statement 模式: 每一條會(huì)修改數(shù)據(jù)的sql都會(huì)記錄到 master的bin-log中。slave在復(fù)制的時(shí)候SQL進(jìn)程會(huì)解析成和原來(lái)master端執(zhí)行過(guò)的相同的SQL來(lái)再次執(zhí)行。

Mixed模式: MySQL會(huì)根據(jù)執(zhí)行的每一條具體的sql語(yǔ)句來(lái)區(qū)分對(duì)待記錄的日志形式,也就是在Statement和Row之間選擇一種。

他們各自的優(yōu)缺點(diǎn)如下:

此處來(lái)自:http://www.jquerycn.cn/a_13625

由于statement 模式的缺點(diǎn),在與我們的DBA溝通過(guò)程中了解到,實(shí)際生產(chǎn)過(guò)程中都使用row 模式進(jìn)行復(fù)制。這使得讀取全量日志成為可能。

通常我們的MySQL布局是采用 2個(gè)master主庫(kù)(vip)+ 1個(gè)slave從庫(kù) + 1個(gè)backup容災(zāi)庫(kù) 的解決方案,由于容災(zāi)庫(kù)通常是用于異地容災(zāi),實(shí)時(shí)性不高也不便于部署。

為了最小化對(duì)源端產(chǎn)生影響,顯然我們讀取binlog日志應(yīng)該從slave從庫(kù)讀取。

讀取binlog的方案比較多,github上不少,參考https://github.com/search?utf8=%E2%9C%93&q=binlog。最終我們選用了阿里的canal做位日志抽取方。

Canal最早被用于阿里中美機(jī)房同步, canal原理相對(duì)比較簡(jiǎn)單:

  1. Canal模擬MySQL Slave的交互協(xié)議,偽裝自己為MySQL Slave,向MySQL Slave發(fā)送dump協(xié)議
  2. MySQL master收到dump請(qǐng)求,開(kāi)始推送binary log給Slave(也就是canal)
  3. Canal解析binary log對(duì)象(原始為byte流)

圖片來(lái)自:https://github.com/alibaba/canal

解決方案

Dbus 的MySQL版主要解決方案如下:

對(duì)于增量的log,通過(guò)訂閱Canal Server的方式,我們得到了MySQL的增量日志:

  • 按照Canal的輸出,日志是protobuf格式,開(kāi)發(fā)增量Storm程序,將數(shù)據(jù)實(shí)時(shí)轉(zhuǎn)換為我們定義的UMS格式(json格式,稍后我會(huì)介紹),并保存到kafka中;
  • 增量Storm程序還負(fù)責(zé)捕獲schema變化,以控制版本號(hào);
  • 增量Storm的配置信息保存在Zookeeper中,以滿足高可用需求。
  • Kafka既作為輸出結(jié)果也作為處理過(guò)程中的緩沖器和消息解構(gòu)區(qū)。
  • 在考慮使用Storm作為解決方案的時(shí)候,我們主要是認(rèn)為Storm有以下優(yōu)點(diǎn):
  • 技術(shù)相對(duì)成熟,比較穩(wěn)定,與kafka搭配也算標(biāo)準(zhǔn)組合;
  • 實(shí)時(shí)性比較高,能夠滿足實(shí)時(shí)性需求;
  • 滿足高可用需求;
  • 通過(guò)配置Storm并發(fā)度,可以活動(dòng)性能擴(kuò)展的能力;

全量抽取

對(duì)于流水表,有增量部分就夠了,但是許多表需要知道最初(已存在)的信息。這時(shí)候我們需要initial load(***次加載)。

對(duì)于initial load(***次加載),同樣開(kāi)發(fā)了全量抽取Storm程序通過(guò)jdbc連接的方式,從源端數(shù)據(jù)庫(kù)的備庫(kù)進(jìn)行拉取。initial load是拉全部數(shù)據(jù),所以我們推薦在業(yè)務(wù)低峰期進(jìn)行。好在只做一次,不需要每天都做。

全量抽取,我們借鑒了Sqoop的思想。將全量抽取Storm分為了2 個(gè)部分:

  1. 數(shù)據(jù)分片
  2. 實(shí)際抽取

數(shù)據(jù)分片需要考慮分片列,按照配置和自動(dòng)選擇列將數(shù)據(jù)按照范圍來(lái)分片,并將分片信息保存到kafka中。

下面是具體的分片策略:

全量抽取的Storm程序是讀取kafka的分片信息,采用多個(gè)并發(fā)度并行連接數(shù)據(jù)庫(kù)備庫(kù)進(jìn)行拉取。因?yàn)槌槿〉臅r(shí)間可能很長(zhǎng)。抽取過(guò)程中將實(shí)時(shí)狀態(tài)寫(xiě)到Zookeeper中,便于心跳程序監(jiān)控。

統(tǒng)一消息格式

無(wú)論是增量還是全量,最終輸出到kafka中的消息都是我們約定的一個(gè)統(tǒng)一消息格式,稱(chēng)為UMS(unified message schema)格式。

如下圖所示:

 

消息中schema部分,定義了namespace 是由 類(lèi)型+數(shù)據(jù)源名+schema名+表名+版本號(hào)+分庫(kù)號(hào)+分表號(hào) 能夠描述整個(gè)公司的所有表,通過(guò)一個(gè)namespace就能唯一定位。

  • _ums_op_ 表明數(shù)據(jù)的類(lèi)型是I(insert),U(update),D(刪除);
  • _ums_ts_ 發(fā)生增刪改的事件的時(shí)間戳,顯然新的數(shù)據(jù)發(fā)生的時(shí)間戳更新;
  • _ums_id_ 消息的唯一id,保證消息是唯一的,但這里我們保證了消息的先后順序(稍后解釋);

payload是指具體的數(shù)據(jù),一個(gè)json包里面可以包含1條至多條數(shù)據(jù),提高數(shù)據(jù)的有效載荷。

UMS中支持的數(shù)據(jù)類(lèi)型,參考了Hive類(lèi)型并進(jìn)行簡(jiǎn)化,基本上包含了所有數(shù)據(jù)類(lèi)型。

全量和增量的一致性

在整個(gè)數(shù)據(jù)傳輸中,為了盡量的保證日志消息的順序性,kafka我們使用的是1個(gè)partition的方式。在一般情況下,基本上是順序的和唯一的。

但是我們知道寫(xiě)kafka會(huì)失敗,有可能重寫(xiě),Storm也用重做機(jī)制,因此,我們并不嚴(yán)格保證exactly once和完全的順序性,但保證的是at least once。

因此_ums_id_變得尤為重要。

對(duì)于全量抽取,_ums_id_是唯一的,從zk中每個(gè)并發(fā)度分別取不同的id片區(qū),保證了唯一性和性能,填寫(xiě)負(fù)數(shù),不會(huì)與增量數(shù)據(jù)沖突,也保證他們是早于增量消息的。

對(duì)于增量抽取,我們使用的是MySQL的日志文件號(hào) + 日志偏移量作為唯一id。Id作為64位的long整數(shù),高7位用于日志文件號(hào),低12位作為日志偏移量。

例如:000103000012345678。 103 是日志文件號(hào),12345678 是日志偏移量。

這樣,從日志層面保證了物理唯一性(即便重做也這個(gè)id號(hào)也不變),同時(shí)也保證了順序性(還能定位日志)。通過(guò)比較_ums_id_ 消費(fèi)日志就能通過(guò)比較_ums_id_知道哪條消息更新。

其實(shí)_ums_ts_與_ums_id_意圖是類(lèi)似的,只不過(guò)有時(shí)候_ums_ts_可能會(huì)重復(fù),即在1毫秒中發(fā)生了多個(gè)操作,這樣就得靠比較_ums_id_了。

心跳監(jiān)控和預(yù)警

整個(gè)系統(tǒng)涉及到數(shù)據(jù)庫(kù)的主備同步,Canal Server,多個(gè)并發(fā)度Storm進(jìn)程等各個(gè)環(huán)節(jié)。

因此對(duì)流程的監(jiān)控和預(yù)警就尤為重要。

通過(guò)心跳模塊,例如每分鐘(可配置)對(duì)每個(gè)被抽取的表插入一條心態(tài)數(shù)據(jù)并保存發(fā)送時(shí)間,這個(gè)心跳表也被抽取,跟隨著整個(gè)流程下來(lái),與被同步表在實(shí)際上走相同的邏輯(因?yàn)槎鄠€(gè)并發(fā)的的Storm可能有不同的分支),當(dāng)收到心跳包的時(shí)候,即便沒(méi)有任何增刪改的數(shù)據(jù),也能證明整條鏈路是通的。

Storm程序和心跳程序?qū)?shù)據(jù)發(fā)送公共的統(tǒng)計(jì)topic,再由統(tǒng)計(jì)程序保存到influxdb中,使用grafana進(jìn)行展示,就可以看到如下效果:

圖中是某業(yè)務(wù)系統(tǒng)的實(shí)時(shí)監(jiān)控信息。上面是實(shí)時(shí)流量情況,下面是實(shí)時(shí)延時(shí)情況??梢钥吹?,實(shí)時(shí)性還是很不錯(cuò)的,基本上1~2秒數(shù)據(jù)就已經(jīng)到末端kafka中。

Granfana提供的是一種實(shí)時(shí)監(jiān)控能力。

如果出現(xiàn)延時(shí),則是通過(guò)dbus的心跳模塊發(fā)送郵件報(bào)警或短信報(bào)警。

實(shí)時(shí)脫敏

考慮到數(shù)據(jù)安全性,對(duì)于有脫敏需求的場(chǎng)景,Dbus的全量storm和增量storm程序也完成了實(shí)時(shí)脫敏的功能。脫敏方式有3種:

總結(jié)一下:簡(jiǎn)單的說(shuō),Dbus就是將各種源的數(shù)據(jù),實(shí)時(shí)的導(dǎo)出,并以UMS的方式提供訂閱, 支持實(shí)時(shí)脫敏,實(shí)際監(jiān)控和報(bào)警。

四、Wormhole解決方案

說(shuō)完Dbus,該說(shuō)一下Wormhole,為什么兩個(gè)項(xiàng)目不是一個(gè),而要通過(guò)kafka來(lái)對(duì)接呢?

其中很大一個(gè)原因就是解耦,kafka具有天然的解耦能力,程序直接可以通過(guò)kafka做異步的消息傳遞。Dbus和Wornhole內(nèi)部也使用了kafka做消息傳遞和解耦。

另外一個(gè)原因就是,UMS是自描述的,通過(guò)訂閱kafka,任何有能力的使用方來(lái)直接消費(fèi)UMS來(lái)使用。

雖然UMS的結(jié)果可以直接訂閱,但還需要開(kāi)發(fā)的工作。Wormhole解決的是:提供一鍵式的配置,將kafka中的數(shù)據(jù)落地到各種系統(tǒng)中,讓沒(méi)有開(kāi)發(fā)能力的數(shù)據(jù)使用方通過(guò)wormhole來(lái)實(shí)現(xiàn)使用數(shù)據(jù)。

如圖所示,Wormhole 可以將kafka中的UMS 落地到各種系統(tǒng),目前用的最多的HDFS,JDBC的數(shù)據(jù)庫(kù)和HBase。

在技術(shù)棧上, wormhole選擇使用spark streaming來(lái)進(jìn)行。

在Wormhole中,一條flow是指從一個(gè)namaspace從源端到目標(biāo)端。一個(gè)spark streaming服務(wù)于多條flow。

選用Spark的理由是很充分的:

  • Spark天然的支持各種異構(gòu)存儲(chǔ)系統(tǒng);
  • 雖然Spark Stream比Storm延時(shí)稍差,但Spark有著更好的吞吐量和更好的計(jì)算性能;
  • Spark在支持并行計(jì)算方面有更強(qiáng)的靈活性;
  • Spark提供了一個(gè)技術(shù)棧內(nèi)解決Sparking Job,Spark Streaming,Spark SQL的統(tǒng)一功能,便于后期開(kāi)發(fā);
  • 這里補(bǔ)充說(shuō)一下Swifts的作用:
  • Swifts的本質(zhì)是讀取kafka中的UMS數(shù)據(jù),進(jìn)行實(shí)時(shí)計(jì)算,將結(jié)果寫(xiě)入到kafka的另外一個(gè)topic。
  • 實(shí)時(shí)計(jì)算可以是很多種方式:比如過(guò)濾filter,projection(投影),lookup, 流式j(luò)oin window aggregation,可以完成各種具有業(yè)務(wù)價(jià)值的流式實(shí)時(shí)計(jì)算。

Wormhole和Swifts對(duì)比如下:

落HDFS

通過(guò)Wormhole Wpark Streaming程序消費(fèi)kafka的UMS,首先UMS log可以被保存到HDFS上。

kafka一般只保存若干天的信息,不會(huì)保存全部信息,而HDFS中可以保存所有的歷史增刪改的信息。這就使得很多事情變?yōu)榭赡埽?/p>

  • 通過(guò)重放HDFS中的日志,我們能夠還原任意時(shí)間的歷史快照。
  • 可以做拉鏈表,還原每一條記錄的歷史信息,便于分析;
  • 當(dāng)程序出現(xiàn)錯(cuò)誤是,可以通過(guò)回灌(backfill),重新消費(fèi)消息,重新形成新的快照。

可以說(shuō)HDFS中的日志是很多的事情基礎(chǔ)。

介于Spark原生對(duì)parquet支持的很好,Spark SQL能夠?qū)arquet提供很好的查詢(xún)。UMS落地到HDFS上是保存到Parquet文件中的。Parquet的內(nèi)容是所有l(wèi)og的增刪改信息以及_ums_id_,_ums_ts_都存下來(lái)。

Wormhole spark streaming根據(jù)namespace 將數(shù)據(jù)分布存儲(chǔ)到不同的目錄中,即不同的表和版本放在不同目錄中。

由于每次寫(xiě)的Parquet都是小文件,大家知道HDFS對(duì)于小文件性能并不好,因此另外還有一個(gè)job,每天定時(shí)將這些的Parquet文件進(jìn)行合并成大文件。

每個(gè)Parquet文件目錄都帶有文件數(shù)據(jù)的起始時(shí)間和結(jié)束時(shí)間。這樣在回灌數(shù)據(jù)時(shí),可以根據(jù)選取的時(shí)間范圍來(lái)決定需要讀取哪些Parquet文件,不必讀取全部數(shù)據(jù)。

插入或更新數(shù)據(jù)的冪等性

常常我們遇到的需求是,將數(shù)據(jù)經(jīng)過(guò)加工落地到數(shù)據(jù)庫(kù)或HBase中。那么這里涉及到的一個(gè)問(wèn)題就是,什么樣的數(shù)據(jù)可以被更新到數(shù)據(jù)?

這里最重要的一個(gè)原則就是數(shù)據(jù)的冪等性。

無(wú)論是遇到增刪改任何的數(shù)據(jù),我們面臨的問(wèn)題都是:

  1. 該更新哪一行;
  2. 更新的策略是什么。

對(duì)于***個(gè)問(wèn)題,其實(shí)就需要定位數(shù)據(jù)要找一個(gè)唯一的鍵,常見(jiàn)的有:

  1. 使用業(yè)務(wù)庫(kù)的主鍵;
  2. 由業(yè)務(wù)方指定幾個(gè)列做聯(lián)合唯一索引;

對(duì)于第二個(gè)問(wèn)題,就涉及到_ums_id_了,因?yàn)槲覀円呀?jīng)保證了_ums_id_大的值更新,因此在找到對(duì)應(yīng)數(shù)據(jù)行后,根據(jù)這個(gè)原則來(lái)進(jìn)行替換更新。

之所以要軟刪除和加入_is_active_列,是為了這樣一種情況:

如果已經(jīng)插入的_ums_id_比較大,是刪除的數(shù)據(jù)(表明這個(gè)數(shù)據(jù)已經(jīng)刪除了), 如果不是軟刪除,此時(shí)插入一個(gè)_ums_id_小的數(shù)據(jù)(舊數(shù)據(jù)),就會(huì)真的插入進(jìn)去。

這就導(dǎo)致舊數(shù)據(jù)被插入了。不冪等了。所以被刪除的數(shù)據(jù)依然保留(軟刪除)是有價(jià)值的,它能被用于保證數(shù)據(jù)的冪等性。

HBase的保存

插入數(shù)據(jù)到Hbase中,相當(dāng)要簡(jiǎn)單一些。不同的是HBase可以保留多個(gè)版本的數(shù)據(jù)(當(dāng)然也可以只保留一個(gè)版本)默認(rèn)是保留3個(gè)版本;

因此插入數(shù)據(jù)到HBase,需要解決的問(wèn)題是:

  1. 選擇合適的rowkey:Rowkey的設(shè)計(jì)是可以選的,用戶(hù)可以選擇源表的主鍵,也可以選擇若干列做聯(lián)合主鍵。
  2. 選擇合適的version:使用_ums_id_+ 較大的偏移量(比如100億) 作為row的version。

Version的選擇很有意思,利用_ums_id_的唯一性和自增性,與version自身的比較關(guān)系一致:即version較大等價(jià)于_ums_id_較大,對(duì)應(yīng)的版本較新。

從提高性能的角度,我們可以將整個(gè)Spark Streaming的Dataset集合直接插入到HBase,不需要比較。讓HBase基于version自動(dòng)替我們判斷哪些數(shù)據(jù)可以保留,哪些數(shù)據(jù)不需要保留。

Jdbc的插入數(shù)據(jù):

插入數(shù)據(jù)到數(shù)據(jù)庫(kù)中,保證冪等的原理雖然簡(jiǎn)單,要想提高性能在實(shí)現(xiàn)上就變得復(fù)雜很多,總不能一條一條的比較然后在插入或更新。

我們知道Spark的RDD/dataset都是以集合的方式來(lái)操作以提高性能,同樣的我們需要以集合操作的方式實(shí)現(xiàn)冪等性。

具體思路是:

  1. 首先根據(jù)集合中的主鍵到目標(biāo)數(shù)據(jù)庫(kù)中查詢(xún),得到一個(gè)已有數(shù)據(jù)集合;
  2. 與dataset中的集合比較,分出兩類(lèi):

A:不存在的數(shù)據(jù),即這部分?jǐn)?shù)據(jù)insert就可以;

B:存在的數(shù)據(jù),比較_ums_id_, 最終只將哪些_ums_id_更新較大row到目標(biāo)數(shù)據(jù)庫(kù),小的直接拋棄。

使用Spark的同學(xué)都知道,RDD/dataset都是可以partition的,可以使用多個(gè)worker并進(jìn)行操作以提高效率。

在考慮并發(fā)情況下,插入和更新都可能出現(xiàn)失敗,那么還有考慮失敗后的策略。

比如:因?yàn)閯e的worker已經(jīng)插入,那么因?yàn)槲ㄒ恍约s束插入失敗,那么需要改為更新,還要比較_ums_id_看是否能夠更新。

對(duì)于無(wú)法插入其他情況(比如目標(biāo)系統(tǒng)有問(wèn)題),Wormhole還有重試機(jī)制。說(shuō)起來(lái)細(xì)節(jié)特別多。這里就不多介紹了。

有些還在開(kāi)發(fā)中。

插入到其他存儲(chǔ)中的就不多介紹了,總的原則是:根據(jù)各自存儲(chǔ)自身特性,設(shè)計(jì)基于集合的,并發(fā)的插入數(shù)據(jù)實(shí)現(xiàn)。這些都是Wormhole為了性能而做的努力,使用Wormhole的用戶(hù)不必關(guān)心 。

五、運(yùn)用案例

實(shí)時(shí)營(yíng)銷(xiāo)

說(shuō)了那么多,DWS有什么實(shí)際運(yùn)用呢?下面我來(lái)介紹某系統(tǒng)使用DWS實(shí)現(xiàn)了的實(shí)時(shí)營(yíng)銷(xiāo)。

如上圖所示:

系統(tǒng)A的數(shù)據(jù)都保存到自己的數(shù)據(jù)庫(kù)中,我們知道,宜信提供很多金融服務(wù),其中包括借款,而借款過(guò)程中很重要的就是信用審核。

借款人需要提供證明具有信用價(jià)值的信息,比如央行征信報(bào)告,是具有***信用數(shù)據(jù)的數(shù)據(jù)。 而銀行流水,網(wǎng)購(gòu)流水也是具有較強(qiáng)的信用屬性的數(shù)據(jù)。

借款人通過(guò)Web或手機(jī)APP在系統(tǒng)A中填寫(xiě)信用信息時(shí),可能會(huì)某些原因無(wú)法繼續(xù),雖然可能這個(gè)借款人是一個(gè)優(yōu)質(zhì)潛在客戶(hù),但以前由于無(wú)法或很久才能知道這個(gè)信息,所以實(shí)際上這樣的客戶(hù)是流失了。

應(yīng)用了DWS以后,借款人已經(jīng)填寫(xiě)的信息已經(jīng)記錄到數(shù)據(jù)庫(kù)中,并通過(guò)DWS實(shí)時(shí)的進(jìn)行抽取、計(jì)算和落地到目標(biāo)庫(kù)中。根據(jù)對(duì)客戶(hù)的打分,評(píng)價(jià)出優(yōu)質(zhì)客戶(hù)。然后立刻將這個(gè)客戶(hù)的信息輸出到客服系統(tǒng)中。

客服人員在很短的時(shí)間(幾分鐘以?xún)?nèi))就通過(guò)打電話的方式聯(lián)系上這個(gè)借款人(潛客),進(jìn)行客戶(hù)關(guān)懷,將這個(gè)潛客轉(zhuǎn)換為真正的客戶(hù)。我們知道借款是有時(shí)效性的,如果時(shí)間太久就沒(méi)有價(jià)值了。

如果沒(méi)有實(shí)時(shí)抽取/計(jì)算/落庫(kù)的能力,那么這一切都無(wú)法實(shí)現(xiàn)。

實(shí)時(shí)報(bào)表系統(tǒng)

另外一個(gè)實(shí)時(shí)報(bào)表的應(yīng)用如下:

我們數(shù)據(jù)使用方的數(shù)據(jù)來(lái)自多個(gè)系統(tǒng),以前是通過(guò)T+1的方式獲得報(bào)表信息,然后指導(dǎo)第二天的運(yùn)營(yíng),這樣時(shí)效性很差。

通過(guò)DWS,將數(shù)據(jù)從多個(gè)系統(tǒng)中實(shí)時(shí)抽取,計(jì)算和落地,并提供報(bào)表展示,使得運(yùn)營(yíng)可以及時(shí)作出部署和調(diào)整,快速應(yīng)對(duì)。

六、總結(jié)

說(shuō)了那么多,大致總結(jié)一下:

  • DWS技術(shù)上基于主流實(shí)時(shí)流式大數(shù)據(jù)技術(shù)框架,高可用大吞吐強(qiáng)水平擴(kuò)容,低延遲高容錯(cuò)最終一致。
  • DWS能力上支持異構(gòu)多源多目標(biāo)系統(tǒng),支持多數(shù)據(jù)格式(結(jié)構(gòu)化半結(jié)構(gòu)化非結(jié)構(gòu)化數(shù)據(jù))和實(shí)時(shí)技術(shù)能力。
  • DWS將三個(gè)子項(xiàng)目合并作為一個(gè)平臺(tái)推出,使得我們具備了實(shí)時(shí)的能力, 驅(qū)動(dòng)各種實(shí)時(shí)場(chǎng)景應(yīng)用。

適合場(chǎng)景包括:實(shí)時(shí)同步/實(shí)時(shí)計(jì)算/實(shí)時(shí)監(jiān)控/實(shí)時(shí)報(bào)表/實(shí)時(shí)分析/實(shí)時(shí)洞察/實(shí)時(shí)管理/實(shí)時(shí)運(yùn)營(yíng)/實(shí)時(shí)決策

感謝大家的聆聽(tīng),此次分享到此為止。

Q&A

Q1:Oracle log reader有開(kāi)源方案嗎?

A1:對(duì)于Oracle業(yè)界也有許多商業(yè)解決方案,例如:Oracle GoldenGate(原來(lái)的goldengate), Oracle Xstream, IBM InfoSphere Change Data Capture(原來(lái)的DataMirror),Dell SharePlex (原來(lái)的Quest),國(guó)內(nèi)的DSG superSync等,開(kāi)源的方案好用的很少。

Q2:這個(gè)項(xiàng)目投入了多少人力物力?感覺(jué)有點(diǎn)復(fù)雜。

Q2:DWS是三個(gè)子項(xiàng)目組成,平均每個(gè)項(xiàng)目5~7人。是有點(diǎn)復(fù)雜,其實(shí)也是試圖使用大數(shù)據(jù)技術(shù)來(lái)解決我們公司目前遇到的困難。

因?yàn)槭歉愦髷?shù)據(jù)相關(guān)技術(shù),所有團(tuán)隊(duì)里面的兄弟姐妹都還是比較happy的:)

其實(shí)這里面,Dbus和Wormhole相對(duì)固定模式化,容易輕松復(fù)用。Swifts實(shí)時(shí)計(jì)算是與每個(gè)業(yè)務(wù)相關(guān)比較大的,自定義比較強(qiáng),相對(duì)比較麻煩一些。

Q3:宜信的這個(gè)DWS系統(tǒng)會(huì)開(kāi)源么?

A3:我們也考慮過(guò)向社區(qū)貢獻(xiàn),就像宜信的其他開(kāi)源項(xiàng)目一樣,目前項(xiàng)目剛剛成形,還有待進(jìn)一步磨煉,我相信未來(lái)的某個(gè)時(shí)候,我們會(huì)給它開(kāi)源出來(lái)。

Q4:架構(gòu)師怎么理解,是不是系統(tǒng)工程師?

A4:不是系統(tǒng)工程師,在我們宜信有多位架構(gòu)師,應(yīng)該算是以技術(shù)驅(qū)動(dòng)業(yè)務(wù)的技術(shù)管理人員。包含產(chǎn)品設(shè)計(jì),技術(shù)管理等。

Q5:復(fù)制方案是否是OGG?

A5:OGG與上面提到的其他商業(yè)解決方案都是可選方案。

責(zé)任編輯:武曉燕 來(lái)源: 運(yùn)維派
相關(guān)推薦

2021-07-26 06:33:42

CRDT數(shù)據(jù)CAP

2024-08-20 16:13:52

2024-06-04 09:51:48

2020-06-01 22:09:48

緩存緩存同步緩存誤用

2021-12-14 07:15:57

MySQLRedis數(shù)據(jù)

2019-08-30 12:46:10

并發(fā)扣款查詢(xún)SQL

2022-09-15 10:37:46

MySQLRedis數(shù)據(jù)一致性

2022-02-17 21:04:27

數(shù)據(jù)庫(kù)MysqlRedis

2023-05-26 07:34:50

RedisMySQL緩存

2025-03-27 08:20:54

2017-07-25 14:38:56

數(shù)據(jù)庫(kù)一致性非鎖定讀一致性鎖定讀

2020-11-24 09:03:41

一致性MySQLMVCC

2021-03-04 06:49:53

RocketMQ事務(wù)

2017-06-27 09:40:28

MYSQL數(shù)據(jù)備份

2022-10-19 12:22:53

并發(fā)扣款一致性

2024-12-26 15:01:29

2023-09-07 08:11:24

Redis管道機(jī)制

2022-12-14 08:23:30

2021-05-19 21:50:46

Hash算法測(cè)試

2022-11-10 07:49:09

hash算法代碼
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)