专栏目录
39. 改造 resilience4j 粘合 WebClient 26.OpenFeign的组件 3.Eureka Server 与 API 网关要考虑的问题 29.Spring Cloud OpenFeign 的解析(1) 45. 实现公共日志记录 9.如何理解并定制一个Spring Cloud组件 35. 验证线程隔离正确性 2.微服务框架需要考虑的问题 31. FeignClient 实现断路器以及线程隔离限流的思路 24.测试Spring Cloud LoadBalancer 37. 实现异步的客户端封装配置管理的意义与设计 23.订制Spring Cloud LoadBalancer 21.Spring Cloud LoadBalancer简介 10.使用Log4j2以及一些核心配置 17.Eureka的实例配置 11.Log4j2 监控相关 38. 实现自定义 WebClient 的 NamedContextFactory 15.UnderTow 订制 41. SpringCloudGateway 基本流程讲解(1) 6.微服务特性相关的依赖说明 43.为何 SpringCloudGateway 中会有链路信息丢失 28.OpenFeign的生命周期-进行调用 20. 启动一个 Eureka Server 集群 34.验证重试配置正确性 41. SpringCloudGateway 基本流程讲解(2) 16.Eureka架构和核心概念 44.避免链路信息丢失做的设计(1) 27.OpenFeign的生命周期-创建代理 8.理解 NamedContextFactory 32. 改进负载均衡算法 42.SpringCloudGateway 现有的可供分析的请求日志以及缺陷 19.Eureka的服务端设计与配置 12.UnderTow 简介与内部原理 40. spock 单元测试封装的 WebClient(下) 14.UnderTow AccessLog 配置介绍 33. 实现重试、断路器以及线程隔离源码 36. 验证断路器正确性 7.从Bean到SpringCloud 1. 背景 30. FeignClient 实现重试 25.OpenFeign简介与使用 4.maven依赖回顾以及项目框架结构 22.Spring Cloud LoadBalancer核心源码 5.所有项目的parent与spring-framework-common说明 18.Eureka的客户端核心设计和配置 13.UnderTow 核心配置 44.避免链路信息丢失做的设计(2) 40. spock 单元测试封装的 WebClient(上)

23.订制Spring Cloud LoadBalancer

unknown
• 阅读 1415

23.订制Spring Cloud LoadBalancer

本系列代码地址:https://github.com/HashZhang/spring-cloud-scaffold/tree/master/spring-cloud-iiford

我们使用 Spring Cloud 官方推荐的 Spring Cloud LoadBalancer 作为我们的客户端负载均衡器。上一节我们了解了 Spring Cloud LoadBalancer 的结构,接下来我们来说一下我们在使用 Spring Cloud LoadBalancer 要实现的功能:

  1. 我们要实现不同集群之间不互相调用,通过实例的metamap中的zone配置,来区分不同集群的实例。只有实例的metamap中的zone配置一样的实例才能互相调用。这个通过实现自定义的 ServiceInstanceListSupplier 即可实现
  2. 负载均衡的轮询算法,需要请求与请求之间隔离,不能共用同一个 position 导致某个请求失败之后的重试还是原来失败的实例。上一节看到的默认的 RoundRobinLoadBalancer 是所有线程共用同一个原子变量 position 每次请求原子加 1。在这种情况下会有问题:假设有微服务 A 有两个实例:实例 1 和实例 2。请求 A 到达时,RoundRobinLoadBalancer 返回实例 1,这时有请求 B 到达,RoundRobinLoadBalancer 返回实例 2。然后如果请求 A 失败重试,RoundRobinLoadBalancer 又返回了实例 1。这不是我们期望看到的。

针对这两个功能,我们分别编写自己的实现。

23.订制Spring Cloud LoadBalancer

Spring Cloud LoadBalancer 中的 zone 配置

Spring Cloud LoadBalancer 定义了 LoadBalancerZoneConfig

public class LoadBalancerZoneConfig {
    //标识当前负载均衡器处于哪一个 zone
    private String zone;
    public LoadBalancerZoneConfig(String zone) {
        this.zone = zone;
    }
    public String getZone() {
        return zone;
    }
    public void setZone(String zone) {
        this.zone = zone;
    }
}

如果没有引入 Eureka 相关依赖,则这个 zone 通过 spring.cloud.loadbalancer.zone 配置: LoadBalancerAutoConfiguration

@Bean
@ConditionalOnMissingBean
public LoadBalancerZoneConfig zoneConfig(Environment environment) {
    return new LoadBalancerZoneConfig(environment.getProperty("spring.cloud.loadbalancer.zone"));
}

如果引入了 Eureka 相关依赖,则如果在 Eureka 元数据配置了 zone,则这个 zone 会覆盖 Spring Cloud LoadBalancer 中的 LoadBalancerZoneConfig

EurekaLoadBalancerClientConfiguration

