Java线程池研究

    线程池是java中的重要知识点,今天研究下,首先来看下线程池是怎么使用的,然后在使用的基础上再进行原理剖析:

public class MyTask implements Runnable{
    private int taskNum;
    
    public MyTask(int num) {
        this.taskNum = num;
    }
     
    @Override
    public void run() {
        System.out.println("正在执行task "+taskNum);
        try {
            Thread.currentThread().sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("task "+taskNum+"执行完毕");
    }
}

    上面首先创建一个线程子类,线程池就是首先预创建指定的数量,当我们需要执行某个逻辑的时候,把这些逻辑封装到一个Runnable对象里面,然后丢给线程池,线程池就会自动去执行,所以首先要创建一个线程子类,封装我们的业务逻辑。

public class Test {
    public static void main(String[] args) {  
        //创建一个线程池:第一个参数表示核心线程数;第二个参数表示最大线程数
        //第三个参数表示线程存活的时间;第四个参数是存活的时间单位;最后一个
        //数表示一个任务队列,当没有足够线程去执行任务时,任务将被添加到队列       
        ThreadPoolExecutor pool = new ThreadPoolExecutor(5, 10, 30, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(5));
         
        //循环将一个个Runnable对象丢给线程池去执行
        for(int i=0;i<15;i++){
            MyTask myTask = new MyTask(i);
            pool.execute(myTask);
            System.out.println("线程池中线程数目:"+pool.getPoolSize()+",队列中等待执行的任务数目:"+
            pool.getQueue().size()+",已执行完的任务数目:" + pool.getCompletedTaskCount());
        }
        
        //关闭线程池
        pool.shutdown();
    }
}

    上面首先创建一个线程池对象pool,然后丢给线程池对象pool的execute方法区执行,输出结果如下:

//核心线程执行任务
正在执行task 0
线程池中线程数目:1,队列中等待执行的任务数目:0,已执行完的任务数目:0
线程池中线程数目:2,队列中等待执行的任务数目:0,已执行完的任务数目:0
正在执行task 1
线程池中线程数目:3,队列中等待执行的任务数目:0,已执行完的任务数目:0
正在执行task 2
线程池中线程数目:4,队列中等待执行的任务数目:0,已执行完的任务数目:0
正在执行task 3
线程池中线程数目:5,队列中等待执行的任务数目:0,已执行完的任务数目:0
正在执行task 4
//一共就5个核心线程,已经用完了,别的任务只能去队列里面等待了
线程池中线程数目:5,队列中等待执行的任务数目:1,已执行完的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:2,已执行完的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:3,已执行完的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:4,已执行完的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:5,已执行完的任务数目:0
//队列的容量是5,现在已经满了,所以要创建新的非核心线
//程执行任务;队列不满,是不会创建非核心线程去执行的
线程池中线程数目:6,队列中等待执行的任务数目:5,已执行完的任务数目:0
//非核心线程执行任务
正在执行task 10
线程池中线程数目:7,队列中等待执行的任务数目:5,已执行完的任务数目:0
正在执行task 11
线程池中线程数目:8,队列中等待执行的任务数目:5,已执行完的任务数目:0
正在执行task 12
线程池中线程数目:9,队列中等待执行的任务数目:5,已执行完的任务数目:0
正在执行task 13
线程池中线程数目:10,队列中等待执行的任务数目:5,已执行完的任务数目:0
正在执行task 14
task 1执行完毕
task 13执行完毕
task 2执行完毕
正在执行task 6
task 10执行完毕
正在执行task 8
task 11执行完毕
task 4执行完毕
task 3执行完毕
正在执行task 9
正在执行task 7
task 0执行完毕
正在执行task 5
task 14执行完毕
task 12执行完毕

    通过上面的例子可以看出,简单的使用线程池还是很方便的,下面从线程类ThreadPoolExecutor开始剖析线程池的原理,先从ThreadPoolExecutor的类信息开始看:

public class ThreadPoolExecutor extends AbstractExecutorService {
    //原子整数,他是一个复合型数据,低29位表示线程池中活动的线程数量,
    //用workerCount表示;高3位代表线程池运行状态:RUNNING、
    //SHUTDOWN、STOP、TIDYING和TERMINATED,用runState表示
    //这个变量特别重要,后面很多操作都要获取他才能决定下一步
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

