之前我们已经熟悉了Disruptor的启动和事件生产操作,接下来我们一同探究Disruptor如何消费事件。
0x00 概念回顾
我们先回顾下Disruptor消费相关的名词概念:
Event: Disruptor中传输的事件。
RingBuffer: 存储和更新事件的容器。
EventHandler: 用户实现接口,包含消费处理逻辑,代表Disruptor一个消费者。
EventProcessor: EventProcessor继承了Runnable接口,包含处理Disruptor事件的主循环。
多播事件: 队列和Disruptor在表现行为上最大的区别。队列中的一个事件只能被一个消费者消费,而Disruptor中的事件会发布给所有消费者。特别适合同一数据的独立并行处理操作。
消费者依赖图(消费链):同一事件需要被多个消费者消费时,消费者之间可能有依赖关系,如消费者A,B,C,B和C依赖A先执行,但是B和C可以并行消费。
0x01 EventProcessor接口概览
OK,咱们正式开始对Disruptor消费者的源码解读。
Disruptor的消费者依赖EventProcessor循环处理可用事件。EventProcessor顾名思义,就是事件处理器(handle和process都可以翻译为“处理”,但是process侧重于机器的处理,而handle侧重于有人工的处理,所以使用handle表示用户逻辑的处理,使用process表示机器的处理),这个接口有两个实现类,分别是WorkProcessor和BatchEventProcessor,它们对应的逻辑处理消费者分别是EventHandler和WorkHandler。下面是EventProcessor的UML类图及EventHandler和EventProcessor的接口定义。
/**
* Callback interface to be implemented for processing events as they become available in the {@link RingBuffer}
*
* @param <T> event implementation storing the data for sharing during exchange or parallel coordination of an event.
* @see BatchEventProcessor#setExceptionHandler(ExceptionHandler) if you want to handle exceptions propagated out of the handler.
* 处理事件的回调接口
*/
public interface EventHandler<T>
{
/**
* Called when a publisher has published an event to the {@link RingBuffer}
*
* @param event published to the {@link RingBuffer}
* @param sequence of the event being processed
* @param endOfBatch flag to indicate if this is the last event in a batch from the {@link RingBuffer}
* @throws Exception if the EventHandler would like the exception handled further up the chain.
*/
void onEvent(T event, long sequence, boolean endOfBatch) throws Exception;
}
/**
* EventProcessors waitFor events to become available for consumption from the {@link RingBuffer}
* <p>
* An EventProcessor will generally be associated with a Thread for execution.
* 事件执行器,等待RingBuffer有可用消费事件。一个事件处理器关联一个执行线程
*/
public interface EventProcessor extends Runnable
{
/**
* Get a reference to the {@link Sequence} being used by this {@link EventProcessor}.
*
* @return reference to the {@link Sequence} for this {@link EventProcessor}
*/
Sequence getSequence();
/**
* Signal that this EventProcessor should stop when it has finished consuming at the next clean break.
* It will call {@link SequenceBarrier#alert()} to notify the thread to check status.
*/
void halt();
boolean isRunning();
}
EventProcessor接口继承了Runnable接口,主要有两种实现:单线程批量处理BatchEventProcessor和多线程处理WorkProcessor。
在使用Disruptor帮助类构建消费者时,使用handleEventsWith方法传入多个EventHandler,内部使用多个BatchEventProcessor关联多个线程执行。这种情况类似JMS中的发布订阅模式,同一事件会被多个消费者并行消费。适用于同一事件触发多种操作。
而使用Disruptor的handleEventsWithWorkerPool传入多个WorkHandler时,内部使用多个WorkProcessor关联多个线程执行。这种情况类似JMS的点对点模式,同一事件会被一组消费者其中之一消费。适用于提升消费者并行处理能力。
0x02 消费技术实现
我们先回顾下Disruptor消费者的两个特点:消费者依赖图(即下文所谓的“消费链”)和事件多播。
假设现在有A,B,C,D四个消费者,它们都能组成什么样的形式呢?从众多的排列组合中,我挑了4组比较有代表性的消费链形式。
- 第1组中,消费者A消费按成后,B、C、D可同时消费;
- 第2组中,消费者A、B、C、D顺序消费;
- 第3组中,消费者A、B顺序消费后,C、D同时消费;
- 第4组中,消费者A在消费完成后,B和C可以同时消费,但是必须在都消费完成后,D才能消费。
标号为1、3、4的消费链都使用了事件多播,可见事件多播属于消费链的一种组合形式。注意,在上面4种组合中,每个组合的每一水平行,都属于一个消费者组。
这些还只是较为简单的消费链组成,实际中消费链可能会更复杂。
那么在Disruptor内部是怎么实现消费链的呢?
我们可以先思考下。如果想把独立的消费者组成消费链,那么后方的消费者(组)必然要知道在它前方的消费者(组)的处理情况,否则就做不到顺序消费。同时,消费者也要了解生产者的位置,来判断是否有可用事件。之前我们分析生产者代码的时候,已经讲过,生产者为了不覆盖没有消费完全的事件,必须知道最慢消费者的处理情况。
做到了这些才会有能力去控制消费者组成消费链。下面让我们具体看Disruptor中的实现。
0x02.1 使用BatchEventProcessor单线程批处理事件
在使用BatchEventProcessor时,通过Disruptor#handleEventsWith方法可以获取一个EventHandlerGroup,再通过EventHandlerGroup的and和then方法可以构建一个复杂的消费者链。EventHandlerGroup表示一组事件消费者,内部持有了Disruptor类实例disruptor,其大部分功能都是通过调用disruptor实现,其实可以算作是Disruptor这个辅助类的一部分。
// EventHandlerGroup.java
public EventHandlerGroup<T> then(final EventHandler<? super T>... handlers)
{
return handleEventsWith(handlers);
}
public EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers)
{
return disruptor.createEventProcessors(sequences, handlers);
}
// Disruptor.java
public EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers)
{
return createEventProcessors(new Sequence[0], handlers);
}
// 由EventHandlerGroup调用时,barrierSequences是EventHandlerGroup实例的序列,也就是上一个事件处理者组的序列,作为当前事件处理的门控,防止后边的消费链超前
// 如果是第一次调用handleEventsWith,则barrierSequences是一个空数组
EventHandlerGroup<T> **createEventProcessors**(
final Sequence[] barrierSequences,
final EventHandler<? super T>[] eventHandlers)
{
checkNotStarted();
// 对应此事件处理器组的序列组
final Sequence[] processorSequences = new Sequence[eventHandlers.length];
final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);
for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++)
{
final EventHandler<? super T> eventHandler = eventHandlers[i];
// 批量处理事件的循环
final BatchEventProcessor<T> batchEventProcessor =
new BatchEventProcessor<T>(ringBuffer, barrier, eventHandler);
if (exceptionHandler != null)
{
batchEventProcessor.setExceptionHandler(exceptionHandler);
}
consumerRepository.add(batchEventProcessor, eventHandler, barrier);
processorSequences[i] = batchEventProcessor.getSequence();
}
// 每次添加完事件处理器后,更新门控序列,以便后续调用链的添加。(所谓门控,是指后续消费链的消费,不能超过前边。)
updateGatingSequencesForNextInChain(barrierSequences, processorSequences);
return new EventHandlerGroup<T>(this, consumerRepository, processorSequences);
}
// 为消费链下一组消费者,更新门控序列
// barrierSequences是上一组事件处理器组的序列(如果本次是第一次,则为空数组),本组不能超过上组序列值
// processorSequences是本次要设置的事件处理器组的序列
private void updateGatingSequencesForNextInChain(final Sequence[] barrierSequences, final Sequence[] processorSequences)
{
if (processorSequences.length > 0)
{
ringBuffer.addGatingSequences(processorSequences); // 将本组序列添加到Sequencer中的gatingSequences中
for (final Sequence barrierSequence : barrierSequences) // 将上组序列从Sequencer中的gatingSequences中,gatingSequences一直保存消费链末端消费者的序列组
{
ringBuffer.removeGatingSequence(barrierSequence);
}
consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences); // 取消标记上一组消费者为消费链末端
}
}
可以看到,使用BatchEventProcessor构建消费者链时的逻辑都在createEventProcessors这个方法中。
先简单说下ConsumerRepository,这个类主要保存消费者的各种关系,如通过EventHandler引用获取EventProcessorInfo信息,通过Sequence获取ConsumerInfo信息等。因为要使用引用做key,所以数据结构使用IdentityHashMap。IdentityHashMap
和HashMap最大的不同,就是使用==而不是equals比较key。
这个createEventProcessors方法接收两个参数,barrierSequences表示当前消费者组的屏障序列数组,如果当前消费者组是第一组,则取一个空的序列数组;否则,barrierSequences就是上一组消费者组的序列数组。createEventProcessors方法的另一个参数eventHandlers,这个参数是代表事件消费逻辑的EventHandler数组。
Disruptor为每个EventHandler实现类都创建了一个对应的BatchEventProcessor。
在构建BatchEventProcessor时需要以下传入三个构造参数:dataProvider是数据存储结构如RingBuffer;sequenceBarrier用于跟踪生产者游标,协调数据处理;eventHandler是用户实现的事件处理器,也就是实际的消费者。
注意,Disruptor并非为每个BatchEventProcessor都创建一个新的SequenceBarrier,而是每个消费者组共用一个SequenceBarrier。
BatchEventProcessor定义如下。至于为什么要叫做BatchEventProcessor,可以看看在run()方法里每次waitFor获取的availableSequence是当前能够使用的最大值,然后再循环处理这些数据。这样当消费者有瞬时抖动,导致暂时落后生产者时,可在下一次循环中,批量处理所有落后的事件。
/**
* Convenience class for handling the batching semantics of consuming entries from a {@link RingBuffer}
* and delegating the available events to an {@link EventHandler}.
* <p>
* If the {@link EventHandler} also implements {@link LifecycleAware} it will be notified just after the thread
* is started and just before the thread is shutdown.
*
* @param <T> event implementation storing the data for sharing during exchange or parallel coordination of an event.
*
* 每个EventHandler对应一个EventProcessor执行者,BatchEventProcessor每次大循环可以获取最高可用序号,并循环调用EventHandler
*/
public final class BatchEventProcessor<T>
implements EventProcessor
{
private final AtomicBoolean running = new AtomicBoolean(false);
private ExceptionHandler<? super T> exceptionHandler = new FatalExceptionHandler();
private final DataProvider<T> dataProvider; // 数据提供者,默认是RingBuffer,也可替换为自己的数据结构
private final SequenceBarrier sequenceBarrier; // 默认为ProcessingSequenceBarrier
private final EventHandler<? super T> eventHandler; // 此EventProcessor对应的用户自定义的EventHandler实现
private final Sequence sequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE); // 当前执行位置
private final TimeoutHandler timeoutHandler;
private final BatchStartAware batchStartAware; // 每次循环取得一批可用事件后,在实际处理前调用
/**
* Construct a {@link EventProcessor} that will automatically track the progress by updating its sequence when
* the {@link EventHandler#onEvent(Object, long, boolean)} method returns.
*
* @param dataProvider to which events are published.
* @param sequenceBarrier on which it is waiting.
* @param eventHandler is the delegate to which events are dispatched.
*/
public BatchEventProcessor(
final DataProvider<T> dataProvider,
final SequenceBarrier sequenceBarrier,
final EventHandler<? super T> eventHandler)
{
this.dataProvider = dataProvider;
this.sequenceBarrier = sequenceBarrier;
this.eventHandler = eventHandler;
if (eventHandler instanceof SequenceReportingEventHandler)
{
((SequenceReportingEventHandler<?>) eventHandler).setSequenceCallback(sequence);
}
batchStartAware =
(eventHandler instanceof BatchStartAware) ? (BatchStartAware) eventHandler : null;
timeoutHandler =
(eventHandler instanceof TimeoutHandler) ? (TimeoutHandler) eventHandler : null;
}
// ... 省略部分代码
/**
* It is ok to have another thread rerun this method after a halt().
*
* @throws IllegalStateException if this object instance is already running in a thread
*/
@Override
public void run()
{
if (!running.compareAndSet(false, true))
{
throw new IllegalStateException("Thread is already running");
}
sequenceBarrier.clearAlert();
notifyStart();
T event = null;
long nextSequence = sequence.get() + 1L;
try
{
while (true)
{
try
{ // availableSequence返回的是可用的最大值
final long availableSequence = sequenceBarrier.waitFor(nextSequence); // 使用给定的等待策略去等待下一个序列可用
if (batchStartAware != null)
{
batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
}
// 批处理在此处得以体现
while (nextSequence <= availableSequence)
{
event = dataProvider.get(nextSequence);
eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
nextSequence++;
}
// eventHandler处理完毕后,更新当前序号
sequence.set(availableSequence);
}
catch (final TimeoutException e)
{
notifyTimeout(sequence.get());
}
catch (final AlertException ex)
{
if (!running.get())
{
break;
}
}
catch (final Throwable ex)
{
exceptionHandler.handleEventException(ex, nextSequence, event);
sequence.set(nextSequence);
nextSequence++;
}
}
}
finally
{
notifyShutdown();
running.set(false);
}
}
}
0x02.2 消费者可用序列屏障-SequenceBarrier
我们重点看一下SequenceBarrier,可直译为“序列屏障”。SequenceBarrier的主要作用是协调获取消费者可处理到的最大序号,内部持有着生产者和其依赖的消费者序列。它的接口定义如下。
public interface SequenceBarrier
{
/**
* Wait for the given sequence to be available for consumption.<br>
* 等待指定序列可用
* @param sequence to wait for
* @return the sequence up to which is available
* @throws AlertException if a status change has occurred for the Disruptor
* @throws InterruptedException if the thread needs awaking on a condition variable.
* @throws TimeoutException
*
*/
long waitFor(long sequence) throws AlertException, InterruptedException, TimeoutException;
/**
* Get the current cursor value that can be read.<br>
* 获取当前可读游标值
*
* @return value of the cursor for entries that have been published.
*
*/
long getCursor();
/**
* The current alert status for the barrier.<br>
* 当前的alert状态
*
* @return true if in alert otherwise false.
*/
boolean isAlerted();
/**
* Alert the {@link EventProcessor}s of a status change and stay in this status until cleared.<br>
*
* 通知消费者状态变化。当调用EventProcessor#halt()将调用此方法。
*/
void alert();
/**
* Clear the current alert status.<br>
* 清楚alert状态
*/
void clearAlert();
/**
* Check if an alert has been raised and throw an {@link AlertException} if it has.
* 检查是否发生alert,发生将抛出异常
* @throws AlertException if alert has been raised.
*/
void checkAlert() throws AlertException;
}
SequenceBarrier实例引用被EventProcessor持有,用于等待并获取可用的消费事件,主要体现在waitFor这个方法。
要实现这个功能,需要3点条件:
- 知道生产者的位置。
- 因为Disruptor支持消费者链,在不同的消费者组之间,要保证后边的消 费者组只有在前消费者组中的消费者都处理完毕后,才能进行处理。
- 暂时没有事件可消费,在等待可用消费时,还需要使用某种等待策略进行等待。
看下SequenceBarrier实现类ProcessingSequenceBarrier的代码是如何实现waitFor方法。
final class ProcessingSequenceBarrier implements SequenceBarrier
{
private final WaitStrategy waitStrategy; // 等待可用消费时,指定的等待策略
private final Sequence dependentSequence; // 依赖的上组消费者的序号,如果当前为第一组则为cursorSequence(即生产者发布游标序列),否则使用FixedSequenceGroup封装上组消费者序列
private volatile boolean alerted = false; // 当触发halt时,将标记alerted为true
private final Sequence cursorSequence; // AbstractSequencer中的cursor引用,记录当前发布者发布的最新位置
private final Sequencer sequencer; // MultiProducerSequencer 或 SingleProducerSequencer
public ProcessingSequenceBarrier(
final Sequencer sequencer,
final WaitStrategy waitStrategy,
final Sequence cursorSequence,
final Sequence[] dependentSequences)
{
this.sequencer = sequencer;
this.waitStrategy = waitStrategy;
this.cursorSequence = cursorSequence;
if (0 == dependentSequences.length) // 依赖的上一组序列长度,第一次是0
{
dependentSequence = cursorSequence;
}
else // 将上一组序列数组复制成新数组保存,引用不变
{
dependentSequence = new FixedSequenceGroup(dependentSequences);
}
}
@Override
public long waitFor(final long sequence)
throws AlertException, InterruptedException, TimeoutException
{
// 检查是否停止服务
checkAlert();
// 获取最大可用序号 sequence为给定序号,一般为当前序号+1,cursorSequence记录生产者最新位置,
long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);
if (availableSequence < sequence)
{
return availableSequence;
}
// 返回已发布最高的序列值,将对每个序号进行校验
return sequencer.getHighestPublishedSequence(sequence, availableSequence);
}
// ...
}
0x02.3 该用什么姿势等待可用事件-WaitStrategy
看来实际的等待操作还是在WaitStrategy#waitFor完成的。
// WaitStrategy.java
/**
* Strategy employed for making {@link EventProcessor}s wait on a cursor {@link Sequence}. <br>
* 消费者等待可用事件的策略
*/
public interface WaitStrategy
{
/**
* Wait for the given sequence to be available. It is possible for this method to return a value
* less than the sequence number supplied depending on the implementation of the WaitStrategy. A common
* use for this is to signal a timeout. Any EventProcessor that is using a WaitStrategy to get notifications
* about message becoming available should remember to handle this case. The {@link BatchEventProcessor} explicitly
* handles this case and will signal a timeout if required.
*
* @param sequence to be waited on. 给定序号
* @param cursor the main sequence from ringbuffer. Wait/notify strategies will
* need this as it's the only sequence that is also notified upon update. 生产者游标
* @param dependentSequence on which to wait. 依赖的序列,一般是上一个消费者组序列的FixedSequenceGroup封装。如果消费者是第一组,则为cursor。
* @param barrier the processor is waiting on. 在等待时需要判断是否对消费者有alert操作
* @return the sequence that is available which may be greater than the requested sequence.
* @throws AlertException if the status of the Disruptor has changed.
* @throws InterruptedException if the thread is interrupted.
* @throws TimeoutException
*/
long waitFor(long sequence, Sequence cursor, Sequence dependentSequence, SequenceBarrier barrier)
throws AlertException, InterruptedException, TimeoutException;
/**
* Implementations should signal the waiting {@link EventProcessor}s that the cursor has advanced. <br>
* 当生产者发布新事件后,将通知等待的EventProcessor。当用锁机制时才会包含相应逻辑。
*/
void signalAllWhenBlocking();
}
在各种等待策略中,我们选取阻塞策略研究。
public final class BlockingWaitStrategy implements WaitStrategy
{
private final Lock lock = new ReentrantLock();
private final Condition processorNotifyCondition = lock.newCondition();
@Override
public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier)
throws AlertException, InterruptedException
{
long availableSequence;
if (cursorSequence.get() < sequence) // 当前游标小于给定序号,也就是无可用事件
{
lock.lock();
try
{
while (cursorSequence.get() < sequence) // 当给定的序号大于生产者游标序号时,进行等待
{
barrier.checkAlert();
// 循环等待,在Sequencer中publish进行唤醒;等待消费时也会在循环中定时唤醒。
// 循环等待的原因,是要检查alert状态。如果不检查将导致不能关闭Disruptor。
processorNotifyCondition.await();
}
}
finally
{
lock.unlock();
}
}
// 给定序号大于上一个消费者组最慢消费者(如当前消费者为第一组则和生产者游标序号比较)序号时,需要等待。不能超前消费上一个消费者组未消费完毕的事件。
// 那么为什么这里没有锁呢?可以想一下此时的场景,代码运行至此,已能保证生产者有新事件,如果进入循环,说明上一组消费者还未消费完毕。
// 而通常我们的消费者都是较快完成任务的,所以这里才会考虑使用Busy Spin的方式等待上一组消费者完成消费。
while ((availableSequence = dependentSequence.get()) < sequence)
{
barrier.checkAlert();
}
return availableSequence;
}
@Override
public void signalAllWhenBlocking()
{
lock.lock();
try
{
processorNotifyCondition.signalAll();
}
finally
{
lock.unlock();
}
}
@Override
public String toString()
{
return "BlockingWaitStrategy{" +
"processorNotifyCondition=" + processorNotifyCondition +
'}';
}
}
阻塞等待策略使用Lock+Condition的方式等待生产者生产可用事件,而使用Busy Spin的方式等待可能出现的上一个消费者组未消费完成的情况。
这里给我们一个提示,在构建低延迟系统时,因为锁的性能消耗,尽量不要使用锁。如果必须要用锁,也要把锁粒度调到最小。
另外,消费者在等待可用消费事件时,会循环调用barrier.checkAlert(),再去调用锁的条件等待,等待可用消费事件。
有三个地方可以唤醒等待中的消费线程。两种是在Sequencer实现类中,一是有可用事件发布,通知消费线程继续消费;二是在调用next()获取可用的RingBuffer槽位时,发现RingBuffer满了(生产者速度大于消费者,导致生产者没有可用位置发布事件),将唤醒消费者线程,此功能在3.3.5版本新增(Resignal any waiting threads when trying to publish to a full ring buffer )。开始我百思不得,为什么要在buffer满了的时候不断唤醒消费者线程,直到看到这个issue才明白。大意是在log4j2中使用Disruptor时发生了死锁,为了避免在发布事件时,由于某种原因导致没有通知到消费者,在生产者尝试往一个已满的buffer发布数据时,就会再通知消费者进行消费。而这个bug最终也被Log4j认领,与Disruptor无关。Disruptor这里的再次通知也是为了更加保险。
//*ProducerSequencer.java
// next(n)中的代码
// 由于慢消费者,无可用坑位,只有当消费者消费,向前移动后,才能跳出循环
// 由于外层判断使用的是缓存的消费者序列最小值,这里使用真实的消费者序列进行判断,并将最新结果在跳出while循环之后进行缓存
while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))
{ // 唤醒等待的消费者,正常情况下并无意义,只是为了避免极少数情况下未知原因导致的发布时锁机制出现异常,未通知到消费者
waitStrategy.signalAllWhenBlocking();
LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
}
还有一种唤醒就是关闭Disruptor时,消费者关闭前将会处理完当前批次数据(并非RingBuffer的所有数据,而是此次循环取出的最大可用序号以下的所有未处理数据),如果消费者线程当前在等待状态,将被唤醒并终结。
BatchEventProcessor就讲到这。
0x02.4 使用WorkProcessor多线程处理事件
下面说一说WorkHandler+WorkProcessor。
上面讲过,使用EventHandler+BatchEventProcessor这种方式类似JMS的发布订阅,同一个事件会被不同线程的EventHandler并行消费。那么,如果单线程处理能力不足,想多线程处理同一主题下的不同事件该怎么办呢?这种方式就类似JMS的点到点模式,多个消费者可以监听同一个队列,谁先拿到就归谁处理。
在Disruptor中使用WorkHandler+WorkProcessor实现以上功能。当需要使用这种模式,可在设置Disruptor消费者时,通过使用handleEventsWithWorkerPool和thenHandleEventsWithWorkerPool设置消费链。
disruptor
.handleEventsWithWorkerPool(
new WorkHandler[]{
journalHandler,
journalHandler,
journalHandler
}
)
.thenHandleEventsWithWorkerPool(resultHandler);
先看下相关的源码。
// Disruptor
public EventHandlerGroup<T> handleEventsWithWorkerPool(final WorkHandler<T>... workHandlers)
{
return createWorkerPool(new Sequence[0], workHandlers);
}
EventHandlerGroup<T> createWorkerPool(
final Sequence[] barrierSequences, final WorkHandler<? super T>[] workHandlers)
{
final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(barrierSequences);
final WorkerPool<T> workerPool = new WorkerPool<T>(ringBuffer, sequenceBarrier, exceptionHandler, workHandlers);
consumerRepository.add(workerPool, sequenceBarrier);
final Sequence[] workerSequences = workerPool.getWorkerSequences();
updateGatingSequencesForNextInChain(barrierSequences, workerSequences);
return new EventHandlerGroup<T>(this, consumerRepository, workerSequences);
}
// WorkerPool.java WorkerPool构造方法
public WorkerPool(
final RingBuffer<T> ringBuffer,
final SequenceBarrier sequenceBarrier,
final ExceptionHandler<? super T> exceptionHandler,
final WorkHandler<? super T>... workHandlers)
{
this.ringBuffer = ringBuffer;
final int numWorkers = workHandlers.length;
workProcessors = new WorkProcessor[numWorkers];
for (int i = 0; i < numWorkers; i++)
{
workProcessors[i] = new WorkProcessor<T>( // 为每个WorkHandler新建一个WorkProcessor
ringBuffer,
sequenceBarrier,
workHandlers[i],
exceptionHandler,
workSequence);
}
}
在使用线程池处理事件时,与单线程处理相比,最大的不同在于新增了一个WorkerPool。WorkerPool用于管理一组WorkProcessor,它的属性、方法如下。
WorkProcessor的原理和BatchEventProcessor类似,只是多了workSequence用来保存同组共用的处理序列。在更新workSequence时,涉及多线程操作,所以使用CAS进行更新。
WorkProcessor的run()方法如下。
@Override
public void run()
{
if (!running.compareAndSet(false, true))
{
throw new IllegalStateException("Thread is already running");
}
sequenceBarrier.clearAlert();
notifyStart();
boolean processedSequence = true;
long cachedAvailableSequence = Long.MIN_VALUE;
long nextSequence = sequence.get();
T event = null;
while (true)
{
try
{
// if previous sequence was processed - fetch the next sequence and set
// that we have successfully processed the previous sequence
// typically, this will be true
// this prevents the sequence getting too far forward if an exception
// is thrown from the WorkHandler
if (processedSequence) // 表示nextSequence序号的处理情况(不区分正常或是异常处理)。只有处理过,才能申请下一个序号。
{
processedSequence = false;
do
{
// 同组中多个消费线程有可能会争抢一个序号,使用CAS避免使用锁。
// 同一组使用一个workSequence,WorkProcessor不断申请下一个可用序号,对workSequence设置成功才会实际消费。
nextSequence = workSequence.get() + 1L;
sequence.set(nextSequence - 1L);
}
while (!workSequence.compareAndSet(nextSequence - 1L, nextSequence));
}
// 缓存的可用序号比要处理的序号大,才能进行处理
if (cachedAvailableSequence >= nextSequence)
{
event = ringBuffer.get(nextSequence);
workHandler.onEvent(event);
processedSequence = true;
}
else // 更新缓存的可用序列。这个cachedAvailableSequence只用在WorkProcessor实例内,不同实例的缓存可能是不一样的
{ // 和单线程模式类似,返回的也是最大可用序号
cachedAvailableSequence = sequenceBarrier.waitFor(nextSequence);
}
}
catch (final TimeoutException e)
{
notifyTimeout(sequence.get());
}
catch (final AlertException ex)
{
if (!running.get())
{
break;
}
}
catch (final Throwable ex)
{
// handle, mark as processed, unless the exception handler threw an exception
exceptionHandler.handleEventException(ex, nextSequence, event);
processedSequence = true;
}
}
notifyShutdown();
running.set(false);
}
代码逻辑和BatchEventProcessor类似,就不再赘述啦。
还有一点需要留意,Disruptor通过EventHandlerGroup代表一个消费者组,就表示之前那四张图中一个水平线上的消费者组。这样不同的消费者组之间不必关心各自的实现,从而可以实现更加复杂和灵活的消费链,即依赖图表。
0x03 消费者小结
从小语文老师就教育我们写作文要总结,好习惯不能忘~
本文主要探讨了Disruptor消费者内部概要实现,重点阐述了BatchEventProcessor、WorkProcess的消费代码原理。同时省略了超时通知、开始和结束通知、异常控制等内容,并非不重要,而只是尽量言简意赅,达到抛砖引玉的目的。
BatchEventProcessor主要用于处理单线程并行任务,同一消费者组的不同消费者会接收相同的事件,并在所有事件处理完毕后进入下一消费者组进行处理(是不是类似JUC里的Phaser、CyclicBarrier或CountDownLatch呢)。WorkProcessor通过WorkerPool管理多个WorkProcessor,达到多线程处理事件的目的,同一消费者组的多个WorkProcessor不会处理同一个事件。通过选择不同的WaitStragegy实现,可以控制消费者在没有可用事件处理时的等待策略。
好啦,有关Disruptor消费者的分享就到这。
欢迎大家留言讨论,一同探讨,一同进步。