由于经常遇到AsyncTask,感觉功能比较简单,不成想过总结一下,现在感觉自己还是不要太懒,逼自己写下,总结一下。也可以锻炼自己写博客的能力。
深入理解AsyncTask(一)——线程池
深入理解AsyncTask(一)——AsyncTask
由于AsyncTask是基于线程池基础实现的,这里先学习线程池原理。先了解一下线程池的好处:
线程池主要是用在并发操作中,在并发操作中线程比较多,如果每一个线程执行结束了就销毁,接下来如果新任务来都是时候就需要新建线程,这样频繁创建和销毁线程,会大大降低系统效率。
我们在学习线程池之前,先了解个概念:并发 ,我们也经常听到并行,两者有什么区别呢?
- 并发:同一个处理器在同一时间段处理多个任务,交替执行。
- 并行:不同处理器在同一时刻同时处理不同的任务。
接下来学习线程池的原理
一. ThreadPoolExecutor类
ThreadPoolExecutor是java.uitl.concurrent.ThreadPoolExecutor线程池最核心的一个类,为了理解透彻这个类,先看下这个类的构造函数
public class ThreadPoolExecutor extends AbstractExecutorService {
.....
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
...
}
可以看到此类继承于AbstractExecutorService,有四个构造函数,有着分别不同的参数,接下来对其参数分别解释:
- corePoolSize:核心池的大小,这个参数和后面讲述的线程池的实现原理有非常大的关系。在创建线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来的时候才创建线程去执行任务,除非调用prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。默认情况下,在创建线程池后,线程池中线程为0,当有任务来了之后,就会创建一个线程去执行任务,当线程池中的线程数达到corePoolSize,就会把新线程放到缓冲队列中进行等待。
- maxinumPoolSize:线程池最大的线程数,它表示线程池所能创建的最大数。
- keepAliveTime:表示线程没有执行任务时存活时间。默认情况下,只有当线程池中线程数大于coolPoolSize才会触发keepAliveTime起作用,直到线程池中线程数不大于coolPoolSize。但当调用到allowCoreThreadTimeOut(true)时,此时即使线程池中线程数不大于coolPoolSize,keepAliveTime也会起作用。
- unit:表示keepAliveTime的时间单位,有7中取值,TimeUnit有7种属性。
TimeUnit.DAYS; //天
TimeUnit.HOURS; //小时
TimeUnit.MINUTES; //分钟
TimeUnit.SECONDS; //秒
TimeUnit.MILLISECONDS; //毫秒
TimeUnit.MICROSECONDS; //微妙
TimeUnit.NANOSECONDS; //纳秒
- workQueue:阻塞队列,用于缓存等待任务,一般有以下几种方式:
ArrayBlockingQueue:基于数组的缓冲队列,需要指定大小;接收到任务的时候,如果没有达到corePoolSize的值,则新建线程(核心线程)执行任务,如果达到了,
则入队等候,如果队列已满,则新建线程(非核心线程)执行任务,又如果总线程数到了maximumPoolSize,并且队列也满了,则发生错误
LinkedBlockingQueue:基于链表的缓冲队列,这个队列接收到任务的时候,如果当前线程数小于核心线程数,则新建线程(核心线程)处理任务;
如果当前线程数等于核心线程数,则进入队列等待。由于这个队列没有最大值限制,即所有超过核心线程数的任务都将被添加到队列中,这也就导致了maximumPoolSize的设定失效,
因为总线程数永远不会超过corePoolSize
SynchronousQueue:这个队列接收到任务的时候,会直接提交给线程处理,而不保留它,如果所有线程都在工作怎么办?
那就新建一个线程来处理这个任务!所以为了保证不出现<线程数达到了maximumPoolSize而不能新建线程>的错误,使用这个类型队列的时候,
maximumPoolSize一般指定成Integer.MAX_VALUE,即无限大
DelayQueue:队列内元素必须实现Delayed接口,这就意味着你传进去的任务必须先实现Delayed接口。这个队列接收到任务时,首先先入队,只有达到了指定的延时时间,才会执行任务
ArrayBlockingQueue和PriorityBlockingQueue使用较少,一般使用LinkedBlockingQueue和Synchronous。线程池的排队策略与BlockingQueue有关。
- threadFactory:线程工厂,主要用来创建线程;
- handler:表示当拒绝处理任务时的策略,有以下四种取值:
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
ThreadPoolExecutor与AbstractExecutorService、ExecutorService和Executor之间的简单继承关系
graph BT
ThreadPoolExecutor-->AbstractExecutorService
AbstractExecutorService-->ExecutorService
ExecutorService-->Executor
- Executor: 是一个顶层接口,在它里面只声明了一个方法execute(Runnable),返回值为void,参数为Runnable类型,从字面意思可以理解,就是用来执行传进去的任务的;
- ExecutorService:是一个接口,继承了Executor接口,并声明了一些方法:submit、invokeAll、invokeAny以及shutDown等;
- AbstractExecutorService:是一个抽象类,实现了ExecutorService接口,基本实现了ExecutorService中声明的所有方法;
- ThreadPoolExecutor:继承了类AbstractExecutorService。
在ThreadPoolExecutor类中有几个非常重要的方法:
- execute():实际上是Executor中声明的方法,在ThreadPoolExecutor进行了具体的实现,这个方法是ThreadPoolExecutor的核心方法,通过这个方法可以向线程池提交一个任务,交由线程池去执行。
- submit():是在ExecutorService中声明的方法,在AbstractExecutorService就已经有了具体的实现,在ThreadPoolExecutor中并没有对其进行重写,这个方法也是用来向线程池提交任务的,但是它和execute()方法不同,它能够返回任务执行的结果,去看submit()方法的实现,会发现它实际上还是调用的execute()方法,只不过它利用了Future来获取任务执行结果。
- shutdown():关闭线程池,之前提交的线程继续执行,不接受新的任务
- shutdownNow():关闭线程池,终止所有任务,已在线程池的线程挂起,并返回线程Runnable列表。
二. 深入剖析线程池实现原理
1. 线程池状态
在ThreadPoolExecutor中定义了几个static final变量表示线程池的各个状态,还通过runState记录线程池的状态,runState是存放在变量COUNT_BITS的高位中。先看下这些变量:
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
当创建线程池时,初始时,线程池的状态为RUNNING状态
如果调用shutdown方法时,此时线程池状态为SHUTDOWN状态
如果调用shutdownNow方法时,此时线程池状态为STOP状态。
如果线程池处于SHUTDOWN或者STOP状态时,并且所有工作线程都已经销毁,任务缓冲队列已经清空或者执行完毕,此时线程池状态被置为TERMINATED。
2.任务的执行
在了解将任务提交给线程池到任务执行完毕整个过程之前,先看下ThreadPoolExecutor类中重要成员变量:
/**
* The queue used for holding tasks and handing off to worker
* threads. We do not require that workQueue.poll() returning
* null necessarily means that workQueue.isEmpty(), so rely
* solely on isEmpty to see if the queue is empty (which we must
* do for example when deciding whether to transition from
* SHUTDOWN to TIDYING). This accommodates special-purpose
* queues such as DelayQueues for which poll() is allowed to
* return null even if it may later return non-null when delays
* expire.
*/
private final BlockingQueue<Runnable> workQueue; //任务缓存队列,用来存放等待执行的任务
/**
* Lock held on access to workers set and related bookkeeping.
* While we could use a concurrent set of some sort, it turns out
* to be generally preferable to use a lock. Among the reasons is
* that this serializes interruptIdleWorkers, which avoids
* unnecessary interrupt storms, especially during shutdown.
* Otherwise exiting threads would concurrently interrupt those
* that have not yet interrupted. It also simplifies some of the
* associated statistics bookkeeping of largestPoolSize etc. We
* also hold mainLock on shutdown and shutdownNow, for the sake of
* ensuring workers set is stable while separately checking
* permission to interrupt and actually interrupting.
*/
private final ReentrantLock mainLock = new ReentrantLock();//线程池的主要状态锁,对线程池状态(比如线程池大小
//、runState等)的改变都要使用这个锁
/**
* Set containing all worker threads in pool. Accessed only when
* holding mainLock.
*/
private final HashSet<Worker> workers = new HashSet<>();//用来存放工作集
/**
* Wait condition to support awaitTermination.
*/
private final Condition termination = mainLock.newCondition();//用来等待的条件变量
/**
* Tracks largest attained pool size. Accessed only under
* mainLock.
*/
private int largestPoolSize; //用来记录线程池中曾经出现过的最大线程数
/**
* Counter for completed tasks. Updated only on termination of
* worker threads. Accessed only under mainLock.
*/
private long completedTaskCount; //用来记录已经执行完毕的任务个数
/*
* All user control parameters are declared as volatiles so that
* ongoing actions are based on freshest values, but without need
* for locking, since no internal invariants depend on them
* changing synchronously with respect to other actions.
*/
/**
* Factory for new threads. All threads are created using this
* factory (via method addWorker). All callers must be prepared
* for addWorker to fail, which may reflect a system or user's
* policy limiting the number of threads. Even though it is not
* treated as an error, failure to create threads may result in
* new tasks being rejected or existing ones remaining stuck in
* the queue.
*
* We go further and preserve pool invariants even in the face of
* errors such as OutOfMemoryError, that might be thrown while
* trying to create threads. Such errors are rather common due to
* the need to allocate a native stack in Thread.start, and users
* will want to perform clean pool shutdown to clean up. There
* will likely be enough memory available for the cleanup code to
* complete without encountering yet another OutOfMemoryError.
*/
private volatile ThreadFactory threadFactory; //线程工厂,用来创建线程
/**
* Handler called when saturated or shutdown in execute.
*/
private volatile RejectedExecutionHandler handler; //任务拒绝策略
/**
* Timeout in nanoseconds for idle threads waiting for work.
* Threads use this timeout when there are more than corePoolSize
* present or if allowCoreThreadTimeOut. Otherwise they wait
* forever for new work.
*/
private volatile long keepAliveTime;//线程存货时间
/**
* If false (default), core threads stay alive even when idle.
* If true, core threads use keepAliveTime to time out waiting
* for work.
*/
private volatile boolean allowCoreThreadTimeOut;//是否允许为核心线程设置存活时间
/**
* Core pool size is the minimum number of workers to keep alive
* (and not allow to time out etc) unless allowCoreThreadTimeOut
* is set, in which case the minimum is zero.
*/
private volatile int corePoolSize;//核心池的大小(即线程池中的线程数目大于这个参数时,提交的任务会被放进任务缓存队列)
/**
* Maximum pool size. Note that the actual maximum is internally
* bounded by CAPACITY.
*/
private volatile int maximumPoolSize;//线程池最大能容忍的线程数
/**
* The default rejected execution handler.
*/
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy(); //默认任务拒绝策略,默认为AbortPolicy
在ThreadPoolExecutor中最核心的提交方法是execute()方法,虽然submit也是提交任务方法,但是submit最终也是调用exceute方法,故这里只需要重点看execute方法。
/**
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
*
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
* the task is handled by the current {@code RejectedExecutionHandler}.
*
* @param command the task to execute
* @throws RejectedExecutionException at discretion of
* {@code RejectedExecutionHandler}, if the task
* cannot be accepted for execution
* @throws NullPointerException if {@code command} is null
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
2.1. 对execute()方法的实现代码一句句解析:
先看下具体的实现:
if (command == null) throw new NullPointerException();
上述代码中可以看到,如果command=null,则直接抛出异常。
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
上述代码是在判断工作线程数是否小于corePoolSize后,再调用addWorker()方法将command加入工作线程中去,加入成功直接return。
if (isRunning(c) && workQueue.offer(command))
上述代码是在工作程线程数大于corePoolSize时,判断当前线程池状态是否在RUNNING,此任务可以加入缓存队列中,接下来需要双重检查,防止在加入缓存队列时,其他线程将线程池关闭,故这里需要双重检查。
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
上述代码显示检查此时线程池状态是否不处于RUNNING状态,如果是需要将任务移除队列,然后使用拒绝策略处理。
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
上述代码是在双重判断之后,双重判断之后说明此时线程池还处于RUNNING状态,此时工作线程数为0,故需要将任务加入到工作线程中。
else if (!addWorker(command, false))
reject(command);
上述代码是在第一次线程池状态判断时,此时线程池不处于RUNNING状态或者缓存队列没有找到该任务,此时会将任务加入到工作线程中,如果加入失败,此时会调用拒绝策略进行处理。
2.2. 接下来我们看下addWorker方法:
看下函数体的实现:
/*
* Methods for creating, running and cleaning up after workers
*/
/**
* Checks if a new worker can be added with respect to current
* pool state and the given bound (either core or maximum). If so,
* the worker count is adjusted accordingly, and, if possible, a
* new worker is created and started, running firstTask as its
* first task. This method returns false if the pool is stopped or
* eligible to shut down. It also returns false if the thread
* factory fails to create a thread when asked. If the thread
* creation fails, either due to the thread factory returning
* null, or due to an exception (typically OutOfMemoryError in
* Thread.start()), we roll back cleanly.
*
* @param firstTask the task the new thread should run first (or
* null if none). Workers are created with an initial first task
* (in method execute()) to bypass queuing when there are fewer
* than corePoolSize threads (in which case we always start one),
* or when the queue is full (in which case we must bypass queue).
* Initially idle threads are usually created via
* prestartCoreThread or to replace other dying workers.
*
* @param core if true use corePoolSize as bound, else
* maximumPoolSize. (A boolean indicator is used here rather than a
* value to ensure reads of fresh values after checking other pool
* state).
* @return true if successful
*/
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
可以看到此方法的两个参数:
- firstTask:任务
- core:表示是否是核心线程
2.3. 对addWorker()方法的实现一句句进行分析:
先看下这个for循环,在for表达式中是没有条件停止,说明循环体中可以将这个循环停止。
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
看下循环体中的内容:
- 获取线程池的状态,也就是rs,然后对rs进行判断,可以看到当rs >= SHUTDOWN也就是线程池此时不处于RUNNING状态,或者firstTask==null时,此语句的判断的目的就是看线程池是否在运行,还有任务是否空,如果是,则返回false;
- 接下来是第二个for循环,for表达式中也是没有条件的,可以看下其循环体内容:
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
上述代码中先获取工作线程数wc,然后判断工作线程数是否大于corePoolSize或者maximumPoolSize;这个很好理解,如果core=true,表示此时将任务加入到核心工作线程中,此时wc需要和corePoolSize比较,否则需要和maximumPoolSize比较。
这里的CAPACITY,字面意思理解是容量,我个人理解应该是系统的最大容量。
if (compareAndIncrementWorkerCount(c))
break retry;
上述代码中c是一个原子性的变量,通过此变量可以获取线程池状态和工作线程数,这里调用compareAndIncrementWorkerCount()方法是尝试增加原子变量c的值。如果增加失败,则跳出标记语句。
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
上述代码重新获取c的值,通过c的值获取到线程池的状态,此时又一次判断线程池的状态,防止其他线程将线程池关闭。如果状态不一致,进行下次循环。
- 接下来真正的将任务加入工作线程中:
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
可以看出上述代码中先新建一个Worker对象w,然后判断线程池在RUNNING状态时,调用workers.add(w),此时改变状态worker=ture和workerStarted=true。然后在finally方法中判断是否add失败,如果失败,就会调用addWorkerFailed()方法,这里就这个方法就不做重点介绍了。
2.4. 接下来我们看下Worker对象的实现:
/**
* Class Worker mainly maintains interrupt control state for
* threads running tasks, along with other minor bookkeeping.
* This class opportunistically extends AbstractQueuedSynchronizer
* to simplify acquiring and releasing a lock surrounding each
* task execution. This protects against interrupts that are
* intended to wake up a worker thread waiting for a task from
* instead interrupting a task being run. We implement a simple
* non-reentrant mutual exclusion lock rather than use
* ReentrantLock because we do not want worker tasks to be able to
* reacquire the lock when they invoke pool control methods like
* setCorePoolSize. Additionally, to suppress interrupts until
* the thread actually starts running tasks, we initialize lock
* state to a negative value, and clear it upon start (in
* runWorker).
*/
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker. */
public void run() {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
这里Worker实现了Runnable接口。这里最重要的方式是run()
a. 先看下构造函数:
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
这里getThreadFactory().newThread(this)作用有点像new Thread(this),根据Java的多态性质,此时this作为一个Runnable对象传入。
b. 再看下run()方法的实现:
/** Delegates main run loop to outer runWorker. */
public void run() {
runWorker(this);
}
c. run()调用runWorker()方法:
/**
* Main worker run loop. Repeatedly gets tasks from queue and
* executes them, while coping with a number of issues:
*
* 1. We may start out with an initial task, in which case we
* don't need to get the first one. Otherwise, as long as pool is
* running, we get tasks from getTask. If it returns null then the
* worker exits due to changed pool state or configuration
* parameters. Other exits result from exception throws in
* external code, in which case completedAbruptly holds, which
* usually leads processWorkerExit to replace this thread.
*
* 2. Before running any task, the lock is acquired to prevent
* other pool interrupts while the task is executing, and then we
* ensure that unless pool is stopping, this thread does not have
* its interrupt set.
*
* 3. Each task run is preceded by a call to beforeExecute, which
* might throw an exception, in which case we cause thread to die
* (breaking loop with completedAbruptly true) without processing
* the task.
*
* 4. Assuming beforeExecute completes normally, we run the task,
* gathering any of its thrown exceptions to send to afterExecute.
* We separately handle RuntimeException, Error (both of which the
* specs guarantee that we trap) and arbitrary Throwables.
* Because we cannot rethrow Throwables within Runnable.run, we
* wrap them within Errors on the way out (to the thread's
* UncaughtExceptionHandler). Any thrown exception also
* conservatively causes thread to die.
*
* 5. After task.run completes, we call afterExecute, which may
* also throw an exception, which will also cause thread to
* die. According to JLS Sec 14.20, this exception is the one that
* will be in effect even if task.run throws.
*
* The net effect of the exception mechanics is that afterExecute
* and the thread's UncaughtExceptionHandler have as accurate
* information as we can provide about any problems encountered by
* user code.
*
* @param w the worker
*/
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
上述代码中最关键的一句就是task.run(),这个task是通过getTask()方法拿到,这里其实缓存队列中的任务,也就是最终调用任务自己的run()方法。任务自己的run()需要任务本身实现的,这里不做太多讲解。这里比较关心getTask()方法。看下其实现:
/**
* Performs blocking or timed wait for a task, depending on
* current configuration settings, or returns null if this worker
* must exit because of any of:
* 1. There are more than maximumPoolSize workers (due to
* a call to setMaximumPoolSize).
* 2. The pool is stopped.
* 3. The pool is shutdown and the queue is empty.
* 4. This worker timed out waiting for a task, and timed-out
* workers are subject to termination (that is,
* {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
* both before and after the timed wait, and if the queue is
* non-empty, this worker is not the last thread in the pool.
*
* @return task, or null if the worker must exit, in which case
* workerCount is decremented
*/
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
- 先获取线程池状态,如果线程池状态不处于RUNNING,而且缓存队列为空,此时就减少工作线程。
- 接下来会去获取任务数,如果此任务数大于最大任务数时或者已经超时时或者缓存队列空时,此时会去尝试增加工作线程数,如果失败则返回,继续循环。
- 接下来缓存队列会poll出一个Worker对象处理进行处理
总结:
- 如果当线程池中的线程数小于corePoolSize,则每来一个任务,就会创建一个线程去执行这个任务;
- 如果当线程中的线程数>=corePoolSize,每来一个任务,会尝试将其添加到任务缓存队列中,若添加成功,该任务会等待空闲线程将其取出执行,若添加失败(一般来说是任务缓存队列已满),则会尝试创建新的线程去执行这个任务;
- 如果当前线程池中的线程数目达到maximumPoolSize,则会采取任务拒绝策略进行处理;
- 如果线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止,直至线程池中的线程数目不大于corePoolSize;如果允许为核心池中的线程设置存活时间,那么核心池中的线程空闲时间超过keepAliveTime,线程也会被终止。
3.线程池中的线程初始化
默认情况下,创建线程池之后,线程池中是没有线程的,需要提交任务之后才会创建线程。
在实际中如果需要线程池创建之后立即创建线程,可以通过以下两个方法办到:
- prestartCoreThread():初始化一个核心线程;
- prestartAllCoreThreads():初始化所有核心线程
下面是这2个方法的实现:
public boolean prestartCoreThread() {
return addIfUnderCorePoolSize(null); //注意传进去的参数是null
}
public int prestartAllCoreThreads() {
int n = 0;
while (addIfUnderCorePoolSize(null))//注意传进去的参数是null
++n;
return n;
}
注意: 上面传进去的参数是null,根据第2小节的分析可知如果传进去的参数为null,则最后执行线程会阻塞在getTask方法中的
r = workQueue.take();
即等待任务队列中有任务。
4.任务缓存队列及排队策略
在前面我们多次提到了任务缓存队列,即workQueue,它用来存放等待执行的任务。
workQueue的类型为BlockingQueue<Runnable>,通常可以取下面三种类型:
- ArrayBlockingQueue:基于数组的先进先出队列,此队列创建时必须指定大小;
- LinkedBlockingQueue:基于链表的先进先出队列,如果创建时没有指定此队列大小,则默认为Integer.MAX_VALUE;
- synchronousQueue:这个队列比较特殊,它不会保存提交的任务,而是将直接新建一个线程来执行新来的任务。
5.任务拒绝策略
当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略,通常有以下四种策略:
- ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
- ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
- ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
- ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
6.线程池的关闭
ThreadPoolExecutor提供了两个方法,用于线程池的关闭,分别是shutdown()和shutdownNow(),其中:
- shutdown():不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务
- shutdownNow():立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务
7.线程池容量的动态调整
ThreadPoolExecutor提供了动态调整线程池容量大小的方法:setCorePoolSize()和setMaximumPoolSize(),
- setCorePoolSize:设置核心池大小
- setMaximumPoolSize:设置线程池最大能创建的线程数目大小
当上述参数从小变大时,ThreadPoolExecutor进行线程赋值,还可能立即创建新的线程来执行任务。
三. 常见四种线程池
Java通过Executors提供了四种线程池,这四种线程池都是直接或间接配置ThreadPoolExecutor的参数实现的,下面我都会贴出这四种线程池构造函数的源码,可以看出其构成。
1. CachedThreadPool
字面理解是可缓存线程池,其实就是这个意思。
看下它的优点:
- 线程数无限制
- 有空闲线程则复用空闲线程,若无空闲线程则新建线程
- 一定程度减少频繁创建/销毁线程,减少系统开销
创建方法:
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
源码:
/**
* Creates a thread pool that creates new threads as needed, but
* will reuse previously constructed threads when they are
* available, and uses the provided
* ThreadFactory to create new threads when needed.
* @param threadFactory the factory to use when creating new threads
* @return the newly created thread pool
* @throws NullPointerException if threadFactory is null
*/
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
可以看出corePoolSize = 0;maximumPoolSize = Integer.MAX_VALUE; 超时时间为60s,使用的是SynchronousQueue类型队列
2.FixedThreadPool
字面理解是固定长度线程池
看下优点:
- 可控制线程最大并发数(同时执行的线程数)
- 超出的线程会在队列中等待
创建方法:
//nThreads => 最大线程数即maximumPoolSize
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(int nThreads);
//threadFactory => 线程工厂类,用于创建线程
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(int nThreads, ThreadFactory threadFactory);
源码:
/**
* Creates a thread pool that reuses a fixed number of threads
* operating off a shared unbounded queue. At any point, at most
* {@code nThreads} threads will be active processing tasks.
* If additional tasks are submitted when all threads are active,
* they will wait in the queue until a thread is available.
* If any thread terminates due to a failure during execution
* prior to shutdown, a new one will take its place if needed to
* execute subsequent tasks. The threads in the pool will exist
* until it is explicitly {@link ExecutorService#shutdown shutdown}.
*
* @param nThreads the number of threads in the pool
* @return the newly created thread pool
* @throws IllegalArgumentException if {@code nThreads <= 0}
*/
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
/**
* Creates a thread pool that reuses a fixed number of threads
* operating off a shared unbounded queue, using the provided
* ThreadFactory to create new threads when needed. At any point,
* at most {@code nThreads} threads will be active processing
* tasks. If additional tasks are submitted when all threads are
* active, they will wait in the queue until a thread is
* available. If any thread terminates due to a failure during
* execution prior to shutdown, a new one will take its place if
* needed to execute subsequent tasks. The threads in the pool will
* exist until it is explicitly {@link ExecutorService#shutdown
* shutdown}.
*
* @param nThreads the number of threads in the pool
* @param threadFactory the factory to use when creating new threads
* @return the newly created thread pool
* @throws NullPointerException if threadFactory is null
* @throws IllegalArgumentException if {@code nThreads <= 0}
*/
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
两种方法唯一不同就是在线程工厂上,这里看下共同之处:corePoolSize = nThreads;maximumPoolSize = InThreads; 超时时间为0ms,使用的是LinkedBlockingQueue类型队列
3.ScheduledThreadPool
字面理解为计划线程池
看下优点:
- 支持定时及周期性任务执行
创建方法:
//nThreads => 最大线程数即maximumPoolSize
ExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(int corePoolSize);
//threadFactory => 线程工厂类,用于创建线程
ExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(int nThreads, ThreadFactory threadFactory);
源码:
/**
* Creates a thread pool that can schedule commands to run after a
* given delay, or to execute periodically.
* @param corePoolSize the number of threads to keep in the pool,
* even if they are idle
* @return a newly created scheduled thread pool
* @throws IllegalArgumentException if {@code corePoolSize < 0}
*/
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
/**
* Creates a thread pool that can schedule commands to run after a
* given delay, or to execute periodically.
* @param corePoolSize the number of threads to keep in the pool,
* even if they are idle
* @param threadFactory the factory to use when the executor
* creates a new thread
* @return a newly created scheduled thread pool
* @throws IllegalArgumentException if {@code corePoolSize < 0}
* @throws NullPointerException if threadFactory is null
*/
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
追踪其源码会调用到这:
/**
* Creates a new {@code ScheduledThreadPoolExecutor} with the
* given initial parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param threadFactory the factory to use when the executor
* creates a new thread
* @throws IllegalArgumentException if {@code corePoolSize < 0}
* @throws NullPointerException if {@code threadFactory} is null
*/
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue(), threadFactory);
}
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters and default rejected execution handler.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
可以看到corePoolSize = corePoolSize; maximumPoolSize = Integer.MAX_VALUE; 超时时间为DEFAULT_KEEPALIVE_MILLIS毫秒(10ms),使用的是DelayedWorkQueue类型队列
4.SingleThreadExecutor
字面理解为单线程线程池
看下其优点:
- 有且仅有一个工作线程执行任务
- 所有任务按照指定顺序执行,即遵循队列的入队出队规则
创建方法:
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
源码:
/**
* Creates an Executor that uses a single worker thread operating
* off an unbounded queue. (Note however that if this single
* thread terminates due to a failure during execution prior to
* shutdown, a new one will take its place if needed to execute
* subsequent tasks.) Tasks are guaranteed to execute
* sequentially, and no more than one task will be active at any
* given time. Unlike the otherwise equivalent
* {@code newFixedThreadPool(1)} the returned executor is
* guaranteed not to be reconfigurable to use additional threads.
*
* @return the newly created single-threaded Executor
*/
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
/**
* Creates an Executor that uses a single worker thread operating
* off an unbounded queue, and uses the provided ThreadFactory to
* create a new thread when needed. Unlike the otherwise
* equivalent {@code newFixedThreadPool(1, threadFactory)} the
* returned executor is guaranteed not to be reconfigurable to use
* additional threads.
*
* @param threadFactory the factory to use when creating new
* threads
*
* @return the newly created single-threaded Executor
* @throws NullPointerException if threadFactory is null
*/
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
两个方法可以看出corePoolSize = 1; maximumPoolSize = 1; 超时时间为0ms,使用的是LinkedBlockingQueue类型队列
结语
写这篇文章挣扎了好久,因为这块的知识其实已经被别人已经梳理的差不多了,而且自己对这块也很熟悉了。后来还是决定动手写下吧,防止以后忘记,也可以将知识传播给大家,这里借鉴了网上大神的分析方法,内容绝对原创分析,基于自己的内容,最新的代码进行的分析,小弟刚刚写起博客没有多久,大家如果发现问题可以一起讨论,下篇继续介绍AsyncTask,谢谢。