RabbitMQ延迟消息发送

Stella981
• 阅读 491

为什么使用延迟消息?

不同于同步消息,有些业务场景下希望可以实现延迟一定时间再消费消息。

典型的场景有微信、支付宝等第三方支付回调接口,会在用户支付后3秒、5秒、30秒等等时间后向应用服务器发送回调请求,确保应用服务器可以正确收到消息。

那有些朋友就会说了,把需要定时处理的数据存到数据库中用定时任务就可以实现,为什么还弄个异步消息。增加后台维护成本。

使用定时任务当然没有问题可以实现该问题。在小数据量情况下没有问题。但当数据量交大的时候怎么办?如果每个任务的延迟时间不同怎么办?

其他方式实现消息队列

名称

实现方式

详细说明

Redis

使用zset数据结构

使用zset的score属性存放执行时间戳,起一个死循环的线程不断的取第一个Key值,如果当前时间戳大于该Key的socre 值时将它取出来消费,注意不需要遍历整个Zset集合,以免造成性能浪费

定时任务

给定周期扫描待处理消息

使用该方式间隔时间不好控制,给短会造成无意义的扫描,增加数据库压力,给长了误差较大

定时任务

动态创建唯一性定时任务

一次性的任务会增加数据库存储,需要定时清理,如相差时间较近的任务较多,也会造成性能较差

时间轮

自定义

自定义一个时间轮的数据结构,启动一个后台线程,延迟一秒,获取时间轮中的任务启动子线程独立执行时间轮的任务

如何选择消息中间件?

中间件

是否原生支持

说明

RocketMQ

支持

不支持任意时间的延迟消息的设置,仅支持内置预设值的延迟时间间隔的延迟消息。预设值的延迟时间间隔为:1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h

RabbitMQ

不支持

可使用消息的TTL和死信Exchange实现

Kafka

不支持

可使用TimingWheel 实现

AcitveMQ

支持

因自己在使用RabbitMQ做为消息中间件,所以直接选用了RabbitMQ来实现。

实现之前

在实现之前我们先需要知道RabbitMQ以下两个概念。

  • TTL(Time To Live)消息过期时间。

消息如果在队列中一直没有被消费并且存在时间超过了TTL,消息就会变成了"死信" (Dead Message),后续无法再被消费了。

  • DLX(Dead-Letter-Exchange)死信交换器。

它的作用其实是用来接收死信消息(dead message)的。

  1. 消息被拒绝(Basic.Reject/Basic.Nack) ,井且设置requeue 参数为false
  2. 消息过期
  3. 队列达到最大长度

因为消息如果未被正常消费并设置了requeue为false时会进入死信队列,我们可以监控消费死信队列中消息,来观察和分析系统的问题。

RabbitMQ可以从两种维度设置消息过期时间,分别是队列和消息本身。两种方式哪个时间小先执行哪个。

实现思路

想到有两种实现方式和效果。甚至可以结合使用。

第一种:设定固定几个延迟时间(像RocketMQ中间件)

RabbitMQ延迟消息发送

第二种:实现自定义任意时间延迟

RabbitMQ延迟消息发送

以上两种方式各有优缺点,我自己实现的是第二种,下面详细说明

图中后半段死信路由与应用消费基本相同,只要在消费端绑将一个正常队列与死信路由绑定就行。

/**
 * @Author: maomao
 * @Date: 2019-09-04 18:34
 */
@Slf4j
@Component
public class FreeCloudMQConsume {
    @RabbitListener(bindings = {@QueueBinding(value = @Queue(value = "free.cloud.out.mq",durable = "true"),
                                 exchange = @Exchange(value = "free.cloud.die.exchange",type = ExchangeTypes.TOPIC),
                                 key = "free.cloud.out.mq.dead.message.#")})
    public void print(String message){
        log.info("print 5 ---- > {}",message);
    }
}

调用方发送消息

