RabbitMQ队列延迟

Stella981
• 阅读 524

RabbitMQ队列延迟

1. 场景:

“订单下单成功后,15分钟未支付自动取消”

1.传统处理超时订单
采取定时任务轮训数据库订单,并且批量处理。其弊端也是显而易见的;对服务器、数据库性会有很大的要求,
并且当处理大量订单起来会很力不从心,而且实时性也不是特别好。当然传统的手法还可以再优化一下,
即存入订单的时候就算出订单的过期时间插入数据库,设置定时任务查询数据库的时候就只需要查询过期了的订单,
然后再做其他的业务操作

2.rabbitMQ延时队列方案
一台普通的rabbitmq服务器单队列容纳千万级别的消息还是没什么压力的,而且rabbitmq集群扩展支持的也是非常好的,
并且队列中的消息是可以进行持久化,即使我们重启或者宕机也能保证数据不丢失

2. TTL和DLX

   rabbitMQ中是没有延时队列的,也没有属性可以设置,只能通过死信交换器(DLX)和设置过期时间(TTL)结合起来实现延迟队列

1.TTL
TTL是Time To Live的缩写, 也就是生存时间。
RabbitMq支持对消息和队列设置TTL,对消息这设置是在发送的时候指定,对队列设置是从消息入队列开始计算, 只要超过了队列的超时时间配置, 那么消息会自动清除。
如果两种方式一起使用消息对TTL和队列的TTL之间较小的为准,也就是消息5s过期,队列是10s,那么5s的生效。
默认是没有过期时间的,表示消息没有过期时间;如果设置为0,表示消息在投递到消费者的时候直接被消费,否则丢弃。

设置消息的过期时间用 x-message-ttl 参数实现,单位毫秒。
设置队列的过期时间用 x-expires 参数,单位毫秒,注意,不能设置为0。

2.DLX和死信队列
DLX即Dead-Letter-Exchange(死信交换机),它其实就是一个正常的交换机,能够与任何队列绑定。

死信队列是指队列(正常)上的消息(过期)变成死信后,能够后发送到另外一个交换机(DLX),然后被路由到一个队列上,
这个队列,就是死信队列

成为死信一般有以下几种情况:
消息被拒绝(basic.reject or basic.nack)且带requeue=false参数
消息的TTL-存活时间已经过期
队列长度限制被超越(队列满)

注1:如果队列上存在死信, RabbitMq会将死信消息投递到设置的DLX上去 ,
注2:通过在队列里设置x-dead-letter-exchange参数来声明DLX,如果当前DLX是direct类型还要声明
x-dead-letter-routing-key参数来指定路由键,如果没有指定,则使用原队列的路由键    

3. 延迟队列

   通过DLX和TTL模拟出延迟队列的功能,即,消息发送以后,不让消费者拿到,而是等待过期时间,变成死信后,发送给死信交换机再路由到死信队列进行消费

注1:延迟队列(即死信队列)产生流程见“images/01 死信队列产生流程.png”

4. 开发步骤

   1.生产者创建一个正常消息,并添加消息过期时间/死信交换机/死信路由键这3个参数

关键代码1
new Queue(name, durable, exclusive, autoDelete, arguments);
new Queue(NORMAL_QUEUE, true, false, false, map)
参数说明:
name:队列名字
durable:true则持久队列
exclusive:如果我们声明一个排他队列(该队列将仅由声明者的连接使用),则为true
autoDelete:服务器不再使用时应删除队列,则为true
arguments:用于声明队列的参数
map.put("x-message-ttl", 10000);//message在该队列queue的存活时间最大为10秒
map.put("x-dead-letter-exchange", DELAY_EXCHANGE); //x-dead-letter-exchange参数是设置该队列的死信交换器(DLX)
map.put("x-dead-letter-routing-key", DELAY_ROUTING_KEY);//x-dead-letter-routing-key参数是给这个DLX指定路由键

关键代码2
new DirectExchange(NORMAL_EXCHANGE, true, false);

