SpringCloud 微服务 (十) 消息队列MQ 基础

Easter79
• 阅读 724

之前学习了SpringCloud Bus结合MQ,没有多学习MQ,本次学习相关内容,先了解异步,同步就不说了

异步: 客户端非阻塞进程,服务端响应可以是非即时的

应用场景: 
①通知类的服务->发出去即可,无需回应; 
②请求的异步响应->就是客户端发送请求,服务端异步响应请求,客户端不会产生阻塞且是默认响应,但不会立刻送达;
①②都属于1对1交互模式;
③消息->可以实现1对多的交互模式,比如发布订阅模式下,客户端发布消息通知,可以被0到N个服务消费,再例如客户端发送消息等待其他服务的响应

MQ在分布式系统中都会用到的组件,还是很重要的

应用场景: 
①异步处理->比如用户注册的时候,需要手机(邮箱)验证,有的平台用户注册还可以加用户积分(经验)奖励,还有什么身份证认证等等很多,这时候在用户信息写入数据库后,可以通过异步消息让验证服务或者奖励各自执行各自的服务,提升效率,提升用户体验;

②流量控制->比如电商秒杀活动,秒杀时一般会因为流量突然暴涨,过大流量导致服务挂掉,解决此问题,一般会在服务前端加入消息队列,控制可容流量,如果消息队列长度超过最大可容数量,需要丢弃额外的请求(或者跳转其他页面)来控制流量,接着秒杀业务可以根据消息队列中有效的请求信息再做后续的处理;

③日志处理->比如kafka消息队列,其最初设计是为了处理日志,大数据中应用的比较多,当日志数据采集的时候,定时写入kafka队列,然后kafka对数据进行储存,转发等处理;

④服务解耦->接着上面秒杀话题,假设用户抢到,会有订单order服务,商品product服务,用户下单后,order服务需要通知product服务,如果order服务直接调用product服务的接口,这两个服务之间是耦合的;那么使用MQ,用户下单后,order完成持久化并将消息写入MQ队列,返回order订单完成,product服务订阅MQ队列中order的消息,采用推拉的方式获取order下单信息,product服务根据order下单的信息,进行相关product商品的信息的变动(扣库存),如果过程中product服务不能正常执行,也不会影响order服务,因为order服务写入MQ队列之后,就不在关心其他订阅服务的后续操作了,这样就现实了服务解耦;

就用order,product应用来做测试

order 服务

第一步老套路先引入maven依赖

org.springframework.boot spring-boot-starter-amqp

第二步yml配置还是和上篇差不多,将rabbitmq的配置写入了码云git仓库里,利用SpringCloud Bus读取

spring: application: name: order cloud: config: discovery: enabled: true service-id: CONFIG profile: dev

eureka: client: service-url: defaultZone: http://localhost:8761/eureka/

希望码云git早点支持动态SpringCloud Bus,能用还是能用,先用着,自己手动post请求好了

MQ接收方: 新建一个ReceiverMsg类,用来测试接收MQ的消息

package com.cloud.order.MQmsg;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class ReceiverMsg {
    //此处注解指定去获取名为myQueue的queues队列消息
    @RabbitListener(queues = "myQueue")
    public void printMQ(String message){
        log.info("【队列消息】ReceiverMsg ,printMQ={}",message);
    }

}

MQ发送方: 这边直接写在单元测试中,测试是否能成功,代码如下

package order;

import com.cloud.order.ServerApplication;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

//由于之前对项目进行了项目多模块化,运行单元测试的时候需要引导main启动类
//没有可以不考虑classes = ServerApplication.class
@SpringBootTest(classes = ServerApplication.class)
@RunWith(SpringRunner.class)
public class MQTest extends OrderApplicationTests{
    
    //AmqpTemplate  操作MQ的API
    @Autowired
    private AmqpTemplate amqpTemplate;

    @Test
    public void send() {
        amqpTemplate.convertAndSend("myQueue","hello , MQ");
    }

}

RabbitMQ : 开启docker中的rabbitmq,在http://192.168.99.100:15672/#/queues中新增一个名为myQueue的队列,用于上面测试

SpringCloud 微服务 (十) 消息队列MQ 基础

启动服务,可以看到有已经有两个SpringCloudBus队列了,一个是SpringCloud Config,一个是上面的order服务,注册中心eureka一直都是开启状态

接着执行单元测试,在myQueue中的波动,说明有流量: ↓↓↓

SpringCloud 微服务 (十) 消息队列MQ 基础

