MQ消息乱序问题解析与实战解决方案

京东云开发者
• 阅读 120

作者:京东物流 刘浩

1. 背景

在分布式系统中,消息队列(MQ)是实现系统解耦、异步通信的重要工具。然而,MQ消费时出现的消息乱序问题,经常会对业务逻辑的正确执行和系统稳定性产生不良影响。本文将详细探讨MQ消息乱序问题的根源,并提供一系列在实际应用中可行的解决方案。

2. MQ消息乱序问题分析

常见的MQ消息乱序问题的根源主要可以归结为以下几点:

2.1 相同topic内的消息乱序

1). 并发消费:

在分布式系统中,为了提高消息处理的吞吐量,通常会配置多个消费者实例来并发消费同一个队列中的消息。然而,由于消费者实例的机器性能、网络延迟以及处理速度的差异,可能导致消息的消费顺序与发送顺序不一致。

2). 消息分区:

为了支持更高效的消息存储和消费,MQ系统通常会采用分区化的设计。然而,当同一业务逻辑的多条消息被分发到不同的分区时,消费者在消费这些消息时就可能出现乱序现象。

3). 网络延迟与抖动:

消息在传输过程中可能会受到网络延迟和抖动的影响,导致消息到达消费者端的时间顺序与发送顺序不一致。

4). 消息重试与故障恢复:

当消费者处理消息失败或出现故障时,MQ系统通常会进行消息重试或故障恢复操作。如果重试机制或故障恢复策略设计不当,也可能导致消息乱序。

2.2 不同topic的消息乱序

从相对时间的视角来审视,消息被消费的顺序并不等同于其被发送的顺序。例如,系统A在12:00时向TopicA发送了消息msgA-12:00,而紧接着系统B在12:01时向TopicB发送了消息msgB-12:01。当系统C同时订阅并消费这两个Topic时,它无法预设msgA-12:00会必然先于msgB-12:01被接收。这是由于消息系统在处理过程中,受到诸如消息分区策略、各个Consumer的处理能力以及其诸如网络、堆积、重试等他综合因素的影响,导致无法确保消息遵循严格的先进先出原则。

3. 案例分析

3.1 数据迁移过程中的mq消费乱序场景

在数据迁移或同步过程中,尤其是双写场景(即数据既写入旧系统,又通过MQ发送到新系统进行异步处理),MQ乱序可能导致严重的数据不一致问题。 MQ消息乱序问题解析与实战解决方案

 具体来说,当数据写入时发送INSERT MQ,数据更新时发送UPDATE MQ,如果UPDATE MQ先于INSERT MQ到达目标系统,目标系统可能会基于一个不存在的数据记录进行更新操作。这会导致以下几种情况:

数据丢失:如果目标系统没有处理UPDATE MQ中提到的数据记录(因为该记录尚未通过INSERT MQ创建),则更新操作会失败,可能导致数据变更丢失或遗漏。

数据覆盖:在高频修改的情况下,频繁更新可能会面临旧数据覆盖新数据的风险,比如UPDATE MQ携带的是旧数据且先于新数据的UPDATE MQ到达。

3.2 业务风险分析

MQ乱序对数据迁移和同步过程的影响是深远的:

数据一致性受损:最直接的影响是数据一致性受损。目标系统中的数据可能与源系统不一致,导致业务决策基于错误的数据。

用户体验下降:数据不一致可能导致用户看到错误的信息或遇到功能故障,从而降低用户体验。

业务中断:在严重的情况下,数据不一致可能导致业务中断或系统故障,影响企业的运营和声誉。

4. 解决方案

为了解决这个问题,可以采取以下措施:

4.1 顺序消息

消息顺序性保证:虽然Kafka不保证全局消息顺序,但可以通过合理的分区策略和消息键来确保同一账单的消息被发送到同一个分区,从而在一定程度上保证消息的顺序性。

比如RocketMQ支持顺序消息。但是需要注意这是局部有序,非全局后续。具体实现过程:

1.发送mq消息时,通过selector将同一个业务主键的消息,发送到同一队列中

2.消费方使用MessageListenerOrderly消费局部有序的消息

该方案需要发送方和消费方同步改造。

生产侧: MQ消息乱序问题解析与实战解决方案

消费侧:

MQ消息乱序问题解析与实战解决方案

4.2 前置检测

