1. IdleStateHandler的参数 :
第一个参数设置未读时间,第二个参数设置为未写时间,第三个为都未进行操作的时间
一般第一个时间是服务器设置多长时间没有收到客户端消息的时间
第二个是netty客户端设置多长时间给服务器发送一条消息的时间(如果是untiy的需要自己实现)
2. idle具体实现代码:
import io.netty.channel.ChannelHandlerContext; import io.netty.handler.timeout.IdleStateEvent; import io.netty.handler.timeout.IdleStateHandler; import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.TimeUnit;
@Slf4j public class ServerIdleCheckHandler extends IdleStateHandler { public ServerIdleCheckHandler() { super(40, 0, 0, TimeUnit.SECONDS); }
@Override
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception { if (evt == IdleStateEvent.FIRST_READER_IDLE_STATE_EVENT) { _//40s__没有接受到客户端的消息,就关闭连接 _ log.info("idle check happen, so close the connection"); ctx.close(); return; }
**super**.channelIdle(ctx, evt);
}
}
3. 把实现idle添加到pipeline中
4. 把 idle实现添加到pipeline中具体代码 (也是Netty 服务端启动代码)
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.BeansException; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import javax.annotation.Resource; import java.util.HashMap; import java.util.Map;
_/** _ * @Author: _代 _ * _@Description:netty__服务器配置 _ * @Date: _Created in 10:20 2020/12/29 _ */ @Component _//_实现__ApplicationContextAware__以获得__ApplicationContext__中的所有__bean public class NettyServer implements ApplicationContextAware {
**private static final** Logger _logger_ \= LoggerFactory._getLogger_(NettyServer.**class**);
**private** Channel **channel**;
**private** EventLoopGroup **bossGroup**;
**private** EventLoopGroup **workerGroup**;
@Resource
private HelloServerInHandler helloServerInHandler;
**private** Map<String, Object> **exportServiceMap** \= **new** HashMap<String, Object>();
@Value(**"${dai.server.host}"**)
String **host**;
@Value(**"${rpcServer.ioThreadNum:5}"**)
**int** **ioThreadNum**;
_//__内核为此套接口排队的最大连接个数,对于给定的监听套接口,内核要维护两个队列,未链接队列和已连接队列大小总和最大值_
@Value("${rpcServer.backlog:1024}") int backlog;
@Value(**"${dai.server.port}"**)
**int** **port**;
_/\*\*
_ * _启动 _ * @throws _InterruptedException _ */ @PostConstruct public void start() { logger.info("begin to start rpc server"); // 主从 Reactor _多线程模式 _ bossGroup = new NioEventLoopGroup(); workerGroup = new NioEventLoopGroup(ioThreadNum);
MetricsHandler metricsHandler = **new** MetricsHandler();
ServerBootstrap serverBootstrap = **new** ServerBootstrap();
serverBootstrap.group(**bossGroup**, **workerGroup**)
.channel(NioServerSocketChannel.**class**)
.option(ChannelOption._SO\_BACKLOG_, **backlog**)
_//__注意是__childOption
_ .childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.TCP_NODELAY, true)
.childHandler(new ChannelInitializer
}
});
**try** {
**channel** \= serverBootstrap.bind(**host**,**port**).sync().channel();
} **catch** (InterruptedException e) {
**channel**.close();
**return**;
}
_logger_.info(**"========================================================================================"**);
_logger_.info(**"NettyRPC server listening on port "** \+ **port** \+ **" and ready for connections..."**);
_logger_.info(**"========================================================================================"**);
}
@PreDestroy
public void stop() { logger.info("destroy server resources"); if (null == channel) { logger.error("server channel is null"); } bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); channel.closeFuture().syncUninterruptibly(); bossGroup = null; workerGroup = null; channel = null; }
_/\*\*
_ * _利用此方法获取__spring ioc__接管的所有__bean _ * @param _ctx _ * @throws _BeansException _ */ public void setApplicationContext(ApplicationContext ctx) throws BeansException { Map<String, Object> serviceMap = ctx.getBeansWithAnnotation(ServiceExporter.class); // 获取所有带有 ServiceExporter 注解的 Spring Bean logger.info("获取到所有的RPC服务:{}", serviceMap); if (serviceMap != null && serviceMap.size() > 0) { for (Object serviceBean : serviceMap.values()) { String interfaceName = serviceBean.getClass().getAnnotation(ServiceExporter.class) .targetInterface() .getName(); logger.info("register service mapping:{}",interfaceName); exportServiceMap.put(interfaceName, serviceBean); } }else{ System.out.println("kong======================================="); } } }