通过MVEL表达式和Apache Chain职责链模式解耦MQ消息处理节点的实践应用

京东云开发者
• 阅读 280

导读

本文主要讲解了MVEL表达式和责任链设计模式相结合一起的消息处理解决方案设计、解耦消息处理节点以及方便代码维护扩展。通过“订单拆单消息”的接入作为具体实践案例,简要阐述了MVEL表达式和Apache Chain职责链设计模式应用场景。希望通过本文,读者可以对MVEL表达式和责任链模式相关概念有一定的认识,并且能够将它们应用到具体的业务场景之中,帮助大家在实际代码研发的时候,降低代码复杂度和提升代码的复用率。****

1、背景

互联网的头部公司,各个后台系统应用交互主链路之中,会下发大量MQ消息给分支业务差异化应用。业务系统应用收到MQ消息后结合实际业务处理,但是往往大家在处理逻辑代码的时候会进行不断的叠加代码,造成代码臃肿、复杂和可读性差等问题。例如:

public void handleMessage(String message) throws Exception {
    CallerInfo callerInfo = Profiler.registerInfo(UmpKey.KEY_BD_DLOK_FLAG_GHOST_HANDLER, "xxx", false, true);
    try {
        DeliveredMessage msg = parseMessage(message);
        if (null == msg) {
            return;
        }
        String id = msg.getOrderId();
        if (null == id) {
            //监听到的订单消息 id不应为空
            return;
        }
        String sendPay = msg.getSendPay();
        //是否XXX
        boolean isShop = CAR_O2O.equals(String.valueOf(sendPay.charAt(XXX)));
        //是否是XXX
        boolean isCar = CAR_ADDED_SERVICE.equals(String.valueOf(sendPay.charAt(XXX)));
        String waybillSign = msg.getWaybillSign();
        //是否是XX
        boolean isSelf = SELF_ORDER.equals(String.valueOf(waybillSign.charAt(XXX)));

        long tid = System.nanoTime();
        Long orderId = Long.parseLong(id);

        //监听到订单后,更改订单状态表中的订单状态
        if (isCar && isSelf) {
            verOrderCarService.updateVerOrderCarStatusByOrderId(tid, orderIdLong, UPDATE_PIN);
        }
        if (isShop && isCar) {
            if (isSelf) {
                // 若在新表ver_order_sms_car中存在发送模板1短信,否则,发送原短信(模板3)
                List<VerOrderSmsCar> verOrderSmsCarList = verOrderSmsCarDao.getCarOrderListByOrderId(orderIdLong);
                if (CollectionUtils.isEmpty(verOrderSmsCarList)) {
                    dealTemplateThreeOrder(tid,orderId);
                } else {
                    dealTemplateOneOrder(tid, orderIdLong, verOrderSmsCarList);
                    this.sendShopSms(verOrderSmsCarList);
                }
            } else {
                // 满足条件的订单  即原订单流程没有走完,发送模板3
                List<VerOrderSmsCar> verOrderSmsCarList = verOrderSmsCarDao.getSmsCarOrderByOrderId(orderId);
                //返回数据字段id
                if (CollectionUtils.isNotEmpty(verOrderSmsCarList)) {
                    return;
                }
                dealTemplateThreeOrder(tid, orderId);
            }
        }
        // 发送状态变更消息
        if(isCar){
            this.sendVerStore(orderId, isShop ? 1 : 0);
        }
    } catch (Exception e) {
        LOGGER.error("监听MQ消息处理异常 : {}", e);
        Profiler.functionError(callerInfo);
    } finally {
        Profiler.registerInfoEnd(callerInfo);
    }
}

总结:代码片段逻辑嵌套复杂、各个处理节点耦合(例如:dealTemplateThreeOrder方法、sendShopSms方法)、新增节点不方便(例如:dealTemplateOneOrder(tid, orderId, verOrderSmsCarList))以及代码行数1000+等一系列问题。

2、MVEL表达式

