ActiveMQ消息特性:延迟和定时消息投递(Delay and Schedule Message

Wesley13
• 阅读 586

有时候我们不希望消息马上被broker投递出去,而是想要消息60秒以后发给消费者,或者我们想让消息没隔一定时间投递一次,一共投递指定的次数。。。

类似这种需求,ActiveMQ提供了一种broker端消息定时调度机制。

我们只需要把几个描述消息定时调度方式的参数作为属性添加到消息,broker端的调度器就会按照我们想要的行为去处理消息。

一共有四个属性:

Property name

type

description

AMQ_SCHEDULED_DELAY

long

延迟投递的时间

AMQ_SCHEDULED_PERIOD

long

重复投递的时间间隔

AMQ_SCHEDULED_REPEAT

int

重复投递次数

AMQ_SCHEDULED_CRON

String

Cron表达式

当然ActiveMQ也提供了一个封装的消息类型:_org.apache.activemq.ScheduledMessage_.

使用示例,延迟60秒:

        MessageProducer producer = session.createProducer(destination);
        TextMessage message = session.createTextMessage("test msg");        long time = 60 * 1000;
        message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
        producer.send(message);

延迟30秒,投递10次,间隔10秒:

        MessageProducer producer = session.createProducer(destination);
        TextMessage message = session.createTextMessage("test msg");        long delay = 30 * 1000;        long period = 10 * 1000;        int repeat = 9;
        message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
        message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);
        message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);
        producer.send(message);

使用 CRON 表达式的例子:

        MessageProducer producer = session.createProducer(destination);
        TextMessage message = session.createTextMessage("test msg");
        message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "0 * * * *");
        producer.send(message);

CRON表达式的优先级高于另外三个参数,如果在设置了CRON的同时,也有repeat和period参数,则会在每次CRON执行的时候,重复投递repeat次,每次间隔为period。就是说设置是叠加的效果。例如每小时都会发生消息被投递10次,延迟1秒开始,每次间隔1秒:

        MessageProducer producer = session.createProducer(destination);
        TextMessage message = session.createTextMessage("test msg");
        message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "0 * * * *");
        message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 1000);
        message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 1000);
        message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, 9);
        producer.send(message);
点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
九鹤 九鹤
3年前
RabbitMq 的高级特性
消息可靠性当作为消息的投递着不希望,任何消息投递失败或者消息丢失rabbitmq提供了两种方式来复制投递失败,确保消息的可靠性confirm确认模式return退回模式消息从投递者到product到交换机(exchange)返回一个confirmCallback(不管投递是否成功)都会执行这个回调函数,只是返回的布尔值不一样)exchange到queue
Wesley13 Wesley13
3年前
ActiveMQ
前言JMS的消息确认模式,定义了客户端(消息发送者或者消费者)与broker确认消息的方式,可以认为是客户端与Broker之间建立一种简单的“担保”机制。在java的JMS标准中,javax.jms.Session包定义了4种消息确认模式,分别是:\\AUTO\_ACKNOWLEDGE:\\自动确认\\
Easter79 Easter79
3年前
Twitter的分布式自增ID算法snowflake (Java版)
概述分布式系统中,有一些需要使用全局唯一ID的场景,这种时候为了防止ID冲突可以使用36位的UUID,但是UUID有一些缺点,首先他相对比较长,另外UUID一般是无序的。有些时候我们希望能使用一种简单一些的ID,并且希望ID能够按照时间有序生成。而twitter的snowflake解决了这种需求,最初Twitter把存储系统从MySQL迁移
Stella981 Stella981
3年前
Android消息循环分析
我们的常用的系统中,程序的工作通常是有事件驱动和消息驱动两种方式,在Android系统中,Java应用程序是靠消息驱动来工作的。消息驱动的原理就是:1\.有一个消息队列,可以往这个队列中投递消息;2\.有一个消息循环,不断从消息队列中取出消息,然后进行处理。在Android中通过Looper来封装消息循环,同时在其中封装了一个消息队
Stella981 Stella981
3年前
RabbitMQ小技巧
导读在使用RabbitMQ消息中间件时,因为消息的投递是异步的,默认情况下,RabbitMQ会删除那些无法路由的消息。为了能够检出消息是否顺利投递到队列,我们需要相应的处理机制。今天就来验证一下相关的验证机制。!RabbitMQ小技巧确定消息投递情况RabbitMQ小技巧确定消息投递情况(https://imgblog.csdnim
Wesley13 Wesley13
3年前
ActiveMQ消息传送机制以及ACK机制详解
 AcitveMQ是作为一种消息存储和分发组件,涉及到client与broker端数据交互的方方面面,它不仅要担保消息的存储安全性,还要提供额外的手段来确保消息的分发是可靠的。一.ActiveMQ消息传送机制  Producer客户端使用来发送消息的,Consumer客户端用来消费消息;它们的协同中心就是ActiveMQbr
Wesley13 Wesley13
3年前
IM消息送达保证机制实现(二):保证离线消息的可靠投递
1、前言本文的上篇《IM消息送达保证机制实现(一):保证在线实时消息的可靠投递(https://www.oschina.net/action/GoToLink?urlhttp%3A%2F%2Fwww.52im.net%2Fthread29411.html)》中,我们讨论了在线实时消息的投递可以通过应用层的确认、发送方的超时重传、接收
Stella981 Stella981
3年前
Noark入门之协议映射
0x00消息控制器消息控制器,主要作用就是为每个模块提供消息处理的入口.这里的消息不仅仅是协议,还有内部指令,事件等等逻辑入口,这也是为了响应线程模型作出的一种支撑,只要入口在此消息控制器内,那必然走期望的线程调度。@Controller用于标识一个类为当前模块的消息控制器入口.@Controller(threadGroup
Stella981 Stella981
3年前
RabbitMQ 如何保证消息的可靠性
一条消费成功被消费经历了生产者MQ消费者,因此在这三个步骤中都有可能造成消息丢失。一消息生产者没有把消息成功发送到MQ1.1事务机制AMQP协议提供了事务机制,在投递消息时开启事务支持,如果消息投递失败,则回滚事务。自定义事务管理器@Configuration