只谈谈,不全覆盖
CAS
悲观锁,假设执行当前区域代码都会产生冲突,为了解决冲突,线程A
获取锁时,其它线程会被阻塞处于停顿状态,直到锁释放(可能会造成死锁)。CAS
概念属于一种乐观锁的策略(或者称为无锁),假设执行当前区域代码都不会发生冲突,不会发生冲突就自然没有线程阻塞,也不会产生死锁问题。如果出现了冲突,CAS(compare and swap) 比较和替换
就会不断的去重试,直到当前操作没有冲突。
CAS 的 ABA 问题
- 线程1从内存
M
位置取出A
- 线程2从内存
M
位置取出A
- 线程2做了预期值比较,将
A
替换为B
并放到M
位置 - 线程2从内存
M
位置取出B
- 线程2做了预期值比较,将
B
替换为A
并放到M
位置 - 线程1做了预期值比较,将
A
替换为C
并放到M
位置
此时线程1影响了线程2的状态,也就发生了ABA
的问题。所以为了解决乐观锁并发时造成的ABA
问题,都会使用AtomicStampedReference 类
或者AtomicMarkableReference 类
。
volatile
volatile
从主内存中加载到线程工作内存中的值是最新的。也就是说它解决的是多线程并发变量读时的可见性问题,但无法保证访问变量的原子性。而且volatile
只能修饰变量。
原子基本类型
-
AtomicBoolean
保证布尔值的原子性 -
AtomicInteger
保证整型的原子性 -
AtomicLong
保证长整型的原子性
原子数组
-
AtomicIntegerArray
保证整型数组的原子性 -
AtomicLongArray
保证长整型数组原子性
原子字段
-
AtomicIntegerFieldUpdater
保证整型的字段更新 -
AtomicLongFieldUpdater
保证长整型的字段更新
使用原子字段类时,所声明的字段类型必须为
volatile
使用方法如下:
private int sum = 100;
private volatile int sum1 = 100;
// 当 sum 或 sum1 为100 时只允许有一个线程进入
private void atomic4() {
AtomicIntegerFieldUpdater<T> tAtomicIntegerFieldUpdater = AtomicIntegerFieldUpdater.newUpdater(T.class, "sum1");
T t = new T();
for (int i = 0; i < 10; i++) {
singleThreadPool.execute(() -> {
if (sum == 100) {
System.out.println(Thread.currentThread().getName() + " :" + "已修改");
}
});
}
System.out.println("=====");
for (int i = 0; i < 10; i++) {
singleThreadPool.execute(() -> {
if(tAtomicIntegerFieldUpdater.compareAndSet(t, 100, 120)){
System.out.print(Thread.currentThread().getName() + " :" + "tAtomicIntegerFieldUpdater 已修改");
}
});
}
singleThreadPool.shutdown();
}
原子引用类型
-
AtomicReferenceArray
保证引用数组的原子性 -
AtomicReferenceFieldUpdater
保证引用类型的字段更新 -
AtomicStampedReference
可以解决CAS
的ABA
问题,类似提供版本号 -
AtomicMarkableReference
可以解决CAS
的ABA
问题,提供是或否进行判断
原子累加器 JDK 1.8 新增
原有的
Atomic
系列类通过CAS
来保证并发时操作的原子性,但是高并发也就意味着CAS
的失败次数会增多,失败次数的增多会引起更多线程的重试,最后导致AtomicLong
的效率降低。新的四个类通过减少并发,将单一value
的更新压力分担到多个value
中去,降低单个value
的“热度”以提高高并发情况下的吞吐量
DoubleAccumulator
DoubleAdder
LongAccumulator
LongAdder
实例应用
- 只贴了代码片段。验证累加器、整型、数组的原子性
package concurrent;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author 张博
*/
public class ThreadFactoryBuilder implements ThreadFactory {
private static final AtomicInteger POOL_NUMBER = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
ThreadFactoryBuilder() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
namePrefix = "pool-" + POOL_NUMBER.getAndIncrement() + "-thread-";
}
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
if (t.isDaemon()) {
t.setDaemon(false);
}
if (t.getPriority() != Thread.NORM_PRIORITY) {
t.setPriority(Thread.NORM_PRIORITY);
}
return t;
}
}
private static ThreadFactory namedThreadFactory = new ThreadFactoryBuilder();
private static ExecutorService singleThreadPool = new ThreadPoolExecutor(1000, 5000,
10, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
// 阻塞主线程,循环5000次后通知主线程关闭
private CountDownLatch begin = new CountDownLatch(5000);
// 模拟并发量一次200
private Semaphore semaphore = new Semaphore(200);
private static int count = 0;
private void atomic7() {
DoubleBinaryOperator doubleBinaryOperator = (x, y) -> x + y;
DoubleAccumulator doubleAccumulator = new DoubleAccumulator(doubleBinaryOperator, count);
AtomicInteger atomicInteger = new AtomicInteger(0);
int[] ints = new int[5000];
AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(5000);
for (int i = 0; i < 5000; i++) {
singleThreadPool.execute(() -> {
try {
// 允许200个线程进入,模拟提供稳定并发量
semaphore.acquire();
count();
atomicCount(doubleAccumulator);
atomicIntegerCount(atomicInteger);
array(ints);
atomicArray(atomicIntegerArray);
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
// 每执行一次减1
begin.countDown();
});
}
try {
// 没到0一直等待,直到模拟并发结束
begin.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
singleThreadPool.shutdown();
System.out.println(count);
System.out.println(doubleAccumulator.get());
System.out.println(atomicInteger.get());
System.out.println(Arrays.toString(ints));
System.out.println("=========================================================================================================");
System.out.println(atomicIntegerArray.toString());
for (int i = 0; i < atomicIntegerArray.length(); i++) {
if ((atomicIntegerArray.get(i) - 5000) != 0) {
System.out.println(atomicIntegerArray.get(i));
}
}
}
/**
* 时间:2018/8/3 上午11:48
* @apiNote 线程不安全的累加
*/
private void count() {
count++;
}
/**
* 时间:2018/8/3 上午11:48
* @apiNote 原子累加
*/
private void atomicCount(DoubleAccumulator doubleAccumulator) {
doubleAccumulator.accumulate(1);
}
/**
* 时间:2018/8/3 上午11:48
* @apiNote 原子的i++
*/
private void atomicIntegerCount(AtomicInteger atomicInteger) {
atomicInteger.incrementAndGet();
}
/**
* 时间:2018/8/3 上午11:48
* @apiNote 线程不安全的数组操作
*/
private void array(int[] ints) {
for(int k = 0; k < 5000; k++) {
ints[k] += 1;
}
}
/**
* 时间:2018/8/3 上午11:48
* @apiNote 原子的数组操作
*/
private void atomicArray(AtomicIntegerArray atomicIntegerArray) {
for(int k = 0; k < 5000; k++) {
atomicIntegerArray.addAndGet(k, 1);
}
}
使用AtomicStampedReference
解决CAS
的ABA
问题
/**
* 时间:2018/8/3 上午11:58
* @apiNote 模拟并发导致 CAS 的 ABA 问题
*/
private void aba() {
// 原子引用类型
AtomicReference<String> stringAtomicReference = new AtomicReference<>("A");
// 原子时间戳引用
AtomicStampedReference<String> stampedReference = new AtomicStampedReference<>("A", 0);
// 线程1
singleThreadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + " : -> stringAtomicReference " + stringAtomicReference.compareAndSet("A", "B"));
System.out.println(Thread.currentThread().getName() + " : -> stringAtomicReference " + stringAtomicReference.compareAndSet("B", "A"));
System.out.println("=====");
});
// 线程2
singleThreadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + " : -> stringAtomicReference " + stringAtomicReference.compareAndSet("A", "C"));
System.out.println("=====");
});
// 线程3
singleThreadPool.execute(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " :" + stampedReference.compareAndSet("A", "B", stampedReference.getStamp(), stampedReference.getStamp() + 1));
System.out.println(Thread.currentThread().getName() + " :" + stampedReference.compareAndSet("B", "A", stampedReference.getStamp(), stampedReference.getStamp() + 1));
System.out.println("=====");
});
// 线程4
singleThreadPool.execute(() -> {
// 模拟并发时与线程3同时得到内存中的A的时间戳
int stamp = stampedReference.getStamp();
System.out.println(Thread.currentThread().getName() + " : 线程4 处理 cas 之前" + stamp);
// 线程4休眠2秒,模拟让线程3已经操作完成 A -> B -> A 的 CAS
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 得到线程3操作完的时间戳
System.out.println(Thread.currentThread().getName() + " :" + stampedReference.getStamp());
// 线程4进行 A -> C 的 CAS 操作。这时会失败。解决 ABA 问题
System.out.println(Thread.currentThread().getName() + " :" + stampedReference.compareAndSet("A", "C", stamp, stamp + 1));
});
singleThreadPool.shutdown();
}