在多线程环境中,多个线程之间互相协作,以达到高效实现程序功能的目的,比如某些多线程程序要求线程执行有先后顺序、获取某个线程的执行结果,要想实现多个线程之间的协同,就需要线程之间互相通信,线程通信主要分为一下四类:
- 1)文件共享
- 2)网络共享
- 3)共享变量
- 4)JDK提供的线程协调API(主要有:
suspend/resume
、wait/notify
、park/unpark
)
一、文件共享
一个线程将数据写入到文件中,另一个线程再去读取文件,实现数据的共享,最终达到线程通信的目的。
代码示例:
public class FileShareComm {
public static void main(String[] args) {
// 线程1 - 写入数据
new Thread(() -> {
System.out.println("线程1启动");
try {
while (true) {
Files.write(Paths.get("data.log"),
("当前时间" + String.valueOf(System.currentTimeMillis())).getBytes());
Thread.sleep(1000L);
}
} catch (Exception e) {
e.printStackTrace();
}
}).start();
// 线程2 - 读取数据
new Thread(() -> {
System.out.println("线程2启动");
try {
while (true) {
Thread.sleep(1000L);
byte[] allBytes = Files.readAllBytes(Paths.get("data.log"));
System.out.println(new String(allBytes));
}
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
程序运行结果:
线程1启动
线程2启动
当前时间1570711321681
当前时间1570711322768
当前时间1570711323768
当前时间1570711324775
二、网络共享
通俗地说就是网络上不同计算机之间通过套接字(Socket)进行通信,一个Socket一般由IP和Port组成。
三、共享变量
多个线程对某个内存中数据进去读取和写入,实现线程通信。
代码示例:
public class VariableShareComm {
// 共享变量
public static String content = "空";
public static void main(String[] args) {
// 线程1 - 写入数据
new Thread(() -> {
System.out.println("线程1启动!");
try {
while (true) {
content = "当前时间" + String.valueOf(System.currentTimeMillis());
Thread.sleep(1000L);
}
} catch (Exception e) {
e.printStackTrace();
}
}).start();
// 线程2 - 读取数据
new Thread(() -> {
System.out.println("线程2启动!");
try {
while (true) {
Thread.sleep(1000L);
System.out.println(content);
}
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
程序运行结果:
线程1启动!
线程2启动!
当前时间1570712442853
当前时间1570712443859
当前时间1570712444868
当前时间1570712445883
四、线程协作 — JDK API
JDK中对于需要多线程协作完成某一任务的场景,提供了对应API支持,主要有suspend/resume
、wait/notify
、park/unpark
。
关于多线程协作有个经典的场景:生产者 - 消费者模型(线程阻塞、线程唤醒)
示例:线程1去买包子,没有包子,则暂停执行,等待通知;线程2生产出包子,通知线程1继续执行。
下面演示如何用各个JDK API实现生产者-消费者模型。
1、被弃用的suspend和resume
调用suspend
挂起目标线程,通过resume
可以恢复线程执行,由于supend/resume
即要求resume
在suspend
之后调用,并且suspend
被调用后不会释放锁,因此容易写出死锁的代码,所以被弃用。
(1)死锁的场景1:suspend不释放锁,resume需要获取锁
代码示例:
public class Demo6 {
public static Object baozidian = null; // 包子店
/** 死锁的suspend/resume: suspend并不会像wait一样释放锁,因此容易写出死锁代码 */
public void suspendResumeDeadLockTest() throws Exception {
// 启动线程
Thread consumerThread = new Thread(() -> {
if (baozidian == null) { // 如果没包子,则进入等待
System.out.println("1、进入等待");
// 当前线程拿到锁,然后挂起(还是RUNNABLE状态)
synchronized (this) {
Thread.currentThread().suspend();
}
}
System.out.println("2、买到包子,回家");
});
consumerThread.start();
// 3秒之后,生产一个包子
Thread.sleep(3000L);
//System.out.println("consumerThread's status " + consumerThread.getState().toString());
baozidian = new Object();
synchronized (this) {
consumerThread.resume();
}
System.out.println("3、通知消费者");
}
public static void main(String[] args) throws Exception {
Demo6 demo6 = new Demo6();
demo6.suspendResumeDeadLockTest();
}
}
执行结果:
【代码解析】
由于resume会在休眠3秒之后被调用,所以保证了resume在suspend之后执行,consumerThread的run方法体内执行suspend之前要先拿到demo6对象锁,3秒后创建baozidian对象,主线程要调用resume方法通知consumerThread线程,但是由于执行suspend时没有释放demo5对象锁,所以这里主线程没办法拿到锁,导致没法执行resume,结果是consumerThread永远处于挂起状态。
(2)死锁的场景2:resume在suspend之前执行
代码示例:
/** 导致永久挂起的suspend/resume */
public void suspendResumeDeadLockTest2() throws Exception {
// 启动线程
Thread consumerThread = new Thread(() -> {
if (baozidian == null) { // 如果没包子,则进入等待
System.out.println("1、进入等待");
try {
Thread.sleep(5000L); // 为这个线程加上一点延时
} catch (Exception e) {
e.printStackTrace();
}
// 这里的挂起执行在resume后面
Thread.currentThread().suspend();
}
System.out.println("2、买到包子,回家");
});
consumerThread.start();
// 3秒之后,生产一个包子
Thread.sleep(3000L);
baozidian = new Object();
consumerThread.resume();
System.out.println("3、通知消费者");
consumerThread.join();
}
执行结果:
【代码解析】
由于consumerThread的run方法体内执行suspend之前会先休眠5秒,所以导致resume会先执行,suspend后执行,后面的程序不会再次resume,同样会导致consumerThread永远处于挂起状态。
(3)正常的suspend/resume
代码示例:
/** 正常的suspend/resume */
public void suspendResumeTest() throws Exception {
// 启动线程
Thread consumerThread = new Thread(() -> {
if (baozidian == null) { // 如果没包子,则进入等待
System.out.println("1、进入等待");
Thread.currentThread().suspend();
}
System.out.println("2、买到包子,回家");
});
consumerThread.start();
// 3秒之后,生产一个包子
Thread.sleep(3000L); // 延迟3秒,保证调用resume()之前已经调用suspend()完毕
baozidian = new Object();
consumerThread.resume();
System.out.println("3、通知消费者");
}
执行结果:
2、wait/notify机制
wait/notify依赖于Java对象监视器锁,而监视器锁又是跟sychronized
配合使用的,因此wait/notify必须写在同步块中,并且wait/notify方法只能由同一对象锁的持有者线程调用,否则会抛出IllegalMonitorStateException异常。
特别注意,使用sychronized
时,用到的监视器锁是监视对象obj对应的监视器锁,所以调用wait
方法时,必须调用obj.wait()
,这样obj的对象监视器才会去释放对应的监视器锁。
synchronized (obj) {
try {
System.out.println("1、进入等待");
obj.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
(1)正常的wait/notify
wait
和notify
方法都必须在sychronized
块中被调用,且sychronized
和调用方法时必须使用相同的锁对象,notify
必须在wait
被调用之后再调用,代码如下:
public class Demo6 {
public static Object baozidian = null; // 包子店
/** 正常的wait/notify */
public void waitNotifyTest() throws Exception {
// 启动线程
new Thread(() -> {
if (baozidian == null) {
synchronized (this) {
try {
System.out.println("1、进入等待");
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
System.out.println("2、买到包子,回家");
}).start();
// 3秒之后,生产一个包子
Thread.sleep(3000L);
baozidian = new Object();
synchronized (this) {
this.notifyAll();
System.out.println("3、通知消费者");
}
}
public static void main(String[] args) throws Exception {
Demo6 demo6 = new Demo6();
// 2、wait/notify
demo6.waitNotifyTest();
}
}
执行结果:
(2)死锁的wait/notify
wait
方法会导致当前线程等待,加入对应的对象的监视器等待集合中,并且释放当前持有的对象锁,notify/notifyAll
方法会唤醒一个或者所有正在等待该对象锁的线程。
虽然wait
会自动解锁,但是对调用顺序有要求,如果在notify被调用之后,才开始wait
方法的调用,线程会永远处于WAITING状态;如果调用notify/notifyAll
时对象锁的等待集合中没有等待的线程,自然通知不到任一个线程,只有在通知前有线程调用了wait
进入等待集合中,才能真正通知到等待的线程。
代码示例:
public class Demo6 {
public static Object baozidian = null; // 包子店
/** 会导致程序永久等待的wait/notify */
public void waitNotifyDeadLockTest() throws Exception {
// 启动线程
new Thread(() -> {
if (baozidian == null) {
try {
Thread.sleep(5000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (this) {
try {
System.out.println("1、进入等待");
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
System.out.println("2、买到包子,回家");
}).start();
// 3秒之后,生产一个包子
Thread.sleep(3000L);
baozidian = new Object();
synchronized (this) {
this.notifyAll();
System.out.println("3、通知消费者");
}
}
public static void main(String[] args) throws Exception {
Demo6 demo6 = new Demo6();
// 2、wait/notify
demo6.waitNotifyDeadLockTest();
}
}
执行结果:
3、park/unpark机制
park/unpark
是JDK API的另一种线程通信机制,一个线程调用了park
则等待颁发一个“许可”,如果当前存在未被使用的许可,则线程可以直接获取许可直接运行;如果当前许可数为0,则需要等待其他线程调用unpark
颁发许可。
需要许可和颁发许可的线程没有强依赖关系,任何一个线程颁发的许可都可以被任意需要许可的线程使用,任何一个线程都可以颁发许可。因此,park
和unpark
对调用顺序没有要求,同时由于park/unpark
不像wait/notify
那样是基于锁监视器的,所以park/unpark
不会释放当前线程持有的锁。
一个线程多次调用park
时,只有第一次调用生效,不会因为多次park
而去获取多个许可证,因为底层是基于一个布尔值的CAS原子操作。
(1)正常的park/unpark
代码示例:
public class Demo6 {
public static Object baozidian = null; // 包子店
/** 正常的park/unpark */
public void parkUnparkTest() throws Exception {
// 启动线程
Thread consumerThread = new Thread(() -> {
if (baozidian == null) {
System.out.println("1、进入等待");
LockSupport.park();
}
System.out.println("2、买到包子,回家");
});
consumerThread.start();
// 3秒之后,生产一个包子
Thread.sleep(3000L);
baozidian = new Object();
System.out.println();
LockSupport.unpark(consumerThread);
System.out.println("3、通知消费者");
}
public static void main(String[] args) throws Exception {
Demo6 demo6 = new Demo6();
// 3、park/unpark
demo6.parkUnparkTest();
}
}
执行结果:
(2)死锁的park/unpark
由于park
时不会释放锁,所以如果执行park
所在的代码块是需要先获取锁的同步代码块,并且unpark()
需要获取相同的锁时,会触发死锁。
代码示例:
public class Demo6 {
public static Object baozidian = null; // 包子店
/** 死锁的park/unpark */
public void parkUnparkDeadLockTest() throws Exception {
// 启动线程
Thread consumerThread = new Thread(() ->{
if (baozidian == null) {
System.out.println("1、进入等待");
synchronized (this) {
LockSupport.park();
}
}
System.out.println("2、买到包子,回家");
});
consumerThread.start();
// 3秒之后,生产一个包子
Thread.sleep(3000L);
baozidian = new Object();
synchronized (this) {
LockSupport.unpark(consumerThread);
}
System.out.println("3、通知消费者");
}
public static void main(String[] args) throws Exception {
Demo6 demo6 = new Demo6();
// 3、park/unpark
demo6.parkUnparkDeadLockTest();
}
}
4、总结
通讯方式 | 等待通知时状态 | 死锁场景 | 优点 | 缺点 |
---|---|---|---|---|
suspend/resume |
RUNNABLE | 1、resume在suspend之前被调用 2、suspend和resume所在的同步代码块使用相同的锁导致死锁 |
~ | 很容易触发死锁 |
wait/notify |
WAITING | notify在wait之前使用 | 基于对象监视器锁,调用wait 时会释放锁 |
执行顺序有要求 |
park/unpark |
WAITING | park和unpark所在同步代码块使用相同的锁导致死锁 | 对执行顺序没要求 |
park 不会释放锁,可能导致死锁 |
五、伪唤醒
一般情况下,当线程运行需要的等待某个条件还不具备是,线程会调用上述的suspend
、wait
、park
方法将线程挂起,然后等待另一个线程满足这个条件后再通知挂起线程,如果使用if
语句来判断是否进入等待状态,可能会引起伪唤醒问题,问题代码模板示例:
sychronized(lock) {
if (<条件判断>) {
lock.wait();
}
// 执行后续操作
}
1、什么是伪唤醒?
伪唤醒是指线程并非因为notify
、notifyAll
、unpark
等api调用而唤醒,是更底层的原因导致的,此时条件判断还不满足,但是却因为伪唤醒运行后续的代码,导致程序运行异常或错误。
2、如何解决伪唤醒问题?
不用if
语句来判断,而是在循环中检查等待条件,这样确保程序在伪唤醒的条件下依然不会在条件没满足的情况下去执行后续操作,而是再次将线程挂起,如下所示:
// wait
sychronized(obj) {
while (<条件判断>) {
obj.wait();
}
// 执行后续操作
}
// park
while(<条件判断>) {
LockSupport.park();
// 执行后续操作
}
实例代码演示:
/** 正常的wait/notify */
public void waitNotifyTest() throws Exception {
// 启动线程
Thread consumerThread = new Thread(() -> {
while (baozidian == null) {
synchronized (this) {
try {
System.out.println("1、进入等待");
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
System.out.println("2、买到包子,回家");
});
consumerThread.start();
// 3秒之后,生产一个包子
Thread.sleep(3000L);
System.out.println("consumerThread's status " + consumerThread.getState().toString());
baozidian = new Object();
synchronized (this) {
this.notifyAll();
System.out.println("3、通知消费者");
}
}
/** 正常的park/unpark */
public void parkUnparkTest() throws Exception {
// 启动线程
Thread consumerThread = new Thread(() -> {
while (baozidian == null) {
System.out.println("1、进入等待");
LockSupport.park();
}
System.out.println("2、买到包子,回家");
});
consumerThread.start();
// 3秒之后,生产一个包子
Thread.sleep(3000L);
baozidian = new Object();
//System.out.println(consumerThread.getState().toString());
LockSupport.unpark(consumerThread);
System.out.println("3、通知消费者");
}