3.rabbitmq

Wesley13
• 阅读 730

rabbitmq-----发布订阅模式

模型组成

一个消费者Producer,一个交换机Exchange,多个消息队列Queue,多个消费者Consumer

一个生产者,多个消费者,每一个消费者都有自己的一个队列,生产者没有将消息直接发送到队列,而是发送到了交换机,每个队列绑定交换机,生产者发送的消息经过交换机,到达队列,实现一个消息被多个消费者获取的目的。需要注意的是,如果将消息发送到一个没有队列绑定的exchange上面,那么该消息将会丢失,这是因为在rabbitMQ中exchange不具备存储消息的能力,只有队列具备存储消息的能力。

3.rabbitmq

3.rabbitmq

Exchange

相比较于前两种模型Hello World和Work,这里多一个一个Exchange。其实Exchange是RabbitMQ的标配组成部件之一,前两种没有提到Exchange是为了简化模型,即使模型中没有看到Exchange的声明,其实还是声明了一个默认的Exchange。

RabbitMQ中实际发送消息并不是直接将消息发送给消息队列,消息队列也没那么聪明知道这条消息从哪来要到哪去。RabbitMQ会先将消息发送个Exchange,Exchange会根据这条消息打上的标记知道该条消息从哪来到哪去。

Exchange凭什么知道消息的何去何从,因为Exchange有几种类型:direct,fanout,topic和headers。这里说的订阅者模式就可以认为是fanout模式了。

RabbitMQ中,所有生产者提交的消息都由Exchange来接受,然后Exchange按照特定的策略转发到Queue进行存储
RabbitMQ提供了四种Exchangefanout,direct,topic,header .发布/订阅模式就是是基于`fanout Exchange`实现的。

  • fanout这种模式不需要指定队列名称,需要将Exchangequeue绑定,他们之间的关系是‘多对多’的关系 
    任何发送到fanout Exchange的消息都会被转发到与该Exchange绑定的queue上面。

订阅者模式有何不同
订阅者模式相对前面的Work模式有和不同?Work也有多个消费者,但是只有一个消息队列,并且一个消息只会被某一个消费者消费。但是订阅者模式不一样,它有多个消息队列,也有多个消费者,而且一条消息可以被多个消费者消费,类似广播模式。下面通过实例代码看看这种模式是如何收发消息的。

 1 package com.maozw.mq.pubsub;
 2 
 3 import com.maozw.mq.config.RabbitConfig;
 4 import com.rabbitmq.client.Channel;
 5 import org.slf4j.Logger;
 6 import org.slf4j.LoggerFactory;
 7 import org.springframework.amqp.rabbit.connection.Connection;
 8 import org.springframework.amqp.rabbit.connection.ConnectionFactory;
 9 import org.springframework.beans.factory.annotation.Autowired;
10 import org.springframework.web.bind.annotation.PathVariable;
11 import org.springframework.web.bind.annotation.RequestMapping;
12 import org.springframework.web.bind.annotation.RestController;
13 
14 import java.io.IOException;
15 import java.util.concurrent.TimeoutException;
16 
17 import static org.apache.log4j.varia.ExternallyRolledFileAppender.OK;
18 
19 /**
20  * work 模式
21  * 两种分发: 轮询分发 + 公平分发
22  * 轮询分发:消费端:自动确认消息;boolean autoAck = true;
23  * 公平分发: 消费端:手动确认消息 boolean autoAck = false; channel.basicAck(envelope.getDeliveryTag(),false);
24  *
25  * @author MAOZW
26  * @Description: ${todo}
27  * @date 2018/11/26 15:06
28  */
29 @RestController
30 @RequestMapping("/publish")
31 public class PublishProducer {
32     private static final Logger LOGGER = LoggerFactory.getLogger(PublishProducer.class);
33     @Autowired
34     RabbitConfig rabbitConfig;
35 
36 
37     @RequestMapping("/send/{exchangeName}/{queueName}")
38     public String send(@PathVariable String exchangeName, @PathVariable String queueName) throws IOException, TimeoutException {
39         Connection connection = null;
40         Channel channel= null;
41         try {
42             ConnectionFactory connectionFactory = rabbitConfig.connectionFactory();
43             connection = connectionFactory.createConnection();
44             channel = connection.createChannel(false);
45 
46             /**
47              * 申明交换机
48              */
49             channel.exchangeDeclare(exchangeName,"fanout");
50 
51             /**
52              * 发送消息
53              * 每个消费者 发送确认消息之前,消息队列不会发送下一个消息给消费者,一次只处理一个消息
54              * 自动模式无需设置下面设置
55              */
56             int prefetchCount = 1;
57             channel.basicQos(prefetchCount);
58 
59             String Hello = ">>>> Hello Simple <<<<";
60             for (int i = 0; i < 5; i++) {
61                 String message = Hello + i;
62                 channel.basicPublish(RabbitConfig.EXCHANGE_AAAAA, "", null, message.getBytes());
63                 LOGGER.info("生产消息: " + message);
64             }
65             return "OK";
66         }catch (Exception e) {
67 
68         } finally {
69             connection.close();
70             channel.close();
71             return OK;
72         }
73     }
74 }

 订阅1

 1 package com.maozw.mq.pubsub;
 2 
 3 import com.maozw.mq.config.RabbitConfig;
 4 import com.rabbitmq.client.AMQP;
 5 import com.rabbitmq.client.Channel;
 6 import com.rabbitmq.client.DefaultConsumer;
 7 import com.rabbitmq.client.Envelope;
 8 import org.slf4j.Logger;
 9 import org.slf4j.LoggerFactory;
