一、设计线程安全的类
在设计线程安全类的过程中,需要包含以下三个基本要素:
- 找出构成对象状态的所有变量
- 找出约束状态变量的不变性条件
- 建立对象状态的并发访问管理策略
1.1 收集同步需求
要确保类的线程安全性,就需要确保它的不变性条件不会再并发访问的情况下被破坏,这就需要对其状态进行推断。对象和变量都有一个状态空间,即所有可能的取值。状态空间越小,就越容易判断线程的状态。final类型的域使用的越多,就越能简化对象可能状态的分析过程。(在极端的情况下,不可变对象只有唯一的状态)
如果不了解对象的不变性条件与后验条件,那么就不能确保线程安全性。要满足在状态变量的有效值或状态转换上的各种约束条件,就需要借助于原子性和封装性。
二、实例封闭
如果某对象不是线程安全的,那么可以通过多种技术使其在多线程程序中安全的使用。你可以确保该对象只能由单个线程访问(线程封闭),或者通过一个锁来保护该对象的所有访问。
封装简化了线程安全类的实现过程,他提供了一种实例封闭机制(Instance Confinement),通常页简称为“封闭”。当一个对象被封装到另一个对象中时,能够访问被封装对象的所有代码路径都是已知的。与对象可以由整个程序访问的情况相比,更易于对代码进行分析。通过将封闭机制与合适的加锁策略结合起来,可以确保线程安全的方式来使用非线程安全的对象。
将数据封装在对象内部,可以将数据的访问限制在对象的方法上,从而更容易确保线程在访问数据时总能持有正确的锁。
被封闭对象一定不能超出它们既定的作用域。对象可以封闭在类的一个实例中,或者封闭在某个作用域内,再或者封闭在线程内。当然,对象本身不会逸出--出现逸出情况的原因通常是由于开发人员在发布对象时超出了对象既定的作用域。
三、同步容器类
同步容器类包括Vector和HashTable,二者是早期JDK的一部分,此外还包括在JDK 1.2中添加的一些功能相似的类,这些的同步封装器类是由Collections.synchronizedXxx等工厂方法创建的。这些类实现线程安全的方式是:将他们的状态封装起来,并对每个共有方法进行同步,使得每次只有一个线程能访问容器的状态。
3.1 同步容器类的问题
同步容器类都是线程安全的,但在某些情况下可能需要额外的客户端加锁来保护符合操作。容器上常见的复合操作包括:迭代(反复访问元素,直到遍历完容器中所有元素)、跳转(根据指定顺序找到当前元素的下一个元素)yi'ji以及条件运算。在同步容器类中,这些复合操作在没有客户端加锁的情况下,仍然是线程安全的,但当其他线程并发的修改容器时,他们可能会表现出意料之外的行为。
四、并发容器
Java 5.0提供了多种并发容器来改进同步容器的性能。同步容器将所有对容器状态的访问都串行化,以实现他们的线程安全性。这种方法的代价是严重降低并发性,当多个线程竞争容器的锁时,吞吐量将严重降低。
另一方面,并发容器是针对多个线程并发访问设计的。在Java 5.0中增加了ConcurrentHashMap,用来替代同步且基于散列的Map,以及CopyOnWriteArrayList,用于在遍历操作为主要操作的情况下代替同步的List。在新的ConcurrentMap接口中增加了对一些常见符合操作的支持,例如“若没有则添加”、替换以及有条件删除等。
通过并发容器来代替同步容器,可以极大地提供伸缩性并降低风险。
4.1 CocurrentHashMap
同步容器类在执行每个操作期间都持有一个锁。在一些操作中,例如HashMashMap.get或List.contains,可能包含大量的工作:当遍历散列桶或链表来查找某个特定的对象时,必须在许多元素上调用equals。在基于散列的容器中,如果hashCode不能很均匀的分布散列值,那么容器中的元素就不会均匀的分布在整个容器中。某些情况下,某个糟糕的散列函数还会把一个散列表变成线性链表。当遍历很长的链表并且在某些或者全部元素上调用equals方法时,会花费很长时间,而其他线程在这段时间内都不能访问容器。
ConcurrentHashMap使用一种粒度更细的加锁机制来实现更大程度的共享,这种机制被称为分段锁(Lock Striping)。在这种机制中,任意数量的读取线程可以并发的访问Map,执行读取操作的线程和执行写入操作的线程可以并发的访问Map,并且一定数量的写入线程可以并发的修改Map。
4.2 CopyOnWriteArrayList
CopyOnWriteArrayList用于替代同步List,在某些情况下它提供了更好的并发性能,并且在迭代期间不需要对容器进行加锁或复制。
“写入时赋值(Copy-On-Write)”容器的线程安全性在于,只要正确的发布一个事实不可变的对象,那么在访问该对象时就不再需要进一步的同步。每次修改时,都会创建并重新发布一个新的容器副本,从而实现可变性。“写入时复制”容器的迭代器保留一个指向底层基础数组的引用,这个数组当前位于迭代器的起始位置,由于它不会被修改,因此在对其进行同步时只需确保数组内容的可见性。因此,多个线程可以同时对这个容器进行迭代,而不会彼此干扰或者与修改容器的线程相互干扰。】
4.3 阻塞队列和生产者-消费者模式
阻塞队列提供了可阻塞的put和take方法,以及支持定时的offer和poll方法。如果队列已经满了,那么put方法将阻塞直到有空间可用;如果队列为空,那么take方法将会阻塞直到有元素可用。队列可以是有界的也可以是无界的,无界队列永远都不会充满,因此无界队列的put方法也永远不会阻塞。
在基于阻塞队列构建的生产者-消费者设计中,当数据生成时,生产者把数据放入队列,而当消费者准备处理数据时,将从队列中获取数据。生产者不需要知道消费者的标识或数量,或者他们是否是唯一生产者,只需将数据放入队列即可。同样,消费者也不需要知道生产者是谁,或者工作来自何处。BlockingQueue简化了生产者-消费者设计的实现过程,它支持任意数量的生产者和消费者。一种常见的生产者-消费者设计模式就是线程池与工作队列的组合,在Executor任务执行框架中就体现了这种模式。
在构建高可靠的应用程序时,有界队列是一种强大的资源管理工具:他们能抑制并防止产生过多的工作项,使应用程序在负荷过载的情况下变得更加健壮。
public class BlockingQueueDemo {
public static class Basket {
BlockingQueue<String> basket = new ArrayBlockingQueue(3);
public void product() throws InterruptedException {
basket.put("An apple");
}
public String consume() throws InterruptedException {
String apple = basket.take();
return apple;
}
public int getAppleNum() {
return basket.size();
}
}
public static void testBasket() {
final Basket basket = new Basket();
class Producer implements Runnable {
public void run() {
try {
while (true) {
System.out.println("准备生产苹果:" + System.currentTimeMillis());
basket.product();
System.out.println("生产完毕:" + System.currentTimeMillis());
System.out.println("生产完的苹果有" + basket.getAppleNum() + "个");
Thread.sleep(300);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class Consumer implements Runnable {
public void run() {
try {
while (true) {
System.out.println("消费者准备消费苹果:" + System.currentTimeMillis());
basket.consume();
System.out.println("消费者消费苹果完毕:" + System.currentTimeMillis());
System.out.println("消费完后有苹果:" + basket.getAppleNum() + "个");
Thread.sleep(1000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
ExecutorService service = Executors.newCachedThreadPool();
Producer producer = new Producer();
Consumer consumer = new Consumer();
service.submit(producer);
service.submit(consumer);
// 程序运行10s后,所有任务停止
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
}
service.shutdownNow();
}
public static void main(String[] args) {
BlockingQueueDemo.testBasket();
}
}
4.4 阻塞方法和中断方法
线程可能会阻塞或暂停执行,原因有多种:等待I/O操作结束,等待获取一个锁,等待从Thread.sleep方法中醒来,或是等待另一个线程的计算结果。当线程阻塞时,它通常被挂起,并处于某种阻塞状态(BLOCKED、WAITING或TIMED_WAITING)。阻塞操作与执行时间很长的普通操作的差别在于,被阻塞的线程必须等待某个不受他控制的事件发生后才能继续执行,例如等待I/O操作完成,等待某个锁变成可用,或者等待外部计算的结束。当某个外部事件发生时,线程被置hui回RUNNABLE状态,并可以再次被调度执行。
Thread提供了interrupt方法,用于中断线程或者查询线程是否已经被中断。每个线程都有一个布尔类型的属性,表示线程的中断状态,当中断线程时将设置这个状态。
中断时一种协作机制。一个线程不能强制其他线程停止正在执行的操作而去执行其他的操作。当在代码中调用了一个将抛出InterruptedException异常方法时,你自己的方法也就变成了一个阻塞方法,并且必须要处理对中断的响应。对于库代码来说,有两种基本选择:
传递InterruptedException。避开这个异常通常是最明智的策略-只需把InterruptedException传递给方法的调用者。传递的方法包括,根本不捕获该异常,或者捕获该异常,然后在执行某种简单的清理工作后再次抛出这个异常。
恢复中断。有时候不能抛出InterruptedException,例如当代码是Runnable的一部分时。在这些状态下,必须捕获InterruptedException,并通过调用当前线程上的interrupt方法恢复中断状态,这样在调用栈中更高层的代码将看到引发了一个中断。
public class TaskRunnable implements Runnable {
BlockingQueue<Task> queue;
...
public void run() {
try {
processTask(queue.take());
} catch (InterruptedException e) {
Thread.currentThread.interrupt();
}
}
}
五、同步工具类
同步工具类可以是任何一个对象,只要它根据其自身的状态来协调线程的控制流。阻塞队列可以作为同步工具类,其他类型的同步工具类还包括信号量(Semaphore)、栅栏(Barrier)以及闭锁(Latch)。所有的同步工具类都包含一些特定的结构化属性:他们封装了一些状态,这些状态将决定执行同步工具类的线程是继续执行还是等待,此外还提供了一些方法对状态进行操作,以及另一些方法用于高效的等待同步工具类进入到预期状态。
5.1 闭锁
闭锁是一种同步工具类,可以延迟线程的进度直到到达终止状态。闭锁的作用相当于一扇门:在闭锁到达结束状态之前,这扇门是一直关闭的,并且没有任何线程能通过,当到达结束状态时,这扇门会打开并允许所有的线程通过。当闭锁到达结束状态后,将不会再改变状态,因此这扇门将永远保持打开状态。
5.2 FutureTask
FutureTask表示的计算是通过Callable来实现的,相当于一种可以生成结果的Runnable,并可以处于以下三种状态:等待运行(Waiting to run),正在运行(Running)和运行完成(Completed)。
Future.get的行为取决于任务的状态。如果任务已经完成,那么get会立即返回结果,否则get将阻塞直到任务进入完成状态,然后返回结果或者抛出异常。FutureTask将计算结果从执行计算的线程传递到获取这个结果的线程,而FutureTask的规范确保了这种传递能实现结果的安全发布。
5.3 信号量
计数信号量(Counting Semaphore)用来控制同时访问某个特定资源的操作数量,或者同时执行某个指定操作的数量。计数信号量还可以用来实现某种资源池,或者对容器施加边界。
Semaphore中管理者一组虚拟的许可(permit),许可的初始数量可通过构造函数来指定。在执行操作时可以首先获得许可(只要还有剩余的许可),并在使用以后释放许可。如果没有许可,那么acquire将阻塞直到有许可(或者直到被中断或者操作超时)。release方法将返回一个许可给信号量。计算信号量的一种简化形式是二zhi二值信号量,即初始值为1的Semaphore。二值信号量可以用作互斥体(mutex),并具备不可重入的加锁语义:谁拥有这个唯一的许可,谁就拥有了互斥锁。
Semaphore可以用于实现资源池,例如数据库连接池。我们可以构造一个固定长度的资源池,当池为空时,请求资源将会失败,但你真正希望看到的行为是阻塞而不是失败,并且当池非空时解除阻塞。如果将Semaphore的计数值初始化为池的大小,并在从池中获取一个资源之前首先调用acquire方法获取一个许可,在将资源返回给池之后调用release释放许可,那么acquire将一直阻塞直到资源池不为空。
同样,你可以使用Semaphore将任何一种容器变成有界阻塞容器。信号量的计数值会初始化为容器容量的最大值。add操作在向底层容器添加一个元素之前,首先要获取一个许可。如果add操作没有添加任何元素,那么会立刻释放许可。同样,remove操作释放一个许可,使更多的元素能够添加到容器中。
5.4 栅栏
栅栏(Barrier)类似于闭锁,它能阻塞一组线程直到某个事件发生。栅栏与闭锁的关键区别在于,所有线程必须同时到达栅栏位置,才能继续执行。闭锁用于等待事件,而栅栏用于等待其他线程。
CyclicBarrier可以是一定数量的参与方反复的在栅栏位置汇集,它在并行迭代算法中非常有用:这种算法将一个问题拆分成一系列相互独立的子问题。当线程到达栅栏位置时将调研await方法,这个方法将阻塞直到所有线程都到达栅栏位置。如果所有线程都到达了栅栏位置,那么栅栏将打开,此时所有线程都被释放,而栅栏将被重置以便下次使用。如果对await的调用超时,或者await阻塞的线程被中断,那么栅栏就被认为是打破了,所有阻塞的await调用都将终止并抛出BrokenBarrierException。如果成功通过栅栏,那么await将为每个线程返回一个唯一的到达索引号,我们可以利用这些索引来“选举”产生一个领导线程,并在下一次迭代中由该领导线程执行一些特殊的工作。CyclicBarrier还可以使你将一个栅栏操作传递给构造函数,这是一个Runnable,当城管通过栅栏时会(在一个子任务线程中)执行它,但在阻塞线程被释放之前是不能执行的。