    //线程池中活动的线程数量workerCount占的位数:32 - 3 = 29
    private static final int COUNT_BITS = Integer.SIZE - 3;

    //线程池中获得的线程的最大数量,2的29次幂 - 1
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    //线程池状态,RUNNING表示正在运行,接受新任务并处理队列中的任务;-1在底层
    //由32个1表示,左移29位就是111 00000 00000000 00000000 00000000
    //也就是低29位全部为0;高3位全部为1的话,表示SHUTDOWN状态
    private static final int RUNNING    = -1 << COUNT_BITS;

    //线程池状态,不接受新任务,但是会处理队列里面的任务。0不管怎么
    //左移都是0所以低29位是0,高3位全部是0的话,表示SHUTDOWN状态
    private static final int SHUTDOWN   =  0 << COUNT_BITS;

    //线程池状态,不接受新任务,不会处理队列里面的任务;而且会中断掉正在处理的任务;
    //1由31个0和1个1组成,左移29位就是001 00000 00000000 00000000 00000000,
    //也就是低29位全部是0的,高3位是001的话,表示STOP状态
    private static final int STOP       =  1 << COUNT_BITS;

    //线程池状态,表示所有任务已经结束,workerCount为0,线程池过渡到TIDYING
    //状态;2在底层由30个0和一个10组成,左移29位就是010 00000 00000000
    // 00000000 00000000,也就是低29位全部为0,高3位为010就是TIDYING状态
    private static final int TIDYING    =  2 << COUNT_BITS;


    //线程池状态,表示terminated()方法已经完成;3在底层由30个0和一个11组成
    //左移29位就是011 00000 00000000 00000000 00000000,也就是低29位
    //为0,高3位是011的话,线程池此时是TERMINATED状态
    private static final int TERMINATED =  3 << COUNT_BITS;

    //任务队列,当线程池的核心线程不够用时,新添加
    //的任务将会被放入此队列,待以后执行这些任务
    private final BlockingQueue<Runnable> workQueue;

    //重入锁
    private final ReentrantLock mainLock = new ReentrantLock();

    //一个Worker类型的集合,Worker实现了Runnable接口,可以猜测
    //线程池中的运行单元是不是就是他呢?
    private final HashSet<Worker> workers = new HashSet<Worker>();

    //一个Worker类型的集合,Worker实现了Runnable接口,可以猜测
    //线程池中的运行单元是不是就是他呢?
    private final HashSet<Worker> workers = new HashSet<Worker>();

    //线程池中曾经创建过的最大的线程数量
    private int largestPoolSize;

    //任务已经完成的数量
    private long completedTaskCount;

    //线程工厂类,创建线程用的
    private volatile ThreadFactory threadFactory;

    //拒绝策略。当任务队列已满,而且线程数量达到
    //最大值时,如果还添加任务,就会采取此拒绝策略
    private volatile RejectedExecutionHandler handler;


    //空闲线程存活的时间,时间一到,直接终止
    private volatile long keepAliveTime;

    //是否终止处于空闲的核心线程,如果是true,时间到了就终止核心线程,否则不终止
    private volatile boolean allowCoreThreadTimeOut;

    //核心线程数量
    private volatile int corePoolSize;

    //此线程池最大的线程数,创建线程池的时候手动指定
    private volatile int maximumPoolSize;

    //此线程池最大的线程数,创建线程池的时候手动指定
    private volatile int maximumPoolSize;
}

