RocketMQ消息轨迹

Stella981
• 阅读 791

RocketMQ消息轨迹主要包含两篇文章:设计篇与源码分析篇,本节将详细介绍RocketMQ消息轨迹-设计相关。

RocketMQ消息轨迹,主要跟踪消息发送、消息消费的轨迹,即详细记录消息各个处理环节的日志,从设计上至少需要解决如下三个核心问题:

  • 消费轨迹数据格式
  • 记录消息轨迹(消息日志)
  • 消息轨迹数据存储在哪?

1、消息轨迹数据格式

RocketMQ4.5版本消息轨迹主要记录如下信息:

  • traceType 跟踪类型,可选值:Pub(消息发送)、SubBefore(消息拉取到客户端,执行业务定义的消费逻辑之前)、SubAfter(消费后)。
  • timeStamp 当前时间戳。
  • regionId broker所在的区域ID,取自BrokerConfig#regionId。
  • groupName 组名称,traceType为Pub时为生产者组的名称;如果traceType为subBefore或subAfter时为消费组名称。
  • requestId traceType为subBefore、subAfter时使用,消费端的请求Id。
  • topic 消息主题。
  • msgId 消息唯一ID。
  • tags 消息tag。
  • keys 消息索引key,根据该key可快速检索消息。
  • storeHost 跟踪类型为PUB时为存储该消息的Broker服务器IP;跟踪类型为subBefore、subAfter时为消费者IP。
  • bodyLength 消息体的长度。
  • costTime 耗时。
  • msgType 消息的类型,可选值:Normal_Msg(普通消息),Trans_Msg_Half(预提交消息),Trans_msg_Commit(提交消息),Delay_Msg(延迟消息)。
  • offsetMsgId 消息偏移量ID,该ID中包含了broker的ip以及偏移量。
  • success 是发送成功。
  • contextCode 消费状态码,可选值:SUCCESS,TIME_OUT,EXCEPTION,RETURNNULL,FAILED。

2、记录消息轨迹

消息中间件的两大核心主题:消息发送、消息消费,其核心载体就是消息,消息轨迹(消息的流转)主要是记录消息是何时发送到哪台Broker,发送耗时多少时间,在什么是被哪个消费者消费。记录消息的轨迹主要是集中在消息发送前后、消息消费前后,可以通过RokcetMQ的Hook机制。通过如下两个接口来定义钩子函数。 RocketMQ消息轨迹

通过实行上述两个接口,可以实现在消息发送、消息消费前后记录消息轨迹,为了不明显增加消息发送与消息消费的时延,记录消息轨迹最好使用异步发送模式。

3、如何存储消息轨迹数据

消息轨迹需要存储什么消息以及在什么时候记录消息轨迹的问题都以及解决,那接下来就得思考将消息轨迹存储在哪里?存储在数据库中或其他媒介中,都会加重消息中间件,使其依赖外部组件,最佳的选择还是存储在Broker服务器中,将消息轨迹数据也当成一条消息存储到Broker服务器。

既然把消息轨迹当成消息存储在Broker服务器,那存储消息轨迹的Topic如何确定呢?RocketMQ提供了两种方法来定义消息轨迹的Topic。

  • 系统默认Topic 如果Broker的traceTopicEnable配置设置为true,表示在该Broker上创建topic名为:RMQ_SYS_TRACE_TOPIC,队列个数为1,默认该值为false,表示该Broker不承载系统自定义用于存储消息轨迹的topic。
  • 自定义Topic 在创建消息生产者或消息消费者时,可以通过参数自定义用于记录消息轨迹的Topic名称,不过要注意的是,rokcetmq控制台(rocketmq-console)中只支持配置一个消息轨迹Topic,故自定义Topic,在目前这个阶段或许还不是一个最佳实践,建议使用系统默认的Topic即可。

通常为了避免消息轨迹的数据与正常的业务数据混合在一起,官方建议,在Broker集群中,新增加一台机器,只在这台机器上开启消息轨迹跟踪,这样该集群内的消息轨迹数据只会发送到这一台Broker服务器上,并不会增加集群内原先业务Broker的负载压力。

RocketMQ消息轨迹的设计细节就介绍到这里了,下一篇将从源码的角度对其实现细节进行详细的剖析;如果觉得本文对您有帮助的话,期待您的点赞,谢谢。


> 作者简介:《RocketMQ技术内幕》作者,RocketMQ 社区布道师,维护公众号:中间件兴趣圈,可扫描如下二维码与作者进行互动。

RocketMQ消息轨迹

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
6个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Stella981 Stella981
3年前
Kafka、RabbitMQ、RocketMQ等消息中间件的对比 —— 消息发送性能和区别
Kafka、RabbitMQ、RocketMQ等消息中间件的对比——消息发送性能和区别那么,消息中间件性能究竟哪家强?带着这个疑问,我们中间件测试组对常见的三类消息产品(Kafka、RabbitMQ、RocketMQ)做了
Stella981 Stella981
3年前
RokectMQ 顺序性 和分布式事务
1.顺序性是根据参数的id来使其同时投递到统一队列上。//RocketMQ通过MessageQueueSelector中实现的算法来确定消息发送到哪一个队列上//RocketMQ默认提供了两种MessageQueueSelector实现:随机/Hash//当然你可以根据业务实现自己的MessageQueueSelecto
Wesley13 Wesley13
3年前
JMS消息的概念解释
1、默认生产者消息是持久的:会存数据库\消费者的持久:createDurableSubscriber是指消费者能收到所有它订阅时间点之后的消息,即使消费者注册后关闭,当它重启就能收到注册时间点之后所有的消息;即当此消费用户ID(AAA)在producer发送之前就已经注册,那么此id能收到producer发送的所有消息,如果是在produce
Stella981 Stella981
3年前
Noark入门之协议映射
0x00消息控制器消息控制器,主要作用就是为每个模块提供消息处理的入口.这里的消息不仅仅是协议,还有内部指令,事件等等逻辑入口,这也是为了响应线程模型作出的一种支撑,只要入口在此消息控制器内,那必然走期望的线程调度。@Controller用于标识一个类为当前模块的消息控制器入口.@Controller(threadGroup
线上SQL超时场景分析-MySQL超时之间隙锁 | 京东物流技术团队
前言之前遇到过一个由MySQL间隙锁引发线上sql执行超时的场景,记录一下。背景说明分布式事务消息表:业务上使用消息表的方式,依赖本地事务,实现了一套分布式事务方案消息表名:mqmessages数据量:3000多万索引:createtime和statuss
京东云开发者 京东云开发者
5个月前
【京东云新品发布月刊】2024年7月产品动态
京东云7月产品动态:1.【消息队列RocketMQ】新品上线消息队列RocketMQ是京东云基于ApacheRocketMQ打造的一款低延迟、高并发、高可用、高可靠的分布式消息队列服务。支持开源客户端零改造接入,同时具备计算存储分离,灵活扩缩容的优势。能够
美凌格栋栋酱 美凌格栋栋酱
1小时前
Oracle 分组与拼接字符串同时使用
SELECTT.,ROWNUMIDFROM(SELECTT.EMPLID,T.NAME,T.BU,T.REALDEPART,T.FORMATDATE,SUM(T.S0)S0,MAX(UPDATETIME)CREATETIME,LISTAGG(TOCHAR(