新手也能看懂的 SpringBoot 異步編程指南
通過(guò)本文你可以了解到下面這些知識(shí)點(diǎn):
- Future 模式介紹以及核心思想
- 核心線程數(shù)、最大線程數(shù)的區(qū)別,隊(duì)列容量代表什么;
- ThreadPoolTaskExecutor 飽和策略;
- SpringBoot 異步編程實(shí)戰(zhàn),搞懂代碼的執(zhí)行邏輯。
Future 模式
異步編程在處理耗時(shí)操作以及多任務(wù)處理的場(chǎng)景下非常有用,我們可以更好的讓我們的系統(tǒng)利用好機(jī)器的 CPU 和 內(nèi)存,提高它們的利用率。多線程設(shè)計(jì)模式有很多種,F(xiàn)uture模式是多線程開(kāi)發(fā)中非常常見(jiàn)的一種設(shè)計(jì)模式,本文也是基于這種模式來(lái)說(shuō)明 SpringBoot 對(duì)于異步編程的知識(shí)。
實(shí)戰(zhàn)之前我先簡(jiǎn)單介紹一下 Future 模式的核心思想 吧!。
Future 模式的核心思想是 異步調(diào)用 。當(dāng)我們執(zhí)行一個(gè)方法時(shí),假如這個(gè)方法中有多個(gè)耗時(shí)的任務(wù)需要同時(shí)去做,而且又不著急等待這個(gè)結(jié)果時(shí)可以讓客戶(hù)端立即返回然后,后臺(tái)慢慢去計(jì)算任務(wù)。當(dāng)然你也可以選擇等這些任務(wù)都執(zhí)行完了,再返回給客戶(hù)端。這個(gè)在 Java 中都有很好的支持,我在后面的示例程序中會(huì)詳細(xì)對(duì)比這兩種方式的區(qū)別。
SpringBoot 異步編程實(shí)戰(zhàn)
如果我們需要在 SpringBoot 實(shí)現(xiàn)異步編程的話(huà),通過(guò) Spring 提供的兩個(gè)注解會(huì)讓這件事情變的非常簡(jiǎn)單。
- @EnableAsync:通過(guò)在配置類(lèi)或者M(jìn)ain類(lèi)上加@EnableAsync開(kāi)啟對(duì)異步方法的支持。
- @Async 可以作用在類(lèi)上或者方法上,作用在類(lèi)上代表這個(gè)類(lèi)的所有方法都是異步方法。
1. 自定義 TaskExecutor
很多人對(duì)于 TaskExecutor 不是太了解,所以我們花一點(diǎn)篇幅先介紹一下這個(gè)東西。從名字就能看出它是任務(wù)的執(zhí)行者,它領(lǐng)導(dǎo)執(zhí)行著線程來(lái)處理任務(wù),就像司令官一樣,而我們的線程就好比一只只軍隊(duì)一樣,這些軍隊(duì)可以異步對(duì)敵人進(jìn)行打擊👊。
Spring 提供了TaskExecutor接口作為任務(wù)執(zhí)行者的抽象,它和java.util.concurrent包下的Executor接口很像。稍微不同的 TaskExecutor接口用到了 Java 8 的語(yǔ)法@FunctionalInterface聲明這個(gè)接口口是一個(gè)函數(shù)式接口。
- org.springframework.core.task.TaskExecutor
- @FunctionalInterface
- public interface TaskExecutor extends Executor {
- void execute(Runnable var1);
- }
如果沒(méi)有自定義Executor, Spring 將創(chuàng)建一個(gè) SimpleAsyncTaskExecutor 并使用它。
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.scheduling.annotation.AsyncConfigurer;
- import org.springframework.scheduling.annotation.EnableAsync;
- import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
- import java.util.concurrent.Executor;
- /** @author shuang.kou */
- @Configuration
- @EnableAsync
- public class AsyncConfig implements AsyncConfigurer {
- private static final int CORE_POOL_SIZE = 6;
- private static final int MAX_POOL_SIZE = 10;
- private static final int QUEUE_CAPACITY = 100;
- @Bean
- public Executor taskExecutor() {
- // Spring 默認(rèn)配置是核心線程數(shù)大小為1,最大線程容量大小不受限制,隊(duì)列容量也不受限制。
- ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
- // 核心線程數(shù)
- executor.setCorePoolSize(CORE_POOL_SIZE);
- // 最大線程數(shù)
- executor.setMaxPoolSize(MAX_POOL_SIZE);
- // 隊(duì)列大小
- executor.setQueueCapacity(QUEUE_CAPACITY);
- // 當(dāng)最大池已滿(mǎn)時(shí),此策略保證不會(huì)丟失任務(wù)請(qǐng)求,但是可能會(huì)影響應(yīng)用程序整體性能。
- executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
- executor.setThreadNamePrefix("My ThreadPoolTaskExecutor-");
- executor.initialize();
- return executor;
- }
- }
ThreadPoolTaskExecutor 常見(jiàn)概念:
- Core Pool Size : 核心線程數(shù)線程數(shù)定義了最小可以同時(shí)運(yùn)行的線程數(shù)量。
- Queue Capacity : 當(dāng)新任務(wù)來(lái)的時(shí)候會(huì)先判斷當(dāng)前運(yùn)行的線程數(shù)量是否達(dá)到核心線程數(shù),如果達(dá)到的話(huà),信任就會(huì)被存放在隊(duì)列中。
- Maximum Pool Size : 當(dāng)隊(duì)列中存放的任務(wù)達(dá)到隊(duì)列容量的時(shí)候,當(dāng)前可以同時(shí)運(yùn)行的線程數(shù)量變?yōu)樽畲缶€程數(shù)。
一般情況下不會(huì)將隊(duì)列大小設(shè)為:Integer.MAX_VALUE,也不會(huì)將核心線程數(shù)和最大線程數(shù)設(shè)為同樣的大小,這樣的話(huà)最大線程數(shù)的設(shè)置都沒(méi)什么意義了,你也無(wú)法確定當(dāng)前 CPU 和內(nèi)存利用率具體情況如何。
如果隊(duì)列已滿(mǎn)并且當(dāng)前同時(shí)運(yùn)行的線程數(shù)達(dá)到最大線程數(shù)的時(shí)候,如果再有新任務(wù)過(guò)來(lái)會(huì)發(fā)生什么呢?
Spring 默認(rèn)使用的是 ThreadPoolExecutor.AbortPolicy。在Spring的默認(rèn)情況下,ThreadPoolExecutor 將拋出 RejectedExecutionException 來(lái)拒絕新來(lái)的任務(wù) ,這代表你將丟失對(duì)這個(gè)任務(wù)的處理。對(duì)于可伸縮的應(yīng)用程序,建議使用 ThreadPoolExecutor.CallerRunsPolicy。當(dāng)最大池被填滿(mǎn)時(shí),此策略為我們提供可伸縮隊(duì)列。
ThreadPoolTaskExecutor 飽和策略定義:
如果當(dāng)前同時(shí)運(yùn)行的線程數(shù)量達(dá)到最大線程數(shù)量時(shí),ThreadPoolTaskExecutor 定義一些策略:
ThreadPoolExecutor.AbortPolicy:拋出 RejectedExecutionException來(lái)拒絕新任務(wù)的處理。
ThreadPoolExecutor.CallerRunsPolicy:調(diào)用執(zhí)行自己的線程運(yùn)行任務(wù)。您不會(huì)任務(wù)請(qǐng)求。但是這種策略會(huì)降低對(duì)于新任務(wù)提交速度,影響程序的整體性能。另外,這個(gè)策略喜歡增加隊(duì)列容量。如果您的應(yīng)用程序可以承受此延遲并且你不能任務(wù)丟棄任何一個(gè)任務(wù)請(qǐng)求的話(huà),你可以選擇這個(gè)策略。
ThreadPoolExecutor.DiscardPolicy: 不處理新任務(wù),直接丟棄掉。
ThreadPoolExecutor.DiscardOldestPolicy:此策略將丟棄最早的未處理的任務(wù)請(qǐng)求。
2. 編寫(xiě)一個(gè)異步的方法
下面模擬一個(gè)查找對(duì)應(yīng)字符開(kāi)頭電影的方法,我們給這個(gè)方法加上了@Async注解來(lái)告訴 Spring 它是一個(gè)異步的方法。另外,這個(gè)方法的返回值 CompletableFuture.completedFuture(results)這代表我們需要返回結(jié)果,也就是說(shuō)程序必須把任務(wù)執(zhí)行完成之后再返回給用戶(hù)。
請(qǐng)留意completableFutureTask方法中的第一行打印日志這句代碼,后面分析程序中會(huì)用到,很重要!
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.scheduling.annotation.Async;
- import org.springframework.stereotype.Service;
- import java.util.ArrayList;
- import java.util.Arrays;
- import java.util.List;
- import java.util.concurrent.CompletableFuture;
- import java.util.stream.Collectors;
- /** @author shuang.kou */
- @Service
- public class AsyncService {
- private static final Logger logger = LoggerFactory.getLogger(AsyncService.class);
- private List<String> movies =
- new ArrayList<>(
- Arrays.asList(
- "Forrest Gump",
- "Titanic",
- "Spirited Away",
- "The Shawshank Redemption",
- "Zootopia",
- "Farewell ",
- "Joker",
- "Crawl"));
- /** 示范使用:找到特定字符/字符串開(kāi)頭的電影 */
- @Async
- public CompletableFuture<List<String>> completableFutureTask(String start) {
- // 打印日志
- logger.warn(Thread.currentThread().getName() + "start this task!");
- // 找到特定字符/字符串開(kāi)頭的電影
- List<String> results =
- movies.stream().filter(movie -> movie.startsWith(start)).collect(Collectors.toList());
- // 模擬這是一個(gè)耗時(shí)的任務(wù)
- try {
- Thread.sleep(1000L);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- //返回一個(gè)已經(jīng)用給定值完成的新的CompletableFuture。
- return CompletableFuture.completedFuture(results);
- }
- }
3. 測(cè)試編寫(xiě)的異步方法
- /** @author shuang.kou */
- @RestController
- @RequestMapping("/async")
- public class AsyncController {
- @Autowired
- AsyncService asyncService;
- @GetMapping("/movies")
- public String completableFutureTask() throws ExecutionException, InterruptedException {
- //開(kāi)始時(shí)間
- long start = System.currentTimeMillis();
- // 開(kāi)始執(zhí)行大量的異步任務(wù)
- List<String> words = Arrays.asList("F", "T", "S", "Z", "J", "C");
- List<CompletableFuture<List<String>>> completableFutureList =
- words.stream()
- .map(word -> asyncService.completableFutureTask(word))
- .collect(Collectors.toList());
- // CompletableFuture.join()方法可以獲取他們的結(jié)果并將結(jié)果連接起來(lái)
- List<List<String>> results = completableFutureList.stream().map(CompletableFuture::join).collect(Collectors.toList());
- // 打印結(jié)果以及運(yùn)行程序運(yùn)行花費(fèi)時(shí)間
- System.out.println("Elapsed time: " + (System.currentTimeMillis() - start));
- return results.toString();
- }
- }
請(qǐng)求這個(gè)接口,控制臺(tái)打印出下面的內(nèi)容:
- 2019-10-01 13:50:17.007 WARN 18793 --- [lTaskExecutor-1] g.j.a.service.AsyncService : My ThreadPoolTaskExecutor-1start this task!
- 2019-10-01 13:50:17.007 WARN 18793 --- [lTaskExecutor-6] g.j.a.service.AsyncService : My ThreadPoolTaskExecutor-6start this task!
- 2019-10-01 13:50:17.007 WARN 18793 --- [lTaskExecutor-5] g.j.a.service.AsyncService : My ThreadPoolTaskExecutor-5start this task!
- 2019-10-01 13:50:17.007 WARN 18793 --- [lTaskExecutor-4] g.j.a.service.AsyncService : My ThreadPoolTaskExecutor-4start this task!
- 2019-10-01 13:50:17.007 WARN 18793 --- [lTaskExecutor-3] g.j.a.service.AsyncService : My ThreadPoolTaskExecutor-3start this task!
- 2019-10-01 13:50:17.007 WARN 18793 --- [lTaskExecutor-2] g.j.a.service.AsyncService : My ThreadPoolTaskExecutor-2start this task!
- Elapsed time: 1010
首先我們可以看到處理所有任務(wù)花費(fèi)的時(shí)間大概是 1 s。這與我們自定義的 ThreadPoolTaskExecutor 有關(guān),我們配置的核心線程數(shù)是 6 ,然后通過(guò)通過(guò)下面的代碼模擬分配了 6 個(gè)任務(wù)給系統(tǒng)執(zhí)行。這樣每個(gè)線程都會(huì)被分配到一個(gè)任務(wù),每個(gè)任務(wù)執(zhí)行花費(fèi)時(shí)間是 1 s ,所以處理 6 個(gè)任務(wù)的總花費(fèi)時(shí)間是 1 s。
- List<String> words = Arrays.asList("F", "T", "S", "Z", "J", "C");
- List<CompletableFuture<List<String>>> completableFutureList =
- words.stream()
- .map(word -> asyncService.completableFutureTask(word))
- .collect(Collectors.toList());
你可以自己驗(yàn)證一下,試著去把核心線程數(shù)的數(shù)量改為 3 ,再次請(qǐng)求這個(gè)接口你會(huì)發(fā)現(xiàn)處理所有任務(wù)花費(fèi)的時(shí)間大概是 2 s。
另外,從上面的運(yùn)行結(jié)果可以看出,當(dāng)所有任務(wù)執(zhí)行完成之后才返回結(jié)果。這種情況對(duì)應(yīng)于我們需要返回結(jié)果給客戶(hù)端請(qǐng)求的情況下,假如我們不需要返回任務(wù)執(zhí)行結(jié)果給客戶(hù)端的話(huà)呢? 就比如我們上傳一個(gè)大文件到系統(tǒng),上傳之后只要大文件格式符合要求我們就上傳成功。普通情況下我們需要等待文件上傳完畢再返回給用戶(hù)消息,但是這樣會(huì)很慢。采用異步的話(huà),當(dāng)用戶(hù)上傳之后就立馬返回給用戶(hù)消息,然后系統(tǒng)再默默去處理上傳任務(wù)。這樣也會(huì)增加一點(diǎn)麻煩,因?yàn)槲募赡軙?huì)上傳失敗,所以系統(tǒng)也需要一點(diǎn)機(jī)制來(lái)補(bǔ)償這個(gè)問(wèn)題,比如當(dāng)上傳遇到問(wèn)題的時(shí)候,發(fā)消息通知用戶(hù)。
下面會(huì)演示一下客戶(hù)端不需要返回結(jié)果的情況:
將completableFutureTask方法變?yōu)?void 類(lèi)型
- @Async
- public void completableFutureTask(String start) {
- ......
- //這里可能是系統(tǒng)對(duì)任務(wù)執(zhí)行結(jié)果的處理,比如存入到數(shù)據(jù)庫(kù)等等......
- //doSomeThingWithResults(results);
- }
Controller 代碼修改如下:
- @GetMapping("/movies")
- public String completableFutureTask() throws ExecutionException, InterruptedException {
- // Start the clock
- long start = System.currentTimeMillis();
- // Kick of multiple, asynchronous lookups
- List<String> words = Arrays.asList("F", "T", "S", "Z", "J", "C");
- words.stream()
- .forEach(word -> asyncService.completableFutureTask(word));
- // Wait until they are all done
- // Print results, including elapsed time
- System.out.println("Elapsed time: " + (System.currentTimeMillis() - start));
- return "Done";
- }
請(qǐng)求這個(gè)接口,控制臺(tái)打印出下面的內(nèi)容:
- Elapsed time: 0
- 2019-10-01 14:02:44.052 WARN 19051 --- [lTaskExecutor-4] g.j.a.service.AsyncService : My ThreadPoolTaskExecutor-4start this task!
- 2019-10-01 14:02:44.052 WARN 19051 --- [lTaskExecutor-3] g.j.a.service.AsyncService : My ThreadPoolTaskExecutor-3start this task!
- 2019-10-01 14:02:44.052 WARN 19051 --- [lTaskExecutor-2] g.j.a.service.AsyncService : My ThreadPoolTaskExecutor-2start this task!
- 2019-10-01 14:02:44.052 WARN 19051 --- [lTaskExecutor-1] g.j.a.service.AsyncService : My ThreadPoolTaskExecutor-1start this task!
- 2019-10-01 14:02:44.052 WARN 19051 --- [lTaskExecutor-6] g.j.a.service.AsyncService : My ThreadPoolTaskExecutor-6start this task!
- 2019-10-01 14:02:44.052 WARN 19051 --- [lTaskExecutor-5] g.j.a.service.AsyncService : My ThreadPoolTaskExecutor-5start this task!
可以看到系統(tǒng)會(huì)直接返回給用戶(hù)結(jié)果,然后系統(tǒng)才真正開(kāi)始執(zhí)行任務(wù)。
待辦
- Future vs. CompletableFuture
- 源代碼分析
Reference
- https://spring.io/guides/gs/async-method/
- https://medium.com/trendyol-tech/spring-boot-async-executor-management-with-threadpooltaskexecutor-f493903617d





























