偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

深度剖析 Seata 源碼

開發(fā)
本文將針對(duì) seata 分布式事務(wù)注冊(cè)到提交回滾的全流程進(jìn)行深入分析和講解,希望對(duì)你有幫助。

一、如何使用源碼

需要了解的是,這篇文章是基于筆者相對(duì)早期的項(xiàng)目作為樣例進(jìn)行講解,所以對(duì)應(yīng)的seata版本為1.4.2(核心部分實(shí)現(xiàn)大體是一樣的),建議讀者閱讀本文在調(diào)試源碼時(shí)可以選擇和筆者相同的版本進(jìn)行理解學(xué)習(xí),對(duì)應(yīng)的下載地址為:https://github.com/apache/incubator-seata/tree/v1.4.2

完成下載后,為保證編譯可以通過我們還需要將seata-serializer-protobuf模塊移除掉,該模塊的位置如下圖所示:

同時(shí)seata的啟動(dòng)類位于seata-server模塊,所以我們需要將該模塊的registry.conf的配置改為自己的配置:

以筆者為例,seata配置都是通過nacos進(jìn)行統(tǒng)一管理的,所以對(duì)應(yīng)的配置類型也都是針對(duì)nacos維度去協(xié)調(diào)適配,大體配置如下所示:

registry {
  # 將seata注冊(cè)到nacos上
  type = "nacos"
  nacos {
  # nacos地址
    serverAddr = "ip:8848"
    # 命名空間id
    namespace = "7c1cfd88-15e4-437d-8e82-2d22d034f447"
    # 組名
    group = "DEFAULT_GROUP"
    # 集群節(jié)點(diǎn)名稱
    cluster = "default"
  }
}
config {
  # 通過nacos獲取配置
  type = "nacos"
  nacos {
    serverAddr = "ip:8848"
    namespace = "7c1cfd88-15e4-437d-8e82-2d22d034f447"
    group = "DEFAULT_GROUP"
  }
}

經(jīng)過這幾個(gè)步驟后seata就可以像我們?nèi)粘R粯拥姆绞竭M(jìn)行使用了。

二、基于AT模式詳解Seata全鏈路流程

1. seata服務(wù)端啟動(dòng)

我們先從seata的服務(wù)端啟動(dòng)開始,seata服務(wù)端啟動(dòng)時(shí)會(huì)進(jìn)行如下幾個(gè)核心步驟:

  • 創(chuàng)建工作線程池workingThreads。
  • 基于工作線程池創(chuàng)建一個(gè)Netty服務(wù)端對(duì)外提供服務(wù)。
  • 基于該服務(wù)端創(chuàng)建的一個(gè)默認(rèn)的協(xié)調(diào)者DefaultCoordinator管理全局事務(wù)。
  • 默認(rèn)協(xié)調(diào)者初始化幾個(gè)定時(shí)任務(wù)處理一些異步的全局事務(wù)提交、回滾、超時(shí)監(jiān)測(cè)的任務(wù)。

對(duì)應(yīng)的我們給出這塊邏輯的核心入口代碼,即位于Server的主函數(shù)入口的main方法,可以看到seata服務(wù)端的創(chuàng)建是基于netty完成的,完成創(chuàng)建和初始化之后就與協(xié)調(diào)者coordinator進(jìn)行綁定:

public static void main(String[] args) throws IOException {
        //......
        //創(chuàng)建工作線程池處理業(yè)務(wù)請(qǐng)求
        ThreadPoolExecutor workingThreads = new ThreadPoolExecutor(NettyServerConfig.getMinServerPoolSize(),
                NettyServerConfig.getMaxServerPoolSize(), NettyServerConfig.getKeepAliveTime(), TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(NettyServerConfig.getMaxTaskQueueSize()),
                new NamedThreadFactory("ServerHandlerThread", NettyServerConfig.getMaxServerPoolSize()), new ThreadPoolExecutor.CallerRunsPolicy());
        //基于該線程池初始化 seata 服務(wù)端
        NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(workingThreads);
 //......
        //log store mode : file, db, redis
        SessionHolder.init(parameterParser.getStoreMode());
        //初始化協(xié)調(diào)者,處理seata服務(wù)端收到的各種事務(wù)讀寫請(qǐng)求
        DefaultCoordinator coordinator = new DefaultCoordinator(nettyRemotingServer);
        //初始化各種異步定時(shí)任務(wù):全局事務(wù)提交、全局事務(wù)回滾、超時(shí)監(jiān)測(cè)等
        coordinator.init();
        //將協(xié)調(diào)者作為seata服務(wù)端的處理器
        nettyRemotingServer.setHandler(coordinator);
       //......
    }

對(duì)應(yīng)的我們也給出默認(rèn)協(xié)調(diào)者的初始化源碼,即DefaultCoordinator的init方法,可以看到這段代碼本質(zhì)上就是提交一些定時(shí)任務(wù)處理全局事務(wù)提交、回滾、超時(shí)監(jiān)測(cè)、undo log刪除等:

