Kubernetes Source Code Analysis: kubelet PodWorker

This article is based on Kubernetes v1.21.2, commit SHA 092fbfbf53427de67cac1e9fa54aaa09a28371d7.

Continuing from the previous article, let's keep looking at what happens inside syncLoop.

pkg/kubelet/kubelet.go:1845

// syncLoop is the main loop for processing changes. It watches for changes from
// three channels (file, apiserver, and http) and creates a union of them. For
// any new change seen, will run a sync against desired state and running state. If
// no changes are seen to the configuration, will synchronize the last known desired
// state every sync-frequency seconds. Never returns.
func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {  
    plegCh := kl.pleg.Watch()
    // Responsible for checking limits in resolv.conf
    // The limits do not have anything to do with individual pods
    // Since this is called in syncLoop, we don't need to call it anywhere else
    if kl.dnsConfigurer != nil && kl.dnsConfigurer.ResolverConfig != "" {
        // 1. Open /etc/resolv.conf and parse it
        kl.dnsConfigurer.CheckLimitsForResolvConf()
    }

    for {
        // 2. .. omitted: if the kubelet's own state is unhealthy, sleep with a binary exponential backoff (initial 100ms, capped at 5s)
        // 3. Main logic: receive messages from the various channels and dispatch them
        // syncTicker is a Ticker with a 1-second period; housekeepingTicker fires every 2 seconds
        if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
            break
        }
    }
}
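The backoff elided in step 2 above is a plain binary exponential backoff. Below is a minimal, self-contained sketch of the pattern (simplified; in the real loop the "unhealthy" check is a query against kl.runtimeState for runtime errors):

package main

import (
    "math"
    "time"
)

func main() {
    // Sketch of the elided step 2: start at 100ms, double after every
    // unhealthy iteration, cap at 5s, and reset once the state is healthy.
    const (
        base = 100 * time.Millisecond
        max  = 5 * time.Second
    )
    duration := base
    for i := 0; i < 8; i++ {
        unhealthy := i < 5 // stand-in for the runtime-state error check
        if unhealthy {
            time.Sleep(duration)
            duration = time.Duration(math.Min(float64(max), 2*float64(duration)))
            continue
        }
        duration = base // healthy again: reset the backoff
        // ... this is where syncLoopIteration would run
    }
}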

Here the kubelet checks the DNS configuration file. CheckLimitsForResolvConf does the following:

  1. Opens /etc/resolv.conf and parses it. On error it logs and emits an Event.
  2. If the number of domains in the search path exceeds MaxDNSSearchPaths, it logs and emits an Event. MaxDNSSearchPaths is 6.
  3. If the total length of the search path exceeds MaxDNSSearchListChars, it logs and emits an Event. MaxDNSSearchListChars is 256.

The contents of /etc/resolv.conf look like this:

$ cat /etc/resolv.conf 
search lan  
nameserver 192.168.199.1  

The search entries are checked because of a limitation in older versions of glibc on Linux; see https://man7.org/linux/man-pages/man5/resolv.conf.5.html

In glibc 2.25 and earlier, the search list is limited to six domains with a total of 256 characters. Since glibc 2.26, the search list is unlimited.
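To make the check concrete, here is a simplified, hypothetical re-implementation of the search-list checks (the real CheckLimitsForResolvConf reads the file with a scanner and emits Events through the kubelet's recorder instead of returning strings):

package main

import (
    "fmt"
    "strings"
)

const (
    maxDNSSearchPaths     = 6   // domain-count limit the kubelet warns about
    maxDNSSearchListChars = 256 // total-length limit of the search list
)

// checkSearchLimits is a hypothetical helper that flags search lists
// exceeding the old glibc limits quoted above.
func checkSearchLimits(resolvConf string) []string {
    var warnings []string
    for _, line := range strings.Split(resolvConf, "\n") {
        fields := strings.Fields(line)
        if len(fields) == 0 || fields[0] != "search" {
            continue
        }
        domains := fields[1:]
        if len(domains) > maxDNSSearchPaths {
            warnings = append(warnings,
                fmt.Sprintf("%d search domains, limit is %d", len(domains), maxDNSSearchPaths))
        }
        if n := len(strings.Join(domains, " ")); n > maxDNSSearchListChars {
            warnings = append(warnings,
                fmt.Sprintf("search list is %d chars, limit is %d", n, maxDNSSearchListChars))
        }
    }
    return warnings
}

func main() {
    fmt.Println(checkSearchLimits("search lan\nnameserver 192.168.199.1\n"))
}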

The kubelet's core logic lives in the syncLoopIteration function at pkg/kubelet/kubelet.go:1919, but in this article we will skip over it for now and look at the kubelet's PodWorker instead.

func (kl *Kubelet) syncLoopIteration(
    configCh <-chan kubetypes.PodUpdate,
    handler SyncHandler,
    syncCh <-chan time.Time,
    housekeepingCh <-chan time.Time,
    plegCh <-chan *pleg.PodLifecycleEvent,
) bool {
    select {
    case u, open := <-configCh:
        // Update from a config source; dispatch it to the right handler
        // callback.
        if !open {
            klog.ErrorS(nil, "Update channel is closed, exiting the sync loop")
            return false
        }

        switch u.Op {
        case kubetypes.ADD:
            klog.V(2).InfoS("SyncLoop ADD", "source", u.Source, "pods", format.Pods(u.Pods))
            // After restarting, kubelet will get all existing pods through
            // ADD as if they are new pods. These pods will then go through the
            // admission process and *may* be rejected. This can be resolved
            // once we have checkpointing.
            handler.HandlePodAdditions(u.Pods)
        // .. other event types omitted
        default:
            klog.ErrorS(nil, "Invalid operation type received", "operation", u.Op)
        }
    // .. omitted: handling of messages from the other channels
    }
    return true
}
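For reference, the PodUpdate message arriving on configCh is defined in pkg/kubelet/types/pod_update.go and carries the affected Pods, the operation, and the source that produced the update:

type PodUpdate struct {
    Pods   []*v1.Pod    // the pods affected by this update
    Op     PodOperation // ADD, UPDATE, REMOVE, DELETE, etc.
    Source string       // which config source produced it: file, http, or apiserver
}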

All of the handler.HandlePodXXX functions eventually call dispatchWork to dispatch the work.

pkg/kubelet/kubelet.go:2032

func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {  
    // check whether we are ready to delete the pod from the API server (all status up to date)
    containersTerminal, podWorkerTerminal := kl.podAndContainersAreTerminal(pod)
    if pod.DeletionTimestamp != nil && containersTerminal {
        klog.V(4).InfoS("Pod has completed execution and should be deleted from the API server", "pod", klog.KObj(pod), "syncType", syncType)
        kl.statusManager.TerminatePod(pod)
        return
    }

    // optimization: avoid invoking the pod worker if no further changes are possible to the pod definition
    // (i.e. the pod has completed and its containers have been terminated)
    if podWorkerTerminal && containersTerminal {
        klog.V(4).InfoS("Pod has completed and its containers have been terminated, ignoring remaining sync work", "pod", klog.KObj(pod), "syncType", syncType)
        return
    }

    // Run the sync in an async worker.
    kl.podWorkers.UpdatePod(&UpdatePodOptions{
        Pod:        pod,
        MirrorPod:  mirrorPod,
        UpdateType: syncType,
        OnCompleteFunc: func(err error) {
            if err != nil {
                metrics.PodWorkerDuration.WithLabelValues(syncType.String()).Observe(metrics.SinceInSeconds(start))
            }
        },
    })
    // Note the number of containers for new pods.
    if syncType == kubetypes.SyncPodCreate {
        metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers)))
    }
}

This is where the PodWorker's UpdatePod function gets called. The PodWorker implements a pool of goroutines to manage Pods.

Creating the per-Pod goroutine

pkg/kubelet/pod_workers.go:199

// Apply the new setting to the specified pod.
// If the options provide an OnCompleteFunc, the function is invoked if the update is accepted.
// Update requests are ignored if a kill pod request is pending.
func (p *podWorkers) UpdatePod(options *UpdatePodOptions) {  
    pod := options.Pod
    uid := pod.UID
    var podUpdates chan UpdatePodOptions
    var exists bool

    p.podLock.Lock()
    defer p.podLock.Unlock()
    if podUpdates, exists = p.podUpdates[uid]; !exists {
        // We need to have a buffer here, because checkForUpdates() method that
        // puts an update into channel is called from the same goroutine where
        // the channel is consumed. However, it is guaranteed that in such case
        // the channel is empty, so buffer of size 1 is enough.
        podUpdates = make(chan UpdatePodOptions, 1)
        p.podUpdates[uid] = podUpdates

        // Creating a new pod worker either means this is a new pod, or that the
        // kubelet just restarted. In either case the kubelet is willing to believe
        // the status of the pod for the first pod worker sync. See corresponding
        // comment in syncPod.
        go func() {
            defer runtime.HandleCrash()
            p.managePodLoop(podUpdates)
        }()
    }
    if !p.isWorking[pod.UID] {
        p.isWorking[pod.UID] = true
        podUpdates <- *options
    } else {
        // if a request to kill a pod is pending, we do not let anything overwrite that request.
        update, found := p.lastUndeliveredWorkUpdate[pod.UID]
        if !found || update.UpdateType != kubetypes.SyncPodKill {
            p.lastUndeliveredWorkUpdate[pod.UID] = *options
        }
    }
}

Every Pod is managed by its own goroutine, which runs the managePodLoop function; the job of podWorkers is to manage these goroutines. For a new Pod, the PodWorker first creates a channel, podUpdates, and stores it in its map p.podUpdates. This map holds the input end of every goroutine, and closing one of these channels makes the corresponding goroutine exit.

UpdatePod is called frequently, but a message is passed directly into the channel only when the managing goroutine's isWorking flag is false; otherwise it is placed into lastUndeliveredWorkUpdate, which holds the most recent unprocessed message. SyncPodKill messages have the highest priority: once the message in lastUndeliveredWorkUpdate is a SyncPodKill, no later message may overwrite it. Why keep lastUndeliveredWorkUpdate instead of pushing every message into the channel? Because Kubernetes reconciles toward the desired end state: if, say, three SyncPodUpdate messages arrive back to back, managePodLoop always syncs against the latest Spec, so processing all three would produce two redundant syncs. lastUndeliveredWorkUpdate exists to buffer and coalesce them.
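The pass-through-or-coalesce behavior is easy to reproduce in isolation. Below is a minimal, single-threaded sketch using hypothetical names (the real podWorkers holds podLock around all of this and additionally refuses to overwrite a pending SyncPodKill):

package main

import "fmt"

// update stands in for UpdatePodOptions.
type update struct{ spec string }

// worker models one pod's state inside podWorkers.
type worker struct {
    ch              chan update // buffered with size 1, like podUpdates
    working         bool        // like isWorking[uid]
    lastUndelivered *update     // like lastUndeliveredWorkUpdate[uid]
}

// deliver mirrors UpdatePod: send directly when idle, otherwise coalesce.
func (w *worker) deliver(u update) {
    if !w.working {
        w.working = true
        w.ch <- u
        return
    }
    w.lastUndelivered = &u // a newer update replaces the buffered one
}

// done mirrors checkForUpdates: flush the buffered update, or go idle.
func (w *worker) done() {
    if w.lastUndelivered != nil {
        w.ch <- *w.lastUndelivered
        w.lastUndelivered = nil
        return
    }
    w.working = false
}

func main() {
    w := &worker{ch: make(chan update, 1)}
    w.deliver(update{"v1"}) // idle: goes straight into the channel
    w.deliver(update{"v2"}) // busy: buffered
    w.deliver(update{"v3"}) // busy: overwrites v2, which is never synced
    fmt.Println(<-w.ch)     // {v1}
    w.done()                // flushes v3
    fmt.Println(<-w.ch)     // {v3}
}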

Destroying the per-Pod goroutine

When HandlePodRemoves is triggered, the managing goroutine of the corresponding Pod is removed. The call stack is:

pkg/kubelet/kubelet.go:2130 HandlePodRemoves

pkg/kubelet/kubelet.go:1761 deletePod

pkg/kubelet/pod_workers.go:246 removeWorker

func (p *podWorkers) removeWorker(uid types.UID) {  
    if ch, ok := p.podUpdates[uid]; ok {
        close(ch)
        delete(p.podUpdates, uid)
        // If there is an undelivered work update for this pod we need to remove it
        // since per-pod goroutine won't be able to put it to the already closed
        // channel when it finishes processing the current work update.
        delete(p.lastUndeliveredWorkUpdate, uid)
    }
}

func (p *podWorkers) ForgetWorker(uid types.UID) {
    p.podLock.Lock()
    defer p.podLock.Unlock()
    p.removeWorker(uid)
}
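Note that close(ch) is enough to stop the per-Pod goroutine: managePodLoop (shown below) iterates with for update := range podUpdates, and in Go a range over a channel ends once the channel has been closed and drained. A tiny standalone demonstration:

package main

import "fmt"

func main() {
    ch := make(chan int, 1)
    done := make(chan struct{})
    go func() {
        for v := range ch { // runs until ch is closed and drained
            fmt.Println("processed", v)
        }
        fmt.Println("worker exited") // reached after close(ch)
        close(done)
    }()
    ch <- 1
    close(ch) // what removeWorker does to podUpdates
    <-done
}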

What the per-Pod goroutine does

pkg/kubelet/pod_workers.go:157

func (p *podWorkers) managePodLoop(podUpdates <-chan UpdatePodOptions) {  
    var lastSyncTime time.Time
    for update := range podUpdates {
        err := func() error {
            podUID := update.Pod.UID
            // This is a blocking call that would return only if the cache
            // has an entry for the pod that is newer than minRuntimeCache
            // Time. This ensures the worker doesn't start syncing until
            // after the cache is at least newer than the finished time of
            // the previous sync.
            status, err := p.podCache.GetNewerThan(podUID, lastSyncTime)
            if err != nil {
                // This is the legacy event thrown by manage pod loop
                // all other events are now dispatched from syncPodFn
                p.recorder.Eventf(update.Pod, v1.EventTypeWarning, events.FailedSync, "error determining status: %v", err)
                return err
            }
            err = p.syncPodFn(syncPodOptions{
                mirrorPod:      update.MirrorPod,
                pod:            update.Pod,
                podStatus:      status,
                killPodOptions: update.KillPodOptions,
                updateType:     update.UpdateType,
            })
            lastSyncTime = time.Now()
            return err
        }()
        // notify the call-back function if the operation succeeded or not
        if update.OnCompleteFunc != nil {
            update.OnCompleteFunc(err)
        }
        if err != nil {
            // IMPORTANT: we do not log errors here, the syncPodFn is responsible for logging errors
            klog.ErrorS(err, "Error syncing pod, skipping", "pod", klog.KObj(update.Pod), "podUID", update.Pod.UID)
        }
        p.wrapUp(update.Pod.UID, err)
    }
}

syncPodFn performs the core processing; its actual implementation lives in kubelet.go. Let's first look at wrapUp, the function that finishes each iteration. The UIDs it enqueues into workQueue are picked up later by the syncCh branch of syncLoopIteration (via getPodsToSync) and re-dispatched for syncing.

pkg/kubelet/pod_workers.go:262

func (p *podWorkers) wrapUp(uid types.UID, syncErr error) {  
    // Requeue the last update if the last sync returned error.
    switch {
    case syncErr == nil:
        // No error; requeue at the regular resync interval.
        p.workQueue.Enqueue(uid, wait.Jitter(p.resyncInterval, workerResyncIntervalJitterFactor))
    case strings.Contains(syncErr.Error(), NetworkNotReadyErrorMsg):
        // Network is not ready; back off for short period of time and retry as network might be ready soon.
        p.workQueue.Enqueue(uid, wait.Jitter(backOffOnTransientErrorPeriod, workerBackOffPeriodJitterFactor))
    default:
        // Error occurred during the sync; back off and then retry.
        p.workQueue.Enqueue(uid, wait.Jitter(p.backOffPeriod, workerBackOffPeriodJitterFactor))
    }
    p.checkForUpdates(uid)
}
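wait.Jitter (from k8s.io/apimachinery/pkg/util/wait) returns a random duration between d and d + maxFactor*d, so requeued Pods don't all come up for resync at the same instant. A quick illustration, assuming the 0.5 jitter factor the kubelet uses for workerResyncIntervalJitterFactor (the 10s interval is just an example value):

package main

import (
    "fmt"
    "time"

    "k8s.io/apimachinery/pkg/util/wait"
)

func main() {
    // With a 10s resync interval and a 0.5 jitter factor, each enqueue
    // lands somewhere in [10s, 15s).
    resync := 10 * time.Second
    for i := 0; i < 3; i++ {
        fmt.Println(wait.Jitter(resync, 0.5))
    }
}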

pkg/kubelet/pod_workers.go:278

func (p *podWorkers) checkForUpdates(uid types.UID) {  
    p.podLock.Lock()
    defer p.podLock.Unlock()
    if workUpdate, exists := p.lastUndeliveredWorkUpdate[uid]; exists {
        p.podUpdates[uid] <- workUpdate
        delete(p.lastUndeliveredWorkUpdate, uid)
    } else {
        p.isWorking[uid] = false
    }
}

checkForUpdates takes the pending message, if any, out of lastUndeliveredWorkUpdate and pushes it into the podUpdates channel. If there is nothing left to process, it sets isWorking to false, which allows messages to be delivered directly again via the UpdatePod path.

The overall workflow of the PodWorker is shown in the figure below.

[Figure: PodWorker workflow]