@PostConstruct
public void postprocess() {
    if (!StringUtils.isEmpty(zoneConfig.getZone())) {
        return;
    }
    String zone = getZoneFromEureka();
    if (!StringUtils.isEmpty(zone)) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Setting the value of '" + LOADBALANCER_ZONE + "' to " + zone);
        }
        //设置 `LoadBalancerZoneConfig`
        zoneConfig.setZone(zone);
    }
}

private String getZoneFromEureka() {
    String zone;
    //是否配置了 spring.cloud.loadbalancer.eureka.approximateZoneFromHostname 为 true
    boolean approximateZoneFromHostname = eurekaLoadBalancerProperties.isApproximateZoneFromHostname();
    //如果配置了,则尝试从 Eureka 配置的 host 名称中提取
    //实际就是以 . 分割 host,然后第二个就是 zone
    //例如 www.zone1.com 就是 zone1
    if (approximateZoneFromHostname && eurekaConfig != null) {
        return ZoneUtils.extractApproximateZone(this.eurekaConfig.getHostName(false));
    }
    else {
        //否则,从 metadata map 中取 zone 这个 key
        zone = eurekaConfig == null ? null : eurekaConfig.getMetadataMap().get("zone");
        //如果这个 key 不存在,则从配置中以 region 从 zone 列表取第一个 zone 作为当前 zone
        if (StringUtils.isEmpty(zone) && clientConfig != null) {
            String[] zones = clientConfig.getAvailabilityZones(clientConfig.getRegion());
            // Pick the first one from the regions we want to connect to
            zone = zones != null && zones.length > 0 ? zones[0] : null;
        }
        return zone;
    }
}

实现 SameZoneOnlyServiceInstanceListSupplier

为了实现通过 zone 来过滤同一 zone 下的实例,并且绝对不会返回非同一 zone 下的实例,我们来编写代码:

SameZoneOnlyServiceInstanceListSupplier

/**
 * 只返回与当前实例同一个 Zone 的服务实例,不同 zone 之间的服务不互相调用
 */
public class SameZoneOnlyServiceInstanceListSupplier extends DelegatingServiceInstanceListSupplier {
    /**
     * 实例元数据 map 中表示 zone 配置的 key
     */
    private final String ZONE = "zone";
    /**
     * 当前 spring cloud loadbalancer 的 zone 配置
     */
    private final LoadBalancerZoneConfig zoneConfig;
    private String zone;

    public SameZoneOnlyServiceInstanceListSupplier(ServiceInstanceListSupplier delegate, LoadBalancerZoneConfig zoneConfig) {
        super(delegate);
        this.zoneConfig = zoneConfig;
    }

    @Override
    public Flux<List<ServiceInstance>> get() {
        return getDelegate().get().map(this::filteredByZone);
    }

    //通过 zoneConfig 过滤
    private List<ServiceInstance> filteredByZone(List<ServiceInstance> serviceInstances) {
        if (zone == null) {
            zone = zoneConfig.getZone();
        }
        if (zone != null) {
            List<ServiceInstance> filteredInstances = new ArrayList<>();
            for (ServiceInstance serviceInstance : serviceInstances) {
                String instanceZone = getZone(serviceInstance);
                if (zone.equalsIgnoreCase(instanceZone)) {
                    filteredInstances.add(serviceInstance);
                }
            }
            if (filteredInstances.size() > 0) {
                return filteredInstances;
            }
        }
        /**
         * @see ZonePreferenceServiceInstanceListSupplier 在没有相同zone实例的时候返回的是所有实例
         * 我们这里为了实现不同 zone 之间不互相调用需要返回空列表
         */
        return List.of();
    }

    //读取实例的 zone,没有配置则为 null
    private String getZone(ServiceInstance serviceInstance) {
        Map<String, String> metadata = serviceInstance.getMetadata();
        if (metadata != null) {
            return metadata.get(ZONE);
        }
        return null;
    }
}

23.订制Spring Cloud LoadBalancer

在之前章节的讲述中,我们提到了我们使用 spring-cloud-sleuth 作为链路追踪库。我们想可以通过其中的 traceId,来区分究竟是否是同一个请求。

RoundRobinWithRequestSeparatedPositionLoadBalancer

//一定必须是实现ReactorServiceInstanceLoadBalancer
//而不是ReactorLoadBalancer<ServiceInstance>
//因为注册的时候是ReactorServiceInstanceLoadBalancer
@Log4j2
public class RoundRobinWithRequestSeparatedPositionLoadBalancer implements ReactorServiceInstanceLoadBalancer {
    private final ServiceInstanceListSupplier serviceInstanceListSupplier;
    //每次请求算上重试不会超过1分钟
    //对于超过1分钟的,这种请求肯定比较重,不应该重试
    private final LoadingCache<Long, AtomicInteger> positionCache = Caffeine.newBuilder().expireAfterWrite(1, TimeUnit.MINUTES)
            //随机初始值,防止每次都是从第一个开始调用
            .build(k -> new AtomicInteger(ThreadLocalRandom.current().nextInt(0, 1000)));
    private final String serviceId;
    private final Tracer tracer;


