我们已经看到了形成Java并发程序设计基础的底层构建块。然而,对于实际编程来说,应该尽可能远离底层结构。使用由并发处理的专业人士实现的较高层次的结构要方便得多、安全得多。
对于许多线程问题,可以通过使用一个或多个队列以优雅且安全的方式将其形式化。生产者线程向队列插入元素,消费者线程则取出他妈。使用队列,可以安全地从一个线程向另一个线程传递数据。例如,考虑银行转账程序,转账线程将转账指令对象插入到一个队列中,而不是直接访问银行对象。另一个线程从队列中取出指令转账。只有该线程可以访问该银行对象的内部。因此不需要同步。(当然,线程安全的队列类的实现者不能不考虑锁和条件,但是,那是他们的问题而不是你的问题。)
一、阻塞队列方法
方法 | 正常动作 | 特殊情况下的动作 |
---|---|---|
put | 添加一个元素 | 如果队列满,则阻塞 |
take | 移出并返回头元素 | 如果队列空,则阻塞 |
add | 添加一个元素 | 如果队列满,抛出IllegalStateException |
remove | 移出并返回头元素 | 如果队列空,则抛出NoSuchElementException |
element | 返回队列头元素 | 如果队列空,抛出NoSuchElementException |
offer | 添加一个元素并返回true | 如果队列满,返回false |
poll | 移出并返回队列的头元素 | 如果队列空,返回null |
peek | 返回队列的头元素 | 如果队列空,返回null |
阻塞队列方法使用上分下面三类:
1 当将队列当做线程管理工具来使用时,将要用到put和take方法。
2 当视图向满的队列中添加或从空的队列中移出元素时,add、remove和element抛出异常。
3 当然,在一个多线程程序中,队列会在任何时空或满,因此,可以用offer、poll和peek代替。
二、java.util.concurrent包
java.util.concurrent包提供了阻塞队列的几个变种:
LinkedBlockingQueue:链表结构组成的无界(可指定容量)阻塞队列
ArrayBlockingQueue:数组结构组成的有界阻塞队列
PriorityBlockingQueue:无界优先级队列,非FIFO队列,元素按照优先级顺序被移出
DelayQueue:无界阻塞队列,只有延迟期满时才能提取元素
SynchronousQueue:阻塞队列,插入操作必须等待另一线程的移除操作 ,反之亦然
LinkedBlockingDeque:链表结构组成的无界(可指定容量)双端阻塞队列
绝大部分场景下,我们只要使用ArrayBlockingQueue或LinkedBlockingQueue就够了。
示例程序展示了如何使用阻塞队列来控制一组线程。程序在一个目录及它的所有子目录下搜索所有文件,打印出包含指定关键字的行:
public class BlockingQueueTest {
private static final int FILE_QUEUE_SIZE = 10;
private static final int SEARCH_THREADS = 100;
private static final File DUMMY = new File("");
private static BlockingQueue<File> queue = new ArrayBlockingQueue<>(FILE_QUEUE_SIZE);
public static void main(String[] args) {
try (Scanner in = new Scanner(System.in)) {
System.out.println("Enter base directory (e.g. /opt/jdk1.8.0/src): ");
String directory = in.nextLine();
System.out.println("Enter keyword (e.g. volatile): ");
String keyword = in.nextLine();
Runnable enumerator = () -> {
try {
enumerate(new File(directory));
queue.put(DUMMY);
} catch (InterruptedException e) {
}
};
new Thread(enumerator).start();
for(int i = 1; i <= SEARCH_THREADS; i++) {
Runnable searcher = () -> {
try {
boolean done = false;
while (!done) {
File file = queue.take();
if(file == DUMMY) {
queue.put(file);
done = true;
} else {
search(file, keyword);
}
}
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
}
};
new Thread(searcher).start();
}
}
}
// 递归地枚举给定目录及其子目录中的所有文件
public static void enumerate(File directory) throws InterruptedException {
File[] files = directory.listFiles();
for (File file : files) {
if (file.isDirectory()) enumerate(file);
else queue.put(file);
}
}
// 搜索一个给定关键字的文件,并打印出所有匹配的行
public static void search(File file, String keyword) throws IOException {
try (Scanner in = new Scanner(file, "UTF-8")) {
int lineNumber = 0;
while (in.hasNextLine()) {
lineNumber++;
String line = in.nextLine();
if (line.contains(keyword))
System.out.printf("%s:%d:%n", file.getPath(), lineNumber, line);
}
}
}
}
生产者线程枚举所有子目录下的所有文件并把它们放到一个阻塞队列中。同时启动大量消费者线程,每个消费者线程从队列取出一个文件,打开它,打印所有包含该关键字的行,然后取出下一个文件。我们使用一个小技巧在工作结束后终止这个应用程序。为了发出完成信号,枚举线程放置一个虚拟对象到队列中(这就像在行李输送带上放着一个写着“最后一个包”的虚拟包)。当搜索线程取到这个虚拟对象时,将其放回并终止。
注意,不需要显示的线程同步。在这个示例中,已使用了阻塞队列作为一种同步机制。