RabbitMQ 基础概念介绍

Stella981
• 阅读 829

AMQP 消息模型

RabbitMQ 是基于 AMQP(高级消息队列协议)的一个开源实现,其内部实际也是 AMQP 的基本概念。

AMQP 的消息发送流程有如下几个步骤:

  1. 消息生产者(producer)将消息发布到 Exchange中;

  2. Exchange 根据队列的绑定关系将消息分发到不同的队列(Queue)中;

  3. AMQP broker 根据订阅规则将消息分发给消费者(Consumer)或消费者根据需要自行从消息队列中获取消息。

Message

消息由消息头和消息体组成。消息体对 broker 是不透明的,消息头由一系列可选属性组成,包括 routing-key、priority、delivery-mode 等。

Producer

消息生产者

Exchange

Exchange 的主要任务是接收 Producer 生产的消息,并将消息分发给 0 个或多个 Queue 中。而分发规则受 Exchange 类型和 绑定(binding)关系的影响。AMQP 0-9-1 broker 提供了以下四个 Exchange 类型:

类型

默认预定名称

Direct Exchange

空字符串和amq.direct

Fanout Exchange

amq.fanout

Topic Exchange

amq.topic

Headers Exchange

amq.match(在 RabbitMQ 中,额外提供 amq.headers)

每个 Exchange 都有如下属性:

  1. Name,Exchange 的名字;

  2. Durability,是否是持久的 Exchange,当为真时,broker 重启后也会保留此 Exchange;

  3. Auto-delte,当为真时,如果所有绑定的 Queue 都不再使用时,此 Exchange 会自动删除。

关于默认的 Exchange

默认的 Exchange 是一个有 broker 预创建的匿名的(即名字是空字符串)Direct Exchange。如果没有显示地为 Queue 绑定 Exchange,那么创建的每个 Queue 都会自动绑定到默认的的 Exchange 中,并且此时这个 Queue 的 Route key 就是这个 Queue 的名字。

Direct Exchange

Direct Exchange 会根据消息的 route key 将消息分发到不同的 Queue 中。Direct Exchange 适合用户消息的单播发送。Direct Exchange 的工作流程如下:

  1. 一个 Queue 使用 K 作为 route key 绑定到 Direct Exchange 中;

  2. 当 Direct Exchange 收到一个 route key 为 R 的消息时,如果 R == K,则此 Exchange 会将此消息分发到此 Queue 中。

Direct Exchange 经常用在多个 worker 中分配任务(即一个 Master 和多个 Slave)。当使用这个模型时,需要注意AMQP 0-9-1 的负债均衡是以 Consumer 为单位的,而不是一个 Queue 为单位。

一般在此种模式下会使用 RabbitMQ 的默认 Exchange。使用默认的 Exchange 需要注意:

  1. 该Exchange的名字为空字符串;

  2. 不需要对 Exchange 和 Queue 进行任何显示的绑定,Queue 默认会绑定到此 Exchange;

  3. Queue 的 routing key 默认与 Queue 的名称相同;

  4. 如果 Vhost 中不存在与消息的 routing key 匹配的 Queue,该消息会被抛弃。

Fanout Exchange

一个 Fanout Exchange 会将消息分发给所有绑定到此 Exchange 的 Queue 中,而不会考虑 Queue 的 routing key。此模型常用于消息的广播。

Topic Exchange

Topic Exchange 会根据 routing key 将消息分发到与此消息的 routing key 相匹配的并且绑定到此 Exchange 中的 Queue。Topic Exchange 常用于实现 publish/subscribue 模型,即消息的多播模型。

Header Exchange

Header Exchange 不使用 routing key 作为路由依据,而是使用消息头属性来路由消息。

Queue

AMQP 中的队列概念与其他消息模型的队列概念相似,都有以下几个重要概念:

  1. Name,队列的名字;

  2. Durable,队列是否持久。为真时,即使 broker 重启时,此 Queue 也不会删除;

  3. Exclusive,队列是否独占。为真时,表示此 Queue 只能有一个消费者,并且此消费者的连接断开时,此 Queue 会被删除。

  4. Auto-delete,为真时,此 Queue会在最后一个消费者取消订阅时被删除。

