Java控制并发流程

什么是控制并发流程

控制并发流程的工具类,作用就是帮助我们程序员更容易的让线程之间合作,让线程之间相互配合来满足业务逻辑;
比如让线程A等待线程B执行完毕后再执行等合作策略

有哪些控制并发流程的工具类

作用 说明
Semaphore 信号量,可以通过控制“许可证”的数量,来保证线程之间的配合 线程只有在拿到“许可证”后才能继续运行,相比于其他的同步器,更灵活
CyclicBarrier 线程会等待,直到足够多线程达到了事先规定的数目。一旦达到触发条件,就可以进行下一步的动作 适用于线程之间相互等待处理结果就绪的场景
Phaser 和CyclicBarrier类似,但是计数可变 Java7加入的
CountDownLatch 和CyclicBarrier类似,数量递减到0时,触发动作 不可重复使用
Exchanger 让两个线程在合适时交换对象 适用场景:当两个线程工作在同一个类的不同实例上时,用于交换数据
Condition 可以控制线程的“等待”和“唤醒” 是Object.wait()的升级版

1:CountDownLatch

倒数结束之前,一直处于等待状态,直到倒计时结束了,此线程才继续工作

  • CountDownLatch(int count):仅有一个构造函数,参数count为需要倒数的数值
  • await():调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行
  • countDown():将count值减1,直到为0时,等待的线程会被唤起
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 模拟100米跑步,5名选手都准备好了,只等裁判员一声令下,所有人同时开始跑步
 */
public class CountDownLatchDemo2 {

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch begin = new CountDownLatch(1);
        CountDownLatch end = new CountDownLatch(5);
        ExecutorService service = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 5; i++) {
            final int no = i + 1;
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    System.out.println("No." + no + "准备完毕,等待发令枪");
                    try {
                        begin.await();
                        System.out.println("No." + no + "开始跑步了");
                        Thread.sleep((long) (Math.random() * 10000));
                        System.out.println("No." + no + "到达终点");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        end.countDown();
                    }
                }
            };
            service.submit(runnable);
        }
        Thread.sleep(5000);
        System.out.println("发令枪响,比赛开始");
        begin.countDown();

        end.await();
        System.out.println("所有人到达终点,比赛结束");
    }

}

2:Semaphore(信号量)

Semaphore可以用来限制或管理数量有限的资源的使用情况
信号量的作用是维护一个“许可证”的计数,线程可以获取许可证,那信号量剩余的许可证就减1,线程也可以释放一个许可证,那信号量剩余的许可证就加1,当信号量所拥有的许可证数量为0,那么下一个还想要获取许可证的线程,就需要等待,直到有另外的线程释放了许可证
信号量使用流程
1、初始化Semaphore并指定许可证的数量
Semaphore semaphore = new Semaphore(3);
2、在需要被限制的代码前加acquire()或者acquireUninterruptibly()方法,两者区别在于acquire()可以响应中断,acquireUninterruptibly()不能响应中断
semaphore.acquire();
3、在任务执行结束后,调用release()来释放许可证
semaphore.release();
信号量主要方法介绍

  • new Semaphore(int permits, boolean fair)
    这里可以设置是否使用公平策略,如果传入true,那么Semaphore会把之前等待的线程放到FIFO的队列里,以便于当有了新的许可证,可以分发给之前等了最长时间的线程;如果是非公平的话,可以在一定程度上插队
  • tryAcquire()
    看看现在有没有空闲的许可证,如果有的话就获取,如果没有的话也没关系,我不必陷入阻塞,我可以去做别的事,过一会再来查看许可证的空闲情况
  • tryAcquire(timeout)
    和tryAcquire()一样,但是多了一个超时时间,比如“在3秒内获取不到许可证,我就去做别的事”
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

public class SemaphoreDemo {

    /**
     * 设置总共有3个许可证
     */
    static Semaphore semaphore = new Semaphore(3, true);

    public static void main(String[] args) {
        ExecutorService service = Executors.newFixedThreadPool(50);
        for (int i = 0; i < 100; i++) {
            service.submit(new Task());
        }
        service.shutdown();
    }

    static class Task implements Runnable {
        @Override
        public void run() {
            try {
                // 获得几个许可证,则需要释放几个许可证(此处也可以填写1、2、3)
//                semaphore.acquire(2); // 获取2个许可证
                semaphore.acquire();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + "拿到了许可证");
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + "释放了许可证");
            // 释放对应数量的许可证(对应获取的许可证数量)
//            semaphore.release(2); //释放2个许可证
            semaphore.release();
        }
    }

}

信号量特殊用法:

  • 一次获取或释放多个许可证
    比如TaskA会调用很消耗资源的method1(),而TaskB调用的不太消耗资源的method2(),假设我们一共有5个许可证,那么我们就可以要去TaskA获取5个许可证才能执行,而TaskB只需要获取到一个许可证就能执行,这样就避免了A和B同时运行的情况,我们可以根据自己的需求合理分配资源

注意点:

  1. 获取和释放的许可证数量必须一致,比如每次都获取2个但是只释放1个甚至不释放,随着时间的推移,到最后许可证数量不够用,会导致程序卡死
  2. 注意在初始化Semaphore的时候设置公平性,一般设置为true会更合理
  3. 并不是必须由获取许可证的线程释放那个许可证,事实上,获取和释放许可证对线程并无要求,也许是A获取了,然后由B释放,只要逻辑合理即可
  4. 信号量的作用,除了控制临界区最多同时有N个线程访问外,另一个作用是可以实现“条件等待”,例如线程1需要在线程2完成准备工作后才能开始工作,那么就线程1 acquire(),而线程2完成任务后release(),这样的话,相当于是轻量级的CountDownLatch

