AMQP 消息模型
RabbitMQ 是基于 AMQP(高级消息队列协议)的一个开源实现,其内部实际也是 AMQP 的基本概念。
AMQP 的消息发送流程有如下几个步骤:
消息生产者(producer)将消息发布到 Exchange中;
Exchange 根据队列的绑定关系将消息分发到不同的队列(Queue)中;
AMQP broker 根据订阅规则将消息分发给消费者(Consumer)或消费者根据需要自行从消息队列中获取消息。
Message
消息由消息头和消息体组成。消息体对 broker 是不透明的,消息头由一系列可选属性组成,包括 routing-key、priority、delivery-mode 等。
Producer
消息生产者
Exchange
Exchange 的主要任务是接收 Producer 生产的消息,并将消息分发给 0 个或多个 Queue 中。而分发规则受 Exchange 类型和 绑定(binding)关系的影响。AMQP 0-9-1 broker 提供了以下四个 Exchange 类型:
类型
默认预定名称
Direct Exchange
空字符串和amq.direct
Fanout Exchange
amq.fanout
Topic Exchange
amq.topic
Headers Exchange
amq.match(在 RabbitMQ 中,额外提供 amq.headers)
每个 Exchange 都有如下属性:
Name,Exchange 的名字;
Durability,是否是持久的 Exchange,当为真时,broker 重启后也会保留此 Exchange;
Auto-delte,当为真时,如果所有绑定的 Queue 都不再使用时,此 Exchange 会自动删除。
关于默认的 Exchange
默认的 Exchange 是一个有 broker 预创建的匿名的(即名字是空字符串)Direct Exchange。如果没有显示地为 Queue 绑定 Exchange,那么创建的每个 Queue 都会自动绑定到默认的的 Exchange 中,并且此时这个 Queue 的 Route key 就是这个 Queue 的名字。
Direct Exchange
Direct Exchange 会根据消息的 route key 将消息分发到不同的 Queue 中。Direct Exchange 适合用户消息的单播发送。Direct Exchange 的工作流程如下:
一个 Queue 使用 K 作为 route key 绑定到 Direct Exchange 中;
当 Direct Exchange 收到一个 route key 为 R 的消息时,如果 R == K,则此 Exchange 会将此消息分发到此 Queue 中。
Direct Exchange 经常用在多个 worker 中分配任务(即一个 Master 和多个 Slave)。当使用这个模型时,需要注意AMQP 0-9-1 的负债均衡是以 Consumer 为单位的,而不是一个 Queue 为单位。
一般在此种模式下会使用 RabbitMQ 的默认 Exchange。使用默认的 Exchange 需要注意:
该Exchange的名字为空字符串;
不需要对 Exchange 和 Queue 进行任何显示的绑定,Queue 默认会绑定到此 Exchange;
Queue 的 routing key 默认与 Queue 的名称相同;
如果 Vhost 中不存在与消息的 routing key 匹配的 Queue,该消息会被抛弃。
Fanout Exchange
一个 Fanout Exchange 会将消息分发给所有绑定到此 Exchange 的 Queue 中,而不会考虑 Queue 的 routing key。此模型常用于消息的广播。
Topic Exchange
Topic Exchange 会根据 routing key 将消息分发到与此消息的 routing key 相匹配的并且绑定到此 Exchange 中的 Queue。Topic Exchange 常用于实现 publish/subscribue 模型,即消息的多播模型。
Header Exchange
Header Exchange 不使用 routing key 作为路由依据,而是使用消息头属性来路由消息。
Queue
AMQP 中的队列概念与其他消息模型的队列概念相似,都有以下几个重要概念:
Name,队列的名字;
Durable,队列是否持久。为真时,即使 broker 重启时,此 Queue 也不会删除;
Exclusive,队列是否独占。为真时,表示此 Queue 只能有一个消费者,并且此消费者的连接断开时,此 Queue 会被删除。
Auto-delete,为真时,此 Queue会在最后一个消费者取消订阅时被删除。
在使用一个队列时,需要先进行声明。如果声明的队列不存在,则 broker 自动创建该队列。如果声明的队列已存在时,若重新声明的队列属性与已存在队列的属性完全相同,则不会有任何影响;若不一致,则会产生 406(PRECONDITION_FAILED)错误。
队列名
AMQP 的对列名不能以 “amq.” 开头,因为这样的队列名是预留给 AMQP broker 内部使用的。使用这样的队列名会产生 403(ACCESS_REFUSED)错误。
持久化队列
持久化的队列会被写入到磁盘中,即使 broker 重启了,持久队列也依然存在。持久队列和消息的持久化不同。当 broker 重启时,持久队列自动重新声明,然而只有队列中的持久化消息才会被恢复。
队列的绑定
队列的绑定关系是 Exchange 用于消息分发的规则,即一个 Exchange 能够将消息分发到某个队列的前提是此队列已经绑定到这个 Exchange 中。当队列绑定到一个 Exchange 中时,我们还可以额外设置一个参数,即 routing key,这个 key 会被 Direct Exchange 和 Topic Exchange 作为额外的分发规则使用,换句话说,routing key 扮演着过滤器的角色。
Consumer
AMQP 0-9-1 支持两种消息分发模式:
push 模式:broker 主动推送消息给消费者
pull 模式:消费者主动从 broker 中获取消息
在 push 模式下,应用程序需要告知 broker 它需要哪些消息,即我们所说的订阅一个消息的主题。每个消费者都有一个唯一的标识符,即 consumer tag,可以用这个 tag 来取消一个消费者对某个主题的订阅。
消息的 ACK(消息确认)
AMQP 0-9-1 有两种消息 ACK 模式:
自动 ACK 模式
手动 ACK 模式
在自动 ACK 模式下,当 broker 发送消息成功后,会立即将此消息从消息队列中删除,而不会等待消费者的 ACK 回复。
在手动 ACK 模式下,当 broker 发送消息给消费者时,不会立即将此消息删除,而是需要等待消费者的 ACK 回复,得到回复后才会删除消息。因此手动 ACK 模式下,当消费者受到消息后并处理完成后,需要向 broker 显示地发送 ACK 指令。
拒绝消息
当消费者处理消息失败或此时不能出来消息时,那么可以给 broker 发送一个拒绝消息的指令,并且可以要求 broker 丢弃或重新分发此消息。
注意:如果此时只有一个消费者,那么此消费者拒收消息并要求 broker 重新分发此消息时,会造成此消息不断的分发和拒收,形成死循环。
预取消息
通过预取消息机制,消费者可以一次性批量取出消息,然后在处理后对这些批量消息进行统一的 ACK 回复,以提高消息的吞吐量。
RabbitMQ 仅支持 Channel 级别的预取消息的数量配置,不支持基于连接的预取消息数量配置。
连接
AMQP 的连接是长连接,它是一个使用 TCP 作为可靠传输的应用层协议。
Channel
AMQP 不推荐一个应用程序发起多个对 broker 的连接,因为这样会消耗系统资源并且也不利于防火墙的配置。但是如果应用程序确实需要有多个互不干扰的连接来进行不同的操作,可以使用 Channel。在 AMQP 0-9-1 中,一个与 broker 的连接可以被多个 Channel 复用。因此我们可以将 Channel 理解为:一个共享同一个 TCP 连接的轻量级的连接。
基于同一个 TCP 连接的两个不同 Channel 是不会有任何干扰的,因此客户端在和 broker 之间交互时,需要附带上 channel id。
通常来说,在一个多线程消费消息的模型中,每个线程单独开一个 Channel 是一个推荐的做法,而最好不要在各个线程中共享一个 Channel。
Virtual Host
为了在一个 broker 中实现不同的相互隔离的环境(例如每个环境中有不同的用户,不同的 Exchange,不同的队列等),AMQP 引入了 virtual host(vhost)概念。在连接 broker 时,客户端可以指定需要使用哪个 vhost。
虚拟主机操作
列举所有虚拟主机:rabbitmqctl list_vhosts
添加虚拟主机:rabbitmqctl add_vhost <vhost_name>
删除虚拟主机:rabbitmqctl delete_vhost <vhost_name>
添加用户:add_user <username> <password>
设置用户标签:set_user_tags <username> <tag> // 标签包括 administrator、monitoring、policymaker、management
设置用户权限:set_permissions [-p <vhost>] <username> <conf> <write> <read>
conf:配置权限,创建、删除、修改资源,使用正则表达式匹配资源名称
write:资源写权限,可以向资源发送消息,使用正则表达式匹配资源名称,.* 表示所有
read:资源读取权限,可以从资源读取消息,使用正则表达式匹配资源名称,.* 表示所有
资源指 vhost 中的 exchange、queue 等