解读Disruptor系列--解读源码(1)之初始化

解读Disruptor源码系列文章将从一个demo入手,逐步探究Disruptor中的源码实现。
对原理不熟悉的同学建议先看我之前的两个翻译和导读文章。
对Disruptor源码感兴趣的同学,可以下载我注释的Disruptor代码

完整版Demo

package com.coderjerry.disruptor;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadFactory;

/**
* Disruptor例子
* jerry li
*/
public class DisruptorDSLExample {

  /**
  * 用户自定义事件
  */
  class ExampleEvent{
    Object data ;
    Object ext;
    @Override
    public String toString() {
      return "DisruptorDSLExample[data:"+this.data+",ext:"+ext+"]";
    }
  }

  /**
  * 用户事件工厂,实现EventFactory接口,用于初始化事件对象
  */
  class ExampleEventFactory implements EventFactory<ExampleEvent>{

    @Override
    public ExampleEvent newInstance() {
      return new ExampleEvent();
    }
  }

  /**
  * 生产者在发布事件时,使用翻译器将原始对象设置到RingBuffer的对象中
  */
  static class IntToExampleEventTranslator implements EventTranslatorOneArg<ExampleEvent, Integer>{

    static final IntToExampleEventTranslator INSTANCE = new IntToExampleEventTranslator();

    @Override
    public void translateTo(ExampleEvent event, long sequence, Integer arg0) {
      event.data = arg0 ;
      System.err.println("put data "+sequence+", "+event+", "+arg0);
    }
  }

  // 用于事件处理(EventProcessor)的线程工厂
  ThreadFactory threadFactory =
      new ThreadFactoryBuilder()
          .setNameFormat("disruptor-executor-%d")
          .setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
            @Override
            public void uncaughtException(Thread t, Throwable e) {
              System.out.println("Thread " + t + "throw " + e);
              e.printStackTrace();
            }
          })

          .build();
  Disruptor disruptor = null;

  // 初始化Disruptor
  public void createDisruptor(final CountDownLatch latch){

    disruptor = new Disruptor<ExampleEvent>(
        new ExampleEventFactory(),  // 用于创建环形缓冲中对象的工厂
        8,  // 环形缓冲的大小
        threadFactory,  // 用于事件处理的线程工厂
        ProducerType.MULTI, // 生产者类型,单vs多生产者
        new BlockingWaitStrategy()); // 等待环形缓冲游标的等待策略,这里使用阻塞模式,也是Disruptor中唯一有锁的地方

    // 消费者模拟-日志处理
    EventHandler journalHandler = new EventHandler() {
      @Override
      public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception {
        Thread.sleep(8);
        System.out.println(Thread.currentThread().getId() + " process journal " + event + ", seq: " + sequence);
      }
    };

    // 消费者模拟-复制处理
    EventHandler replicateHandler = new EventHandler() {
      @Override
      public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception {
        Thread.sleep(10);
        System.out.println(Thread.currentThread().getId() + " process replication " + event + ", seq: " + sequence);
      }
    };

    // 消费者模拟-解码处理
    EventHandler unmarshallHandler = new EventHandler() { // 最慢
      @Override
      public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception {
        Thread.sleep(1*1000);
        if(event instanceof ExampleEvent){
          ((ExampleEvent)event).ext = "unmarshalled ";
        }
        System.out.println(Thread.currentThread().getId() + " process unmarshall " + event + ", seq: " + sequence);

      }
    };

    // 消费者处理-结果上报,只有执行完以上三种后才能执行此消费者
    EventHandler resultHandler = new EventHandler() {
      @Override
      public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception {
        System.out.println(Thread.currentThread().getId() + " =========== result " + event + ", seq: " + sequence);
        latch.countDown();
      }
    };
    // 定义消费链,先并行处理日志、解码和复制,再处理结果上报
    disruptor
        .handleEventsWith(
          new EventHandler[]{
              journalHandler,
              unmarshallHandler,
              replicateHandler
          }
        )
        .then(resultHandler);
    // 启动Disruptor
    disruptor.start();

  }

  public void shutdown(){
    disruptor.shutdown();
  }

  public Disruptor getDisruptor(){
    return disruptor;
  }

  public static void main(String[] args) {
    final int events = 20; // 必须为偶数
    DisruptorDSLExample disruptorDSLExample = new DisruptorDSLExample();
    final CountDownLatch latch = new CountDownLatch(events);

    disruptorDSLExample.createDisruptor(latch);

    final Disruptor disruptor = disruptorDSLExample.getDisruptor();
    // 生产线程0
    Thread produceThread0 = new Thread(new Runnable() {
      @Override
      public void run() {
        int x = 0;
        while(x++ < events / 2){
          disruptor.publishEvent(IntToExampleEventTranslator.INSTANCE, x);
        }
      }
    });
    // 生产线程1
    Thread produceThread1 = new Thread(new Runnable() {
      @Override
      public void run() {
        int x = 0;
        while(x++ < events / 2){
          disruptor.publishEvent(IntToExampleEventTranslator.INSTANCE, x);

        }
      }
    });

    produceThread0.start();
    produceThread1.start();

    try {
      latch.await();
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    disruptorDSLExample.shutdown();
  }

}

