本文讲解RabbitMQ的存储,主要有以下内容:
存储原理
队列结构
惰性队列
存储原理
首先确认一个点,持久化和非持久化的消息都会落地磁盘,区别在于持久化的消息一定会写入磁盘(并且如果可以在内存中也会有一份),而非持久化的消息只有在内存吃紧的时候落地磁盘。两种类型消息的落盘都是在RabbitMQ的持久层中完成的。
RabbitMQ的持久层只是一个逻辑上的概念,实际包含两个部分:
队列索引(rabbit_queue_index):负责维护队列中落盘消息的信息,包括消息的存储地点、是否己被交付给消费者、是否己被消费者ack等。 每个队列都有与之对应的一个rabbit_queue_index
消息存储(rabbit_msg_store):以键值对的形式存储消息,它被所有vhost中的队列共享,在每个vhost中有且只有一个。rabbit_msg_store具体还可以分为 msg_store_persistent和msg_store_transient,msg_store_persistent负责持久化消息的持久化,重启后消息不会丢失;msg_store_transient负责 非持久化消息的持久化,重启后消息会丢失。
消息(包括消息体、属性和headers)可以直接存储在rabbit_queue_index中,也可以被保存在rabbit_msg_store中。
最佳的配备方式是较小的消息存储在rabbit_queue_index中而较大的信息则存储在rabbit_msg_store中。消息大小的参数可以通过queue_index_embed_mgs_below来配置,默认大小4096,单位B。
rabbit_queue_index中以顺序的段文件来开始存储,后缀为".idx",每个段文件中包含固定的SEGMENT_ENTRY_COUNT条记录,SEGMENT_ENTRY_COUNT默认值是16384。
经过rabbit_msg_store处理的所有消息都会以追加的方式写入到文件中,当一个文件的大小超过指定的限制(filesizelimit)后,关闭这个文件再创建一个新的文件以供新的消息写入。文件名(文件后缀是".rdq")从0开始进行累加,因此文件名最小的文件也是最老的文件。在进行消息的存储时,RabbitMQ会在ETS(Erlang Term Storage)表中记录消息在文件中的位置映射(Index)和文件的相关信息(FileSummary)。
在读取消息的时候,先根据消息的ID(msg id)找到对应存储的文件,如果文件存在并且未被锁住,则直接打开文件,从指定位置读取消息的内容。如果文件不存在或者被锁住了,则发送请求由rabbit_msg_store进行处理。
消息删除是只是删除ETS表中该消息的相关信息,同时更新消息对应的存储文件的相关信息。执行消息删除操作时,并不立即对文件中的消息进行删除,也就是说消息依然在文件中,仅仅是被标识为垃圾数据而已。一个文件中都是垃圾数据时可以将这个文件删除。当检测到前后两个文件中的有效数据可以合并在一个文件中,并且所有的垃圾数据的大小和所有文件(至少有3个文件存在的情况下)的数据大小的比值超过设置的阀值GARBAGE FRACTION(默认值为0.5)时才会触发垃圾回收将两个文件合并。
队列结构
通常队列由rabbit_amqpqueue_process和backing_queue两部分组成:
rabbit_amqpqueue_process:负责协议相关的消息处理(即接收生产者发布的消息、向消费者交付消息、处理消息的确认(包括生产端的confirm和消费端的ack))等
backing_queue:消息存储的具体形式和引擎,并向rabbit_amqpqueue_process提供接口以供调用
如果消息发送的队列是空的且队列有消费者,该消息不会经过该队列直接发往消费者,如果无法直接被消费,则需要将消息暂存入队列,以便重新投递。消息在存入队列后,主要有以下几种状态:
alpha:消息内容(包括消息体、属性和headers)和消息索引都存在内存中
beta:消息内容保存在磁盘中,消息索引都存在内存中
gamma:消息内容保存在磁盘中,消息索引在磁盘和内存中都存在
delta:消息内容和消息索引都在磁盘中
持久化的消息,消息内容和消息索引必须都保存在磁盘中,才会处于上面状态中的一种,gamma状态只有持久化的消息才有这种状态。
对于没有设置优先级和镜像的队列来说,backing_queue的默认实现是rabbit_variable_queue,其内部通过5个子队列来体现消息的各个状态:
Q1:只包含alpha状态的消息
Q2:包含beta和gamma的消息
Delta:包含delta的消息
Q3:包含beta和gamma的消息
Q4:只包含alpha状态的消息
消息的状态一般变更方向是Q1->Q2->Delta->Q3->Q4,大体是从内存到磁盘然后再到内存中。消费者消费消息也会引起消息状态的转换。
消费者消费时先从Q4获取消息,如果获取成功则返回。
如果Q4为空,则从Q3中获取消息,首先判断Q3是否为空,如果为空返回队列为空,即此时队列中无消息
如果Q3不为空,取出Q3的消息,然后判断Q3和Delta中的长度,如果都为空,那么Q2、Delta、Q3、Q4都为空,直接将Q1中的消息转移至Q4,下次直接从Q4中读取消息
如果Q3为空,Delta不为空,则将Delta中的消息转移至Q3中,下次直接从Q3中读取。
在将消息从Delta转移至Q3的过程中,是按照索引分段读取,首先读取某一段,然后判断读取的消息个数和Delta消息的个数,如果相等,判定Delta已无消息,直接将读取 Q2和读取到消息一并放入Q3,如果不相等,仅将此次读取的消息转移到Q3。
通常在负载正常时,如果消息被消费的速度不小于接收新消息的速度,对于不需要保证可靠不丢失的消息来说,极有可能只会处于alpha状态。对于durable属性设置为true的消息,它一定会进入gamma状态,并且在开启publisher confirm机制时,只有到了gamma状态时才会确认该消息己被接收,若消息消费速度足够快、内存也充足,这些消息也不会继续走到下一个状态。
惰性队列
惰性队列会将接收到的消息直接存入文件系统中,而不管是持久化的或者是非持久化的,这样可以减少了内存的消耗,但是会增加I/0的使用,如果消息是持久化的,那么这样的I/0操作不可避免,惰性队列和持久化的消息可谓是"最佳拍档"。
队列具备两种模式:default和lazy。在队列声明的时候可以通过x-queue-mode参数来设置队列的模式,取值为default和lazy。对应的 Policy设置方式为:
rabbitmqctl set_policy lazy "^myQueue$" '{"queue-mode":"lazy"}' --apply-to queue
本文分享自微信公众号 - shysh95(shysh95)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。