lock版生产者和消费者代码示例
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class ProducerConsumerWithLock {
private static final int CAPACITY = 5;
private final Queue<Integer> queue = new LinkedList<>();
private final Lock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
public void produce(int item) throws InterruptedException {
lock.lock();
try {
//生产者在队列满时等待
while (queue.size() == CAPACITY) {
System.out.println("Queue is full, producer is waiting...");
notFull.await();
}
queue.offer(item);
System.out.println("Produced: " + item);
notEmpty.signalAll();
} finally {
lock.unlock();
}
}
public int consume() throws InterruptedException {
int item;
lock.lock();
try {
//消费者在队列空时等待
while (queue.isEmpty()) {
System.out.println("Queue is empty, consumer is waiting...");
notEmpty.await();
}
item = queue.poll();
System.out.println("Consumed: " + item);
notFull.signalAll();
return item;
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
ProducerConsumerWithLock pc = new ProducerConsumerWithLock();
Thread producer = new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
pc.produce(i);
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
Thread consumer = new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
pc.consume();
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
producer.start();
consumer.start();
}
}
- 使用
显式的锁
和条件变量
(Condition)可以降低
虚拟唤醒的概率 - 在使用条件(Condition)时,通常一个条件对象代表一种
特定
的等待/通知机制。使用两个不同的条件对象
(notFull 和 notEmpty)是因为它们分别代表了不同的等待/通知条件。可以更准确
地控制线程的等待和唤醒操作
假设使用同一个条件对象,则无法区分是等待队列为空(notEmpty)还是等待队列为满(notFull),这样在调用signal()方法时就无法正确地通知对应的等待线程。因此,为确保能够正确地控制不同的等待条件,一般会使用不同的条件对象来表示不同的等待条件。
改写:使用flag实现判断
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ConditionExample {
private final Lock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
private boolean isFull = false;
public void produce() throws InterruptedException {
lock.lock();
try {
while (isFull) {
condition.await(); // 等待队列不满
}
// 生产物品的逻辑
isFull = true;
condition.signal(); // 通知消费者队列已满
} finally {
lock.unlock();
}
}
public void consume() throws InterruptedException {
lock.lock();
try {
while (!isFull) {
condition.await(); // 等待队列已满
}
// 消费物品的逻辑
isFull = false;
condition.signal(); // 通知生产者队列不满
} finally {
lock.unlock();
}
}
}
ReentrantLock来实现线程同步与synchornized区别
灵活性:
ReentrantLock
相比synchronized
更加灵活。它提供了额外的功能,如公平锁、可中断的锁等。而synchronized
是Java内置的关键字,功能相对较简单。尝试获取锁:
ReentrantLock
提供了tryLock()
方法,该方法尝试获取锁,如果锁定已被其他线程占用,它会立即返回结果,而不是进行阻塞。通过检查返回值,可以灵活地选择下一步的操作。而synchronized
关键字只能等待锁的释放,无法进行非阻塞的尝试。条件变量:
ReentrantLock
通过Condition
接口提供了更强大的条件变量功能。使用Condition
可以实现精确的等待和唤醒机制,提供更灵活的线程交互。而synchronized
只能使用wait()
和notify()
,功能较为简单。性能: 一般情况下,
synchronized
关键字比ReentrantLock
的性能稍好。synchronized
是在JVM层面进行优化的,而ReentrantLock
是使用Java的Lock接口实现的,会涉及到额外的方法调用。但是,在高并发的情况下,ReentrantLock
的性能可能会优于synchronized
,因为它提供了更高级的同步策略和更细粒度的控制。
可重入锁
public class Lock {
private boolean isLocked = false;
public synchronized void lock() throws InterruptedException {
while (isLocked) {
wait(); // 如果锁已被锁定,则等待
}
isLocked = true; // 锁定资源
}
public synchronized void unlock() {
isLocked = false; // 释放资源
notify(); // 通知等待队列中的一个线程可以获取锁
}
}
public class Lock {
// 锁状态
boolean isLocked = false;
// 持有锁的线程
Thread lockedBy = null;
// 锁计数器
int lockedCount = 0;
// 获取锁操作
public synchronized void lock() throws InterruptedException {
Thread thread = Thread.currentThread();
// 如果锁已经被其他线程持有,并且当前线程不是持有锁的线程,则进入等待
while (isLocked && lockedBy != thread) {
wait();
}
// 获取锁成功,更新状态
isLocked = true;
lockedCount++;
lockedBy = thread;
}
// 释放锁操作
public synchronized void unlock() {
// 判断当前线程是否持有锁
if (Thread.currentThread() == this.lockedBy) {
lockedCount--;
// 如果计数器为 0,表示锁不被持有,释放锁并唤醒其他等待线程
if (lockedCount == 0) {
isLocked = false;
notify();
}
}
}
}
可重入锁的设计在于
加锁时,不光是标识位
判断有没有锁
还需要判断是不是当前
线程的锁
如果已经加锁且不是当前线程加的锁的情况下才执行wait
加锁和解锁是相配对的加了几次锁就应该解锁几次
解锁的时候也是双重判断
先判断带锁的是不是当前线程
是不是加锁线程,再判断锁的次数
是否为0
如果不是当前线程就不能够解锁,因为如果所有线程都能够解锁那么就相当于每个线程都有一把钥匙。
思考:下面示例会出现什么问题
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
public class ListTest {
public static void main(String[] args) {
List<String> list = new ArrayList<>();
for (int i = 1; i <= 10; i++) {
new Thread(() -> {
list.add(UUID.randomUUID().toString().substring(0, 5));
System.out.println(list);
}, String.valueOf(i)).start();
}
}
}
ArrayList并不是线程安全的数据结构,因此可能会引发并发访问的问题。这会导致以下潜在问题:
ConcurrentModificationException异常: 当一个线程在遍历ArrayList的同时,另一个线程修改了ArrayList的结构(增加或删除元素)时,就会出现ConcurrentModificationException异常(补充:写写互斥、写读互斥、读读不互斥)
解决方案:
-
使用线程安全的集合类: 可以使用
Collections.synchronizedList(new ArrayList<>())
或CopyOnWriteArrayList
等线程安全的集合类来代替ArrayList。 -
加锁同步: 在对ArrayList进行操作时,使用
ReentrantLock
或synchronized
关键字进行加锁,确保在同一时刻只有一个线程在修改ArrayList。 -
使用并发集合类: 可以考虑使用
ConcurrentLinkedQueue
等并发集合类,它们提供了高效的线程安全操作。
Vector
是一个线程安全的动态数组,它通过使用独占锁(synchronized)来实现线程安全。Vector
在每个公共方法上都使用了 synchronized
关键字,以确保在多线程环境下操作 Vector
的安全性。
举个例子,Vector
的 add
方法的源码如下所示:
public synchronized boolean add(E e) {
ensureCapacityHelper(elementCount + 1);
elementData[elementCount++] = e;
return true;
}
在这个方法中,关键字 synchronized
就是独占锁
的体现,它确保在同一时刻只有一个线程可以执行这个方法,从而避免了多线程环境下的竞态条件问题。
另外,如果使用 Collections.synchronizedList(new ArrayList<>())
来创建一个线程安全的 List,实际上是通过封装 ArrayList
并使用 synchronized
关键字来实现线程安全的。下面是一个示例:
List<String> synchronizedList = Collections.synchronizedList(new ArrayList<>());
在这种情况下,synchronizedList
中的每个操作都会被 synchronized
关键字保护,以确保线程安全性。需要注意的是,虽然这种方式确保了线程安全,但在高并发情况下性能可能不如专门设计的线程安全容器(如 ConcurrentHashMap
、CopyOnWriteArrayList
等)。
解释:因为当一个线程持有锁时,其他线程就无法进行操作,需要等待当前线程释放锁才能继续执行,这样就会导致一定程度上的性能下降。相比之下,像ConcurrentHashMap
、CopyOnWriteArrayList
等专门设计的线程安全容器在高并发环境下通常拥有更好的性能表现。这些容器采用了更加高效的并发控制机制,允许多个线程同时对容器进行 ,从而提高了并发访问的效率。
CopyOnWriteArrayList
是 Java 中的一个线程安全的并发集合,它可以在不使用的情况下实现线程安全。
实现原理是“写时复制”,即在修改集合时,它会创建一个新的数组来存储修改后的数据,而不是直接修改原始数组。这种方式可以保证在并发环境下不会发生数据不一致的情况(解决了ConcurrentModificationException异常)
具体实现原理如下
- 初始化时,
CopyOnWriteArrayList
内部使用一个普通的数组来存储数据。 - 当需要修改集合时,会先将原始数组复制一份,并在新的数组上进行修改。
- 修改完成后,将新数组替换原始数组,保证其他线程仍然可以读取到原始数据。
- 当其他线程需要修改集合时,需要重新复制一份新的数组,并在新数组上进行修改,以保证线程安全。
-
总结:修改时复制新数组对新数组修改,修改完后将旧数组指向新数组;读取时读取旧数组。
需要注意的是,CopyOnWriteArrayList 的迭代器是基于快照(snapshot)的,即它不会反映在迭代器创建之后对集合的修改。如果想要反映最新的修改,需要重新获取一个新的迭代器
解释:迭代器(Iterator)是 Java 中用于遍历集合(Collection)元素的接口。它提供了 hasNext() 和 next() 等方法,可以逐个访问集合中的元素,直到遍历完所有元素为止。这个就是说,在你重新获取了一个新的迭代器后,该迭代器与之前的迭代器不再是同一个,它们对应的是不同的。因此,当你遍历新的迭代器时,它能反映集合最新的修改。
CopyOnWriteArrayList
更加适用于读多写少的场景
-
思考:为什么是读多写少?写操作越多需要拷贝出新数组的次数越多,就越影响性能
在ConcurrentHashMap
的内部结构中,每个Segment
包含一个数组,这个数组的每个元素都是一个 HashEntry
。这样的设计使得ConcurrentHashMap
能够更好地支持并发操作。
在传统的HashMap
中,所有的键值对都存储在同一个数组中,当多个线程同时访问HashMap
时,可能会导致线程安全问题。为了解决并发访问的问题,ConcurrentHashMap
引入了 Segment
的概念,即将整个 HashMap
拆分为多个 Segment
,每个Segment
相当于一个小的独立的 HashMap
,可以独立地加锁以减小锁的粒度。
每个 Segment
内部存储着若干个HashEntry
,这些 HashEntry
实际上就是键值对的存储单元。每个 HashEntry
包含了键、值,以及指向下一个HashEntry
的引用,形成了一个链表结构,用来解决哈希冲突问题。这种设计使得在并发环境下,多个线程可以同时修改不同的 Segment
,减小了锁的粒度,提高了并发性能。
因此,ConcurrentHashMap
的内部结构可以看作是多个 Segment
,每个 Segment
包含了多个HashEntry
,通过分段锁和链表解决哈希冲突,实现了高效的并发访问。这使得 ConcurrentHashMap
在多线程环境下能够提供更好的性能和线程安全性
参考:CocurrentHashMap实现原理及源码解析_cocurrentmap-CSDN博客
JUC常用工具类
- CountDownLatch
import java.util.concurrent.CountDownLatch;
public class CountDownLatchExample {
public static void main(String[] args) {
// 创建一个计数器对象,设置初始计数值为2
CountDownLatch latch = new CountDownLatch(2);
// 创建两个线程,它们会执行相同的任务,即睡眠2秒来模拟任务执行
Thread thread1 = new Thread(new Task(latch));
Thread thread2 = new Thread(new Task(latch));
// 启动线程
thread1.start();
thread2.start();
try {
// 主线程调用await方法等待计数器减为0
latch.await();
System.out.println("两个线程都已完成任务,主线程继续执行。");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 实现Runnable接口的任务类
static class Task implements Runnable {
private CountDownLatch latch;
public Task(CountDownLatch latch) {
this.latch = latch;
}
@Override
public void run() {
System.out.println("线程开始执行任务...");
// 模拟任务执行
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("线程完成任务。");
// 每个线程完成任务后调用countDown方法,减少计数器的值
latch.countDown();
}
}
}
线程开始执行任务...
线程开始执行任务...
线程完成任务。
线程完成任务。
两个线程都已完成任务,主线程继续执行。
主线程会调用 latch.await() 方法来等待计数值减为0,即等待两个线程都完成任务。每个线程在完成任务后会调用 latch.countDown() 方法来减少计数值
作用:CountDownLatch 实现线程间的协调,等待多个线程完成任务后再继续执行主线程的操作,控制线程执行顺序
- CyclicBarrier
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierExample {
private static final int THREAD_COUNT = 3;
public static void main(String[] args) {
// 创建一个 CyclicBarrier,设置等待的线程数量为 THREAD_COUNT
CyclicBarrier barrier = new CyclicBarrier(THREAD_COUNT, () -> {
System.out.println("所有线程已经达到 barrier 点,继续执行");
});
for (int i = 1; i <= THREAD_COUNT; i++) {
new Thread(new Task(barrier, i)).start(); // 创建并启动多个线程,每个线程执行 Task 中的任务
}
}
static class Task implements Runnable {
private CyclicBarrier barrier;
private int id;
public Task(CyclicBarrier barrier, int id) {
this.barrier = barrier;
this.id = id;
}
@Override
public void run() {
System.out.println("线程 " + id + " 开始执行任务");
try {
Thread.sleep(2000); // 模拟执行任务的过程,这里用 2 秒睡眠来模拟
System.out.println("线程 " + id + " 完成任务,等待其他线程");
barrier.await(); // 等待其他线程到达 barrier
System.out.println("线程 " + id + " 继续执行");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
}
barrier.await()
是 CyclicBarrier 类中的一个方法,用于让当前线程等待,直到所 有参与者(线程)都到达 barrier 点。具体作用和行为如下:
- 当某个线程调用
barrier.await()
方法时,它会被,直到所有参与者线程都到达 barrier 点。 - 如果当前线程是最后一个到达 barrier 点的线程,那么所有被阻塞的线程将被释放,可以继续执行后续的任务。
- 如果有任何一个参与者线程遇到异常,那么 await 方法会抛出 BrokenBarrierException 异常,并且所有其他线程都会被唤醒并抛出 BrokenBarrierException 异常。
在 CyclicBarrier 中,通过多次调用 barrier.await()
方法,可以让一组线程在特定的点上
进行同步,等待所有线程都准备好之后再一起执行后续任务
CyclicBarrier 的构造函数中设置的等待的线程数量与 barrier.await() 方法相对应,意味着在调用 barrier.await() 方法时,需要有 THREAD_COUNT 个线程同时调用,才能使所有线程达到 barrier 点
总结:
CountDownLatch:
-
作用:
CountDownLatch
是 Java 中的一个同步辅助类,它可以让一个或多个线程等待其他线程执行完特定操作后再继续执行。 -
使用规则:主要通过
countDown()
和await()
方法来实现。countDown()
方法用于减少计数器,而await()
方法则让调用线程等待计数器变为零。通常在主线程中创建CountDownLatch
对象,并传递计数器初始值,然后在其他线程执行完特定任务后调用countDown()
方法来减少计数器。 -
应用场景:
- 等待多个线程执行完毕后再执行某个操作。
- 主线程等待多个子线程完成任务后再继续执行。
- 例如,一个主线程需要等待多个子线程完成各自的初始化工作后才能开始执行,这时就可以使用
CountDownLatch
。
CyclicBarrier:
-
作用:
CyclicBarrier
是另一个同步辅助类,它允许一组线程互相等待,直到达到某个共同的屏障点,然后再一起继续执行。 -
使用规则:需要通过构造函数指定等待的线程数目和达到屏障时要执行的动作。然后在各个线程中通过调用
await()
方法等待,直到所有线程都达到屏障点。 -
应用场景:
- 当多个线程都需要等待其他线程的结果,并且它们都到达某个共同的状态后再一起继续执行时,可以使用
CyclicBarrier
。 - 例如,在多线程计算中,每个线程计算一部分数据,然后等待其他线程计算完毕,最后合并计算结果,这时就可以使用
CyclicBarrier
。
- 当多个线程都需要等待其他线程的结果,并且它们都到达某个共同的状态后再一起继续执行时,可以使用
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierExample {
// 假设有三个部分的计算任务
private static final int NUMBER_OF_THREADS = 3;
// 存储各个线程计算结果的数组
private static int[] results = new int[NUMBER_OF_THREADS];
public static void main(String[] args) {
// 创建 CyclicBarrier 实例,当三个线程达到屏障点时,执行合并结果的操作
CyclicBarrier barrier = new CyclicBarrier(NUMBER_OF_THREADS, new Runnable() {
@Override
public void run() {
// 合并计算结果
int sum = 0;
for (int i = 0; i < NUMBER_OF_THREADS; i++) {
sum += results[i];
}
System.out.println("Sum of all parts: " + sum);
}
});
// 创建并启动三个线程
for (int i = 0; i < NUMBER_OF_THREADS; i++) {
final int threadIndex = i;
new Thread(new Runnable() {
@Override
public void run() {
// 在这里进行实际的计算工作
System.out.println("Thread " + threadIndex + " is calculating its part");
results[threadIndex] = (threadIndex + 1) * 100; // 假设的计算结果
try {
// 等待其他线程完成计算
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}).start();
}
}
}
读写锁
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class ReadWriteLockExample {
// 创建一个读写锁
private static ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
public static void main(String[] args) {
// 创建一个写线程,在写操作时占用写锁
Thread writerThread = new Thread(() -> {
lock.writeLock().lock(); // 占用写锁
try {
System.out.println("Writer thread is writing..."); // 写操作,输出信息
Thread.sleep(2000); // 模拟写操作的耗时
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.writeLock().unlock(); // 释放写锁
System.out.println("Writer thread has released the write lock"); // 输出信息
}
});
// 创建一个读线程,在读操作时占用读锁
Thread readerThread1 = new Thread(() -> {
lock.readLock().lock(); // 占用读锁
try {
System.out.println("Reader thread 1 is reading..."); // 读操作,输出信息
Thread.sleep(1000); // 模拟读操作的耗时
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.readLock().unlock(); // 释放读锁
System.out.println("Reader thread 1 has released the read lock"); // 输出信息
}
});
// 创建另一个读线程,在读操作时占用读锁
Thread readerThread2 = new Thread(() -> {
lock.readLock().lock(); // 占用读锁
try {
System.out.println("Reader thread 2 is reading..."); // 读操作,输出信息
Thread.sleep(1000); // 模拟读操作的耗时
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.readLock().unlock(); // 释放读锁
System.out.println("Reader thread 2 has released the read lock"); // 输出信息
}
});
// 启动三个线程:一个写线程和两个读线程
writerThread.start();
readerThread1.start();
readerThread2.start();
}
}
输出结果如下
Writer thread is writing...
Reader thread 1 is reading...
Reader thread 2 is reading...
Reader thread 1 has released the read lock
Reader thread 2 has released the read lock
Writer thread has released the write lock
可以看到写线程会先执行并占用写锁,两个读线程则会顺并行执行,并共享读锁
BlockingQueue
在多线程编程领域,阻塞队列通常被用于实现生产者-消费者模式。这种模式中,一个或多个生产者向阻塞队列中不断地插入元素,一个或多个消费者从阻塞队列中不断地取出元素;如果队列已满,生产者线程将被阻塞,直到消费者取出一个元素;如果队列已空,消费者线程将被阻塞,直到生产者插入一个元素。
阻塞队列可以帮助我们处理并发
问题,避免
了程序员手动实现锁、条件等多线程细节的复杂性和出错的可能性。因此,阻塞队列已经被广泛地应用于许多高并发的场景,比如线程池、消息队列、任务调度
等各种异步处理场景。通过合理地使用阻塞队列,我们可以保证多线程应用程序的可靠性和高效性
四组主要API
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class BlockingQueueExample {
public static void main(String[] args) {
// 创建一个容量为5的ArrayBlockingQueue
BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(5);
// 使用put和take方法
new Thread(() -> {
try {
blockingQueue.put(1); // 阻塞式地插入元素1,若队列已满,则线程被挂起,直到队列有空闲位置
System.out.println("Put 1 into the queue.");
Thread.sleep(1000);
blockingQueue.put(2); // 阻塞式地插入元素2
System.out.println("Put 2 into the queue.");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
try {
System.out.println("Take from queue: " + blockingQueue.take()); // 阻塞式地取出元素,若队列为空,则线程被挂起,直到队列有元素可取
System.out.println("Take from queue: " + blockingQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
// 使用offer和poll方法
new Thread(() -> {
if (blockingQueue.offer(3)) { // 非阻塞式地插入元素3,若队列已满,则返回false
System.out.println("Offer 3 into the queue.");
}
try {
Thread.sleep(1000);
if (blockingQueue.offer(4)) {
System.out.println("Offer 4 into the queue.");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
System.out.println("Poll from queue: " + blockingQueue.poll()); // 非阻塞式地取出元素,若队列为空,则返回null
System.out.println("Poll from queue: " + blockingQueue.poll());
}).start();
}
}
put会阻塞线程直到队列有空闲位置;当队列为空时,take会阻塞线程直到队列有数据可取
offer和poll则会返回
特定的布尔值或空值来表示放入或取出是否成功,而不会阻塞线程
SynchronousQueue阻塞队列
import java.util.concurrent.SynchronousQueue;
public class SynchronousQueueExample {
public static void main(String[] args) {
// 创建一个 SynchronousQueue 对象
SynchronousQueue<Integer> syncQueue = new SynchronousQueue<>();
// 启动一个线程,向 SynchronousQueue 中插入元素
Thread t1 = new Thread(() -> {
try {
for (int i = 1; i <= 5; i++) {
syncQueue.put(i); // 阻塞式插入元素,线程会等待消费者线程消费该元素
System.out.println("producer: " + i);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
t1.start();
// 启动另一个线程,从 SynchronousQueue 中取出元素并打印
Thread t2 = new Thread(() -> {
try {
while (true) {
int value = syncQueue.take(); // 阻塞式取出元素,线程会等待生产者线程插入该元素
System.out.println("consumer: " + value);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
t2.start();
}
}
调用 put() 方法会导致线程阻塞,直到另一个线程调用 take() 方法来消费这个元素。这种一对一
的生产者-消费者关系保证了在 SynchronousQueue 中只有一个
元素存在,从而实现了元素的直接传递
SynchronousQueue 不存储
队列元素,因此在消费者线程从队列中取出元素之前,生产者线程会等待消费者线程对该元素进行消费
作用:确保队列中的元素在两个线程之间直接传递,而没有临时存储的过程
总结:
- lockingQueue 也常用于实现任务调度、优先级队列等功能。常见的 BlockingQueue 实现包括 ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue
- SynchronousQueue要求生产者直接将任务交给消费者执行,不允许存储元素,SynchronousQueue 在一些并发编程领域的高级场景中发挥着重要作用
线程池调优
volatile
1、可见性
import java.util.concurrent.TimeUnit;
public class ExampleVolatile {
private static volatile int num = 0;
public static void main(String[] args) {
new Thread(() -> {
while (num == 0) {
}
}).start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
num = 1;
System.out.println(num);
}
}
如果不将变量 num 声明为 volatile,那么在子线程中的 while 循环中,子线程将无法感知到主线程对 num 的修改,从而导致死循环。
这是因为在多线程环境中,每个线程都有自己的线程缓存
,用于存储线程的局部变量副本
。当一个线程修改了变量时,它是将修改后的值写入自己的线程缓存
中,而不会立即写回主内存
。因此,在子线程中,由于没有使用 volatile 关键字修饰的变量不会直接从主内存中获取最新的值,而是从自己的线程缓存中读取,所以子线程无法感知到主线程对 num 的改变
- 通过将 num 声明为 volatile,可以保证 num 的修改对所有线程可见。这样,当主线程将 num 的值设置为1时,子线程在下一次循环中会从主内存中读取到最新的值,从而结束循环
2、不保证原子性(Atomicity)
volatile 修饰的变量进行读取和赋值操作是原子的,但是对于复合操作(例如 i++)并不具备原子性。因此,当多个线程同时对一个 volatile 变量进行非原子性操作时,可能会发生数据不一致的情况。为了保证原子性操作,应当使用 synchronized 关键字或者 java.util.concurrent 包中的原子类
package com.kuang.tvolatile;
public class VDemo02 {
// volatile 不能保证原子性
private volatile static int num = 0;
public static void add() {
num++;
}
public static void main(String[] args) {
// 理论上 num 结果应该为 2 万
for (int i = 1; i <= 20; i++) {
new Thread(() -> {
for (int j = 0; j < 1000; j++) {
add();
}
}).start();
}
while (Thread.activeCount() > 2) {
Thread.yield();
}
System.out.println(Thread.currentThread().getName() + " " + num);
}
}
说明:理论上 num 的结果应该为 20000,但是volatile不保证原子性
所以不会是20000,可以使用synchronized
或者lock
或者使用JUC中atomic
包下的原子类工具
外,使用 Thread.yield() 可以让出
当前线程的执行权
,让其他线程有机会执行。在这个示例中,我们使用 Thread.yield() 来等待其他线程的执行结束,确保在主线程中输出 num 的值时,各个子线程已经完成自增操作
补充:Thread.yield() 方法不会导致线程阻塞,而是让当前线程主动让出
CPU 执行权,让其他具有相同优先级
的线程有机会执行。调用 Thread.yield() 方法后,当前线程会从运行状态变为就绪状态,等待重新获取 CPU 时间片
执行。
setPriority(int priority)
方法来设置线程的优先级
3、禁止指令重排
在 volatile 变量的读写过程中,JVM 会禁止特定类型的指令重排序,保证了程序在执行过程中的顺序一致性。这样可以避免出现线程安全问题,确保程序中 volatile 变量的修改顺序与程序员编写的顺序保持一致。
PS指令重排:重新调整指令的执行顺序。处理器为了提高执行效率,可能会对指令的执行顺序进行优化,但是必须保证最终的执行结果与不优化的情况下一致