RabbitMq动态添加监听

Stella981
• 阅读 878

昨天研究了一下RabbitMQ 想做一个动态添加监听的功能

依靠springboot 实现起来也简单

就2个类 1个主类 实现动态添加队列及绑定关系、动态添加监听、动态调整监听线程池大小、动态删除队列、动态取消监听、发送动态队列的消息。

还有个类就是自定义消费者 都是采用string接收参数,后面可以采用指定统一对象,然后用个type字段区分消息类型,再用策略模式分开处理。

原理就是注入rabbitTemplate、rabbitAdmin(所以顶部一定要加@Component),有了rabbitAdmin就可以动态声明交换机、路由、队列。

有了这3样以后就是添加监听了(就是新增消费者),采用DirectMessageListenerContainer 实现监听(推荐用DirectMessageListenerContainer,关键就是setConsumersPerQueue的设置,消费线程都是给线程池管理的,减少了创建线程的开销。还有个SimpleMessageListenerContainer实现,不推荐使用,区别参考https://blog.csdn.net/yingziisme/article/details/86418580)。这里还用了个map保存container方便,因为动态创建队列有N个,就不用bean的方式了。

这里采用的TopicExchange(以前看篇文章记得是direct性能好于topic,8C16G大概10W比6w的样子,但是少了灵活性,topic的队列可以模糊监听不同的routingkey消息)

更新:今天新增了死信队列、重试机制

package com.xx.rabbitmq; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer; import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Lazy; import org.springframework.retry.interceptor.RetryOperationsInterceptor; import org.springframework.stereotype.Component; import java.util.HashMap; import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; _/** _ * _动态__MQ _ * _实现动态添加队列及绑定关系、动态添加监听 _ * _动态调整监听线程池大小 _ * _动态删除队列、动态取消监听 _ * _发送动态队列的消息 _ _* _ * @author _tiancong _ * @date _2020/11/26 15:55 _ */ @Component @Data @Slf4j public class DynamicRabbitMq { @Autowired private RabbitTemplate rabbitTemplate; @Autowired @Lazy private RabbitAdmin rabbitAdmin; _/** _ * _动态交换机默认名称 _ _*/ _ private static final String EXCHANGE = "yd.dynamic.exchange"; _/** _ * _默认死信交换机名称 _ _*/ _ private static final String DLX_EXCHANGE = "yd.dynamic.exchange.dlx"; _/** _ * _队列前缀 _ _*/ _ private static final String QUEUE_PREFIX = "yd.dynamic.queue."; _/** _ * _默认死信队列名称 _ _*/ _ private static final String DLX_QUEUE = "yd.dynamic.queue.dlx"; _/** _ * _默认死信队列名称 _ _*/ _ private static final String DLX_ROUTING = "yd.dynamic.routing.dlx"; private static final Map<String, DirectMessageListenerContainer> CONTAINER_MAP = new ConcurrentHashMap<>(8); _/** _ * _动态添加队列及绑定关系 _ _* _ * @param queueName _队列名 _ * @param exchange _交换机名 _ * @param routingKey _路由名 _ * @param needDlx _需要死信队列 _ _*/ _ public void addQueueAndExchange(String queueName, String exchange, String routingKey, boolean needDlx) { queueName = getFullQueueName(queueName); Queue queue = new Queue(queueName); if (needDlx) { Map<String, Object> arguments = new HashMap<>(2); arguments.put("x-dead-letter-exchange", DLX_EXCHANGE); arguments.put("x-dead-letter-routing-key", DLX_ROUTING); queue = new Queue(queueName, true, false, false, arguments); QueueInformation queueInfo = rabbitAdmin.getQueueInfo(DLX_QUEUE); if (queueInfo == null) { Queue dlxQueue = new Queue(DLX_QUEUE); DirectExchange dlxDirectExchange = new DirectExchange(DLX_EXCHANGE); rabbitAdmin.declareQueue(dlxQueue); rabbitAdmin.declareExchange(dlxDirectExchange); rabbitAdmin.declareBinding(BindingBuilder.bind(dlxQueue).to(dlxDirectExchange).with(DLX_ROUTING)); log.info("创建死信队[{}]列成功", DLX_QUEUE); } } TopicExchange topicExchange = new TopicExchange(exchange); rabbitAdmin.declareQueue(queue); rabbitAdmin.declareExchange(topicExchange); rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(topicExchange).with(routingKey)); }

_/\*\*

_ * _动态删除队列(队列有消息时不删除) _ _* _ * @param queueName _队列名 _ _*/ _ public void deleteQueue(String queueName) { queueName = getFullQueueName(queueName); if (Objects.requireNonNull(rabbitAdmin.getQueueInfo(queueName)).getMessageCount() == 0) { rabbitAdmin.deleteQueue(queueName); log.info("成功删除mq队列{}", queueName); } else { log.info("mq队列[{}]里还有消息。不做删除操作", queueName); } }

_/\*\*

_ * _动态添加队列监听 _ _* _ * @param queueName _队列名 _ * @param routingKey _路由名 _ _*/ _ public void startListener(String queueName, String routingKey) { startListener(queueName, routingKey, 1, false); }

_/\*\*

_ * _动态添加队列监听及修改消费者线程池大小 _ _* _ * @param queueName _队列名 _ * @param routingKey _路由名 _ * @param consumerNum _消费者线程数量 _ * @param needDlx _需要死信队列 _ _*/ _ public void startListener(String queueName, String routingKey, int consumerNum, boolean needDlx) { queueName = getFullQueueName(queueName); addQueueAndExchange(queueName, EXCHANGE, routingKey, needDlx); DirectMessageListenerContainer container = new DirectMessageListenerContainer(rabbitTemplate.getConnectionFactory()); DirectMessageListenerContainer getContainer = CONTAINER_MAP.putIfAbsent(queueName, container); if (getContainer != null) { log.info("动态修改mq监听成功,交换机:{},路由key:{},队列:{},线程数:{}", EXCHANGE, routingKey, queueName, consumerNum); container = getContainer; } else { container.setQueueNames(queueName); log.info("动态添加mq监听成功,交换机:{},路由key:{},队列:{},线程数:{}", EXCHANGE, routingKey, queueName, consumerNum); } container.setPrefetchCount(consumerNum); if (needDlx) { container.setAcknowledgeMode(AcknowledgeMode.AUTO); } else { container.setAcknowledgeMode(AcknowledgeMode.MANUAL); } container.setConsumersPerQueue(consumerNum); container.setMessageListener(new ConsumerHandler(!needDlx)); container.setAdviceChain(createRetry()); container.setDefaultRequeueRejected(false); container.start(); }

_/\*\*

_ * _动态停止监听并删除队列 _ _* _ * @param queueName _队列名 _ _*/ _ public void stopListener(String queueName) { queueName = getFullQueueName(queueName); DirectMessageListenerContainer container = CONTAINER_MAP.get(queueName); if (container != null) { container.stop(); container.destroy(); CONTAINER_MAP.remove(queueName); } log.info("停止监听mq队列{}", queueName); deleteQueue(queueName); }

_/\*\*

_ * _发送动态队列的消息 _ _* _ * @param routingKey _路由名 _ * @param data _数据 _ _*/ _ public void sendMsg(String routingKey, String data) { rabbitTemplate.convertAndSend(EXCHANGE, routingKey, data); }

_/\*\*

_ * _获取队列名全称 _ _* _ * @param queueName _队列名 _ * @return _全称 _ _*/ _ private String getFullQueueName(String queueName) { if (queueName.startsWith(QUEUE_PREFIX)) { return queueName; } return QUEUE_PREFIX + queueName; }

_/\*\*

_ * _重试机制 _ _* _ * _@return _ _*/ _ private RetryOperationsInterceptor createRetry() { return RetryInterceptorBuilder .stateless() //重试次数 .maxAttempts(3) //重试间隔 指数递增时间参数 最大间隔时间 .backOffOptions(1000, 3, 5000) //次数用完之后的处理,用的是默认处理类,失败消息会到死信 .recoverer(new RejectAndDontRequeueRecoverer()) .build(); } }

package com.xx.rabbitmq; import com.rabbitmq.client.Channel; import lombok.AllArgsConstructor; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; _/** _ * @author _tiancong _ * @date _2020/11/25 15:57 _ */ @Slf4j @Data @AllArgsConstructor public class ConsumerHandler implements ChannelAwareMessageListener {

_/\*\*

_ * _是否需要回应 _ _*/ _ private final Boolean needAck; _/** _ * _接收消息 _ _* _ * @param _message _ * @param _channel _ * @throws _Exception _ */ @Override public void onMessage(Message message, Channel channel) throws Exception { int flag = (int) message.getMessageProperties().getHeaders().getOrDefault("retryCount", 0); flag++; if (flag > 1) { log.info("此消息第{}次执行", flag); } message.getMessageProperties().setHeader("retryCount", flag); String data = new String(message.getBody()); log.info("[{}]收到mq消息: {}", message.getMessageProperties().getConsumerQueue(), data); if (getNeedAck()) { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { handleMessage(data); channel.basicAck(deliveryTag, false); } catch (Exception e) { channel.basicNack(deliveryTag, false, true); } } else { handleMessage(data); } }

_/\*\*

_ * _处理消息 _ _* _ * @param data _消息体 _ _*/ _ public void handleMessage(String data) {

}

}

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
5个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Stella981 Stella981
3年前
SpringBoot整合多个RabbitMQ
一、背景​最近项目中需要用到了RabbitMQ来监听消息队列,监听的消息队列的虚拟主机(virtualHost)和队列名(queueName)是不一致的,但是接收到的消息格式相同的。而且可能还存在程序不停机的情况下,动态的增加新的队列(queue)的监听,因此就需要我们自己在程序中实现一种方法实现动态配置RabbitMQ
Stella981 Stella981
3年前
KVM调整cpu和内存
一.修改kvm虚拟机的配置1、virsheditcentos7找到“memory”和“vcpu”标签,将<namecentos7</name<uuid2220a6d1a36a4fbb8523e078b3dfe795</uuid
Stella981 Stella981
3年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Easter79 Easter79
3年前
SpringBoot整合多个RabbitMQ
一、背景​最近项目中需要用到了RabbitMQ来监听消息队列,监听的消息队列的虚拟主机(virtualHost)和队列名(queueName)是不一致的,但是接收到的消息格式相同的。而且可能还存在程序不停机的情况下,动态的增加新的队列(queue)的监听,因此就需要我们自己在程序中实现一种方法实现动态配置RabbitMQ
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
11个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这