Netty为许多通用协议提供了编解码器和处理器,几乎可以开箱即用,这减少了你在那些相当繁琐的事务上本来会花费的时间与精力。我们将探讨这些工具以及它们所带来的好处,其中包括Netty对于SSL/TLS和WebSocket的支持,以及如何简单地通过数据压缩来压榨HTTP,以获取更好的性能。
1、通过SSL/TLS保护Netty应用程序
SSL和TLS这样的安全协议,它们层叠在其他协议之上,用以实现数据安全。我们在访问安全网站时遇到过这些协议,但是它们也可用于其他不是基于HTTP的应用程序,如安全SMTP(SMTPS)邮件服务器甚至是关系型数据库系统。
为了支持SSL/TLS,Java提供了javax.net.ssl包,它的SSLContext和SSLEngine类使得实现解密和加密相当简单直接。Netty通过一个名为SslHandler的ChannelHandler实现利用了这个API,其中SslHandler在内部使用了SSLEngine来完成实际的工作。
下图展示了使用SslHandler的数据流 以下代码展示了如何使用ChannelInitializer来将SslHandler添加到ChannelPipeline中。
public class SSLChannelInitializer extends ChannelInitializer<Channel>{ private final SslContext context; private final boolean startTls; //如果设置为true,第一个写入的消息将不会被加密(客户端应该设置为true) public SSLChannelInitializer(SslContext context, boolean startTls) { this.context = context; this.startTls = startTls; } @Override protected void initChannel(Channel ch) throws Exception { //对于每个SslHandler实例,都使用Channel的ByteBufAllocator从SslCOntext获取一个新的SSLEngine SSLEngine engine = context.newEngine(ch.alloc()); //将SslHandler作为第一个ChannelHandler添加到ChannelPipeline中 ch.pipeline().addLast("ssl",new SslHandler(engine,startTls)); } }
在大多数情况下,SslHandler将是ChannelPipeline中的第一个ChannelHandler。这确保了只有在所有其他的ChannelHandler将它们的逻辑应用到数据之后,才会进行加密。
例如,在握手阶段,两个节点将相互验证并且商定一种加密方式。你可以通过配置SslHandler来修改它的行为,或者在SSL/TLS握手一旦完成之后提供通知,握手阶段完成之后,所有的数据都将会被加密。SSL/TLS握手将会被自动执行。
2、构建基于Netty的HTTP/HTTPS应用程序
HTTP/HTTPS是最常见的协议套件之一,并且随着智能手机的成功,它的应用也日益广泛,因为对于任何公司来说,拥有一个可以被移动设备访问的网站几乎是必须的。这些协议也被用于其他方面。
HTTP是基于请求/响应模式的:客户端向服务器发送一个HTTP请求,然后服务器将会返回一个HTTP响应。Netty提供了多种编码器和解码器以简化对这个协议的使用。
下图分别展示了生产和消费HTTP请求和HTTP响应的方法。 正如上图所示,一个HTTP请求/响应可能由多个数据部分组成,并且它总是以一个LastHttpContent部分作为结束。FullHttpRequest和FullHttpResponse消息是特殊的子类型,分别代表了完整的请求和响应。所有类型的HTTP消息都实现HttpObject接口。
以下代码中的HttpPipelineInitializer类展示了将HTTP支持添加到你的应用程序时多么简单——几乎只需要将正确的ChannelHandler添加到ChannelPipeline中。
public class HttpPipelineInitializer extends ChannelInitializer<Channel>{ private final boolean client; public HttpPipelineInitializer(boolean client) { this.client = client; } @Override protected void initChannel(Channel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); if (client){ //如果是客户端,则添加HttpResponseDecoder以处理来自服务器的响应 pipeline.addLast("decoder",new HttpResponseDecoder()); //添加HttpResponseEncoder以向服务器发送请求 pipeline.addLast("encoder",new HttpResponseEncoder()); } else { //如果是服务器,则添加HttpRequestDecoder以接收来自客户端的请求 pipeline.addLast("decoder",new HttpRequestDecoder()); //添加HttpRequestEncoder以向客户端发送响应 pipeline.addLast("encoder",new HttpRequestEncoder()); } } }
3、聚合HTTP消息
在ChannelInitializer将ChannelHandler安装到ChannelPipeline中之后,你便可以处理不同类型的HttpObject消息了。但是由于HTTP的请求和响应可能由许多部分组成,因此你需要聚合它们以形成完整的消息。为了消除这项繁琐的任务,Netty提供了一个聚合器,它可以将多个消息部分合并为FullHttpRequest或者FullHttpResponse消息。通过这样的方式,你将总是看到完整的消息内容。
由于消息分段需要被缓冲,直到可以转发一个完整的消息给下一个ChannelInboundHandler,所以这个操作有轻微的开销。其所带来的好处便是你不必关心消息碎片了。
引入这种自动聚合机制只不过是向ChannelPipeline中添加另外一个ChannelHandler罢了。如以下代码所示。
public class HttpAggregatorInitializer extends ChannelInitializer<Channel>{ private final boolean isClient; public HttpAggregatorInitializer(boolean isClient) { this.isClient = isClient; } @Override protected void initChannel(Channel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); if (isClient) { //如果是客户端,则添加HttpClineCodec pipeline.addLast("codec",new HttpClientCodec()); } else { //如果是服务端,则添加HttpServerCodec pipeline.addLast("codec",new HttpServerCodec()); } //将最大的消息大小为512KB的HttpObjectAggregator添加到ChannelPipeline pipeline.addLast("aggregator",new HttpObjectAggregator(512 * 1024)); } }
4、HTTP压缩
当使用HTTP时,建议开启压缩功能以尽可能多地减少传输数据的大小。虽然压缩会带来一些CPU时钟周期上的开销,但是通常来说它都是一个好主意,特别是对于文本数据来说。
Netty为压缩和解压缩提供了ChannelHandler实现,它们同时支持gzip和deflate编码。
HTTP请求的头部信息,客户端可以通过提供以下头部信息来指示服务器它所支持的压缩格式:
Get /encrypted-area HTTP/1.1
Host: www.example.com
Accept-Encoding:gzip,deflate
然而,需要注意的是,服务器没有义务压缩它所发送的数据。
以下代码展示了一个例子。
public class HttpCompressionInitializer extends ChannelInitializer<Channel>{ private final boolean isClient; public HttpCompressionInitializer(boolean isClient) { this.isClient = isClient; } @Override protected void initChannel(Channel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); if (isClient) { //如果是客户端,则添加HttpClientCodec pipeline.addLast("codec",new HttpClientCodec()); //如果是客户端,则添加HttpContentDecompressor以处理来自服务器的压缩内容 pipeline.addLast("decompressor",new HttpContentDecompressor()); } else { //如果是服务器,则添加HttpServerCodec pipeline.addLast("codec",new HttpServerCodec()); //如果是服务器,则添加HttpContentCompressor来压缩数据 pipeline.addLast("decompressor",new HttpContentCompressor()); } } }
5、使用HTTPS
以下代码显示,启用HTTPS只需要将SslHandler添加到ChannelPipeline的ChannelHandler组合中。
public class HttpsCodecInitializer extends ChannelInitializer<Channel>{ private final SslContext context; private final boolean isClient; public HttpsCodecInitializer(SslContext context, boolean isClient) { this.context = context; this.isClient = isClient; } @Override protected void initChannel(Channel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); SSLEngine engine = context.newEngine(channel.alloc()); //将SslHandler添加到ChannelPipeline中以使用HTTPS pipeline.addLast("ssl",new SslHandler(engine)); if (isClient) { //如果是客户端,则添加HttpClientCodec pipeline.addLast("codec",new HttpClientCodec()); } else { //如果是服务器,则添加HttpServerCodec pipeline.addLast("codec",new HttpServerCodec()); } } }
以上例子,说明了Netty的架构方式是如何将代码重用变为杠杆作用的。只需要简单地将一个ChannelHandler添加到ChannelPipeline中,便可以提供一项新的功能,甚至像加密这样重要的功能都能提供。
6、WebSocket
WebSocket解决了一个长期存在的问题:既然底层的协议(HTTP)是一个请求/响应模式的交互序列,那么如何实时地发布信息?AJAX提供了一定程度上的改善,但是数据流仍然是由客户端所发送的请求驱动。
WebSocket规范以及它的实现代表了对一种更加有效的解决方案的尝试。简单地说,WebSocket提供了“在一个单个的TCP连接上提供双向的通信·······结合WebSocketAPI·····它为网页和远程服务器之间的双向通信提供了一种替代HTTP轮询的方案。”
WebSocket在客户端和服务器之间提供了真正的双向数据交换。WebSocket现在可以用于传输任意类型的数据,很像普通的套接字。
下图给出了WebSocket协议的一般概念。在这个场景下,通信将作为普通的HTTP协议开始,随后升级到双向的WebSocket协议。 要想向你的应用程序中添加对于WebSocket的支持,你需要将适当的客户端或者服务器WebSocket ChannelHandler添加到ChannelPipeline中。这个类将处理由WebSocket定义的称为帧的特殊消息类型。
因为Netty主要是一种服务器端的技术,所以在这里我们重点创建WebSocket服务器。代码如下所示,这个类处理协议升级握手,以及3种控制帧——Close、Ping和Pong。Text和Binary数据帧将会被传递给下一个ChannelHandler进行处理。
public class WebSocketServerInitializer extends ChannelInitializer<Channel>{ @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast( new HttpServerCodec(), //为握手提供聚合的HttpRequest new HttpObjectAggregator(65535), //如果被请求的端点是“/websocket”则处理该升级握手 new WebSocketServerProtocolHandler("/websocket"), new TextFrameHandler(), new BinaryFrameHandler(), new ContinuationFrameHandler() ); } public static final class TextFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame>{ @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception { //Handle text frame } } public static final class BinaryFrameHandler extends SimpleChannelInboundHandler<BinaryWebSocketFrame>{ @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, BinaryWebSocketFrame binaryWebSocketFrame) throws Exception { //Handle binary frame } } public static final class ContinuationFrameHandler extends SimpleChannelInboundHandler<ContinuationWebSocketFrame>{ @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, ContinuationWebSocketFrame continuationWebSocketFrame) throws Exception { //Handle continuation frame } } }
保护WenSocket:要想为WebSocket添加安全性,只需要将SslHandler作为第一个ChannelHandler添加到ChannelPipeline中。
7、空闲的连接和超时
只要你有效地管理你的网络资源,这些技术就可以使得你的应用程序更加高效、易用和安全。
检测空闲连接以及超时对于及时释放资源来说是至关重要的。
让我们仔细看看在实践中使用得最多的IdleStateHandler。以下代码展示了当使用通常的发送心跳信息到远程节点的方法时,如果在60秒之内没有接收或发送任何的数据,我们将如何得到通知;如果没有响应,则连接会被关闭。
public class IdleStateHandlerInitializer extends ChannelInitializer<Channel>{ @Override protected void initChannel(Channel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); //IdleStateHandler将在被触发时发送一个IdleStateEvent事件 pipeline.addLast(new IdleStateHandler(0,0,60, TimeUnit.SECONDS)); pipeline.addLast(new HeartbeatHandler()); } //实现userEventTriggered方法以发送心跳消息 public static final class HeartbeatHandler extends ChannelInboundHandlerAdapter{ private static final ByteBuf HEARTBEAT_SEQUENCE = //发送到远程节点的心跳消息 Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("HEARTBEAT", CharsetUtil.ISO_8859_1)); @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { //发送心跳消息,并在发送失败时关闭该连接 if (evt instanceof IdleStateEvent){ ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate()) .addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { super.userEventTriggered(ctx, evt); } } } }
以上示例演示了如何使用IdleStateHandler来测试远程节点是否仍然还活着,并且在它失活时通过关闭连接来释放资源
如果连接超过60秒没有接收或者发送任何的数据,那么IdleStateHandler将会使用一个IdleStateEvent事件来调用fireUserEventTriggered()方法。HeartbeatHandler实现了userEventTriggered()方法,如果这个方法检测到IdleStateEvent事件,它将会发送心跳消息,并且添加一个将在发送操作失败时关闭该连接的ChannelFutureListener。
8、解码基于分隔符的协议和基于长度的协议
基于分隔符的(delimited)消息协议使用定义的字符来标记的消息或消息段(通常被称为帧)的开头或者结尾。由RFC文档正式定义的许多协议(如SMTP、POP3、IMAP以及Telnet)都是这样。
下图展示了当帧由行尾序列\r\n分割时是如何被处理的。 以下代码展示了如何使用LineBasedFrameDecoder来处理上图的场景。
public class LineBasedHandlerInitializer extends ChannelInitializer<Channel>{ @Override protected void initChannel(Channel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); //该LineBasedFrameDecoder将提取的帧转发给下一个ChannelInboundHandler pipeline.addLast(new LineBasedFrameDecoder(64*1024)); //添加FrameHandler以接收帧 pipeline.addLast(new FrameHandler()); } public static final class FrameHandler extends SimpleChannelInboundHandler<ByteBuf>{ @Override //传入单个帧的内容 protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception { // do something with the data extracted from the frame } } }
如果你正在使用除了行尾符之外的分隔符的帧,那么你可以以类似的方法使用DelimiterBasedFrameDecoder,只需要将特定的分隔符序列指定到其构造函数即可。
这些解码器是实现你自己的基于分隔符的协议的工具。作为示例,我们将使用下面的协议规范:
——传入数据流是一系列的帧,每个帧都由换行符(\n)分割
——每个帧都由一系列的元素组成,每个元素都由单个空格字符分割
——一个帧的内容代表了一个命令、定义为一个命令名称后跟着数目可变的参数
我们用于这个协议的自定义解码器将定义以下类:
——Cmd——将帧(命令)的内容存储在ByteBuf中,一个ByteBuf用于名称,另一个用于参数
——CmdDecoder——从被重写了的decode()方法中获取一行字符串,并从它的内容构建一个Cmd的实例
——CmdHandler——从CmdDecoder获取解码的Cmd对象,并对它进行一些处理;
——CmdHandlerinitializer——为了简便起见,我们将会把前面的这些类定义为专门的ChannelInitializer的嵌套类,其将会把这些ChannelInboundHandler安装到ChannelPipeline中。
以下代码,这个解码器的关键是扩展LineBasedFrameDecoder。
public class CmdHandlerInitializer extends ChannelInitializer<Channel>{ public static final byte SPACE = (byte)' '; @Override protected void initChannel(Channel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); //添加CmdDecoder以提取Cmd对象,并将它转发给下一个ChannelInboundHandler pipeline.addLast(new CmdDecoder(64*1024)); //添加CmdHandler以接收和处理Cmd对象 pipeline.addLast(new CmdHandler()); } //Cmd POJO public static final class Cmd{ private final ByteBuf name; private final ByteBuf args; public Cmd(ByteBuf name, ByteBuf args) { this.name = name; this.args = args; } public ByteBuf getName() { return name; } public ByteBuf getArgs() { return args; } } public static final class CmdDecoder extends LineBasedFrameDecoder{ public CmdDecoder(int maxLength) { super(maxLength); } @Override protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception { //从ByteBuf中提取由行尾符序列分隔的帧 ByteBuf frame = (ByteBuf) super.decode(ctx, buffer); if (frame == null){ //如果输入中没有帧,则返回null return null; } //查找第一个空格字符的索引 int index = frame.indexOf(frame.readerIndex(),frame.writerIndex(),SPACE); //使用包含有命令名称和参数的切片创建新的Cmd对象 return new Cmd(frame.slice(frame.readerIndex(),index),frame.slice(index + 1,frame.writerIndex())); } } public static final class CmdHandler extends SimpleChannelInboundHandler<Cmd>{ @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, Cmd cmd) throws Exception { //处理传经ChannelPipeline的Cmd对象 // do something with the command } } }
9、基于长度的协议
基于长度的协议通过将它的长度编码到帧的头部来定义帧,而不是使用特殊的分隔符来标记它的结束。
下图展示了FixedLengthFrameDecoder的功能,其在构造时已经指定了帧长度为8字节。 你将经常会遇到被编码到消息头部的帧大小不是固定值的协议。为了处理这种变长帧,你可以使用LengthFieldBasedFrameDecoder,它将从头部字段确定帧长,然后从数据流中提取指定的字节数。
下图展示了示例,其中长度字段在帧中的偏移量为0,并且长度为2字节。 LengthFieldBasedFrameDecoder提供了几个构造函数来支持各种各样的头部配置情况。以下代码展示了如何使用其3个构造参数分别为maxFrameLength、lengthFieldOffset和lengthFieldLength的构造函数。在这个场景中,帧的长度被编码到了帧起始的前8个字节中。
public class LengthFieldBasedFrameDecoder extends ChannelInitializer<Channel>{ @Override protected void initChannel(Channel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); pipeline.addLast(new io.netty.handler.codec.LengthFieldBasedFrameDecoder(64*1024,0,8)); //添加FrameHandler以处理每个帧 pipeline.addLast(new FrameHandler()); } public static final class FrameHandler extends SimpleChannelInboundHandler<ByteBuf>{ @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception { //处理帧的数据 // Do something with the frame } } }
10、写大型数据
因为网络饱和的可能性,如何在异步框架中高效地写大块的数据是一个特殊的问题。由于写操作是非阻塞的,所以即使没有写出所有的数据,写操作也会在完成时返回并通知ChannelFuture。当这种情况发生时,如果仍然不停地写入,就有内存耗尽的风险,所以在写大型数据时,需要准备好处理到远程节点的连接是慢速连接的情况,这种情况会导致内存释放的延迟。
NIO的零拷贝特性,这种特性消除了将文件的内容从文件系统移动到网络栈的复制过程。所有的一切都发生在Netty的核心中,所以应用程序所有需要做的就是使用一个FileRegion接口的实现,其在Netty的API文档中的定义是:“通过支持零拷贝的文件传输的Channel来发送的文件区域。”
以下代码展示了如何通过从FileInputStream创建一个DefaultFileRegion,并将其写入Channel,从而利用零拷贝特性来传输一个文件的内容。
//创建一个FileInputStream
FileInputStream in = new FileInputStream(file);
FileRegion region = new DefaultFileRegion(
in.getChannel(),0,file.length()); //发送该DefaultFileRegion,并注册一个ChannelFutureListener channel.writeAndFlush(region).addListener( new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture channelFuture) throws Exception { if (!channelFuture.isSuccess()) { //处理失败 Throwable cause = channelFuture.cause(); // Do something } } } );
这个示例只适用于文件内容的直接传输,不包括应用程序对数据的任何处理。在需要将数据从文件系统复制到用户内存中时,可以使用ChunkedWriteHandler,它支持异步写大型数据流,而又不会导致大量的内存消耗。
关键是interface ChunkedInput ,其中类型参数B是readChunk()方法返回的类型。Netty预置了该接口的4个实现。
以下代码说明了ChunkedStream的用法,它是实践中最常用的实现。所示的类使用了一个File以及一个SslContext进行实例化。当initChannel()方法被调用时,它将使用所示的ChannelHandler链初始化该Channel。当Channel的状态变为活动时,WriteStreamHandler将会逐块地把来自文件中的数据作为ChunkedStream写入。数据在传输之前将会由SslHandler加密。
public class ChunkedWriteHandlerInitializer extends ChannelInitializer<Channel>{ private final File file; private final SslContext sslCtx; public ChunkedWriteHandlerInitializer(File file, SslContext sslCtx) { this.file = file; this.sslCtx = sslCtx; } @Override protected void initChannel(Channel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); //将SslHandler添加到ChannelPipeline中 pipeline.addLast(new SslHandler(sslCtx.newEngine(channel.alloc()))); //添加ChunkedWriteHandler以处理作为ChunkedInput传入的数据 pipeline.addLast(new ChunkedWriteHandler()); //一旦连接建立,WriteStreamHandler就开始写文件数据 pipeline.addLast(new WriteStreamHandler()); } public final class WriteStreamHandler extends ChannelInboundHandlerAdapter{ //当连接建立时,channelActive方法将使用ChunkedInput写文件数据 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { super.channelActive(ctx); ctx.writeAndFlush( new ChunkedStream(new FileInputStream(file))); } } }
逐块输入:要使用你自己的ChunkedInput实现,请在ChannelPipeline中安装一个ChunkedWriteHandler
11、JDK序列化数据
JDK提供了ObjectOutputStream和ObjectInputStream,用于通过网络对POJO的基本数据类型和图进行序列化和反序列化。该API并不复杂,而且可以被应用于任何实现了java.io.Serializable接口的对象。但是它的性能也不是非常高效。
如果你的应用程序必须要和使用了ObjectOutputStream和ObjectInputStream的远程节点交互,并且兼容性也是你最关心的,那么JDK序列化将是正确的选择。
12、使用了JBoss Marshalling进行序列化
如果你可以自由地使用外部依赖,那么JBoss Marshalling将是一个理想的选择:他比JDK序列化最多快3倍,而且也更加紧凑。
以下代码展示了如何使用MarshallingDecoder和MarshallingEncoder。同样,几乎只是适当地配置ChannelPipeline罢了。
public class MarshallingInitializer extends ChannelInitializer<Channel>{ private final MarshallerProvider marshallerProvider; private final UnmarshallerProvider unmarshallerProvider; public MarshallingInitializer(MarshallerProvider marshallerProvider, UnmarshallerProvider unmarshallerProvider) { this.marshallerProvider = marshallerProvider; this.unmarshallerProvider = unmarshallerProvider; } @Override protected void initChannel(Channel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); pipeline.addLast(new MarshallingDecoder(unmarshallerProvider)); pipeline.addLast(new MarshallingEncoder(marshallerProvider)); pipeline.addLast(new ObjectHandler()); } public static final class ObjectHandler extends SimpleChannelInboundHandler<Serializable>{ @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, Serializable serializable) throws Exception { //do something } } }
13、通过Protocol Buffers序列化
Netty序列化的最后一个解决方案是利用Protocol Buffers的编解码器,它是一个由Google公司开发的、现在已经开源的数据交换格式。
Protocol Buffers以一种紧凑而高效的方式对结构化的数据进行编码以及解码。它具有许多编程语言绑定,使得它很适合跨语言的项目。
在这里我们又看到了,使用protobuf只不过是将正确的ChannelHandler添加到ChannelPipeline中,如下代码。
public class ProtoBufInitializer extends ChannelInitializer<Channel>{ private final MessageLite lite; public ProtoBufInitializer(MessageLite lite) { this.lite = lite; } @Override protected void initChannel(Channel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); pipeline.addLast(new ProtobufVarint32FrameDecoder()); pipeline.addLast(new ProtobufEncoder()); pipeline.addLast(new ProtobufDecoder(lite)); pipeline.addLast(new ObjectHandler()); } public static final class ObjectHandler extends SimpleChannelInboundHandler<Object>{ @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object o) throws Exception { // do something with the object } } }