SpringBoot與Canal整合,實(shí)現(xiàn)金融交易系統(tǒng)的實(shí)時(shí)數(shù)據(jù)同步功能
作者:Java知識(shí)日歷
Canal是阿里巴巴開(kāi)源的一個(gè)用于高效抓取 MySQL 數(shù)據(jù)庫(kù)增量變更日志(binlog)并進(jìn)行處理的中間件。它可以將 MySQL 的 binlog 解析為結(jié)構(gòu)化的 JSON 格式,并提供多種方式將這些數(shù)據(jù)推送到下游系統(tǒng)。
Canal是阿里巴巴開(kāi)源的一個(gè)用于高效抓取 MySQL 數(shù)據(jù)庫(kù)增量變更日志(binlog)并進(jìn)行處理的中間件。它可以將 MySQL 的 binlog 解析為結(jié)構(gòu)化的 JSON 格式,并提供多種方式將這些數(shù)據(jù)推送到下游系統(tǒng)。
我們?yōu)槭裁催x擇Canal?
- 實(shí)時(shí)性: Canal基于MySQL的binlog機(jī)制,能夠在毫秒級(jí)內(nèi)完成數(shù)據(jù)同步。
- 批量獲取數(shù)據(jù):Canal支持批量獲取數(shù)據(jù)庫(kù)變更數(shù)據(jù),減少網(wǎng)絡(luò)開(kāi)銷和處理時(shí)間。
- 多線程處理:Canal可以配置多線程來(lái)處理不同的數(shù)據(jù)變更事件,提高整體吞吐量。
- 斷點(diǎn)續(xù)傳:Canal支持從斷點(diǎn)繼續(xù)消費(fèi)數(shù)據(jù),確保數(shù)據(jù)不會(huì)丟失。
- 持久化存儲(chǔ):Canal可以將消費(fèi)進(jìn)度持久化到ZooKeeper中,保證在故障恢復(fù)后能夠繼續(xù)正常工作。
- 容錯(cuò)機(jī)制:Canal內(nèi)置了多種容錯(cuò)機(jī)制,如重試策略和自動(dòng)恢復(fù)功能,提高了系統(tǒng)的可靠性。
- 標(biāo)準(zhǔn)協(xié)議:Canal使用標(biāo)準(zhǔn)化的binlog協(xié)議,易于與其他系統(tǒng)集成。
- 過(guò)濾機(jī)制:Canal支持靈活的過(guò)濾規(guī)則,可以選擇性地訂閱特定的數(shù)據(jù)庫(kù)和表。
- 動(dòng)態(tài)配置:Canal支持動(dòng)態(tài)配置,可以根據(jù)實(shí)際需求調(diào)整監(jiān)控范圍和處理邏輯。
- 自定義處理:Canal允許開(kāi)發(fā)者編寫(xiě)自定義的處理器,實(shí)現(xiàn)復(fù)雜的數(shù)據(jù)處理邏輯。
- 精確同步:Canal能夠精確地捕獲和同步數(shù)據(jù)庫(kù)的每一行變更,確保數(shù)據(jù)的一致性。
- 事務(wù)支持:Canal能夠處理復(fù)雜的事務(wù)場(chǎng)景,確保事務(wù)的原子性和完整性。
- 沖突解決:Canal提供了多種沖突解決策略,避免數(shù)據(jù)同步過(guò)程中的沖突問(wèn)題。
哪些公司使用了Canal?
- 阿里巴巴 :Canal 被用于多個(gè)業(yè)務(wù)部門的數(shù)據(jù)同步需求。
- 騰訊 :在社交網(wǎng)絡(luò)、游戲等業(yè)務(wù)中使用 Canal 進(jìn)行數(shù)據(jù)同步。
- 美團(tuán):在餐飲外賣、酒店預(yù)訂等多個(gè)業(yè)務(wù)中使用 Canal 進(jìn)行數(shù)據(jù)同步。
- 小米 :在智能家居、手機(jī)銷售等多種業(yè)務(wù)中使用 Canal 進(jìn)行數(shù)據(jù)同步。
- 滴滴出行:在網(wǎng)約車、共享單車等多種業(yè)務(wù)中使用 Canal 進(jìn)行數(shù)據(jù)同步。
- 網(wǎng)易:在游戲、音樂(lè)等多種業(yè)務(wù)中使用 Canal 進(jìn)行數(shù)據(jù)同步。
代碼實(shí)操
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.5</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>canal-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>canal-demo</name>
<description>Demo project for Spring Boot with Canal</description>
<properties>
<java.version>11</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.5</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
application.properties
# 數(shù)據(jù)源配置
spring.datasource.url=jdbc:mysql://localhost:3306/your_database?useSSL=false&serverTimezone=UTC
spring.datasource.username=root
spring.datasource.password=123456
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
# Canal配置
canal.server.ip=127.0.0.1
canal.port=11111
canal.destination=example
交易實(shí)體類
package com.example.canaldemo.model;
import lombok.Data;
@Data
public class Transaction {
private Long id; // 主鍵ID
private String transactionId; // 交易ID
private Double amount; // 交易金額
private String status; // 交易狀態(tài)
}
create table
CREATE TABLE transaction (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
transaction_id VARCHAR(50) NOT NULL,
amount DECIMAL(18, 2) NOT NULL,
status VARCHAR(20) NOT NULL
);
交易Mapper接口
package com.example.canaldemo.mapper;
import com.example.canaldemo.model.Transaction;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Update;
/**
* 交易Mapper接口
*/
@Mapper
public interface TransactionMapper {
/**
* 插入一條新的交易記錄
*
* @param transaction 交易對(duì)象
*/
@Insert("INSERT INTO transaction(transaction_id, amount, status) VALUES(#{transaction.transactionId}, #{transaction.amount}, #{transaction.status})")
void insert(@Param("transaction") Transaction transaction);
/**
* 更新一條交易記錄
*
* @param transaction 交易對(duì)象
*/
@Update("UPDATE transaction SET amount=#{transaction.amount}, status=#{transaction.status} WHERE transaction_id=#{transaction.transactionId}")
void update(@Param("transaction") Transaction transaction);
}
Canal監(jiān)聽(tīng)器類
package com.example.canaldemo.listener;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.example.canaldemo.mapper.TransactionMapper;
import com.example.canaldemo.model.Transaction;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.net.InetSocketAddress;
import java.util.List;
/**
* Canal監(jiān)聽(tīng)器類,用于監(jiān)聽(tīng)數(shù)據(jù)庫(kù)的變化并進(jìn)行相應(yīng)的處理
*/
@Component
public class CanalListener {
private final String destination = "example"; // 這個(gè)值需要與Canal配置中的destination一致
private final String serverIp = "127.0.0.1";
private final int port = 11111;
@Autowired
private TransactionMapper transactionMapper;
/**
* 在Bean初始化后啟動(dòng)Canal監(jiān)聽(tīng)器
*/
@PostConstruct
public void start() {
// 創(chuàng)建Canal連接器
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(serverIp, port), destination, "", "");
try {
// 連接到Canal服務(wù)器
connector.connect();
// 訂閱所有數(shù)據(jù)庫(kù)的所有表
connector.subscribe(".*\\..*");
// 回滾到上次中斷的位置
connector.rollback();
while (true) {
// 獲取一批消息,最多100條
Message message = connector.getWithoutAck(100);
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
// 如果沒(méi)有消息,則等待1秒
Thread.sleep(1000);
} else {
// 處理消息
processMessage(message.getEntries());
}
// 提交確認(rèn)
connector.ack(batchId);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 斷開(kāi)連接
connector.disconnect();
}
}
/**
* 處理Canal發(fā)送過(guò)來(lái)的消息
*
* @param entryList 消息列表
*/
private void processMessage(List<CanalEntry.Entry> entryList) {
for (CanalEntry.Entry entry : entryList) {
// 忽略事務(wù)開(kāi)始和結(jié)束事件
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
continue;
}
CanalEntry.RowChange rowChage;
try {
// 解析RowChange數(shù)據(jù)
rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
}
CanalEntry.EventType eventType = rowChage.getEventType();
// 打印日志
System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
eventType));
// 處理每一行數(shù)據(jù)變化
for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
Transaction transaction = convertToTransaction(rowData.getAfterColumnsList());
if (eventType == CanalEntry.EventType.DELETE) {
// 處理刪除事件(如果需要)
} elseif (eventType == CanalEntry.EventType.INSERT) {
// 插入新記錄
transactionMapper.insert(transaction);
} else {
// 更新現(xiàn)有記錄
transactionMapper.update(transaction);
}
}
}
}
/**
* 將Canal列數(shù)據(jù)轉(zhuǎn)換為Transaction對(duì)象
*
* @param columns 列數(shù)據(jù)列表
* @return 轉(zhuǎn)換后的Transaction對(duì)象
*/
private Transaction convertToTransaction(List<CanalEntry.Column> columns) {
Transaction transaction = new Transaction();
for (CanalEntry.Column column : columns) {
switch (column.getName()) {
case"id":
transaction.setId(Long.parseLong(column.getValue()));
break;
case"transaction_id":
transaction.setTransactionId(column.getValue());
break;
case"amount":
transaction.setAmount(Double.parseDouble(column.getValue()));
break;
case"status":
transaction.setStatus(column.getValue());
break;
}
}
return transaction;
}
}
Application
package com.example.canaldemo;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
@MapperScan("com.example.canaldemo.mapper")
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
測(cè)試
插入一條交易記錄
curl -X POST http://localhost:8080/api/transactions \
-H "Content-Type: application/json" \
-d '{"transactionId": "TX123", "amount": 100.00, "status": "PENDING"}'
更新一條交易記錄
curl -X PUT http://localhost:8080/api/transactions/TX123 \
-H "Content-Type: application/json" \
-d '{"transactionId": "TX123", "amount": 100.00, "status": "COMPLETED"}'
觀察后臺(tái)日志
================> binlog[mysql-bin.000001:1234] , name[your_database,transaction] , eventType : INSERT
id : 1 update=true
transaction_id : TX123 update=true
amount : 100.00 update=true
status : PENDING update=true
================> binlog[mysql-bin.000001:5678] , name[your_database,transaction] , eventType : UPDATE
-------> before
id : 1 update=false
transaction_id : TX123 update=false
amount : 100.00 update=false
status : PENDING update=false
-------> after
id : 1 update=false
transaction_id : TX123 update=false
amount : 100.00 update=false
status : COMPLETED update=true
責(zé)任編輯:武曉燕
來(lái)源:
Java知識(shí)日歷