1.简介
我们程序常常在并发场景下使用,所以我们常常使用支持并发的数据结构,在java中有很多自带的java支持高并发的数据结构,在java.util.concurrent包下,基本由Doug Lea 编写。这次我们自己实现一个无锁高并发队列。
2. 基本原理
基本原理还是使用java.concurrent.atomic 包下的原子类,结合最终一致性即可
3. 实现
package liusheng.main;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.IntStream;
/**
* 使用的是原子类
*
* @param <E>
*/
public class ConcurrentQueue<E> {
static class Node<E> {
E e;
volatile Node<E> next;
public Node(E e) {
this.e = e;
}
}
AtomicReference<Node<E>> head = new AtomicReference<>();
AtomicReference<Node<E>> last = new AtomicReference<>();
AtomicInteger size = new AtomicInteger();
public ConcurrentQueue() {
Node<E> node = new Node<E>(null);
head.set(node);
last.set(node);
}
public void offer(E e) {
Node<E> node = new Node<>(e);
Node<E> pre = null;
do {
pre = last.get();
} while (!last.compareAndSet(pre, node));
pre.next = node;
size.getAndIncrement();
}
public E poll() {
Node<E> node, node1;
do {
node = head.get();
node1 = node.next;
} while (node1 != null && !head.compareAndSet(node, node1));
if (node1 != null) {
size.decrementAndGet();
return node1.e;
}
return null;
}
public static void main(String[] args) throws InterruptedException {
while (true) {
ExecutorService executorService = Executors.newFixedThreadPool(20);
ConcurrentQueue<Long> queue = new ConcurrentQueue<>();
IntStream.range(0, 10).boxed().map(i -> (Runnable) () -> {
for (int j = 0; j < 1000; j++) {
queue.offer(System.currentTimeMillis());
}
}).forEach(executorService::execute);
AtomicInteger a = new AtomicInteger();
IntStream.range(0, 10).boxed().map(i -> (Runnable) () -> {
for (int j = 0; j < 1000; j++) {
if (queue.poll() != null) {
a.getAndIncrement();
}
}
}).forEach(executorService::execute);
executorService.shutdown();
executorService.awaitTermination(2, TimeUnit.DAYS);
// System.out.println(a.get() + queue.size.get());
assert a.get() + queue.size.get() == 10000;
}
}
}
4 总结
我们自己实现了一个无锁高并发的队列,可以加深我们对原子类的理解和使用,同时我们在以后看别人的实现,也容易了很多,也加深的了我对多线程的理解。有什么错误的地方望大佬指正