RabbitMQ是工作在amqp协议(advanced message queue protocal,高级消息队列协议)上的一个消息中间件。它通过一个生产者消费者模型来处理应用中产生的消息。
除了生产者和消费者,此模型中另外一个重要的概念叫“工作队列”,也称为“任务队列”(Task Queue),任务队列背后的核心想法是避免立即执行资源密集型的任务并不得不等待它完成,而是将任务封装成消息发送到一个队列中,然后由“消费者”来获取任务并依次执行,当有多个“消费者”的时候,它们会均匀的分配这些任务。默认情况下,RabbitMQ会轮流的将消息发送给“消费者”,这种分发消息的方式被称为“round-robin”。
RabbitMQ支持“消息确认反馈”来确保消息不会因为“消费者”挂了而丢失——在消息被接收并处理之后,会给RabbitMQ发送一条确认,这样RabbitMQ就能将该消息删掉。如果没有收到确认,RabbitMQ会重发这条消息。RabbitMQ还支持消息持久化,来保证当RabbitMQ服务停止时消息不会丢失,这种情况下,RabbitMQ会将消息写到硬盘。
发布/订阅模式——消息会被发送到所有它的“订阅者”
Exchange,完整的RabbitMQ模型中还包含一个消息交换机。消息生产者先将消息发送给交换机,然后交换机再将消息发送给队列。交换机有一个属性叫exchange type能决定如何处理消息,比如是否该把一条消息推送给某一个队列或是某多个队列,还是应该把它丢弃掉。其属性值有:direct,topic,headers,fanout。
exchange和队列之间的关系称为“binding”(它们之间有个binging key),消息会被路由到由其"routingKey"属性指定名称的队列中,如果没有队列绑定到exchange上,消息将会丢失。
fanout exchange会把它接收到的消息广播到所有的队列,而direct exchange会匹配消息的routing key和队列的bingding key,然后将消息发送到与之匹配成功的队列上。没有匹配的消息将被丢弃。
topic exchange,被发送到topic exchange的消息不能被指定一个随意的routing key,它必须是一个由"."分隔的单词的清单,这些单词通常指定了消息的某些特征,如"quick.orange.rabbit",单词数量的上限是255个字节。bingding key必须也是同样的形式,和direct exchange一样,带有特定routing key的消息会被发送到带有匹配bingding key的队列上,但是bingding key有两个重要的特点,"*"号可以代替一个单词,"#"可以代替0个或者多个单词,例如,"*.orange.*"的binding key会匹配所有routing key中第二个单词为orange的消息,而bingding key:"#"会匹配所有的消息。
结合实际项目中的代码,看看如何在SpringMVC中配置RabbitMQ。
通常会从Spring的配置文件中单独引出一个RabbitMQ的配置文件application-rabbit-context.xml,并将该配置文件中需要用到的一些参数值通过rabbit.properties配置文件单独配置。
在pom中,会引入一个将Spring与RabbitMQ整合的依赖:
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
</dependency>
它会提供一个RabbitMQ Java Client来提供服务。然后我们要配置一个connect-factory,它里面定义了连接的主机、端口、用户名和密码等信息。(连接工厂创建连接,连接创建channel,channel声明队列,队列接收/发送消息)
<rabbit:connection-factory
id="connectionFactory"
host="${rabbit.connect.host}"
port="${rabbit.connect.port}"
username="${rabbit.connect.username}"
password="${rabbit.connect.password}"
channel-cache-size="${rabbit.connect.channelCacheSize}"
publisher-returns="true"
publisher-confirms="true" />
接下来是配置一个消息转换器,用来将消息转换为Json,方便消息在生产者和消费者之间传输。
<bean id="messageConverter"
class="org.springframework.amqp.support.converter.SimpleMessageConverter" />
然后会定义一个<rabbit:admin />,它的作用是在系统启动时自动得到exchange和queue的bean,并将它们在容器中注册。
然后是生产者和消费者的定义,生产者会通过一个template发送消息,配置如下:
<rabbit:template id="rabbitTemplate"
connection-factory="connectionFactory" message-converter="messageConverter" />
配置消费者通常是配置一个queue,一个listener和一个listener container:
<rabbit:queue id="TagRequestQueue" name="optag.request.queue" />
<bean id="TagRequestListener" class="com.ximalaya.tagging.backend.listener.TagRequestListener" />
<rabbit:listener-container
connection-factory="connectionFactory" message-converter="messageConverter">
<rabbit:listener ref="TagRequestListener" queues="TagRequestQueue" />
</rabbit:listener-container>
以上就完成了Spring中RabbitMQ的配置,在发送和接受消息的代码实现上,发送消息的类方法中会通过template的convertAndSend方法将消息对象发送出去,而消息接收类则通过实现MessageListener接口并重写其onMessage方法接收消息并做相应的处理。