本案例参考至《Java7并发编程实战手册》
在java并发编程的过程中,往往会遇到这样的需求:现在有多个工人,每个工人都制作同一件产品,而且相对于每个工人来说产品的制作工序都是一样的。每制作完一道工序,产品都需要使用大型机器进行再加工,为了保证经济效率。现在要求每一道工序都需要所有的工人完成后,将所有的产品送进工厂加工,加工完毕之后再将产品分发给所有的工人进行下一轮的工序。(也就是说,每道工序都必须等待所有的人完成之后,大家才能继续下面的工作,只要有一个人没完成都要等到这个人完成之后才能向下执行)
在java中要实现这样的需求可以使用Phaser并发阶段任务执行机制
工人类(表示工人执行工作的各个步骤)
import java.util.Date;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
public class Worker implements Runnable{
private Phaser phaser;
public Worker(Phaser phaser) {
super();
this.phaser = phaser;
}
@Override
public void run() {
//将每个工人到达工厂的信息打印出来
System.out.printf("%s: Has arrived to do the company. %s---------00000000000000000000\n",Thread.currentThread().getName(),new Date());
//等待所有的线程执行到这里,各个线程才会开始向下执行,不过在向下执行之前会执行phaser的onAdvance()方法
phaser.arriveAndAwaitAdvance();
System.out.printf("%s :Is going to do the frist step.%s******************0\n",Thread.currentThread().getName(),new Date());
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.printf("%s: Has done the frist step. %s\n",Thread.currentThread().getName(),new Date());
phaser.arriveAndAwaitAdvance();
System.out.printf("%s :Is going to do the second step.%s---------1111111111111111111\n",Thread.currentThread().getName(),new Date());
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.printf("%s: Has done the second step. %s*****************1\n",Thread.currentThread().getName(),new Date());
phaser.arriveAndAwaitAdvance();
System.out.printf("%s :Is going to do the thrid step.%s---------222222222222222222\n",Thread.currentThread().getName(),new Date());
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.printf("%s: Has done the thrid step. %s******************2\n",Thread.currentThread().getName(),new Date());
phaser.arriveAndAwaitAdvance();
}
}
自定义的MyPhaser类
/**
* MyPhaser:并发阶段任务中的阶段切换
* @author JM
* @date 2017-2-27 下午9:54:15
* @since JDK 1.7
*/
public class MyPhaser extends Phaser {
/**
* 重写onAdvance(int x,int y)方法
* 在Phaser类中,onAdvance(int x,int y)方法在Phaser阶段改变的时候会自动执行,
* x表示当前的阶段数,y表示注册的参与者数量
* 如果onAdvance(int x,int y)方法返回false表示phaser在继续执行,返回true表示phaser已经完成执行并且进入了终止态
*/
@Override
public boolean onAdvance(int phase, int registeredParties) {
switch (phase) {
case 0:
return workersArrived();
case 1:
return finishFristExercise();
case 2:
return finishSecondExercise();
case 3:
return finishExam();
default:
return true;
}
}
/**
* workersArrived:返回false,表明phaser已经开始执行
* @author JM
* 2017-2-27 下午9:53:39
* @return
* boolean
*/
private boolean workersArrived(){
System.out.printf("Phaser: The job are going to start. The workers are ready.\n");
//getRegisteredParties()返回的是注册的线程数
System.out.printf("We have %d workers.\n",getRegisteredParties());
return false;
}
/**
* finishFristExercise:表示完成了第一阶段的工序
* @author JM
* 2017-2-27 下午10:08:04
* @return
* boolean
*/
private boolean finishFristJob(){
System.out.printf("Phaser: All the workers have finished the first step.\n");
System.out.printf("Phaser: It's time to second step.\n");
return false;
}
/**
* finishFristExercise:表示完成了第二阶段的工序
* @author JM
* 2017-2-27 下午10:08:04
* @return
* boolean
*/
private boolean finishSecondJob(){
System.out.printf("Phaser: All the workers have finished the second step.\n");
System.out.printf("Phaser: It's time to third step.\n");
return false;
}
/**
* finishFristExercise:表示完成了第二阶段的工序
* @author JM
* 2017-2-27 下午10:08:04
* @return
* boolean
*/
private boolean finishJob(){
System.out.printf("Phaser: All the workers have finished the job.\n");
System.out.printf("Phaser: Thank you for your time.\n");
return true;
}
}
测试类
import java.util.concurrent.Phaser;
public class Main {
public static void main(String[] args) {
MyPhaser myPhaser = new MyPhaser();
Woker[] workers= new Worker[5];
for (int i = 0; i < workers.length; i++) {
// 创建五个工人对象,并且通过register()方法将他们注册到phaser。五个线程,phaser的每个阶段
// (调用arriveAndAwaitAdvance()方法的地方)都要等待五个线程执行完才能继续执行下去
students[i] = new Student(myPhaser);
myPhaser.register();
}
Thread threads[] = new Thread[workers.length];
for (int i = 0; i < workers.length; i++) {
threads[i] = new Thread(workers[i], "Workers" + i);
threads[i].start();
}
for (int i = 0; i < threads.length; i++) {
try {
threads[i].join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.printf("Main:The phaser has finished:%s.\n",myPhaser.isTerminated());
}
}
MyPhaser中,重写了onAdvance(int phase,int registerParties)方法。其中phaser表示当前阶段数(每个线程在完成一阶段的任务时可以调用arriverAndAwaitAdvance()等待其他所有线程执行完这个阶段,然后再继续执行下去,这样每一次调用arriverAndAwaitAdvance()方法就会使phaser阶段数加一,所有的线程都完成之后并不是立即向下执行,而是先要执行onAdvance,这个就是onAdvance方法的意义)。