为什么要使用线程池?
引用自 http://ifeve.com/java-threadpool/ 的说明:
1.频繁的创建和销毁线程会影响系统的性能,线程池可以通过重复利用已创建的线程来降低线程创建和销毁造成的资源消耗;
2.提高响应的速度。当任务到达时,任务可以不需要等到线程的创建就能立刻执行任务。
3.提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,管理与监控。
线程池的类图:
ThreadPoolExecutor的构造方法:
1.
corePoolSize:核心线程数量,当有新的任务提交时,会在线程池创建一个新的线程,直到线程数量达到核心线程数量为止。
maximumPoolSize:线程池允许的最大线程数量。
keepAliveTime:当线程池中线程的数量大于corePoolSize的时候,没有新的任务提交的话,超过corePoolSize所允许的等待时间。
unit:时间的单位。可选的单位有天(DAYS),小时(HOURS),分钟(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS)和毫微秒(NANOSECONDS)。
workQueue:阻塞队列。当线程池中的线程数量已经达到corePoolSize之后,再通过execute添加的新的任务会被加到workQueue队列中。
threadFactory:创建新线程的工厂。
handler:当线程池和队列都满了的时候,执行的一种策略,用来处理新提交的任务。有以下4种策略:
1.AbortPolicy:直接抛出异常(默认);
2.CallerRunsPolicy:使用调用者所在的线程来执行任务;
3.DiscardOldestPolicy:丢弃阻塞队列中考前的任务,并执行当前的任务;
4.DiscardPolicy:直接丢弃任务;
SynchronousQueue:直接提交队列,该队列没有容量,每一个插入操作都要等待一个相应的删除操作,反之,每一个删除操作都要等待对应的插入操作。所以他不保存任务,总是将任务提交给线程执行,如果没有空闲的线程,则创建新的线程,当线程数量达到最大,则执行拒绝策略。
ArrayBlockingQueue:有界任务队列,线程池的线程数小于corePoolSize,则创建新的线程,大于corePoolSize,则将新的任务加入等待队列。若等待队列已满,则在总线程不大于maximumPoolSize下,创建新的线程执行任务,大于maximumPoolSize则执行拒绝策略。
LinkedBlockingQueue:无界队列,除非系统资源耗尽,否则不存在任务入队失败的情况。线程池的线程数小于corePoolSize,则创建新的线程,大于corePoolSize,则将新的任务加入等待队列。
PriorityBlockingQueue:优先任务队列,可以控制任务的执行先后顺序,是无界队列。ArrayBlockingQueue,LinkedBlockingQueue都是按照先进先出算法处理任务的,PriorityBlockingQueue可以根据任务自身的优先顺序先后执行。
线程池的工作流程
从上图可以看出,当一个新的任务提交到线程池的时候,线程池的处理流程如下:
1.首先判断核心线程池是否已满?如果没满,则创建一个新的线程来执行任务,满了进入下一个流程;
2.判断队列是否已满?如果没满,则将新提交的任务存储在工作队列里,满了进入下一个流程;
3.最后判断整个线程池是否已满?如果没满,则创建一个新的工作线程来执行任务,满了,则交给饱和策略处理此任务。
源码分析:
上边的过程作了一个大概的了解,下边看一下源代码的实现:
execute()
1.workerCountOf(c) < corePoolSize
通过workerCountOf()方法拿到当前活动的线程数,如果当前活动的线程数小于corePoolSize,则新建一个线程放入到线程池中,并且把任务添加到该线程。
2. if(addWorker(command, true))
PS:addWorker中的第二个参数表示添加线程的数量是根据corePoolSize来判断还是maximumPoolSize来判断;如果为true,根据corePoolSize来判断;如果为false,则根据maximumuPoolSize来判断。
3.if(isRunning(c) && workQueue.offer(command))
如果当前线程是运行状态并且任务添加到队列成功
4.if (! isRunning(recheck) && remove(command))
这里是一个二次检查,因为入队之后状态还是可能发生变化的。
5.else if (workerCountOf(recheck) ==0) addWorker(null, false);
获取线程池中的有效线程数,如果等于0,则执行addWorker()方法。
传入的参数表示:
第一个参数null, 表示在线程池中创建一个线程,但是不启动;
第二个参数为false,将线程池的上限设置为maximumPoolSize,添加线程时根据maximumPoolSize来判断;
6.else if (!addWorker(command, false)) reject(command);
如果队列失败了,尝试添加新的任务,如果失败则拒绝该任务。
总结:简单来说在执行execute()方法时如果状态一直是RUNNING时的执行过程如下:
1.如果workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务;
2.如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中;
3.如果workerCount >= corePoolSize && workerCount < maximumPoolSize, 且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务;
4.如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满,则根据拒绝策略来处理该任务(默认处理方式直接抛出异常);
PS:这里addWorker(null, false); 也就是创建一个线程,但是并没有传入任务,因为任务已经被添加到workQueue中了,所以worker在执行的时候,会直接从workQueue中获取任务。因此当workerCountOf(recheck) == 0的时候执行addWorker(null,false)也是为了保证线程池在RUNNING状态下必须要有一个线程来执行任务。
runWorker()
简单说明一下runWorker方法的执行过程:
1.while循环不断的通过getTask()方法获取任务;
2.然后运行任何任务之前,拿到锁来防止执行任务时中断;
3.然后确保除非线程池停止,那么要保证当前线程是中断状态,否则保证当前线程不是中断状态;
4.task.run()执行任务;
5.当task为null的时候则跳出循环,执行processWorkerExit()方法;
6.runWorker方法执行完毕,也代表着Worker中的run方法执行完毕,销毁线程。
PS:beforeExecute方法和afterExecute方法在ThreadPoolExecutor类中是空的,留给子类实现。
completedAbruptly变量用来表示执行任务的过程中是否出现了异常,processWorkerExit方法会对这个变量值进行判断。
线程池的合理配置
线程池需要合理的配置,首先应该先分析任务的特点,比如:
1.任务的性质:CPU密集型任务,IO密集型任务还是混合型任务;
2.任务的优先级:高,中,低。
3.任务的执行时间:长,中,短。
IO密集型:由于需要等待IO操作,线程并不是一直在执行任务,可以 配置多一点线程。
CPU密集型:CPU密集型任务需要大量的计算,并且没有什么阻塞,CPU一直全速运行,所以尽可能少的配置线程数量。
混合型:混合型任务如果可以拆分,则拆分成CPU密集型和IO密集型任务,只要两个任务执行的时间相差不是特别大,拆分后执行的吞吐率应该高于串行执行的吞吐率。反之如果任务执行时间相差太大,就没必要拆分。
优先级:优先级不同的任务可以使用优先级队列PriorityBlockQueue来处理,它可以让优先级高的任务先执行(如果一直有优先级高的任务提交到队列里,可能会造成优先级低的任务永远得不到执行)。
执行时间:执行时间不同的任务也可以使用优先级队列,让执行时间短的任务先执行。
线程池的监控
线程池提供了一些属性可以进行监控,这样有利于我们更好的使用线程池。
taskCount:线程池需要执行的任务数量;
completedTaskCount:线程池在运行过程中已经完成的任务数量。
largestPoolSize:线程池曾经创建过的最大线程数量。通过这个我们可以知道线程池是否满过。如果等于线程池的最大大小,则表示线程池曾经满过。
getPoolSize:线程池的数量。
getActiveCount:活动的线程数。
通过继承线程池并且重写线程池的beforeExecute, afterExecute 和terminated方法,就可以在任务执行前,执行后和线程池关闭前做一些事情。比如监控任务的最大执行时间,最小执行时间等。这几个方法在线程池里都是空方法。