分布式日志追踪ID实战 | 京东物流技术团队

京东云开发者
• 阅读 369

本文通过介绍分布式应用下各个场景的全局日志ID透传思路,以及介绍分布式日志追踪ID简单实现原理和实战效果,从而达到通过提高日志查询排查问题的效率。

背景

开发排查系统问题用得最多的手段就是查看系统日志,相信不少人都值过班当过小秘吧:给下接口和出入参吧,麻烦看看日志里的有没有异常信息啊等等,但是在并发大时使用日志定位问题还是比较麻烦,由于大量的其他用户/其他线程的日志也一起输出穿行其中导致很难筛选出指定请求的全部相关日志,以及下游线程/服务对应的日志,甚至一些特殊场景的出入参只打印了一些诸如gis坐标、四级地址等没有单据信息的日志,使得日志定位起来非常不便

场景分析

自己所在组负责的系统主要是web应用,其中涉及到的请求方式主要有:springmvc的servlet的http场景、jsf场景、MQ场景、resteasy场景、clover场景、easyjob场景,每一种场景都需要不同的方式进行logTraceId的透传,接下来逐个探析上述各个场景的透传方案。

在这之前我们先要简单了解一下日志中透传和打印logTraceId的方式,一般我们使用MDC进行logTraceId的透传与打印,但是基于MDC内部使用的是ThreadLocal所以只有本线程才有效,子线程服务的MDC里的值会丢失,所以这里我们要么是在所有涉及到父子线程的地方以编码侵入式自行实现值的传递,要么就是通过覆写MDCAdapter:通过阿里的TransmittableThreadLocal来解决父子线程传递问题,而本文采用的是比较粗糙地以编码侵入式来解决此问题。

springmvc的servlet的http场景

这个场景相信大家都已经烂熟到骨子里了,主要思路是通过拦截器的方式进行logTraceId的透传,新建一个类实现HandlerInterceptor

preHandle:在业务处理器处理请求之前被调用,这里实现logTraceId的设置与透传

postHandle:在业务处理器处理请求执行完成后,生成视图之前执行,这里空实现就好

afterCompletion:在DispatcherServlet完全处理完请求后被调用,这里用于清除MDC的logTraceId

@Slf4j
public class TraceInterceptor implements HandlerInterceptor {
    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object o) throws Exception {
        try{
            String traceId = MDC.get(LogConstants.MDC_LOG_TRACE_ID_KEY);
            if (StringUtils.isBlank(traceId)) {
                MDC.put(LogConstants.MDC_LOG_TRACE_ID_KEY, TraceUtils.getTraceId());
            }
        }catch (RuntimeException e){
            log.error("mvc自定义log跟踪拦截器执行异常",e);
        }
        return true;
    }

    @Override
    public void postHandle(javax.servlet.http.HttpServletRequest httpServletRequest, javax.servlet.http.HttpServletResponse httpServletResponse, Object o, ModelAndView modelAndView) throws Exception {
    }

    @Override
    public void afterCompletion(javax.servlet.http.HttpServletRequest httpServletRequest, javax.servlet.http.HttpServletResponse httpServletResponse, Object o, Exception e) throws Exception {
        try{
            MDC.clear();
        }catch (RuntimeException ex){
            log.error("mvc自定义log跟踪拦截器执行异常",ex);
        }
    }
}

jsf场景

相信大家对于jsf并不陌生,而jsf也支持自定义filter,基于jsf过滤器的运行方式(如下图),可以通过配置全局过滤器(继承AbstractFilter)的方式进行logTraceId的透传,需要注意的是jsf是在线程池中执行的所以一定要信任消息体中的logTraceId

分布式日志追踪ID实战 | 京东物流技术团队

jsf消费者过滤器:主要从上下文环境中获取logTraceId并进行透传,实现代码如下

@Slf4j
public class TraceIdGlobalJsfFilter extends AbstractFilter {
    @Override
    public ResponseMessage invoke(RequestMessage requestMessage) {
        //设置traceId
        setAndGetTraceId(requestMessage);
        try{
            return this.getNext().invoke(requestMessage);
        }finally {
        }
    }

    /**
     * 设置并返回traceId
     * @param requestMessage
     * @return
     */
    private void setAndGetTraceId(RequestMessage requestMessage) {
        try{
            String logTraceId = MDC.get(LogConstants.MDC_LOG_TRACE_ID_KEY);
            Object logTraceIdObj = requestMessage.getInvocationBody().getAttachment(LogConstants.JSF_LOG_TRACE_ID_KEY);
            if(StringUtils.isBlank(logTraceId) && logTraceIdObj == null){
                //如果filter和MDC都没有获取到则说明有遗漏,打印日志
                if(log.isDebugEnabled()){
                    log.debug("jsf消费者自定义log跟踪拦截器预警,filter和MDC都没有traceId,jsf信息:{}", JSON.toJSONString(requestMessage));
                }
            } else if(StringUtils.isBlank(logTraceId) && logTraceIdObj != null) {
                //如果MDC没有,filter有,打印日志
                if(log.isDebugEnabled()){
                    log.debug("jsf消费者自定义log跟踪拦截器预警,MDC没有filter有traceId,jsf信息:{}", JSON.toJSONString(requestMessage));
                }
            } else if(StringUtils.isNotBlank(logTraceId) && logTraceIdObj == null){
                //如果MDC有,filter没有,说明是源头已经有了,但是jsf是第一次调,透传
                requestMessage.getInvocationBody().addAttachment(LogConstants.JSF_LOG_TRACE_ID_KEY, logTraceId);
            }else if(StringUtils.isNotBlank(logTraceId) && logTraceIdObj != null){
                //MDC和fitler都有,但是并不相等,则存在问题打印日志
                if(log.isDebugEnabled()){
                    log.debug("jsf消费者自定义log跟踪拦截器预警,MDC和filter都有traceId,jsf信息:{}", JSON.toJSONString(requestMessage));
                }
            }
        }catch (RuntimeException e){
            log.error("jsf消费者自定义log跟踪拦截器执行异常",e);
        }
    }
}