2.消费者A
正常情况下,由消费者A去消费队列“normal-queue”中的消息,但实际上没有,而是等消息过期

3.消费者B
消息过期后,变成死信,根据配置会被投递到DLX,然后根据死信路由键投到死信队列(即延时队列)中

5. 子模块间共享Model

1.创建公共子模块common
添加公共的JavaBean对象,并使用lombok简化代码

@Data:会为类的所有属性自动生成setter/getter、equals、canEqual、hashCode、toString方法
@NoArgsConstructor:无参构造器
@AllArgsConstructor:全参构造器
2.主模块

pom

rabbitmq-provider rabbitmq-consumer common 3.各子模块 jar

4.配置公共common模块
在主模块的POM的中添加公共子模块common

com.zking common 0.0.1-SNAPSHOT ...

看代码

创建一个工程rabbitmq03  ,普通maven项目

pom.xml

 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 3          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
 4   <modelVersion>4.0.0</modelVersion>
 5   <parent>
 6     <groupId>org.springframework.boot</groupId>
 7     <artifactId>spring-boot-starter-parent</artifactId>
 8     <version>2.2.2.RELEASE</version>
 9     <relativePath/> <!-- lookup parent from repository -->
10   </parent>
11   <groupId>com.yuan</groupId>
12   <artifactId>rabbitmq03</artifactId>
13   <version>0.0.1-SNAPSHOT</version>
14   <name>rabbitmq03</name>
15   <packaging>pom</packaging>
16   <description>Demo project for Spring Boot</description>
17 
18   <properties>
19     <java.version>1.8</java.version>
20   </properties>
21 
22   <modules>
23     <module>rabbitmq-provider</module>
24     <module>rabbitmq-consumer</module>
25   </modules>
26 
27   <dependencies>
28     <dependency>
29       <groupId>org.springframework.boot</groupId>
30       <artifactId>spring-boot-starter-amqp</artifactId>
31     </dependency>
32     <dependency>
33       <groupId>junit</groupId>
34       <artifactId>junit</artifactId>
35       <scope>test</scope>
36     </dependency>
37     <dependency>
38       <groupId>org.springframework.boot</groupId>
39       <artifactId>spring-boot-starter-web</artifactId>
40     </dependency>
41 
42     <dependency>
43       <groupId>org.projectlombok</groupId>
44       <artifactId>lombok</artifactId>
45       <version>1.18.10</version>
46       <scope>provided</scope>
47     </dependency>
48 
49   </dependencies>
50 
51   <build>
52     <plugins>
53       <plugin>
54         <groupId>org.springframework.boot</groupId>
55         <artifactId>spring-boot-maven-plugin</artifactId>
56       </plugin>
57     </plugins>
58   </build>
59 
60 </project>

创建生产者模块rabbitmq-provider

RabbitMQ队列延迟

pom.xml

 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 3          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
 4     <modelVersion>4.0.0</modelVersion>
 5 
 6     <parent>
 7         <groupId>com.yuan</groupId>
 8         <artifactId>rabbitmq03</artifactId>
 9         <version>0.0.1-SNAPSHOT</version>
10     </parent>
11     <artifactId>rabbitmq-provider</artifactId>
12     <version>0.0.1-SNAPSHOT</version>
13     <name>rabbitmq-provider</name>
14     <description>子模块-生产者</description>
15     <packaging>jar</packaging>
16 </project>

QueueDelayConfig

 1 package com.yuan.rabbitmqprovider.rabbitmq;
 2 
 3 
 4 import org.springframework.amqp.core.Binding;
 5 import org.springframework.amqp.core.BindingBuilder;
 6 import org.springframework.amqp.core.DirectExchange;
 7 import org.springframework.amqp.core.Queue;
 8 import org.springframework.context.annotation.Bean;
 9 import org.springframework.context.annotation.Configuration;