调用方法中的print,也在控制台中打印日志 : ↓↓↓

SpringCloud 微服务 (十) 消息队列MQ 基础

循序渐进,每次操作推拉队列信息的时候,需要方法写一下,rabbitmq面板中手动加一次,肯定是很lower的

将ReceiverMsg类中的注解,改成以下方式,即可自动批量创建Queue

@RabbitListener(queuesToDeclare = {@Queue("myQueue"),@Queue("myQueue2")})

启动项目后,即可生成Queue

Exchange交换机,消息不直接发送到队列,而是发送到了交换机,通过队列绑定交换机转给队列

如果需要绑定Exchange,可以改动注解

@RabbitListener(bindings = @QueueBinding( value = @Queue("myQueue"), exchange = @Exchange("MyExchange") ))

继续order服务中,假设有很多卖方参与,卖奶茶的、卖mac的等等

不同卖方服务会发出不同的MQ消息,比如卖奶茶的只关心奶茶订单、卖mac的只关心mac订单,相互不关心其他的组的信息,机子跑不动了,就模拟多服务处理方式 ↓↓↓

接收方: 

@Component
@Slf4j
public class ReceiverMsg {

    @RabbitListener(bindings = @QueueBinding(
            key = "milkTea",
            value = @Queue("milkTeaOrder"),
            exchange = @Exchange("MyOrder")
    ))
    public void milkTeaMQ(String message){
        log.info("【队列消息】ReceiverMsg.milkTeaMQ ,milkTea={}",message);
    }

    @RabbitListener(bindings = @QueueBinding(
            key = "mac",
            value = @Queue("macOrder"),
            exchange = @Exchange("MyOrder")
    ))
    public void macMQ(String message){
        log.info("【队列消息】ReceiverMsg.macMQ ,macMQ={}",message);
    }
}

测试 发送方:

@SpringBootTest(classes = ServerApplication.class)
@RunWith(SpringRunner.class)
public class MQTest extends OrderApplicationTests{

    @Autowired
    private AmqpTemplate amqpTemplate;

    @Test
    public void sendMilkTea() {
        //第一个参数exchange; 第二个参数key; 第三个参数发送的msg
        amqpTemplate.convertAndSend("myOrder","milkTea","hello , milkTeaMQ");
    }

    @Test
    public void sendMac() {
        amqpTemplate.convertAndSend("myOrder","mac","hello , macMQ");
    }
}

以上是RabbitMQ的基础使用学习,还有很多高级内容,比如消息延迟处理,优先级等等

之后慢慢研究,有相关学习的同学,可以分享一下地址,Mark一起学习学习,谢谢

----------------------------------------------------------------

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
待兔 待兔
5个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Wesley13 Wesley13
3年前
BIO、NIO、AIO 介绍和适用场景分析
IO的方式通常分为几种,同步阻塞的BIO、同步非阻塞的NIO、异步非阻塞的AIO。一、同步阻塞的BIO在JDK1.4之前,我们建立网络连接的时候采用BIO模式,需要先在服务端启动一个serverSocket,然后在客户端启动socket来对服务端进行通信,默认情况下服务端需要对每个请求建立一堆线程等待请求,而客户端发送请求后,先咨询服务端是否
Stella981 Stella981
3年前
Nginx + lua +[memcached,redis]
精品案例1、Nginxluamemcached,redis实现网站灰度发布2、分库分表/基于Leaf组件实现的全球唯一ID(非UUID)3、Redis独立数据监控,实现订单超时操作/MQ死信操作SelectPollEpollReactor模型4、分布式任务调试Quartz应用
Stella981 Stella981
3年前
KVM调整cpu和内存
一.修改kvm虚拟机的配置1、virsheditcentos7找到“memory”和“vcpu”标签,将<namecentos7</name<uuid2220a6d1a36a4fbb8523e078b3dfe795</uuid
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Stella981 Stella981
3年前
RabbitMQ之概念介绍(二)
简介:  MQ全称为MessageQueue,消息队列是一种应用程序间的通信方法。  其是消费者生产者模型的一个典型代表,一端往消息队列中不断写入消息,另一端不断读取/订阅消息。简单使用场景举例:  将项目中无需即时返回且耗时的操作提取出来,进行异步处理,大大降低了服务器的请求响应时间。  如:打印日志模块无需即时返回
Python进阶者 Python进阶者
11个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这
Easter79
Easter79
Lv1
今生可爱与温柔,每一样都不能少。
文章
2.8k
粉丝
5
获赞
1.2k