spring集成多个rabbitMQ

Easter79
• 阅读 624

转自:https://blog.csdn.net/zz775854904/article/details/81092892

MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求..

开始正题!

在开发之前需要下载rabbitmq, 而在rabbitmq安装之前,童鞋们需要安装erlang, 因为rabbitmq是用erlang写的.

安装完毕之后,我们建立一个maven项目.然后我们开始配置项目.

  1. <spring.version>3.2.8.RELEASE</spring.version>

  2. org.springframework

  3. spring-core

  4. ${spring.version}

  5. org.springframework

  6. spring-webmvc

  7. ${spring.version}

  8. org.springframework

  9. spring-context

  10. ${spring.version}

  11. org.springframework

  12. spring-context-support

  13. ${spring.version}

  14. org.springframework

  15. spring-aop

  16. ${spring.version}

  17. org.springframework

  18. spring-aspects

  19. ${spring.version}

  20. org.springframework

  21. spring-tx

  22. ${spring.version}

  23. org.springframework

  24. spring-jdbc

  25. ${spring.version}

  26. org.springframework

  27. spring-web

  28. ${spring.version}

由于是spring整合,我们需要加入spring的依赖.

  1. org.springframework.amqp

  2. spring-rabbit

  3. 1.3.5.RELEASE

依赖加好了之后, 我们需要定义消息生产者和消息发送者.

由于exchange有几种,这里我只测试了两种, 通过分别定义两个exchange去绑定direct和topic..

首先, 定义消息生产者, 通过配置将template链接connect-factory并注入到代码中使用.

  1. package com.chris.producer;

  2. import org.slf4j.Logger;

  3. import org.slf4j.LoggerFactory;

  4. import org.springframework.amqp.core.AmqpTemplate;

  5. import org.springframework.stereotype.Service;

  6. import javax.annotation.Resource;

  7. import java.io.IOException;

  8. /**

  9. * Created by wuxing on 2016/9/21.

  10. */

  11. @ Service

  12. public class MessageProducer {

  13. private Logger logger = LoggerFactory.getLogger(MessageProducer.class);

  14. @ Resource(name="amqpTemplate")

  15. private AmqpTemplate amqpTemplate;

  16. @ Resource(name="amqpTemplate2")

  17. private AmqpTemplate amqpTemplate2;

  18. public void sendMessage(Object message) throws IOException {

  19. logger.info( "to send message:{}", message);

  20. amqpTemplate.convertAndSend( "queueTestKey", message);

  21. amqpTemplate.convertAndSend( "queueTestChris", message);

  22. amqpTemplate2.convertAndSend( "wuxing.xxxx.wsdwd", message);

  23. }

  24. }

然后我们定义消息消费者, 这里,我定义了三个消费者, 通过监听消息队列, 分别接受各自所匹配的消息.

第一个消费者, 接受direct的消息, 他的exchange为exchangeTest,  rout-key为queueTestKey

  1. package com.chris.consumer;

  2. import org.slf4j.Logger;

  3. import org.slf4j.LoggerFactory;

  4. import org.springframework.amqp.core.Message;

  5. import org.springframework.amqp.core.MessageListener;

  6. /**

  7. * Created by wuxing on 2016/9/21.

  8. */

  9. public class MessageConsumer implements MessageListener {

  10. private Logger logger = LoggerFactory.getLogger(MessageConsumer.class);

  11. @Override

  12. public void onMessage(Message message) {

  13. logger.info( "consumer receive message------->:{}", message);

  14. }

  15. }

第二个消费者, 接受direct的消息(为了测试一个exchange可以发送多个消息), 他的exchange为exchangeTest,  rout-key为queueTestChris.

  1. package com.chris.consumer;

  2. import org.slf4j.Logger;

  3. import org.slf4j.LoggerFactory;

  4. import org.springframework.amqp.core.Message;

  5. import org.springframework.amqp.core.MessageListener;

  6. /**

  7. * Created by wuxing on 2016/9/21.

  8. */

  9. public class ChrisConsumer implements MessageListener {

  10. private Logger logger = LoggerFactory.getLogger(ChrisConsumer.class);

  11. @Override

  12. public void onMessage(Message message) {

  13. logger.info( "chris receive message------->:{}", message);

  14. }

  15. }

