问题的提出:
最近在做若干安卓设备(共享项目使用的硬件)和服务器通信实现MQTT消息的的接收。由于MQTT的限制(注意:不管你用的是paho的库还是其他任何MQTT的库都一样,这是MQTT协议的限制。)而无法实现服务器只给某一台机器(根据机器的IMEI号)发送消息。一开始使用的方法,就是服务器只管群发(消息体里会带一个终端ID字段信息),安卓端收到消息后,会在消息体里拿到终端ID信息,和本机ID比对,是自己的消息,就处理消息,否则,就丢弃这个消息。当然这个解决办法是可以行的,但随着安卓设备的增多,会大大增加服务器的负担。于是,就有了如下的解决方法。
解决方法一:每个机器只订阅自己ID的主题,这个种方法虽然可行,但随着安卓机器的增加,activeMQ会维护太多的主题,影响性能,同时也不美观。
于是就有了如下的解决方法二:
在网上搜了一大堆,都是写的半半拉拉的,根本就行不通。这里作者经过实际动手测试,实际调通了代码。
第一步,下载你想使用的activeMQ的运行版和源码包,连个都要用,并且版本一定要一样。
第二步,配置本机的jdk1.8环境和maven环境(不再赘述)。
接下来,就是真正的代码部分了。
在activeMQ的源码目录里找到activemq-broker这个目录,然后在src/main/java/org/apache/activemq/broker/region/policy目录下增加一个java代码文件,注意java代码文件一定要使用utf-8编码。我这里增加的java文件名称叫:ClientIdFilterDispatchPolicy.java,该class从SimpleDispatchPolicy继承而来。
如下全部代码:
package org.apache.activemq.broker.region.policy;
import java.util.List;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.util.ByteSequence;
/**
* ClientIdFilter dispatch policy that sends a message to every subscription that
* matches the message in consumer ClientId.
*
* @org.apache.xbean.XBean
*
*/
public class ClientIdFilterDispatchPolicy extends SimpleDispatchPolicy {
public static final String PTP_CLIENTID = "PTP_CLIENTID";
//可自定义消息目标id在消息属性中的key
private String ptpClientId = PTP_CLIENTID;
public boolean dispatch(MessageReference node, MessageEvaluationContext msgContext, List
//指定特定后缀名的topic进入自定义分发策略
//System.out.println("===============================");
//System.out.println("getQualifiedName()="+node.getMessage().getDestination().getQualifiedName());
//System.out.println("getPhysicalName()="+node.getMessage().getDestination().getPhysicalName());
String _clientId = null;
_clientId = (String)node.getMessage().getProperty(ptpClientId);
if(_clientId == null){
//System.out.println("PTP_CLIENTID=null");
String topic = node.getMessage().getDestination().getPhysicalName();
if(topic.indexOf(".")>=0){
_clientId = topic.substring(topic.indexOf(".")+1, topic.length());
}
}else{
//System.out.println("PTP_CLIENTID=" + _clientId);
}
/*
if(_clientId == null){
System.out.println("_clientId=null");
}else{
System.out.println("_clientId=" + _clientId);
}
*/
if (_clientId == null)
return super.dispatch(node, msgContext, consumers);
ActiveMQDestination _destination = node.getMessage().getDestination();
int count = 0;
for (Subscription sub : consumers) {
System.out.println("isTopic:" + _destination.isTopic());
System.out.println("getClientId:" + sub.getContext().getClientId());
// Don't deliver to browsers
if (sub.getConsumerInfo().isBrowser()) {
continue;
}
// Only dispatch to interested subscriptions
if (!sub.matches(node, msgContext)) {
sub.unmatched(node);
continue;
}
//if (_clientId != null && _destination.isTopic() && _clientId.equals(sub.getContext().getClientId()) && _destination.getQualifiedName().endsWith(".ptp")) {
if (_clientId != null && _destination.isTopic() && _clientId.equals(sub.getContext().getClientId()) ) {
//把消息推送给满足要求的subscription
sub.add(node);
count++;
} else {
sub.unmatched(node);
}
}
return count > 0;
}
}
这里需要注意的是,这个java文件头部注释部分,一定要有这一行:
* @org.apache.xbean.XBean
否则,你永远无法加载这个分发策略。
确定无误后,进入命令行,进入到activeMQ源码的根目录下,注意:这个目录下有有个pom.xml文件的。然后执行:
mvn package -Dmaven.test.skip=true
第一次要等待很久的。等编译完成,就把activemq-broker-5.15.11.jar和activemq-spring-5.15.11.jar这两个文件复制出来覆盖activemq运行目录下的lib目录下的同名文件。然后修改activemq运行目录下的conf目录下的activemq.xml文件,修改该文件如下部分:
clientIdFilterDispatchPolicy这个配置,就是你新增的那个java文件类名,注意第一个字母小写。
然后你就可以去bin目录下执行activemq start启动了。
由于activemq的web console的限制,你不能在这里发送测试消息。我自己写了一个发送消息的java代码如下:
package cn.york.common.mqtt;
import java.util.Scanner;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.commons.lang3.StringUtils;
/**
* 通过JMS发送activeMQT消息,从而实现点对点发送
* @author 李刚
* @version 1.0, 2020年1月4日
*
*/
public class JMSKit {
private static final String PTP_CLIENTID = "PTP_CLIENTID";
private ActiveMQConnectionFactory factory;
private Connection connection;
private Session session;
public boolean connect(String brokerURL, String clientId) {
//factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
try {
// 创建连接工厂
factory = new ActiveMQConnectionFactory(brokerURL);
// 鉴权,如没有开启可省略
//factory.setUserName("admin");
//factory.setPassword("admin123");
// 创建JMS连接实例,并启动连接
connection = factory.createConnection();
connection.start();
// 创建Session对象,不开启事务,采用自动应答
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
return true;
} catch (JMSException e) {
e.printStackTrace();
}
return false;
}
public void disconnect() {
// 关闭连接
try {
session.close();
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
//向所有订阅者发送消息
public boolean publish(String topic, String message, int QOS, boolean retained) {
return publish(topic, message, QOS, retained, null);
}
//向指定clientId发送消息
public boolean publish(String topic, String message, int QOS, boolean retained, String clientId) {
// 创建主题
try {
Topic _topic = session.createTopic(topic);
// 创建生成者
MessageProducer producer = session.createProducer(_topic);
// 设置消息是否持久化。默认消息需要持久化
if(retained) {
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
}else {
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
}
//创建消息
TextMessage _message = session.createTextMessage(message);
// 发送指定消息,配合主题分发策略使用,以附带用户ID ,分发策略对特定的主题进行拦截解析分发
if (StringUtils.isNotBlank(clientId)) {
_message.setStringProperty(PTP_CLIENTID, clientId);
}
// 发送消息。non-persistent 默认异步发送;persistent 默认同步发送
producer.send(_message);
producer.close();
return true;
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return false;
}
public static void main(String[] args) throws JMSException {
Scanner sc = new Scanner(System.in);
boolean isStart = true;
String userMsg = "";
String msg = "";
String[] messages = null;
String clientId = null;
while (isStart) {
userMsg = sc.nextLine();
if (StringUtils.isBlank(userMsg) || "stop".equals(userMsg)) {
System.out.println("Stop producer message!");
isStart = false;
}
messages = userMsg.split(":");
msg = "Hello MQ,Client msg:" + messages[0];
if (messages.length == 2) {
clientId = messages[1];
}
JMSKit kit = new JMSKit();
kit.connect("tcp://localhost:61616", "");
if(clientId==null) {
kit.publish("hello", msg, 2, false);
}else {
kit.publish("hello", msg, 2, false, clientId);
}
kit.disconnect();
}
sc.close();
}
}
这里是测试代码,当然,你也可以在其他地发调用,注意publish方法里这样代码:
if (StringUtils.isNotBlank(clientId)) {
_message.setStringProperty(PTP_CLIENTID, clientId);
}
这行代码才是最关键的。
分发策略会根据消息里是否有这个**PTP_CLIENTID**属性来确定是否执行定点分发。
这个代码测试代码运行起来后,在控制台直接输入一个字符串消息按回车,所有在线的客户端都会收到消息,如果输入“abcd:对应客户端的clientid”,则该消息将会只发给指定的那个客户端订阅者。
这里需要注意:ClientIdFilterDispatchPolicy.java文件里还有几个机制,就是如果消息里不带**PTP_CLIENTID**这个属性,则我会判断topic,如果topic里带了.clientid这样的格式,我会将clientid拆分出来。也会根据这个clientid进行定点分发。这样的话,就不需要用我上面那个JMSKit来发送消息了。直接使用任何一个mqtt客户端库连接上都可以发定点消息了。只不过这要注意的是所有客户端需要订阅的主题需要做个修改,比如以前是直接订阅hello这个主题,用这种方法,就需要订阅hello/#这个主题。
如有问题,可以私信我,共同交流沟通!