jsf提供者过滤器:通过拿到消费者在消息体中透传的logTraceId来实现,实现代码如下

@Slf4j
public class TraceIdGlobalJsfProducerFilter extends AbstractFilter {
    @Override
    public ResponseMessage invoke(RequestMessage requestMessage) {
        //设置traceId
        boolean isNeedClearMdc = transferTraceId(requestMessage);
        try{
            return this.getNext().invoke(requestMessage);
        }finally {
            if(isNeedClearMdc){
                clear();
            }
        }
    }
    /**
     * 设置并返回traceId
     * @param requestMessage
     * @return
     */
    private boolean transferTraceId(RequestMessage requestMessage) {
        boolean isNeedClearMdc = false;
        try{
            String logTraceId = MDC.get(LogConstants.MDC_LOG_TRACE_ID_KEY);
            Object logTraceIdObj = requestMessage.getInvocationBody().getAttachment(LogConstants.JSF_LOG_TRACE_ID_KEY);
            if(StringUtils.isBlank(logTraceId) && logTraceIdObj == null){
                //如果filter和MDC都没有获取到,说明存在遗漏场景或是提供给外部系统调用的接口,打印日志进行观察
                String traceId = TraceUtils.getTraceId();
                MDC.put(LogConstants.MDC_LOG_TRACE_ID_KEY,traceId);
                requestMessage.getInvocationBody().addAttachment(LogConstants.JSF_LOG_TRACE_ID_KEY, traceId);
                if(log.isDebugEnabled()){
                    log.debug("jsf生产者自定义log跟踪拦截器预警,filter和MDC都没有traceId,jsf信息:{}", JSON.toJSONString(requestMessage));
                }
                isNeedClearMdc = true;
            } else if(StringUtils.isBlank(logTraceId) && logTraceIdObj != null) {
                //如果MDC没有,filter有,说明是被调用方,需要透传下去
                MDC.put(LogConstants.MDC_LOG_TRACE_ID_KEY,logTraceIdObj.toString());
                isNeedClearMdc = true;
            } else if(StringUtils.isNotBlank(logTraceId) && logTraceIdObj == null){
                //如果MDC有,filter没有,存在问题,打印日志
                if(log.isDebugEnabled()){
                    log.debug("jsf生产者自定义log跟踪拦截器预警,MDC有filter没有traceId,jsf信息:{}", JSON.toJSONString(requestMessage));
                }
                isNeedClearMdc = true;
            }else if(StringUtils.isNotBlank(logTraceId) && logTraceIdObj != null && !logTraceId.equals(logTraceIdObj.toString())){
                //MDC和fitler都有,但是并不相等,则信任filter透传结果
                TraceUtils.resetTraceId(logTraceIdObj.toString());
                if(log.isDebugEnabled()){
                    log.debug("jsf生产者自定义log跟踪拦截器预警,MDC和fitler都有traceId,但是并不相等,jsf信息:{}", JSON.toJSONString(requestMessage));
                }
            }
            return isNeedClearMdc;
        }catch (RuntimeException e){
            log.error("jsf生产者自定义log跟踪拦截器执行异常",e);
            return false;
        }
    }

    /**
     * 清除MDC
     */
    private void clear() {
        try{
            MDC.clear();
        }catch (RuntimeException e){
            log.error("jsf生产者自定义log跟踪拦截器执行异常",e);
        }
    }
}

MQ场景

说到MQ相信大家对于此就更不陌生了,此种场景主要通过在提供者发送消息时拿到上下文中的logTraceId,将其以扩展信息的方式设置进消息体中进行透传,而消费者则从消息体中进行获取

生产者:新建一个抽象类继承MessageProducer,覆写父类中的两个send方法(批量发送、单条发送),send方法中主要调用抽象加工消息体的方法(logTraceId属性赋值)和日志打印,在子类中进行发送前对消息体的加工处理,具体代码如下

@Slf4j
public abstract class BaseTraceIdProducer extends MessageProducer {

    private static final String SEPARATOR_COMMA = ",";

    public BaseTraceIdProducer() {
    }

    public BaseTraceIdProducer(TransportManager transportManager) {
        super(transportManager);
    }

    /**
     * 获取消息体-单个
     * @param messageContext
     * @return
     */
    protected abstract Message getMessage(MessageContext messageContext);

    /** 获取消息体-批量
     *
     * @param messageContext
     * @return
     */
    protected abstract List<Message> getMessages(MessageContext messageContext);

    /**
     * 填充消息体上下文信息
     * @param message
     * @param messageContext
     */
    protected void fillContext(Message message,MessageContext messageContext) {
        if(message == null){
            return;
        }
        if(StringUtils.isBlank(messageContext.getLogTraceId())){
            String logTraceId = message.getAttribute(LogConstants.JMQ2_LOG_TRACE_ID_KEY);
            messageContext.setLogTraceId(logTraceId);
        }
        if(StringUtils.isBlank(messageContext.getTopic())){
            String topic = message.getTopic();
            messageContext.setTopic(topic);
        }
        String businessId = message.getBusinessId();
        messageContext.getBusinessIdBuf().append(SEPARATOR_COMMA).append(businessId);
    }

    /**
     * traceId嵌入消息体中
     * @param message
     */
    protected void generateTraceIdIntoMessage(Message message){
        if(message == null){
            return;
        }
        try{
            String logTraceId = MDC.get(LogConstants.MDC_LOG_TRACE_ID_KEY);
            if(StringUtils.isBlank(logTraceId)){
                logTraceId = TraceUtils.getTraceId();
                MDC.put(LogConstants.MDC_LOG_TRACE_ID_KEY,logTraceId);
            }
            message.setAttribute(LogConstants.JMQ2_LOG_TRACE_ID_KEY,logTraceId);
        }catch (RuntimeException e){
            log.error("jmq2自定义log跟踪拦截器执行异常",e);
        }
    }

