前言
CAS(compare and swap, 比较并交换),是原子操作的一种,可用于在多线程编程中实现不被打断的数据交换操作,从而避免多线程同时改写某一数据时由于执行顺序不确定性以及中断的不可预知性产生的数据不一致问题。
简单来说,CAS可以保证多线程对数据写操作时数据的一致性。
CAS的思想:三个参数,一个当前内存值V、旧的预期值A、即将更新的值B,当且仅当预期值A和内存值V相同时,将内存值修改为B并返回true,否则什么都不做,并返回false。
数据不一致问题
一个n++
的问题
public class Case {
public volatile int n;
public void add() {
n++;
}
}
通过javap -verbose Case
查看add方法的字节码指令
public void add();
flags: ACC_PUBLIC
Code:
stack=3, locals=1, args_size=1
0: aload_0
1: dup
2: getfield #2 // Field n:I
5: iconst_1
6: iadd
7: putfield #2 // Field n:I
10: return
可以看到n++
的操作分为以下三步
-
getfield
获得n的值 -
idd
进行加1操作 -
putfield
把累加后的值写回给n
变量n通过volatile修饰,可保证其在线程之间的可见性,但不能保证该3个指令的原子执行。例如:线程A、B同时操作n++
,n应该等于4,但结果为3。
解决办法
- 在
add
上添加synchronized
修饰解决。但性能较差 - 将n的类型由
int
改为AtomicInterger
原子变量
原子变量的实现
原子变量是如何保证数据的一致性?
答案:CAS
以AtomicInterger
为例,
public class AtomicInteger extends Number implements java.io.Serializable {
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;
static {
try {
// 变量偏移地址
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
private volatile int value;
public final int get() {
return value;
}
- Unsafe,是JDK中的一个内部类,由于Java方法无法直接访问底层系统,需要通过本地(native)方法来访问,Unsafe相当于一个后门,基于该类可以直接操作特定内存的数据。
- 变量valueOffset,表示该变量值在内存中的偏移地址,因为Unsafe就是根据内存偏移地址获取数据的。
- 变量value用volatile修饰,保证了多线程之间的内存可见性。
看看AtomicInteger
如何实现并发下的累加操作:
// delta:增加的值
public final int getAndAdd(int delta) {
return unsafe.getAndAddInt(this, valueOffset, delta);
}
调用unsafe.getAndAddInt方法
// unsafe.getAndAddInt
// var1:原子变量对象
// var2:变量值的偏移地址
// var4:增加的值
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
// 根据偏移地址var2,从对象var1中获得变量值
var5 = this.getIntVolatile(var1, var2);
// 比较上一步获得的变量值var5与当前内存中的变量值
// 若相同,则修改内存值var5+var4,否则继续循环
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
假设线程A和线程B同时执行getAndAdd操作(分别跑在不同CPU上):
-
AtomicInteger
里面的value原始值为3,即主内存中AtomicInteger
的value为3,根据Java内存模型,线程A和线程B各自持有一份value的副本,值为3。 - 线程A通过
getIntVolatile(var1, var2)
拿到value值3,这时线程A被挂起。 - 线程B也通过
getIntVolatile(var1, var2)
方法获取到value值3,运气好,线程B没有被挂起,并执行compareAndSwapInt方法比较内存值也为3,成功修改内存值为2。 - 这时线程A恢复,执行
compareAndSwapInt
方法比较,发现自己手里的值(3)和内存的值(2)不一致,说明该值已经被其它线程提前修改过了,那只能重新来一遍了。 - 重新获取value值,因为变量value被volatile修饰,所以其它线程对它的修改,线程A总是能够看到,线程A继续执行
compareAndSwapInt
进行比较替换,直到成功。
整个过程中,利用CAS保证了对于value的修改的并发安全
Unsafe类的CAS实现
继续深入看看Unsafe类中的compareAndSwapInt
方法实现。
public final native boolean compareAndSwapInt(Object paramObject, long paramLong, int paramInt1, int paramInt2);
Unsafe类中的compareAndSwapInt
,是一个本地方法,该方法的实现位于unsafe.cpp
中
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
UnsafeWrapper("Unsafe_CompareAndSwapInt");
oop p = JNIHandles::resolve(obj);
// 1. 获得变量value在内存中的地址
jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);
// 2. 通过Atomic::cmpxchg实现比较替换
// x是即将更新的值,e为原内存的值
return (jint)(Atomic::cmpxchg(x, addr, e)) == e;
UNSAFE_END
下面再来看看操作系统中Atomic::cmpxchg
的实现
下面重点了解下LOCK_IF_MP
和LOCK前缀就行了,其他我也不懂
- Linux的x86实现:
inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value) {
int mp = os::is_MP(); //判断是否是多处理器
__asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)"
: "=a" (exchange_value)
: "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp)
: "cc", "memory");
return exchange_value;
}
#define LOCK_IF_MP(mp) "cmp $0, " #mp "; je 1f; lock; 1: "
- Windows的x86实现:
inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value) {
int mp = os::isMP(); //判断是否是多处理器
_asm {
mov edx, dest
mov ecx, exchange_value
mov eax, compare_value
LOCK_IF_MP(mp)
cmpxchg dword ptr [edx], ecx
}
}
// Adding a lock prefix to an instruction on MP machine
// VC++ doesn't like the lock prefix to be on a single line
// so we can't insert a label after the lock prefix.
// By emitting a lock prefix, we can define a label after it.
#define LOCK_IF_MP(mp) __asm cmp mp, 0 \
__asm je L0 \
__asm _emit 0xF0 \
__asm L0:
LOCK_IF_MP
根据当前系统是否为多核处理器决定是否为cmpxchg指令添加lock前缀。
- 如果是多处理器,为cmpxchg指令添加lock前缀。
- 反之,就省略lock前缀。(单处理器会不需要lock前缀提供的内存屏障效果)
intel手册对lock前缀的说明如下:
- 确保后续指令执行的原子性。
在Pentium及之前的处理器中,带有lock前缀的指令在执行期间会锁住总线,使得其它处理器暂时无法通过总线访问内存,很显然,这个开销很大。在新的处理器中,Intel使用缓存锁定来保证指令执行的原子性,缓存锁定将大大降低lock前缀指令的执行开销。 - 禁止该指令与前面和后面的读写指令重排序。
- 把写缓冲区的所有数据刷新到内存中。
上面的第2点和第3点所具有的内存屏障效果,保证了CAS同时具有volatile读和volatile写的内存语义。
CAS的缺点
CAS存在三大问题:ABA问题,循环时间长开销大和只能保证一个共享变量的原子操作。其中ABA问题作为第三部分重点说明下。
循环时间长开销大
自旋CAS如果长时间不成功,会给CPU带来非常大的执行开销。如果JVM能支持处理器提供的pause指令那么效率会有一定的提升,pause指令有两个作用,第一它可以延迟流水线执行指令(de-pipeline),使CPU不会消耗过多的执行资源,延迟的时间取决于具体实现的版本,在一些处理器上延迟时间是零。第二它可以避免在退出循环的时候因内存顺序冲突(memory order violation)而引起CPU流水线被清空(CPU pipeline flush),从而提高CPU的执行效率。
只能保证一个共享变量的原子操作
当对一个共享变量执行操作时,可以使用自旋CAS的方式来保证原子操作,但是对多个共享变量操作时,自旋CAS就无法保证操作的原子性,这个时候就可以用锁,或者有一个取巧的办法,就是把多个共享变量合并成一个共享变量来操作。比如有两个共享变量i=2,j=a,合并一下ij=2a,然后用CAS来操作ij。从Java1.5开始JDK提供了AtomicReference类来保证引用对象之间的原子性,可以把多个变量放在一个对象里来进行CAS操作。
ABA问题
ABA问题:CAS需要在操作值的时候检查下值有没有发生变化,如果没有发生变化则更新,但是如果一个值原来是A,变成了B,又变成了A,那么使用CAS进行检查时会认为它的值没有发生变化,但是实际上却变化了。
一开始,我不明白ABA问题会有什么隐患,值没发生变化会有什么影响吗?
下面通过一个例子来说明ABA问题的隐患:
现有一个用单向链表实现的堆栈,栈顶为A,这时线程T1已经知道A.next为B,然后希望用CAS将栈顶替换为B:
head.compareAndSet(A,B);
在T1执行上面这条指令之前,线程T2介入,将A、B出栈,再push D、C、A,此时堆栈结构如下图,而对象B此时处于游离状态:
此时轮到线程T1执行CAS操作,检测发现栈顶仍为A,所以CAS成功,栈顶变为B,但实际上B.next为null,所以此时的情况变为:
其中堆栈中只有B一个元素,C和D组成的链表不再存在于堆栈中,平白无故就把C、D丢掉了。
以上就是由于ABA问题带来的隐患,各种乐观锁的实现中通常都会用版本戳version来对记录或对象标记,避免并发操作带来的问题,在Java中,
AtomicStampedReference<E>
也实现了这个作用,它通过包装[E,Integer]
的元组来对对象标记版本戳stamp,从而避免ABA问题。例如下面的代码分别用AtomicInteger和AtomicStampedReference来对初始值为100的原子整型变量进行更新,AtomicInteger会成功执行CAS操作,而加上版本戳的AtomicStampedReference对于ABA问题会执行CAS失败:
public class ABA {
private static AtomicInteger atomicInt = new AtomicInteger(100);
private static AtomicStampedReference atomicStampedRef = new AtomicStampedReference(100, 0);
public static void main(String[] args) throws InterruptedException {
Thread intT1 = new Thread(new Runnable() {
public void run() {
atomicInt.compareAndSet(100, 101);
atomicInt.compareAndSet(101, 100);
}
});
Thread intT2 = new Thread(new Runnable() {
public void run() {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
boolean c3 = atomicInt.compareAndSet(100, 101);
System.out.println(c3); // true
}
});
intT1.start();
intT2.start();
intT1.join();
intT2.join();
Thread refT1 = new Thread(new Runnable() {
public void run() {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
atomicStampedRef.compareAndSet(100, 101, atomicStampedRef.getStamp(), atomicStampedRef.getStamp() + 1);
atomicStampedRef.compareAndSet(101, 100, atomicStampedRef.getStamp(), atomicStampedRef.getStamp() + 1);
}
});
Thread refT2 = new Thread(new Runnable() {
public void run() {
int stamp = atomicStampedRef.getStamp();
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
}
boolean c3 = atomicStampedRef.compareAndSet(100, 101, stamp, stamp + 1);
System.out.println(c3); // false
}
});
refT1.start();
refT2.start();
}
}