DEV Community

cylon

The Scheduling Context of kube-scheduler

The Scheduler struct

Scheduler is the top-level struct of kube-scheduler; it holds the components kube-scheduler needs to run.

type Scheduler struct {
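    // Cache is an abstraction that caches pod information for the scheduler to look up; its operations are pod-based.
    Cache internalcache.Cache
    // Extenders act like scheduling plug-ins (out-of-process extensions) that influence the scheduling policy in Kubernetes.
    Extenders []framework.Extender

    // NextPod is a function that blocks until the next schedulable pod is available.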
    NextPod func() *framework.QueuedPodInfo

    // Error is called if there is an error. It is passed the pod in
    // question, and the error
    Error func(*framework.QueuedPodInfo, error)

    // SchedulePod tries to schedule the given pod onto a Node.
    SchedulePod func(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (ScheduleResult, error)

    // StopEverything is the signal that shuts the scheduler down.
    StopEverything <-chan struct{}

    // SchedulingQueue holds the pods waiting to be scheduled.
    SchedulingQueue internalqueue.SchedulingQueue

    // Profiles holds the scheduling frameworks, one per profile.
    Profiles profile.Map
    client clientset.Interface
    nodeInfoSnapshot *internalcache.Snapshot
    percentageOfNodesToScore int32
    nextStartNodeIndex int
}

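The two pieces that do the actual work are SchedulingQueue and scheduleOne; both are analyzed below.

SchedulingQueue

Now that the initialization of kube-scheduler is understood, the next step is to analyze its overall structure and workflow.

Run starts a SchedulingQueue and a scheduleOne loop; structurally, both belong to Scheduler.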

func (sched *Scheduler) Run(ctx context.Context) {
    sched.SchedulingQueue.Run()

    // We need to start scheduleOne loop in a dedicated goroutine,
    // because scheduleOne function hangs on getting the next item
    // from the SchedulingQueue.
    // If there are no new pods to schedule, it will be hanging there
    // and if done in this goroutine it will be blocking closing
    // SchedulingQueue, in effect causing a deadlock on shutdown.
    go wait.UntilWithContext(ctx, sched.scheduleOne, 0)

    <-ctx.Done()
    sched.SchedulingQueue.Close()
}


SchedulingQueue is a queue abstraction that stores the pods waiting to be scheduled. The interface follows a pattern similar to cache.FIFO and cache.Heap.

type SchedulingQueue interface {
    framework.PodNominator
    Add(pod *v1.Pod) error
    // Activate moves the given pods to activeQ iff they're in unschedulablePods or backoffQ.
    // The passed-in pods are originally compiled from plugins that want to activate Pods,
    // by injecting the pods through a reserved CycleState struct (PodsToActivate).
    Activate(pods map[string]*v1.Pod)
    // AddUnschedulableIfNotPresent puts an unschedulable pod back into the queue.
    AddUnschedulableIfNotPresent(pod *framework.QueuedPodInfo, podSchedulingCycle int64) error
    // SchedulingCycle returns the current number of scheduling cycle which is
    // cached by scheduling queue. Normally, incrementing this number whenever
    // a pod is popped (e.g. called Pop()) is enough.
    SchedulingCycle() int64
    // Pop removes the pod at the head of the priority queue and returns it.
    Pop() (*framework.QueuedPodInfo, error)
    Update(oldPod, newPod *v1.Pod) error
    Delete(pod *v1.Pod) error
    MoveAllToActiveOrBackoffQueue(event framework.ClusterEvent, preCheck PreEnqueueCheck)
    AssignedPodAdded(pod *v1.Pod)
    AssignedPodUpdated(pod *v1.Pod)
    PendingPods() []*v1.Pod
    // Close closes the SchedulingQueue so that the goroutine which is
    // waiting to pop items can exit gracefully.
    Close()
    // Run starts the goroutines managing the queue.
    Run()
}

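PriorityQueue is the implementation of SchedulingQueue. Its core consists of two sub-queues and one additional data structure: activeQ, backoffQ and unschedulablePods.

  • activeQ: a heap-based priority queue from which the scheduler takes the highest-priority Pod to schedule.
  • backoffQ: also a heap-based priority queue; it holds Pods that failed to schedule and are waiting for their backoff period to end.
  • unschedulablePods: holds Pods that have been tried and determined to be unschedulable.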
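The relationship between the three structures can be sketched with a toy model. This is only an illustration under stated assumptions: `pod`, `podHeap`, `toyQueue` and `flushBackoff` are hypothetical names, time is a plain integer, and the real PriorityQueue additionally handles locking, metrics and cluster events.

```go
package main

import (
	"container/heap"
	"fmt"
)

// pod is a hypothetical stand-in for framework.QueuedPodInfo.
type pod struct {
	name      string
	priority  int
	backoffAt int // logical time at which the pod's backoff expires
}

// podHeap is a heap of pods ordered by a caller-supplied less function.
type podHeap struct {
	items []pod
	less  func(a, b pod) bool
}

func (h *podHeap) Len() int            { return len(h.items) }
func (h *podHeap) Less(i, j int) bool  { return h.less(h.items[i], h.items[j]) }
func (h *podHeap) Swap(i, j int)       { h.items[i], h.items[j] = h.items[j], h.items[i] }
func (h *podHeap) Push(x interface{}) { h.items = append(h.items, x.(pod)) }
func (h *podHeap) Pop() interface{} {
	last := h.items[len(h.items)-1]
	h.items = h.items[:len(h.items)-1]
	return last
}

// toyQueue mirrors the three structures described above.
type toyQueue struct {
	activeQ           *podHeap       // highest priority first
	backoffQ          *podHeap       // earliest backoff expiry first
	unschedulablePods map[string]pod // parked until something changes
}

func newToyQueue() *toyQueue {
	return &toyQueue{
		activeQ:           &podHeap{less: func(a, b pod) bool { return a.priority > b.priority }},
		backoffQ:          &podHeap{less: func(a, b pod) bool { return a.backoffAt < b.backoffAt }},
		unschedulablePods: map[string]pod{},
	}
}

// flushBackoff moves pods whose backoff expired at or before now into activeQ.
func (q *toyQueue) flushBackoff(now int) {
	for q.backoffQ.Len() > 0 && q.backoffQ.items[0].backoffAt <= now {
		heap.Push(q.activeQ, heap.Pop(q.backoffQ))
	}
}

// demo fills the queues, flushes expired backoffs and drains activeQ.
func demo() []string {
	q := newToyQueue()
	heap.Push(q.activeQ, pod{name: "low", priority: 1})
	heap.Push(q.activeQ, pod{name: "high", priority: 10})
	heap.Push(q.backoffQ, pod{name: "retry-late", priority: 5, backoffAt: 30})
	heap.Push(q.backoffQ, pod{name: "retry-soon", priority: 5, backoffAt: 10})
	q.unschedulablePods["stuck"] = pod{name: "stuck", priority: 100}

	q.flushBackoff(15) // only retry-soon has finished backing off
	var order []string
	for q.activeQ.Len() > 0 {
		order = append(order, heap.Pop(q.activeQ).(pod).name)
	}
	return order
}

func main() {
	fmt.Println(demo())
}
```

Note that the pod parked in unschedulablePods never reaches activeQ here, even though it has the highest priority; in this sketch nothing moves it back.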

The queue is initialized when the scheduler is created (in New):

podQueue := internalqueue.NewSchedulingQueue(
    // the function used to compare pods, i.e. less
    profiles[options.profiles[0].SchedulerName].QueueSortFunc(),
    informerFactory,
    internalqueue.WithPodInitialBackoffDuration(time.Duration(options.podInitialBackoffSeconds)*time.Second),
    internalqueue.WithPodMaxBackoffDuration(time.Duration(options.podMaxBackoffSeconds)*time.Second),
    internalqueue.WithPodNominator(nominator),
    internalqueue.WithClusterEventMap(clusterEventMap),
    internalqueue.WithPodMaxInUnschedulablePodsDuration(options.podMaxInUnschedulablePodsDuration),
)

NewSchedulingQueue in turn initializes the PriorityQueue.

// NewSchedulingQueue initializes a priority queue as a new scheduling queue.
func NewSchedulingQueue(
    lessFn framework.LessFunc,
    informerFactory informers.SharedInformerFactory,
    opts ...Option) SchedulingQueue {
    return NewPriorityQueue(lessFn, informerFactory, opts...)
}

// NewPriorityQueue creates a PriorityQueue object.
func NewPriorityQueue(
    lessFn framework.LessFunc,
    informerFactory informers.SharedInformerFactory,
    opts ...Option,
) *PriorityQueue {
    options := defaultPriorityQueueOptions
    for _, opt := range opts {
        opt(&options)
    }
    // comp wraps lessFn, the less function from the queue sort plugin, so the heap can order pods in activeQ
    comp := func(podInfo1, podInfo2 interface{}) bool {
        pInfo1 := podInfo1.(*framework.QueuedPodInfo)
        pInfo2 := podInfo2.(*framework.QueuedPodInfo)
        return lessFn(pInfo1, pInfo2)
    }

    if options.podNominator == nil {
        options.podNominator = NewPodNominator(informerFactory.Core().V1().Pods().Lister())
    }

    pq := &PriorityQueue{
        PodNominator:                      options.podNominator,
        clock:                             options.clock,
        stop:                              make(chan struct{}),
        podInitialBackoffDuration:         options.podInitialBackoffDuration,
        podMaxBackoffDuration:             options.podMaxBackoffDuration,
        podMaxInUnschedulablePodsDuration: options.podMaxInUnschedulablePodsDuration,
        activeQ:                           heap.NewWithRecorder(podInfoKeyFunc, comp, metrics.NewActivePodsRecorder()),
        unschedulablePods:                 newUnschedulablePods(metrics.NewUnschedulablePodsRecorder()),
        moveRequestCycle:                  -1,
        clusterEventMap:                   options.clusterEventMap,
    }
    pq.cond.L = &pq.lock
    pq.podBackoffQ = heap.NewWithRecorder(podInfoKeyFunc, pq.podsCompareBackoffCompleted, metrics.NewBackoffPodsRecorder())
    pq.nsLister = informerFactory.Core().V1().Namespaces().Lister()

    return pq
}
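To make the role of lessFn concrete, here is a minimal sketch of a comparison function in the spirit of a priority-based queue sort: higher priority first, earlier enqueue time as the tie-breaker. The type `queuedPod` and the functions `less` and `order` are hypothetical and only illustrate the idea; they are not taken from the scheduler source.

```go
package main

import (
	"fmt"
	"sort"
)

// queuedPod is a hypothetical, trimmed-down stand-in for framework.QueuedPodInfo.
type queuedPod struct {
	name     string
	priority int32
	queuedAt int64 // logical enqueue time; smaller means earlier
}

// less reports whether a should be scheduled before b:
// higher priority wins, and an earlier enqueue time breaks ties.
func less(a, b queuedPod) bool {
	if a.priority != b.priority {
		return a.priority > b.priority
	}
	return a.queuedAt < b.queuedAt
}

// order returns the pod names in the order a queue using less would pop them.
func order(pods []queuedPod) []string {
	sorted := append([]queuedPod(nil), pods...)
	sort.Slice(sorted, func(i, j int) bool { return less(sorted[i], sorted[j]) })
	names := make([]string, 0, len(sorted))
	for _, p := range sorted {
		names = append(names, p.name)
	}
	return names
}

func main() {
	fmt.Println(order([]queuedPod{
		{name: "batch", priority: 0, queuedAt: 1},
		{name: "web-b", priority: 100, queuedAt: 3},
		{name: "web-a", priority: 100, queuedAt: 2},
	}))
}
```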

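With the structure of the queue understood, the next question is where pods are enqueued and dequeued. During initialization, addEventHandlerFuncs registers the event handlers. This injects three callback functions, the same concept used by controllers, and AddFunc is where the pod is enqueued.

The handlers are registered on the Pod informer, and the registered function addPodToSchedulingQueue performs the enqueue.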

Handler: cache.ResourceEventHandlerFuncs{
    AddFunc:    sched.addPodToSchedulingQueue,
    UpdateFunc: sched.updatePodInSchedulingQueue,
    DeleteFunc: sched.deletePodFromSchedulingQueue,
},

func (sched *Scheduler) addPodToSchedulingQueue(obj interface{}) {
    pod := obj.(*v1.Pod)
    klog.V(3).InfoS("Add event for unscheduled pod", "pod", klog.KObj(pod))
    if err := sched.SchedulingQueue.Add(pod); err != nil {
        utilruntime.HandleError(fmt.Errorf("unable to queue %T: %v", obj, err))
    }
}

The implementation of this SchedulingQueue is PriorityQueue, and its Add method operates on activeQ.

func (p *PriorityQueue) Add(pod *v1.Pod) error {
    p.lock.Lock()
    defer p.lock.Unlock()
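    // Wrap the pod in a QueuedPodInfo, which contains the PodInfo (and therefore the v1.Pod),
    // the initial attempt timestamp, the enqueue timestamp, and the names of the plugins that made the pod unschedulable
    pInfo := p.newQueuedPodInfo(pod)
    // enqueue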
    if err := p.activeQ.Add(pInfo); err != nil {
        klog.ErrorS(err, "Error adding pod to the active queue", "pod", klog.KObj(pod))
        return err
    }
    if p.unschedulablePods.get(pod) != nil {
        klog.ErrorS(nil, "Error: pod is already in the unschedulable queue", "pod", klog.KObj(pod))
        p.unschedulablePods.delete(pod)
    }
    // Delete pod from backoffQ if it is backing off
    if err := p.podBackoffQ.Delete(pInfo); err == nil {
        klog.ErrorS(nil, "Error: pod is already in the podBackoff queue", "pod", klog.KObj(pod))
    }
    metrics.SchedulerQueueIncomingPods.WithLabelValues("active", PodAdd).Inc()
    p.PodNominator.AddNominatedPod(pInfo.PodInfo, nil)
    p.cond.Broadcast()

    return nil
}

The Scheduler struct shown earlier has a NextPod field. NextPod pops a pod from the queue; when the scheduler is created, the function returned by MakeNextPodFunc is passed in as NextPod.

func MakeNextPodFunc(queue SchedulingQueue) func() *framework.QueuedPodInfo {
    return func() *framework.QueuedPodInfo {
        podInfo, err := queue.Pop()
        if err == nil {
            klog.V(4).InfoS("About to try and schedule pod", "pod", klog.KObj(podInfo.Pod))
            for plugin := range podInfo.UnschedulablePlugins {
                metrics.UnschedulableReason(plugin, podInfo.Pod.Spec.SchedulerName).Dec()
            }
            return podInfo
        }
        klog.ErrorS(err, "Error while retrieving next pod from scheduling queue")
        return nil
    }
}

This queue.Pop() is PriorityQueue.Pop(), which acts as the consumer side of activeQ.

func (p *PriorityQueue) Pop() (*framework.QueuedPodInfo, error) {
   p.lock.Lock()
   defer p.lock.Unlock()
   for p.activeQ.Len() == 0 {
      // When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
      // When Close() is called, the p.closed is set and the condition is broadcast,
      // which causes this loop to continue and return from the Pop().
      if p.closed {
         return nil, fmt.Errorf(queueClosed)
      }
      p.cond.Wait()
   }
   obj, err := p.activeQ.Pop()
   if err != nil {
      return nil, err
   }
   pInfo := obj.(*framework.QueuedPodInfo)
   pInfo.Attempts++
   p.schedulingCycle++
   return pInfo, nil
}
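The interplay between Add (Broadcast), Pop (Wait) and Close can be reproduced in isolation. The following is a minimal sketch assuming a plain FIFO of strings instead of a heap of pods; `blockingQueue`, `errQueueClosed` and `demo` are hypothetical names used only for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errQueueClosed = errors.New("queue is closed")

// blockingQueue is a hypothetical FIFO that blocks consumers the same way
// the Pop shown above does: wait on a condition variable until an item
// arrives or the queue is closed.
type blockingQueue struct {
	mu     sync.Mutex
	cond   *sync.Cond
	items  []string
	closed bool
}

func newBlockingQueue() *blockingQueue {
	q := &blockingQueue{}
	q.cond = sync.NewCond(&q.mu)
	return q
}

// Add enqueues an item and wakes up any waiting consumer.
func (q *blockingQueue) Add(item string) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.items = append(q.items, item)
	q.cond.Broadcast()
}

// Close marks the queue closed and wakes up waiting consumers so they can exit.
func (q *blockingQueue) Close() {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.closed = true
	q.cond.Broadcast()
}

// Pop blocks while the queue is empty and returns an error once it is
// both empty and closed.
func (q *blockingQueue) Pop() (string, error) {
	q.mu.Lock()
	defer q.mu.Unlock()
	for len(q.items) == 0 {
		if q.closed {
			return "", errQueueClosed
		}
		q.cond.Wait()
	}
	item := q.items[0]
	q.items = q.items[1:]
	return item, nil
}

// demo runs one consumer goroutine, much like the scheduleOne loop, and
// returns everything it popped before the queue was closed.
func demo() []string {
	q := newBlockingQueue()
	done := make(chan []string)
	go func() {
		var seen []string
		for {
			item, err := q.Pop()
			if err != nil {
				done <- seen
				return
			}
			seen = append(seen, item)
		}
	}()
	q.Add("pod-a")
	q.Add("pod-b")
	q.Close()
	return <-done
}

func main() {
	fmt.Println(demo())
}
```

Because the closed flag is only checked while the queue is empty, items added before Close are still delivered, which is why the consumer sees both pods.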

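As the entry point above showed, Run starts scheduleOne. scheduleOne consumes one Pod: it calls NextPod, which is the function from MakeNextPodFunc passed in at initialization, and that in turn calls Pop to do the consuming.

scheduleOne is the flow that schedules a single Pod.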

func (sched *Scheduler) scheduleOne(ctx context.Context) {
    podInfo := sched.NextPod()
    // pod could be nil when schedulerQueue is closed
    if podInfo == nil || podInfo.Pod == nil {
        return
    }
    pod := podInfo.Pod
    fwk, err := sched.frameworkForPod(pod)
    if err != nil {
        // This shouldn't happen, because we only accept for scheduling the pods
        // which specify a scheduler name that matches one of the profiles.
        klog.ErrorS(err, "Error occurred")
        return
    }
    if sched.skipPodSchedule(fwk, pod) {
        return
    }
...

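Scheduling context

Figure 1: The scheduling context of a Pod

Source: https://kubernetes.io/docs/concepts/scheduling-eviction/scheduling-framework

With the scheduler's structure in mind, we can now walk through the scheduling context and see how the extension points work. This is where the scheduling context diagram from the official documentation comes in.

Scheduling framework [2]

The scheduling framework (SF) is a pluggable architecture that Kubernetes designed for the scheduler. SF exposes the scheduler as a plugin-style API, and the scheduling policies mentioned in the previous chapter are implemented as plugins on top of that API.

SF defines a number of extension points (EP). A scheduling plugin is registered at one or more EPs; in other words, a plugin registered at several EPs is invoked at each of them as they execute.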
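One way to picture a plugin that is registered at several extension points is with Go interfaces: each extension point is an interface, and a plugin takes part in every extension point whose interface it implements. The sketch below is an assumption-laden illustration; `Plugin`, `FilterPlugin`, `ScorePlugin`, `registry`, `zonePlugin` and `readyPlugin` are hypothetical and far simpler than the real framework, where plugins are also enabled per extension point through configuration.

```go
package main

import "fmt"

// Plugin is the common base; every extension point interface embeds it.
type Plugin interface{ Name() string }

// FilterPlugin and ScorePlugin stand for two extension points.
type FilterPlugin interface {
	Plugin
	Filter(node string) bool
}

type ScorePlugin interface {
	Plugin
	Score(node string) int
}

// registry groups plugins by the extension points they implement.
type registry struct {
	filters []FilterPlugin
	scorers []ScorePlugin
}

// register adds p to every extension point whose interface it satisfies.
func (r *registry) register(p Plugin) {
	if f, ok := p.(FilterPlugin); ok {
		r.filters = append(r.filters, f)
	}
	if s, ok := p.(ScorePlugin); ok {
		r.scorers = append(r.scorers, s)
	}
}

// zonePlugin (hypothetical) implements both Filter and Score.
type zonePlugin struct{ preferred string }

func (z zonePlugin) Name() string            { return "Zone" }
func (z zonePlugin) Filter(node string) bool { return node != "" }
func (z zonePlugin) Score(node string) int {
	if node == z.preferred {
		return 100
	}
	return 0
}

// readyPlugin (hypothetical) implements Filter only.
type readyPlugin struct{}

func (readyPlugin) Name() string            { return "Ready" }
func (readyPlugin) Filter(node string) bool { return node != "not-ready" }

// demo registers both plugins and reports how many ended up at each point.
func demo() (filters, scorers int) {
	r := &registry{}
	r.register(zonePlugin{preferred: "node-a"})
	r.register(readyPlugin{})
	return len(r.filters), len(r.scorers)
}

func main() {
	f, s := demo()
	fmt.Printf("filter plugins: %d, score plugins: %d\n", f, s)
}
```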

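Each scheduling attempt is split into two phases, the scheduling cycle (SC) and the binding cycle (BC).

  • SC selects a node for the Pod; scheduling cycles run serially.
  • BC applies the decision made by SC to the cluster; binding cycles can run concurrently.

Together, the scheduling cycle and the binding cycle are called the scheduling context; Figure 1 shows its workflow.

Note: if the Pod is determined to have no feasible node, or an internal error occurs, the SC or BC is aborted and the Pod is put back in the queue to be retried.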
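The split between a serial scheduling cycle and concurrent binding cycles can be sketched as a loop that decides placements one at a time and hands each binding to its own goroutine. This is a simplified illustration: `schedule` is a hypothetical function, and the round-robin node choice merely stands in for the real filtering and scoring.

```go
package main

import (
	"fmt"
	"sync"
)

// schedule picks a node for each pod serially (the scheduling cycle) and
// performs the binding for each pod in its own goroutine (the binding cycle).
func schedule(pods, nodes []string) map[string]string {
	var (
		mu    sync.Mutex
		wg    sync.WaitGroup
		bound = make(map[string]string, len(pods))
	)
	for i, pod := range pods {
		// Scheduling cycle: one pod at a time, so decisions never race.
		node := nodes[i%len(nodes)]

		// Binding cycle: runs concurrently with the next scheduling cycle.
		wg.Add(1)
		go func(pod, node string) {
			defer wg.Done()
			mu.Lock()
			defer mu.Unlock()
			bound[pod] = node
		}(pod, node)
	}
	wg.Wait()
	return bound
}

func main() {
	fmt.Println(schedule([]string{"p0", "p1", "p2", "p3"}, []string{"n0", "n1"}))
}
```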

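Extension points [3]

Extension points are the extensible APIs within the scheduling context, shown in [Figure 1]. Filter corresponds to the former Predicate, and Scoring corresponds to the former Priority.

A scheduling attempt passes through the following extension points:

  • Sort: provides ordering for the Pods waiting in the scheduling queue. Only one queue sort plugin can be enabled at a time.

  • preFilter: pre-processes or checks information about the Pod or the cluster before filtering. If a preFilter plugin returns an error, scheduling of the Pod is aborted.

  • filter: the equivalent of Predicates; it rules out nodes that cannot run the Pod. Filters are called in the configured order. Once a filter marks a node as infeasible, the remaining filters are not called for that node; if no node passes, the Pod is marked unschedulable.

  • postFilter: called in the configured order when no feasible node (FN) was found for the Pod. If any postFilter plugin marks the Pod as schedulable, the remaining plugins are not called. In other words, this step is skipped when filter succeeds.

  • preScore: can be used for pre-scoring work (an informational extension point).

  • score: scores every Node that passed the filter phase. The scheduler then picks the Node with the highest weighted sum of scores.

  • reserve: because binding happens asynchronously, this plugin prevents a new Pod from being scheduled onto the node before the current Pod is bound, which could push the node's usage beyond its available resources. If a later phase fails or returns an error, UnReserve is triggered to roll back (an informational extension point). This is also the last state in the scheduling cycle: the Pod either succeeds and reaches postBind, or fails and triggers UnReserve.

  • permit: can prevent or delay the binding of a Pod. A Permit plugin does one of three things:

    • approve: the scheduler proceeds with binding.
    • deny: if any Permit plugin rejects binding the Pod to the node, UnReserve is triggered and the Pod is put back in the queue.
    • wait: if a Permit plugin returns wait, the Pod is kept in an internal list of waiting Pods until it is approved. If the wait times out, wait becomes deny: the Pod is returned to the scheduling queue and Unreserve is triggered to roll back.
  • preBind: performs any work required before the Pod is bound. For example, preBind may provision a network volume and mount it on the target node. If any plugin in this step returns an error, the Pod is denied and put back in the scheduling queue.

  • bind: once all preBind plugins have completed, binds the Pod to a Node. The bind plugins are called in order; as soon as one plugin handles the binding, the remaining ones are skipped.

  • postBind: called after the Pod has been bound; can be used to clean up associated resources (an informational extension point).

  • multiPoint: a configuration-only field that allows a plugin to be enabled or disabled for all of its applicable extension points at once.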

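In code, the scheduler implements the scheduling context in scheduleOne, which is what we look at next.

Sort

The Sort plugin provides ordering for the Pods waiting in the scheduling queue. Only one queue sort plugin can be enabled at a time.

After entering scheduleOne, NextPod takes a Pod from the activeQ queue, and frameworkForPod then looks up the framework (profile) for that Pod. The order in which Pods leave activeQ is decided by the first extension point of the scheduling context, sort.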

func (sched *Scheduler) scheduleOne(ctx context.Context) {
    podInfo := sched.NextPod()
    // pod could be nil when schedulerQueue is closed
    if podInfo == nil || podInfo.Pod == nil {
        return
    }
    pod := podInfo.Pod
    fwk, err := sched.frameworkForPod(pod)
...

func (sched *Scheduler) frameworkForPod(pod *v1.Pod) (framework.Framework, error) {
    // look up the profile for the pod's scheduler name
    fwk, ok := sched.Profiles[pod.Spec.SchedulerName]
    if !ok {
        return nil, fmt.Errorf("profile not found for scheduler name %q", pod.Spec.SchedulerName)
    }
    return fwk, nil
}

Recall that this sort function is set up when the scheduler is created (in New):

podQueue := internalqueue.NewSchedulingQueue(
    profiles[options.profiles[0].SchedulerName].QueueSortFunc(),
    informerFactory,
    internalqueue.WithPodInitialBackoffDuration(time.Duration(options.podInitialBackoffSeconds)*time.Second),
    internalqueue.WithPodMaxBackoffDuration(time.Duration(options.podMaxBackoffSeconds)*time.Second),
    internalqueue.WithPodNominator(nominator),
    internalqueue.WithClusterEventMap(clusterEventMap),
    internalqueue.WithPodMaxInUnschedulablePodsDuration(options.podMaxInUnschedulablePodsDuration),
)

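preFilter

preFilter is the first extension point of the scheduling cycle. It pre-processes or checks information about the Pod or the cluster before filtering, and it can abort scheduling.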

func (sched *Scheduler) scheduleOne(ctx context.Context) {
    podInfo := sched.NextPod()
    // pod could be nil when schedulerQueue is closed
    if podInfo == nil || podInfo.Pod == nil {
        return
    }
    pod := podInfo.Pod
    fwk, err := sched.frameworkForPod(pod)
    if err != nil {
        // This shouldn't happen, because we only accept for scheduling the pods
        // which specify a scheduler name that matches one of the profiles.
        klog.ErrorS(err, "Error occurred")
        return
    }
    if sched.skipPodSchedule(fwk, pod) {
        return
    }

    klog.V(3).InfoS("Attempting to schedule pod", "pod", klog.KObj(pod))

    // Synchronously attempt to find a fit for the pod.
    start := time.Now()
    state := framework.NewCycleState()
    state.SetRecordPluginMetrics(rand.Intn(100) < pluginMetricsSamplePercent)
    // Initialize an empty podsToActivate struct, which will be filled up by plugins or stay empty.
    podsToActivate := framework.NewPodsToActivate()
    state.Write(framework.PodsToActivateKey, podsToActivate)

    schedulingCycleCtx, cancel := context.WithCancel(ctx)
    defer cancel()
    // this call leads into preFilter
    scheduleResult, err := sched.SchedulePod(schedulingCycleCtx, fwk, state, pod)

schedulePod tries to schedule the given pod onto one of the nodes in the node list. If it succeeds, it returns the name of that node.

func (sched *Scheduler) schedulePod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
    trace := utiltrace.New("Scheduling", utiltrace.Field{Key: "namespace", Value: pod.Namespace}, utiltrace.Field{Key: "name", Value: pod.Name})
    defer trace.LogIfLong(100 * time.Millisecond)
    // update the node info snapshot from the current state of the cache
    if err := sched.Cache.UpdateSnapshot(sched.nodeInfoSnapshot); err != nil {
        return result, err
    }
    trace.Step("Snapshotting scheduler cache and node infos done")

    if sched.nodeInfoSnapshot.NumNodes() == 0 {
        return result, ErrNoNodesAvailable
    }
    // find the nodes that fit the pod; the extension points are run in here
    feasibleNodes, diagnosis, err := sched.findNodesThatFitPod(ctx, fwk, state, pod)

    ...

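findNodesThatFitPod runs the corresponding filter plugins to find the Nodes that fit. The comments and the method names both make it clear which plugins run here 😁😁. The algorithms themselves will be analyzed later; for now we only study the workflow.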

func (sched *Scheduler) findNodesThatFitPod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, framework.Diagnosis, error) {
    diagnosis := framework.Diagnosis{
        NodeToStatusMap:      make(framework.NodeToStatusMap),
        UnschedulablePlugins: sets.NewString(),
    }

    // Run "prefilter" plugins.
    preRes, s := fwk.RunPreFilterPlugins(ctx, state, pod)
    allNodes, err := sched.nodeInfoSnapshot.NodeInfos().List()
    if err != nil {
        return nil, diagnosis, err
    }
    if !s.IsSuccess() {
        if !s.IsUnschedulable() {
            return nil, diagnosis, s.AsError()
        }
        // All nodes will have the same status. Some non trivial refactoring is
        // needed to avoid this copy.
        for _, n := range allNodes {
            diagnosis.NodeToStatusMap[n.Node().Name] = s
        }
        // Status satisfying IsUnschedulable() gets injected into diagnosis.UnschedulablePlugins.
        if s.FailedPlugin() != "" {
            diagnosis.UnschedulablePlugins.Insert(s.FailedPlugin())
        }
        return nil, diagnosis, nil
    }

    // "NominatedNodeName" can potentially be set in a previous scheduling cycle as a result of preemption.
    // This node is likely the only candidate that will fit the pod, and hence we try it first before iterating over all nodes.
    if len(pod.Status.NominatedNodeName) > 0 {
        feasibleNodes, err := sched.evaluateNominatedNode(ctx, pod, fwk, state, diagnosis)
        if err != nil {
            klog.ErrorS(err, "Evaluation failed on nominated node", "pod", klog.KObj(pod), "node", pod.Status.NominatedNodeName)
        }
        // Nominated node passes all the filters, scheduler is good to assign this node to the pod.
        if len(feasibleNodes) != 0 {
            return feasibleNodes, diagnosis, nil
        }
    }

    nodes := allNodes
    if !preRes.AllNodes() {
        nodes = make([]*framework.NodeInfo, 0, len(preRes.NodeNames))
        for n := range preRes.NodeNames {
            nInfo, err := sched.nodeInfoSnapshot.NodeInfos().Get(n)
            if err != nil {
                return nil, diagnosis, err
            }
            nodes = append(nodes, nInfo)
        }
    }
    feasibleNodes, err := sched.findNodesThatPassFilters(ctx, fwk, state, pod, diagnosis, nodes)
    if err != nil {
        return nil, diagnosis, err
    }

    feasibleNodes, err = findNodesThatPassExtenders(sched.Extenders, pod, feasibleNodes, diagnosis.NodeToStatusMap)
    if err != nil {
        return nil, diagnosis, err
    }
    return feasibleNodes, diagnosis, nil
}

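filter

The filter plugins are the equivalent of Predicates; they rule out nodes that cannot run the Pod. Filters are called in the configured order. Once a filter marks a node as infeasible, the remaining filters are not called for that node; if no node passes, the Pod is marked unschedulable and processing does not continue.

In the code, filter still lives inside findNodesThatFitPod: findNodesThatPassFilters obtains the FN, that is, the feasible nodes, and this step is the filter extension point.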

func (sched *Scheduler) findNodesThatFitPod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, framework.Diagnosis, error) {
    ...

    feasibleNodes, err := sched.findNodesThatPassFilters(ctx, fwk, state, pod, diagnosis, nodes)
    if err != nil {
        return nil, diagnosis, err
    }

    feasibleNodes, err = findNodesThatPassExtenders(sched.Extenders, pod, feasibleNodes, diagnosis.NodeToStatusMap)
    if err != nil {
        return nil, diagnosis, err
    }
    return feasibleNodes, diagnosis, nil
}
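The per-node behaviour of the filter step can be sketched as follows: filters run in order for each node, evaluation stops at the first failure for that node, and the reason is recorded, similar in spirit to diagnosis.NodeToStatusMap. Everything here (`node`, `filterPlugin`, `findFeasible`, the two example filters) is hypothetical and sequential, whereas the real scheduler evaluates nodes in parallel.

```go
package main

import "fmt"

// node is a hypothetical, minimal view of a cluster node.
type node struct {
	name    string
	freeCPU int
	tainted bool
}

// filterPlugin is a simplified filter: check returns "" when the node
// passes and a human-readable reason when it does not.
type filterPlugin struct {
	name  string
	check func(n node, cpuRequest int) string
}

// findFeasible runs the filters in order for every node, stops at the first
// failing filter for a node, and records why that node was rejected.
func findFeasible(nodes []node, plugins []filterPlugin, cpuRequest int) ([]string, map[string]string) {
	feasible := []string{}
	reasons := map[string]string{}
	for _, n := range nodes {
		passed := true
		for _, p := range plugins {
			if reason := p.check(n, cpuRequest); reason != "" {
				reasons[n.name] = p.name + ": " + reason
				passed = false
				break // remaining filters are skipped for this node
			}
		}
		if passed {
			feasible = append(feasible, n.name)
		}
	}
	return feasible, reasons
}

// demo filters three nodes for a pod requesting 2 CPUs.
func demo() ([]string, map[string]string) {
	plugins := []filterPlugin{
		{name: "Taint", check: func(n node, _ int) string {
			if n.tainted {
				return "node is tainted"
			}
			return ""
		}},
		{name: "Resources", check: func(n node, cpu int) string {
			if n.freeCPU < cpu {
				return "insufficient cpu"
			}
			return ""
		}},
	}
	nodes := []node{
		{name: "node-a", freeCPU: 4},
		{name: "node-b", freeCPU: 1},
		{name: "node-c", freeCPU: 8, tainted: true},
	}
	return findFeasible(nodes, plugins, 2)
}

func main() {
	feasible, reasons := demo()
	fmt.Println("feasible:", feasible)
	fmt.Println("rejected:", reasons)
}
```

If the returned list of feasible nodes is empty, the caller would treat the Pod as unschedulable, which is the situation the next extension point deals with.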

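postFilter

These plugins are called in the configured order when no FN was found for the Pod. If any postFilter plugin marks the Pod as schedulable, the remaining plugins are not called. In other words, this step is skipped when filter succeeds. Let's verify that here 😊

We are still in scheduleOne. Once SchedulePod has finished, successfully or not, it returns an err, and whether postFilter runs depends on that err, which matches the official description.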

scheduleResult, err := sched.SchedulePod(schedulingCycleCtx, fwk, state, pod)
    if err != nil {
        // SchedulePod() may have failed because the pod would not fit on any host, so we try to
        // preempt, with the expectation that the next time the pod is tried for scheduling it
        // will fit due to the preemption. It is also possible that a different pod will schedule
        // into the resources that were preempted, but this is harmless.
        var nominatingInfo *framework.NominatingInfo
        if fitError, ok := err.(*framework.FitError); ok {
            if !fwk.HasPostFilterPlugins() {
                klog.V(3).InfoS("No PostFilter plugins are registered, so no preemption will be performed")
            } else {
                // Run PostFilter plugins to try to make the pod schedulable in a future scheduling cycle.
                result, status := fwk.RunPostFilterPlugins(ctx, state, pod, fitError.Diagnosis.NodeToStatusMap)
                if status.Code() == framework.Error {
                    klog.ErrorS(nil, "Status after running PostFilter plugins for pod", "pod", klog.KObj(pod), "status", status)
                } else {
                    fitError.Diagnosis.PostFilterMsg = status.Message()
                    klog.V(5).InfoS("Status after running PostFilter plugins for pod", "pod", klog.KObj(pod), "status", status)
                }
                if result != nil {
                    nominatingInfo = result.NominatingInfo
                }
            }
            // Pod did not fit anywhere, so it is counted as a failure. If preemption
            // succeeds, the pod should get counted as a success the next time we try to
            // schedule it. (hopefully)
            metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
        } else if err == ErrNoNodesAvailable {
            nominatingInfo = clearNominatedNode
            // No nodes available is counted as unschedulable rather than an error.
            metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
        } else {
            nominatingInfo = clearNominatedNode
            klog.ErrorS(err, "Error selecting node for pod", "pod", klog.KObj(pod))
            metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
        }
        sched.handleSchedulingFailure(ctx, fwk, podInfo, err, v1.PodReasonUnschedulable, nominatingInfo)
        return
    }
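The branching on the kind of error can be isolated in a small sketch. Note the swap: the scheduler code above uses a direct type assertion and an equality check, while this sketch uses errors.As and errors.Is, which also match wrapped errors. `fitError`, `errNoNodes` and `nextStep` are hypothetical stand-ins, not the scheduler's own identifiers.

```go
package main

import (
	"errors"
	"fmt"
)

// fitError is a hypothetical stand-in for framework.FitError: the pod was
// evaluated but fit on no node.
type fitError struct{ pod string }

func (e *fitError) Error() string { return "no node fits pod " + e.pod }

// errNoNodes is a hypothetical stand-in for ErrNoNodesAvailable.
var errNoNodes = errors.New("no nodes available to schedule pods")

// nextStep classifies the outcome of a scheduling attempt and names the
// branch that would be taken.
func nextStep(err error) string {
	var fe *fitError
	switch {
	case err == nil:
		return "assume pod and start binding"
	case errors.As(err, &fe):
		return "run PostFilter plugins"
	case errors.Is(err, errNoNodes):
		return "mark unschedulable"
	default:
		return "record scheduler error"
	}
}

// demo feeds one error of each kind through nextStep.
func demo() []string {
	return []string{
		nextStep(nil),
		nextStep(&fitError{pod: "web-0"}),
		nextStep(fmt.Errorf("snapshot: %w", errNoNodes)),
		nextStep(errors.New("cache corrupted")),
	}
}

func main() {
	for _, step := range demo() {
		fmt.Println(step)
	}
}
```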

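PreScore, Score

preScore can be used for pre-scoring work and is an informational extension point. Once filter has finished, the scheduler goes straight on to the preScore plugins instead of returning. If any of the configured plugins returns a failure, the Pod is rejected.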


func (sched *Scheduler) schedulePod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
    trace := utiltrace.New("Scheduling", utiltrace.Field{Key: "namespace", Value: pod.Namespace}, utiltrace.Field{Key: "name", Value: pod.Name})
    defer trace.LogIfLong(100 * time.Millisecond)

    if err := sched.Cache.UpdateSnapshot(sched.nodeInfoSnapshot); err != nil {
        return result, err
    }
    trace.Step("Snapshotting scheduler cache and node infos done")

    if sched.nodeInfoSnapshot.NumNodes() == 0 {
        return result, ErrNoNodesAvailable
    }

    feasibleNodes, diagnosis, err := sched.findNodesThatFitPod(ctx, fwk, state, pod)
    if err != nil {
        return result, err
    }
    trace.Step("Computing predicates done")

    if len(feasibleNodes) == 0 {
        return result, &framework.FitError{
            Pod:         pod,
            NumAllNodes: sched.nodeInfoSnapshot.NumNodes(),
            Diagnosis:   diagnosis,
        }
    }

    // When only one node after predicate, just use it.
    if len(feasibleNodes) == 1 {
        return ScheduleResult{
            SuggestedHost:  feasibleNodes[0].Name,
            EvaluatedNodes: 1 + len(diagnosis.NodeToStatusMap),
            FeasibleNodes:  1,
        }, nil
    }
    // preScore and score are both run in here
    priorityList, err := prioritizeNodes(ctx, sched.Extenders, fwk, state, pod, feasibleNodes)
    if err != nil {
        return result, err
    }

    host, err := selectHost(priorityList)
    trace.Step("Prioritizing done")

    return ScheduleResult{
        SuggestedHost:  host,
        EvaluatedNodes: len(feasibleNodes) + len(diagnosis.NodeToStatusMap),
        FeasibleNodes:  len(feasibleNodes),
    }, err
}

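prioritizeNodes scores the Nodes with the configured plugins and returns a score for each Node. The scores from all plugins are summed per Node, which yields the Node's weighted total score.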

func prioritizeNodes(
    ctx context.Context,
    extenders []framework.Extender,
    fwk framework.Framework,
    state *framework.CycleState,
    pod *v1.Pod,
    nodes []*v1.Node,
) (framework.NodeScoreList, error) {
    // If no priority configs are provided, then all nodes will have a score of one.
    // This is required to generate the priority list in the required format
    if len(extenders) == 0 && !fwk.HasScorePlugins() {
        result := make(framework.NodeScoreList, 0, len(nodes))
        for i := range nodes {
            result = append(result, framework.NodeScore{
                Name:  nodes[i].Name,
                Score: 1,
            })
        }
        return result, nil
    }

    // Run PreScore plugins.
    preScoreStatus := fwk.RunPreScorePlugins(ctx, state, pod, nodes)
    if !preScoreStatus.IsSuccess() {
        return nil, preScoreStatus.AsError()
    }

    // Run the Score plugins.
    scoresMap, scoreStatus := fwk.RunScorePlugins(ctx, state, pod, nodes)
    if !scoreStatus.IsSuccess() {
        return nil, scoreStatus.AsError()
    }

    // Additional details logged at level 10 if enabled.
    klogV := klog.V(10)
    if klogV.Enabled() {
        for plugin, nodeScoreList := range scoresMap {
            for _, nodeScore := range nodeScoreList {
                klogV.InfoS("Plugin scored node for pod", "pod", klog.KObj(pod), "plugin", plugin, "node", nodeScore.Name, "score", nodeScore.Score)
            }
        }
    }

    // Summarize all scores.
    result := make(framework.NodeScoreList, 0, len(nodes))

    for i := range nodes {
        result = append(result, framework.NodeScore{Name: nodes[i].Name, Score: 0})
        for j := range scoresMap {
            result[i].Score += scoresMap[j][i].Score
        }
    }

    if len(extenders) != 0 && nodes != nil {
        var mu sync.Mutex
        var wg sync.WaitGroup
        combinedScores := make(map[string]int64, len(nodes))
        for i := range extenders {
            if !extenders[i].IsInterested(pod) {
                continue
            }
            wg.Add(1)
            go func(extIndex int) {
                metrics.SchedulerGoroutines.WithLabelValues(metrics.PrioritizingExtender).Inc()
                defer func() {
                    metrics.SchedulerGoroutines.WithLabelValues(metrics.PrioritizingExtender).Dec()
                    wg.Done()
                }()
                prioritizedList, weight, err := extenders[extIndex].Prioritize(pod, nodes)
                if err != nil {
                    // Prioritization errors from extender can be ignored, let k8s/other extenders determine the priorities
                    klog.V(5).InfoS("Failed to run extender's priority function. No score given by this extender.", "error", err, "pod", klog.KObj(pod), "extender", extenders[extIndex].Name())
                    return
                }
                mu.Lock()
                for i := range *prioritizedList {
                    host, score := (*prioritizedList)[i].Host, (*prioritizedList)[i].Score
                    if klogV.Enabled() {
                        klogV.InfoS("Extender scored node for pod", "pod", klog.KObj(pod), "extender", extenders[extIndex].Name(), "node", host, "score", score)
                    }
                    combinedScores[host] += score * weight
                }
                mu.Unlock()
            }(i)
        }
        // wait for all go routines to finish
        wg.Wait()
        for i := range result {
            // MaxExtenderPriority may diverge from the max priority used in the scheduler and defined by MaxNodeScore,
            // therefore we need to scale the score returned by extenders to the score range used by the scheduler.
            result[i].Score += combinedScores[result[i].Name] * (framework.MaxNodeScore / extenderv1.MaxExtenderPriority)
        }
    }

    if klogV.Enabled() {
        for i := range result {
            klogV.InfoS("Calculated node's final score for pod", "pod", klog.KObj(pod), "node", result[i].Name, "score", result[i].Score)
        }
    }
    return result, nil
}
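The arithmetic behind the final score can be shown with a small worked example. This sketch makes two assumptions for illustration: weights are applied inside `totalScores` (in the code above the summed values come straight from RunScorePlugins), and ties are broken by node name so the result is deterministic. `totalScores`, `pickHost` and the plugin names are hypothetical, and extender scores are left out.

```go
package main

import (
	"fmt"
	"sort"
)

// totalScores computes, for each node, the sum over all plugins of
// weight * score. raw[plugin][i] is the score that plugin gave nodes[i].
func totalScores(nodes []string, raw map[string][]int64, weights map[string]int64) map[string]int64 {
	total := make(map[string]int64, len(nodes))
	for i, n := range nodes {
		total[n] = 0
		for plugin, perNode := range raw {
			total[n] += weights[plugin] * perNode[i]
		}
	}
	return total
}

// pickHost returns the node with the highest total score. Ties go to the
// alphabetically first name, an assumption made here for determinism.
func pickHost(total map[string]int64) string {
	names := make([]string, 0, len(total))
	for n := range total {
		names = append(names, n)
	}
	sort.Strings(names)
	best := ""
	for _, n := range names {
		if best == "" || total[n] > total[best] {
			best = n
		}
	}
	return best
}

// demo scores three nodes with two hypothetical plugins.
func demo() (map[string]int64, string) {
	nodes := []string{"node-a", "node-b", "node-c"}
	raw := map[string][]int64{
		"Balanced": {10, 50, 50},
		"Affinity": {40, 10, 15},
	}
	weights := map[string]int64{"Balanced": 1, "Affinity": 2}
	total := totalScores(nodes, raw, weights)
	return total, pickHost(total)
}

func main() {
	total, host := demo()
	fmt.Println(total, "->", host)
}
```

With these inputs node-a totals 1*10 + 2*40 = 90, node-b 1*50 + 2*10 = 70 and node-c 1*50 + 2*15 = 80, so node-a is selected.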

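Reserve

Because binding happens asynchronously, the Reserve plugins prevent a new Pod from being scheduled onto the node before the current Pod is bound, which could push the node's usage beyond its available resources. If a later phase fails or returns an error, UnReserve is triggered to roll back (an informational extension point). This is also the last state in the scheduling cycle: the Pod either succeeds and reaches postBind, or fails and triggers UnReserve.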

// Run the Reserve method of reserve plugins.
if sts := fwk.RunReservePluginsReserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() { // Reserve did not succeed
    metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
    // trigger un-reserve to clean up the state associated with the Pod
    fwk.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
    if forgetErr := sched.Cache.ForgetPod(assumedPod); forgetErr != nil {
        klog.ErrorS(forgetErr, "Scheduler cache ForgetPod failed")
    }
    sched.handleSchedulingFailure(ctx, fwk, assumedPodInfo, sts.AsError(), SchedulerError, clearNominatedNode)
    return
}
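The reserve-then-roll-back pattern can be sketched on its own. The sketch assumes that, on failure, Unreserve is called on every reserve plugin in reverse order, including plugins whose Reserve was never reached, which means Unreserve has to be safe to call in that case. `reservePlugin` and `runReserve` are hypothetical names, and the log slice exists only to make the call order visible.

```go
package main

import (
	"errors"
	"fmt"
)

// reservePlugin is a hypothetical plugin whose Reserve either succeeds or fails.
type reservePlugin struct {
	name string
	fail bool
}

// runReserve calls Reserve on each plugin in order. On the first failure it
// calls Unreserve on all plugins in reverse order and returns an error.
// Every call is appended to log.
func runReserve(plugins []reservePlugin, log *[]string) error {
	for _, p := range plugins {
		*log = append(*log, "reserve "+p.name)
		if p.fail {
			for i := len(plugins) - 1; i >= 0; i-- {
				*log = append(*log, "unreserve "+plugins[i].name)
			}
			return errors.New(p.name + " failed to reserve")
		}
	}
	return nil
}

// demo runs three plugins where the second one fails.
func demo() ([]string, error) {
	var log []string
	err := runReserve([]reservePlugin{
		{name: "volume"},
		{name: "gpu", fail: true},
		{name: "network"},
	}, &log)
	return log, err
}

func main() {
	log, err := demo()
	for _, entry := range log {
		fmt.Println(entry)
	}
	fmt.Println("error:", err)
}
```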

permit

The Permit plugins can prevent or delay the binding of a Pod.

    // Run "permit" plugins.
    runPermitStatus := fwk.RunPermitPlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
    if !runPermitStatus.IsWait() && !runPermitStatus.IsSuccess() {
        var reason string
        if runPermitStatus.IsUnschedulable() {
            metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
            reason = v1.PodReasonUnschedulable
        } else {
            metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
            reason = SchedulerError
        }
        // reached as soon as one plugin returns a status that is neither success nor wait
        fwk.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
        // forget the pod in the cache
        if forgetErr := sched.Cache.ForgetPod(assumedPod); forgetErr != nil {
            klog.ErrorS(forgetErr, "Scheduler cache ForgetPod failed")
        }
        sched.handleSchedulingFailure(ctx, fwk, assumedPodInfo, runPermitStatus.AsError(), reason, clearNominatedNode)
        return
    }


Binding Cycle

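Once a feasible node (FN) has been selected, the scheduler performs an assumed binding and records it in the cache. It then goes on to perform the real bind operation, which is the binding cycle.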

func (sched *Scheduler) scheduleOne(ctx context.Context) {
    ...
    ...
    // The binding cycle is asynchronous; here that means it runs in a goroutine
    go func() {
        bindingCycleCtx, cancel := context.WithCancel(ctx)
        defer cancel()
        metrics.SchedulerGoroutines.WithLabelValues(metrics.Binding).Inc()
        defer metrics.SchedulerGoroutines.WithLabelValues(metrics.Binding).Dec()
        // Run WaitOnPermit; if it fails, roll back with Unreserve
        waitOnPermitStatus := fwk.WaitOnPermit(bindingCycleCtx, assumedPod)
        if !waitOnPermitStatus.IsSuccess() {
            var reason string
            if waitOnPermitStatus.IsUnschedulable() {
                metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
                reason = v1.PodReasonUnschedulable
            } else {
                metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
                reason = SchedulerError
            }
            // trigger un-reserve plugins to clean up state associated with the reserved Pod
            fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
            if forgetErr := sched.Cache.ForgetPod(assumedPod); forgetErr != nil {
                klog.ErrorS(forgetErr, "scheduler cache ForgetPod failed")
            } else {
                // "Forget"ing an assumed Pod in binding cycle should be treated as a PodDelete event,
                // as the assumed Pod had occupied a certain amount of resources in scheduler cache.
                // TODO(#103853): de-duplicate the logic.
                // Avoid moving the assumed Pod itself as it's always Unschedulable.
                // It's intentional to "defer" this operation; otherwise MoveAllToActiveOrBackoffQueue() would
                // update `q.moveRequest` and thus move the assumed pod to backoffQ anyways.
                defer sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, func(pod *v1.Pod) bool {
                    return assumedPod.UID != pod.UID
                })
            }
            sched.handleSchedulingFailure(ctx, fwk, assumedPodInfo, waitOnPermitStatus.AsError(), reason, clearNominatedNode)
            return
        }

        // Run the PreBind plugins
        preBindStatus := fwk.RunPreBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
        if !preBindStatus.IsSuccess() {
            metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
            // trigger un-reserve plugins to clean up state associated with the reserved Pod
            fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
            if forgetErr := sched.Cache.ForgetPod(assumedPod); forgetErr != nil {
                klog.ErrorS(forgetErr, "scheduler cache ForgetPod failed")
            } else {
                // "Forget"ing an assumed Pod in binding cycle should be treated as a PodDelete event,
                // as the assumed Pod had occupied a certain amount of resources in scheduler cache.
                // TODO(#103853): de-duplicate the logic.
                sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, nil)
            }
            sched.handleSchedulingFailure(ctx, fwk, assumedPodInfo, preBindStatus.AsError(), SchedulerError, clearNominatedNode)
            return
        }
        // bind performs the actual binding
        err := sched.bind(bindingCycleCtx, fwk, assumedPod, scheduleResult.SuggestedHost, state)
        if err != nil {
            metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
            // On failure, trigger the un-reserve plugins
            fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
            if err := sched.Cache.ForgetPod(assumedPod); err != nil {
                klog.ErrorS(err, "scheduler cache ForgetPod failed")
            } else {
                // "Forget"ing an assumed Pod in binding cycle should be treated as a PodDelete event,
                // as the assumed Pod had occupied a certain amount of resources in scheduler cache.
                // TODO(#103853): de-duplicate the logic.
                sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, nil)
            }
            sched.handleSchedulingFailure(ctx, fwk, assumedPodInfo, fmt.Errorf("binding rejected: %w", err), SchedulerError, clearNominatedNode)
            return
        }
        // Calculating nodeResourceString can be heavy. Avoid it if klog verbosity is below 2.
        klog.V(2).InfoS("Successfully bound pod to node", "pod", klog.KObj(pod), "node", scheduleResult.SuggestedHost, "evaluatedNodes", scheduleResult.EvaluatedNodes, "feasibleNodes", scheduleResult.FeasibleNodes)
        metrics.PodScheduled(fwk.ProfileName(), metrics.SinceInSeconds(start))
        metrics.PodSchedulingAttempts.Observe(float64(podInfo.Attempts))
        metrics.PodSchedulingDuration.WithLabelValues(getAttemptsLabel(podInfo)).Observe(metrics.SinceInSeconds(podInfo.InitialAttemptTimestamp))

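        // Run "postbind" plugins.
        // PostBind is an informational extension point. It is called after the Pod is bound and can be used to clean up associated resources.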
        fwk.RunPostBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)

        // At the end of a successful binding cycle, move up Pods if needed.
        if len(podsToActivate.Map) != 0 {
            sched.SchedulingQueue.Activate(podsToActivate.Map)
            // Unlike the logic in scheduling cycle, we don't bother deleting the entries
            // as `podsToActivate.Map` is no longer consumed.
        }
    }()
}

Failure Handling in the Scheduling Context

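Everything above describes the normal path. This section looks at how failed scheduling attempts are retried. The fields involved in failure handling are backoffQ and unschedulablePods, which live in the scheduler's SchedulingQueue (PriorityQueue) introduced above.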

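  • backoffQ: also a heap-based priority queue; it holds Pods that failed scheduling and are waiting out their backoff period
  • unschedulablePods: a map that holds Pods that have been tried and determined to be unschedulable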

Both backoffQ and unschedulablePods are initialized when the scheduler is initialized:

func NewPriorityQueue(
    lessFn framework.LessFunc,
    informerFactory informers.SharedInformerFactory,
    opts ...Option,
) *PriorityQueue {
    options := defaultPriorityQueueOptions
    for _, opt := range opts {
        opt(&options)
    }

    comp := func(podInfo1, podInfo2 interface{}) bool {
        pInfo1 := podInfo1.(*framework.QueuedPodInfo)
        pInfo2 := podInfo2.(*framework.QueuedPodInfo)
        return lessFn(pInfo1, pInfo2)
    }

    if options.podNominator == nil {
        options.podNominator = NewPodNominator(informerFactory.Core().V1().Pods().Lister())
    }

    pq := &PriorityQueue{
        PodNominator:                      options.podNominator,
        clock:                             options.clock,
        stop:                              make(chan struct{}),
        podInitialBackoffDuration:         options.podInitialBackoffDuration,
        podMaxBackoffDuration:             options.podMaxBackoffDuration,
        podMaxInUnschedulablePodsDuration: options.podMaxInUnschedulablePodsDuration,
        activeQ:                           heap.NewWithRecorder(podInfoKeyFunc, comp, metrics.NewActivePodsRecorder()),
        unschedulablePods:                 newUnschedulablePods(metrics.NewUnschedulablePodsRecorder()),
        moveRequestCycle:                  -1,
        clusterEventMap:                   options.clusterEventMap,
    }
    pq.cond.L = &pq.lock
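    // Initialize backoffQ.
    // NewWithRecorder returns a Heap object that takes an optional metricRecorder.
    // podInfoKeyFunc is the key function; it returns a string key and an error.
    // pq.podsCompareBackoffCompleted compares the backoff expiry of two Pods:
    // true if the first one expires before the second, false otherwise.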
    pq.podBackoffQ = heap.NewWithRecorder(podInfoKeyFunc, pq.podsCompareBackoffCompleted, metrics.NewBackoffPodsRecorder())
    pq.nsLister = informerFactory.Core().V1().Namespaces().Lister()

    return pq
}

The ordering of backoffQ relies on two functions, getBackoffTime and calculateBackoffDuration:

// getBackoffTime returns the time that podInfo completes backoff
func (p *PriorityQueue) getBackoffTime(podInfo *framework.QueuedPodInfo) time.Time {
    duration := p.calculateBackoffDuration(podInfo)
    backoffTime := podInfo.Timestamp.Add(duration)
    return backoffTime
}

// calculateBackoffDuration is a helper function for calculating the backoffDuration
// based on the number of attempts the pod has made.
func (p *PriorityQueue) calculateBackoffDuration(podInfo *framework.QueuedPodInfo) time.Duration {
    duration := p.podInitialBackoffDuration
    for i := 1; i < podInfo.Attempts; i++ {
        // Use subtraction instead of addition or multiplication to avoid overflow.
        if duration > p.podMaxBackoffDuration-duration {
            return p.podMaxBackoffDuration
        }
        duration += duration
    }
    return duration
}

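Failure handling as a whole proceeds as follows. When the scheduler is initialized, an Error function is registered to deal with Pods that could not be scheduled. The function that actually gets registered is the one returned by MakeDefaultErrorFunc, and it is what is later invoked as Error.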

sched := newScheduler(
    schedulerCache,
    extenders,
    internalqueue.MakeNextPodFunc(podQueue),
    MakeDefaultErrorFunc(client, podLister, podQueue, schedulerCache),
    stopEverything,
    podQueue,
    profiles,
    client,
    snapshot,
    options.percentageOfNodesToScore,
)

In the scheduling cycle, that is, in scheduleOne, every extension point calls handleSchedulingFailure when it fails, and that function uses the registered Error function to handle the Pod.

func (sched *Scheduler) scheduleOne(ctx context.Context) {
    ...
    defer cancel()
    scheduleResult, err := sched.SchedulePod(schedulingCycleCtx, fwk, state, pod)
    if err != nil {

        var nominatingInfo *framework.NominatingInfo
        if fitError, ok := err.(*framework.FitError); ok {
            if !fwk.HasPostFilterPlugins() {
                klog.V(3).InfoS("No PostFilter plugins are registered, so no preemption will be performed")
            } else {

                result, status := fwk.RunPostFilterPlugins(ctx, state, pod, fitError.Diagnosis.NodeToStatusMap)
                if status.Code() == framework.Error {
                    klog.ErrorS(nil, "Status after running PostFilter plugins for pod", "pod", klog.KObj(pod), "status", status)
                } else {
                    fitError.Diagnosis.PostFilterMsg = status.Message()
                    klog.V(5).InfoS("Status after running PostFilter plugins for pod", "pod", klog.KObj(pod), "status", status)
                }
                if result != nil {
                    nominatingInfo = result.NominatingInfo
                }
            }

            metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
        } else if err == ErrNoNodesAvailable {
            nominatingInfo = clearNominatedNode
            // No nodes available is counted as unschedulable rather than an error.
            metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
        } else {
            nominatingInfo = clearNominatedNode
            klog.ErrorS(err, "Error selecting node for pod", "pod", klog.KObj(pod))
            metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
        }
        // Handle the unschedulable Pod
        sched.handleSchedulingFailure(ctx, fwk, podInfo, err, v1.PodReasonUnschedulable, nominatingInfo)
        return
    }


This brings us to the registered Error function, which is built by MakeDefaultErrorFunc:

func MakeDefaultErrorFunc(client clientset.Interface, podLister corelisters.PodLister, podQueue internalqueue.SchedulingQueue, schedulerCache internalcache.Cache) func(*framework.QueuedPodInfo, error) {
    return func(podInfo *framework.QueuedPodInfo, err error) {
        pod := podInfo.Pod
        if err == ErrNoNodesAvailable {
            klog.V(2).InfoS("Unable to schedule pod; no nodes are registered to the cluster; waiting", "pod", klog.KObj(pod))
        } else if fitError, ok := err.(*framework.FitError); ok {
            // Inject UnschedulablePlugins to PodInfo, which will be used later for moving Pods between queues efficiently.
            podInfo.UnschedulablePlugins = fitError.Diagnosis.UnschedulablePlugins
            klog.V(2).InfoS("Unable to schedule pod; no fit; waiting", "pod", klog.KObj(pod), "err", err)
        } else if apierrors.IsNotFound(err) {
            klog.V(2).InfoS("Unable to schedule pod, possibly due to node not found; waiting", "pod", klog.KObj(pod), "err", err)
            if errStatus, ok := err.(apierrors.APIStatus); ok && errStatus.Status().Details.Kind == "node" {
                nodeName := errStatus.Status().Details.Name
                // when node is not found, We do not remove the node right away. Trying again to get
                // the node and if the node is still not found, then remove it from the scheduler cache.
                _, err := client.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
                if err != nil && apierrors.IsNotFound(err) {
                    node := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: nodeName}}
                    if err := schedulerCache.RemoveNode(&node); err != nil {
                        klog.V(4).InfoS("Node is not found; failed to remove it from the cache", "node", node.Name)
                    }
                }
            }
        } else {
            klog.ErrorS(err, "Error scheduling pod; retrying", "pod", klog.KObj(pod))
        }

        // Check if the Pod exists in informer cache.
        cachedPod, err := podLister.Pods(pod.Namespace).Get(pod.Name)
        if err != nil {
            klog.InfoS("Pod doesn't exist in informer cache", "pod", klog.KObj(pod), "err", err)
            return
        }

        // In the case of extender, the pod may have been bound successfully, but timed out returning its response to the scheduler.
        // It could result in the live version to carry .spec.nodeName, and that's inconsistent with the internal-queued version.
        if len(cachedPod.Spec.NodeName) != 0 {
            klog.InfoS("Pod has been assigned to node. Abort adding it back to queue.", "pod", klog.KObj(pod), "node", cachedPod.Spec.NodeName)
            return
        }

        // As <cachedPod> is from SharedInformer, we need to do a DeepCopy() here.
        podInfo.PodInfo = framework.NewPodInfo(cachedPod.DeepCopy())
        // Add the Pod to the unschedulable queue
        if err := podQueue.AddUnschedulableIfNotPresent(podInfo, podQueue.SchedulingCycle()); err != nil {
            klog.ErrorS(err, "Error occurred")
        }
    }
}

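Next comes AddUnschedulableIfNotPresent, which is where backoffQ and unschedulablePods are actually manipulated.

AddUnschedulableIfNotPresent inserts a Pod that could not be scheduled into the queue, unless it is already there. Normally, PriorityQueue places unschedulable Pods in unschedulablePods. If there has been a recent move request, however, the Pod goes into podBackoffQ instead.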

func (p *PriorityQueue) AddUnschedulableIfNotPresent(pInfo *framework.QueuedPodInfo, podSchedulingCycle int64) error {
    p.lock.Lock()
    defer p.lock.Unlock()
    pod := pInfo.Pod
    // Do not add the Pod if it is already present
    if p.unschedulablePods.get(pod) != nil {
        return fmt.Errorf("Pod %v is already present in unschedulable queue", klog.KObj(pod))
    }
    // Check whether the Pod is in activeQ
    if _, exists, _ := p.activeQ.Get(pInfo); exists {
        return fmt.Errorf("Pod %v is already present in the active queue", klog.KObj(pod))
    }
    // Check whether the Pod is in podBackoffQ
    if _, exists, _ := p.podBackoffQ.Get(pInfo); exists {
        return fmt.Errorf("Pod %v is already present in the backoff queue", klog.KObj(pod))
    }

    // On re-adding, refresh the Pod's timestamp to the time of this latest operation
    pInfo.Timestamp = p.clock.Now()

    for plugin := range pInfo.UnschedulablePlugins {
        metrics.UnschedulableReason(plugin, pInfo.Pod.Spec.SchedulerName).Inc()
    }
    // If a move request has been received, put the Pod into backoffQ
    if p.moveRequestCycle >= podSchedulingCycle {
        if err := p.podBackoffQ.Add(pInfo); err != nil {
            return fmt.Errorf("error adding pod %v to the backoff queue: %v", pod.Name, err)
        }
        metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", ScheduleAttemptFailure).Inc()
    } else {
        // Otherwise put it into unschedulablePods
        p.unschedulablePods.addOrUpdate(pInfo)
        metrics.SchedulerQueueIncomingPods.WithLabelValues("unschedulable", ScheduleAttemptFailure).Inc()

    }

    p.PodNominator.AddNominatedPod(pInfo.PodInfo, nil)
    return nil
}
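
The routing decision in the function above comes down to one comparison between two cycle counters. The sketch below isolates it; `toyQueue` and `addUnschedulable` are hypothetical, heavily reduced stand-ins written for this example, with no locking, metrics, or activeQ check.

```go
package main

import (
	"errors"
	"fmt"
)

// toyQueue is a hypothetical, much-reduced stand-in for PriorityQueue.
type toyQueue struct {
	moveRequestCycle int64           // scheduling cycle in which the last move request arrived
	backoffQ         []string        // Pods waiting out a backoff
	unschedulable    map[string]bool // Pods parked until a cluster event or a timeout
}

// addUnschedulable routes a failed Pod: a move request seen at or after the
// Pod's own scheduling cycle sends it to backoffQ, otherwise it is parked.
func (q *toyQueue) addUnschedulable(pod string, podSchedulingCycle int64) (string, error) {
	if q.unschedulable[pod] {
		return "", errors.New("pod " + pod + " is already present in unschedulable queue")
	}
	for _, p := range q.backoffQ {
		if p == pod {
			return "", errors.New("pod " + pod + " is already present in the backoff queue")
		}
	}
	if q.moveRequestCycle >= podSchedulingCycle {
		q.backoffQ = append(q.backoffQ, pod)
		return "backoff", nil
	}
	q.unschedulable[pod] = true
	return "unschedulable", nil
}

func main() {
	q := &toyQueue{moveRequestCycle: -1, unschedulable: map[string]bool{}}

	// No move request has been seen yet, so the Pod is parked.
	where, _ := q.addUnschedulable("pod-a", 3)
	fmt.Println(where)

	// A move request arrives in cycle 5. A Pod whose attempt started in
	// cycle 4 was in flight at that moment, so it is retried after a backoff.
	q.moveRequestCycle = 5
	where, _ = q.addUnschedulable("pod-b", 4)
	fmt.Println(where)
}
```

A plausible reading of the comparison is that a Pod whose attempt was already in flight when the move request arrived could not have benefited from the event that caused it, so it is given another chance soon rather than being parked.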

When the scheduler starts, two loops are launched asynchronously to work on these two queues. This happens in Run():

func (p *PriorityQueue) Run() {
    go wait.Until(p.flushBackoffQCompleted, 1.0*time.Second, p.stop)
    go wait.Until(p.flushUnschedulablePodsLeftover, 30*time.Second, p.stop)
}

As shown, flushBackoffQCompleted services backoffQ (every second), while flushUnschedulablePodsLeftover services unschedulablePods (every 30 seconds).

flushBackoffQCompleted moves every Pod that has completed its backoff from backoffQ to activeQ:

func (p *PriorityQueue) flushBackoffQCompleted() {
    p.lock.Lock()
    defer p.lock.Unlock()
    broadcast := false
    for { // Peek is provided by the heap: look at the top item without popping it
        rawPodInfo := p.podBackoffQ.Peek()
        if rawPodInfo == nil {
            break
        }
        pod := rawPodInfo.(*framework.QueuedPodInfo).Pod
        boTime := p.getBackoffTime(rawPodInfo.(*framework.QueuedPodInfo))
        if boTime.After(p.clock.Now()) {
            break
        }
        _, err := p.podBackoffQ.Pop() // pop one item
        if err != nil {
            klog.ErrorS(err, "Unable to pop pod from backoff queue despite backoff completion", "pod", klog.KObj(pod))
            break
        }
        p.activeQ.Add(rawPodInfo) // put it into the active queue
        metrics.SchedulerQueueIncomingPods.WithLabelValues("active", BackoffComplete).Inc()
        broadcast = true
    }

    if broadcast {
        p.cond.Broadcast()
    }
}

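flushUnschedulablePodsLeftover moves Pods that have stayed in unschedulablePods longer than podMaxInUnschedulablePodsDuration into backoffQ or activeQ.

podMaxInUnschedulablePodsDuration is passed in from configuration. When it is not set explicitly, the default from the Deprecated options applies, which is 5 minutes.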

func NewOptions() *Options {
    o := &Options{
        SecureServing:  apiserveroptions.NewSecureServingOptions().WithLoopback(),
        Authentication: apiserveroptions.NewDelegatingAuthenticationOptions(),
        Authorization:  apiserveroptions.NewDelegatingAuthorizationOptions(),
        Deprecated: &DeprecatedOptions{
            PodMaxInUnschedulablePodsDuration: 5 * time.Minute,
        },

flushUnschedulablePodsLeftover itself simply compares timestamps and then moves the expired Pods to the appropriate queue:

func (p *PriorityQueue) flushUnschedulablePodsLeftover() {
    p.lock.Lock()
    defer p.lock.Unlock()

    var podsToMove []*framework.QueuedPodInfo
    currentTime := p.clock.Now()
    for _, pInfo := range p.unschedulablePods.podInfoMap {
        lastScheduleTime := pInfo.Timestamp
        if currentTime.Sub(lastScheduleTime) > p.podMaxInUnschedulablePodsDuration {
            podsToMove = append(podsToMove, pInfo)
        }
    }

    if len(podsToMove) > 0 {
        p.movePodsToActiveOrBackoffQueue(podsToMove, UnschedulableTimeout)
    }
}

Summary of the Scheduling Context Flow

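  • Building a scheduler goes through the following steps:
    • Prepare the cache, informers, queue, error-handling function, and so on
    • Register event handlers that watch resources (such as Pods); any change triggers the corresponding handler. This is the inbound path into activeQ
  • Once built, the scheduler is run. Running it starts the SchedulingQueue, which here serves as the set of unschedulable queues:
    • BackoffQ
    • UnschedulablePods
    • The unschedulable queues are consumed periodically, at the intervals registered in Run, and their Pods are added back to activeQ
  • A scheduleOne loop is started. It executes all the extension points of the scheduling context and is the consumer side of activeQ
    • scheduleOne fetches a Pod
    • It executes each extension point; if one fails, the Error function built by MakeDefaultErrorFunc adds the Pod to the unschedulable queues
    • Control then returns to the consumption of the unschedulable queues described above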

Reference

[1] kubernetes scheduler extender
[2] scheduling framework
[3] Extension points
