Spring Cloud升级之路

Stella981
• 阅读 680

针对网关非 Get 请求的重试

在之前的系列里面Spring Cloud升级之路 - Hoxton - 5. 实现微服务调用重试,我们针对 OpenFeign 和 Spring Cloud Gateway 都设置了重试。

对于 OpenFeign:

  • Get请求:任何非200 响应码,任何异常,都会重试。
  • 非 Get 请求:任何IOException(除了SocketTimeOutException,这个是read time out 导致的),还有 redilience 断路器异常,都会重试,其他的都不重试。

对于 Spring Cloud Gateway:

  • Get请求:任何4XX,5XX响应码,任何异常,都会重试。

现在,我们需要实现针对于 Spring Cloud Gateway 的非 Get 请求的任何IOException(除了SocketTimeOutException,这个是read time out 导致的),还有 redilience 断路器异常进行重试,Get因为请求并没有真正发出去。

现有设计

目前在 Spring Cloud Gateway 的 RetryFilterFactory,无法实现针对 Get 和非 Get 对于不同的异常进行不同的重试:

org.springframework.cloud.gateway.filter.factory.RetryGatewayFilterFactory

public class RetryGatewayFilterFactory
        extends AbstractGatewayFilterFactory<RetryGatewayFilterFactory.RetryConfig> {

    /**
     * Retry iteration key.ServerWebExchange的某个Attribute的key
     * 这个Attribute用来在每次调用的时候,+1,看是否超过了重试次数
     */
    public static final String RETRY_ITERATION_KEY = "retry_iteration";

    public RetryGatewayFilterFactory() {
        super(RetryConfig.class);
    }

    @Override
    public GatewayFilter apply(RetryConfig retryConfig) {
        //检验配置
        retryConfig.validate();
        Repeat<ServerWebExchange> statusCodeRepeat = null;
        //如果配置了可重试的HTTP响应状态码,则检查响应码是否可以重试
        if (!retryConfig.getStatuses().isEmpty() || !retryConfig.getSeries().isEmpty()) {
            Predicate<RepeatContext<ServerWebExchange>> repeatPredicate = context -> {
                ServerWebExchange exchange = context.applicationContext();
                //检查是否超过了重试次数
                if (exceedsMaxIterations(exchange, retryConfig)) {
                    return false;
                }

                //判断是否可以重试
                HttpStatus statusCode = exchange.getResponse().getStatusCode();

                boolean retryableStatusCode = retryConfig.getStatuses()
                        .contains(statusCode);

                if (!retryableStatusCode && statusCode != null) { 
                    // try the series
                    retryableStatusCode = retryConfig.getSeries().stream()
                            .anyMatch(series -> statusCode.series().equals(series));
                }

                final boolean finalRetryableStatusCode = retryableStatusCode;

                //判断是否是可以重试的HTTP方法
                HttpMethod httpMethod = exchange.getRequest().getMethod();
                boolean retryableMethod = retryConfig.getMethods().contains(httpMethod);
                //返回是否是可以重试的方法以及是否可以重试的HTTP响应状态码
                return retryableMethod && finalRetryableStatusCode;
            };

            //每次重试,都要重置路由,重新解析路由
            statusCodeRepeat = Repeat.onlyIf(repeatPredicate)
                    .doOnRepeat(context -> reset(context.applicationContext()));
            
            //设置Backoff
            BackoffConfig backoff = retryConfig.getBackoff();
            if (backoff != null) {
                statusCodeRepeat = statusCodeRepeat.backoff(getBackoff(backoff));
            }
        }

        // TODO: support timeout, backoff, jitter, etc... in Builder

        //判断异常是否可以重试
        Retry<ServerWebExchange> exceptionRetry = null;
        if (!retryConfig.getExceptions().isEmpty()) {
            Predicate<RetryContext<ServerWebExchange>> retryContextPredicate = context -> {

                ServerWebExchange exchange = context.applicationContext();

                if (exceedsMaxIterations(exchange, retryConfig)) {
                    return false;
                }

                Throwable exception = context.exception();
                for (Class<? extends Throwable> retryableClass : retryConfig
                        .getExceptions()) {
                    if (retryableClass.isInstance(exception) || (exception != null
                            && retryableClass.isInstance(exception.getCause()))) {
                        trace("exception or its cause is retryable %s, configured exceptions %s",
                                () -> getExceptionNameWithCause(exception),
                                retryConfig::getExceptions);

                        HttpMethod httpMethod = exchange.getRequest().getMethod();
                        boolean retryableMethod = retryConfig.getMethods()
                                .contains(httpMethod);
                        trace("retryableMethod: %b, httpMethod %s, configured methods %s",
                                () -> retryableMethod, () -> httpMethod,
                                retryConfig::getMethods);
                        return retryableMethod;
                    }
                }
                trace("exception or its cause is not retryable %s, configured exceptions %s",
                        () -> getExceptionNameWithCause(exception),
                        retryConfig::getExceptions);
                return false;
            };
            exceptionRetry = Retry.onlyIf(retryContextPredicate)
                    .doOnRetry(context -> reset(context.applicationContext()))
                    .retryMax(retryConfig.getRetries());
            BackoffConfig backoff = retryConfig.getBackoff();
            if (backoff != null) {
                exceptionRetry = exceptionRetry.backoff(getBackoff(backoff));
            }
        }

        GatewayFilter gatewayFilter = apply(retryConfig.getRouteId(), statusCodeRepeat,
                exceptionRetry);
        return new GatewayFilter() {
            @Override
            public Mono<Void> filter(ServerWebExchange exchange,
                    GatewayFilterChain chain) {
                return gatewayFilter.filter(exchange, chain);
            }

            @Override
            public String toString() {
                return filterToStringCreator(RetryGatewayFilterFactory.this)
                        .append("retries", retryConfig.getRetries())
                        .append("series", retryConfig.getSeries())
                        .append("statuses", retryConfig.getStatuses())
                        .append("methods", retryConfig.getMethods())
                        .append("exceptions", retryConfig.getExceptions()).toString();
            }
        };
    }

    private String getExceptionNameWithCause(Throwable exception) {
        if (exception != null) {
            StringBuilder builder = new StringBuilder(exception.getClass().getName());
            Throwable cause = exception.getCause();
            if (cause != null) {
                builder.append("{cause=").append(cause.getClass().getName()).append("}");
            }
            return builder.toString();
        }
        else {
            return "null";
        }
    }

    private Backoff getBackoff(BackoffConfig backoff) {
        return Backoff.exponential(backoff.firstBackoff, backoff.maxBackoff,
                backoff.factor, backoff.basedOnPreviousValue);
    }

    public boolean exceedsMaxIterations(ServerWebExchange exchange,
            RetryConfig retryConfig) {
        Integer iteration = exchange.getAttribute(RETRY_ITERATION_KEY);
        //是否超过了可重试次数
        boolean exceeds = iteration != null && iteration >= retryConfig.getRetries();
        return exceeds;
    }

    public void reset(ServerWebExchange exchange) {
        //这个方法主要是
        Set<String> addedHeaders = exchange.getAttributeOrDefault(
                CLIENT_RESPONSE_HEADER_NAMES, Collections.emptySet());
        addedHeaders
                .forEach(header -> exchange.getResponse().getHeaders().remove(header));
        removeAlreadyRouted(exchange);
    }


    public GatewayFilter apply(String routeId, Repeat<ServerWebExchange> repeat,
            Retry<ServerWebExchange> retry) {
        if (routeId != null && getPublisher() != null) {
            // send an event to enable caching
            getPublisher().publishEvent(new EnableBodyCachingEvent(this, routeId));
        }
        return (exchange, chain) -> {
            trace("Entering retry-filter");

            // chain.filter returns a Mono<Void>
            Publisher<Void> publisher = chain.filter(exchange)
                    // .log("retry-filter", Level.INFO)
                    .doOnSuccessOrError((aVoid, throwable) -> {
                        int iteration = exchange
                                .getAttributeOrDefault(RETRY_ITERATION_KEY, -1);
                        int newIteration = iteration + 1;
                        trace("setting new iteration in attr %d", () -> newIteration);
                        exchange.getAttributes().put(RETRY_ITERATION_KEY, newIteration);
                    });

            if (retry != null) {
                // retryWhen returns a Mono<Void>
                // retry needs to go before repeat
                publisher = ((Mono<Void>) publisher)
                        .retryWhen(retry.withApplicationContext(exchange));
            }
            if (repeat != null) {
                // repeatWhen returns a Flux<Void>
                // so this needs to be last and the variable a Publisher<Void>
                publisher = ((Mono<Void>) publisher)
                        .repeatWhen(repeat.withApplicationContext(exchange));
            }

            return Mono.fromDirect(publisher);
        };
    }

    @SuppressWarnings("unchecked")
    public static class RetryConfig implements HasRouteId {
        //路由id
        private String routeId;
        //重试次数,不包括调用的第一次,默认为3,也就是可能会调用4次
        private int retries = 3;
        //针对哪些HTTP状态码重试,一个Series对应一组HttpStatus
        private List<Series> series = toList(Series.SERVER_ERROR);
        //针对哪些HTTP状态码重试,一个HttpStatus就是一个HTTP状态码
        private List<HttpStatus> statuses = new ArrayList<>();
        //针对哪些HTTP方法重试
        private List<HttpMethod> methods = toList(HttpMethod.GET);
        //针对的哪些异常重试
        private List<Class<? extends Throwable>> exceptions = toList(IOException.class,
                TimeoutException.class);
        //重试间隔策略
        private BackoffConfig backoff;
        
        public void validate() {
            //重试次数必须大于10
            Assert.isTrue(this.retries > 0, "retries must be greater than 0");
            //可重试的series,可重试的状态码还有可重试的异常不能都为空,否则没有可以重试的场景了
            Assert.isTrue(
                    !this.series.isEmpty() || !this.statuses.isEmpty()
                            || !this.exceptions.isEmpty(),
                    "series, status and exceptions may not all be empty");
            //重试的Http方法不能为空
            Assert.notEmpty(this.methods, "methods may not be empty");
            if (this.backoff != null) {
                this.backoff.validate();
            }
        }
        
        //省略构造器,getter,setter还有一些工具方法
    }

    public static class BackoffConfig {
        //第一次重试时间间隔
        private Duration firstBackoff = Duration.ofMillis(5);
        //最大等待间隔
        private Duration maxBackoff;
        //增长比例
        private int factor = 2;
        //是否保留上一次请求的重试间隔时间,下次从这个时间间隔开始重试
        private boolean basedOnPreviousValue = true;
        //省略构造器,getter,setter
        public void validate() {
            //第一次重试间隔不能为空
            Assert.notNull(this.firstBackoff, "firstBackoff must be present");
        }
    }

}

