Spring Cloud Eureka 全解 (3)

Stella981
• 阅读 753

本文基于SpringCloud-Dalston.SR5

关于服务注册

开启/关闭服务注册配置:eureka.client.register-with-eureka = true (默认)

什么时候注册?

  1. 应用第一次启动时,初始化EurekaClient时,应用状态改变:从STARTING变为UP会触发这个Listener,调用instanceInfoReplicator.onDemandUpdate(); 可以推测出,实例状态改变时,也会通过注册接口更新实例状态信息

    statusChangeListener = new ApplicationInfoManager.StatusChangeListener() { @Override public String getId() { return "statusChangeListener"; } @Override public void notify(StatusChangeEvent statusChangeEvent) { if (InstanceStatus.DOWN == statusChangeEvent.getStatus() || InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) { // log at warn level if DOWN was involved logger.warn("Saw local status change event {}", statusChangeEvent); } else { logger.info("Saw local status change event {}", statusChangeEvent); } instanceInfoReplicator.onDemandUpdate(); } };

  2. 定时任务,如果InstanceInfo发生改变,也会通过注册接口更新信息

    public void run() { try { discoveryClient.refreshInstanceInfo(); //如果实例信息发生改变,则需要调用register更新InstanceInfo Long dirtyTimestamp = instanceInfo.isDirtyWithTime(); if (dirtyTimestamp != null) { discoveryClient.register(); instanceInfo.unsetIsDirty(dirtyTimestamp); } } catch (Throwable t) { logger.warn("There was a problem with the instance info replicator", t); } finally { Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS); scheduledPeriodicRef.set(next); } }

  3. 在定时renew时,如果renew接口返回404(代表这个实例在EurekaServer上面找不到),可能是之前注册失败或者注册过期导致的。这时需要调用register重新注册

    boolean renew() { EurekaHttpResponse httpResponse; try { httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null); logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode()); //如果renew接口返回404(代表这个实例在EurekaServer上面找不到),可能是之前注册失败或者注册过期导致的 if (httpResponse.getStatusCode() == 404) { REREGISTER_COUNTER.increment(); logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName()); long timestamp = instanceInfo.setIsDirtyWithTime(); boolean success = register(); if (success) { instanceInfo.unsetIsDirty(timestamp); } return success; } return httpResponse.getStatusCode() == 200; } catch (Throwable e) { logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e); return false; } }

向Eureka发送注册请求EurekaServer发生了什么?

主要有两个存储,一个是之前提到过的registry,还有一个最近变化队列,后面我们会知道,这个最近变化队列里面就是客户端获取增量实例信息的内容:

# 整体注册信息缓存
private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry = new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();
# 最近变化队列
private ConcurrentLinkedQueue<RecentlyChangedItem> recentlyChangedQueue = new ConcurrentLinkedQueue<RecentlyChangedItem>(); 

EurekaServer收到实例注册主要分两步:

  • 调用父类方法注册

  • 同步到其他EurekaServer实例

    public void register(InstanceInfo info, boolean isReplication) { int leaseDuration = 90; if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) { leaseDuration = info.getLeaseInfo().getDurationInSecs(); } //调用父类方法注册 super.register(info, leaseDuration, isReplication); //同步到其他EurekaServer实例 this.replicateToPeers(PeerAwareInstanceRegistryImpl.Action.Register, info.getAppName(), info.getId(), info, (InstanceStatus)null, isReplication); }

我们先看同步到其他EurekaServer实例

其实就是,注册到的EurekaServer再依次调用其他集群内的EurekaServer的Register方法将实例信息同步过去

private void replicateToPeers(Action action, String appName, String id,
                              InstanceInfo info /* optional */,
                              InstanceStatus newStatus /* optional */, boolean isReplication) {
    Stopwatch tracer = action.getTimer().start();
    try {
        if (isReplication) {
            numberOfReplicationsLastMin.increment();
        }
        // If it is a replication already, do not replicate again as this will create a poison replication
        if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
            return;
        }

        for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
            // If the url represents this host, do not replicate to yourself.
            if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                continue;
            }
            replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
        }
    } finally {
        tracer.stop();
    }
}

private void replicateInstanceActionsToPeers(Action action, String appName,
                                             String id, InstanceInfo info, InstanceStatus newStatus,
                                             PeerEurekaNode node) {
    try {
        InstanceInfo infoFromRegistry = null;
        CurrentRequestVersion.set(Version.V2);
        switch (action) {
            case Cancel:
                node.cancel(appName, id);
                break;
            case Heartbeat:
                InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                break;
            case Register:
                node.register(info);
                break;
            case StatusUpdate:
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                break;
            case DeleteStatusOverride:
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.deleteStatusOverride(appName, id, infoFromRegistry);
                break;
        }
    } catch (Throwable t) {
        logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
    }
}

然后看看调用父类方法注册:

public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
    try {
        //register虽然看上去好像是修改,但是这里用的是读锁,后面会解释
        read.lock();
        //从registry中查看这个app是否存在
        Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
        //不存在就创建
        if (gMap == null) {
            final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
            gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
            if (gMap == null) {
                gMap = gNewMap;
            }
        }
        //查看这个app的这个实例是否已存在
        Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
        
        if (existingLease != null && (existingLease.getHolder() != null)) {
            //如果已存在,对比时间戳,保留比较新的实例信息......
        } else {
            // 如果不存在,证明是一个新的实例
            //更新自我保护监控变量的值的代码.....
            
        }
        Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
        if (existingLease != null) {
            lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
        }
        //放入registry
        gMap.put(registrant.getId(), lease);
        
        //加入最近修改的记录队列
        recentlyChangedQueue.add(new RecentlyChangedItem(lease));
        //初始化状态,记录时间等相关代码......
        
        //主动让Response缓存失效
        invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
    } finally {
        read.unlock();
    }
}
 

总结起来,就是主要三件事:

1.将实例注册信息放入或者更新registry

2.将实例注册信息加入最近修改的记录队列

3.主动让Response缓存失效

我们来类比下服务取消

服务取消CANCEL

protected boolean internalCancel(String appName, String id, boolean isReplication) {
    try {
        //cancel虽然看上去好像是修改,但是这里用的是读锁,后面会解释
        read.lock();
        
        //从registry中剔除这个实例
        Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
        Lease<InstanceInfo> leaseToCancel = null;
        if (gMap != null) {
            leaseToCancel = gMap.remove(id);
        }
        if (leaseToCancel == null) {
            logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id);
            return false;
        } else {
            //改变状态,记录状态修改时间等相关代码......
            if (instanceInfo != null) {
                instanceInfo.setActionType(ActionType.DELETED);
                //加入最近修改的记录队列
                recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
            }
            //主动让Response缓存失效
            invalidateCache(appName, vip, svip);
            logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication);
            return true;
        }
    } finally {
        read.unlock();
    }
}