3:Condition接口(又称条件对象)

  • 当线程1需要等待某个条件的时候,它就去执行condition.await()方法,一旦执行了await()方法,线程就会进入阻塞状态
  • 然后通常会有另外一个线程,假设是线程2,去执行对应的条件,直到这个条件达成的时候,线程2就会去执行condition.signal()方法,这时JVM就会从被阻塞的线程里找,找到那些等待该condition的线程,当线程1收到可执行信号的时候,它的线程状态就会变成Runnable可执行状态

signalAll()和signal()区别

  • signalAll()会唤起所有的正在等待的线程
  • signal()是公平的,只会唤起等待时间最长的那个线程

使用Condition实现生产者消费者模式

import java.util.PriorityQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 演示用Condition实现生产者消费者模式
 */
public class ConditionDemo2 {

    private final PriorityQueue<Integer> queue = new PriorityQueue<>();
    private final Lock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    public static void main(String[] args) {
        ConditionDemo2 conditionDemo2 = new ConditionDemo2();
        Producer producer = conditionDemo2.new Producer();
        Consumer consumer = conditionDemo2.new Consumer();
        producer.start();
        consumer.start();
    }

    /**
     * 消费者
     */
    class Consumer extends Thread {
        @Override
        public void run() {
            consume();
        }

        private void consume() {
            while (true) {
                lock.lock();
                try {
                    while (queue.size() == 0) {
                        System.out.println("队列空,等待数据");
                        try {
                            notEmpty.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    // 取数据
                    queue.poll();
                    // 取完一个数据后,立马唤醒生产者继续生产
                    notFull.signalAll();
                    System.out.println("从队列里取走了一个数据,队列剩余" + queue.size() + "个元素");
                } finally {
                    lock.unlock();
                }
            }
        }
    }

    /**
     * 生产者
     */
    class Producer extends Thread {
        @Override
        public void run() {
            produce();
        }

        private void produce() {
            while (true) {
                lock.lock();
                try {
                    int queueSize = 10;
                    while (queue.size() == queueSize) {
                        System.out.println("队列满,等待有空余");
                        try {
                            notFull.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    // 生产数据
                    queue.offer(1);
                    // 唤醒消费者继续消费
                    notEmpty.signalAll();
                    System.out.println("从队列里插入了一个数据,队列剩余空间" + (queueSize - queue.size()));
                } finally {
                    lock.unlock();
                }
            }
        }
    }

}

Condition注意点

  1. 实际上,如果说Lock用了代替synchronized,那么Condition就是用了代替相对应的Object.wait/notify的,所以在用法和性质上,几乎都一样
  2. await方法会自动释放持有的Lock锁,和Object.wait一样,不需要自己手动先释放锁
  3. 调用await的时候,必须持有锁,否则会抛出异常,和Object.wait一样

4:CyclicBarrier(循环栅栏)

CyclicBarrier(循环栅栏)和CountDownLatch很类似,都能阻塞一组线程
当有大量线程相互配合,分别计算不同任务,并且需要最后统一汇总的时候,我们可以使用CyclicBarrier。
CyclicBarrier可以构造一个集结点,当某一个线程执行完毕,它就会到集结点等待,直到所有线程都到了集结点,那么该栅栏就会被撤销,所有线程再统一出发,继续执行剩下的任务

演示CyclicBarrier

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

/**
 * 演示CyclicBarrier
 */
public class CyclicBarrierDemo {

    public static void main(String[] args) {
        // 规定5个人
        CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new Runnable() {
            @Override
            public void run() {
                System.out.println("所有人都到齐了,大家统一出发");
            }
        });

        for (int i = 0; i < 5; i++) {
            // 启动5个线程
            new Thread(new Task(i, cyclicBarrier)).start();
        }
    }

    static class Task implements Runnable {

        private int id;
        private CyclicBarrier cyclicBarrier;

        public Task(int id, CyclicBarrier cyclicBarrier) {
            this.id = id;
            this.cyclicBarrier = cyclicBarrier;
        }

        @Override
        public void run() {
            System.out.println("线程" + id + "现在前往集合地点");
            try {
                Thread.sleep((long) (Math.random() * 10000));
                System.out.println("线程" + id + "已经到了集合地点,等待其他人到达");
                // 等待其他人
                cyclicBarrier.await();
                System.out.println("线程" + id + "出发了");
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }

}

CyclicBarrier和CountDownLatch的区别

  • 作用不同:CyclicBarrier要等固定数量的线程都到达了栅栏位置才能继续执行,而CountDownLatch只需等待数字到0,也就是说,CountDownLatch用于事件,而CyclicBarrier是用于线程的
  • 可重用性不同:CountDownLatch在倒数到0并触发门闩打开后,就不能再次使用了,除非新建新的实例;而CyclicBarrier是可以重复使用的。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,980评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,178评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,868评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,498评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,492评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,521评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,910评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,569评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,793评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,559评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,639评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,342评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,931评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,904评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,144评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,833评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,350评论 2 342