Spring Boot + Nacos 實(shí)現(xiàn)了一個(gè)動(dòng)態(tài)化線程池,非常實(shí)用!
在后臺開發(fā)中,會(huì)經(jīng)常用到線程池技術(shù),對于線程池核心參數(shù)的配置很大程度上依靠經(jīng)驗(yàn)。然而,由于系統(tǒng)運(yùn)行過程中存在的不確定性,我們很難一勞永逸地規(guī)劃一個(gè)合理的線程池參數(shù)。在對線程池配置參數(shù)進(jìn)行調(diào)整時(shí),一般需要對服務(wù)進(jìn)行重啟,這樣修改的成本就會(huì)偏高。一種解決辦法就是,將線程池的配置放到平臺側(cè),運(yùn)行開發(fā)同學(xué)根據(jù)系統(tǒng)運(yùn)行情況對核心參數(shù)進(jìn)行動(dòng)態(tài)配置。
本文以Nacos作為服務(wù)配置中心,以修改線程池核心線程數(shù)、最大線程數(shù)為例,實(shí)現(xiàn)一個(gè)簡單的動(dòng)態(tài)化線程池。
代碼實(shí)現(xiàn)
1.依賴
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
    <version>2021.1</version>
</dependency>
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
    <version>2021.1</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
</dependency>2.配置yml文件
bootstrap.yml:
server:
  port: 8010
  # 應(yīng)用名稱(nacos會(huì)將該名稱當(dāng)做服務(wù)名稱)
spring:
  application:
    name: order-service
  cloud:
    nacos:
      discovery:
        namespace: public
        server-addr: 192.168.174.129:8848
      config:
        server-addr: 192.168.174.129:8848
        file-extension: ymlapplication.yml:
spring:
  profiles:
    active: dev為什么要配置兩個(gè)yml文件?
springboot中配置文件的加載是存在優(yōu)先級順序的,bootstrap優(yōu)先級高于application。
nacos在項(xiàng)目初始化時(shí),要保證先從配置中心進(jìn)行配置拉取,拉取配置之后才能保證項(xiàng)目的正常啟動(dòng)。
3.nacos配置
登錄到nacos管理頁面,新建配置,如下圖所示:
圖片
注意Data ID的命名格式為,${spring.application.name}-${spring.profile.active}.${spring.cloud.nacos.config.file-extension} ,在本文中,Data ID的名字就是order-service-dev.yml。
圖片
這里我們只配置了兩個(gè)參數(shù),核心線程數(shù)量和最大線程數(shù)。
4.線程池配置和nacos配置變更監(jiān)聽
@RefreshScope
@Configuration
public class DynamicThreadPool implements InitializingBean {
    @Value("${core.size}")
    private String coreSize;
 
    @Value("${max.size}")
    private String maxSize;
 
    private static ThreadPoolExecutor threadPoolExecutor;
 
    @Autowired
    private NacosConfigManager nacosConfigManager;
 
    @Autowired
    private NacosConfigProperties nacosConfigProperties;
 
    @Override
    public void afterPropertiesSet() throws Exception {
        //按照nacos配置初始化線程池
        threadPoolExecutor = new ThreadPoolExecutor(Integer.parseInt(coreSize), Integer.parseInt(maxSize), 10L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(10),
                new ThreadFactoryBuilder().setNameFormat("c_t_%d").build(),
                new RejectedExecutionHandler() {
                    @Override
                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                        System.out.println("rejected!");
                    }
                });
 
