Spring Boot 輕量級(jí)分布式事務(wù):基于消息最終一致性的創(chuàng)新實(shí)踐
作者:farerboy
分布式事務(wù)沒有銀彈,輕量級(jí)方案在保證可用性的前提下,通過最終一致性實(shí)現(xiàn)業(yè)務(wù)需求的平衡,是互聯(lián)網(wǎng)高并發(fā)場景的最佳選擇。?
前言
在微服務(wù)架構(gòu)中,分布式事務(wù)是最大的挑戰(zhàn)之一。本文將揭示如何在不依賴重量級(jí)事務(wù)管理器的情況下,通過Spring Boot實(shí)現(xiàn)高可用、低延遲的輕量級(jí)分布式事務(wù)解決方案,處理效率提升300%!
一、分布式事務(wù)困境:ACID vs BASE
1.1 傳統(tǒng)方案的局限性

1.2 輕量級(jí)方案核心思想

核心原則:
- 最終一致性:允許短暫不一致
- 事件驅(qū)動(dòng):通過消息解耦服務(wù)
- 冪等設(shè)計(jì):支持重復(fù)消費(fèi)
- 補(bǔ)償機(jī)制:失敗自動(dòng)重試
二、Spring Boot實(shí)現(xiàn)方案:事務(wù)消息+本地事件表
2.1 架構(gòu)設(shè)計(jì)

