RabbitMQ——队列消息数

Stella981
• 阅读 824

背景


在实际使用过程,会遇到这么些情况:

生产者发送的消息数量与消费者接收的消息数量不一致。例如生产者向rabbitmq投递了100条消息,消费者只从队列中接收到了80条消息,并且当前队列中已经没有任何消息。

要定位这个问题,通常是分段来定位,一方面统计生产者到底发送了多少消息,一方面统计有多少消息是正确路由到指定队列的,两者进行比较判断生产者发送是否有问题,如果数量一致,也就是生产者发送的消息都正确到队列后,基本可以断定是消费端的问题。比如消费者本身的逻辑处理不对,或者有其他消费者到该队列消费了一部分数据等。

那么这里有个问题:怎样正确统计到底有多少消息发送到了指定队列?尤其是生产、消费同时进行时,怎样进行正确统计?或者该问题变相的变成一条运维需求,即统计一个时间段内发布到指定队列的消息数。

可能的解决办法


1、抓包或者开启trace来追踪消息进出rabbitmq的情况

然而这种方式仅适用于开发调试阶段,当消息量达到一定程度时,trace会严重影响性能,而抓包也不一定能在正确时刻找到有问题的包。

2、针对有问题的队列,再建一个队列,并以同样的binding-key绑定到同样的exchange上。这样一来,生产者发送过来的消息,会同时进入到两个队列,其中一个队列中的消息被消费者消费掉,新建的队列因没有消费者可保留全部的消息,我们只需要看这个队列中的消息数就可以完成统计工作。

同样,这种办法也是存在一些问题的。首先,消息在队列中堆积,会占用rabbitmq的内存或磁盘空间,从而影响rabbitmq的整体性能。有人可能会问,那再开发个程序,消费这个队列中的消息但不做任何处理,仅仅是进行计数统计,是不是就解决问题了。然而并不是这样,因为一个消息同时路由进入到多个队列,对于生产者的性能也是有损耗的(尤其是一个消息路由到6个以上的队列)。

另一种可行方法


在rabbitmq中,每个消息在队列中会有一个对应的序号,这个序号是每个队列独立维护的。该序号的意义主要是保证消息按照先进先出的方式有序被消费者消费。

其内部实现,每个队列的状态信息中,维护了一个字段:next_seq_id。该字段表示下一条进入队列的消息的序号。每当有消息发送到队列时,该值会加1,同时每个消息的序号也作为消息索引的一部分持久化到文件中了,这样rabbitmq重启后,队列中的消息依然是可以按照有序的方式被消费者消费。

我们可以定期获取该字段对应的值,前后两次的差值就是这个时间段内进入队列的消息总数。

获取方式


可以通过http接口来获取指定队列该字段的值。

如下图所示,两次查看spurs这个队列信息之间,一共发送了3条消息。

RabbitMQ——队列消息数

当然,我们也可以不指定队列,即查看全部队列的信息,并从中获取next_seq_id字段对应的值。

除此之外,rabbitmq的插件rabbitmq_management中提供了管理控制台的命令行工具rabbitmqadmin,该工具本质上也是通过http的方式调用对应接口获取相关信息,可以理解为是封装成了一个可执行程序(脚本)。

例如:

RabbitMQ——队列消息数

最后再补充说明一点:

前面说了,每个消息在队列中都有一个对应的序号,并且该序号随着消息一起持久化到文件中了,但字段next_seq_id本身并没有进行持久化,因此rabbitmq重启后,每个队列会重新计算该值。具体为:找下一个即将写的索引文件,乘以16384得到的值即为新的next_seq_id的值。(为什么是乘以16384,可以参考这篇文章

RabbitMQ——队列消息数

总结


统计一个时间段内进入队列的消息数,可以通过队列的内部状态字段next_seq_id来实现。

RabbitMQ——队列消息数

本文分享自微信公众号 - hncscwc(gh_383bc7486c1a)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
待兔 待兔
6个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Centos7安装RabbitMQ详细教程 - 附带软件基本解释 - CSDN博客
MQ引言什么是MQMQ:messageQueue翻译为消息队列,通过典型的生产者和消费者模型不断向消息队列中生产消息,消费者不断从队列中获取消息。因为消息的生产和消费都是一部的,而且只关心消息的发送和接收,没有业务逻辑的侵入,轻松的实现了系统之间的解耦。别名是消息中间件,通过利用高效的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系
Stella981 Stella981
3年前
MQ对比之RabbitMQ & Redis
消息队列选择:RabbitMQ&RedisRabbitMQRabbitMQ是一个由erlang开发的AMQP(AdvancedMessageQueue)的开源实现的产品,RabbitMQ是一个消息代理,从“生产者”接收消息并传递消息至“消费者”,期间可根据规则路由、缓存、持久化消息。“生产者”也即message
Stella981 Stella981
3年前
Spring Boot(七):RabbitMQ 详解
一、RabbitMQ简介RabbitMQ即一个消息队列,主要是用来实现应用程序的异步和解耦,同时也能起到消息缓冲,消息分发的作用。消息中间件在互联网公司的使用中越来越多,消息中间件最主要的作用是解耦,中间件最标准的用法是生产者生产消息传送到队列,消费者从队列中拿取消息并处理,生产者不用关心是谁来消费,消费者不用关心谁在生产消息,从而达到解耦的
Wesley13 Wesley13
3年前
3.rabbitmq
rabbitmq发布订阅模式模型组成一个消费者Producer,一个交换机Exchange,多个消息队列Queue,多个消费者Consumer一个生产者,多个消费者,每一个消费者都有自己的一个队列,生产者没有将消息直接发送到队列,而是发送到了交换机,每个队列绑定交换机,生产者发送
Wesley13 Wesley13
3年前
ActiveMQ简述,使用
官网地址:http://activemq.apache.org/参考文章:http://my.oschina.net/nk2011/blog/366395JMS支持两种消息发送和接收模型。一种称为P2P(PonittoPoint)模型,即采用点对点的方式发送消息。P2P模型是基于队列的,消息生产者发送消息到队列,消息消费者从队列中接收消息,队列的
Stella981 Stella981
3年前
RabbitMQ 基础概念介绍
AMQP消息模型RabbitMQ是基于AMQP(高级消息队列协议)的一个开源实现,其内部实际也是AMQP的基本概念。AMQP的消息发送流程有如下几个步骤:1.消息生产者(producer)将消息发布到Exchange中;2.Exchange根据队列的绑定关系将消息分发到不同的队列(Queue
Easter79 Easter79
3年前
SpringMVC中配置RabbitMQ
        RabbitMQ是工作在amqp协议(advancedmessagequeueprotocal,高级消息队列协议)上的一个消息中间件。它通过一个生产者消费者模型来处理应用中产生的消息。        除了生产者和消费者,此模型中另外一个重要的概念叫“工作队列”,也称为“任务队列”(TaskQueue),任务队列背后的核心想法是避免