java并发(五)Lock以及AbstractQueuedSynchronizer

在jdk1.5之前,java中锁功能的实现只能依靠synchronized来实现,在这之后,并发包中新增了Lock接口用来支持新的锁功能。
我们先来看一下jdk8中对Lock的描述,相对于synchronized关键字而言,能够获得更多对锁的操作,也更加灵活,可以与Condition联合使用以应对不同的场景,但是同时也增加了并发编程的复杂度。
我们首先来看一下,与synchronized的区别,文档中是这样描述的:

  • synchronized
    使用synchronized关键字的时候,加锁与释放锁总是以 block-structured 的方式,也就是我们必须对整个代码块加锁或者释放锁(当然这隐式的,由jvm控制的),当获取到多个锁时,释放的顺序必须是相反的,而且所有已经获得的锁的释放必须是在加锁的语义范围内。我们举几个例子来看一下:
public class Test {

  static final Test object = new Test();
  static final Test object1 = new Test();

  public static void main(String[] args) throws InterruptedException {
    Test o = new Test();
    Thread t1 = new Thread(new Runnable() {
      @Override
      public void run() {
        o.test();
      }
    });
    Thread t2 = new Thread(new Runnable() {
      @Override
      public void run() {
        o.test2();
      }
    });
    t1.start();
    //先启动线程1
    Thread.sleep(100);
    t2.start();

  }

  public void test() {

    synchronized (object) {
      //进入同步快,获取锁
      try {
        System.out.println(Thread.currentThread().getId() + "获得锁0");
        synchronized (object1) {
          System.out.println(Thread.currentThread().getId() + "获得锁1");
          //模拟任务
          Thread.sleep(3000);
          System.out.println(Thread.currentThread().getId() + "释放锁1");
        }
        //目的是为了能够看出先释放锁1
        Thread.sleep(3000);
        //当前同步快结束,释放锁
        System.out.println(Thread.currentThread().getId() + "释放锁0");
      } catch (InterruptedException e) {

      }
    }
  }

  public void test2() {
    synchronized (object1) {
      System.out.println("别的线程释放锁1,我获取锁1");
    }
  }
}
执行结果:13获得锁0
13获得锁1
13释放锁1
别的线程释放锁1,我获取锁1
13释放锁0

大家可以自己运行上面的例子,通过上面的程序我们很容易就可以理解synchronized的加锁释放锁的顺序以及范围。
相比于synchronized,Lock接口提供的方法更加灵活,不是必须的块状加锁,你可以自由的控制加锁顺序。但是自由也带来了很多问题,lock锁不会自动释放,需要注意的是Lock实现提供的锁当中,加锁必须对应释放锁,也就是说加一次,必须释放一次,尤其是在不同作用域是,要确保是否加锁,必要的时候释放锁,但是顺序并不做要求;
一般我们遵循下面的语法:不在try中获取锁是因为获取锁时发生异常会执行finally块,这个时候并没有获得锁,导致释放锁失败。

 * Lock l = ...;
 * l.lock();
 * try {
 *   // access the resource protected by this lock
 * } finally {
 *   l.unlock();
 * }}

下面我们看一下Lock接口的方法

  • Lock接口
   /**获取锁,调用该方法当前线程将会获取锁,获得锁后返回(阻塞)
    */
   void lock();
   /**可中断的获得锁,在获取锁的过程中可以响应中断
    */
   void lockInterruptibly() throws InterruptedException;
   /**非阻塞的获取锁,调用后立即返回,获得返回true,否则返回false
    */
    boolean tryLock();
   /**非阻塞的超时获取锁,调用后在以下三种情况返回,获得返回true,否则返回false
    *在超时时间内获得锁
    *超时时间被线程被中断
    *超时时间结束
    */
   boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
   /**释放锁
    */
   void unlock();
   /**获取通知组件,通俗的说给相当于wait和notify,只有获得锁后才可以调用(wait同样会释放锁)
    */
   Condition newCondition();
AbstractQueuedSynchronizer

在说AbstractQueuedSynchronizer之前,我们先来下AbstractOwnableSynchronizer,这是AQS的基类,在java源码中,对他的定义是这样的:一个被线程独占的同步器,也就是通俗意义上的排他锁,但是这个类并不具有锁的功能,他只是提供了创建这些锁或者同步器的基础(也就是当前独占锁的线程),我们在实现一个同步器的时候可以使用这些信息来实现对线程管理相关的功能。
他提供了两个基础方法

  • setExclusiveOwnerThread(Thread thread)// 设置当前独占锁的线程
  • Thread getExclusiveOwnerThread()//获取当前独占锁的线程