10 
11 import javax.lang.model.element.NestingKind;
12 import java.util.HashMap;
13 import java.util.Map;
14 
15 @Configuration
16 public class QueueDelayConfig {
17 
18     /**
19      * 定义正常的队列、交换机、路由键
20      */
21     public static final String NORMAL_QUEUE="normal-queue";
22     public static final String NORMAL_EXCHANGE="normal-exchange";
23     public static final String NORMAL_ROUTINGKEY="normal-routingkey";
24 
25     /**
26      * 定义死信的队列、交换机、路由键
27      */
28     public static final String DELAY_QUEUE="delay-queue";
29     public static final String DELAY_EXCHANGE="delay-exchange";
30     public static final String DELAY_ROUTINGKEY="delay-routingkey";
31 
32 
33     /**
34      * 定义正常队列
35      * @return
36      */
37     @Bean
38     public Queue normalQueue(){
39         //设定消息过期时间/死信交换机/死信路由键3个参数
40         Map<String, Object> map = new HashMap<String, Object>();
41         map.put("x-message-ttl", 15000);//message在该队列queue的存活时间最大为15秒
42         map.put("x-dead-letter-exchange", DELAY_EXCHANGE); //x-dead-letter-exchange参数是设置该队列的死信交换器(DLX)
43         map.put("x-dead-letter-routing-key", DELAY_ROUTINGKEY);//x-dead-letter-routing-key参数是给这个DLX指定路由键
44 
45         return new Queue(NORMAL_QUEUE, true, false, false, map);
46     }
47 
48     @Bean
49     public DirectExchange normalExchange(){
50         return new DirectExchange(NORMAL_EXCHANGE, true, false);
51     }
52 
53     @Bean
54     public Binding normalRoutingkey(){
55         return BindingBuilder.bind(normalQueue())
56                 .to(normalExchange())
57                 .with(NORMAL_ROUTINGKEY);
58     }
59 
60 
61     /**
62      * 定义死信队列
63      */
64     @Bean
65     public Queue delayQueue(){
66         return new Queue(DELAY_QUEUE, true);
67     }
68 
69     @Bean
70     public DirectExchange delayExchange(){
71         return new DirectExchange(DELAY_EXCHANGE);
72     }
73 
74     @Bean
75     public Binding delayRoutingkey(){
76         return BindingBuilder.bind(delayQueue())
77                 .to(delayExchange())
78                 .with(DELAY_ROUTINGKEY);
79     }
80 
81 
82 
83 
84 }

SendController

 1 package com.yuan.rabbitmqprovider.controller;
 2 
 3 
 4 import com.yuan.rabbitmqprovider.rabbitmq.QueueDelayConfig;
 5 import lombok.extern.slf4j.Slf4j;
 6 import org.springframework.amqp.rabbit.core.RabbitTemplate;
 7 import org.springframework.beans.factory.annotation.Autowired;
 8 import org.springframework.web.bind.annotation.RequestMapping;
 9 import org.springframework.web.bind.annotation.RestController;
10 
11 import java.time.LocalDateTime;
12 import java.time.format.DateTimeFormatter;
13 import java.util.HashMap;
14 import java.util.Map;
15 
16 @RestController
17 @Slf4j
18 public class SendController  {
19 
20     @Autowired
21     private RabbitTemplate rabbitTemplate;
22 
23     @RequestMapping("/sender")
24     public Map<String, Object> sender(){
25         Map<String, Object> data = this.createData();
26 
27         rabbitTemplate.convertAndSend(QueueDelayConfig.NORMAL_EXCHANGE,
28                 QueueDelayConfig.NORMAL_ROUTINGKEY,data);
29         Map<String, Object> result = new HashMap<String, Object>();
30     result.put("msg","OK");
31     result.put("code","1");
32     return result;
33     }
34 
35 
36 
37     private Map<String, Object> createData(){
38         Map<String, Object> map = new HashMap<String, Object>();
39 
40         String date = LocalDateTime.now().format(DateTimeFormatter.BASIC_ISO_DATE.
41                 ofPattern("yyyy-MM-dd HH:mm:ss"));
42         map.put("msg","hello rabbitmq!!");
43         map.put("success",true);
44         map.put("createdate", date);
45 
46 
47         return map;
48     }
49 
50 
51 
52 }

