Java使用easymqtt4j快速开发工业级mqtt企业级应用
easymqtt4j , netty +mqtt +subscriber+ publisher +broker+cluster server for java
easymqtt4j特点:
1、spring integration 集成模式,自由灵活。
2、完全支持mqtt 3.1、3.1.1国际标准协议,支持tcp/websocket等等,可配置。
3、客户端完全支持接入主流broker服务如:Eclipse Paho,Mosquitto,JBoss A-MQ 6.1, Apache ActiveMQ 5.10-SNAPSHOT,Apache Camel 2.13.0,HiveMQ,EMQ,mosquitto,moquette,JMQTT,mqttwk等等。
4、发布、订阅接口简单&统一Gateway。
5、完全支持event事件EventGateway,灵活自由控制。
6、支持handleEvent、connectionLost、 messageArrived、deliveryComplete。
7、支持preSend、postSend、afterSendCompletion。
8、支持preReceive、postReceive、afterReceiveCompletion。
使用方法&步骤:
1、引用jar
2、实现 MqttSubscriberGateway消息队列订阅 接口
3、实现 MqttEventGateway 事件 接口
4、使用 MqttPublisherGateway 消息发送接口(使用方法请参考 MqttScheduleTask 消息定时发送)
项目开源地址1: https://github.com/zengfr/easymqtt4j
项目开源地址2:https://gitee.com/zengfr/easymqtt4j
spring.mqtt.host.username=admin
spring.mqtt.host.password=password
#spring.mqtt.host.uris=ws://api.easylink.io:1983
spring.mqtt.host.uris=tcp://147.14.141.51:1883
#
spring.mqtt.subscriber.id=subscriberId123
spring.mqtt.subscriber.topics=topic/#,testtopic/#
spring.mqtt.subscriber.completionTimeout=3000
#
spring.mqtt.publisher.id=publisherId456
spring.mqtt.publisher.defaulttopic=topic0
spring.mqtt.publisher.completionTimeout=3000
import com.zengfr.easymqtt4j.client.geteway.MqttSubscriberGateway; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.messaging.Message; import org.springframework.stereotype.Component; _/** _ * Created by zengfr on 2020/5/12. */ @Component public class MqttSubscriberGatewayImpl implements MqttSubscriberGateway { static Logger logger = LoggerFactory.getLogger(MqttSubscriberGatewayImpl.class); @Override public boolean handlerMqttMessage(Message<?> msg, String topic, String qos, String id, String timestamp) { logger.info(String.format("收到:%s", msg)); return false; } }
import com.zengfr.easymqtt4j.client.geteway.MqttPublisherGateway; import com.zengfr.easymqtt4j.client.util.MqttUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.Scheduled; _/** _ * Created by zengfr on 2020/5/12. */ @Configuration @EnableScheduling @EnableAsync public class MqttScheduleTask { private static Logger logger = LoggerFactory.getLogger(MqttScheduleTask.class); @Autowired private MqttPublisherGateway publisherGateway; @Scheduled(fixedRate = 1000*1,initialDelay = 1000*5) public void sendMqtt1() { logger.info("发送开始"); String d= MqttUtil.getNowString(); publisherGateway.publish("testtopic/0", 0, "hello 10 " + d); logger.info("发送结束"); }
@Scheduled(fixedRate \= 1000\*3,initialDelay \= 1000\*5)
public void sendMqtt2() throws InterruptedException {
int count = 11;
for (int i = 0; i < count; i++) { logger.info("发送开始"); String d= MqttUtil.getNowString(); publisherGateway.publish("topic/"+i, i%3, "hello 00 " + d); publisherGateway.publish("testtopic/"+i, i%3, "hello 10 " + d); logger.info("发送结束"); } } }
import com.zengfr.easymqtt4j.client.geteway.MqttEventGateway; import com.zengfr.easymqtt4j.client.util.MqttMsgUtil; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.ApplicationEvent; import org.springframework.integration.mqtt.event.MqttIntegrationEvent; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.stereotype.Component; _/** _ * Created by zengfr on 2020/5/12. */ @Component public class MqttEventGatewayImpl implements MqttEventGateway { static Logger logger = LoggerFactory.getLogger(MqttEventGatewayImpl.class); @Override public void handleEvent(MqttIntegrationEvent event) {
_logger_.info("event: {}", event);
}
@Override
public void handleEvent(ApplicationEvent event) {
_logger_.info("event: {}", event);
}
@Override
public void connectionLost(String clientId, Throwable cause) {
_logger_.info("connectionLost: {} {}", clientId, cause);
}
@Override
public void messageArrived(String clientId, String topic, MqttMessage message) { logger.info("messageArrived: {} {} {}", clientId, topic, message); }
@Override
public void deliveryComplete(String clientId, IMqttDeliveryToken token) { try { logger.info("deliveryComplete: {} {}", clientId, MqttMsgUtil.tokenToString(token)); } catch (MqttException e) { logger.error("error", e); } }
@Override
public void preSend(String clientId, Message<?> message, MessageChannel channel) {
}
@Override
public void postSend(String clientId, Message<?> message, MessageChannel channel, boolean sent) {
}
@Override
public void afterSendCompletion(String clientId, Message<?> message, MessageChannel channel, boolean sent, Exception ex) {
}
@Override
public boolean preReceive(String clientId, MessageChannel channel) { return false; }
@Override
public Message> postReceive(String clientId, Message> message, MessageChannel channel) { return null; }
@Override
public void afterReceiveCompletion(String clientId, Message<?> message, MessageChannel channel, Exception ex) {
}
}
其他 参考 关键字:
MQTT的集成和使用基于MQTT协议在物联网系统中数据交互的重要角色,入门java项目中mqtt初始Java
Java连接MQTT 订阅和发布 Java JAVA开发MQTT总结
所以写的一个demo,在这里记录下来,方便有人使用的时候查阅,不涉及mqtt的具体讲解,只是贴代码和运行过程
java 实现mqtt发送和接收消息 客户端代码 Java
mqtt的特点就是可以用很少的网络和硬件资源来保证高并发量的数据传输,其传输的稳定性也可以手动设置Qos(消息质量)。
mqtt服务器多种多样,常见的有ActiveMQ、EMQ等,不过无论是什么服务器,其底层机制都是一样的。
mqtt客户端可以由java、c语言等多种语言实现,这里以java来示例
MQTT简单demo(java) 简单的谈了一些MQTT协议的一些知识, 知识具体的Java实现
具体 配置参考: