SpringBoot整合多个RabbitMQ

Easter79
• 阅读 634

一、背景

​ 最近项目中需要用到了RabbitMQ来监听消息队列,监听的消息队列的 虚拟主机(virtualHost)和队列名(queueName)是不一致的,但是接收到的消息格式相同的。而且可能还存在程序不停机的情况下,动态的增加新的队列(queue)的监听,因此就需要我们自己在程序中实现一种方法实现动态配置RabbitMQ

二、需求

我们有2RabbitMQ的配置,在程序启动的时候,动态的配置好这2个RabbitMQ,实现消息的监听。

RabbitMQ的配置信息

host

port

username

password

virtualHost

queueName

47.101.130.164

5672

rabbit-multi-01

rabbit-multi-01

/rabbit-multi-01

queue-rabbit-multi-01

47.101.130.164

5672

rabbit-multi-02

rabbit-multi-02

/rabbit-multi-02

queue-rabbit-multi-02

三、实现思路

1、动态配置RabbitMQ

包括 ConnectionFactory,RabbitAdmin,RabbitTemplate,SimpleMessageListenerContainer

2、将上方配置好的Bean注入到Spring容器中,之后可能需要用到

Spring容器中注入Bean的方法

DefaultListableBeanFactory#registerSingleton


DefaultListableBeanFactory#registerBeanDefinition

四、实现步骤

1、引入maven依赖

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

2、创建RabbitProperties用来表示RabbitMQ的配置信息

@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class RabbitProperties {
    private String host;
    private Integer port;
    private String username;
    private String password;
    private String virtualHost;
    private String queueName;
}

3、配置RabbitMQ

配置 ConnectionFactory,RabbitAdmin,RabbitTemplate,SimpleMessageListenerContainer等,并动态注入到Spring容器中

@Configuration
@RequiredArgsConstructor
@Slf4j
public class MultiRabbitMqConfig {

    private final DefaultListableBeanFactory defaultListableBeanFactory;

    private static Map<String, RabbitProperties> multiMqPropertiesMap = new HashMap<String, RabbitProperties>() {
        {
            put("first", RabbitProperties.builder()
                    .host("47.101.130.164")
                    .port(5672)
                    .username("rabbit-multi-01")
                    .password("rabbit-multi-01")
                    .virtualHost("/rabbit-multi-01")
                    .queueName("queue-rabbit-multi-01").build());
            put("second", RabbitProperties.builder()
                    .host("47.101.130.164")
                    .port(5672)
                    .username("rabbit-multi-02")
                    .password("rabbit-multi-02")
                    .virtualHost("/rabbit-multi-02")
                    .queueName("queue-rabbit-multi-02").build());
        }
    };

