前言
在我们的常规写多线程的过程中,总是要实现写一个线程池,然后继承一个runnable接口等。虽然实际上也不是很复杂,但是也挺麻烦的。springboot中将线程池的工作进行了一套简化。可以让我们机遇注解的基础上更好的实现线程池的使用。
配置项
package com.study.springcloud.config.threadpool;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;
@Configuration
public class AsyncExecutor {
//核心线程数
private static final int CORE_POOL_SIZE = 5;
//最大线程数
private static final int MAX_POOL_SIZE = 5;
//队列大小
private static final int QUEUE_CAPACITY = 50;
//线程池中的线程的名称前缀
private static final String THREAD_NAME = "MyExecutor-";
@Bean
public ThreadPoolTaskExecutor getAsyncExecutorMethod() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//配置核心线程数
executor.setCorePoolSize(CORE_POOL_SIZE);
//配置最大线程数
executor.setMaxPoolSize(MAX_POOL_SIZE);
//配置队列大小
executor.setQueueCapacity(QUEUE_CAPACITY);
//配置线程池中的线程的名称前缀
executor.setThreadNamePrefix(THREAD_NAME);
//配置线程池拒绝策略,我设置为CallerRunsPolicy,当线程和队列都满了,由发起线程的主线程自己执行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
}
整体来说标准的线程池配置,只是声明了@Configuration可以托管给springboot进行管理,从而达到更好的使用效率。
调用
调用分两种,一种是需要等待返回值的调用,一种是不需要等待返回值的调用。
无需返回值调用
conrtoller
@Autowired
AsyncService asyncExecutor;
public CommonResult ac2() {
ConcurrentLinkedQueue<String> aa =new ConcurrentLinkedQueue();
aa.add("1");
aa.add("2");
aa.add("3");
aa.add("1");
aa.add("2");
aa.add("3");
aa.add("1");
aa.add("2");
aa.add("3");
aa.add("1");
aa.add("2");
aa.add("3");
aa.add("1");
aa.add("2");
aa.add("3");
try {
// asyncExecutor.test3();
// asyncExecutor.test4();
while(!aa.isEmpty()) {
asyncExecutor.test5(aa);
}
} catch (InterruptedException e) {
log.error("ac2 error");
}
System.out.println("hello");
return new CommonResult(444,"查询失败01",null);
}
service
@Async("getAsyncExecutorMethod")
public void test5 (ConcurrentLinkedQueue<String> aa) throws InterruptedException{
if(!aa.isEmpty())
System.out.println(aa.poll()+Thread.currentThread().getName());
// Thread.sleep(1000);
}
在service方法中,我们通过@Async注解指定我们调用的线程名称,对应的就是@Configuration中的@Bean的方法,然后此方法就被声明了异步了。然后调用也很简单,直接的controller层用声明的好的方法即可。让然实际开发并不建议这样子玩,还是乖乖的调用逻辑放到service层会更加优雅点。
结果
可看到打印出乱序的队列内容
需要等待返回值
service
// @Async("getAsyncExecutorMethod")
public String test6(ConcurrentLinkedQueue<String> aa) throws Exception{
if(!aa.isEmpty()) {
String result = aa.poll() + " " + Thread.currentThread().getName();
return result;
}
return "";
}
controller
@GetMapping("/thread/async3")
public CommonResult ac3() {
ConcurrentLinkedQueue<String> aa =new ConcurrentLinkedQueue();
aa.add("1");
aa.add("2");
aa.add("3");
aa.add("1");
aa.add("2");
aa.add("3");
aa.add("1");
aa.add("2");
aa.add("3");
aa.add("1");
aa.add("2");
aa.add("3");
aa.add("1");
aa.add("2");
aa.add("3");
StringBuffer sb = new StringBuffer();
List<CompletableFuture> futureList=new ArrayList<>();
try {
while(!aa.isEmpty()) {
CompletableFuture<String> temp =CompletableFuture.supplyAsync(()->{
try {
return asyncExecutor.test6(aa);
} catch (Exception e) {
System.out.println(e.getMessage());
throw new RuntimeException(e);
}
},aE.getAsyncExecutorMethod());
futureList.add(temp);
}
// futureList.forEach(CompletableFuture::join);
for(CompletableFuture<String> aaaa:futureList){
String tempStr=aaaa.join();
sb.append(tempStr).append(" ");
}
}
catch (Exception e){
}
// System.out.println("hello");
return new CommonResult(200,"查询成功",sb.toString());
}
如果我们需要使用等待返回值的方式的话,那么我们需要使用到一个叫CompletableFuture类,这个类有很多异步调用方法,后面可以再开专栏来讨论下。这里我们是使用supplyAsync这个调用,用lambda的方式来调用test6这个方法,需要注意的是,此时test6不能使用@Async这个注解。这是因为supplyAsync这个方法本身会注册一个线程池,如果把test6再声明成Async的话则会变成调用两次线程池,造成出乎意料外结果。并且在外面我们需要使用CompletableFuture中的join方法来等待各个线程的完成情况。
结果
题外话
我们可以看到在等待返回值的代码段中会有很多的空行,这是因为我们代码中是一个while队列不为空的死循环来确定执行次数的。但实际上在程序运行的一瞬间,线程加载没被运行时,while可以直接把线程池的等候队列打满。然后才不提交,等等候队列有空余了之后才会重新执行提交的过程。然而我们的线程配置了50个等候队列,换言之我们一共提交了50次执行,但队列元素只有15个,因此就会打出很多额外的空字符串了。