Java线程池的使用

image.png

使用线程池的原因

  1. 无线创建线程的不足

    在生产环境中,为每一个任务都分配一个线程这种方法存在一些缺陷:

    • 线程生命周期的开销:线程的创建与销毁都会消耗大量资源,频繁创建与销毁线程会带来很大的资源开销

    • 资源消耗:活跃的线程会消耗系统资源。如果可运行的线程数量大于可用的处理器数量,闲置的线程会占用许多内存,并且频繁的线程上下文切换也会带来很大的性能开销

    • 稳定性:操作系统在可创建的线程数量上有一个限制。在高负载情况下,应用程序很有可能突破这个限制,资源耗尽后很可能抛出OutOfMemoryError异常

  2. 提高响应速度

    任务到达时,不再需要创建线程就可以立即执行

  3. 线程池提供了管理线程的功能

    比如,可以统计任务的完成情况,统计活跃线程与闲置线程的数量等

使用场景

  • 不适用场合
  1. 依赖性任务

    在线程池中,如果任务依赖于其他任务,那么可能产生死锁。比如,在单线程的Executor中,如果一个任务将另一个任务提交到同一个Executor,并且等待这个被提交任务的结果,那么通常会引发死锁

  2. 使用ThreadLocal的任务

    ThreadLocal可以存储线程级变量,将变量封闭到特定的线程当中。然而使用线程池时,这些线程都会被自由的重用,在线程池的线程中不应该使用ThreadLocal在任务之间传递值。

    当线程本地值的生命周期受限于任务的生命周期时,可以在线程池的线程中使用ThreadLocal,任务结束后调用ThreadLocal.remove方法将已存储的值清除。

  3. 使用线程封闭机制的任务

    在单线程应用程序中,不用考虑对象的并发安全问题,他们都被很好的封闭在单个线程当中。如果将单线程的环境换成线程池,那么这些对象有可能造成并发安全问题,失去线程安全性

  4. 不同类型或运行时长差异较大的任务

    不同类型任务之间很可能存在依赖,并且他们执行的时长也不相同,在线程池中运行时很有可能造成拥塞,甚至死锁

  • 适用场合

    当任务是同类型且相互独立时,线程池的性能可以达到最佳

    网页服务器、文件服务器、邮件服务器,他们的请求往往是同类型且相互独立的

架构

线程池异常处理方案这篇博客中已经提到了线程池的架构,如图:

image.png

Executor:异步任务执行框架的基础

public interface Executor {
    void execute(Runnable command);
}

通过使用Executor,将请求处理任务的提交与任务的实际执行解耦,只需要采用另一种不同的Executor实现,就可以改变服务器的行为。比如:

// 为每个任务分配一个线程
public class ThreadPerTaskExecutor implements Executor {
    @Override
    public void execute(Runnable r) {
        new Thread(r).start();
    }
}
// 以同步的方式执行每个任务
public class WithinThreadExecutor implements Executor{
    @Override
    public void execute(Runnable r) {
        r.run();
    }
}

ExecutorService:ExecutorService扩展了Executor接口,添加了一些用于管理生命周期和任务提交的方法

public interface ExecutorService extends Executor {
    // 生命周期管理
    void shutdown();

    List<Runnable> shutdownNow();

    boolean isShutdown();

    boolean isTerminated();

    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;

    // 任务提交
    <T> Future<T> submit(Callable<T> task);

    <T> Future<T> submit(Runnable task, T result);

    Future<?> submit(Runnable task);

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;

    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;

    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

ExecutorService的生命周期有3中状态:运行、关闭、终止。ExecutorService在初始创建时处于运行状态。shutdown方法执行平缓的关闭过程:不再接受新任务,同时等待已提交的任务执行完成,包括在任务队列中尚未开始的任务。shutdownNow方法将尝试取消所有运行中的任务,并不再启动队列中尚未执行的任务。

所有任务完成后,ExecutorService将转入终止状态。可以调用awaitTermination来等待ExecutorService到达终止状态,或者通过轮询isTerminated来判断ExecutorService是否终止。

AbstractExecutorService ThreadPoolExecutor ScheduledThreadPoolExecutor: 线程池的实现

ThreadPoolExecutor扩展了ExecutorService接口,是线程池的具体实现。ScheduledThreadPoolExecutor支持定时以及周期性任务的执行。

ThreadPoolExecutor支持两种方式的任务提交:exec.execute(Runnable r)以及exec.submit(Runnable r)。关于任务的这两种提交方式在线程池异常处理方案已经提到过了,不再赘述。

定制线程池

先来了解一下线程池的创建:

ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

以上是ThreadPoolExecutor的构造函数,看一下每个参数的含义:

