3种使用MQ实现分布式事务的方式

Wesley13
• 阅读 606

1.保证消息传递与一致性

1.1生产者确保消息自主性

当生产者发送一条消息时,它必须完成他的所有业务操作。

如下图:

3种使用MQ实现分布式事务的方式

这保证消费者接受到消息时,生产者已处理完毕相关业务,也就是1PC的基础。

1.2 MQ保存并转发消息

消息标记为持久化,MQ将会利用保存并转发机制,来履行它与发送者之间的契约。

至于activemq高可用部分,详见另外一篇blog:https://my.oschina.net/floor/blog/1574213

一般MQ保存并转发的流程如下:

3种使用MQ实现分布式事务的方式

1.3消费者确认模式的选择

JMS提供的3中模式:

  • auto_acknowledge:

               消费者接收到消息后,立即自动向MQ发送确认消息。

  • dups_ok_acknowledge

              一条消息可以被多次消费,为了确保“一次且仅仅一次发送语义”。影响MQ性能。

  • client_acknowledge

             client端确认模式,手动确认消息已消费。

我是如何选择的:

dups_ok_acknowledge首先被排除,是因为它影响MQ的性能,我公司使用的activemq的性能本就一般,所以没有选择。

client_acknowledge 需要程序员编码确认消息被消费,可能存在1条消息很久没有消费掉,MQ堵住的情况,所以没有选择。

我们最终选择了auto_acknowledge ,原因是SImple is best。至于各位如何选择,按需而定吧!

1.4 漏洞与补救办法

经过之前的选择,我们的MQ流程图如下

3种使用MQ实现分布式事务的方式

稍微看下这个流程,大家就会发现,如果消费者,在自动确认消息后,在还没有消费消息时,若消费者挂掉了 ,由于MQ在auto_acknowledge下,当前生产者不会重新发送,这就产生了消息不一致的情况,即生产者端已处理,消费者端未处理的问题。

1.4.1. 2种方式处理消息不一致的情况

1.生产者再发一条消息,2.消费者查询下生产者业务是否完毕。2种方式均可实现。下边主要说下我公司的方式,方案不唯一仅供参考。

1.4.1.1 生产者再次发送,即定时任务补偿+幂等消费的方式

这里不使用MQ的dups_ok_acknowledge,是因为会影响MQ的性能。生产者再次发送方式存在3个问题:

  • 生产者如何知道消费者没有消费一条消息?
  • 生产者重新发送的频率是多少?
  • 消费者如何处理重复的消息?

生产者如何知道消费者没有消费一条消息?

我们公司加入了一个event表,作为生产者与消费者之间的桥梁,用来维护消息的消费状态。event表的核心字段有,JMS队列名称,业务ID(我公司发送的消息都是业务ID,这里可以是业务bean),完成状态(完成,未完成),其他字段可按需而定。

生产者重新发送的频率是多少?

需要根据业务的实时性要求和消费者的能力和可以堆积的信息进行判断。经过分析与压测后,设定定时补偿的频率。

消费者如何处理重复的消息?

    为何会出现重复的消息?

举例说明:

假设正常情况下一条消息消费需要2s(即单节点一分钟消费30条),我们3分补偿一次。

消息队列中已堆积了199条消息,第200条为当前发送的消息,为了简单仅仅考虑单一消费者的情况,当3分钟后,才消费完90条消息,还有堆积110,注意这个时候消费者还没有消费生产者3分钟前发送的消息,而补偿机制又发送了一条消息进入MQ,这就出现了消费者接受到重复消息的情况。

如何解决:

我在生产中的解决办法是在:利用event表,redis实现分布式事务锁,实现幂等消费。

当消费者接受到消息后,按如下步骤处理:

  1. 先查询event中的消息状态,如果消息存在且未处理,继续往下,若已处理直接返回。
  2. 利用redis,加锁,可以使用redission框架,也可以利用String类型的并设置失效时间的简单方式实现不可重入的锁,个人推荐推荐redission,但是我公司使用的是String类型的并设置失效时间的简单方式。
  3. 加锁成功后,再查询event中的消息状态,如果消息存在且未处理,继续往下,若已处理直接返回。
  4. 消费者处理自己的业务
  5. 更新event表中的状态为已处理
  6. 解锁操作

