本文通过一个实例来讲解如何使用 Netty 框架来开发网络协议服务器,项目使用 Gradle 工具来构建和运行,并且支持 Docker 部署。项目代码已在 GitHub 开源,JW Netty Demo。
Netty 简介
Netty 是一个异步、事件驱动的网络应用框架,使用它可以快速开发出可维护良好的、高性能的网络协议服务器。它大幅简化和流程化了网络编程,比如 TCP 和 UDP 套接字服务器开发。难能可贵的是,在保证快速和易用性的同时,使用 Netty 开发的应用并没有丧失可维护性和性能。
上图是来自于官网的 Netty 架构图,可以看到整体结构非常清晰,每一层各司其职。
项目介绍
本项目实现了一个用来接收和存储传感器数据的 TCP 网络协议服务器。这些数据由连接在硬件设备上的传感器采集,然后由硬件设备上报到服务器。一个硬件设备可以连接多个传感器,每次上报硬件设备会把所有传感器的数据一并上报。服务端需要计算所有传感器数据的平均值,并保存起来。这是一个典型的物联网数据采集场景。
协议
考虑到物联网环境硬件设备性能不高、网络带宽较小且稳定性不够,我们需要尽可能降低协议编解码开销、减少报文大小。因此最好使用二进制协议,而不是 JSON 这样的文本格式。
本项目采用的二进制协议如下:
| 起始位(2 bytes) | 报文总长(2 bytes) | 协议版本(1 byte) | 设备号(4 bytes) | 时间戳(4 bytes) | 数据项长度(1 byte) | 数据项值 | 更多数据项... | 校验和(2 bytes) |
- 所有段都是无符号整数,字节顺序为网络字节序
- 起始位固定为
0x55 0xaa
,用来防止网络传输过程中数据错乱 - 报文总长包含了起始位和最后的校验和
- 数据项值占用的字节数由其前面一个段的值决定
- 数据项可以有很多个,只要报文总长不超过 65535 就行
代码解读
目录结构
.
├── Dockerfile # Docker 镜像构建配置文件
├── README.md
├── bin
├── build
├── build.gradle # Gradle 构建配置文件
├── docker-compose.yml # Docker Compose 配置文件
├── gradle # Gradle 目录
├── gradlew # Gradle 包装脚本,通过它来执行构建任务
├── gradlew.bat # Windows 平台下的包装脚本
├── settings.gradle # Gradle 设置
└── src
├── main
│ ├── java
│ │ └── net
│ │ └── jaggerwang
│ │ └── jwnettydemo
│ │ ├── Main.java # 应用入口
│ │ ├── config # 配置
│ │ │ ├── ApplicationConfig.java
│ │ │ └── MongoDBConfig.java
│ │ ├── decoder # 报文解码器
│ │ │ ├── Message.java
│ │ │ └── MessageDecoder.java
│ │ ├── handler # 报文处理器
│ │ │ └── MessageHandler.java
│ │ └── saver # 报文保存器
│ │ ├── Metric.java
│ │ └── MetricSaver.java
│ └── resources
│ ├── application.properties # 应用配置
│ └── log4j2.xml # 日志配置
└── test
└── java
└── net
└── jaggerwang
└── jwnettydemo
├── decoder # 报文解码单元测试
│ └── MessageDecoderTests.java
└── saver # 报文保存单元测试
└── MetricSaverTests.java
可以看到,项目结构为典型的 Java 项目结构。
配置文件
通过配置文件,使得同一套代码可以在不同环境中都能运行。为了不为每套环境都维护一套配置文件,本项目将依赖环境的配置抽取了出来,使得运行时可以通过环境变量来覆盖指定配置项。这样各个环境的配置文件就保持了一致,可以共用一份。
配置文件有两个,一个是应用配置,一个是日志配置。日志组件用的是 Log4j 2,配置文件里环境变量的嵌入格式遵循的是 Log4j 风格,包括默认值的指定方式。
应用配置文件内容如下:
# Path
path.app=${PATH_APP:-/Users/jagger/projects/jw/jw-netty-demo}
path.data=${PATH_DATA:-/Users/jagger/data/jw/jw-netty-demo}
# Server
server.port=${SERVER_PORT:-8080}
# MongoDB
mongodb.uri=${MONGODB_URI:-mongodb://localhost:27017/}
mongodb.db=${MONGODB_DB:-jw_netty_demo}
核心代码
整个报文处理过程分两步,第一步解码,由类 MessageDecoder
负责,第二步保存解码出来的 Message,由类 MessageHandler
负责。下面我们重点来看这两个类的实现。
MessageDecoder 类
package net.jaggerwang.jwnettydemo.decoder;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
public class MessageDecoder extends ReplayingDecoder {
private static final Logger logger = LogManager.getLogger();
public static final int HEADER_LENGTH = 13;
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
Message msg = new Message();
msg.setStartFirst(in.readUnsignedByte());
msg.setStartSecond(in.readUnsignedByte());
logger.debug("decode start ok: {} {}", msg.getStartFirst(), msg.getStartSecond());
msg.setLength(in.readUnsignedShort());
logger.debug("decode length ok: {}", msg.getLength());
msg.setVersion(in.readUnsignedByte());
logger.debug("decode version ok: {}", msg.getVersion());
msg.setDeviceNo(in.readUnsignedInt());
logger.debug("decode device no ok: {}", msg.getDeviceNo());
msg.setTime(in.readUnsignedInt());
logger.debug("decode time ok: {}", msg.getTime());
int pos = HEADER_LENGTH;
while (pos < msg.getLength() - 2) {
short len = in.readUnsignedByte();
pos += 1;
long data;
switch (len) {
case 1:
data = (long) in.readUnsignedByte();
pos += 1;
break;
case 2:
data = (long) in.readUnsignedShort();
pos += 2;
break;
case 4:
data = in.readUnsignedInt();
pos += 4;
break;
default:
logger.error("unsupported data length: {}", len);
continue;
}
msg.getDatas().add(data);
logger.debug("decode one data ok: {}", data);
}
msg.setChecksum(in.readUnsignedShort());
logger.debug("decode checksum ok: {}", msg.getChecksum());
out.add(msg);
}
}
MessageDecoder 类继承自 Netty 的 ReplayingDecoder 类,该类解决了网络报文接收不完整的问题。从网络中接收报文时,不能保证一次接收到整个报文,很可能只是其中一部分。如果在应用里来检测报文完整性,在接收到完整报文时才解码,会比较麻烦。通过继承 ReplayingDecoder 类,实现它的 decode 方法,应用就可以假设报文是完整的。实际处理过程中,如果解码时发现报文不完整,decode 会抛出异常,ReplayingDecoder 捕获该异常后将解码位置归零,下次再重头开始。
MessageHandler 类
package net.jaggerwang.jwnettydemo.handler;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import net.jaggerwang.jwnettydemo.decoder.Message;
import net.jaggerwang.jwnettydemo.saver.Metric;
import net.jaggerwang.jwnettydemo.saver.MetricSaver;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@Sharable
public class MessageHandler extends ChannelInboundHandlerAdapter {
private static final Logger logger = LogManager.getLogger();
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
Message message = (Message) msg;
if (!message.isValid()) {
logger.error("invalid message: {}", message);
ctx.close();
return;
}
logger.debug("received message: {}", message);
MetricSaver saver = new MetricSaver();
Metric metric = new Metric(message);
saver.save(metric);
logger.debug("saved metric: {}", metric);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}
MessageHandler 在 Netty Channel 处理管道里位于 MessageDecoder 之后,因此它接收到的是已经解码出来的 Message 对象。它不用关心协议细节,只需要简单地把 Message 对象保存到数据库里就可以。
开发
构建工具
本项目使用 Gradle 工具来构建和运行,但是不需要额外去安装。只要使用 ./gradlew <task>
执行任意任务,如果没有检测到 Gradle 工具,就会自动下载并安装到项目下。
本项目的 build.gradle
配置文件如下:
apply plugin : 'application'
mainClassName = 'net.jaggerwang.jwnettydemo.Main'
dependencies {
compile 'io.netty:netty-all:4.1.21.Final'
compile 'org.apache.logging.log4j:log4j-api:2.10.0'
compile 'org.apache.logging.log4j:log4j-core:2.10.0'
compile 'org.mongodb:mongodb-driver:3.6.3'
compile 'com.aliyun:hitsdb-client:0.0.5'
compile 'org.apache.kafka:kafka-clients:0.10.0.0'
compile 'org.apache.kafka:connect-json:0.10.0.0'
compile 'com.aliyun.openservices:ons-sasl-client:0.1'
compile 'com.fasterxml.jackson.core:jackson-core:2.9.4'
compile 'com.fasterxml.jackson.core:jackson-databind:2.9.4'
compile 'com.fasterxml.jackson.module:jackson-module-parameter-names:2.9.4'
compile 'com.fasterxml.jackson.datatype:jackson-datatype-jdk8:2.9.4'
compile 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.9.4'
compile 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0'
compile 'commons-codec:commons-codec:1.10'
compileOnly 'org.projectlombok:lombok:1.16.18'
testImplementation 'org.junit.jupiter:junit-jupiter-api:5.1.0'
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.1.0'
}
repositories {
mavenCentral()
}
test {
useJUnitPlatform()
}
运行
执行下面的命令来运行项目:
$ ./gradlew run
Starting a Gradle Daemon (subsequent builds will be faster)
> Task :run
2018-四月-26 21:21:49 INFO net.jaggerwang.jwnettydemo.config.ApplicationConfig - load properties ok
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
2018-四月-26 21:21:50 INFO net.jaggerwang.jwnettydemo.Main - server started on port 8080
<=========----> 75% EXECUTING [17s]
> :run
运行起来的 Server 会在 8080 端口监听请求,按照协议组装报文后发往 localhost:8080
就可以上报数据。
测试
使用下面的命令来运行单元测试:
$ ./gradlew test
Starting a Gradle Daemon (subsequent builds will be faster)
Deprecated Gradle features were used in this build, making it incompatible with Gradle 5.0.
See https://docs.gradle.org/4.6/userguide/command_line_interface.html#sec:command_line_warnings
BUILD SUCCESSFUL in 7s
4 actionable tasks: 4 up-to-date
本项目使用 JUnit 5 来编写和运行单元测试。目前有两个单元测试,一个是报文解码,另一个是报文保存。
打包
使用下面的命令来打包和运行:
$ ./gradlew installDist
Deprecated Gradle features were used in this build, making it incompatible with Gradle 5.0.
See https://docs.gradle.org/4.6/userguide/command_line_interface.html#sec:command_line_warnings
BUILD SUCCESSFUL in 2s
5 actionable tasks: 3 executed, 2 up-to-date
$ ./build/install/jw-netty-demo/bin/jw-netty-demo
2018-四月-26 22:23:22 INFO net.jaggerwang.jwnettydemo.config.ApplicationConfig - load properties ok
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
2018-四月-26 22:23:22 INFO net.jaggerwang.jwnettydemo.Main - server started on port 8080
打包好的应用会安装到目录 ./build/install/jw-netty-demo
下, 可以使用其下的 bin/jw-netty-demo
脚本来运行应用。
Docker 部署
构建镜像
执行下面的命令来构建镜像:
$ docker build -t jw-netty-demo .
构建镜像的配置文件如下:
FROM java:8
ENV APP_PATH=/app
ENV DATA_PATH=/data
WORKDIR $APP_PATH
COPY . .
RUN ./gradlew installDist
VOLUME $DATA_PATH
EXPOSE 8080
CMD ./build/install/jw-netty-demo/bin/jw-netty-demo
在容器里运行服务
执行下面的命令来运行项目所有服务:
$ docker-compose up
Docker Compose 配置文件如下:
version: "2"
services:
app:
image: jw-netty-demo:latest
environment:
PATH_APP: /app
PATH_DATA: /data
MONGODB_URI: mongodb:27017
ports:
- 19900:8080
volumes:
- ~/data/jw-netty-demo/app:/data
mongodb:
image: mongo:3
volumes:
- ~/data/jw-netty-demo/mongodb:/data/db
Docker Compose 会同时启动 app 服务,及其依赖的 mongodb 服务。
参考资料
本文转自 https://blog.jaggerwang.net/netty-high-performance-protocol-server-develop/,如有侵权,请联系删除。