    /**
     * 批量发送消息-无回调
     * @param messages
     * @param timeout
     * @throws JMQException
     */
    public void send(List<Message> messages, int timeout) throws JMQException {
        MessageContext messageContext = new MessageContext();
        messageContext.setMessages(messages);
        List<Message> messageList = this.getMessages(messageContext);
        //打印日志,方便排查问题
        printLog(messageContext);
        super.send(messageList, timeout);
    }

    /**
     * 单个发送消息
     * @param message
     * @param transaction
     * @param <T>
     * @return
     * @throws JMQException
     */
    public <T> T send(Message message, LocalTransaction<T> transaction) throws JMQException {
        MessageContext messageContext = new MessageContext();
        messageContext.setMessage(message);
        Message msg = this.getMessage(messageContext);
        //打印日志,方便排查问题
        printLog(messageContext);
        return super.send(msg, transaction);
    }

    /**
     * 批量发送消息-有回调
     * @param messages
     * @param timeout
     * @param callback
     * @throws JMQException
     */
    public void send(List<Message> messages, int timeout, AsyncSendCallback callback) throws JMQException {
        MessageContext messageContext = new MessageContext();
        messageContext.setMessages(messages);
        List<Message> messageList = this.getMessages(messageContext);
        //打印日志,方便排查问题
        printLog(messageContext);
        super.send(messageList, timeout, callback);
    }

    /**
     * 打印日志,方便排查问题
     * @param messageContext
     */
    private void printLog(MessageContext messageContext) {
        if(messageContext==null){
            return;
        }
        if(log.isInfoEnabled()){
            log.info("MQ发送:traceId:{},topic:{},businessIds:[{}]",messageContext.getLogTraceId(),messageContext.getTopic(),messageContext.getBusinessIdBuf()==null?"":messageContext.getBusinessIdBuf().toString());
        }
    }

}

@Slf4j
public class TraceIdEnvMessageProducer extends BaseTraceIdProducer {

    private static final String UAT_TRUE = String.valueOf(true);
    private boolean uat = false;

    public TraceIdEnvMessageProducer() {
    }

    public TraceIdEnvMessageProducer(TransportManager transportManager) {
        super(transportManager);
    }

    /**
     * 环境变量打标-单个消息体
     * @param message
     */
    private void convertUatMessage(Message message) {
        if (message != null) {
            message.setAttribute(SplitMessage.JMQ_SPLIT_KEY_IS_UAT, UAT_TRUE);
        }
    }


    /**
     * 消息转换-批量消息体
     * @param messageContext
     * @return
     */
    private List<Message> convertMessages(MessageContext messageContext) {
        List<Message> messages = messageContext.getMessages();
        if (!CollectionUtils.isEmpty(messages)) {
            Iterator messageIterator = messages.iterator();
            while(messageIterator.hasNext()) {
                Message message = (Message)messageIterator.next();
                if(this.isUat()){
                    this.convertUatMessage(message);
                }
                super.generateTraceIdIntoMessage(message);
                super.fillContext(message,messageContext);
            }
        }
        return messageContext.getMessages();
    }

    /**
     * 消息转换-单个消息体
     * @param messageContext
     * @return
     */
    private Message convertMessage(MessageContext messageContext){
        Message message = messageContext.getMessage();
        if(this.isUat()){
            this.convertUatMessage(message);
        }
        super.generateTraceIdIntoMessage(message);
        super.fillContext(message,messageContext);
        return message;
    }

    protected Message getMessage(MessageContext messageContext) {
        if(log.isDebugEnabled()){
            log.debug("current environment is UAT : {}", this.isUat());
        }
        return this.convertMessage(messageContext);
    }

    protected List<Message> getMessages(MessageContext messageContext) {
        if(log.isDebugEnabled()){
            log.debug("current environment is UAT : {}", this.isUat());
        }
        return this.convertMessages(messageContext);
    }

    public void setUat(boolean uat) {
        this.uat = uat;
    }

    boolean isUat() {
        return this.uat;
    }

}

消费者:新建一个抽象类继承MessageListener,覆写父类中的onMessage方法,主要进行设置日志traceId和消费完成后的traceId清理等,而在子类中进行一些自定义处理,具体代码如下

@Slf4j
public abstract class BaseTraceIdMessageListener implements MessageListener {

    public BaseTraceIdMessageListener() {
    }

    public abstract void onMessageList(List<Message> messages) throws Exception;

    @Override
    public final void onMessage(List<Message> messages) throws Exception {
        try{
            if(CollectionUtils.isEmpty(messages)){
                return;
            }
            //设置日志traceId
            setLogTraceId(messages);
            this.onMessageList(messages);
            //消费完后清除traceId
            clear();
        }catch (Exception e){
            throw e;
        }finally {
            MDC.clear();
        }
    }

    /**
     * 设置日志traceId
     * @param messages
     */
    private void setLogTraceId(List<Message> messages) {
        try{
            Message message = messages.get(0);
            String logTraceId = message.getAttribute(LogConstants.JMQ2_LOG_TRACE_ID_KEY);
            if(StringUtils.isBlank(logTraceId)){
                logTraceId = TraceUtils.getTraceId();
            }
            MDC.put(LogConstants.MDC_LOG_TRACE_ID_KEY,logTraceId);
        }catch (RuntimeException e){
            log.error("jmq2自定义log跟踪拦截器执行异常",e);
        }
    }

    /**
     * 清除traceId
     */
    private void clear() {
        try{
            MDC.clear();
        }catch (RuntimeException e){
            log.error("jmq2自定义log跟踪拦截器执行异常",e);
        }
    }

}

@Slf4j
public abstract class TraceIdEnvMessageListener extends BaseTraceIdMessageListener{

    private String uat;

    public TraceIdEnvMessageListener() {
    }

