RabbitMQ存储和队列结构

Stella981
• 阅读 696

本文讲解RabbitMQ的存储,主要有以下内容:

  1. 存储原理

  2. 队列结构

  3. 惰性队列

存储原理

首先确认一个点,持久化和非持久化的消息都会落地磁盘,区别在于持久化的消息一定会写入磁盘(并且如果可以在内存中也会有一份),而非持久化的消息只有在内存吃紧的时候落地磁盘。两种类型消息的落盘都是在RabbitMQ的持久层中完成的。

RabbitMQ的持久层只是一个逻辑上的概念,实际包含两个部分:

  • 队列索引(rabbit_queue_index):负责维护队列中落盘消息的信息,包括消息的存储地点、是否己被交付给消费者、是否己被消费者ack等。 每个队列都有与之对应的一个rabbit_queue_index

  • 消息存储(rabbit_msg_store):以键值对的形式存储消息,它被所有vhost中的队列共享,在每个vhost中有且只有一个。rabbit_msg_store具体还可以分为 msg_store_persistent和msg_store_transient,msg_store_persistent负责持久化消息的持久化,重启后消息不会丢失;msg_store_transient负责 非持久化消息的持久化,重启后消息会丢失。

消息(包括消息体、属性和headers)可以直接存储在rabbit_queue_index中,也可以被保存在rabbit_msg_store中。

最佳的配备方式是较小的消息存储在rabbit_queue_index中而较大的信息则存储在rabbit_msg_store中。消息大小的参数可以通过queue_index_embed_mgs_below来配置,默认大小4096,单位B。

rabbit_queue_index中以顺序的段文件来开始存储,后缀为".idx",每个段文件中包含固定的SEGMENT_ENTRY_COUNT条记录,SEGMENT_ENTRY_COUNT默认值是16384。

经过rabbit_msg_store处理的所有消息都会以追加的方式写入到文件中,当一个文件的大小超过指定的限制(filesizelimit)后,关闭这个文件再创建一个新的文件以供新的消息写入。文件名(文件后缀是".rdq")从0开始进行累加,因此文件名最小的文件也是最老的文件。在进行消息的存储时,RabbitMQ会在ETS(Erlang Term Storage)表中记录消息在文件中的位置映射(Index)和文件的相关信息(FileSummary)。

在读取消息的时候,先根据消息的ID(msg id)找到对应存储的文件,如果文件存在并且未被锁住,则直接打开文件,从指定位置读取消息的内容。如果文件不存在或者被锁住了,则发送请求由rabbit_msg_store进行处理。

消息删除是只是删除ETS表中该消息的相关信息,同时更新消息对应的存储文件的相关信息。执行消息删除操作时,并不立即对文件中的消息进行删除,也就是说消息依然在文件中,仅仅是被标识为垃圾数据而已。一个文件中都是垃圾数据时可以将这个文件删除。当检测到前后两个文件中的有效数据可以合并在一个文件中,并且所有的垃圾数据的大小和所有文件(至少有3个文件存在的情况下)的数据大小的比值超过设置的阀值GARBAGE FRACTION(默认值为0.5)时才会触发垃圾回收将两个文件合并。

队列结构

通常队列由rabbit_amqpqueue_process和backing_queue两部分组成:

  • rabbit_amqpqueue_process:负责协议相关的消息处理(即接收生产者发布的消息、向消费者交付消息、处理消息的确认(包括生产端的confirm和消费端的ack))等

  • backing_queue:消息存储的具体形式和引擎,并向rabbit_amqpqueue_process提供接口以供调用

如果消息发送的队列是空的且队列有消费者,该消息不会经过该队列直接发往消费者,如果无法直接被消费,则需要将消息暂存入队列,以便重新投递。消息在存入队列后,主要有以下几种状态:

  • alpha:消息内容(包括消息体、属性和headers)和消息索引都存在内存中

  • beta:消息内容保存在磁盘中,消息索引都存在内存中

  • gamma:消息内容保存在磁盘中,消息索引在磁盘和内存中都存在

  • delta:消息内容和消息索引都在磁盘中

持久化的消息,消息内容和消息索引必须都保存在磁盘中,才会处于上面状态中的一种,gamma状态只有持久化的消息才有这种状态。

对于没有设置优先级和镜像的队列来说,backing_queue的默认实现是rabbit_variable_queue,其内部通过5个子队列来体现消息的各个状态:

  • Q1:只包含alpha状态的消息

  • Q2:包含beta和gamma的消息

  • Delta:包含delta的消息

  • Q3:包含beta和gamma的消息

  • Q4:只包含alpha状态的消息