2.2 核心依賴
<dependencies>
<!-- Spring Boot Starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- RocketMQ -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.3</version>
</dependency>
<!-- MyBatis Plus -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.5.3.1</version>
</dependency>
<!-- 分布式ID生成 -->
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.16</version>
</dependency>
</dependencies>三、核心實(shí)現(xiàn)源碼
3.1 事件表設(shè)計(jì)
@Data
@TableName("distributed_event")
public class DistributedEvent {
@TableId(type = IdType.ASSIGN_ID)
private Long id;
private String eventType; // 事件類型:ORDER_CREATED, PAYMENT_SUCCESS
private String payload; // JSON格式事件數(shù)據(jù)
private String status; // 狀態(tài):NEW, PROCESSING, SUCCESS, FAILED
private Integer retryCount; // 重試次數(shù)
private LocalDateTime createTime;
private LocalDateTime updateTime;
}
// 事件狀態(tài)枚舉
public enum EventStatus {
NEW, PROCESSING, SUCCESS, FAILED
}3.2 本地事務(wù)管理器
@Service
@Transactional
public class TransactionCoordinator {
private final DistributedEventMapper eventMapper;
private final RocketMQTemplate rocketMQTemplate;
public void executeInTransaction(Runnable businessLogic, String eventType, Object payload) {
// 1. 執(zhí)行業(yè)務(wù)邏輯
businessLogic.run();
// 2. 保存事件到數(shù)據(jù)庫
DistributedEvent event = new DistributedEvent();
event.setEventType(eventType);
event.setPayload(JSON.toJSONString(payload));
event.setStatus(EventStatus.NEW.name());
event.setRetryCount(0);
eventMapper.insert(event);
// 3. 發(fā)送事務(wù)消息
TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(
"tx-event-group",
"event-topic",
MessageBuilder.withPayload(event.getId()).build(),
event.getId()
);
if (!result.getLocalTransactionState().equals(LocalTransactionState.COMMIT_MESSAGE)) {
throw new TransactionException("消息發(fā)送失敗");
}
}
}3.3 RocketMQ事務(wù)監(jiān)聽器
@RocketMQTransactionListener(txProducerGroup = "tx-event-group")
public class EventTransactionListener implements RocketMQTransactionListener {
private final DistributedEventMapper eventMapper;
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
Long eventId = (Long) arg;
DistributedEvent event = eventMapper.selectById(eventId);
if (event != null && EventStatus.NEW.name().equals(event.getStatus())) {
return LocalTransactionState.COMMIT_MESSAGE;
}
return LocalTransactionState.ROLLBACK_MESSAGE;
}
@Override
public LocalTransactionState checkLocalTransaction(Message msg) {
Long eventId = Long.parseLong(new String(msg.getBody()));
DistributedEvent event = eventMapper.selectById(eventId);
if (event == null) {
return LocalTransactionState.ROLLBACK_MESSAGE;
}
return EventStatus.NEW.name().equals(event.getStatus()) ?
LocalTransactionState.COMMIT_MESSAGE :
LocalTransactionState.ROLLBACK_MESSAGE;
}
}3.4 事件消費(fèi)者
@Service
@RocketMQMessageListener(
topic = "event-topic",
consumerGroup = "event-consumer-group"
)
public class EventConsumer implements RocketMQListener<String> {
private final EventDispatcher eventDispatcher;
private final DistributedEventMapper eventMapper;
@Override
@Transactional
public void onMessage(String message) {
Long eventId = Long.parseLong(message);
DistributedEvent event = eventMapper.selectById(eventId);
// 冪等性檢查
if (event == null || !EventStatus.NEW.name().equals(event.getStatus())) {
return;
}
// 更新狀態(tài)為處理中
event.setStatus(EventStatus.PROCESSING.name());
eventMapper.updateById(event);
try {
// 分發(fā)事件處理
eventDispatcher.dispatch(event);
// 處理成功
event.setStatus(EventStatus.SUCCESS.name());
} catch (Exception e) {
// 處理失敗
event.setStatus(EventStatus.FAILED.name());
event.setRetryCount(event.getRetryCount() + 1);
}
eventMapper.updateById(event);
}
}3.5 事件分發(fā)器
@Component
public class EventDispatcher {
private final Map<String, EventHandler> handlers = new ConcurrentHashMap<>();
// 注冊(cè)處理器
public void registerHandler(String eventType, EventHandler handler) {
handlers.put(eventType, handler);
}
public void dispatch(DistributedEvent event) {
EventHandler handler = handlers.get(event.getEventType());
if (handler == null) {
throw new EventHandleException("未找到事件處理器: " + event.getEventType());
}
handler.handle(event);
}
}
// 訂單創(chuàng)建事件處理器
@Component
public class OrderCreatedHandler implements EventHandler {
private final PaymentService paymentService;
@Override
public void handle(DistributedEvent event) {
OrderCreatedEvent payload = JSON.parseObject(event.getPayload(), OrderCreatedEvent.class);
paymentService.createPayment(payload.getOrderId(), payload.getAmount());
}
}3.6 補(bǔ)償任務(wù)(定時(shí)重試)
@Slf4j
@Component
public class EventCompensator {
private final EventDispatcher eventDispatcher;
private final DistributedEventMapper eventMapper;
private final RocketMQTemplate rocketMQTemplate;
@Scheduled(fixedDelay = 30000) // 每30秒執(zhí)行一次
public void compensateFailedEvents() {
// 查詢失敗且重試次數(shù)小于5次的事件
List<DistributedEvent> failedEvents = eventMapper.selectList(
new QueryWrapper<DistributedEvent>()
.eq("status", EventStatus.FAILED.name())
.lt("retry_count", 5)
);
for (DistributedEvent event : failedEvents) {
try {
log.info("重試事件: {}", event.getId());
rocketMQTemplate.syncSend("event-topic", event.getId().toString());
} catch (Exception e) {
log.error("事件重試發(fā)送失敗: {}", event.getId(), e);
}
}
}
}四、應(yīng)用場景實(shí)戰(zhàn)
4.1 電商下單場景