MVEL为 MVFLEX Expression Language(MVFLEX表达式语言)的缩写,它是一种动态/静态的可嵌入的表达式语言和为Java平台提供Runtime(运行时)的语言。它也可以用来解析简单的JavaBean表达式。Runtime(运行时)允许MVEL表达式通过解释执行或者预编译生成字节码后执行。简单一句话,MVEL可以将字符串内容,转化为Java程序来运行,具体细节内容大家可以参考 https://blog.51cto.com/u_16091571/6271830。

3、责任链设计模式

定义:

责任链模式(Chain of Responsibility)又名 职责链模式,是一种行为设计模式,它允许你构建一个由多个对象组成的链,每个对象都有机会处理请求,或者将请求传递给链中的下一个对象。这种模式常用于处理请求的对象之间存在 层次关系 的情况。责任链模式的主要目的是解耦发送者和接收者,使多个对象都有机会处理请求,而不是将请求发送者与接收者硬编码在一起。

结构:

抽象处理者(Handler): 定义一个处理请求的接口,包含抽象处理方法并维护一个对下一个处理者的引用。

具体处理者(Concrete Handler): 实现处理请求的接口,判断能否处理本次请求,如果能够处理则处理,否则将请求传递给下一个处理者。

客户端类(Client): 创建处理链,并向链头的具体处理者对象提交请求,它不关心处理细节和请求的传递过程。

优缺点:

1)优点**

a.松散耦合: 责任链模式使得请求发送者和接收者解耦,每个处理者仅需关心自己能否处理请求,而不需要知道整个处理流程的细节。

b.灵活性: 可以动态地改变处理者之间的关系和顺序,新增或删除处理者,以适应不同的需求和场景。

c.可扩展性: 容易添加新的处理者,无需修改现有的代码,符合开闭原则。

d.单一职责原则: 每个具体处理者只负责处理特定类型的请求,符合单一职责原则,使得代码更清晰和可维护。

2)缺点

a.性能问题: 在责任链比较长的情况下,请求可能需要遍历整个链条才能找到合适的处理者,可能影响性能。

Apache Chain 职责链:

整个Apache Chain职责链,包括Context、Command和Filter三个核心组件以及ChainBase类。

1)Context 接口

Context 表示命令执行的上下文,在命令间实现共享信息的传递,父接口是 Map,它只是一个标记接口。

2)Command 接口

Commons Chain 中最重要的接口,表示在 Chain 中的具体某一步要执行的命令。它只有一个方法:boolean execute(Context context),如果返回 true,那么表示 Chain 的处理结束,Chain 中的其他命令不会被调用;返回 false,则 Chain 会继续调用下一个 Command,直到 Chain 的末尾或抛出异常。

3)Filter 接口

它是一种特殊的 Command,除了 Command 的 execute 方法之外,还包括了一个方法:boolean postProcess(Context context, Exception exception),Commons Chain 会在执行了 Filter 的 execute 方法之后,执行 postprocess(不论 Chain 以何种方式结束);Filter 执行 execute 的顺序与 Filter 出现在 Chain 中出现的位置一致,但是执行 postprocess 顺序与之相反。如:execute 的执行顺序是:filter1 -> filter2;而 postprocess 的执行顺序是:filter2 -> filter1。

4) ChainBase

ChainBase 实现 Chain 接口。Chain表示“命令链”,要在其中执行的命令,需要先添加到 Chain 中,Chain 的父接口是 Command。ChainBase类可以直接在Spring使用。它的具体执行方法:

public boolean execute(Context context) throws Exception {
    if (context == null) {
        throw new IllegalArgumentException();
    } else {
        this.frozen = true;
        boolean saveResult = false;
        Exception saveException = null;
        int i = false;
        int n = this.commands.length;
        int i;
        for(i = 0; i < n; ++i) {
            try {
                saveResult = this.commands[i].execute(context);
                if (saveResult) {
                    break;
                }
            } catch (Exception var11) {
                saveException = var11;
                break;
            }
        }
        if (i >= n) {
            --i;
        }
        boolean handled = false;
        boolean result = false;
        for(int j = i; j >= 0; --j) {
            if (this.commands[j] instanceof Filter) {
                try {
                    result = ((Filter)this.commands[j]).postprocess(context, saveException);
                    if (result) {
                        handled = true;
                    }
                } catch (Exception var10) {
                }
            }
        }
        if (saveException != null && !handled) {
            throw saveException;
        } else {
            return saveResult;
        }
    }
}

