Nacos源码分析系列 - 服务续租
author:zxw
email:502513206@qq.com
@ Jishou University
@Nacos:https://nacos.io/zh-cn/docs/quick-start-spring-cloud.html
1.前言
之前分析了nacos的服务注册流程,接下来看看nacos的心跳检测机制。
2.面试题
nacos默认每隔多久发送一次心跳检测?
// 5000毫秒 DEFAULT_HEART_BEAT_INTERVAL = TimeUnit.SECONDS.toMillis(5);
nacos默认心跳超时时长为多少?
// 15000毫秒 DEFAULT_HEART_BEAT_TIMEOUT = TimeUnit.SECONDS.toMillis(15);
3.源码解析
之前在创建NamingService对象时,其中有一个步骤就是创建beatReactor对象,该对象负责定时向服务端发送服务实例的心跳。
// Create the BeatReactor that sends instance heartbeats through serverProxy; initClientBeatThreadCount(properties) sizes its scheduler thread pool.
this.beatReactor = new BeatReactor(this.serverProxy, initClientBeatThreadCount(properties));
该对象主要持有两个成员:一个是与注册中心通信的serverProxy,另一个是用于定时发送心跳的调度线程池(ScheduledThreadPoolExecutor)。
/**
 * Creates the heartbeat reactor.
 *
 * @param serverProxy proxy used to send requests to the Nacos server
 * @param threadCount core pool size of the scheduler that runs heartbeat tasks
 */
public BeatReactor(NamingProxy serverProxy, int threadCount) {
    this.serverProxy = serverProxy;
    // Heartbeat sender threads are daemons, so they never keep the JVM alive on their own.
    executorService = new ScheduledThreadPoolExecutor(threadCount, runnable -> {
        Thread beatSender = new Thread(runnable);
        beatSender.setDaemon(true);
        beatSender.setName("com.alibaba.nacos.naming.beat.sender");
        return beatSender;
    });
}
在之前分析的注册流程中,注册临时实例(ephemeral)时,会先构建该实例的BeatInfo,然后启动心跳发送的定时任务:
@Override public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException { String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName); if (instance.isEphemeral()) { // 构建beatInfo BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance); // 启动定时任务 beatReactor.addBeatInfo(groupedServiceName, beatInfo); } serverProxy.registerService(groupedServiceName, groupName, instance); }
addBeatInfo
public void addBeatInfo(String serviceName, BeatInfo beatInfo) { NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo); String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort()); BeatInfo existBeat = null; //fix #1733 if ((existBeat = dom2Beat.remove(key)) != null) { existBeat.setStopped(true); } dom2Beat.put(key, beatInfo); executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS); MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size()); }
看看定时任务中做了什么:client向server端发送心跳请求,然后根据响应判断心跳发送结果;如果server端返回该实例不存在(RESOURCE_NOT_FOUND),client会重新发起一次注册请求。
BeatTask
/**
 * Sends one heartbeat for beatInfo and schedules the next one.
 *
 * <p>The server reply may override the next delay ("clientBeatInterval") and toggles
 * light beats. If the reply code is RESOURCE_NOT_FOUND, the instance is registered again.
 * The next BeatTask is scheduled in {@code finally}, so a failure of any kind in this
 * run cannot stop the heartbeat chain. A stopped beat returns before that and is not
 * rescheduled.
 */
