RocketMQ进阶

Stella981
• 阅读 730

RocketMQ进阶

分布式消息选型的时候是否支持事务消息是一个很重要的考量点,而目前只有RocketMQ对事务消息支持的最好。今天我们来唠唠如何实现RocketMQ的事务消息!

Apache RocketMQ在4.3.0版中已经支持分布式事务消息,这里RocketMQ采用了2PC的思想来实现了提交事务消息,同时增加一个补偿逻辑来处理二阶段超时或者失败的消息,如下图所示。RocketMQ进阶

RocketMQ事务流程概要

RocketMQ实现事务消息主要分为两个阶段:正常事务的发送及提交、事务信息的补偿流程 整体流程为:

  • 正常事务发送与提交阶段
  1. 生产者发送一个半消息给MQServer(半消息是指消费者暂时不能消费的消息)

  2. 服务端响应消息写入结果,半消息发送成功

  3. 开始执行本地事务

  4. 根据本地事务的执行状态执行Commit或者Rollback操作

  • 事务信息的补偿流程
  1. 如果MQServer长时间没收到本地事务的执行状态会向生产者发起一个确认回查的操作请求

  2. 生产者收到确认回查请求后,检查本地事务的执行状态

  3. 根据检查后的结果执行Commit或者Rollback操作 补偿阶段主要是用于解决生产者在发送Commit或者Rollback操作时发生超时或失败的情况。

RocketMQ事务流程关键

  1. 事务消息在一阶段对用户不可见 事务消息相对普通消息最大的特点就是一阶段发送的消息对用户是不可见的,也就是说消费者不能直接消费。这里RocketMQ的实现方法是原消息的主题与消息消费队列,然后把主题改成 RMQ_SYS_TRANS_HALF_TOPIC ,这样由于消费者没有订阅这个主题,所以不会被消费。

  2. 如何处理第二阶段的失败消息?在本地事务执行完成后会向MQServer发送Commit或Rollback操作,此时如果在发送消息的时候生产者出故障了,那么要保证这条消息最终被消费,MQServer会像服务端发送回查请求,确认本地事务的执行状态。当然了rocketmq并不会无休止的的信息事务状态回查,默认回查15次,如果15次回查还是无法得知事务状态,RocketMQ默认回滚该消息。

  3. 消息状态 事务消息有三种状态:

  • TransactionStatus.CommitTransaction:提交事务消息,消费者可以消费此消息

  • TransactionStatus.RollbackTransaction:回滚事务,它代表该消息将被删除,不允许被消费。

  • TransactionStatus.Unknown :中间状态,它代表需要检查消息队列来确定状态。

代码实现

首先假设我们有这样一个需求:

用户请求订单微服务 order-service 接口删除订单(退货),删除订单后需要发送消息给用户服务 account-service,用户微服务收到消息后会给用户账户增加余额。

这个需求跟钱相关,肯定要保证消息的事务性,接下来我们根据上面的原理实现整个流程。

基础配置

生产者order-servcie和account-service都要引入RocketMQ相关依赖,增加RocketMQ的相关配置

  • 引入组件

<dependency>  <groupId>org.apache.rocketmq</groupId>  <artifactId>rocketmq-spring-boot-starter</artifactId> </dependency>

  • 添加配置

# within rocketmq rocketmq: name-server: xxx.xx.x.xx:9876; xxx.xx.x.xx:9876 producer: group: cloud-group

发送半消息

order-service在执行删除订单操作时发送一条半消息给MQServer,发送半消息主要是使用 rocketMQTemplate.sendMessageInTransaction() 方法,发送事务消息。

@Override public void delete(String orderNo) {  Order order = orderMapper.selectByNo(orderNo);  //如果订单存在且状态为有效,进行业务处理  if (order != null && CloudConstant.VALID_STATUS.equals(order.getStatus())) {   String transactionId = UUID.randomUUID().toString();   //如果可以删除订单则发送消息给rocketmq,让用户中心消费消息   rocketMQTemplate.sendMessageInTransaction("add-amount",     MessageBuilder.withPayload(       UserAddMoneyDTO.builder()         .userCode(order.getAccountCode())         .amount(order.getAmount())         .build()     )     .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)     .setHeader("order_id",order.getId())     .build()     ,order   );  } }

首先先校验一下订单状态,然后发送消息给MQServer,这个逻辑大家都看得懂,主要是关注 sendMessageInTransaction() 方法,源码如下:

public TransactionSendResult sendMessageInTransaction(String destination, Message<?> message, Object arg) throws MessagingException {  try {   if (((TransactionMQProducer)this.producer).getTransactionListener() == null) {    throw new IllegalStateException("The rocketMQTemplate does not exist TransactionListener");   } else {    org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);    return this.producer.sendMessageInTransaction(rocketMsg, arg);   }  } catch (MQClientException var5) {   throw RocketMQUtil.convert(var5);  } }