public void init() {
        //每秒執(zhí)行,處理需要回滾的分布式事務(wù)
        retryRollbacking.scheduleAtFixedRate(() -> {
            boolean lock = SessionHolder.retryRollbackingLock();
            if (lock) {
                try {
                    handleRetryRollbacking();
                } catch (Exception e) {
                    LOGGER.info("Exception retry rollbacking ... ", e);
                } finally {
                    SessionHolder.unRetryRollbackingLock();
                }
            }
        }, 0, ROLLBACKING_RETRY_PERIOD, TimeUnit.MILLISECONDS);

       //......
        //異步定時(shí)提交全局事務(wù)的定時(shí)任務(wù),每秒執(zhí)行一次
        asyncCommitting.scheduleAtFixedRate(() -> {
            boolean lock = SessionHolder.asyncCommittingLock();
            if (lock) {
                try {
                    //掃描獲取各種異步提交的全局事務(wù)
                    handleAsyncCommitting();
                } catch (Exception e) {
                    LOGGER.info("Exception async committing ... ", e);
                } finally {
                    SessionHolder.unAsyncCommittingLock();
                }
            }
        }, 0, ASYNC_COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);

       //......
    }

2. 本地服務(wù)如何基于GlobalTransaction注解開啟事務(wù)

我們都知道seata也是基于spring boot實(shí)現(xiàn)的,所以我們可以大膽的認(rèn)為應(yīng)用端使用GlobalTransaction開啟分布式事務(wù)本質(zhì)上也是和spring boot自動(dòng)裝配有著一定的聯(lián)系。

所以我們從seata-spring-boot-starter這個(gè)腳手架的源碼包的spring.factories文件入手,可以看到一個(gè)SeataAutoConfiguration的注入:

于是我們就可以看到一個(gè)GlobalTransactionScanner即一個(gè)關(guān)于GlobalTransaction注解掃描的類:

@Bean
    @DependsOn({BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER, BEAN_NAME_FAILURE_HANDLER})
    @ConditionalOnMissingBean(GlobalTransactionScanner.class)
    public GlobalTransactionScanner globalTransactionScanner(SeataProperties seataProperties, FailureHandler failureHandler) {
      //......
        //掃描我們的配置文件中配置的applicationId、txServiceGroup對(duì)應(yīng)的事務(wù)
        return new GlobalTransactionScanner(seataProperties.getApplicationId(), seataProperties.getTxServiceGroup(), failureHandler);
    }

查看GlobalTransactionScanner源碼我們可以看到該類型繼承了spring的初始化bean并設(shè)置屬性后的拓展點(diǎn)InitializingBean的afterPropertiesSet方法,該方法內(nèi)部會(huì)初始化當(dāng)前seata客戶端,分別初始化TM客戶端(使用GlobalTransaction注解的方法的服務(wù)即做為TM)和RM客戶端處理其他TM或者RM服務(wù)端發(fā)送的消息,它們初始化的工作分別是:

  • TM客戶端會(huì)注冊(cè)各種TC消息響應(yīng)的處理器,處理各種seata server對(duì)應(yīng)的TC響應(yīng)的消息,例如:全局事務(wù)開啟結(jié)果處理器、全局事務(wù)提交處理器、全局事務(wù)回滾處理器等。
  • RM客戶端則是注冊(cè)一些各種seata server對(duì)應(yīng)TC請(qǐng)求消息的處理器,例如:分支事務(wù)提交、分支事務(wù)回滾、分支事務(wù)undo.log刪除等。

對(duì)應(yīng)我們給出GlobalTransactionScanner的afterPropertiesSet源碼可以看到客戶端初始化這段調(diào)用的入口,可以看到啟動(dòng)時(shí)某個(gè)線程完成CAS上鎖初始化標(biāo)識(shí)之后,即通過initClient初始化客戶端:

@Override
    public void afterPropertiesSet() {
        //......
        //基于擴(kuò)展點(diǎn)進(jìn)行客戶端初始化
        if (initialized.compareAndSet(false, true)) {
            initClient();
        }
    }

步入后即可看到對(duì)于TM和RM客戶端的初始化調(diào)用:

private void initClient() {
        //......
        // 初始化TM客戶端
        TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);
       //......
        // 初始化RM客戶端
        RMClient.init(applicationId, txServiceGroup);
      //......
    }

此時(shí)我們先看看TM客戶端內(nèi)部的處理函數(shù)即位于TmNettyRemotingClient的registerProcessor即可看到上述所說(shuō)的TC響應(yīng)消息處理器的綁定步驟,即:

  • 注冊(cè)TC響應(yīng)消息處理器
  • 注冊(cè)全局事務(wù)開啟響應(yīng)處理器
  • 注冊(cè)全局事務(wù)提交響應(yīng)處理器
  • 注冊(cè)心跳消息處理器
private void registerProcessor() {
        // 1.registry TC response processor 注冊(cè)一些TC響應(yīng)消息的處理器
        ClientOnResponseProcessor onResponseProcessor =
                new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler());
        super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null);
        //全局事務(wù)開啟結(jié)果響應(yīng)處理器
        super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN_RESULT, onResponseProcessor, null);
        //全局事務(wù)提交響應(yīng)處理器
        super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_REG_CLT_RESULT, onResponseProcessor, null);
        // 2. 注冊(cè)心跳消息
        ClientHeartbeatProcessor clientHeartbeatProcessor = new ClientHeartbeatProcessor();
        super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, clientHeartbeatProcessor, null);
    }

同理我們也給出RM客戶端內(nèi)部初始化的調(diào)用RmNettyRemotingClient的registerProcessor方法:

  • 注冊(cè)分支事務(wù)提交消息處理器
  • 注冊(cè)rm客戶端對(duì)應(yīng)的分支事務(wù)提及和回滾處理器
  • 注冊(cè)u(píng)ndo Log刪除處理器
  • 注冊(cè)TC響應(yīng)消息處理器
  • 注冊(cè)心跳處理器