下面我们看一下锁的另外一个重要组件AbstractQueuedSynchronizer,AbstractQueuedSynchronizer是java并发包中很重要的队列同步器,他为ReentrantLock、semaphore、countdownLatch、以及线程池中worker等依赖等待队列的同步工具和锁提供了基础框架支持。并发包中的锁(在其他同步工具中没有实现Lock接口而已)基本都是通过实现Lock接口并且内部聚合一个或多个AQS的子类来实现线程同步的,可以这样理解二者之间的关系:锁是面向使用者的,它定义了使用者与锁交互的接口(比如可以允许两个线程并行访问),隐藏了实现细节;同步器面向的是锁的实现者,它简化了锁的实现方式,屏蔽了同步状态管理、线程的排队、等待与唤醒等底层操作。
我们来看一张UML图,了解一下锁的实现

并发包的锁

AbstractQueuedSynchronizer在内部维护了一个volatile int state用来表示同步状态和一个FIFO双向队列(这个队列并不是容器化的,只是通过保存头指针和尾指针来维护虚拟的链表)。
子类通过继承AQS并且实现它的抽象方法来实现同步功能。同步功能的基础建立在对同步状态的管理上,AQS提供了三个方法来管理同步状态:

  • getState()
  • setState()//一般用来当前线程已经获得共享资源时,+1(重入)
  • compareAndSetState()//一般用来当前线程竞争共享资源时
    state在不同的同步器实现中意义是不同的,例如在ReentrantLock中state初始化为0,每次同一线程获得锁后,state+1,其他线程想要获得锁,只能等state为0。在CountDownLatch中则是一个计数器,每次调用release方法,计数器-1,线程阻塞,直到计数值为0,解除所有线程阻塞;在semaphore中,则表示共享资源的数量(或者说许可的数量),每次调用acquire则许可-1,调用release许可+1,其余的实现可以参考java源码。
    同步器自身没有实现任何同步接口,它仅仅是定义了若干同步状态获取和释放的方法来
    供自定义同步组件使用,同步器既可以支持独占式地获取同步状态,也可以支持共享式地获取同步状态。
  • AQS定义两种资源共享方式:
    • Exclusive(独占,如ReentrantLock)
    • Share(共享,如Semaphore、CountDownLatch、ReentrantReadWriteLock)
  • 自定义同步器实现时主要实现以下几种方法:
    • isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
    • tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
    • tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
    • tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
    • tryReleaseShared(int):共享方式。尝试释放资源,成功则返回true,失败则返回false。
      在自定义同步器实现上述方法之后,只需调用AQS提供的模板方法即可。如下图:


      AQS模板方法
下面我们看一下AQS是如何实现同步的:
  • 同步队列
    同步队列是AQS内部所依赖的管理同步状态FIFO双向队列。
    我们来看一下队列的实现
    AQS定义了了一个内部Node类来实现队列
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
//等待队列中线程当前的状态
volatile int waitStatus;//
volatile Node prev;//
volatile Node next;//
volatile Thread thread;//获取同步状态的线程
Node nextWaiter;//队列中的

waitStatus有四种:
static final int CANCELLED = 1;//在同步队列中等待的线程等待超时或者被中断,需要从同步队列中取消等待
static final int SIGNAL = -1;//后继节点的线程处于等待状态,而当前的节点如果释放了同步状态或者被取消,将会通知后继节点,使后继节点的线程得以运行。
static final int CONDITION = -2;//节点在等待队列中,节点的线程等待在Condition上,当其他线程对Condition调用了signal()方法后,该节点会从等待队列中转移到同步队列中,加入到同步状态的获取中
static final int PROPAGATE = -3;//表示下一次共享式同步状态获取将会被无条件地传播下去

同步队列基本结构

我们来看一下同步队列是如何构造的,也就是同步状态的获取与释放
在独占模式下,

      //tryAcquire()是由实现类重写的,在重入锁中在详说
        public final void acquire(int arg) { 
         if (!tryAcquire(arg) &&  
         acquireQueued(addWaiter(Node.EXCLUSIVE), arg))  
         selfInterrupt();    
        }      

