进入idea,新建一个maven项目
主要是模拟150个设备同时并发,并发时间持续15min
1.创建客户端,构造请求发送到对应的rabbitmq的队列,用的protobuf协议。
import com.google.protobuf.ByteString;
import com.rabbitmq.client.*;
import org.apache.commons.codec.DecoderException;
import org.apache.commons.codec.binary.Hex;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeoutException;
public class Producer {
//上传的命令字段值参数化
private final static String[] COMMANDS = new String[]{
"XXXXX", "YYYYYY" "ZZZZZ",
};
private int index;
public Producer(int index) {
this.index = index;
}
public byte[] message(byte[] command) {
//根据proto文本消息生成的slot3编辑脚本,构建一个消息
Slot3.SlotMessage.Builder slots = Slot3.SlotMessage.newBuilder();
slots.setOpenId("XXX");
slots.setProductId("YYYY");
//长整型
String NO = String.valueOf(70000000000l + index);
slots.setNodeEui(NO);
slots.setCommand(Slot3.SlotMessage.Command.DOWNLINK);
SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date date = new Date();
String time1 = f.format(date);
slots.setTimestamp(time1);
Slot3.SlotMessage.Payload.Builder payload = Slot3.SlotMessage.Payload.newBuilder();
ByteString command2 = ByteString.copyFrom(command);
payload.setData(command2);
slots.addPayload(payload);
slots.setAppMessageType(0);
Slot3.SlotMessage msg = slots.build();
System.out.println("before:" + msg);
System.out.println("===msg Byte:");
byte[] msgbyteArray = msg.toByteArray();
System.out.println(msgbyteArray);
return msgbyteArray;
}
/**
* 构造函数
* RabbitMQ客户端相关配置
* 连接AMQP broker并初始化队列
*/
public void produce() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
//设置RabbitMQ相关信息
factory.setHost("10.10.XX.XX");
factory.setPort(5671);
factory.setUsername("rabbitmq用户名");
factory.setPassword("rabbitmq密码");
factory.setVirtualHost("/");
//创建发送消息rabbitmq信息的连接
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String exchange = "交互机名称";
String queue = "队列名称";
for (String input : COMMANDS) {
byte[] body = message(decodeHex(input));
channel.basicPublish(exchange, queue, null, body);
try {
//生成0~1秒的随机
// Double code = (Math.random() * 9 + 1) * 100;
// Long ms =code.longValue();
//每个命令请求间隔1s
Thread.sleep(1000l);
} catch (Exception e) {
e.printStackTrace();
}
}
channel.close();
connection.close();
}
/**
* Hex解码.
*/
public static byte[] decodeHex(String input) {
try {
return Hex.decodeHex(input.toCharArray());
} catch (DecoderException e) {
throw new RuntimeException(e);
}
}
}
View Code
2.模拟150个设备同时并发,发送消息
1 public class ConsumerClient {
2
3 public static void main(String[] args) throws Exception {
4 //并发150次
5 for (int i = 0; i < 150; i++) {
6 final int index = i;
7 new Thread(() -> {
8 //创建producer
9 Producer producer = new Producer(index);
10 Long start = System.currentTimeMillis();
11
12 while (true) {
13 try {
14 //发送消息
15 producer.produce();
16 } catch (Exception e) {
17 e.printStackTrace();
18 }
19 //持续运行15分钟
20 Long end = System.currentTimeMillis();
21 if (end - start >= 15 * 60 * 1000l) {
22 break;
23 }
24 }
25
26 }).start();
27
28 }
29 }
30 }
View Code
3.pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>Commandxingneng</groupId>
<artifactId>Commandxingneng</artifactId>
<version>1.0-SNAPSHOT</version>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.26</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.6.5</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.6.1</version>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
<version>RELEASE</version>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>RELEASE</version>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>RELEASE</version>
</dependency>
</dependencies>
</project>
View Code
4.运行ConsumerClient.java就好了
5.proto3文本内容,怎么生成编辑脚本,参考python模拟上报消息到rabbitMQ(protobuf) ,java一样
syntax = "proto3";
message SlotMessage
{
string openId = 1;
string productId = 2;
string nodeEui = 3;
Command command = 4;
string timestamp = 5;
bool encrypted = 6;
repeated Payload payload = 7;
int32 appMessageType = 8;
enum Command {
U = 0;
D = 1;
C = 2;
O_RESULT = 3;
}
message Payload {
bytes data = 1;
Connect connect = 2;
repeated OResult oResult = 3;
enum Connect{
OFF = 0;
ON = 1;
HE = 2;
}
message OResult {
string d = 1;
int32 r = 2;
string s = 3;
}
}
}
6.查看web页面的rabbitmq消息处理情况,是否有阻塞
7.用nmon工具监控被压测的服务器,需要安装和服务器版本相对应的nmon版本
./nmon -c 120 -s 10 -f
-f :按标准格式输出文件名称 生成文件:
-t : 输出最耗资源的进程
-s :每隔n秒采集一次,这里为10秒
-c :采集次数,这里为90,即监控=10*120/60=20分钟
ps -ef | grep nmon #查询nmon进程
kill -9 进程ID #强行中断监控进程
nmon文件转换
sort localhost_170616_0138.nmon>localhost_170616_0138.csv
最后从服务器中导出csv文件,用nmon analyser v55 工具分析;