线程安全容器CopyOnWrite

JDK1.5增加了并发编程包concurrent, 包括

  • 并发容器类CopyOnWrite系列,Concurrent系列,BlockingQueue系列
  • 原子操作类Atomic包(AtomicInteger, AtomicLong...)
  • synchronized外的锁机制lock
  • 线程安全工具:CountDownLatch,Semaphore..

这里看CopyOnWrite容器, 以CopyOnWriteArrayList为例, 先看看旧版本提供的线程安全的容器Vector和SynchronizedList.
Vector是从JDK1.0就提供的线程安全的容器类.
SynchronizedList是Collections工具类的静态内部类.

public static <T> List<T> synchronizedList(List<T> list) {
    //处判断是否实现了 RandomAccess 接口来分别处理理ArrayList和LinkedList
    return (list instanceof RandomAccess ?
            new SynchronizedRandomAccessList<>(list) :
            new SynchronizedList<>(list));
}

add()方法:

  • Vector使用同步方法实现
public void add(int index, E element) {
    insertElementAt(element, index);
}

public synchronized void insertElementAt(E obj, int index) {
    modCount++;
    if (index > elementCount) {
        throw new ArrayIndexOutOfBoundsException(index
                                                 + " > " + elementCount);
    }
    ensureCapacityHelper(elementCount + 1);
    System.arraycopy(elementData, index, elementData, index + 1, elementCount - index);
    elementData[index] = obj;
    elementCount++;
}
  • SynchronizedList使用同步代码块实现, 调用的还是容器自己的方法
 public void add(int index, E element) {
     synchronized (mutex) {list.add(index, element);}
 }

get(), remove()方法

  • Vector
public synchronized E get(int index) {
    if (index >= elementCount)
        throw new ArrayIndexOutOfBoundsException(index);
    return elementData(index);
}

public synchronized E remove(int index) {
    modCount++;
    if (index >= elementCount)
        throw new ArrayIndexOutOfBoundsException(index);
    E oldValue = elementData(index);
    int numMoved = elementCount - index - 1;
    if (numMoved > 0)
        System.arraycopy(elementData, index+1, elementData, index,
                         numMoved);
    elementData[--elementCount] = null; // Let gc do its work
    return oldValue;
}
  • SynchronizedList
public E get(int index) {
    synchronized (mutex) {return list.get(index);}
}

public E remove(int index) {
    synchronized (mutex) {return list.remove(index);}
}

可以看到,对于同步的实现,Vector通过同步方法,而SynchronizedList通过同步代码块调用的容器本身(ArrayList,LinkedList)的实现

但是在使用迭代器Iterator遍历Vector时,还是会出现并发修改异常java.util.ConcurrentModificationException

public class VectorTest {

    private static Vector<Integer> vector;

    public static void main(String[] args) {
        vector = new Vector<>();

//        singleThreadIterator();
        multiThreadIterator();
    }

    //单线程Iterator迭代时, 修改容器元素会抛出异常
    private static void singleThreadIterator() {
        for (int i = 0; i < 5; i++) {
            vector.add(i);
        }

        for (Integer integer : vector) {
            vector.remove(1);  //这里会抛出异常
        }
    }
    
