在JUC这个线程同步工具包下,有几个比较游戏的类,Semaphore、CountdownLatch和CyclicBarrier,你都用过吗?下面我们就来简单介绍下他们的用法,并且提供些简单的代码示例,方便大家理解。
一、简介
-
Semaphore:通常翻译成
信号量
,用来控制共享变量可以同时被线程访问的数量。通过构造方法指定计数,线程使用
acquire()
方法获取许可,当达到执行计数后,其他线程将不能再次获取,并进入阻塞,知道获取许可的线程执行1release()1释放许可。 -
CountdownLatch:常被称作
门栓
, 用来进行线程的同步协作,等待所有线程到达后,在执行后续操作。通过构造方法指定线程数量,主线程使用
await()
进行等待线程到达,工作线程使用countDown()
进行报到,也就是让计数减一。 -
CyclicBarrier:常被称为
栅栏
,用来进行线程的同步协作,等待达到预设的计数,在执行后续操作。通过构造方法指定计数,线程使用
await()
方法进行同步等待,当线程等待数达到计数值时继续执行。
二、使用案例
2.1 Semaphore
共十个线程,设置两个信号量:
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(2);
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
semaphore.acquire();
System.out.println("线程" + Thread.currentThread().getName() + "占用时间:" + LocalDateTime.now());
Thread.sleep(2000);
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
结果如下,每两个线程的执行时间是相同的,即每次只允许两个线程同时执行:
线程Thread-2占用时间:2022-02-16T09:56:25.589
线程Thread-1占用时间:2022-02-16T09:56:25.589
线程Thread-0占用时间:2022-02-16T09:56:27.593
线程Thread-4占用时间:2022-02-16T09:56:27.593
线程Thread-5占用时间:2022-02-16T09:56:29.595
线程Thread-3占用时间:2022-02-16T09:56:29.595
线程Thread-8占用时间:2022-02-16T09:56:31.603
线程Thread-9占用时间:2022-02-16T09:56:31.603
线程Thread-6占用时间:2022-02-16T09:56:33.615
线程Thread-7占用时间:2022-02-16T09:56:33.615
2.2 CountdownLatch
设置数值为10,10个线程,只有当10个线程全部到达后,主线程才会继续执行:
public static void main(String[] args) throws InterruptedException {
// 使用倒计数门闩器 ,迫使主线程进入等待 ;设置门栓的值为10
CountDownLatch latch = new CountDownLatch(10);
new Thread(() -> {
for (int i = 0; i < 10; i++) {
//门栓值减1
latch.countDown();
System.out.println("当前门栓值:" + latch.getCount());
}
}).start();
//阻塞主线程,等门栓值为0,主线程执行
latch.await();
System.out.println("主线程执行。。。");
}
结果如下:
当前门栓值:9
当前门栓值:8
当前门栓值:7
当前门栓值:6
当前门栓值:5
当前门栓值:4
当前门栓值:3
当前门栓值:2
当前门栓值:1
当前门栓值:0
主线程执行。。。
2.3 CyclicBarrier
设置计数值为6,1个主线程,5个工作线程,当6个线程全部到达后,才会继续执行:
public static void main(String[] args) throws BrokenBarrierException, InterruptedException {
CyclicBarrier cyclicBarrier = new CyclicBarrier(6);
for (int i = 0; i < 5; i++) {
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + "准备就绪");
cyclicBarrier.await();
System.out.println(Thread.currentThread().getName() + "到达");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + "准备开始");
cyclicBarrier.await();
}
结果:
Thread-0准备就绪
Thread-2准备就绪
Thread-3准备就绪
Thread-1准备就绪
Thread-4准备就绪
main准备开始
Thread-2到达
Thread-3到达
Thread-0到达
Thread-1到达
Thread-4到达
三、原理
3.1 Semaphore
首先看下类图:
仅仅包含我们常见的三个内部类:Sync,FairSync,NonfairSync。Sync是AQS的子类。
3.1.1 构造方法
直接看最底层,我们设置的计数被设置成state:
Sync(int permits) {
setState(permits);
}
3.1.2 acquire()方法
下面简单分析其源码,首先来看acquire()方法:
获取AQS当中的可中断共享锁:
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
AQS方法:
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
尝试获取共享锁tryAcquireShared(),前面学习中提到过,默认是非公平锁:
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
真正获取锁的逻辑:
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
// 获取状态
int available = getState();
int remaining = available - acquires;
// 如果小于0,获取失败, 进入 doAcquireSharedInterruptibly
if (remaining < 0 ||
// 如果cas成功,返回正数,表示成功
compareAndSetState(available, remaining))
return remaining;
}
}
doAcquireSharedInterruptibly方法:
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
// 添加到等待队列
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (; ; ) {
// 获取前面的节点
final Node p = node.predecessor();
if (p == head) {
// 如果是头结点,尝试获取许可
int r = tryAcquireShared(arg);
if (r >= 0) {
// 成功
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// 不成功, 设置上一个节点 阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
3.1.3 release()方法
走的是Sync的释放共享锁方法
public void release() {
sync.releaseShared(1);
}
AQS的方法:
public final boolean releaseShared(int arg) {
// 尝试释放锁,Semaphore的方法
if (tryReleaseShared(arg)) {
// 释放,AQS的方法,处理队列和状态相关内容
doReleaseShared();
return true;
}
return false;
}
Semaphore自己实现的方法,循环,知道成功为止:
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
// 使用cas
if (compareAndSetState(current, next))
return true;
}
}
3.2 CountdownLatch
类图如下:
只有一个内部类Sync。Sync继承自AQS。
3.2.1 构造方法:
直接点进去看最后面,如下:
Sync(int count) {
setState(count);
}
state被设置为我们指定的值。
3.2.2 await() 方法
同样使用的是Sync的方法:
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
不同之处是其自己实现的tryAcquireShared方法:
protected int tryAcquireShared(int acquires) {
// 构造设置的值肯定是大于0,此处一定是-1,所以会阻塞
return (getState() == 0) ? 1 : -1;
}
3.2.3 countDown()方法
也是通过AQS的方法如下:
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
直接看其自己实现的tryReleaseShared方法,
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
// 每一次countDown就将 state - 1
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
3.3 CyclicBarrier
3.3.1 构造方法
直接看底层构造,parties 就是我们设置的线程数量,初始化时count与parties 相等:
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
3.3.2 await()方法
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
看看其dowait()方法,很长,包含各种策略,主要看中文注释的重点位置就行了,建议写代码跟踪一下:
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
// 使用的ReetrantLock
final ReentrantLock lock = this.lock;
// 上锁,防止多线程造成并发问题
lock.lock();
try {
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
// 总线程数-1
int index = --count;
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
// 唤醒所有等待中的线程
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// 循环,直到触发、中断、中断或超时
for (;;) {
try {
// 默认是false
if (!timed)
// conditiont条件队列 的await()
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
// 返回剩余计数
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
// 释放锁
lock.unlock();
}
}
当所有的index减少为0时,会走次方法nextGeneration(),此方法主要的作用就是更新栅栏状态,并且唤醒所有等待的线程。
private void nextGeneration() {
// signal completion of last generation
trip.signalAll();
// set up next generation
count = parties;
generation = new Generation();
}
上面的流程有几个点:
- 上锁lock
- 减去计数index
- 如果index为0,就执行nextGeneration唤醒所有等待的线程,并重置状态。
- 释放锁unlock
3.4 简单总结
- Semaphore、CountdownLatch和CyclicBarrier都是在JUC下的,用于线程同步的工具类。
- Semaphore、CountdownLatch的核心还是AQS,而CyclicBarrier则不是。
- Semaphore、CountdownLatch的状态修改都是基于CAS(比较并替换),而CyclicBarrier使用了ReentrantLock。
- 虽说CyclicBarrier没有直接使用AQS的子类,但是其使用的ReentrantLock仍然是通过AQS实现的。
- AQS是JUC下的核心。
关于上面的三个类,就简单介绍完了,我们在工作当中其实很容易记混,希望本文可以给你带来一点帮助,让你能够在项目当中正确的使用它们。