    public abstract void onMessages(List<Message> var1) throws Exception;

    @Override
    public void onMessageList(List<Message> messages) throws Exception {
        Iterator iterator;
        Message message;
        if (this.getUat() != null && Boolean.valueOf(this.getUat())) {
            iterator = messages.iterator();

            while(true) {
                while(iterator.hasNext()) {
                    message = (Message)iterator.next();
                    if (message != null && Boolean.valueOf(message.getAttribute(SplitMessage.JMQ_SPLIT_KEY_IS_UAT))) {
                        this.onMessages(Arrays.asList(message));
                    } else {
                        log.debug("Ignore message: [BusinessId: {}, Text: {}]", message.getBusinessId(), message.getText());
                    }
                }

                return;
            }
        } else if (this.getUat() != null && !Boolean.valueOf(this.getUat())) {
            iterator = messages.iterator();

            while(true) {
                while(iterator.hasNext()) {
                    message = (Message)iterator.next();
                    if (message != null && !Boolean.valueOf(message.getAttribute(SplitMessage.JMQ_SPLIT_KEY_IS_UAT))) {
                        this.onMessages(Arrays.asList(message));
                    } else {
                        log.debug("Ignore message: [BusinessId: {}, Text: {}]", message.getBusinessId(), message.getText());
                    }
                }

                return;
            }
        } else {
            this.onMessages(messages);
        }
    }

    public void setUat(String uat) {
        if (!"true".equals(uat) && !"false".equals(uat)) {
            throw new IllegalArgumentException("uat 属性值只能为 true 或 false.");
        } else {
            this.uat = uat;
        }
    }

    public String getUat() {
        return this.uat;
    }
}

resteasy场景

此场景类似于spinrg-mvc场景,也是http请求,需要通过拦截器在消息头中进行logTraceId的透传,主要有客户端拦截器,服务端:预处理拦截器、后置拦截器,代码如下

@ClientInterceptor
@Provider
@Slf4j
public class ResteasyClientInterceptor implements ClientExecutionInterceptor {
    @Override
    public ClientResponse execute(ClientExecutionContext clientExecutionContext) throws Exception {
        try{
            String logTraceId = MDC.get(LogConstants.MDC_LOG_TRACE_ID_KEY);
            ClientRequest request = clientExecutionContext.getRequest();
            String headerTraceId = request.getHeaders().getFirst(LogConstants.HEADER_LOG_TRACE_ID_KEY);
            if(StringUtils.isBlank(logTraceId) && StringUtils.isBlank(headerTraceId)){
                //如果filter和MDC都没有获取到则说明是调用源头
                String traceId = TraceUtils.getTraceId();
                TraceUtils.resetTraceId(traceId);
                request.header(LogConstants.HEADER_LOG_TRACE_ID_KEY,traceId);
            } else if(StringUtils.isBlank(headerTraceId)){
                //如果MDC有但是filter没有则需要传递
                request.header(LogConstants.HEADER_LOG_TRACE_ID_KEY,logTraceId);
            }
        }catch (RuntimeException e){
            log.error("resteasy客户端log跟踪拦截器执行异常",e);
        }
        return clientExecutionContext.proceed();
    }
}

@Slf4j
@Provider
@ServerInterceptor
public class RestEasyPreInterceptor implements PreProcessInterceptor {
    @Override
    public ServerResponse preProcess(HttpRequest request, ResourceMethod resourceMethod) throws Failure, WebApplicationException {
        try{
            MultivaluedMap<String, String> requestHeaders = request.getHttpHeaders().getRequestHeaders();
            String headerTraceId = requestHeaders.getFirst(LogConstants.HEADER_LOG_TRACE_ID_KEY);
            if(StringUtils.isNotBlank(headerTraceId)){
                //如果filter则透传
                TraceUtils.resetTraceId(headerTraceId);
            }
        }catch (RuntimeException e){
            log.error("resteasy服务端log跟踪前置拦截器执行异常",e);
        }
        return null;
    }
}

@Slf4j
@Provider
@ServerInterceptor
public class ResteasyPostInterceptor implements PostProcessInterceptor {
    @Override
    public void postProcess(ServerResponse serverResponse) {
        try{
            MDC.clear();
        }catch (RuntimeException e){
            log.error("resteasy服务端log跟踪后置拦截器执行异常",e);
        }
    }
}

clover场景

clover的大体机制主要是在项目启动的时候扫描到带有注解@HessianWebService的类进行服务注册并维持心跳检测,而clover端则通过servlet请求方式进行任务的回调,同时继承AbstractScheduleTaskProcess方式的任务是以线程池的方式进行业务的处理

基于上述原理我们需要解决两个问题:1.新建一个类继承ServiceExporterServlet,并在web.xml配置中进行servlet配置,代码如下;

@Slf4j
public class ServiceExporterTraceIdServlet extends ServiceExporterServlet {

    @Override
    public void service(ServletRequest req, ServletResponse res) throws ServletException, IOException {
        try {
            String traceId = MDC.get("traceId");
            if (StringUtils.isBlank(traceId)) {
                MDC.put("traceId", TraceUtils.getTraceId());
            }
        } catch (Exception e) {
            log.error("clover请求servlet执行异常", e);
        }
        try {
            super.service(req, res);
        } catch (Throwable e) {
            log.error("clover请求servlet执行异常", e);
            throw e;
        }finally {
            try{
                MDC.clear();
            }catch (RuntimeException ex){
                log.error("clover请求servlet执行异常",ex);
            }
        }
    }
}

2.新建一个抽象类继承AbstractScheduleTaskProcess,在类中以编码形式进行父子线程的透传(可优化:通过覆写MDCAdapter:通过阿里的TransmittableThreadLocal来解决父子线程传递问题),所有任务均改为继承此类,关键代码如下