当前线程尝试直接去获取资源,如果成功则直接返回,当线程获取同步状态失败时,同步器会将当前线程以及waitStatus等信息构造成一个Node节点,并将其加入同步队列的尾部(使用CAS),同时会阻塞当前线程,当同步状态释放时,会唤醒队列中的头结点,让其再次尝试获取同步状态。

  • JDK8实现

      private Node addWaiter(Node mode) {
            //构造节点
            Node node = new Node(Thread.currentThread(), mode);
            // Try the fast path of enq; backup to full enq on failure
              //获取当前尾指针
              Node pred = tail;
            //如果队列不为空
            if (pred != null) {
                //设置要入队节点的前节点为尾结点
                 node.prev = pred;
                 if (compareAndSetTail(pred, node)) {
                  //CAS设置新尾结点成功则返回新尾结点
                 pred.next = node;
                 return node;
           }
         }
        //若队列为空,则进行入队操作
        enq(node);
         return node;
      }  
          private Node enq(final Node node) {
              for (;;) {//自旋进行下面的cas入队操作
                   Node t = tail;
                      //再次判断队列是否为空
                   if (t == null) { // Must initialize
                        //初始化头指针和尾指针
                       if (compareAndSetHead(new Node()))
                       tail = head;
                       } else {//不为空,则跟上面同样的操作
                         node.prev = t;
                       if (compareAndSetTail(t, node)) {
                             t.next = node;
                             return t;
                        }
                    }
               }
           }
    

在加入同步队列之后,调用acquireQueued(Node node,int arg)方法,我们来看一下操作流程

 final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {//当前线程自旋尝试获取同步状态
                  //获取当前节点的前节点
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {//head.next为头结点
                    //如果当前节点是头结点并且当前节点获取同步状态成功,设置当前节点为头指针,移除掉旧的头指针
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;//
                    return interrupted;//响应中断
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    //在获取失败之后检查该线程是否应该被阻塞(简单点说,就是我通知过我的前节点,你干完了记得叫醒我),这里使用的是LockSupport的park,后面再说
                    interrupted = true;//响应中断
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        //waitStatus上面已经说明过,获取前一个节点的waitStatus
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * 前节点已经通知过当同步状态被释放的时候会唤醒它,所以他可以安全的阻塞
             */
            return true;
        if (ws > 0) {
            /*
             * 前节点任务已经被取消了,跳过前面被取消节点, 一直找到未被取消的,当前节点作为他的后继节点
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * 为初始化的状态,表示我们我还没有通知前面的节点,你释放的时候叫醒我
             * 我们要再次确认一下,万一你释放了呢,通知完了我就可以睡觉了,前提是我没被中断打扰
             * 
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

说完了同步状态的获取,我们再来说一下同步状态的释放

独占式同步状态释放

不多说,上源码

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
          //如果成功释放掉
            Node h = head;
           //  当前同步队列不为空,并且头指针被初始化过了
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
 private void unparkSuccessor(Node node) {
        /*
         * 如果状态是负的,也就是说需要被唤醒
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
          //找到 头结点,就是需要被唤醒的阶段
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
          //如果waitStatus>0 ,也就是说被取消了,把这个节点移除
            s = null;
            //那就从尾部往前找,找到没被取消的,并且队列不会空
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            //唤醒这个线程
            LockSupport.unpark(s.thread);
    }
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,980评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,178评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,868评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,498评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,492评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,521评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,910评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,569评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,793评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,559评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,639评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,342评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,931评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,904评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,144评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,833评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,350评论 2 342

推荐阅读更多精彩内容

  • ReentrantLock 介绍 一个可重入的互斥锁,它具有与使用{synchronized}方法和语句访问的隐式...
    tomas家的小拨浪鼓阅读 4,037评论 1 4
  • 理解多线程的并发锁,可结合多进程的分布式锁(如Zookeeper的互斥锁、读写锁的实现原理),本质是相通的 介绍 ...
    jiangmo阅读 702评论 0 1
  • 我真喜欢过节啊 那样我才有借口跟你说一句话 XX节快乐啊
    归有路阅读 184评论 0 0
  • 爷爷年轻时是一名志愿军战士,在朝鲜战场上光荣负伤,身体落下残疾,但并非缺胳膊少腿的那种,而是双耳受到重创,...
    欢喜轩阅读 188评论 0 5
  • 之前遇到了很多关于导航栏的问题, 一直想封装一下, 以后就一劳永逸了, 一直没时间, 最近做项目, 实在忍不了了,...
    iOS_渔翁阅读 426评论 0 2