一個支持監(jiān)聽SQL、感知事務狀態(tài)、回溯數(shù)據(jù)源的動態(tài)數(shù)據(jù)源框架
項目更名
在easymulti-datasource-spring-boot-starter之后筆者又開發(fā)了hotkit-r2dbc,這兩個項目都支持動態(tài)數(shù)據(jù)源切換,前者支持mybatis框架,后者支持響應式編程spring-data-r2dbc框架,既然都是ORM框架,不如合并到一個項目中維護。
GitHub上原easymulti-datasource-spring-boot-starter項目已更名為easymulti-datasource,而原easymulti-datasource-spring-boot-starter模塊已經(jīng)更名為easymulti-datasource-mybatis,版本號從3.0.1開始。新版本增加了easymulti-datasource-r2dbc(也就是原h(huán)otkit-r2dbc)。
項目背景
多數(shù)據(jù)源動態(tài)切換似乎已經(jīng)成了微服務的標配,做過那么多項目發(fā)現(xiàn)每個項目都要配一個動態(tài)數(shù)據(jù)源,都要寫一個切面去實現(xiàn)動態(tài)切換,因此,我將這些繁瑣的配置封裝為starter,拿來即用。
easymulti-datasource兩個模塊:
- easymulti-datasource-mybatis(原easymulti-datasource-spring-boot-starter)
- easymulti-datasource-r2dbc(原h(huán)otkit-r2dbc)
easymulti-datasource-mybatis
mybatis版多數(shù)據(jù)源框架,提供聲明式和編程式動態(tài)切換數(shù)據(jù)源功能。
easymulti-datasource-mybatis自動整合了mybatis-plus,提供兩種動態(tài)多數(shù)據(jù)源模式,分別是主從數(shù)據(jù)源模式、非主從的多數(shù)據(jù)源模式,每個數(shù)據(jù)源使用獨立的連接池配置,可針對每個數(shù)據(jù)源單獨配置連接池。
支持多數(shù)據(jù)源動態(tài)切換并不是easymulti-datasource-mybatis框架的最大亮點,easymulti-datasource-mybatis區(qū)別于其它動態(tài)數(shù)據(jù)源切換框架的主要特色如下:
支持監(jiān)聽SQL,監(jiān)聽修改某個表的某些字段的sql,用于實現(xiàn)埋點事件;
支持事務狀態(tài)監(jiān)聽、注冊事務監(jiān)聽器,用于在事務回滾/提交時再完成一些后臺操作;
詳細使用可參見wiki。
依賴配置
maven中使用:
- <dependency>
- <groupId>com.github.wujiuye</groupId>
- <artifactId>easymulti-datasource-mybatis</artifactId>
- <version>${version}</version>
- </dependency>
舊版本為:
- <dependency>
- <groupId>com.github.wujiuye</groupId>
- <artifactId>easymulti-datasource-spring-boot-starter</artifactId>
- <version>${version}</version>
- </dependency>
版本選擇注意事項說明如下圖所示。
動態(tài)切換數(shù)據(jù)源
- 使用注解切換數(shù)據(jù)源:@EasyMutiDataSource;
- 使用API切換數(shù)據(jù)源:DataSourceContextHolder#setDataSource。
AOP中注冊事務監(jiān)聽器
在application配置文件中打開追蹤事務方法調(diào)用鏈路的開關,配置如下。
- ## 監(jiān)控事務方法調(diào)用鏈路
- easymuti:
- transaction:
- open-chain: true
定義切面,攔截Mapper方法,在環(huán)繞方法中實現(xiàn)更新緩存的邏輯,代碼如下。
- TransactionInvokeContext.currentExistTransaction:判斷當前調(diào)用鏈路上是否存在事務;
- TransactionInvokeContext.addCurrentTransactionMethodPopListener:給當前事務綁定一個監(jiān)聽器(PopTransactionListener),當事務提交或者回滾時監(jiān)聽器被調(diào)用。
如上代碼所示,首先是判斷當前調(diào)用鏈路上是否存在事務,如果存在,則給當前事務注入一個監(jiān)聽器,由監(jiān)聽器完成緩存更新邏輯,如果不存在事務,在目標方法執(zhí)行完成后且無異常拋出時執(zhí)行更新緩存邏輯。
監(jiān)聽SQL
easymulti-datasource-mybatis支持sql埋點監(jiān)聽功能,并且支持監(jiān)聽事務狀態(tài),如果當前sql執(zhí)行存在事務中,則會在事務提交后才會回調(diào)sql監(jiān)聽者。
第一步:啟用sql埋點監(jiān)聽功能,并且啟用事務調(diào)用鏈路追蹤功能。
- easymuti:
- transaction:
- open-chain: true
- sql-watcher:
- enable: true
第二步:編寫觀察者,可以有n多個,并且多個觀察者也可觀察同一個表、甚至相同字段。
- @Component
- @Slf4j
- public class TestTableFieldObserver implements TableFieldObserver , InitializingBean {
- @Override
- public Set<WatchMetadata> observeMetadatas() {
- // 在這里注冊要監(jiān)聽哪些表的哪些字段
- }
- /**
- * 監(jiān)聽到sql時被同步調(diào)用
- *
- * @param commandType 事件類型
- * @param matchResult 匹配的ITEM
- * @return 返回異步消費者
- */
- @Override
- public AsyncConsumer observe(CommandType commandType, MatchItem matchResult) {
- // 同步消費
- // 這里是sql執(zhí)行之前,可在sql執(zhí)行之前做一些事情,比如新舊數(shù)據(jù)的對比,這里查出舊數(shù)據(jù)
- // 異步消費,再sql執(zhí)行完成時,或者在事務方法執(zhí)行完成時(如果存在事務),完成指:正常執(zhí)行完成 or 方法異常退出
- return throwable -> {
- // sql執(zhí)行拋出異常不處理
- if (throwable != null) {
- return;
- }
- // 消費事件
- // ....
- };
- }
- }
observe方法在監(jiān)聽到sql時被同步調(diào)用,該方法返回的AsyncConsumer則在事務提交后被回調(diào)調(diào)用,如果事務回滾了則不會被調(diào)用。
如果調(diào)用鏈路上出現(xiàn)多個事務,那么根據(jù)事務的傳播機制,只在當前方法所在事務提交時才會回調(diào)注冊在該事務上的所有AsyncConsumer。
easymulti-datasource-r2dbc
spring-data-r2dbc版多數(shù)據(jù)源組件,用于響應式編程。
easymulti-datasource-r2dbc為spring-data-r2dbc實現(xiàn)動態(tài)路由接口,為反應式編程提供聲明式和編程式多數(shù)據(jù)源動態(tài)切換提供支持。同樣支持兩種多數(shù)據(jù)源模式,覆蓋常見的多數(shù)據(jù)源使用場景,分別是主從模式和Cluster模式,Cluster模式支持最多配置3個數(shù)據(jù)源,而主從模式支持一主一從。
添加依賴與配置數(shù)據(jù)源
使用easymulti-datasource-r2dbc后,無需再在項目中添加spring-boot-starter-data-r2dbc的依賴,也不需要添加spring-data-r2dbc的依賴。
easymulti-datasource-r2dbc版本號對應spring-data-r2dbc的版本號:
easymulti-datasource-r2dbc | spring-data-r2dbc |
---|---|
3.0.1-RELEASE | 1.1.0.RELEASE |
在項目中添加easymulti-datasource-r2dbc的依賴,如下。
- <dependency>
- <groupId>com.github.wujiuye</groupId>
- <artifactId>easymulti-datasource-r2dbc</artifactId>
- <version>${version}</version>
- </dependency>
此時,只需要額外添加用到的數(shù)據(jù)庫類型對應的驅(qū)動依賴即可,例如,添加mysql的r2dbc驅(qū)動。
- <dependency>
- <groupId>dev.miku</groupId>
- <artifactId>r2dbc-mysql</artifactId>
- <version>0.8.2.RELEASE</version>
- </dependency>
如果使用主從模式,則使用如下配置。
- easymuti:
- database:
- r2dbc:
- master-slave-mode:
- master:
- url: r2dbc:mysql://127.0.0.1:3306/r2dbc_stu
- username: root
- password:
- pool:
- max-size: 5
- idel-timeout: 60
- slave:
- url: r2dbc:mysql://127.0.0.1:3306/r2dbc_stu
- username: root
- password:
- pool:
- max-size: 5
- idel-timeout: 60
master會被設置為默認使用的數(shù)據(jù)源,slave有則配置,沒有也可以為空。雖然slave允許為空,但如果真的不需要多數(shù)據(jù)源,也是沒有必要使用easymulti-datasource-r2dbc的。
如果使用Cluster模式,則使用如下配置。
- easymuti:
- database:
- r2dbc:
- cluster-mode:
- first:
- url: r2dbc:mysql://127.0.0.1:3306/r2dbc_stu
- username: root
- password:
- pool:
- max-size: 5
- idel-timeout: 60
- second:
- url: r2dbc:mysql://127.0.0.1:3306/r2dbc_stu
- username: root
- password:
- pool:
- max-size: 5
- idel-timeout: 60
- third:
- url: r2dbc:mysql://127.0.0.1:3306/r2dbc_stu
- username: root
- password:
- pool:
- max-size: 5
- idel-timeout: 60
其中first會被設置為默認使用的數(shù)據(jù)源,second與third可以為空。
聲明式動態(tài)切換數(shù)據(jù)源
聲明式動態(tài)切換數(shù)據(jù)源即使用注解方式動態(tài)切換數(shù)據(jù)源,只需要在spring bean的public方法或者類上添加@R2dbcDataBase注解,將注解的value屬性指定為使用的數(shù)據(jù)源。
示例代碼如下。
- @Service
- public class PersonService {
- @Resource
- private PersonRepository personRepository;
- // 方法返回值類型為Mono測試
- @R2dbcDataBase(MasterSlaveMode.Master)
- @Transactional(rollbackFor = Throwable.class)
- public Mono<Integer> addPerson(Person... persons) {
- Mono<Integer> txOp = null;
- for (Person person : persons) {
- if (txOp == null) {
- txOp = personRepository.insertPerson(person.getId(), person.getName(), person.getAge());
- } else {
- txOp = txOp.then(personRepository.insertPerson(person.getId(), person.getName(), person.getAge()));
- }
- }
- return txOp;
- }
- // 方法返回值類型為Flux測試
- @R2dbcDataBase(MasterSlaveMode.Master)
- @Transactional(rollbackFor = Throwable.class)
- public Flux<Integer> addPersons(Flux<Person> persons) {
- return persons.flatMap(person -> personRepository.insertPerson(person.getId(), person.getName(), person.getAge()));
- }
- }
- 如果是主從模式,@R2dbcDataBase注解的value屬性可選值參見MasterSlaveMode接口聲明的常量;
- 如果是Cluster模式,@R2dbcDataBase注解的value屬性可選值參見ClusterMode接口聲明的常量;
編程式動態(tài)切換數(shù)據(jù)源
聲明式切換數(shù)據(jù)源的實現(xiàn)是依賴編程式切換數(shù)據(jù)源實現(xiàn)的,因此,我們也可以直接編寫代碼切換數(shù)據(jù)源,而不需要將方法改為public暴露出去。
只需要調(diào)用EasyMutiR2dbcRoutingConnectionFactory提供的靜態(tài)方法putDataSource為Context寫入使用的數(shù)據(jù)源,代碼如下。
- public class RoutingTest extends SupporSpringBootTest {
- @Resource
- private DatabaseClient client;
- @Resource
- private ReactiveTransactionManager reactiveTransactionManager;
- @Test
- public void test() throws InterruptedException {
- TransactionalOperator operator = TransactionalOperator.create(reactiveTransactionManager);
- Mono<Void> atomicOperation = client.execute("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)")
- .bind("id", "joe")
- .bind("name", "Joe")
- .bind("age", 34)
- .fetch().rowsUpdated()
- .then(client.execute("INSERT INTO person (id, name) VALUES(:id, :name)")
- .bind("id", "joe")
- .bind("name", "Joe")
- .fetch().rowsUpdated())
- .then();
- // 包裝事務
- Mono<Void> txOperation = operator.transactional(atomicOperation);
- // 包裝切換數(shù)據(jù)源
- EasyMutiR2dbcRoutingConnectionFactory.putDataSource(txOperation, MasterSlaveMode.Slave).subscribe();
- TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
- }
- }
需要注意,如果需要使用事務,必須先調(diào)用TransactionalOperator對象的transactional方法,再調(diào)用EasyMutiR2dbcRoutingConnectionFactory的putDataSource方法。
本文轉(zhuǎn)載自微信公眾號「Java藝術」,可以通過以下二維碼關注。轉(zhuǎn)載本文請聯(lián)系Java藝術公眾號。