JDK1.5增加了并发编程包concurrent, 包括
- 并发容器类CopyOnWrite系列,Concurrent系列,BlockingQueue系列
- 原子操作类Atomic包(AtomicInteger, AtomicLong...)
- synchronized外的锁机制lock
- 线程安全工具:CountDownLatch,Semaphore..
这里看CopyOnWrite容器, 以CopyOnWriteArrayList为例, 先看看旧版本提供的线程安全的容器Vector和SynchronizedList.
Vector是从JDK1.0就提供的线程安全的容器类.
SynchronizedList是Collections工具类的静态内部类.
public static <T> List<T> synchronizedList(List<T> list) {
//处判断是否实现了 RandomAccess 接口来分别处理理ArrayList和LinkedList
return (list instanceof RandomAccess ?
new SynchronizedRandomAccessList<>(list) :
new SynchronizedList<>(list));
}
add()方法:
- Vector使用同步方法实现
public void add(int index, E element) {
insertElementAt(element, index);
}
public synchronized void insertElementAt(E obj, int index) {
modCount++;
if (index > elementCount) {
throw new ArrayIndexOutOfBoundsException(index
+ " > " + elementCount);
}
ensureCapacityHelper(elementCount + 1);
System.arraycopy(elementData, index, elementData, index + 1, elementCount - index);
elementData[index] = obj;
elementCount++;
}
- SynchronizedList使用同步代码块实现, 调用的还是容器自己的方法
public void add(int index, E element) {
synchronized (mutex) {list.add(index, element);}
}
get(), remove()方法
- Vector
public synchronized E get(int index) {
if (index >= elementCount)
throw new ArrayIndexOutOfBoundsException(index);
return elementData(index);
}
public synchronized E remove(int index) {
modCount++;
if (index >= elementCount)
throw new ArrayIndexOutOfBoundsException(index);
E oldValue = elementData(index);
int numMoved = elementCount - index - 1;
if (numMoved > 0)
System.arraycopy(elementData, index+1, elementData, index,
numMoved);
elementData[--elementCount] = null; // Let gc do its work
return oldValue;
}
- SynchronizedList
public E get(int index) {
synchronized (mutex) {return list.get(index);}
}
public E remove(int index) {
synchronized (mutex) {return list.remove(index);}
}
可以看到,对于同步的实现,Vector通过同步方法,而SynchronizedList通过同步代码块调用的容器本身(ArrayList,LinkedList)的实现
但是在使用迭代器Iterator遍历Vector时,还是会出现并发修改异常java.util.ConcurrentModificationException
public class VectorTest {
private static Vector<Integer> vector;
public static void main(String[] args) {
vector = new Vector<>();
// singleThreadIterator();
multiThreadIterator();
}
//单线程Iterator迭代时, 修改容器元素会抛出异常
private static void singleThreadIterator() {
for (int i = 0; i < 5; i++) {
vector.add(i);
}
for (Integer integer : vector) {
vector.remove(1); //这里会抛出异常
}
}
//多线程,如果一个线程在使用迭代器,这时候修改了容器元素,也会抛出异常
private static void multiThreadIterator() {
for (int i = 0; i < 5; i++) {
vector.add(i);
}
//修改
new Thread(() -> {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
vector.remove(3);
}).start();
//读
new Thread(() -> {
for (Integer integer : vector) {
System.out.println(integer);
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
测试 SynchronizedList 也会出现并发修改异常. 这是因为他们确实通过synchronize对读写删改做了同步, 但是迭代器并没有做同步的处理.
解决办法
/**
* 方式一: 使用Iteator提供的remove()方法
*/
private static void fix1() {
for (int i = 0; i < 5; i++) {
vector.add(i);
}
Iterator<Integer> iterator = vector.iterator();
while(iterator.hasNext()) {
Integer next = iterator.next();
if(next == 1){
iterator.remove();
}
}
}
//方式二: 使用ListIterator的 add() 和set() 方法
private static void fix2() {
for (int i = 0; i < 5; i++) {
vector.add(i);
}
Iterator<Integer> iterator = vector.listIterator();
while(iterator.hasNext()) {
Integer next = iterator.next();
if(next == 1){
((ListIterator<Integer>) iterator).add(10);
}
}
}
//方式三: 在Iterator迭代的时候加锁
private static void fix3() {
for (int i = 0; i < 5; i++) {
vector.add(i);
}
//修改
new Thread(() -> {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
vector.remove(3);
}).start();
//读
new Thread(() -> {
synchronized (vector) {
for (Integer integer : vector) {
System.out.println(integer);
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();
可以看到,解决Iterator迭代并发修改异常的方法:
- 使用Iteator提供的
remove()
方法, 或者ListIterator提供的remove(),add(),set()
方法.
2.多线程的情况,可以在Itreator迭代的时候手动给容器加锁,这样可以避免迭代的时候其他线程对容器的修改. 这样做的弊端是 其他线程必须等待他遍历完释放锁.
多线程时读取可能会耗费大量时间,别的线程只能synchronize结束才能操作容器类,效率不高. 对于这种情况, JDK提供了读写分离的容器类CopyOnWriteArrayList
CopyOnWriteArrayList
CopyOnWriteArrayList 是 在JDK1.5引入java.util.concurrent
包,属于比较新的提供并发编程的容器类.
CopyOnWrite是一种写入时复制的机制,大概过程是 : 读取时都读取原始对象,写入时需要拷贝一份原始对象的副本,等写入完成替换原对象. 这样解决了读写锁在读取时不能写入的情况. 当然,多线程写入时还是需要锁机制来控制的
private transient volatile Object[] elements; //volatile保证可见性
- 写入
add(), get(), remove() 都需要更改原有的数据,都属于写操作.以add()为例
final transient ReentrantLock lock = new ReentrantLock();
public void add(int index, E element) {
final ReentrantLock lock = this.lock;
lock.lock(); //尝试获取锁
try {
Object[] elements = getArray();
int len = elements.length;
if (index > len || index < 0)
throw new IndexOutOfBoundsException("Index: "+index+
", Size: "+len);
Object[] newElements;
int numMoved = len - index;
//拷贝一个新数组
if (numMoved == 0)
newElements = Arrays.copyOf(elements, len + 1);
else {
newElements = new Object[len + 1];
System.arraycopy(elements, 0, newElements, 0, index);
System.arraycopy(elements, index, newElements, index + 1,
numMoved);
}
//修改拷贝的数组
newElements[index] = element;
//将地址指向 修改完的数组
setArray(newElements);
} finally {
lock.unlock(); //finally中释放lock
}
}
可以看到,add()操作使用了lock来控制写操作的并发.同一个时间段只能有一个线程能拷贝并修改数组. remove()和set()同理也是使用lock来控制.
- 读取
写入操作的是array
的副本, 对原始数据没有任何影响. 而get()操作就是直接读取 原始数组array
的数据, 所以这里不需要锁机制.
public E get(int index) {
return get(getArray(), index);
}
@SuppressWarnings("unchecked")
private E get(Object[] a, int index) {
return (E) a[index];
}
并发修改
为什么for-each或者iterator不会报出并发修改异常呢?
//CopyOnWriteArrayList的iterator()方法
public Iterator<E> iterator() {
return new COWIterator<E>(getArray(), 0);
}
final Object[] getArray() {
return array;
}
//COWIterator的构造器迭代的snapshot, 就是传入的原数组array,而修改操作的都是array的副本拷贝,不存在并发修改的情况.
static final class COWIterator<E> implements ListIterator<E> {
/** Snapshot of the array */
private final Object[] snapshot;
/** Index of element to be returned by subsequent call to next. */
private int cursor;
private COWIterator(Object[] elements, int initialCursor) {
cursor = initialCursor;
snapshot = elements;
}
}
看下CopyOnWriteArrayList#iterator()
方法, 迭代器引用的是array
, 而修改操作都是基于array
的副本进行的. 不存在对array
的并发修改情况.
总结
Vector , SynchronizedList 一个使用同步方法,一个使用同步块来实现多线程的操作. 但是在遍历迭代器时修改元素,还是会报 并发修改异常. 我们可以使用迭代器自身的 操作方法或者手动加锁来避免这个问题.
CopyOnWriteArrayList用过COW实现了读写分离, 但是也有以下的问题:
- 多个线程的写操作会创建多个
array
数组的副本.会造成内存的浪费.
(这个数组还是动态扩容的,实际大小 > 元素的数量) - 写加锁,读不加锁. 所以读的时候可能写入还没有完成.也就是说CopyOnWriteArrayList 能保证最终的一致性,不能保证实时的一致性.
Thanks
https://xxgblog.com/2016/04/02/traverse-list-thread-safe/#CopyOnWriteArrayList