构建Disruptor类

可以发现整个例子都是围绕Disruptor这个类实现的,相关内容可参见官方文档Disruptor Wizard。
其实不使用Disruptor类也是完全可以的,直接操作RingBuffer更加灵活也更麻烦。Disruptor类提供了操作RingBuffer和设置消费依赖的便捷API,如构建Ringbuffer、设置消费链、启动关闭Disruptor、暂停消费者、发布事件等。
接下来,我们把示例拆开看。

disruptor = new Disruptor<ExampleEvent>(
    new ExampleEventFactory(),  // 用于创建环形缓冲中对象的工厂
    8,  // 环形缓冲的大小
    threadFactory,  // 用于事件处理的线程工厂
    ProducerType.MULTI, // 生产者类型,单vs多生产者
    new BlockingWaitStrategy()); // 等待环形缓冲游标的等待策略,这里使用阻塞模式,也是Disruptor中唯一有锁的地方

这里调用构造方法创建了一个Disruptor对象,实际上创建了一个RingBuffer对象和一个Executor,并将引入传入私有化的构造方法创建了Disruptor对象。

// Disruptor.java
public Disruptor(
        final EventFactory<T> eventFactory, // 用于创建环形缓冲中对象的工厂
        final int ringBufferSize, // 环形缓冲的大小
        final ThreadFactory threadFactory, // 用于事件处理的线程工厂
        final ProducerType producerType, // 生产者类型,单vs多生产者
        final WaitStrategy waitStrategy) // 等待环形缓冲游标的等待策略
{
    this(
        RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
        new BasicExecutor(threadFactory));
}

private Disruptor(final RingBuffer<T> ringBuffer, final Executor executor)
{
    this.ringBuffer = ringBuffer;
    this.executor = executor;
}

// RingBuffer.java
public static <E> RingBuffer<E> create(
        ProducerType producerType,
        EventFactory<E> factory,
        int bufferSize,
        WaitStrategy waitStrategy)
    {
        switch (producerType) // 构建RingBuffer时通过producerType来区分单生产者或多生产者
        {
            case SINGLE:
                return createSingleProducer(factory, bufferSize, waitStrategy);
            case MULTI:
                return createMultiProducer(factory, bufferSize, waitStrategy);
            default:
                throw new IllegalStateException(producerType.toString());
        }
    }

// 单生产者模式创建RingBuffer
public static <E> RingBuffer<E> createSingleProducer(
    EventFactory<E> factory,
    int bufferSize,
    WaitStrategy waitStrategy)
{
    SingleProducerSequencer sequencer = new SingleProducerSequencer(bufferSize, waitStrategy);

    return new RingBuffer<E>(factory, sequencer);
}

// 多生产者模式创建RingBuffer
public static <E> RingBuffer<E> createMultiProducer(
        EventFactory<E> factory,
        int bufferSize,
        WaitStrategy waitStrategy)
    {
        MultiProducerSequencer sequencer = new MultiProducerSequencer(bufferSize, waitStrategy);

        return new RingBuffer<E>(factory, sequencer);
    }

// RingBuffer构造器
RingBuffer(
    EventFactory<E> eventFactory,
    Sequencer sequencer)
{
    super(eventFactory, sequencer);
}

这里注意下,在构造RingBuffer时,需要传入用于创建事件对象的工厂eventFactory和记录生产者序号的sequencer。根据生产者是否是多线程生产,Sequencer又分为单、多生产者模式,后续还会讲到。
构建Disruptor实例后,需要设置Disruptor的消费者。

设置消费者

// 消费者模拟-日志处理
EventHandler journalHandler = new EventHandler() {
  @Override
  public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception {
    Thread.sleep(8);
    System.out.println(Thread.currentThread().getId() + " process journal " + event + ", seq: " + sequence);
  }
};

