1. 自建 handler,继承 ChannelDuplexHandler
1.1完整代码
package com.lgdz.netty.server;
import com.codahale.metrics.ConsoleReporter; import com.codahale.metrics.Gauge; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.jmx.JmxReporter; import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong;
@Slf4j @ChannelHandler.Sharable public class MetricsHandler extends ChannelDuplexHandler {
**private** AtomicLong **totalConnectionNumber** \= **new** AtomicLong();
{
MetricRegistry metricRegistry = **new** MetricRegistry();
metricRegistry.register(**"totalConnectionNumber"**, **new** Gauge<Long>() {
@Override
public Long getValue() { return totalConnectionNumber.longValue(); } });
ConsoleReporter consoleReporter = ConsoleReporter._forRegistry_(metricRegistry).build();
consoleReporter.start(10, TimeUnit._SECONDS_); **//10秒检测一次TCP客户端连接数**
JmxReporter jmxReporter = JmxReporter._forRegistry_(metricRegistry).build();
jmxReporter.start();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception { totalConnectionNumber.incrementAndGet(); super.channelActive(ctx); }
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception { totalConnectionNumber.decrementAndGet(); super.channelInactive(ctx); }
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { _//_在这里可以处理硬件发送过来的数据 _// log.debug("数据对象长度:" + ((byte[]) msg).length); //java.lang.ClassCastException: java.lang.String cannot be cast to [B _ } @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { super.write(ctx, msg, promise); } }
2. 把 metricsHandler 添加到 pipeline。
2.1完整代码
package com.lgdz.netty.server;
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.BeansException; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import javax.annotation.Resource; import java.util.HashMap; import java.util.Map;
_/** _ * @Author: _代 _ * _@Description:netty__服务器配置 _ * @Date: _Created in 10:20 2020/12/29 _ */ @Component _//_实现__ApplicationContextAware__以获得__ApplicationContext__中的所有__bean public class NettyServer implements ApplicationContextAware {
**private static final** Logger _logger_ \= LoggerFactory._getLogger_(NettyServer.**class**);
**private** Channel **channel**;
**private** EventLoopGroup **bossGroup**;
**private** EventLoopGroup **workerGroup**;
@Resource
private HelloServerInHandler helloServerInHandler;
**private** Map<String, Object> **exportServiceMap** \= **new** HashMap<String, Object>();
@Value(**"${dai.server.host}"**)
String **host**;
@Value(**"${rpcServer.ioThreadNum:5}"**)
**int** **ioThreadNum**;
_//__内核为此套接口排队的最大连接个数,对于给定的监听套接口,内核要维护两个队列,未链接队列和已连接队列大小总和最大值_
@Value("${rpcServer.backlog:1024}") int backlog;
@Value(**"${dai.server.port}"**)
**int** **port**;
_/\*\*
_ * _启动 _ * @throws _InterruptedException _ */
@PostConstruct public void start() { logger.info("begin to start rpc server"); // 主从 Reactor _多线程模式 _ bossGroup = new NioEventLoopGroup(); workerGroup = new NioEventLoopGroup(ioThreadNum);
MetricsHandler metricsHandler = **new** MetricsHandler();
ServerBootstrap serverBootstrap = **new** ServerBootstrap();
serverBootstrap.group(**bossGroup**, **workerGroup**)
.channel(NioServerSocketChannel.**class**)
.option(ChannelOption._SO\_BACKLOG_, **backlog**)
_//__注意是__childOption
_ .childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.TCP_NODELAY, true)
.childHandler(new ChannelInitializer
_/\* .addLast("decoder",new MyDecode())
_ .addLast("encoder",new MyEncode())*/ /* .addLast(new MyCustomMessageDecoder()) .addLast(new MyEncode())*/ .addLast("metricHandler", metricsHandler); //检测客户端连接数
}
});
**try** {
**channel** \= serverBootstrap.bind(**host**,**port**).sync().channel();
} **catch** (InterruptedException e) {
**channel**.close();
**return**;
}
_logger_.info(**"========================================================================================"**);
_logger_.info(**"NettyRPC server listening on port "** \+ **port** \+ **" and ready for connections..."**);
_logger_.info(**"========================================================================================"**);
}
@PreDestroy
public void stop() { logger.info("destroy server resources"); if (null == channel) { logger.error("server channel is null"); } bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); channel.closeFuture().syncUninterruptibly(); bossGroup = null; workerGroup = null; channel = null; }
_/\*\*
_ * _利用此方法获取__spring ioc__接管的所有__bean _ * @param _ctx _ * @throws _BeansException _ */ public void setApplicationContext(ApplicationContext ctx) throws BeansException { Map<String, Object> serviceMap = ctx.getBeansWithAnnotation(ServiceExporter.class); // 获取所有带有 ServiceExporter 注解的 Spring Bean logger.info("获取到所有的RPC服务:{}", serviceMap); if (serviceMap != null && serviceMap.size() > 0) { for (Object serviceBean : serviceMap.values()) { String interfaceName = serviceBean.getClass().getAnnotation(ServiceExporter.class) .targetInterface() .getName(); logger.info("register service mapping:{}",interfaceName); exportServiceMap.put(interfaceName, serviceBean); } }else{ System.out.println("kong======================================="); } } }