第三个消费者, 接受topic的消息他的exchange为exchangeTest2,  pattern为wuxing.*.. 网上说.*可以匹配一个, .#可以匹配一个或多个..但是笔者好像两个都试了..都可以匹配一个或多个..不知道什么鬼...

  1. package com.chris.consumer;

  2. import org.slf4j.Logger;

  3. import org.slf4j.LoggerFactory;

  4. import org.springframework.amqp.core.Message;

  5. import org.springframework.amqp.core.MessageListener;

  6. /**

  7. * Created by wuxing on 2016/9/21.

  8. */

  9. public class WuxingConsumer implements MessageListener {

  10. private Logger logger = LoggerFactory.getLogger(WuxingConsumer.class);

  11. @Override

  12. public void onMessage(Message message) {

  13. logger.info( "wuxing receive message------->:{}", message);

  14. }

  15. }

然后就是关键的地方了..rabbit整合spring的配置文件.

  1. <beans xmlns="http://www.springframework.org/schema/beans"

  2. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"

  3. xsi:schemaLocation="http://www.springframework.org/schema/beans

  4. http://www.springframework.org/schema/beans/spring-beans-3.0.xsd

  5. http://www.springframework.org/schema/rabbit

  6. http://www.springframework.org/schema/rabbit/spring-rabbit-1.2.xsd">

  7. <rabbit:connection-factory id="connectionFactory"

  8. username="guest" password="guest" host="localhost" port="5672" />

  9. <rabbit:template id="amqpTemplate" connection-factory="connectionFactory"

  10. exchange="exchangeTest"/>

  11. <rabbit:admin id="connectAdmin" connection-factory="connectionFactory"/>

  12. <rabbit:queue name="queueTest" durable="true" auto-delete="false" exclusive="false" declared-by="connectAdmin"/>

  13. <rabbit:direct-exchange name="exchangeTest" durable="true" auto-delete="false" declared-by="connectAdmin">

  14. rabbit:bindings

  15. <rabbit:binding queue="queueTest" key="queueTestKey">

  16. <rabbit:listener-container connection-factory="connectionFactory">

  17. <rabbit:listener queues="queueTest" ref="messageReceiver"/>

  18. <rabbit:queue name="queueChris" durable="true" auto-delete="false" exclusive="false" declared-by="connectAdmin"/>

  19. <rabbit:direct-exchange name="exchangeTest" durable="true" auto-delete="false" declared-by="connectAdmin">

  20. rabbit:bindings

  21. <rabbit:binding queue="queueChris" key="queueTestChris">

  22. <rabbit:listener-container connection-factory="connectionFactory">

  23. <rabbit:listener queues="queueChris" ref="receiverChris"/>

  24. <rabbit:connection-factory id="connectionFactory2"

  25. username="guest" password="guest" host="localhost" port="5672"/>

  26. <rabbit:template id="amqpTemplate2" connection-factory="connectionFactory2"

  27. exchange="exchangeTest2"/>

  28. <rabbit:admin id="connectAdmin2" connection-factory="connectionFactory2"/>

  29. <rabbit:queue name="queueWuxing" durable="true" auto-delete="false" exclusive="false" declared-by="connectAdmin2"/>

  30. <rabbit:topic-exchange name="exchangeTest2" durable="true" auto-delete="false" declared-by="connectAdmin2">

  31. rabbit:bindings

  32. <rabbit:binding queue="queueWuxing" pattern="wuxing.*">

  33. <rabbit:listener-container connection-factory="connectionFactory2" >

  34. <rabbit:listener queues="queueWuxing" ref="recieverWuxing"/>

这里,有个问题笔者研究了好久...就是如何定义两个exchange, 一开始一直不成功..直到找到了一篇国外的文章才解决...

定义两个exchange的时候, 需要用到declared-by..

而这个必须要引入下面的这个申明, 才有..

  1. http: //www.springframework.org/schema/rabbit

  2. http: //www.springframework.org/schema/rabbit/spring-rabbit-1.2.xsd">

文件中大概的配置解释一下.

connect-factory进行连接rabbitmq服务.

template用于连接factory并指定exchange, 这上面还能直接指定rout-key.

admin相当于一个管理员的角色..可以将exchange和queue进行管理, 

queue和topic-exchange分别定义队列和路由器, 这里需要用declared-by指定管理员,从而连接到相应的factory.

listener-container用于消费者的监听(其实,rabbit配置中是可以指定某个类的某个方法的, 但是笔者失败了, 还在试验中...)

