Java线程池详解(二)

三、ThreadPoolExecutor解析

上文中描述了Java中线程池相关的架构,了解了这些内容其实我们就可以使用java的线程池为我们工作了,使用其提供的线程池我们可以很方便的写出高质量的多线程代码,本节将分析ThreadPoolExecutor的实现,来探索线程池的运行原理。下面的图片展示了ThreadPoolExecutor的类图:

Paste_Image.png

ThreadPoolExecutor的类图
下面是几个比较关键的类成员:

private final BlockingQueue<Runnable> workQueue;  // 任务队列,我们的任务会添加到该队列里面,线程将从该队列获取任务来执行
 
 private final HashSet<Worker> workers = new HashSet<Worker>();//任务的执行值集合,来消费workQueue里面的任务
  
 private volatile ThreadFactory threadFactory;//线程工厂
      
 private volatile RejectedExecutionHandler handler;//拒绝策略,默认会抛出异异常,还要其他几种拒绝策略如下:
  
 1、CallerRunsPolicy:在调用者线程里面运行该任务
 2、DiscardPolicy:丢弃任务
 3、DiscardOldestPolicy:丢弃workQueue的头部任务
  
private volatile int corePoolSize;//最下保活work数量
 
private volatile int maximumPoolSize;//work上限

我们尝试执行submit方法,下面是执行的关键路径,总结起来就是:如果Worker数量还没达到上限则继续创建,否则提交任务到workQueue,然后让worker来调度运行任务。

step 1: <ExecutorService>
Future<?> submit(Runnable task); 
 
step 2:<AbstractExecutorService>
    public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}
 
step 3:<Executor>
void execute(Runnable command);
 
step 4:<ThreadPoolExecutor>
 public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) { //提交我们的额任务到workQueue
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false)) //使用maximumPoolSize作为边界
        reject(command); //还不行?拒绝提交的任务
}
 
step 5:<ThreadPoolExecutor>
private boolean addWorker(Runnable firstTask, boolean core)
 
 
step 6:<ThreadPoolExecutor>
w = new Worker(firstTask); //包装任务
final Thread t = w.thread; //获取线程(包含任务)
workers.add(w);   // 任务被放到works中
t.start(); //执行任务

上面的流程是高度概括的,实际情况远比这复杂得多,但是我们关心的是怎么打通整个流程,所以这样分析问题是没有太大的问题的。观察上面的流程,我们发现其实关键的地方在于Worker,如果弄明白它是如何工作的,那么我们也就大概明白了线程池是怎么工作的了。下面分析一下Worker类。

Paste_Image.png

worker类图
上面的图片展示了Worker的类关系图,关键在于他实现了Runnable接口,所以问题的关键就在于run方法上。在这之前,我们来看一下Worker类里面的关键成员:

final Thread thread;
 
Runnable firstTask; //我们提交的任务,可能被立刻执行,也可能被放到队列里面

thread是Worker的工作线程,上面的分析我们也发现了在addWorker中会获取worker里面的thread然后start,也就是这个线程的执行,而Worker实现了Runnable接口,所以在构造thread的时候Worker将自己传递给了构造函数,thread.start执行的其实就是Worker的run方法。下面是run方法的内容:

  public void run() {
        runWorker(this);
    }
     
    final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

我们来分析一下runWorker这个方法,这就是整个线程池的核心。首先获取到了我们刚提交的任务firstTask,然后会循环从workQueue里面获取任务来执行,获取任务的方法如下:

private Runnable getTask() {
       boolean timedOut = false; // Did the last poll() time out?
 
       for (;;) {
           int c = ctl.get();
           int rs = runStateOf(c);
 
           // Check if queue empty only if necessary.
           if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
               decrementWorkerCount();
               return null;
           }
 
           int wc = workerCountOf(c);
 
           // Are workers subject to culling?
           boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
 
           if ((wc > maximumPoolSize || (timed && timedOut))
               && (wc > 1 || workQueue.isEmpty())) {
               if (compareAndDecrementWorkerCount(c))
                   return null;
               continue;
           }
 
           try {
               Runnable r = timed ?
                   workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                   workQueue.take();
               if (r != null)
                   return r;
               timedOut = true;
           } catch (InterruptedException retry) {
               timedOut = false;
           }
       }
   }

其实核心也就一句:

Runnable r = timed ?
           workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
           workQueue.take();

我们再回头看一下execute,其实我们上面只走了一条逻辑,在execute的时候,我们的worker的数量还没有到达我们设定的corePoolSize的时候,会走上面我们分析的逻辑,而如果达到了我们设定的阈值之后,execute中会尝试去提交任务,如果提交成功了就结束,否则会拒绝任务的提交。我们上面还提到一个成员:maximumPoolSize,其实线程池的最大的Worker数量应该是maximumPoolSize,但是我们上面的分析是corePoolSize,这是因为我们的private boolean addWorker(Runnable firstTask, boolean core)的参数core的值来控制的,core为true则使用corePoolSize来设定边界,否则使用maximumPoolSize来设定边界。直观的解释一下,当线程池里面的Worker数量还没有到corePoolSize,那么新添加的任务会伴随着产生一个新的worker,如果Worker的数量达到了corePoolSize,那么就将任务存放在阻塞队列中等待Worker来获取执行,如果没有办法再向阻塞队列放任务了,那么这个时候maximumPoolSize就变得有用了,新的任务将会伴随着产生一个新的Worker,如果线程池里面的Worker已经达到了maximumPoolSize,那么接下来提交的任务只能被拒绝策略拒绝了。可以参考下面的描述来理解:

* When a new task is submitted in method {@link #execute(Runnable)},
* and fewer than corePoolSize threads are running, a new thread is
* created to handle the request, even if other worker threads are
* idle.  If there are more than corePoolSize but less than
* maximumPoolSize threads running, a new thread will be created only
* if the queue is full.  By setting corePoolSize and maximumPoolSize
* the same, you create a fixed-size thread pool. By setting
* maximumPoolSize to an essentially unbounded value such as {@code
* Integer.MAX_VALUE}, you allow the pool to accommodate an arbitrary
* number of concurrent tasks. Most typically, core and maximum pool
* sizes are set only upon construction, but they may also be changed
* dynamically using {@link #setCorePoolSize} and {@link
* #setMaximumPoolSize}.

在此需要说明一点,有一个重要的成员:keepAliveTime,当线程池里面的线程数量超过corePoolSize了,那么超出的线程将会在空闲keepAliveTime之后被terminated。可以参考下面的文档:

* If the pool currently has more than corePoolSize threads,
* excess threads will be terminated if they have been idle for more
* than the keepAliveTime (see {@link #getKeepAliveTime(TimeUnit)}).

欢迎加入学习交流群569772982,大家一起学习交流。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 200,045评论 5 468
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,114评论 2 377
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 147,120评论 0 332
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,902评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,828评论 5 360
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,132评论 1 277
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,590评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,258评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,408评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,335评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,385评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,068评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,660评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,747评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,967评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,406评论 2 346
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,970评论 2 341

推荐阅读更多精彩内容