Nacos Source Code Analysis Series

Stella981

Nacos Source Code Analysis Series - Service Lease Renewal (Heartbeats)

author:zxw

email:502513206@qq.com

@ Jishou University

@Nacos:https://nacos.io/zh-cn/docs/quick-start-spring-cloud.html


1. Preface

In the previous article we analyzed the Nacos service registration flow. Now let's look at the Nacos heartbeat mechanism.

2. Interview Questions

  1. By default, how often does Nacos send a heartbeat?

    5000 ms: DEFAULT_HEART_BEAT_INTERVAL = TimeUnit.SECONDS.toMillis(5);

  2. What is the default Nacos heartbeat timeout?

    15000 ms: DEFAULT_HEART_BEAT_TIMEOUT = TimeUnit.SECONDS.toMillis(15);
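The three defaults relate to one another as follows. This minimal sketch shows it. The 5 s and 15 s values come from the answers above, and the 30 s deletion timeout comes from the ClientBeatCheckTask comments later in this article. The class name HeartbeatDefaults and the helper intervalsIn are hypothetical, not Nacos code.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical holder for the defaults discussed in this article;
// it is not the Nacos class that actually defines them.
public class HeartbeatDefaults {
    static final long HEART_BEAT_INTERVAL = TimeUnit.SECONDS.toMillis(5);  // client sends a beat every 5 s
    static final long HEART_BEAT_TIMEOUT = TimeUnit.SECONDS.toMillis(15);  // then the instance is marked unhealthy
    static final long IP_DELETE_TIMEOUT = TimeUnit.SECONDS.toMillis(30);   // then the instance is deleted

    // Number of whole heartbeat intervals that fit into a threshold, i.e. roughly
    // how many beats in a row can be missed before the threshold is crossed.
    static long intervalsIn(long thresholdMillis) {
        return thresholdMillis / HEART_BEAT_INTERVAL;
    }

    public static void main(String[] args) {
        System.out.println("unhealthy after ~" + intervalsIn(HEART_BEAT_TIMEOUT) + " missed beats");
        System.out.println("deleted after ~" + intervalsIn(IP_DELETE_TIMEOUT) + " missed beats");
    }
}
```

With these defaults, an instance is marked unhealthy after roughly 3 missed beats and removed after roughly 6.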

3. Source Code Analysis

When the NamingService object is created, one of the steps creates a BeatReactor object. This object handles the heartbeats of our service instances.

this.beatReactor = new BeatReactor(this.serverProxy, initClientBeatThreadCount(properties));

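This object mainly holds two members:
- serverProxy, which talks to the registry server.
- executorService, a scheduled thread pool that runs the heartbeat tasks.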

public BeatReactor(NamingProxy serverProxy, int threadCount) {
    this.serverProxy = serverProxy;
    executorService = new ScheduledThreadPoolExecutor(threadCount, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            thread.setDaemon(true);
            thread.setName("com.alibaba.nacos.naming.beat.sender");
            return thread;
        }
    });
}
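The executor setup above reduces to a ScheduledThreadPoolExecutor whose thread factory produces named daemon threads. This minimal sketch uses the same idea with a lambda in place of the anonymous class. The class BeatExecutorDemo and its helper methods are hypothetical.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

// Sketch of the executor setup in the BeatReactor constructor, rewritten with a lambda.
public class BeatExecutorDemo {

    // Every thread is a daemon thread with the same fixed name.
    static ThreadFactory beatThreadFactory() {
        return r -> {
            Thread thread = new Thread(r);
            thread.setDaemon(true); // does not keep the JVM alive by itself
            thread.setName("com.alibaba.nacos.naming.beat.sender");
            return thread;
        };
    }

    static ScheduledExecutorService newBeatExecutor(int threadCount) {
        return new ScheduledThreadPoolExecutor(threadCount, beatThreadFactory());
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService executor = newBeatExecutor(1);
        // A one-shot task that reports which thread ran it.
        ScheduledFuture<String> future = executor.schedule(
                () -> Thread.currentThread().getName() + " daemon=" + Thread.currentThread().isDaemon(),
                10, TimeUnit.MILLISECONDS);
        System.out.println(future.get()); // com.alibaba.nacos.naming.beat.sender daemon=true
        executor.shutdown();
    }
}
```

The threads are daemons, so a JVM whose only remaining threads are beat senders can exit. Shutting the executor down explicitly, as main does, is still cleaner.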

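When we analyzed the registration flow, we saw that registering an ephemeral instance builds a BeatInfo for it. It then starts the scheduled heartbeat-sending task: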

@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
    String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
    if (instance.isEphemeral()) {
        // build the BeatInfo
        BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
        // start the scheduled heartbeat task
        beatReactor.addBeatInfo(groupedServiceName, beatInfo);
    }
    serverProxy.registerService(groupedServiceName, groupName, instance);
}

addBeatInfo

public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
    NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
    String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
    BeatInfo existBeat = null;
    //fix #1733
    if ((existBeat = dom2Beat.remove(key)) != null) {
        existBeat.setStopped(true);
    }
    dom2Beat.put(key, beatInfo);
    executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
    MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
}
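addBeatInfo keeps one beat per instance. It removes any existing entry for the same key, marks the old entry as stopped so its task ends, and then stores and schedules the new one. This minimal sketch models that replace-and-stop step with a ConcurrentHashMap. SimpleBeatInfo and the "#" key separator are assumptions made for the example.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the "replace and stop the old beat" step in addBeatInfo (fix #1733).
// SimpleBeatInfo and the "#" separator in buildKey are assumptions for this example.
public class BeatRegistryDemo {

    static class SimpleBeatInfo {
        final String ip;
        final int port;
        volatile boolean stopped; // a stopped beat's task returns early instead of rescheduling

        SimpleBeatInfo(String ip, int port) {
            this.ip = ip;
            this.port = port;
        }
    }

    final Map<String, SimpleBeatInfo> dom2Beat = new ConcurrentHashMap<>();

    static String buildKey(String serviceName, String ip, int port) {
        return serviceName + "#" + ip + "#" + port;
    }

    void addBeatInfo(String serviceName, SimpleBeatInfo beatInfo) {
        String key = buildKey(serviceName, beatInfo.ip, beatInfo.port);
        SimpleBeatInfo existing = dom2Beat.remove(key);
        if (existing != null) {
            // Stop the old task so only one beat task per instance keeps running.
            existing.stopped = true;
        }
        dom2Beat.put(key, beatInfo);
        // The real method would now schedule a BeatTask for beatInfo.
    }

    public static void main(String[] args) {
        BeatRegistryDemo registry = new BeatRegistryDemo();
        SimpleBeatInfo first = new SimpleBeatInfo("10.0.0.1", 8080);
        SimpleBeatInfo second = new SimpleBeatInfo("10.0.0.1", 8080);
        registry.addBeatInfo("DEFAULT_GROUP@@demo", first);
        registry.addBeatInfo("DEFAULT_GROUP@@demo", second);
        System.out.println("first stopped: " + first.stopped + ", map size: " + registry.dom2Beat.size());
    }
}
```

Like the code it mirrors, this sketch does not make the remove and put a single atomic step. It relies on the stopped flag to retire the old task.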

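Now let's see what the scheduled task does. The client sends a heartbeat request to the server and inspects the response. If the instance is not registered on the server, the client sends a registration request.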

BeatTask

@Override
public void run() {
    // return if this beat has been stopped
    if (beatInfo.isStopped()) {
        return;
    }
    // delay until the next heartbeat
    long nextTime = beatInfo.getPeriod();
    try {
        // send the heartbeat request
        JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
        long interval = result.get("clientBeatInterval").asLong();
        boolean lightBeatEnabled = false;
        if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) {
            lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean();
        }
        BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
        // use the interval returned by the server, if any
        if (interval > 0) {
            nextTime = interval;
        }
        int code = NamingResponseCode.OK;
        if (result.has(CommonParams.CODE)) {
            code = result.get(CommonParams.CODE).asInt();
        }
        // the server does not know this instance: register it with Nacos again
        if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {
            Instance instance = new Instance();
            instance.setPort(beatInfo.getPort());
            instance.setIp(beatInfo.getIp());
            instance.setWeight(beatInfo.getWeight());
            instance.setMetadata(beatInfo.getMetadata());
            instance.setClusterName(beatInfo.getCluster());
            instance.setServiceName(beatInfo.getServiceName());
            instance.setInstanceId(instance.getInstanceId());
            instance.setEphemeral(true);
            try {
                serverProxy.registerService(beatInfo.getServiceName(),
                        NamingUtils.getGroupName(beatInfo.getServiceName()), instance);
            } catch (Exception ignore) {
            }
        }
    } catch (NacosException ex) {
        NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}",
                JacksonUtils.toJson(beatInfo), ex.getErrCode(), ex.getErrMsg());
    }
    // schedule the next heartbeat
    executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
}
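BeatTask is a one-shot task that reschedules itself after every run. It uses the server's clientBeatInterval when that value is positive and otherwise falls back to the beat's own period. This minimal sketch shows the same loop. The LongSupplier standing in for serverProxy.sendBeat(...) and the class SelfReschedulingBeat are hypothetical, and RuntimeException replaces NacosException.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongSupplier;

// Sketch of BeatTask's self-rescheduling loop. The LongSupplier is a hypothetical stand-in
// for serverProxy.sendBeat(...): it returns the server's clientBeatInterval (<= 0 means "none").
public class SelfReschedulingBeat {

    final ScheduledExecutorService executor;
    final LongSupplier sendBeat;
    final long defaultPeriod;  // plays the role of beatInfo.getPeriod()
    volatile boolean stopped;  // plays the role of beatInfo.isStopped()
    final AtomicInteger beatsSent = new AtomicInteger();

    SelfReschedulingBeat(ScheduledExecutorService executor, LongSupplier sendBeat, long defaultPeriod) {
        this.executor = executor;
        this.sendBeat = sendBeat;
        this.defaultPeriod = defaultPeriod;
    }

    void start() {
        executor.schedule(() -> { runOnce(); }, defaultPeriod, TimeUnit.MILLISECONDS);
    }

    // Sends one beat, schedules the next one and returns its delay (-1 if stopped).
    long runOnce() {
        if (stopped) {
            return -1; // a stopped beat is simply not rescheduled
        }
        long nextTime = defaultPeriod;
        try {
            long interval = sendBeat.getAsLong();
            beatsSent.incrementAndGet();
            if (interval > 0) {
                nextTime = interval; // the server may override the interval
            }
        } catch (RuntimeException ex) {
            // BeatTask only logs a failed beat; the loop goes on.
            System.err.println("failed to send beat: " + ex.getMessage());
        }
        executor.schedule(() -> { runOnce(); }, nextTime, TimeUnit.MILLISECONDS);
        return nextTime;
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        // Short intervals so the demo finishes quickly: default 50 ms, server asks for 20 ms.
        SelfReschedulingBeat beat = new SelfReschedulingBeat(executor, () -> 20L, 50L);
        beat.start();
        Thread.sleep(300);
        beat.stopped = true;
        executor.shutdown();
        System.out.println("beats sent: " + beat.beatsSent.get());
    }
}
```

A one-shot schedule per run, rather than scheduleAtFixedRate, is what lets each response change the delay before the next beat.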

Next, let's see how the server handles heartbeat requests, in InstanceController:

@CanDistro
@PutMapping("/beat")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public ObjectNode beat(HttpServletRequest request) throws Exception {
    ObjectNode result = JacksonUtils.createEmptyJsonNode();
    // add {"clientBeatInterval": 5000}
    result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, switchDomain.getClientBeatInterval());
    // the "beat" request parameter (a JSON string)
    String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY);
    RsInfo clientBeat = null;
    if (StringUtils.isNotBlank(beat)) {
        // deserialize it into an object
        clientBeat = JacksonUtils.toObj(beat, RsInfo.class);
    }
    // cluster name, UtilsAndCommons.DEFAULT_CLUSTER_NAME if absent
    String clusterName = WebUtils
            .optional(request, CommonParams.CLUSTER_NAME, UtilsAndCommons.DEFAULT_CLUSTER_NAME);
    // IP
    String ip = WebUtils.optional(request, "ip", StringUtils.EMPTY);
    // port
    int port = Integer.parseInt(WebUtils.optional(request, "port", "0"));
    if (clientBeat != null) {
        if (StringUtils.isNotBlank(clientBeat.getCluster())) {
            clusterName = clientBeat.getCluster();
        } else {
            // fix #2533
            clientBeat.setCluster(clusterName);
        }
        ip = clientBeat.getIp();
        port = clientBeat.getPort();
    }
    String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}", clientBeat, serviceName);
    // look up the instance
    Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port);
    // the instance is not registered: register it
    if (instance == null) {
        // no beat info to rebuild it from: return 20404 so that the client sends a registration request
        if (clientBeat == null) {
            result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND);
            return result;
        }
        Loggers.SRV_LOG.warn("[CLIENT-BEAT] The instance has been removed for health mechanism, "
                + "perform data compensation operations, beat: {}, serviceName: {}", clientBeat, serviceName);
        instance = new Instance();
        instance.setPort(clientBeat.getPort());
        instance.setIp(clientBeat.getIp());
        instance.setWeight(clientBeat.getWeight());
        instance.setMetadata(clientBeat.getMetadata());
        instance.setClusterName(clusterName);
        instance.setServiceName(serviceName);
        instance.setInstanceId(instance.getInstanceId());
        instance.setEphemeral(clientBeat.isEphemeral());
        serviceManager.registerInstance(namespaceId, serviceName, instance);
    }
    Service service = serviceManager.getService(namespaceId, serviceName);
    if (service == null) {
        throw new NacosException(NacosException.SERVER_ERROR,
                "service not found: " + serviceName + "@" + namespaceId);
    }
    if (clientBeat == null) {
        clientBeat = new RsInfo();
        clientBeat.setIp(ip);
        clientBeat.setPort(port);
        clientBeat.setCluster(clusterName);
    }
    // hand the beat to an asynchronous processing task
    service.processClientBeat(clientBeat);
    // build the response
    result.put(CommonParams.CODE, NamingResponseCode.OK);
    if (instance.containsMetadata(PreservedMetadataKeys.HEART_BEAT_INTERVAL)) {
        result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, instance.getInstanceHeartBeatInterval());
    }
    result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());
    return result;
}
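The server's handling of a beat depends on two things: whether the instance still exists, and whether the request carried a "beat" body. An example of a request without a body is a light beat. This minimal sketch reduces that branching to two booleans. The enum Outcome and the method decide are hypothetical names, not Nacos code.

```java
// Sketch of the branching in InstanceController.beat, reduced to two booleans.
// The enum and method names are hypothetical.
public class BeatDecisionDemo {

    enum Outcome {
        ASK_CLIENT_TO_REGISTER,   // reply RESOURCE_NOT_FOUND (20404); the client re-registers
        REREGISTER_THEN_REFRESH,  // rebuild the instance from the beat body, then refresh it
        REFRESH                   // just hand the beat to processClientBeat
    }

    static Outcome decide(boolean instanceExists, boolean hasBeatBody) {
        if (instanceExists) {
            return Outcome.REFRESH;
        }
        // Without the "beat" parameter the server has nothing to rebuild the instance from.
        return hasBeatBody ? Outcome.REREGISTER_THEN_REFRESH : Outcome.ASK_CLIENT_TO_REGISTER;
    }

    public static void main(String[] args) {
        for (boolean exists : new boolean[] {true, false}) {
            for (boolean body : new boolean[] {true, false}) {
                System.out.println("instanceExists=" + exists + ", hasBeatBody=" + body
                        + " -> " + decide(exists, body));
            }
        }
    }
}
```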

service.processClientBeat(clientBeat) starts an asynchronous task that sets the instance's last heartbeat time to the current time:

public void processClientBeat(final RsInfo rsInfo) {
    ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor();
    clientBeatProcessor.setService(this);
    clientBeatProcessor.setRsInfo(rsInfo);
    // run it immediately
    HealthCheckReactor.scheduleNow(clientBeatProcessor);
}

As you can see, this schedules a ClientBeatProcessor task to run immediately:

@Override
public void run() {
    // the service this beat belongs to
    Service service = this.service;
    if (Loggers.EVT_LOG.isDebugEnabled()) {
        Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}", rsInfo.toString());
    }
    String ip = rsInfo.getIp();
    String clusterName = rsInfo.getCluster();
    int port = rsInfo.getPort();
    Cluster cluster = service.getClusterMap().get(clusterName);
    // all ephemeral instances in this cluster
    List<Instance> instances = cluster.allIPs(true);
    for (Instance instance : instances) {
        // the instance that sent this beat
        if (instance.getIp().equals(ip) && instance.getPort() == port) {
            if (Loggers.EVT_LOG.isDebugEnabled()) {
                Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString());
            }
            // set the last heartbeat time to now
            instance.setLastBeat(System.currentTimeMillis());
            if (!instance.isMarked()) {
                if (!instance.isHealthy()) {
                    instance.setHealthy(true);
                    Loggers.EVT_LOG
                            .info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok",
                                    cluster.getService().getName(), ip, port, cluster.getName(),
                                    UtilsAndCommons.LOCALHOST_SITE);
                    getPushService().serviceChanged(service);
                }
            }
        }
    }
}
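ClientBeatProcessor does two things to the matching instance. It always refreshes lastBeat. If the instance is neither marked nor healthy, it flips the instance back to healthy and triggers a push. This minimal sketch captures that logic on plain objects. SimpleInstance and refresh are hypothetical stand-ins for Nacos's Instance and the run() method.

```java
import java.util.Arrays;
import java.util.List;

// Sketch of what ClientBeatProcessor.run does to the matching instance.
// SimpleInstance is a hypothetical stand-in for Nacos's Instance.
public class BeatRefreshDemo {

    static class SimpleInstance {
        final String ip;
        final int port;
        long lastBeat;
        boolean healthy;
        boolean marked; // as in the code above, heartbeats never change a marked instance's health

        SimpleInstance(String ip, int port, boolean healthy) {
            this.ip = ip;
            this.port = port;
            this.healthy = healthy;
        }
    }

    // Refreshes lastBeat of the instance that sent the beat; returns true when an unhealthy,
    // unmarked instance became healthy again (the point where Nacos calls serviceChanged).
    static boolean refresh(List<SimpleInstance> instances, String ip, int port, long now) {
        boolean changed = false;
        for (SimpleInstance instance : instances) {
            if (instance.ip.equals(ip) && instance.port == port) {
                instance.lastBeat = now;
                if (!instance.marked && !instance.healthy) {
                    instance.healthy = true;
                    changed = true;
                }
            }
        }
        return changed;
    }

    public static void main(String[] args) {
        SimpleInstance a = new SimpleInstance("10.0.0.1", 8080, false);
        SimpleInstance b = new SimpleInstance("10.0.0.2", 8080, true);
        List<SimpleInstance> cluster = Arrays.asList(a, b);
        boolean changed = refresh(cluster, "10.0.0.1", 8080, 1_000L);
        System.out.println("changed=" + changed + ", a.healthy=" + a.healthy + ", a.lastBeat=" + a.lastBeat);
    }
}
```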

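ClientBeatProcessor only updates the last heartbeat time of our service instance. It does not check for heartbeat timeouts. A separate task does the timeout checking, and that task is started when we register with Nacos. Let's revisit this step of the registration flow: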

@CanDistro
@PostMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {
    final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    final String namespaceId = WebUtils
            .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    final Instance instance = parseInstance(request);
    // the key step
    serviceManager.registerInstance(namespaceId, serviceName, instance);
    return "ok";
}

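Inside serviceManager.registerInstance we only follow its first line. Skipping a few intermediate calls, it leads straight to createServiceIfAbsent: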

public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
        throws NacosException {
    Service service = getService(namespaceId, serviceName);
    if (service == null) {
        Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
        service = new Service();
        service.setName(serviceName);
        service.setNamespaceId(namespaceId);
        service.setGroupName(NamingUtils.getGroupName(serviceName));
        // now validate the service. if failed, exception will be thrown
        service.setLastModifiedMillis(System.currentTimeMillis());
        service.recalculateChecksum();
        if (cluster != null) {
            cluster.setService(service);
            service.getClusterMap().put(cluster.getName(), cluster);
        }
        service.validate();
        // look here
        putServiceAndInit(service);
        if (!local) {
            addOrReplaceService(service);
        }
    }
}
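createServiceIfAbsent is a get-or-create step. The service is built, stored and initialized only if it does not exist yet, and initialization is what starts the heartbeat check. This minimal sketch shows the same idea with ConcurrentHashMap.computeIfAbsent. That is an alternative technique, not the check-then-put that Nacos uses above. SimpleService and ServiceRegistryDemo are hypothetical, and the init() here only counts calls.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Alternative get-or-create sketch using computeIfAbsent (Nacos itself checks with
// getService and then puts). SimpleService is hypothetical; its init() only counts calls,
// where the real Service.init() schedules the heartbeat check task.
public class ServiceRegistryDemo {

    static class SimpleService {
        final String name;
        final AtomicInteger initCalls = new AtomicInteger();

        SimpleService(String name) {
            this.name = name;
        }

        void init() {
            initCalls.incrementAndGet();
        }
    }

    // namespaceId -> (serviceName -> service)
    final Map<String, Map<String, SimpleService>> serviceMap = new ConcurrentHashMap<>();

    SimpleService createServiceIfAbsent(String namespaceId, String serviceName) {
        return serviceMap
                .computeIfAbsent(namespaceId, ns -> new ConcurrentHashMap<>())
                .computeIfAbsent(serviceName, name -> {
                    SimpleService service = new SimpleService(name);
                    service.init(); // runs at most once per (namespace, service)
                    return service;
                });
    }

    public static void main(String[] args) {
        ServiceRegistryDemo registry = new ServiceRegistryDemo();
        SimpleService first = registry.createServiceIfAbsent("public", "DEFAULT_GROUP@@demo");
        SimpleService second = registry.createServiceIfAbsent("public", "DEFAULT_GROUP@@demo");
        System.out.println("same service: " + (first == second) + ", init calls: " + first.initCalls.get());
    }
}
```

computeIfAbsent makes the check-and-create step atomic per key. The mapping function should stay short, because other updates to the same map may block while it runs.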

In ServiceManager.putServiceAndInit, we only care about service.init():

private void putServiceAndInit(Service service) throws NacosException {
    putService(service);
    // look here: the Service's init method
    service.init();
    consistencyService
            .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
    consistencyService
            .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
    Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
}

Service: when a Service is initialized, it schedules its clientBeatCheckTask through HealthCheckReactor:

public void init() {
    // schedule the heartbeat check task
    HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
    for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
        entry.getValue().setService(this);
        entry.getValue().init();
    }
}
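HealthCheckReactor.scheduleCheck runs the check task over and over. This article gives a 5-second default. This minimal sketch assumes a fixed delay between runs and models the scheduling with ScheduledExecutorService.scheduleWithFixedDelay. The helper scheduleCheck here is a hypothetical stand-in, not the Nacos method.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for HealthCheckReactor.scheduleCheck: run a check task repeatedly,
// waiting a fixed delay between the end of one run and the start of the next.
public class PeriodicCheckDemo {

    static ScheduledFuture<?> scheduleCheck(ScheduledExecutorService executor, Runnable task, long delayMillis) {
        return executor.scheduleWithFixedDelay(task, delayMillis, delayMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch runs = new CountDownLatch(3);
        // 20 ms instead of 5 s so the demo is quick.
        ScheduledFuture<?> check = scheduleCheck(executor, () -> {
            System.out.println("checking heartbeats...");
            runs.countDown();
        }, 20);
        boolean ranThreeTimes = runs.await(2, TimeUnit.SECONDS);
        check.cancel(false); // stop future runs
        executor.shutdown();
        System.out.println("ran three times: " + ranThreeTimes);
    }
}
```

With a fixed delay, a slow check pushes the next run back instead of letting runs pile up. This suits a task whose cost grows with the number of instances.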

ClientBeatCheckTask contains the heartbeat-timeout checking logic. By default it runs every 5 seconds:

@Override
public void run() {
    try {
        // only the server responsible for this service (Distro) checks it
        if (!getDistroMapper().responsible(service.getName())) {
            return;
        }
        if (!getSwitchDomain().isHealthCheckEnabled()) {
            return;
        }
        // all ephemeral instances of this service
        List<Instance> instances = service.allIPs(true);
        // first set health status of instances:
        for (Instance instance : instances) {
            // no heartbeat for longer than the timeout (15 seconds by default)?
            if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
                if (!instance.isMarked()) {
                    if (instance.isHealthy()) {
                        // mark the instance as unhealthy
                        instance.setHealthy(false);
                        Loggers.EVT_LOG
                                .info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
                                        instance.getIp(), instance.getPort(), instance.getClusterName(),
                                        service.getName(), UtilsAndCommons.LOCALHOST_SITE,
                                        instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
                        // notify that the service changed (ServiceChangeEvent)
                        getPushService().serviceChanged(service);
                        // publish an instance heartbeat timeout event
                        ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
                    }
                }
            }
        }
        if (!getGlobalConfig().isExpireInstance()) {
            return;
        }
        // then remove obsolete instances:
        for (Instance instance : instances) {
            if (instance.isMarked()) {
                continue;
            }
            // no heartbeat for longer than the delete timeout (30 seconds by default)?
            if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
                // delete instance
                Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),
                        JacksonUtils.toJson(instance));
                deleteIp(instance);
            }
        }
    } catch (Exception e) {
        Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
    }
}
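ClientBeatCheckTask makes two passes over the instances:
- The first marks timed-out, unmarked instances unhealthy.
- The second runs only if instance expiry is enabled and collects unmarked instances past the delete timeout.

This minimal sketch performs both passes on plain objects with a caller-supplied clock. SimpleInstance, check and the fixed 15 s / 30 s constants are assumptions. Nacos reads the timeouts per instance.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch of ClientBeatCheckTask's two passes. SimpleInstance and the fixed timeouts are
// assumptions; Nacos reads them per instance (getInstanceHeartBeatTimeOut / getIpDeleteTimeout).
public class BeatCheckDemo {

    static final long HEART_BEAT_TIMEOUT = 15_000;
    static final long IP_DELETE_TIMEOUT = 30_000;

    static class SimpleInstance {
        final String id;
        long lastBeat;
        boolean healthy = true;
        boolean marked;

        SimpleInstance(String id, long lastBeat) {
            this.id = id;
            this.lastBeat = lastBeat;
        }
    }

    // Marks timed-out instances unhealthy and returns the ones that should be deleted.
    static List<SimpleInstance> check(List<SimpleInstance> instances, long now, boolean expireInstance) {
        // first pass: health status
        for (SimpleInstance instance : instances) {
            if (now - instance.lastBeat > HEART_BEAT_TIMEOUT && !instance.marked && instance.healthy) {
                instance.healthy = false; // Nacos also pushes a service change and publishes an event here
            }
        }
        List<SimpleInstance> toDelete = new ArrayList<>();
        if (!expireInstance) {
            return toDelete;
        }
        // second pass: obsolete instances
        for (SimpleInstance instance : instances) {
            if (instance.marked) {
                continue;
            }
            if (now - instance.lastBeat > IP_DELETE_TIMEOUT) {
                toDelete.add(instance);
            }
        }
        return toDelete;
    }

    public static void main(String[] args) {
        SimpleInstance a = new SimpleInstance("a", 0L);
        SimpleInstance b = new SimpleInstance("b", 18_000L);
        List<SimpleInstance> all = Arrays.asList(a, b);
        System.out.println("t=20s deleted: " + check(all, 20_000L, true).size() + ", a.healthy=" + a.healthy);
        System.out.println("t=31s deleted: " + check(all, 31_000L, true).size() + ", b.healthy=" + b.healthy);
    }
}
```

Instance a, whose last beat was at t = 0, becomes unhealthy at the 20 s check and is deleted at the 31 s check. Instance b, last beat at 18 s, stays healthy throughout.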

Let's summarize the Nacos heartbeat mechanism:

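  1. When the client registers an ephemeral instance, BeatReactor starts its scheduled heartbeat task. If the server has already removed the instance, the heartbeat response makes the client register it again.

  2. On every heartbeat, the server runs a ClientBeatProcessor task that sets the instance's last heartbeat time to the current time.

  3. A ClientBeatCheckTask, scheduled when the service is created, checks whether instances have timed out. An instance without a heartbeat for 15 seconds is marked unhealthy, and one without a heartbeat for 30 seconds is deleted.

  4. The client keeps repeating the heartbeat task.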
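The four steps can be tied together in a toy simulation on a virtual clock. This is a minimal sketch under loud assumptions:
- The client registers at t = 0, which counts as its first beat.
- The client beats every 5 s until it stops.
- The server check also runs every 5 s, aligned with the beats. Real timers are not aligned like this.

HeartbeatTimeline and simulate are hypothetical names.

```java
// Toy simulation of the whole cycle on a virtual clock (milliseconds). Assumptions: the client
// registers at t = 0 (counted as its first beat), beats every 5 s until clientStopsAt, and the
// server-side check runs every 5 s aligned with the beats.
public class HeartbeatTimeline {

    static final long BEAT_INTERVAL = 5_000;
    static final long CHECK_INTERVAL = 5_000;
    static final long HEART_BEAT_TIMEOUT = 15_000;
    static final long IP_DELETE_TIMEOUT = 30_000;

    // Returns {time marked unhealthy, time deleted}; -1 means "did not happen before horizon".
    static long[] simulate(long clientStopsAt, long horizon) {
        long lastBeat = 0;
        boolean healthy = true;
        long unhealthyAt = -1;
        long deletedAt = -1;
        for (long now = 0; now <= horizon; now += 1_000) {
            // Client BeatTask fires; the server's ClientBeatProcessor refreshes lastBeat.
            if (now % BEAT_INTERVAL == 0 && now <= clientStopsAt) {
                lastBeat = now;
                healthy = true;
            }
            // Server ClientBeatCheckTask: first the health pass, then the delete pass.
            if (now > 0 && now % CHECK_INTERVAL == 0) {
                if (healthy && now - lastBeat > HEART_BEAT_TIMEOUT) {
                    healthy = false;
                    unhealthyAt = now;
                }
                if (now - lastBeat > IP_DELETE_TIMEOUT) {
                    deletedAt = now;
                    break;
                }
            }
        }
        return new long[] {unhealthyAt, deletedAt};
    }

    public static void main(String[] args) {
        long[] result = simulate(10_000, 120_000);
        System.out.println("unhealthy at " + result[0] + " ms, deleted at " + result[1] + " ms");
    }
}
```

Suppose the client's last beat is at 10 s. The instance is then marked unhealthy at the 30 s check and deleted at the 45 s check. In this model the instance stays visible for a while after the 15 s and 30 s thresholds, because the check runs only every 5 s and the comparisons are strict.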