4、实践案例(订单MQ消息处理流程)

在汽车线下安装履约服务的业务场景之中,除开主站黄金流量流程以外,需要在接到中台订单拆单消息、订单出库消息之后给门店技师派单、发送核销码短信等定制化业务流程。此过程中存在接入多个消息处理同一个事件的相同点,也有同一个消息处理不同事件差异点。具体处理层级结构图如下:



通过MVEL表达式和Apache Chain职责链模式解耦MQ消息处理节点的实践应用



相关类图



通过MVEL表达式和Apache Chain职责链模式解耦MQ消息处理节点的实践应用



实现代码

****基于SpringBoot框架实现,消息处理链路中,核心内容包含三部分。第一部分消息处理Handler,接收到消息后将消息内容转化为Java Bean,例如:订单拆单消息(需要拆分订单)OdcDivideOrderhandler。第二部分处理节点Handler,它是职责链的处理节点,按照业务需求进行具体业务代码的实现,例如:技师派单消息发送节点(AddedTechDispatchHandler)。第三部分职责链配置文件,application-chain.xml,以下用订单拆分消息(拆单)处理流程为例。

第一部分( OdcDivideOrderHandler.java ):

/**
 * 订单拆分消息(拆单消息)

 * @author xxx
 * @date xxxx-xx-xx xx:xx
 */
@Service("odcDivideOrderHandler")
public class OdcDivideOrderHandler extends BaseOrderHandler implements MqMessageHandler<List<VerOrder>> {
    /**
     * 消息分派处理
     */
    @Resource(name="odcDivideOrderChain")
    private Chain odcDivideOrderChain;
    /**
     * 基于MVEL表达式过滤执行器的筛选规则
     */
    private final Map<String, String> expressionMap = new HashMap<String, String>() {
        {
            //派单过滤规则
            put("tech_dispatch_rule", "return sendPayMap.get("XXX") == X && sendPayMap.get("XXX") == X;");
        }
    };
    /**
     * @param tid        处理事件
     * @param messageDTO MQ消息
     * @return 处理结果
     * @throws Exception 处理异常
     */
    private boolean handleMessage(long tid, MqMessageDTO<List<VerOrder>> messageDTO) throws Exception {
         List<VerOrder> verOrderList = messageDTO.getObject();
        try {
            //上下文内容
            Context context = new ContextBase();
            //1.处理时间
            context.put(Constants.TID, tid);
            //2.派单列表
            context.put(Constants.VER_ORDER_LIST,carOrderList);
            //3.操作过滤规则
            context.put(Constants.EXPRESSION_RULE_MAP,expressionMap);
            odcDivideOrderChain.execute(context);
        } catch (Exception ex) {
            //此次代码省略........
        }
        return true;
    }
}

解析: 消息处理Handler主要是将接收到消息转化Java Bean,再将具体的上下文内容下传给后续事件处理Handler。参数expressionMap存储的是MVEL表达式需要处理的内容,具体内容结合实际业务场景差异化设置,对于后续节点处理Handler扩展性有很大帮助。odcDivideOrderChain职责链的命令链类,后续各个节点的流转全靠它。

第二部分(AddedTechDispatchHandler.java):

/**
 * 派单Handler
 *
 * @author xxx 
 * @date xxxx-xx-xx xx:xx
 */