10 import org.springframework.amqp.rabbit.connection.Connection;
11 import org.springframework.amqp.rabbit.connection.ConnectionFactory;
12 
13 import java.io.IOException;
14 
15 /**
16  * @author MAOZW
17  * @Description: ${todo}
18  * @date 2018/11/26 15:06
19  */
20 
21 public class SubscribeConsumer {
22     private static final Logger LOGGER = LoggerFactory.getLogger(SubscribeConsumer.class);
23 
24     public static void main(String[] args) throws IOException {
25         ConnectionFactory connectionFactory = RabbitConfig.getConnectionFactory();
26         Connection connection = connectionFactory.createConnection();
27         Channel channel = connection.createChannel(false);
28         /**
29          * 创建队列申明
30          */
31         boolean durable = true;
32         channel.queueDeclare(RabbitConfig.QUEUE_PUBSUB_FANOUT, durable, false, false, null);
33         /**
34          * 绑定队列到交换机
35          */
36         channel.queueBind(RabbitConfig.QUEUE_PUBSUB_FANOUT, RabbitConfig.EXCHANGE_AAAAA,"");
37 
38         /**
39          * 改变分发规则
40          */
41         channel.basicQos(1);
42         DefaultConsumer consumer = new DefaultConsumer(channel) {
43             @Override
44             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
45                 super.handleDelivery(consumerTag, envelope, properties, body);
46                 System.out.println("[2] 接口数据 : " + new String(body, "utf-8"));
47                 try {
48                     Thread.sleep(300);
49                 } catch (InterruptedException e) {
50                     e.printStackTrace();
51                 } finally {
52                     System.out.println("[2] done!");
53                     //消息应答:手动回执,手动确认消息
54                     channel.basicAck(envelope.getDeliveryTag(),false);
55                 }
56             }
57         };
58         //监听队列
59         /**
60          * autoAck 消息应答
61          *  默认轮询分发打开:true :这种模式一旦rabbitmq将消息发送给消费者,就会从内存中删除该消息,不关心客户端是否消费正常。
62          *  使用公平分发需要关闭autoAck:false  需要手动发送回执
63          */
64         boolean autoAck = false;
65         channel.basicConsume(RabbitConfig.QUEUE_PUBSUB_FANOUT,autoAck, consumer);
66     }
67     
68 }

 1 package com.maozw.mq.pubsub;
 2 
 3 import com.maozw.mq.config.RabbitConfig;
 4 import com.rabbitmq.client.AMQP;
 5 import com.rabbitmq.client.Channel;
 6 import com.rabbitmq.client.DefaultConsumer;
 7 import com.rabbitmq.client.Envelope;
 8 import org.slf4j.Logger;
 9 import org.slf4j.LoggerFactory;