try{
            traceId = MDC.get(LogConstants.MDC_LOG_TRACE_ID_KEY);
            if (StringUtils.isBlank(traceId)) {
                log.warn("clover自定义log跟踪拦截器预警,mdc没有traceId");
            }
        }catch (RuntimeException e){
            log.error("clover自定义log跟踪拦截器执行异常",e);
        }
        final String logTraceId = traceId;
        while(iterator.hasNext()) {
            final List<TcTask> list = (List<TcTask>)iterator.next();
            this.executor.submit(new Callable<Object>() {
                public Object call() throws Exception {
                    try{
                        if (StringUtils.isNotBlank(logTraceId)) {
                            MDC.put(LogConstants.MDC_LOG_TRACE_ID_KEY, logTraceId);
                        }
                    }catch (RuntimeException e){
                        log.error("clover自定义log跟踪拦截器执行异常",e);
                    }
                    Object var1;
                    try {
                        if (BaseTcTaskProcessWorker.logger.isInfoEnabled()) {
                            BaseTcTaskProcessWorker.logger.info("正在执行任务[" + this.getClass().getName() + "],条数:" + list.size() + "...");
                        }


                        BaseTcTaskProcessWorker.this.executeTasks(list);

                        if (BaseTcTaskProcessWorker.logger.isInfoEnabled()) {
                            BaseTcTaskProcessWorker.logger.info("执行任务[" + this.getClass().getName() + "],条数:" + list.size() + "成功!");
                        }

                        var1 = null;
                    } catch (Exception var5) {
                        BaseTcTaskProcessWorker.logger.error(var5.getMessage(), var5);
                        throw var5;
                    } finally {
                        try{
                            MDC.clear();
                        }catch (RuntimeException ex){
                            log.error("clover自定义log跟踪拦截器执行异常",ex);
                        }
                        latch.countDown();
                    }

                    return var1;
                }
            });
        }

easyjob场景

easyjob的大体机制是在项目启动的时候通过扫描实现接口Scheduler的类进行上报注册,同时启动一个acceptor(获取任务的线程池),而acceptor拉取到任务后会将父任务放进一个叫executor的线程池,子任务范进一个叫slowExecutor的线程池,我们可以新建一个抽奖类实现接口ScheduleFlowTask,复用clover场景硬编码方式进行父子线程logTraceId的透传处理(可优化:通过覆写MDCAdapter:通过阿里的TransmittableThreadLocal来解决父子线程传递问题),示例代码如下

@Slf4j
public abstract class AbstractEasyjobOnlyScheduleProcess<T> implements ScheduleFlowTask {

    /**
     * EASYJOB平台UMP监控key前缀
     */
    private static final String EASYJOB_UMP_KEY_RREFIX = "trans.easyjob.dotask.";

    /**
     * EASYJOB单个任务处理分布式锁前缀
     */
    private static final String EASYJOB_SINGLE_TASK_LOCK_PREFIX = "basic_easyjob_single_task_lock_prefix_";

    /**
     * 环境标识-开关配置进行环境隔离
     */
    @Value("${spring.profiles.active}")
    private String activeEnv;

    @Value("${task.scene.mark}")
    private String sceneMark = TaskSceneMarkEnum.PRODUCTION.getDesc();

    /**
     * easyJob维度线程池变量
     */
    private ThreadPoolExecutor easyJobExecutor;
    /**
     * easyJob维度服务器个数-分片个数
     */
    private volatile int easyJobLastThreadCount = 0;

    /**
     * easyjob多线程名称
     */
    private static final String EASYJOB_THREAD_NAME = "dts.easyJobs";

    /**
     * 子类的泛型参数类型
     */
    private Class<T> argumentType;

    /**
     * 无参构造
     */
    public AbstractEasyjobOnlyScheduleProcess() {
        //设置子类泛型参数类型
        argumentType = this.getArgumentType();
    }

    @Autowired
    private RedisHelper redisHelper;

    /**
     * 非task表扫描待处理的任务数据
     * @param taskServerParam
     * @param curServer
     * @return
     */
    protected abstract List<T> loadTasks(TaskServerParam taskServerParam, int curServer);

    /**
     * 业务处理抽象方法-单个
     * @param task
     */
    protected abstract void doSingleTask(T task);

    /**
     * 业务处理抽象方法-批量
     * @param tasks
     */
    protected abstract void doBatchTasks(List<T> tasks);

    /**
     * 拼装ump监控key
     * @param prefix
     * @param taskNameKey
     * @return
     */
    private String getUmpKey(String prefix,String taskNameKey) {
        StringBuffer umpKeyBuf = new StringBuffer();
        umpKeyBuf.append(prefix).append(taskNameKey);
        return umpKeyBuf.toString();
    }