@Service("addedTechDispatchHandler")
public class AddedTechDispatchHandler implements Command {
    /**
     * 派单消息topic
     */
    @Value("${xxx}")
    private String topic;
    /**
     * 消息生产
     */
    @Resource(name = "xxxxx")
    private MessageProducer messageProducer;
    @Override
    public boolean execute(Context context) throws Exception {
        Object tid = context.get(Constants.TID);
        Object object = context.get(Constants.VER_ORDER_LIST);
        if (!(object instanceof List)) {
            return false;
        }
        //订单列表
        List<VerOrder> orders = (List<VerOrder>) object;
        //列表为空
        if (CollectionUtils.isEmpty(orders)) {
            return false;
        }
        //过滤规则
        Object ruleObj = context.get(Constants.EXPRESSION_RULE_MAP);
        if (!(ruleObj instanceof Map)) {
            return false;
        }
        //派单规则
        Object obj = ((Map) ruleObj).get(Constants.TECH_DISPATCH_RULE);
        //没有配置规则直接返回
        if (!(obj instanceof String)) {
            return false;
        }
        String expression = (String) obj;
        if (StringUtils.isBlank(expression)) {
            return false;
        }
        for (VerOrder verOrder : orders) {
            //发送派单消息
            this.sendTechDispatch(tid, verOrder, expression);
        }
        return false;
    }

    /**
     * 发送技师派单消息
     *
     * @param tid      处理时间
     * @param verOrder 订单
     */
    public void sendTechDispatch(Object tid, VerOrder verOrder, String expression) {
        try {
            //派单规则判断,false-不派单,true-需要派单
            if (!SendPayUtil.isExpression(expression, verOrder.getSendPayMap())) {
                return;
            }
            String cxt = JSON.toJSONString(verOrder);
            Message message = new Message(topic, cxt, verOrder.getOrderId().toString());
            messageProducer.send(message);
        } catch (JMQException e) {
           //此次代码省略........
        } catch (Exception e) {
           //此次代码省略........
        } finally {
           //此次代码省略........
        }
    }
}

解析: 事件节点Handler主要是解析上下内容,执行需要处理的事项内容。特别是SendPayUtil.isExpression(expression, verOrder.getSendPayMap())方法,识别了MVEL表达式,使得即使同一个事件处理节点(例如:派单节点)也可以根据不同MQ消息,设置不同的规则。

/**
 *  sendPayMap表达式解析
 * @param expression 表达式
 * @param sendPayMap 订单SendPayMap值
 * @return 解析结果
 */
public static boolean isExpression(String expression,String sendPayMap){
    //sendPayMap为空
    if (StringUtils.isBlank(sendPayMap)) {
        return false;
    }
    Map map = null;
    try {
        map = JSON.parseObject(sendPayMap, Map.class);
    } catch (Exception ex) {
        LOGGER.error("sendPayMap格式转化错误", ex);
    }
    //map
    if (map == null || map.isEmpty()) {
        return false;
    }
    Map<String,Map> param = new HashMap<>(1);
    param.put(Constant.SEND_PAY_MAP,map);
    return (Boolean)MVEL.eval(expression,param);
}

第三部分(application-chain.xml):

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
       http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans-3.0.xsd"
       default-lazy-init="false" default-autowire="byName">
    <bean id="odcOutStockFullOrderChain" class="org.apache.commons.chain.impl.ChainBase">
        <constructor-arg>
            <array>
                <ref bean="addedOrderWangShiFuHandler"/>
                <ref bean="addedTechDispatchHandler"/>
            </array>
        </constructor-arg>
    </bean>
    <bean id="odcDivideOrderChain" class="org.apache.commons.chain.impl.ChainBase">
        <constructor-arg>
            <array>
                <ref bean="addedTechDispatchHandler"/>
            </array>
        </constructor-arg>
    </bean>
    <bean id="odcUndividedOrderChain" class="org.apache.commons.chain.impl.ChainBase">
        <constructor-arg>
            <array>
                <ref bean="addedTechDispatchHandler"/>
            </array>
        </constructor-arg>
    </bean>
</beans>

解析: 命令链配置文件,实现各个事件处理节点进行配置化,聚合各个分散的节点业务逻辑内,后续注入到对应的消息解析Handler。

5、总结

整个消息处理过程中采用Apache Chain职责链模式来降低代码层面的耦合度以及可以动态地改变处理者之间的关系和顺序,新增或删除处理者,以适应不同的需求和场景。MVEL表达式增强了同一事件处理节点的复用性,最大限度的提升了代码的简洁性。希望此文对大家后续设计类似场景有一定的帮助和启发。

作者:京东零售 张强

来源:京东云开发者社区

