偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

Spring Boot 輕量級(jí)分布式事務(wù):基于消息最終一致性的創(chuàng)新實(shí)踐

云計(jì)算 分布式
分布式事務(wù)沒有銀彈,輕量級(jí)方案在保證可用性的前提下,通過最終一致性實(shí)現(xiàn)業(yè)務(wù)需求的平衡,是互聯(lián)網(wǎng)高并發(fā)場景的最佳選擇。?


前言

在微服務(wù)架構(gòu)中,分布式事務(wù)是最大的挑戰(zhàn)之一。本文將揭示如何在不依賴重量級(jí)事務(wù)管理器的情況下,通過Spring Boot實(shí)現(xiàn)高可用、低延遲的輕量級(jí)分布式事務(wù)解決方案,處理效率提升300%!

一、分布式事務(wù)困境:ACID vs BASE

1.1 傳統(tǒng)方案的局限性

1.2 輕量級(jí)方案核心思想

核心原則:

  • 最終一致性:允許短暫不一致
  • 事件驅(qū)動(dòng):通過消息解耦服務(wù)
  • 冪等設(shè)計(jì):支持重復(fù)消費(fèi)
  • 補(bǔ)償機(jī)制:失敗自動(dòng)重試

二、Spring Boot實(shí)現(xiàn)方案:事務(wù)消息+本地事件表

2.1 架構(gòu)設(shè)計(jì)

2.2 核心依賴

<dependencies>
    <!-- Spring Boot Starter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>


    <!-- RocketMQ -->
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.2.3</version>
    </dependency>


    <!-- MyBatis Plus -->
    <dependency>
        <groupId>com.baomidou</groupId>
        <artifactId>mybatis-plus-boot-starter</artifactId>
        <version>3.5.3.1</version>
    </dependency>


    <!-- 分布式ID生成 -->
    <dependency>
        <groupId>cn.hutool</groupId>
        <artifactId>hutool-all</artifactId>
        <version>5.8.16</version>
    </dependency>
</dependencies>

三、核心實(shí)現(xiàn)源碼

3.1 事件表設(shè)計(jì)

@Data
@TableName("distributed_event")
public class DistributedEvent {
    @TableId(type = IdType.ASSIGN_ID)
    private Long id;
    private String eventType;   // 事件類型:ORDER_CREATED, PAYMENT_SUCCESS
    private String payload;     // JSON格式事件數(shù)據(jù)
    private String status;      // 狀態(tài):NEW, PROCESSING, SUCCESS, FAILED
    private Integer retryCount; // 重試次數(shù)
    private LocalDateTime createTime;
    private LocalDateTime updateTime;
}


// 事件狀態(tài)枚舉
public enum EventStatus {
    NEW, PROCESSING, SUCCESS, FAILED
}

3.2 本地事務(wù)管理器

@Service
@Transactional
public class TransactionCoordinator {


    private final DistributedEventMapper eventMapper;
    private final RocketMQTemplate rocketMQTemplate;


    public void executeInTransaction(Runnable businessLogic, String eventType, Object payload) {
        // 1. 執(zhí)行業(yè)務(wù)邏輯
        businessLogic.run();


        // 2. 保存事件到數(shù)據(jù)庫
        DistributedEvent event = new DistributedEvent();
        event.setEventType(eventType);
        event.setPayload(JSON.toJSONString(payload));
        event.setStatus(EventStatus.NEW.name());
        event.setRetryCount(0);
        eventMapper.insert(event);


        // 3. 發(fā)送事務(wù)消息
        TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(
            "tx-event-group",
            "event-topic",
            MessageBuilder.withPayload(event.getId()).build(),
            event.getId()
        );


        if (!result.getLocalTransactionState().equals(LocalTransactionState.COMMIT_MESSAGE)) {
            throw new TransactionException("消息發(fā)送失敗");
        }
    }
}

3.3 RocketMQ事務(wù)監(jiān)聽器

@RocketMQTransactionListener(txProducerGroup = "tx-event-group")
public class EventTransactionListener implements RocketMQTransactionListener {


    private final DistributedEventMapper eventMapper;


    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        Long eventId = (Long) arg;
        DistributedEvent event = eventMapper.selectById(eventId);


