Rabbitmq 延迟队列实现定时任务,实战

Stella981
• 阅读 522

点击上方“Java专栏”,选择“置顶或者星标”

第一时间阅读精彩文章!

1、☞ 程序员进阶必备资源免费送「21种技术方向!」 点击查看☜

2、☞ 《Java面试手册》.PDF    点击查看

场景

开发中经常需要用到定时任务,对于商城来说,定时任务尤其多,比如优惠券定时过期、订单定时关闭、微信支付2小时未支付关闭订单等等,都需要用到定时任务,但是定时任务本身有一个问题,一般来说我们都是通过定时轮询查询数据库来判断是否有任务需要执行,也就是说不管怎么样,我们需要先查询数据库,而且有些任务对时间准确要求比较高的,需要每秒查询一次,对于系统小倒是无所谓,如果系统本身就大而且数据也多的情况下,这就不大现实了,所以需要其他方式的,当然实现的方式有多种多样的,比如Redis实现定时队列、基于优先级队列的JDK延迟队列、时间轮等。因为我们项目中本身就使用到了Rabbitmq,所以基于方便开发和维护的原则,我们使用了Rabbitmq延迟队列来实现定时任务,不知道rabbitmq是什么的和不知道springboot怎么集成Rabbitmq的可以查看我之前的文章Spring boot集成RabbitMQ

Rabbitmq延迟队列

Rabbitmq本身是没有延迟队列的,只能通过Rabbitmq本身队列的特性来实现,想要Rabbitmq实现延迟队列,需要使用Rabbitmq的死信交换机(Exchange)和消息的存活时间TTL(Time To Live)

死信交换机

一个消息在满足如下条件下,会进死信交换机,记住这里是交换机而不是队列,一个交换机可以对应很多队列。

一个消息被Consumer拒收了,并且reject方法的参数里requeue是false。也就是说不会被再次放在队列里,被其他消费者使用。

上面的消息的TTL到了,消息过期了。

队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上。

死信交换机就是普通的交换机,只是因为我们把过期的消息扔进去,所以叫死信交换机,并不是说死信交换机是某种特定的交换机

消息TTL(消息存活时间)

消息的TTL就是消息的存活时间。RabbitMQ可以对队列和消息分别设置TTL。对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的设置。超过了这个时间,我们认为这个消息就死了,称之为死信。如果队列设置了,消息也设置了,那么会取小的。所以一个消息如果被路由到不同的队列中,这个消息死亡的时间有可能不一样(不同的队列设置)。这里单讲单个消息的TTL,因为它才是实现延迟任务的关键。

 1.  `byte[] messageBodyBytes = "Hello, world!".getBytes();` 

 
   
 
 
 2.  `AMQP.BasicProperties properties = new AMQP.BasicProperties();` 

 
   
 
 
 3.  `properties.setExpiration("60000");` 

 
   
 
 
 4.  `channel.basicPublish("my-exchange", "queue-key", properties, messageBodyBytes);` 


  





 

可以通过设置消息的expiration字段或者x-message-ttl属性来设置时间,两者是一样的效果。只是expiration字段是字符串参数,所以要写个int类型的字符串:当上面的消息扔到队列中后,过了60秒,如果没有被消费,它就死了。不会被消费者消费到。这个消息后面的,没有“死掉”的消息对顶上来,被消费者消费。死信在队列中并不会被删除和释放,它会被统计到队列的消息数中去

处理流程图

Rabbitmq 延迟队列实现定时任务,实战

创建交换机(Exchanges)和队列(Queues)

创建死信交换机

Rabbitmq 延迟队列实现定时任务,实战 如图所示,就是创建一个普通的交换机,这里为了方便区分,把交换机的名字取为:delay

创建自动过期消息队列