最后配置一下yml文件

 1 server:
 2   port: 8081
 3   servlet:
 4     context-path: /rabbitmq-provider
 5 spring:
 6   rabbitmq:
 7     virtual-host: /
 8     username: guest
 9     password: guest
10     host: 192.168.238.129
11     port: 5672

创建消费者模块rabbitmq-consumer

pom.xml

 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 3          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
 4     <modelVersion>4.0.0</modelVersion>
 5     <parent>
 6         <groupId>com.yuan</groupId>
 7         <artifactId>rabbitmq03</artifactId>
 8         <version>0.0.1-SNAPSHOT</version>
 9     </parent>
10     <artifactId>rabbitmq-consumer</artifactId>
11     <version>0.0.1-SNAPSHOT</version>
12     <name>rabbitmq-consumer</name>
13     <description>子模块-消费者</description>
14     <packaging>jar</packaging>
15 </project>

QueueRecevier

 1 package com.yuan.rabbitmqconsumer.controller;
 2 
 3 import lombok.extern.slf4j.Slf4j;
 4 import org.springframework.amqp.rabbit.annotation.RabbitHandler;
 5 import org.springframework.amqp.rabbit.annotation.RabbitListener;
 6 import org.springframework.stereotype.Component;
 7 
 8 import java.util.Map;
 9 
10 @Component
11 @Slf4j
12 @RabbitListener(queues = {"delay-queue"})  //消费端监听队列,如果delay-queue死信队列中有消息过来就会被消费掉
13 public class QueueRecevier {
14 
15     @RabbitHandler
16     public void handlerMessage(Map<String, Object> data){
17         log.info("QueueRecevier.handlerMessage,data={}",data);
18     }
19 
20 
21 
22 
23 }

标红处的log使用需要下载一个插件Lombok

RabbitMQ队列延迟

直接右边install, 然后重启idea

yml文件配置

 1 server:
 2   port: 8082
 3   servlet:
 4     context-path: /rabbitmq-consumer
 5 spring:
 6   rabbitmq:
 7     virtual-host: /
 8     username: guest
 9     password: guest
10     host: 192.168.238.129
11     port: 5672

启动生产者,访问http://localhost:8081/rabbitmq-provider/sender  发送请求。

RabbitMQ队列延迟

生产端推送消息到正常队列等待被消费,我们设定的过期时间是15秒,,,

RabbitMQ队列延迟

RabbitMQ队列延迟

启动消费端,消费端会根据我们设定的监听去监听队列中是否有消息有则会被消费掉。。

RabbitMQ队列延迟

6. json转换

1.生产者
@Bean
public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory, Jackson2JsonMessageConverter jackson2JsonMessageConverter) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter);//指定json转换器
return rabbitTemplate;
}

2.消费者
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory, Jackson2JsonMessageConverter jackson2JsonMessageConverter) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(jackson2JsonMessageConverter);
return factory; }

创建公共子模块common-vo

 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 3          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
 4     <modelVersion>4.0.0</modelVersion>
 5     <parent>
 6         <groupId>com.yuan</groupId>
 7         <artifactId>rabbitmq03</artifactId>
 8         <version>0.0.1-SNAPSHOT</version>
 9     </parent>
10     <artifactId>common-vo</artifactId>
11     <version>0.0.1-SNAPSHOT</version>
12     <name>common-vo</name>
13     <packaging>jar</packaging>
14     <description>公共子模块</description>
15 
16 
17 
18 </project>

创建一个model的Package,创建一个Order

package com.yuan.commonvo.model;

import lombok.Data;

import java.lang.reflect.ParameterizedType;
import java.util.Date;


@Data
public class Order {

    private  long orderId;
    private  String orderNo;
    private Date createdate;


}

vo包下创建一个OrderVo

 

package com.yuan.commonvo.vo;

import com.yuan.commonvo.model.Order;


public class OrderVo extends Order {

}

完了之后在父模块中添加common-vo子模块的一个pom依赖