    上面就是ThreadPoolExecutor类的一些成员变量。下面分析下他的构造函数,ThreadPoolExecutor类有4个构造函数,3个构造函数最终调的是第4个,所以直接来看第四个构造函数:

/**
 * 参数解析
 * @param corePoolSize      核心线程数
 * @param maximumPoolSize   最大线程数
 * @param keepAliveTime     空闲线程存活的时间
 * @param unit              上面那个时间的单位
 * @param workQueue         任务队列
 * @param threadFactory     线程工厂对象
 * @param handler           拒绝策略
 */
 public ThreadPoolExecutor(int corePoolSize,
                           int maximumPoolSize,
                           long keepAliveTime,
                           TimeUnit unit,
                           BlockingQueue<Runnable> workQueue,
                           ThreadFactory threadFactory,
                           RejectedExecutionHandler handler) {
     //一波判断,各个参数必须要在正常,不能残疾
     if (corePoolSize < 0 ||
         maximumPoolSize <= 0 ||
         maximumPoolSize < corePoolSize ||
         keepAliveTime < 0)
         throw new IllegalArgumentException();
     if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();

         //一波赋值,不解释
         this.corePoolSize = corePoolSize;
         this.maximumPoolSize = maximumPoolSize;
         this.workQueue = workQueue;
         this.keepAliveTime = unit.toNanos(keepAliveTime);
         this.threadFactory = threadFactory;
         this.handler = handler;
 }

    线程池里面有两个线程非常重要,一个是核心线程corePoolSize,另一是最大线程maximumPoolSize;当有任务到来时,先用核心线程去执行;如果核心线程用完了,那么任务放入队列里面存放起来,此时就算设置了非核心线程,非核心线程也是懒得动的;当队列满了以后,才会起非核心线程去执行任务(非核心线程有点飘)。其执行流程如下:
线程池执行流程

    下面就来看下线程池执行的方法execute吧:

  public void execute(Runnable command) {
        //非空判断,不解释
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        //获取原子整数的值,这个整数聚合了当前活动的线程总数和线程池状态
        int c = ctl.get();

        //如果核心线程没有用完,那么用核心线程执行任务
        if (workerCountOf(c) < corePoolSize) {

            //addWorker就是创建一个Worker对象来执行此任务,众多
            //的worker是放在集合里面统一管理的,如果新建的worker
            //成功加入此集合,那么说明addWorker成功;注意addWorker
            //成功并不代表此任务就执行成功了。Worker对象是Runnable
            //的子类,里面有个Thread的成员变量,我们传进来的任务就是
            //由这个变量来运行,所以Worker是任务执行的载体,很重要
            if (addWorker(command, true))
                return;

            //addWorker失败,说明worker创建失败,或者
            //没有添加进集合里面,这里再次获取线程池的信息
            c = ctl.get();
        }
        //如果线程池处于运行状态,而且任务成功被添加进队列,重新检查。因
        //为在判断线程池状态和添加任务之后,线程池的状态可能再次发生变化
        //workQueue.offer就是把我们传进来的任务添加进任务队列
        if (isRunning(c) && workQueue.offer(command)) {

            //再次获取线程池状态
            int recheck = ctl.get();

            //若干线程池状态发生变化,不再运行了,
            //那么从任务队列移除此任务,相当于回退操作
            if (! isRunning(recheck) && remove(command))

                //执行拒绝策略
                reject(command);

            //如果线程池正在运行,但是活跃线程数量为0,那么再addWorker一次
            else if (workerCountOf(recheck) == 0)

                //这里传入null的原因是目标任务在外层if已经添加进去了。注意
                //最后一个参数,没有核心线程了,说明只能用非核心线程执行任务
                addWorker(null, false);
        }

        //如果添加队列失败,那么手动新增线程去执行;如果还是失败
        //说明线程池挂了或者处于饱和状态,没得救了,执行拒绝策略
        else if (!addWorker(command, false))
            reject(command);
    }

    execute就是这样了,调用addWorker去新增一个Worker对象,然后通过此对象来运行我们的任务,所以先简要分析下Worker类,他是ThreadPoolExecutor的内部类:

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;

        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            //设置同步状态,-1代表在调用runWorker前禁止中断
            setState(-1); // inhibit interrupts until runWorker

            //给成员变量赋值,这个值可能是空,如果是空
            //,就从任务队列里面获取(当然不是在这里获取)
            this.firstTask = firstTask;