    //多线程,如果一个线程在使用迭代器,这时候修改了容器元素,也会抛出异常
    private static void multiThreadIterator() {
       for (int i = 0; i < 5; i++) {
            vector.add(i);
        }
        //修改
        new Thread(() -> {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            vector.remove(3);
        }).start();

        //读
        new Thread(() -> {
            for (Integer integer : vector) {
                System.out.println(integer);
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
}

测试 SynchronizedList 也会出现并发修改异常. 这是因为他们确实通过synchronize对读写删改做了同步, 但是迭代器并没有做同步的处理.

解决办法

    /**
     * 方式一: 使用Iteator提供的remove()方法
     */
    private static void fix1() {
        for (int i = 0; i < 5; i++) {
            vector.add(i);
        }
        Iterator<Integer> iterator = vector.iterator();
        while(iterator.hasNext()) {
            Integer next = iterator.next();
            if(next == 1){
                iterator.remove();
            }
        }
    }

    //方式二: 使用ListIterator的 add() 和set() 方法
    private static void fix2() {
        for (int i = 0; i < 5; i++) {
            vector.add(i);
        }

        Iterator<Integer> iterator = vector.listIterator();
        while(iterator.hasNext()) {
            Integer next = iterator.next();
            if(next == 1){
                ((ListIterator<Integer>) iterator).add(10);
            }
        }
    }

    //方式三: 在Iterator迭代的时候加锁
    private static void fix3() {
        for (int i = 0; i < 5; i++) {
            vector.add(i);
        }

        //修改
        new Thread(() -> {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            vector.remove(3);
        }).start();

        //读
        new Thread(() -> {
            synchronized (vector) {
                for (Integer integer : vector) {
                    System.out.println(integer);

                    try {
                        Thread.sleep(10);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }).start();

可以看到,解决Iterator迭代并发修改异常的方法:

  1. 使用Iteator提供的remove()方法, 或者ListIterator提供的remove(),add(),set()方法.
    2.多线程的情况,可以在Itreator迭代的时候手动给容器加锁,这样可以避免迭代的时候其他线程对容器的修改. 这样做的弊端是 其他线程必须等待他遍历完释放锁.

多线程时读取可能会耗费大量时间,别的线程只能synchronize结束才能操作容器类,效率不高. 对于这种情况, JDK提供了读写分离的容器类CopyOnWriteArrayList

CopyOnWriteArrayList

CopyOnWriteArrayList 是 在JDK1.5引入java.util.concurrent包,属于比较新的提供并发编程的容器类.
CopyOnWrite是一种写入时复制的机制,大概过程是 : 读取时都读取原始对象,写入时需要拷贝一份原始对象的副本,等写入完成替换原对象. 这样解决了读写锁在读取时不能写入的情况. 当然,多线程写入时还是需要锁机制来控制的

private transient volatile Object[] elements; //volatile保证可见性
  • 写入
    add(), get(), remove() 都需要更改原有的数据,都属于写操作.以add()为例
final transient ReentrantLock lock = new ReentrantLock();

public void add(int index, E element) {
    final ReentrantLock lock = this.lock;
    lock.lock();  //尝试获取锁
    try {
        Object[] elements = getArray();
        int len = elements.length;
        if (index > len || index < 0)
            throw new IndexOutOfBoundsException("Index: "+index+
                                                ", Size: "+len);
        Object[] newElements;
        int numMoved = len - index;
        //拷贝一个新数组
        if (numMoved == 0)
            newElements = Arrays.copyOf(elements, len + 1);
        else {
            newElements = new Object[len + 1];
            System.arraycopy(elements, 0, newElements, 0, index);
            System.arraycopy(elements, index, newElements, index + 1,
                             numMoved);
        }
        //修改拷贝的数组
        newElements[index] = element;
        //将地址指向 修改完的数组
        setArray(newElements);
    } finally {
        lock.unlock();  //finally中释放lock
    }
}

可以看到,add()操作使用了lock来控制写操作的并发.同一个时间段只能有一个线程能拷贝并修改数组. remove()和set()同理也是使用lock来控制.

  • 读取
    写入操作的是array的副本, 对原始数据没有任何影响. 而get()操作就是直接读取 原始数组array的数据, 所以这里不需要锁机制.
public E get(int index) {
    return get(getArray(), index);
}

@SuppressWarnings("unchecked")
private E get(Object[] a, int index) {
    return (E) a[index];
}

并发修改

为什么for-each或者iterator不会报出并发修改异常呢?

//CopyOnWriteArrayList的iterator()方法
public Iterator<E> iterator() {
    return new COWIterator<E>(getArray(), 0);
}
final Object[] getArray() {
    return array;
}

//COWIterator的构造器迭代的snapshot, 就是传入的原数组array,而修改操作的都是array的副本拷贝,不存在并发修改的情况.
static final class COWIterator<E> implements ListIterator<E> {
    /** Snapshot of the array */
    private final Object[] snapshot;
    /** Index of element to be returned by subsequent call to next.  */
    private int cursor;
    private COWIterator(Object[] elements, int initialCursor) {
        cursor = initialCursor;
        snapshot = elements;
    }
}

看下CopyOnWriteArrayList#iterator()方法, 迭代器引用的是array, 而修改操作都是基于array的副本进行的. 不存在对array的并发修改情况.

总结

Vector , SynchronizedList 一个使用同步方法,一个使用同步块来实现多线程的操作. 但是在遍历迭代器时修改元素,还是会报 并发修改异常. 我们可以使用迭代器自身的 操作方法或者手动加锁来避免这个问题.
CopyOnWriteArrayList用过COW实现了读写分离, 但是也有以下的问题:

  1. 多个线程的写操作会创建多个array数组的副本.会造成内存的浪费.
    (这个数组还是动态扩容的,实际大小 > 元素的数量)
  2. 写加锁,读不加锁. 所以读的时候可能写入还没有完成.也就是说CopyOnWriteArrayList 能保证最终的一致性,不能保证实时的一致性.

Thanks

https://xxgblog.com/2016/04/02/traverse-list-thread-safe/#CopyOnWriteArrayList

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,324评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,303评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,192评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,555评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,569评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,566评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,927评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,583评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,827评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,590评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,669评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,365评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,941评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,928评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,159评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,880评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,399评论 2 342

推荐阅读更多精彩内容