Phaser的使用介绍

本案例参考至《Java7并发编程实战手册》

在java并发编程的过程中,往往会遇到这样的需求:现在有多个工人,每个工人都制作同一件产品,而且相对于每个工人来说产品的制作工序都是一样的。每制作完一道工序,产品都需要使用大型机器进行再加工,为了保证经济效率。现在要求每一道工序都需要所有的工人完成后,将所有的产品送进工厂加工,加工完毕之后再将产品分发给所有的工人进行下一轮的工序。(也就是说,每道工序都必须等待所有的人完成之后,大家才能继续下面的工作,只要有一个人没完成都要等到这个人完成之后才能向下执行)
在java中要实现这样的需求可以使用Phaser并发阶段任务执行机制

工人类(表示工人执行工作的各个步骤)
import java.util.Date;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;

public class Worker implements Runnable{
    private Phaser phaser;
    
    public Worker(Phaser phaser) {
        super();
        this.phaser = phaser;
    }

    @Override
    public void run() {
        //将每个工人到达工厂的信息打印出来
        System.out.printf("%s: Has arrived to do the company. %s---------00000000000000000000\n",Thread.currentThread().getName(),new Date());
        //等待所有的线程执行到这里,各个线程才会开始向下执行,不过在向下执行之前会执行phaser的onAdvance()方法
        phaser.arriveAndAwaitAdvance();
        
        System.out.printf("%s :Is going to do the frist step.%s******************0\n",Thread.currentThread().getName(),new Date());
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.printf("%s: Has done the frist step. %s\n",Thread.currentThread().getName(),new Date());
        phaser.arriveAndAwaitAdvance();
        
        
        System.out.printf("%s :Is going to do the second step.%s---------1111111111111111111\n",Thread.currentThread().getName(),new Date());
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.printf("%s: Has done the second step. %s*****************1\n",Thread.currentThread().getName(),new Date());
        phaser.arriveAndAwaitAdvance();
        
        
        System.out.printf("%s :Is going to do the thrid step.%s---------222222222222222222\n",Thread.currentThread().getName(),new Date());
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.printf("%s: Has done the thrid step. %s******************2\n",Thread.currentThread().getName(),new Date());
        phaser.arriveAndAwaitAdvance();
    }

}
自定义的MyPhaser类
/**
 * MyPhaser:并发阶段任务中的阶段切换
 * @author   JM
 * @date     2017-2-27 下午9:54:15
 * @since    JDK 1.7
 */
public class MyPhaser extends Phaser {
    
    /**
     * 重写onAdvance(int x,int y)方法
     * 在Phaser类中,onAdvance(int x,int y)方法在Phaser阶段改变的时候会自动执行,
     * x表示当前的阶段数,y表示注册的参与者数量
     * 如果onAdvance(int x,int y)方法返回false表示phaser在继续执行,返回true表示phaser已经完成执行并且进入了终止态
     */
    @Override
    public boolean onAdvance(int phase, int registeredParties) {
        switch (phase) {
        case 0:
            return workersArrived();
        case 1:
            return finishFristExercise();
        case 2:
            return finishSecondExercise();
        case 3:
            return finishExam();
        default:
            return true;
        }
    }
    
    /**
     * workersArrived:返回false,表明phaser已经开始执行
     * @author JM 
     * 2017-2-27 下午9:53:39
     * @return   
     * boolean
     */
    private boolean workersArrived(){
        System.out.printf("Phaser: The job are going to start. The workers are ready.\n");
        //getRegisteredParties()返回的是注册的线程数
        System.out.printf("We have %d workers.\n",getRegisteredParties());
        return false;
    }
    
    /**
     * finishFristExercise:表示完成了第一阶段的工序
     * @author JM 
     * 2017-2-27 下午10:08:04
     * @return   
     * boolean
     */
    private boolean finishFristJob(){
        System.out.printf("Phaser: All the workers have finished the first step.\n");
        System.out.printf("Phaser: It's time to second step.\n");
        return false;
    }
    
    /**
     * finishFristExercise:表示完成了第二阶段的工序
     * @author JM 
     * 2017-2-27 下午10:08:04
     * @return   
     * boolean
     */
    private boolean finishSecondJob(){
        System.out.printf("Phaser: All the workers have finished the second step.\n");
        System.out.printf("Phaser: It's time to third step.\n");
        return false;
    }

    /**
     * finishFristExercise:表示完成了第二阶段的工序
     * @author JM 
     * 2017-2-27 下午10:08:04
     * @return   
     * boolean
     */
    private boolean finishJob(){
        System.out.printf("Phaser: All the workers have finished the job.\n");
        System.out.printf("Phaser: Thank you for your time.\n");
        return true;
    }
}
测试类
import java.util.concurrent.Phaser;

public class Main {
    public static void main(String[] args) {
        MyPhaser myPhaser = new MyPhaser();
        Woker[] workers= new Worker[5];
        for (int i = 0; i < workers.length; i++) {
            // 创建五个工人对象,并且通过register()方法将他们注册到phaser。五个线程,phaser的每个阶段
            // (调用arriveAndAwaitAdvance()方法的地方)都要等待五个线程执行完才能继续执行下去
            students[i] = new Student(myPhaser);
            myPhaser.register();
        }
        Thread threads[] = new Thread[workers.length];
        for (int i = 0; i < workers.length; i++) {
            threads[i] = new Thread(workers[i], "Workers" + i);
            threads[i].start();
        }
        for (int i = 0; i < threads.length; i++) {
            try {
                threads[i].join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.printf("Main:The phaser has finished:%s.\n",myPhaser.isTerminated());
    }
}

MyPhaser中,重写了onAdvance(int phase,int registerParties)方法。其中phaser表示当前阶段数(每个线程在完成一阶段的任务时可以调用arriverAndAwaitAdvance()等待其他所有线程执行完这个阶段,然后再继续执行下去,这样每一次调用arriverAndAwaitAdvance()方法就会使phaser阶段数加一,所有的线程都完成之后并不是立即向下执行,而是先要执行onAdvance,这个就是onAdvance方法的意义)。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,607评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,047评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,496评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,405评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,400评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,479评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,883评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,535评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,743评论 1 295
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,544评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,612评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,309评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,881评论 3 306
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,891评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,136评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,783评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,316评论 2 342

推荐阅读更多精彩内容

  • Android 自定义View的各种姿势1 Activity的显示之ViewRootImpl详解 Activity...
    passiontim阅读 171,364评论 25 707
  • 从三月份找实习到现在,面了一些公司,挂了不少,但最终还是拿到小米、百度、阿里、京东、新浪、CVTE、乐视家的研发岗...
    时芥蓝阅读 42,166评论 11 349
  • 1. Java基础部分 基础部分的顺序:基本语法,类相关的语法,内部类的语法,继承相关的语法,异常的语法,线程的语...
    子非鱼_t_阅读 31,558评论 18 399
  • 无论是追逐车窗外绝世的风景,还是憧憬车轮滚滚向前的愉悦,又或者只是单纯的迷恋掌控方向盘的快感。每一次上路,都会带回...
    装甲车老司机阅读 458评论 0 7
  • 你是一个会聊天的人吗? 蔡康永在说话之道里写过:“聊天时每个人都想谈自己。当你想要被别人喜欢的时候,你只要把别人放...
    所长屁桃阅读 282评论 0 0