专栏目录
39. 改造 resilience4j 粘合 WebClient 26.OpenFeign的组件 3.Eureka Server 与 API 网关要考虑的问题 29.Spring Cloud OpenFeign 的解析(1) 9.如何理解并定制一个Spring Cloud组件 45. 实现公共日志记录 35. 验证线程隔离正确性 2.微服务框架需要考虑的问题 31. FeignClient 实现断路器以及线程隔离限流的思路 24.测试Spring Cloud LoadBalancer 37. 实现异步的客户端封装配置管理的意义与设计 23.订制Spring Cloud LoadBalancer 21.Spring Cloud LoadBalancer简介 10.使用Log4j2以及一些核心配置 17.Eureka的实例配置 11.Log4j2 监控相关 38. 实现自定义 WebClient 的 NamedContextFactory 15.UnderTow 订制 41. SpringCloudGateway 基本流程讲解(1) 6.微服务特性相关的依赖说明 43.为何 SpringCloudGateway 中会有链路信息丢失 28.OpenFeign的生命周期-进行调用 20. 启动一个 Eureka Server 集群 34.验证重试配置正确性 41. SpringCloudGateway 基本流程讲解(2) 16.Eureka架构和核心概念 44.避免链路信息丢失做的设计(1) 27.OpenFeign的生命周期-创建代理 8.理解 NamedContextFactory 32. 改进负载均衡算法 42.SpringCloudGateway 现有的可供分析的请求日志以及缺陷 19.Eureka的服务端设计与配置 12.UnderTow 简介与内部原理 40. spock 单元测试封装的 WebClient(下) 14.UnderTow AccessLog 配置介绍 33. 实现重试、断路器以及线程隔离源码 36. 验证断路器正确性 7.从Bean到SpringCloud 1. 背景 30. FeignClient 实现重试 25.OpenFeign简介与使用 4.maven依赖回顾以及项目框架结构 22.Spring Cloud LoadBalancer核心源码 5.所有项目的parent与spring-framework-common说明 18.Eureka的客户端核心设计和配置 13.UnderTow 核心配置 40. spock 单元测试封装的 WebClient(上) 44.避免链路信息丢失做的设计(2)

22.Spring Cloud LoadBalancer核心源码

unknown
• 阅读 1440

22.Spring Cloud LoadBalancer核心源码

本系列代码地址:https://github.com/HashZhang/spring-cloud-scaffold/tree/master/spring-cloud-iiford

经过上一节的详细分析,我们知道可以通过 LoadBalancerClientFactory 知道默认配置类LoadBalancerClientConfiguration. 并且获取微服务名称可以通过 environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);

LoadBalancerClientFactory

public static final String NAMESPACE = "loadbalancer";
public static final String PROPERTY_NAME = NAMESPACE + ".client.name";
public LoadBalancerClientFactory() {
    super(LoadBalancerClientConfiguration.class, NAMESPACE, PROPERTY_NAME);
}

查看配置类 LoadBalancerClientConfiguration,我们可以发现这个类主要定义两种 Bean,分别是 ReactorLoadBalancer<ServiceInstance>ServiceInstanceListSupplier

ReactorLoadBalancer 是负载均衡器,主要提供根据服务名称获取服务实例列表并从从中选择的功能。

ReactorLoadBalancer

Mono<Response<T>> choose(Request request);

在默认配置中的实现是:

LoadBalancerClientConfiguration

@Bean
@ConditionalOnMissingBean
public ReactorLoadBalancer<ServiceInstance> reactorServiceInstanceLoadBalancer(
        Environment environment,
        LoadBalancerClientFactory loadBalancerClientFactory) {
    //获取微服务名称
    String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
    //创建 RoundRobinLoadBalancer 
    //注意这里注入的是 LazyProvider,这主要因为在注册这个 Bean 的时候相关的 Bean 可能还没有被加载注册,利用 LazyProvider 而不是直接注入所需的 Bean 防止报找不到 Bean 注入的错误。
    return new RoundRobinLoadBalancer(loadBalancerClientFactory.getLazyProvider(name,
            ServiceInstanceListSupplier.class), name);
}

可以看出,默认配置的 ReactorLoadBalancer 实现是 RoundRobinLoadBalancer。这个负载均衡器实现很简单,有一个原子类型的 AtomicInteger position,从 ServiceInstanceListSupplier 中读取所有的服务实例列表,然后对于 position 原子加1,对列表大小取模,返回列表中这个位置的服务实例 ServiceInstance

RoundRobinLoadBalancer

public Mono<Response<ServiceInstance>> choose(Request request) {
    //注入的时候注入的是 Lazy Provider,这里取出真正的 Bean,也就是 ServiceInstanceListSupplier
    ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider
            .getIfAvailable(NoopServiceInstanceListSupplier::new);
            //获取实例列表
    return supplier.get(request)
            .next()
            //从列表中选择一个实例
            .map(serviceInstances -> processInstanceResponse(supplier, serviceInstances));
}

