本文参考至《Java7并发编程实战手册》
Exchanger类允许在两个线程之间定义同步点,当两个线程都到达同步点时,它们交换数据。也就是第一个线程的数据进入到第二个线程中,第二线程的数据进入到第一个线程中。
这里有一个生产者和消费者的例子:生产者每次都会生产10个数据,这10个数据放在一个箩筐(List)里面,Exchanger会在:生产者生产完10个数据并且消费者消费完10个数据的时候,将两者的箩筐换过来。这样生产者的箩筐里面就没有数据(确保生产者生产的东西有地方装);消费者的箩筐里面就有10个数据(确保消费者有数据消费)。
生产者:
package Exchanger;
import java.util.List;
import java.util.concurrent.Exchanger;
/**
* Producer:生产者
*
* @author JM
* @date 2017-2-28 下午10:33:08
* @since JDK 1.7
*/
public class Producer implements Runnable {
private List<String> buffer;
private final Exchanger<List<String>> exchanger;
public Producer(List<String> buffer, Exchanger<List<String>> exchanger) {
super();
this.buffer = buffer;
this.exchanger = exchanger;
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
for (int j = 0; j < 10; j++) {
String message = "Event " + ((i * 10) + j);
System.out.printf("Producer:create---------- %s\n", message);
buffer.add(message);
}
System.out.println("Before exchange Producer:have11111111111@@@@@@@@@@ " + buffer.size());
try {
// 如果这个时候消费者也刚好在这里等待,那么生产者和消费者的数据产生对换(否则将等待消费者到达这里再进行数据对换)
buffer = exchanger.exchange(buffer);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("After exchange Producer:have222222222222@@@@@@@@@@ " + buffer.size());
}
}
}
消费者:
package Exchanger;
import java.util.List;
import java.util.concurrent.Exchanger;
public class Consumer implements Runnable {
private List<String> buffer;
private final Exchanger<List<String>> exchanger;
public Consumer(List<String> buffer, Exchanger<List<String>> exchanger) {
super();
this.buffer = buffer;
this.exchanger = exchanger;
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
System.out.println("Before exchange Consumer:have111111111******** " + buffer.size());
try {
// 如果这个时候生产者也刚好在这里等待,那么生产者和消费者的数据产生对换(否则将等待生产者到达这里再进行数据对换)
buffer = exchanger.exchange(buffer);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("After exchange Consumer:have2222222******** " + buffer.size());
for (int j = 0; j < 10; j++) {
String message = buffer.get(0);
System.out.printf("Consumer:use----------- %s\n", message);
buffer.remove(0);
}
}
}
}
测试类:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Exchanger;
public class Main {
public static void main(String[] args) {
List<String> consumerBuffer = new ArrayList<String>();
List<String> producerBuffer = new ArrayList<String>();
Exchanger<List<String>> exchanger = new Exchanger<List<String>>();
Producer producer = new Producer(producerBuffer, exchanger);
Consumer consumer = new Consumer(consumerBuffer, exchanger);
Thread producerThread = new Thread(producer);
Thread consumerThread = new Thread(consumer);
producerThread.start();
consumerThread.start();
}
}