        //nacos配置變更監(jiān)聽
        nacosConfigManager.getConfigService().addListener("order-service-dev.yml", nacosConfigProperties.getGroup(),
                new Listener() {
                    @Override
                    public Executor getExecutor() {
                        return null;
                    }
 
                    @Override
                    public void receiveConfigInfo(String configInfo) {
                        //配置變更,修改線程池配置
                        System.out.println(configInfo);
                        changeThreadPoolConfig(Integer.parseInt(coreSize), Integer.parseInt(maxSize));
                    }
                });
    }
 
    /**
     * 打印當(dāng)前線程池的狀態(tài)
     */
    public String printThreadPoolStatus() {
        return String.format("core_size:%s,thread_current_size:%s;" +
                        "thread_max_size:%s;queue_current_size:%s,total_task_count:%s", threadPoolExecutor.getCorePoolSize(),
                threadPoolExecutor.getActiveCount(), threadPoolExecutor.getMaximumPoolSize(), threadPoolExecutor.getQueue().size(),
                threadPoolExecutor.getTaskCount());
    }
 
    /**
     * 給線程池增加任務(wù)
     *
     * @param count
     */
    public void dynamicThreadPoolAddTask(int count) {
        for (int i = 0; i < count; i++) {
            int finalI = i;
            threadPoolExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println(finalI);
                        Thread.sleep(10000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }
 
    /**
     * 修改線程池核心參數(shù)
     *
     * @param coreSize
     * @param maxSize
     */
    private void changeThreadPoolConfig(int coreSize, int maxSize) {
        threadPoolExecutor.setCorePoolSize(coreSize);
        threadPoolExecutor.setMaximumPoolSize(maxSize);
    }
}這個(gè)代碼就是實(shí)現(xiàn)動(dòng)態(tài)線程池和核心了,需要說明的是:
- @RefreshScope:這個(gè)注解用來支持nacos的動(dòng)態(tài)刷新功能;
 - @Value("${max.size}"),@Value("${core.size}"):這兩個(gè)注解用來讀取我們上一步在nacos配置的具體信息;同時(shí),nacos配置變更時(shí),能夠?qū)崟r(shí)讀取到變更后的內(nèi)容
 - nacosConfigManager.getConfigService().addListener:配置監(jiān)聽,nacos配置變更時(shí)實(shí)時(shí)修改線程池的配置。
 
5.controller
為了觀察線程池動(dòng)態(tài)變更的效果,增加Controller類。
@RestController
@RequestMapping("/threadpool")
public class ThreadPoolController {
 
    @Autowired
    private DynamicThreadPool dynamicThreadPool;
 
    /**
     * 打印當(dāng)前線程池的狀態(tài)
     */
    @GetMapping("/print")
    public String printThreadPoolStatus() {
        return dynamicThreadPool.printThreadPoolStatus();
    }
 
    /**
     * 給線程池增加任務(wù)
     *
     * @param count
     */
    @GetMapping("/add")
    public String dynamicThreadPoolAddTask(int count) {
        dynamicThreadPool.dynamicThreadPoolAddTask(count);
        return String.valueOf(count);
    }
}6.測試
啟動(dòng)項(xiàng)目,訪問http://localhost:8010/threadpool/print打印當(dāng)前線程池的配置。
圖片
可以看到,這個(gè)就是我們之前在nacos配置的線程數(shù)。
訪問http://localhost:8010/threadpool/add?count=20增加20個(gè)任務(wù),重新打印線程池配置
圖片
可以看到已經(jīng)有線程在排隊(duì)了。
為了能夠看到效果,我們多訪問幾次/add接口,增加任務(wù)數(shù),在控制臺出現(xiàn)拒絕信息時(shí)調(diào)整nacos配置。
圖片
此時(shí),執(zhí)行/add命令時(shí),所有的線程都會(huì)提示rejected。
調(diào)整nacos配置,將核心線程數(shù)調(diào)整為50,最大線程數(shù)調(diào)整為100.
圖片
重新多次訪問/add接口增加任務(wù),發(fā)現(xiàn)沒有拒絕信息了。這時(shí),打印具體的線程狀態(tài),發(fā)現(xiàn)線程池參數(shù)修改成功。

總結(jié)
這里,只是簡單實(shí)現(xiàn)了一個(gè)可以調(diào)整核心線程數(shù)和最大線程數(shù)的動(dòng)態(tài)線程池。具體的線程池實(shí)現(xiàn)原理可以參考美團(tuán)的這篇文章:https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html,結(jié)合監(jiān)控告警等實(shí)現(xiàn)一個(gè)完善的動(dòng)態(tài)線程池產(chǎn)品。
優(yōu)秀的輪子還有好多,比如Hippo4J ,使用起來和dynamic-tp差不多。Hippo4J 有無依賴中間件實(shí)現(xiàn)動(dòng)靜線程池,也有默認(rèn)實(shí)現(xiàn)Nacos和Apollo的版本,而dynamic-tp 默認(rèn)實(shí)現(xiàn)依賴Nacos或Apollo。















 
 
 
















 
 
 
 