Spring线程池ThreadPoolTaskExecutor的使用

1 线程池简介

1.1 为什么使用线程池

  • 降低系统资源消耗,通过重用已存在的线程,降低线程创建和销毁造成的消耗;
  • 提高系统响应速度,当有任务到达时,通过复用已存在的线程,无需等待新线程的创建便能立即执行;
  • 方便线程并发数的管控,因为线程若是无限制的创建,可能会导致内存占用过多而产生OOM,并且会造成cpu过度切换(cpu切换线程是有时间成本的(需要保持当前执行线程的现场,并恢复要执行线程的现场)
  • 提供更强大的功能,延时定时线程池

1.2 线程池为什么需要使用队列

因为线程若是无限制的创建,可能会导致内存占用过多而产生OOM,并且会造成cpu过度切换。

创建线程池的消耗较高或者线程池创建线程需要获取mainlock这个全局锁,影响并发效率,阻塞队列可以很好的缓冲

1.3 线程池为什么要使用阻塞队列而不使用非阻塞队列

阻塞队列可以保证任务队列中没有任务时阻塞获取任务的线程,使得线程进入wait状态,释放cpu资源,当队列中有任务时才唤醒对应线程从队列中取出消息进行执行。
使得在线程不至于一直占用cpu资源。(线程执行完任务后通过循环再次从任务队列中取出任务进行执行,代码片段如:while (task != null || (task = getTask()) != null) {})。

不用阻塞队列也是可以的,不过实现起来比较麻烦而已,有好用的为啥不用呢

1.4 如何配置线程池

  • CPU密集型任务
    尽量使用较小的线程池,一般为CPU核心数+1。 因为CPU密集型任务使得CPU使用率很高,若开过多的线程数,会造成CPU过度切换

  • IO密集型任务
    可以使用稍大的线程池,一般为2*CPU核心数。 IO密集型任务CPU使用率并不高,因此可以让CPU在等待IO的时候有其他线程去处理别的任务,充分利用CPU时间

  • 混合型任务
    可以将任务分成IO密集型和CPU密集型任务,然后分别用不同的线程池去处理。 只要分完之后两个任务的执行时间相差不大,那么就会比串行执行来的高效
    因为如果划分之后两个任务执行时间有数据级的差距,那么拆分没有意义。
    因为先执行完的任务就要等后执行完的任务,最终的时间仍然取决于后执行完的任务,而且还要加上任务拆分与合并的开销,得不偿失

1.5 execute()和submit()方法

  1. execute(),执行一个任务,没有返回值
  2. submit(),提交一个线程任务,有返回值

submit(Callable<T> task)能获取到它的返回值,通过future.get()获取(阻塞直到任务执行完)。一般使用FutureTask+Callable配合使用
submit(Runnable task, T result)能通过传入的载体result间接获得线程的返回值。
submit(Runnable task)则是没有返回值的,就算获取它的返回值也是null

Future.get()方法会使取结果的线程进入阻塞状态,直到线程执行完成之后,唤醒取结果的线程,然后返回结果

1.6 Spring线程池

Spring 通过任务执行器(TaskExecutor)来实现多线程和并发编程,使用ThreadPoolTaskExecutor实现一个基于线程池的TaskExecutor
还得需要使用@EnableAsync开启异步,并通过在需要的异步方法那里使用注解@Async声明是一个异步任务
Spring 已经实现的异常线程池:

  • SimpleAsyncTaskExecutor:不是真的线程池,这个类不重用线程,每次调用都会创建一个新的线程。
  • SyncTaskExecutor:这个类没有实现异步调用,只是一个同步操作。只适用于不需要多线程的地方
  • ConcurrentTaskExecutorExecutor的适配类,不推荐使用。如果ThreadPoolTaskExecutor不满足要求时,才用考虑使用这个类
  • SimpleThreadPoolTaskExecutor:是Quartz的SimpleThreadPool的类。线程池同时被quartz和非quartz使用,才需要使用此类
  • ThreadPoolTaskExecutor:最常使用,推荐。 其实质是对java.util.concurrent.ThreadPoolExecutor的包装

1.7 @Async调用中的事务处理机制

点击了解使用@Async使用的事务问题

2 示例

2.1 线程池配置类

package cn.jzh.thread;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;

@Configuration
@ComponentScan("cn.jzh.thread")
@EnableAsync  //开启异步操作
public class TaskExecutorConfig implements AsyncConfigurer {

    /**
     * 通过getAsyncExecutor方法配置ThreadPoolTaskExecutor,获得一个基于线程池TaskExecutor
     *
     * @return
     */
    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
        pool.setCorePoolSize(5);//核心线程数
        pool.setMaxPoolSize(10);//最大线程数
        pool.setQueueCapacity(25);//线程队列
        pool.initialize();//线程初始化
        return pool;
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return null;
    }
}

