Tomcat线程池详解

写在前面的话

最近一直都在研究Java的线程池ThreadPoolExecutor,但是虽然它那么好,但是在实际的用途中怎么去用,对于我来说就不知道如何下手了,还好有开源社区我们可以了解很多项目中所运用到的线程池,比如最熟悉的就是Apache Tomcat了,相信都对它不默生,一个Apache软件基金下的一个开源Web容器,所以今天就来聊一下Tomcat的线程池实现。

准备工作

首先去Apache Tomcat的官网下载Tomcat的源代码,这里给出<a href="http://mirrors.tuna.tsinghua.edu.cn/apache/tomcat/tomcat-7/v7.0.72/src/apache-tomcat-7.0.72-src.zip">Tomcat源码链接</a>,下载下来之后,它是一个zip文件,需要把它进行解压到相应的文件夹下面,以便我们能方便的查看其源代码。分析源码最行之有效的方法就是知道这个类有哪些方法,哪些字段,继承了哪些类,实现了哪些接口,所以我们这里推荐一款UML工具, astah-professional,可自行下载安装,这是一个收费软件,但是它有50天的试用期,所以我们可以以使用的身份使用该软件。准备工作做好之后就可以进行下一步的操作了。

初探Tomcat线程池

Tomcat的线程池的类文件在../apache-tomcat-7.0.72-src\java\org\apache\catalina\core包下面,定位到这个文件夹下面可以看到StandardThreadExecutor.java就是我们找寻的类了,用文本工具打开就可以查看其源码了。这里源码如下:
StandardThreadExecutor.java

public class StandardThreadExecutor extends LifecycleMBeanBase
        implements Executor, ResizableExecutor {
    //默认线程的优先级
    protected int threadPriority = Thread.NORM_PRIORITY;
    //守护线程
    protected boolean daemon = true;
    //线程名称的前缀
    protected String namePrefix = "tomcat-exec-";
    //最大线程数默认200个
    protected int maxThreads = 200;
    //最小空闲线程25个
    protected int minSpareThreads = 25;
    //超时时间为6000
    protected int maxIdleTime = 60000;
    //线程池容器
    protected ThreadPoolExecutor executor = null;
    //线程池的名称
    protected String name;
     //是否提前启动线程
    protected boolean prestartminSpareThreads = false;
    //队列最大大小
    protected int maxQueueSize = Integer.MAX_VALUE;
    //为了避免在上下文停止之后,所有的线程在同一时间段被更新,所以进行线程的延迟操作
    protected long threadRenewalDelay = 1000L;
    //任务队列
    private TaskQueue taskqueue = null;

    //容器启动时进行,具体可参考org.apache.catalina.util.LifecycleBase#startInternal()
    @Override
    protected void startInternal() throws LifecycleException {
        //实例化任务队列
        taskqueue = new TaskQueue(maxQueueSize);
        //自定义的线程工厂类,实现了JDK的ThreadFactory接口
        TaskThreadFactory tf = new TaskThreadFactory(namePrefix,daemon,getThreadPriority());
        //这里的ThreadPoolExecutor是tomcat自定义的,不是JDK的ThreadPoolExecutor
        executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), maxIdleTime, TimeUnit.MILLISECONDS,taskqueue, tf);
        executor.setThreadRenewalDelay(threadRenewalDelay);
        //是否提前启动线程,如果为true,则提前初始化minSpareThreads个的线程,放入线程池内
        if (prestartminSpareThreads) {
            executor.prestartAllCoreThreads();
        }
        //设置任务容器的父级线程池对象
        taskqueue.setParent(executor);
        //设置容器启动状态
        setState(LifecycleState.STARTING);
    }
    
  //容器停止时的生命周期方法,进行关闭线程池和资源清理
    @Override
    protected void stopInternal() throws LifecycleException {

        setState(LifecycleState.STOPPING);
        if ( executor != null ) executor.shutdownNow();
        executor = null;
        taskqueue = null;
    }
    
    //这个执行线程方法有超时的操作,参考org.apache.catalina.Executor接口
    @Override
    public void execute(Runnable command, long timeout, TimeUnit unit) {
        if ( executor != null ) {
            executor.execute(command,timeout,unit);
        } else { 
            throw new IllegalStateException("StandardThreadExecutor not started.");
        }
    }

    //JDK默认操作线程的方法,参考java.util.concurrent.Executor接口
    @Override
    public void execute(Runnable command) {
        if ( executor != null ) {
            try {
                executor.execute(command);
            } catch (RejectedExecutionException rx) {
                //there could have been contention around the queue
                if ( !( (TaskQueue) executor.getQueue()).force(command) ) throw new RejectedExecutionException("Work queue full.");
            }
        } else throw new IllegalStateException("StandardThreadPool not started.");
    }

    //由于继承了org.apache.tomcat.util.threads.ResizableExecutor接口,所以可以重新定义线程池的大小
    @Override
    public boolean resizePool(int corePoolSize, int maximumPoolSize) {
        if (executor == null)
            return false;

        executor.setCorePoolSize(corePoolSize);
        executor.setMaximumPoolSize(maximumPoolSize);
        return true;
    }
}

  看完了上面的源码之后,不知此刻的你是一面茫然还是认为小菜一碟呢,不管怎样,我们先来看下UML类图吧,了解一下具体的继承关系,你就明白了,废话不多说,能用图片解决的东西尽量少用文字。

