为什么使用延迟消息?
不同于同步消息,有些业务场景下希望可以实现延迟一定时间再消费消息。
典型的场景有微信、支付宝等第三方支付回调接口,会在用户支付后3秒、5秒、30秒等等时间后向应用服务器发送回调请求,确保应用服务器可以正确收到消息。
那有些朋友就会说了,把需要定时处理的数据存到数据库中用定时任务就可以实现,为什么还弄个异步消息。增加后台维护成本。
使用定时任务当然没有问题可以实现该问题。在小数据量情况下没有问题。但当数据量交大的时候怎么办?如果每个任务的延迟时间不同怎么办?
其他方式实现消息队列
名称
实现方式
详细说明
Redis
使用zset数据结构
使用zset的score属性存放执行时间戳,起一个死循环的线程不断的取第一个Key值,如果当前时间戳大于该Key的socre 值时将它取出来消费,注意不需要遍历整个Zset集合,以免造成性能浪费
定时任务
给定周期扫描待处理消息
使用该方式间隔时间不好控制,给短会造成无意义的扫描,增加数据库压力,给长了误差较大
定时任务
动态创建唯一性定时任务
一次性的任务会增加数据库存储,需要定时清理,如相差时间较近的任务较多,也会造成性能较差
时间轮
自定义
自定义一个时间轮的数据结构,启动一个后台线程,延迟一秒,获取时间轮中的任务启动子线程独立执行时间轮的任务
如何选择消息中间件?
中间件
是否原生支持
说明
RocketMQ
支持
不支持任意时间的延迟消息的设置,仅支持内置预设值的延迟时间间隔的延迟消息。预设值的延迟时间间隔为:1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h
RabbitMQ
不支持
可使用消息的TTL和死信Exchange实现
Kafka
不支持
可使用TimingWheel 实现
AcitveMQ
支持
因自己在使用RabbitMQ做为消息中间件,所以直接选用了RabbitMQ来实现。
实现之前
在实现之前我们先需要知道RabbitMQ以下两个概念。
- TTL(Time To Live)消息过期时间。
消息如果在队列中一直没有被消费并且存在时间超过了TTL,消息就会变成了"死信" (Dead Message),后续无法再被消费了。
- DLX(Dead-Letter-Exchange)死信交换器。
它的作用其实是用来接收死信消息(dead message)的。
- 消息被拒绝(Basic.Reject/Basic.Nack) ,井且设置requeue 参数为false
- 消息过期
- 队列达到最大长度
因为消息如果未被正常消费并设置了requeue为false时会进入死信队列,我们可以监控消费死信队列中消息,来观察和分析系统的问题。
RabbitMQ可以从两种维度设置消息过期时间,分别是队列和消息本身。两种方式哪个时间小先执行哪个。
实现思路
想到有两种实现方式和效果。甚至可以结合使用。
第一种:设定固定几个延迟时间(像RocketMQ中间件)
第二种:实现自定义任意时间延迟
以上两种方式各有优缺点,我自己实现的是第二种,下面详细说明
图中后半段死信路由与应用消费基本相同,只要在消费端绑将一个正常队列与死信路由绑定就行。
/**
* @Author: maomao
* @Date: 2019-09-04 18:34
*/
@Slf4j
@Component
public class FreeCloudMQConsume {
@RabbitListener(bindings = {@QueueBinding(value = @Queue(value = "free.cloud.out.mq",durable = "true"),
exchange = @Exchange(value = "free.cloud.die.exchange",type = ExchangeTypes.TOPIC),
key = "free.cloud.out.mq.dead.message.#")})
public void print(String message){
log.info("print 5 ---- > {}",message);
}
}
调用方发送消息
/**
* 创建延迟队列,会随指定延迟时间+5秒后删除队列
* @param queueName
* @param delayMillis
* @return
*/
private static Queue createDelayQueue(String queueName, Integer delayMillis) {
/**
* 队列名称 //死信时间 ,死信重新投递的交换机 ,路由到队列的routingKey
*/
String time = String.valueOf(System.currentTimeMillis());
String delayQueueName = queueName + ".delay_" + delayMillis + "_" + time;
return QueueBuilder.durable(delayQueueName)
//设置消息失效时间
.withArgument("x-message-ttl",delayMillis * 1000)
//设置队列自动删除时间 ,比消息延迟时间多5秒
.withArgument("x-expires", (delayMillis + 5) * 1000)
//设置死信路由
.withArgument("x-dead-letter-exchange", "free.cloud.die.exchange")
//设置死信路由routingKey
.withArgument("x-dead-letter-routing-key", queueName + ".dead.message." + time)
.build();
}
/**
* 发送延迟消息
* @param queueName
* @param message
* @param delayMillis
*/
public static void sendDelayMessage(String queueName,Object message,Integer delayMillis){
//死信消息队列(动态创建,会销毁)
Queue delayQueue = createDelayQueue(queueName, delayMillis);
//创建队列
addQueue(delayQueue);
//延迟消息路由Key
StringBuilder delayRoutingKey = new StringBuilder(queueName + ".delay");
delayRoutingKey.append(".").append(message.hashCode() + "_" + RandomUtil.randomString(5));
//绑定延迟路由
RabbitMqUtil.addBinding(delayQueue,delayExchange,delayRoutingKey.toString());
getRabbitTemplate().convertAndSend("free.cloud.delay.exchange",delayRoutingKey.toString(),message);
}
以上是自定义延迟消息的关键实现代码,完整代码可以 点击这里 获取
效果