1.并发容器概述
- ConcurrentHashMap:线程安全的HashMap
- CopyOnWriteArrayList:线程安全的List
- BlockingQueue:这是一个接口,表示阻塞队列,非常适合用于作为数据共享的通道
- ConcurrentLinkedQueue:高效的非阻塞并发队列,使用链表实现。可以看做一个线程安全的LinkedList
- ConcurrentSkipListMap:是一个Map,使用跳表的数据结构进行快速查找
2.已淘汰并发容器
- Vector和Hashtable
3.ConcurrentHashMap
为什么HashMap是线程不安全的?
同时put碰撞导致数据丢失
同时put扩容导致数据丢失
死循环造成的CPU100%
1.7的ConcurrentHashMap实现和分析
- Java 7中的ConcurrentHashMap最外层是多个segment,每个segment的底层数据结构与HashMap类似,任然是数组和链表组成的拉链法
- 每个segment独立上ReentrantLock锁,每个segment之间互不影响,提高了并发效率
- ConcurrentHashMap默认有16个Segments,所以最多可以同时支持16个线程并发写(操作分别分布在不同Segment上)。这个默认值可以在初始化,一旦初始化以后,是不可以扩容的。
1.8的ConcurrentHashMap实现和分析
putVal流程
- 判断key value不为空
- 计算hash值
- 根据对应位置节点的类型,来赋值,或者helpTransfer,或者增长链表,或者给红黑树增加加点
- 检查瞒住阈值就“红黑树化”
- 返回oldVal
get流程
- 计算hash值
- 找到对应的位置,根据情况进行:
- 直接取值
- 红黑树里找值
- 遍历链表取值
- 返回找到的结果
组合操作并不保证线程安全
即get又set
public class OptionsNotSafe implements Runnable {
private static ConcurrentHashMap<String, Integer> scores = new ConcurrentHashMap<String, Integer>();
public static void main(String[] args) throws InterruptedException {
scores.put("小明", 0);
Thread t1 = new Thread(new OptionsNotSafe());
Thread t2 = new Thread(new OptionsNotSafe());
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println(scores);
}
@Override
public void run() {
for (int i = 0; i < 1000; i++) {
Integer score = scores.get("小明");
Integer newScore = score + 1;
scores.put("小明", newScore);
}
}
}
{小明=1019}
解决
public class OptionsNotSafe implements Runnable {
private static ConcurrentHashMap<String, Integer> scores = new ConcurrentHashMap<String, Integer>();
public static void main(String[] args) throws InterruptedException {
scores.put("小明", 0);
Thread t1 = new Thread(new OptionsNotSafe());
Thread t2 = new Thread(new OptionsNotSafe());
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println(scores);
}
@Override
public void run() {
for (int i = 0; i < 1000; i++) {
while (true) {
Integer score = scores.get("小明");
Integer newScore = score + 1;
boolean b = scores.replace("小明", score, newScore);
if (b) {
break;
}
}
}
}
}
{小明=2000}
4.CopyOnWriteArrayList
- 代替Vector和SynchronizedList,就和ConcurrentHashMap代替SynchronizedMap的原因一样
- Vector和SynchronizedList的锁的粒度太大,并发效率相对比较低,并且迭代时无法编辑
- CopyOnWrite并发容器还包括CopyOnWriteArraySet,用来替代同步Set
CopyOnWriteArrayList适用场景 - 读操作可以尽快地快,而写即使慢一些也没有太大关系
- 读多写少:黑名单,每日更新;监听器:迭代操作远多于修改操作
CopyOnWriteArrayList读写规则
回顾读写锁:读读共享、其他都互斥(写写互斥、读写互斥、写读互斥)
读写锁规则的升级:读取是完全不用加锁的,并且更厉害的是,写入也不会阻塞读取操作。只有写入和写入之间需要进行同步等待
public class CopyOnWriteArrayListDemo {
public static void main(String[] args) {
CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
list.add("1");
list.add("2");
list.add("3");
list.add("4");
list.add("5");
Iterator<String> iterator = list.iterator();
while (iterator.hasNext()) {
System.out.println("list is" + list);
String next = iterator.next();
System.out.println(next);
if (next.equals("2")) {
list.remove("5");
}
if (next.equals("3")) {
list.add("3 found");
}
}
}
}
list is[1, 2, 3, 4, 5]
1
list is[1, 2, 3, 4, 5]
2
list is[1, 2, 3, 4]
3
list is[1, 2, 3, 4, 3 found]
4
list is[1, 2, 3, 4, 3 found]
5
public class CopyOnWriteArrayListDemo {
public static void main(String[] args) throws InterruptedException {
CopyOnWriteArrayList<Integer> list = new CopyOnWriteArrayList<>(new Integer[]{1, 2, 3});
System.out.println(list);
Iterator<Integer> itr1 = list.iterator();
list.add(4);
System.out.println(list);
Iterator<Integer> itr2 = list.iterator();
itr1.forEachRemaining(System.out::println);
itr2.forEachRemaining(System.out::println);
}
}
[1, 2, 3]
[1, 2, 3, 4]
1
2
3
1
2
3
4
CopyOnWriteArrayList源码
add
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}
remove
public E remove(int index) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
E oldValue = get(elements, index);
int numMoved = len - index - 1;
if (numMoved == 0)
setArray(Arrays.copyOf(elements, len - 1));
else {
Object[] newElements = new Object[len - 1];
System.arraycopy(elements, 0, newElements, 0, index);
System.arraycopy(elements, index + 1, newElements, index,
numMoved);
setArray(newElements);
}
return oldValue;
} finally {
lock.unlock();
}
}
get
private E get(Object[] a, int index) {
return (E) a[index];
}
CopyOnWrite的缺点
CopyOnWrite容器有很多优点,但是同时也存在两个问题,即内存占用问题和数据一致性问题。所以在开发的时候需要注意一下。
内存占用问题。
因为CopyOnWrite的写时复制机制,所以在进行写操作的时候,内存里会同时驻扎两个对象的内存,旧的对象和新写入的对象(注意:在复制的时候只是复制容器里的引用,只是在写的时候会创建新对象添加到新容器里,而旧容器的对象还在使用,所以有两份对象内存)。如果这些对象占用的内存比较大,比如说200M左右,那么再写入100M数据进去,内存就会占用300M,那么这个时候很有可能造成频繁的Yong GC和Full GC。之前我们系统中使用了一个服务由于每晚使用CopyOnWrite机制更新大对象,造成了每晚15秒的Full GC,应用响应时间也随之变长。
针对内存占用问题,可以通过压缩容器中的元素的方法来减少大对象的内存消耗,比如,如果元素全是10进制的数字,可以考虑把它压缩成36进制或64进制。或者不使用CopyOnWrite容器,而使用其他的并发容器,如ConcurrentHashMap。
数据一致性问题。
CopyOnWrite容器只能保证数据的最终一致性,不能保证数据的实时一致性。所以如果你希望写入的的数据,马上能读到,请不要使用CopyOnWrite容器。
CopyOnWriteArrayList为什么并发安全且性能比Vector好?
我知道Vector是增删改查方法都加了synchronized,保证同步,但是每个方法执行的时候都要去获得锁,性能就会大大下降,而CopyOnWriteArrayList 只是在增删改上加锁,但是读不加锁,在读方面的性能就好于Vector,CopyOnWriteArrayList支持读多写少的并发情况。
5.并发队列Queue(阻塞队列、非阻塞队列)
为什么要使用队列?
- 用队列可以在线程间传递数据:生产者消费者模式、银行转账
- 考虑锁等线程安全问题的重任从“你”转移到了“队列”上
并发队列简介
常用队列
5.1ArrayBlockingQueue
- 有界
- 指定容量
- 公平:还可以指定是需要保证公平,如果想保证公平的话,那么等待了最长时间的线程会被优先处理,不过这会同时带来一定的性能损耗
public class ArrayBlockingQueueDemo {
public static void main(String[] args) {
ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<String>(3);
Interviewer r1 = new Interviewer(queue);
Consumer r2 = new Consumer(queue);
new Thread(r1).start();
new Thread(r2).start();
}
}
class Interviewer implements Runnable {
BlockingQueue<String> queue;
public Interviewer(BlockingQueue queue) {
this.queue = queue;
}
@Override
public void run() {
System.out.println("10个候选人都来啦");
for (int i = 0; i < 10; i++) {
String candidate = "Candidate" + i;
try {
queue.put(candidate);
System.out.println("安排好了" + candidate);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
queue.put("stop");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class Consumer implements Runnable {
BlockingQueue<String> queue;
public Consumer(BlockingQueue queue) {
this.queue = queue;
}
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
String msg;
try {
while(!(msg = queue.take()).equals("stop")){
System.out.println(msg + "到了");
}
System.out.println("所有候选人都结束了");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
10个候选人都来啦
安排好了Candidate0
安排好了Candidate1
安排好了Candidate2
安排好了Candidate3
Candidate0到了
Candidate1到了
Candidate2到了
Candidate3到了
安排好了Candidate4
安排好了Candidate5
安排好了Candidate6
安排好了Candidate7
Candidate4到了
Candidate5到了
安排好了Candidate8
安排好了Candidate9
Candidate6到了
Candidate7到了
Candidate8到了
Candidate9到了
所有候选人都结束了
ArrayBlockingQueue源码
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
如果当前队列数据等于队里最大容量,就等待,否则,数据加入到队列。
5.2LinkedBlockingQueue
- 无界
- 容量Integer.MAX_VALUE
- 内部结构:Node、两把锁。分析put方法
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
/*
* Note that count is used in wait guard even though it is
* not protected by lock. This works because count can
* only decrease at this point (all other puts are shut
* out by lock), and we (or some other waiting put) are
* signalled if it ever changes from capacity. Similarly
* for all other uses of count in other wait guards.
*/
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
5.3 PriorityBlockingQueue
PriorityBlockingQueue是一个无界队列,它没有限制,在内存允许的情况下可以无限添加元素;它又是具有优先级的队列,是通过构造函数传入的对象来判断,传入的对象必须实现comparable接口。
- 支持优先级
- 自然顺序(而不是先进先出)
- 无界队列
- PriorityQueue的线程安全版本
public class Person implements Comparable<Person>{
private int id;
private String name;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Person(int id, String name) {
super();
this.id = id;
this.name = name;
}
public Person() {
}
@Override
public String toString() {
return this.id + ":" + this.name;
}
@Override
public int compareTo(Person person) {
return this.id > person.getId() ? 1 : ( this.id < person.getId() ? -1 :0);
}
}
public class PriorityBlockingQueueDemo {
public static void main(String[] args) throws InterruptedException {
PriorityBlockingQueue<Person> pbq = new PriorityBlockingQueue<>();
pbq.add(new Person(3,"person3"));
System.err.println("容器为:" + pbq);
pbq.add(new Person(2,"person2"));
System.err.println("容器为:" + pbq);
pbq.add(new Person(1,"person1"));
System.err.println("容器为:" + pbq);
pbq.add(new Person(4,"person4"));
System.err.println("容器为:" + pbq);
System.err.println("分割线----------------------------------------------------------------" );
System.err.println("获取元素 " + pbq.take().getId());
System.err.println("容器为:" + pbq);
System.err.println("分割线----------------------------------------------------------------" );
System.err.println("获取元素 " + pbq.take().getId());
System.err.println("容器为:" + pbq);
System.err.println("分割线----------------------------------------------------------------" );
System.err.println("获取元素 " + pbq.take().getId());
System.err.println("容器为:" + pbq);
System.err.println("分割线----------------------------------------------------------------" );
System.err.println("获取元素 " + pbq.take().getId());
System.err.println("容器为:" + pbq);
System.err.println("分割线----------------------------------------------------------------" );
}
}
容器为:[3:person3]
容器为:[2:person2, 3:person3]
容器为:[1:person1, 3:person3, 2:person2]
容器为:[1:person1, 3:person3, 2:person2, 4:person4]
分割线----------------------------------------------------------------
获取元素 1
容器为:[2:person2, 3:person3, 4:person4]
分割线----------------------------------------------------------------
获取元素 2
容器为:[3:person3, 4:person4]
分割线----------------------------------------------------------------
获取元素 3
容器为:[4:person4]
分割线----------------------------------------------------------------
获取元素 4
容器为:[]
分割线----------------------------------------------------------------
PriorityBlockingQueue源码
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();
int n, cap;
Object[] array;
while ((n = size) >= (cap = (array = queue).length))
tryGrow(array, cap);
try {
Comparator<? super E> cmp = comparator;
if (cmp == null)
siftUpComparable(n, e, array);
else
siftUpUsingComparator(n, e, array, cmp);
size = n + 1;
notEmpty.signal();
} finally {
lock.unlock();
}
return true;
}
5.4 SynchronousQueue
- 它的容量为0
- 需要注意的是,SynchronousQueue的容量不是1而是0,因为SynchronousQueue不需要去持有元素,它所做的就是直接传递(direct handoff)
- 效率很高
public class SynchronousQueueDemo {
static class SynchronousQueueProducer implements Runnable {
protected BlockingQueue<String> blockingQueue;
final Random random = new Random();
public SynchronousQueueProducer(BlockingQueue<String> queue) {
this.blockingQueue = queue;
}
@Override
public void run() {
while (true) {
try {
String data = UUID.randomUUID().toString();
System.out.println("Put: " + data);
blockingQueue.put(data);
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
static class SynchronousQueueConsumer implements Runnable {
protected BlockingQueue<String> blockingQueue;
public SynchronousQueueConsumer(BlockingQueue<String> queue) {
this.blockingQueue = queue;
}
@Override
public void run() {
while (true) {
try {
String data = blockingQueue.take();
System.out.println(Thread.currentThread().getName()
+ " take(): " + data);
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) {
final BlockingQueue<String> synchronousQueue = new SynchronousQueue<String>();
SynchronousQueueProducer queueProducer = new SynchronousQueueProducer(
synchronousQueue);
new Thread(queueProducer).start();
SynchronousQueueConsumer queueConsumer1 = new SynchronousQueueConsumer(
synchronousQueue);
new Thread(queueConsumer1).start();
SynchronousQueueConsumer queueConsumer2 = new SynchronousQueueConsumer(
synchronousQueue);
new Thread(queueConsumer2).start();
}
}
Put: 1a26d098-cf95-4419-a8d6-892b12e10c0e
Thread-2 take(): 1a26d098-cf95-4419-a8d6-892b12e10c0e
Put: 072c059f-cc6a-48d9-bfad-aa7f867b2f23
Thread-1 take(): 072c059f-cc6a-48d9-bfad-aa7f867b2f23
Put: ced8b199-9dc6-479b-97e2-1ef811e21cba
Thread-2 take(): ced8b199-9dc6-479b-97e2-1ef811e21cba
Put: 368071a7-3301-4b9e-8634-b480aff2de47
Thread-1 take(): 368071a7-3301-4b9e-8634-b480aff2de47
Put: cd9c6d2b-c3f7-4603-a411-6ba44eca09b5
Thread-2 take(): cd9c6d2b-c3f7-4603-a411-6ba44eca09b5
......
插入数据的线程和获取数据的线程,交替执行
源码解析
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException();
if (transferer.transfer(e, true, unit.toNanos(timeout)) != null)
return true;
if (!Thread.interrupted())
return false;
throw new InterruptedException();
}
应用场景
Executors.newCachedThreadPool()
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
使用SynchronousQueue的目的就是保证“对于提交的任务,如果有空闲线程,则使用空闲线程来处理;否则新建一个线程来处理任务”。
SynchronousQueue注意点
- SynchronousQueue没有peek等函数,因为peek的含义是取出头结点,但是SynchronousQueue的容量是0,所以连头结点都没有,也就是没有peek方法。同理,没有iterate相关方法
- 是一个极好的用来直接传递的并发数据结构
5.5 DelayQueue
java延迟队列提供了在指定时间才能获取队列元素的功能,队列头元素是最接近过期的元素。没有过期元素的话,使用poll()方法会返回null值,超时判定是通过getDelay(TimeUnit.NANOSECONDS)方法的返回值小于等于0来判断。延时队列不能存放空元素。
延时队列实现了Iterator接口,但iterator()遍历顺序不保证是元素的实际存放顺序。
DelayedQuene的优先级队列使用的排序方式是队列元素的compareTo方法,优先级队列存放顺序是从小到大的,所以队列元素的compareTo方法影响了队列的出队顺序。
若compareTo方法定义不当,会造成延时高的元素在队头,延时低的元素无法出队。
public class DelayedQueneDemo {
public static void main(String[] args) throws InterruptedException {
Item item1 = new Item("item1", 5, TimeUnit.SECONDS);
Item item2 = new Item("item2", 10, TimeUnit.SECONDS);
Item item3 = new Item("item3", 15, TimeUnit.SECONDS);
DelayQueue<Item> queue = new DelayQueue<>();
queue.put(item1);
queue.put(item2);
queue.put(item3);
System.out.println("begin time:" + LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME));
for (int i = 0; i < 3; i++) {
Item take = queue.take();
System.out.format("name:{%s}, time:{%s}\n", take.name, LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME));
}
}
}
class Item implements Delayed {
/* 触发时间*/
private long time;
String name;
public Item(String name, long time, TimeUnit unit) {
this.name = name;
this.time = System.currentTimeMillis() + (time > 0 ? unit.toMillis(time) : 0);
}
@Override
public long getDelay(TimeUnit unit) {
return time - System.currentTimeMillis();
}
@Override
public int compareTo(Delayed o) {
Item item = (Item) o;
long diff = this.time - item.time;
if (diff <= 0) {// 改成>=会造成问题
return -1;
} else {
return 1;
}
}
@Override
public String toString() {
return "Item{" +
"time=" + time +
", name='" + name + '\'' +
'}';
}
}
begin time:2020-03-11T11:19:00.706
name:{item1}, time:{2020-03-11T11:19:05.659}
name:{item2}, time:{2020-03-11T11:19:10.659}
name:{item3}, time:{2020-03-11T11:19:15.659}
5.6 非阻塞并发队列
ConcurrentLinkedQueue是一个基于链接节点的无界线程安全队列,它采用先进先出的规则对节点进行排序,当我们添加一个元素的时候,它会添加到队列的尾部,当我们获取一个元素时,它会返回队列头部的元素。它采用了“wait-free”算法来实现,该算法在Michael & Scott算法上进行了一些修改。