总结起来,流程简化如下:

  1. 判断本次请求 HTTP 方法是否被 RetryConfig.methods 包含和 HTTP 响应码是否在 RetryConfig.series 的范围内或者 statuses 的集合内,如果在,看本次请求的 retry_iteration 这个 Attribute 是第几次(从0开始),是否超过了重试次数,如果没超过,就重试,如果超过,停止重试。
  2. 判断本次请求 HTTP 方法是否被 RetryConfig.methods 包含和 异常是否在 RetryConfig.exceptions 的集合内(是其中的某个异常的子类也可以),如果在,看本次请求的 retry_iteration 这个 Attribute 是第几次(从0开始),是否超过了重试次数,如果没超过,就重试,如果超过,停止重试。

配置的时候,HTTP 方法如果包含所有方法,那么没办法区分 GET 请求或者是 非 GET 请求;如果建立两个 Filter 一个拦截 GET 另一个拦截 非GET,那么他们共用的 Attribute 每次就会 +2,重试次数就不准确了。

所以,最后使用了这样一个不优雅的设计,就是 GET 和非 GET 使用不同的 RetryConfig,GET 的还是根据application.properties配置来,针对非 GET 请求,强制重试下面这些异常:

  • io.netty.channel.ConnectTimeoutException.class:连接超时
  • java.net.ConnectException.class:No route to host 异常
  • io.github.resilience4j.circuitbreaker.CallNotPermittedException: resilience4j 断路器相关异常