private void registerProcessor() {
        // 1. 注冊(cè)分支事務(wù)提交消息處理器
        RmBranchCommitProcessor rmBranchCommitProcessor = new RmBranchCommitProcessor(getTransactionMessageHandler(), this);
        super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT, rmBranchCommitProcessor, messageExecutor);
        // 2.注冊(cè)rm客戶端對(duì)應(yīng)的分支事務(wù)回滾處理器
        RmBranchRollbackProcessor rmBranchRollbackProcessor = new RmBranchRollbackProcessor(getTransactionMessageHandler(), this);
        super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK, rmBranchRollbackProcessor, messageExecutor);
        // 3. 注冊(cè)u(píng)ndo log刪除處理器
        RmUndoLogProcessor rmUndoLogProcessor = new RmUndoLogProcessor(getTransactionMessageHandler());
        super.registerProcessor(MessageType.TYPE_RM_DELETE_UNDOLOG, rmUndoLogProcessor, messageExecutor);
        // 4. 注冊(cè)TC響應(yīng)消息處理器
        ClientOnResponseProcessor onResponseProcessor =
            new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler());
        super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_REG_RM_RESULT, onResponseProcessor, null);
        // 5.注冊(cè)心跳消息處理器
        ClientHeartbeatProcessor clientHeartbeatProcessor = new ClientHeartbeatProcessor();
        super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, clientHeartbeatProcessor, null);
    }

同時(shí)GlobalTransactionScanner繼承了AbstractAutoProxyCreator的wrapIfNecessary,該代理類會(huì)在spring容器中的bean進(jìn)行檢查并決定是否進(jìn)行動(dòng)態(tài)代理。以我們的GlobalTransactionScanner邏輯它本質(zhì)上就是:

  • 檢查當(dāng)前bean是否有GlobalTransactional這個(gè)注解
  • 如果有則基于全局事務(wù)攔截器對(duì)其進(jìn)行增強(qiáng)

對(duì)應(yīng)核心邏輯如下所示,可以看到這段代碼會(huì)通過existsAnnotation檢查當(dāng)前bean是否存在GlobalTransactional注解,如果有則基于globalTransactionalInterceptor 對(duì)其進(jìn)行增強(qiáng):

protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
        try {
            synchronized (PROXYED_SET) {
              //......
                //check TCC proxy
                if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
                   //......
                } else {
                    Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
                    Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
                    //判斷是否有GlobalTransaction注解,如果有則為其生成分布式事務(wù)的動(dòng)態(tài)代理
                    if (!existsAnnotation(new Class[]{serviceInterface})
                        && !existsAnnotation(interfacesIfJdk)) {
                        return bean;
                    }
                    //如果攔截器為空則初始化攔截器
                    if (globalTransactionalInterceptor == null) {
                        globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
                        ConfigurationCache.addConfigListener(
                            ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                            (ConfigurationChangeListener)globalTransactionalInterceptor);
                    }
                    interceptor = globalTransactionalInterceptor;
                }

              //......
                if (!AopUtils.isAopProxy(bean)) {
                    bean = super.wrapIfNecessary(bean, beanName, cacheKey);
                } else {
                    //基于上一步的interceptor為其生成動(dòng)態(tài)代理
                    AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
                    Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
                    for (Advisor avr : advisor) {
                        advised.addAdvisor(0, avr);
                    }
                }
             //......
                return bean;
            }
        } catch (Exception exx) {
            throw new RuntimeException(exx);
        }
    }

這也就意味著我們調(diào)用帶有GlobalTransactional注解方法時(shí),就會(huì)走到GlobalTransactionalInterceptor的增強(qiáng)邏輯上,它會(huì)走到GlobalTransactionalInterceptor的invoke方法上,最終會(huì)走到事務(wù)模板類transactionalTemplate的execute方法,該方法會(huì)執(zhí)行如下三個(gè)核心步驟:

  • 開啟全局事務(wù)。
  • 執(zhí)行原始業(yè)務(wù)邏輯。
  • 根據(jù)各個(gè)分支事務(wù)結(jié)果提交或者回滾事務(wù)。

對(duì)應(yīng)的我們給出GlobalTransactionalInterceptor的invoke方法,可以看到當(dāng)該方法認(rèn)為注解存在的情況下會(huì)直接調(diào)用handleGlobalTransaction開啟并處理全局事務(wù):

@Override
    public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
      //......
        Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
        if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {

            final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
            //獲取GlobalTransactional注解信息
            final GlobalTransactional globalTransactionalAnnotation =
                getAnnotation(method, targetClass, GlobalTransactional.class);
            final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);
            //......
            if (!localDisable) {
                //若全局事務(wù)注解不為空則調(diào)用handleGlobalTransaction處理全局事務(wù)
                if (globalTransactionalAnnotation != null) {
                    return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
                } else if (globalLockAnnotation != null) {
                     //......
                }
            }
        }
         //......
    }

步入其內(nèi)部就會(huì)走到transactionalTemplate的execute方法,即可看到對(duì)于:

  • 分支事務(wù)的創(chuàng)建
  • 告知TC請(qǐng)求開啟全局事務(wù)
  • 執(zhí)行本地事務(wù)
  • 全局提交或者回滾

對(duì)應(yīng)邏輯的源碼如下所示,讀者可結(jié)合說(shuō)明了解:

public Object execute(TransactionalExecutor business) throws Throwable {
       //......

            // 1.3 If null, create new transaction with role 'GlobalTransactionRole.Launcher'.
            //如果tx為空則以全局事務(wù)啟動(dòng)者的身份創(chuàng)建一個(gè)全新的事務(wù)
            if (tx == null) {
                tx = GlobalTransactionContext.createNew();
            }

            // set current tx config to holder
            GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);

            try {
                //向TC發(fā)送請(qǐng)求開啟全局事務(wù)
                beginTransaction(txInfo, tx);

                Object rs;
                try {
                    // Do Your Business
                    //執(zhí)行業(yè)務(wù)邏輯(被代理的原始方法)
                    rs = business.execute();
                } catch (Throwable ex) {
                    // 3. The needed business exception to rollback.
                    //全局事務(wù)回滾
                    completeTransactionAfterThrowing(txInfo, tx, ex);
                    throw ex;
                }

                // 4. everything is fine, commit.
                //分支事務(wù)執(zhí)行成功,提交全局事務(wù)
                commitTransaction(tx);

                return rs;
            } finally {
             //......
            }
        } finally {
         //......
        }
    }

3. 客戶端如何開啟分布式事務(wù)

上文調(diào)用分布式事務(wù)的方法時(shí)內(nèi)部會(huì)走到的代理的transactionalTemplate的execute方法,其內(nèi)部有個(gè)beginTransaction就是開啟分布式事務(wù)的關(guān)鍵,由上文可知作為GlobalTransactional注解的方法對(duì)對(duì)應(yīng)的服務(wù)就是作為TM即transaction manager,所以在調(diào)用beginTransaction時(shí),這個(gè)方法的代理就會(huì)以TM的身份發(fā)送一個(gè)請(qǐng)求告知TC自己要開啟一個(gè)全局事務(wù),TC經(jīng)過自己的協(xié)調(diào)處理后(后文會(huì)介紹流程)返回一份xid告知TM開啟成功:

對(duì)應(yīng)的我們查看seata客戶端對(duì)應(yīng)TransactionalTemplate的beginTransaction方法即可看到begin方法的調(diào)用,該方法回告知seata服務(wù)端自己要開啟一個(gè)全局事務(wù):

private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
        try {
            //......
            //開始分布式事務(wù)
            tx.begin(txInfo.getTimeOut(), txInfo.getName());
         //......
        } catch (TransactionException txe) {
            //......

        }
    }

查看begin內(nèi)部就是通過TM發(fā)起請(qǐng)求,得到xid并緩存到當(dāng)前線程內(nèi)部,開始后續(xù)的執(zhí)行流程分布式事務(wù)處理流程:

@Override
    public void begin(int timeout, String name) throws TransactionException {
        //......
        //通過TM告知TC開啟全局事務(wù),從而得到xid
        xid = transactionManager.begin(null, null, name, timeout);
        status = GlobalStatus.Begin;
        //將xid緩存到當(dāng)前線程的緩存中
        RootContext.bind(xid);
        //......
    }

4. seata服務(wù)端如何注冊(cè)全局事務(wù)

基于上述請(qǐng)求,對(duì)應(yīng)seata server端的TC收到請(qǐng)求后會(huì)基于傳參中的消息標(biāo)信息,定位到對(duì)應(yīng)的執(zhí)行器即TM消息處理器,然后驅(qū)動(dòng)TM處理器將這個(gè)請(qǐng)求生成一份全局session信息從而構(gòu)成本次請(qǐng)求的全局事務(wù)信息,再將請(qǐng)求寫入數(shù)據(jù)表中:

我們給出TC處理消息的代碼入口AbstractNettyRemotingServer的channelRead方法,從名字不難看出TC服務(wù)端也是基于netty實(shí)現(xiàn),其內(nèi)部通過processMessage處理各種消息:

@Override
        public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
  
            //基于netty編寫的服務(wù)端,channelRead通過processMessage處理客戶端各種請(qǐng)求
            processMessage(ctx, (RpcMessage) msg);
        }

步入processMessage即可看到基于處理表定位消息并交由處理器處理消息邏輯pair.getFirst().process(ctx, rpcMessage);:

protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
         //......
         //獲取網(wǎng)絡(luò)消息
        Object body = rpcMessage.getBody();
        if (body instanceof MessageTypeAware) {
            MessageTypeAware messageTypeAware = (MessageTypeAware) body;
            //通過處理表定位到對(duì)應(yīng)的處理器
            final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
            if (pair != null) {
                if (pair.getSecond() != null) {
                    try {
                        pair.getSecond().execute(() -> {
                            try {
                                //基于第一個(gè)處理器處理當(dāng)前消息
                                pair.getFirst().process(ctx, rpcMessage);
                            } catch (Throwable th) {
                                //......
                            } finally {
                                //......
                            }
                        });
                    } catch (RejectedExecutionException e) {
                        //......
                        
                    }
                } else {
                //......
               }
        }
    }

因?yàn)槲覀兊南⑹荰M發(fā)來(lái)的,所以上一步的處理器是ServerOnRequestProcessor的:

@Override
    public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
        if (ChannelManager.isRegistered(ctx.channel())) {
         //處理TM客戶端發(fā)送來(lái)的消息
            onRequestMessage(ctx, rpcMessage);
        } else {
           //......
        }
    }

最終走到GlobalBeginRequest這個(gè)工具的handle基于協(xié)調(diào)者將事務(wù)信息寫入global_table從而得到xid返回給TM客戶端:

@Override
    protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext)
        throws TransactionException {
  //生成全局事務(wù)信息并得到xid將數(shù)據(jù)寫入響應(yīng)返回給TM
        response.setXid(core.begin(rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(),
            request.getTransactionName(), request.getTimeout()));
       //.......
    }

5. RM和TC如何協(xié)調(diào)處理分支事務(wù)