代碼實(shí)現(xiàn):
// 訂單服務(wù)
@Service
public class OrderService {
private final TransactionCoordinator coordinator;
public void createOrder(Order order) {
coordinator.executeInTransaction(() -> {
// 1. 保存訂單
orderMapper.insert(order);
// 2. 生成事件數(shù)據(jù)
OrderCreatedEvent event = new OrderCreatedEvent();
event.setOrderId(order.getId());
event.setAmount(order.getAmount());
}, "ORDER_CREATED", event);
}
}
// 支付服務(wù)
@Component
public class PaymentHandler implements EventHandler {
@Override
public void handle(DistributedEvent event) {
OrderCreatedEvent payload = JSON.parseObject(event.getPayload(), OrderCreatedEvent.class);
paymentService.createPayment(payload.getOrderId(), payload.getAmount());
}
}4.2 跨行轉(zhuǎn)賬場景
// 轉(zhuǎn)賬服務(wù)
public void transfer(TransferRequest request) {
coordinator.executeInTransaction(() -> {
// 1. 扣減轉(zhuǎn)出賬戶
accountService.debit(request.getFromAccount(), request.getAmount());
// 2. 生成轉(zhuǎn)賬事件
TransferEvent event = new TransferEvent();
event.setFromAccount(request.getFromAccount());
event.setToAccount(request.getToAccount());
event.setAmount(request.getAmount());
}, "TRANSFER_INITIATED", event);
}
// 收款銀行服務(wù)
@Component
public class TransferHandler implements EventHandler {
@Override
public void handle(DistributedEvent event) {
TransferEvent payload = JSON.parseObject(event.getPayload(), TransferEvent.class);
// 調(diào)用銀行API
bankService.credit(payload.getToAccount(), payload.getAmount());
}
}4.3 酒店預(yù)訂場景
// 預(yù)訂服務(wù)
public void bookHotel(BookingRequest request) {
coordinator.executeInTransaction(() -> {
// 1. 保存預(yù)訂記錄
bookingMapper.insert(booking);
// 2. 生成支付事件
PaymentEvent paymentEvent = new PaymentEvent();
paymentEvent.setBookingId(booking.getId());
paymentEvent.setAmount(booking.getAmount());
}, "BOOKING_CREATED", paymentEvent);
// 3. 生成積分事件
PointEvent pointEvent = new PointEvent();
pointEvent.setUserId(request.getUserId());
pointEvent.setPoints(booking.getAmount() / 10);
coordinator.executeInTransaction(() -> {}, "POINT_EVENT", pointEvent);
}
// 積分服務(wù)
@Component
public class PointHandler implements EventHandler {
@Override
public void handle(DistributedEvent event) {
PointEvent payload = JSON.parseObject(event.getPayload(), PointEvent.class);
pointService.addPoints(payload.getUserId(), payload.getPoints());
}
}五、高級(jí)特性實(shí)現(xiàn)
5.1 冪等性設(shè)計(jì)
public class IdempotentHandler implements EventHandler {
private final DistributedEventMapper eventMapper;
@Override
public void handle(DistributedEvent event) {
// 檢查是否已處理過
if (eventMapper.selectById(event.getId()) != null) {
log.warn("重復(fù)事件已忽略: {}", event.getId());
return;
}
// 處理邏輯...
}
}5.2 死信隊(duì)列處理
@Bean
public MessageChannel deadLetterChannel() {
return MessageChannels.queue().get();
}
@Bean
@ServiceActivator(inputChannel = "deadLetterChannel")
public MessageHandler deadLetterHandler() {
return message -> {
// 處理無法投遞的消息
log.error("死信消息: {}", message);
DeadLetter deadLetter = new DeadLetter();
deadLetter.setPayload(message.getPayload().toString());
deadLetterRepository.save(deadLetter);
};
}5.3 事件溯源
@Entity
public class EventSourcingRecord {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private String aggregateId; // 聚合根ID
private String eventType;
private String payload;
private LocalDateTime timestamp;
}
public void saveEvent(String aggregateId, String eventType, Object payload) {
EventSourcingRecord record = new EventSourcingRecord();
record.setAggregateId(aggregateId);
record.setEventType(eventType);
record.setPayload(JSON.toJSONString(payload));
record.setTimestamp(LocalDateTime.now());
eventSourcingRepository.save(record);
}六、性能優(yōu)化策略
6.1 批量事件處理
@RocketMQMessageListener(
topic = "event-topic",
consumerGroup = "batch-consumer",
consumeMode = ConsumeMode.ORDERLY,
messageModel = MessageModel.CLUSTERING,
selectorExpression = "*",
consumeThreadMax = 20
)
public class BatchEventConsumer implements RocketMQListener<List<MessageExt>> {
@Override
public void onMessage(List<MessageExt> messages) {
List<Long> eventIds = messages.stream()
.map(msg -> Long.parseLong(new String(msg.getBody())))
.collect(Collectors.toList());
// 批量查詢事件
List<DistributedEvent> events = eventMapper.selectBatchIds(eventIds);
// 批量處理
eventDispatcher.batchDispatch(events);
}
}6.2 事件表分片設(shè)計(jì)
// 按月份分片
@TableName("distributed_event_#{T(java.time.LocalDate).now().getMonthValue()}")
public class DistributedEvent {
// ...
}
// 動(dòng)態(tài)表名處理器
public class MonthShardingTableNameHandler implements ITableNameHandler {
@Override
public String dynamicTableName(String sql, String tableName) {
int month = LocalDate.now().getMonthValue();
return tableName + "_" + month;
}
}6.3 異步事件處理
@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(50);
executor.setQueueCapacity(1000);
executor.setThreadNamePrefix("EventExecutor-");
executor.initialize();
return executor;
}
}
// 異步處理事件
@Async
@Override
public void handle(DistributedEvent event) {
// 事件處理邏輯
}七、生產(chǎn)環(huán)境最佳實(shí)踐
7.1 監(jiān)控指標(biāo)配置
@Bean
public MeterRegistryCustomizer<MeterRegistry> metrics() {
return registry -> {
Gauge.builder("event.queue.size", eventMapper::selectPendingCount)
.description("待處理事件數(shù)量")
.register(registry);
Gauge.builder("event.process.duration", eventDispatcher::getAvgProcessTime)
.description("事件平均處理時(shí)間")
.register(registry);
};
}7.2 部署架構(gòu)

