Hystrix Core Principles and Circuit Breaker Source Code Analysis

Stella981

How Hystrix Works

[Figure: Hystrix execution flow chart]

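  1. Construct a HystrixCommand or HystrixObservableCommand object.
  2. Execute the command.
  3. Check whether the request cache has a hit; if it does, return the cached result immediately.
  4. Check whether the circuit breaker is open; if it is, short-circuit the request and run the fallback logic.
  5. Check whether the thread pool / queue / semaphore is full; if it is, reject the request and run the fallback logic.
  6. If none of the above applies, call HystrixObservableCommand.construct() or HystrixCommand.run() to execute the business logic.
  7. If the business logic throws an exception or times out, degrade immediately and run the fallback logic.
  8. Report metrics, which are used to compute the circuit breaker state.
  9. Return the result.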

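As the flow chart shows, error statistics are reported only in steps 5 and 7 (rejection, and failure or timeout). A request that is short-circuited in step 4 does not add to the error count.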
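The nine steps can be condensed into a small, dependency-free Java model. Everything in it is hypothetical: the class `FlowSketch`, its map-based request cache (real Hystrix scopes its request cache to a `HystrixRequestContext`), the `Semaphore` that stands in for the thread pool / queue / semaphore check, and the two counters that stand in for step 8. It is not Hystrix's own command code and it does not model timeouts. It only follows the rule stated above: rejections (step 5) and failures (step 7) count as errors, and short-circuits and cache hits do not.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;

// Hypothetical, simplified model of steps 3-8; not thread-safe bookkeeping, for illustration only
class FlowSketch {
    enum Outcome { CACHE_HIT, SHORT_CIRCUITED, REJECTED, SUCCESS, FAILURE }

    private final Map<String, String> requestCache = new ConcurrentHashMap<>();
    private final Semaphore permits;             // stands in for thread pool / queue / semaphore
    private final BooleanSupplier breakerAllows; // stands in for the circuit breaker check
    int reportedTotal;                           // step 8: executions fed to the health counts
    int reportedErrors;                          // step 8: only steps 5 and 7 add here
    Outcome lastOutcome;

    FlowSketch(int maxConcurrent, BooleanSupplier breakerAllows) {
        this.permits = new Semaphore(maxConcurrent);
        this.breakerAllows = breakerAllows;
    }

    String execute(String cacheKey, Supplier<String> run, Supplier<String> fallback) {
        // Step 3: a cache hit returns immediately and is not reported
        String cached = requestCache.get(cacheKey);
        if (cached != null) {
            lastOutcome = Outcome.CACHE_HIT;
            return cached;
        }
        // Step 4: breaker open -> fallback; a short-circuit is not counted as an error
        if (!breakerAllows.getAsBoolean()) {
            lastOutcome = Outcome.SHORT_CIRCUITED;
            return fallback.get();
        }
        // Step 5: no capacity left -> reject and fall back; counted as an error
        if (!permits.tryAcquire()) {
            lastOutcome = Outcome.REJECTED;
            report(true);
            return fallback.get();
        }
        try {
            // Step 6: run the business logic
            String result = run.get();
            requestCache.put(cacheKey, result);
            lastOutcome = Outcome.SUCCESS;
            report(false);
            return result;
        } catch (RuntimeException e) {
            // Step 7: the business logic failed -> fall back; counted as an error
            lastOutcome = Outcome.FAILURE;
            report(true);
            return fallback.get();
        } finally {
            permits.release(); // only reached after a successful tryAcquire()
        }
    }

    private void report(boolean error) {
        reportedTotal++;
        if (error) {
            reportedErrors++;
        }
    }
}
```

Passing `() -> false` as `breakerAllows` simulates an open breaker, and `new FlowSketch(0, ...)` simulates a full thread pool. This makes every branch reachable from a single thread.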

How the Circuit Breaker Works


The circuit breaker opens and closes according to the following logic:

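  1. Within one statistical time window (HystrixCommandProperties.metricsRollingStatisticalWindowInMilliseconds()), if the number of requests handled reaches the configured minimum (HystrixCommandProperties.circuitBreakerRequestVolumeThreshold()) and the error percentage reaches or exceeds the configured maximum (HystrixCommandProperties.circuitBreakerErrorThresholdPercentage()), the breaker opens and its state switches from CLOSED to OPEN.
  2. While the breaker is open, it short-circuits all requests (fail fast) and runs the fallback logic.
  3. After a sleep window elapses (HystrixCommandProperties.circuitBreakerSleepWindowInMilliseconds()), Hystrix lets one request through to the downstream service and switches the breaker to the half-open state (HALF_OPEN). If that request fails, the breaker switches back to OPEN and keeps short-circuiting all requests until the next sleep window ends. If it succeeds, the breaker switches to CLOSED and lets all requests through again, until the condition in step 1 occurs and the breaker opens once more.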
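The CLOSED to OPEN condition in step 1 and the sleep-window check in step 3 reduce to simple arithmetic. The sketch below uses Hystrix's documented default values (20 requests, 50 %, 5000 ms). The class name `TripRule` is hypothetical, and computing the error percentage as a truncated integer is an assumption of this sketch. The comparisons mirror the `<` checks in `onNext` and the `>` in `isAfterSleepWindow()` in the source code below.

```java
// Hypothetical helper; the constants are Hystrix's documented defaults
final class TripRule {
    static final int REQUEST_VOLUME_THRESHOLD = 20;   // circuitBreakerRequestVolumeThreshold
    static final int ERROR_THRESHOLD_PERCENTAGE = 50; // circuitBreakerErrorThresholdPercentage
    static final long SLEEP_WINDOW_MS = 5_000;        // circuitBreakerSleepWindowInMilliseconds

    private TripRule() {}

    // Assumption: error percentage truncated to an int, 0 when there were no requests
    static int errorPercentage(long total, long errors) {
        return total == 0 ? 0 : (int) ((double) errors / total * 100);
    }

    // CLOSED -> OPEN only if volume >= threshold AND error% >= threshold
    static boolean shouldTrip(long total, long errors) {
        if (total < REQUEST_VOLUME_THRESHOLD) {
            return false; // not enough traffic in the window to judge
        }
        return errorPercentage(total, errors) >= ERROR_THRESHOLD_PERCENTAGE;
    }

    // Same strict comparison as isAfterSleepWindow(): now > openedAt + window
    static boolean isAfterSleepWindow(long openedAtMs, long nowMs) {
        return nowMs > openedAtMs + SLEEP_WINDOW_MS;
    }
}
```

With these defaults, 19 failed requests in one window do not open the breaker because the volume is too low. By contrast, 10 failures out of 20 requests do open it, because 50 % is not below the 50 % threshold. A breaker opened at t = 1000 ms admits its probe only from t = 6001 ms on.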

Circuit Breaker Source Code

Hystrix's circuit breaker is defined by the HystrixCircuitBreaker interface. Its source is as follows:

/**
 * Circuit-breaker logic that is hooked into {@link HystrixCommand} execution and will stop allowing executions if failures have gone past the defined threshold.
 * <p>
 * The default (and only) implementation will then allow a single retry after a defined sleepWindow until the execution
 * succeeds at which point it will again close the circuit and allow executions again.
 */
public interface HystrixCircuitBreaker {

    /**
     * Every {@link HystrixCommand} requests asks this if it is allowed to proceed or not.  It is idempotent and does
     * not modify any internal state, and takes into account the half-open logic which allows some requests through
     * after the circuit has been opened
     * <p>
     * In this implementation it returns false while the circuit is HALF_OPEN, or OPEN and still inside the sleep window;
     * it returns true while the circuit is CLOSED, or OPEN once the sleep window has elapsed.
     *
     * @return boolean whether a request should be permitted
     */
    boolean allowRequest();

    /**
     * Whether the circuit is currently open (tripped).
     * Returns true for OPEN and HALF_OPEN and false for CLOSED; it has no side effects and is idempotent.
     *
     * @return boolean state of circuit breaker
     */
    boolean isOpen();

    /**
     * Invoked on successful executions from {@link HystrixCommand} as part of feedback mechanism when in a half-open state.
     */
    void markSuccess();

    /**
     * Invoked on unsuccessful executions from {@link HystrixCommand} as part of feedback mechanism when in a half-open state.
     */
    void markNonSuccess();

    /**
     * Invoked at start of command execution to attempt an execution.  This is non-idempotent - it may modify internal
     * state.
     * <p>
     * Its main job is to decide whether this particular request may run.
     */
    boolean attemptExecution();
}
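The following hedged sketch shows how a caller might combine these methods. `Breaker` is a hypothetical stand-in that copies three method names from `HystrixCircuitBreaker` so the code compiles without Hystrix. `GuardedCall` is not Hystrix's real command code. It only illustrates the intended order: exactly one `attemptExecution()` per execution, followed by either `markSuccess()` or `markNonSuccess()`.

```java
import java.util.function.Supplier;

// Hypothetical subset of HystrixCircuitBreaker, so the sketch compiles without Hystrix
interface Breaker {
    boolean attemptExecution();
    void markSuccess();
    void markNonSuccess();
}

// Hypothetical caller; illustrates the call order only
final class GuardedCall {
    private GuardedCall() {}

    static <T> T call(Breaker breaker, Supplier<T> work, Supplier<T> fallback) {
        // Non-idempotent gate (may flip OPEN -> HALF_OPEN), so call it once per execution
        if (!breaker.attemptExecution()) {
            return fallback.get(); // short-circuited: no mark, no work
        }
        T result;
        try {
            result = work.get();
        } catch (RuntimeException e) {
            breaker.markNonSuccess(); // re-opens the circuit only if it was HALF_OPEN
            return fallback.get();
        }
        breaker.markSuccess(); // closes the circuit only if it was HALF_OPEN
        return result;
    }
}
```

Because `markSuccess()` and `markNonSuccess()` act only on a HALF_OPEN breaker, a caller can invoke them after every execution. They only have an effect after the single probe request.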

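The default implementation is provided by classes nested inside this interface: Factory, which hands out one breaker per command key, and HystrixCircuitBreakerImpl: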

/**
 * @ExcludeFromJavadoc
 * @ThreadSafe
 */
class Factory {
    // String is HystrixCommandKey.name() (we can't use HystrixCommandKey directly as we can't guarantee it implements hashcode/equals correctly)
    private static ConcurrentHashMap<String, HystrixCircuitBreaker> circuitBreakersByCommand = new ConcurrentHashMap<String, HystrixCircuitBreaker>();

    /**
     * Get the {@link HystrixCircuitBreaker} instance for a given {@link HystrixCommandKey}.
     * <p>
     * This is thread-safe and ensures only 1 {@link HystrixCircuitBreaker} per {@link HystrixCommandKey}.
     *
     * @param key        {@link HystrixCommandKey} of {@link HystrixCommand} instance requesting the {@link HystrixCircuitBreaker}
     * @param group      Pass-thru to {@link HystrixCircuitBreaker}
     * @param properties Pass-thru to {@link HystrixCircuitBreaker}
     * @param metrics    Pass-thru to {@link HystrixCircuitBreaker}
     * @return {@link HystrixCircuitBreaker} for {@link HystrixCommandKey}
     */
    public static HystrixCircuitBreaker getInstance(HystrixCommandKey key, HystrixCommandGroupKey group, HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
        // this should find it for all but the first time
        // look up the breaker by the command key's name
        HystrixCircuitBreaker previouslyCached = circuitBreakersByCommand.get(key.name());
        if (previouslyCached != null) {
            return previouslyCached;
        }

        // if we get here this is the first time so we need to initialize

        // Create and add to the map ... use putIfAbsent to atomically handle the possible race-condition of
        // 2 threads hitting this point at the same time and let ConcurrentHashMap provide us our thread-safety
        // If 2 threads hit here only one will get added and the other will get a non-null response instead.
        HystrixCircuitBreaker cbForCommand = circuitBreakersByCommand.putIfAbsent(key.name(), new HystrixCircuitBreakerImpl(key, group, properties, metrics));
        if (cbForCommand == null) {
            // this means the putIfAbsent step just created a new one so let's retrieve and return it
            return circuitBreakersByCommand.get(key.name());
        } else {
            // this means a race occurred and while attempting to 'put' another one got there before
            // and we instead retrieved it and will now return it
            return cbForCommand;
        }
    }

    /**
     * Get the {@link HystrixCircuitBreaker} instance for a given {@link HystrixCommandKey} or null if none exists.
     *
     * @param key {@link HystrixCommandKey} of {@link HystrixCommand} instance requesting the {@link HystrixCircuitBreaker}
     * @return {@link HystrixCircuitBreaker} for {@link HystrixCommandKey}
     */
    public static HystrixCircuitBreaker getInstance(HystrixCommandKey key) {
        return circuitBreakersByCommand.get(key.name());
    }

    /**
     * Clears all circuit breakers. If new requests come in instances will be recreated.
     */
    /* package */
    static void reset() {
        circuitBreakersByCommand.clear();
    }
}
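`Factory` first tries a plain `get()` and falls back to `putIfAbsent()`. Because the `new HystrixCircuitBreakerImpl(...)` argument is evaluated before `putIfAbsent()` runs, a thread that loses the race still constructs an instance, whose constructor subscribes to the health stream, and that instance is then discarded. One possible alternative, sketched here with a hypothetical generic `BreakerRegistry`, is `ConcurrentHashMap.computeIfAbsent` (Java 8+), which applies the creating function at most once per key.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical per-key registry in the style of Factory, keyed by the command key's name
final class BreakerRegistry<B> {
    private final ConcurrentHashMap<String, B> byKey = new ConcurrentHashMap<>();
    private final Function<String, B> creator;

    BreakerRegistry(Function<String, B> creator) {
        this.creator = creator;
    }

    B getInstance(String keyName) {
        // Fast path, as in Factory: a plain get() for every call after the first
        B existing = byKey.get(keyName);
        if (existing != null) {
            return existing;
        }
        // computeIfAbsent applies creator at most once per key, so threads that lose
        // the race never build (and then discard) an extra instance
        return byKey.computeIfAbsent(keyName, creator);
    }

    // Mirrors Factory.reset(): later calls create fresh instances
    void reset() {
        byKey.clear();
    }
}
```

The trade-off is that other updates to the map may block while `creator` runs. The creator should therefore be short and must not modify the map itself.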


/**
 * The default production implementation of {@link HystrixCircuitBreaker}.
 *
 * @ExcludeFromJavadoc
 * @ThreadSafe
 */
/* package */class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {
    private final HystrixCommandProperties properties;
    private final HystrixCommandMetrics metrics;

    enum Status {
        // circuit breaker states: closed, open, half-open
        CLOSED, OPEN, HALF_OPEN;
    }

    // A plain field cannot do an atomic check-then-set; AtomicReference gives lock-free
    // atomic updates (compareAndSet) of the Status reference
    private final AtomicReference<Status> status = new AtomicReference<Status>(Status.CLOSED);
    // timestamp (ms) when the circuit opened; -1 while CLOSED, >= 0 while OPEN or HALF_OPEN
    private final AtomicLong circuitOpened = new AtomicLong(-1);
    private final AtomicReference<Subscription> activeSubscription = new AtomicReference<Subscription>(null);

    protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, final HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
        this.properties = properties;
        this.metrics = metrics;

        //On a timer, this will set the circuit between OPEN/CLOSED as command executions occur
        Subscription s = subscribeToStream();
        activeSubscription.set(s);
    }

    private Subscription subscribeToStream() {
        /*
         * This stream will recalculate the OPEN/CLOSED status on every onNext from the health stream
         */
        return metrics.getHealthCountsStream()
                .observe()
                .subscribe(new Subscriber<HealthCounts>() {
                    @Override
                    public void onCompleted() {

                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onNext(HealthCounts hc) {
                        // check if we are past the statisticalWindowVolumeThreshold
                        if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
                            // we are not past the minimum volume threshold for the stat window,
                            // so no change to circuit status.
                            // if it was CLOSED, it stays CLOSED
                            // IF IT WAS HALF-OPEN, WE NEED TO WAIT FOR A SUCCESSFUL COMMAND EXECUTION
                            // if it was open, we need to wait for sleep window to elapse
                        } else {
                            // check the error-percentage threshold
                            if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
                                //we are not past the minimum error threshold for the stat window,
                                // so no change to circuit status.
                                // if it was CLOSED, it stays CLOSED
                                // if it was half-open, we need to wait for a successful command execution
                                // if it was open, we need to wait for sleep window to elapse
                            } else {
                                // our failure rate is too high, we need to set the state to OPEN
                                if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {
                                    circuitOpened.set(System.currentTimeMillis());
                                }
                            }
                        }
                    }
                });
    }

    @Override
    public void markSuccess() {
        // the breaker is HALF_OPEN and the command succeeded: switch to CLOSED
        if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) {
            //This thread wins the race to close the circuit - it resets the stream to start it over from 0
            metrics.resetStream();
            Subscription previousSubscription = activeSubscription.get();
            if (previousSubscription != null) {
                previousSubscription.unsubscribe();
            }
            Subscription newSubscription = subscribeToStream();
            activeSubscription.set(newSubscription);
            circuitOpened.set(-1L);
        }
    }

    @Override
    public void markNonSuccess() {
        // the breaker is HALF_OPEN and the command failed: switch back to OPEN
        if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) {
            //This thread wins the race to re-open the circuit - it resets the start time for the sleep window
            circuitOpened.set(System.currentTimeMillis());
        }
    }

    @Override
    public boolean isOpen() {
        // configuration may force the breaker open
        if (properties.circuitBreakerForceOpen().get()) {
            return true;
        }
        // configuration may force the breaker closed
        if (properties.circuitBreakerForceClosed().get()) {
            return false;
        }
        return circuitOpened.get() >= 0;
    }

    @Override
    public boolean allowRequest() {
        // configuration may force the breaker open
        if (properties.circuitBreakerForceOpen().get()) {
            return false;
        }
        // configuration may force the breaker closed
        if (properties.circuitBreakerForceClosed().get()) {
            return true;
        }
        if (circuitOpened.get() == -1) {
            return true;
        } else {
            // HALF_OPEN: a probe request is already in flight, so do not let this command run
            if (status.get().equals(Status.HALF_OPEN)) {
                return false;
            } else {
                // OPEN: allow the request only if the sleep window has elapsed
                return isAfterSleepWindow();
            }
        }
    }

    private boolean isAfterSleepWindow() {
        final long circuitOpenTime = circuitOpened.get();
        final long currentTime = System.currentTimeMillis();
        // configured sleep window length
        final long sleepWindowTime = properties.circuitBreakerSleepWindowInMilliseconds().get();
        return currentTime > circuitOpenTime + sleepWindowTime;
    }

    @Override
    public boolean attemptExecution() {
        // configuration may force the breaker open
        if (properties.circuitBreakerForceOpen().get()) {
            return false;
        }
        // configuration may force the breaker closed
        if (properties.circuitBreakerForceClosed().get()) {
            return true;
        }
        if (circuitOpened.get() == -1) {
            return true;
        } else {
            if (isAfterSleepWindow()) {
                //only the first request after sleep window should execute
                //if the executing command succeeds, the status will transition to CLOSED
                //if the executing command fails, the status will transition to OPEN
                //if the executing command gets unsubscribed, the status will transition to OPEN
                if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) {
                    return true;
                } else {
                    return false;
                }
            } else {
                return false;
            }
        }
    }
}
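The sketch below re-implements the core of `HystrixCircuitBreakerImpl` so that the state transitions can be exercised without RxJava. `MiniBreaker`, its `onHealthCounts` method (which plays the role of `onNext(HealthCounts)`), and the injectable `LongSupplier` clock are hypothetical. The force-open/force-closed properties are left out, and so are the metrics reset and re-subscription in `markSuccess()`.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongSupplier;

// Hypothetical, Rx-free re-implementation of the state machine shown above
class MiniBreaker {
    enum Status { CLOSED, OPEN, HALF_OPEN }

    private final AtomicReference<Status> status = new AtomicReference<>(Status.CLOSED);
    private final AtomicLong circuitOpened = new AtomicLong(-1); // -1 means CLOSED
    private final int volumeThreshold;
    private final int errorThresholdPct;
    private final long sleepWindowMs;
    private final LongSupplier clock; // injectable so tests need not sleep

    MiniBreaker(int volumeThreshold, int errorThresholdPct, long sleepWindowMs, LongSupplier clock) {
        this.volumeThreshold = volumeThreshold;
        this.errorThresholdPct = errorThresholdPct;
        this.sleepWindowMs = sleepWindowMs;
        this.clock = clock;
    }

    // Plays the role of onNext(HealthCounts): trip CLOSED -> OPEN when both thresholds are met
    void onHealthCounts(long totalRequests, int errorPercentage) {
        if (totalRequests < volumeThreshold || errorPercentage < errorThresholdPct) {
            return; // no change to circuit status
        }
        if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {
            circuitOpened.set(clock.getAsLong());
        }
    }

    boolean isOpen() {
        return circuitOpened.get() >= 0;
    }

    // Idempotent: reads state, never writes it
    boolean allowRequest() {
        if (circuitOpened.get() == -1) return true;
        if (status.get() == Status.HALF_OPEN) return false;
        return isAfterSleepWindow();
    }

    // Non-idempotent: only the caller that wins OPEN -> HALF_OPEN runs the probe
    boolean attemptExecution() {
        if (circuitOpened.get() == -1) return true;
        return isAfterSleepWindow() && status.compareAndSet(Status.OPEN, Status.HALF_OPEN);
    }

    void markSuccess() {
        if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) {
            circuitOpened.set(-1L);
        }
    }

    void markNonSuccess() {
        if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) {
            circuitOpened.set(clock.getAsLong()); // restart the sleep window
        }
    }

    Status status() {
        return status.get();
    }

    private boolean isAfterSleepWindow() {
        return clock.getAsLong() > circuitOpened.get() + sleepWindowMs;
    }
}
```

A test can drive a fake clock, for example `long[] now = {0}` with `() -> now[0]`, through a trip, a failed probe, and a successful probe without sleeping.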
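  • isOpen(): reports whether the breaker is open. Whether this method is idempotent depends on the Hystrix version.
  • allowRequest(): every HystrixCommand request asks whether it may proceed. It returns false while the breaker is HALF_OPEN, or OPEN within the sleep window, and true while the breaker is CLOSED or once the sleep window has elapsed. It is idempotent and does not modify any internal state. It also accounts for the half-open logic, letting some requests through once a sleep window has elapsed.
  • attemptExecution(): called at the start of command execution to attempt an execution. Its main purpose is to decide whether the request may run. It is non-idempotent and may modify internal state.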

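Note that allowRequest() is idempotent and can be called repeatedly. attemptExecution() has side effects and must not be called more than once per execution. Whether isOpen() is idempotent depends on the Hystrix version.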
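The difference between the two methods matters most under concurrency. In the minimal, hypothetical `ProbeGate` below, the breaker is assumed to be OPEN with its sleep window already elapsed. Any number of concurrent `allowRequest()`-style calls all get the same answer and change nothing. By contrast, exactly one of many concurrent `attemptExecution()`-style calls wins the `compareAndSet(OPEN, HALF_OPEN)` and gets to send the probe.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BooleanSupplier;
import java.util.stream.IntStream;

// Hypothetical minimal gate; assumes the breaker is OPEN and its sleep window has elapsed
class ProbeGate {
    enum Status { OPEN, HALF_OPEN }

    final AtomicReference<Status> status = new AtomicReference<>(Status.OPEN);

    // Read-only, like allowRequest(): answers without changing state
    boolean allowRequest() {
        return status.get() != Status.HALF_OPEN;
    }

    // Side-effecting, like attemptExecution(): only the first caller flips OPEN -> HALF_OPEN
    boolean attemptExecution() {
        return status.compareAndSet(Status.OPEN, Status.HALF_OPEN);
    }

    // Runs the check n times on parallel threads and counts how many callers were admitted
    static long admitted(int n, BooleanSupplier check) {
        return IntStream.range(0, n).parallel().filter(i -> check.getAsBoolean()).count();
    }
}
```

Running `admitted(1000, gate::allowRequest)` admits all 1000 callers and leaves the gate OPEN. Running `admitted(1000, gate::attemptExecution)` admits exactly one caller. This is why Hystrix calls the non-idempotent method only once per execution.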
