X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/e4d51d4e25376c44d200eea30a6d2b90d073a0bf..a73295e64f58fe027b995e0cca3d103d4e2289ff:/lib/dispatchcloud/scheduler/run_queue.go

diff --git a/lib/dispatchcloud/scheduler/run_queue.go b/lib/dispatchcloud/scheduler/run_queue.go
index 6a717bf444..2f1f175890 100644
--- a/lib/dispatchcloud/scheduler/run_queue.go
+++ b/lib/dispatchcloud/scheduler/run_queue.go
@@ -149,6 +149,7 @@ func (sch *Scheduler) runQueue() {
 	}).Debug("runQueue")
 
 	dontstart := map[arvados.InstanceType]bool{}
+	var atcapacity = map[string]bool{}    // ProviderTypes reported as AtCapacity during this runQueue() invocation
 	var overquota []container.QueueEnt    // entries that are unmappable because of worker pool quota
 	var overmaxsuper []container.QueueEnt // unmappable because max supervisors (these are not included in overquota)
 	var containerAllocatedWorkerBootingCount int
@@ -162,10 +163,9 @@ func (sch *Scheduler) runQueue() {
 
 tryrun:
 	for i, ent := range sorted {
-		ctr, it := ent.Container, ent.InstanceType
+		ctr, types := ent.Container, ent.InstanceTypes
 		logger := sch.logger.WithFields(logrus.Fields{
 			"ContainerUUID": ctr.UUID,
-			"InstanceType":  it.Name,
 		})
 		if ctr.SchedulingParameters.Supervisor {
 			supervisors += 1
@@ -177,6 +177,35 @@ tryrun:
 		if _, running := running[ctr.UUID]; running || ctr.Priority < 1 {
 			continue
 		}
+		// If we have unalloc instances of any of the eligible
+		// instance types, unallocOK is true and unallocType
+		// is the lowest-cost type.
+		var unallocOK bool
+		var unallocType arvados.InstanceType
+		for _, it := range types {
+			if unalloc[it] > 0 {
+				unallocOK = true
+				unallocType = it
+				break
+			}
+		}
+		// If the pool is not reporting AtCapacity for any of
+		// the eligible instance types, availableOK is true
+		// and availableType is the lowest-cost type.
+		var availableOK bool
+		var availableType arvados.InstanceType
+		for _, it := range types {
+			if atcapacity[it.ProviderType] {
+				continue
+			} else if sch.pool.AtCapacity(it) {
+				atcapacity[it.ProviderType] = true
+				continue
+			} else {
+				availableOK = true
+				availableType = it
+				break
+			}
+		}
 		switch ctr.State {
 		case arvados.ContainerStateQueued:
 			if sch.maxConcurrency > 0 && trying >= sch.maxConcurrency {
@@ -184,26 +213,50 @@ tryrun:
 				continue
 			}
 			trying++
-			if unalloc[it] < 1 && sch.pool.AtQuota() {
+			if !unallocOK && sch.pool.AtQuota() {
 				logger.Trace("not locking: AtQuota and no unalloc workers")
 				overquota = sorted[i:]
 				break tryrun
 			}
+			if !unallocOK && !availableOK {
+				logger.Trace("not locking: AtCapacity and no unalloc workers")
+				continue
+			}
 			if sch.pool.KillContainer(ctr.UUID, "about to lock") {
 				logger.Info("not locking: crunch-run process from previous attempt has not exited")
 				continue
 			}
 			go sch.lockContainer(logger, ctr.UUID)
-			unalloc[it]--
+			unalloc[unallocType]--
 		case arvados.ContainerStateLocked:
 			if sch.maxConcurrency > 0 && trying >= sch.maxConcurrency {
 				logger.Tracef("not starting: already at maxConcurrency %d", sch.maxConcurrency)
 				continue
 			}
 			trying++
-			if unalloc[it] > 0 {
-				unalloc[it]--
-			} else if sch.pool.AtQuota() {
+			if unallocOK {
+				// We have a suitable instance type,
+				// so mark it as allocated, and try to
+				// start the container.
+				unalloc[unallocType]--
+				logger = logger.WithField("InstanceType", unallocType)
+				if dontstart[unallocType] {
+					// We already tried & failed to start
+					// a higher-priority container on the
+					// same instance type. Don't let this
+					// one sneak in ahead of it.
+				} else if sch.pool.KillContainer(ctr.UUID, "about to start") {
+					logger.Info("not restarting yet: crunch-run process from previous attempt has not exited")
+				} else if sch.pool.StartContainer(unallocType, ctr) {
+					logger.Trace("StartContainer => true")
+				} else {
+					logger.Trace("StartContainer => false")
+					containerAllocatedWorkerBootingCount += 1
+					dontstart[unallocType] = true
+				}
+				continue
+			}
+			if sch.pool.AtQuota() {
 				// Don't let lower-priority containers
 				// starve this one by using keeping
 				// idle workers alive on different
@@ -211,37 +264,41 @@ tryrun:
 				logger.Trace("overquota")
 				overquota = sorted[i:]
 				break tryrun
-			} else if sch.pool.Create(it) {
-				// Success. (Note pool.Create works
-				// asynchronously and does its own
-				// logging about the eventual outcome,
-				// so we don't need to.)
-				logger.Info("creating new instance")
-			} else {
+			}
+			if !availableOK {
+				// Continue trying lower-priority
+				// containers in case they can run on
+				// different instance types that are
+				// available.
+				//
+				// The local "atcapacity" cache helps
+				// when the pool's flag resets after
+				// we look at container A but before
+				// we look at lower-priority container
+				// B. In that case we want to run
+				// container A on the next call to
+				// runQueue(), rather than run
+				// container B now.
+				logger.Trace("all eligible types at capacity")
+				continue
+			}
+			logger = logger.WithField("InstanceType", availableType)
+			if !sch.pool.Create(availableType) {
 				// Failed despite not being at quota,
-				// e.g., cloud ops throttled.  TODO:
-				// avoid getting starved here if
-				// instances of a specific type always
-				// fail.
+				// e.g., cloud ops throttled.
 				logger.Trace("pool declined to create new instance")
 				continue
 			}
-
-			if dontstart[it] {
-				// We already tried & failed to start
-				// a higher-priority container on the
-				// same instance type. Don't let this
-				// one sneak in ahead of it.
-			} else if sch.pool.KillContainer(ctr.UUID, "about to start") {
-				logger.Info("not restarting yet: crunch-run process from previous attempt has not exited")
-			} else if sch.pool.StartContainer(it, ctr) {
-				logger.Trace("StartContainer => true")
-				// Success.
-			} else {
-				logger.Trace("StartContainer => false")
-				containerAllocatedWorkerBootingCount += 1
-				dontstart[it] = true
-			}
+			// Success. (Note pool.Create works
+			// asynchronously and does its own logging
+			// about the eventual outcome, so we don't
+			// need to.)
+			logger.Info("creating new instance")
+			// Don't bother trying to start the container
+			// yet -- obviously the instance will take
+			// some time to boot and become ready.
+			containerAllocatedWorkerBootingCount += 1
+			dontstart[availableType] = true
 		}
 	}
 
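The two selection loops added in this patch can be read in isolation: the first picks the lowest-cost eligible type that already has an idle ("unalloc") worker, the second picks the lowest-cost type the cloud can still provision, caching AtCapacity answers for the rest of the pass. The following is a minimal, self-contained Go sketch of that logic, not the Arvados code: InstanceType, Pool, and the cost-sorted types slice are simplified stand-ins.

package main

import "fmt"

// InstanceType is a simplified stand-in for arvados.InstanceType.
type InstanceType struct {
	Name         string
	ProviderType string
	Price        float64
}

// Pool is a stand-in for the worker pool; AtCapacity reports whether the
// cloud has said it cannot create more instances of this provider type.
type Pool struct {
	capacity map[string]bool
}

func (p *Pool) AtCapacity(it InstanceType) bool { return p.capacity[it.ProviderType] }

// chooseType mirrors the two loops from runQueue(): first look for an
// already-booted idle ("unalloc") instance of any eligible type, then for a
// type the cloud can still create, caching AtCapacity results per pass.
func chooseType(types []InstanceType, unalloc map[string]int, pool *Pool, atcapacity map[string]bool) (unallocOK bool, unallocType InstanceType, availableOK bool, availableType InstanceType) {
	for _, it := range types { // types are assumed sorted cheapest-first
		if unalloc[it.Name] > 0 {
			unallocOK, unallocType = true, it
			break
		}
	}
	for _, it := range types {
		if atcapacity[it.ProviderType] {
			continue
		} else if pool.AtCapacity(it) {
			atcapacity[it.ProviderType] = true // remember for the rest of this pass
			continue
		} else {
			availableOK, availableType = true, it
			break
		}
	}
	return
}

func main() {
	types := []InstanceType{
		{Name: "small", ProviderType: "t3.small", Price: 0.02},
		{Name: "large", ProviderType: "m5.large", Price: 0.10},
	}
	pool := &Pool{capacity: map[string]bool{"t3.small": true}} // cheapest type is sold out
	atcapacity := map[string]bool{}                            // one cache per scheduling pass
	uOK, _, aOK, aType := chooseType(types, map[string]int{}, pool, atcapacity)
	fmt.Println(uOK, aOK, aType.Name) // false true large
}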
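The dontstart map, now keyed per chosen type, preserves priority order within a single runQueue() pass: once StartContainer fails for a container on some instance type, lower-priority containers wanting that same type wait for the next pass instead of sneaking in ahead. A simplified, hypothetical sketch of that behavior (Container, startContainer, and the queue below are illustrative only):

package main

import "fmt"

// Container is a pared-down stand-in for the queued container entries.
type Container struct {
	UUID     string
	Priority int
}

// startContainer stands in for pool.StartContainer; here it just reports
// whether a ready, idle worker of the requested type exists.
func startContainer(ready map[string]bool, typeName string, ctr Container) bool {
	return ready[typeName]
}

func main() {
	ready := map[string]bool{"large": false} // the only instance is still booting
	dontstart := map[string]bool{}
	queue := []Container{ // already sorted by descending priority
		{UUID: "ctr-A", Priority: 10},
		{UUID: "ctr-B", Priority: 5},
	}
	for _, ctr := range queue {
		typeName := "large" // both containers want the same type
		if dontstart[typeName] {
			fmt.Println(ctr.UUID, "waiting: a higher-priority container gets this type first")
		} else if startContainer(ready, typeName, ctr) {
			fmt.Println(ctr.UUID, "started")
		} else {
			fmt.Println(ctr.UUID, "could not start; blocking this type for the rest of the pass")
			dontstart[typeName] = true
		}
	}
}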