AbstractQueuedSynchronizer原理剖析

AbstractQueuedSynchronizer 队列同步器是用来构建锁或者其他同步组件的基础框架,它使用int成员变量表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作,并发包的作者期望它能够成为实现大部分同步需求的基础。

同步器的主要使用方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状态,在抽象方法的实现过程中免不了要对同步状态进行更改,这时就需要使用同步器提供的三个方法(getState,setState,compareAndSetState)来进行操作因为他们能够保证状态的改变是安全的。继承AQS的子类推荐被定义为同步组件的静态内部类,AQS自身并没有实现任何同步接口,它仅仅是定义若干同步状态获取和释放的方法供自定义同步组件使用,AQS可以支持独占式的获取同步状态共享式的获取同步状态。

在锁的实现中聚合同步器,利用同步器实现锁的语义。锁是面向使用者的,定义了使用者与锁的交互接口,隐藏实现细节;同步器面向的是锁的实现者,简化锁的实现方式,屏蔽同步状态的管理,线程排队,等待,唤醒等底层操作。

同步器的设计基于模版方法,同步组件的实现者需要继承同步器并重写指定的抽象方法,调用同步器提供的模版方法的时候,组件重写的方法将被调用。

接下来分析同步器是如何完成线程同步的,主要包括:同步队列,同步状态获取与释放。

同步队列

同步器依赖内部的同步队列(一个FIFO双向队列)来完成同步状态的管理,当前线程获取同步状态失败后,同步器会将当前线程以及等待状态等信息封装为Node节点,加入同步队列中,同时会阻塞当前线程,让同步状态释放,会把首节点,会把首节点唤醒,使其再次尝试获取同步状态。

Node节点中存放线程引用,等待状态,前驱,后继节点

Node节点属性如下:

  • waitStatus

    CANCELLED = 1 由于在同步队列中等待的线程等待超时或者被中断,需要从同步队列中取消等待,节点进入该状态将不会变化

    SIGNAL = -1 后继节点的线程处于等待状态,而当前节点的线程如果释放了同步状态或者被取消,将会通知后继节点,使后继节点的线程得以运行

    CONDITION = -2 节点在等待队列中,节点线程等待在Condition上,当其他线程对Condition调用single方法后,该节点将从等待队列中转移到同步队列中,加入到对同步状态的获取中

    PROPAGATE = -3 表示下一次共享式同步状态获取将会无条件地被传播下去

    INITIAL =0 初始状态

  • Node prev 前驱节点,当节点加入同步队列时被设置

  • Node next 后继节点

  • Node nextWaiter 等待队列中的后继节点。如果当前节点是共享的,那么这个字段是一个SHARED常量,也就是说节点类型(独占式,共享式)和等待队列中的后继节点公用同一个字段

  • Thread thread 获取同步状态的线程

static final class Node {
    /** Marker to indicate a node is waiting in shared mode */
    static final Node SHARED = new Node();
    /** Marker to indicate a node is waiting in exclusive mode */
    static final Node EXCLUSIVE = null;

    /** waitStatus value to indicate thread has cancelled */
    static final int CANCELLED =  1;
    /** waitStatus value to indicate successor's thread needs unparking */
    static final int SIGNAL    = -1;
    /** waitStatus value to indicate thread is waiting on condition */
    static final int CONDITION = -2;
    /**
     * waitStatus value to indicate the next acquireShared should
     * unconditionally propagate
     */
    static final int PROPAGATE = -3;

    
    volatile int waitStatus;

    /**
     * Link to predecessor node that current node/thread relies on
     * for checking waitStatus. Assigned during enqueuing, and nulled
     * out (for sake of GC) only upon dequeuing.  Also, upon
     * cancellation of a predecessor, we short-circuit while
     * finding a non-cancelled one, which will always exist
     * because the head node is never cancelled: A node becomes
     * head only as a result of successful acquire. A
     * cancelled thread never succeeds in acquiring, and a thread only
     * cancels itself, not any other node.
     */
    volatile Node prev;

    /**
     * Link to the successor node that the current node/thread
     * unparks upon release. Assigned during enqueuing, adjusted
     * when bypassing cancelled predecessors, and nulled out (for
     * sake of GC) when dequeued.  The enq operation does not
     * assign next field of a predecessor until after attachment,
     * so seeing a null next field does not necessarily mean that
     * node is at end of queue. However, if a next field appears
     * to be null, we can scan prev's from the tail to
     * double-check.  The next field of cancelled nodes is set to
     * point to the node itself instead of null, to make life
     * easier for isOnSyncQueue.
     */
    volatile Node next;

    /**
     * The thread that enqueued this node.  Initialized on
     * construction and nulled out after use.
     */
    volatile Thread thread;