            //构建Thread对象,最终就是他去执行我们传进来的任务
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker  */
        //这里才是执行任务的地方
        public void run() {
            runWorker(this);
        }
        ......
}

    Worker本身不是很难,可以简单的理解成我们常说的线程线程,简单了解下即可,下面研究重要的方法addWorker:

    //根据线程池当前的状态和边界值(核心线程数和最大线程数)来就检查一个Worker是否
    //可以添加到线程池。如果可以,workerCount将会增加;如果有可能,此Worker将被
    //创建和运行传进来的任务。如果线程池挂了,或者线程数量超出边界值,此方法将会返
    //回false;如果线程工厂创建线程失败,那么也会返回false,然后回退
    private boolean addWorker(Runnable firstTask, boolean core) {
        //for循环的标记位
        retry:

        //双重for循环,外层for循环用来判断线程池
        //状态;内层for循环用来增加线程数的CAS操作
        for (;;) {
            //获取线程池信息
            int c = ctl.get();

            //获取线程池活动运行状态
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            // 如果线程池当前状态大于等于SHUTDOWN,也就是SHUTDOWN
            // 、STOP、TIDYING和TERMINATED其中之一,那么继续后面的
            // 判断,后面的判断不好理解,主要在于外层有个取反操作,可以
            // 转换如下:(rs!=SHUTDOWN || firstTask!=null ||
            // workQueue.isEmpty())。分析:1.如果线程池状态大于SHUTDOWN
            //,说明不宜接受新任务,也不会处理队列里面的任务,返回false;2.如
            //果传进来的firstTask不为空,说明是添加新任务,此时不会处理,返回
            //false,注意firstTask是可以为空的,为空代表着新起线程执行队列里面
            //的任务;workQueue.isEmpty()代表没有等待的任务了,此时线程池的状
            //是大于等于SHUTDOWN的,当然不会新起线程去执行任务了。
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                //获取线程数量
                int wc = workerCountOf(c);

                //如果线程数量超出边界,那么返回false,Worker添加失败
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;

                //如果没有超出边界,原子操作增加线程数量
                if (compareAndIncrementWorkerCount(c))
                    //增加完成后,跳出外层for循环,添加worker
                    //成功,addWorker方法成功了一半,可喜可贺
                    break retry;

                //如果自增线程数量失败,再次获取线程池运行时信息
                c = ctl.get();  // Re-read ctl

                //如果当前的状态不等于之前的状态,跳出内层循环,执行外循环,再次判断
                //状态;因为执行下次循环的时候,线程池的状态可能会发生变化。反正这里
                //就是往死里去增加线程池记录的线程数量,不成功誓不罢休,除非超出边界了
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
                // CAS操作失败要么是因为线程池状态改变了,要么是因为workCount改变了
                // ,如果workCount改变了,说明存在竞争,继续内循环自增workerCount
            }
        }

        //流程执行到这里,说明活跃的线程数已经成功自增

        //定义两个变量,
        boolean workerStarted = false;
        boolean workerAdded = false;

        //线程池中的一个线程对象就是一个Worker对象
        Worker w = null;

        try {
            //构建一个Worker对象
            w = new Worker(firstTask);

            //拿到Worker对象里面的线程类
            final Thread t = w.thread;

            //如果线程不为空
            if (t != null) {
                //重入锁
                final ReentrantLock mainLock = this.mainLock;

                //先锁起来
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    //拿到锁有重新检查线程池状态
                    int rs = runStateOf(ctl.get());

                    //如果线程池处于运行状态,或者线程池SHUTDOWN了,同时
                    //firstTask为空,说明要起线程去执行任务队列里面的任务
                    //,其他情况一律不执行。这也说明了SHUTDOWN状态下的线程
                    //池,是会去执行任务队列里面的任务的,SHUTDOWN比较厚道
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {

                        //检查刚起的线程是不是已经被执行了,如果执行了,死给你看
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();

                        //workers是一个HashSet,里面保存的是Worker对象
                        workers.add(w);

                        //HashSet的元素个数
                        int s = workers.size();

                        //如果HashSet里面的Worker数量大于曾经创建过
                        //的最大的线程数量,那么对这个数量进行重新赋值
                        if (s > largestPoolSize)
                            largestPoolSize = s;

                        //Worker对象添加成功
                        workerAdded = true;
                    }
                } finally {
                    //释放锁
                    mainLock.unlock();
                }

                //如果Worker添加成功,启动线程去执行
                if (workerAdded) {
                    t.start();
                    //设置状态
                    workerStarted = true;
                }
            }
        } finally {
            //任务执行失败的话,把Worker从HashSet里面移除
            if (! workerStarted)
                //调用addWorkerFailed处理失败的后续操作
                addWorkerFailed(w);
        }
        return workerStarted;
    }

    addWorker方法有点难。这个方法上来就是一个双重for死循环,外层for循环的作用是判断线程池的状态,如果当前线程池的状态大于SHUTDOWN,那么此线程池不接受新任务;如果大于等于SHUTDOWN,而且传进来的任务不为空(说明是新任务),那么也不接受;如果线程池状态大于等于SHUTDOWN,而且任务队列为空,没的说,也不接受;三种不接受都导致返回false,这就是外层for循环到的作用;一旦线程池状态校验通过,那么就进入第二层for循环,增加线程池活跃线程的数量,通过死循环,气而不馁的增加数量,除非数量超过线程池的边界,此时才返回false。如果线程池的线程数量自增成功,那么构建一个Worker对象,此时再次检查线程池状态,因为上面两个for循环是线程不安全的,一个线程通过了上面两个for循环检查状态和自增数量后,另外一个线程也可以去自增,此时可能导致线程池的状态发生变化,所以需要再次检查;这次检查通过后,将新建的Woker对象加入集合workers,接着就执行Worker对象里面的Thread的run方法;如果失败,调用addWorkerFailed。这段代码逻辑很简单,但是实现起来是比较复杂的。下面看下Worker的Thread的run方法是怎么执行的:

/** Delegates main run loop to outer runWorker  */
//这里才是执行任务的地方
public void run() {
    runWorker(this);
}

    简单的调用runWorker,runWorker方法是一个核心方法,很难,下面尽量分析:

    final void runWorker(Worker w) {
        //拿到当前的线程,也就是调用runWorker方法的线程
        Thread wt = Thread.currentThread();

        //拿到Task
        Runnable task = w.firstTask;

        //将firstTask置空
        w.firstTask = null;

        //遥想当年创建Worker的时候设置了一个状态-1,代表
        //不允许打断,这里设置成1代表可以打断
        w.unlock(); // allow interrupts

        //标记线程是不是异常终止的
        boolean completedAbruptly = true;

        try {
            //如果Task不为空,说明用户传入了一个任务;如果为空,说
            //明要从任务队列里面获取任务来执行,否则跳出while循环
            while (task != null || (task = getTask()) != null) {
                //加锁,目的是为了在线程池shutdown
                //的时候,对于正在执行的任务不被中断
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                //如果线程池正在stop,那么要保证当前线程是中断状态
                //如果不是,要保证当前先不是中断状态
                //runStateAtLeast(ctl.get(), STOP)的作用就是拿到线程池
                //的运行状态,然后和STOP比对,大于等于STOP就返回true;
                //Thread.interrupted是设置调用runWorker方法的线程的状态为中断,仅仅是设置
                //线程中断状态,不会真的中断运行runWorker方法的线程,除非runWorker方法的线程
                //里面有sleep、wait或者join,此时就抛出InterruptedException异常
                //!wt.isInterrupted()是指任务线程没有被中断
                //所以if的作用是如果线程池的状态大于等于STOP,而且任务线程没有中断,那么给任务线程设置一个中断
                //标记位;或者运行runWorker方法的线程别设置了中断位,而且线程池的状态大于等于STOP,那么任务线程也要设置标记位
                //否则就不会给任务线程设置标记位,这就是为什么线程池shutdown之后,任务还能运行的原因
                if ((runStateAtLeast(ctl.get(), STOP) 
                        || (Thread.interrupted() 
                        && runStateAtLeast(ctl.get(), STOP))) 
                        && !wt.isInterrupted())
                    //设置任务线程的中断状态,也就是说interrupt会发出中断信号,这个信号只能被wait()、sleep()和join()方法
                    //捕捉并产生中断。interrupt本身仅仅只会设置线程中断标记位,并不会真正产生中断
                    wt.interrupt();
                try {
                    //任务执行前需要做的工作,默认空实现,可自己扩展
                    //beforeExecute可能会导致线程异常而死亡
                    beforeExecute(wt, task);

                    //创建一个Throwable对象,捕获异常用
                    Throwable thrown = null;
                    try {
                        //执行任务
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        //任务执行后需要做的工作,默认空实现,可自己扩展
                        //这里把抛出的异常都传给了afterExecute
                        afterExecute(task, thrown);
                    }
                } finally {
                    //任务置空
                    task = null;

                    //完成的任务自增
                    w.completedTasks++;

                    //释放锁
                    w.unlock();
                }
            }

            //不是异常中断的
            completedAbruptly = false;
        } finally {
            //退出Worker
            processWorkerExit(w, completedAbruptly);
        }
    }

    runWorker方法不太好理解,主要是在锁的那里理解的有偏差。这个方法的主要任务就是执行我们传进来的Runnable对象的run方法。接下来看下getTask方法:

    //获取执行任务
    private Runnable getTask() {
        //上次从队列里面获取任务是否超时导致未能获取到任务
        boolean timedOut = false; // Did the last poll() time out?

        //死循环
        for (;;) {
            //获取线程池运行时状态
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            //如果线程池状态大于等于SHUTDOWN,任务队列是空的,那么返回null同时将
            //workerCount自减(CAS操作),因为addWorker中已经将他自增了,所以这里要退货。
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            //拿到活跃线程数量
            int wc = workerCountOf(c);

            // Are workers subject to culling?
            //allowCoreThreadTimeOut默认是false,代表核心线程是否有超时限制
            //如果wc > corePoolSize,说明用到了非核心线程,此时就要有超时限制了
            //timed的主要用于获取任务,如果有超时限制,那么调用poll方法获取任务
            //在keepAliveTime时间内没有获取到任务,就继续下次循环获取任务;如果
            //是false,那么调用take方法获取任务,此时线程就会卡在这,阻塞式获取
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            //这个判断分解后更好理解,他的判断依据是如果当前活跃线程数大于最大线程数,那么直接返回,不搞了;
            //不管任务队列是不是空的,也不管wc到底是不是大于1,统统不玩了。如果获取任务有超时限制,而且上次
            //获取任务还搞砸了,超时了,那么如果活跃线程数大于1,或者队列是空的,那么也不玩了;队列是空的时候
            //不玩很好理解,那么如果允许超时,而且上次还超时了,那么也不玩了呢,应该是队列里面没有任务了吧(不确定)
            if (  (wc > maximumPoolSize || (timed && timedOut))      &&     (wc > 1 || workQueue.isEmpty())   ) {
                //将活跃线程数自减
                if (compareAndDecrementWorkerCount(c))
                    //返回空
                    return null;

                //如果自减失败,继续for循环,因为下次for循环的时候,
                //线程池的状态可能有所改变,如果没变,那么再次自减
                continue;
            }

            try {
                //允许超时,那么调用poll获取任务,否则调用take阻塞式获取,也就是会一直卡在这
                //poll只在keepAliveTime这个时间内获取任务,如果获取不到,那么进行下一次循环
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();//阻塞式方式获取,一直等到队列有任务
                if (r != null)
                    //返回结果
                    return r;
                //如果没有拿到任务,说明获取超时,修改状态,继续下次循环
                timedOut = true;
            } catch (InterruptedException retry) {
                //如果被打算,那么继续下次循环
                timedOut = false;
            }
        }
    }

    getTask方法也比较难,主要是if条件写的太简洁了,分析起来有点吃力。下面分析线程退出的时候线程池做了什么:

    //处理线程的退出。两种场景,一种是由于异常而退出,一种是由于任务执行完毕而退出
    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        //如果是异常退出,那么将线程池的线程数量自减
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //正常退出的话,那么将执行成功的任务自增
            completedTaskCount += w.completedTasks;

            //从集合里面删除此worker
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }

        //尝试终止线程池,不一定能成功
        tryTerminate();

        //获取线程池运行时信息
        int c = ctl.get();

        //如果此时状态小于STOP
        if (runStateLessThan(c, STOP)) {

            //如果不是异常退出的话
            if (!completedAbruptly) {

                //计算核心线程的最小数量
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;

                //如果没有核心线程了,但是任务队列里面还有任务,那么将min置为1
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;

                //如果线程数量不小于最小值,那么退出,队
                //列里面的任务就由现有的线程去负责执行
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            
            //若干是异常退出的话,那么再增加一个非核心线程
            //如果队列里面还有任务,但是线程数不够,也会走这里
            addWorker(null, false);
        }
    }

    这个方法还算比较简单,首先判断此线程是不是异常退出,是的话将线程数量自减。接着,不管是正常退出还是异常退出,都要将线程从worker集合里面移除掉;接着判断线程池状态,如果小于STOP,说明线程池此时正在正常运行,此时判断核心线程的最小数量,如果核心线程的最小数量为0,但是此时队列里面还有任务的话,就将核心线程的最少数量置成1;如果此时线程池的线程数量大于等于这个核心线程池的最小数量,那些队列里面的任务就让这些线程去执行吧。如果是异常退出的线程,或者此时线程池的线程数量小于核心线程的最小值,那么就需要添加一个线程进去。

    下面分析线程池退出机制:

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //安全检查,线程池不是谁都可以关掉的,必须具备相应的权限,不管
        checkShutdownAccess();

        //设置线程池状态为SHUTDOWN
        advanceRunState(SHUTDOWN);

        //中断空闲中的线程,注意,正在运行的线程是不会中断的
        interruptIdleWorkers();

        //shutdown后的回调,空实现,根据自己的需求扩展
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
        
    //最后尝试终止线程池
    tryTerminate();
}

    接着分析tryTerminate:

    final void tryTerminate() {
        //又是死循环,这么多死循环
        for (;;) {
            //拿到线程池的运行时信息
            int c = ctl.get();
            //如果正在运行,或者状态比TIDYING大,或者
            //状态是SHUTDOWN但是队里里面还有任务,那么就不终止
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
            //如果活跃线程数不等于0,那么终止空闲的线程,此时就不能直接退出了,要返回
            if (workerCountOf(c) != 0) { // Eligible to terminate
                interruptIdleWorkers(ONLY_ONE);
                return;
            }

            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                //if里面的逻辑是将状态TIDYING和线程数0聚合在一起
                //然后更新到ctl,如果状态是TIDYING而且线程数是0
                //了,那么这个线程就没什么用了,可以退出了
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        //空实现,根据自己的需要扩展
                        terminated();
                    } finally {
                        ctl.set(ctlOf(TERMINATED, 0));
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }

    接下来看下空闲线程是怎么退出的:

    private void interruptIdleWorkers(boolean onlyOne) {
        //上锁
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //遍历Worker
            for (Worker w : workers) {
                //拿到线程
                Thread t = w.thread;
                //如果线程没有别中断,而且能够获取worker的锁,那么
                //就将此线程的中断标记位置为true,如果获取不到,说明
                //此worker正在执行,此时就不能中断这些线程了。
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                //如果终止空闲线程的操作只执行一次的话,那么就退出
                //否则就往死里循环,一直到所有的执行任务的线程都空闲
                //然后终止为止
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }

    自此,线程池的源码磕磕碰碰的分析了一遍,有些地方理解的不到位,待功力大增了再修改。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,519评论 5 468
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 83,842评论 2 376
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,544评论 0 330
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,742评论 1 271
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,646评论 5 359
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,027评论 1 275
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,513评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,169评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,324评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,268评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,299评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,996评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,591评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,667评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,911评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,288评论 2 345
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,871评论 2 341

推荐阅读更多精彩内容