        if (event != null && EventStatus.NEW.name().equals(event.getStatus())) {
            return LocalTransactionState.COMMIT_MESSAGE;
        }
        return LocalTransactionState.ROLLBACK_MESSAGE;
    }


    @Override
    public LocalTransactionState checkLocalTransaction(Message msg) {
        Long eventId = Long.parseLong(new String(msg.getBody()));
        DistributedEvent event = eventMapper.selectById(eventId);


        if (event == null) {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }


        return EventStatus.NEW.name().equals(event.getStatus()) ? 
            LocalTransactionState.COMMIT_MESSAGE : 
            LocalTransactionState.ROLLBACK_MESSAGE;
    }
}

3.4 事件消費(fèi)者

@Service
@RocketMQMessageListener(
    topic = "event-topic",
    consumerGroup = "event-consumer-group"
)
public class EventConsumer implements RocketMQListener<String> {


    private final EventDispatcher eventDispatcher;
    private final DistributedEventMapper eventMapper;


    @Override
    @Transactional
    public void onMessage(String message) {
        Long eventId = Long.parseLong(message);
        DistributedEvent event = eventMapper.selectById(eventId);


        // 冪等性檢查
        if (event == null || !EventStatus.NEW.name().equals(event.getStatus())) {
            return;
        }


        // 更新狀態(tài)為處理中
        event.setStatus(EventStatus.PROCESSING.name());
        eventMapper.updateById(event);


        try {
            // 分發(fā)事件處理
            eventDispatcher.dispatch(event);


            // 處理成功
            event.setStatus(EventStatus.SUCCESS.name());
        } catch (Exception e) {
            // 處理失敗
            event.setStatus(EventStatus.FAILED.name());
            event.setRetryCount(event.getRetryCount() + 1);
        }


        eventMapper.updateById(event);
    }
}

3.5 事件分發(fā)器

@Component
public class EventDispatcher {


    private final Map<String, EventHandler> handlers = new ConcurrentHashMap<>();


    // 注冊(cè)處理器
    public void registerHandler(String eventType, EventHandler handler) {
        handlers.put(eventType, handler);
    }


    public void dispatch(DistributedEvent event) {
        EventHandler handler = handlers.get(event.getEventType());
        if (handler == null) {
            throw new EventHandleException("未找到事件處理器: " + event.getEventType());
        }


        handler.handle(event);
    }
}


// 訂單創(chuàng)建事件處理器
@Component
public class OrderCreatedHandler implements EventHandler {


    private final PaymentService paymentService;


    @Override
    public void handle(DistributedEvent event) {
        OrderCreatedEvent payload = JSON.parseObject(event.getPayload(), OrderCreatedEvent.class);
        paymentService.createPayment(payload.getOrderId(), payload.getAmount());
    }
}

3.6 補(bǔ)償任務(wù)(定時(shí)重試)

@Slf4j
@Component
public class EventCompensator {


    private final EventDispatcher eventDispatcher;
    private final DistributedEventMapper eventMapper;
    private final RocketMQTemplate rocketMQTemplate;


    @Scheduled(fixedDelay = 30000) // 每30秒執(zhí)行一次
    public void compensateFailedEvents() {
        // 查詢失敗且重試次數(shù)小于5次的事件
        List<DistributedEvent> failedEvents = eventMapper.selectList(
            new QueryWrapper<DistributedEvent>()
                .eq("status", EventStatus.FAILED.name())
                .lt("retry_count", 5)
        );


        for (DistributedEvent event : failedEvents) {
            try {
                log.info("重試事件: {}", event.getId());
                rocketMQTemplate.syncSend("event-topic", event.getId().toString());
            } catch (Exception e) {
                log.error("事件重試發(fā)送失敗: {}", event.getId(), e);
            }
        }
    }
}

四、應(yīng)用場景實(shí)戰(zhàn)

4.1 電商下單場景

代碼實(shí)現(xiàn):

// 訂單服務(wù)
@Service
public class OrderService {


    private final TransactionCoordinator coordinator;


    public void createOrder(Order order) {
        coordinator.executeInTransaction(() -> {
            // 1. 保存訂單
            orderMapper.insert(order);


            // 2. 生成事件數(shù)據(jù)
            OrderCreatedEvent event = new OrderCreatedEvent();
            event.setOrderId(order.getId());
            event.setAmount(order.getAmount());


        }, "ORDER_CREATED", event);
    }
}


// 支付服務(wù)
@Component
public class PaymentHandler implements EventHandler {


    @Override
    public void handle(DistributedEvent event) {
        OrderCreatedEvent payload = JSON.parseObject(event.getPayload(), OrderCreatedEvent.class);
        paymentService.createPayment(payload.getOrderId(), payload.getAmount());
    }
}

4.2 跨行轉(zhuǎn)賬場景

