1.概述
这篇文章主要用来介绍Semaphore源码。应该也是最后一篇了。还有一个chm感觉有些地方还有点问题,就先不发了。等哪天看懂了再说!Semaphore其实在平时还是比较常用的,可以用到限流的等作用。也就是信号量级的意思。下面我们来看一下实现!
2.实现结构和原理
底层不用说,也是通过AQS进行构建了,类似于我们之前看到过的CountDownLatch。只不过对于不同的场景给出了不同的实现而已。
因此这里肯定有一个sync的内部类!这里还提供了公平和非公平两种实现,因此存在一个非公平的内部类和公平的内部类。同样默认为非公平的,毕竟性能好,除非特殊需求,采用公平实现!
我们平时使用的数据库连接池,对象池,等有数量的缓存池也是通过此思想实现的!
3.具体方法实现原理
构造方法
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
acquire方法的实现
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
我们来看一下非公平的实现:
final int nonfairTryAcquireShared(int acquires) {
//无限循环其实就是自旋获取。
for (;;) {
//获取剩余可用
int available = getState();
//获取减去需要的剩余
int remaining = available - acquires;
//<0直接return。代表需要阻塞。获取成功,返回当前剩余数>0,不需要阻塞
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
公平实现:
protected int tryAcquireShared(int acquires) {
for (;;) {
//因为是公平实现,所以如果队列中有线程再等待,我们直接返回-1
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
这里说一下,返回负数,就是需要阻塞的意思。
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
因为之前AQS没有分析acquireSharedInterruptibly这个方法,这里我们简单的看一下
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//将当前结点插入到队列
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
//获取前一个结点,如果是head,重新尝试获取
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
//获取成功
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
我们再来看一下释放资源的方法:
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
//这里就是将状态更新而已
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // 这里表示溢出了。也就是超过了int最大值
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
真正释放的逻辑再下面方法中
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}