一、Semaphore
(1)示例
package com.suncy.article.article8;
import lombok.SneakyThrows;
import java.util.concurrent.Semaphore;
public class SyncToolsDemo1 {
public static void main(String[] args) {
//1、Semaphore并发数的限制
//定义了可以访问资源的线程数量
Semaphore semaphore = new Semaphore(2);
for (int i = 0; i < 4; i++) {
int finalI = i;
new Thread(new Runnable() {
@SneakyThrows
@Override
public void run() {
//获取请求许可
semaphore.acquire();
System.out.println(finalI);
Thread.sleep(1000);
//释放许可
semaphore.release();
}
}).start();
}
}
}
(2)结果
(3)结果说明
打印是先打印0、1,1秒之后打印2、3。因为设置的资源并发访问数是2,所以同时只有两个线程可以访问资源。
二、CountDownLatch和CyclicBarrier的使用
(1)示例
package com.suncy.article.article8;
import lombok.SneakyThrows;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
public class SyncToolsDemo2 {
public static void main(String[] args) throws InterruptedException {
//2、调用await的线程 需要等待countDownLatch为0后才能继续执行 不可重复使用
CountDownLatch countDownLatch = new CountDownLatch(5);
for (int i = 0; i < 5; i++) {
int finalI = i + 1;
new Thread(new Runnable() {
@SneakyThrows
@Override
public void run() {
Thread.sleep(finalI * 1000);
System.out.println("准备第" + finalI + "份食材");
countDownLatch.countDown();
}
}).start();
}
// 主线程等待检查
countDownLatch.await();
System.out.println("食材准备好了,厨师开始做饭");
//CyclicBarrier 可重复使用 调用await的线程阻塞,等待cyclicBarrier为0后继续执行后续操作
CyclicBarrier cyclicBarrier = new CyclicBarrier(4);
for (int i = 0; i < 4; i++) {
int finalI = i + 1;
new Thread(new Runnable() {
@SneakyThrows
@Override
public void run() {
Thread.sleep(finalI * 1000);
System.out.println("第" + finalI + "个人已经到了,等待其他人到");
cyclicBarrier.await();
System.out.println("人都到齐了," + finalI + "开始吃饭了");
Thread.sleep(finalI * 1000);
System.out.println("第" + finalI + "吃完饭了,坐到了麻将桌,等待其他人吃完饭");
cyclicBarrier.await();
System.out.println("人都到齐了," + finalI + "开始打麻将");
}
}).start();
}
}
}
(2)结果
(3)结果说明
CountDownLatch一般用于:主线程等待其他线程都执行完后,再进行后续操作。
CyclicBarrier一般用于:其他线程相互等待,直到达到了要求的线程数之后,再进行后续操作。
三、Phaser实现CountDownLatch和CyclicBarrier 相同的功能
(1)示例
package com.suncy.article.article8;
import lombok.SneakyThrows;
import java.util.concurrent.Phaser;
public class SyncToolsDemo3 {
public static void main(String[] args) throws InterruptedException {
System.out.println("设置phase为5");
Phaser phaser = new Phaser(5);
System.out.println("打印初始化之后的phase序列号:" + phaser.getPhase());
System.out.println("打印phaser注册的值:" + phaser.getRegisteredParties());
for (int i = 0; i < 5; i++) {
final int finalI = i + 1;
new Thread(new Runnable() {
@SneakyThrows
@Override
public void run() {
Thread.sleep(finalI * 1000);
System.out.println("第" + finalI + "个线程执行任务");
if (finalI == 5) {
System.out.println("第" + finalI + "个线程执行完毕,下次不再参与,此时减少phaser量");
phaser.arriveAndDeregister();
} else {
phaser.arrive();
}
}
}).start();
}
//等待第一轮阻塞的所有节点都到达
phaser.awaitAdvance(0);
System.out.println("第1轮同步完之后,打印phase序列号" + phaser.getPhase());
System.out.println("打印phaser注册的值:" + phaser.getRegisteredParties());
for (int i = 0; i < 4; i++) {
int finalI = i + 1;
new Thread(new Runnable() {
@SneakyThrows
@Override
public void run() {
System.out.println("第" + finalI + "个线程正在执行任务");
phaser.arriveAndAwaitAdvance();
}
}).start();
}
phaser.awaitAdvance(1);
System.out.println("第2轮同步完之后,打印phase序列号" + phaser.getPhase());
System.out.println("给phaser新增一个");
phaser.register();
System.out.println("phaser新增一个之后的值:" + phaser.getRegisteredParties());
for (int i = 0; i < 5; i++) {
int finalI = i + 1;
new Thread(new Runnable() {
@SneakyThrows
@Override
public void run() {
phaser.arriveAndAwaitAdvance();
}
}).start();
}
System.out.println("打印phase序列号:" + phaser.getPhase());
phaser.awaitAdvance(2);
System.out.println("第3轮同步完之后,打印phase序列号" + phaser.getPhase());
}
}
(2)结果
(3)结果说明
getPhase() :拿到周期数,初始化之后周期数为0,每轮周期完成之后,会自动加1。
arriveAndDeregister():到达屏障,并减少屏障数。
arrive():到达屏障,继续执行,不阻塞。
arriveAndAwaitAdvance():到达屏障,阻塞,等待其他线程。
register():新增一个屏障。
awaitAdvance(1):根据周期数,判断当前是否需要阻塞。如果当前的周期数和参数的值相同,则阻塞等待,否则会继续执行。
四、Exchanger
(1)示例
package com.suncy.article.article8;
import lombok.SneakyThrows;
import java.util.concurrent.Exchanger;
public class SyncToolsDemo4 {
public static void main(String[] args) throws InterruptedException {
Exchanger<String> exchanger = new Exchanger<String>();
new Thread(new Runnable() {
@SneakyThrows
@Override
public void run() {
String exchange = exchanger.exchange("一瓶饮料");
System.out.println("朋友小花用饮料换到了[" + exchange + "]");
}
}).start();
new Thread(new Runnable() {
@SneakyThrows
@Override
public void run() {
String exchange = exchanger.exchange("2块钱");
System.out.println("小明用2块钱换到了[" + exchange + "]");
}
}).start();
}
}
(2)结果
(3)结果说明
用于两个线程间的数据交换,尽量不要用于多个线程,情况不可控。