This walkthrough is based on Disruptor version 3.4.2.
List-1
interface ConsumerInfo
{
    Sequence[] getSequences();
    SequenceBarrier getBarrier();
    boolean isEndOfChain();
    void start(Executor executor);
    void halt();
    void markAsUsedInBarrier();
    boolean isRunning();
}
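Disruptor is essentially a framework for coordinating producer and consumer threads, and ConsumerInfo, shown in List-1, abstracts the information about a consumer. It has two implementations, EventProcessorInfo and WorkerPoolInfo, and the two differ from each other. Let's look at EventProcessorInfo first, shown in List-2.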
List-2
class EventProcessorInfo<T> implements ConsumerInfo
{
    private final EventProcessor eventprocessor;
    private final EventHandler<? super T> handler;
    private final SequenceBarrier barrier;
    private boolean endOfChain = true;

    EventProcessorInfo(
        final EventProcessor eventprocessor, final EventHandler<? super T> handler, final SequenceBarrier barrier)
    {
        this.eventprocessor = eventprocessor;
        this.handler = handler;
        this.barrier = barrier;
    }

    public EventProcessor getEventProcessor()
    {
        return eventprocessor;
    }

    @Override
    public Sequence[] getSequences()
    {
        // Wrap the processor's single sequence in an array and return it
        return new Sequence[]{eventprocessor.getSequence()};
    }

    public EventHandler<? super T> getHandler()
    {
        return handler;
    }

    @Override
    public SequenceBarrier getBarrier()
    {
        return barrier;
    }

    @Override
    public boolean isEndOfChain()
    {
        return endOfChain;
    }

    @Override
    public void start(final Executor executor)
    {
        executor.execute(eventprocessor);
    }

    @Override
    public void halt()
    {
        eventprocessor.halt();
    }

    /**
     * Called when another consumer's barrier depends on this consumer,
     * so this consumer is no longer the end of the chain.
     */
    @Override
    public void markAsUsedInBarrier()
    {
        endOfChain = false;
    }

    @Override
    public boolean isRunning()
    {
        return eventprocessor.isRunning();
    }
}
List-2 shows that almost everything is delegated to the EventProcessor. The EventHandler is only stored and handed back by getHandler().
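To make the delegation and the endOfChain flag more concrete, here is a minimal sketch. It does not use Disruptor's classes: SketchProcessor, SketchConsumerInfo and ChainSketch are hypothetical stand-ins I made up to mirror the shape of List-2. The assumption being illustrated is that a consumer starts out as the tail of its chain and stops being a tail once markAsUsedInBarrier() is called because some later consumer depends on it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for an event processor: only the lifecycle bits.
final class SketchProcessor implements Runnable {
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final String name;

    SketchProcessor(String name) { this.name = name; }

    // A real processor would loop over events here; the sketch only flips a flag.
    @Override public void run() { running.set(true); }
    void halt() { running.set(false); }
    boolean isRunning() { return running.get(); }
    String name() { return name; }
}

// Hypothetical stand-in mirroring the shape of EventProcessorInfo.
final class SketchConsumerInfo {
    private final SketchProcessor processor;
    private boolean endOfChain = true; // every consumer starts as a chain tail

    SketchConsumerInfo(SketchProcessor processor) { this.processor = processor; }

    // Lifecycle calls are forwarded to the wrapped processor.
    void start(Executor executor) { executor.execute(processor); }
    void halt() { processor.halt(); }
    boolean isRunning() { return processor.isRunning(); }

    boolean isEndOfChain() { return endOfChain; }
    void markAsUsedInBarrier() { endOfChain = false; }
    String name() { return processor.name(); }
}

final class ChainSketch {
    // Collect the names of the consumers that nothing else depends on.
    static List<String> tails(List<SketchConsumerInfo> infos) {
        List<String> result = new ArrayList<>();
        for (SketchConsumerInfo info : infos) {
            if (info.isEndOfChain()) {
                result.add(info.name());
            }
        }
        return result;
    }

    static List<String> demo() {
        SketchConsumerInfo a = new SketchConsumerInfo(new SketchProcessor("A"));
        SketchConsumerInfo b = new SketchConsumerInfo(new SketchProcessor("B"));
        // Suppose B is configured to run after A, so A is used in B's barrier.
        a.markAsUsedInBarrier();
        List<SketchConsumerInfo> infos = new ArrayList<>();
        infos.add(a);
        infos.add(b);
        return tails(infos);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [B]
    }
}
```

In the sketch only B remains a tail after A is marked. As far as I can tell, this is the kind of bookkeeping the flag exists for: it lets the framework find the last consumers in a chain without walking the dependency graph.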
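WorkerPoolInfo is different. As List-3 shows, it delegates to a WorkerPool, which in turn holds several WorkProcessor instances, each wrapping a WorkHandler rather than an EventHandler. So the main difference from EventProcessorInfo lies in the delegate: a single EventProcessor there, a WorkerPool managing multiple WorkProcessors here.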
List-3
class WorkerPoolInfo<T> implements ConsumerInfo
{
    private final WorkerPool<T> workerPool;
    private final SequenceBarrier sequenceBarrier;
    private boolean endOfChain = true;

    WorkerPoolInfo(final WorkerPool<T> workerPool, final SequenceBarrier sequenceBarrier)
    {
        this.workerPool = workerPool;
        this.sequenceBarrier = sequenceBarrier;
    }

    @Override
    public Sequence[] getSequences()
    {
        return workerPool.getWorkerSequences();
    }

    @Override
    public SequenceBarrier getBarrier()
    {
        return sequenceBarrier;
    }

    @Override
    public boolean isEndOfChain()
    {
        return endOfChain;
    }

    @Override
    public void start(Executor executor)
    {
        workerPool.start(executor);
    }

    @Override
    public void halt()
    {
        workerPool.halt();
    }

    @Override
    public void markAsUsedInBarrier()
    {
        endOfChain = false;
    }

    @Override
    public boolean isRunning()
    {
        return workerPool.isRunning();
    }
}
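The practical difference between the two kinds of consumer can be sketched without Disruptor at all. The code below is only an illustration under my own assumptions: DispatchSketch and its methods are hypothetical names, and the shared AtomicLong is a simplified stand-in for the way workers in a pool claim the next slot. It contrasts the broadcast style, where every handler sees every event, with the work-sharing style, where each event is handled by exactly one worker.

```java
import java.util.concurrent.atomic.AtomicLong;

final class DispatchSketch {
    // Broadcast style: every handler processes every event.
    static int[] broadcast(int handlers, int events) {
        int[] handled = new int[handlers];
        for (int h = 0; h < handlers; h++) {
            for (int e = 0; e < events; e++) {
                handled[h]++;
            }
        }
        return handled;
    }

    // Work-sharing style: workers claim the next slot from one shared counter,
    // so each event is processed by exactly one worker.
    static int[] workShare(int workers, int events) {
        AtomicLong next = new AtomicLong(0);
        int[] handled = new int[workers];
        Thread[] threads = new Thread[workers];
        for (int w = 0; w < workers; w++) {
            final int id = w;
            threads[w] = new Thread(() -> {
                while (next.getAndIncrement() < events) {
                    handled[id]++; // only this thread writes handled[id]
                }
            });
            threads[w].start();
        }
        for (Thread t : threads) {
            try {
                t.join(); // join makes the worker's writes visible to the caller
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(ex);
            }
        }
        return handled;
    }

    static int sum(int[] values) {
        int total = 0;
        for (int v : values) {
            total += v;
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println("broadcast total:  " + sum(broadcast(3, 100)));
        System.out.println("work-share total: " + sum(workShare(3, 100)));
    }
}
```

With 3 consumers and 100 events, the broadcast total is 300 while the work-sharing total is 100, although how the 100 events are split among the workers varies from run to run.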
The Sequence and SequenceBarrier types that appear in ConsumerInfo will be analyzed separately later, because these two types run through almost the entire Disruptor package.