配置类中方法说明:
Spring 中的ThreadPoolExecutor是借助JDK并发包中的java.util.concurrent.ThreadPoolExecutor来实现的。其中一些值的含义如下:

  • int corePoolSize:线程池维护线程的最小数量
  • int maximumPoolSize:线程池维护线程的最大数量,线程池中允许的最大线程数,线程池中的当前线程数目不会超过该值。如果队列中任务已满,并且当前线程个数小于maximumPoolSize,那么会创建新的线程来执行任务。
  • long keepAliveTime:空闲线程的存活时间TimeUnit
  • unit:时间单位,现由纳秒,微秒,毫秒,秒
  • BlockingQueue workQueue:持有等待执行的任务队列
  • RejectedExecutionHandler handler 线程池的拒绝策略,是指当任务添加到线程池中被拒绝,而采取的处理措施。
    当任务添加到线程池中之所以被拒绝,可能是由于:第一,线程池异常关闭。第二,任务数量超过线程池的最大限制。
    Reject策略预定义有四种:
  1. ThreadPoolExecutor.AbortPolicy策略,是默认的策略,处理程序遭到拒绝将抛出运行时 RejectedExecutionException
  2. ThreadPoolExecutor.CallerRunsPolicy策略 ,调用者的线程会执行该任务,如果执行器已关闭,则丢弃.
  3. ThreadPoolExecutor.DiscardPolicy策略,不能执行的任务将被丢弃.
  4. ThreadPoolExecutor.DiscardOldestPolicy策略,如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如
    果再次失败,则重复此过程)
  5. 自定义策略:当然也可以根据应用场景需要来实现RejectedExecutionHandler接口自定义策略。如记录日志或持久化不能处理的任务

2.2 异步方法

@Async注解可以用在方法上,表示该方法是个异步方法,也可以用在类上,那么表示此类的所有方法都是异步方法
异步方法会自动注入使用ThreadPoolTaskExecutor作为TaskExecutor

package cn.jzh.thread;

import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Service;

import java.util.concurrent.Future;

@Service
public class AsyncTaskService {
    /**
     * 
     * @param i
     */
    @Async
    public void executeAsync(Integer i) throws Exception{
        System.out.println("线程ID:" + Thread.currentThread().getId() + "线程名字:" +Thread.currentThread().getName()+"执行异步任务:" + i);
    }

    @Async
    public Future<String> executeAsyncPlus(Integer i) throws Exception {
        System.out.println("线程ID:" + Thread.currentThread().getId() +"线程名字:" +Thread.currentThread().getName()+ "执行异步有返回的任务:" + i);
        return new AsyncResult<>("success:"+i);
    }

}

2.3 启动测试

package cn.jzh.thread;

import org.springframework.context.annotation.AnnotationConfigApplicationContext;

import java.util.concurrent.Future;

public class MainApp {
    public static void main(String[] args) throws Exception{
        System.out.println("主线程id:" + Thread.currentThread().getId() + "开始执行调用任务...");
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(TaskExecutorConfig.class);
        AsyncTaskService service = context.getBean(AsyncTaskService.class);
        for (int i = 0;i<10;i++){
            service.executeAsync(i);
            Future<String> result = service.executeAsyncPlus(i);
            System.out.println("异步程序执行结束,获取子线程返回内容(会阻塞当前main线程)" + result.get());
        }
        context.close();

        System.out.println("主线程id:" + Thread.currentThread().getId() + "程序结束!!");
    }
}

注意:

  1. 是否影响主线程
    如果main主线程不去获取子线程的结果(Future.get()),那么主线程完全可以不阻塞。那么,此时,主线程和子线程完全异步。此功能,可以做成类似MQ消息中间件之类的,消息异步进行发送
  2. 判断是否执行完毕
    当返回的数据类型为Future类型,其为一个接口。具体的结果类型为AsyncResult,这个是需要注意的地方。
    调用返回结果的异步方法,判断是否执行完毕时需要使用future.isDone()来判断是否执行完毕
public void testAsyncAnnotationForMethodsWithReturnType()  
   throws InterruptedException, ExecutionException {  
    System.out.println("Invoking an asynchronous method. "   + Thread.currentThread().getName());  
    Future<String> future = asyncAnnotationExample.asyncMethodWithReturnType();  
   
    while (true) {  ///这里使用了循环判断,等待获取结果信息  
        if (future.isDone()) {  //判断是否执行完毕  
            System.out.println("Result from asynchronous process - " + future.get());  
            break;  
        }  
        System.out.println("Continue doing something else. ");  
        Thread.sleep(1000);  
    }  
}

这些获取异步方法的结果信息,是通过不停的检查Future的状态来获取当前的异步方法是否执行完毕来实现的

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,456评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,370评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,337评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,583评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,596评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,572评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,936评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,595评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,850评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,601评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,685评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,371评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,951评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,934评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,167评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,636评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,411评论 2 342

推荐阅读更多精彩内容