文章目录
了解高并发必须知道的概念
了解Java并发包Concurrent发展简述
1.JUC之魔法类(Unsafe)解析
2.Unsafe实现CAS的核心 API
四.AtomicInteger源码浅析
2.ABA问题的解决方案之AtomicStampedReference
2.使用Unsafe实现一个简单原子类型
并发编程最佳学习路线
【Java多线程】了解线程的锁池和等待池概念
【Java基础】多线程从入门到掌握
【Java多线程】线程通信
了解高并发必须知道的概念
【Java多线程】高并发修炼基础之高并发必须了解的概念
了解Java并发包Concurrent发展简述
【Java多线程】JUC之Java并发包Concurrent发展简述(各版本JDK中的并发技术)
了解锁的分类
【Java多线程】成神之路中必须要了解的锁分类
一.CAS是什么
我们平时用 (synchronized,Lock) 都属于 悲观锁 。它总是人为 每次修改数据之前都可能被其他人(线程)修改,所以在访问资源的时候就会对资源进行加锁 。 当线程获取到锁后,其他需要获取锁的线程就要 阻塞 ,等待持有锁的线程释放锁。
CAS机制属于 乐观锁 ,乐观锁的核心思路就是 每次不加锁而是假设修改数据之前其他线程一定不会修改,如果因为修改过产生冲突就失败就重试,直到成功为止 。
CAS的全称是 Compare And Swap ,翻译过来就是 比较并交换 。是一种 无锁算法 ,是Java提供的 非阻塞原子性操作 。 在不使用锁的情况下实现多线程下的同步。在并发包中(
java.util.concurrent.atomic ) 原子类型 都是使用CAS来实现乐观锁的。
特点: CAS算法是 非阻塞 的, 当有多个线程对内存中的数据进行CAS操作时,CPU能保证只会有一个线程能更新成功 ,其余线程并不会阻塞,而是继续尝试获取更新,当然也可以主动放弃。因此不可能出现死锁的情况。也就是说 无锁操作天生免疫死锁 。
原理:在Java中可以通过 Unsafe类 实现CAS操作,而Unsafe类最终调用的是 native方法 ,即在 Java中通过JNI调用C或者C+系统函数 ,来调用 CPU提供的 cmpxchgl指令 实现的原子性的。
CAS算法的过程是这样:它包含 3个参数 CAS(V、E、N)
基本思想 : 将内存位置的值V与预期值E比较,如果V=E,那么处理器会自动将V更新为A。否则,处理器不能做任何操作。V:要更新的变量E:预期值N:新值
.
CAS操作可以分为3个步骤:
1) 将预期值E与内存中的值V比较;
2) 如果V值不等于E值,说明其他线程做了更新,那么什么也不做,如果E与V的值相等,那么就将V的值设置为N (保证了新值总是基于最新的信息计算的)
3) 返回操作是否成功。
什么是预期值? :也就是 你认为现在变量应该是什么样子,如果变量不是你想象的那样,那说明已经被别人修改过。你就重新读取,再次尝试修改即可。
简单的来说CAS就是在一个 死循环 中判断 预期的值E 和 内存中的值V 是否相等,相等的话就将 V修改为N 成功后退出循环,如果不相等的话就继续循环直到E等于V并且更新V=N成功退出循环
注意: CAS有循环开销大的问题 ,因为会一直循环到 预期值E和内存值V相等修改成功 。 同时CAS只能保证一个共享变量的原子性的问题 。不过在 JDK1.5 之后加入了 “AtomicReference类“来保证引用对象之间的原子性。
二.Unsafe类是什么
1.JUC之魔法类(Unsafe)解析
看我的这篇文章 【Java多线程】JUC之魔法类(Unsafe)解析 就行了,很适合小白入门
2.Unsafe实现CAS的核心 API
1.compareAndSwapObject2.compareAndSwapInt3.compareAndSwapInt
原子变量提供的原子性来自 CAS操作 ,CAS来自 Unsafe 提供的api,然后由CPU的 cmpxchg 指令 来保证。
(cmpxchg是汇编指令,作用:比较并交换操作数)
三.Atomic工具包
Atomic工具包是JDK1.5出现的并发工具,主要用于 保证变量的原子性和可见性。
1.常用原子类型
普通原子类型:提供对boolean、int、long和对象的原子性操作。
AtomicBooleanAtomicIntegerAtomicLongAtomicReference
原子类型数组:提供对数组元素的原子性操作。
AtomicLongArrayAtomicIntegerArrayAtomicReferenceArray
原子类型字段更新器:提供对指定对象的指定字段进行原子性操作。
AtomicLongFieldUpdaterAtomicIntegerFieldUpdaterAtomicReferenceFieldUpdater
带版本号的原子引用类型:以版本戳的方式解决原子类型的ABA问题。
AtomicStampedReferenceAtomicMarkableReference
原子累加器(JDK1.8):AtomicLong和AtomicDouble的升级类型,专门用于数据统计,性能更高。
DoubleAccumulatorDoubleAdderLongAccumulatorLongAdder
2.使用案例
2.1.测试原子累加和非原子累加
publicclassAtomicIntegerTest{publicstaticAtomicInteger total =newAtomicInteger();publicstaticInteger count =0;privatestaticInteger threadNum =1000;publicstaticCountDownLatch downLatch =newCountDownLatch(threadNum);publicstaticvoidmain(String[] args) throws InterruptedException{ Long startTime = System.currentTimeMillis();// 创建1000个线程,每个线程中的run()方法将count累加1000次for(inti =0; i < threadNum; i++) {newThread(() -> {// 每个线程累加1000次for(intj =0; j < threadNum; j++) {//原子自增total.getAndIncrement();//非原子自增count++; }//加入等待downLatch.countDown(); }).start(); }//等待所有线程完成downLatch.await(); System.out.println("非原子:"+count); System.out.println("原子:"+total.get()); }}
执行结果:
创建 1000 个线程分别对 total 进行累加,由于没有对 total 做任何线程安全操作所以 结果一定是小于或者等于 1000000
通过 Integer 的原子类, AtomicInteger 类来进行自增操作,然后调用 getAndIncrement() ,保证每次都是 原子操作 。最终的结果输出: 1000000 。
2.2.使用Unsafe属性进行原子操作
获取属性偏移量
通过 CAS 方式进行修改
publicclassUnsafeTest{publicstatic Unsafe U; static {try{ Field f = Unsafe.class.getDeclaredField("theUnsafe"); f.setAccessible(true); U = (Unsafe)f.get(null); }catch(Throwable e) { e.printStackTrace(); } }publicstatic void main(String[] args) throws NoSuchFieldException, IllegalAccessException, InstantiationException { User user = new User();// 获取字段Field age = user.getClass().getDeclaredField("age");// 获取字段相对Java对象的"起始地址"的偏移量long offset = U.objectFieldOffset(age);// cas设置值,如果age的内存值等于10,就将age的内存值修改为20,返回true,否则返回falseboolean success = U.compareAndSwapInt(user, offset,10,20) ; System.out.println("修改结果: "+ success ) ;// 打印数据System.out.println("查询结果: "+ user.getAge()); }}classUser{privateint age;publicUser() {this.age =10; }publicint getAge() {returnage; }}
执行结果
3.>>>>>>>>>具体用法
Atomic类型的具体用法详细说明请看文章 【Java基础】多线程从入门到掌握 的 第十六节-使用Atomic(原子类)
四.AtomicInteger源码浅析
publicclassAtomicIntegerextendsNumberimplementsjava.io.Serializable{privatestaticfinallongserialVersionUID =6214790243416807050L;// 获取指针类Unsafe privatestaticfinalUnsafe unsafe = Unsafe.getUnsafe();//变量value在AtomicInteger实例对象内的内存偏移量 privatestaticfinallongvalueOffset;static{try{//通过unsafe类的objectFieldOffset()方法,获取value变量在对象内存中的偏移量 //通过该偏移量valueOffset,使用unsafe类的内部方法可以直接对内存中value进行取值或赋值操作 valueOffset = unsafe.objectFieldOffset(AtomicInteger.class.getDeclaredField("value")); }catch(Exception ex) {thrownewError(ex); } }//当前AtomicInteger封装的int变量value,该属性通过volatile保证其在线程间是可见性。privatevolatileintvalue;//构造方法 publicAtomicInteger(intinitialValue){ value = initialValue; }publicAtomicInteger(){ }//获取当前最新值 publicfinalintget(){returnvalue; }//设置当前值,具备volatile效果,方法用final修饰是为了更进一步的保证线程安全。 publicfinalvoidset(intnewValue){ value = newValue; }//最终会设置成newValue,使用该方法后可能导致其他线程在之后的一小段时间内可以获取到旧值,有点类似于延迟加载publicfinalvoidlazySet(intnewValue){ unsafe.putOrderedInt(this, valueOffset, newValue); }//设置新值并获取旧值,底层调用的是CAS操作即unsafe.compareAndSwapInt()方法publicfinalintgetAndSet(intnewValue){returnunsafe.getAndSetInt(this, valueOffset, newValue); }//如果当前值为预期值expect,则设置为update(当前值指的是value变量) publicfinalbooleancompareAndSet(intexpect,intupdate){returnunsafe.compareAndSwapInt(this, valueOffset, expect, update); }//当前值加1返回旧值,底层CAS操作 publicfinalintgetAndIncrement(){returnunsafe.getAndAddInt(this, valueOffset,1); }//当前值减1,返回旧值,底层CAS操作 publicfinalintgetAndDecrement(){returnunsafe.getAndAddInt(this, valueOffset,-1); }//当前值增加delta,返回旧值,底层CAS操作 publicfinalintgetAndAdd(intdelta){returnunsafe.getAndAddInt(this, valueOffset, delta); }//当前值加1,返回新值,底层CAS操作publicfinalintincrementAndGet(){returnunsafe.getAndAddInt(this, valueOffset,1) +1; }//当前值减1,返回新值,底层CAS操作 publicfinalintdecrementAndGet(){returnunsafe.getAndAddInt(this, valueOffset,-1) -1; }//当前值增加delta,返回新值,底层CAS操作 publicfinalintaddAndGet(intdelta){returnunsafe.getAndAddInt(this, valueOffset, delta) + delta; }//省略一些不常用的方法....}
AtomicInteger是基于Unsafe类中CAS相关操作实现的,是无锁操作。
publicfinalintgetAndIncrement(){returnunsafe.getAndAddInt(this, valueOffset,1); }
调用了Unsafe类中的 getAndAddInt() 方法,该方法执行一个 CAS操作 ,保证变量的 原子性。
可看出 getAndAddInt 通过一个 do-while循环不断的重试更新要设置的值,直到成功为止, 调用的是Unsafe类中的 compareAndSwapInt 方法, 是一个CAS操作方法。即: 如果 var1+var2 在内存的值等于 var5 ,修改 var1+var2 在内存的值为 var5 + var4(也就是 内存值+=1 ),返回true,如果不相等则返回false,继续 循环尝试更新
如果将 compareAndSwapInt(var1, var2, var5, var5 + var4) 换成 compareAndSwapInt(obj, offset, expect, update) 就比较清楚了,意思就是如果 obj 内的 value 和 expect 相等,就证明没有其他线程改变过这个变量,那么就更新它为 update ,如果这一步的 CAS 没有成功,那就采用 自旋 的方式继续进行 CAS 操作,取出乍一看这也是2个步骤了啊, 其实是在JNI 里调用 C的系统函数 发送 CPU指令cmpxchgl 完成的。所以还是 原子操作 。
publicfinalintgetAndAddInt(Object var1,longvar2,intvar4){intvar5;do{ var5 =this.getIntVolatile(var1, var2); }while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));returnvar5;}
上述源码分析是基于JDK1.8的,如果是1.8之前的方法实现如下`:
//JDK 1.7的源码,由for的死循环实现,并且直接在AtomicInteger实现该方法,//JDK1.8后,该方法实现已移动到Unsafe类中,直接调用getAndAddInt方法即可publicfinalintincrementAndGet(){for(;;) {intcurrent = get();intnext = current +1;if(compareAndSet(current, next))returnnext; }}
五.CAS缺陷
1.CAS缺点有哪些?
循环时间长时开销大:如果CAS不成功,则会原地自旋,因为会一直循环到预期值E和内存值V相等修改成功 ,如果长时间自旋会给CPU带来非常大的执行开销。果长时间自旋会给CPU带来非常大的执行开销。
只能保证一个共享变量的原子操作:当对1个共享变量执行操作时,我们可以使用循环CAS的方式来保证原子操作,但是 对多个共享变量操作时,循环CAS就无法保证操作的原子性,这个时候就可以用 锁 ,或者有一个取巧的办法,就是 把多个共享变量合并成一个共享变量来操作 。比如有2个共享变量i=2,j=a,合并一下ij=2a,然后用CAS来操作ij。在 JDK1.5 之后加入了 “AtomicReference类“来保证引用对象之间的原子性。可以把多个变量放在一个对象里来进行CAS操作。
ABA问题如果 内存地址V 初次读取的值是 A ,并且在准备赋值的时候检查到它的值仍然为 A ,那我们就能说它的值没有被其他线程改变过了吗?如果在这段期间它的值 曾经被改成了B,后来又被改回为A,那CAS操作就会误认为它从来没有被改变过 。 无法正确判断这个变量是否已被修改过,一般称这种情况为ABA问题。即线程1在把A的值改成了B,线程2又把B改回了A,此时再来一个线程3操作时发现A的值并没有改变,但实际上A的值是有被操作过的,为了避免在这种情况下CAS锁失效.. ABA问题的解决思路就是使用 版本号 。在变量前面追加上版本号,每次变量更新的时候把版本号加1,那么 A-B-A 就会变成 1A-2B-3A 。在 JDK1.5 之后加入了 “AtomicReference类“来解决ABA问题 。这个类的 compareAndSet 方法作用是 首先检查 当前引用 是否等于 预期引用 ,并且 当前标志 是否等于 预期标志 ,如果 全部相等 ,则以原子方式将该引用和该标志的值设置为给定的更新值。 解决了多线程反复读写时,无法预知值是否已被修改的问题。
因此,在使用CAS前要考虑清楚 “ABA”问题是否会影响程序并发的正确性, 如果需要解决ABA问题,改用传统的互斥同步可能会比原子类更高效。
2.ABA问题的解决方案之AtomicStampedReference
在JDK1.5后. Atomic包加入了 AtomicStampedReference类通过给每个变量加入了一个 版本号来避免ABA问题。每次修改变量值的操作,都会比对当前值和期望值,当前版本号和预期版本号,只有二者都相同是才能更新成功
底层实现:由一个 自定义键值对Pair 存储对象引用reference和版本号stamp,并构造volatile修饰的私有实例; 更新数据时,不但会对比当前值和期望值,还会对比当前版本号和预期版本号,只有二者都相同,才会调用 Unsafe的compareAndSwapObject方法执行数值和版本号替换 。
它的构造方法有 2 个参数( 初始值,初始版本号 ),在修改数据时需提供一个新的值和一个新的版本号,这样在多线程情况下只要数据被修改了那么版本号一定会发生改变,另一个线程拿到的是旧的版本号所以会修改失败。tips: AtomicMarkableReference : 带boolean值标识的原子引用类型 ,true和false两种切换状态表示是否被修改。不靠谱。
在此之前我们先模拟一个 ABA 问题:
publicclassABATest{//初始原子类型的值为10privatestaticAtomicInteger index =newAtomicInteger(10);publicstaticvoidmain(String[] args){newThread(() -> {//如果内存值为10,则修改为101index.compareAndSet(10,101);//如果内存值为101,则修改为10index.compareAndSet(101,10); System.out.println(Thread.currentThread().getName() +": 10->101->10"); },"zhangSan").start();newThread(() -> {try{ TimeUnit.SECONDS.sleep(2); }catch(InterruptedException e) { e.printStackTrace(); }//如果内存值为10,则修改为1000,并返回修改结果boolean result = index.compareAndSet(10,1000); System.out.println(Thread.currentThread().getName() +": 更新结果:"+ result +", 更新后的值: "+ index.get()); },"liSi").start(); }}
执行结果
通过 AtomicStampedReference 来解决这个问题 : 就是我们在进入第2个线程之前,先读取当前的 版本号 ,然后进入更新。这个读取的 版本号 可能是一个旧的值。如果出现这种情况,那么我们执行 compareAndSet 就会返回失败。
publicclassABATest2{//初始原子类型的值为10,版本号为1privatestaticAtomicStampedReference index =newAtomicStampedReference(10,1);publicstaticvoidmain(String[] args){newThread(() -> {//如果内存值为10,且内存版本号相等,则修改为101,且版本号+1index.compareAndSet(10,101, index.getStamp(), index.getStamp() +1);//如果内存值为101,,且内存版本号相等,则修改为10,且版本号+1index.compareAndSet(101,10, index.getStamp(), index.getStamp() +1); System.out.println(Thread.currentThread().getName() +": 10->101->10"); },"zhangSan").start();//获取当前版本号intstamp = index.getStamp();newThread(() -> { 、try{ TimeUnit.SECONDS.sleep(2); }catch(InterruptedException e) { e.printStackTrace(); }//如果内存值为10,且内存版本号相等,则修改为1000,且版本号+1,返回true,否则什么也不做,返回falseboolean result = index.compareAndSet(10,1000, stamp, stamp +1); System.out.println(Thread.currentThread().getName() +": 更新结果:"+ result +", 更新后的值: "+ index.getReference()); },"liSi").start(); }}
执行结果
六.拓展
1.CAS实现单例模式
publicclassSingleton{privatestaticAtomicReference singletonAtomicReference =newAtomicReference<>();privateSingleton(){ }publicstaticSingletongetInstance(){while(true) {// 获得singletonSingleton singleton = singletonAtomicReference.get();// 如果singleton不为空,就返回singletonif(singleton !=null) {returnsingleton; }// 如果singleton为空,创建一个singletonsingleton =newSingleton();// CAS操作,预期值是NULL,新值是singleton// 如果成功,返回singleton// 如果失败,进入第二次循环,singletonAtomicReference.get()就不会为空了(当前值等于null,更新当前值为singleton)if(singletonAtomicReference.compareAndSet(null, singleton)) {returnsingleton; } } }publicstaticvoidmain(String[] args) throws InterruptedException{intthreadNum =1000; CountDownLatch downLatch =newCountDownLatch(threadNum);for(inti =0; i < threadNum; i++) {newThread(() -> {for(intj =0; j < threadNum; j++) { System.out.println(Singleton.getInstance()); }//加入等待downLatch.countDown(); }).start(); }//等待所有线程完成downLatch.await(); System.out.println("执行结束"); }}
2.使用Unsafe实现一个简单原子类型
测试原子和非原子操作
publicclassMyAtomicInteger{privatestatic long offset;//偏移地址privatestatic Unsafe unsafe; static {try{ Field theUnsafeField = Unsafe.class.getDeclaredField("theUnsafe"); theUnsafeField.setAccessible(true); unsafe = (Unsafe) theUnsafeField.get(null); Field field = MyAtomicInteger.class.getDeclaredField("value"); offset = unsafe.objectFieldOffset(field);//获得偏移地址}catch(Exception e) { e.printStackTrace(); } }privatevolatileint value;publicvoid increment() { int tempValue;do{ tempValue = unsafe.getIntVolatile(this, offset);//拿到值}while(!unsafe.compareAndSwapInt(this, offset, tempValue, value +1));//CAS自旋}publicintget() {returnvalue; }publicfinalint incrementAndGet() {returnunsafe.getAndAddInt(this, offset,1) +1; }/** *@paramobj 实例 *@paramoffset 实例变量的偏移量 *@paramexpect 预期值 *@return*/publicfinalint getAndAddInt(Object obj, long offset, intexpect) { int tempValue;do{ tempValue = unsafe.getIntVolatile(obj, offset);//实例,示例属性的偏移量,预期值,更新的值//如果偏移量的值等于预期值值,更新offset=tempValue + expect,返回true,否则不更新offset,返回false}while(!unsafe.compareAndSwapInt(obj, offset, tempValue, tempValue +expect));//CAS自旋returntempValue; }}
publicclassMyAtomicIntegerTest{publicstaticMyAtomicInteger total =newMyAtomicInteger();publicstaticInteger count =0;privatestaticInteger threadNum =1000;publicstaticCountDownLatch downLatch =newCountDownLatch(threadNum);publicstaticvoidmain(String[] args) throws InterruptedException{ Long startTime = System.currentTimeMillis();for(inti =0; i < threadNum; i++) {newThread(() -> {for(intj =0; j < threadNum; j++) { total.incrementAndGet(); count++; }//加入等待downLatch.countDown(); }).start(); }//等待所有线程完成downLatch.await(); System.out.println("非原子:"+count); System.out.println("原子:"+total.get()); }}
执行结果
看完本文如果对你有帮助的话,可以转发关注支持一下。