X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/bfb9e29c250bcfb34a6b1813ca46953503ca05e6..ae4225e48cace794723d508f4478e96024cb7344:/lib/dispatchcloud/scheduler/run_queue.go

diff --git a/lib/dispatchcloud/scheduler/run_queue.go b/lib/dispatchcloud/scheduler/run_queue.go
index 1e5ac2e046..8f4c2e083d 100644
--- a/lib/dispatchcloud/scheduler/run_queue.go
+++ b/lib/dispatchcloud/scheduler/run_queue.go
@@ -22,7 +22,19 @@ func (sch *Scheduler) runQueue() {
 		sorted = append(sorted, ent)
 	}
 	sort.Slice(sorted, func(i, j int) bool {
-		if pi, pj := sorted[i].Container.Priority, sorted[j].Container.Priority; pi != pj {
+		ilocked := sorted[i].Container.State == arvados.ContainerStateLocked
+		jlocked := sorted[j].Container.State == arvados.ContainerStateLocked
+		if ilocked != jlocked {
+			// Give precedence to containers that we have
+			// already locked, even if higher-priority
+			// containers have since arrived in the
+			// queue. This avoids undesirable queue churn
+			// effects including extra lock/unlock cycles
+			// and bringing up new instances and quickly
+			// shutting them down to make room for
+			// different instance sizes.
+			return ilocked
+		} else if pi, pj := sorted[i].Container.Priority, sorted[j].Container.Priority; pi != pj {
 			return pi > pj
 		} else {
 			// When containers have identical priority,
@@ -76,7 +88,8 @@ func (sch *Scheduler) runQueue() {
 	}).Debug("runQueue")
 
 	dontstart := map[arvados.InstanceType]bool{}
-	var overquota []container.QueueEnt // entries that are unmappable because of worker pool quota
+	var overquota []container.QueueEnt    // entries that are unmappable because of worker pool quota
+	var overmaxsuper []container.QueueEnt // unmappable because max supervisors (these are not included in overquota)
 	var containerAllocatedWorkerBootingCount int
 
 	// trying is #containers running + #containers we're trying to
@@ -84,13 +97,22 @@
 	// reaches the dynamic maxConcurrency limit.
 	trying := len(running)
 
+	supervisors := 0
+
 tryrun:
-	for i, ctr := range sorted {
-		ctr, it := ctr.Container, ctr.InstanceType
+	for i, ent := range sorted {
+		ctr, it := ent.Container, ent.InstanceType
 		logger := sch.logger.WithFields(logrus.Fields{
 			"ContainerUUID": ctr.UUID,
 			"InstanceType":  it.Name,
 		})
+		if ctr.SchedulingParameters.Supervisor {
+			supervisors += 1
+			if sch.maxSupervisors > 0 && supervisors > sch.maxSupervisors {
+				overmaxsuper = append(overmaxsuper, sorted[i])
+				continue
+			}
+		}
 		if _, running := running[ctr.UUID]; running || ctr.Priority < 1 {
 			continue
 		}
@@ -165,19 +187,19 @@ tryrun:
 	}
 	sch.mContainersAllocatedNotStarted.Set(float64(containerAllocatedWorkerBootingCount))
-	sch.mContainersNotAllocatedOverQuota.Set(float64(len(overquota)))
+	sch.mContainersNotAllocatedOverQuota.Set(float64(len(overquota) + len(overmaxsuper)))
 
-	if len(overquota) > 0 {
+	if len(overquota)+len(overmaxsuper) > 0 {
 		// Unlock any containers that are unmappable while
 		// we're at quota (but if they have already been
 		// scheduled and they're loading docker images etc.,
 		// let them run).
-		for _, ctr := range overquota {
+		for _, ctr := range append(overmaxsuper, overquota...) {
 			ctr := ctr.Container
 			_, toolate := running[ctr.UUID]
 			if ctr.State == arvados.ContainerStateLocked && !toolate {
 				logger := sch.logger.WithField("ContainerUUID", ctr.UUID)
-				logger.Debug("unlock because pool capacity is used by higher priority containers")
+				logger.Info("unlock because pool capacity is used by higher priority containers")
 				err := sch.queue.Unlock(ctr.UUID)
 				if err != nil {
 					logger.WithError(err).Warn("error unlocking")
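
Note on the new sort order in the first hunk: locked containers now sort
ahead of everything else, and priority only breaks ties within the same
lock state. Below is a minimal standalone sketch of that comparator; the
entry type, its fields, and the sample data are hypothetical stand-ins
for container.QueueEnt, not the real Arvados types.

package main

import (
	"fmt"
	"sort"
)

// entry is a hypothetical stand-in for container.QueueEnt.
type entry struct {
	UUID     string
	Locked   bool // true when State == arvados.ContainerStateLocked
	Priority int
}

func main() {
	sorted := []entry{
		{"ctr-a", false, 900}, // highest priority, but not locked yet
		{"ctr-b", true, 500},  // already locked: sorts first
		{"ctr-c", false, 700},
	}
	sort.Slice(sorted, func(i, j int) bool {
		// Already-locked containers take precedence even over
		// higher-priority new arrivals, avoiding extra
		// lock/unlock cycles and instance churn.
		if sorted[i].Locked != sorted[j].Locked {
			return sorted[i].Locked
		}
		return sorted[i].Priority > sorted[j].Priority
	})
	fmt.Println(sorted) // [{ctr-b true 500} {ctr-a false 900} {ctr-c false 700}]
}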
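Similarly, a sketch of the supervisor cap added in the tryrun loop:
supervisor containers beyond maxSupervisors are collected into
overmaxsuper and skipped, then later unlocked together with the
over-quota entries. The types and the helper name here are simplified
assumptions modeled on the diff, not the actual scheduler code.

// filterSupervisors mimics the overmaxsuper bookkeeping in runQueue
// (assumed simplified types; maxSupervisors <= 0 means no limit).
type qent struct {
	UUID       string
	Supervisor bool
}

func filterSupervisors(sorted []qent, maxSupervisors int) (runnable, overmaxsuper []qent) {
	supervisors := 0
	for _, ent := range sorted {
		if ent.Supervisor {
			supervisors++
			if maxSupervisors > 0 && supervisors > maxSupervisors {
				// Over the cap: defer instead of scheduling,
				// as runQueue does before its other checks.
				overmaxsuper = append(overmaxsuper, ent)
				continue
			}
		}
		runnable = append(runnable, ent)
	}
	return
}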