001.调度器学习基础概览
1. 资源调度基础
scheudler是kubernetes中的核心组件,负责为用户声明的pod资源选择合适的node,同时保证集群资源的最大化利用,这里先介绍下资源调度系统设计里面的一些基础概念
1.1 基础任务资源调度
基础的任务资源调度通常包括三部分:
角色类型 | 功能 |
---|---|
node | node负责具体任务的执行,同时对包汇报自己拥有的资源 |
resource manager | 汇总当前集群中所有node提供的资源,供上层的scheduler的调用获取,同时根据node汇报的任务信息来进行当前集群资源的更新 |
scheduler | 结合当前集群的资源和用户提交的任务信息,选择合适的node节点当前的资源,分配节点任务,尽可能保证任务的运行 |
通用的调度框架往往还会包含一个上层的集群管理器,负责针对集群中scheduler的管理和资源分配工作,同时负责scheduler集群状态甚至resource manager的保存
1.2 资源调度设计的挑战
1.2.1 资源:集群资源利用的最大化与平均
传统的IDC集群资源利用:<br />在IDC环境中我们通常希望机器利用率能够平均,让机器保持在某个平均利用率,然后根据资源的需要预留足够的buffer, 来应对集群的资源利用高峰,毕竟采购通常都有周期,我们既不能让机器空着,也不能让他跑满(业务无法弹性)<br />云环境下的资源利用:
1.2.2 调度: 任务最少等待时间与优先级
在集群任务繁忙的时候,可能会导致集群资源部足以分配给当前集群中的所有任务,在让所有任务都能够尽快完成的同时,我们还要保证高优先级的任务优先被完成
1.2.3 调度: 任务本地性
本地性是指在大数据处理中常用的一种机制,其核心是尽可能将任务分配到包含其任务执行资源的节点上,避免数据的复制
1.2.4 集群: 高可用性
在调度过程中可能由于硬件、系统或者软件导致任务的不可用,通常会由需要一些高可用机制,来保证当前集群不会因为部分节点宕机而导致整个系统不可用
1.2.5 系统: 可扩展性
扩展机制主要是指的,系统如何如何应对业务需求的变化,提供的一种可扩展机制,在集群默认调度策略不满足业务需求时,通过扩展接口,来进行系统的扩展满足业务需求
1.3 Pod调度场景的挑战
Pod调度场景其实可以看做一类特殊的任务,除了上面资源调度的挑战,还有一些针对pod调度这个具体的场景(有些是共同的,这里通过pod来描述会比较清晰)
1.3.1 亲和与反亲和
在kubernetes中的亲和性主要体现pod和node两种资源,主要体现在两个方面:
1.亲和性:
1)pod之间的亲和性
2)pod与node之间的亲和性
2.反亲和:
1)pod之间的反亲和性
2)pod与node之间的反亲和
简单举例:1.pod之间的反亲和: 为了保证高可用我们通常会将同一业务的多个节点分散在不通的数据中心和机架
2.pod与node亲和性: 比如某些需要磁盘io操作的pod,我们可以调度到具有ssd的机器上,提高IO性能
1.3.2 多租户与容量规划
多租户通常是为了进行集群资源的隔离,在业务系统中,通常会按照业务线来进行资源的隔离,同时会给业务设定对应的容量,从而避免单个业务线资源的过度使用影响整个公司的所有业务
1.3.3 Zone与node选择
zone通常是在业务容灾中常见的概念,通过将服务分散在多个数据中心,避免因为单个数据中心故障导致业务完全不可用
因为之前亲和性的问题,如何在多个zone中的所有node中选择出一个合适的节点,则是一个比较大的挑战
1.3.4 多样化资源的扩展
系统资源除了cpu、内存还包括网络、磁盘io、gpu等等,针对其余资源的分配调度,kubernetes还需要提供额外的扩展机制来进行调度扩展的支持
1.3.5 资源混部
kubernetes初期是针对pod调度场景而生,主要其实是在线web业务,这类任务的特点大部分都是无状态的,那如何针对离线场景的去支持离线的批处理计算等任务
2. kubernetes中的调度初识
2.1 中心化数据集中存储
2.1.1 中心化的数据存储
kubernetes是一个数据中心化存储的系统,集群中的所有数据都通过apiserver存储到etcd中,包括node节点的资源信息、节点上面的pod信息、当前集群的所有pod信息,在这里其实apiserver也充当了resource manager的角色,存储所有的集群资源和已经分配的资源
2.1.2 调度数据的存储与获取
kubernetes中采用了一种list watch的机制,用于集群中其他节点从apiserver感知数据,scheduler也采用该机制,通过在感知apiserver的数据变化,同时在本地memory中构建一份cache数据(资源数据),来提供调度使用,即SchedulerCache
2.1.3 scheduler的高可用
大多数系统的高可用机制都是通过类似zookeeper、etcd等AP系统实现,通过临时节点或者锁机制机制来实现多个节点的竞争,从而在主节点宕机时,能够快速接管, scheduler自然也是这种机制,通过apiserver底层的etcd来实现锁的竞争,然后通过apiserver的数据,就可以保证调度器的高可用
2.2 调度器内部组成
2.2.1 调度队列
当从apiserver感知到要调度的pod的时候,scheduler会根据pod的优先级,来讲其加入到内部的一个优先级队列中,后续调度的时候,会先获取优先级比较高的pod来进行优先满足调度
这里还有一个点就是如果优先调度了优先级比较低的pod,其实在后续的抢占过程中,也会被驱逐出去。
2.2.2 调度与抢占调度
前面提到过抢占,kubernetes默认会对所有的pod来尝试进行调度,当集群资源部满足的时候,则会尝试抢占调度,通过抢占调度,为高优先级的pod来进行优先调度 其核心都是通过调度算法实现即ScheduleAlgorithm
这里的调度算法实际上是一堆调度算法和调度配置的集合
2.2.3 外部扩展机制
scheduler extender是k8s对调度器的一种扩展机制,我们可以定义对应的extender,在对应资源的调度的时候,k8s会检查对应的资源,如果发现需要调用外部的extender,则将当前的调度数据发送给extender,然后汇总调度数据,决定最终的调度结果
2.2.4 内部扩展机制
上面提到调度算法是一组调度算法和调度配置的集合,kubernetes scheduler framework是则是一个框架声明对应插件的接口,从而支持用户编写自己的plugin,来影响调度决策,个人感觉这并不是一种好的机制,因为要修改代码,或者通过修改kubernetes scheduler启动来进行自定义插件的加载
2.3 调度基础架构
结合上面所说的就得到了一个最简单的架构,主要调度流程分为如下几部分:0.通过apiserver来进行主节点选举,成功者进行调度业务流程处理
1.通过apiserver感知集群的资源数据和pod数据,更新本地schedulerCache
2.通过apiserver感知用户或者controller的pod调度请求,加入本地调度队列
3.通过调度算法来进行pod请求的调度,分配合适的node节点,此过程可能会发生抢占调度
4.将调度结果返回给apiserver,然后由kubelet组件进行后续pod的请求处理
002.SchedulingQueue三级调度队列实现
SchedulingQueue是kubenrets scheduler中负责进行等待调度pod存储的对,Scheduler通过SchedulingQueue来获取当前系统中等待调度的Pod,本文主要讨论SchedulingQueue的设计与实现的各种实现, 了解探究其内部实现与底层源码,本系列代码基于kubernets1.1.6分析而来
1. SchedulingQueue设计
1.1 队列与优先级
1.1.1 队列与场景
类型 | 描述 | 通常实现 |
---|---|---|
队列 | 普通队列是一个FIFO的数据结构,根据元素入队的次序依次出队 | 数组或者链表 |
优先级队列 | 优先级队列通常是指根据某些优先级策略,高优先级会优先被获取 | 数组或者树 |
其实在大多数的调度场景中,大多都是采用优先级队列来实现,优先满足优先级比较高的任务或者需求,从而减少后续高优先级对低优先级的抢占,scheduler中也是如此
1.1.2 优先级的选择
k8s中调度的单元是Pod,scheduler中根据pod的优先级的高低来进行优先级队列的构建, 这个其实是在kubernets的adminission准入插件中,会为用户创建的pod根据用户的设置,进行优先级字段的计算
1.2 三级队列
1.2.1 活动队列
活动队列存储当前系统中所有正在等待调度的队列
1.2.2 不可调度队列
当pod的资源在当前集群中不能被满足时,则会被加入到一个不可调度队列中,然后等待稍后再进行尝试
1.2.3 backoff队列
backoff机制是并发编程中常见的一种机制,即如果任务反复执行依旧失败,则会按次增长等待调度时间,降低重试效率,从而避免反复失败浪费调度资源
针对调度失败的pod会优先存储在backoff队列中,等待后续重试
1.3 阻塞与抢占
1.3.1 阻塞设计
当队列中不存在等待调度的pod的时候,会阻塞scheduler等待有需要调度的pod的时候再唤醒调度器,获取pod进行调度
1.3.2 抢占相关
nominatedPods存储pod被提议运行的node,主要用于抢占调度流程中使用,本节先不分析
2. 源码分析
2.1 数据结构
kubernetes中默认的schedulingQueue实现是PriorityQueue,本章就以该数据结构来分析
type PriorityQueue struct {
stop <-chan struct{}
clock util.Clock
// 存储backoff的pod计时器
podBackoff *PodBackoffMap
lock sync.RWMutex
// 用于协调通知因为获取不到调度pod而阻塞的cond
cond sync.Cond
// 活动队列
activeQ *util.Heap
// backoff队列
podBackoffQ *util.Heap
// 不可调度队列
unschedulableQ *UnschedulablePodsMap
// 存储pod和被提名的node, 实际上就是存储pod和建议的node节点
nominatedPods *nominatedPodMap
// schedulingCycle是一个调度周期的递增序号,当pod pop的时候会递增
schedulingCycle int64
// moveRequestCycle缓存schedulingCycle, 当未调度的pod重新被添加到activeQueue中
// 会保存schedulingCycle到moveRequestCycle中
moveRequestCycle int64
closed bool
}
PriorityQueue作为实现SchedulingQueue的实现,其核心数据结构主要包含三个队列:activeQ、podBackoffQ、unscheduleQ内部通过cond来实现Pop操作的阻塞与通知,接下来先分析核心的调度流程,最后再分析util.Heap里面的具体实现
2.1.1 activeQ
存储所有等待调度的Pod的队列,默认是基于堆来实现,其中元素的优先级则通过对比pod的创建时间和pod的优先级来进行排序
// activeQ is heap structure that scheduler actively looks at to find pods to
// schedule. Head of heap is the highest priority pod.
activeQ *util.Heap
优先级比较函数
// activeQComp is the function used by the activeQ heap algorithm to sort pods.
// It sorts pods based on their priority. When priorities are equal, it uses
// PodInfo.timestamp.
func activeQComp(podInfo1, podInfo2 interface{}) bool {
pInfo1 := podInfo1.(*framework.PodInfo)
pInfo2 := podInfo2.(*framework.PodInfo)
prio1 := util.GetPodPriority(pInfo1.Pod)
prio2 := util.GetPodPriority(pInfo2.Pod)
// 首先根据优先级的高低进行比较,然后根据pod的创建时间,越高优先级的Pod越被优先调度
// 越早创建的pod越优先
return (prio1 > prio2) || (prio1 == prio2 && pInfo1.Timestamp.Before(pInfo2.Timestamp))
}
2.1.2 podbackOffQ
podBackOffQ主要存储那些在多个schedulingCycle中依旧调度失败的情况下,则会通过之前说的backOff机制,延迟等待调度的时间
// podBackoffQ is a heap ordered by backoff expiry. Pods which have completed backoff
// are popped from this heap before the scheduler looks at activeQ
podBackoffQ *util.Heap
2.1.3 podBackOff
上面提到podBackOffQ队列中并没有存储pod的backOff的具体信息,比如backoff的计数器,最后一次更新的时间等,podBackOff则类似一个记分板,记录这些信息,供podBackOffQ使用
// podBackoff tracks backoff for pods attempting to be rescheduled
podBackoff *PodBackoffMap
// PodBackoffMap is a structure that stores backoff related information for pods
type PodBackoffMap struct {
// lock for performing actions on this PodBackoffMap
lock sync.RWMutex
// initial backoff duration
initialDuration time.Duration // 当前值是1秒
// maximal backoff duration
maxDuration time.Duration // 当前值是1分钟
// map for pod -> number of attempts for this pod
podAttempts map[ktypes.NamespacedName]int
// map for pod -> lastUpdateTime pod of this pod
podLastUpdateTime map[ktypes.NamespacedName]time.Time
}
2.1.4 unschedulableQ
存储已经尝试调度但是当前集群资源不满足的pod的队列
2.1.5 moveRequestCycle
当因为集群资源发生变化会尝试进行unschedulableQ中的pod转移到activeQ,moveRequestCycle就是存储资源变更时的schedulingCycle
func (p *PriorityQueue) MoveAllToActiveQueue() {
// 省略其他代码
p.moveRequestCycle = p.schedulingCycle
}
2.1.6 schedulingCycle
schedulingCycle是一个递增的序列每次从activeQ中pop出一个pod都会递增
func (p *PriorityQueue) Pop() (*v1.Pod, error) {
//省略其他
p.schedulingCycle++
}
2.2 并发活动队列
2.2.1 并发从活动队列中获取pod
SchedulingQueue提供了一个Pop接口用于从获取当前集群中等待调度的pod,其内部实现主要通过上面cond与activeQ来实现
当前队列中没有可调度的pod的时候,则通过cond.Wait来进行阻塞,然后在忘activeQ中添加pod的时候通过cond.Broadcast来实现通知
func (p *PriorityQueue) Pop() (*v1.Pod, error) {
p.lock.Lock()
defer p.lock.Unlock()
for p.activeQ.Len() == 0 {
if p.closed {
return nil, fmt.Errorf(queueClosed)
}
//
p.cond.Wait()
}
obj, err := p.activeQ.Pop()
if err != nil {
return nil, err
}
pInfo := obj.(*framework.PodInfo)
p.schedulingCycle++
return pInfo.Pod, err
}
2.2.2 加入调度pod到活动队列
当pod加入活动队列中,除了加入activeQ的优先级队列中,还需要从podBackoffQ和unschedulableQ中移除当前的pod,最后进行广播通知阻塞在Pop操作的scheudler进行最新pod的获取
func (p *PriorityQueue) Add(pod *v1.Pod) error {
p.lock.Lock()
defer p.lock.Unlock()
pInfo := p.newPodInfo(pod)
// 加入activeQ
if err := p.activeQ.Add(pInfo); err != nil {
klog.Errorf("Error adding pod %v/%v to the scheduling queue: %v", pod.Namespace, pod.Name, err)
return err
}
// 从unschedulableQ删除
if p.unschedulableQ.get(pod) != nil {
klog.Errorf("Error: pod %v/%v is already in the unschedulable queue.", pod.Namespace, pod.Name)
p.unschedulableQ.delete(pod)
}
// Delete pod from backoffQ if it is backing off
// 从podBackoffQ删除
if err := p.podBackoffQ.Delete(pInfo); err == nil {
klog.Errorf("Error: pod %v/%v is already in the podBackoff queue.", pod.Namespace, pod.Name)
}
// 存储pod和被提名的node
p.nominatedPods.add(pod, "")
p.cond.Broadcast()
return nil
}
2.3 schedulingCycle与moveRequestCycle
2.3.1 未调度的队列的及时重试
导致调度周期schedulingCyclye变更主要因素如下:
1.当集群资源发生变化的时候:比如新添加pv、node等资源,那之前在unschedulableQ中因为资源不满足需求的pod就可以进行放入activeQ中或者podBackoffQ中,及时进行调度
2.pod被成功调度: 之前由于亲和性不满足被放入到unschedulableQ中的pod,此时也可以进行尝试,而不必等到超时之后,再加入
这两种情况下会分别触发MoveAllToActiveQueue和movePodsToActiveQueue变更moveRequestCycle使其等于schedulingCycle
2.3.2 对重试机制的影响
当前一个pod失败的时候,有两种选择一是加入podBackoffQ中,二是加入unschedulableQ中,那么针对一个失败的pod如何选择该进入那个队列中呢结合上面的moveRequestCycle变更时机,什么时候moveRequestCycle会大于等于podSchedulingCycle呢?答案就是当前集群中进行过集群资源的变更或者pod被成功分配,那这个时候我们如果重试一个失败的调度则可能会成功,因为集群资源变更了可能有新的资源加入, 如果发生调度器重启的情况下,则会出现moveRequestCycle大于当前的podSchedulingCycle则当前应该对之前已经失败的pod进行重试,而不是加入到unschedulable中
if p.moveRequestCycle >= podSchedulingCycle {
if err := p.podBackoffQ.Add(pInfo); err != nil {
return fmt.Errorf("error adding pod %v to the backoff queue: %v", pod.Name, err)
}
} else {
p.unschedulableQ.addOrUpdate(pInfo)
}
2.4 失败处理逻辑的注入
2.4.1 注入调度失败逻辑处理
在创建scheduler Config的时候会通过MakeDefaultErrorFunc注入一个失败处理函数, 在scheduler调度的时候会进行调用kubernetes/pkg/scheduler/factory/factory.go: MakeDefaultErrorFunc会将没有调度到任何一个node的pod重新放回到优先级队列中
podSchedulingCycle := podQueue.SchedulingCycle()
// 省略非核心代码
if len(pod.Spec.NodeName) == 0 {
//重新放回队列
if err := podQueue.AddUnschedulableIfNotPresent(pod, podSchedulingCycle); err != nil {
klog.Error(err)
}
}
2.4.2 失败处理的回调
当调度pod的失败的时候, scheduler会同时调用sched.Error就是上面注入的失败处理逻辑,来将调度失败未分配node的pod节点重新加入到队里钟kubernetes/pkg/scheduler/scheduler.go
func (sched *Scheduler) recordSchedulingFailure(pod *v1.Pod, err error, reason string, message string) {
// 错误回调
sched.Error(pod, err)
sched.Recorder.Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", message)
if err := sched.PodConditionUpdater.Update(pod, &v1.PodCondition{
Type: v1.PodScheduled,
Status: v1.ConditionFalse,
Reason: reason,
Message: err.Error(),
}); err != nil {
klog.Errorf("Error updating the condition of the pod %s/%s: %v", pod.Namespace, pod.Name, err)
}
}
2.5 PodBackoffMap
PodBackoffMap主要用于存储pod的最后一次失败的更新时间与实现次数,从而根据这些数据来进行pod的backoffTime的计算
2.5.1 数据结构设计
type PodBackoffMap struct {
// lock for performing actions on this PodBackoffMap
lock sync.RWMutex
// 初始化 backoff duration
initialDuration time.Duration // 当前值是1秒
// 最大 backoff duration
maxDuration time.Duration // 当前值是1分钟
// 记录pod重试的次数
podAttempts map[ktypes.NamespacedName]int
// 记录pod的最后一次的更新时间
podLastUpdateTime map[ktypes.NamespacedName]time.Time
}
2.5.2 backoffTime计算算法
初始化的时候回设定initialDuration和maxDuration,在当前版本中分别是1s和10s,也就是backoffQ中的pod最长10s就会重新加入activeQ中(需要等待定时任务进行辅助)
在每次失败回调的时候,都会进行BackoffPod方法来进行计数更新,在后续获取pod的backoffTime的时候,只需要获取次数然后结合initialDuration进行算法计算,结合pod最后一次的更新时间,就会获取pod的backoffTime的终止时间2.5.3 backoffDuration计算
其实最终的计算很简单就是2的N次幂
func (pbm *PodBackoffMap) calculateBackoffDuration(nsPod ktypes.NamespacedName) time.Duration {
// initialDuration是1s
backoffDuration := pbm.initialDuration
if _, found := pbm.podAttempts[nsPod]; found {
// podAttempts里面包含pod的尝试失败的次数
for i := 1; i < pbm.podAttempts[nsPod]; i++ {
backoffDuration = backoffDuration * 2
// 最大10s
if backoffDuration > pbm.maxDuration {
return pbm.maxDuration
}
}
}
return backoffDuration
}
2.6 podBackoffQ
2.6.1 优先级函数
podBackoffQ实际上会根据pod的backoffTime来进行优先级排序,所以podBackoffQ的队列头部,就是最近一个要过期的pod
func (p *PriorityQueue) podsCompareBackoffCompleted(podInfo1, podInfo2 interface{}) bool {
pInfo1 := podInfo1.(*framework.PodInfo)
pInfo2 := podInfo2.(*framework.PodInfo)
bo1, _ := p.podBackoff.GetBackoffTime(nsNameForPod(pInfo1.Pod))
bo2, _ := p.podBackoff.GetBackoffTime(nsNameForPod(pInfo2.Pod))
return bo1.Before(bo2)
}
2.6.2 调度失败加入到podBackoffQ
如果调度失败,并且moveRequestCycle=podSchedulingCycle的时候就加入podBackfoffQ中
func (p *PriorityQueue) AddUnschedulableIfNotPresent(pod *v1.Pod, podSchedulingCycle int64) error {
// 省略检查性代码
// 更新pod的backoff 信息
p.backoffPod(pod)
// moveRequestCycle将pod从unscheduledQ大于pod的调度周期添加到 如果pod的调度周期小于当前的调度周期
if p.moveRequestCycle >= podSchedulingCycle {
if err := p.podBackoffQ.Add(pInfo); err != nil {
return fmt.Errorf("error adding pod %v to the backoff queue: %v", pod.Name, err)
}
} else {
p.unschedulableQ.addOrUpdate(pInfo)
}
p.nominatedPods.add(pod, "")
return nil
}
2.6.3 从unschedulableQ迁移
在前面介绍的当集群资源发生变更的时候,会触发尝试unschedulabelQ中的pod进行转移,如果发现当前pod还未到达backoffTime,就加入到podBackoffQ中
if p.isPodBackingOff(pod) {
if err := p.podBackoffQ.Add(pInfo); err != nil {
klog.Errorf("Error adding pod %v to the backoff queue: %v", pod.Name, err)
addErrorPods = append(addErrorPods, pInfo)
}
} else {
if err := p.activeQ.Add(pInfo); err != nil {
klog.Errorf("Error adding pod %v to the scheduling queue: %v", pod.Name, err)
addErrorPods = append(addErrorPods, pInfo)
}
}
2.6.4 podBackoffQ定时转移
在创建PriorityQueue的时候,会创建两个定时任务其中一个就是讲backoffQ中的pod到期后的转移,每秒钟尝试一次
func (p *PriorityQueue) run() {
go wait.Until(p.flushBackoffQCompleted, 1.0*time.Second, p.stop)
go wait.Until(p.flushUnschedulableQLeftover, 30*time.Second, p.stop)
}
因为是一个堆结果,所以只需要获取堆顶的元素,然后确定是否到期,如果到期后则进行pop处来,加入到activeQ中
func (p *PriorityQueue) flushBackoffQCompleted() {
p.lock.Lock()
defer p.lock.Unlock()
for {
// 获取堆顶元素
rawPodInfo := p.podBackoffQ.Peek()
if rawPodInfo == nil {
return
}
pod := rawPodInfo.(*framework.PodInfo).Pod
// 获取到期时间
boTime, found := p.podBackoff.GetBackoffTime(nsNameForPod(pod))
if !found {
// 如果当前已经不在podBackoff中,则就pop出来然后放入到activeQ
klog.Errorf("Unable to find backoff value for pod %v in backoffQ", nsNameForPod(pod))
p.podBackoffQ.Pop()
p.activeQ.Add(rawPodInfo)
defer p.cond.Broadcast()
continue
}
// 未超时
if boTime.After(p.clock.Now()) {
return
}
// 超时就pop出来
_, err := p.podBackoffQ.Pop()
if err != nil {
klog.Errorf("Unable to pop pod %v from backoffQ despite backoff completion.", nsNameForPod(pod))
return
}
// 加入到activeQ中
p.activeQ.Add(rawPodInfo)
defer p.cond.Broadcast()
}
}
2.7 unschedulableQ
2.7.1 调度失败
调度失败后,如果当前集群资源没有发生变更,就加入到unschedulable,原因上面说过
func (p *PriorityQueue) AddUnschedulableIfNotPresent(pod *v1.Pod, podSchedulingCycle int64) error {
// 省略检查性代码
// 更新pod的backoff 信息
p.backoffPod(pod)
// moveRequestCycle将pod从unscheduledQ大于pod的调度周期添加到 如果pod的调度周期小于当前的调度周期
if p.moveRequestCycle >= podSchedulingCycle {
if err := p.podBackoffQ.Add(pInfo); err != nil {
return fmt.Errorf("error adding pod %v to the backoff queue: %v", pod.Name, err)
}
} else {
p.unschedulableQ.addOrUpdate(pInfo)
}
p.nominatedPods.add(pod, "")
return nil
}
2.7.2 定时转移任务
定时任务每30秒执行一次
func (p *PriorityQueue) run() {
go wait.Until(p.flushUnschedulableQLeftover, 30*time.Second, p.stop)
}
逻辑其实就非常简单如果当前时间-pod的最后调度时间大于60s,就重新调度,转移到podBackoffQ或者activeQ中
func (p *PriorityQueue) flushUnschedulableQLeftover() {
p.lock.Lock()
defer p.lock.Unlock()
var podsToMove []*framework.PodInfo
currentTime := p.clock.Now()
for _, pInfo := range p.unschedulableQ.podInfoMap {
lastScheduleTime := pInfo.Timestamp
// 如果该pod1分钟内没有被调度就加入到podsToMove
if currentTime.Sub(lastScheduleTime) > unschedulableQTimeInterval {
podsToMove = append(podsToMove, pInfo)
}
}
if len(podsToMove) > 0 {
// podsToMove将这些pod移动到activeQ
p.movePodsToActiveQueue(podsToMove)
}
}
3. 调度队列总结
3.1 数据流设计总结
3.1.1 三队列与后台定时任务
从设计上三队列分别存储:活动队列、bakcoff队列、不可调度队列,其中backoff中会根据任务的失败来逐步递增重试时间(最长10s)、unschedulableQ队列则延迟60s
通过后台定时任务分别将backoffQ队列、unschedulableQ队列来进行重试,加入到activeQ中,从而加快完成pod的失败重试调度
3.1.2 cycle与优先调度
schedulingCycle、moveRequestCycle两个cycle其实本质上也是为了加快失败任务的重试调度,当集群资源发生变化的时候,进行立即重试,那些失败的优先级比较高、亲和性问题的pod都可能会被优先调度
3.1.3 锁与cond实现线程安全pop
内部通过lock保证线程安全,并通过cond来实现阻塞等待,从而实现阻塞scheduler worker的通知。