20457: Don't displace locked containers with queued containers.
diff --git a/lib/dispatchcloud/scheduler/run_queue.go b/lib/dispatchcloud/scheduler/run_queue.go
index 65ca904be0bce50e94cb41824ae43b872a364654..8f4c2e083da584a869c7b8c34bb8d69434ad1584 100644
--- a/lib/dispatchcloud/scheduler/run_queue.go
+++ b/lib/dispatchcloud/scheduler/run_queue.go
@@ -15,10 +15,6 @@ import (
 
 var quietAfter503 = time.Minute
 
-func isSupervisor(ctr arvados.Container) bool {
-       return (len(ctr.Command) > 0 && ctr.Command[0] == "arvados-cwl-runner") || ctr.SchedulingParameters.Supervisor
-}
-
 func (sch *Scheduler) runQueue() {
        unsorted, _ := sch.queue.Entries()
        sorted := make([]container.QueueEnt, 0, len(unsorted))
@@ -26,7 +22,19 @@ func (sch *Scheduler) runQueue() {
                sorted = append(sorted, ent)
        }
        sort.Slice(sorted, func(i, j int) bool {
-               if pi, pj := sorted[i].Container.Priority, sorted[j].Container.Priority; pi != pj {
+               ilocked := sorted[i].Container.State == arvados.ContainerStateLocked
+               jlocked := sorted[j].Container.State == arvados.ContainerStateLocked
+               if ilocked != jlocked {
+                       // Give precedence to containers that we have
+                       // already locked, even if higher-priority
+                       // containers have since arrived in the
+                       // queue. This avoids undesirable queue churn
+                       // effects including extra lock/unlock cycles
+                       // and bringing up new instances and quickly
+                       // shutting them down to make room for
+                       // different instance sizes.
+                       return ilocked
+               } else if pi, pj := sorted[i].Container.Priority, sorted[j].Container.Priority; pi != pj {
                        return pi > pj
                } else {
                        // When containers have identical priority,
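The new comparator sorts on two keys: lock state first, then priority. A minimal standalone sketch of the same ordering, using hypothetical simplified types in place of container.QueueEnt and arvados.ContainerStateLocked:

package main

import (
	"fmt"
	"sort"
)

// ent is a hypothetical stand-in for container.QueueEnt.
type ent struct {
	uuid     string
	locked   bool // stands in for State == arvados.ContainerStateLocked
	priority int
}

func main() {
	sorted := []ent{
		{"queued-high", false, 900},
		{"locked-low", true, 100},
		{"queued-low", false, 50},
	}
	sort.Slice(sorted, func(i, j int) bool {
		// Locked containers sort ahead of queued ones regardless of
		// priority, so newly arrived high-priority work cannot displace
		// containers the dispatcher has already locked.
		if sorted[i].locked != sorted[j].locked {
			return sorted[i].locked
		}
		return sorted[i].priority > sorted[j].priority
	})
	fmt.Println(sorted)
	// [{locked-low true 100} {queued-high false 900} {queued-low false 50}]
}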
@@ -80,7 +88,8 @@ func (sch *Scheduler) runQueue() {
        }).Debug("runQueue")
 
        dontstart := map[arvados.InstanceType]bool{}
-       var overquota []container.QueueEnt // entries that are unmappable because of worker pool quota
+       var overquota []container.QueueEnt    // entries that are unmappable because of worker pool quota
+       var overmaxsuper []container.QueueEnt // unmappable because of the max-supervisors limit (not included in overquota)
        var containerAllocatedWorkerBootingCount int
 
        // trying is #containers running + #containers we're trying to
@@ -91,15 +100,16 @@ func (sch *Scheduler) runQueue() {
        supervisors := 0
 
 tryrun:
-       for i, ctr := range sorted {
-               ctr, it := ctr.Container, ctr.InstanceType
+       for i, ent := range sorted {
+               ctr, it := ent.Container, ent.InstanceType
                logger := sch.logger.WithFields(logrus.Fields{
                        "ContainerUUID": ctr.UUID,
                        "InstanceType":  it.Name,
                })
-               if isSupervisor(ctr) {
+               if ctr.SchedulingParameters.Supervisor {
                        supervisors += 1
                        if sch.maxSupervisors > 0 && supervisors > sch.maxSupervisors {
+                               overmaxsuper = append(overmaxsuper, sorted[i])
                                continue
                        }
                }
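The supervisor cap works by counting supervisors in sorted order and diverting any beyond the limit into overmaxsuper instead of scheduling them. A sketch of that bookkeeping under the same assumption of simplified stand-in types (the real loop also handles locking, quota, and instance startup):

package main

import "fmt"

// ctr is a hypothetical stand-in for a queued container entry.
type ctr struct {
	uuid       string
	supervisor bool // SchedulingParameters.Supervisor in the real code
}

func main() {
	maxSupervisors := 2
	queue := []ctr{
		{"wf-1", true}, {"step-1", false}, {"wf-2", true}, {"wf-3", true},
	}
	supervisors := 0
	var overmaxsuper []ctr
	for _, c := range queue {
		if c.supervisor {
			supervisors++
			if maxSupervisors > 0 && supervisors > maxSupervisors {
				// Over the cap: skip scheduling, but remember the
				// entry so it can be unlocked later.
				overmaxsuper = append(overmaxsuper, c)
				continue
			}
		}
		fmt.Println("would schedule", c.uuid)
	}
	fmt.Println("over cap:", overmaxsuper)
}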
@@ -177,19 +187,19 @@ tryrun:
        }
 
        sch.mContainersAllocatedNotStarted.Set(float64(containerAllocatedWorkerBootingCount))
-       sch.mContainersNotAllocatedOverQuota.Set(float64(len(overquota)))
+       sch.mContainersNotAllocatedOverQuota.Set(float64(len(overquota) + len(overmaxsuper)))
 
-       if len(overquota) > 0 {
+       if len(overquota)+len(overmaxsuper) > 0 {
                // Unlock any containers that are unmappable while
                // we're at quota (but if they have already been
                // scheduled and they're loading docker images etc.,
                // let them run).
-               for _, ctr := range overquota {
+               for _, ctr := range append(overmaxsuper, overquota...) {
                        ctr := ctr.Container
                        _, toolate := running[ctr.UUID]
                        if ctr.State == arvados.ContainerStateLocked && !toolate {
                                logger := sch.logger.WithField("ContainerUUID", ctr.UUID)
-                               logger.Debug("unlock because pool capacity is used by higher priority containers")
+                               logger.Info("unlock because pool capacity is used by higher priority containers")
                                err := sch.queue.Unlock(ctr.UUID)
                                if err != nil {
                                        logger.WithError(err).Warn("error unlocking")
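In the final hunk, append(overmaxsuper, overquota...) concatenates the two unmappable groups so a single loop can consider both for unlocking, and the over-quota metric likewise sums their lengths; because overmaxsuper entries are deliberately kept out of overquota, nothing is counted or visited twice. A small sketch of that concatenation, with hypothetical placeholder identifiers:

package main

import "fmt"

func main() {
	// Hypothetical placeholder UUIDs for the two unmappable groups.
	overmaxsuper := []string{"wf-3"}
	overquota := []string{"big-ctr-1", "big-ctr-2"}
	// append with ... yields overmaxsuper's elements followed by
	// overquota's; since the groups are disjoint, no entry appears twice.
	for _, uuid := range append(overmaxsuper, overquota...) {
		fmt.Println("unlock candidate:", uuid)
	}
}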