Java多线程同步
前言:
本章节是参考网上文章并自行研究锁的一部分总结,由于本人从事Android开发,所以在针对锁的底层实现时,会对比x86和ARM架构下对应的实现,如有问题请及时指出;
1. Java锁
- Lock出现之前,Java使用synchronized实现多线程同步,JDK5之后,JUC包下引入Lock接口及其实现类实现同步功能,Lock相比synchronized更灵活,拥有手动获取和释放锁、非阻塞式获取锁、响应中断、获取超时等新特点;
- Java提供了种类丰富的锁,按照特性的不同,可以按以下宏观概念理解:
- 乐观锁和悲观锁;
- 自旋锁;
- 无锁、偏向锁、轻量级锁、重量级锁;
- 公平锁和非公平锁;
- 可重入锁;
- 独享锁(排他锁)和共享锁;
1.1 乐观锁VS悲观锁
- 悲观锁:
对于同一数据的多线程并发操作,悲观锁认为,自己在使用数据的时候一定有其他线程来修改数据,因此get数据的时候先加锁,确保数据不会被其他线程修改,Java中的synchronized关键字和Lock的实现类都是悲观锁; - 乐观锁:
认为自己在使用数据时不会被别的线程修改,所以不会添加锁,只是在更新数据的时候去判断之前有没有其他线程更新了这个数据。如果没有被更新,当前线程write成功;如果被更新,当前线程根据不同的实现类,执行不同的操作(例如报错或自动重试)。乐观锁在Java中是通过使用无锁编程实现的,采用CAS算法,Java原子类中的递增操作就是通过CAS自旋实现的; - 使用场景:
- 悲观锁适合write场景,synchronized,ReentrantLock,都是显式的加锁后再操作同步资源或代码块;
- 乐观锁适合read场景,AtomicInteger.incrementAndGet(),Java层不锁定,直接操作资源;
- 乐观锁使用CAS算法实现多线程同步:
CAS(Compare and Swap),无锁算法,在不使用锁,并且线程没有被阻塞的情况下实现多线程之间数据同步。CAS算法涉及到三个操作数:需要读写的内存值V;进行比较的的值A;待写入的新值B;
当且仅当V==A,通过原子方式让V=B("比较+更新"整体是一个原子操作),否则由一个do()while{}循环,继续从头执行上述步骤,自旋次数也是有上限的(默认10次,看具体的操作系统,可以通过JVM参数修改);java.utils.concurrent包中的原子类就是通过CAS实现的,例如AtomicInteger,后续介绍: - CAS高效,因为自旋减少切换线程所需的开销,但存在几个问题:
- ABA的问题:CAS需要在操作值之前检查内存值是否发生变化,没有变化才更新内存值;但如果内存值由A->B->A最终变回A,但其实内存值是变化过的,这种情况不能被忽略,所以在变量前添加版本号即可;
- while循环时间长开销大:如果长时间while,会一直自旋,一直占用CPU资源,开销也很大;
- 只能保证一个共享变量的原子操作:无法保证同时对多个共享变量的原子性操作,所以JDK1.5开始提供了AtomicReference,把多个变量放到一个对象来保证执行CAS操作时的原子性;
- CAS无锁算法和加锁的效率,不一定谁低谁高,不同场景需要使用不同的方式解决安全问题;如果线程数和CPU核数一致,CAS效率高;但如果线程很多,远大于CPU核数,则CAS效率很低,因为CAS会占用CPU资源;
1.2 自旋锁
1.2.1 自旋锁介绍
阻塞或唤醒一个Java线程需要操作系统切换CPU状态,这种状态切换需要耗费处理器时间。如果同步代码内容很简单,状态转换消耗的时间可能比执行代码的还长,为了这种情况切换线程(线程挂起、恢复现场,状态同步),得不偿失;
如果机器有多个处理器,能让2个或以上线程同时并行执行,则可以让后来的请求锁的thread不放弃CPU的执行时间,而是通过自旋的方式"稍稍等待一下",看看当前持有锁的线程是否很快就释放锁。如果自旋完成后,前面线程释放了锁,则当前线程可以直接获取锁并且执行同步代码,而不必走"阻塞->唤醒"这样耗时耗资源的步骤,这就是自旋锁;
-
说白了,自旋锁可以简单理解为:如果发现锁被其他线程拿着,在当前线程,一定条件的while循环,而不是阻塞当前线程:
缺点:
不能代替线程阻塞,自旋是可以减少线程切换的开销,但会占用CPU执行时间。如果锁被占用的时间很少,那自旋锁效果很好,反之,也会浪费CPU资源。所以自旋的while是有限制的,超过自旋次数就挂起线程;
1.2.2 自旋锁具体实现原理
- 实现算法就是CAS,对应具体的实现类就是juc包下Atomic开头的原子类,下面从java.util.concurrent.atomic.AtomicInteger.getAndAddInt()逐步分析:
public class AtomicInteger extends Number implements java.io.Serializable { private static final long serialVersionUID = 6214790243416807050L; // Unsafe对象可以直接获取并操作内存的数据 private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe(); // 该成员变量对应真正value在AtomicInteger对象所在内存地址中的偏移量offset private static final long VALUE; static { try { VALUE = U.objectFieldOffset (AtomicInteger.class.getDeclaredField("value")); } catch (ReflectiveOperationException e) { throw new Error(e); } } // 真正的int值,volatile修饰,保证线程之间是可见的,在volatile章节已介绍可见性的实现 private volatile int value; ...... public final int getAndAdd(int delta) { // 最终调用Unsafe.getAndAddInt() return U.getAndAddInt(this, VALUE, delta); } }
CAS 本质:依赖核
- 接着看sun.misc.Unsafe类:
可以看到,native方法compareAndSwapInt()其实包括了2个操作:比较+更新,多个操作如何保证原子性?// sun.misc.Unsafe.getAndAddInt() public final int getAndAddInt(Object o, long offset, int delta) { int v; do { // 根据value在AtomicInteger对象所在内存地址中的偏移量offset获取当前内存中的值v,然后判断内存中的值是否等于v v = this.getIntVolatile(o, offset); // 然后调用native方法compareAndSwapInt(),对比v和(v+delta)的值是否相等,如果相等则表示没有其他线程更改过这个值,接着将新值设置到内存 } while(!this.compareAndSwapInt(o, offset, v, v + delta)); return v; }
- 继续往下找,以openjdk中HotSpot虚拟机为例,查看openjdk/hotspot/src/share/vm/prims/unsafe.cpp文件,从中找到Unsafe_CompareAndSwapInt,代码如下:
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x)) UnsafeWrapper("Unsafe_CompareAndSwapInt"); // p就是内存中的Unsafe对象 oop p = JNIHandles::resolve(obj); // 从Unsafe对象中,根据偏移量offset拿到真正保存int值的地址 jint* addr = (jint *) index_oop_from_field_offset_long(p, offset); // 调用Atomic::cmpxchg()函数进行"比较+更新"的操作,x就是待更新的值,e是原值,Atomic::cmpxchg()的返回值 return (jint)(Atomic::cmpxchg(x, addr, e)) == e; UNSAFE_END
- 上述最终调用Atomic::cmpxchg()函数实现,该函数的实现是平台相关的,不同平台实现不同,Linux x86系统的实现如下:
inline jint Atomic::cmpxchg(jint exchange_value, volatile jint* dest, jint compare_value) { // 这里mp表示当前系统是否为多核架构 // 如果是就给总线加锁,所以同一芯片上的其他处理器就暂时不能通过总线访问内存,保证了该指令在多处理器环境下的原子性 int mp = os::is_MP(); // LOCK_IF_MP():判断是否是多核架构,如果是,则需要添加"lock"指令,才能保证不被其他处理器打断cmpxchgl操作; __asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)" // 汇编模版,%1指向输入参数中的exchange_value,%3指向dest,类推... : "=a" (exchange_value) // 输出操作数,"a"对应累加寄存器EAX,下面的"r"指其他寄存器 : "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp) // 输入参数 : "cc", "memory"); // 一些其他参数,"memory" return exchange_value; }
这段ASM汇编代码简单翻译一下就是:汇编指令cmpxchgl会比较EAX寄存器的值(compare_value,该线程已经拿到的值)和exchange_value(待更新的值),如果相等,就把dest的值(即真实地址中的值)赋给exchange_value并返回给上层Unsafe_CompareAndSwapInt()中,在该函数中会将dest的值和当前线程已拿到的值e比较,此时不等,则CAS失败,继续do{}while{};否则,将待更新的值exchange_value写入EAX寄存器,CAS成功;
- 总结:
Linux x86平台,多核架构,JDK中AtomicXX类的CAS算法是由汇编指令"lock cmpxchgl"实现的;单核架构,不添加"lock"指令;该汇编指令保证了"比较+更新"操作的原子性; - 对比 Linux ARM架构的实现,代码在linux/arch/arm/include/asm/atomic.h文件中:
... #if __LINUX_ARM_ARCH__ >= 6 /* * ARMv6 UP and SMP safe atomic ops. We use load exclusive and * store exclusive to ensure that these are atomic. We may loop * to ensure that the update happens. */ ... \ #define ATOMIC_OP_RETURN(op, c_op, asm_op) \ static inline int atomic_##op##_return_relaxed(int i, atomic_t *v) \ { \ unsigned long tmp; \ int result; \ \ prefetchw(&v->counter); \ \ __asm__ __volatile__("@ atomic_" #op "_return\n" \ "1: ldrex %0, [%3]\n" \ " " #asm_op " %0, %0, %4\n" \ " strex %1, %0, [%3]\n" \ " teq %1, #0\n" \ " bne 1b" \ : "=&r" (result), "=&r" (tmp), "+Qo" (v->counter) \ : "r" (&v->counter), "Ir" (i) \ : "cc"); \ \ return result; \ }
- 可以从上面汇编代码里看到,Linux ARMv6及之后的架构,使用ldrex和strex指令独占的访问内存,实现原子性;
- 简单理解,这俩指令会将被访问的内存段设置"独占"标记,这俩指令也是ARM架构平台实现线程同步工具的基础;
- 补充LongAdder,专门用于基础数据类型递增的场景,替换AtomicXX类:
- 大致实现:
在increment()操作中,创建Cell数组,分块Cell累加,累加单元和CPU核数有关,利用多核CAS运算,提高整个递增的效率,最后longValue()获取结果时,所有的Cell累加后将结果抛到调用者; - 使用场景:
累加操作,且量级很大的时候替代AtomicXX使用;
- 大致实现:
1.2.3 Unsafe魔法类
不只是AtomicXXX的实现,看过Java锁的代码后发现,所有的锁最终都会调用sun.misc.Unsafe的API来保证操作的原子性,接着了解一下黑科技Unsafe类;
-
主要功能如下:
-
Unsafe.java是单例,提供静态方法getUnsafe()获取Unsafe对象,当且仅当调用getUnsafe()的类为BootstrapClassLoader加载时才合法,否则抛出SecurityException异常:
public final class Unsafe { // 单例对象 private static final Unsafe theUnsafe; private Unsafe() { } @CallerSensitive public static Unsafe getUnsafe() { Class var0 = Reflection.getCallerClass(); // 仅在引导类加载器`BootstrapClassLoader`加载时才合法 if(!VM.isSystemDomainLoader(var0.getClassLoader())) { throw new SecurityException("Unsafe"); } else { return theUnsafe; } } }
-
Unsafe类是JDK提供给Java开发者的一个操作系统后门,可以直接对操作系统的内存等资源进行操作,即:增强了Java语言对底层资源操作的能力。既然可以像C/C++一样操作内存空间,使用Unsafe是比较高风险的,需要谨慎,下面简单描述下Unsafe常见的能力:
-
内存操作:
- 主要包含堆外内存的分配、拷贝、释放、给定地址值操作等方法;
- 通常Java中创建的对象都位于堆内内存(heap),heap由JVM管控,是Java进程内存,遵循JVM垃圾回收机制;而Unsafe提供的接口是对 "对外内存"进行操作,不受JVM内存管理机制约束,所以使用不当很危险;
- 使用堆外内存的好处:1.改善垃圾回收时造成的停顿现象;2.提升I/O性能,通常在I/O通信过程中,会存在堆内内存到堆外内存的数据拷贝操作,对于需要频繁进行内存间数据拷贝且生命周期较短的暂存数据,都建议存储到堆外内存;
- 应用:nio包下的DirectByteBuffer是Java层使用堆外内存的具体实现,内部接口就是使用Unsafe实现的;
-
CAS:
- CAS:实现并发算法时用到的一种技术,Compare And Swap;
- CAS操作包含3个操作数:内存值value、预期的原值old、新值new;
- 执行CAS操作时,比较value和old,如果value==old,则更新内存值value=new,否则根据具体的实现或进行自旋,或报错等等;
- 应用:JUC包下的AtomicXXX最终执行Unsafe.compareAndSwapXXX();还有Java AQS、CurrentHashMap等;
-
Class相关:
- 提供Class和它的静态字段的操作相关方法,包含静态字段内存定位、定义类、定义匿名类、检验&确保初始化等;
- 应用:从Java 8开始,JDK使用invokedynamic(JDK 7引入的运行动态语言的一条新的虚拟机指令)及VM Anonymous Class(一种模版机制)结合来实现Java语言层面上的Lambda表达式;
-
操作对象:
- 对对象成员属性、非常规的实例化等操作(Unsafe.allocateInstance()等);
- 常规的对象实例化:使用new关键字,如果Foo类只有有参构造函数,且没显示声明无参构造函数,则只能调用有参构造函数创建对象;
- 非常规对象实例化:上述Foo,可以使用Unsafe.allocateInstance()绕过JVM安全检查,直接调用Foo隐式的无参构造函数实例化对象;
- 应用:Gson反序列化;日常开发需要注意这俩问题:Gson与Kotlin碰撞出一个不安全的操作、Gson 又搞了个坑
-
线程调度:
- 包括线程阻塞(park())、唤醒(unpark())、锁机制(Unsafe.monitorEnter()...)等方法;
- Java锁的核心类AQS;
-
获取系统信息:
- 获取系统相关信息:返回系统指针的大小、获取内存页的大小;
- 应用:nio包下的工具类Bits,DirectByteBuffer中使用了Bits;
-
内存操作:
1.3 无锁 VS 偏向锁 VS 轻量级锁 VS 重量级锁
- JDK 1.6中引入了这4个概念,具体是指锁的四种状态,针对synchronized设定的,对于Android系统的synchronized可以参考synchronized 第1.3节第9点最后的流程图;
- 无锁:
没有对资源进行锁定,所有的线程都能访问并修改同一个资源,但同时只有一个线程能修改成功; - 偏向锁:
是指一段同步代码一直被一个线程所访问,那么该线程会自动获取锁,降低获取锁的代价(如果一段synchronized代码,只有一个线程在执行,每次执行都CAS操作,该操作中包含"获取锁->释放锁",没有必要)。只有遇到其他线程尝试竞争偏向锁时,持有偏向锁的线程才会释放锁,线程不会主动释放偏向锁。释放后锁的状态恢复到无锁或轻量级锁; - 轻量级锁:
是指当锁是偏向锁的时候,被另外的线程所访问,偏向锁就会升级为轻量级锁,其他线程会通过自旋的形式尝试获取锁,不会阻塞,从而提高性能;偏向锁和轻量级锁对应 Android虚拟机ART 的"瘦锁"态。 - 重量级锁:
若当前只有一个等待线程,则该线程通过自旋进行等待。但是当自旋超过一定的次数,或者一个线程在持有锁,一个在自旋,又有第三个来访时,轻量级锁升级为重量级锁,这里的重量级锁对应 Android虚拟机ART 的"胖锁"态,即 使用ART封装的Mutex锁。
1.4 公平锁 VS 非公平锁
- 公平锁:
公平锁是指多个线程按照申请锁的顺序来获取锁,线程直接进入队列中排队,队列中的第一个线程才能获得锁。公平锁的优点是等待锁的线程不会饿死。缺点是整体吞吐效率相对非公平锁要低,等待队列中除第一个线程以外的所有线程都会阻塞,CPU唤醒阻塞线程的开销比非公平锁大。 - 非公平锁:
非公平锁是多个线程加锁时直接尝试获取锁,获取不到才会到等待队列的队尾等待。但如果此时锁刚好可用,那么这个线程可以无需阻塞直接获取到锁,所以非公平锁有可能出现后申请锁的线程先获取锁的场景。非公平锁的优点是可以减少唤起线程的开销,整体的吞吐效率高,因为线程有几率不阻塞直接获得锁,CPU不必唤醒所有线程。缺点是处于等待队列中的线程可能会饿死,或者等很久才会获得锁。 - Java中具体的实现,ReentrantLock可以选择创建哪种锁,默认是创建非公平锁:
public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
- 在JDK8中,对比ReentrantLock实现 公平锁/非公平锁 获取锁的代码:
// 非公平锁的实现 final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { ... return true; } return false; }
唯一区别在于,公平锁在尝试获取锁时,增加了!hasQueuedPredecessors()判断,接着查看该方法:// 公平锁的实现 static final class FairSync extends Sync { ... /** * Fair version of tryAcquire. Don't grant access unless * recursive call or no waiters or is first. */ protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // 区别!!! if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { ... return true; } return false; } }
该方法的作用是,判断当前线程是否位于同步队列中的第一个,如果是则返回true;即:通过同步队列来实现多个线程按照申请锁的顺序来获取锁;而非公平锁则直接尝试获取锁;public final boolean hasQueuedPredecessors() { ... Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
1.5 可重入锁
- 可重入锁也叫递归锁,指在同一线程,使用同一个锁对象时,外层方法获得锁后,再进入内层方法时,自动获得锁,不会因为外层方法没释放锁导致线程阻塞,当然可重入的次数是有上限的;
- synchronized是可重入锁,之前了解过,ART虚拟机,使用synchronized时,对象锁的可重入状态封装在c++代码LockWord类的成员变量uint32_t value_中,同一个线程多次获取对象锁时,这种情况说明是瘦锁状态,该value_字段的27-16位就保存了 Lock Count,即同一线程获取该对象锁的次数;
- ReentrantLock也是可重入锁,ReentrantLock内部通过Sync类实现所有的同步机制,而Sync继承自AbstractQueuedSynchronizer(AQS,后面介绍),AbstractQueuedSynchronizer中state变量用来记录锁被获取的次数(state是一个被volatile修饰的int类型),通过公平锁FairSync.lock()方法逐步分析,代码如下:
接着看AQS.acquire(int)方法:static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; // 如果创建的是公平锁,那ReentrantLock.lock()方法最终会调到这里 final void lock() { // Sync继承自AQS,该方法定义在AQS中 acquire(1); } protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // 介绍公平锁/非公平锁的时候知道,区别就在这里,获得锁的顺序是否按照申请锁的顺序,由一个队列维护先后顺序 ... } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } }
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
简单理解就是:
当前线程获得锁之前,调用lock.lock()时,会走到AQS.tryAcquire()中,具体实现在子类,但都是先拿到state进行判断,如果state==0,说明该锁没有被任何一个线程持有,则让当前线程安全的(通过CAS操作保证获取锁的过程是原子的)拿到该锁;
如果state!=0,则说明该锁已经被某个线程持有,则继续判断持有锁的线程是否是当前线程;
如果是当前线程,则state++,说明当前线程再一次拿到该锁;
如果不是当前线程,则回到AQS.acquire()方法中,执行acquireQueued(),将本次获取锁的请求放入队列,本次获取锁的操作失败,走到后续的失败处理流程中,后续介绍;
1.6 独享锁 VS 共享锁
独享锁和共享锁同样也是一种概念;
是对可重入锁的进一步优化,因为单纯的可重入锁ReentrantLock,state状态记录的是包括读和写,即"读,写"都需要加锁,效率还是比较低,所以引入"独享锁(排他锁)和共享锁";
独享锁也叫排他锁,是指该锁一次只能被一个线程所持有。如果线程T对共享数据A加上排它锁后,则其他线程不能再对A加任何类型的锁。获得排它锁的线程即能读数据又能修改数据。synchronized和Lock接口的实现类都是独享锁;
共享锁是指该锁可被多个线程所持有。如果线程T对数据A加上共享锁后,则其他线程只能对A再加共享锁,不能加排它锁。获得共享锁的线程只能读数据,不能修改数据。
-
具体的实现类是java.util.concurrent.locks.ReentrantReadWriteLock,实现了ReadWriteLock接口:
内部有两把锁ReadLock和WriteLock分别标记读和写,都是Sync的子类,Sync是AQS(AbstractQueuedSynchronizer)的子类,state就保存在AQS中;读锁是共享锁,写锁是独享锁。读锁可保证并发读非常高效,而读写、写读、写写的过程互斥,因为读锁和写锁是分离的。所以ReentrantReadWriteLock的并发性相比一般的互斥锁有了很大提升;
-
其原理简单概括:将32位int类型的变量state分为【高16位】和【低16位】,分别存储读锁个数和写锁个数,先看写锁的代码tryAcquire():
// 写锁代码 protected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread(); int c = getState(); // 取到当前ReentrantReadWriteLock锁的state int w = exclusiveCount(c); // 取写锁的个数w,即独享锁个数 if (c != 0) { // 如果已经有线程持有了锁(c!=0) // 如果写线程数(w)为0(换言之仅存在读锁) 或者持有锁的线程不是当前线程就返回失败 // 当已经存在某个线程拿到读锁时,不允许其他线程再获得写锁,否则无法保证数据对所有线程都是最新的,所以此处直接返回false,表示获取写锁失败 if (w == 0 || current != getExclusiveOwnerThread()) return false; if (w + exclusiveCount(acquires) > MAX_COUNT) // 如果写锁的重入数大于最大数(65535,2的16次方-1)就抛出一个Error。 throw new Error("Maximum lock count exceeded"); // 成功拿到写锁! setState(c + acquires); return true; } // 如果当且写线程数为0,并且当前线程需要阻塞(该方法只用于公平锁,即:因为最终调用的是hasQueuedPredecessors(),只有在等待队列的线程,才会被阻塞),则获取写锁失败;或者如果通过CAS增加写线程数失败也返回失败 if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; // 如果c=0,w=0或者c>0,w>0(重入),则设置当前线程或锁的拥有者,获得写锁成功 setExclusiveOwnerThread(current); return true; }
接着看读锁的代码:
// 读锁代码 protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); int c = getState(); // 如果其他线程已经获取了写锁,则当前线程获取读锁失败,并进入等待状态 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; // 拿到读锁个数 int r = sharedCount(c); // readerShouldBlock()方法依然调用的是hasQueuedPredecessors() // 执行compareAndSetState()方法,更新state的值 if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { ... return 1; } return fullTryAcquireShared(current); }
-
总结:
- 对于ReentrantReadWriteLock,获得读锁和写锁的操作是互斥的,可能会造成线程阻塞;而获得读锁和读锁的操作是共享的,不会造成线程阻塞;
- 对于ReentrantLock,不论创建的是公平锁还是非公平锁,不论是读操作还是写操作,添加的都是独享锁;
- 所以ReentrantReadWriteLock在"读读操作"时的效率比ReentrantLock高;
2. AbstractQueuedSynchronizer原理
2.1 介绍
- AQS是一种提供了原子式管理同步状态、阻塞和唤醒线程功能以及队列模型的简单框架;
- Java中的大部分同步类(Lock接口、ReadWriteLock接口、Semaphore、CountDownLatch等)都是基于AbstractQueuedSynchronizer(简称为AQS)实现的;
- 即:Java层又实现的一套同步锁框架,使用一个阻塞队列(双向链表),解决多线程并发问题;只不过AQS通过CAS无锁算法和自旋,可以让多个线程在尽可能不暂停线程的情况下,达到抢占锁的目的;
- 本节将从ReentrantLock的实现分析AQS;
2.2 Lock的使用方法
介绍AQS原理前补一下ReentrantLock的基本使用方法,它比synchronized更加灵活,上锁和解锁的逻辑被开发者控制:
Lock mLock;
public void testReentrantLock() {
mLock = new ReentrantLock(true);
try {
if (mLock.tryLock(1000, TimeUnit.MILLISECONDS)) {
// Do somethings ...
} else {
// Other things ...
}
} catch (Throwable e) {
e.printStackTrace();
} finally {
mLock.unlock();
}
}
2.3 AQS的实现原理
- 从最主要的
ReentrantLock.lock()
加锁方法着手分析,以NonfairSync实现为例,代码如下:final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }
- 如果对state CAS成功,则当前线程直接获得锁成功;
- 如果对state CAS失败,则进入
acquire();
方法进行后续处理;
- 获取锁成功的逻辑,在上面已经介绍过,不同类型的锁有不同的实现;而对于获取锁失败后,肯定存在某种排队等待的机制,让线程继续拥有获得锁的机会,而不是直接结束流程;从上面代码可以知道,需要看下acquire()方法的实现逻辑,而该方法定义在
java.util.concurrent.locks.AbstractQueuedSynchronizer
中,所以下面分析一下AQS的原理; - AQS的整体结构如下图所示:
- AQS核心思想:如果共享资源没有被占用,则将使用共享资源的线程设置为当前线程;如果共享资源被占用,则通过阻塞、等待唤醒的机制,确保锁可以正常的被分配,线程仍然有机会获得锁;
- 这种阻塞等待唤醒机制是依赖CLH(一种单链表)变体实现的,AQS中的队列是CLH变体的虚拟双向队列(FIFO),通过将每条请求共享资源的线程封装成一个节点Node来实现锁的分配,而每个Node都是去操作AQS的volatile成员变量state实现同步的;
- 继续回到acquire()方法代码:
tryAcquire()方法是尝试获得锁和处理重入问题,如果失败则会继续执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg))方法,该步骤内部就是将当前线程信息封装成Node对象,加入等待队列中,接着将该线程park;具体来说,是将新Node添加到双向链表的尾部,并将头节点指向刚创建的Node对象,接着看acquireQueued()方法;public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
这就是获取锁时的阻塞等待机制的实现,至于被阻塞的线程什么时候被唤醒,需要了解一下解锁的过程;final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; // 开始自旋,对整条链表,从后往前遍历 for (;;) { // 先拿到当前节点的【前驱节点】 final Node p = node.predecessor(); // 如果正好是头节点,则说明当前节点是等待队列的头部,轮到当前节点再次尝试拿锁 if (p == head && tryAcquire(arg)) { // 如果成功拿到锁,则指针前移,将node的前驱节点置null,即让prev出队 setHead(node); p.next = null; // help GC failed = false; // 并且不需要让当前线程中断,因为成功拿到锁了,所以此处是唯一跳出循环的地方 return interrupted; } // 然后根据前驱节点,判断当前节点是否需要阻塞 // 如果需要则通过parkAndCheckInterrupt()将当前线程阻塞,防止死循环浪费CPU资源 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
- ReentrantLock的解锁过程不区分公平锁和非公平锁,查看java.util.concurrent.locks.ReentrantLock.unlock()函数:
public void unlock() { // 最终还是调用了AQS.release(int)函数 sync.release(1); }
- 到java.util.concurrent.locks.AbstractQueuedSynchronizer:
public final boolean release(int arg) { // 已获取锁的线程执行完同步代码块后,尝试释放锁,即将当前锁的成员变量exclusiveOwnerThread置空,并通过CAS操作安全的更新AQS.state的值 if (tryRelease(arg)) { // 如果lock没有被任何线程占用,则通过unparkSuccessor()函数将当前线程Node的后继节点(因为是一条链表,此时后继节点一定是被阻塞的)唤醒 // 【注意】:为什么是当前阻塞队列head的后继节点呢?因为每个阻塞队列的head是一个Dummy(哑元)节点,是个占位节点,他的next才是阻塞队列的首个节点 // unparkSuccessor()内部通过LockSupport.unpark(thread)将后继节点线程唤醒 // 唤醒节点之前也会把当前线程节点状态置成初始状态0,方便后续操作将该节点从链表中移除 Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
2.3.1 wait/notify和await/singal和park/unpark
-
obj.wait()/notify()
:依托于synchronized,即底层依赖Monitor实现线程阻塞和唤醒,和synchronized配合使用;public void test() { Object o = new Object(); synchronized (obj) { try { obj.wait(); } cache (...) { } } }
-
ReentrantLock.ConditionObject.await()/signal()
:是条件变量的方法,底层还是基于park()/unpark();
;即:ConditionObject内部本质就是一个双向队列(但只用了nextWaiter); -
LockSupport.park(thread)/unpark(thread)
:精准的暂停或唤醒某一个thread对象,而上面的obj.notify();
是随机唤醒一个thread;底层设计基于"许可",C层使用volatile int _counter
保存这个许可;(以上是JDK的实现);
2.4 AQS的应用
AQS是Java中自定义线程同步锁的基础框架,在此基础上可以自定义各种同步工具:
- ReentrantLock
- ReentrantReadWriteLock
- Semaphore:
限量锁,在构造方法中设置permits最大允许多少个线程可访问一段代码,如果访问的线程超过permit,则剩下的thread等待;场景类似厕所蹲坑; - CountDownLatch:
可以在构造函数中,设置门闩个数,当门闩个数为0,才会走之后的逻辑;场景类似LOL,等10个人都加载完毕,游戏才能开始; - ThreadPoolExecutor
2.5 总结
JDK 1.6中引入了多种多线程同步工具
- AtomicXXX原子类,是通过CAS无锁算法,在一定程度上实现同步的,而CAS算法的实现,是通过Unsafe魔法类保证"比较+更新"操作的原子性的;
- Lock接口、ReadWriteLock接口的实现类等,是基于AQS实现同步,AQS引入双向链表队列结构,将"申请锁"的操作封装并放入等待队列中,实现线程排队获取锁的机制,此外AQS的底层也是通过CAS算法保证操作的原子性;
- AQS本质:
原子变量(int state)+队列(双向链表)+ park/unpark阻塞唤醒线程
,用上面3要素来搭建一个Java层的锁框架,真正实现还是各种和业务场景相关的子类;
参考
Java核心技术 卷I
不可不说的Java“锁”事
Java CAS 原理剖析
Java魔法类:Unsafe应用解析
从ReentrantLock的实现看AQS的原理及应用