21123: Add container status API to cloud dispatcher.
diff --git a/lib/dispatchcloud/scheduler/run_queue.go b/lib/dispatchcloud/scheduler/run_queue.go
index 03fa592777e6fa7c09eb57031bb19c3bdeb80029..2f4bce8987b86418ab3389db45073fbccc2e576f 100644
--- a/lib/dispatchcloud/scheduler/run_queue.go
+++ b/lib/dispatchcloud/scheduler/run_queue.go
@@ -5,6 +5,7 @@
 package scheduler
 
 import (
+       "fmt"
        "sort"
        "time"
 
@@ -15,6 +16,20 @@ import (
 
 var quietAfter503 = time.Minute
 
+type QueueEnt struct {
+       container.QueueEnt
+
+       // Human-readable scheduling status as of the last scheduling
+       // iteration.
+       SchedulingStatus string `json:"scheduling_status"`
+}
+
+// Queue returns the sorted queue from the last scheduling iteration.
+func (sch *Scheduler) Queue() []QueueEnt {
+       ents, _ := sch.lastQueue.Load().([]QueueEnt)
+       return ents
+}
+
 func (sch *Scheduler) runQueue() {
        running := sch.pool.Running()
        unalloc := sch.pool.Unallocated()
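
The new Queue() accessor type-asserts whatever was last stored in sch.lastQueue. That field is not part of this file's diff; the Load/Store calls imply a sync/atomic.Value declared on Scheduler elsewhere in the package. A minimal, self-contained sketch of that snapshot pattern, under that assumption (the scheduler struct and publish helper here are illustrative stand-ins, not part of the commit):

package main

import (
	"fmt"
	"sync/atomic"
)

type QueueEnt struct{ SchedulingStatus string } // stand-in for the real type

type scheduler struct {
	lastQueue atomic.Value // holds the []QueueEnt published by runQueue()
}

// publish atomically replaces the snapshot; readers never observe a
// partially built slice.
func (s *scheduler) publish(q []QueueEnt) { s.lastQueue.Store(q) }

// Queue returns the last published snapshot, or nil before the first
// scheduling iteration (the discarded ok value mirrors the code above).
func (s *scheduler) Queue() []QueueEnt {
	q, _ := s.lastQueue.Load().([]QueueEnt)
	return q
}

func main() {
	s := &scheduler{}
	fmt.Println(s.Queue() == nil) // true: nothing published yet
	s.publish([]QueueEnt{{SchedulingStatus: "waiting for cloud resources: queue position 1"}})
	fmt.Println(s.Queue()[0].SchedulingStatus)
}
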
@@ -25,9 +40,9 @@ func (sch *Scheduler) runQueue() {
        }
 
        unsorted, _ := sch.queue.Entries()
-       sorted := make([]container.QueueEnt, 0, len(unsorted))
+       sorted := make([]QueueEnt, 0, len(unsorted))
        for _, ent := range unsorted {
-               sorted = append(sorted, ent)
+               sorted = append(sorted, QueueEnt{QueueEnt: ent})
        }
        sort.Slice(sorted, func(i, j int) bool {
                _, irunning := running[sorted[i].Container.UUID]
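
This hunk wraps each container.QueueEnt in the new scheduler-local QueueEnt. Because the inner type is embedded rather than given a field name, its fields are promoted, so existing accesses such as sorted[i].Container.UUID compile unchanged. A tiny illustration of that promotion (types here are stand-ins):

package main

import "fmt"

type inner struct{ UUID string }

type outer struct {
	inner  // embedded: inner's fields are promoted onto outer
	Status string
}

func main() {
	o := outer{inner: inner{UUID: "zzzzz-dz642-aaaaaaaaaaaaaaa"}}
	fmt.Println(o.UUID) // shorthand for o.inner.UUID, via promotion
}
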
@@ -149,9 +164,9 @@ func (sch *Scheduler) runQueue() {
        }).Debug("runQueue")
 
        dontstart := map[arvados.InstanceType]bool{}
-       var atcapacity = map[string]bool{}    // ProviderTypes reported as AtCapacity during this runQueue() invocation
-       var overquota []container.QueueEnt    // entries that are unmappable because of worker pool quota
-       var overmaxsuper []container.QueueEnt // unmappable because max supervisors (these are not included in overquota)
+       var atcapacity = map[string]bool{} // ProviderTypes reported as AtCapacity during this runQueue() invocation
+       var overquota []QueueEnt           // entries that are unmappable because of worker pool quota
+       var overmaxsuper []QueueEnt        // unmappable because max supervisors (these are not included in overquota)
        var containerAllocatedWorkerBootingCount int
 
        // trying is #containers running + #containers we're trying to
@@ -159,6 +174,7 @@ func (sch *Scheduler) runQueue() {
        // reaches the dynamic maxConcurrency limit.
        trying := len(running)
 
+       qpos := 0
        supervisors := 0
 
 tryrun:
@@ -169,12 +185,20 @@ tryrun:
                })
                if ctr.SchedulingParameters.Supervisor {
                        supervisors += 1
-                       if maxSupervisors > 0 && supervisors > maxSupervisors {
-                               overmaxsuper = append(overmaxsuper, sorted[i])
-                               continue
+               }
+               if _, running := running[ctr.UUID]; running {
+                       if ctr.State == arvados.ContainerStateQueued || ctr.State == arvados.ContainerStateLocked {
+                               sorted[i].SchedulingStatus = "preparing runtime environment"
                        }
+                       continue
                }
-               if _, running := running[ctr.UUID]; running || ctr.Priority < 1 {
+               if ctr.Priority < 1 {
+                       sorted[i].SchedulingStatus = "not scheduling: priority 0, state " + string(ctr.State)
+                       continue
+               }
+               if ctr.SchedulingParameters.Supervisor && maxSupervisors > 0 && supervisors > maxSupervisors {
+                       overmaxsuper = append(overmaxsuper, sorted[i])
+                       sorted[i].SchedulingStatus = "not starting: supervisor container limit has been reached"
                        continue
                }
                // If we have unalloc instances of any of the eligible
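
Note the behavioral change in the hunk above: supervisors is now incremented before the running/priority short-circuits, so supervisors that are already running count toward maxSupervisors, and the limit check itself moves after those short-circuits. A compact simulation of the reordered checks (the entries and limit are hypothetical test values):

package main

import "fmt"

type ent struct {
	uuid       string
	supervisor bool
	running    bool
	priority   int
}

func main() {
	const maxSupervisors = 2
	queue := []ent{
		{"sv-1", true, true, 1},  // running supervisor: counted, then skipped
		{"sv-2", true, true, 1},  // running supervisor: counted, then skipped
		{"sv-3", true, false, 1}, // queued supervisor: pushes count past the limit
	}
	supervisors := 0
	for _, e := range queue {
		if e.supervisor {
			supervisors++
		}
		if e.running || e.priority < 1 {
			continue
		}
		if e.supervisor && maxSupervisors > 0 && supervisors > maxSupervisors {
			fmt.Println(e.uuid, "=> not starting: supervisor container limit has been reached")
		}
	}
}
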
@@ -214,7 +238,7 @@ tryrun:
                        }
                        trying++
                        if !unallocOK && sch.pool.AtQuota() {
-                               logger.Trace("not locking: AtQuota and no unalloc workers")
+                               logger.Trace("not starting: AtQuota and no unalloc workers")
                                overquota = sorted[i:]
                                break tryrun
                        }
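
overquota = sorted[i:] takes a tail slice rather than a copy, so the two slices share a backing array: SchedulingStatus strings written through sorted after the loop (see the final hunk) are also visible to the unlock pass that iterates overquota. A minimal demonstration of that aliasing, with hypothetical values:

package main

import "fmt"

func main() {
	sorted := []string{"a", "b", "c", "d"}
	overquota := sorted[2:]   // tail alias, like overquota = sorted[i:]
	sorted[3] = "d-updated"   // a later write through sorted...
	fmt.Println(overquota[1]) // ...is visible via overquota: "d-updated"
}
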
@@ -246,10 +270,13 @@ tryrun:
                                        // same instance type. Don't let this
                                        // one sneak in ahead of it.
                                } else if sch.pool.KillContainer(ctr.UUID, "about to start") {
+                                       sorted[i].SchedulingStatus = "waiting for previous attempt to exit"
                                        logger.Info("not restarting yet: crunch-run process from previous attempt has not exited")
                                } else if sch.pool.StartContainer(unallocType, ctr) {
+                                       sorted[i].SchedulingStatus = "preparing runtime environment"
                                        logger.Trace("StartContainer => true")
                                } else {
+                                       sorted[i].SchedulingStatus = "waiting for new instance to be ready"
                                        logger.Trace("StartContainer => false")
                                        containerAllocatedWorkerBootingCount += 1
                                        dontstart[unallocType] = true
@@ -279,6 +306,8 @@ tryrun:
                                // container A on the next call to
                                // runQueue(), rather than run
                                // container B now.
+                               qpos++
+                               sorted[i].SchedulingStatus = fmt.Sprintf("waiting for suitable instance type to become available: queue position %d", qpos)
                                logger.Trace("all eligible types at capacity")
                                continue
                        }
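
When StartContainer returns false the instance is still booting, so the hunk above records the instance type in dontstart; later entries that would map to the same type are then held back rather than allowed to jump the queue, preserving priority order per instance type. A hypothetical walk-through of that mechanism (names and values invented for illustration):

package main

import "fmt"

func main() {
	dontstart := map[string]bool{} // instance type -> higher-priority container still booting
	queue := []struct {
		uuid, instType string
		ready          bool // pretend outcome of trying to start
	}{
		{"ctr-high", "m5.large", false}, // its instance is still booting
		{"ctr-low", "m5.large", true},   // same type: must wait behind ctr-high
		{"ctr-other", "c5.large", true}, // different type: unaffected
	}
	for _, c := range queue {
		switch {
		case dontstart[c.instType]:
			fmt.Println(c.uuid, "=> held back: higher-priority container booting on", c.instType)
		case !c.ready:
			dontstart[c.instType] = true
			fmt.Println(c.uuid, "=> waiting for new instance to be ready")
		default:
			fmt.Println(c.uuid, "=> started on", c.instType)
		}
	}
}
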
@@ -293,6 +322,7 @@ tryrun:
                        // asynchronously and does its own logging
                        // about the eventual outcome, so we don't
                        // need to.)
+                       sorted[i].SchedulingStatus = "waiting for new instance to be ready"
                        logger.Info("creating new instance")
                        // Don't bother trying to start the container
                        // yet -- obviously the instance will take
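
By this point every entry the tryrun loop visited carries a SchedulingStatus; the remaining queued/locked entries get a fallback assigned after the loop (final hunk below), and the commit's new container status API can then serve the snapshot. A hedged sketch of what serializing one entry might look like, given the json:"scheduling_status" tag added above (the flattened struct and UUID are assumptions, not part of this file):

package main

import (
	"encoding/json"
	"fmt"
)

// Stand-in mirroring the QueueEnt added above: a container identifier
// plus the scheduling_status field tagged for JSON.
type queueEnt struct {
	ContainerUUID    string `json:"container_uuid"`
	SchedulingStatus string `json:"scheduling_status"`
}

func main() {
	ent := queueEnt{
		ContainerUUID:    "zzzzz-dz642-xxxxxxxxxxxxxxx",
		SchedulingStatus: "waiting for cloud resources: queue position 1",
	}
	b, err := json.Marshal(ent)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b))
	// {"container_uuid":"zzzzz-dz642-xxxxxxxxxxxxxxx","scheduling_status":"waiting for cloud resources: queue position 1"}
}
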
@@ -305,12 +335,26 @@ tryrun:
        sch.mContainersAllocatedNotStarted.Set(float64(containerAllocatedWorkerBootingCount))
        sch.mContainersNotAllocatedOverQuota.Set(float64(len(overquota) + len(overmaxsuper)))
 
+       var qreason string
+       if sch.pool.AtQuota() {
+               qreason = "waiting for cloud resources"
+       } else {
+               qreason = "waiting while cluster is running at capacity"
+       }
+       for i, ent := range sorted {
+               if ent.SchedulingStatus == "" && (ent.Container.State == arvados.ContainerStateQueued || ent.Container.State == arvados.ContainerStateLocked) {
+                       qpos++
+                       sorted[i].SchedulingStatus = fmt.Sprintf("%s: queue position %d", qreason, qpos)
+               }
+       }
+       sch.lastQueue.Store(sorted)
+
        if len(overquota)+len(overmaxsuper) > 0 {
                // Unlock any containers that are unmappable while
                // we're at quota (but if they have already been
                // scheduled and they're loading docker images etc.,
                // let them run).
-               var unlock []container.QueueEnt
+               var unlock []QueueEnt
                unlock = append(unlock, overmaxsuper...)
                if totalInstances > 0 && len(overquota) > 1 {
                        // We don't unlock the next-in-line container