20718: Cleans up by running "go mod tidy -compat=1.17"
diff --git a/lib/dispatchcloud/scheduler/run_queue.go b/lib/dispatchcloud/scheduler/run_queue.go
index b8158579a3a3a1b30a0eccc80b421f8abe8bdecf..63ac4a7b32122e68787bad1b71c1c5a988e97440 100644
--- a/lib/dispatchcloud/scheduler/run_queue.go
+++ b/lib/dispatchcloud/scheduler/run_queue.go
@@ -16,13 +16,38 @@ import (
 var quietAfter503 = time.Minute
 
 func (sch *Scheduler) runQueue() {
+       running := sch.pool.Running()
+       unalloc := sch.pool.Unallocated()
+
        unsorted, _ := sch.queue.Entries()
        sorted := make([]container.QueueEnt, 0, len(unsorted))
        for _, ent := range unsorted {
                sorted = append(sorted, ent)
        }
        sort.Slice(sorted, func(i, j int) bool {
-               if pi, pj := sorted[i].Container.Priority, sorted[j].Container.Priority; pi != pj {
+               _, irunning := running[sorted[i].Container.UUID]
+               _, jrunning := running[sorted[j].Container.UUID]
+               if irunning != jrunning {
+                       // Ensure the "tryrun" loop (see below) sees
+                       // already-scheduled containers first, so
+                       // that existing supervisor containers are
+                       // properly counted before we decide whether
+                       // we have room for new ones.
+                       return irunning
+               }
+               ilocked := sorted[i].Container.State == arvados.ContainerStateLocked
+               jlocked := sorted[j].Container.State == arvados.ContainerStateLocked
+               if ilocked != jlocked {
+                       // Give precedence to containers that we have
+                       // already locked, even if higher-priority
+                       // containers have since arrived in the
+                       // queue. This avoids undesirable queue churn
+                       // effects including extra lock/unlock cycles
+                       // and bringing up new instances and quickly
+                       // shutting them down to make room for
+                       // different instance sizes.
+                       return ilocked
+               } else if pi, pj := sorted[i].Container.Priority, sorted[j].Container.Priority; pi != pj {
                        return pi > pj
                } else {
                        // When containers have identical priority,
@@ -34,9 +59,6 @@ func (sch *Scheduler) runQueue() {
                }
        })
 
-       running := sch.pool.Running()
-       unalloc := sch.pool.Unallocated()
-
        if t := sch.client.Last503(); t.After(sch.last503time) {
                // API has sent an HTTP 503 response since last time
                // we checked. Use current #containers - 1 as
@@ -67,8 +89,16 @@ func (sch *Scheduler) runQueue() {
        } else {
                sch.mLast503Time.Set(float64(sch.last503time.Unix()))
        }
+       if sch.maxInstances > 0 && sch.maxConcurrency > sch.maxInstances {
+               sch.maxConcurrency = sch.maxInstances
+       }
        sch.mMaxContainerConcurrency.Set(float64(sch.maxConcurrency))
 
+       maxSupervisors := int(float64(sch.maxConcurrency) * sch.supervisorFraction)
+       if maxSupervisors < 1 && sch.supervisorFraction > 0 && sch.maxConcurrency > 0 {
+               maxSupervisors = 1
+       }
+
        sch.logger.WithFields(logrus.Fields{
                "Containers":     len(sorted),
                "Processes":      len(running),
@@ -76,7 +106,8 @@ func (sch *Scheduler) runQueue() {
        }).Debug("runQueue")
 
        dontstart := map[arvados.InstanceType]bool{}
-       var overquota []container.QueueEnt // entries that are unmappable because of worker pool quota
+       var overquota []container.QueueEnt    // entries that are unmappable because of worker pool quota
+       var overmaxsuper []container.QueueEnt // unmappable because of the max supervisors limit (these are not included in overquota)
        var containerAllocatedWorkerBootingCount int
 
        // trying is #containers running + #containers we're trying to
@@ -87,15 +118,16 @@ func (sch *Scheduler) runQueue() {
        supervisors := 0
 
 tryrun:
-       for i, ctr := range sorted {
-               ctr, it := ctr.Container, ctr.InstanceType
+       for i, ent := range sorted {
+               ctr, it := ent.Container, ent.InstanceType
                logger := sch.logger.WithFields(logrus.Fields{
                        "ContainerUUID": ctr.UUID,
                        "InstanceType":  it.Name,
                })
                if ctr.SchedulingParameters.Supervisor {
                        supervisors += 1
-                       if sch.maxSupervisors > 0 && supervisors > sch.maxSupervisors {
+                       if maxSupervisors > 0 && supervisors > maxSupervisors {
+                               overmaxsuper = append(overmaxsuper, sorted[i])
                                continue
                        }
                }
@@ -106,8 +138,7 @@ tryrun:
                case arvados.ContainerStateQueued:
                        if sch.maxConcurrency > 0 && trying >= sch.maxConcurrency {
                                logger.Tracef("not locking: already at maxConcurrency %d", sch.maxConcurrency)
-                               overquota = sorted[i:]
-                               break tryrun
+                               continue
                        }
                        trying++
                        if unalloc[it] < 1 && sch.pool.AtQuota() {
@@ -123,9 +154,8 @@ tryrun:
                        unalloc[it]--
                case arvados.ContainerStateLocked:
                        if sch.maxConcurrency > 0 && trying >= sch.maxConcurrency {
-                               logger.Debugf("not starting: already at maxConcurrency %d", sch.maxConcurrency)
-                               overquota = sorted[i:]
-                               break tryrun
+                               logger.Tracef("not starting: already at maxConcurrency %d", sch.maxConcurrency)
+                               continue
                        }
                        trying++
                        if unalloc[it] > 0 {
@@ -173,25 +203,27 @@ tryrun:
        }
 
        sch.mContainersAllocatedNotStarted.Set(float64(containerAllocatedWorkerBootingCount))
-       sch.mContainersNotAllocatedOverQuota.Set(float64(len(overquota)))
+       sch.mContainersNotAllocatedOverQuota.Set(float64(len(overquota) + len(overmaxsuper)))
 
-       if len(overquota) > 0 {
+       if len(overquota)+len(overmaxsuper) > 0 {
                // Unlock any containers that are unmappable while
                // we're at quota (but if they have already been
                // scheduled and they're loading docker images etc.,
                // let them run).
-               for _, ctr := range overquota {
+               for _, ctr := range append(overmaxsuper, overquota...) {
                        ctr := ctr.Container
                        _, toolate := running[ctr.UUID]
                        if ctr.State == arvados.ContainerStateLocked && !toolate {
                                logger := sch.logger.WithField("ContainerUUID", ctr.UUID)
-                               logger.Debug("unlock because pool capacity is used by higher priority containers")
+                               logger.Info("unlock because pool capacity is used by higher priority containers")
                                err := sch.queue.Unlock(ctr.UUID)
                                if err != nil {
                                        logger.WithError(err).Warn("error unlocking")
                                }
                        }
                }
+       }
+       if len(overquota) > 0 {
                // Shut down idle workers that didn't get any
                // containers mapped onto them before we hit quota.
                for it, n := range unalloc {