20718: Cleans up by running "go mod tidy -compat=1.17"
[arvados.git] / lib / dispatchcloud / scheduler / run_queue.go
index db6e97b5961e2018965ecec27ccbef818a73ba06..63ac4a7b32122e68787bad1b71c1c5a988e97440 100644 (file)
@@ -16,12 +16,25 @@ import (
 var quietAfter503 = time.Minute
 
 func (sch *Scheduler) runQueue() {
+       running := sch.pool.Running()
+       unalloc := sch.pool.Unallocated()
+
        unsorted, _ := sch.queue.Entries()
        sorted := make([]container.QueueEnt, 0, len(unsorted))
        for _, ent := range unsorted {
                sorted = append(sorted, ent)
        }
        sort.Slice(sorted, func(i, j int) bool {
+               _, irunning := running[sorted[i].Container.UUID]
+               _, jrunning := running[sorted[j].Container.UUID]
+               if irunning != jrunning {
+                       // Ensure the "tryrun" loop (see below) sees
+                       // already-scheduled containers first, to
+                       // ensure existing supervisor containers are
+                       // properly counted before we decide whether
+                       // we have room for new ones.
+                       return irunning
+               }
                ilocked := sorted[i].Container.State == arvados.ContainerStateLocked
                jlocked := sorted[j].Container.State == arvados.ContainerStateLocked
                if ilocked != jlocked {
@@ -46,9 +59,6 @@ func (sch *Scheduler) runQueue() {
                }
        })
 
-       running := sch.pool.Running()
-       unalloc := sch.pool.Unallocated()
-
        if t := sch.client.Last503(); t.After(sch.last503time) {
                // API has sent an HTTP 503 response since last time
                // we checked. Use current #containers - 1 as
@@ -79,8 +89,16 @@ func (sch *Scheduler) runQueue() {
        } else {
                sch.mLast503Time.Set(float64(sch.last503time.Unix()))
        }
+       if sch.maxInstances > 0 && sch.maxConcurrency > sch.maxInstances {
+               sch.maxConcurrency = sch.maxInstances
+       }
        sch.mMaxContainerConcurrency.Set(float64(sch.maxConcurrency))
 
+       maxSupervisors := int(float64(sch.maxConcurrency) * sch.supervisorFraction)
+       if maxSupervisors < 1 && sch.supervisorFraction > 0 && sch.maxConcurrency > 0 {
+               maxSupervisors = 1
+       }
+
        sch.logger.WithFields(logrus.Fields{
                "Containers":     len(sorted),
                "Processes":      len(running),
@@ -108,7 +126,7 @@ tryrun:
                })
                if ctr.SchedulingParameters.Supervisor {
                        supervisors += 1
-                       if sch.maxSupervisors > 0 && supervisors > sch.maxSupervisors {
+                       if maxSupervisors > 0 && supervisors > maxSupervisors {
                                overmaxsuper = append(overmaxsuper, sorted[i])
                                continue
                        }
@@ -204,6 +222,8 @@ tryrun:
                                }
                        }
                }
+       }
+       if len(overquota) > 0 {
                // Shut down idle workers that didn't get any
                // containers mapped onto them before we hit quota.
                for it, n := range unalloc {