    /**
     * easyjob平台异步任务回调方法
     * @param scheduleContext
     * @return
     * @throws Exception
     */
    @Override
    public TaskResult doTask(ScheduleContext scheduleContext) throws Exception {
        String requestNo = TraceUtils.getTraceId();
        try {
            String traceId = MDC.get(LogConstants.MDC_LOG_TRACE_ID_KEY);
            if (StringUtils.isBlank(traceId)) {
                MDC.put(LogConstants.MDC_LOG_TRACE_ID_KEY, requestNo);
            }
        } catch (Exception e) {
            log.error("easyjob执行异常", e);
        }
        EasyJobTaskServerParam taskServerParam = null;

        CallerInfo callerinfo = null;
        try {
            //条件转换
            taskServerParam = EasyJobCoreUtil.transTaskServerParam(scheduleContext);
            String taskNameKey = getTaskNameKey();
            String umpKey = getUmpKey(EASYJOB_UMP_KEY_RREFIX,taskNameKey);
            callerinfo = Profiler.registerInfo(umpKey, Constants.TRANS_BASIC, false, true);
            //多服务器,并且非子任务,本次不执行,提交子任务
            if (taskServerParam.getServerCount() > 1 && !taskServerParam.isSubTask()) {
                submitSubTask(scheduleContext, taskServerParam,requestNo);
                return TaskResult.success();
            }

            if (log.isInfoEnabled()) {
                log.info("请求编号[{}],开始获取任务,任务ID[{}],任务名称[{}],执行参数[{}]", requestNo, taskServerParam.getTaskId(), taskServerParam.getTaskName(), JSON.toJSONString(taskServerParam));
            }
            TaskServerParam cloverTaskServerParam = EasyJobCoreUtil.transferCloverTaskServerParam(taskServerParam);

            List<T> tasks = this.selectTasks(cloverTaskServerParam, taskServerParam.getCurServer());

            if (log.isInfoEnabled()) {
                log.info("请求编号[{}],获取任务ID[{}],任务名称[{}]共{}条", requestNo, taskServerParam.getTaskId(), taskServerParam.getTaskName(), tasks == null ? 0 : tasks.size());
            }

            if (CollectionUtils.isNotEmpty(tasks)) {
                if (log.isInfoEnabled()) {
                    log.info("请求编号[{}],开始执行任务,任务ID[{}],任务名称[{}]", requestNo, taskServerParam.getTaskId(), taskServerParam.getTaskName());
                }

                this.easyJobExecuteTasksInner(taskServerParam, tasks,requestNo);
                if (log.isInfoEnabled()) {
                    log.info("请求编号[{}],执行任务,任务ID[{}],任务名称[{}],执行数量[{}]完成....", requestNo, taskServerParam.getTaskId(), taskServerParam.getTaskName(), tasks.size());
                }

            }
            return TaskResult.success();
        } catch (Exception e) {
            Profiler.functionError(callerinfo);
            if (log.isInfoEnabled()) {
                log.error("请求编号[{}],任务执行失败,任务ID[{}],任务名称[{}]", requestNo, taskServerParam == null ? "" : taskServerParam.getTaskId(), taskServerParam == null ? "" :taskServerParam.getTaskName(), e);
            }
            return TaskResult.fail(e.getMessage());
        }finally {
            try{
                MDC.clear();
            }catch (RuntimeException ex){
                log.error("easyjob执行异常",ex);
            }
            Profiler.registerInfoEnd(callerinfo);
        }
    }

    /**
     * 多分片提交子任务
     * @param scheduleContext 调度任务上下文参数
     * @param taskServerParam 调度任务参数
     * @param requestNo 调度任务参数
     * @return void
     */
    private void submitSubTask(ScheduleContext scheduleContext, EasyJobTaskServerParam taskServerParam,String requestNo) throws IOException {

        log.info("请求编号[{}],执行任务,任务ID[{}],任务名称[{}],子任务个数[{}],开始提交子任务", requestNo, taskServerParam.getTaskId(), taskServerParam.getTaskName(), taskServerParam.getServerCount());

        String jobClass = scheduleContext.getTaskGetResponse().getJobClass();

        if (StringUtils.isBlank(jobClass)) {
            throw new RuntimeException("jobClass get error");
        }

        for (int i = 0; i < taskServerParam.getServerCount(); i++) {
            Map<String, String> dataMap = scheduleContext.getParameters();
            //提交子任务标识
            dataMap.put("isSubTask", "true");
            //给子任务进行编号
            dataMap.put("curServer", String.valueOf(i));
            //父任务名称传递子任务
            dataMap.put("taskName", taskServerParam.getTaskName());
            scheduleContext.commitSubTask(jobClass, dataMap, taskServerParam.getExpected(), taskServerParam.getTransactionalAccept());
        }
        // 父任务等待子任务执行完毕再更改状态,如果执行时间超过等待时间,抛异常
        //scheduleContext.waitForSubtaskCompleted((long) taskServerParam.getServerCount() * taskServerParam.getExpected());
        log.info("请求编号[{}],执行任务,任务ID[{}],任务名称[{}],子任务个数[{}],提交完成....", requestNo, taskServerParam.getTaskId(), taskServerParam.getTaskName(), taskServerParam.getServerCount());
    }

    /**
     * 创建线程池,按配置参数执行task
     * @param param 执行参数
     * @param tasks 任务集合
     * @param requestNoStr
     * @return void
     */
    private void easyJobExecuteTasksInner(final EasyJobTaskServerParam param, List<T> tasks,String requestNoStr) {
        int threadCount = param.getThreadCount();
        synchronized (this) {
            if (this.easyJobExecutor == null) {
                this.easyJobExecutor = (ThreadPoolExecutor) EasyJobCoreUtil.createCustomeasyJobExecutorService(threadCount, EASYJOB_THREAD_NAME);
                this.easyJobLastThreadCount = threadCount;
            } else if (threadCount > this.easyJobLastThreadCount) {
                this.easyJobExecutor.setMaximumPoolSize(threadCount);
                this.easyJobExecutor.setCorePoolSize(threadCount);
                this.easyJobLastThreadCount = threadCount;
            } else if (threadCount < this.easyJobLastThreadCount) {
                this.easyJobExecutor.setCorePoolSize(threadCount);
                this.easyJobExecutor.setMaximumPoolSize(threadCount);
                this.easyJobLastThreadCount = threadCount;
            }
        }

        List<List<T>> lists = Lists.partition(tasks, param.getExecuteCount());
        final CountDownLatch latch = new CountDownLatch(lists.size());
        final String requestNo = requestNoStr;
        for (final List<T> list : lists) {
            this.easyJobExecutor.submit(
                    new Callable<Object>() {
                        public Object call() throws Exception {
                            try{
                                if (StringUtils.isNotBlank(requestNo)) {
                                    MDC.put(LogConstants.MDC_LOG_TRACE_ID_KEY, requestNo);
                                }
                            }catch (RuntimeException e){
                                log.error("easyjob自定义log跟踪拦截器执行异常",e);
                            }
                            try {
                                if (log.isInfoEnabled()) {
                                    log.info("请求编号[{}],正在执行任务,任务ID[{}],任务名称[{}],[{}],条数:[{}]...", requestNo, param.getTaskId(), param.getTaskName(), Thread.currentThread().getName(), list.size());
                                }
                                executeTasks(list);
                                if (log.isInfoEnabled()) {
                                    log.info("请求编号[{}],执行任务,任务ID[{}],任务名称[{}],[{}],条数:[{}]成功!", requestNo, param.getTaskId(), param.getTaskName(), Thread.currentThread().getName(), list.size());
                                }
                            } catch (Exception e) {
                                log.error(e.getMessage(), e);
                                throw e;
                            } finally {
                                try{
                                    MDC.clear();
                                }catch (RuntimeException ex){
                                    log.error("easyjob自定义log跟踪拦截器执行异常",ex);
                                }
                                latch.countDown();
                            }
                            return null;
                        }

                    }
            );
        }

        try {
            latch.await();
        } catch (InterruptedException e) {
            throw new RuntimeException("interrupted when processing data access request in concurrency", e);
        }
    }

