本文是对Kubernetes V1.3发布的新Garbage Collector模块的源码解读。实际上本文的是基于kubernetes v1.4的代码进行分析的,和V1.3没有大的改动。GC worker的默认值从V1.3中的5调整为V1.4中的20。阅读本文前,请先阅读Kubernetes GC in v1.3。如果可以,先把kubernetes gc pr多看几遍,或许你根本不需要看我的这两篇博客。
Garbage Collector的定义
GarbageCollector is responsible for carrying out cascading deletion, and removing ownerReferences from the dependents if the owner is deleted with DeleteOptions.OrphanDependents=true.
type GarbageCollector struct {
    restMapper meta.RESTMapper
    // metaOnlyClientPool uses a special codec, which removes fields except for
    // apiVersion, kind, and metadata during decoding.
    metaOnlyClientPool dynamic.ClientPool
    // clientPool uses the regular dynamicCodec. We need it to update
    // finalizers. It can be removed if we support patching finalizers.
    clientPool                       dynamic.ClientPool
    dirtyQueue                       *workqueue.TimedWorkQueue
    orphanQueue                      *workqueue.TimedWorkQueue
    monitors                         []monitor
    propagator                       *Propagator
    clock                            clock.Clock
    registeredRateLimiter            *RegisteredRateLimiter
    registeredRateLimiterForMonitors *RegisteredRateLimiter
    // GC caches the owners that do not exist according to the API server.
    absentOwnerCache *UIDCache
}
接下来看看创建Garbage Collector的逻辑,主要看Propagator和monitors的构造。
func NewGarbageCollector(metaOnlyClientPool dynamic.ClientPool, clientPool dynamic.ClientPool, mapper meta.RESTMapper, deletableResources map[schema.GroupVersionResource]struct{}) (*GarbageCollector, error) {
    gc := &GarbageCollector{
        metaOnlyClientPool:               metaOnlyClientPool,
        clientPool:                       clientPool,
        restMapper:                       mapper,
        clock:                            clock.RealClock{},
        dirtyQueue:                       workqueue.NewTimedWorkQueue(),
        orphanQueue:                      workqueue.NewTimedWorkQueue(),
        registeredRateLimiter:            NewRegisteredRateLimiter(deletableResources),
        registeredRateLimiterForMonitors: NewRegisteredRateLimiter(deletableResources),
        absentOwnerCache:                 NewUIDCache(500),
    }
    gc.propagator = &Propagator{
        eventQueue: workqueue.NewTimedWorkQueue(),
        uidToNode: &concurrentUIDToNode{
            RWMutex:   &sync.RWMutex{},
            uidToNode: make(map[types.UID]*node),
        },
        gc: gc,
    }
    for resource := range deletableResources {
        if _, ok := ignoredResources[resource]; ok {
            glog.V(6).Infof("ignore resource %#v", resource)
            continue
        }
        kind, err := gc.restMapper.KindFor(resource)
        if err != nil {
            return nil, err
        }
        monitor, err := gc.monitorFor(resource, kind)
        if err != nil {
            return nil, err
        }
        gc.monitors = append(gc.monitors, monitor)
    }
    return gc, nil
}
注意在构建monitors的过程中,需要屏蔽以下Resources的monitor。
var ignoredResources = map[schema.GroupVersionResource]struct{}{
    schema.GroupVersionResource{Group: "extensions", Version: "v1beta1", Resource: "replicationcontrollers"}:              {},
    schema.GroupVersionResource{Group: "", Version: "v1", Resource: "bindings"}:                                           {},
    schema.GroupVersionResource{Group: "", Version: "v1", Resource: "componentstatuses"}:                                  {},
    schema.GroupVersionResource{Group: "", Version: "v1", Resource: "events"}:                                             {},
    schema.GroupVersionResource{Group: "authentication.k8s.io", Version: "v1beta1", Resource: "tokenreviews"}:             {},
    schema.GroupVersionResource{Group: "authorization.k8s.io", Version: "v1beta1", Resource: "subjectaccessreviews"}:      {},
    schema.GroupVersionResource{Group: "authorization.k8s.io", Version: "v1beta1", Resource: "selfsubjectaccessreviews"}:  {},
    schema.GroupVersionResource{Group: "authorization.k8s.io", Version: "v1beta1", Resource: "localsubjectaccessreviews"}: {},
}
启动Garbage Collector
- 每个monitor的controller都启动一个go协程运行,10s遍历一次所有monitor是否已经完成监控资源的同步。 
- 启动go协程运行Propagator的processEvent方法,开始逐条处理Propagator的Event Queue中的Event。 
- 默认启动20个GC worker(GarbageCollector.worker())对Dirty Queue中的Resources进行处理。 
- 默认启动20个GC orphanFinalizer (GarbageCollector.orphanFinalizer())对orphan Queue中的Resources进行处理。 - func (gc GarbageCollector) Run(workers int, stopCh <-chan struct{}) { glog.Infof("Garbage Collector: Initializing") for _, monitor := range gc.monitors { go monitor.controller.Run(stopCh) } wait.PollInfinite(10time.Second, func() (bool, error) { for _, monitor := range gc.monitors { if !monitor.controller.HasSynced() { glog.Infof("Garbage Collector: Waiting for resource monitors to be synced...") return false, nil } } return true, nil }) glog.Infof("Garbage Collector: All monitored resources synced. Proceeding to collect garbage") // worker go wait.Until(gc.propagator.processEvent, 0, stopCh) for i := 0; i < workers; i++ { go wait.Until(gc.worker, 0, stopCh) go wait.Until(gc.orphanFinalizer, 0, stopCh) } Register() <-stopCh glog.Infof("Garbage Collector: Shutting down") gc.dirtyQueue.ShutDown() gc.orphanQueue.ShutDown() gc.propagator.eventQueue.ShutDown() } 
GC worker(Garbage Processor)
GC worker(Garbage Processor) 对dirty queue的处理逻辑:
- Consists of the Dirty Queue and workers.
- Each worker:
- Dequeues an item from Dirty Queue.
- If the item's OwnerReferences is empty, continues to process the next item in the Dirty Queue.
- Otherwise checks each entry in the OwnerReferences: - If at least one owner exists, do nothing. - If none of the owners exist, requests the API server to delete the item.
简单的说,GC worker就是在delete object的时候,先去遍历所有owner,只有当所有owner都已经被delete之后,才会请求api server去delete该object
func (gc *GarbageCollector) worker() {
    timedItem, quit := gc.dirtyQueue.Get()
    if quit {
        return
    }
    defer gc.dirtyQueue.Done(timedItem)
    err := gc.processItem(timedItem.Object.(*node))
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("Error syncing item %#v: %v", timedItem.Object, err))
        // retry if garbage collection of an object failed.
        gc.dirtyQueue.Add(timedItem)
        return
    }
    DirtyProcessingLatency.Observe(sinceInMicroseconds(gc.clock, timedItem.StartTime))
}
func (gc *GarbageCollector) processItem(item *node) error {
    // Get the latest item from the API server
    latest, err := gc.getObject(item.identity)
    if err != nil {
        if errors.IsNotFound(err) {
            // the Propagator can add "virtual" node for an owner that doesn't
            // exist yet, so we need to enqueue a virtual Delete event to remove
            // the virtual node from Propagator.uidToNode.
            glog.V(6).Infof("item %v not found, generating a virtual delete event", item.identity)
            event := &event{
                eventType: deleteEvent,
                obj:       objectReferenceToMetadataOnlyObject(item.identity),
            }
            glog.V(6).Infof("generating virtual delete event for %s\n\n", event.obj)
            gc.propagator.eventQueue.Add(&workqueue.TimedWorkQueueItem{StartTime: gc.clock.Now(), Object: event})
            return nil
        }
        return err
    }
    if latest.GetUID() != item.identity.UID {
        glog.V(6).Infof("UID doesn't match, item %v not found, generating a virtual delete event", item.identity)
        event := &event{
            eventType: deleteEvent,
            obj:       objectReferenceToMetadataOnlyObject(item.identity),
        }
        glog.V(6).Infof("generating virtual delete event for %s\n\n", event.obj)
        gc.propagator.eventQueue.Add(&workqueue.TimedWorkQueueItem{StartTime: gc.clock.Now(), Object: event})
        return nil
    }
    ownerReferences := latest.GetOwnerReferences()
    if len(ownerReferences) == 0 {
        glog.V(6).Infof("object %s's doesn't have an owner, continue on next item", item.identity)
        return nil
    }
    // TODO: we need to remove dangling references if the object is not to be
    // deleted.
    for _, reference := range ownerReferences {
        if gc.absentOwnerCache.Has(reference.UID) {
            glog.V(6).Infof("according to the absentOwnerCache, object %s's owner %s/%s, %s does not exist", item.identity.UID, reference.APIVersion, reference.Kind, reference.Name)
            continue
        }
        // TODO: we need to verify the reference resource is supported by the
        // system. If it's not a valid resource, the garbage collector should i)
        // ignore the reference when decide if the object should be deleted, and
        // ii) should update the object to remove such references. This is to
        // prevent objects having references to an old resource from being
        // deleted during a cluster upgrade.
        fqKind := schema.FromAPIVersionAndKind(reference.APIVersion, reference.Kind)
        client, err := gc.clientPool.ClientForGroupVersionKind(fqKind)
        if err != nil {
            return err
        }
        resource, err := gc.apiResource(reference.APIVersion, reference.Kind, len(item.identity.Namespace) != 0)
        if err != nil {
            return err
        }
        owner, err := client.Resource(resource, item.identity.Namespace).Get(reference.Name)
        if err == nil {
            if owner.GetUID() != reference.UID {
                glog.V(6).Infof("object %s's owner %s/%s, %s is not found, UID mismatch", item.identity.UID, reference.APIVersion, reference.Kind, reference.Name)
                gc.absentOwnerCache.Add(reference.UID)
                continue
            }
            glog.V(6).Infof("object %s has at least an existing owner, will not garbage collect", item.identity.UID)
            return nil
        } else if errors.IsNotFound(err) {
            gc.absentOwnerCache.Add(reference.UID)
            glog.V(6).Infof("object %s's owner %s/%s, %s is not found", item.identity.UID, reference.APIVersion, reference.Kind, reference.Name)
        } else {
            return err
        }
    }
    glog.V(2).Infof("none of object %s's owners exist any more, will garbage collect it", item.identity)
    return gc.deleteObject(item.identity)
}
##Propagator **Propagator**中的single worker对Event Queue的处理逻辑见如下代码。再对比kubernetes gc pr对Propagator的功能描述。我就不多说废话了!
- The Propagator is for optimization, not for correctness. 
- Consists of an Event Queue, a single worker, and a DAG of owner-dependent relations. 
- The DAG stores only name/uid/orphan triplets, not the entire body of every item. 
- Watches for create/update/delete events for all resources, enqueues the events to the Event Queue. 
- Worker: 
- Dequeues an item from the Event Queue. 
- If the item is an creation or update, then updates the DAG accordingly. - If the object has an owner and the owner doesn’t exist in the DAG yet, then apart from adding the object to the DAG, also enqueues the object to the Dirty Queue. 
- If the item is a deletion, then removes the object from the DAG, and enqueues all its dependent objects to the Dirty Queue. 
- The propagator shouldn't need to do any RPCs, so a single worker should be sufficient. This makes locking easier. 
- With the Propagator, we only need to run the Scanner when starting the GC to populate the DAG and the Dirty Queue. - // Dequeueing an event from eventQueue, updating graph, populating dirty_queue. func (p *Propagator) processEvent() { timedItem, quit := p.eventQueue.Get() if quit { return } defer p.eventQueue.Done(timedItem) event, ok := timedItem.Object.(*event) if !ok { utilruntime.HandleError(fmt.Errorf("expect a *event, got %v", timedItem.Object)) return } obj := event.obj accessor, err := meta.Accessor(obj) if err != nil { utilruntime.HandleError(fmt.Errorf("cannot access obj: %v", err)) return } typeAccessor, err := meta.TypeAccessor(obj) if err != nil { utilruntime.HandleError(fmt.Errorf("cannot access obj: %v", err)) return } glog.V(6).Infof("Propagator process object: %s/%s, namespace %s, name %s, event type %s", typeAccessor.GetAPIVersion(), typeAccessor.GetKind(), accessor.GetNamespace(), accessor.GetName(), event.eventType) // Check if the node already exsits existingNode, found := p.uidToNode.Read(accessor.GetUID()) switch { case (event.eventType == addEvent || event.eventType == updateEvent) && !found: newNode := &node{ identity: objectReference{ OwnerReference: metav1.OwnerReference{ APIVersion: typeAccessor.GetAPIVersion(), Kind: typeAccessor.GetKind(), UID: accessor.GetUID(), Name: accessor.GetName(), }, Namespace: accessor.GetNamespace(), }, dependents: make(map[*node]struct{}), owners: accessor.GetOwnerReferences(), } p.insertNode(newNode) // the underlying delta_fifo may combine a creation and deletion into one event if shouldOrphanDependents(event, accessor) { glog.V(6).Infof("add %s to the orphanQueue", newNode.identity) p.gc.orphanQueue.Add(&workqueue.TimedWorkQueueItem{StartTime: p.gc.clock.Now(), Object: newNode}) } case (event.eventType == addEvent || event.eventType == updateEvent) && found: // caveat: if GC observes the creation of the dependents later than the // deletion of the owner, then the orphaning finalizer won't be effective. if shouldOrphanDependents(event, accessor) { glog.V(6).Infof("add %s to the orphanQueue", existingNode.identity) p.gc.orphanQueue.Add(&workqueue.TimedWorkQueueItem{StartTime: p.gc.clock.Now(), Object: existingNode}) } // add/remove owner refs added, removed := referencesDiffs(existingNode.owners, accessor.GetOwnerReferences()) if len(added) == 0 && len(removed) == 0 { glog.V(6).Infof("The updateEvent %#v doesn't change node references, ignore", event) return } // update the node itself existingNode.owners = accessor.GetOwnerReferences() // Add the node to its new owners' dependent lists. p.addDependentToOwners(existingNode, added) // remove the node from the dependent list of node that are no long in // the node's owners list. p.removeDependentFromOwners(existingNode, removed) case event.eventType == deleteEvent: if !found { glog.V(6).Infof("%v doesn't exist in the graph, this shouldn't happen", accessor.GetUID()) return } p.removeNode(existingNode) existingNode.dependentsLock.RLock() defer existingNode.dependentsLock.RUnlock() if len(existingNode.dependents) > 0 { p.gc.absentOwnerCache.Add(accessor.GetUID()) } for dep := range existingNode.dependents { p.gc.dirtyQueue.Add(&workqueue.TimedWorkQueueItem{StartTime: p.gc.clock.Now(), Object: dep}) } } EventProcessingLatency.Observe(sinceInMicroseconds(p.gc.clock, timedItem.StartTime)) } 
##orphanFinalizer orphanFinalizer dequeues a node from the orphanQueue, then finds its dependents based on the graph maintained by the GC, then removes it from the OwnerReferences of its dependents, and finally updates the owner to remove the "Orphan" finalizer. The node is add back into the orphanQueue if any of these steps fail. 阅读如下代码的同时,对比kubernetes gc pr对OrphanFinalizer的功能描述。
简单的说,orphanFinalizer最重要的就是通过owners和dependents的对应关系,在delete object的时候,去把自己从它的dependents object的ownerReference中删除。
- Watches for update events that meet two conditions: 
- the updated object has the identifier of the finalizer in ObjectMeta.Finalizers; 
- ObjectMeta.DeletionTimestamp is updated from nil to non-nil. 
- Removes the object in the event from the OwnerReferences of its dependents. 
- dependent objects can be found via the DAG kept by the GC, or by relisting the dependent resource and checking the OwnerReferences field of each potential dependent object. 
- Also removes any dangling owner references the dependent objects have. 
- At last, removes the itself from the ObjectMeta.Finalizers of the object. - func (gc *GarbageCollector) orphanFinalizer() { timedItem, quit := gc.orphanQueue.Get() if quit { return } defer gc.orphanQueue.Done(timedItem) owner, ok := timedItem.Object.(*node) if !ok { utilruntime.HandleError(fmt.Errorf("expect *node, got %#v", timedItem.Object)) } // we don't need to lock each element, because they never get updated owner.dependentsLock.RLock() dependents := make([]*node, 0, len(owner.dependents)) for dependent := range owner.dependents { dependents = append(dependents, dependent) } owner.dependentsLock.RUnlock() err := gc.orhpanDependents(owner.identity, dependents) if err != nil { glog.V(6).Infof("orphanDependents for %s failed with %v", owner.identity, err) gc.orphanQueue.Add(timedItem) return } // update the owner, remove "orphaningFinalizer" from its finalizers list err = gc.removeOrphanFinalizer(owner) if err != nil { glog.V(6).Infof("removeOrphanFinalizer for %s failed with %v", owner.identity, err) gc.orphanQueue.Add(timedItem) } OrphanProcessingLatency.Observe(sinceInMicroseconds(gc.clock, timedItem.StartTime)) } 
##Gabarbage Collector is a controller started by controller-manager kube-controller-manager组件启动的时候,会启动所有配置的controller,包括garbagecollector,replicationcontroller,horizontalpodautoscaling等。
/Users/garnett/workspace/go/src/k8s.io/kubernetes/cmd/kube-controller-manager/app/controllermanager.go
// Run runs the CMServer.  This should never exit.
func Run(s *options.CMServer) error {
    ...
        err := StartControllers(newControllerInitializers(), s, rootClientBuilder, clientBuilder, stop)
        glog.Fatalf("error running controllers: %v", err)
        panic("unreachable")
    ...
}
func StartControllers(controllers map[string]InitFunc, s *options.CMServer, rootClientBuilder, clientBuilder controller.ControllerClientBuilder, stop <-chan struct{}) error {
    for controllerName, initFn := range controllers {
            time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
            ...
            started, err := initFn(ctx)
            ...
        }
}
func newControllerInitializers() map[string]InitFunc {
    controllers := map[string]InitFunc{}
    controllers["endpoint"] = startEndpointController
    controllers["replicationcontroller"] = startReplicationController
    controllers["podgc"] = startPodGCController
    controllers["resourcequota"] = startResourceQuotaController
    controllers["namespace"] = startNamespaceController
    controllers["serviceaccount"] = startServiceAccountController
    controllers["garbagecollector"] = startGarbageCollectorController
    controllers["daemonset"] = startDaemonSetController
    controllers["job"] = startJobController
    controllers["deployment"] = startDeploymentController
    controllers["replicaset"] = startReplicaSetController
    controllers["horizontalpodautoscaling"] = startHPAController
    controllers["disruption"] = startDisruptionController
    controllers["statefuleset"] = startStatefulSetController
    controllers["cronjob"] = startCronJobController
    controllers["certificatesigningrequests"] = startCSRController
    return controllers
}
对于garbage collector的启动,则是调用startGarbageCollectorController方法,最终启动一个go协程执行前面分析过的garbageCollector.Run(workers, ctx.Stop)。
/Users/garnett/workspace/go/src/k8s.io/kubernetes/cmd/kube-controller-manager/app/core.go
func startGarbageCollectorController(ctx ControllerContext) (bool, error) {
    ...
    go garbageCollector.Run(workers, ctx.Stop)
    return true, nil
}
前面提到,默认会启动20个Garbage worker & orphanFinalizer,这是在controller manager创建的时候指定的该默认值。注意,在V1.3版本中,默认值为5。
/Users/garnett/workspace/go/src/k8s.io/kubernetes/cmd/kube-controller-manager/app/options/options.go
// NewCMServer creates a new CMServer with a default config.
func NewCMServer() *CMServer {
    s := CMServer{
        KubeControllerManagerConfiguration: componentconfig.KubeControllerManagerConfiguration{
        ...
        ConcurrentGCSyncs:       20,
        ...
    }
    ...
}
End-to-End Examples
最后,最为补充,我不得不把kubernetes gc pr中的example拿出来,这个是理解GC整个流程最好的东西。
This section presents an example of all components working together to enforce the cascading deletion or orphaning.
Life of a Deployment and its descendants
- User creates a deployment D1.
- The Propagator of the GC observes the creation. It creates an entry of D1in the DAG.
- The deployment controller observes the creation of D1. It creates the replicasetR1, whose OwnerReferences field contains a reference toD1, and has the "orphan" finalizer in its ObjectMeta.Finalizers map.
- The Propagator of the GC observes the creation of R1. It creates an entry ofR1in the DAG, withD1as its owner.
- The replicaset controller observes the creation of R1and creates PodsP1~`Pn, all withR1` in their OwnerReferences.
- The Propagator of the GC observes the creation of P1~`Pn. It creates entries for them in the DAG, withR1` as their owner.
In case the user wants to cascadingly delete D1's descendants, then
- The user deletes the deployment D1, withDeleteOptions.OrphanDependents=false. API server checks ifD1has "orphan" finalizer in its Finalizers map, if so, it updatesD1to remove the "orphan" finalizer. Then API server deletesD1.
- The "orphan" finalizer does not take any action, because the observed deletion shows D1has an empty Finalizers map.
- The Propagator of the GC observes the deletion of D1. It deletesD1from the DAG. It adds its dependent object, replicasetR1, to the dirty queue.
- The Garbage Processor of the GC dequeues R1from the dirty queue. It findsR1has an owner reference pointing toD1, andD1no longer exists, so it requests API server to deleteR1, withDeleteOptions.OrphanDependents=false. (The Garbage Processor should always set this field to false.)
- The API server updates R1to remove the "orphan" finalizer if it's in theR1's Finalizers map. Then the API server deletesR1, asR1has an empty Finalizers map.
- The Propagator of the GC observes the deletion of R1. It deletesR1from the DAG. It adds its dependent objects, PodsP1~`Pn`, to the Dirty Queue.
- The Garbage Processor of the GC dequeues Px(1 <= x <= n) from the Dirty Queue. It finds thatPxhave an owner reference pointing toD1, andD1no longer exists, so it requests API server to deletePx, withDeleteOptions.OrphanDependents=false.
- API server deletes the Pods.
In case the user wants to orphan D1's descendants, then
- The user deletes the deployment D1, withDeleteOptions.OrphanDependents=true.
- The API server first updates D1, with DeletionTimestamp=now and DeletionGracePeriodSeconds=0, increments the Generation by 1, and add the "orphan" finalizer to ObjectMeta.Finalizers if it's not present yet. The API server does not deleteD1, because its Finalizers map is not empty.
- The deployment controller observes the update, and acknowledges by updating the D1's ObservedGeneration. The deployment controller won't create more replicasets onD1's behalf.
- The "orphan" finalizer observes the update, and notes down the Generation. It waits until the ObservedGeneration becomes equal to or greater than the noted Generation. Then it updates R1to removeD1from its OwnerReferences. At last, it updatesD1, removing itself fromD1's Finalizers map.
- The API server handles the update of D1, because i) DeletionTimestamp is non-nil, ii) the DeletionGracePeriodSeconds is zero, and iii) the last finalizer is removed from the Finalizers map, API server deletesD1.
- The Propagator of the GC observes the deletion of D1. It deletesD1from the DAG. It adds its dependent, replicasetR1, to the Dirty Queue.
- The Garbage Processor of the GC dequeues R1from the Dirty Queue and skips it, because its OwnerReferences is empty.
 
  
  
  
 
 
  
 
 
 