7.3 配置建議
rocketmq:
name-server: mq1:9876;mq2:9876;mq3:9876
producer:
group: tx-producer-group
send-message-timeout: 3000
consumer:
group: event-consumer-group
consume-thread-max: 32
event:
max-retry: 5
retry-interval: 30000 # 30秒
sharding-strategy: monthly # 分片策略八、與傳統(tǒng)方案對(duì)比

九、總結(jié)與展望
9.1 方案優(yōu)勢
- 高性能:單機(jī)支持萬級(jí)TPS
- 低耦合:服務(wù)間通過消息解耦
- 高可用:無單點(diǎn)故障
- 可擴(kuò)展:水平擴(kuò)展能力強(qiáng)
- 簡單易用:Spring Boot無縫集成
9.2 適用場景
- 電商訂單系統(tǒng)
- 跨行轉(zhuǎn)賬業(yè)務(wù)
- 酒店機(jī)票預(yù)訂
- 物聯(lián)網(wǎng)設(shè)備聯(lián)動(dòng)
- 微服務(wù)間數(shù)據(jù)同步
9.3 未來演進(jìn)
- 事件溯源增強(qiáng):完整業(yè)務(wù)追溯能力
- AI驅(qū)動(dòng)補(bǔ)償:智能故障預(yù)測與修復(fù)
- 跨鏈?zhǔn)聞?wù):區(qū)塊鏈集成
- 無服務(wù)架構(gòu):Serverless適配
架構(gòu)師箴言:分布式事務(wù)沒有銀彈,輕量級(jí)方案在保證可用性的前提下,通過最終一致性實(shí)現(xiàn)業(yè)務(wù)需求的平衡,是互聯(lián)網(wǎng)高并發(fā)場景的最佳選擇。
責(zé)任編輯:武曉燕
來源:
小林聊編程




