该方法有三个参数:

  • destination:目的地(主题),这里发送给 add-amount 这个主题

  • message:发送给消费者的消息体,需要使用 MessageBuilder.withPayload() 来构建消息

  • arg:参数

注意,这里我们生成了一个transactionId,并放在header中跟消息一起发送(这里实际也可以构造成一个对象,放在arg里进行发送),作用后面再讲!

执行本地事务与回查

MQServer收到半消息后会告诉生产者order-service确认收到半消息,这时候order-service需要执行本地事务,执行完本地事务后再告诉MQServer本地事务的执行状态,确认消息究竟是Commit还是Rollback。如果在告诉MQServer本地执行状态的时候出异常了还需要让MQServer能够回查到,怎么实现这一些列操作呢?RocketMQ进阶

RocketMQ提供了 RocketMQLocalTransactionListener 接口,本地事务监听器,这个接口类的实现如下:RocketMQ进阶

第一个方法 executeLocalTransaction 为执行本地事务;第二个方法 checkLocalTransaction 为检查本地事务的执行状态,也就是回查动作。有了这个接口类我们的执行逻辑清楚了,但是还有个问题:本地事务已经执行完成了,怎么去回查本地事务的执行结果呢?RocketMQ进阶

我们可以在执行本地事务的时候同时生成一个事务日志,让本地事务与日志事务在同一个方法中,同时添加 @Transactional 注解,保证两个操作事务是一个原子操作。这样如果事务日志表中有这个本地事务的信息,那就代表本地事务执行成功,需要Commit,相反如果没有对应的事务日志,则表示没执行成功,需要Rollback

思路既然理顺了,咱们就开撸。RocketMQ进阶

  • 首先创建一个日志表 RocketMQ进阶 很简单的三个字段,主要是这个事务id,需要根据这个事务id回查事务,还记得我们在发送半消息时生成的事务id吗,就是干这个用的!

  • 在生产者编写方法实现 RocketMQLocalTransactionListener

`@Slf4j
@RocketMQTransactionListener
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class AddUserAmountListener implements RocketMQLocalTransactionListener {
    private final OrderService orderService;
    private final RocketMqTransactionLogMapper rocketMqTransactionLogMapper;
    /**
     * 执行本地事务
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {
        log.info("执行本地事务");
        MessageHeaders headers = message.getHeaders();
        //获取事务ID
        String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
        Integer orderId = Integer.valueOf((String)headers.get("order_id"));
        log.info("transactionId is {}, orderId is {}",transactionId,orderId);

        try{
            //执行本地事务,并记录日志
            orderService.changeStatuswithRocketMqLog(orderId, CloudConstant.INVALID_STATUS,transactionId);
            //执行成功,可以提交事务
            return RocketMQLocalTransactionState.COMMIT;
        }catch (Exception e){
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    /**
     * 本地事务的检查,检查本地事务是否成功
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {

        MessageHeaders headers = message.getHeaders();
        //获取事务ID
        String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
        log.info("检查本地事务,事务ID:{}",transactionId);
        //根据事务id从日志表检索
        QueryWrapper queryWrapper = new QueryWrapper<>();
        queryWrapper.eq("transaction_id",transactionId);
        RocketmqTransactionLog rocketmqTransactionLog = rocketMqTransactionLogMapper.selectOne(queryWrapper);
        if(null != rocketmqTransactionLog){
            return RocketMQLocalTransactionState.COMMIT;
        }
        return RocketMQLocalTransactionState.ROLLBACK;
    }
}
`

  • 执行本地事务的方法