熟悉并发的朋友,会发现1-3步骤是DCL模型。

综上所述,以生产者再次发送的方式,保证消费者消费消息的整体流程如下:

3种使用MQ实现分布式事务的方式

看到这个模型图,可能觉得比较复杂,除了第8步,我们都可以在基类中实现了,并且由于event表数据独立于MQ,我们可以做一个监控(仅仅自己考虑公司没有实现),针对event,查询消息的消费情况,还能实现人工重发功能。以上模式,已在在生产模式中大量使用。

该模式的优点是可以确保消息最终一致性,生产者,MQ,消费者压力均不大,我们公司利用该模式实现核心业务,例如 票购买后的,拆票操作,追号触发追一期操作等。

1.4.1.2.消费者进行查询,推拉结合方式。

按理说消费者可以启动一个定时任务,查询生产者需要它消费的数据。模型类似之前,但是我公司并没有使用这种方式。因为是不想影响消费者的消费能力。

但是我们在通知业务中,实现了一种简易的推拉结合的方式,该方式个人认为使用面比较窄,但对通知业务有一定的适用性,在这里做下简要介绍

实现方式:

仅仅提供了一个http接口供用户查询,该http接口不一定在生产者,这里仅仅是画在生产者中。

其模型图如下:

3种使用MQ实现分布式事务的方式

这个模式使用的前提是:

消费者不消费数据,也对业务没有影响。

案例说明:

以通知3D的开奖号码为例:

生产者为抓取服务,当抓取服务,抓到的3D彩果后,针对每一个订阅者生存独一无二的消息数据,之后发送MQ。

消费者为push服务(实际上是调用第三方推送),接收的消息发给订阅的用户。

由于消息已入库,会在通知中心中展示,而用户是否接受到推送并不重要,他可以在app的消息中心中查询。

2.扩展,使用Event-Sourcing+MQ解决RPC类型的分布式事务‘

这个方式,来自于黄勇老师,我们目前在工作中用在用户支付与订单状态更新上,下边大部分的文字和截图的,都是来自黄勇老师的《架构探险-轻量级微服务架构下册》,在这里感谢黄勇老师的传道与解惑。

2.1什么是Event-Sourcing?

它是一种基于事件回溯的解决方案,一般将它应用在领域对象模型中。事件不可枚举,事件类型可以枚举。我们以event表示事件表,其大体内容如下:

  • ID event的唯一表示
  • EventType: 事件类型:CREATE,UPDATE,DELETE等。
  • Model Name:表示模型名称:例如Foo,Bar等。
  • MOdel ID: 模型ID
  • Create Time 创建时间,精确到毫秒。

2.2实现方式:

第一步,操作模型表与时间表并写入消息队列

3种使用MQ实现分布式事务的方式

第二步,从消息队列中获取事件 ,操作模型表,若有异常,将事件再写入消息队列

3种使用MQ实现分布式事务的方式

第三步,从消息队列中获取原事件,操作事件表与模型表,进行“事件回溯”

3种使用MQ实现分布式事务的方式

回溯操作的特殊说明:

  • INSERT,的逆向操作为DELETE,但是一般业务是标记删除,也就是说逆向操作为UPDATE。
  • UPDATE,的逆向操作为UPDATE
  • DELETE,由于一般是标记删除,所有逆向操作也是UPDATE。

2.3Event-Sourcing和MQ的RPC事务的控制流程图

3种使用MQ实现分布式事务的方式

总结

这篇文章,是我工作中使用MQ的感悟,可能存在不对的地方,欢迎各位指正。

本文3中模式,

  • 定时补偿+幂等消费
  • 推拉结合
  • Event-Sourcing和MQ,实现RPC式分布式事务,(来自@黄勇 ,老师)
点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
3个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Stella981 Stella981
3年前
Nginx + lua +[memcached,redis]
精品案例1、Nginxluamemcached,redis实现网站灰度发布2、分库分表/基于Leaf组件实现的全球唯一ID(非UUID)3、Redis独立数据监控,实现订单超时操作/MQ死信操作SelectPollEpollReactor模型4、分布式任务调试Quartz应用
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
3年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
9个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这