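I recently read the source of Kafka's network package, mainly to understand the details of Kafka's low-level communication. This part is implemented entirely with NIO, following the most basic NIO template, so the code is fairly easy to read. Setting aside the ZooKeeper communication, let's look only at the basic NIO-based communication module used by producers and consumers. The network package mainly contains the following classes:
We will pick out a few of the most important classes, starting with the description of SocketServer: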
/**
* An NIO socket server. The thread model is
* 1 Acceptor thread that handles new connections
* N Processor threads that each have their own selectors and handle all requests from their connections synchronously
*/
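SocketServer keeps its processors in the processors array:
private val processors = new Array[Processor](numProcessorThreads)
AbstractServerThread implements Runnable and uses latches to coordinate startup and shutdown, so that other threads can wait for either one to finish. It also opens a selector for its subclasses to use.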
protected val selector = Selector.open();
protected val logger = Logger.getLogger(getClass())
private val startupLatch = new CountDownLatch(1)
private val shutdownLatch = new CountDownLatch(1)
private val alive = new AtomicBoolean(false)
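This class is the base class of the two classes discussed next, and the latches are the key to the whole synchronization scheme. Here is the pair of startup latch operations. In Scala, Unit can be thought of as void, meaning the method returns nothing: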
/**
* Wait for the thread to completely start up
*/
def awaitStartup(): Unit = startupLatch.await
/**
* Record that the thread startup is complete
*/
protected def startupComplete() = {
alive.set(true)
startupLatch.countDown
}
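The latch handshake can be tried out in isolation. The following is a minimal sketch, not Kafka's code: LatchedThread, IdleWorker, LatchDemo, shutdown() and shutdownComplete() are names assumed here for illustration, and the selector is left out entirely so that only the startup and shutdown coordination remains.

```scala
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical stand-in for AbstractServerThread, reduced to the latch logic.
abstract class LatchedThread extends Runnable {
  private val startupLatch = new CountDownLatch(1)
  private val shutdownLatch = new CountDownLatch(1)
  private val alive = new AtomicBoolean(false)

  def isRunning: Boolean = alive.get

  // Blocks the caller until run() has called startupComplete().
  def awaitStartup(): Unit = startupLatch.await()

  // Asks the loop to stop, then blocks until run() has called shutdownComplete().
  def shutdown(): Unit = {
    alive.set(false)
    shutdownLatch.await()
  }

  protected def startupComplete(): Unit = {
    alive.set(true)
    startupLatch.countDown()
  }

  protected def shutdownComplete(): Unit = shutdownLatch.countDown()
}

// A trivial worker that idles until it is told to stop.
class IdleWorker extends LatchedThread {
  def run(): Unit = {
    startupComplete()
    while (isRunning) Thread.sleep(10)
    shutdownComplete()
  }
}

object LatchDemo {
  // Returns (alive after startup, alive after shutdown).
  def runOnce(): (Boolean, Boolean) = {
    val worker = new IdleWorker
    val thread = new Thread(worker, "idle-worker")
    thread.start()
    worker.awaitStartup() // returns only after the worker has marked itself alive
    val afterStart = worker.isRunning
    worker.shutdown()     // returns only after the worker loop has exited
    thread.join()
    (afterStart, worker.isRunning)
  }
}
```

Calling LatchDemo.runOnce() starts the worker, waits on the startup latch, and then waits on the shutdown latch. The point of the two latches is that the caller never has to poll: each await returns only once the worker thread has reached the corresponding stage.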
Acceptor extends AbstractServerThread. Despite its name, it is not used on its own; it is referenced directly by SocketServer, which differs in naming and usage from typical networking frameworks:
private[kafka] class Acceptor(val port: Int, private val processors: Array[Processor], val sendBufferSize: Int, val receiveBufferSize: Int) extends AbstractServerThread {
This class mainly takes care of setting up the ServerSocketChannel:
val serverChannel = ServerSocketChannel.open()
serverChannel.configureBlocking(false)
serverChannel.socket.bind(new InetSocketAddress(port))
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
logger.info("Awaiting connections on port " + port)
startupComplete()
Internally it works just like ordinary NIO code:
/*
* Accept a new connection
*/
def accept(key: SelectionKey, processor: Processor) {
  val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
  serverSocketChannel.socket().setReceiveBufferSize(receiveBufferSize)
  val socketChannel = serverSocketChannel.accept()
  socketChannel.configureBlocking(false)
  socketChannel.socket().setTcpNoDelay(true)
  socketChannel.socket().setSendBufferSize(sendBufferSize)
  if (logger.isDebugEnabled()) {
    logger.debug("sendBufferSize: [" + socketChannel.socket().getSendBufferSize()
      + "] receiveBufferSize: [" + socketChannel.socket().getReceiveBufferSize() + "]")
  }
  processor.accept(socketChannel)
}
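To see the same accept sequence run end to end, here is a small self-contained sketch. It is an illustration rather than Kafka's code: AcceptDemo and acceptOne are hypothetical names, the server binds to an ephemeral loopback port instead of a configured one, and a blocking client in the same thread stands in for a real producer or consumer.

```scala
import java.net.InetSocketAddress
import java.nio.channels.{SelectionKey, Selector, ServerSocketChannel, SocketChannel}

object AcceptDemo {
  // Accepts one connection on an ephemeral loopback port and reports
  // (isBlocking, tcpNoDelay) for the accepted channel.
  def acceptOne(): (Boolean, Boolean) = {
    val selector = Selector.open()
    val serverChannel = ServerSocketChannel.open()
    serverChannel.configureBlocking(false)
    serverChannel.socket.bind(new InetSocketAddress("127.0.0.1", 0)) // port 0: the OS picks one
    serverChannel.register(selector, SelectionKey.OP_ACCEPT)
    val port = serverChannel.socket.getLocalPort

    // The blocking connect completes against the listen backlog before accept() is called.
    val client = SocketChannel.open(new InetSocketAddress("127.0.0.1", port))
    var accepted: SocketChannel = null
    try {
      var attempts = 0
      while (accepted == null && attempts < 20) {
        attempts += 1
        selector.select(500)
        val iter = selector.selectedKeys().iterator()
        while (iter.hasNext) {
          val key = iter.next()
          iter.remove() // selected keys must be removed by hand
          if (key.isAcceptable) {
            val ch = key.channel().asInstanceOf[ServerSocketChannel].accept()
            if (ch != null) {
              ch.configureBlocking(false)
              ch.socket().setTcpNoDelay(true)
              accepted = ch
            }
          }
        }
      }
      if (accepted == null) throw new IllegalStateException("no connection accepted")
      (accepted.isBlocking, accepted.socket().getTcpNoDelay)
    } finally {
      if (accepted != null) accepted.close()
      client.close()
      serverChannel.close()
      selector.close()
    }
  }
}
```

In the real Acceptor the accepted channel would be passed on to a Processor at the point where this sketch simply records it; the sketch stops there to stay focused on the selector and accept steps.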
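The Processor class also extends AbstractServerThread. The accept method in Acceptor hands each newly accepted connection to a Processor, which handles the reads and writes for that connection on its own thread: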
private[kafka] class Processor(val handlerMapping: Handler.HandlerMapping,
val time: Time,
val stats: SocketServerStats,
val maxRequestSize: Int) extends AbstractServerThread
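So the NIO model used throughout Kafka can be summarized by the following diagram:
SocketServer uses a single Acceptor to accept connections from multiple clients and hands each connection to one of the N processors in its processors array. Each processor runs on its own thread and has its own independent selector.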
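Since SocketServer keeps a fixed array of numProcessorThreads processors, connections have to be shared among them somehow. The sketch below assumes a simple round-robin policy, which the snippets in this article do not show, and it models only the hand-off itself: ProcessorSketch, RoundRobinAcceptor and ModelDemo are hypothetical names, connections are represented by plain strings, and no selectors or threads are involved.

```scala
import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical processor stand-in: only the hand-off queue, no selector or thread.
class ProcessorSketch[A](val id: Int) {
  private val newConnections = new ConcurrentLinkedQueue[A]()

  // Called by the acceptor; a real processor would later drain this queue
  // on its own thread and register each channel with its own selector.
  def accept(conn: A): Unit = newConnections.add(conn)

  def pending: Int = newConnections.size
}

// Hypothetical acceptor stand-in: hands connections to the processors in turn.
class RoundRobinAcceptor[A](processors: Array[ProcessorSketch[A]]) {
  private var current = 0

  // Returns the id of the processor that received the connection.
  def dispatch(conn: A): Int = {
    val p = processors(current)
    p.accept(conn)
    current = (current + 1) % processors.length
    p.id
  }
}

object ModelDemo {
  // Returns, for each connection in order, the id of the processor it went to.
  def assign(numProcessors: Int, numConnections: Int): Seq[Int] = {
    val processors = Array.tabulate(numProcessors)(i => new ProcessorSketch[String](i))
    val acceptor = new RoundRobinAcceptor(processors)
    (0 until numConnections).map(i => acceptor.dispatch("conn-" + i))
  }
}
```

With three processors and five connections, ModelDemo.assign(3, 5) yields the processor ids 0, 1, 2, 0, 1. The queue is what lets the acceptor thread pass a connection across to a processor thread without the two sharing a selector.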
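Overall, this design is no different from how we would write traditional NIO-based communication code; it just puts a little extra work into synchronization. For a more detailed look at the network operations, see the analysis in the Mina series.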