这个队列的主要作用是让消息定时过期的,比如我们需要2小时候关闭订单,我们就需要把消息放进这个队列里面,把消息过期时间设置为2小时Rabbitmq 延迟队列实现定时任务,实战 创建一个一个名为delay_queue1的自动过期的队列,当然图片上面的参数并不会让消息自动过期,因为我们并没有设置x-message-ttl参数,如果整个队列的消息有消息都是相同的,可以设置,这里为了灵活,所以并没有设置,另外两个参数x-dead-letter-exchange代表消息过期后,消息要进入的交换机,这里配置的是delay,也就是死信交换机,x-dead-letter-routing-key是配置消息过期后,进入死信交换机的routing-key,跟发送消息的routing-key一个道理,根据这个key将消息放入不同的队列

创建消息处理队列

这个队列才是真正处理消息的队列,所有进入这个队列的消息都会被处理Rabbitmq 延迟队列实现定时任务,实战 消息队列的名字为delay_queue2

消息队列绑定到交换机

进入交换机详情页面,将创建的2个队列(delay_queue1和delay_queue2)绑定到交换机上面Rabbitmq 延迟队列实现定时任务,实战 自动过期消息队列的routing key 设置为delay

绑定delay_queue2Rabbitmq 延迟队列实现定时任务,实战 delay_queue2 的key要设置为创建自动过期的队列的x-dead-letter-routing-key参数,这样当消息过期的时候就可以自动把消息放入delay_queue2这个队列中了

绑定后的管理页面如下图:Rabbitmq 延迟队列实现定时任务,实战

当然这个绑定也可以使用代码来实现,只是为了直观表现,所以本文使用的管理平台来操作

发送消息

 1.  `String msg = "hello word";` 

 
   
 
 
 2.  `MessageProperties messageProperties = new MessageProperties();` 

 
   
 
 
 3.   `messageProperties.setExpiration("6000");`

 
   
 
 
 4.   `messageProperties.setCorrelationId(UUID.randomUUID().toString().getBytes());`

 
   
 
 
 5.   `Message message = new Message(msg.getBytes(), messageProperties);`

 
   
 
 
 6.   `rabbitTemplate.convertAndSend("delay", "delay",message);`


  





 

主要的代码就是

 1.  `messageProperties.setExpiration("6000");` 


  





 

设置了让消息6秒后过期

注意:因为要让消息自动过期,所以一定不能设置delay_queue1的监听,不能让这个队列里面的消息被接受到,否则消息一旦被消费,就不存在过期了

接收消息