    /**
     * Link to next node waiting on condition, or the special
     * value SHARED.  Because condition queues are accessed only
     * when holding in exclusive mode, we just need a simple
     * linked queue to hold nodes while they are waiting on
     * conditions. They are then transferred to the queue to
     * re-acquire. And because conditions can only be exclusive,
     * we save a field by using special value to indicate shared
     * mode.
     */
    Node nextWaiter;

同步器可以重写的方法:

protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

独占式获取同步状态,实现该方法需要查询当前状态并判断同步状态是否符合预期,然后再进行CAS设置同步状态

protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}

独占式释放同步状态,等待获取同步状态的线程将有机会获取同步状态

protected int tryAcquireShared(int arg) {
    throw new UnsupportedOperationException();
}

共享式获取同步状态,返回大于等于0的值,表示获取成功,反之获取失败

protected boolean tryReleaseShared(int arg) {
    throw new UnsupportedOperationException();
}

共享式释放同步状态

protected boolean isHeldExclusively() {
    throw new UnsupportedOperationException();
}

当前同步器是否在独占模式下被线程占用,一般该方法表示是否被当前线程所独占

同步器提供的模版方法:

独占式获取同步状态,成功就从该方法返回,失败的话当前线程信息及等待信息就被包装为Node节点放入同步队列中

public final void acquire(int arg) {
  //调具体同步组件实现的方法获取同步状态,失败后将当前线程封装为等待获取同步状态的Node节点放入同步队列中
    if (!tryAcquire(arg) &&
        //由于添加节点操作时enq函数中是循环进行的,成功入队后需要acquireQueued再检查
        acquireQueued(
          addWaiter(Node.EXCLUSIVE)//添加节点到同步队列
          , arg))
        selfInterrupt();
}

//如上模版方法中
tryAcquire  由各个同步组件自己实现

 private Node addWaiter(Node mode) {
 //将当前线程以等待获取同步的状态构建Node节点
        Node node = new Node(Thread.currentThread(), mode);
        Node pred = tail;
        //尾节点不为null说明队列中存在节点
        //将尾节点赋值给pred
        //CAS将新节点设置尾部节点,设置成功则将当前节点赋值给历史尾节点的后继节点,失败说明有别的线程入队了
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        //进行相关队列初始化
        enq(node);
        return node;
    }
    
--------
  //CAS将新节点设置为尾节点
  private final boolean compareAndSetTail(Node expect, Node update) {
        return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
    }