在使用一个队列时,需要先进行声明。如果声明的队列不存在,则 broker 自动创建该队列。如果声明的队列已存在时,若重新声明的队列属性与已存在队列的属性完全相同,则不会有任何影响;若不一致,则会产生 406(PRECONDITION_FAILED)错误。

队列名

AMQP 的对列名不能以 “amq.” 开头,因为这样的队列名是预留给 AMQP broker 内部使用的。使用这样的队列名会产生 403(ACCESS_REFUSED)错误。

持久化队列

持久化的队列会被写入到磁盘中,即使 broker 重启了,持久队列也依然存在。持久队列和消息的持久化不同。当 broker 重启时,持久队列自动重新声明,然而只有队列中的持久化消息才会被恢复。

队列的绑定

队列的绑定关系是 Exchange 用于消息分发的规则,即一个 Exchange 能够将消息分发到某个队列的前提是此队列已经绑定到这个 Exchange 中。当队列绑定到一个 Exchange 中时,我们还可以额外设置一个参数,即 routing key,这个 key 会被 Direct Exchange 和 Topic Exchange 作为额外的分发规则使用,换句话说,routing key 扮演着过滤器的角色。

Consumer

AMQP 0-9-1 支持两种消息分发模式:

  1. push 模式:broker 主动推送消息给消费者

  2. pull 模式:消费者主动从 broker 中获取消息

在 push 模式下,应用程序需要告知 broker 它需要哪些消息,即我们所说的订阅一个消息的主题。每个消费者都有一个唯一的标识符,即 consumer tag,可以用这个 tag 来取消一个消费者对某个主题的订阅。

消息的 ACK(消息确认)

AMQP 0-9-1 有两种消息 ACK 模式:

  1. 自动 ACK 模式

  2. 手动 ACK 模式

在自动 ACK 模式下,当 broker 发送消息成功后,会立即将此消息从消息队列中删除,而不会等待消费者的 ACK 回复。

在手动 ACK 模式下,当 broker 发送消息给消费者时,不会立即将此消息删除,而是需要等待消费者的 ACK 回复,得到回复后才会删除消息。因此手动 ACK 模式下,当消费者受到消息后并处理完成后,需要向 broker 显示地发送 ACK 指令。

拒绝消息

当消费者处理消息失败或此时不能出来消息时,那么可以给 broker 发送一个拒绝消息的指令,并且可以要求 broker 丢弃或重新分发此消息。

注意:如果此时只有一个消费者,那么此消费者拒收消息并要求 broker 重新分发此消息时,会造成此消息不断的分发和拒收,形成死循环。

预取消息

通过预取消息机制,消费者可以一次性批量取出消息,然后在处理后对这些批量消息进行统一的 ACK 回复,以提高消息的吞吐量。

RabbitMQ 仅支持 Channel 级别的预取消息的数量配置,不支持基于连接的预取消息数量配置。

连接

AMQP 的连接是长连接,它是一个使用 TCP 作为可靠传输的应用层协议。

Channel

AMQP 不推荐一个应用程序发起多个对 broker 的连接,因为这样会消耗系统资源并且也不利于防火墙的配置。但是如果应用程序确实需要有多个互不干扰的连接来进行不同的操作,可以使用 Channel。在 AMQP 0-9-1 中,一个与 broker 的连接可以被多个 Channel 复用。因此我们可以将 Channel 理解为:一个共享同一个 TCP 连接的轻量级的连接。

基于同一个 TCP 连接的两个不同 Channel 是不会有任何干扰的,因此客户端在和 broker 之间交互时,需要附带上 channel id。

通常来说,在一个多线程消费消息的模型中,每个线程单独开一个 Channel 是一个推荐的做法,而最好不要在各个线程中共享一个 Channel。

Virtual Host

为了在一个 broker 中实现不同的相互隔离的环境(例如每个环境中有不同的用户,不同的 Exchange,不同的队列等),AMQP 引入了 virtual host(vhost)概念。在连接 broker 时,客户端可以指定需要使用哪个 vhost。

