X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/9f1850a385ee8e0a011474de19ee6507b0b168f3..cb68d4e34688abd308d7adffc288c82a5deb6c85:/lib/dispatchcloud/scheduler/run_queue.go

diff --git a/lib/dispatchcloud/scheduler/run_queue.go b/lib/dispatchcloud/scheduler/run_queue.go
index 2f1f175890..d270972295 100644
--- a/lib/dispatchcloud/scheduler/run_queue.go
+++ b/lib/dispatchcloud/scheduler/run_queue.go
@@ -5,6 +5,7 @@
 package scheduler
 
 import (
+	"fmt"
 	"sort"
 	"time"
 
@@ -15,6 +16,31 @@ import (
 
 var quietAfter503 = time.Minute
 
+type QueueEnt struct {
+	container.QueueEnt
+
+	// Human-readable scheduling status as of the last scheduling
+	// iteration.
+	SchedulingStatus string `json:"scheduling_status"`
+}
+
+const (
+	schedStatusPreparingRuntimeEnvironment = "preparing runtime environment"
+	schedStatusPriorityZero                = "not scheduling: priority 0" // ", state X" appended at runtime
+	schedStatusContainerLimitReached       = "not starting: supervisor container limit has been reached"
+	schedStatusWaitingForPreviousAttempt   = "waiting for previous attempt to exit"
+	schedStatusWaitingNewInstance          = "waiting for new instance to be ready"
+	schedStatusWaitingInstanceType         = "waiting for suitable instance type to become available" // ": queue position X" appended at runtime
+	schedStatusWaitingCloudResources       = "waiting for cloud resources"
+	schedStatusWaitingClusterCapacity      = "waiting while cluster is running at capacity" // ": queue position X" appended at runtime
+)
+
+// Queue returns the sorted queue from the last scheduling iteration.
+func (sch *Scheduler) Queue() []QueueEnt {
+	ents, _ := sch.lastQueue.Load().([]QueueEnt)
+	return ents
+}
+
 func (sch *Scheduler) runQueue() {
 	running := sch.pool.Running()
 	unalloc := sch.pool.Unallocated()
@@ -25,9 +51,9 @@ func (sch *Scheduler) runQueue() {
 	}
 
 	unsorted, _ := sch.queue.Entries()
-	sorted := make([]container.QueueEnt, 0, len(unsorted))
+	sorted := make([]QueueEnt, 0, len(unsorted))
 	for _, ent := range unsorted {
-		sorted = append(sorted, ent)
+		sorted = append(sorted, QueueEnt{QueueEnt: ent})
 	}
 	sort.Slice(sorted, func(i, j int) bool {
 		_, irunning := running[sorted[i].Container.UUID]
@@ -149,9 +175,9 @@
 	}).Debug("runQueue")
 
 	dontstart := map[arvados.InstanceType]bool{}
-	var atcapacity = map[string]bool{}    // ProviderTypes reported as AtCapacity during this runQueue() invocation
-	var overquota []container.QueueEnt    // entries that are unmappable because of worker pool quota
-	var overmaxsuper []container.QueueEnt // unmappable because max supervisors (these are not included in overquota)
+	var atcapacity = map[string]bool{} // ProviderTypes reported as AtCapacity during this runQueue() invocation
+	var overquota []QueueEnt           // entries that are unmappable because of worker pool quota
+	var overmaxsuper []QueueEnt        // unmappable because max supervisors (these are not included in overquota)
 	var containerAllocatedWorkerBootingCount int
 
 	// trying is #containers running + #containers we're trying to
@@ -159,6 +185,7 @@
 	// reaches the dynamic maxConcurrency limit.
 	trying := len(running)
 
+	qpos := 0
 	supervisors := 0
 tryrun:
 	for i, ent := range sorted {
@@ -169,12 +196,20 @@
 		})
 		if ctr.SchedulingParameters.Supervisor {
 			supervisors += 1
-			if maxSupervisors > 0 && supervisors > maxSupervisors {
-				overmaxsuper = append(overmaxsuper, sorted[i])
-				continue
+		}
+		if _, running := running[ctr.UUID]; running {
+			if ctr.State == arvados.ContainerStateQueued || ctr.State == arvados.ContainerStateLocked {
+				sorted[i].SchedulingStatus = schedStatusPreparingRuntimeEnvironment
 			}
+			continue
 		}
-		if _, running := running[ctr.UUID]; running || ctr.Priority < 1 {
+		if ctr.Priority < 1 {
+			sorted[i].SchedulingStatus = schedStatusPriorityZero + ", state " + string(ctr.State)
+			continue
+		}
+		if ctr.SchedulingParameters.Supervisor && maxSupervisors > 0 && supervisors > maxSupervisors {
+			overmaxsuper = append(overmaxsuper, sorted[i])
+			sorted[i].SchedulingStatus = schedStatusContainerLimitReached
 			continue
 		}
 		// If we have unalloc instances of any of the eligible
@@ -214,7 +249,7 @@
 		}
 		trying++
 		if !unallocOK && sch.pool.AtQuota() {
-			logger.Trace("not locking: AtQuota and no unalloc workers")
+			logger.Trace("not starting: AtQuota and no unalloc workers")
 			overquota = sorted[i:]
 			break tryrun
 		}
@@ -239,17 +274,20 @@
 			// so mark it as allocated, and try to
 			// start the container.
 			unalloc[unallocType]--
-			logger = logger.WithField("InstanceType", unallocType)
+			logger = logger.WithField("InstanceType", unallocType.Name)
 			if dontstart[unallocType] {
 				// We already tried & failed to start
 				// a higher-priority container on the
 				// same instance type. Don't let this
 				// one sneak in ahead of it.
 			} else if sch.pool.KillContainer(ctr.UUID, "about to start") {
+				sorted[i].SchedulingStatus = schedStatusWaitingForPreviousAttempt
 				logger.Info("not restarting yet: crunch-run process from previous attempt has not exited")
 			} else if sch.pool.StartContainer(unallocType, ctr) {
+				sorted[i].SchedulingStatus = schedStatusPreparingRuntimeEnvironment
 				logger.Trace("StartContainer => true")
 			} else {
+				sorted[i].SchedulingStatus = schedStatusWaitingNewInstance
 				logger.Trace("StartContainer => false")
 				containerAllocatedWorkerBootingCount += 1
 				dontstart[unallocType] = true
@@ -279,10 +317,12 @@
 				// container A on the next call to
 				// runQueue(), rather than run
 				// container B now.
+				qpos++
+				sorted[i].SchedulingStatus = schedStatusWaitingInstanceType + fmt.Sprintf(": queue position %d", qpos)
 				logger.Trace("all eligible types at capacity")
 				continue
 			}
-			logger = logger.WithField("InstanceType", availableType)
+			logger = logger.WithField("InstanceType", availableType.Name)
 			if !sch.pool.Create(availableType) {
 				// Failed despite not being at quota,
 				// e.g., cloud ops throttled.
@@ -293,6 +333,7 @@
 			// asynchronously and does its own logging
 			// about the eventual outcome, so we don't
 			// need to.)
+			sorted[i].SchedulingStatus = schedStatusWaitingNewInstance
 			logger.Info("creating new instance")
 			// Don't bother trying to start the container
 			// yet -- obviously the instance will take
@@ -305,12 +346,26 @@ tryrun:
 	sch.mContainersAllocatedNotStarted.Set(float64(containerAllocatedWorkerBootingCount))
 	sch.mContainersNotAllocatedOverQuota.Set(float64(len(overquota) + len(overmaxsuper)))
 
+	var qreason string
+	if sch.pool.AtQuota() {
+		qreason = schedStatusWaitingCloudResources
+	} else {
+		qreason = schedStatusWaitingClusterCapacity
+	}
+	for i, ent := range sorted {
+		if ent.SchedulingStatus == "" && (ent.Container.State == arvados.ContainerStateQueued || ent.Container.State == arvados.ContainerStateLocked) {
+			qpos++
+			sorted[i].SchedulingStatus = fmt.Sprintf("%s: queue position %d", qreason, qpos)
+		}
+	}
+	sch.lastQueue.Store(sorted)
+
 	if len(overquota)+len(overmaxsuper) > 0 {
 		// Unlock any containers that are unmappable while
 		// we're at quota (but if they have already been
 		// scheduled and they're loading docker images etc.,
 		// let them run).
-		var unlock []container.QueueEnt
+		var unlock []QueueEnt
 		unlock = append(unlock, overmaxsuper...)
 		if totalInstances > 0 && len(overquota) > 1 {
 			// We don't unlock the next-in-line container
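
Note: the Queue() accessor added by this patch reads a lastQueue field whose declaration is outside this diff (it lives on the Scheduler struct, presumably as a sync/atomic Value). Below is a minimal, self-contained sketch of that publish/consume pattern, assuming simplified stand-in types rather than the real container.QueueEnt, Scheduler, or arvados packages:

	// sketch.go: the snapshot-publishing pattern this patch relies on.
	// QueueEnt, Scheduler, lastQueue, Queue and runQueue mirror names in
	// the patch; everything else here is a hypothetical stand-in.
	package main

	import (
		"encoding/json"
		"fmt"
		"sync/atomic"
	)

	// QueueEnt stands in for the patch's wrapper type: a queue entry plus
	// the human-readable status assigned during the last iteration.
	type QueueEnt struct {
		ContainerUUID    string `json:"container_uuid"`
		SchedulingStatus string `json:"scheduling_status"`
	}

	type Scheduler struct {
		// lastQueue is written by the scheduling goroutine and read by
		// concurrent callers; atomic.Value swaps whole snapshots without
		// holding a lock across an entire scheduling iteration.
		lastQueue atomic.Value // holds []QueueEnt
	}

	// Queue returns the annotated queue from the last scheduling
	// iteration. Discarding the type assertion's ok-value makes a
	// never-stored lastQueue yield a nil slice instead of panicking.
	func (sch *Scheduler) Queue() []QueueEnt {
		ents, _ := sch.lastQueue.Load().([]QueueEnt)
		return ents
	}

	// runQueue builds a fresh slice every iteration and publishes it only
	// after the statuses are final, so readers see a consistent snapshot.
	func (sch *Scheduler) runQueue() {
		sorted := []QueueEnt{{
			ContainerUUID:    "zzzzz-dz642-examplecontainr", // hypothetical UUID
			SchedulingStatus: "waiting for new instance to be ready",
		}}
		sch.lastQueue.Store(sorted)
	}

	func main() {
		sch := &Scheduler{}
		fmt.Println(sch.Queue() == nil) // true: nothing published yet
		sch.runQueue()
		buf, _ := json.Marshal(sch.Queue())
		fmt.Println(string(buf))
	}

Because each iteration stores a freshly built slice, Queue() never blocks the scheduling loop and callers always see a complete snapshot; the trade-off is that the returned entries can be one iteration stale and must be treated as read-only.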