10 import org.springframework.amqp.rabbit.connection.Connection;
11 import org.springframework.amqp.rabbit.connection.ConnectionFactory;
12 
13 import java.io.IOException;
14 
15 /**
16  * @author MAOZW
17  * @Description: ${todo}
18  * @date 2018/11/26 15:06
19  */
20 
21 public class SubscribeConsumer2 {
22     private static final Logger LOGGER = LoggerFactory.getLogger(SubscribeConsumer2.class);
23 
24     public static void main(String[] args) throws IOException {
25         ConnectionFactory connectionFactory = RabbitConfig.getConnectionFactory();
26         Connection connection = connectionFactory.createConnection();
27         Channel channel = connection.createChannel(false);
28         /**
29          * 创建队列申明
30          */
31         boolean durable = true;
32         channel.queueDeclare(RabbitConfig.QUEUE_PUBSUB_FANOUT2, durable, false, false, null);
33         /**
34          * 绑定队列到交换机
35          */
36         channel.queueBind(RabbitConfig.QUEUE_PUBSUB_FANOUT2, RabbitConfig.EXCHANGE_AAAAA,"");
37 
38         /**
39          * 改变分发规则
40          */
41         channel.basicQos(1);
42         DefaultConsumer consumer = new DefaultConsumer(channel) {
43             @Override
44             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
45                 super.handleDelivery(consumerTag, envelope, properties, body);
46                 System.out.println("[2] 接口数据 : " + new String(body, "utf-8"));
47                 try {
48                     Thread.sleep(400);
49                 } catch (InterruptedException e) {
50                     e.printStackTrace();
51                 } finally {
52                     System.out.println("[2] done!");
53                     //消息应答:手动回执,手动确认消息
54                     channel.basicAck(envelope.getDeliveryTag(),false);
55                 }
56             }
57         };
58         //监听队列
59         /**
60          * autoAck 消息应答
61          *  默认轮询分发打开:true :这种模式一旦rabbitmq将消息发送给消费者,就会从内存中删除该消息,不关心客户端是否消费正常。
62          *  使用公平分发需要关闭autoAck:false  需要手动发送回执
63          */
64         boolean autoAck = false;
65         channel.basicConsume(RabbitConfig.QUEUE_PUBSUB_FANOUT2,autoAck, consumer);
66     }
67 }
点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
待兔 待兔
4个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Stella981 Stella981
3年前
Kafka安装步骤
基本概念1.Producer:消息生产者,就是向kafkabroker发消息的客户端2.Consumer:消息消费者,向kafkabroker取消息的客户端3.ConsumerGroup(CG):消费者组,由多个consumer组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一
Stella981 Stella981
3年前
RabbitMQ操作
注意:在rabbitmq中,可以存在多个exchange,exchange只是负责接收消息,然后消息必须发送到给queue中,如果没有queue,消息就丢失了,exchange就相当于交换机,不负责存消息,queue是必须声明的,所以exchange负责转发,queue负责接收!(https://oscimg.oschina.net/oscnet/1
Stella981 Stella981
3年前
RabbitMQ基本示例,轮询机制,no_ack作用
一、RabbitMQ简介:'''RabbitMQ就是消息队列之前不是学了Queue了吗,都是队列还学RabbitMQ干嘛?干的事情是一样的Python的Queue有两个,一个线程Queue生产者消费者模型,一个进程Queue用于父进程与子进程交互两个完全独
Stella981 Stella981
3年前
RabbitMQ 基础概念介绍
AMQP消息模型RabbitMQ是基于AMQP(高级消息队列协议)的一个开源实现,其内部实际也是AMQP的基本概念。AMQP的消息发送流程有如下几个步骤:1.消息生产者(producer)将消息发布到Exchange中;2.Exchange根据队列的绑定关系将消息分发到不同的队列(Queue
Easter79 Easter79
3年前
SpringMVC中配置RabbitMQ
        RabbitMQ是工作在amqp协议(advancedmessagequeueprotocal,高级消息队列协议)上的一个消息中间件。它通过一个生产者消费者模型来处理应用中产生的消息。        除了生产者和消费者,此模型中另外一个重要的概念叫“工作队列”,也称为“任务队列”(TaskQueue),任务队列背后的核心想法是避免
Stella981 Stella981
3年前
RabbitMQ——队列消息数
背景在实际使用过程,会遇到这么些情况:生产者发送的消息数量与消费者接收的消息数量不一致。例如生产者向rabbitmq投递了100条消息,消费者只从队列中接收到了80条消息,并且当前队列中已经没有任何消息。要定位这个问题,通常是分段来定位,一方面统计生产者到底发送了多少消息,一方面统计有多少消息是正确路由到
Wesley13 Wesley13
3年前
JMS消息的概念解释
1、默认生产者消息是持久的:会存数据库\消费者的持久:createDurableSubscriber是指消费者能收到所有它订阅时间点之后的消息,即使消费者注册后关闭,当它重启就能收到注册时间点之后所有的消息;即当此消费用户ID(AAA)在producer发送之前就已经注册,那么此id能收到producer发送的所有消息,如果是在produce
Python进阶者 Python进阶者
10个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这