REST微服务的分布式事务实现

Wesley13
• 阅读 1128

上一篇文章REST微服务的分布式事务实现-分布式事务以及JTA介绍 中,试着带大家理解事务,然后介绍了分布式事务、它的原则和实现方式。这一部分,我们就来详细看看如何使用消息中间件来实现分布式事务。

我们还是使用之前的实例,一个订票系统的购票逻辑:

REST微服务的分布式事务实现

这篇教程的源代码可以从github上获取。

使用消息中间件实现分布式事务,也就是使用事件驱动实现。在这种方式下,Order服务不会直接调用User服务,而是往MQ上发一个消息,说明有新订单需要扣费;User服务会响应这个消息,并处理,处理完成后再发一个消息,说明有新订单需要转移票;然后就会有Ticket服务来处理。而每个服务都是在一个事务里面处理读消息、处理业务、写消息的事情。大致流程如下:

REST微服务的分布式事务实现

在这种方式下,订单的处理是异步的,用户发起一个订单的时候,只是生成一个正在处理的订单,然后通过消息中间件一步步的进行扣费、交票、完成订单等逻辑。而每一个服务中相应的操作,基本都是:

  1. 从一个队列中读取消息

  2. 操作相应数据库操作

  3. 往下一个队列中发送消息

也就是说,需要在这个方法中需要操作数据库和MQ两个资源,这正好是上一篇文章中介绍Spring内部事务和外部事务时使用的实例中的场景。下面就是大致的代码:

12345678

@JmsListener(destination = "order:new", containerFactory = "orderFactory")@Transactionalpublic void create(OrderDTO orderDTO) {Order order = new Order(orderDTO);order.setStatus("PENDING");orderRepository.save(order);jmsTemplate.convertAndSend("order:need_to_pay", order);}

它监听MQ的”order:new”队列,处理订单,往”order:need_to_pay”发送一个消息。然后用户服务就会接收这个消息,触发扣费流程。

在这个地方,我们可以使用JTA事务,来使用两阶段提交来实现两个资源的共同提交,但是这会影响系统的性能。而且,还需要使用的消息中间件实现了XA的规范,提供两阶段提交的功能。
这里也可以使用本地事务,这时,每个事物都会有一个JMS的Session,并使用事务。如此一来,就存在一个数据库的事物和一个JMS的事务,两个事务是相互独立并依次提交的。这样,就有可能在极少数情况下出错,但是也能采取一些错误来尽量解决。我们对上面的事务处理展开(伪代码,只是为了说明处理过程),来看看出错的情况以及该如何处理:

1234567891011

jmsTransaction.begin(); // get transactions from jms sessiondbTransaction.begin(); // get transactions from JDBC connectiontry {orderRepository.save(order);jmsTemplate.convertAndSend("order:need_to_pay", order);dbTransaction.commit();jmsTransaction.commit();} catch(Exception e) {dbTransaction.rollback();jmsTransaction.rollback();}

在上面的方法中,只要发生了错误,MQ消息的消费就算失败,MQ的监听器就会重新触发一次这个方法。
这时,如果错误发生在:

  1. 数据提交时或之前。这时,整个数据库的操作都会被重置(也可能就根本还没更新),重试的时候不需要考虑重复提交的问题,因为之前的提交都已经被回滚。

  2. 数据库提交成功,但是JMS提交失败。这时就需要防止重复提交来避免数据库的重复操作。

我们可以采用之前说过的token方式,在调用这个方法前,生成一个唯一的token。这里使用Java的UUID生成一个ID作为token。(如果这里的重复调用只是在这个服务内部重新触发,就不需要考虑分布式系统的全局一致性ID的问题。这需要根据实际情况来判断用什么样的UUID生成方式)所以,Controller里面接受购票请求如下:

1234567

@PostMapping(value = "/")@Transactionalpublic void create(@RequestBody OrderDTO orderDTO) {String uid = UUID.randomUUID().toString();orderDTO.setToken(uid);jmsTemplate.convertAndSend("order:new", orderDTO);}

然后在Service里面监听这个队列,处理购票:

123456789101112131415

@JmsListener(destination = "order:new", containerFactory = "orderFactory")@Transactionalpublic void create(OrderDTO orderDTO) {if (!this.processedUIDs.contains(orderDTO.getToken())) {Order order = new Order(orderDTO);order.setStatus("PENDING");orderRepository.save(order);orderDTO.setStatus(order.getStatus());orderDTO.setId(order.getId());} else {LOG.info("Duplicate jms message:{}", orderDTO);}jmsTemplate.convertAndSend("order:need_to_pay", orderDTO);processedUIDs.add(orderDTO.getToken());}

