
Wrapping a Utility Class for Asynchronous Execution After Transaction Commit


1. Background

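We often want to run some logic asynchronously after a transaction commits: calling an external system, sending an MQ message, pushing data to ES, and so on. If the transaction rolls back, the asynchronous work must not run at all, so it has to wait until the transaction has completed. For example, once a stock-in/stock-out transaction finishes, an MQ message is sent asynchronously to the reporting system, ES, and so on.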

2. Hypothesis

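Most of our projects use declarative transactions (the @Transactional annotation). Spring enhances our business methods through its dynamic-proxy mechanism and controls the Connection to provide transactional behavior. Can we find any clues there? Let's look at the core classes of Spring's transaction support (the wiring process is not covered in detail).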

TransactionInterceptor:

public class TransactionInterceptor extends TransactionAspectSupport implements MethodInterceptor, Serializable {
  @Override
  @Nullable
  public Object invoke(MethodInvocation invocation) throws Throwable {
     // Work out the target class: may be {@code null}.
     // The TransactionAttributeSource should be passed the target class
     // as well as the method, which may be from an interface.
     Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
     // Adapt to TransactionAspectSupport's invokeWithinTransaction...
     return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
  }
}

TransactionAspectSupport (pay attention to what happens after the transaction commits and which extension points exist):

public abstract class TransactionAspectSupport implements BeanFactoryAware, InitializingBean {
 protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass, final InvocationCallback invocation) throws Throwable {
   // If the transaction attribute is null, the method is non-transactional.
   TransactionAttributeSource tas = getTransactionAttributeSource();
   final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
   final TransactionManager tm = determineTransactionManager(txAttr);

   if (this.reactiveAdapterRegistry != null && tm instanceof ReactiveTransactionManager) {
      ReactiveTransactionSupport txSupport = this.transactionSupportCache.computeIfAbsent(method, key -> {
         if (KotlinDetector.isKotlinType(method.getDeclaringClass()) && KotlinDelegate.isSuspend(method)) {
            throw new TransactionUsageException(
                  "Unsupported annotated transaction on suspending function detected: " + method +
                  ". Use TransactionalOperator.transactional extensions instead.");
         }
         ReactiveAdapter adapter = this.reactiveAdapterRegistry.getAdapter(method.getReturnType());
         if (adapter == null) {
            throw new IllegalStateException("Cannot apply reactive transaction to non-reactive return type: " +
                  method.getReturnType());
         }
         return new ReactiveTransactionSupport(adapter);
      });
      return txSupport.invokeWithinTransaction(
            method, targetClass, invocation, txAttr, (ReactiveTransactionManager) tm);
   }

   PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
   final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);

   if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
      // Create the transaction if necessary (this is also where the connection is obtained)
      TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);

      Object retVal;
      try {
         // Invoke the target method
         retVal = invocation.proceedWithInvocation();
      }
      catch (Throwable ex) {
         // Handle an exception thrown by the target method
         completeTransactionAfterThrowing(txInfo, ex);
         throw ex;
      }
      finally {
         // Reset the TransactionInfo ThreadLocal
         cleanupTransactionInfo(txInfo);
      }

      if (vavrPresent && VavrDelegate.isVavrTry(retVal)) {
         // Set rollback-only in case of Vavr failure matching our rollback rules...
         TransactionStatus status = txInfo.getTransactionStatus();
         if (status != null && txAttr != null) {
            retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
         }
      }
      // The business method succeeded: commit the transaction (the key spot).
      // This ends up calling AbstractPlatformTransactionManager#commit
      commitTransactionAfterReturning(txInfo);
      return retVal;
   }

   else {
      final ThrowableHolder throwableHolder = new ThrowableHolder();

      // It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.
      try {
         Object result = ((CallbackPreferringPlatformTransactionManager) ptm).execute(txAttr, status -> {
            TransactionInfo txInfo = prepareTransactionInfo(ptm, txAttr, joinpointIdentification, status);
            try {
               Object retVal = invocation.proceedWithInvocation();
               if (vavrPresent && VavrDelegate.isVavrTry(retVal)) {
                  // Set rollback-only in case of Vavr failure matching our rollback rules...
                  retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
               }
               return retVal;
            }
            catch (Throwable ex) {
               if (txAttr.rollbackOn(ex)) {
                  // A RuntimeException: will lead to a rollback.
                  if (ex instanceof RuntimeException) {
                     throw (RuntimeException) ex;
                  }
                  else {
                     throw new ThrowableHolderException(ex);
                  }
               }
               else {
                  // A normal return value: will lead to a commit.
                  throwableHolder.throwable = ex;
                  return null;
               }
            }
            finally {
               cleanupTransactionInfo(txInfo);
            }
         });

         // Check result state: It might indicate a Throwable to rethrow.
         if (throwableHolder.throwable != null) {
            throw throwableHolder.throwable;
         }
         return result;
      }
      catch (ThrowableHolderException ex) {
         throw ex.getCause();
      }
      catch (TransactionSystemException ex2) {
         if (throwableHolder.throwable != null) {
            logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
            ex2.initApplicationException(throwableHolder.throwable);
         }
         throw ex2;
      }
      catch (Throwable ex2) {
         if (throwableHolder.throwable != null) {
            logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
         }
         throw ex2;
      }
   }
}
}

AbstractPlatformTransactionManager:

public final void commit(TransactionStatus status) throws TransactionException {
   if (status.isCompleted()) {
      throw new IllegalTransactionStateException(
            "Transaction is already completed - do not call commit or rollback more than once per transaction");
   }

   DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
   if (defStatus.isLocalRollbackOnly()) {
      if (defStatus.isDebug()) {
         logger.debug("Transactional code has requested rollback");
      }
      processRollback(defStatus, false);
      return;
   }

   if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
      if (defStatus.isDebug()) {
         logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
      }
      processRollback(defStatus, true);
      return;
   }
   // Process the commit
   processCommit(defStatus);
}

private void processCommit(DefaultTransactionStatus status) throws TransactionException {
   try {
      boolean beforeCompletionInvoked = false;

      try {
         boolean unexpectedRollback = false;
         prepareForCommit(status);
         triggerBeforeCommit(status);
         triggerBeforeCompletion(status);
         beforeCompletionInvoked = true;

         if (status.hasSavepoint()) {
            if (status.isDebug()) {
               logger.debug("Releasing transaction savepoint");
            }
            unexpectedRollback = status.isGlobalRollbackOnly();
            status.releaseHeldSavepoint();
         }
         else if (status.isNewTransaction()) {
            if (status.isDebug()) {
               logger.debug("Initiating transaction commit");
            }
            unexpectedRollback = status.isGlobalRollbackOnly();
            doCommit(status);
         }
         else if (isFailEarlyOnGlobalRollbackOnly()) {
            unexpectedRollback = status.isGlobalRollbackOnly();
         }

         // Throw UnexpectedRollbackException if we have a global rollback-only
         // marker but still didn't get a corresponding exception from commit.
         if (unexpectedRollback) {
            throw new UnexpectedRollbackException(
                  "Transaction silently rolled back because it has been marked as rollback-only");
         }
      }
      catch (UnexpectedRollbackException ex) {
         // can only be caused by doCommit
         triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
         throw ex;
      }
      catch (TransactionException ex) {
         // can only be caused by doCommit
         if (isRollbackOnCommitFailure()) {
            doRollbackOnCommitException(status, ex);
         }
         else {
            triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
         }
         throw ex;
      }
      catch (RuntimeException | Error ex) {
         if (!beforeCompletionInvoked) {
            triggerBeforeCompletion(status);
         }
         doRollbackOnCommitException(status, ex);
         throw ex;
      }

      // Trigger afterCommit callbacks, with an exception thrown there
      // propagated to callers but the transaction still considered as committed.
      try {
         // Triggered after the transaction commits (we are close to the answer here)
         triggerAfterCommit(status);
      }
      finally {
         triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
      }

   }
   finally {
      cleanupAfterCompletion(status);
   }
}

TransactionSynchronizationUtils:

public abstract class TransactionSynchronizationUtils {

  public static void triggerAfterCommit() {
     // TransactionSynchronizationManager manages the transaction synchronizations
     invokeAfterCommit(TransactionSynchronizationManager.getSynchronizations());
  }
  public static void invokeAfterCommit(@Nullable List<TransactionSynchronization> synchronizations) {
     if (synchronizations != null) {
        for (TransactionSynchronization synchronization : synchronizations) {
           // Calls TransactionSynchronization#afterCommit; the default implementation is empty and left for subclasses to extend.
           // So to run asynchronous work after the transaction commits, we only need to implement this method.
           synchronization.afterCommit();
        }
     }
  }
}

TransactionSynchronization:

public interface TransactionSynchronization extends Flushable {
   default void afterCommit() {}
}

Along the way we see that TransactionSynchronizationManager, TransactionSynchronization, TransactionSynchronizationAdapter and related classes take part in the whole AOP flow. Space is limited, so they are not covered in detail here, but our extension depends on these building blocks.
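The commit path traced above can be condensed into a small plain-Java model. This is only a sketch to make the callback order visible, not Spring's code: `Sync`, `MiniTxManager` and `MiniTxDemo` are hypothetical names, and the real classes handle savepoints, nested transactions and error cases that are ignored here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for TransactionSynchronization: callbacks default to no-ops.
interface Sync {
    default void afterCommit() {}
    default void afterCompletion(boolean committed) {}
}

// Hypothetical stand-in for the transaction manager plus TransactionSynchronizationManager.
class MiniTxManager {
    // Callbacks registered by the current thread's "transaction".
    private static final ThreadLocal<List<Sync>> SYNCS = ThreadLocal.withInitial(ArrayList::new);

    static void register(Sync sync) {
        SYNCS.get().add(sync);
    }

    // Commit path: afterCommit first, then afterCompletion, then cleanup.
    static void commit() {
        List<Sync> syncs = SYNCS.get();
        try {
            try {
                for (Sync s : syncs) s.afterCommit();
            } finally {
                for (Sync s : syncs) s.afterCompletion(true);
            }
        } finally {
            SYNCS.remove();
        }
    }

    // Rollback path: afterCommit is skipped entirely.
    static void rollback() {
        try {
            for (Sync s : SYNCS.get()) s.afterCompletion(false);
        } finally {
            SYNCS.remove();
        }
    }
}

class MiniTxDemo {
    static List<String> run() {
        List<String> events = new ArrayList<>();
        Sync recorder = new Sync() {
            @Override public void afterCommit() { events.add("afterCommit"); }
            @Override public void afterCompletion(boolean committed) {
                events.add(committed ? "completed:committed" : "completed:rolledBack");
            }
        };
        MiniTxManager.register(recorder);
        MiniTxManager.rollback();   // only afterCompletion fires
        MiniTxManager.register(recorder);
        MiniTxManager.commit();     // afterCommit, then afterCompletion
        return events;
    }
}
```

In this model a rolled-back transaction never reaches `afterCommit`, which is the property the utility class relies on.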

3. Implementation

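To execute work asynchronously after the transaction commits, we need a custom synchronization.afterCommit implementation combined with a thread pool. First, define the thread pool as a TaskExecutor.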

@Bean
public TaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    taskExecutor.setCorePoolSize(******);
    taskExecutor.setMaxPoolSize(******);
    taskExecutor.setKeepAliveSeconds(******);
    taskExecutor.setQueueCapacity(******);
    taskExecutor.setThreadNamePrefix(******);
    taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
    taskExecutor.initialize();
    return taskExecutor;
}
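The rejection handler configured above, ThreadPoolExecutor.DiscardOldestPolicy, drops the oldest queued task without raising an error once the pool and queue are full. The sketch below uses a plain JDK ThreadPoolExecutor with deliberately tiny sizes to make that visible. `DiscardOldestDemo` and the task names are hypothetical, and the sizes are not the article's elided values.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class DiscardOldestDemo {
    static List<String> run() {
        List<String> executed = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch gate = new CountDownLatch(1);
        // One worker thread, one queue slot: the third task overflows the pool.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.DiscardOldestPolicy());

        // A occupies the only worker until the gate opens.
        pool.execute(() -> { awaitQuietly(gate); executed.add("A"); });
        // B waits in the queue.
        pool.execute(() -> executed.add("B"));
        // C is rejected, so the policy evicts B from the queue and queues C instead.
        pool.execute(() -> executed.add("C"));

        gate.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(executed); // [A, C]: B never ran
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

If the after-commit tasks must not be lost (an MQ notification, for instance), a policy such as CallerRunsPolicy or a larger queue may be a safer choice. Which one fits depends on the workload.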

Define the AfterCommitExecutor interface.

public interface AfterCommitExecutor extends Executor { }

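Define the AfterCommitExecutorImpl implementation class. Note that it extends TransactionSynchronizationAdapter. (As of Spring 5.3 this adapter is deprecated because TransactionSynchronization itself has default methods, so implementing the interface directly also works.)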

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.core.NamedThreadLocal;
import org.springframework.core.task.TaskExecutor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.support.TransactionSynchronizationAdapter;
import org.springframework.transaction.support.TransactionSynchronizationManager;

import java.util.List;
import java.util.ArrayList;

@Component
public class AfterCommitExecutorImpl extends TransactionSynchronizationAdapter implements AfterCommitExecutor {

    private static final Logger LOGGER = LoggerFactory.getLogger(AfterCommitExecutorImpl.class);
    // Tasks queued by the current thread's transaction, to be run after commit
    private static final ThreadLocal<List<Runnable>> RUNNABLE_THREAD_LOCAL = new NamedThreadLocal<>("AfterCommitRunnable");
    // Thread pool that runs the queued tasks
    @Autowired
    private TaskExecutor taskExecutor;

    /**
     * Queues a task to run asynchronously after the current transaction commits.
     *
     * @param runnable the task to run
     */
    @Override
    public void execute(Runnable runnable) {
        LOGGER.info("Submitting new runnable {} to run after commit", runnable);
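        // No transaction synchronization is active on this thread (for example, no transaction
        // is in progress): run the task right away on the calling thread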
        if (!TransactionSynchronizationManager.isSynchronizationActive()) {
            LOGGER.info("Transaction synchronization is NOT ACTIVE. Executing right now runnable {}", runnable);
            runnable.run();
            return;
        }
        // Tasks from the same transaction are batched together (on first use, create the list and register this synchronization)
        List<Runnable> threadRunnableList = RUNNABLE_THREAD_LOCAL.get();
        if (null == threadRunnableList) {
            threadRunnableList = new ArrayList<>();
            RUNNABLE_THREAD_LOCAL.set(threadRunnableList);
            TransactionSynchronizationManager.registerSynchronization(this);
        }
        threadRunnableList.add(runnable);
    }

    /**
     * Runs once the transaction has committed and hands the queued tasks to the thread pool.
     */
    @Override
    public void afterCommit() {
        List<Runnable> threadRunnableList = RUNNABLE_THREAD_LOCAL.get();
        LOGGER.info("Transaction successfully committed, executing {} threadRunnable", threadRunnableList.size());
        for (Runnable runnable : threadRunnableList) {
            try {
                taskExecutor.execute(runnable);
            } catch (RuntimeException e) {
                LOGGER.error("Failed to execute runnable {}", runnable, e);
            }
        }
    }

    /**
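     * Runs after the transaction completes, whether it committed or rolled back.
     *
     * @param status completion status (STATUS_COMMITTED = 0, STATUS_ROLLED_BACK = 1, STATUS_UNKNOWN = 2)
     */
    @Override
    public void afterCompletion(int status) {
        // Report STATUS_UNKNOWN separately instead of logging it as a rollback
        String outcome = status == STATUS_COMMITTED ? "COMMITTED"
                : status == STATUS_ROLLED_BACK ? "ROLLED_BACK" : "UNKNOWN";
        LOGGER.info("Transaction completed with status {}", outcome);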
        RUNNABLE_THREAD_LOCAL.remove();
    }
}

Usage.

With the utility class in place, using it is simple: inject AfterCommitExecutor and call AfterCommitExecutor.execute(runnable).
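A caller might look roughly like the sketch below. `StockService`, `stockIn` and `sentMessages` are hypothetical names used only for illustration. In a Spring application the class would be a @Service bean and the method would carry @Transactional. The annotations are left out so the sketch compiles without Spring on the classpath.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;

// Same shape as the AfterCommitExecutor interface defined earlier in the article.
interface AfterCommitExecutor extends Executor { }

// Hypothetical service: @Service on the class and @Transactional on stockIn in a real application.
class StockService {
    private final AfterCommitExecutor afterCommitExecutor;
    // Stand-in for an MQ client: records what would have been sent.
    final List<String> sentMessages = new CopyOnWriteArrayList<>();

    // Constructor injection of the executor bean.
    StockService(AfterCommitExecutor afterCommitExecutor) {
        this.afterCommitExecutor = afterCommitExecutor;
    }

    void stockIn(long orderId) {
        // ... database writes that belong to the transaction would go here ...

        // Hand the follow-up work to the executor instead of running it inline,
        // so it is deferred until the surrounding transaction has committed.
        afterCommitExecutor.execute(() -> sentMessages.add("stock-in:" + orderId));
    }
}
```

The business method only describes the follow-up work. Whether and when it runs is decided by the executor, so a rollback discards the task without any extra code in the service.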

4. Summary

Spring is huge, but once you find the right entry point, many problems turn out to have a line of attack or a ready-made solution.

How well do you know Spring?

Editor: 姜华 (Jiang Hua). Source: 今日头条 (Toutiao)