StandardThreadExecutor类继承关系

  接下来,我们来看一下ResizableExecutor这个接口:

import java.util.concurrent.Executor;

public interface ResizableExecutor extends Executor {

    /**
     * Returns the current number of threads in the pool.
     *
     * @return the number of threads
     */
    public int getPoolSize();

    public int getMaxThreads();

    /**
     * Returns the approximate number of threads that are actively executing
     * tasks.
     *
     * @return the number of threads
     */
    public int getActiveCount();

    public boolean resizePool(int corePoolSize, int maximumPoolSize);

    public boolean resizeQueue(int capacity);

}

  实现这个接口之后,就能动态改变线程池的大小和任务队列的大小了,它是继承自JDK的Executor接口的,其它的接口不再多说,可自行查看源码。

Tomcat线程池的实现

  Tomcat的线程池的名字也叫作ThreadPoolExecutor,刚开始看源代码的时候还以为是使用了JDK的ThreadPoolExecutor了呢,后面仔细查看才知道是Tomcat自己实现的一个ThreadPoolExecutor,不过基本上都差不多,都是在JDK之上封装了一些自己的东西,上源码:

public class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor {
    protected static final StringManager sm = StringManager
            .getManager("org.apache.tomcat.util.threads.res");
    /**
     *  已经提交但尚未完成的任务数量。
     *  这包括已经在队列中的任务和已经交给工作线程的任务但还未开始执行的任务
     *  这个数字总是大于getActiveCount()的
     **/
    private final AtomicInteger submittedCount = new AtomicInteger(0);
    private final AtomicLong lastContextStoppedTime = new AtomicLong(0L);
    /**
    *  最近的时间在ms时,一个线程决定杀死自己来避免
    *  潜在的内存泄漏。 用于调节线程的更新速率。
    */
    private final AtomicLong lastTimeThreadKilledItself = new AtomicLong(0L);
    //延迟2个线程之间的延迟。 如果为负,不更新线程。
    private long threadRenewalDelay = 1000L;
    //4个构造方法  ... 省略

    public long getThreadRenewalDelay() {
        return threadRenewalDelay;
    }

    public void setThreadRenewalDelay(long threadRenewalDelay) {
        this.threadRenewalDelay = threadRenewalDelay;
    }
    
    /**
    *  方法在完成给定Runnable的执行时调用。
    *  此方法由执行任务的线程调用。 如果
    *  非null,Throwable是未捕获的{@code RuntimeException}
    *  或{@code Error},导致执行突然终止。...
    *  @param r 已完成的任务
    *  @param t 引起终止的异常,如果执行正常完成则为null
    **/
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        submittedCount.decrementAndGet();