完成全局事務(wù)的注冊(cè)管理之后,我們?cè)賮?lái)聊聊各個(gè)分支事務(wù)的執(zhí)行和提交回滾,上文提及,seata原生我們本地的jdbc數(shù)據(jù)庫(kù)連接通過代理加以封裝,所以在我們seata客戶端執(zhí)行本地事務(wù)完成后提交的commit方法是經(jīng)過了seata的代理這一層,該連接代理在調(diào)用commit方法時(shí),其內(nèi)部就會(huì)通過RM向TC注冊(cè)一個(gè)分支事務(wù)的請(qǐng)求,TC收到請(qǐng)求后會(huì)執(zhí)行如下工作:

  • 基于lock_table嘗試為事務(wù)生成全局鎖。
  • 分支事務(wù)信息寫入到branch_table表上并返回branch_id給RM:

我們給出ConnectionProxy的commit方法入口,其內(nèi)部調(diào)用了一個(gè)doCommit方法,它就是事務(wù)提交的核心邏輯:

@Override
    public void commit() throws SQLException {
        try {
         //excute會(huì)調(diào)用doCommit生成undoLog緩存和執(zhí)行分支事務(wù)
            LOCK_RETRY_POLICY.execute(() -> {
                //excuete執(zhí)行成功后這一步會(huì)注冊(cè)分支事務(wù)并提交本地事務(wù)和undoLog鏡像以保證原子性
                doCommit();
                return null;
            });
        } catch (SQLException e) {
           //......
        } catch (Exception e) {
            throw new SQLException(e);
        }
    }

其內(nèi)部調(diào)用ConnectionProxy的doCommit會(huì)調(diào)用processGlobalTransactionCommit執(zhí)行分支事務(wù):

private void doCommit() throws SQLException {
        //如果處于全局事務(wù)中則調(diào)用processGlobalTransactionCommit
        if (context.inGlobalTransaction()) {
            processGlobalTransactionCommit();
        } else if (context.isGlobalLockRequire()) {
         //......
        } else {
           //......
        }
    }

最終就可以在processGlobalTransactionCommit看到如下邏輯:

  • register這個(gè)注冊(cè)分支事務(wù)的邏輯,TC基于RM給定的resourceId信息,生成操作數(shù)據(jù)的全局鎖,并插入分支事務(wù)信息到brach_table中。
  • undo日志刷盤到本地undo日志中。
  • 本地業(yè)務(wù)的事務(wù)提交。
private void processGlobalTransactionCommit() throws SQLException {
        try {
            //向TC發(fā)起請(qǐng)求注冊(cè)分支事務(wù),TC基于RM給定的resourceId生成全局鎖并插入分支事務(wù)信息到brach_table后就不會(huì)拋異常
            register();
        } catch (TransactionException e) {
            recognizeLockKeyConflictException(e, context.buildLockKeys());
        }
        try {
            //undo日志刷盤
            UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
            //本地事務(wù)提交
            targetConnection.commit();
        } catch (Throwable ex) {
          //......
        }
          //......
    }

這里我們著重看一下register函數(shù),其內(nèi)部本質(zhì)上就是通過RM客戶端告知TC自己準(zhǔn)備執(zhí)行分支事務(wù)提交,幫我上一把全局鎖并注冊(cè)分支事務(wù):

private void register() throws TransactionException {
        if (!context.hasUndoLog() || !context.hasLockKey()) {
            return;
        }
        //向tc發(fā)起請(qǐng)求并獲得register
        Long branchId = DefaultResourceManager.get().branchRegister(BranchType.AT, getDataSourceProxy().getResourceId(),
            null, context.getXid(), null, context.buildLockKeys());
        //緩存到當(dāng)前線程中
        context.setBranchId(branchId);
    }

最后這個(gè)注冊(cè)的邏輯就會(huì)來(lái)到AbstractResourceManager的branchRegister上,可以看到它會(huì)攜帶著全局事務(wù)id和主鍵等數(shù)據(jù)發(fā)送請(qǐng)求給TC:

@Override
    public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid, String applicationData, String lockKeys) throws TransactionException {
        try {
            BranchRegisterRequest request = new BranchRegisterRequest();
            //傳入全局事務(wù)id即xid
            request.setXid(xid);
            //基于當(dāng)前數(shù)據(jù)主鍵生成lockkeys
            request.setLockKey(lockKeys);
            request.setResourceId(resourceId);
            request.setBranchType(branchType);
            request.setApplicationData(applicationData);
            //基于RM的netty客戶端將其異步發(fā)送
            BranchRegisterResponse response = (BranchRegisterResponse) RmNettyRemotingClient.getInstance().sendSyncRequest(request);
         //......
            return response.getBranchId();
        } catch (TimeoutException toe) {
           //......
        }
    }

6. seata服務(wù)端處理分支事務(wù)請(qǐng)求

TC處理流程與上述文章同理,收到消息后基于request中的消息表定位到對(duì)應(yīng)的處理器,我們這里最終會(huì)走到BranchRegisterRequest的處理器上,通過AbstractTCInboundHandler注冊(cè)分支事務(wù):

@Override
    public BranchRegisterResponse handle(BranchRegisterRequest request, final RpcContext rpcContext) {
        BranchRegisterResponse response = new BranchRegisterResponse();
        exceptionHandleTemplate(new AbstractCallback<BranchRegisterRequest, BranchRegisterResponse>() {
            @Override
            public void execute(BranchRegisterRequest request, BranchRegisterResponse response)
                throws TransactionException {
                try {
                    //tc注冊(cè)分支事務(wù)入口
                    doBranchRegister(request, response, rpcContext);
                } catch (StoreException e) {
                 //......
                }
            }
        }, request, response);
        return response;
    }