总结起来,也是主要三件事:

1.从registry中剔除这个实例

2.将实例注册信息加入最近修改的记录队列

3.主动让Response缓存失效

这里我们注意到了这个最近修改队列,我们来详细看看

最近修改队列

这个最近修改队列和消费者定时获取服务实例列表有着密切的关系

private TimerTask getDeltaRetentionTask() {
    return new TimerTask() {

        @Override
        public void run() {
            Iterator<RecentlyChangedItem> it = recentlyChangedQueue.iterator();
            while (it.hasNext()) {
                if (it.next().getLastUpdateTime() <
                        System.currentTimeMillis() - serverConfig.getRetentionTimeInMSInDeltaQueue()) {
                    it.remove();
                } else {
                    break;
                }
            }
        }

    };
}

这个RetentionTimeInMSInDeltaQueue默认是180s(配置是eureka.server.retention-time-in-m-s-in-delta-queue,默认是180s,官网写错了),可以看出这个队列是一个长度为180s的滑动窗口,保存最近180s以内的应用实例信息修改,后面我们会看到,客户端调用获取增量信息,实际上就是从这个queue中读取,所以可能一段时间内读取到的信息都是一样的。

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
5个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Stella981 Stella981
3年前
Golang注册Eureka的工具包goeureka发布
1.简介提供Go微服务客户端注册到Eureka中心。点击:github地址(https://www.oschina.net/action/GoToLink?urlhttps%3A%2F%2Fgithub.com%2FSimonWang00%2Fgoeureka),欢迎各位多多star!(已通过测试验证,用于正式生产部署)2.原理
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
3年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
11个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这