RetryGatewayFilter

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        ServerHttpRequest request = exchange.getRequest();
        //获取微服务名称
        String serviceName = request.getHeaders().getFirst(CommonConstant.SERVICE_NAME);
        HttpMethod method = exchange.getRequest().getMethod();
        //生成 GatewayFilter,保存到 gatewayFilterMap
        GatewayFilter gatewayFilter = gatewayFilterMap.computeIfAbsent(serviceName + ":" + method, k -> {
            Map<String, RetryConfig> retryConfigMap = apiGatewayRetryConfig.getRetry();
            //通过微服务名称,获取重试配置
            RetryConfig retryConfig = retryConfigMap.containsKey(serviceName) ? retryConfigMap.get(serviceName) : apiGatewayRetryConfig.getDefault();
            //重试次数为0,则不重试
            if (retryConfig.getRetries() == 0) {
                return null;
            }
            //针对非GET请求,强制限制重试并且只能重试下面的异常b
            if (!HttpMethod.GET.equals(method)) {
                RetryConfig newConfig = new RetryConfig();
                BeanUtils.copyProperties(retryConfig, newConfig);
                //限制所有方法都可以重试,由于外层限制了不为GET,这里相当于不为GET的所有方法
                newConfig.setMethods(HttpMethod.values());
                newConfig.setSeries();
                newConfig.setStatuses();
                newConfig.setExceptions(//链接超时
                        io.netty.channel.ConnectTimeoutException.class,
                        //No route to host
                        java.net.ConnectException.class,
                        //针对Resilience4j的异常
                        CallNotPermittedException.class);
                retryConfig = newConfig;
            }
            return this.apply(retryConfig);
        });
        return gatewayFilter != null ? gatewayFilter.filter(exchange, chain) : chain.filter(exchange);
    }
点赞
收藏
评论区
推荐文章
Easter79 Easter79
3年前
springboot整合spring retry 重试机制
当我们调用一个接口可能由于网络等原因造成第一次失败,再去尝试就成功了,这就是重试机制,spring支持重试机制,并且在SpringCloud中可以与Hystaix结合使用,可以避免访问到已经不正常的实例。但是切记非幂等情况下慎用重试一 加入依赖<!重试机制<depen
Easter79 Easter79
3年前
SpringCloud重试机制配置
    首先声明一点,这里的重试并不是报错以后的重试,而是负载均衡客户端发现远程请求实例不可到达后,去重试其他实例。Table1.ribbon重试配置ribbon.OkToRetryOnAllOperationsfalse(是否所有操作都重试)ribbon.MaxAutoRetriesNextServer2(重
Stella981 Stella981
3年前
Spring RabbitMQ 消息重试机制
RabbitMQ框架提供了重试机制,只需要简单的配置即可开启,可以提升程序的健壮性。测试一:重试5次spring:rabbitmq:listener:simple:retry:enabled:true