0%

SpringBoot中线程池的使用

SpringBoot中线程池的使用

SpringBoot两种配置线程池的方式

1、自定义配置

2、修改原生Spring异步线程池的装配

自定义线程池
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@Configuration
@EnableAsync // 开启线程池

public class TaskExecutePool{

@Autowired
private TaskThreadPoolConfig config;

@Bean
public Executor myTaskAsyncPool(){
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

// 核心线程池大小
executor.setCorePoolSize(config.getCorePoolSize());

// 最大线程数
executor.setMaxPoolSize(config.getMaxPoolSize());

// 队列容量
executor.setQueueCapacity(config.getQueueCapacity());

// 活跃时间
executor.setKeepAliveSeconds(config.getKeepAliveSeconds());

// 线程名字前缀
executor.setThreadNamePrefix(
"MyExecutor-"
);

// setRejectedExecutionHandler:当 pool 已经达到 max size 的时候,如何处理新任务
// CallerRunsPolicy:不在新线程中执行任务,而是由调用者所在的线程来执行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();

return executor;
}
}

修改原生Spring异步线程池的装配
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
@Configuration
@EnableAsync

public class NativeAsyncTaskExecutePool implements AsyncConfigurer{

private Logger logger = LoggerFactory.getLogger(this.getClass());

// 注入配置类
@Autowired
TaskThreadPoolConfig config;

@Override
public Executor getAsyncExecutor(){
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

// 核心线程池大小
executor.setCorePoolSize(config.getCorePoolSize());

// 最大线程数
executor.setMaxPoolSize(config.getMaxPoolSize());

// 队列容量
executor.setQueueCapacity(config.getQueueCapacity());

// 活跃时间
executor.setKeepAliveSeconds(config.getKeepAliveSeconds());

// 线程名字前缀
executor.setThreadNamePrefix("MyExecutor2-");

// setRejectedExecutionHandler:当 pool 已经达到 max size 的时候,如何处理新任务
// CallerRunsPolicy:不在新线程中执行任务,而是由调用者所在的线程来执行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();

return executor;
}


/**
* 异步任务中异常处理
* @return
*/

@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler(){

return (ex, method, objects) -> {
logger.error("=========================="+ex.getMessage()+"=======================", ex);
logger.error("exception method:"+method.getName());
};
}
}
线程池配置类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Component
public class TaskThreadPoolConfig{

@Value("${task.pool.corePoolSize}")

private int corePoolSize;

@Value("${task.pool.maxPoolSize}")

private int maxPoolSize;

@Value("${task.pool.keepAliveSeconds}")

private int keepAliveSeconds;

@Value("${task.pool.queueCapacity}")

private int queueCapacity;

......// 省略 get(),set() 方法
}
配置文件配置线程池大小
1
2
3
4
5
6
7
8
9
10
11
12
13
14
# spring 线程池
task:
pool:
#核心线程池
corePoolSize: 500

#最大线程池
maxPoolSize: 1000

#活跃时间
keepAliveSeconds: 300

#队列容量
queueCapacity: 50
需要异步线程执行的任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Component
public class AsyncTask{

protected final Logger logger = LoggerFactory.getLogger(this.getClass());

@Async("myTaskAsyncPool") //myTaskAsynPool 即配置线程池的方法名,此处如果不写自定义线程池的方法名,会使用默认的线程池

public void doTask1(int i){
logger.info("Task"+i+"started.");
}

@Async// 使用默认的线程池
public void doTask2(inti){
if(i == 0){
throw new NullPointerException();
}
logger.info("Task2-Native"+i+"started.");
}

@Async// 使用默认的线程池并返回参数
public ListenableFuture<String> doTask3(int i){

logger.info("Task3- 返回值"+i+"started.");

return new AsyncResult<>(i + "");
}
}
获取线程池并执行任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Test
public void AsyncTaskTest(){

for(int i = 0; i < 10000; i++){
try{
// 自定义线程池
asyncTask.doTask1(i);
//spring 异步线程池
asyncTask.doTask2(i);

String text = asyncTask.doTask3(i).get();// 阻塞调用
System.out.println(text);
String context = asyncTask.doTask3(i).get(1, TimeUnit.SECONDS);// 限时调用
System.out.println(context);
} catch(InterruptedException | ExecutionException | TimeoutException e) {
e.printStackTrace();
}
}
logger.info("All tasks finished.");
}