拉取 Binlog,自動(dòng)數(shù)據(jù)同步,老板要給漲工資....
本文轉(zhuǎn)載自微信公眾號(hào)「微觀技術(shù)」,作者Tom哥 。轉(zhuǎn)載本文請(qǐng)聯(lián)系微觀技術(shù)公眾號(hào)。
大家好,我是Tom哥~
MySQL 數(shù)據(jù)庫(kù)大家一定都不陌生,今天跟大家聊聊數(shù)據(jù)同步的事
關(guān)于數(shù)據(jù)同步,我們常見(jiàn)的策略就是 同步雙寫、異步消息
1、同步雙寫:字面意思,同步+雙寫。比如老庫(kù)模型重構(gòu),數(shù)據(jù)遷移到新庫(kù),遷移過(guò)程中,如果有數(shù)據(jù)變更,既要寫到老庫(kù),也要寫到新庫(kù),兩邊同步更新。
- 優(yōu)點(diǎn):同步機(jī)制,保證了數(shù)據(jù)的實(shí)效性。
- 缺點(diǎn):額外增加同步處理邏輯,會(huì)有性能損耗
2、異步消息:如果依賴方過(guò)多,我們通常是將變更數(shù)據(jù)異構(gòu)發(fā)送到MQ消息系統(tǒng),感興趣的業(yè)務(wù)可以訂閱消息Topic,拉取消息,然后按自己的業(yè)務(wù)邏輯處理。
- 優(yōu)點(diǎn):架構(gòu)解耦,可以采用異步來(lái)做,降低主鏈路的性能損耗。如果是多個(gè)消費(fèi)方,不會(huì)出現(xiàn)指數(shù)性能疊加
- 缺點(diǎn):異步機(jī)制,無(wú)法滿足實(shí)時(shí)性,有一定延遲。只能達(dá)到最終一致性。
上面兩種方案,都是采用硬編碼,那么有沒(méi)有通用的技術(shù)方案。不關(guān)心你是什么業(yè)務(wù),寫入什么數(shù)據(jù),對(duì)平臺(tái)來(lái)講可以抽象成一張張 MySQL 表,直接同步表數(shù)據(jù)。只有使用方才真正去關(guān)心數(shù)據(jù)內(nèi)容。
可以參考 MySQL 的主從同步原理,拉取 binlog,只要將里面的數(shù)據(jù)解析出來(lái)即可。
流行的中間件是阿里開(kāi)源的 Canal,今天我們就來(lái)做個(gè)技術(shù)方案,大概內(nèi)容如下:
一、Canal 介紹
Canal,譯意為水道/管道/溝渠,主要用途是基于 MySQL 數(shù)據(jù)庫(kù)增量日志解析,提供增量數(shù)據(jù)訂閱和消費(fèi)。
Canal 誕生之初是為了解決多個(gè)備庫(kù)與主庫(kù)間數(shù)據(jù)同步,對(duì)主庫(kù)造成的壓力。
慢慢的,這個(gè)管道被發(fā)揚(yáng)光大,應(yīng)用場(chǎng)景也越來(lái)越多
工作原理很簡(jiǎn)單,把自己偽裝成 MySQL 的 slave,模擬 MySQL slave 的交互協(xié)議向 MySQL master 發(fā)送 dump 請(qǐng)求。
MySQL master 收到canal發(fā)送過(guò)來(lái)的dump請(qǐng)求,開(kāi)始推送binary log給canal,然后canal解析binlog 日志,再存儲(chǔ)到不同的存儲(chǔ)介質(zhì)中,比如:MySQL、Kafka、Elastic Search、Pulsar 等
業(yè)務(wù)場(chǎng)景:
- 數(shù)據(jù)庫(kù)實(shí)時(shí)備份
- ES 數(shù)據(jù)索引的構(gòu)建和維護(hù)
- 分布式緩存(如:Redis)的同步維護(hù)
- 數(shù)據(jù)異構(gòu),訂閱方可以按自己的業(yè)務(wù)需求訂閱消費(fèi),如:Kafka、Pulsar 等
二、安裝 MySQL
1、拉取 MySQL 鏡像
- docker pull mysql:5.7
2、查看鏡像
- docker images
3、啟動(dòng) MySQL 進(jìn)程
- docker run \
- --name mysql \
- -p 3306:3306 \
- -e MYSQL_ROOT_PASSWORD=123456 \
- -d mysql:5.7
4、查看進(jìn)程
- [root@iZbp12gqydkgwid86ftoauZ mysql]# docker ps -a
- CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
- e92827897538 mysql "docker-entrypoint.s…" 4 seconds ago Up 2 seconds 0.0.0.0:3306->3306/tcp, 33060/tcp mysql
5、進(jìn)入 MySQL 容器
- docker exec -it 167bfa3785f1 /bin/bash
注意:修改一些配置文件,可能會(huì)遇到一些問(wèn)題,如:
docker容器中使用vi或vim提示bash: vi: command not found的處理方法
因?yàn)闆](méi)有安裝vi編輯器,可以執(zhí)行下面命令
- apt-get update
- apt-get install vim
6、常用 MySQL 客戶端命令
- # 登陸 mysql
- mysql -uroot -p111111
- # 顯示數(shù)據(jù)庫(kù)列表
- show databases;
- # 選擇數(shù)據(jù)庫(kù)
- use mysql;
- # 顯示所有表
- show tables;
- # 顯示表結(jié)構(gòu)
- describe 表名;
- 其他更多命令:
- https://www.cnblogs.com/bluecobra/archive/2012/01/11/2318922.html
三、MySQL 相關(guān)配置
創(chuàng)建一個(gè) MySQL 用戶,用戶名:tom ,密碼:123456
- create user 'tom'@'%' identified by '123456';
為用戶:tom 授予所有庫(kù)的讀寫權(quán)限
- grant SELECT, REPLICATION SLAVE, REPLICATION CLIENT on *.* to 'tom'@'%' identified by '123456';
修改 MySQL 配置文件 my.cnf,位置:/etc/my.cnf
- [mysqld]
- log-bin=mysql-bin # 開(kāi)啟 binlog
- binlog-format=ROW # 選擇 行 模式
- server_id=1 # 配置 MySQL replaction 需要定義,不要和 canal 的 slaveId 重復(fù)
注意:需要重啟MySQL容器實(shí)例,執(zhí)行命令 docker restart mysql
查看binlog模式:
查看binlog日志文件列表:
查看當(dāng)前正在寫入的binlog文件:
四、安裝 Canal
1、從官網(wǎng)下載安裝包
下載地址:
https://github.com/alibaba/canal/releases
本文實(shí)驗(yàn)用的是最新版本 v1.1.5,主要是對(duì)不同的客戶端的個(gè)性化支持,屬于生態(tài)擴(kuò)展。
其他更多特性,大家可以去官網(wǎng)查看
解壓 tar.gz 壓縮包
- tar -zxvf canal.deployer-1.1.5.tar.gz
打開(kāi)配置文件 conf/example/instance.properties,修改配置如下:
- ## v1.0.26版本后會(huì)自動(dòng)生成slaveId,所以可以不用配置
- # canal.instance.mysql.slaveId=0
- # 數(shù)據(jù)庫(kù)地址
- canal.instance.master.address=127.0.0.1:3306
- # binlog日志名稱
- canal.instance.master.journal.name=mysql-bin.000001
- # mysql主庫(kù)鏈接時(shí)起始的binlog偏移量
- canal.instance.master.position=156
- # mysql主庫(kù)鏈接時(shí)起始的binlog的時(shí)間戳
- canal.instance.master.timestamp=
- canal.instance.master.gtid=
- # username/password
- # 在MySQL服務(wù)器授權(quán)的賬號(hào)密碼
- canal.instance.dbUsername=root
- canal.instance.dbPassword=111111
- # 字符集
- canal.instance.connectionCharset = UTF-8
- # enable druid Decrypt database password
- canal.instance.enableDruid=false
- # table regex .*\\..*表示監(jiān)聽(tīng)所有表 也可以寫具體的表名,用,隔開(kāi)
- canal.instance.filter.regex=.*\\..*
- # mysql 數(shù)據(jù)解析表的黑名單,多個(gè)表用,隔開(kāi)
- canal.instance.filter.black.regex=
啟動(dòng)命令
- ./startup.sh
由于采用的阿里云的 ECS 服務(wù)器,發(fā)現(xiàn)沒(méi)有安裝 JAVA 環(huán)境。
Oracle 官網(wǎng)下載 JDK 8 的安裝包
下載地址:
https://www.oracle.com/java/technologies/downloads/#java8
然后,通過(guò)下面的命令將安裝包上傳到 ECS 服務(wù)器
- scp jdk-8u311-linux-x64.tar.gz root@118.31.168.234:/root/java //上傳文件
安裝 JDK 8 環(huán)境
文檔:https://developer.aliyun.com/article/701864
五、啟動(dòng) Canal
進(jìn)入 canal.deployer-1.1.5/bin
執(zhí)行啟動(dòng)腳本:
- ./startup.sh
進(jìn)入 canal.deployer-1.1.5/logs/example
如果 example.log 日志文件中,出現(xiàn)下面的內(nèi)容,表示啟動(dòng)成功
- 2022-01-03 08:23:10.165 [canal-instance-scan-0] INFO c.a.otter.canal.instance.core.AbstractCanalInstance - stop CannalInstance for null-example
- 2022-01-03 08:23:10.177 [canal-instance-scan-0] INFO c.a.otter.canal.instance.core.AbstractCanalInstance - stop successful....
- 2022-01-03 08:23:10.298 [canal-instance-scan-0] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
- 2022-01-03 08:23:10.298 [canal-instance-scan-0] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^.*\..*$
- 2022-01-03 08:23:10.298 [canal-instance-scan-0] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter : ^mysql\.slave_.*$
- 2022-01-03 08:23:10.299 [canal-instance-scan-0] INFO c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
六、工程實(shí)驗(yàn)
創(chuàng)建一個(gè) SpringBoot 工程,spring-boot-bulking-canal
引入相關(guān)pom依賴
- <dependency>
- <groupId>com.alibaba.otter</groupId>
- <artifactId>canal.client</artifactId>
- <version>1.1.4</version>
- </dependency>
編寫java類,與 canal 服務(wù)端 建立連接,拉取數(shù)據(jù)庫(kù)的變更數(shù)據(jù)
- // 創(chuàng)建鏈接
- CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
- try {
- //打開(kāi)連接
- connector.connect();
- //訂閱全部表
- connector.subscribe(".*\\..*");
- //回滾到未進(jìn)行ack的地方,下次fetch的時(shí)候,可以從最后一個(gè)沒(méi)有ack的地方開(kāi)始拿
- connector.rollback();
- while (true) {
- Message message = connector.getWithoutAck(BATCH_SIZE);
- long batchId = message.getId();
- printEntry(message.getEntries());
- // batch id 提交
- connector.ack(batchId);
- }
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- connector.disconnect();
- }
在 ds1 數(shù)據(jù)庫(kù)下創(chuàng)建 MySQL 表
- CREATE TABLE `person` (
- `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '自增主鍵',
- `income` bigint(20) NOT NULL COMMENT '收入',
- `expend` bigint(20) NOT NULL COMMENT '支出',
- PRIMARY KEY (`id`),
- KEY `idx_income` (`income`)
- ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='個(gè)人收支表';
插入一條記錄:
- insert into person values(100,1000,1000);
Java類解析binlog,在控制臺(tái)打印變更日志:
- binlog[mysql-bin.000002:1946] , table[ds1,person] , eventType : INSERT
- id : 100 update=true
- income : 1000 update=true
- expend : 1000 update=true
對(duì) id=100 記錄做修改:
- update person set income=2000, expend=2000 where id=100;
控制臺(tái)打印變更日志:
- binlog[mysql-bin.000002:2252] , table[ds1,person] , eventType : UPDATE
- ------->; before
- id : 100 update=false
- income : 1000 update=false
- expend : 1000 update=false
- ------->; after
- id : 100 update=false
- income : 2000 update=true
- expend : 2000 update=true