@Override
public void run() {
    if (beatInfo.isStopped()) {
        return;
    }
    // Delay until the next beat; the server reply may override it below.
    long nextTime = beatInfo.getPeriod();
    try {
        JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
        long interval = result.get("clientBeatInterval").asLong();
        boolean lightBeatEnabled = false;
        if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) {
            lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean();
        }
        BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
        if (interval > 0) {
            nextTime = interval;
        }
        int code = NamingResponseCode.OK;
        if (result.has(CommonParams.CODE)) {
            code = result.get(CommonParams.CODE).asInt();
        }
        // The server does not know this instance: register it again.
        if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {
            Instance instance = new Instance();
            instance.setPort(beatInfo.getPort());
            instance.setIp(beatInfo.getIp());
            instance.setWeight(beatInfo.getWeight());
            instance.setMetadata(beatInfo.getMetadata());
            instance.setClusterName(beatInfo.getCluster());
            instance.setServiceName(beatInfo.getServiceName());
            instance.setInstanceId(instance.getInstanceId());
            instance.setEphemeral(true);
            try {
                serverProxy.registerService(beatInfo.getServiceName(),
                        NamingUtils.getGroupName(beatInfo.getServiceName()), instance);
            } catch (Exception ignored) {
                // Best effort: if the server still lacks the instance, the next beat
                // receives RESOURCE_NOT_FOUND again and retries the registration.
            }
        }
    } catch (NacosException ex) {
        NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}",
                JacksonUtils.toJson(beatInfo), ex.getErrCode(), ex.getErrMsg());
    } catch (Exception unknownEx) {
        // E.g. a reply without "clientBeatInterval" (JsonNode.get returns null -> NPE).
        NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, unknown exception msg: {}",
                JacksonUtils.toJson(beatInfo), unknownEx.getMessage(), unknownEx);
    } finally {
        // Always schedule the next heartbeat, whatever happened above.
        executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
    }
}
接下来看看server端的InstanceController是如何处理心跳请求的:
@CanDistro @PutMapping("/beat") @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE) public ObjectNode beat(HttpServletRequest request) throws Exception { ObjectNode result = JacksonUtils.createEmptyJsonNode(); // 添加{"clientBeatInterval":5000} result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, switchDomain.getClientBeatInterval()); // 获取传入参数字符串 String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY); RsInfo clientBeat = null; if (StringUtils.isNotBlank(beat)) { // 转成对象 clientBeat = JacksonUtils.toObj(beat, RsInfo.class); } // 获取集群名称,默认为Default String clusterName = WebUtils .optional(request, CommonParams.CLUSTER_NAME, UtilsAndCommons.DEFAULT_CLUSTER_NAME); // 获取Ip String ip = WebUtils.optional(request, "ip", StringUtils.EMPTY); // 获取端口 int port = Integer.parseInt(WebUtils.optional(request, "port", "0")); if (clientBeat != null) { if (StringUtils.isNotBlank(clientBeat.getCluster())) { clusterName = clientBeat.getCluster(); } else { // fix #2533 clientBeat.setCluster(clusterName); } ip = clientBeat.getIp(); port = clientBeat.getPort(); } String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME); String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID); Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}", clientBeat, serviceName); // 获取当前实例 Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port); // 如果该beatInfo还未注册,则进行注册 if (instance == null) { // 如果为空,则返回20404状态,让客户端发起注册请求 if (clientBeat == null) { result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND); return result; }
Loggers.SRV_LOG.warn("[CLIENT-BEAT] The instance has been removed for health mechanism, "
- "perform data compensation operations, beat: {}, serviceName: {}", clientBeat, serviceName);
instance = new Instance(); instance.setPort(clientBeat.getPort()); instance.setIp(clientBeat.getIp()); instance.setWeight(clientBeat.getWeight()); instance.setMetadata(clientBeat.getMetadata()); instance.setClusterName(clusterName); instance.setServiceName(serviceName); instance.setInstanceId(instance.getInstanceId()); instance.setEphemeral(clientBeat.isEphemeral());
serviceManager.registerInstance(namespaceId, serviceName, instance); }
Service service = serviceManager.getService(namespaceId, serviceName);
if (service == null) { throw new NacosException(NacosException.SERVER_ERROR, "service not found: " + serviceName + "@" + namespaceId); } if (clientBeat == null) { clientBeat = new RsInfo(); clientBeat.setIp(ip); clientBeat.setPort(port); clientBeat.setCluster(clusterName); } // 发送定时任务 service.processClientBeat(clientBeat); // 返回结果 result.put(CommonParams.CODE, NamingResponseCode.OK); if (instance.containsMetadata(PreservedMetadataKeys.HEART_BEAT_INTERVAL)) { result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, instance.getInstanceHeartBeatInterval()); } result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled()); return result; }
service.processClientBeat(clientBeat); // hand the beat to a ClientBeatProcessor task
processClientBeat会提交一个立即执行的ClientBeatProcessor任务,由它把实例的最后心跳时间更新为当前时间:
public void processClientBeat(final RsInfo rsInfo) { ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor(); clientBeatProcessor.setService(this); clientBeatProcessor.setRsInfo(rsInfo); // 立即启动 HealthCheckReactor.scheduleNow(clientBeatProcessor); }
可以看到这里提交了一个ClientBeatProcessor任务:
@Override public void run() { // 请求的服务信息 Service service = this.service; if (Loggers.EVT_LOG.isDebugEnabled()) { Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}", rsInfo.toString()); }
String ip = rsInfo.getIp(); String clusterName = rsInfo.getCluster(); int port = rsInfo.getPort(); Cluster cluster = service.getClusterMap().get(clusterName); // 获取当前cluster中的所有实例 List<Instance> instances = cluster.allIPs(true);
for (Instance instance : instances) { // 如果是本实例 if (instance.getIp().equals(ip) && instance.getPort() == port) { if (Loggers.EVT_LOG.isDebugEnabled()) { Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString()); } // 设置心跳为当前时间 instance.setLastBeat(System.currentTimeMillis()); if (!instance.isMarked()) { if (!instance.isHealthy()) { instance.setHealthy(true); Loggers.EVT_LOG .info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok", cluster.getService().getName(), ip, port, cluster.getName(), UtilsAndCommons.LOCALHOST_SITE); getPushService().serviceChanged(service); } } } }
ClientBeatProcessor
ClientBeatProcessor只负责更新服务实例的最后心跳时间,并不负责检测心跳是否超时。心跳超时的检查由另一个任务完成,这个任务是在我们向nacos注册服务(首次创建Service)时启动的。让我们回顾一下注册流程中的步骤:
@CanDistro @PostMapping @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE) public String register(HttpServletRequest request) throws Exception {
final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME); final String namespaceId = WebUtils .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
final Instance instance = parseInstance(request); // 重点看这里 serviceManager.registerInstance(namespaceId, serviceName, instance); return "ok"; }
我们只看serviceManager.registerInstance这一步,沿着调用链直接来到createServiceIfAbsent
方法(中间省略了一些步骤):
public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster) throws NacosException { Service service = getService(namespaceId, serviceName); if (service == null) { Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName); service = new Service(); service.setName(serviceName); service.setNamespaceId(namespaceId); service.setGroupName(NamingUtils.getGroupName(serviceName)); // now validate the service. if failed, exception will be thrown service.setLastModifiedMillis(System.currentTimeMillis()); service.recalculateChecksum(); if (cluster != null) { cluster.setService(service); service.getClusterMap().put(cluster.getName(), cluster); } service.validate(); // 看这里 putServiceAndInit(service); if (!local) { addOrReplaceService(service); } } }
serviceManager
这里重点关注putServiceAndInit中调用的service.init()方法:
private void putServiceAndInit(Service service) throws NacosException { putService(service); // 看这里,service的初始化方法 service.init(); consistencyService .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service); consistencyService .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service); Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson()); }
Service
可以看到Service在初始化的时候会通过HealthCheckReactor启动一个
心跳检查任务(clientBeatCheckTask):
public void init() { // 启动线程 HealthCheckReactor.scheduleCheck(clientBeatCheckTask); for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) { entry.getValue().setService(this); entry.getValue().init(); } }
ClientBeatCheckTask
心跳超时检查任务的核心逻辑如下(该任务默认每5秒执行一次):
@Override public void run() { try { if (!getDistroMapper().responsible(service.getName())) { return; }
if (!getSwitchDomain().isHealthCheckEnabled()) { return; } // 获取当前服务集群下的所有实例 List<Instance> instances = service.allIPs(true);
// first set health status of instances: for (Instance instance : instances) { // 判断心跳是否超过15秒 if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) { if (!instance.isMarked()) {
if (instance.isHealthy()) { // 设置当前实例健康状态为false instance.setHealthy(false); Loggers.EVT_LOG .info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}", instance.getIp(), instance.getPort(), instance.getClusterName(), service.getName(), UtilsAndCommons.LOCALHOST_SITE, instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat()); // 事件回调通知修改该实例的状态,ServiceChangeEvent getPushService().serviceChanged(service); // 发布实例超时事件 ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance)); } } } }
if (!getGlobalConfig().isExpireInstance()) { return; }
// then remove obsolete instances: for (Instance instance : instances) {
if (instance.isMarked()) { continue; } // 判断是否超过30秒 if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) { // delete instance Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(), JacksonUtils.toJson(instance)); deleteIp(instance); } }
} catch (Exception e) { Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e); }
}
下面我们对nacos的心跳机制做一下梳理:
client注册临时实例时,会启动BeatReactor中的心跳定时任务(BeatTask);如果server端返回实例不存在(例如实例已被剔除),client会重新发起注册。
server端每收到一次心跳,就提交一个ClientBeatProcessor任务,将实例的最后心跳时间更新为当前时间,并把不健康的实例恢复为健康状态。
Service初始化时启动的ClientBeatCheckTask负责检查实例是否超时:最后心跳距今超过心跳超时时间(默认15秒)的实例被标记为不健康,超过实例删除超时时间(默认30秒)的实例被删除。
client端的BeatTask每次执行完都会重新调度自己,从而周期性地发送心跳。