最終這段邏輯就會(huì)走到AbstractCore的branchRegister,大體執(zhí)行的步驟是:

  • 生成分支事務(wù)session
  • 嘗試獲得數(shù)據(jù)全局鎖lock_table
  • 取鎖成功將分支事務(wù)信息寫入branch_table
  • 返回branch_id給RM

對(duì)應(yīng)源碼邏輯如下,大體邏輯就說(shuō)基于分支事務(wù)session生成全局鎖存到lock_table后,將分支事務(wù)信息存到branch_table中:

@Override
    public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid,
                               String applicationData, String lockKeys) throws TransactionException {
      //......
        return SessionHolder.lockAndExecute(globalSession, () -> {
             //......
            //獲取分支事務(wù)的表信息并將其寫入到lock_table中意味獲得全局鎖,上鎖失敗會(huì)拋異常
            branchSessionLock(globalSession, branchSession);

            try {
                //添加分支事務(wù)信息到branch_table中
                globalSession.addBranch(branchSession);
            } catch (RuntimeException ex) {
                 //......
            }
             //......
             //返回分支事務(wù)id
            return branchSession.getBranchId();
        });
    }

TC返回成功后,RM就會(huì)執(zhí)行undo日志刷盤和本地事務(wù)提交,詳情參考我們本節(jié)代碼processGlobalTransactionCommit方法,這里不貼出了。

7. RM生成回滾日志

對(duì)于java程序而言大部分SQL操作底層都是基于Executor執(zhí)行器操作的,在上述代理執(zhí)行commit方法前,seata底層將代理的連接即上文的connectionProxy通過AbstractDMLBaseExecutor執(zhí)行SQL操作,該執(zhí)會(huì)針對(duì)我們的連接代理進(jìn)行如下邏輯處理:

  • 判斷連接代理connectionProxy是否是自動(dòng)提交,若非自動(dòng)提交則調(diào)用executeAutoCommitFalse方法,該方法會(huì)生成undoLog數(shù)據(jù)寫入緩存,然后將RM當(dāng)執(zhí)行分支事務(wù)SQL,基于該執(zhí)行結(jié)果生成后置鏡像,最后將undo日志寫入undo_log表中。
  • 若開啟自動(dòng)提交則關(guān)閉自動(dòng)提交后,復(fù)用executeAutoCommitFalse方法執(zhí)行系統(tǒng)的undoLog和分支事務(wù)SQL的執(zhí)行操作。

對(duì)應(yīng)源碼的整體工作鏈路圖如下所示:

這里我們直接給出AbstractDMLBaseExecutor的doExecute方法作為入口,可以看到若開啟自動(dòng)提交則調(diào)用executeAutoCommitTrue,反之調(diào)用executeAutoCommitFalse:

@Override
    public T doExecute(Object... args) throws Throwable {
        AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
        //若自動(dòng)提交則關(guān)閉自動(dòng)提交,并生成undo信息存入緩沖區(qū)
        if (connectionProxy.getAutoCommit()) {
            return executeAutoCommitTrue(args);
        } else {
            //直接生成undo log鏡像寫入緩存
            return executeAutoCommitFalse(args);
        }
    }

因?yàn)槎紩?huì)復(fù)用executeAutoCommitFalse這段邏輯,所以我們直接查看這個(gè)方法的邏輯,可以看到該邏輯內(nèi)部會(huì)基于分支事務(wù)前后的數(shù)據(jù)生成前置和后置鏡像:

protected T executeAutoCommitFalse(Object[] args) throws Exception {
        if (!JdbcConstants.MYSQL.equalsIgnoreCase(getDbType()) && isMultiPk()) {
            throw new NotSupportYetException("multi pk only support mysql!");
        }
        //基于分支事務(wù)的SQL定位操作前的SQL生成前置鏡像
        TableRecords beforeImage = beforeImage();
        //執(zhí)行分支事務(wù)的SQL 
        T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
        //生成分支事務(wù)操作后置鏡像
        TableRecords afterImage = afterImage(beforeImage);
        //將undoLog寫入緩沖區(qū)
        prepareUndoLog(beforeImage, afterImage);
        return result;
    }

8. 事務(wù)全局提交與回滾

TransactionalTemplate(即TM)驅(qū)動(dòng)各種分支事務(wù)準(zhǔn)備成功后,就會(huì)執(zhí)行commitTransaction提交全局事務(wù),對(duì)應(yīng)的代碼位于TransactionalTemplate的execute方法,該方法會(huì)通知TC驅(qū)動(dòng)全局事務(wù)提交,而TC收到該請(qǐng)求之后,就會(huì)驅(qū)動(dòng)各個(gè)分支事務(wù)提交事務(wù),每個(gè)分支事務(wù)收到該請(qǐng)求后就會(huì)刪除undoLog并提交各自未提交的事務(wù):

public Object execute(TransactionalExecutor business) throws Throwable {
          //......

            try {
             
                //向TC發(fā)送請(qǐng)求開啟全局事務(wù)
                beginTransaction(txInfo, tx);

                Object rs;
                try {
                    
                    //執(zhí)行業(yè)務(wù)邏輯(被代理的原始方法)
                    rs = business.execute();
                } catch (Throwable ex) {            
                    //全局事務(wù)回滾
                    completeTransactionAfterThrowing(txInfo, tx, ex);
                    throw ex;
                }

              
                //分支事務(wù)執(zhí)行成功,提交全局事務(wù)
                commitTransaction(tx);

                return rs;
            } finally {
          //......
            }
        } finally {
          //......
        }
    }