虚拟主机操作

列举所有虚拟主机:rabbitmqctl list_vhosts
添加虚拟主机:rabbitmqctl add_vhost <vhost_name>
删除虚拟主机:rabbitmqctl delete_vhost <vhost_name>
添加用户:add_user <username> <password>
设置用户标签:set_user_tags <username> <tag> // 标签包括 administrator、monitoring、policymaker、management
设置用户权限:set_permissions [-p <vhost>] <username> <conf> <write> <read>
conf:配置权限,创建、删除、修改资源,使用正则表达式匹配资源名称
write:资源写权限,可以向资源发送消息,使用正则表达式匹配资源名称,.* 表示所有
read:资源读取权限,可以从资源读取消息,使用正则表达式匹配资源名称,.* 表示所有
资源指 vhost 中的 exchange、queue 等

参考:

  1. https://segmentfault.com/a/1190000007123977
  2. https://blog.csdn.net/chendaoqiu/article/details/48440633
  3. https://blog.csdn.net/dandanzmc/article/details/52262850
点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Wesley13 Wesley13
3年前
RabbitMQ学习总结(7)——Spring整合RabbitMQ实例
1.RabbitMQ简介RabbitMQ是流行的开源消息队列系统,用erlang语言开发。RabbitMQ是AMQP(高级消息队列协议)的标准实现。 官网:http://www.rabbitmq.com/(https://www.oschina.net/action/GoToLink?urlhttp%3A%2F%2Fwww.rabbi
Stella981 Stella981
3年前
RabbitMQ使用场景
AMQPAMQP协议是一个高级抽象层消息通信协议,RabbitMQ是AMQP协议的实现。它主要包括以下组件:!(https://oscimg.oschina.net/oscnet/4eea8d68796a038f27fdc5945546a544f9d.png)1.Server(broker): 接受客户端连接,实现AMQP消息队列和路
Stella981 Stella981
3年前
MQ对比之RabbitMQ & Redis
消息队列选择:RabbitMQ&RedisRabbitMQRabbitMQ是一个由erlang开发的AMQP(AdvancedMessageQueue)的开源实现的产品,RabbitMQ是一个消息代理,从“生产者”接收消息并传递消息至“消费者”,期间可根据规则路由、缓存、持久化消息。“生产者”也即message
Stella981 Stella981
3年前
C#队列学习笔记:RabbitMQ基础知识
  一、引言  RabbitMQ是RabbitMessageQueue的简写,但不能仅仅理解其为消息队列,消息代理更合适。RabbitMQ是一个由Erlang语言开发的AMQP(高级消息队列协议)的开源实现,其内部结构如下:!RabbitMQ内部结构(https://oscimg.oschina.net/osc
Stella981 Stella981
3年前
RabbitMQ 消息中间件搭建详解
1.RabbitMQ简介消息中间件也可以称消息队列,是指用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息队列模型,可以在分布式环境下扩展进程的通信。RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包
Stella981 Stella981
3年前
AvctiveMQ和RabbitMQ的区别
ActiveMQ:传统的消息队列,使用Java语言编写。基于JMS(JavaMessageService),采用多线程并发,资源消耗比较大。支持P2P和发布订阅两种模式。RabbitMQ:是使用Erlang语言开发的开源消息队列系统。基于AMQP协议来实现的。AMQP的主要特征是面向消息、队列、路由(
Easter79 Easter79
3年前
SpringMVC中配置RabbitMQ
        RabbitMQ是工作在amqp协议(advancedmessagequeueprotocal,高级消息队列协议)上的一个消息中间件。它通过一个生产者消费者模型来处理应用中产生的消息。        除了生产者和消费者,此模型中另外一个重要的概念叫“工作队列”,也称为“任务队列”(TaskQueue),任务队列背后的核心想法是避免
Stella981 Stella981
3年前
RabbitMQ基础概念详细介绍
RabbitMQ简介AMQP,即AdvancedMessageQueuingProtocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。R
Python进阶者 Python进阶者
10个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这