这里还有一个问题...需要大家注意..

当一个exchange绑定了一种类型之后, 这个exchange在配置就不能再换成另一种了.会一直报错, received 'direct' but current is 'topic'  类似这种..

笔者这个也是被坑了若干时间去找问题...

然后贴下spring的基本配置

  1. <beans xmlns="http://www.springframework.org/schema/beans"

  2. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"

  3. xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd

  4. http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd">

  5. <context:component-scan base-package="com.chris.consumer, com.chris.producer" />

  6. <context:annotation-config />

  7. <context:spring-configured />

然后是单元测试类, 这里通过输出100-1慢慢递减,去观察控制台消费者接收消息的情况.

  1. package com.chris;

  2. import com.chris.producer.MessageProducer;

  3. import org.junit.Before;

  4. import org.junit.Test;

  5. import org.slf4j.Logger;

  6. import org.slf4j.LoggerFactory;

  7. import org.springframework.context.ApplicationContext;

  8. import org.springframework.context.support.ClassPathXmlApplicationContext;

  9. /**

  10. * Created by wuxing on 2016/9/21.

  11. */

  12. public class MessageTest {

  13. private Logger logger = LoggerFactory.getLogger(MessageTest.class);

  14. private ApplicationContext context = null;

  15. @Before

  16. public void setUp() throws Exception {

  17. context = new ClassPathXmlApplicationContext("application.xml");

  18. }

  19. @Test

  20. public void should_send_a_amq_message() throws Exception {

  21. MessageProducer messageProducer = (MessageProducer) context.getBean( "messageProducer");

  22. int a = 100;

  23. while (a > 0) {

  24. messageProducer.sendMessage( "Hello, I am amq sender num :" + a--);

  25. try {

  26. //暂停一下,好让消息消费者去取消息打印出来

  27. Thread.sleep( 1000);

  28. } catch (InterruptedException e) {

  29. e.printStackTrace();

  30. }

  31. }

  32. }

  33. }

