package scheduler
import (
+ "fmt"
"sort"
"time"
var quietAfter503 = time.Minute
+// QueueEnt is a container queue entry annotated with the scheduler's
+// view of it: it embeds container.QueueEnt and adds a status string
+// that is filled in while the queue is being processed.
+type QueueEnt struct {
+ container.QueueEnt
+
+ // Human-readable scheduling status as of the last scheduling
+ // iteration. Serialized in JSON as "scheduling_status".
+ SchedulingStatus string `json:"scheduling_status"`
+}
+
+// Queue returns the sorted queue from the last scheduling iteration.
+//
+// The value is read from sch.lastQueue, which runQueue stores after
+// assigning scheduling statuses. The result of the type assertion is
+// deliberately ignored: if the loaded value is not a []QueueEnt
+// (presumably the case before the first iteration has stored
+// anything -- TODO confirm against the lastQueue declaration), a nil
+// slice is returned. The slice is returned as-is, not copied.
+func (sch *Scheduler) Queue() []QueueEnt {
+ ents, _ := sch.lastQueue.Load().([]QueueEnt)
+ return ents
+}
+
func (sch *Scheduler) runQueue() {
running := sch.pool.Running()
unalloc := sch.pool.Unallocated()
}
unsorted, _ := sch.queue.Entries()
- sorted := make([]container.QueueEnt, 0, len(unsorted))
+ sorted := make([]QueueEnt, 0, len(unsorted))
for _, ent := range unsorted {
- sorted = append(sorted, ent)
+ sorted = append(sorted, QueueEnt{QueueEnt: ent})
}
sort.Slice(sorted, func(i, j int) bool {
_, irunning := running[sorted[i].Container.UUID]
}).Debug("runQueue")
dontstart := map[arvados.InstanceType]bool{}
- var atcapacity = map[string]bool{} // ProviderTypes reported as AtCapacity during this runQueue() invocation
- var overquota []container.QueueEnt // entries that are unmappable because of worker pool quota
- var overmaxsuper []container.QueueEnt // unmappable because max supervisors (these are not included in overquota)
+ var atcapacity = map[string]bool{} // ProviderTypes reported as AtCapacity during this runQueue() invocation
+ var overquota []QueueEnt // entries that are unmappable because of worker pool quota
+ var overmaxsuper []QueueEnt // unmappable because max supervisors (these are not included in overquota)
var containerAllocatedWorkerBootingCount int
// trying is #containers running + #containers we're trying to
// reaches the dynamic maxConcurrency limit.
trying := len(running)
+ qpos := 0
supervisors := 0
tryrun:
})
if ctr.SchedulingParameters.Supervisor {
supervisors += 1
- if maxSupervisors > 0 && supervisors > maxSupervisors {
- overmaxsuper = append(overmaxsuper, sorted[i])
- continue
+ }
+ if _, running := running[ctr.UUID]; running {
+ if ctr.State == arvados.ContainerStateQueued || ctr.State == arvados.ContainerStateLocked {
+ sorted[i].SchedulingStatus = "preparing runtime environment"
}
+ continue
}
- if _, running := running[ctr.UUID]; running || ctr.Priority < 1 {
+ if ctr.Priority < 1 {
+ sorted[i].SchedulingStatus = "not scheduling: priority 0, state " + string(ctr.State)
+ continue
+ }
+ if ctr.SchedulingParameters.Supervisor && maxSupervisors > 0 && supervisors > maxSupervisors {
+ overmaxsuper = append(overmaxsuper, sorted[i])
+ sorted[i].SchedulingStatus = "not starting: supervisor container limit has been reached"
continue
}
// If we have unalloc instances of any of the eligible
}
trying++
if !unallocOK && sch.pool.AtQuota() {
- logger.Trace("not locking: AtQuota and no unalloc workers")
+ logger.Trace("not starting: AtQuota and no unalloc workers")
overquota = sorted[i:]
break tryrun
}
// same instance type. Don't let this
// one sneak in ahead of it.
} else if sch.pool.KillContainer(ctr.UUID, "about to start") {
+ sorted[i].SchedulingStatus = "waiting for previous attempt to exit"
logger.Info("not restarting yet: crunch-run process from previous attempt has not exited")
} else if sch.pool.StartContainer(unallocType, ctr) {
+ sorted[i].SchedulingStatus = "preparing runtime environment"
logger.Trace("StartContainer => true")
} else {
+ sorted[i].SchedulingStatus = "waiting for new instance to be ready"
logger.Trace("StartContainer => false")
containerAllocatedWorkerBootingCount += 1
dontstart[unallocType] = true
// container A on the next call to
// runQueue(), rather than run
// container B now.
+ qpos++
+ sorted[i].SchedulingStatus = fmt.Sprintf("waiting for suitable instance type to become available: queue position %d", qpos)
logger.Trace("all eligible types at capacity")
continue
}
// asynchronously and does its own logging
// about the eventual outcome, so we don't
// need to.)
+ sorted[i].SchedulingStatus = "waiting for new instance to be ready"
logger.Info("creating new instance")
// Don't bother trying to start the container
// yet -- obviously the instance will take
sch.mContainersAllocatedNotStarted.Set(float64(containerAllocatedWorkerBootingCount))
sch.mContainersNotAllocatedOverQuota.Set(float64(len(overquota) + len(overmaxsuper)))
+ var qreason string
+ if sch.pool.AtQuota() {
+ qreason = "waiting for cloud resources"
+ } else {
+ qreason = "waiting while cluster is running at capacity"
+ }
+ for i, ent := range sorted {
+ if ent.SchedulingStatus == "" && (ent.Container.State == arvados.ContainerStateQueued || ent.Container.State == arvados.ContainerStateLocked) {
+ qpos++
+ sorted[i].SchedulingStatus = fmt.Sprintf("%s: queue position %d", qreason, qpos)
+ }
+ }
+ sch.lastQueue.Store(sorted)
+
if len(overquota)+len(overmaxsuper) > 0 {
// Unlock any containers that are unmappable while
// we're at quota (but if they have already been
// scheduled and they're loading docker images etc.,
// let them run).
- var unlock []container.QueueEnt
+ var unlock []QueueEnt
unlock = append(unlock, overmaxsuper...)
if totalInstances > 0 && len(overquota) > 1 {
// We don't unlock the next-in-line container