昨天研究了一下RabbitMQ 想做一个动态添加监听的功能
依靠springboot 实现起来也简单
就2个类 1个主类 实现动态添加队列及绑定关系、动态添加监听、动态调整监听线程池大小、动态删除队列、动态取消监听、发送动态队列的消息。
还有个类就是自定义消费者 都是采用string接收参数,后面可以采用指定统一对象,然后用个type字段区分消息类型,再用策略模式分开处理。
原理就是注入rabbitTemplate、rabbitAdmin(所以顶部一定要加@Component),有了rabbitAdmin就可以动态声明交换机、路由、队列。
有了这3样以后就是添加监听了(就是新增消费者),采用DirectMessageListenerContainer 实现监听(推荐用DirectMessageListenerContainer,关键就是setConsumersPerQueue的设置,消费线程都是给线程池管理的,减少了创建线程的开销。还有个SimpleMessageListenerContainer实现,不推荐使用,区别参考https://blog.csdn.net/yingziisme/article/details/86418580)。这里还用了个map保存container方便,因为动态创建队列有N个,就不用bean的方式了。
这里采用的TopicExchange(以前看篇文章记得是direct性能好于topic,8C16G大概10W比6w的样子,但是少了灵活性,topic的队列可以模糊监听不同的routingkey消息)
更新:今天新增了死信队列、重试机制
package com.xx.rabbitmq; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer; import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Lazy; import org.springframework.retry.interceptor.RetryOperationsInterceptor; import org.springframework.stereotype.Component; import java.util.HashMap; import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; _/** _ * _动态__MQ _ * _实现动态添加队列及绑定关系、动态添加监听 _ * _动态调整监听线程池大小 _ * _动态删除队列、动态取消监听 _ * _发送动态队列的消息 _ _* _ * @author _tiancong _ * @date _2020/11/26 15:55 _ */ @Component @Data @Slf4j public class DynamicRabbitMq { @Autowired private RabbitTemplate rabbitTemplate; @Autowired @Lazy private RabbitAdmin rabbitAdmin; _/** _ * _动态交换机默认名称 _ _*/ _ private static final String EXCHANGE = "yd.dynamic.exchange"; _/** _ * _默认死信交换机名称 _ _*/ _ private static final String DLX_EXCHANGE = "yd.dynamic.exchange.dlx"; _/** _ * _队列前缀 _ _*/ _ private static final String QUEUE_PREFIX = "yd.dynamic.queue."; _/** _ * _默认死信队列名称 _ _*/ _ private static final String DLX_QUEUE = "yd.dynamic.queue.dlx"; _/** _ * _默认死信队列名称 _ _*/ _ private static final String DLX_ROUTING = "yd.dynamic.routing.dlx"; private static final Map<String, DirectMessageListenerContainer> CONTAINER_MAP = new ConcurrentHashMap<>(8); _/** _ * _动态添加队列及绑定关系 _ _* _ * @param queueName _队列名 _ * @param exchange _交换机名 _ * @param routingKey _路由名 _ * @param needDlx _需要死信队列 _ _*/ _ public void addQueueAndExchange(String queueName, String exchange, String routingKey, boolean needDlx) { queueName = getFullQueueName(queueName); Queue queue = new Queue(queueName); if (needDlx) { Map<String, Object> arguments = new HashMap<>(2); arguments.put("x-dead-letter-exchange", DLX_EXCHANGE); arguments.put("x-dead-letter-routing-key", DLX_ROUTING); queue = new Queue(queueName, true, false, false, arguments); QueueInformation queueInfo = rabbitAdmin.getQueueInfo(DLX_QUEUE); if (queueInfo == null) { Queue dlxQueue = new Queue(DLX_QUEUE); DirectExchange dlxDirectExchange = new DirectExchange(DLX_EXCHANGE); rabbitAdmin.declareQueue(dlxQueue); rabbitAdmin.declareExchange(dlxDirectExchange); rabbitAdmin.declareBinding(BindingBuilder.bind(dlxQueue).to(dlxDirectExchange).with(DLX_ROUTING)); log.info("创建死信队[{}]列成功", DLX_QUEUE); } } TopicExchange topicExchange = new TopicExchange(exchange); rabbitAdmin.declareQueue(queue); rabbitAdmin.declareExchange(topicExchange); rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(topicExchange).with(routingKey)); }
_/\*\*
_ * _动态删除队列(队列有消息时不删除) _ _* _ * @param queueName _队列名 _ _*/ _ public void deleteQueue(String queueName) { queueName = getFullQueueName(queueName); if (Objects.requireNonNull(rabbitAdmin.getQueueInfo(queueName)).getMessageCount() == 0) { rabbitAdmin.deleteQueue(queueName); log.info("成功删除mq队列{}", queueName); } else { log.info("mq队列[{}]里还有消息。不做删除操作", queueName); } }
_/\*\*
_ * _动态添加队列监听 _ _* _ * @param queueName _队列名 _ * @param routingKey _路由名 _ _*/ _ public void startListener(String queueName, String routingKey) { startListener(queueName, routingKey, 1, false); }
_/\*\*
_ * _动态添加队列监听及修改消费者线程池大小 _ _* _ * @param queueName _队列名 _ * @param routingKey _路由名 _ * @param consumerNum _消费者线程数量 _ * @param needDlx _需要死信队列 _ _*/ _ public void startListener(String queueName, String routingKey, int consumerNum, boolean needDlx) { queueName = getFullQueueName(queueName); addQueueAndExchange(queueName, EXCHANGE, routingKey, needDlx); DirectMessageListenerContainer container = new DirectMessageListenerContainer(rabbitTemplate.getConnectionFactory()); DirectMessageListenerContainer getContainer = CONTAINER_MAP.putIfAbsent(queueName, container); if (getContainer != null) { log.info("动态修改mq监听成功,交换机:{},路由key:{},队列:{},线程数:{}", EXCHANGE, routingKey, queueName, consumerNum); container = getContainer; } else { container.setQueueNames(queueName); log.info("动态添加mq监听成功,交换机:{},路由key:{},队列:{},线程数:{}", EXCHANGE, routingKey, queueName, consumerNum); } container.setPrefetchCount(consumerNum); if (needDlx) { container.setAcknowledgeMode(AcknowledgeMode.AUTO); } else { container.setAcknowledgeMode(AcknowledgeMode.MANUAL); } container.setConsumersPerQueue(consumerNum); container.setMessageListener(new ConsumerHandler(!needDlx)); container.setAdviceChain(createRetry()); container.setDefaultRequeueRejected(false); container.start(); }
_/\*\*
_ * _动态停止监听并删除队列 _ _* _ * @param queueName _队列名 _ _*/ _ public void stopListener(String queueName) { queueName = getFullQueueName(queueName); DirectMessageListenerContainer container = CONTAINER_MAP.get(queueName); if (container != null) { container.stop(); container.destroy(); CONTAINER_MAP.remove(queueName); } log.info("停止监听mq队列{}", queueName); deleteQueue(queueName); }
_/\*\*
_ * _发送动态队列的消息 _ _* _ * @param routingKey _路由名 _ * @param data _数据 _ _*/ _ public void sendMsg(String routingKey, String data) { rabbitTemplate.convertAndSend(EXCHANGE, routingKey, data); }
_/\*\*
_ * _获取队列名全称 _ _* _ * @param queueName _队列名 _ * @return _全称 _ _*/ _ private String getFullQueueName(String queueName) { if (queueName.startsWith(QUEUE_PREFIX)) { return queueName; } return QUEUE_PREFIX + queueName; }
_/\*\*
_ * _重试机制 _ _* _ * _@return _ _*/ _ private RetryOperationsInterceptor createRetry() { return RetryInterceptorBuilder .stateless() //重试次数 .maxAttempts(3) //重试间隔 指数递增时间参数 最大间隔时间 .backOffOptions(1000, 3, 5000) //次数用完之后的处理,用的是默认处理类,失败消息会到死信 .recoverer(new RejectAndDontRequeueRecoverer()) .build(); } }
package com.xx.rabbitmq; import com.rabbitmq.client.Channel; import lombok.AllArgsConstructor; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; _/** _ * @author _tiancong _ * @date _2020/11/25 15:57 _ */ @Slf4j @Data @AllArgsConstructor public class ConsumerHandler implements ChannelAwareMessageListener {
_/\*\*
_ * _是否需要回应 _ _*/ _ private final Boolean needAck; _/** _ * _接收消息 _ _* _ * @param _message _ * @param _channel _ * @throws _Exception _ */ @Override public void onMessage(Message message, Channel channel) throws Exception { int flag = (int) message.getMessageProperties().getHeaders().getOrDefault("retryCount", 0); flag++; if (flag > 1) { log.info("此消息第{}次执行", flag); } message.getMessageProperties().setHeader("retryCount", flag); String data = new String(message.getBody()); log.info("[{}]收到mq消息: {}", message.getMessageProperties().getConsumerQueue(), data); if (getNeedAck()) { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { handleMessage(data); channel.basicAck(deliveryTag, false); } catch (Exception e) { channel.basicNack(deliveryTag, false, true); } } else { handleMessage(data); } }
_/\*\*
_ * _处理消息 _ _* _ * @param data _消息体 _ _*/ _ public void handleMessage(String data) {
}
}