- CountDownLatch
- CyclicBarrier
- Semaphore
CountDownLatch
1. CountDownLatch 的使用
private void countDownTest() {
// 1. 首先我们声明一个CountDownLatch实例,参数为我们需要同步的线程个数
final CountDownLatch countDownLatch = new CountDownLatch(2);
ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
Log.i(TAG, "run: Thread A run");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
// 2. 在操作完毕后通知
countDownLatch.countDown();
}
}
});
executorService.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
Log.i(TAG, "run: Thread B run");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
// 2. 操作完毕后通知
countDownLatch.countDown();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Log.i(TAG, "run: Thread B run next");
}
}
});
try {
// 3. 在需要同步的线程进行等待
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
Log.i(TAG, "countDownTest: main---- run");
executorService.shutdown();
}
我们看下输出的日志情况
ph_MainActivity: run: Thread A run
ph_MainActivity: run: Thread B run
ph_MainActivity: countDownTest: main---- run
ph_MainActivity: run: Thread B run next
从使用的方法及结果我们可以看到,CountDownLatch 可以实现join 的功能,但是比join更灵活,可以结合线程池使用;并且可以在线程执行的任何时刻进行同步,不是必须在任务结束时
2. CountDownLaunch 原理解析
从UML图中我们得知其使用的AQS实现的。
- 构造方法
// count 是线程在通过之前必须被调用的countDown的次数
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
Sync(int count) {
setState(count);
}
- await :当线程调用await 方法后线程会被阻塞,当其他线程调用了相应次数的countdown 方法,计数器的state 的值为0 时;或者其他线程调用了本线程的intrrupt 方法后 会抛出异常放回
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
那不Sync 中没有实现acquireSharedInterruptibly,我们在AQS中看下
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
// 如果获取失败则进入阻塞队列
doAcquireSharedInterruptibly(arg);
}
tryAcquireShared 在Sync 中有实现
// 如果当前同步器的状态 为0 的话,表示可获得锁,否则进入阻塞队列
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
- await(long timeout, TimeUnit unit) 与await 方法类似,只不过当超时会返回false 而结束等待
- countDown:调用该方法后计数器值会递减,递减后如果计数器值为0则唤醒所有因调用await 方法二阻塞的线程。
public void countDown() {
// 委托Sync 调用AQS方法
sync.releaseShared(1);
}
// 共享模式下的释放
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
// 这个方法就是线程在获得锁时,唤醒后续节点时调用的方法
doReleaseShared();
return true;
}
return false;
}
释放锁主要是在tryReleaseShared 中做的,在Sync 中有实现
// 对 state 进行递减,直到 state 变成 0;
// state 递减为 0 时,返回 true,其余返回 false
protected boolean tryReleaseShared(int releases) {
// 自旋保证 CAS 一定可以成功
for (;;) {
int c = getState();
// state 已经是 0 了,直接返回 false
if (c == 0)
return false;
// 对 state 进行递减
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
我们可以看到CountDownLaunch 主要使用了AQS实现,主要通过重写 tryAcquireShared 和 tryReleaseShared 方法进行了控制。
CyclicBarrier
1. CyclicBarrier 使用
final CyclicBarrier cyclicBarrier = new CyclicBarrier(2, new Runnable() {
@Override
public void run() {
Log.i(TAG, "run: cyclicBarrier over!");
}
});
ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService.execute(new Runnable() {
@Override
public void run() {
try {
Log.i(TAG, "run: A =====1 ");
cyclicBarrier.await();
Log.i(TAG, "run: A =====2 ");
cyclicBarrier.await();
Log.i(TAG, "run: A =====3 ");
} catch (BrokenBarrierException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
executorService.execute(new Runnable() {
@Override
public void run() {
try {
Log.i(TAG, "run: B =====1 ");
cyclicBarrier.await();
Log.i(TAG, "run: B =====2 ");
cyclicBarrier.await();
Log.i(TAG, "run: B =====3 ");
} catch (BrokenBarrierException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
运行结果:
ph_MainActivity: run: A =====1
ph_MainActivity: run: B =====1
ph_MainActivity: run: cyclicBarrier over!
ph_MainActivity: run: B =====2
ph_MainActivity: run: A =====2
ph_MainActivity: run: cyclicBarrier over!
ph_MainActivity: run: A =====3
ph_MainActivity: run: B =====3
CyclicBarrier 使多个线程相互等待,假如计数器为n,前n-1个线程都会因为到达屏障而被阻塞,当第n个线程调用await 后,计数器的值为0了,这时候会发通知唤醒前n-1个线程。并且CyclicBarrier 是可以复用的,可以定制突破屏障后的操作
2. CyclicBarrier 实现
CyclicBarrier是基于独占锁实现的,底层还是基于AQS。
parties:用于记录多少个线程调用await 才会冲破屏障的个数,即我们初始化传入的值
count:开始为parties的值,当调用一次await 后就-1,当为0时到达屏障调用await的线程结束等待,随后便会恢复为parties 的值用来复用。
初始化方法:只是进行简单的赋值
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
- await()方法:当线程调用该方法后会进行阻塞,直到满足一下某个条件才会继续执行:parties 为0,即都到了屏障点;其他线程调用了本线程的interrupt方法
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
// 获取锁并上锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
//如果屏障被打破则抛出BrokenBarrierException异常,在调用breakBarrier 方法时会被打破
if (g.broken)
throw new BrokenBarrierException();
// 如果线程被interrupt 则打破屏障并抛出异常
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
// count 进行减1操作
int index = --count;
// 如果为0,即所有的线程都到达了屏障点
if (index == 0) { // tripped
boolean ranAction = false;
try {
// 如果设置的破除屏障点后需要执行的任务不为空则执行
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
// 唤醒所有的线程并重置
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
// 循环进行等待,直到被唤醒、打破,或者超时?TODO 为什么使用循环???
for (;;) {
try {
// 如果没有设置超时,则调用await方法直接进行等待
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
private void nextGeneration() {
// 唤醒所有的线程
trip.signalAll();
// 重置屏障参数
count = parties;
generation = new Generation();
}
- await(timeout, unit):与await 类似,只不过当超时后会抛出TimeOutException 返回
Semaphore
1. Semaphore 使用方法
final Semaphore semaphore = new Semaphore(0);
ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService.execute(new Runnable() {
@Override
public void run() {
Log.i(TAG, "run: A===");
semaphore.release();
}
});
executorService.execute(new Runnable() {
@Override
public void run() {
Log.i(TAG, "run: B===");
semaphore.release();
}
});
semaphore.acquire(2);
Log.i(TAG, "semaphT: 1=======end");
executorService.execute(new Runnable() {
@Override
public void run() {
Log.i(TAG, "run: C===");
semaphore.release();
}
});
executorService.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Log.i(TAG, "run: D===");
semaphore.release();
}
});
semaphore.tryAcquire(2,1000,TimeUnit.MILLISECONDS);
Log.i(TAG, "semaphT: 2=======end");
输出结果:
ph_MainActivity: run: B===
ph_MainActivity: run: A===
ph_MainActivity: semaphT: 1=======end
ph_MainActivity: run: C===
ph_MainActivity: semaphT: 2=======end
ph_MainActivity: run: D===
Semaphore 和CyclicBarrier 类似可以重复使用
2. SemaphoreUML 图
Semaphore 的源码我们就不再分析了如果感兴趣可以去看一下