<modules>
    <module>rabbitmq-provider</module>
    <module>rabbitmq-consumer</module>
    <module>common-vo</module>
  </modules>

<dependency>  <groupId>com.yuan</groupId>  <artifactId>common-vo</artifactId>  <version>0.0.1-SNAPSHOT</version></dependency>

修改生产者SendController

@RequestMapping("/sender")
    public Map<String, Object> sender(){
//        Map<String, Object> data = this.createData();

        OrderVo orderVo = new OrderVo();
        orderVo.setOrderId(1);
        orderVo.setOrderNo("P001");

        rabbitTemplate.convertAndSend(QueueDelayConfig.NORMAL_EXCHANGE,
                QueueDelayConfig.NORMAL_ROUTINGKEY,orderVo);
        Map<String, Object> result = new HashMap<String, Object>();
    result.put("msg","OK");
    result.put("code","1");
    return result;
    }

添加QueueProviderMessageConvert

package com.yuan.rabbitmqprovider.rabbitmq;import org.springframework.amqp.rabbit.connection.ConnectionFactory;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class QueueProviderMessageConvert {    @Bean    public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){        RabbitTemplate rabbitTemplate=new RabbitTemplate();        rabbitTemplate.setConnectionFactory(connectionFactory);        rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());        return  rabbitTemplate;    }    @Bean    public Jackson2JsonMessageConverter jackson2JsonMessageConverter(){        return new Jackson2JsonMessageConverter();    }}

修改消费端QueueRecevier

package com.yuan.rabbitmqconsumer.controller;


import com.yuan.commonvo.vo.OrderVo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
@RabbitListener(queues = {"delay-queue"})  //消费端监听队列,如果delay-queue死信队列中有消息过来就会被消费掉
public class QueueRecevier {

    @RabbitHandler
    public void handlerMessage(OrderVo orderVo){
        log.info("QueueRecevier.handlerMessage,data={}",orderVo);
    }




}

添加消费端QueueRecevierMessageConvert

 

package com.yuan.rabbitmqconsumer.controller;import org.springframework.amqp.rabbit.connection.ConnectionFactory;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class QueueRecevierMessageConvert {    @Bean    public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){        RabbitTemplate rabbitTemplate=new RabbitTemplate();        rabbitTemplate.setConnectionFactory(connectionFactory);        rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());        return  rabbitTemplate;    }    @Bean    public Jackson2JsonMessageConverter jackson2JsonMessageConverter(){        return new Jackson2JsonMessageConverter();    }}

测试:

RabbitMQ队列延迟

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
6个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Easter79 Easter79
3年前
thinkcmf+jsapi 实现微信支付
首先从小程序端接收订单号、金额等参数,然后后台进行统一下单,把微信支付的订单号返回,在把订单号发送给前台,前台拉起支付,返回参数后更改支付状态。。。回调publicfunctionnotify(){$wechatDb::name('wechat')where('status',1)find();
Stella981 Stella981
3年前
KVM调整cpu和内存
一.修改kvm虚拟机的配置1、virsheditcentos7找到“memory”和“vcpu”标签,将<namecentos7</name<uuid2220a6d1a36a4fbb8523e078b3dfe795</uuid
Stella981 Stella981
3年前
Nginx + lua +[memcached,redis]
精品案例1、Nginxluamemcached,redis实现网站灰度发布2、分库分表/基于Leaf组件实现的全球唯一ID(非UUID)3、Redis独立数据监控,实现订单超时操作/MQ死信操作SelectPollEpollReactor模型4、分布式任务调试Quartz应用
Easter79 Easter79
3年前
Twitter的分布式自增ID算法snowflake (Java版)
概述分布式系统中,有一些需要使用全局唯一ID的场景,这种时候为了防止ID冲突可以使用36位的UUID,但是UUID有一些缺点,首先他相对比较长,另外UUID一般是无序的。有些时候我们希望能使用一种简单一些的ID,并且希望ID能够按照时间有序生成。而twitter的snowflake解决了这种需求,最初Twitter把存储系统从MySQL迁移
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
1年前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这