消息的状态一般变更方向是Q1->Q2->Delta->Q3->Q4,大体是从内存到磁盘然后再到内存中。RabbitMQ存储和队列结构 消费者消费消息也会引起消息状态的转换。

  1. 消费者消费时先从Q4获取消息,如果获取成功则返回。

  2. 如果Q4为空,则从Q3中获取消息,首先判断Q3是否为空,如果为空返回队列为空,即此时队列中无消息

  3. 如果Q3不为空,取出Q3的消息,然后判断Q3和Delta中的长度,如果都为空,那么Q2、Delta、Q3、Q4都为空,直接将Q1中的消息转移至Q4,下次直接从Q4中读取消息

  4. 如果Q3为空,Delta不为空,则将Delta中的消息转移至Q3中,下次直接从Q3中读取。

  5. 在将消息从Delta转移至Q3的过程中,是按照索引分段读取,首先读取某一段,然后判断读取的消息个数和Delta消息的个数,如果相等,判定Delta已无消息,直接将读取 Q2和读取到消息一并放入Q3,如果不相等,仅将此次读取的消息转移到Q3。

通常在负载正常时,如果消息被消费的速度不小于接收新消息的速度,对于不需要保证可靠不丢失的消息来说,极有可能只会处于alpha状态。对于durable属性设置为true的消息,它一定会进入gamma状态,并且在开启publisher confirm机制时,只有到了gamma状态时才会确认该消息己被接收,若消息消费速度足够快、内存也充足,这些消息也不会继续走到下一个状态。

惰性队列

惰性队列会将接收到的消息直接存入文件系统中,而不管是持久化的或者是非持久化的,这样可以减少了内存的消耗,但是会增加I/0的使用,如果消息是持久化的,那么这样的I/0操作不可避免,惰性队列和持久化的消息可谓是"最佳拍档"。

队列具备两种模式:default和lazy。在队列声明的时候可以通过x-queue-mode参数来设置队列的模式,取值为default和lazy。对应的 Policy设置方式为:

  1. rabbitmqctl set_policy lazy "^myQueue$" '{"queue-mode":"lazy"}' --apply-to queue

本文分享自微信公众号 - shysh95(shysh95)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Stella981 Stella981
3年前
RabbitMQ如何通过持久化保证消息99.99%不丢失?
1\.本篇概要要解决该问题,就要用到RabbitMQ中持久化的概念,所谓持久化,就是RabbitMQ会将内存中的数据(Exchange交换器,Queue队列,Message消息)固化到磁盘,以防异常情况发生时,数据丢失。其中,RabblitMQ的持久化分为三个部分:1.交换器(Exchange
Easter79 Easter79
3年前
Twitter的分布式自增ID算法snowflake (Java版)
概述分布式系统中,有一些需要使用全局唯一ID的场景,这种时候为了防止ID冲突可以使用36位的UUID,但是UUID有一些缺点,首先他相对比较长,另外UUID一般是无序的。有些时候我们希望能使用一种简单一些的ID,并且希望ID能够按照时间有序生成。而twitter的snowflake解决了这种需求,最初Twitter把存储系统从MySQL迁移
Stella981 Stella981
3年前
RabbitMQ系列三 (深入消息队列)
消息持久化是RabbitMQ最为人津津乐道的特性之一,RabbitMQ能够在付出最小的性能代价的基础上实现消息的持久化,最大的奥秘就在于RabbitMQ多层消息队列的设计上。下面,本文就从MessageQueue的设计和消息在MessageQueue的生命周期两个方面全面介绍 RabbitMQ的消息队列。RabbitMQ完全实现
Stella981 Stella981
3年前
RabbitMq的一些概念,持久化、队列排他、自动删除、消息确认机制、消息ACK、消费消息的模式
一、队列持久化的概念队列的声明默认是存放到内存中的,如果rabbitmq重启会丢失,如果想重启之后还存在就要使队列持久化,保存到Erlang自带的Mnesia数据库中,当rabbitmq重启之后会读取该数据库。二、排他简单理解就是在连接关闭时是否会删除队列(无论队列中有没有消息) 三、自动删除
Stella981 Stella981
3年前
RabbitMQ消息持久化和消息确认机制
消息持久化消息在传输过程中,可能会出现各种异常失败甚至宕机情况,为了保证消息传输的可靠性,需要进行持久化,也就是在数据写在磁盘上。消息队列持久化包括三部分:1.Message持久化,也就是发送时消息持久化。(Message包含body,body为我们需要发送的消息具体内容,一般以json字符串发送,消费端
Python进阶者 Python进阶者
10个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这
融云IM即时通讯 融云IM即时通讯
19小时前
融云IM干货丨IM服务消息推送,客户端版本更新后,如何确保消息不丢失?
确保客户端版本更新后消息不丢失,可以采取以下几种策略:消息持久化:确保消息被存储在可靠的存储介质中,如数据库或磁盘,这样即使客户端或服务端发生故障,消息也不会丢失。对于RabbitMQ等消息队列,需要开启持久化机制,将消息持久化到硬盘上,即使服务重启也能从