•在消费者处理消息之前,进行前置条件检查。例如,可以查询一个消息辅助表,确保上一个消息已经被成功消费或存入死信队列中。这种检查可以确保消息按照正确的顺序被处理。

•另一种方法是,在消息中添加序列号或时间戳,并在消费者端进行验证。如果当前消息的序列号或时间戳不符合预期顺序,则暂停处理并等待正确的消息到达。

4.3 状态机

在消息处理系统中,状态机可以用来定义和处理消息的顺序。每个状态代表系统当前所处的特定条件或阶段,而状态之间的转换则是由接收到的消息触发的。当系统接收到一个消息时,它会检查当前的状态和消息类型,然后决定是否要转移到另一个状态并执行相应的动作。

对于消息乱序问题,状态机可以通过以下方式解决:

1.定义状态转换规则:首先,需要定义一套明确的状态转换规则。这些规则应该基于业务逻辑来确定,以确保消息按照正确的顺序被处理。例如,如果系统要求先处理事件A再处理事件B,那么状态机就应该在接收到事件A后转移到能够处理事件B的状态。

2.状态检查与消息缓存:当系统接收到一个消息时,它会检查当前的状态是否允许处理该消息。如果当前状态不允许处理该消息(即消息的顺序不正确),则可以将该消息缓存起来,等待状态机转移到正确的状态后再进行处理。

3.状态转移与消息处理:一旦状态机转移到正确的状态,它就可以处理缓存中的消息。这可以确保消息按照正确的顺序被处理,即使它们最初是以乱序到达的。

4.4 监控与报警

建立系统的监控和报警机制,及时发现并处理消息错乱等异常情况。

通过采取以上措施,可以大大降低账单还款系统中消息错乱导致的问题,提高系统的稳定性和用户体验。

5. 小结

MQ消息乱序是分布式系统的常见难题,影响系统稳定性和业务一致性。本文深入解析问题根源,探讨了顺序消息、前置检查、状态机等实战解决方案,为实际开发中的问题解决提供有力参考。

文章中难免会有不足之处,希望读者能给予宝贵的意见和建议。谢谢!

点赞
收藏
评论区
推荐文章
Centos7安装RabbitMQ详细教程 - 附带软件基本解释 - CSDN博客
MQ引言什么是MQMQ:messageQueue翻译为消息队列,通过典型的生产者和消费者模型不断向消息队列中生产消息,消费者不断从队列中获取消息。因为消息的生产和消费都是一部的,而且只关心消息的发送和接收,没有业务逻辑的侵入,轻松的实现了系统之间的解耦。别名是消息中间件,通过利用高效的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系
Wesley13 Wesley13
3年前
MQ应用场景
MQ常见应用场景以下介绍消息队列在实际应用中常用的使用场景。异步处理,应用解耦,流量削锋,日志处理和消息通讯四个场景。异步处理场景说明:用户注册后,需要发注册邮件和注册短信。传统的做法有两种1.串行的方式;2.并行方式。(1)串行方式:将注册信息写入数据库(https://www.oschina.net/ac
Stella981 Stella981
3年前
Message Queue消息队列基本原理
消息队列基本原理📦本文已归档到:「blog」消息队列(MessageQueue,简称MQ)技术是分布式应用间交换信息的一种技术。消息队列主要解决应用耦合,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。是大型分布式系统不可缺少的中间件。注意:_为了简便,下文中除了文章标
Stella981 Stella981
3年前
RabbitMq学习笔记——概念
1、RabbitMQ简介  MQ全称为MessageQueue(消息队列),是一种“应用程序”<—“应用程序”的通信方法。MQ是一个典型的“消费”<—“生产者”模型的代表,生成者往消息队列中写入消息,消费者从消息队列中读取消息。2、MQ的应用场景  对于一个大型的软件系统来说,它会有很多的组件或者说模块或者说子系统或者
Wesley13 Wesley13
3年前
MQ介绍 & 实例
阅读目录定义优秀MQ特点产品比较实例(简单的实战)关于消息队列与分布式的那些事定义:消息队列(MQ)是一种应用程序对应用程序的通信方法,应用程序通过队列进行通信,而不是通过直接调用彼此来通信,队列的使用除去了接收和发送应
ElasticSearch - 批量更新bulk死锁问题排查 | 京东云技术团队
由于商品变更MQ消息量巨大,为了提升更新ES的性能,防止出现MQ消息积压问题,所以本系统使用了BulkProcessor进行批量异步更新。