CountDownLatch
需求背景:
有三个task同时在线程池中执行,想要三个task都执行完成之后,再执行其他任务。
分析:
- 需要监控三个任务的执行
- 等待三个任务执行完毕
由此引出CountDownLatch
,先上代码:
public void test() {
ExecutorService executorService = Executors.newFixedThreadPool(10);
CountDownLatch countDownLatch = new CountDownLatch(3);
executorService.submit(new DependentService(countDownLatch));
executorService.submit(new DependentService(countDownLatch));
executorService.submit(new DependentService(countDownLatch));
try {
countDownLatch.wait();//block
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public class DependentService implements Runnable {
private CountDownLatch latch;
public DependentService(CountDownLatch countDownLatch) {
latch = countDownLatch;
}
@Override
public void run() {
//start your task
latch.countDown();
}
}
不难看出latch.countDown();
用于监控每一个任务执行完毕,而countDownLatch.wait()
用于等待所有三个监控的任务执行完毕。注意点在于CountDownLatch
的构造函数传参要和实际执行的任务数量相同。
CyclicBarrier
需求背景:
以游戏中常见的广播通知为例,某个玩家向其他三个玩家广播了一条消息。消息从玩家发送到服务端,服务端要确保同时向其他三个玩家广播消息。
(从服务端确保大家都能同时收到消息的前提是同时发送消息(这里不考虑网络延迟影响),anyway,这是只是一个简化说明的例子)
分析:
- 三个线程同时运行,这是保证同时发送的前提
- 三个线程都到了某一个准备的状态,然后同时运行发送消息的代码
由此引出CyclicBarrier
, 见代码:
public void test() throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
executorService.submit(new SyncService(cyclicBarrier));
executorService.submit(new SyncService(cyclicBarrier));
executorService.submit(new SyncService(cyclicBarrier));
Thread.sleep(3000);
}
public class SyncService implements Runnable {
private CyclicBarrier barrier;
public SyncService(CyclicBarrier cyclicBarrier) {
barrier = cyclicBarrier;
}
@Override
public void run() {
try {
barrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
//send message
}
}
不难看出,首先是三个线程都会分别到达barrier.await();
然后分别被block住,直到三个线程都到达了这个状态,就会同时执行下面send message
的代码。
Phaser
Phaser
集CountDownLatch
和CyclicBarrier
功能于一身,并且增加了更多控制功能。当然也更加复杂,这里只是做一个简单的介绍。
- 和
CyclicBarrier
作用相似的用法:
public void test() throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
Phaser phaser = new Phaser(3);
executorService.submit(new PhaseService(phaser));
executorService.submit(new PhaseService(phaser));
executorService.submit(new PhaseService(phaser));
Thread.sleep(3000);
}
public class PhaseService implements Runnable{
private Phaser phaser;
public PhaseService(Phaser phaser) {
this.phaser = phaser;
}
@Override
public void run() {
phaser.arriveAndAwaitAdvance();// 和 barrier.await() 作用相同
//do your task
}
}
new Phaser(3)
构造方法,这个传参3是到达下一个Phase需要同时满足的状态个数,上面的例子就是指到达phaser.arriveAndAwaitAdvance()
的线程数。如果传递了3,但是下面提交了四个task运行在四个线程中,那么就会有一个task所在的线程被block在phaser.arriveAndAwaitAdvance()
状态。 当然我们也可以在代码运行中动态的注册这样初始化的时候就不需要传递具体的数值。
- 和
CountDownLatch
用法相似
public void test() throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
Phaser phaser = new Phaser(4);//注意和上面传参的区别3和4
executorService.submit(new PhaseService(phaser));
executorService.submit(new PhaseService(phaser));
executorService.submit(new PhaseService(phaser));
phaser.arriveAndAwaitAdvance();
}
public class PhaseService implements Runnable{
private Phaser phaser;
public PhaseService(Phaser phaser) {
this.phaser = phaser;
}
@Override
public void run() {
//do your task
phaser.arriveAndAwaitAdvance();//注意差异,是先完成task再执行这一步
}
}
注意点标注在代码中了,可以看到在调用线程多了phaser.arriveAndAwaitAdvance();
,同时传参从3变成了4,可以体会一下这里细节的差异。
这里对Phaser
只是做一个简单的介绍,并没有实际使用经验,更多细节可以参考官方文档:https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Phaser.html
贴一个很好的使用教程:
https://www.netjstech.com/2016/01/phaser-in-java-concurrency.html