接收消息配置好delay_queue2的监听就好了

 1.  `package wang.raye.rabbitmq.demo1;`

 
   
 
 
 2.  `import org.springframework.amqp.core.AcknowledgeMode;` 

 
   
 
 
 3.  `import org.springframework.amqp.core.Binding;` 

 
   
 
 
 4.  `import org.springframework.amqp.core.BindingBuilder;` 

 
   
 
 
 5.  `import org.springframework.amqp.core.DirectExchange;` 

 
   
 
 
 6.  `import org.springframework.amqp.core.Message;` 

 
   
 
 
 7.  `import org.springframework.amqp.core.Queue;` 

 
   
 
 
 8.  `import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;` 

 
   
 
 
 9.  `import org.springframework.amqp.rabbit.connection.ConnectionFactory;` 

 
   
 
 
 10.  `import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;` 

 
   
 
 
 11.  `import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;` 

 
   
 
 
 12.  `import org.springframework.beans.factory.annotation.Autowired;` 

 
   
 
 
 13.  `import org.springframework.context.annotation.Bean;` 

 
   
 
 
 14.  `import org.springframework.context.annotation.Configuration;`

 
   
 
 
 15.  `@Configuration`

 
   
 
 
 16.  `public class DelayQueue {` 

 
   
 
 
 17.   `/** 消息交换机的名字*/`

 
   
 
 
 18.   `public static final String EXCHANGE = "delay";`

 
   
 
 
 19.   `/** 队列key1*/`

 
   
 
 
 20.   `public static final String ROUTINGKEY1 = "delay";`

 
   
 
 
 21.   `/** 队列key2*/`

 
   
 
 
 22.   `public static final String ROUTINGKEY2 = "delay_key";`

 
   
 
 
 23.   `/**`

 
   
 
 
 24.   `* 配置链接信息`

 
   
 
 
 25.   `* @return`

 
   
 
 
 26.   `*/`

 
   
 
 
 27.   `@Bean`

 
   
 
 
 28.   `public ConnectionFactory connectionFactory() {`

 
   
 
 
 29.   `CachingConnectionFactory connectionFactory = new CachingConnectionFactory("120.76.237.8",5672);`

 
   
 
 
 30.   `connectionFactory.setUsername("kberp");`

 
   
 
 
 31.   `connectionFactory.setPassword("kberp");`

 
   
 
 
 32.   `connectionFactory.setVirtualHost("/");`

 
   
 
 
 33.   `connectionFactory.setPublisherConfirms(true); // 必须要设置`

 
   
 
 
 34.   `return connectionFactory;`

 
   
 
 
 35.   `}`

 
   
 
 
 36.   `/**` 

 
   
 
 
 37.   `* 配置消息交换机`

 
   
 
 
 38.   `* 针对消费者配置` 

 
   
 
 
 39.   `FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念` 

 
   
 
 
 40.   `HeadersExchange :通过添加属性key-value匹配` 

 
   
 
 
 41.   `DirectExchange:按照routingkey分发到指定队列` 

 
   
 
 
 42.   `TopicExchange:多关键字匹配` 

 
   
 
 
 43.   `*/` 

 
   
 
 
 44.   `@Bean` 

 
   
 
 
 45.   `public DirectExchange defaultExchange() {` 

 
   
 
 
 46.   `return new DirectExchange(EXCHANGE, true, false);`

 
   
 
 
 47.   `}` 

 
   
 
 
 48.   `/**`

 
   
 
 
 49.   `* 配置消息队列2`

 
   
 
 
 50.   `* 针对消费者配置` 

 
   
 
 
 51.   `* @return`

 
   
 
 
 52.   `*/`

 
   
 
 
 53.   `@Bean`

 
   
 
 
 54.   `public Queue queue() {` 

 
   
 
 
 55.   `return new Queue("delay_queue2", true); //队列持久` 

 
   
 
 
 56.   `}`

 
   
 
 
 57.   `/**`

 
   
 
 
 58.   `* 将消息队列2与交换机绑定`

 
   
 
 
 59.   `* 针对消费者配置` 

 
   
 
 
 60.   `* @return`

 
   
 
 
 61.   `*/`

 
   
 
 
 62.   `@Bean` 

 
   
 
 
 63.   `@Autowired`

 
   
 
 
 64.   `public Binding binding() {` 

 
   
 
 
 65.   `return BindingBuilder.bind(queue()).to(defaultExchange()).with(DelayQueue.ROUTINGKEY2);` 

 
   
 
 
 66.   `}` 

 
   
 
 
 67.   `/**`

 
   
 
 
 68.   `* 接受消息的监听,这个监听会接受消息队列1的消息`

 
   
 
 
 69.   `* 针对消费者配置` 

 
   
 
 
 70.   `* @return`

 
   
 
 
 71.   `*/`

 
   
 
 
 72.   `@Bean` 

 
   
 
 
 73.   `@Autowired`

 
   
 
 
 74.   `public SimpleMessageListenerContainer messageContainer2(ConnectionFactory connectionFactory) {` 

 
   
 
 
 75.   `SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());` 

 
   
 
 
 76.   `container.setQueues(queue());` 

 
   
 
 
 77.   `container.setExposeListenerChannel(true);` 

 
   
 
 
 78.   `container.setMaxConcurrentConsumers(1);` 

 
   
 
 
 79.   `container.setConcurrentConsumers(1);` 

 
   
 
 
 80.   `container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式手工确认` 

 
   
 
 
 81.   `container.setMessageListener(new ChannelAwareMessageListener() {`

 
   
 
 
 82.   `public void onMessage(Message message, com.rabbitmq.client.Channel channel) throws Exception {`

 
   
 
 
 83.   `byte[] body = message.getBody();` 

 
   
 
 
 84.   `System.out.println("delay_queue2 收到消息 : " + new String(body));` 

 
   
 
 
 85.   `channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //确认消息成功消费` 

 
   
 
 
 86.   `}` 

 
   
 
 
 87.   `});` 

 
   
 
 
 88.   `return container;` 

 
   
 
 
 89.   `}` 

 
   
 
 
 90.  `}`


  





 