        if (t == null) {
            stopCurrentThreadIfNeeded();
        }
    }

    //如果当前线程在上一次上下文停止之前启动,则抛出异常,以便停止当前线程。
    protected void stopCurrentThreadIfNeeded() {
        if (currentThreadShouldBeStopped()) {
            long lastTime = lastTimeThreadKilledItself.longValue();
            if (lastTime + threadRenewalDelay < System.currentTimeMillis()) {
                if (lastTimeThreadKilledItself.compareAndSet(lastTime,
                        System.currentTimeMillis() + 1)) {
                    // OK, it's really time to dispose of this thread

                    final String msg = sm.getString(
                                    "threadPoolExecutor.threadStoppedToAvoidPotentialLeak",
                                    Thread.currentThread().getName());

                    throw new StopPooledThreadException(msg);
                }
            }
        }
    }
    
    //当前线程是否需要被终止
    protected boolean currentThreadShouldBeStopped() {
        if (threadRenewalDelay >= 0
            && Thread.currentThread() instanceof TaskThread) {
            TaskThread currentTaskThread = (TaskThread) Thread.currentThread();
            //线程创建的时间<上下文停止的时间,则可以停止该线程
            if (currentTaskThread.getCreationTime() <
                    this.lastContextStoppedTime.longValue()) {
                return true;
            }
        }
        return false;
    }

    public int getSubmittedCount() {
        return submittedCount.get();
    }

    @Override
    public void execute(Runnable command) {
        execute(command,0,TimeUnit.MILLISECONDS);
    }

    public void execute(Runnable command, long timeout, TimeUnit unit) {
        submittedCount.incrementAndGet();
        try {
            super.execute(command);
        } catch (RejectedExecutionException rx) {
            if (super.getQueue() instanceof TaskQueue) {
                final TaskQueue queue = (TaskQueue)super.getQueue();
                try {
                    if (!queue.force(command, timeout, unit)) {
                        submittedCount.decrementAndGet();
                        throw new RejectedExecutionException("Queue capacity is full.");
                    }
                } catch (InterruptedException x) {
                    submittedCount.decrementAndGet();
                    Thread.interrupted();
                    throw new RejectedExecutionException(x);
                }
            } else {
                submittedCount.decrementAndGet();
                throw rx;
            }

        }
    }

    public void contextStopping() {
        this.lastContextStoppedTime.set(System.currentTimeMillis());
        int savedCorePoolSize = this.getCorePoolSize();
        TaskQueue taskQueue =
                getQueue() instanceof TaskQueue ? (TaskQueue) getQueue() : null;
        if (taskQueue != null) {
            taskQueue.setForcedRemainingCapacity(Integer.valueOf(0));
        }

        // setCorePoolSize(0) wakes idle threads
        this.setCorePoolSize(0);

        if (taskQueue != null) {
            // ok, restore the state of the queue and pool
            taskQueue.setForcedRemainingCapacity(null);
        }
        this.setCorePoolSize(savedCorePoolSize);
    }
}

Tomcat的线程池根据文档来说:和java.util.concurrent一样,但是它实现了一个高效的方法getSubmittedCount()方法用来处理工作队列。具体可以查看上面的注释和源码就知道了。把UML图献上。

ThreadPoolExecutor

Tomcat线程工厂

  想要自定义线程工厂类,只需要实现JDK的ThreadFactory接口就可以了,我们来看看Tomcat是如何实现的吧:

public class TaskThreadFactory implements ThreadFactory {
    //线程组
    private final ThreadGroup group;
    //线程增长因子
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    //名称前缀
    private final String namePrefix;
    //是否是守护线程
    private final boolean daemon;
    //线程优先级
    private final int threadPriority;
    public TaskThreadFactory(String namePrefix, boolean daemon, int priority) {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
        this.namePrefix = namePrefix;
        this.daemon = daemon;
        this.threadPriority = priority;
    }

    @Override
    public Thread newThread(Runnable r) {
        TaskThread t = new TaskThread(group, r, namePrefix + threadNumber.getAndIncrement());
        t.setDaemon(daemon);
        t.setPriority(threadPriority);
        return t;
    }

}

Tomcat的线程工厂类和JDK实现的线程工厂类相差无几,具体可以参考一下JDK线程工厂Executors.DefaultThreadFactory工厂类的实现。

Tomcat的线程类

  Tomcat自己定义了TaskThread用于线程的执行,里面增加了creationTime字段用于定义线程创建的开始时间,以便后面线程池获取这个时间来进行优化。

/**
 * 一个实现创建时间纪录的线程类
 */
public class TaskThread extends Thread {

    private static final Log log = LogFactory.getLog(TaskThread.class);
    private final long creationTime;

    public TaskThread(ThreadGroup group, Runnable target, String name) {
        super(group, new WrappingRunnable(target), name);
        this.creationTime = System.currentTimeMillis();
    }

    public TaskThread(ThreadGroup group, Runnable target, String name,
            long stackSize) {
        super(group, new WrappingRunnable(target), name, stackSize);
        this.creationTime = System.currentTimeMillis();
    }

    public final long getCreationTime() {
        return creationTime;
    }

    /**
    *  封装{@link Runnable}以接受任何{@link StopPooledThreadException},而不是让它走,并可能在调试器中触发中断。
     */
    private static class WrappingRunnable implements Runnable {
        private Runnable wrappedRunnable;
        WrappingRunnable(Runnable wrappedRunnable) {
            this.wrappedRunnable = wrappedRunnable;
        }
        @Override
        public void run() {
            try {
                wrappedRunnable.run();
            } catch(StopPooledThreadException exc) {
                //expected : we just swallow the exception to avoid disturbing
                //debuggers like eclipse's
                log.debug("Thread exiting on purpose", exc);
            }
        }
    }
}