/**
     * 创建延迟队列,会随指定延迟时间+5秒后删除队列
     * @param queueName
     * @param delayMillis
     * @return
     */
    private static Queue createDelayQueue(String queueName, Integer delayMillis) {
        /**
         * 队列名称  //死信时间 ,死信重新投递的交换机 ,路由到队列的routingKey
         */
        String time = String.valueOf(System.currentTimeMillis());
        String delayQueueName = queueName + ".delay_" + delayMillis + "_" + time;
        return QueueBuilder.durable(delayQueueName)
                //设置消息失效时间
                .withArgument("x-message-ttl",delayMillis * 1000)
                //设置队列自动删除时间 ,比消息延迟时间多5秒
                .withArgument("x-expires", (delayMillis + 5) * 1000)
                //设置死信路由
                .withArgument("x-dead-letter-exchange", "free.cloud.die.exchange")
                //设置死信路由routingKey
                .withArgument("x-dead-letter-routing-key", queueName + ".dead.message." + time)
                .build();
    }

    /**
     * 发送延迟消息
     * @param queueName
     * @param message
     * @param delayMillis
     */
    public static void sendDelayMessage(String queueName,Object message,Integer delayMillis){
        //死信消息队列(动态创建,会销毁)
        Queue delayQueue = createDelayQueue(queueName, delayMillis);
        //创建队列
        addQueue(delayQueue);
        //延迟消息路由Key
        StringBuilder delayRoutingKey = new StringBuilder(queueName + ".delay");
        delayRoutingKey.append(".").append(message.hashCode() + "_" + RandomUtil.randomString(5));
        //绑定延迟路由
        RabbitMqUtil.addBinding(delayQueue,delayExchange,delayRoutingKey.toString());
        getRabbitTemplate().convertAndSend("free.cloud.delay.exchange",delayRoutingKey.toString(),message);
    }

以上是自定义延迟消息的关键实现代码,完整代码可以 点击这里 获取

效果 RabbitMQ延迟消息发送 RabbitMQ延迟消息发送 RabbitMQ延迟消息发送 RabbitMQ延迟消息发送 RabbitMQ延迟消息发送

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
6个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Easter79 Easter79
3年前
thinkcmf+jsapi 实现微信支付
首先从小程序端接收订单号、金额等参数,然后后台进行统一下单,把微信支付的订单号返回,在把订单号发送给前台,前台拉起支付,返回参数后更改支付状态。。。回调publicfunctionnotify(){$wechatDb::name('wechat')where('status',1)find();
Stella981 Stella981
3年前
Spring Boot(十四)RabbitMQ延迟队列
一、前言延迟队列的使用场景:1.未按时支付的订单,30分钟过期之后取消订单;2.给活跃度比较低的用户间隔N天之后推送消息,提高活跃度;3.过1分钟给新注册会员的用户,发送注册邮件等。实现延迟队列的方式有两种:1.通过消息过期后进入死信交换器,再由交换器转发到延迟消费队列,实现延迟功能;2.使用rabbitmqdelayed
Easter79 Easter79
3年前
Twitter的分布式自增ID算法snowflake (Java版)
概述分布式系统中,有一些需要使用全局唯一ID的场景,这种时候为了防止ID冲突可以使用36位的UUID,但是UUID有一些缺点,首先他相对比较长,另外UUID一般是无序的。有些时候我们希望能使用一种简单一些的ID,并且希望ID能够按照时间有序生成。而twitter的snowflake解决了这种需求,最初Twitter把存储系统从MySQL迁移
Stella981 Stella981
3年前
RabbitMQ如何保证队列里的消息99.99%被消费?
1\.本篇概要其实,还有1种场景需要考虑:当消费者接收到消息后,还没处理完业务逻辑,消费者挂掉了,那消息也算丢失了?,比如用户下单,订单中心发送了1个消息到RabbitMQ里的队列,积分中心收到这个消息,准备给这个下单的用户增加20积分,但积分还没增加成功呢,积分中心自己挂掉了,导致数据出现问题。那么如何解
Wesley13 Wesley13
3年前
JMS消息的概念解释
1、默认生产者消息是持久的:会存数据库\消费者的持久:createDurableSubscriber是指消费者能收到所有它订阅时间点之后的消息,即使消费者注册后关闭,当它重启就能收到注册时间点之后所有的消息;即当此消费用户ID(AAA)在producer发送之前就已经注册,那么此id能收到producer发送的所有消息,如果是在produce
Stella981 Stella981
3年前
Spring Boot与RabbitMQ结合实现延迟队列的示例
背景何为延迟队列?顾名思义,延迟队列就是进入该队列的消息会被延迟消费的队列。而一般的队列,消息一旦入队了之后就会被消费者马上消费。场景一:在订单系统中,一个用户下单之后通常有30分钟的时间进行支付,如果30分钟之内没有支付成功,那么这个订单将进行一场处理。这是就可以使用延时队列将订单信息发送到延时队列。场景二:用户希望通过手机远程遥控
Python进阶者 Python进阶者
1年前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这