X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/7af506a0e9712ca22096ebd56df8867a427dae96..cb68d4e34688abd308d7adffc288c82a5deb6c85:/lib/dispatchcloud/scheduler/run_queue.go diff --git a/lib/dispatchcloud/scheduler/run_queue.go b/lib/dispatchcloud/scheduler/run_queue.go index 6a717bf444..d270972295 100644 --- a/lib/dispatchcloud/scheduler/run_queue.go +++ b/lib/dispatchcloud/scheduler/run_queue.go @@ -5,6 +5,7 @@ package scheduler import ( + "fmt" "sort" "time" @@ -15,6 +16,31 @@ import ( var quietAfter503 = time.Minute +type QueueEnt struct { + container.QueueEnt + + // Human-readable scheduling status as of the last scheduling + // iteration. + SchedulingStatus string `json:"scheduling_status"` +} + +const ( + schedStatusPreparingRuntimeEnvironment = "preparing runtime environment" + schedStatusPriorityZero = "not scheduling: priority 0" // ", state X" appended at runtime + schedStatusContainerLimitReached = "not starting: supervisor container limit has been reached" + schedStatusWaitingForPreviousAttempt = "waiting for previous attempt to exit" + schedStatusWaitingNewInstance = "waiting for new instance to be ready" + schedStatusWaitingInstanceType = "waiting for suitable instance type to become available" // ": queue position X" appended at runtime + schedStatusWaitingCloudResources = "waiting for cloud resources" + schedStatusWaitingClusterCapacity = "waiting while cluster is running at capacity" // ": queue position X" appended at runtime +) + +// Queue returns the sorted queue from the last scheduling iteration. +func (sch *Scheduler) Queue() []QueueEnt { + ents, _ := sch.lastQueue.Load().([]QueueEnt) + return ents +} + func (sch *Scheduler) runQueue() { running := sch.pool.Running() unalloc := sch.pool.Unallocated() @@ -25,9 +51,9 @@ func (sch *Scheduler) runQueue() { } unsorted, _ := sch.queue.Entries() - sorted := make([]container.QueueEnt, 0, len(unsorted)) + sorted := make([]QueueEnt, 0, len(unsorted)) for _, ent := range unsorted { - sorted = append(sorted, ent) + sorted = append(sorted, QueueEnt{QueueEnt: ent}) } sort.Slice(sorted, func(i, j int) bool { _, irunning := running[sorted[i].Container.UUID] @@ -149,8 +175,9 @@ func (sch *Scheduler) runQueue() { }).Debug("runQueue") dontstart := map[arvados.InstanceType]bool{} - var overquota []container.QueueEnt // entries that are unmappable because of worker pool quota - var overmaxsuper []container.QueueEnt // unmappable because max supervisors (these are not included in overquota) + var atcapacity = map[string]bool{} // ProviderTypes reported as AtCapacity during this runQueue() invocation + var overquota []QueueEnt // entries that are unmappable because of worker pool quota + var overmaxsuper []QueueEnt // unmappable because max supervisors (these are not included in overquota) var containerAllocatedWorkerBootingCount int // trying is #containers running + #containers we're trying to @@ -158,25 +185,62 @@ func (sch *Scheduler) runQueue() { // reaches the dynamic maxConcurrency limit. trying := len(running) + qpos := 0 supervisors := 0 tryrun: for i, ent := range sorted { - ctr, it := ent.Container, ent.InstanceType + ctr, types := ent.Container, ent.InstanceTypes logger := sch.logger.WithFields(logrus.Fields{ "ContainerUUID": ctr.UUID, - "InstanceType": it.Name, }) if ctr.SchedulingParameters.Supervisor { supervisors += 1 - if maxSupervisors > 0 && supervisors > maxSupervisors { - overmaxsuper = append(overmaxsuper, sorted[i]) - continue + } + if _, running := running[ctr.UUID]; running { + if ctr.State == arvados.ContainerStateQueued || ctr.State == arvados.ContainerStateLocked { + sorted[i].SchedulingStatus = schedStatusPreparingRuntimeEnvironment } + continue } - if _, running := running[ctr.UUID]; running || ctr.Priority < 1 { + if ctr.Priority < 1 { + sorted[i].SchedulingStatus = schedStatusPriorityZero + ", state " + string(ctr.State) continue } + if ctr.SchedulingParameters.Supervisor && maxSupervisors > 0 && supervisors > maxSupervisors { + overmaxsuper = append(overmaxsuper, sorted[i]) + sorted[i].SchedulingStatus = schedStatusContainerLimitReached + continue + } + // If we have unalloc instances of any of the eligible + // instance types, unallocOK is true and unallocType + // is the lowest-cost type. + var unallocOK bool + var unallocType arvados.InstanceType + for _, it := range types { + if unalloc[it] > 0 { + unallocOK = true + unallocType = it + break + } + } + // If the pool is not reporting AtCapacity for any of + // the eligible instance types, availableOK is true + // and availableType is the lowest-cost type. + var availableOK bool + var availableType arvados.InstanceType + for _, it := range types { + if atcapacity[it.ProviderType] { + continue + } else if sch.pool.AtCapacity(it) { + atcapacity[it.ProviderType] = true + continue + } else { + availableOK = true + availableType = it + break + } + } switch ctr.State { case arvados.ContainerStateQueued: if sch.maxConcurrency > 0 && trying >= sch.maxConcurrency { @@ -184,26 +248,53 @@ tryrun: continue } trying++ - if unalloc[it] < 1 && sch.pool.AtQuota() { - logger.Trace("not locking: AtQuota and no unalloc workers") + if !unallocOK && sch.pool.AtQuota() { + logger.Trace("not starting: AtQuota and no unalloc workers") overquota = sorted[i:] break tryrun } + if !unallocOK && !availableOK { + logger.Trace("not locking: AtCapacity and no unalloc workers") + continue + } if sch.pool.KillContainer(ctr.UUID, "about to lock") { logger.Info("not locking: crunch-run process from previous attempt has not exited") continue } go sch.lockContainer(logger, ctr.UUID) - unalloc[it]-- + unalloc[unallocType]-- case arvados.ContainerStateLocked: if sch.maxConcurrency > 0 && trying >= sch.maxConcurrency { logger.Tracef("not starting: already at maxConcurrency %d", sch.maxConcurrency) continue } trying++ - if unalloc[it] > 0 { - unalloc[it]-- - } else if sch.pool.AtQuota() { + if unallocOK { + // We have a suitable instance type, + // so mark it as allocated, and try to + // start the container. + unalloc[unallocType]-- + logger = logger.WithField("InstanceType", unallocType.Name) + if dontstart[unallocType] { + // We already tried & failed to start + // a higher-priority container on the + // same instance type. Don't let this + // one sneak in ahead of it. + } else if sch.pool.KillContainer(ctr.UUID, "about to start") { + sorted[i].SchedulingStatus = schedStatusWaitingForPreviousAttempt + logger.Info("not restarting yet: crunch-run process from previous attempt has not exited") + } else if sch.pool.StartContainer(unallocType, ctr) { + sorted[i].SchedulingStatus = schedStatusPreparingRuntimeEnvironment + logger.Trace("StartContainer => true") + } else { + sorted[i].SchedulingStatus = schedStatusWaitingNewInstance + logger.Trace("StartContainer => false") + containerAllocatedWorkerBootingCount += 1 + dontstart[unallocType] = true + } + continue + } + if sch.pool.AtQuota() { // Don't let lower-priority containers // starve this one by using keeping // idle workers alive on different @@ -211,49 +302,70 @@ tryrun: logger.Trace("overquota") overquota = sorted[i:] break tryrun - } else if sch.pool.Create(it) { - // Success. (Note pool.Create works - // asynchronously and does its own - // logging about the eventual outcome, - // so we don't need to.) - logger.Info("creating new instance") - } else { + } + if !availableOK { + // Continue trying lower-priority + // containers in case they can run on + // different instance types that are + // available. + // + // The local "atcapacity" cache helps + // when the pool's flag resets after + // we look at container A but before + // we look at lower-priority container + // B. In that case we want to run + // container A on the next call to + // runQueue(), rather than run + // container B now. + qpos++ + sorted[i].SchedulingStatus = schedStatusWaitingInstanceType + fmt.Sprintf(": queue position %d", qpos) + logger.Trace("all eligible types at capacity") + continue + } + logger = logger.WithField("InstanceType", availableType.Name) + if !sch.pool.Create(availableType) { // Failed despite not being at quota, - // e.g., cloud ops throttled. TODO: - // avoid getting starved here if - // instances of a specific type always - // fail. + // e.g., cloud ops throttled. logger.Trace("pool declined to create new instance") continue } - - if dontstart[it] { - // We already tried & failed to start - // a higher-priority container on the - // same instance type. Don't let this - // one sneak in ahead of it. - } else if sch.pool.KillContainer(ctr.UUID, "about to start") { - logger.Info("not restarting yet: crunch-run process from previous attempt has not exited") - } else if sch.pool.StartContainer(it, ctr) { - logger.Trace("StartContainer => true") - // Success. - } else { - logger.Trace("StartContainer => false") - containerAllocatedWorkerBootingCount += 1 - dontstart[it] = true - } + // Success. (Note pool.Create works + // asynchronously and does its own logging + // about the eventual outcome, so we don't + // need to.) + sorted[i].SchedulingStatus = schedStatusWaitingNewInstance + logger.Info("creating new instance") + // Don't bother trying to start the container + // yet -- obviously the instance will take + // some time to boot and become ready. + containerAllocatedWorkerBootingCount += 1 + dontstart[availableType] = true } } sch.mContainersAllocatedNotStarted.Set(float64(containerAllocatedWorkerBootingCount)) sch.mContainersNotAllocatedOverQuota.Set(float64(len(overquota) + len(overmaxsuper))) + var qreason string + if sch.pool.AtQuota() { + qreason = schedStatusWaitingCloudResources + } else { + qreason = schedStatusWaitingClusterCapacity + } + for i, ent := range sorted { + if ent.SchedulingStatus == "" && (ent.Container.State == arvados.ContainerStateQueued || ent.Container.State == arvados.ContainerStateLocked) { + qpos++ + sorted[i].SchedulingStatus = fmt.Sprintf("%s: queue position %d", qreason, qpos) + } + } + sch.lastQueue.Store(sorted) + if len(overquota)+len(overmaxsuper) > 0 { // Unlock any containers that are unmappable while // we're at quota (but if they have already been // scheduled and they're loading docker images etc., // let them run). - var unlock []container.QueueEnt + var unlock []QueueEnt unlock = append(unlock, overmaxsuper...) if totalInstances > 0 && len(overquota) > 1 { // We don't unlock the next-in-line container