java 锁
源码看 github
java 中的锁,宏观上分为乐观锁和悲观锁
- 乐观锁:读多写少,都是基于CAS实现的,CAS是一种原子操作。
- 悲观锁: 写多,遇到并发写的可能性高,是采用AQS框架实现的。
乐观锁:
认为数据每次都不会被修改,但是在使用前都会进行对比,如果已经被修改,则重试,直到没有被修改为止。
其实现的机制是CAS,原子操作,它适用于多读操作;
悲观锁
认为每次数据都被修改,故需要在数据读写的时候都加上锁,这样就造成了别的线程的阻塞,直到锁被释放。java的互斥锁,synchronized就是一种悲观锁。其依靠AQS框架实现;
CAS
即compareAndSwap,它是一种原子性操作:它是依靠处理器提供的cmpxchg指令
完成的
cmpxchg是汇编指令 (百度百科):
- 作用:比较并交换操作数.
- 如:CMPXCHG r/m,r 将累加器AL/AX/EAX/RAX中的值与首操作数(目的操作数)比较,如果相等,第2操作数(源操作数)的值装载到首操作数,zf置1。如果不等,首操作>数的值装载到AL/AX/EAX/RAX并将zf清零.
- 该指令只能用于486及其后继机型。第2操作数(源操作数)只能用8位、16位或32位寄存器。第1操作数(目地操作数)则可用寄存器或任一种存储器寻址方式。
一般变量累加操作:
变量的自增操作i++,分三个步骤:
1. 从内存中读取出变量i的值
2. 将i的值加1
3. 将加1后的值写回内存
这说明 i++ 并不是一个原子操作。因为,它分成了三步,有可能当某个线程执行到了第②时被中断了,那么就意味着只执行了其中的两个步骤,没有全部执行。
下面一个例子:
package test;
import org.w3c.dom.css.Counter;
import java.util.concurrent.atomic.AtomicInteger;
class MyThread extends Thread{
public volatile static int cout; // 使用volatile关键字,说明其只有内存可见性但无原子性
public static int coutp;
public static AtomicInteger atomicInteger=new AtomicInteger(0); // 采用CAS实现
private static void addCout(){
for (int i = 0; i <100 ; i++) {
cout++;
coutp++;
atomicInteger.getAndIncrement();
}
System.out.print(Thread.currentThread().getName()+ " cout="+cout+" coutp="+ coutp+ " atomicInteger="+atomicInteger+"\n");
}
public void run(){
addCout();
}
}
public class Increase {
public static void main(String[] args) {
MyThread[] myThreads=new MyThread[100];
for (int i = 0; i <100 ; i++) {
myThreads[i]=new MyThread();
}
for (int i = 0; i < 100; i++) {
myThreads[i].start();
}
}
}
===result:
....
Thread-63 cout=9655 coutp=9674 atomicInteger=9700
Thread-60 cout=9755 coutp=9774 atomicInteger=9800
Thread-62 cout=9555 coutp=9574 atomicInteger=9600
Thread-61 cout=9855 coutp=9874 atomicInteger=9900
Thread-58 cout=9955 coutp=9974 atomicInteger=10000
CAS累加:
先看一下 AtomicInteger
的累加方法:
/**
* AtomicInteger,调用了unsafe类的方法
**/
public final int getAndIncrement() {
return unsafe.getAndAddInt(this, valueOffset, 1);
}
/**
* Unsafe 类
**/
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4)); // CAS循环操作
return var5;
}
public native int getIntVolatile(Object var1, long var2);
上面说到,它是依靠处理器提供的cmpxchg指令
完成的
CAS比较与交换的伪代码可以表示为:
do{
备份旧数据;
基于旧数据构造新数据;
}while(!CAS( 内存地址,备份的旧数据,新数据 ))
就是指当两者进行比较时,如果相等,则证明共享数据没有被修改,替换成新值,然后继续往下运行;如果不相等,说明共享数据已经被修改,放弃已经所做的操作,然后重新执行刚才的操作。容易看出 CAS 操作是基于共享数据不会被修改的假设,采用了类似于数据库的 commit-retry 的模式。当同步冲突出现的机会很少时,这种假设能带来较大的性能提升。(https://www.cnblogs.com/Mainz/p/3546347.html)。
CAS是项乐观锁技术,当多个线程尝试使用CAS同时更新同一个变量时,只有其中一个线程能更新变量的值,而其它线程都失败,失败的线程并不会堵塞,而是被告知这次竞争中失败,并可以再次尝试;
当然,CAS也有一些问题:(https://my.oschina.net/xinxingegeya/blog/499223)
-
ABA
问题
ABA问题: 因为CAS需要在操作值的时候检查下值有没有发生变化,如果没有发生变化则更新,但是如果一个值原来是A,变成了B,又变成了A,那么使用CAS进行检查时会发现它的值没有发生变化,但是实际上却变化了。ABA问题的解决思路就是使用版本号。在变量前面追加上版本号,每次变量更新的时候把版本号加一,那么A-B-A 就会变成1A-2B-3A。从Java1.5开始JDK的atomic包里提供了一个类
AtomicStampedReference
来解决ABA问题。这个类的compareAndSet
方法作用是首先检查当前引用是否等于预期引用,并且当前标志是否等于预期标志,如果全部相等,则以原子方式将该引用和该标志的值设置为给定的更新值。
/**
* Atomically sets the value of both the reference and stamp
* to the given update values if the
* current reference is {@code ==} to the expected reference
* and the current stamp is equal to the expected stamp.
*
* @param expectedReference the expected value of the reference 预期引用
* @param newReference the new value for the reference 更新后的引用
* @param expectedStamp the expected value of the stamp 预期标志
* @param newStamp the new value for the stamp 更新后的标志
* @return {@code true} if successful
*/
public boolean compareAndSet(V expectedReference,
V newReference,
int expectedStamp,
int newStamp) {
Pair<V> current = pair;
return
expectedReference == current.reference &&
expectedStamp == current.stamp &&
((newReference == current.reference &&
newStamp == current.stamp) ||
casPair(current, Pair.of(newReference, newStamp)));
}
- 循环开销大
循环时间长开销大。自旋CAS如果长时间不成功,会给CPU带来非常大的执行开销。如果JVM能支持处理器提供的pause指令那么效率会有一定的提升,pause指令有两个作用,第一它可以延迟流水线执行指令(de-pipeline),使CPU不会消耗过多的执行资源,延迟的时间取决于具体实现的版本,在一些处理器上延迟时间是零。第二它可以避免在退出循环的时候因内存顺序冲突(memory order violation)而引起CPU流水线被清空(CPU pipeline flush),从而提高CPU的执行效率。
- 只能保证一个共享变量
只能保证一个共享变量的原子操作。当对一个共享变量执行操作时,我们可以使用循环CAS的方式来保证原子操作,但是对多个共享变量操作时,循环CAS就无法保证操作的原子性,这个时候就可以用锁,或者有一个取巧的办法,就是把多个共享变量合并成一个共享变量来操作。比如有两个共享变量i=2,j=a,合并一下ij=2a,然后用CAS来操作ij。从Java1.5开始JDK提供了AtomicReference类来保证引用对象之间的原子性,你可以把多个变量放在一个对象里来进行CAS操作。
自旋锁
了解了CAS,就来看一下自旋锁:自旋锁,基本作用与互斥锁类似,为线程或者进程之间的同步。但是对于互斥锁,如A线程获得了锁,B线程试图去获取该锁时,B线程会被阻塞。但是,如果两个线程资源竞争不是特别激烈,而处理器阻塞一个线程引起的线程上下文的切换的代价高于等待资源的代价的时候(锁的已保持者保持锁时间比较短),那么线程B可以不放弃CPU时间片,而是在“原地”忙等,直到锁的持有者释放了该锁,这就是自旋锁的原理,可见自旋锁是一种非阻塞锁。
一种实现方式如下:
package test;
import java.util.concurrent.atomic.AtomicReference;
public class SpinLock {
private AtomicReference<Thread> sign=new AtomicReference<>();
public void lock(){
Thread current=Thread.currentThread();
//lock函数将current设置为当前线程,并且预测原来的值为空。unlock函数将current设置为null,并且预测值为当前线程。当有第二个线程调用lock操作时由于current值不为空,导致循环
//一直被执行,直至第一个线程调用unlock函数将current设置为null,第二个线程才能进入临界区。
while (!sign.compareAndSet(null,current)){
}
}
public void unlock(){
Thread current=Thread.currentThread();
//lock函数将current设置为当前线程,将current设置为null,第二个线程才能进入临界区。
sign.compareAndSet(current,null);
}
}
一个例子:
package test;
import java.util.concurrent.TimeUnit;
public class ThreadTest implements Runnable{
private static int sum;
private SpinLock lock;
public ThreadTest(SpinLock lock){
this.lock=lock;
}
public static void main(String[] args) throws InterruptedException {
SpinLock lock=new SpinLock();
for (int i = 0; i < 100; i++) {
ThreadTest threadTest=new ThreadTest(lock);
Thread t=new Thread(threadTest);
t.start();
}
TimeUnit.MILLISECONDS.sleep(200);
}
@Override
public void run() {
this.lock.lock();
sum++;
System.out.print(sum+" "+Thread.currentThread().getName()+"\n");
this.lock.unlock();
}
}
===RESULT
...
84 Thread-68
85 Thread-67
86 Thread-66
87 Thread-65
88 Thread-64
89 Thread-63
90 Thread-62
91 Thread-61
92 Thread-4
93 Thread-60
94 Thread-59
95 Thread-58
96 Thread-55
97 Thread-71
98 Thread-77
99 Thread-54
100 Thread-57
自旋锁存在的问题及改进
- 过多占用CPU时间:如果当前持有者长时间不释放该锁,则等待者将长时间的占据cpu时间片,导致资源浪费。因此,要设定最大自旋等待时间,若超过这个时间,等待者会放弃cpu时间片,进行阻塞;
- 死锁,即有一个线程两次试图获取自旋锁(如递归程序),第一次这个线程获得了该锁,当第二次试图加锁的时候,检测到锁已被占用(其实是被自己占用),那么这时,线程会一直等待自己释放该锁,而不能继续执行,这样就引起了死锁。这也说明,自旋锁是非可重入锁.因此递归程序使用自旋锁应该遵循以下原则:递归程序决不能在持有自旋锁时调用它自己,也决不能在递归调用时试图获得相同的自旋锁。
改进死锁,其引入线程计数器,可重入锁的原理也是如此:
package test;
import java.util.concurrent.atomic.AtomicReference;
public class SpinLock {
private AtomicReference<Thread> sign=new AtomicReference<>();
private int count; // 增加计数器
public void lock(){
Thread current=Thread.currentThread();
if(current==sign.get()){ // 同一线程,获取锁,计数器加一
count++;
return;
}
while (!sign.compareAndSet(null,current)){
// 等待锁被释放
}
}
public void unlock(){
Thread current=Thread.currentThread();
if(current==sign.get()){
if(count>0)
count--; // 释放一个锁,该锁的计数器减一
else
sign.compareAndSet(current,null);
}
}
}
对比运行一下:
# 将ThreadTest 的run方法,重写为:
@Override
public void run() {
this.lock.lock();
this.lock.lock();
sum++;
System.out.print(sum+" "+Thread.currentThread().getName()+"\n");
this.lock.unlock();
this.lock.unlock();
}
改进前死锁,电脑直接卡死了。
而改进后,可重入运行OK