Flynn之Discoverd

Stella981
• 阅读 596

Flynn的服务发现组件(discoverd)部署在所有的Flynn集群节点上,该组件使用了Raft协议保证数据的一致性,目前采用的是hashicorp的实现(https://github.com/hashicorp/raft),我们知道在Raft协议中,如果参与选举的节点太多,会导致性能下降,那是不是说Flynn不支持大规模的节点呢?

Flynn是能够支持大规模节点的,虽然discoverd组件部署在所有的节点上,但并不是所有的节点都参与选举,只有部分节点作为Raft集群的节点,其余节点作为代理节点(proxying),Flynn通过以下逻辑判断是否是代理节点,启动discoverd时指定的-peers参数起了关键作用。

    // if the advertise addr is not in the peer list we are proxying
    proxying := true
    for _, addr := range m.peers {
        if addr == m.advertiseAddr {
            proxying = false
            break
        }
    }

discoverd组件本身依赖Raft协议保证数据的一致性,这里提到的数据,在discoverd里指的是服务,我们可以把我们的服务注册到discoverd上,我们的这些服务可能也需要一个Leader节点,例如Flynn的调度组件(scheduler)就只能在Leader节点上执行调度任务。那是否这些服务也是依赖Raft协议来选择Leader节点呢?

注册到discoverd组件上的服务不依赖Raft协议选择Leader,discoverd组件根据注册时间的长短来选择Leader,活的最长的服务节点被选择为Leader。所有注册到discoverd组件上的服务必须向discoverd组件发送心跳信息,discoverd组件如果检测到某个服务节点没有了心跳信息,就会把该节点移除,如果该节点恰好是Leader节点,那么就会触发重新选择Leader的动作。

以下代码是心跳检查相关代码

// Open starts the raft consensus and opens the store.
func (s *Store) Open() error {
    go s.expirer()

    return nil
}

// expirer runs in a separate goroutine and checks for instance expiration.
func (s *Store) expirer() {
    defer s.wg.Done()

    ticker := time.NewTicker(s.ExpiryCheckInterval)
    defer ticker.Stop()

    for {
        // Wait for next check or for close signal.
        select {
        case <-s.closing:
            return
        case <-ticker.C:
        }

        // Check all instances for expiration.
        if err := s.EnforceExpiry(); err != nil && err != raft.ErrNotLeader {
            s.logger.Printf("enforce expiry: %s", err)
        }
    }
}

// 以下代码有删减,仅保留主要逻辑
// EnforceExpiry checks all instances for expiration and issues an expiration command, if necessary.
// This function returns raft.ErrNotLeader if this store is not the current leader.
func (s *Store) EnforceExpiry() error {
    var cmd []byte
    // Ignore if this store is not the leader and hasn't been for at least 2 TTLs intervals.
    if !s.IsLeader() {
        return raft.ErrNotLeader
    } else if s.leaderTime.IsZero() || time.Since(s.leaderTime) < (2*s.InstanceTTL) {
        return ErrLeaderWait
    }

    // Iterate over services and then instances.
    var instances []expireInstance
    for service, m := range s.data.Instances {
        for _, inst := range m {
            // Ignore instances that have heartbeated within the TTL.
            if t := s.heartbeats[instanceKey{service, inst.ID}]; time.Since(t) <= s.InstanceTTL {
                continue
            }

            // Add to list of instances to expire.
            // The current expiry time is added to prevent a race condition of
            // instances updating their expiry date while this command is applying.
            instances = append(instances, expireInstance{
                Service:    service,
                InstanceID: inst.ID,
            })
        }
    }

    // Create command to expire instances.
    cmd, err := json.Marshal(&expireInstancesCommand{
        Instances: instances,
    })

    // Apply command to raft.
    if _, err := s.raftApply(expireInstancesCommandType, cmd); err != nil {
        return err
    }
    return nil
}

以下是出发重新选举的代码:

func (s *Store) Apply(l *raft.Log) interface{} {
    // Extract the command type and data.
    typ, cmd := l.Data[0], l.Data[1:]

    // Determine the command type by the first byte.
    switch typ {

    case expireInstancesCommandType:
        return s.applyExpireInstancesCommand(cmd)
    default:
        return fmt.Errorf("invalid command type: %d", typ)
    }
}

func (s *Store) applyExpireInstancesCommand(cmd []byte) error {
    var c expireInstancesCommand
    if err := json.Unmarshal(cmd, &c); err != nil {
        return err
    }

    // Iterate over instances and remove ones with matching expiry times.
    services := make(map[string]struct{})
    for _, expireInstance := range c.Instances {
        // Remove instance.
        delete(m, expireInstance.InstanceID)

        // Broadcast down event.
        s.broadcast(&discoverd.Event{
            Service:  expireInstance.Service,
            Kind:     discoverd.EventKindDown,
            Instance: inst,
        })

        // Keep track of services invalidated.
        services[expireInstance.Service] = struct{}{}
    }

    // Invalidate all services that had expirations.
    for service := range services {
        s.invalidateServiceLeader(service)
    }

    return nil
}

// invalidateServiceLeader updates the current leader of service.
func (s *Store) invalidateServiceLeader(service string) {
    // Retrieve service config.
    c := s.data.Services[service]

    // Ignore if there is no config or the leader is manually elected.
    if c == nil || c.LeaderType == discoverd.LeaderTypeManual {
        return
    }

    // Retrieve current leader ID.
    prevLeaderID := s.data.Leaders[service]

    // Find the oldest, non-expired instance.
    var leader *discoverd.Instance
    for _, inst := range s.data.Instances[service] {
        if leader == nil || inst.Index < leader.Index {
            leader = inst
        }
    }

    // Retrieve the leader ID.
    var leaderID string
    if leader != nil {
        leaderID = leader.ID
    }

    // Set leader.
    s.data.Leaders[service] = leaderID

    // Broadcast event.
    if prevLeaderID != leaderID {
        var inst *discoverd.Instance
        if s.data.Instances[service] != nil {
            inst = s.data.Instances[service][leaderID]
        }

        s.broadcast(&discoverd.Event{
            Service:  service,
            Kind:     discoverd.EventKindLeader,
            Instance: inst,
        })
    }
}

心跳信息是在哪里呢,估计都想到了,一定是在注册服务的那里。discoverd提供了对应的客户端,代码在discoverd/client里面,有个heartbeat.go就是专门来发心跳的。我们可以通过discoverd客户端的AddServiceAndRegister方法来完成服务注册功能

func (c *Client) AddServiceAndRegister(service, addr string) (Heartbeater, error) {
    if err := c.maybeAddService(service); err != nil {
        return nil, err
    }
    return c.Register(service, addr)
}

func (c *Client) Register(service, addr string) (Heartbeater, error) {
    return c.RegisterInstance(service, &Instance{Addr: addr})
}

func (c *Client) RegisterInstance(service string, inst *Instance) (Heartbeater, error) {
    h := newHeartbeater(c, service, inst)
    err := runAttempts.Run(func() error {
        firstErr := make(chan error)
        go h.run(firstErr)
        return <-firstErr
    })
    if err != nil {
        return nil, err
    }

    return h, nil
}

func (h *heartbeater) run(firstErr chan<- error) {
    path := fmt.Sprintf("/services/%s/instances/%s", h.service, h.inst.ID)
    register := func() error {
        h.Lock()
        defer h.Unlock()
        return h.client().Put(path, h.inst, nil)
    }

    timer := time.NewTimer(nextHeartbeat())
    for {
        select {
        case <-timer.C:
            if err := register(); err != nil {
                timer.Reset(nextHeartbeatFailing())
                break
            }
            timer.Reset(nextHeartbeat())
        case <-h.stop:
            h.client().Delete(path)
            close(h.done)
            return
        }
    }
}

discoverd也是以Http协议提供服务,比如通过GET /services/abc/leader来获取abc服务的Leader节点,当然,也可以使用SSE协议来监听abc服务的Leader变化事件。Flynn的调度组件(scheduler)就是采用SSE协议监听Leader节点的变化。

    r.PUT("/services/:service/leader", h.servePutLeader)
    r.GET("/services/:service/leader", h.serveGetLeader)
点赞
收藏
评论区
推荐文章
待兔 待兔
4个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Stella981 Stella981
3年前
Raft 算法在分布式存储系统 Curve 中的实践
作为网易数帆开源的高性能、高可用、高可靠的新一代分布式存储系统,Curve对于多副本数据同步、负载均衡、容灾恢复方面都有较高的要求。网易数帆存储团队选用Raft算法作为Curve底层一致性协议,并基于Raft的特性,实现了异常情况下的数据迁移和自动恢复。本文首先简要介绍一下Raft算法的一些基本概念和术语,再详细介绍其在Curve中的实践。Raft一致性
Wesley13 Wesley13
3年前
Mysql半同步加orchestrator
Github基于Orchestrator,Consul和GLB实现高可用性目标。1.orchestrator用来运行故障监听和故障恢复。我们使用了如下图所示的一个跨数据中心的orchestrator/raft。2.Hashicorp公司的用于服务发现的Consul。使用Consul的KV存储器写入集群主节点的身份。对于每个集群,都有一套KV记录
Easter79 Easter79
3年前
TiKV 源码解析系列文章(十七)raftstore 概览
第一作者:李建俊,第二作者:杨哲轩,王聪TiKV作为一个分布式KV数据库,使用Raft算法来提供强一致性。Raft算法提供了单一group的一致性,但是单一group无法扩展和均衡。因此,TiKV采用了MultiRaft的方式基于Raft算法提供能兼顾一致性、扩展均衡的KV储存。下文以3.0版本代码为例,讲述raf
Wesley13 Wesley13
3年前
1. 容器化部署一套云服务 第一讲 Jenkins(Docker + Jenkins + Yii2 + 云服务器))
容器化部署一套云服务系列1\.容器化部署一套云服务之Jenkins(https://www.oschina.net/action/GoToLink?urlhttps%3A%2F%2Fwww.cnblogs.com%2Fjackson0714%2Fp%2Fdeploy1.html)一、购买服务器服务器!caeef00
Stella981 Stella981
3年前
Raft 与 Paxos的区别
RaftRaft概述        Raft一致性算法用于保证在分布式的条件下,所有的节点可以执行相同的命令序列,并达到一致的状态。这类的问题可以归结为“Replicatedstatemachines”问题。!关于Raft一致性协议的概要(http://static.oschina.net/uploads/img/
Stella981 Stella981
3年前
Essential Studio for UWP发布2017 v2,新增甘特图控件
EssentialStudioforUWP(https://www.oschina.net/action/GoToLink?urlhttps%3A%2F%2Fwww.evget.com%2Fproduct%2F3894)是包含有35组件的综合套包,包括最快的图表和网格组件。所有组件根据当前被呈现的设备系列自适应渲染。EssentialStu
Stella981 Stella981
3年前
RocketMQ 多副本前置篇:初探raft协议
Raft协议是分布式领域解决一致性的又一著名协议,主要包含Leader选举、日志复制两个部分。温馨提示:本文根据raft官方给出的raft动画进行学习,其动画展示地址:http://thesecretlivesofdata.com/raft/(https://www.oschina.net/action/GoToLink?urlhttp%3A
Stella981 Stella981
3年前
Raft分布式一致性算法原理(选举和同步)
Raft分布式一致性算法原理(选举和同步)一.背景在集群环境下,很容易出现单节点故障的问题,那么我们就需要进行集群部署,但是当集群部署的环境下,我们如何保证工作有序的调度与通信并且保证一致性呢,当客户端发送一连串指令,我们需要在集群环境下,所有服务机器最终要保证一致性,而且在出现一系列异常并且恢复
Stella981 Stella981
3年前
CocosCreator 代码组件(创建与使用、cc.Class类型、生命周期函数)(第三篇)
前言:在前面一篇中讲解了场景的搭建,现在开始介绍一个重要的部分代码组件,通过在不同节点上挂载不同逻辑功能的代码组件来实现游戏的开发。VSCode下载链接:https://code.visualstudio.com/(https://www.oschina.net/action/GoToLink?urlhttps%3A%