// 轉(zhuǎn)賬服務(wù)
public void transfer(TransferRequest request) {
    coordinator.executeInTransaction(() -> {
        // 1. 扣減轉(zhuǎn)出賬戶
        accountService.debit(request.getFromAccount(), request.getAmount());


        // 2. 生成轉(zhuǎn)賬事件
        TransferEvent event = new TransferEvent();
        event.setFromAccount(request.getFromAccount());
        event.setToAccount(request.getToAccount());
        event.setAmount(request.getAmount());


    }, "TRANSFER_INITIATED", event);
}


// 收款銀行服務(wù)
@Component
public class TransferHandler implements EventHandler {


    @Override
    public void handle(DistributedEvent event) {
        TransferEvent payload = JSON.parseObject(event.getPayload(), TransferEvent.class);


        // 調(diào)用銀行API
        bankService.credit(payload.getToAccount(), payload.getAmount());
    }
}

4.3 酒店預(yù)訂場景

// 預(yù)訂服務(wù)
public void bookHotel(BookingRequest request) {
    coordinator.executeInTransaction(() -> {
        // 1. 保存預(yù)訂記錄
        bookingMapper.insert(booking);


        // 2. 生成支付事件
        PaymentEvent paymentEvent = new PaymentEvent();
        paymentEvent.setBookingId(booking.getId());
        paymentEvent.setAmount(booking.getAmount());


    }, "BOOKING_CREATED", paymentEvent);


    // 3. 生成積分事件
    PointEvent pointEvent = new PointEvent();
    pointEvent.setUserId(request.getUserId());
    pointEvent.setPoints(booking.getAmount() / 10);
    coordinator.executeInTransaction(() -> {}, "POINT_EVENT", pointEvent);
}


// 積分服務(wù)
@Component
public class PointHandler implements EventHandler {


    @Override
    public void handle(DistributedEvent event) {
        PointEvent payload = JSON.parseObject(event.getPayload(), PointEvent.class);
        pointService.addPoints(payload.getUserId(), payload.getPoints());
    }
}

五、高級(jí)特性實(shí)現(xiàn)

5.1 冪等性設(shè)計(jì)

public class IdempotentHandler implements EventHandler {


    private final DistributedEventMapper eventMapper;


    @Override
    public void handle(DistributedEvent event) {
        // 檢查是否已處理過
        if (eventMapper.selectById(event.getId()) != null) {
            log.warn("重復(fù)事件已忽略: {}", event.getId());
            return;
        }


        // 處理邏輯...
    }
}

5.2 死信隊(duì)列處理

@Bean
public MessageChannel deadLetterChannel() {
    return MessageChannels.queue().get();
}


@Bean
@ServiceActivator(inputChannel = "deadLetterChannel")
public MessageHandler deadLetterHandler() {
    return message -> {
        // 處理無法投遞的消息
        log.error("死信消息: {}", message);
        DeadLetter deadLetter = new DeadLetter();
        deadLetter.setPayload(message.getPayload().toString());
        deadLetterRepository.save(deadLetter);
    };
}

5.3 事件溯源

@Entity
public class EventSourcingRecord {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    private String aggregateId; // 聚合根ID
    private String eventType;
    private String payload;
    private LocalDateTime timestamp;
}


public void saveEvent(String aggregateId, String eventType, Object payload) {
    EventSourcingRecord record = new EventSourcingRecord();
    record.setAggregateId(aggregateId);
    record.setEventType(eventType);
    record.setPayload(JSON.toJSONString(payload));
    record.setTimestamp(LocalDateTime.now());
    eventSourcingRepository.save(record);
}

六、性能優(yōu)化策略

6.1 批量事件處理

@RocketMQMessageListener(
    topic = "event-topic",
    consumerGroup = "batch-consumer",
    consumeMode = ConsumeMode.ORDERLY,
    messageModel = MessageModel.CLUSTERING,
    selectorExpression = "*",
    consumeThreadMax = 20
)
public class BatchEventConsumer implements RocketMQListener<List<MessageExt>> {


    @Override
    public void onMessage(List<MessageExt> messages) {
        List<Long> eventIds = messages.stream()
            .map(msg -> Long.parseLong(new String(msg.getBody())))
            .collect(Collectors.toList());


        // 批量查詢事件
        List<DistributedEvent> events = eventMapper.selectBatchIds(eventIds);


        // 批量處理
        eventDispatcher.batchDispatch(events);
    }
}

6.2 事件表分片設(shè)計(jì)

// 按月份分片
@TableName("distributed_event_#{T(java.time.LocalDate).now().getMonthValue()}")
public class DistributedEvent {
    // ...
}


