解读Disruptor源码系列文章将从一个demo入手,逐步探究Disruptor中的源码实现。
对原理不熟悉的同学建议先看我之前的两个翻译和导读文章。
对Disruptor源码感兴趣的同学,可以下载我注释的Disruptor代码。
完整版Demo
package com.coderjerry.disruptor;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadFactory;
/**
* Disruptor例子
* jerry li
*/
public class DisruptorDSLExample {
/**
* 用户自定义事件
*/
class ExampleEvent{
Object data ;
Object ext;
@Override
public String toString() {
return "DisruptorDSLExample[data:"+this.data+",ext:"+ext+"]";
}
}
/**
* 用户事件工厂,实现EventFactory接口,用于初始化事件对象
*/
class ExampleEventFactory implements EventFactory<ExampleEvent>{
@Override
public ExampleEvent newInstance() {
return new ExampleEvent();
}
}
/**
* 生产者在发布事件时,使用翻译器将原始对象设置到RingBuffer的对象中
*/
static class IntToExampleEventTranslator implements EventTranslatorOneArg<ExampleEvent, Integer>{
static final IntToExampleEventTranslator INSTANCE = new IntToExampleEventTranslator();
@Override
public void translateTo(ExampleEvent event, long sequence, Integer arg0) {
event.data = arg0 ;
System.err.println("put data "+sequence+", "+event+", "+arg0);
}
}
// 用于事件处理(EventProcessor)的线程工厂
ThreadFactory threadFactory =
new ThreadFactoryBuilder()
.setNameFormat("disruptor-executor-%d")
.setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
System.out.println("Thread " + t + "throw " + e);
e.printStackTrace();
}
})
.build();
Disruptor disruptor = null;
// 初始化Disruptor
public void createDisruptor(final CountDownLatch latch){
disruptor = new Disruptor<ExampleEvent>(
new ExampleEventFactory(), // 用于创建环形缓冲中对象的工厂
8, // 环形缓冲的大小
threadFactory, // 用于事件处理的线程工厂
ProducerType.MULTI, // 生产者类型,单vs多生产者
new BlockingWaitStrategy()); // 等待环形缓冲游标的等待策略,这里使用阻塞模式,也是Disruptor中唯一有锁的地方
// 消费者模拟-日志处理
EventHandler journalHandler = new EventHandler() {
@Override
public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception {
Thread.sleep(8);
System.out.println(Thread.currentThread().getId() + " process journal " + event + ", seq: " + sequence);
}
};
// 消费者模拟-复制处理
EventHandler replicateHandler = new EventHandler() {
@Override
public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception {
Thread.sleep(10);
System.out.println(Thread.currentThread().getId() + " process replication " + event + ", seq: " + sequence);
}
};
// 消费者模拟-解码处理
EventHandler unmarshallHandler = new EventHandler() { // 最慢
@Override
public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception {
Thread.sleep(1*1000);
if(event instanceof ExampleEvent){
((ExampleEvent)event).ext = "unmarshalled ";
}
System.out.println(Thread.currentThread().getId() + " process unmarshall " + event + ", seq: " + sequence);
}
};
// 消费者处理-结果上报,只有执行完以上三种后才能执行此消费者
EventHandler resultHandler = new EventHandler() {
@Override
public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception {
System.out.println(Thread.currentThread().getId() + " =========== result " + event + ", seq: " + sequence);
latch.countDown();
}
};
// 定义消费链,先并行处理日志、解码和复制,再处理结果上报
disruptor
.handleEventsWith(
new EventHandler[]{
journalHandler,
unmarshallHandler,
replicateHandler
}
)
.then(resultHandler);
// 启动Disruptor
disruptor.start();
}
public void shutdown(){
disruptor.shutdown();
}
public Disruptor getDisruptor(){
return disruptor;
}
public static void main(String[] args) {
final int events = 20; // 必须为偶数
DisruptorDSLExample disruptorDSLExample = new DisruptorDSLExample();
final CountDownLatch latch = new CountDownLatch(events);
disruptorDSLExample.createDisruptor(latch);
final Disruptor disruptor = disruptorDSLExample.getDisruptor();
// 生产线程0
Thread produceThread0 = new Thread(new Runnable() {
@Override
public void run() {
int x = 0;
while(x++ < events / 2){
disruptor.publishEvent(IntToExampleEventTranslator.INSTANCE, x);
}
}
});
// 生产线程1
Thread produceThread1 = new Thread(new Runnable() {
@Override
public void run() {
int x = 0;
while(x++ < events / 2){
disruptor.publishEvent(IntToExampleEventTranslator.INSTANCE, x);
}
}
});
produceThread0.start();
produceThread1.start();
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
disruptorDSLExample.shutdown();
}
}
构建Disruptor类
可以发现整个例子都是围绕Disruptor这个类实现的,相关内容可参见官方文档Disruptor Wizard。
其实不使用Disruptor类也是完全可以的,直接操作RingBuffer更加灵活也更麻烦。Disruptor类提供了操作RingBuffer和设置消费依赖的便捷API,如构建Ringbuffer、设置消费链、启动关闭Disruptor、暂停消费者、发布事件等。
接下来,我们把示例拆开看。
disruptor = new Disruptor<ExampleEvent>(
new ExampleEventFactory(), // 用于创建环形缓冲中对象的工厂
8, // 环形缓冲的大小
threadFactory, // 用于事件处理的线程工厂
ProducerType.MULTI, // 生产者类型,单vs多生产者
new BlockingWaitStrategy()); // 等待环形缓冲游标的等待策略,这里使用阻塞模式,也是Disruptor中唯一有锁的地方
这里调用构造方法创建了一个Disruptor对象,实际上创建了一个RingBuffer对象和一个Executor,并将引入传入私有化的构造方法创建了Disruptor对象。
// Disruptor.java
public Disruptor(
final EventFactory<T> eventFactory, // 用于创建环形缓冲中对象的工厂
final int ringBufferSize, // 环形缓冲的大小
final ThreadFactory threadFactory, // 用于事件处理的线程工厂
final ProducerType producerType, // 生产者类型,单vs多生产者
final WaitStrategy waitStrategy) // 等待环形缓冲游标的等待策略
{
this(
RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
new BasicExecutor(threadFactory));
}
private Disruptor(final RingBuffer<T> ringBuffer, final Executor executor)
{
this.ringBuffer = ringBuffer;
this.executor = executor;
}
// RingBuffer.java
public static <E> RingBuffer<E> create(
ProducerType producerType,
EventFactory<E> factory,
int bufferSize,
WaitStrategy waitStrategy)
{
switch (producerType) // 构建RingBuffer时通过producerType来区分单生产者或多生产者
{
case SINGLE:
return createSingleProducer(factory, bufferSize, waitStrategy);
case MULTI:
return createMultiProducer(factory, bufferSize, waitStrategy);
default:
throw new IllegalStateException(producerType.toString());
}
}
// 单生产者模式创建RingBuffer
public static <E> RingBuffer<E> createSingleProducer(
EventFactory<E> factory,
int bufferSize,
WaitStrategy waitStrategy)
{
SingleProducerSequencer sequencer = new SingleProducerSequencer(bufferSize, waitStrategy);
return new RingBuffer<E>(factory, sequencer);
}
// 多生产者模式创建RingBuffer
public static <E> RingBuffer<E> createMultiProducer(
EventFactory<E> factory,
int bufferSize,
WaitStrategy waitStrategy)
{
MultiProducerSequencer sequencer = new MultiProducerSequencer(bufferSize, waitStrategy);
return new RingBuffer<E>(factory, sequencer);
}
// RingBuffer构造器
RingBuffer(
EventFactory<E> eventFactory,
Sequencer sequencer)
{
super(eventFactory, sequencer);
}
这里注意下,在构造RingBuffer时,需要传入用于创建事件对象的工厂eventFactory和记录生产者序号的sequencer。根据生产者是否是多线程生产,Sequencer又分为单、多生产者模式,后续还会讲到。
构建Disruptor实例后,需要设置Disruptor的消费者。
设置消费者
// 消费者模拟-日志处理
EventHandler journalHandler = new EventHandler() {
@Override
public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception {
Thread.sleep(8);
System.out.println(Thread.currentThread().getId() + " process journal " + event + ", seq: " + sequence);
}
};
// 消费者模拟-复制处理
EventHandler replicateHandler = new EventHandler() {
@Override
public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception {
Thread.sleep(10);
System.out.println(Thread.currentThread().getId() + " process replication " + event + ", seq: " + sequence);
}
};
// 消费者模拟-解码处理
EventHandler unmarshallHandler = new EventHandler() { // 最慢
@Override
public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception {
Thread.sleep(1*1000);
if(event instanceof ExampleEvent){
((ExampleEvent)event).ext = "unmarshalled ";
}
System.out.println(Thread.currentThread().getId() + " process unmarshall " + event + ", seq: " + sequence);
}
};
// 消费者处理-结果上报,只有执行完以上三种后才能执行此消费者
EventHandler resultHandler = new EventHandler() {
@Override
public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception {
System.out.println(Thread.currentThread().getId() + " =========== result " + event + ", seq: " + sequence);
latch.countDown();
}
};
这里使用了两组消费者,第一组包含三个消费者,第二组包含一个消费者。当事件可消费后,只有当第一组全部消费者都处理完毕后,事件才能被第二组消费者处理。
// 定义消费链,先并行处理日志、解码和复制,再处理结果上报
disruptor
.handleEventsWith(
new EventHandler[]{
journalHandler,
unmarshallHandler,
replicateHandler
}
)
.then(resultHandler);
启动Disruptor
消费者设置成功后,即可启动Disruptor。
// 启动Disruptor
disruptor.start();
// Disruptor.java
public RingBuffer<T> start()
{
checkOnlyStartedOnce();
for (final ConsumerInfo consumerInfo : consumerRepository)
{
consumerInfo.start(executor);
}
return ringBuffer;
}
ConsumerRepository这个类实现了Iterable接口,iterator()方法返回ConsumerInfo集合的迭代器。ConsumerInfo是一个封装类,对应EventBatchProcessor和WorkProcessor有两种实现。EventProcessorInfo对应BatchEventProcessor,保存了与一个事件处理过程相关的EventProcessor、EventHandler、SequenceBarrier的引用。WorkerPoolInfo对应WorkProcessor,保存了WorkerPool、SequenceBarrier的引用以及代表消费者组是否为消费者链尾的标志endOfChain。
如果看不懂,不要着急哈,后续讲到消费者的时候就会明白了。
// ConsumerRepository.java
class ConsumerRepository<T> implements Iterable<ConsumerInfo>
{
private final Map<EventHandler<?>, EventProcessorInfo<T>> eventProcessorInfoByEventHandler =
new IdentityHashMap<EventHandler<?>, EventProcessorInfo<T>>(); // hander引用为key
private final Map<Sequence, ConsumerInfo> eventProcessorInfoBySequence =
new IdentityHashMap<Sequence, ConsumerInfo>(); // 处理器的序列引用为key
private final Collection<ConsumerInfo> consumerInfos = new ArrayList<ConsumerInfo>();
// 省略代码若干...
@Override
public Iterator<ConsumerInfo> iterator()
{
return consumerInfos.iterator();
}
}
调用ConsumerInfo.start()方法,其实就是启动了消费者线程:
// EventProcessorInfo.java
class EventProcessorInfo<T> implements ConsumerInfo
{
// 省略代码若干...
@Override
public void start(final Executor executor)
{
executor.execute(eventprocessor);
}
}
// WorkerPoolInfo.java
class WorkerPoolInfo<T> implements ConsumerInfo
{
// 省略代码若干...
@Override
public void start(final Executor executor)
{
workerPool.start(executor);
}
}
// WorkerPool.java
public final class WorkerPool<T>
{
// 省略代码若干...
public RingBuffer<T> start(final Executor executor)
{
if (!started.compareAndSet(false, true))
{
throw new IllegalStateException("WorkerPool has already been started and cannot be restarted until halted.");
}
final long cursor = ringBuffer.getCursor();
workSequence.set(cursor);
for (WorkProcessor<?> processor : workProcessors)
{
processor.getSequence().set(cursor);
executor.execute(processor);
}
return ringBuffer;
}
至此,Disruptor的初始化和启动就完成了。主要是完成了RingBuffer数据结构的初始化、设置消费者以及启动。
后续将继续分享消费者代码。