步入其內(nèi)部可以看到DefaultGlobalTransaction調(diào)用transactionManager即TM提交全局事務(wù):

@Override
    public void commit() throws TransactionException {
       //......
        try {
            while (retry > 0) {
                try {
                    //執(zhí)行全局事務(wù)提交
                    status = transactionManager.commit(xid);
                    break;
                } catch (Throwable ex) {
                   //......
            }
        } finally {
          //......
        }
        //......
    }

這個(gè)commit的邏輯也很簡(jiǎn)單,即告知TC要提交全局事務(wù)了:

@Override
    public GlobalStatus commit(String xid) throws TransactionException {
        GlobalCommitRequest globalCommit = new GlobalCommitRequest();
        globalCommit.setXid(xid);
        //通知TC提交全局事務(wù)
        GlobalCommitResponse response = (GlobalCommitResponse) syncCall(globalCommit);
        return response.getGlobalStatus();
    }

對(duì)應(yīng)的TC收到該請(qǐng)求后,對(duì)應(yīng)的AbstractTCInboundHandler就會(huì)調(diào)用doGlobalCommit通知各個(gè)RM提交全局事務(wù):

@Override
    public GlobalCommitResponse handle(GlobalCommitRequest request, final RpcContext rpcContext) {
        GlobalCommitResponse response = new GlobalCommitResponse();
        response.setGlobalStatus(GlobalStatus.Committing);
        exceptionHandleTemplate(new AbstractCallback<GlobalCommitRequest, GlobalCommitResponse>() {
            @Override
            public void execute(GlobalCommitRequest request, GlobalCommitResponse response)
                throws TransactionException {
                try {
                //遍歷RM提交各個(gè)分支事務(wù)
                    doGlobalCommit(request, response, rpcContext);
                } catch (StoreException e) {
                  //......
                }
            }
     //......

          //......


        }, request, response);
        return response;
    }

對(duì)應(yīng)的我們可以來(lái)道該源碼內(nèi)部的DefaultCore的doGlobalCommit方法印證這一點(diǎn),可以看到該方法會(huì)遍歷各個(gè)分支事務(wù)調(diào)用branchCommit通知其提交或者回滾事務(wù):

@Override
    public boolean doGlobalCommit(GlobalSession globalSession, boolean retrying) throws TransactionException {
      //......
        if (globalSession.isSaga()) {
            success = getCore(BranchType.SAGA).doGlobalCommit(globalSession, retrying);
        } else {
            //遍歷全局事務(wù)中的分支事務(wù)
            Boolean result = SessionHelper.forEach(globalSession.getSortedBranches(), branchSession -> {
                //......
                }
                try {
                    //告知RM提交事務(wù)
                    BranchStatus branchStatus = getCore(branchSession.getBranchType()).branchCommit(globalSession, branchSession);

                    //......
                } catch (Exception ex) {
                    //......
                }
                return CONTINUE;
            });
             //......
        }
        //......
        return success;
    }

最后請(qǐng)求達(dá)到RM上的DefaultRMHandler按照TC要求提交或者回滾事務(wù):

//RM提交分支事務(wù)
    @Override
    public BranchCommitResponse handle(BranchCommitRequest request) {
        MDC.put(RootContext.MDC_KEY_XID, request.getXid());
        MDC.put(RootContext.MDC_KEY_BRANCH_ID, String.valueOf(request.getBranchId()));
        return getRMHandler(request.getBranchType()).handle(request);
    }
    //RM回滾分支事務(wù)
    @Override
    public BranchRollbackResponse handle(BranchRollbackRequest request) {
        MDC.put(RootContext.MDC_KEY_XID, request.getXid());
        MDC.put(RootContext.MDC_KEY_BRANCH_ID, String.valueOf(request.getBranchId()));
        return getRMHandler(request.getBranchType()).handle(request);
    }

提交事務(wù)本質(zhì)上就是提交后刪除undoLog即可,這里我們以分支事務(wù)回滾為例,可以看到上述代碼BranchRollbackResponse 會(huì)調(diào)用handle方法執(zhí)行分支事務(wù)回滾,該方法最終會(huì)走到AbstractRMHandler的doBranchRollback,該方法會(huì)調(diào)動(dòng)RM管理器將分支事務(wù)回滾:

protected void doBranchRollback(BranchRollbackRequest request, BranchRollbackResponse response)
        throws TransactionException {
        //......
        //回滾分支事務(wù)
        BranchStatus status = getResourceManager().branchRollback(request.getBranchType(), xid, branchId, resourceId,
            applicationData);
        //將xid和處理結(jié)果狀態(tài)響應(yīng)給TC
        response.setXid(xid);
        response.setBranchId(branchId);
        response.setBranchStatus(status);
       //......
    }

最終該方法內(nèi)部就會(huì)調(diào)用AbstractUndoLogManager的undo解析當(dāng)前分支事務(wù)的前置鏡像數(shù)據(jù),該方法內(nèi)部執(zhí)行邏輯為:

  • 定位分支事務(wù)的undo日志數(shù)據(jù)
  • 反序列化為undo對(duì)象
  • 基于該undo對(duì)象信息解析出表名、列以及數(shù)據(jù)等信息。
  • 通過undoExecutor 執(zhí)行器將分支事務(wù)還原。

對(duì)應(yīng)源碼如下:

@Override
    public void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException {
      //......

        for (; ; ) {
            try {
               //......

                // Find UNDO LOG
                //獲取當(dāng)前分支事務(wù)的undo鏡像
                selectPST = conn.prepareStatement(SELECT_UNDO_LOG_SQL);
                selectPST.setLong(1, branchId);
                selectPST.setString(2, xid);
                rs = selectPST.executeQuery();

                boolean exists = false;
                while (rs.next()) {
                   //......
                    //獲取undo數(shù)據(jù)
                    byte[] rollbackInfo = getRollbackInfo(rs);

                    //反序列化生成undo對(duì)象 branchUndoLog
                    String serializer = context == null ? null : context.get(UndoLogConstants.SERIALIZER_KEY);
                    UndoLogParser parser = serializer == null ? UndoLogParserFactory.getInstance()
                        : UndoLogParserFactory.getInstance(serializer);
                    BranchUndoLog branchUndoLog = parser.decode(rollbackInfo);

                    try {
                        // put serializer name to local
                        setCurrentSerializer(parser.getName());
                        List<SQLUndoLog> sqlUndoLogs = branchUndoLog.getSqlUndoLogs();
                        if (sqlUndoLogs.size() > 1) {
                            Collections.reverse(sqlUndoLogs);
                        }
                        //遍歷undo對(duì)象生成SQL還原分支事務(wù)值
                        for (SQLUndoLog sqlUndoLog : sqlUndoLogs) {
                            //獲取表的表名、列的元信息
                            TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dataSourceProxy.getDbType()).getTableMeta(
                                conn, sqlUndoLog.getTableName(), dataSourceProxy.getResourceId());
                            sqlUndoLog.setTableMeta(tableMeta);
                            //獲取對(duì)應(yīng)的執(zhí)行執(zhí)行器 將對(duì)應(yīng)分支事務(wù)的表數(shù)據(jù)回滾
                            AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(
                                dataSourceProxy.getDbType(), sqlUndoLog);
                            undoExecutor.executeOn(conn);
                        }
                    } finally {
                        // remove serializer name
                        removeCurrentSerializer();
                    }
                }

                //......
            } catch (SQLIntegrityConstraintViolationException e) {
                //......
            } catch (Throwable e) {
                //......

            } finally {
               //......
            }
        }
    }

三、小結(jié)

讓我們來(lái)做個(gè)小結(jié),總的來(lái)說(shuō)seata實(shí)現(xiàn)數(shù)據(jù)庫(kù)的AT模式分布式事務(wù)的流程為:

(1) 調(diào)用帶有g(shù)lobalTransactional注解的方法執(zhí)行業(yè)務(wù)邏輯。

(2) 該方法以TM的身份通知TC開啟全局事務(wù)。

(3) TC收到通知后到global_table創(chuàng)建該方法的全局事務(wù)信息,這里以筆者某個(gè)下單業(yè)務(wù)的分布式事務(wù)場(chǎng)景為例,對(duì)應(yīng)的數(shù)據(jù)如下所示:

(4) RM開始工作,各自找TC注冊(cè)分支事務(wù),基于當(dāng)前數(shù)據(jù)生成全局鎖存入lock_table,保證當(dāng)前數(shù)據(jù)操作時(shí)沒有其他事務(wù)干擾:

全局鎖成功后TC將數(shù)據(jù)存入branch_table表,對(duì)應(yīng)數(shù)據(jù)如下所示:

(5) RM完成分支事務(wù)注冊(cè)后,持有本地鎖的事務(wù)執(zhí)行本地分支事務(wù),成功后將生成分支數(shù)據(jù)的前后鏡像undo表,如下所示:

這里我們以后置鏡像為例子查看賬戶表修改后的字段值為例,可以看到該鏡像將每一個(gè)字段的類型、值等信息都序列化為JSON生成undo鏡像:

(6) TM感知到所有分支事務(wù)準(zhǔn)備成功,通知TC將這些RM(分支事務(wù))提交,即將undoLog刪除,反之基于undoLog將數(shù)據(jù)回滾。

對(duì)應(yīng)我們給出下面這段圖,讀者可以結(jié)合上面源碼梳理一下全流程:

我是 SharkChili ,Java 開發(fā)者,Java Guide 開源項(xiàng)目維護(hù)者。歡迎關(guān)注我的公眾號(hào):寫代碼的SharkChili,也歡迎您了解我的開源項(xiàng)目 mini-redis:https://github.com/shark-ctrl/mini-redis。

責(zé)任編輯:趙寧寧 來(lái)源: 寫代碼的SharkChili
相關(guān)推薦

2022-09-27 18:56:28

ArrayList數(shù)組源代碼

2024-02-05 19:06:04

DartVMGC流程

2010-03-01 14:50:06

Python 工具

2009-09-15 14:52:15

linq級(jí)聯(lián)刪除

2010-03-01 18:33:30

2023-01-10 13:48:50

ContainerdCRI源碼

2011-05-23 14:20:59

WordPress

2010-02-01 13:34:59

Python 腳本

2010-02-02 15:25:35

Python語(yǔ)法

2010-02-03 16:56:24

Python包

2010-03-05 16:38:30

2014-10-17 09:30:38

2020-04-01 10:28:12

Apache HBas數(shù)據(jù)結(jié)構(gòu)算法

2010-02-04 15:38:39

Android 手機(jī)

2022-03-24 14:40:31

開發(fā)Harmony鴻蒙

2022-04-29 14:56:40

通話應(yīng)用源碼剖析

2009-12-07 18:43:29

WCF框架

2010-02-05 18:00:18

Android源代碼

2010-02-06 15:32:30

Android架構(gòu)

2010-02-26 17:44:40

Python測(cè)試框架
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)