// 消费者模拟-复制处理
EventHandler replicateHandler = new EventHandler() {
  @Override
  public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception {
    Thread.sleep(10);
    System.out.println(Thread.currentThread().getId() + " process replication " + event + ", seq: " + sequence);
  }
};

// 消费者模拟-解码处理
EventHandler unmarshallHandler = new EventHandler() { // 最慢
  @Override
  public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception {
    Thread.sleep(1*1000);
    if(event instanceof ExampleEvent){
      ((ExampleEvent)event).ext = "unmarshalled ";
    }
    System.out.println(Thread.currentThread().getId() + " process unmarshall " + event + ", seq: " + sequence);

  }
};

// 消费者处理-结果上报,只有执行完以上三种后才能执行此消费者
EventHandler resultHandler = new EventHandler() {
  @Override
  public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception {
    System.out.println(Thread.currentThread().getId() + " =========== result " + event + ", seq: " + sequence);
    latch.countDown();
  }
};

这里使用了两组消费者,第一组包含三个消费者,第二组包含一个消费者。当事件可消费后,只有当第一组全部消费者都处理完毕后,事件才能被第二组消费者处理。

// 定义消费链,先并行处理日志、解码和复制,再处理结果上报
disruptor
    .handleEventsWith(
      new EventHandler[]{
          journalHandler,
          unmarshallHandler,
          replicateHandler
      }
    )
    .then(resultHandler);

启动Disruptor

消费者设置成功后,即可启动Disruptor。

// 启动Disruptor
disruptor.start();
// Disruptor.java
public RingBuffer<T> start()
{
    checkOnlyStartedOnce();
    for (final ConsumerInfo consumerInfo : consumerRepository)
    {
        consumerInfo.start(executor);
    }

    return ringBuffer;
}

ConsumerRepository这个类实现了Iterable接口,iterator()方法返回ConsumerInfo集合的迭代器。ConsumerInfo是一个封装类,对应EventBatchProcessor和WorkProcessor有两种实现。EventProcessorInfo对应BatchEventProcessor,保存了与一个事件处理过程相关的EventProcessor、EventHandler、SequenceBarrier的引用。WorkerPoolInfo对应WorkProcessor,保存了WorkerPool、SequenceBarrier的引用以及代表消费者组是否为消费者链尾的标志endOfChain。
如果看不懂,不要着急哈,后续讲到消费者的时候就会明白了。

// ConsumerRepository.java
class ConsumerRepository<T> implements Iterable<ConsumerInfo>
{
    private final Map<EventHandler<?>, EventProcessorInfo<T>> eventProcessorInfoByEventHandler =
        new IdentityHashMap<EventHandler<?>, EventProcessorInfo<T>>(); // hander引用为key
    private final Map<Sequence, ConsumerInfo> eventProcessorInfoBySequence =
        new IdentityHashMap<Sequence, ConsumerInfo>(); // 处理器的序列引用为key
    private final Collection<ConsumerInfo> consumerInfos = new ArrayList<ConsumerInfo>();

    // 省略代码若干... 

    @Override
    public Iterator<ConsumerInfo> iterator()
    {
        return consumerInfos.iterator();
    }

}

调用ConsumerInfo.start()方法,其实就是启动了消费者线程:

// EventProcessorInfo.java
class EventProcessorInfo<T> implements ConsumerInfo
{

    // 省略代码若干...
    @Override
    public void start(final Executor executor)
    {
        executor.execute(eventprocessor);

    }
}

// WorkerPoolInfo.java
class WorkerPoolInfo<T> implements ConsumerInfo
{
     // 省略代码若干...
    @Override
    public void start(final Executor executor)

    {
        workerPool.start(executor);
    }
}

// WorkerPool.java
public final class WorkerPool<T>
{
     // 省略代码若干...
     public RingBuffer<T> start(final Executor executor)
     {
    if (!started.compareAndSet(false, true))
    {
        throw new IllegalStateException("WorkerPool has already been started and cannot be restarted until halted.");
    }

    final long cursor = ringBuffer.getCursor();
    workSequence.set(cursor);

    for (WorkProcessor<?> processor : workProcessors)
    {
        processor.getSequence().set(cursor);
        executor.execute(processor);
    }

    return ringBuffer;    
}

至此,Disruptor的初始化和启动就完成了。主要是完成了RingBuffer数据结构的初始化、设置消费者以及启动。
后续将继续分享消费者代码。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,456评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,370评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,337评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,583评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,596评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,572评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,936评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,595评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,850评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,601评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,685评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,371评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,951评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,934评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,167评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,636评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,411评论 2 342

推荐阅读更多精彩内容