  1. corePoolSize

corePoolSize(线程池的基本大小):当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建。如果调用了线程池的prestartAllCoreThreads方法,线程池会提前创建并启动所有基本线程。

  1. runnableTaskQueue(任务队列):用于保存等待执行的任务的阻塞队列。 可以选择以下几个阻塞队列。

    • ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。

    • LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。

    • SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。

    • PriorityBlockingQueue:一个具有优先级的无限阻塞队列。

  2. maximumPoolSize(线程池最大大小):线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是如果使用了无界的任务队列这个参数就没什么效果。

  3. ThreadFactory:用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字。

  4. RejectedExecutionHandler(饱和策略):当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。下面会有介绍几种饱和策略。

  5. keepAliveTime(线程活动保持时间):线程池的工作线程空闲后,保持存活的时间。所以如果任务很多,并且每个任务执行的时间比较短,可以调大这个时间,提高线程的利用率。

  6. TimeUnit(线程活动保持时间的单位):可选的单位有天(DAYS),小时(HOURS),分钟(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)。

image.png

设置线程池的大小

线程池过大,会导致大量的线程在很少的cpu和内存资源上发生竞争,频繁的线程上下文切换也会带来额外的性能开销。线程池过小,导致许多空闲的处理器无法执行工作,降低吞吐率。

  1. cpu密集型

对于计算密集型的任务,当系统拥有n个处理器时,将线程池大小设置为n+1通常可以实现最优利用率。

  1. io密集型

对于包含io操作或其他阻塞操作的任务,由于线程不会一直执行,因此线程池的规模应该更大。有这么一个简单的公式:

N[threads] = N[cpu] * U[cpu] * (1 + W/C)

其中,N[threads]是线程池的大小,U[cpu]是cpu的利用率,W/C是任务等待时间与任务执行时间的比值。

可以通过一些监控工具获得cpu利用率等,Runtime.getRuntime().availableProcessors()返回cpu的数目

  1. 资源依赖

如果任务还依赖一些其他的有限资源,比如数据库连接,文件句柄等,那么这些资源也会影响线程池的大小:计算每个任务对该资源的需求量,用该资源的可用总量除以每个任务的需求量,所得的结果就是线程池大小的上限。

Executors

Executors提供了许多静态工厂方法来创建一个线程池:

newCachedThreadPool
创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
newFixedThreadPool
创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
newSingleThreadExecutor
创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
newScheduledThreadPool
创建一个定长线程池,支持定时及周期性任务执行。

具体情况可以结合Executors源码和ThreadPoolExecutor的构造函数查看。我们也可以模仿Executors的这几个工厂方法来定制自己的线程池执行策略。

扩展ThreadPoolExecutor

  1. 线程池异常处理方案这篇总结曾经提到重写ThreadPoolExecutor的afterExecute方法来处理未检测异常,这就是扩展ThreadPoolExecutor的一个例子。除此之外,还可以在这些方法中添加日志、计时、监视等功能。

线程池完成关闭操作后会调用方法terminated。terminated可以用来释放Executor在其生命周期中分配的各种资源,以及执行发送通知、记录日志等操作。

下面编写一个利用beforeExecute、afterExecute和terminated添加日志记录和统计信息收集的扩展ThreadPoolExecutor。

public class TimingThreadPool extends ThreadPoolExecutor{
    // 使用ThreadLocal存储任务起始时间,在beforeExecute设置起始时间,在afterExecute中可以看到这个值
    private final ThreadLocal<Long> startTime = new ThreadLocal<>();
    private final Logger logger = Logger.getLogger(TimingThreadPool.class.getName());
    private final AtomicLong numTasks = new AtomicLong();
    private final AtomicLong totalTime = new AtomicLong();
    
    public TimingThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }
    
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        logger.fine(String.format("Thread %s: start %s", t, r));
        startTime.set(System.nanoTime());
    }
    
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        try {
            long endTime = System.nanoTime();
            long taskTime = endTime - startTime.get();
            numTasks.incrementAndGet();
            totalTime.addAndGet(taskTime);
            logger.fine(String.format("Thread %s: end %s, time=%dns", t, r, taskTime));
        } finally {
            super.afterExecute(r, t);
        }
    }
    
    @Override
    protected void terminated() {
        try {
            logger.fine(String.format("Terminated: avg time=%dns", totalTime.get()/numTasks.get()));
        } finally {
            super.terminated();
        }
    }

}
  1. 扩展ThreadPoolExecutor的newTaskFor方法可以修改通过submit方法返回的默认Future实现FutureTask为自己的实现。在我们自己实现Future的类中可以针对任务做一些操作,比如定制任务的取消行为:
public class CacellingExecutor extends ThreadPoolExecutor {

    public CacellingExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }
    
    @Override
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        if (callable instanceof CancellableTask) {
            return ((CancellableTask<T>)callable).newTask();
        }
        return super.newTaskFor(callable);
    }

}
interface CancellableTask<T> extends Callable<T> {
    void cancel();
    RunnableFuture<T> newTask();
}

abstract class SocketUsingTask<T> implements CancellableTask<T> {
    private Socket socket;
    public SocketUsingTask(Socket socket) {
        this.socket = socket;
    }
    @Override
    public void cancel() {在并发应用程序中,线程池是很重要的一块。读完《java并发编程实战》以及研究了一遍jdk源代码之后,总结一下线程池方面的知识~
        try {
            this.socket.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    @Override
    public RunnableFuture<T> newTask() {
        return new FutureTask<T>(this){
            @Override
            public boolean cancel(boolean mayInterruptIfRunning) {
                try {
                    SocketUsingTask.this.cancel();
                } finally {
                    return super.cancel(mayInterruptIfRunning);
                }
            }
        };
    }
}

异常处理

异常处理这部分,在前面的博客中已经总结过了:线程池异常处理方案

饱和策略

当线程池达到饱和以后(maximumPoolSzie),饱和策略开始发挥作用。ThreadPoolExecutor的饱和策略可以通过setRejectedExecutionHandler来修改。当某个任务被提交到一个已经关闭的Executor时,也会用到饱和策略。jdk提供了几种不同的RejectedExecutionHandler实现:

  1. AbortPolicy
public static class AbortPolicy implements RejectedExecutionHandler {
        public AbortPolicy() { }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }

AbortPolicy是默认的饱和策略,该饱和策略将抛出未检查的RejectedExecutionException。调用者可以处理这个异常。

  1. CallerRunsPolicy
public static class CallerRunsPolicy implements RejectedExecutionHandler {
        public CallerRunsPolicy() { }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }

CallerRunsPolicy将任务回退到调用者,他不会在线程池的某个线程中提交任务,而是在调用execute的线程中运行,从而降低新任务的流量。

  1. DiscardPolicy
public static class DiscardPolicy implements RejectedExecutionHandler {
        public DiscardPolicy() { }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }

DiscardPolicy会悄悄抛弃任务,什么也不做。

  1. DiscardOldestPolicy
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        public DiscardOldestPolicy() { }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }

DiscardOldestPolicy会抛弃下一个将被执行的任务,然后重新尝试提交任务。

其他

  1. CompletionService

如果向Executor提交了一组计算任务,并希望在计算完成后获取结果,那么可以保留与每个任务关联的Future,然后轮询这些future的get方法,判断任务是否完成。这种方法虽然可行,但是有些繁琐。

CompletionService将Executor和BlockingQueue的功能融合在一起,可以将任务提交给他执行,然后使用类似于队列的take或poll方法获取已完成结果。

ExecutorCompletionService 实现了CompletionService,他的实现很简单,在构造函数中创建一个BlockingQueue来保存计算完成的结果。当提交某个任务时,该任务首先包装成为一个QueueingFuture,这是FutureTask的一个子类,他改写了done方法,将结果放入BlockingQueue中。ExecutorCompletionService的take和poll方法委托给了BlockingQueue。

  1. ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor以延迟或定时的方式执行任务,类似于Timer。由于Timer的一些缺陷,可以使用ScheduledThreadPoolExecutor来代替Timer。

Timer在执行所有的定时任务时只会创建一个线程,如果某个任务执行时间过长,就会破坏其他TimerTask的定时准确性。TimerTask抛出异常后,Timer线程也不会捕获这个异常,从而终止定时线程。尚未执行的TimerTask不会再执行,新的任务也不会被调度。

参考

java并发编程实战

聊聊并发(三)——JAVA线程池的分析和使用

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,529评论 5 475
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,015评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,409评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,385评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,387评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,466评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,880评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,528评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,727评论 1 295
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,528评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,602评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,302评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,873评论 3 306
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,890评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,132评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,777评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,310评论 2 342

推荐阅读更多精彩内容