然后控制台的结果如下(这里只贴出关键信息, 其他配置的log的省略了)

  1. 2016-09-22 16:15:00,330 [main] INFO [com.chris.producer.MessageProducer] - to send message:Hello, I am amq sender num :100

  2. 2016-09-22 16:15:00,348 [main] DEBUG [org.springframework.amqp.rabbit.connection.CachingConnectionFactory] - Creating cached Rabbit Channel from AMQChannel(amqp://guest@127.0.0.1:5672/,3)

  3. 2016-09-22 16:15:00,348 [main] DEBUG [org.springframework.amqp.rabbit.core.RabbitTemplate] - Executing callback on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,3)

  4. 2016-09-22 16:15:00,349 [main] DEBUG [org.springframework.amqp.rabbit.core.RabbitTemplate] - Publishing message on exchange [exchangeTest], routingKey = [queueTestKey]

  5. 2016-09-22 16:15:00,357 [main] DEBUG [org.springframework.amqp.rabbit.core.RabbitTemplate] - Executing callback on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,3)

  6. 2016-09-22 16:15:00,358 [main] DEBUG [org.springframework.amqp.rabbit.core.RabbitTemplate] - Publishing message on exchange [exchangeTest], routingKey = [queueTestChris]

  7. 2016-09-22 16:15:00,368 [main] DEBUG [org.springframework.amqp.rabbit.connection.CachingConnectionFactory] - Creating cached Rabbit Channel from AMQChannel(amqp://guest@127.0.0.1:5672/,2)

  8. 2016-09-22 16:15:00,369 [main] DEBUG [org.springframework.amqp.rabbit.core.RabbitTemplate] - Executing callback on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,2)

  9. 2016-09-22 16:15:00,369 [main] DEBUG [org.springframework.amqp.rabbit.core.RabbitTemplate] - Publishing message on exchange [exchangeTest2], routingKey = [wuxing.xxxx.wsdwd]

  10. 2016-09-22 16:15:00,370 [pool-1-thread-6] DEBUG [org.springframework.amqp.rabbit.listener.BlockingQueueConsumer] - Storing delivery for Consumer: tags=[[amq.ctag-hyW85GZHk-AHLLFJUmNLDQ]], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,1), acknowledgeMode=AUTO local queue size=0

  11. 2016-09-22 16:15:00,372 [SimpleAsyncTaskExecutor-1] DEBUG [org.springframework.amqp.rabbit.listener.BlockingQueueConsumer] - Received message: (Body:'Hello, I am amq sender num :100'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchangeTest, receivedRoutingKey=queueTestKey, deliveryTag=1, messageCount=0])

  12. 2016-09-22 16:15:00,373 [SimpleAsyncTaskExecutor-1] INFO [com.chris.consumer.MessageConsumer] - consumer receive message------->:(Body:'Hello, I am amq sender num :100'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchangeTest, receivedRoutingKey=queueTestKey, deliveryTag=1, messageCount=0])

  13. 2016-09-22 16:15:00,374 [SimpleAsyncTaskExecutor-1] DEBUG [org.springframework.amqp.rabbit.listener.BlockingQueueConsumer] - Retrieving delivery for Consumer: tags=[[amq.ctag-hyW85GZHk-AHLLFJUmNLDQ]], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,1), acknowledgeMode=AUTO local queue size=0

  14. 2016-09-22 16:15:00,379 [pool-2-thread-4] DEBUG [org.springframework.amqp.rabbit.listener.BlockingQueueConsumer] - Storing delivery for Consumer: tags=[[amq.ctag-T-c1red0T_HHyCFfpXLYIQ]], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,1), acknowledgeMode=AUTO local queue size=0

  15. 2016-09-22 16:15:00,381 [SimpleAsyncTaskExecutor-1] DEBUG [org.springframework.amqp.rabbit.listener.BlockingQueueConsumer] - Received message: (Body:'Hello, I am amq sender num :100'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchangeTest2, receivedRoutingKey=wuxing.xxxx.wsdwd, deliveryTag=1, messageCount=0])

  16. 2016-09-22 16:15:00,382 [SimpleAsyncTaskExecutor-1] INFO [com.chris.consumer.WuxingConsumer] - wuxing receive message------->:(Body:'Hello, I am amq sender num :100'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchangeTest2, receivedRoutingKey=wuxing.xxxx.wsdwd, deliveryTag=1, messageCount=0])

  17. 2016-09-22 16:15:00,383 [SimpleAsyncTaskExecutor-1] DEBUG [org.springframework.amqp.rabbit.listener.BlockingQueueConsumer] - Retrieving delivery for Consumer: tags=[[amq.ctag-T-c1red0T_HHyCFfpXLYIQ]], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,1), acknowledgeMode=AUTO local queue size=0

  18. 2016-09-22 16:15:00,396 [pool-1-thread-5] DEBUG [org.springframework.amqp.rabbit.listener.BlockingQueueConsumer] - Storing delivery for Consumer: tags=[[amq.ctag-h5ERpaWrnqmkNhbfM7S8Ww]], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,2), acknowledgeMode=AUTO local queue size=0

  19. 2016-09-22 16:15:00,397 [SimpleAsyncTaskExecutor-1] DEBUG [org.springframework.amqp.rabbit.listener.BlockingQueueConsumer] - Received message: (Body:'Hello, I am amq sender num :100'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchangeTest, receivedRoutingKey=queueTestChris, deliveryTag=1, messageCount=0])

  20. 2016-09-22 16:15:00,398 [SimpleAsyncTaskExecutor-1] INFO [com.chris.consumer.ChrisConsumer] - chris receive message------->:(Body:'Hello, I am amq sender num :100'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchangeTest, receivedRoutingKey=queueTestChris, deliveryTag=1, messageCount=0])

我们可以看到生产者有发出一个信息, 然后发布在了三个通道上.

1. on exchange [exchangeTest] , routingKey = [queueTestKey]

2. on exchange [exchangeTest] , routingKey = [queueTestChris]

3. on exchange [exchangeTest2] , routingKey = [wuxing.xxxx.wsdwd]

然后三个消费者分别收到了他们的消息..至此, 整个test就结束了.

对项目有兴趣的童鞋可以拿项目的源码玩一玩  源码在这里

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
6个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Wesley13 Wesley13
3年前
Java获得今日零时零分零秒的时间(Date型)
publicDatezeroTime()throwsParseException{    DatetimenewDate();    SimpleDateFormatsimpnewSimpleDateFormat("yyyyMMdd00:00:00");    SimpleDateFormatsimp2newS
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
3年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
11个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这
Easter79
Easter79
Lv1
今生可爱与温柔,每一样都不能少。
文章
2.8k
粉丝
6
获赞
1.2k