springBoot 多线程+线程池处理+等待获取执行结果 (Future)(CompletableFuture)

springBoot 多线程+线程池处理+等待获取执行结果Future

Java 线程池

Java通过Executors提供四种线程池,分别为:

newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

优点
重用存在的线程,减少对象创建、消亡的开销,性能佳。
可有效控制最大并发线程数,提高系统资源的使用率,同时避免过多资源竞争,避免堵塞。
提供定时执行、定期执行、单线程、并发数控制等功能。

在springboot项目中一般使用方法二。

一、方法一(CountDownLatch)

public class StatsDemo {
    final static SimpleDateFormat sdf = new SimpleDateFormat(
            "yyyy-MM-dd HH:mm:ss");

    final static String startTime = sdf.format(new Date());

    /**
     * IO密集型任务  = 一般为2*CPU核心数(常出现于线程中:数据库数据交互、文件上传下载、网络数据传输等等)
     * CPU密集型任务 = 一般为CPU核心数+1(常出现于线程中:复杂算法)
     * 混合型任务  = 视机器配置和复杂度自测而定
     */
    private static int corePoolSize = Runtime.getRuntime().availableProcessors();
    /**
     * public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,
     *                           TimeUnit unit,BlockingQueue<Runnable> workQueue)
     * corePoolSize用于指定核心线程数量
     * maximumPoolSize指定最大线程数
     * keepAliveTime和TimeUnit指定线程空闲后的最大存活时间
     * workQueue则是线程池的缓冲队列,还未执行的线程会在队列中等待
     * 监控队列长度,确保队列有界
     * 不当的线程池大小会使得处理速度变慢,稳定性下降,并且导致内存泄露。如果配置的线程过少,则队列会持续变大,消耗过多内存。
     * 而过多的线程又会 由于频繁的上下文切换导致整个系统的速度变缓——殊途而同归。队列的长度至关重要,它必须得是有界的,这样如果线程池不堪重负了它可以暂时拒绝掉新的请求。
     * ExecutorService 默认的实现是一个无界的 LinkedBlockingQueue。
     */
    private static ThreadPoolExecutor executor  = new ThreadPoolExecutor(corePoolSize, corePoolSize+1, 10l, TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>(1000));

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(5);
        //使用execute方法
          executor.execute(new Stats("任务A", 1000, latch));
          executor.execute(new Stats("任务B", 1000, latch));
          executor.execute(new Stats("任务C", 1000, latch));
          executor.execute(new Stats("任务D", 1000, latch));
          executor.execute(new Stats("任务E", 1000, latch));
        latch.await();// 等待所有人任务结束
        System.out.println("所有的统计任务执行完成:" + sdf.format(new Date()));
    }

    static class Stats implements Runnable  {
        String statsName;
        int runTime;
        CountDownLatch latch;

        public Stats(String statsName, int runTime, CountDownLatch latch) {
            this.statsName = statsName;
            this.runTime = runTime;
            this.latch = latch;
        }

        public void run() {
            try {
                System.out.println(statsName+ " do stats begin at "+ startTime);
                //模拟任务执行时间
                Thread.sleep(runTime);
                System.out.println(statsName + " do stats complete at "+ sdf.format(new Date()));
                latch.countDown();//单次任务结束,计数器减一
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

二、方法二(Future)

重点是和springBoot整合,采用注解bean方式生成ThreadPoolTaskExecutor
在springBoot项目中开启异步线程需要满足一下几点

  • 在启动类加入异步线程注解@EnableAsync
  • 创建线程池并创建Bean实例

1. 自定义线程池

@Configuration
//@EnableAsync
public class ThreadPoolConfig
{

    /**
     *   默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,
     *  当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;
     *  当队列满了,就继续创建线程,当线程数量大于等于maxPoolSize后,开始使用拒绝策略拒绝
     */

    /**
     * 获得Java虚拟机可用的处理器个数 + 1
     */
    private static final int THREADS = Runtime.getRuntime().availableProcessors() + 1;

    @Value("${async.executor.thread.core_pool_size:0}")
    public static int corePoolSizeConfig;
    // 核心线程池大小
    public static int corePoolSize = corePoolSizeConfig ==0 ? THREADS : corePoolSizeConfig;

    // 最大可创建的线程数
    //@Value("${async.executor.thread.max_pool_size}")
    private int maxPoolSize = 2 * THREADS;;

    // 队列最大长度
    //@Value("${async.executor.thread.queue_capacity}")
    private int queueCapacity = 1024;

    // 线程池维护线程所允许的空闲时间
    //@Value("${async.executor.thread.keep_alive_seconds}")
    private int keepAliveSeconds = 300;


    //线程池名前缀 
    //@Value("${async.executor.thread.threadNamePrefix}")
    private static final String threadNamePrefix = "Async-Service-";

    @Bean(name = "threadPoolTaskExecutor")
    public ThreadPoolTaskExecutor threadPoolTaskExecutor()
    {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setMaxPoolSize(maxPoolSize);
        executor.setCorePoolSize(corePoolSize);
        executor.setQueueCapacity(queueCapacity);
        executor.setKeepAliveSeconds(keepAliveSeconds);
        executor.setThreadNamePrefix(threadNamePrefix);
        // 线程池对拒绝任务(无线程可用)的处理策略
       // CallerRunsPolicy:由调用线程(提交任务的线程)处理该任务
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
      // 初始化
        executor.initialize();
        return executor;
    }

}

2. 异步执行方法

启动类添加@EnableAsync注解

@SpringBootApplication
@EnableAsync
@EnableScheduling
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

}

service层方法

@Service
public class AsyncInvokeService {

    @Async("threadPoolTaskExecutor")
    public Future<Boolean> exec1(String name) {
        System.out.println("子线程 name -->" + Thread.currentThread().getName());
        System.out.println(name);
        Thread.sleep(10000);
        return new AsyncResult<>(true);
    }

    @Async("threadPoolTaskExecutor")
    public Future<Boolean> exec2(String phone) {
        System.out.println("子线程 name -->" + Thread.currentThread().getName());
        System.out.println(phone);
        Thread.sleep(10000);
        return new AsyncResult<>(true);
    }

3. 多线程执行返回结果

    @GetMapping("/gettest")
    public String b() throws InterruptedException, ExecutionException {
        Future<Boolean> future1 = asyncInvokeService.exec1("张三");
        Future<Boolean> future2 = asyncInvokeService.exec2("15618881888");

        List<Future<Boolean>> futureList = new ArrayList<>();      
        futureList.add(future1);
        futureList.add(future2);

        //查询任务执行的结果
        for (Future<?> future : futureList) {
            while (true) {//CPU高速轮询:每个future都并发轮循,判断完成状态然后获取结果,这一行,是本实现方案的精髓所在。即有10个future在高速轮询,完成一个future的获取结果,就关闭一个轮询
                if (future.isDone() && !future.isCancelled()) { //获取future成功完成状态,如果想要限制每个任务的超时时间,取消本行的状态判断+future.get(1000*1, TimeUnit.MILLISECONDS)+catch超时异常使用即可。
                    Boolean result = future.get();//获取结果
                    System.out.println("任务i=" + i + "获取完成!" + new Date());
                    list.add(result);
                    break;//当前future获取结果完毕,跳出while
                } else {
                    Thread.sleep(1);//每次轮询休息1毫秒(CPU纳秒级),避免CPU高速轮循耗空CPU---》新手别忘记这个
                }
            }
        }
        return "执行成功!!";
    }

三、方法三(CompletableFuture)

示例:

1、线程池

@Configuration
//@EnableAsync
public class ThreadPoolConfig {

    /**
     *   默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,
     *  当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;
     *  当队列满了,就继续创建线程,当线程数量大于等于maxPoolSize后,开始使用拒绝策略拒绝
     */

    /**
     * 获得Java虚拟机可用的处理器个数 + 1
     */
    private static final int THREADS = Runtime.getRuntime().availableProcessors() + 1;

    //    @Value("${async.executor.thread.core_pool_size:0}") //可在配置文件配置核心线程数
    public static int corePoolSizeConfig = 0;
    // 核心线程池大小
    public static int coreIoPoolSize = (corePoolSizeConfig == 0 ? THREADS : corePoolSizeConfig) * 4;

    // 最大可创建的线程数
    //@Value("${async.executor.thread.max_pool_size}")
    private int maxIoPoolSize = 4 * 2 * THREADS;

    // 核心线程池大小
    public static int coreCpuPoolSize = corePoolSizeConfig == 0 ? THREADS : corePoolSizeConfig;

    // 最大可创建的线程数
    //@Value("${async.executor.thread.max_pool_size}")
    private int maxCpuPoolSize = 2 * THREADS;


    // 队列最大长度
    //@Value("${async.executor.thread.queue_capacity}")
    private int queueCapacity = 1024;

    // 线程池维护线程所允许的空闲时间
    //@Value("${async.executor.thread.keep_alive_seconds}")
    private int keepAliveSeconds = 60;


    //线程池名前缀
    //@Value("${async.executor.thread.threadNamePrefix}")
    private static final String threadNamePrefix = "Async-Service-";

    /**
     * IO 密集类型线程池 (corePoolSize 核心线程 和 maxPoolSize最大线程数比cpu核数翻4倍)
     *
     * @return
     */
    @Bean(name = "threadIoPoolTaskExecutor")
    public ThreadPoolTaskExecutor threadIoPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setMaxPoolSize(maxIoPoolSize);
        executor.setCorePoolSize(coreIoPoolSize);
        executor.setQueueCapacity(queueCapacity);
        executor.setKeepAliveSeconds(keepAliveSeconds);
        executor.setThreadNamePrefix(threadNamePrefix);
        // 线程池对拒绝任务(无线程可用)的处理策略
        // CallerRunsPolicy:由调用线程(提交任务的线程)处理该任务
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // 初始化
        executor.initialize();
        return executor;
    }

    /**
     * cpu 密集类型线程池
     *
     * @return
     */
    @Bean(name = "threadCpuPoolTaskExecutor")
    public ThreadPoolTaskExecutor threadCpuPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setMaxPoolSize(maxCpuPoolSize);
        executor.setCorePoolSize(coreCpuPoolSize);
        executor.setQueueCapacity(queueCapacity);
        executor.setKeepAliveSeconds(keepAliveSeconds);
        executor.setThreadNamePrefix(threadNamePrefix);
        // 线程池对拒绝任务(无线程可用)的处理策略
        // CallerRunsPolicy:由调用线程(提交任务的线程)处理该任务
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // 初始化
        executor.initialize();
        return executor;
    }


}

2、调用

@Service
@Slf4j
public class TestServiceImpl  implements TestService {
    @Autowired
    private Executor threadIoPoolTaskExecutor;
    
    public void testCompletableFuture(){

        //并行调用 提供执行效率
        CompletableFuture<String> a = CompletableFuture.supplyAsync(() -> testAsync("a"), threadIoPoolTaskExecutor);
        CompletableFuture<String> b = CompletableFuture.supplyAsync(() -> testAsync("b"), threadIoPoolTaskExecutor);
        CompletableFuture<String> c = CompletableFuture.supplyAsync(() -> testAsync("c"), threadIoPoolTaskExecutor);

        try {
            String resultA = a.get();
            String resultB = b.get();
            String resultC = c.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }

    public String testAsync(String s){
        //耗时操作
        
        return s;
    }
}

参考:
SpringBoot线程池ThreadPoolExecutor
SpringBoot线程池ThreadPoolTaskExecutor

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,324评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,303评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,192评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,555评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,569评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,566评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,927评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,583评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,827评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,590评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,669评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,365评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,941评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,928评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,159评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,880评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,399评论 2 342

推荐阅读更多精彩内容