前言

在我们的常规写多线程的过程中,总是要实现写一个线程池,然后继承一个runnable接口等。虽然实际上也不是很复杂,但是也挺麻烦的。springboot中将线程池的工作进行了一套简化。可以让我们机遇注解的基础上更好的实现线程池的使用。

配置项

package com.study.springcloud.config.threadpool;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.ThreadPoolExecutor;

@Configuration

public class AsyncExecutor {
    //核心线程数
    private static final int CORE_POOL_SIZE = 5;

    //最大线程数
    private static final int MAX_POOL_SIZE = 5;

    //队列大小
    private static final int QUEUE_CAPACITY = 50;

    //线程池中的线程的名称前缀
    private static final String THREAD_NAME = "MyExecutor-";


    @Bean
    public ThreadPoolTaskExecutor getAsyncExecutorMethod() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //配置核心线程数
        executor.setCorePoolSize(CORE_POOL_SIZE);
        //配置最大线程数
        executor.setMaxPoolSize(MAX_POOL_SIZE);
        //配置队列大小
        executor.setQueueCapacity(QUEUE_CAPACITY);
        //配置线程池中的线程的名称前缀
        executor.setThreadNamePrefix(THREAD_NAME);
        //配置线程池拒绝策略,我设置为CallerRunsPolicy,当线程和队列都满了,由发起线程的主线程自己执行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }
}

整体来说标准的线程池配置,只是声明了@Configuration可以托管给springboot进行管理,从而达到更好的使用效率。

调用

调用分两种,一种是需要等待返回值的调用,一种是不需要等待返回值的调用。

无需返回值调用

conrtoller
    @Autowired
    AsyncService asyncExecutor;
  public CommonResult ac2() {
        ConcurrentLinkedQueue<String> aa =new ConcurrentLinkedQueue();
        aa.add("1");
        aa.add("2");
        aa.add("3");
        aa.add("1");
        aa.add("2");
        aa.add("3");
        aa.add("1");
        aa.add("2");
        aa.add("3");
        aa.add("1");
        aa.add("2");
        aa.add("3");
        aa.add("1");
        aa.add("2");
        aa.add("3");
        try {
//            asyncExecutor.test3();
//            asyncExecutor.test4();
            while(!aa.isEmpty()) {
                asyncExecutor.test5(aa);
            }
        } catch (InterruptedException e) {
            log.error("ac2 error");
        }
        System.out.println("hello");
        return new CommonResult(444,"查询失败01",null);
    }

service

    @Async("getAsyncExecutorMethod")
    public void test5 (ConcurrentLinkedQueue<String> aa) throws InterruptedException{
        if(!aa.isEmpty())
            System.out.println(aa.poll()+Thread.currentThread().getName());
//            Thread.sleep(1000);
    }

在service方法中,我们通过@Async注解指定我们调用的线程名称,对应的就是@Configuration中的@Bean的方法,然后此方法就被声明了异步了。然后调用也很简单,直接的controller层用声明的好的方法即可。让然实际开发并不建议这样子玩,还是乖乖的调用逻辑放到service层会更加优雅点。

结果

Snipaste_2024-01-28_16-48-38
可看到打印出乱序的队列内容

需要等待返回值

service

//    @Async("getAsyncExecutorMethod")
    public String test6(ConcurrentLinkedQueue<String> aa) throws Exception{
        if(!aa.isEmpty()) {
            String result = aa.poll() + " " + Thread.currentThread().getName();
            return result;
        }
        return "";
    }

controller

    @GetMapping("/thread/async3")
    public CommonResult ac3() {
        ConcurrentLinkedQueue<String> aa =new ConcurrentLinkedQueue();
        aa.add("1");
        aa.add("2");
        aa.add("3");
        aa.add("1");
        aa.add("2");
        aa.add("3");
        aa.add("1");
        aa.add("2");
        aa.add("3");
        aa.add("1");
        aa.add("2");
        aa.add("3");
        aa.add("1");
        aa.add("2");
        aa.add("3");
        StringBuffer sb = new StringBuffer();
        List<CompletableFuture> futureList=new ArrayList<>();
        try {
            while(!aa.isEmpty()) {
                CompletableFuture<String> temp =CompletableFuture.supplyAsync(()->{
                    try {

                        return asyncExecutor.test6(aa);
                    } catch (Exception e) {
                        System.out.println(e.getMessage());
                        throw new RuntimeException(e);
                    }
                },aE.getAsyncExecutorMethod());
                futureList.add(temp);
            }
//            futureList.forEach(CompletableFuture::join);
            for(CompletableFuture<String> aaaa:futureList){
                String tempStr=aaaa.join();
                sb.append(tempStr).append(" ");
            }
        }
        catch (Exception e){

        }
//        System.out.println("hello");
        return new CommonResult(200,"查询成功",sb.toString());
    }

如果我们需要使用等待返回值的方式的话,那么我们需要使用到一个叫CompletableFuture类,这个类有很多异步调用方法,后面可以再开专栏来讨论下。这里我们是使用supplyAsync这个调用,用lambda的方式来调用test6这个方法,需要注意的是,此时test6不能使用@Async这个注解。这是因为supplyAsync这个方法本身会注册一个线程池,如果把test6再声明成Async的话则会变成调用两次线程池,造成出乎意料外结果。并且在外面我们需要使用CompletableFuture中的join方法来等待各个线程的完成情况。

结果

Snipaste_2024-01-28_20-12-22

题外话

我们可以看到在等待返回值的代码段中会有很多的空行,这是因为我们代码中是一个while队列不为空的死循环来确定执行次数的。但实际上在程序运行的一瞬间,线程加载没被运行时,while可以直接把线程池的等候队列打满。然后才不提交,等等候队列有空余了之后才会重新执行提交的过程。然而我们的线程配置了50个等候队列,换言之我们一共提交了50次执行,但队列元素只有15个,因此就会打出很多额外的空字符串了。