RabbitMQ实现即时通讯居然如此简单!连后端代码都省得写了?

Stella981
• 阅读 747

摘要

有时候我们的项目中会用到即时通讯功能,比如电商系统中的客服聊天功能,还有在支付过程中,当用户支付成功后,第三方支付服务会回调我们的回调接口,此时我们需要通知前端支付成功。最近发现RabbitMQ可以很方便的实现即时通讯功能,如果你没有特殊的业务需求,甚至可以不写后端代码,今天给大家讲讲如何使用RabbitMQ来实现即时通讯

MQTT协议

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的轻量级通讯协议,该协议构建于TCP/IP协议上。MQTT最大优点在于,可以以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。

MQTT相关概念

  • Publisher(发布者):消息的发出者,负责发送消息。

  • Subscriber(订阅者):消息的订阅者,负责接收并处理消息。

  • Broker(代理):消息代理,位于消息发布者和订阅者之间,各类支持MQTT协议的消息中间件都可以充当。

  • Topic(主题):可以理解为消息队列中的路由,订阅者订阅了主题之后,就可以收到发送到该主题的消息。

  • Payload(负载);可以理解为发送消息的内容。

  • QoS(消息质量):全称Quality of Service,即消息的发送质量,主要有QoS 0QoS 1QoS 2三个等级,下面分别介绍下:

    • QoS 0(Almost Once):至多一次,只发送一次,会发生消息丢失或重复;

    • QoS 1(Atleast Once):至少一次,确保消息到达,但消息重复可能会发生;

    • QoS 2(Exactly Once):只有一次,确保消息只到达一次。

RabbitMQ启用MQTT功能

RabbitMQ启用MQTT功能,需要先安装然RabbitMQ然后再启用MQTT插件。

  • 首先我们需要安装并启动RabbitMQ,对RabbitMQ不了解的朋友可以参考《花了3天总结的RabbitMQ实用技巧,有点东西!》

  • 接下来就是启用RabbitMQ的MQTT插件了,默认是不启用的,使用如下命令开启即可;

    rabbitmq-plugins enable rabbitmq_mqtt 复制代码

  • 开启成功后,查看管理控制台,我们可以发现MQTT服务运行在1883端口上了。

MQTT客户端

我们可以使用MQTT客户端来测试MQTT的即时通讯功能,这里使用的是MQTTBox这个客户端工具。

  • 首先下载并安装好MQTTBox,下载地址:workswithweb.com/mqttbox.htm…

  • 点击Create MQTT Client按钮来创建一个MQTT客户端;

  • 接下来对MQTT客户端进行配置,主要是配置好协议端口、连接用户名密码和QoS即可;

  • 再配置一个订阅者,订阅者订阅testTopicA这个主题,我们会向这个主题发送消息;

  • 发布者向主题中发布消息,订阅者可以实时接收到。

前端直接实现即时通讯

