rabbitmq-----发布订阅模式
模型组成
一个消费者Producer,一个交换机Exchange,多个消息队列Queue,多个消费者Consumer
一个生产者,多个消费者,每一个消费者都有自己的一个队列,生产者没有将消息直接发送到队列,而是发送到了交换机,每个队列绑定交换机,生产者发送的消息经过交换机,到达队列,实现一个消息被多个消费者获取的目的。需要注意的是,如果将消息发送到一个没有队列绑定的exchange上面,那么该消息将会丢失,这是因为在rabbitMQ中exchange不具备存储消息的能力,只有队列具备存储消息的能力。
Exchange
相比较于前两种模型Hello World和Work,这里多一个一个Exchange。其实Exchange是RabbitMQ的标配组成部件之一,前两种没有提到Exchange是为了简化模型,即使模型中没有看到Exchange的声明,其实还是声明了一个默认的Exchange。
RabbitMQ中实际发送消息并不是直接将消息发送给消息队列,消息队列也没那么聪明知道这条消息从哪来要到哪去。RabbitMQ会先将消息发送个Exchange,Exchange会根据这条消息打上的标记知道该条消息从哪来到哪去。
Exchange凭什么知道消息的何去何从,因为Exchange有几种类型:direct,fanout,topic和headers。这里说的订阅者模式就可以认为是fanout模式了。
RabbitMQ
中,所有生产者提交的消息都由Exchange
来接受,然后Exchange
按照特定的策略转发到Queue
进行存储RabbitMQ
提供了四种Exchange
:fanout,direct,topic,header .发布/订阅模式就是是基于`fanout Exchange`实现的。
fanout
这种模式不需要指定队列名称,需要将Exchange
和queue
绑定,他们之间的关系是‘多对多’的关系
任何发送到fanout Exchange
的消息都会被转发到与该Exchange
绑定的queue
上面。
订阅者模式有何不同
订阅者模式相对前面的Work模式有和不同?Work也有多个消费者,但是只有一个消息队列,并且一个消息只会被某一个消费者消费。但是订阅者模式不一样,它有多个消息队列,也有多个消费者,而且一条消息可以被多个消费者消费,类似广播模式。下面通过实例代码看看这种模式是如何收发消息的。
1 package com.maozw.mq.pubsub;
2
3 import com.maozw.mq.config.RabbitConfig;
4 import com.rabbitmq.client.Channel;
5 import org.slf4j.Logger;
6 import org.slf4j.LoggerFactory;
7 import org.springframework.amqp.rabbit.connection.Connection;
8 import org.springframework.amqp.rabbit.connection.ConnectionFactory;
9 import org.springframework.beans.factory.annotation.Autowired;
10 import org.springframework.web.bind.annotation.PathVariable;
11 import org.springframework.web.bind.annotation.RequestMapping;
12 import org.springframework.web.bind.annotation.RestController;
13
14 import java.io.IOException;
15 import java.util.concurrent.TimeoutException;
16
17 import static org.apache.log4j.varia.ExternallyRolledFileAppender.OK;
18
19 /**
20 * work 模式
21 * 两种分发: 轮询分发 + 公平分发
22 * 轮询分发:消费端:自动确认消息;boolean autoAck = true;
23 * 公平分发: 消费端:手动确认消息 boolean autoAck = false; channel.basicAck(envelope.getDeliveryTag(),false);
24 *
25 * @author MAOZW
26 * @Description: ${todo}
27 * @date 2018/11/26 15:06
28 */
29 @RestController
30 @RequestMapping("/publish")
31 public class PublishProducer {
32 private static final Logger LOGGER = LoggerFactory.getLogger(PublishProducer.class);
33 @Autowired
34 RabbitConfig rabbitConfig;
35
36
37 @RequestMapping("/send/{exchangeName}/{queueName}")
38 public String send(@PathVariable String exchangeName, @PathVariable String queueName) throws IOException, TimeoutException {
39 Connection connection = null;
40 Channel channel= null;
41 try {
42 ConnectionFactory connectionFactory = rabbitConfig.connectionFactory();
43 connection = connectionFactory.createConnection();
44 channel = connection.createChannel(false);
45
46 /**
47 * 申明交换机
48 */
49 channel.exchangeDeclare(exchangeName,"fanout");
50
51 /**
52 * 发送消息
53 * 每个消费者 发送确认消息之前,消息队列不会发送下一个消息给消费者,一次只处理一个消息
54 * 自动模式无需设置下面设置
55 */
56 int prefetchCount = 1;
57 channel.basicQos(prefetchCount);
58
59 String Hello = ">>>> Hello Simple <<<<";
60 for (int i = 0; i < 5; i++) {
61 String message = Hello + i;
62 channel.basicPublish(RabbitConfig.EXCHANGE_AAAAA, "", null, message.getBytes());
63 LOGGER.info("生产消息: " + message);
64 }
65 return "OK";
66 }catch (Exception e) {
67
68 } finally {
69 connection.close();
70 channel.close();
71 return OK;
72 }
73 }
74 }
订阅1
1 package com.maozw.mq.pubsub;
2
3 import com.maozw.mq.config.RabbitConfig;
4 import com.rabbitmq.client.AMQP;
5 import com.rabbitmq.client.Channel;
6 import com.rabbitmq.client.DefaultConsumer;
7 import com.rabbitmq.client.Envelope;
8 import org.slf4j.Logger;
9 import org.slf4j.LoggerFactory;
10 import org.springframework.amqp.rabbit.connection.Connection;
11 import org.springframework.amqp.rabbit.connection.ConnectionFactory;
12
13 import java.io.IOException;
14
15 /**
16 * @author MAOZW
17 * @Description: ${todo}
18 * @date 2018/11/26 15:06
19 */
20
21 public class SubscribeConsumer {
22 private static final Logger LOGGER = LoggerFactory.getLogger(SubscribeConsumer.class);
23
24 public static void main(String[] args) throws IOException {
25 ConnectionFactory connectionFactory = RabbitConfig.getConnectionFactory();
26 Connection connection = connectionFactory.createConnection();
27 Channel channel = connection.createChannel(false);
28 /**
29 * 创建队列申明
30 */
31 boolean durable = true;
32 channel.queueDeclare(RabbitConfig.QUEUE_PUBSUB_FANOUT, durable, false, false, null);
33 /**
34 * 绑定队列到交换机
35 */
36 channel.queueBind(RabbitConfig.QUEUE_PUBSUB_FANOUT, RabbitConfig.EXCHANGE_AAAAA,"");
37
38 /**
39 * 改变分发规则
40 */
41 channel.basicQos(1);
42 DefaultConsumer consumer = new DefaultConsumer(channel) {
43 @Override
44 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
45 super.handleDelivery(consumerTag, envelope, properties, body);
46 System.out.println("[2] 接口数据 : " + new String(body, "utf-8"));
47 try {
48 Thread.sleep(300);
49 } catch (InterruptedException e) {
50 e.printStackTrace();
51 } finally {
52 System.out.println("[2] done!");
53 //消息应答:手动回执,手动确认消息
54 channel.basicAck(envelope.getDeliveryTag(),false);
55 }
56 }
57 };
58 //监听队列
59 /**
60 * autoAck 消息应答
61 * 默认轮询分发打开:true :这种模式一旦rabbitmq将消息发送给消费者,就会从内存中删除该消息,不关心客户端是否消费正常。
62 * 使用公平分发需要关闭autoAck:false 需要手动发送回执
63 */
64 boolean autoAck = false;
65 channel.basicConsume(RabbitConfig.QUEUE_PUBSUB_FANOUT,autoAck, consumer);
66 }
67
68 }
1 package com.maozw.mq.pubsub;
2
3 import com.maozw.mq.config.RabbitConfig;
4 import com.rabbitmq.client.AMQP;
5 import com.rabbitmq.client.Channel;
6 import com.rabbitmq.client.DefaultConsumer;
7 import com.rabbitmq.client.Envelope;
8 import org.slf4j.Logger;
9 import org.slf4j.LoggerFactory;
10 import org.springframework.amqp.rabbit.connection.Connection;
11 import org.springframework.amqp.rabbit.connection.ConnectionFactory;
12
13 import java.io.IOException;
14
15 /**
16 * @author MAOZW
17 * @Description: ${todo}
18 * @date 2018/11/26 15:06
19 */
20
21 public class SubscribeConsumer2 {
22 private static final Logger LOGGER = LoggerFactory.getLogger(SubscribeConsumer2.class);
23
24 public static void main(String[] args) throws IOException {
25 ConnectionFactory connectionFactory = RabbitConfig.getConnectionFactory();
26 Connection connection = connectionFactory.createConnection();
27 Channel channel = connection.createChannel(false);
28 /**
29 * 创建队列申明
30 */
31 boolean durable = true;
32 channel.queueDeclare(RabbitConfig.QUEUE_PUBSUB_FANOUT2, durable, false, false, null);
33 /**
34 * 绑定队列到交换机
35 */
36 channel.queueBind(RabbitConfig.QUEUE_PUBSUB_FANOUT2, RabbitConfig.EXCHANGE_AAAAA,"");
37
38 /**
39 * 改变分发规则
40 */
41 channel.basicQos(1);
42 DefaultConsumer consumer = new DefaultConsumer(channel) {
43 @Override
44 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
45 super.handleDelivery(consumerTag, envelope, properties, body);
46 System.out.println("[2] 接口数据 : " + new String(body, "utf-8"));
47 try {
48 Thread.sleep(400);
49 } catch (InterruptedException e) {
50 e.printStackTrace();
51 } finally {
52 System.out.println("[2] done!");
53 //消息应答:手动回执,手动确认消息
54 channel.basicAck(envelope.getDeliveryTag(),false);
55 }
56 }
57 };
58 //监听队列
59 /**
60 * autoAck 消息应答
61 * 默认轮询分发打开:true :这种模式一旦rabbitmq将消息发送给消费者,就会从内存中删除该消息,不关心客户端是否消费正常。
62 * 使用公平分发需要关闭autoAck:false 需要手动发送回执
63 */
64 boolean autoAck = false;
65 channel.basicConsume(RabbitConfig.QUEUE_PUBSUB_FANOUT2,autoAck, consumer);
66 }
67 }