    /**
     * 获取任务名称
     * @return
     */
    private String getTaskNameKey(){
        StringBuffer keyBuf = new StringBuffer();
        keyBuf.append(activeEnv)
                .append(Constants.SEPARATOR_UNDERLINE)
                .append(this.getClass().getSimpleName());
        return keyBuf.toString();
    }

    protected void executeTasks(List<T> taskList) {
        if(CollectionUtils.isEmpty(taskList)) {
            return;
        }
        this.doTasks(taskList);
    }

    /**
     * 业务处理抽象方法
     * @param list
     */
    protected void doTasks(List<T> list){
        if(isDoBatchTasks()){
            CallerInfo info = Profiler.registerInfo(getClass().getName()+"_batch", Constants.TRANS_BASIC,false, true);
            try {
                /** 开始执行各个子类真正业务逻辑 */
                this.doBatchTasks(list);
            } catch(CommonBusinessException ex){
                log.warn(ex.getMessage());
            } catch (Exception e) {
                Profiler.functionError(info);
                log.error("任务处理失败,方法:{},任务:{}",ClassHelper.getMethod(),JSON.toJSONString(list), e);
            } finally {
                Profiler.registerInfoEnd(info);
            }
        }else{
            for (T task : list) {
                CallerInfo info = Profiler.registerInfo(getClass().getName(), Constants.TRANS_BASIC,false, true);
                if(task == null) { continue; }
                String lockKey = "";
                try {
                    /** 开始执行各个子类真正业务逻辑 */
                    if (useConcurrentLock()) {
                        lockKey = getLockKey(task);
                        if (redisHelper.lock(RedisKeyDef.SyncLockKeyPrefix.TASK_PROCESS_LOCK_PREFIX, lockKey)) {
                            this.doSingleTask(task);
                        }else{
                            lockKey = "";
                            log.warn("lockKey:{},加载失败,正在被其他用户锁定,请重试!",lockKey);
                        }
                    } else {
                        this.doSingleTask(task);
                    }
                } catch(CommonBusinessException ex){
                    log.warn(ex.getMessage());
                } catch (Exception e) {
                    Profiler.functionError(info);
                    log.error("任务处理失败,方法:{},任务:{}",ClassHelper.getMethod(),JSON.toJSONString(task), e);
                } finally {
                    Profiler.registerInfoEnd(info);
                    if (StringUtils.isNotBlank(lockKey)) {
                        redisHelper.unlock(RedisKeyDef.SyncLockKeyPrefix.TASK_PROCESS_LOCK_PREFIX, lockKey);
                    }
                }
            }
        }
    }

    /**
     * 获取实体类的实际类型
     *
     * @return
     */
    private Class<T> getArgumentType() {
        return (Class<T>) ((ParameterizedType) getClass().getGenericSuperclass()).getActualTypeArguments()[0];
    }

    /**
     * 是否使用防并发锁
     * 默认不使用,如需使用子类重写该方法
     * @return
     */
    protected boolean useConcurrentLock() {
        return false;
    }

    /**
     * 根所注解获取LockKey,可被子类重写,提高效率
     *
     * @param businessObj   业务对象
     * @return concurrent lock key
     */
    protected String getLockKey( T businessObj) {
        StringBuilder lockKey = new StringBuilder(EASYJOB_SINGLE_TASK_LOCK_PREFIX);
        //若存在注解指定的防重字段,则使用这些字段拼装防重Key,否则使用MQ业务主键防重
        List<ValueEntryInfo> valueEntries = getAnnotaionConcurrentKeys(businessObj);
        if (!CollectionUtils.isEmpty(valueEntries)) {
            for (ValueEntryInfo valueEntry : valueEntries) {
                lockKey.append(Constants.SEPARATOR_UNDERLINE);
                lockKey.append(valueEntry.getValue());
            }
        } else {
           throw new CommonBusinessException(String.format("此任务处理需要加分布式锁,但是未设置锁key,所以不做业务处理,请检查,任务信息:%s",JSON.toJSONString(businessObj)));
        }
        return lockKey.toString();
    }

    /**
     * 查找对象的ConccurentKey注解,获取防重字段,并排序返回
     *
     * @param businessObj 业务对象
     * @return 有序的业务字段值列表
     */
    private List<ValueEntryInfo> getAnnotaionConcurrentKeys(T businessObj) {
        List<ValueEntryInfo> valueEntries = new ArrayList<ValueEntryInfo>();
        Field[] fields = businessObj.getClass().getDeclaredFields();
        for (int i = 0; i < fields.length; i++) {
            ConcurrentKey concurrentKey = fields[i].getAnnotation(ConcurrentKey.class);
            if (concurrentKey != null) {
                fields[i].setAccessible(true);
                Object fieldVal = null;
                try {
                    ValueEntryInfo valueEntry = new ValueEntryInfo();
                    fieldVal = fields[i].get(businessObj);
                    if (fieldVal != null) {
                        valueEntry.setValue(String.format("%1$s", fieldVal));
                        valueEntry.setOrder(concurrentKey.order());
                        valueEntries.add(valueEntry);
                    }
                } catch (IllegalAccessException e) {
                    log.error("IllegalAccess-{}.{}", businessObj.getClass().getName(), fields[i].getName());
                }
            }
        }
        if (valueEntries.size() > 1) {
            //排序ConcurrentKey
            Collections.sort(valueEntries, new Comparator<ValueEntryInfo>() {
                @Override
                public int compare(ValueEntryInfo o1, ValueEntryInfo o2) {
                    if (o1.getOrder() > o2.getOrder()) {
                        return 1;
                    } else if (o1.getOrder() == o2.getOrder()) {
                        return 0;
                    } else {
                        return -1;
                    }
                }
            });
        }
        return valueEntries;
    }