点赞
收藏
评论区
推荐文章
Wesley13 Wesley13
3年前
java实现23种设计模式之责任链模式
顾名思义,责任链模式(ChainofResponsibilityPattern)为请求创建了一个接收者对象的链。这种模式给予请求的类型,对请求的发送者和接收者进行解耦。这种类型的设计模式属于行为型模式。在这种模式中,通常每个接收者都包含对另一个接收者的引用。如果一个对象不能处理该请求,那么它会把相同的请求传给下一个接收者,依此类推。pa
亚瑟 亚瑟
3年前
RabbitMQ、Kafka横向对比
基于某些原因,许多开发者会把这两种技术当做等价的来看待。的确,在一些案例场景下选择RabbitMQ还是Kafka没什么差别,但是这两种技术在底层实现方面是有许多差异的。\TOC\一、异步消息模式异步消息可以作为解耦消息的生产和处理的一种解决方案(DMP系统上使用较少,解耦是通过分布式服务构成的,这两种方式各有利弊,后面有机会再说)。
Wesley13 Wesley13
3年前
MQ应用场景
MQ常见应用场景以下介绍消息队列在实际应用中常用的使用场景。异步处理,应用解耦,流量削锋,日志处理和消息通讯四个场景。异步处理场景说明:用户注册后,需要发注册邮件和注册短信。传统的做法有两种1.串行的方式;2.并行方式。(1)串行方式:将注册信息写入数据库(https://www.oschina.net/ac
可莉 可莉
3年前
2020Kafka最新最全面试题!
1、请说明什么是ApacheKafka?ApacheKafka是由Apache开发的一种发布订阅消息系统,它是一个分布式的、分区的和可复制的提交日志服务。2、说说Kafka的使用场景?①异步处理②应用解耦③流量削峰④日志处理⑤消息通讯等。3、使用Kafka有什么优点和缺点?优点:①支持跨数据中心的消息复制;②单
Stella981 Stella981
3年前
2020Kafka最新最全面试题!
1、请说明什么是ApacheKafka?ApacheKafka是由Apache开发的一种发布订阅消息系统,它是一个分布式的、分区的和可复制的提交日志服务。2、说说Kafka的使用场景?①异步处理②应用解耦③流量削峰④日志处理⑤消息通讯等。3、使用Kafka有什么优点和缺点?优点:①支持跨数据中心的消息复制;②单
Wesley13 Wesley13
3年前
Java设计模式之责任链模式
引入责任链模式责任链模式顾名思义,责任链模式(ChainofResponsibilityPattern)为请求创建了一个接收者对象的链。这种模式给予请求的类型,对请求的发送者和接收者进行解耦。这种类型的设计模式属于行为型模式。在这种模式中,通常每个接收者都包含对另一个接收者的引用。如果一个对象不能处理该请求,那么它会
京东云开发者 京东云开发者
3个月前
还在自己实现责任链?我建议你造轮子之前先看看这个开源项目
1.前言设计模式在软件开发中被广泛使用。通过使用设计模式,开发人员可以更加高效地开发出高质量的软件系统,提高代码的可读性、可维护性和可扩展性。责任链模式是一种常用的行为型设计模式,它将请求沿着处理链进行发送,直到其中一个处理者对请求进行处理为止。在责任链模
京东云开发者 京东云开发者
2个月前
通过Forcebot压测实践简述“并发模式”与“RPS模式”两种模式的区别
作者:京东零售张强导读本文主要讲解了Forcebot压测平台之中“并发模式”与“RPS模式”两种模式对于服务端性能指标的影响。通过“商品查询标签”的压测作为具体实践案例,简要阐述了“并发模式”与“RPS模式”两种模式压测过程中TPS、TP99以及TP999
京东云开发者 京东云开发者
2个月前
京东APP百亿级商品与车关系数据检索实践
作者:京东零售张强导读本文主要讲解了京东百亿级商品车型适配数据存储结构设计以及怎样实现适配接口的高性能查询。通过京东百亿级数据缓存架构设计实践案例,简单剖析了jimdb的位图(bitmap)函数和lua脚本应用在高性能场景。希望通过本文,读者可以对缓存的内