--------
  //进行相关队列初始化
  //如果队列为空,首先将新节点设置为头节点,并将首节点赋值给尾节点,第二波循环的时候,设置尾节点为当前节点的前驱节点,CAS设置新节点为尾节点,并将当前节点设置为前驱节点的后继节点
  private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
          //尾节点为空,CAS新节点为head节点,并将首节点赋值给尾节点,首尾节点相同
            if (t == null) { // Must initialize
              //尾节点为null,说明是第一个节点入队,需要一个哨兵节点 new Node()(和线程无关的节点)
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
              //尾节点不为null(可能是第一次加的哨兵节点也可能是真实的线程节点),将尾节点设置为当前节点的前驱节点,CAS当前节点为尾部节点,CAS成功则将当前节点设置为历史尾节点的后继节点,CAS失败意味着当前队列尾节点变了--其他线程加入队列成功,则继续循环设置,直到将当前节点放入同步队列,并将当前节点设置为历史尾节点的后继节点
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

////入参为成功入同步队列的当前节点和获取同步状态的参数。---addWaiter添加节点到同步队列的方法中的enq方法通过循环CAS操作将并发的入队操作串行化,保证节点入队,节点入队后就进入自旋状态(阻塞当前线程),当条件满足时获取同步状态并退出同步队列
final boolean acquireQueued(final Node node, int arg) {//方法名称也表示该方法是条件满足时获取同步状态退出同步队列,条件不满足时自旋阻塞当前线程
        boolean failed = true;
        try {
            boolean interrupted = false;
          //入队后的节点在死循环中尝试获取同步状态,
            for (;;) {
              //返回当前节点的前驱节点,,如果前驱节点为null 抛出NEP---上面enq方法的循环操作保证节点前驱节点不会为null,enq方法保证第一个节点进入同步队列的时候,head,tail,prev,next节点为同一个节点,
                final Node p = node.predecessor();
              //只有前驱节点是头节点的当前节点才能尝试获取同步状态,因为1 头节点是成功获取到同步状态的节点,而头节点的的线程释放同步状态后会唤醒其后继节点,后继节点被唤醒后需要检查自己的前驱节点是否是头节点;2 维护同步队列的FIFO原则
                if (p == head && tryAcquire(arg)) {
                  //头节点的后继节点获取同步状态成功后,将当前节点设置为头节点,将历史头节点移除FIFO队列(方式就是将历史头节点的后继节点置为null)所以头节点是获取同步状态成功的节点,下一个获取同步状态的节点就是头节点的后继节点,这里就理解了上面if中的条件,
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
              //循环获取同步状态失败时的处理,
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
//入参为当前节点前驱节点和当前插入节点
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
  //检查前驱节点等待状态,
        int ws = pred.waitStatus;
  //如果是SIGNAL表示当前节点等待被唤醒,当前节点前驱节点执行介绍或被取消时唤醒当前节点
        if (ws == Node.SIGNAL)
            return true;
  //当前节点前驱节点被取消了,继续寻找当前节点前驱节点的前驱节点,一直往前检查节点,直到将当前节点设置为未被取消的最后一个节点的后继节点
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
              //一直往前检查节点,直到将当前节点设置为未被取消的最后一个节点的后继节点
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
//
 private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

//取消当前节点获取同步状态的操作
   private void cancelAcquire(Node node) {
        // Ignore if node doesn't exist
        if (node == null)
            return;

        node.thread = null;

        // Skip cancelled predecessors
        Node pred = node.prev;
     //一直寻找当前节点前驱节点的前驱节点没有被取消的最后一个节点,并设置为当前节点的前驱节点
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;

        // predNext is the apparent node to unsplice. CASes below will
        // fail if not, in which case, we lost race vs another cancel
        // or signal, so no further action is necessary.
        Node predNext = pred.next;

        // Can use unconditional write instead of CAS here.
        // After this atomic step, other Nodes can skip past us.
        // Before, we are free of interference from other threads.
        node.waitStatus = Node.CANCELLED;

        // If we are the tail, remove ourselves.
        if (node == tail && compareAndSetTail(node, pred)) {
            compareAndSetNext(pred, predNext, null);
        } else {
            // If successor needs signal, try to set pred's next-link
            // so it will get one. Otherwise wake it up to propagate.
            int ws;
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {
                unparkSuccessor(node);
            }

            node.next = node; // help GC
        }
    }


与独占式获取同步状态相同,但是该方法响应中断,线程被中断时抛出 InterruptedException

public final void acquireInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}

//
 private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
   //将当前节点添加到同步队列
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
              //获取当前节点前驱节点
                final Node p = node.predecessor();
              //如果前驱节点是头节点且获取到同步状态,
                if (p == head && tryAcquire(arg)) {
                  //将当前节点设置为头节点,前驱节点置为null,节点线程信息设置null
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

独占式获取同步状态附加中断响应和超时返回失败

public final boolean tryAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
      //当前线程已经被中断就响应中断抛出异常
        throw new InterruptedException();
  //否则获取同步状态,同步状态获取失败后尝试在超时前再获取
    return tryAcquire(arg) ||
        doAcquireNanos(arg, nanosTimeout);
}

共享式获取同步状态,成功就从该方法返回,失败的话当前线程信息及等待信息就被包装为Node节点放入同步队列中,可以有多个线程获取到同步状态

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
      //获取共享状态失败后,
        doAcquireShared(arg);
}

//
 private void doAcquireShared(int arg) {
   //将当前节点以共享的状态添加到队列中
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
          //加入同步队列的节点死循环获取同步状态
            for (;;) {
              //当前节点前驱节点
                final Node p = node.predecessor();
              //前驱节点为头节点时获取同步状态
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

以上模版方法中tryAcquireShared需要各个同步组件自己实现,

响应中断

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

等待超时返回

public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquireShared(arg) >= 0 ||
        doAcquireSharedNanos(arg, nanosTimeout);
}

独占式释放同步状态,释放状态后,将同步队列中第一个节点包含的线程唤醒

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

//唤醒节点获取同步状态
 private void unparkSuccessor(Node node) {
       //获取待唤醒节点等待状态
        int ws = node.waitStatus;
        if (ws < 0)
          //CAS设置等待状态为初始化
            compareAndSetWaitStatus(node, ws, 0);

        //获取当前节点后继节点,后继节点null或被取消时
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
          //循环从tail开始寻找tail的前驱节点,直到找到最后一个或找到当前节点位置, 未被取消的节点,并将该节点设置为当前节点的后继节点
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
   //如果当前节点后继节点不为null,则唤醒线程
        if (s != null)
            LockSupport.unpark(s.thread);
    }

共享式释放同步状态

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 200,527评论 5 470
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,314评论 2 377
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 147,535评论 0 332
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,006评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,961评论 5 360
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,220评论 1 277
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,664评论 3 392
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,351评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,481评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,397评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,443评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,123评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,713评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,801评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,010评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,494评论 2 346
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,075评论 2 341

推荐阅读更多精彩内容