MySQL同步ES的五種方案!
前言
有些小伙伴在工作中可能遇到過數(shù)據(jù)庫查詢慢的問題,特別是模糊查詢和復(fù)雜聚合查詢,這時候引入ES(Elasticsearch)作為搜索引擎是個不錯的選擇。
今天我們來聊聊MySQL同步到ES(Elasticsearch)的5種常見方案。
希望對你會有所幫助。
一、為什么需要MySQL同步到ES?
在我們深入討論方案之前,先明確一下為什么需要將MySQL數(shù)據(jù)同步到ES:
- 全文搜索能力:ES提供強大的全文搜索功能,遠(yuǎn)超MySQL的LIKE查詢。
- 復(fù)雜聚合分析:ES支持復(fù)雜的聚合查詢,適合大數(shù)據(jù)分析。
- 高性能查詢:ES的倒排索引設(shè)計使查詢速度極快。
- 水平擴展:ES天生支持分布式,易于水平擴展。
先來看一下整體的同步架構(gòu)圖:
圖片
接下來,我們詳細(xì)分析每種方案的實現(xiàn)原理和優(yōu)缺點。
二、方案一:雙寫方案
雙寫方案是最直接的同步方式,即在業(yè)務(wù)代碼中同時向MySQL和ES寫入數(shù)據(jù)。
示例代碼:
@Service
public class UserService {
@Autowired
private UserMapper userMapper;
@Autowired
private ElasticsearchTemplate elasticsearchTemplate;
@Transactional
public void addUser(User user) {
// 寫入MySQL
userMapper.insert(user);
// 寫入Elasticsearch
IndexQuery indexQuery = new IndexQueryBuilder()
.withObject(user)
.withId(user.getId().toString())
.build();
elasticsearchTemplate.index(indexQuery);
}
@Transactional
public void updateUser(User user) {
// 更新MySQL
userMapper.updateById(user);
// 更新Elasticsearch
IndexRequest request = new IndexRequest("user_index")
.id(user.getId().toString())
.source(JSON.toJSONString(user), XContentType.JSON);
elasticsearchTemplate.getClient().index(request, RequestOptions.DEFAULT);
}
}優(yōu)缺點分析
優(yōu)點:
- 實現(xiàn)簡單,不需要引入額外組件
- 實時性高,數(shù)據(jù)立即同步
缺點:
- 數(shù)據(jù)一致性難保證,需要處理分布式事務(wù)問題
- 代碼侵入性強,業(yè)務(wù)邏輯復(fù)雜
- 性能受影響,每次寫操作都要等待ES響應(yīng)
適用場景
適合數(shù)據(jù)量不大,對實時性要求高,且能夠接受一定數(shù)據(jù)不一致的業(yè)務(wù)場景。
三、方案二:定時任務(wù)方案
定時任務(wù)方案通過定期掃描MySQL數(shù)據(jù)變化來同步到ES。
示例代碼:
@Component
public class UserSyncTask {
@Autowired
private UserMapper userMapper;
@Autowired
private UserESRepository userESRepository;
// 每5分鐘執(zhí)行一次
@Scheduled(fixedRate = 5 * 60 * 1000)
public void syncUserToES() {
// 查詢最近更新的數(shù)據(jù)
Date lastSyncTime = getLastSyncTime();
List<User> updatedUsers = userMapper.selectUpdatedAfter(lastSyncTime);
// 同步到ES
for (User user : updatedUsers) {
userESRepository.save(user);
}
// 更新最后同步時間
updateLastSyncTime(new Date());
}
// 獲取最后同步時間
private Date getLastSyncTime() {
// 從數(shù)據(jù)庫或Redis中獲取
// ...
}
}數(shù)據(jù)更新追蹤策略
為了提高同步效率,通常需要設(shè)計良好的數(shù)據(jù)變更追蹤機制:
圖片
優(yōu)缺點分析
優(yōu)點:
- 實現(xiàn)簡單,不需要修改現(xiàn)有業(yè)務(wù)代碼
- 對數(shù)據(jù)庫壓力可控,可以調(diào)整同步頻率
缺點:
- 實時性差,數(shù)據(jù)同步有延遲
- 可能遺漏數(shù)據(jù),如果系統(tǒng)崩潰會丟失部分?jǐn)?shù)據(jù)
- 掃描全表可能對數(shù)據(jù)庫造成壓力
適用場景
適合對實時性要求不高,數(shù)據(jù)變更不頻繁的場景。
四、方案三:Binlog同步方案
Binlog是MySQL的二進制日志,記錄了所有數(shù)據(jù)變更操作。
通過解析Binlog可以實現(xiàn)數(shù)據(jù)同步。
示例代碼:
public class BinlogSyncService {
public void startSync() {
BinaryLogClient client = new BinaryLogClient("localhost", 3306, "username", "password");
client.registerEventListener(new BinaryLogClient.EventListener() {
@Override
public void onEvent(Event event) {
EventData eventData = event.getData();
if (eventData instanceof WriteRowsEventData) {
// 插入操作
WriteRowsEventData writeData = (WriteRowsEventData) eventData;
processInsertEvent(writeData);
} elseif (eventData instanceof UpdateRowsEventData) {
// 更新操作
UpdateRowsEventData updateData = (UpdateRowsEventData) eventData;
processUpdateEvent(updateData);
} elseif (eventData instanceof DeleteRowsEventData) {
// 刪除操作
DeleteRowsEventData deleteData = (DeleteRowsEventData) eventData;
processDeleteEvent(deleteData);
}
}
});
client.connect();
}
private void processInsertEvent(WriteRowsEventData eventData) {
// 處理插入事件,同步到ES
for (Serializable[] row : eventData.getRows()) {
User user = convertRowToUser(row);
syncToElasticsearch(user, "insert");
}
}
private void syncToElasticsearch(User user, String operation) {
// 同步到ES的實現(xiàn)
// ...
}
}優(yōu)缺點分析
優(yōu)點:
- 實時性高,幾乎實時同步
- 對業(yè)務(wù)代碼無侵入,不需要修改現(xiàn)有代碼
- 性能好,不影響數(shù)據(jù)庫性能
缺點:
- 實現(xiàn)復(fù)雜,需要解析Binlog格式
- 需要考慮Binlog格式變更的兼容性問題
- 主從切換時可能需要重新同步
適用場景
適合對實時性要求高,數(shù)據(jù)量大的場景。
五、方案四:Canal方案
Canal是阿里巴巴開源的MySQL Binlog增量訂閱&消費組件,簡化了Binlog同步的復(fù)雜性。
示例代碼:
# canal.properties 配置
canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=username
canal.instance.dbPassword=password
canal.instance.connectionCharset=UTF-8
canal.instance.filter.regex=.*\\..*public class CanalClientExample {
public static void main(String[] args) {
// 創(chuàng)建Canal連接
CanalConnector connector = CanalConnectors.newSingleConnector(
new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
try {
connector.connect();
connector.subscribe(".*\\..*");
while (true) {
Message message = connector.getWithoutAck(100);
long batchId = message.getId();
if (batchId != -1 && !message.getEntries().isEmpty()) {
processEntries(message.getEntries());
connector.ack(batchId); // 提交確認(rèn)
}
Thread.sleep(1000);
}
} finally {
connector.disconnect();
}
}
private static void processEntries(List<CanalEntry.Entry> entries) {
for (CanalEntry.Entry entry : entries) {
if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
if (rowChange.getEventType() == CanalEntry.EventType.INSERT) {
processInsert(rowData);
} elseif (rowChange.getEventType() == CanalEntry.EventType.UPDATE) {
processUpdate(rowData);
} elseif (rowChange.getEventType() == CanalEntry.EventType.DELETE) {
processDelete(rowData);
}
}
}
}
}
}架構(gòu)設(shè)計
Calan方案的架構(gòu)如下:
圖片
優(yōu)缺點分析
優(yōu)點:
- 實時性高,延遲低
- 對業(yè)務(wù)系統(tǒng)無侵入
- 阿里巴巴開源項目,社區(qū)活躍
缺點:
- 需要部署維護Canal服務(wù)器
- 需要處理網(wǎng)絡(luò)分區(qū)和故障恢復(fù)
- 可能產(chǎn)生數(shù)據(jù)重復(fù)同步問題
適用場景
適合大數(shù)據(jù)量、高實時性要求的場景,且有專門團隊維護中間件。
六、方案五:MQ異步方案
MQ異步方案通過消息隊列解耦MySQL和ES的同步過程,提高系統(tǒng)的可靠性和擴展性。
示例代碼:
@Service
public class UserService {
@Autowired
private UserMapper userMapper;
@Autowired
private RabbitTemplate rabbitTemplate;
@Transactional
public void addUser(User user) {
// 寫入MySQL
userMapper.insert(user);
// 發(fā)送消息到MQ
rabbitTemplate.convertAndSend("user.exchange", "user.add", user);
}
@Transactional
public void updateUser(User user) {
// 更新MySQL
userMapper.updateById(user);
// 發(fā)送消息到MQ
rabbitTemplate.convertAndSend("user.exchange", "user.update", user);
}
}
@Component
public class UserMQConsumer {
@Autowired
private UserESRepository userESRepository;
@RabbitListener(queues = "user.queue")
public void processUserAdd(User user) {
userESRepository.save(user);
}
@RabbitListener(queues = "user.queue")
public void processUserUpdate(User user) {
userESRepository.save(user);
}
@RabbitListener(queues = "user.queue")
public void processUserDelete(Long userId) {
userESRepository.deleteById(userId);
}
}消息隊列選型對比
不同的消息隊列產(chǎn)品有不同特點,下面是常見MQ的對比:
圖片
優(yōu)缺點分析
優(yōu)點:
- 完全解耦,MySQL和ES同步過程相互獨立
- 高可用,MQ本身提供消息持久化和重試機制
- 可擴展,可以方便地增加消費者處理消息
缺點:
- 系統(tǒng)復(fù)雜度增加,需要維護MQ集群
- 可能產(chǎn)生消息順序問題,需要處理消息順序性
- 數(shù)據(jù)一致性延遲,依賴于消息消費速度
適用場景
適合大型分布式系統(tǒng),對可靠性和擴展性要求高的場景。
七、5種方案對比
為了更直觀地比較這5種方案,我們來看一個綜合對比表格:
方案名稱 | 實時性 | 數(shù)據(jù)一致性 | 系統(tǒng)復(fù)雜度 | 性能影響 | 適用場景 |
雙寫方案 | 高 | 難保證 | 低 | 高 | 小規(guī)模應(yīng)用 |
定時任務(wù) | 低 | 最終一致 | 低 | 中 | 非實時場景 |
Binlog方案 | 高 | 最終一致 | 高 | 低 | 大數(shù)據(jù)量高實時 |
Canal方案 | 高 | 最終一致 | 中 | 低 | 大數(shù)據(jù)量高實時 |
MQ異步方案 | 中 | 最終一致 | 高 | 低 | 分布式大型系統(tǒng) |
選擇建議
有些小伙伴在工作中可能會糾結(jié)選擇哪種方案,這里給出一些建議:
- 初創(chuàng)項目或小規(guī)模系統(tǒng):可以選擇雙寫方案或定時任務(wù)方案,實現(xiàn)簡單。
- 中大型系統(tǒng):建議使用Canal方案或MQ異步方案,保證系統(tǒng)的可靠性和擴展性。
- 大數(shù)據(jù)量高實時要求:Binlog方案或Canal方案是最佳選擇。
- 已有MQ基礎(chǔ)設(shè)施:優(yōu)先考慮MQ異步方案,充分利用現(xiàn)有資源。
注意事項
無論選擇哪種方案,都需要注意以下幾點:
- 冪等性處理:同步過程需要保證冪等性,防止重復(fù)數(shù)據(jù)。
- 監(jiān)控告警:建立完善的監(jiān)控體系,及時發(fā)現(xiàn)同步延遲或失敗。
- 數(shù)據(jù)校驗:定期校驗MySQL和ES中的數(shù)據(jù)一致性。
- 容錯機制:設(shè)計良好的故障恢復(fù)機制,避免數(shù)據(jù)丟失。
總結(jié)
MySQL同步到ES(Elasticsearch)是現(xiàn)代應(yīng)用開發(fā)中常見的需求,選擇合適的同步方案對系統(tǒng)性能和可靠性至關(guān)重要。
本文介紹了5種常見方案,各有優(yōu)缺點,適用于不同場景。
在實際項目中,可能需要根據(jù)具體需求組合使用多種方案,或者對某種方案進行定制化改造。
重要的是要理解每種方案的原理和特點,才能做出合理的技術(shù)選型。

