在消息监听中处理需要定时处理的任务就好了,因为Rabbitmq能发送消息,所以可以把任务特征码发过来,比如关闭订单就把订单id发过来,这样就避免了需要查询一下那些订单需要关闭而加重MySQL负担了,毕竟一旦订单量大的话,查询本身也是一件很费IO的事情

总结

基于Rabbitmq实现定时任务,就是将消息设置一个过期时间,放入一个没有读取的队列中,让消息过期后自动转入另外一个队列中,监控这个队列消息的监听处来处理定时任务具体的操作

                                                  以上,便是今天的分享,希望大家喜欢,觉得内容不错的,欢迎点击「在看」支持,谢谢各位
                                                 
                                                   

                                                  
                                                  
                                                  
                                                  
                                                    
 
                                                   
                                                   
                                                   
                                                   
                                                     
  
                                                    
                                                    
                                                    
                                                    
                                                      
   
                                                     
                                                     
                                                     
                                                     
                                                       
    
                                                      
                                                      
                                                       
                                                        
                                                         
                                                         
                                                         
                                                          
                                                           
                                                            
                                                             
                                                              
                                                               
                                                                
                                                               
                                                              
                                                             
                                                            
                                                           
                                                          
                                                         
                                                         
                                                          
                                                         
                                                         
                                                          
                                                         
                                                         
                                                         喜欢文章,点个 
                                                         在看 
                                                           
                                                          
                                                         
                                                        
                                                      
                                                    
                                                      
   
                                                     
                                                     
                                                     
                                                   
                                                     
  
                                                    
                                                    
                                                    
                                                  
                                                    
 
                                                   
                                                   
                                                   
                                                 
                                                   

                                                  
                                                  
                                                  

本文分享自微信公众号 - Java专栏(finishbug)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
待兔 待兔
4个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Stella981 Stella981
3年前
Opencv中Mat矩阵相乘——点乘、dot、mul运算详解
Opencv中Mat矩阵相乘——点乘、dot、mul运算详解2016年09月02日00:00:36 \牧野(https://www.oschina.net/action/GoToLink?urlhttps%3A%2F%2Fme.csdn.net%2Fdcrmg) 阅读数:59593
Stella981 Stella981
3年前
KVM调整cpu和内存
一.修改kvm虚拟机的配置1、virsheditcentos7找到“memory”和“vcpu”标签,将<namecentos7</name<uuid2220a6d1a36a4fbb8523e078b3dfe795</uuid
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
3年前
Docker 部署SpringBoot项目不香吗?
  公众号改版后文章乱序推荐,希望你可以点击上方“Java进阶架构师”,点击右上角,将我们设为★“星标”!这样才不会错过每日进阶架构文章呀。  !(http://dingyue.ws.126.net/2020/0920/b00fbfc7j00qgy5xy002kd200qo00hsg00it00cj.jpg)  2
Stella981 Stella981
3年前
200的大额人民币即将面世?央行:Yes!
点击上方蓝字关注我们!(https://oscimg.oschina.net/oscnet/2a1c2ac00bf54458a78c48a6c2e547d5.png)点击上方“印象python”,选择“星标”公众号重磅干货,第一时间送达!!(
可莉 可莉
3年前
200的大额人民币即将面世?央行:Yes!
点击上方蓝字关注我们!(https://oscimg.oschina.net/oscnet/2a1c2ac00bf54458a78c48a6c2e547d5.png)点击上方“印象python”,选择“星标”公众号重磅干货,第一时间送达!!(
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
10个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这