@Transactional(rollbackFor = RuntimeException.class) @Override public void changeStatuswithRocketMqLog(Integer id,String status,String transactionId){     //将订单状态置位无效  orderMapper.changeStatus(id,status);     //插入事务表  rocketMqTransactionLogMapper.insert(    RocketmqTransactionLog.builder()      .transactionId(transactionId)      .log("执行删除订单操作")    .build()  ); }

这一块的代码逻辑都是在生产端,即Order-Server,大家不要搞错了

消费消息

Rollback的消息MQServer会给我们处理,我们只要关注Commit状态时消费端可以正常消费即可。在 account-service监听消息,如果收到消息则给用户账户增加余额。

@Slf4j @Service @RocketMQMessageListener(topic = "add-amount",consumerGroup = "cloud-group") @RequiredArgsConstructor(onConstructor = @__(@Autowired) ) public class AddUserAmountListener implements RocketMQListener<UserAddMoneyDTO> {     private final AccountMapper accountMapper;     /**      * 收到消息的业务逻辑      */     @Override     public void onMessage(UserAddMoneyDTO userAddMoneyDTO) {         log.info("received message: {}",userAddMoneyDTO);         accountMapper.increaseAmount(userAddMoneyDTO.getUserCode(),userAddMoneyDTO.getAmount());         log.info("add money success");     } }

测试

RocketMQ进阶 订单表有这样一条记录,用户为jianzh5,amount为200

RocketMQ进阶 用户表的记录,执行完成后jianzh5的账户应该变成250

  • 调用删除订单接口,删除订单 RocketMQ进阶

  • 发送半消息 RocketMQ进阶

  • 执行本地事务,并生成事务日志 RocketMQ进阶

  • 模拟异常情况 在发送Commit消息的时候我们用命令杀掉进程 taskkill /pid 19748 -t -f,模拟异常! RocketMQ进阶

  • 重新启动order-service,查看是否会执行回查动作 RocketMQ进阶 MQServer进行回查,检查事务日志,判断是否可以提交事务

  • 消费者消费事务消息,保证事务的一致性 RocketMQ进阶

小结

使用RocketMQ实现事务消息的过程还是很复杂的,需要好好理解开头的那张图,只有理解了事务消息的交互过程才能编写相应的代码!

如果本文对你有帮助,

别忘记给我个三连:

点赞,转发,评论

咱们下期见!

收藏 等于白嫖点赞 才是真情!

End

干货分享

这里为大家准备了一份小小的礼物,关注公众号,输入如下代码,即可获得百度网盘地址,无套路领取!

001:《程序员必读书籍》
002:《从无到有搭建中小型互联网公司后台服务架构与运维架构》
003:《互联网企业高并发解决方案》
004:《互联网架构教学视频》
006:《SpringBoot实现点餐系统》
007:《SpringSecurity实战视频》
008:《Hadoop实战教学视频》
009:《腾讯2019Techo开发者大会PPT》

010: 微信交流群

近期热文top5

1、架构师之路 - 虚拟化技术与容器Docker

2、数据库优化 - 实例参数优化

3、数据库优化技巧 - SQL语句优化

4、架构师之路-微服务技术选型

5、使用 Hexo 搭建你的技术博客

RocketMQ进阶

我就知道你“在看”

RocketMQ进阶

本文分享自微信公众号 - JAVA日知录(javadaily)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
6个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Wesley13 Wesley13
3年前
4cast
4castpackageloadcsv.KumarAwanish发布:2020122117:43:04.501348作者:KumarAwanish作者邮箱:awanish00@gmail.com首页:
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
线上SQL超时场景分析-MySQL超时之间隙锁 | 京东物流技术团队
前言之前遇到过一个由MySQL间隙锁引发线上sql执行超时的场景,记录一下。背景说明分布式事务消息表:业务上使用消息表的方式,依赖本地事务,实现了一套分布式事务方案消息表名:mqmessages数据量:3000多万索引:createtime和statuss
Python进阶者 Python进阶者
1年前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这
Oracle 分组与拼接字符串同时使用
SELECTT.,ROWNUMIDFROM(SELECTT.EMPLID,T.NAME,T.BU,T.REALDEPART,T.FORMATDATE,SUM(T.S0)S0,MAX(UPDATETIME)CREATETIME,LISTAGG(TOCHAR(