简单来说,解决办法就是,如果是重复触发的,就略过数据库相关的处理,直接往MQ的目标队列发送需要的数据。使得整个流程能够往下走。

刚才说的是在一个服务内出错的情况,还有一种错误情况是,订单服务和用户服务已经处理完订单创建和扣费的操作,然后到了Ticket服务的时候,却发现没有票了。虽然我们可以通过合理的设计业务逻辑来避免这种问题,例如,在操作之前先检查用户余额,检查并锁票,然后进行操作数据的事情。但是,在有些情况下,很难通过业务流程的设计来完全避免这种问题。如果出现了这种的问题,我们也可以通过撤销的流程来实现,业务流程如下:
REST微服务的分布式事务实现

在上面的解决方案中,使用JDK的UUID类生成一个ID,实际上这个ID只是在当前的JVM内,才能够保证是唯一的。其次,在JMS的标准中,没有规定一个消息的Listener在读取一个消息失败后,重新读取的问题。在微服务环境中,如果一个应用部署了多个实例,那个这个消息有可能会被另一个实例读到。所以在上面的方案中,使用JVM内的唯一ID放在消息的内容中,它有可能被任意一个实例处理,处理失败后,又有可能被另一个实例处理。这就会出问题。所以我们需要一个分布式环境下的生成唯一ID的解决办法。例如,先获得JVM的唯一ID以后,再加上IP+端口等信息。而且,对已经处理过的ID的缓存,也需要存在分布式环境中。

所以,我们完全可以不使用两阶段提交,就实现微服务架构下的分布式事务。使用这种方式,它的优点是:

  1. 实现简单。结合Spring的事务,几乎不用写额外的事务相关的代码,就能够实现。我们只需要更好的服务的拆分和设计业务流程。

  2. 系统吞吐量高。因为数据库或MQ不会被长期的锁住,可以并发的处理更多的事务。

  3. 容错性好。各个服务之间通过MQ来触发协调,即使在处理一个任务的时候有一个服务停了,消息还会一直保持,直到服务起来开始监听,然后继续触发这个任务。

当然这种方式也有一些缺点,最大的问题就是异步处理的问题。用户发出一个请求后,处理该业务的服务只是简单处理,往MQ发送消息开始处理流程,然后就返回了。这时候这个任务还在处理。虽然有时候我们可以通过等的方式,等待最终处理完成的消息,然后在返回给用户。但是这样又得考虑响应时间、超时、各种错误等情况。

有些人会觉得这种方式使得开发和调试都变得复杂,在我看来,恰恰相反,这使得开发和调试都简单了。首先,根据微服务架构的设计原则,就是每个服务只负责一个功能模块;再者,根据面向对象的设计原则,一个方法只做一件事情。如果我们能够合理的拆分服务,和每一步的处理方法,这正是一个好的设计。在维护的时候,每个方法、每个步骤做什么事情,都很清楚。

说到调试,我的原则是,你应该通过单元测试来发现和解决问题,而不是调试。以上面的购票流程为例,每一个服务,通过MQ触发一个方法的时候,它的参数应该是什么、状态是什么都应该是明确的,这个方法执行完成后,会产生什么新的数据,状态会更新成什么,都应该是明确的。而这些都可以通过单元测试来很好的测试。如果你的复杂流程中的每一个都能通过单元测试进行完善的测试,那么这些方法串联到一起,不但能够很好的工作,也能应付各种异常的情况。

相关文章:

REST微服务的分布式事务实现-分布式系统、事务以及JTA介绍

分布式事务解决方案

大白话聊聊分布式事务

某宝分布式事务架构设计

本文分享自微信公众号 - IT技术小咖(IT-arch)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
6个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Easter79 Easter79
3年前
Twitter的分布式自增ID算法snowflake (Java版)
概述分布式系统中,有一些需要使用全局唯一ID的场景,这种时候为了防止ID冲突可以使用36位的UUID,但是UUID有一些缺点,首先他相对比较长,另外UUID一般是无序的。有些时候我们希望能使用一种简单一些的ID,并且希望ID能够按照时间有序生成。而twitter的snowflake解决了这种需求,最初Twitter把存储系统从MySQL迁移
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
线上SQL超时场景分析-MySQL超时之间隙锁 | 京东物流技术团队
前言之前遇到过一个由MySQL间隙锁引发线上sql执行超时的场景,记录一下。背景说明分布式事务消息表:业务上使用消息表的方式,依赖本地事务,实现了一套分布式事务方案消息表名:mqmessages数据量:3000多万索引:createtime和statuss
Python进阶者 Python进阶者
1年前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这