    public RoundRobinWithRequestSeparatedPositionLoadBalancer(ServiceInstanceListSupplier serviceInstanceListSupplier, String serviceId, Tracer tracer) {
        this.serviceInstanceListSupplier = serviceInstanceListSupplier;
        this.serviceId = serviceId;
        this.tracer = tracer;
    }

    @Override
    public Mono<Response<ServiceInstance>> choose(Request request) {
        return serviceInstanceListSupplier.get().next().map(serviceInstances -> getInstanceResponse(serviceInstances));
    }

    private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> serviceInstances) {
        if (serviceInstances.isEmpty()) {
            log.warn("No servers available for service: " + this.serviceId);
            return new EmptyResponse();
        }
        return getInstanceResponseByRoundRobin(serviceInstances);
    }

    private Response<ServiceInstance> getInstanceResponseByRoundRobin(List<ServiceInstance> serviceInstances) {
        if (serviceInstances.isEmpty()) {
            log.warn("No servers available for service: " + this.serviceId);
            return new EmptyResponse();
        }
        //为了解决原始算法不同调用并发可能导致一个请求重试相同的实例
        Span currentSpan = tracer.currentSpan();
        if (currentSpan == null) {
            currentSpan = tracer.newTrace();
        }
        long l = currentSpan.context().traceId();
        AtomicInteger seed = positionCache.get(l);
        int s = seed.getAndIncrement();
        int pos = s % serviceInstances.size();
        log.info("position {}, seed: {}, instances count: {}", pos, s, serviceInstances.size());
        return new DefaultResponse(serviceInstances.stream()
                //实例返回列表顺序可能不同,为了保持一致,先排序再取
                .sorted(Comparator.comparing(ServiceInstance::getInstanceId))
                .collect(Collectors.toList()).get(pos));
    }
}

23.订制Spring Cloud LoadBalancer

在上一节,我们提到了可以通过 @LoadBalancerClients 注解配置默认的负载均衡器配置,我们这里就是通过这种方式进行配置。首先在 spring.factories 中添加自动配置类:

spring.factories

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.github.hashjang.spring.cloud.iiford.service.common.auto.LoadBalancerAutoConfiguration

然后编写这个自动配置类,其实很简单,就是添加一个 @LoadBalancerClients 注解,设置默认配置类:

LoadBalancerAutoConfiguration

@Configuration(proxyBeanMethods = false)
@LoadBalancerClients(defaultConfiguration = DefaultLoadBalancerConfiguration.class)
public class LoadBalancerAutoConfiguration {
}

编写这个默认配置类,将上面我们实现的两个类,组装进去:

DefaultLoadBalancerConfiguration

@Configuration(proxyBeanMethods = false)
public class DefaultLoadBalancerConfiguration {

    @Bean
    public ServiceInstanceListSupplier serviceInstanceListSupplier(
            DiscoveryClient discoveryClient,
            Environment env,
            ConfigurableApplicationContext context,
            LoadBalancerZoneConfig zoneConfig
    ) {
        ObjectProvider<LoadBalancerCacheManager> cacheManagerProvider = context
                .getBeanProvider(LoadBalancerCacheManager.class);
        return  //开启服务实例缓存
                new CachingServiceInstanceListSupplier(
                        //只能返回同一个 zone 的服务实例
                        new SameZoneOnlyServiceInstanceListSupplier(
                                //启用通过 discoveryClient 的服务发现
                                new DiscoveryClientServiceInstanceListSupplier(
                                        discoveryClient, env
                                ),
                                zoneConfig
                        )
                        , cacheManagerProvider.getIfAvailable()
                );
    }

    @Bean
    public ReactorLoadBalancer<ServiceInstance> reactorServiceInstanceLoadBalancer(
            Environment environment,
            ServiceInstanceListSupplier serviceInstanceListSupplier,
            Tracer tracer
    ) {
        String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
        return new RoundRobinWithRequestSeparatedPositionLoadBalancer(
                serviceInstanceListSupplier,
                name,
                tracer
        );
    }
}

这样,我们就实现了自定义的负载均衡器。也理解了 Spring Cloud LoadBalancer 的使用。

23.订制Spring Cloud LoadBalancer

我们这一节详细分析在我们项目中使用 Spring Cloud LoadBalancer 要实现的功能,实现了自定义的负载均衡器,也理解了 Spring Cloud LoadBalancer 的使用。下一节我们使用单元测试验证我们要实现的这些功能是否有效。

微信搜索“我的编程喵”关注公众号,每日一刷,轻松提升技术,斩获各种offer

23.订制Spring Cloud LoadBalancer

点赞
收藏
评论区
推荐文章

暂无数据

unknown
unknown
Lv1
男 · rrrr · rrrrrrrr
rrrrr
文章
0
粉丝
17
获赞
0
热门文章

暂无数据