    protected List<T> selectTasks(TaskServerParam taskServerParam, int curServer) {
        return this.loadTasks(taskServerParam, curServer);
    }

    /**
     * 获取select时的任务创建开始时间
     * @param serverArg
     * @return
     */
    protected Date getCreateTimeFrom(String serverArg){
        return null;
    }

    /**
     * 是否以批量方式处理任务
     * @return
     */
    protected boolean isDoBatchTasks(){
        return false;
    }

}

实战结果

上述所述均为透传ID场景的原理和示例代码,实战效果如下图:调用jsf超时,跨系统查看日志进行排查,得知为慢sql引起

分布式日志追踪ID实战 | 京东物流技术团队

分布式日志追踪ID实战 | 京东物流技术团队

上述大部分场景已经抽出一个通用jar包,详细使用教程见我的另一篇文章:分布式日志追踪ID使用教程

作者:京东物流 张小龙

来源:京东云开发者社区 自猿其说 Tech 转载请注明来源

点赞
收藏
评论区
推荐文章
Stella981 Stella981
3年前
Dubbo日志链路追踪TraceId选型
!mark(https://oscimg.oschina.net/oscnet/updd1ad9729fb807ee6dc473bdc283b1a4481.png)一、目的开发排查系统问题用得最多的手段就是查看系统日志,但是在分布式环境下使用日志定位问题还是比较麻烦,需要借助全链路追踪ID把上下文串联起来,本文主要分享基于
Stella981 Stella981
3年前
Linux下安装 SkyWalking 分布式追踪系统
Linux下安装SkyWalking分布式追踪系统完全无代码入侵【落地】背景:由于现系统拆分为了分布式系统,对于线上查看错误日志有点费劲方案:部署搭建SkyWalking的分布式追踪系统一、SkyWalking简介
Stella981 Stella981
3年前
ELK项目es+kibana+jdk配置问题
简介ELK需求背景业务发展越来越庞大,服务器越来越多各种访问日志、应用日志、错误日志量越来越多开发人员排查问题,需要到服务器上查日志,不方便运营人员需要一些数据,需要我们运维到服务器上分析日志说白了就是日志那么多我要给你们搞牛逼的系统如果有机会再搞日志系统加消息列队eskafka消化来的数据ELK包含Elasti
Stella981 Stella981
3年前
Linux神器strace的使用方法及实践
在Linux系统中,strace命令是一个集诊断、调试、统计与一体的工具,可用来追踪调试程序,能够与其他命令搭配使用,接下来就Linux系统调用工具strace的使用方法和实践给大家做个详细介绍,一起来了解下strace的操作实例吧。【场景】1、在操作系统运维中会出现程序或系统命令运行失败,通过报错和日志无法定位问题根因。
京东云开发者 京东云开发者
11个月前
IT工单治理野史:由每周最高150+治理到20+ | 京东物流技术团队
背景相信不少人都值过班当过小秘吧,每天都要在线排查与解答各种各样来自IT或"单聊"的问题,同时还要针对每个问题进行"复盘"分析,在完善系统、提高体验的同时挖掘出其中的雷点,防止某一天突然"爆炸"造成不可控的局面。我们这边在值班小秘每日进行线上问题排查、解答
京东云开发者 京东云开发者
9个月前
技术分享-日志链路追踪
1.背景简述在技术运维过程中,很难从某服务庞杂的日志中,单独找寻出某次API调用的全部日志。为提高排查问题的效率,在多个系统及应用内根据统一的TraceId查找同一次请求链路上的日志,根据日志快速定位问题,同时需对业务代码无侵入,特别是在高频请求下,也可以
京东云开发者 京东云开发者
2个月前
日志框架简介-Slf4j+Logback入门实践
作者:京东零售张洪前言随着互联网和大数据的迅猛发展,分布式日志系统和日志分析系统已广泛应用,几乎所有应用程序都使用各种日志框架记录程序运行信息。因此,作为工程师,了解主流的日志记录框架非常重要。虽然应用程序的运行结果不受日志的有无影响,但没有日志的应用程序
javalover123 javalover123
1年前
轻量级分布式日志追踪-Tlog快速入门
公司目前还没有上SkyWalking、Pinpoint等分布式追踪系统,所以先用个轻量级的吧。Tlog只生成TraceId写入日志文件,没有收集、存储、查询,所以轻量
分布式系统中的分布式链路追踪与分布式调用链路
在分布式系统中,由于服务间的调用关系复杂,需要实现分布式链路追踪来跟踪请求在各个服务中的调用路径和时间消耗。这对问题排查和性能监控都很重要。常用的分布式链路追踪实现有基于日志的和基于分布式追踪系统的两种方式:
京东云开发者 京东云开发者
11个月前
关于「日志采样」的一些思考及实践
一、背景:系统日志可用于追踪用户操作轨迹,异常情况下,合理的日志有助于快速排查、定位问题,毫无疑问,打印日志对于系统是很重要的。当业务规模较小时,大家都倾向于享受日志带来的便利,从而忽略日志带来的潜在的负面影响,缺乏对日志的管控。在JD当前用户量、业务规模