Disruptor版本是3.4.2.
List-1
class ConsumerRepository<T> implements Iterable<ConsumerInfo>
{
private final Map<EventHandler<?>, EventProcessorInfo<T>> eventProcessorInfoByEventHandler =
new IdentityHashMap<>();
private final Map<Sequence, ConsumerInfo> eventProcessorInfoBySequence =
new IdentityHashMap<>();
private final Collection<ConsumerInfo> consumerInfos = new ArrayList<>();
...
如List-1所示,ConsumerRepository类名称以Repository最为后缀,Repository来自DDD,是仓储的意思,即与存储有关,而ConsumerRepository中存放的是消费者信息。使用到了JDK的IdentityHashMap,这个map在fastjson中使用到,这里对这个map就不再深入,其底层上使用的数据结构是与HashMap不同的。
当调用Disruptor的handleEventsWith方法时,就会把EventHandler存到ConsumerRepository的eventProcessorInfoByEventHandler、consumerInfos;调用Disruptor的handleEventsWithWorkerPool时,就会把WorkHandler存到ConsumerRepository的eventProcessorInfoBySequence、consumerInfos。
List-2
EventHandlerGroup<T> createEventProcessors(
final Sequence[] barrierSequences,
final EventHandler<? super T>[] eventHandlers)
{
checkNotStarted();
final Sequence[] processorSequences = new Sequence[eventHandlers.length];
final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);
for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++)
{
final EventHandler<? super T> eventHandler = eventHandlers[i];
//将eventHandler封装到EventProcessor中
final BatchEventProcessor<T> batchEventProcessor =
new BatchEventProcessor<>(ringBuffer, barrier, eventHandler);
if (exceptionHandler != null)
{
batchEventProcessor.setExceptionHandler(exceptionHandler);
}
consumerRepository.add(batchEventProcessor, eventHandler, barrier);
processorSequences[i] = batchEventProcessor.getSequence();
}
updateGatingSequencesForNextInChain(barrierSequences, processorSequences);
return new EventHandlerGroup<>(this, consumerRepository, processorSequences);
}
如List-2所示,eventHandler被封装到BatchEventProcessor中,再放入到consumerRepository中,进而放入到ConsumerRepository的eventProcessorInfoByEventHandler、consumerInfos。
List-3
EventHandlerGroup<T> createWorkerPool(
final Sequence[] barrierSequences, final WorkHandler<? super T>[] workHandlers)
{
final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(barrierSequences);
final WorkerPool<T> workerPool = new WorkerPool<>(ringBuffer, sequenceBarrier, exceptionHandler, workHandlers);
consumerRepository.add(workerPool, sequenceBarrier);
final Sequence[] workerSequences = workerPool.getWorkerSequences();
updateGatingSequencesForNextInChain(barrierSequences, workerSequences);
return new EventHandlerGroup<>(this, consumerRepository, workerSequences);
}
如List-3所示,多个WorkHandler被封装到一个WorkerPool中,之后该WorkerPool被放入到consumerRepository中,进而放入到ConsumerRepository的eventProcessorInfoBySequence、consumerInfos。