// 動(dòng)態(tài)表名處理器
public class MonthShardingTableNameHandler implements ITableNameHandler {


    @Override
    public String dynamicTableName(String sql, String tableName) {
        int month = LocalDate.now().getMonthValue();
        return tableName + "_" + month;
    }
}

6.3 異步事件處理

@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {


    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(50);
        executor.setQueueCapacity(1000);
        executor.setThreadNamePrefix("EventExecutor-");
        executor.initialize();
        return executor;
    }
}


// 異步處理事件
@Async
@Override
public void handle(DistributedEvent event) {
    // 事件處理邏輯
}

七、生產(chǎn)環(huán)境最佳實(shí)踐

7.1 監(jiān)控指標(biāo)配置

@Bean
public MeterRegistryCustomizer<MeterRegistry> metrics() {
    return registry -> {
        Gauge.builder("event.queue.size", eventMapper::selectPendingCount)
            .description("待處理事件數(shù)量")
            .register(registry);


        Gauge.builder("event.process.duration", eventDispatcher::getAvgProcessTime)
            .description("事件平均處理時(shí)間")
            .register(registry);
    };
}


7.2 部署架構(gòu)

7.3 配置建議

rocketmq:
  name-server: mq1:9876;mq2:9876;mq3:9876
  producer:
    group: tx-producer-group
    send-message-timeout: 3000
  consumer:
    group: event-consumer-group
    consume-thread-max: 32


event:
  max-retry: 5
  retry-interval: 30000 # 30秒
  sharding-strategy: monthly # 分片策略

八、與傳統(tǒng)方案對(duì)比

九、總結(jié)與展望

9.1 方案優(yōu)勢

  1. 高性能:單機(jī)支持萬級(jí)TPS
  2. 低耦合:服務(wù)間通過消息解耦
  3. 高可用:無單點(diǎn)故障
  4. 可擴(kuò)展:水平擴(kuò)展能力強(qiáng)
  5. 簡單易用:Spring Boot無縫集成

9.2 適用場景

  • 電商訂單系統(tǒng)
  • 跨行轉(zhuǎn)賬業(yè)務(wù)
  • 酒店機(jī)票預(yù)訂
  • 物聯(lián)網(wǎng)設(shè)備聯(lián)動(dòng)
  • 微服務(wù)間數(shù)據(jù)同步

9.3 未來演進(jìn)

  1. 事件溯源增強(qiáng):完整業(yè)務(wù)追溯能力
  2. AI驅(qū)動(dòng)補(bǔ)償:智能故障預(yù)測與修復(fù)
  3. 跨鏈?zhǔn)聞?wù):區(qū)塊鏈集成
  4. 無服務(wù)架構(gòu):Serverless適配

架構(gòu)師箴言:分布式事務(wù)沒有銀彈,輕量級(jí)方案在保證可用性的前提下,通過最終一致性實(shí)現(xiàn)業(yè)務(wù)需求的平衡,是互聯(lián)網(wǎng)高并發(fā)場景的最佳選擇。

責(zé)任編輯:武曉燕 來源: 小林聊編程
相關(guān)推薦

2021-06-16 08:33:02

分布式事務(wù)ACID

2023-07-25 09:52:00

本地事務(wù)宕機(jī)

2022-12-19 19:12:17

分布式事務(wù)

2019-10-11 23:27:19

分布式一致性算法開發(fā)

2021-07-26 06:33:42

CRDT數(shù)據(jù)CAP

2024-01-31 09:54:51

Redis分布式

2019-09-05 08:43:34

微服務(wù)分布式一致性數(shù)據(jù)共享

2021-11-22 16:30:30

分布式一致性分布式系統(tǒng)

2021-07-28 08:39:25

分布式架構(gòu)系統(tǒng)

2017-09-21 10:59:36

分布式系統(tǒng)線性一致性測試

2015-10-19 10:42:37

分布式一致性應(yīng)用系統(tǒng)

2024-06-04 10:58:30

2022-06-07 12:08:10

Paxos算法

2021-06-03 15:27:31

RaftSOFAJRaft

2024-11-28 10:56:55

2025-06-09 08:00:37

分布式文件系統(tǒng)

2025-05-09 01:04:00

2017-09-22 12:08:01

數(shù)據(jù)庫分布式系統(tǒng)互聯(lián)網(wǎng)

2021-06-06 12:45:41

分布式CAPBASE

2018-03-19 09:50:50

分布式存儲(chǔ)系統(tǒng)
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)