Jetty的请求入口
ServerConnector.java
的 accepted
方法(ServerSocketChannel
#accept
后的处理逻辑)。
Jetty的请求流程
一个请求的流程:
1.
Acceptor
监听连接请求,当有连接请求到达时就接受连接,一个连接对应一个 Channel,Acceptor 将 Channel 交给 ManagedSelector 来处理。2.
ManagedSelector
把 Channel 注册到 Selector 上,并创建一个EndPoint
和Connection
跟这个 Channel 绑定,接着就不断地检测 I/O 事件。3.I/O 事件到了就调用
EndPoint
的方法拿到一个 Runnable,并扔给线程池执行。4.线程池中调度某个线程执行 Runnable。
5.Runnable 执行时,调用回调函数,这个回调函数是
Connection
注册到EndPoint
中的。6.回调函数内部实现,其实就是调用
EndPoint
的接口方法来读数据。7.
Connection
解析读到的数据,生成请求对象并交给Handler
组件去处理。
SelectorManager
Jetty 的 Selector 由 SelectorManager
类管理,而被管理的 Selector 叫作 ManagedSelector
。 (这里SelectorManager
的具体实现类是 ServerConnector
的内部类 ServerConnectorManager
。) SelectorManager 内部有一个 ManagedSelector 数组,真正干活的是 ManagedSelector。 主要做两个事情:
选择一个
ManagedSelector
来处理 Channel提交一个任务 Accept 给
ManagedSelector
// SelectorManager.java public void accept(SelectableChannel channel, Object attachment) { // 选择一个 ManagedSelector 来处理 Channel final ManagedSelector selector = chooseSelector(); // 提交一个任务 Accept 给 ManagedSelector selector.submit(selector.new Accept(channel, attachment)); }
ManagedSelector
ManagedSelector
在处理这个任务Accept
主要做了两步:
第一步,调用 Selector 的
register
方法把 Channel 注册到 Selector 上,拿到一个 SelectionKey。// ManagedSelector$Accept.java 内部类 @Override public void update(Selector selector){ try{ // 把 Channel 注册到 Selector 上,拿到一个 SelectionKey key = channel.register(selector, 0, attachment); execute(this); // 执行当前Runnable, 跳转 run() } catch (Throwable x) { IO.close(channel); _selectorManager.onAcceptFailed(channel, x); LOG.debug(x); } } @Override public void run(){ try{ // 创建一个 EndPoint 和 Connection,并跟这个 SelectionKey(Channel)绑在一起 createEndPoint(channel, key); _selectorManager.onAccepted(channel); } catch (Throwable x){ LOG.debug(x); failed(x); } }
第二步,创建一个
EndPoint
和Connection
,并跟这个 SelectionKey(Channel)绑在一起:// ManagedSelector.java private void createEndPoint(SelectableChannel channel, SelectionKey selectionKey) throws IOException{ //1. 创建 Endpoint EndPoint endPoint = _selectorManager.newEndPoint(channel, this, selectionKey); //2. 创建 Connection Connection connection = _selectorManager.newConnection(channel, endPoint, selectionKey.attachment()); //3. 把 Endpoint、Connection 和 SelectionKey 绑在一起 endPoint.setConnection(connection); selectionKey.attach(endPoint); endPoint.onOpen(); endPointOpened(endPoint); // 将Connection 注入到 EndPoint, 跳转 Connection 的 onOpen 方法 (内部转 fillInterested) _selectorManager.connectionOpened(connection); }
如上,HttpConnection
(Connection 的具体实现类之一) 并不会主动向 EndPoint
读取数据,而是向在 EndPoint
中注册一堆回调方法:
// HttpConnection.java
public void onOpen(){
super.onOpen();
if (isRequestBufferEmpty())
fillInterested();
else
getExecutor().execute(this);
}
public void fillInterested(){
// 告诉 EndPoint,数据到了你就调我这些回调方法 _readCallback 吧,有点异步 I/O 的感觉,也就是说 Jetty 在应用层面模拟了异步 I/O 模型。
getEndPoint().fillInterested(_readCallback);
}
ManagedSelector 的 EatWhatYouKill
这时候,ManagedSelector
启动时候的任务 EatWhatYouKill
的 无限循环,监测到SelectorProducer
的 processSelected
方法选择 出一个 Runnable 则,会自动执行。 这个 Runnable 从上步骤来看,则是 调用EndPoint
的 onSelected 方法返回一个 Runnable,然后把这个 Runnable 直接执行或扔给线程池执行。
// ManagedSelector.java
_selectorManager.execute(_strategy::produce);
// ManagedSelector$SelectorProducer.java
public Runnable produce() {
while (true) {
// 查看key绑定的 EndPoint 是否有可执行的回调函数 (读 或 写 或 NULL)
// 获得后递交全局线程池执行
// 或 ManagedSelector$Acceptor 执行接受客户端连接的函数, 返回 NULL
Runnable task = processSelected();
if (task != null) {
return task;
}
// 执行 ManagedSelector$Acceptor 注册 SelectionKey.OP_ACCEPT 接受事件
processUpdates();
// 执行 Endpoint 的 update 方法,切换注册 读、写事件
updateKeys();
// 唤醒select, 获取就绪的key
if (!select()) {
return null;
}
}
}
具体里面就是 调用 EndPoint
的 onSelected
方法
// ManagedSelector$SelectorProducer.java
...
if (attachment instanceof Selectable){
// Try to produce a task
Runnable task = ((Selectable)attachment).onSelected();
if (task != null)
return task;
}
EndPoint (ChannelEndPoint)
(这里EndPoint
的具体实现类是 ChannelEndPoint
)
这里怎么选择出这个 Runnable 呢??
// ChannelEndPoint.java
public Runnable onSelected() {
....
boolean fillable = (readyOps & SelectionKey.OP_READ) != 0;
boolean flushable = (readyOps & SelectionKey.OP_WRITE) != 0;
// return task to complete the job
Runnable task = fillable
? (flushable
? _runCompleteWriteFillable
: _runFillable)
: (flushable
? _runCompleteWrite
: null);
return task;
}
很简单,依据 selector 监听到连接上是读就绪(channel通道中有数据可读),还算写就绪(channel通道中有数据可写)。
Connection (HttpConnection)
这里如果是读,则进来
AbstractConnection$ReadCallback
,即是HttpConnection
的 onFillable() 方法调用 EndPoint 的接口去读数据,读完后让 HTTP 解析器
HttpParser
去解析字节流,HTTP 解析器会将解析后的数据,包括请求行、请求头相关信息存到 Request 对象里, 然后丢给我们的 第一个Handler。//HttpConnection.java
public void onFillable(){
// HTTP 解析器去解析字节流 boolean handle = parseRequestBuffer(); ... // 将解析后的数据,包括请求行、请求头相关信息存到 Request 对象, 然后丢给我们的 第一个Handler if (handle){ boolean suspended = !_channel.handle(); ...
}
//HttpChannel.java
public boolean handle(){
...
dispatch(DispatcherType.REQUEST, () -> {
for (HttpConfiguration.Customizer customizer : _configuration.getCustomizers()) { customizer.customize(getConnector(), _configuration, _request); if (_request.isHandled()) return; } getServer().handle(HttpChannel.this); });
....
}
比如HttpChannelOverHttp
(HttpChannel的具体实现类之一)
- 处理完后,
AbstractConnection$SendCallback
调用 EndPoint的 写方法
相关
Jetty源码导读一:启动过程 [https://my.oschina.net/langxSpirit/blog/3144656)
by 斯武丶风晴 https://my.oschina.net/langxSpirit