private Response<ServiceInstance> processInstanceResponse(ServiceInstanceListSupplier supplier,
        List<ServiceInstance> serviceInstances) {
    Response<ServiceInstance> serviceInstanceResponse = getInstanceResponse(serviceInstances);
    // 如果 ServiceInstanceListSupplier 也实现了 SelectedInstanceCallback,则执行下面的逻辑进行回调。SelectedInstanceCallback 就是每次负载均衡器选择实例之后进行的回调
    if (supplier instanceof SelectedInstanceCallback && serviceInstanceResponse.hasServer()) {
        ((SelectedInstanceCallback) supplier).selectedServiceInstance(serviceInstanceResponse.getServer());
    }
    return serviceInstanceResponse;
}

private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> instances) {
    if (instances.isEmpty()) {
        return new EmptyResponse();
    }
    //postion 原子 +1 并取绝对值
    int pos = Math.abs(this.position.incrementAndGet());
    //返回对应下标的实例
    ServiceInstance instance = instances.get(pos % instances.size());
    return new DefaultResponse(instance);
}

ServiceInstanceListSupplier 是服务列表提供者接口:

ServiceInstanceListSupplier

public interface ServiceInstanceListSupplier extends Supplier<Flux<List<ServiceInstance>>> {
    String getServiceId();
    default Flux<List<ServiceInstance>> get(Request request) {
        return get();
    }
    static ServiceInstanceListSupplierBuilder builder() {
        return new ServiceInstanceListSupplierBuilder();
    }
}

spring-cloud-loadbalancer 中有很多 ServiceInstanceListSupplier 的实现,在默认配置中是通过属性配置指定实现的,这个配置项是spring.cloud.loadbalancer.configurations。例如:

LoadBalancerClientConfiguration

@Bean
@ConditionalOnBean(ReactiveDiscoveryClient.class)
@ConditionalOnMissingBean
//spring.cloud.loadbalancer.configurations 未指定或者为 default
@ConditionalOnProperty(value = "spring.cloud.loadbalancer.configurations", havingValue = "default",
        matchIfMissing = true)
public ServiceInstanceListSupplier discoveryClientServiceInstanceListSupplier(
        ConfigurableApplicationContext context) {
    return ServiceInstanceListSupplier.builder()
    //通过 DiscoveryClient 提供实例
    .withDiscoveryClient()
    //开启缓存
    .withCaching()
    .build(context);
}

@Bean
@ConditionalOnBean(ReactiveDiscoveryClient.class)
@ConditionalOnMissingBean
//如果 spring.cloud.loadbalancer.configurations 指定为 zone-preference
@ConditionalOnProperty(value = "spring.cloud.loadbalancer.configurations", havingValue = "zone-preference")
public ServiceInstanceListSupplier zonePreferenceDiscoveryClientServiceInstanceListSupplier(
        ConfigurableApplicationContext context) {
    return ServiceInstanceListSupplier.builder()
    //通过 DiscoveryClient 提供实例
    .withDiscoveryClient()
    //启用更倾向于同一个 zone 下实例的特性
    .withZonePreference()
    //开启缓存
    .withCaching()
            .build(context);
}

可以看到,可以通过 ServiceInstanceListSupplier.builder() 生成官方封装好各种特性的 ServiceInstanceListSupplier。其实从底层实现可以看出,所有的 ServiceInstanceListSupplier 实现都是代理模式,例如对于默认配置,底层代码近似于:

return  //开启服务实例缓存
        new CachingServiceInstanceListSupplier(
                        //启用通过 discoveryClient 的服务发现
                        new DiscoveryClientServiceInstanceListSupplier(
                                discoveryClient, env
                        )
                , cacheManagerProvider.getIfAvailable()
        );

除了默认配置 LoadBalancerClientConfiguration,用户配置自定义配置则是通过 @LoadBalancerClients@LoadBalancerClient.这个原理是通过 LoadBalancerClientConfigurationRegistrar 实现的。首先,我们来看一下 LoadBalancerClientFactory 这个 NamedContextFactory 是如何创建的:

[LoadBalancerAutoConfiguration]

private final ObjectProvider<List<LoadBalancerClientSpecification>> configurations;

public LoadBalancerAutoConfiguration(ObjectProvider<List<LoadBalancerClientSpecification>> configurations) {
    //注入 LoadBalancerClientSpecification List 的 provider
    //在 Bean 创建的时候,进行载入,而不是注册的时候
    this.configurations = configurations;
}