按照Tomcat的注解可知,它就是一个普通的线程类然后增加一个纪录线程创建的时间纪录而已,后面还使用动态内部类封装了一个Runnable,用于调试出发中断。

Tomcat任务队列

  Tomcat的线程队列由org.apache.tomcat.util.threads.TaskQueue来处理,它集成自LinkedBlockingQueue(一个阻塞的链表队列),来看下源代码吧。

public class TaskQueue extends LinkedBlockingQueue<Runnable> {

    private static final long serialVersionUID = 1L;

    private ThreadPoolExecutor parent = null;

    // no need to be volatile, the one times when we change and read it occur in
    // a single thread (the one that did stop a context and fired listeners)
    private Integer forcedRemainingCapacity = null;

    public TaskQueue() {
        super();
    }

    public TaskQueue(int capacity) {
        super(capacity);
    }

    public TaskQueue(Collection<? extends Runnable> c) {
        super(c);
    }

    public void setParent(ThreadPoolExecutor tp) {
        parent = tp;
    }

    public boolean force(Runnable o) {
        if ( parent.isShutdown() ) throw new RejectedExecutionException("Executor not running, can't force a command into the queue");
        return super.offer(o); //forces the item onto the queue, to be used if the task is rejected
    }

    public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
        if ( parent.isShutdown() ) throw new RejectedExecutionException("Executor not running, can't force a command into the queue");
        return super.offer(o,timeout,unit); //forces the item onto the queue, to be used if the task is rejected
    }

    @Override
    public boolean offer(Runnable o) {
      //we can't do any checks
        if (parent==null) return super.offer(o);
        //we are maxed out on threads, simply queue the object
        if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
        //we have idle threads, just add it to the queue
        if (parent.getSubmittedCount()<(parent.getPoolSize())) return super.offer(o);
        //if we have less threads than maximum force creation of a new thread
        if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
        //if we reached here, we need to add it to the queue
        return super.offer(o);
    }


    @Override
    public Runnable poll(long timeout, TimeUnit unit)
            throws InterruptedException {
        Runnable runnable = super.poll(timeout, unit);
        if (runnable == null && parent != null) {
            // the poll timed out, it gives an opportunity to stop the current
            // thread if needed to avoid memory leaks.
            parent.stopCurrentThreadIfNeeded();
        }
        return runnable;
    }

    @Override
    public Runnable take() throws InterruptedException {
        if (parent != null && parent.currentThreadShouldBeStopped()) {
            return poll(parent.getKeepAliveTime(TimeUnit.MILLISECONDS),
                    TimeUnit.MILLISECONDS);
            // yes, this may return null (in case of timeout) which normally
            // does not occur with take()
            // but the ThreadPoolExecutor implementation allows this
        }
        return super.take();
    }

    @Override
    public int remainingCapacity() {
        if (forcedRemainingCapacity != null) {
            return forcedRemainingCapacity.intValue();
        }
        return super.remainingCapacity();
    }

    public void setForcedRemainingCapacity(Integer forcedRemainingCapacity) {
        this.forcedRemainingCapacity = forcedRemainingCapacity;
    }
}

TaskQueue这个任务队列是专门为线程池而设计的。优化任务队列以适当地利用线程池执行器内的线程。
如果你使用一个普通的队列,执行器将产生线程,当有空闲线程,你不能强制项目到队列本身。

总结

从0到1分析一下Apache Tomcat的线程池,感觉心好累啊,不过有收获,至少多线程池这一块又加强了,首先是定位到了StandardThreadExecutor这个类,然后由此展开,ResizableExecutor(动态大小的线程池接口) 、ThreadPoolExecutor (Tomcat线程池具体实现对象)、TaskThreadFactory(Tomcat线程工厂)、TaskThread(Tomcat线程类-一个纪录创建时间的线程类)、TaskQueue(Tomcat的任务队列-一个专门为线程池而设计优化的任务队列),喝口水,压压惊。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 201,681评论 5 474
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,710评论 2 377
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 148,623评论 0 334
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,202评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,232评论 5 363
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,368评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,795评论 3 393
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,461评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,647评论 1 295
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,476评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,525评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,226评论 3 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,785评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,857评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,090评论 1 258
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,647评论 2 348
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,215评论 2 341

推荐阅读更多精彩内容