多线程一直是工作或面试过程中的高频知识点,今天给大家分享一下使用 Spring 的 ThreadPoolTaskExecutor 来自定义线程池和实现异步调用多线程。
一、ThreadPoolTaskExecutor
本文采用 Executors 的工厂方法进行配置。
1、将线程池用到的参数定义到配置文件中
在项目的 resources 目录下创建 executor.properties 文件,并添加如下配置:
1 2 3 4 5 6 7 8 9 10 11
| # 异步线程配置 # 核心线程数 async.executor.thread.core_pool_size=5 # 最大线程数 async.executor.thread.max_pool_size=8 # 任务队列大小 async.executor.thread.queue_capacity=2 # 线程池中线程的名称前缀 async.executor.thread.name.prefix=async-service- # 缓冲队列中线程的空闲时间 async.executor.thread.keep_alive_seconds=100
|
2、Executors 的工厂配置
2.1、配置详情
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| @Configuration
@PropertySource(value = {"classpath:executor.properties"}, ignoreResourceNotFound=false, encoding="UTF-8") @Slf4j public class ExecutorConfig {
@Value("${async.executor.thread.core_pool_size}") private int corePoolSize; @Value("${async.executor.thread.max_pool_size}") private int maxPoolSize; @Value("${async.executor.thread.queue_capacity}") private int queueCapacity; @Value("${async.executor.thread.name.prefix}") private String namePrefix; @Value("${async.executor.thread.keep_alive_seconds}") private int keepAliveSeconds;
@Bean(name = "asyncTaskExecutor") public ThreadPoolTaskExecutor taskExecutor() { log.info("启动"); ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(corePoolSize); executor.setMaxPoolSize(maxPoolSize); executor.setQueueCapacity(queueCapacity); executor.setThreadNamePrefix(namePrefix); executor.setKeepAliveSeconds(keepAliveSeconds); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.initialize(); return executor; } }
|
2.2、注解说明
- @Configuration:Spring 容器在启动时,会加载带有 @Configuration 注解的类,对其中带有 @Bean 注解的方法进行处理。
- @Bean:是一个方法级别上的注解,主要用在 @Configuration 注解的类里,也可以用在 @Component 注解的类里。添加的 bean 的 id 为方法名。
- @PropertySource:加载指定的配置文件。value 值为要加载的配置文件,ignoreResourceNotFound 意思是如果加载的文件找不到,程序是否忽略它。默认为 false 。如果为 true ,则代表加载的配置文件不存在,程序不报错。在实际项目开发中,最好设置为 false 。如果 application.properties 文件中的属性与自定义配置文件中的属性重复,则自定义配置文件中的属性值被覆盖,加载的是 application.properties 文件中的配置属性。
- @Slf4j:lombok 的日志输出工具,加上此注解后,可直接调用 log 输出各个级别的日志。
- @Value:调用配置文件中的属性并给属性赋予值。
2.3、线程池配置说明
- 核心线程数:线程池创建时候初始化的线程数。当线程数超过核心线程数,则超过的线程则进入任务队列。
- 最大线程数:只有在任务队列满了之后才会申请超过核心线程数的线程。不能小于核心线程数。
任务队列:线程数大于核心线程数的部分进入任务队列。如果任务队列足够大,超出核心线程数的线程不会被创建,它会等待核心线程执行完它们自己的任务后再执行任务队列的任务,而不会再额外地创建线程。举例:如果有20个任务要执行,核心线程数:10,最大线程数:20,任务队列大小:2。则系统会创建18个线程。这18个线程有执行完任务的,再执行任务队列中的任务。
线程的空闲时间:当 线程池中的线程数量 大于 核心线程数 时,如果某线程空闲时间超过 keepAliveTime ,线程将被终止。这样,线程池可以动态的调整池中的线程数。
- 拒绝策略:如果(总任务数 - 核心线程数 - 任务队列数)-(最大线程数 - 核心线程数)> 0 的话,则会出现线程拒绝。举例:( 12 - 5 - 2 ) - ( 8 - 5 ) > 0,会出现线程拒绝。线程拒绝又分为 4 种策略,分别为:
- CallerRunsPolicy():交由调用方线程运行,比如 main 线程。
- AbortPolicy():直接抛出异常。
- DiscardPolicy():直接丢弃。
- DiscardOldestPolicy():丢弃队列中最老的任务。
2.4、线程池配置个人理解
- 当一个任务被提交到线程池时,首先查看线程池的核心线程是否都在执行任务。如果没有,则选择一条线程执行任务。
- 如果核心线程都在执行任务,查看任务队列是否已满。如果不满,则将任务存储在任务队列中。核心线程执行完自己的任务后,会再处理任务队列中的任务。
- 如果任务队列已满,查看线程池(最大线程数控制)是否已满。如果不满,则创建一条线程去执行任务。如果满了,就按照策略处理无法执行的任务。
二、异步调用线程
通常 ThreadPoolTaskExecutor 是和 @Async 一起使用。在一个方法上添加 @Async 注解,表明是异步调用方法函数。@Async 后面加上线程池的方法名或 bean 名称,表明异步线程会加载线程池的配置。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| @Component @Slf4j public class ThreadTest {
@Async("asyncTaskExecutor") public void ceshi3() { for (int i = 0; i <= 10; i++) { log.info("ceshi3: " + i); try { Thread.sleep(2000 * 5); } catch (InterruptedException e) { e.printStackTrace(); } } } }
|
备注:一定要在启动类上添加 @EnableAsync 注解,这样 @Async 注解才会生效。
注意事项:
如下方式会使 @Async 失效:
- 异步方法使用 static 修饰。
- 异步类没有使用 @Component 注解(或其他注解)导致 spring 无法扫描到异步类。
- 异步方法不能与异步方法在同一个类中。
- 类中需要使用 @Autowired 或 @Resource 等注解自动注入,不能自己手动 new 对象。
- 如果使用 SpringBoot 框架必须在启动类中增加 @EnableAsync 注解。
- 在 @Async 方法上标注 @Transactional 是没用的。 在 @Async 方法调用的方法上标注 @Transactional 有效。
三、多线程使用场景
1、定时任务 @Scheduled
1 2 3 4 5 6 7 8
| @SpringBootApplication @EnableScheduling public class SpringBootStudyApplication { public static void main(String[] args) { SpringApplication.run(SpringBootStudyApplication.class, args); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13
| @Component public class listennerTest3 {
@Autowired private ThreadTest t; @Scheduled(cron = "0 0/1 * * * ?") public void run() { t.ceshi3(); } }
|
ceshi3() 方法调用线程池配置,且异步执行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| @Component @Slf4j public class ThreadTest {
@Async("asyncTaskExecutor") public void ceshi3() { for (int i = 0; i <= 10; i++) { log.info("ceshi3: " + i); try { Thread.sleep(2000 * 5); } catch (InterruptedException e) { e.printStackTrace(); } } } }
|
2、程序一启动就异步执行多线程
通过继承 CommandLineRunner 类实现。
1 2 3 4 5 6 7 8 9 10 11 12 13
| @Component public class ListennerTest implements CommandLineRunner {
@Autowired private ThreadTest t;
@Override public void run(String... args) { for (int i = 1; i <= 10; i++) { t.ceshi(); } } }
|
1 2 3 4 5 6 7 8 9
| @Component @Slf4j public class ThreadTest {
@Async("asyncTaskExecutor") public void ceshi() { log.info("ceshi"); } }
|
3、定义一个 http 接口
还可以通过接口的形式来异步调用多线程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| @RestController @RequestMapping("thread") public class ListennerTest2 {
@Autowired private ThreadTest t;
@GetMapping("ceshi2") public void run() { for (int i = 1; i < 10; i++) { t.ceshi2(); } } }
|
1 2 3 4 5 6 7 8 9 10 11
| @Component @Slf4j public class ThreadTest {
@Async("asyncTaskExecutor") public void ceshi2() { for (int i = 0; i <= 3; i++) { log.info("ceshi2"); } } }
|
4、测试类
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| @RunWith(SpringRunner.class) @SpringBootTest public class ThreadRunTest {
@Autowired private ThreadTest t;
@Test public void thread1() { for (int i = 1; i <= 10; i++) { t.ceshi4(); } } }
|
1 2 3 4 5 6 7 8
| @Component @Slf4j public class ThreadTest { @Async("asyncTaskExecutor") public void ceshi4() { log.info("ceshi4"); } }
|
四、总结
以上主要介绍了 ThreadPoolTaskExecutor 线程池的配置、使用、相关注解的意义及作用,也简单介绍了使用 @Async 来异步调用线程,最后又列举了多线程的使用场景,并配上了代码示例。希望大家喜欢。
五、补充
也可以不使用 @Async 来异步调用线程,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| package com.xxx.xxx.runner;
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component;
@Component public class xxxService implements CommandLineRunner {
private static final Logger log = LoggerFactory.getLogger(xxxService.class);
@Autowired private ThreadPoolTaskExecutor taskExecutor;
public void testExecutor(final String str) { taskExecutor.execute(new Runnable() { @Override public void run() { log.info(Thread.currentThread().getName() + "--" + str); } }); }
@Override public void run(String... args) throws Exception { for (int i = 1; i <= 10; i++) { testExecutor("asd"); } } }
|