@ConditionalOnMissingBean
@Bean
public LoadBalancerClientFactory loadBalancerClientFactory() {
    //创建 LoadBalancerClientFactory
    LoadBalancerClientFactory clientFactory = new LoadBalancerClientFactory();
    //读取所有的 LoadBalancerClientSpecification,设置为 LoadBalancerClientFactory 的配置
    clientFactory.setConfigurations(this.configurations.getIfAvailable(Collections::emptyList));
    return clientFactory;
}

那么,LoadBalancerClientSpecification 这些 Bean 是怎么创建的呢?在 @LoadBalancerClients@LoadBalancerClient 注解中,都包含 @Import(LoadBalancerClientConfigurationRegistrar.class)。这个 @Import 加载一个 ImportBeanDefinitionRegistrar,这里是 LoadBalancerClientConfigurationRegistrar. ImportBeanDefinitionRegistrar里面的方法参数包含注解元数据,以及注册 Bean 的 BeanDefinitionRegistry。一般通过注解元数据,动态通过 BeanDefinitionRegistry 注册 Bean,在这里的实现是:

[LoadBalancerClients]

@Configuration(proxyBeanMethods = false)
@Retention(RetentionPolicy.RUNTIME)
@Target({ ElementType.TYPE })
@Documented
@Import(LoadBalancerClientConfigurationRegistrar.class)
public @interface LoadBalancerClients {
    //可以指定多个 LoadBalancerClient
    LoadBalancerClient[] value() default {};
    //指定所有的负载均衡配置的默认配置
    Class<?>[] defaultConfiguration() default {};
}

[LoadBalancerClient]

@Configuration(proxyBeanMethods = false)
@Import(LoadBalancerClientConfigurationRegistrar.class)
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface LoadBalancerClient {
    //name 和 value 都是微服务名称
    @AliasFor("name")
    String value() default "";
    @AliasFor("value")
    String name() default "";
    //这个微服务的配置
    Class<?>[] configuration() default {};
}

[LoadBalancerClientConfigurationRegistrar]

@Override
public void registerBeanDefinitions(AnnotationMetadata metadata, BeanDefinitionRegistry registry) {
    //获取 LoadBalancerClients 注解的元数据
    Map<String, Object> attrs = metadata.getAnnotationAttributes(LoadBalancerClients.class.getName(), true);
    if (attrs != null && attrs.containsKey("value")) {
        AnnotationAttributes[] clients = (AnnotationAttributes[]) attrs.get("value");
        //对于 value 属性,其实就是一个 LoadBalancerClient 列表,对于每个生成一个特定微服务名字的  LoadBalancerClientSpecification
        for (AnnotationAttributes client : clients) {
            registerClientConfiguration(registry, getClientName(client), client.get("configuration"));
        }
    }
    //如果指定了 defaultConfiguration,则注册为 default 的配置
    if (attrs != null && attrs.containsKey("defaultConfiguration")) {
        String name;
        if (metadata.hasEnclosingClass()) {
            name = "default." + metadata.getEnclosingClassName();
        }
        else {
            name = "default." + metadata.getClassName();
        }
        registerClientConfiguration(registry, name, attrs.get("defaultConfiguration"));
    }
    //获取 LoadBalancerClient 注解的元数据
    Map<String, Object> client = metadata.getAnnotationAttributes(LoadBalancerClient.class.getName(), true);
    String name = getClientName(client);
    if (name != null) {
        registerClientConfiguration(registry, name, client.get("configuration"));
    }
}

private static void registerClientConfiguration(BeanDefinitionRegistry registry, Object name, Object configuration) {
    //初始化 LoadBalancerClientSpecification 的 BeanDefinition,用于注册一个 LoadBalancerClientSpecification Bean
    BeanDefinitionBuilder builder = BeanDefinitionBuilder
            .genericBeanDefinition(LoadBalancerClientSpecification.class);
    //构造器参数
    builder.addConstructorArgValue(name);
    builder.addConstructorArgValue(configuration);
    //注册 Bean
    registry.registerBeanDefinition(name + ".LoadBalancerClientSpecification", builder.getBeanDefinition());
}

从代码中我们可以看出,通过使用 @LoadBalancerClients@LoadBalancerClient 注解可以自动生成对应的 LoadBalancerClientSpecification 进而实现公共负载均衡配置或者特定某个微服务的负载均衡配置。

22.Spring Cloud LoadBalancer核心源码

我们这一节详细分析 Spring Cloud LoadBalancer 的源代码来理解其中的原理,下一节我们将介绍在我们项目中使用 Spring Cloud LoadBalancer 要实现的功能。

微信搜索“我的编程喵”关注公众号,每日一刷,轻松提升技术,斩获各种offer

22.Spring Cloud LoadBalancer核心源码

点赞
收藏
评论区
推荐文章

暂无数据

unknown
unknown
Lv1
男 · rrrr · rrrrrrrr
rrrrr
文章
0
粉丝
17
获赞
0
热门文章

暂无数据