专栏目录
- Spring Cloud OpenFeign 源码解析
- Spring Cloud Ribbon 源码解析
- Spring Cloud Alibaba Sentinel 源码解析
- Spring Cloud Gatway 源码解析
- Spring Cloud Alibaba Nacos 源码解析
代码准备
依赖关系
+------------+ +------------+
| | | |
| | | |
| | | |
| | | |
| consumer +------------> | provider |
| | RestTemplate | |
| | | |
| | | |
| | | |
+------------+ +------------+
pom 依赖
加入nacos 服务发现即可,内部引用了spring-cloud-ribbon
相关依赖
<dependency>
<groupid>com.alibaba.cloud</groupid>
<artifactid>spring-cloud-starter-alibaba-nacos-discovery</artifactid>
</dependency>
调用客户端
我们这里以最简单的 RestTemplate
调用开始使用Ribbon
@Bean
@LoadBalanced
public RestTemplate restTemplate() {
return new RestTemplate();
}
// Controller 使用restTemplate 调用服务提供方接口
ResponseEntity<string> forEntity = restTemplate.getForEntity("http://provider/req", String.class);
源码解析
创建调用拦截器
1. 获取全部 @LoadBalanced
标记的RestTemplate
public class LoadBalancerAutoConfiguration {
@LoadBalanced
@Autowired(required = false)
private List<resttemplate> restTemplates = Collections.emptyList();
}
2. 增加 LoadBalancerInterceptor
处理逻辑
没有引入
spring-retry
使用的是@Bean public LoadBalancerInterceptor ribbonInterceptor() { return new LoadBalancerInterceptor(); }
引入
spring-retry
使用的是@Bean @ConditionalOnMissingBean public RetryLoadBalancerInterceptor ribbonInterceptor() { return new RetryLoadBalancerInterceptor(); }
LoadBalancerInterceptor 业务逻辑
public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor { @Override public ClientHttpResponse intercept() { final URI originalUri = request.getURI(); // http://demo-provider/req 截取 demo-provider 服务名称 String serviceName = originalUri.getHost(); // 默认注入的 RibbonAutoConfiguration.RibbonLoadBalancerClient return this.loadBalancer.execute(serviceName, // 创建请求对象 this.requestFactory.createRequest(request, body, execution)); } }
执行拦截器
3. RibbonLoadBalancerClient执行
//RibbonAutoConfiguration默认注入的RibbonLoadBalancerClient
@Bean
@ConditionalOnMissingBean(LoadBalancerClient.class)
public LoadBalancerClient loadBalancerClient() {
return new RibbonLoadBalancerClient(springClientFactory());
}
4.execute执行
public class RibbonLoadBalancerClient implements LoadBalancerClient {
public <t> T execute(){
//获取具体的ILoadBalancer实现
ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
// 调用ILoadBalancer 实现获取Server
Server server = getServer(loadBalancer, hint);
RibbonServer ribbonServer = new RibbonServer(serviceId, server,
isSecure(server, serviceId),
serverIntrospector(serviceId).getMetadata(server));
//获取状态记录器,保存此次选取的server
RibbonLoadBalancerContext context = this.clientFactory
.getLoadBalancerContext(serviceId);
RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server);
T returnVal = request.apply(serviceInstance);
statsRecorder.recordStats(returnVal);
return returnVal;
}
}
获取ILoadBalancer
5 SpringClientFactory
// bean 工厂生成LoadBalancer 的实现
protected ILoadBalancer getLoadBalancer(String serviceId) {
return this.springClientFactory.getLoadBalancer(serviceId);
}
// 具体生成逻辑看 RibbonClientConfiguration,这个Bean 只有工厂调用的时候才会创建
@Bean
@ConditionalOnMissingBean
public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
ServerList<server> serverList, ServerListFilter<server> serverListFilter,
IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
return new ZoneAwareLoadBalancer<>();
}
6.创建LoadBalancer 的依赖要素
名称
默认实现
作用
IClientConfig
DefaultClientConfigImpl
ribbon 客户端配置参数,例如: 超时设置、压缩设置等
ServerList
NacosServerList
目标服务的实例实例表,具体服务发现客户端实现
ServerListFilter
ZonePreferenceServerListFilter
针对ServerList 实例列表的过滤逻辑处理
IRule
ZoneAvoidanceRule
负载均衡选择Server 的规则
IPing
DummyPing
检验服务是否可用的方法实现
ServerListUpdater
PollingServerListUpdater
针对ServerList 更新的操作实现
以上默认实现参考 RibbonClientConfiguration. ZoneAwareLoadBalancer
获取服务实例
//Server server = getServer(loadBalancer, hint); 4. excute 方法
protected Server getServer(ILoadBalancer loadBalancer, Object hint) {
return loadBalancer.chooseServer(hint != null ? hint : "default");
}
7. ZoneAwareLoadBalancer
public class ZoneAwareLoadBalancer{
public ZoneAwareLoadBalancer() {
// 调用父类初始化方法。 这里会开启实例维护的定时任务等 (具体解析参考 扩展部分)
super(clientConfig, rule, ping, serverList, filter, serverListUpdater);
}
@Override
public Server chooseServer(Object key) {
// 若是使用的 Nacos 服务发现,则没有 Zone 的概念,直接调用父类的实现
if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {
return super.chooseServer(key);
}
// 以下为有 Zone 的概念 例如 Eureka (具体)
...
return server;
}
}
父类调用
IRule
实现选择Serverpublic Server chooseServer(Object key) { return rule.choose(key); }
8.PredicateBasedRule 选择规则
public abstract class PredicateBasedRule {
@Override
public Server choose(Object key) {
ILoadBalancer lb = getLoadBalancer();
// 获取断言配置
Optional<server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
if (server.isPresent()) {
return server.get();
} else {
return null;
}
}
}
9. ZoneAvoidancePredicate服务列表断言
public class ZoneAvoidancePredicate {
@Override
public boolean apply(@Nullable PredicateKey input) {
if (!ENABLED.get()) {
return true;
}
// 还是获取区域配置,如是使用的 Nacos 直接返回true
String serverZone = input.getServer().getZone();
if (serverZone == null) {
// there is no zone information from the server, we do not want to filter
// out this server
return true;
}
// 区域高可用判断
...
}
}
扩展: ServerList 维护
初始化ServerList
在上文 6.创建LoadBalancer 的依赖要素,中 ServerList
目标服务的实例实例表,具体服务发现客户端实现。我们来看下 Nacos 的实现
public class NacosServerList extends AbstractServerList<nacosserver> {
@Override
public List<nacosserver> getInitialListOfServers() {
return getServers();
}
@Override
public List<nacosserver> getUpdatedListOfServers() {
return getServers();
}
private List<nacosserver> getServers() {
String group = discoveryProperties.getGroup();
//调用nacos-sdk 查询实例列表
List<instance> instances = discoveryProperties.namingServiceInstance()
.selectInstances(serviceId, group, true);
// 类型转换
return instancesToServerList(instances);
}
}
更新ServerListUpdater
ServerList 初始化后更新操作通过
PollingServerListUpdater
public class PollingServerListUpdater implements ServerListUpdater { @Override public synchronized void start(final UpdateAction updateAction) { // 更新任务 交给updateAction 具体实现 final Runnable wrapperRunnable = () -> { updateAction.doUpdate(); lastUpdated = System.currentTimeMillis(); }; // 开启后台线程定时执行 updateAction scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay( wrapperRunnable, initialDelayMs, refreshIntervalMs, TimeUnit.MILLISECONDS ); } }
updateAction 实现
public void doUpdate() { DynamicServerListLoadBalancer.this.updateListOfServers(); }
public class PollingServerListUpdater implements ServerListUpdater { public void updateListOfServers() { List
servers = new ArrayList();
// 调用NacosServiceList 获取全部服务列表 servers = this.serverListImpl.getUpdatedListOfServers();
// 如果配置实例过滤器在执行过滤 if (this.filter != null) { servers = this.filter.getFilteredListOfServers((List)servers); }
// 更新LoadBalancer 服务列表 this.updateAllServerList((List)servers); } }
扩展: Server 状态维护
LoadBalancer 初始构造时会触发
setupPingTask()
public BaseLoadBalancer() { this.name = DEFAULT_NAME; this.ping = null; setRule(DEFAULT_RULE); // 开启ping 检查任务 setupPingTask(); lbStats = new LoadBalancerStats(DEFAULT_NAME); }
setupPingTask
void setupPingTask() { // 是否可以ping, 默认的DummyPing 直接 跳过不执行 if (canSkipPing()) { return; } // 执行PingTask lbTimer.schedule(new BaseLoadBalancer.PingTask(), 0, pingIntervalSeconds * 1000); // 开启任务 new BaseLoadBalancer.Pinger(pingStrategy).runPinger(); }
SerialPingStrategy 串行执行逻辑
// 串行调度执行 Iping 逻辑 private static class SerialPingStrategy implements IPingStrategy { @Override public boolean[] pingServers(IPing ping, Server[] servers) { int numCandidates = servers.length; boolean[] results = new boolean[numCandidates]; for (int i = 0; i < numCandidates; i++) { results[i] = false; /* Default answer is DEAD. */ if (ping != null) { results[i] = ping.isAlive(servers[i]); } } return results; } }
调用url 判断可用性
public class PingUrl implements IPing { public boolean isAlive(Server server) { urlStr = urlStr + server.getId(); urlStr = urlStr + this.getPingAppendString(); boolean isAlive = false; HttpClient httpClient = new DefaultHttpClient(); HttpUriRequest getRequest = new HttpGet(urlStr); String content = null; HttpResponse response = httpClient.execute(getRequest); content = EntityUtils.toString(response.getEntity()); isAlive = response.getStatusLine().getStatusCode() == 200; return isAlive; } }
扩展: RibbonClient 懒加载处理
由上文可知,默认情况下 Ribbon 在第一次请求才会去创建 LoadBalancer
,这种懒加载机制会导致服务启动后,第一次调用服务延迟问题,甚至在整合 断路器(hystrix)等出现超时熔断 。
为了解决这个问题,我们会配置 Ribbon 的饥饿加载
ribbon:
eager-load:
clients:
- provider
RibbonApplicationContextInitializer
服务启动后自动调用 工厂提前创建需要的ribbon clientspublic class RibbonApplicationContextInitializer implements ApplicationListener
{ private final List clientNames; protected void initialize() { if (clientNames != null) { for (String clientName : clientNames) { this.springClientFactory.getContext(clientName); } } } @Override public void onApplicationEvent(ApplicationReadyEvent event) { initialize(); } }
后续计划
欢迎关注我,后边更新 Ribbon
、Hystrix
、Sentinel
、Nacos
等组件源码图文解析。
另注: 以上图片素材 (omnigraffle & 亿图) 可以在公众号 JAVA架构日记
获取
『★★★★★』 基于Spring Boot 2.2、 Spring Cloud Hoxton & Alibaba、 OAuth2 的RBAC 权限管理系统 > 项目推荐: Spring Cloud 、Spring Security OAuth2的RBAC权限管理系统 欢迎关注