Flynn's service discovery component (discoverd) is deployed on every node in a Flynn cluster. It relies on the Raft protocol for data consistency, currently using HashiCorp's implementation (https://github.com/hashicorp/raft). We know that Raft performance degrades when too many nodes take part in elections, so does that mean Flynn cannot support large clusters?
It can. Although discoverd runs on every node, not all nodes participate in elections: only a subset of nodes are members of the Raft cluster, and the rest act as proxying nodes. Flynn decides whether a node is a proxy with the logic below, where the -peers flag passed when starting discoverd plays the key role:
// if the advertise addr is not in the peer list we are proxying
proxying := true
for _, addr := range m.peers {
    if addr == m.advertiseAddr {
        proxying = false
        break
    }
}
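To make the rule concrete, here is a standalone sketch (the addresses are made up) of the same membership check: a node whose advertise address appears in the -peers list joins the Raft cluster, while every other node becomes a proxy.
package main

import "fmt"

func main() {
    // Hypothetical cluster: only these three addresses were passed via
    // -peers, so only they join the Raft cluster.
    peers := []string{"10.0.0.1:1111", "10.0.0.2:1111", "10.0.0.3:1111"}

    for _, advertiseAddr := range []string{"10.0.0.1:1111", "10.0.0.4:1111"} {
        proxying := true
        for _, addr := range peers {
            if addr == advertiseAddr {
                proxying = false
                break
            }
        }
        fmt.Printf("%s proxying=%v\n", advertiseAddr, proxying)
    }
    // Output:
    // 10.0.0.1:1111 proxying=false  (Raft peer)
    // 10.0.0.4:1111 proxying=true   (proxy node)
}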
The discoverd component itself relies on Raft to keep its data consistent, and in discoverd that data means services: we register our own services with discoverd. Those services may in turn need a leader node of their own; for example, Flynn's scheduler only runs scheduling tasks on its leader node. So do these services also rely on Raft to elect a leader?
They do not. Instead of Raft, discoverd chooses the leader by registration age: the longest-lived registered instance becomes the leader. Every service registered with discoverd must send heartbeats to it; if discoverd detects that an instance has stopped heartbeating, it removes that instance, and if the instance happened to be the leader, a new leader election is triggered.
The heartbeat (expiry) check looks like this:
// Open starts the raft consensus and opens the store.
func (s *Store) Open() error {
    // The matching Add for the wg.Done deferred in expirer.
    s.wg.Add(1)
    go s.expirer()
    return nil
}
// expirer runs in a separate goroutine and checks for instance expiration.
func (s *Store) expirer() {
    defer s.wg.Done()

    ticker := time.NewTicker(s.ExpiryCheckInterval)
    defer ticker.Stop()

    for {
        // Wait for next check or for close signal.
        select {
        case <-s.closing:
            return
        case <-ticker.C:
        }

        // Check all instances for expiration.
        if err := s.EnforceExpiry(); err != nil && err != raft.ErrNotLeader {
            s.logger.Printf("enforce expiry: %s", err)
        }
    }
}
// The following code is abridged; only the main logic is kept.

// EnforceExpiry checks all instances for expiration and issues an expiration command, if necessary.
// This function returns raft.ErrNotLeader if this store is not the current leader.
func (s *Store) EnforceExpiry() error {
    // Ignore if this store is not the leader, or hasn't been leader for
    // at least 2 TTL intervals.
    if !s.IsLeader() {
        return raft.ErrNotLeader
    } else if s.leaderTime.IsZero() || time.Since(s.leaderTime) < (2*s.InstanceTTL) {
        return ErrLeaderWait
    }

    // Iterate over services and then instances, collecting every
    // instance that has not heartbeated within the TTL.
    var instances []expireInstance
    for service, m := range s.data.Instances {
        for _, inst := range m {
            // Ignore instances that have heartbeated within the TTL.
            if t := s.heartbeats[instanceKey{service, inst.ID}]; time.Since(t) <= s.InstanceTTL {
                continue
            }
            // Add to the list of instances to expire.
            instances = append(instances, expireInstance{
                Service:    service,
                InstanceID: inst.ID,
            })
        }
    }

    // Create command to expire instances.
    cmd, err := json.Marshal(&expireInstancesCommand{
        Instances: instances,
    })
    if err != nil {
        return err
    }

    // Apply command to raft.
    if _, err := s.raftApply(expireInstancesCommandType, cmd); err != nil {
        return err
    }
    return nil
}
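Before looking at how the command is applied, here is a standalone sketch of the log-entry layout that Store.Apply below expects: a single command-type byte followed by the JSON payload. The type value and struct definitions here are illustrative copies, and the way raftApply builds the entry is inferred from how Apply decodes it.
package main

import (
    "encoding/json"
    "fmt"
)

// Illustrative copies; the real definitions live in the discoverd server.
type expireInstance struct {
    Service    string
    InstanceID string
}

type expireInstancesCommand struct {
    Instances []expireInstance
}

const expireInstancesCommandType byte = 5 // hypothetical value

func main() {
    cmd, err := json.Marshal(&expireInstancesCommand{
        Instances: []expireInstance{{Service: "web", InstanceID: "abc123"}},
    })
    if err != nil {
        panic(err)
    }

    // One type byte followed by the JSON body, matching the split
    // typ, cmd := l.Data[0], l.Data[1:] in Store.Apply.
    data := append([]byte{expireInstancesCommandType}, cmd...)
    fmt.Printf("type=%d payload=%s\n", data[0], data[1:])
}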
The expire command goes through Raft so that every node removes the same instances deterministically; the following code, which applies the committed command, is what actually triggers re-election:
func (s *Store) Apply(l *raft.Log) interface{} {
    // Extract the command type and data.
    typ, cmd := l.Data[0], l.Data[1:]

    // Determine the command type by the first byte.
    switch typ {
    case expireInstancesCommandType:
        return s.applyExpireInstancesCommand(cmd)
    default:
        return fmt.Errorf("invalid command type: %d", typ)
    }
}
func (s *Store) applyExpireInstancesCommand(cmd []byte) error {
    var c expireInstancesCommand
    if err := json.Unmarshal(cmd, &c); err != nil {
        return err
    }

    // Iterate over instances and remove ones with matching expiry times.
    services := make(map[string]struct{})
    for _, expireInstance := range c.Instances {
        // Ignore if the service or the instance no longer exists.
        m := s.data.Instances[expireInstance.Service]
        if m == nil {
            continue
        }
        inst, ok := m[expireInstance.InstanceID]
        if !ok {
            continue
        }

        // Remove instance.
        delete(m, expireInstance.InstanceID)

        // Broadcast down event.
        s.broadcast(&discoverd.Event{
            Service:  expireInstance.Service,
            Kind:     discoverd.EventKindDown,
            Instance: inst,
        })

        // Keep track of services invalidated.
        services[expireInstance.Service] = struct{}{}
    }

    // Invalidate all services that had expirations.
    for service := range services {
        s.invalidateServiceLeader(service)
    }
    return nil
}
// invalidateServiceLeader updates the current leader of service.
func (s *Store) invalidateServiceLeader(service string) {
    // Retrieve service config.
    c := s.data.Services[service]

    // Ignore if there is no config or the leader is manually elected.
    if c == nil || c.LeaderType == discoverd.LeaderTypeManual {
        return
    }

    // Retrieve current leader ID.
    prevLeaderID := s.data.Leaders[service]

    // Find the oldest, non-expired instance.
    var leader *discoverd.Instance
    for _, inst := range s.data.Instances[service] {
        if leader == nil || inst.Index < leader.Index {
            leader = inst
        }
    }

    // Retrieve the leader ID.
    var leaderID string
    if leader != nil {
        leaderID = leader.ID
    }

    // Set leader.
    s.data.Leaders[service] = leaderID

    // Broadcast event.
    if prevLeaderID != leaderID {
        var inst *discoverd.Instance
        if s.data.Instances[service] != nil {
            inst = s.data.Instances[service][leaderID]
        }
        s.broadcast(&discoverd.Event{
            Service:  service,
            Kind:     discoverd.EventKindLeader,
            Instance: inst,
        })
    }
}
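This is where the "longest-lived instance wins" rule becomes concrete: an instance's Index is assigned when it is registered, so the lowest Index marks the earliest registration. A standalone sketch with made-up values:
package main

import "fmt"

// Local stand-in for the relevant fields of discoverd.Instance.
type Instance struct {
    ID    string
    Index uint64
}

func main() {
    instances := []*Instance{
        {ID: "web-2", Index: 57}, // registered last
        {ID: "web-0", Index: 12}, // registered first -> becomes leader
        {ID: "web-1", Index: 34},
    }

    // Same selection loop as invalidateServiceLeader above.
    var leader *Instance
    for _, inst := range instances {
        if leader == nil || inst.Index < leader.Index {
            leader = inst
        }
    }
    fmt.Println("leader:", leader.ID) // leader: web-0
}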
So where do the heartbeats come from? As you have probably guessed, from service registration. discoverd ships with a client (the code lives in discoverd/client), and its heartbeat.go is dedicated to sending heartbeats. We can register a service through the client's AddServiceAndRegister method:
func (c *Client) AddServiceAndRegister(service, addr string) (Heartbeater, error) {
    if err := c.maybeAddService(service); err != nil {
        return nil, err
    }
    return c.Register(service, addr)
}

func (c *Client) Register(service, addr string) (Heartbeater, error) {
    return c.RegisterInstance(service, &Instance{Addr: addr})
}

func (c *Client) RegisterInstance(service string, inst *Instance) (Heartbeater, error) {
    h := newHeartbeater(c, service, inst)
    // Block until the first registration attempt has completed.
    err := runAttempts.Run(func() error {
        firstErr := make(chan error)
        go h.run(firstErr)
        return <-firstErr
    })
    if err != nil {
        return nil, err
    }
    return h, nil
}

func (h *heartbeater) run(firstErr chan<- error) {
    path := fmt.Sprintf("/services/%s/instances/%s", h.service, h.inst.ID)
    // A heartbeat is simply a PUT that re-registers the instance, which
    // refreshes its TTL on the server.
    register := func() error {
        h.Lock()
        defer h.Unlock()
        return h.client().Put(path, h.inst, nil)
    }

    // Perform the initial registration and report the result back to
    // RegisterInstance, which is blocking on firstErr.
    err := register()
    firstErr <- err
    if err != nil {
        return
    }

    timer := time.NewTimer(nextHeartbeat())
    for {
        select {
        case <-timer.C:
            if err := register(); err != nil {
                // Retry sooner after a failed heartbeat.
                timer.Reset(nextHeartbeatFailing())
                break
            }
            timer.Reset(nextHeartbeat())
        case <-h.stop:
            // Deregister the instance on shutdown.
            h.client().Delete(path)
            close(h.done)
            return
        }
    }
}
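Putting the client together, here is a rough usage sketch. The package-level AddServiceAndRegister helper and the Close behaviour are assumptions based on the methods shown above; the real client is normally configured from the environment.
package main

import (
    "log"

    discoverd "github.com/flynn/flynn/discoverd/client"
)

func main() {
    // Hypothetical: register this process as an instance of the "web"
    // service, assuming a package-level helper wrapping a default client.
    hb, err := discoverd.AddServiceAndRegister("web", ":8080")
    if err != nil {
        log.Fatal(err)
    }
    // Closing the heartbeater deregisters the instance (the DELETE seen
    // in heartbeater.run above).
    defer hb.Close()

    // ... serve traffic; the heartbeater keeps the registration alive ...
}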
discoverd also exposes its functionality over HTTP; for example, GET /services/abc/leader returns the leader of service abc. You can likewise use the SSE (Server-Sent Events) protocol to watch for leader-change events on abc; Flynn's scheduler watches for leader changes exactly this way. The relevant routes are:
r.PUT("/services/:service/leader", h.servePutLeader)
r.GET("/services/:service/leader", h.serveGetLeader)
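To round things off, a minimal sketch of querying the leader endpoint directly over HTTP. The listen address and the response fields are assumptions (discoverd is assumed here to return the leader instance as JSON on its default port):
package main

import (
    "encoding/json"
    "fmt"
    "log"
    "net/http"
)

// Illustrative subset of the instance fields; not the full
// discoverd.Instance definition.
type instance struct {
    ID   string `json:"id"`
    Addr string `json:"addr"`
}

func main() {
    resp, err := http.Get("http://127.0.0.1:1111/services/abc/leader")
    if err != nil {
        log.Fatal(err)
    }
    defer resp.Body.Close()

    var inst instance
    if err := json.NewDecoder(resp.Body).Decode(&inst); err != nil {
        log.Fatal(err)
    }
    fmt.Printf("leader of abc: %s at %s\n", inst.ID, inst.Addr)
}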