    @PostConstruct
    public void initRabbitmq() {
        multiMqPropertiesMap.forEach((key, rabbitProperties) -> {

            AbstractBeanDefinition beanDefinition = BeanDefinitionBuilder.genericBeanDefinition(CachingConnectionFactory.class)
                    .addPropertyValue("cacheMode", CachingConnectionFactory.CacheMode.CHANNEL)
                    .addPropertyValue("host", rabbitProperties.getHost())
                    .addPropertyValue("port", rabbitProperties.getPort())
                    .addPropertyValue("username", rabbitProperties.getUsername())
                    .addPropertyValue("password", rabbitProperties.getPassword())
                    .addPropertyValue("virtualHost", rabbitProperties.getVirtualHost())
                    .getBeanDefinition();
            String connectionFactoryName = String.format("%s%s", key, "ConnectionFactory");
            defaultListableBeanFactory.registerBeanDefinition(connectionFactoryName, beanDefinition);
            CachingConnectionFactory connectionFactory = defaultListableBeanFactory.getBean(connectionFactoryName, CachingConnectionFactory.class);

            String rabbitAdminName = String.format("%s%s", key, "RabbitAdmin");
            AbstractBeanDefinition rabbitAdminBeanDefinition = BeanDefinitionBuilder.genericBeanDefinition(RabbitAdmin.class)
                    .addConstructorArgValue(connectionFactory)
                    .addPropertyValue("autoStartup", true)
                    .getBeanDefinition();
            defaultListableBeanFactory.registerBeanDefinition(rabbitAdminName, rabbitAdminBeanDefinition);
            RabbitAdmin rabbitAdmin = defaultListableBeanFactory.getBean(rabbitAdminName, RabbitAdmin.class);
            log.info("rabbitAdmin:[{}]", rabbitAdmin);

            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            defaultListableBeanFactory.registerSingleton(String.format("%s%s", key, "RabbitTemplate"), rabbitTemplate);

            SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
            // 设置监听的队列
            simpleMessageListenerContainer.setQueueNames(rabbitProperties.getQueueName());
            // 指定要创建的并发使用者的数量,默认值是1,当并发高时可以增加这个的数值,同时下方max的数值也要增加
            simpleMessageListenerContainer.setConcurrentConsumers(3);
            // 最大的并发消费者
            simpleMessageListenerContainer.setMaxConcurrentConsumers(10);
            // 设置是否重回队列
            simpleMessageListenerContainer.setDefaultRequeueRejected(false);
            // 设置签收模式
            simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
            // 设置非独占模式
            simpleMessageListenerContainer.setExclusive(false);
            // 设置consumer未被 ack 的消息个数
            simpleMessageListenerContainer.setPrefetchCount(1);
            // 设置消息监听器
            simpleMessageListenerContainer.setMessageListener((ChannelAwareMessageListener) (message, channel) -> {
                try {
                    log.info("============> Thread:[{}] 接收到消息:[{}] ", Thread.currentThread().getName(), new String(message.getBody()));
                    log.info("====>connection:[{}]", channel.getConnection());
                    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                } catch (Exception e) {
                    log.error(e.getMessage(), e);
                    // 发生异常此处需要捕获到
                    channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
                }
            });
            defaultListableBeanFactory.registerSingleton(String.format("%s%s", key, "SimpleMessageListenerContainer"), simpleMessageListenerContainer);
        });
        new Thread(() -> {
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            RabbitTemplate firstRabbitTemplate = (RabbitTemplate) defaultListableBeanFactory.getBean("firstRabbitTemplate");
            firstRabbitTemplate.convertAndSend("exchange-rabbit-multi-01", "", "first queue message");
            log.info("over...");
        }).start();
    }
}

五、实现效果

SpringBoot整合多个RabbitMQ

六、代码

https://gitee.com/huan1993/rabbitmq/tree/master/rabbitmq-springboot-multi

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
4个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Stella981 Stella981
3年前
SpringBoot整合多个RabbitMQ
一、背景​最近项目中需要用到了RabbitMQ来监听消息队列,监听的消息队列的虚拟主机(virtualHost)和队列名(queueName)是不一致的,但是接收到的消息格式相同的。而且可能还存在程序不停机的情况下,动态的增加新的队列(queue)的监听,因此就需要我们自己在程序中实现一种方法实现动态配置RabbitMQ
Stella981 Stella981
3年前
KVM调整cpu和内存
一.修改kvm虚拟机的配置1、virsheditcentos7找到“memory”和“vcpu”标签,将<namecentos7</name<uuid2220a6d1a36a4fbb8523e078b3dfe795</uuid
Easter79 Easter79
3年前
Twitter的分布式自增ID算法snowflake (Java版)
概述分布式系统中,有一些需要使用全局唯一ID的场景,这种时候为了防止ID冲突可以使用36位的UUID,但是UUID有一些缺点,首先他相对比较长,另外UUID一般是无序的。有些时候我们希望能使用一种简单一些的ID,并且希望ID能够按照时间有序生成。而twitter的snowflake解决了这种需求,最初Twitter把存储系统从MySQL迁移
Stella981 Stella981
3年前
RabbitMq动态添加监听
昨天研究了一下RabbitMQ想做一个动态添加监听的功能依靠springboot实现起来也简单就2个类1个主类实现动态添加队列及绑定关系、动态添加监听、动态调整监听线程池大小、动态删除队列、动态取消监听、发送动态队列的消息。还有个类就是自定义消费者都是采用string接收参数,后面可以采用指定统一对象,然后用个type字段区分消息类型,
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
10个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这
Easter79
Easter79
Lv1
今生可爱与温柔,每一样都不能少。
文章
2.8k
粉丝
5
获赞
1.2k