既然MQTTBox客户端可以直接通过RabbitMQ实现即时通讯,那我们是不是直接使用前端技术也可以实现即时通讯?答案是肯定的!下面我们将通过html+javascript实现一个简单的聊天功能,真正不写一行后端代码实现即时通讯!

  • 由于RabbitMQ与Web端交互底层使用的是WebSocket,所以我们需要开启RabbitMQ的MQTT WEB支持,使用如下命令开启即可;

    rabbitmq-plugins enable rabbitmq_web_mqtt 复制代码

  • 开启成功后,查看管理控制台,我们可以发现MQTT的WEB服务运行在15675端口上了;

  • WEB端与MQTT服务进行通讯需要使用一个叫MQTT.js的库,项目地址:github.com/mqttjs/MQTT…

  • 实现的功能非常简单,一个单聊功能,需要注意的是配置好MQTT服务的访问地址为:ws://localhost:15675/ws

                Title   

         
         
         发送     清空     
     
     复制代码

  • 接下来我们订阅不同的主题开启两个页面测试下功能(页面放在了SpringBoot应用的resource目录下了,需要先启动应用再访问):

  • 之后互相发送消息,让我们来看看效果吧!

  • 在SpringBoot中使用

    没有特殊业务需求的时候,前端可以直接和RabbitMQ对接实现即时通讯。但是有时候我们需要通过服务端去通知前端,此时就需要在应用中集成MQTT了,接下来我们来讲讲如何在SpringBoot应用中使用MQTT。

    • 首先我们需要在pom.xml中添加MQTT相关依赖;

            org.springframework.integration     spring-integration-mqtt  复制代码
    • application.yml中添加MQTT相关配置,主要是访问地址、用户名密码、默认主题信息;

      rabbitmq:   mqtt:     url: tcp://localhost:1883     username: guest     password: guest     defaultTopic: testTopic 复制代码

    • 编写一个Java配置类从配置文件中读取配置便于使用;

      /**  * MQTT相关配置  * Created by macro on 2020/9/15.  / @Data @EqualsAndHashCode(callSuper = false) @Component @ConfigurationProperties(prefix = "rabbitmq.mqtt") public class MqttConfig {     /*      * RabbitMQ连接用户名      /     private String username;     /*      * RabbitMQ连接密码      /     private String password;     /*      * RabbitMQ的MQTT默认topic      /     private String defaultTopic;     /*      * RabbitMQ的MQTT连接地址      */     private String url; } 复制代码

    • 添加MQTT消息订阅者相关配置,使用@ServiceActivator注解声明一个服务激活器,通过MessageHandler来处理订阅消息;

      /**  * MQTT消息订阅者相关配置  * Created by macro on 2020/9/15.  */ @Slf4j @Configuration public class MqttInboundConfig {     @Autowired     private MqttConfig mqttConfig;     @Bean     public MessageChannel mqttInputChannel() {         return new DirectChannel();     }     @Bean     public MessageProducer inbound() {         MqttPahoMessageDrivenChannelAdapter adapter =                 new MqttPahoMessageDrivenChannelAdapter(mqttConfig.getUrl(), "subscriberClient",                         mqttConfig.getDefaultTopic());         adapter.setCompletionTimeout(5000);         adapter.setConverter(new DefaultPahoMessageConverter());         //设置消息质量:0->至多一次;1->至少一次;2->只有一次         adapter.setQos(1);         adapter.setOutputChannel(mqttInputChannel());         return adapter;     }     @Bean     @ServiceActivator(inputChannel = "mqttInputChannel")     public MessageHandler handler() {         return new MessageHandler() {             @Override             public void handleMessage(Message<?> message) throws MessagingException {                 //处理订阅消息                 log.info("handleMessage : {}",message.getPayload());             }         };     } } 复制代码

    • 添加MQTT消息发布者相关配置;

      /**  * MQTT消息发布者相关配置  * Created by macro on 2020/9/15.  */ @Configuration public class MqttOutboundConfig {     @Autowired     private MqttConfig mqttConfig;     @Bean     public MqttPahoClientFactory mqttClientFactory() {         DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();         MqttConnectOptions options = new MqttConnectOptions();         options.setServerURIs(new String[] { mqttConfig.getUrl()});         options.setUserName(mqttConfig.getUsername());         options.setPassword(mqttConfig.getPassword().toCharArray());         factory.setConnectionOptions(options);         return factory;     }     @Bean     @ServiceActivator(inputChannel = "mqttOutboundChannel")     public MessageHandler mqttOutbound() {         MqttPahoMessageHandler messageHandler =                 new MqttPahoMessageHandler("publisherClient", mqttClientFactory());         messageHandler.setAsync(true);         messageHandler.setDefaultTopic(mqttConfig.getDefaultTopic());         return messageHandler;     }     @Bean     public MessageChannel mqttOutboundChannel() {         return new DirectChannel();     } } 复制代码

    • 添加MQTT网关,用于向主题中发送消息;

      /**  * MQTT网关,通过接口将数据传递到集成流  * Created by macro on 2020/9/15.  / @Component @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel") public interface MqttGateway {     /*      * 发送消息到默认topic      /     void sendToMqtt(String payload);     /*      * 发送消息到指定topic      /     void sendToMqtt(String payload, @Header(MqttHeaders.TOPIC) String topic);     /*      * 发送消息到指定topic并设置QOS      */     void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload); } 复制代码

    • 添加MQTT测试接口,使用MQTT网关向特定主题中发送消息;

      /**  * MQTT测试接口  * Created by macro on 2020/9/15.  */ @Api(tags = "MqttController", description = "MQTT测试接口") @RestController @RequestMapping("/mqtt") public class MqttController {     @Autowired     private MqttGateway mqttGateway;     @PostMapping("/sendToDefaultTopic")     @ApiOperation("向默认主题发送消息")     public CommonResult sendToDefaultTopic(String payload) {         mqttGateway.sendToMqtt(payload);         return CommonResult.success(null);     }     @PostMapping("/sendToTopic")     @ApiOperation("向指定主题发送消息")     public CommonResult sendToTopic(String payload, String topic) {         mqttGateway.sendToMqtt(payload, topic);         return CommonResult.success(null);     } } 复制代码

    • 调用接口向主题中发送消息进行测试;

    • 后台成功接收到消息并进行打印。

      2020-09-17 14:29:01.689  INFO 11192 --- [ubscriberClient] c.m.mall.tiny.config.MqttInboundConfig   : handleMessage : 来自网页上的消息 2020-09-17 14:29:06.101  INFO 11192 --- [ubscriberClient] c.m.mall.tiny.config.MqttInboundConfig   : handleMessage : 来自网页上的消息 2020-09-17 14:29:07.384  INFO 11192 --- [ubscriberClient] c.m.mall.tiny.config.MqttInboundConfig   : handleMessage : 来自网页上的消息 复制代码

    总结

    消息中间件应用越来越广泛,不仅可以实现可靠的异步通信,还可以实现即时通讯,掌握一个消息中间件还是很有必要的。如果没有特殊业务需求,客户端或者前端直接使用MQTT对接消息中间件即可实现即时通讯,有特殊需求的时候也可以使用SpringBoot集成MQTT的方式来实现,总之消息中间件是实现即时通讯的一个好选择!

    项目源码地址

    github.com/macrozheng/…

    本文 GitHub github.com/macrozheng/… 已经收录,欢迎大家Star!

    作者:MacroZheng
    链接:https://juejin.im/post/6883273274248134663
    来源:掘金
    著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

    点赞
    收藏
    评论区
    推荐文章
    blmius blmius
    3年前
    MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
    文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
    皕杰报表之UUID
    ​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
    Jacquelyn38 Jacquelyn38
    3年前
    2020年前端实用代码段,为你的工作保驾护航
    有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
    Stella981 Stella981
    3年前
    KVM调整cpu和内存
    一.修改kvm虚拟机的配置1、virsheditcentos7找到“memory”和“vcpu”标签,将<namecentos7</name<uuid2220a6d1a36a4fbb8523e078b3dfe795</uuid
    Easter79 Easter79
    3年前
    Twitter的分布式自增ID算法snowflake (Java版)
    概述分布式系统中,有一些需要使用全局唯一ID的场景,这种时候为了防止ID冲突可以使用36位的UUID,但是UUID有一些缺点,首先他相对比较长,另外UUID一般是无序的。有些时候我们希望能使用一种简单一些的ID,并且希望ID能够按照时间有序生成。而twitter的snowflake解决了这种需求,最初Twitter把存储系统从MySQL迁移
    Wesley13 Wesley13
    3年前
    mysql设置时区
    mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
    Wesley13 Wesley13
    3年前
    00:Java简单了解
    浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
    Stella981 Stella981
    3年前
    Django中Admin中的一些参数配置
    设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
    Wesley13 Wesley13
    3年前
    MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
    背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
    Python进阶者 Python进阶者
    1年前
    Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
    大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这