一、背景
我问群里的同学,下一期的话题讨论什么?一同学说多线程,当时听到这个话题本想拒绝的,因为这个话题太泛了,很难写出一篇好的文章;涉及到的内容又很多,随便拿出一块就能讨论几天;但为了大家的积级性,还是硬着头皮应下来(实际上我对这一块也不怎么理解),基本上花了一周的时候看了几百页的书,然后希望能够有条理地写出一篇文章,但发现还是太难了;如果包含太多的技术细节,我甚至没办法把概念讲清楚;后来我决定,写这篇文章为了大家理解JAVA多线程最基本的概念,涉及到的技术细节我们以子话题的形式,慢慢为大家更新。
二、几个基本概念
并发是指让多个任务并行执行(同时执行多个任务),以使任务得到更快的执行。实现并发的方式,有两个:进程机制和线程机制;一个CPU同时只能执行一个进程,
并发与并行
并发指的是具有处理多任务的能力
并行指的是具有同时处理多任务的能力
“并发”和“并行”唯一的一个区别在于“同时”,也从上可以看出,“并行”系统一定是“并发”系统,“并行”是“并发”的一个子集。
下面来自知乎一个网友的回答用一个很好的场景解释了它们的区别:
- 你吃饭吃到一半,电话来了,你一直到吃完了以后才去接,这就说明你不支持并发也不支持并行。
- 你吃饭吃到一半,电话来了,你停了下来接了电话,接完后继续吃饭,这说明你支持并发。
- 你吃饭吃到一半,电话来了,你一边打电话一边吃饭,这说明你支持并行。
进程与线程
进程是资源分配的最小单位,同时也支持调度;什么是资源呢,就是内存/io/文件等,进程的资源是在开启的时候就分配好的,各进程之间不可相互换资源。
线程是独立运行和独立调度的基本单位。独立运行的意思是,线程才是真正的任伤执行者,一个线程可以执行多个任务(任务切换)。独立调度即时间切片,每个线程按照时间切片执行任务。
线程依赖于进程,进程已经申请了线程执行任务时的资源,多个线程之间共享这部分资源,一个进程可以有多个线程。
线程驱动任务的执行。
进程与线程之间的关系看到一篇很好的文章,大家可以参考。
一个小插曲:线程一定要依附于进程吗?(不一定,但在JAVA一定是),其实进程和线程是实现并发的两种方式,曾经有人提出,并发的实现只应该采用进程的方式,因为线程的上下文切换存在不小的开销,并不一定能够提高任务的执行效率。比如,full GC,如果采用并行收集,其线程数量也不应该超过计算机的核数,如果超过核数,其性能还不如顺序收集。但为什么线程又能大行其道呢?这脱离不了任务执行中的一个状态:“阻塞”,当其中一个任务处于阻塞状态(比如等待输入)时,就可以执行另外一个任务,这样大大提高了执行的效率。线程打破了摩尔定律,又有人提出了阿姆达尔定律
JAVA采用的是多线程的编程机制,因此本文讨论的主要问题集中在,线程如何调度任务,如何管理线程以及各线程之间的资源竞争问题
三、任务
从上一节的描述中我们知道,最终执行的是任务,那我们就需要了解一下如何创建任务
3.1 创建任务
有两种类型的任务,一种是不需要返回结果的runnable,另外一种是希望返回结果的Callable。接下来我们就详情了解一下,如何创建两种任务:
Runnable
先来看个例子(就一个递减操作):
public class ListOff implements Runnable {
protected int countDown = 10;
public ListOff() {
}
public String status() {
return "#" + "(" + (countDown > 0 ? countDown : "Liftoff!") + ").";
}
public void run() {
while (countDown-- > 0) {
System.out.print(status());
}
System.out.println();
}
}
然后把这个任务注册到多个线程上,并启动线程,只需要new Thread再start就可以了,是不是很简单。
public class TestThread {
@Test
public void testOneThread() {
Thread t = new Thread(new ListOff());
t.start();
}
@Test
public void testNThread() {
int i = 10;
while(i-- > 0) {
Thread t = new Thread(new ListOff());
t.start();
}
}
}
Callable
public class GetRand implements Callable<Integer>{
public Integer call() {
Random random = new Random();
return random.nextInt(100);
}
}
public class TestThread {
@Test
public void testCallable() throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
List<Future<Integer>> results = new ArrayList<>();
int i = 10;
while (i-- > 0) {
results.add(exec.submit(new GetRand()));
}
for (Future<Integer> n : results) {
System.out.println(n.get());
}
}
}.
为什么要调用start?通过此代码找到Thread中的start()方法的定义,可以发现此方法中使用了private native void start0();其中native关键字表示可以调用操作系统的底层函数,那么这样的技术成为JNI技术(java Native Interface)
还有一种调度runnable的方法,那就是继承Thread.
public class PThread extends Thread{
public PThread() {
super();
}
@Override
public void run () {
System.out.println(DateFormat.getDateInstance(DateFormat.DEFAULT).format(new Date()));
}
@Test
public void TestMain() {
int i = 10;
while(i-- > 0) {
new PThread().run();
}
}
}
两种方法有什么区别呢(copy别人的结论)?
Java中实现多线程有两种方法:继承Thread类、实现Runnable接口,在程序开发中只要是多线程,肯定永远以实现Runnable接口为主,因为实现Runnable接口相比继承Thread类有如下优势:
- 1、可以避免由于Java的单继承特性而带来的局限;
- 2、增强程序的健壮性,代码能够被多个线程共享,代码与数据是独立的;
- 3、适合多个相同程序代码的线程区处理同一资源的情况。
想要更加详情地了解,参考
四 线程管理与调度
通过上面的任务介绍,我们可们简单了解了如何创建两种类型的任务。下面我们就具体来了解一下线程以及线程如何调度任务。
我们主要通过Executor框架来管理和调度线程
4.1 executor
通过new Thread()的方式去管理线程,在极简单的系统条件下是可行的,但当你试图去完成一个系统时,发现并不可行。好在JAVA已经设计好了Executors来帮我们来管理线程。
Java从1.5版本开始,为简化多线程并发编程,引入全新的并发编程包:java.util.concurrent及其并发编程框架(Executor框架)。 Executor框架是指java 5中引入的一系列并发库中与executor相关的一些功能类,其中包括线程池,Executor,Executors,ExecutorService,CompletionService,Future,Callable等。他们的关系为
并发编程的一种编程方式是把任务拆分为一系列的小任务,即Runnable,然后将这些任务提交给一个Executor执行,Executor.execute(Runnalbe) 。Executor在执行时使用其内部的线程池来完成操作。 Executor的子接口有:ExecutorService,ScheduledExecutorService,已知实现类:AbstractExecutorService,ScheduledThreadPoolExecutor,ThreadPoolExecutor。
4.2 线程池
newCachedThreadPool() | 缓存型池子,先查看池中有没有以前建立的线程,如果有,就 reuse.如果没有,就建一个新的线程加入池中;缓存型池子通常用于执行一些生存期很短的异步型任务因此在一些面向连接的daemon型SERVER中用得不多。但对于生存期短的异步任务,它是Executor的首选。能reuse的线程,必须是timeout IDLE内的池中线程,缺省 timeout是60s,超过这个IDLE时长,线程实例将被终止及移出池。注意,放入CachedThreadPool的线程不必担心其结束,超过TIMEOUT不活动,其会自动被终止。 |
newFixedThreadPool(int) | newFixedThreadPool与cacheThreadPool差不多,也是能reuse就用,但不能随时建新的线程;-其独特之处:任意时间点,最多只能有固定数目的活动线程存在,此时如果有新的线程要建立,只能放在另外的队列中等待,直到当前的线程中某个线程终止直接被移出池子;-和cacheThreadPool不同,FixedThreadPool没有IDLE机制(可能也有,但既然文档没提,肯定非常长,类似依赖上层的TCP或UDP IDLE机制之类的),所以FixedThreadPool多数针对一些很稳定很固定的正规并发线程,多用于服务器;-从方法的源代码看,cache池和fixed 池调用的是同一个底层 池,只不过参数不同:fixed池线程数固定,并且是0秒IDLE(无IDLE) cache池线程数支持0-Integer.MAX_VALUE(显然完全没考虑主机的资源承受能力),60秒IDLE |
newScheduledThreadPool(int) | 这个池子里的线程可以按schedule依次delay执行,或周期执行 |
SingleThreadExecutor() | -单例线程,任意时间池中只能有一个线程;-用的是和cache池和fixed池相同的底层池,但线程数目是1-1,0秒IDLE(无IDLE) |
4.3 队列
public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue)
corePoolSize:线程池中所保存的核心线程数,包括空闲线程。
maximumPoolSize:池中允许的最大线程数。
keepAliveTime:线程池中的空闲线程所能持续的最长时间。
unit:持续时间的单位。
workQueue:任务执行前保存任务的队列,仅保存由execute方法提交的Runnable任务。
根据ThreadPoolExecutor源码前面大段的注释,我们可以看出,当试图通过excute方法讲一个Runnable任务添加到线程池中时,按照如下顺序来处理:
1、如果线程池中的线程数量少于corePoolSize,即使线程池中有空闲线程,也会创建一个新的线程来执行新添加的任务;
2、如果线程池中的线程数量大于等于corePoolSize,但缓冲队列workQueue未满,则将新添加的任务放到workQueue中,按照FIFO的原则依次等待执行(线程池中有线程空闲出来后依次将缓冲队列中的任务交付给空闲的线程执行);
3、如果线程池中的线程数量大于等于corePoolSize,且缓冲队列workQueue已满,但线程池中的线程数量小于maximumPoolSize,则会创建新的线程来处理被添加的任务;
4、如果线程池中的线程数量等于了maximumPoolSize,有4种才处理方式(该构造方法调用了含有5个参数的构造方法,并将最后一个构造方法为RejectedExecutionHandler类型,它在处理线程溢出时有4种方式,这里不再细说,要了解的,自己可以阅读下源码)。
总结起来,也即是说,当有新的任务要处理时,先看线程池中的线程数量是否大于corePoolSize,再看缓冲队列workQueue是否满,最后看线程池中的线程数量是否大于maximumPoolSize。
另外,当线程池中的线程数量大于corePoolSize时,如果里面有线程的空闲时间超过了keepAliveTime,就将其移除线程池,这样,可以动态地调整线程池中线程的数量。
我们大致来看下Executors的源码,newCachedThreadPool的不带. RejectedExecutionHandler参数(即第五个参数,线程数量超过maximumPoolSize时,指定处理方式)的构造方法如下:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
它将corePoolSize设定为0,而将maximumPoolSize设定为了Integer的最大值,线程空闲超过60秒,将会从线程池中移除。由于核心线程数为0,因此每次添加任务,都会先从线程池中找空闲线程,如果没有就会创建一个线程(SynchronousQueue<Runnalbe>决定的,后面会说)来执行新的任务,并将该线程加入到线程池中,而最大允许的线程数为Integer的最大值,因此这个线程池理论上可以不断扩大。
再来看newFixedThreadPool的不带RejectedExecutionHandler参数的构造方法,如下:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
它将corePoolSize和maximumPoolSize都设定为了nThreads,这样便实现了线程池的大小的固定,不会动态地扩大,另外,keepAliveTime设定为了0,也就是说线程只要空闲下来,就会被移除线程池,敢于LinkedBlockingQueue下面会说。
下面说说几种排队的策略:
1、直接提交。缓冲队列采用 SynchronousQueue,它将任务直接交给线程处理而不保持它们。如果不存在可用于立即运行任务的线程(即线程池中的线程都在工作),则试图把任务加入缓冲队列将会失败,因此会构造一个新的线程来处理新添加的任务,并将其加入到线程池中。直接提交通常要求无界 maximumPoolSizes(Integer.MAX_VALUE) 以避免拒绝新提交的任务。newCachedThreadPool采用的便是这种策略。
2、无界队列。使用无界队列(典型的便是采用预定义容量的 LinkedBlockingQueue,理论上是该缓冲队列可以对无限多的任务排队)将导致在所有 corePoolSize 线程都工作的情况下将新任务加入到缓冲队列中。这样,创建的线程就不会超过 corePoolSize,也因此,maximumPoolSize 的值也就无效了。当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列。newFixedThreadPool采用的便是这种策略。
3、有界队列。当使用有限的 maximumPoolSizes 时,有界队列(一般缓冲队列使用ArrayBlockingQueue,并制定队列的最大长度)有助于防止资源耗尽,但是可能较难调整和控制,队列大小和最大池大小需要相互折衷,需要设定合理的参数。