X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/58e057254087f406ca4146da7031187487b0ddbc..a73295e64f58fe027b995e0cca3d103d4e2289ff:/lib/dispatchcloud/scheduler/run_queue.go

diff --git a/lib/dispatchcloud/scheduler/run_queue.go b/lib/dispatchcloud/scheduler/run_queue.go
index e6b1b06a2c..2f1f175890 100644
--- a/lib/dispatchcloud/scheduler/run_queue.go
+++ b/lib/dispatchcloud/scheduler/run_queue.go
@@ -16,13 +16,43 @@ import (
 var quietAfter503 = time.Minute
 
 func (sch *Scheduler) runQueue() {
+	running := sch.pool.Running()
+	unalloc := sch.pool.Unallocated()
+
+	totalInstances := 0
+	for _, n := range sch.pool.CountWorkers() {
+		totalInstances += n
+	}
+
 	unsorted, _ := sch.queue.Entries()
 	sorted := make([]container.QueueEnt, 0, len(unsorted))
 	for _, ent := range unsorted {
 		sorted = append(sorted, ent)
 	}
 	sort.Slice(sorted, func(i, j int) bool {
-		if pi, pj := sorted[i].Container.Priority, sorted[j].Container.Priority; pi != pj {
+		_, irunning := running[sorted[i].Container.UUID]
+		_, jrunning := running[sorted[j].Container.UUID]
+		if irunning != jrunning {
+			// Ensure the "tryrun" loop (see below) sees
+			// already-scheduled containers first, to
+			// ensure existing supervisor containers are
+			// properly counted before we decide whether
+			// we have room for new ones.
+			return irunning
+		}
+		ilocked := sorted[i].Container.State == arvados.ContainerStateLocked
+		jlocked := sorted[j].Container.State == arvados.ContainerStateLocked
+		if ilocked != jlocked {
+			// Give precedence to containers that we have
+			// already locked, even if higher-priority
+			// containers have since arrived in the
+			// queue. This avoids undesirable queue churn
+			// effects including extra lock/unlock cycles
+			// and bringing up new instances and quickly
+			// shutting them down to make room for
+			// different instance sizes.
+			return ilocked
+		} else if pi, pj := sorted[i].Container.Priority, sorted[j].Container.Priority; pi != pj {
 			return pi > pj
 		} else {
 			// When containers have identical priority,
@@ -34,9 +64,6 @@ func (sch *Scheduler) runQueue() {
 		}
 	})
 
-	running := sch.pool.Running()
-	unalloc := sch.pool.Unallocated()
-
 	if t := sch.client.Last503(); t.After(sch.last503time) {
 		// API has sent an HTTP 503 response since last time
 		// we checked. Use current #containers - 1 as
@@ -67,8 +94,54 @@ func (sch *Scheduler) runQueue() {
 	} else {
 		sch.mLast503Time.Set(float64(sch.last503time.Unix()))
 	}
+	if sch.maxInstances > 0 && sch.maxConcurrency > sch.maxInstances {
+		sch.maxConcurrency = sch.maxInstances
+	}
+	if sch.instancesWithinQuota > 0 && sch.instancesWithinQuota < totalInstances {
+		// Evidently it is possible to run this many
+		// instances, so raise our estimate.
+		sch.instancesWithinQuota = totalInstances
+	}
+	if sch.pool.AtQuota() {
+		// Consider current workload to be the maximum
+		// allowed, for the sake of reporting metrics and
+		// calculating max supervisors.
+		//
+		// Now that sch.maxConcurrency is set, we will only
+		// raise it past len(running) by 10%. This helps
+		// avoid running an inappropriate number of
+		// supervisors when we reach the cloud-imposed quota
+		// (which may be based on # CPUs etc) long before the
+		// configured MaxInstances.
+		if sch.maxConcurrency == 0 || sch.maxConcurrency > totalInstances {
+			if totalInstances == 0 {
+				sch.maxConcurrency = 1
+			} else {
+				sch.maxConcurrency = totalInstances
+			}
+		}
+		sch.instancesWithinQuota = totalInstances
+	} else if sch.instancesWithinQuota > 0 && sch.maxConcurrency > sch.instancesWithinQuota+1 {
+		// Once we've hit a quota error and started tracking
+		// instancesWithinQuota (i.e., it's not zero), we
+		// avoid exceeding that known-working level by more
+		// than 1.
+		//
+		// If we don't do this, we risk entering a pattern of
+		// repeatedly locking several containers, hitting
+		// quota again, and unlocking them again each time the
+		// driver stops reporting AtQuota, which tends to use
+		// up the max lock/unlock cycles on the next few
+		// containers in the queue, and cause them to fail.
+		sch.maxConcurrency = sch.instancesWithinQuota + 1
+	}
 	sch.mMaxContainerConcurrency.Set(float64(sch.maxConcurrency))
 
+	maxSupervisors := int(float64(sch.maxConcurrency) * sch.supervisorFraction)
+	if maxSupervisors < 1 && sch.supervisorFraction > 0 && sch.maxConcurrency > 0 {
+		maxSupervisors = 1
+	}
+
 	sch.logger.WithFields(logrus.Fields{
 		"Containers": len(sorted),
 		"Processes":  len(running),
@@ -76,6 +149,7 @@ func (sch *Scheduler) runQueue() {
 	}).Debug("runQueue")
 
 	dontstart := map[arvados.InstanceType]bool{}
+	var atcapacity = map[string]bool{}    // ProviderTypes reported as AtCapacity during this runQueue() invocation
 	var overquota []container.QueueEnt    // entries that are unmappable because of worker pool quota
 	var overmaxsuper []container.QueueEnt // unmappable because max supervisors (these are not included in overquota)
 	var containerAllocatedWorkerBootingCount int
@@ -89,14 +163,13 @@ func (sch *Scheduler) runQueue() {
 
 tryrun:
 	for i, ent := range sorted {
-		ctr, it := ent.Container, ent.InstanceType
+		ctr, types := ent.Container, ent.InstanceTypes
 		logger := sch.logger.WithFields(logrus.Fields{
 			"ContainerUUID": ctr.UUID,
-			"InstanceType":  it.Name,
 		})
 		if ctr.SchedulingParameters.Supervisor {
 			supervisors += 1
-			if sch.maxSupervisors > 0 && supervisors > sch.maxSupervisors {
+			if maxSupervisors > 0 && supervisors > maxSupervisors {
 				overmaxsuper = append(overmaxsuper, sorted[i])
 				continue
 			}
@@ -104,35 +177,86 @@ tryrun:
 		if _, running := running[ctr.UUID]; running || ctr.Priority < 1 {
 			continue
 		}
+		// If we have unalloc instances of any of the eligible
+		// instance types, unallocOK is true and unallocType
+		// is the lowest-cost type.
+		var unallocOK bool
+		var unallocType arvados.InstanceType
+		for _, it := range types {
+			if unalloc[it] > 0 {
+				unallocOK = true
+				unallocType = it
+				break
+			}
+		}
+		// If the pool is not reporting AtCapacity for any of
+		// the eligible instance types, availableOK is true
+		// and availableType is the lowest-cost type.
+		var availableOK bool
+		var availableType arvados.InstanceType
+		for _, it := range types {
+			if atcapacity[it.ProviderType] {
+				continue
+			} else if sch.pool.AtCapacity(it) {
+				atcapacity[it.ProviderType] = true
+				continue
+			} else {
+				availableOK = true
+				availableType = it
+				break
+			}
+		}
 		switch ctr.State {
 		case arvados.ContainerStateQueued:
 			if sch.maxConcurrency > 0 && trying >= sch.maxConcurrency {
 				logger.Tracef("not locking: already at maxConcurrency %d", sch.maxConcurrency)
-				overquota = sorted[i:]
-				break tryrun
+				continue
 			}
 			trying++
-			if unalloc[it] < 1 && sch.pool.AtQuota() {
+			if !unallocOK && sch.pool.AtQuota() {
 				logger.Trace("not locking: AtQuota and no unalloc workers")
 				overquota = sorted[i:]
 				break tryrun
 			}
+			if !unallocOK && !availableOK {
+				logger.Trace("not locking: AtCapacity and no unalloc workers")
+				continue
+			}
 			if sch.pool.KillContainer(ctr.UUID, "about to lock") {
 				logger.Info("not locking: crunch-run process from previous attempt has not exited")
 				continue
 			}
 			go sch.lockContainer(logger, ctr.UUID)
-			unalloc[it]--
+			unalloc[unallocType]--
 		case arvados.ContainerStateLocked:
 			if sch.maxConcurrency > 0 && trying >= sch.maxConcurrency {
-				logger.Debugf("not starting: already at maxConcurrency %d", sch.maxConcurrency)
-				overquota = sorted[i:]
-				break tryrun
+				logger.Tracef("not starting: already at maxConcurrency %d", sch.maxConcurrency)
+				continue
 			}
 			trying++
-			if unalloc[it] > 0 {
-				unalloc[it]--
-			} else if sch.pool.AtQuota() {
+			if unallocOK {
+				// We have a suitable instance type,
+				// so mark it as allocated, and try to
+				// start the container.
+				unalloc[unallocType]--
+				logger = logger.WithField("InstanceType", unallocType)
+				if dontstart[unallocType] {
+					// We already tried & failed to start
+					// a higher-priority container on the
+					// same instance type. Don't let this
+					// one sneak in ahead of it.
+				} else if sch.pool.KillContainer(ctr.UUID, "about to start") {
+					logger.Info("not restarting yet: crunch-run process from previous attempt has not exited")
+				} else if sch.pool.StartContainer(unallocType, ctr) {
+					logger.Trace("StartContainer => true")
+				} else {
+					logger.Trace("StartContainer => false")
+					containerAllocatedWorkerBootingCount += 1
+					dontstart[unallocType] = true
+				}
+				continue
+			}
+			if sch.pool.AtQuota() {
 				// Don't let lower-priority containers
 				// starve this one by using keeping
 				// idle workers alive on different
@@ -140,37 +264,41 @@ tryrun:
 				logger.Trace("overquota")
 				overquota = sorted[i:]
 				break tryrun
-			} else if sch.pool.Create(it) {
-				// Success. (Note pool.Create works
-				// asynchronously and does its own
-				// logging about the eventual outcome,
-				// so we don't need to.)
-				logger.Info("creating new instance")
-			} else {
+			}
+			if !availableOK {
+				// Continue trying lower-priority
+				// containers in case they can run on
+				// different instance types that are
+				// available.
+				//
+				// The local "atcapacity" cache helps
+				// when the pool's flag resets after
+				// we look at container A but before
+				// we look at lower-priority container
+				// B. In that case we want to run
+				// container A on the next call to
+				// runQueue(), rather than run
+				// container B now.
+				logger.Trace("all eligible types at capacity")
+				continue
+			}
+			logger = logger.WithField("InstanceType", availableType)
+			if !sch.pool.Create(availableType) {
 				// Failed despite not being at quota,
-				// e.g., cloud ops throttled.  TODO:
-				// avoid getting starved here if
-				// instances of a specific type always
-				// fail.
+				// e.g., cloud ops throttled.
 				logger.Trace("pool declined to create new instance")
 				continue
 			}
-
-			if dontstart[it] {
-				// We already tried & failed to start
-				// a higher-priority container on the
-				// same instance type. Don't let this
-				// one sneak in ahead of it.
-			} else if sch.pool.KillContainer(ctr.UUID, "about to start") {
-				logger.Info("not restarting yet: crunch-run process from previous attempt has not exited")
-			} else if sch.pool.StartContainer(it, ctr) {
-				logger.Trace("StartContainer => true")
-				// Success.
-			} else {
-				logger.Trace("StartContainer => false")
-				containerAllocatedWorkerBootingCount += 1
-				dontstart[it] = true
-			}
+			// Success. (Note pool.Create works
+			// asynchronously and does its own logging
+			// about the eventual outcome, so we don't
+			// need to.)
+			logger.Info("creating new instance")
+			// Don't bother trying to start the container
+			// yet -- obviously the instance will take
+			// some time to boot and become ready.
+			containerAllocatedWorkerBootingCount += 1
+			dontstart[availableType] = true
 		}
 	}
 
@@ -182,7 +310,28 @@ tryrun:
 		// we're at quota (but if they have already been
 		// scheduled and they're loading docker images etc.,
 		// let them run).
-		for _, ctr := range append(overmaxsuper, overquota...) {
+		var unlock []container.QueueEnt
+		unlock = append(unlock, overmaxsuper...)
+		if totalInstances > 0 && len(overquota) > 1 {
+			// We don't unlock the next-in-line container
+			// when at quota. This avoids a situation
+			// where our "at quota" state expires, we lock
+			// the next container and try to create an
+			// instance, the cloud provider still returns
+			// a quota error, we unlock the container, and
+			// we repeat this until the container reaches
+			// its limit of lock/unlock cycles.
+			unlock = append(unlock, overquota[1:]...)
+		} else {
+			// However, if totalInstances is 0 and we're
+			// still getting quota errors, then the
+			// next-in-line container is evidently not
+			// possible to run, so we should let it
+			// exhaust its lock/unlock cycles and
+			// eventually cancel, to avoid starvation.
+			unlock = append(unlock, overquota...)
+		}
+		for _, ctr := range unlock {
 			ctr := ctr.Container
 			_, toolate := running[ctr.UUID]
 			if ctr.State == arvados.ContainerStateLocked && !toolate {
@@ -194,6 +343,8 @@ tryrun:
 			}
 		}
 	}
+	}
+	if len(overquota) > 0 {
 		// Shut down idle workers that didn't get any
 		// containers mapped onto them before we hit quota.
 		for it, n := range unalloc {
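
The standalone sketch below restates, outside the diff, the two pieces of scheduling logic the patch introduces: the new queue ordering that puts already-running and already-locked containers ahead of newly arrived higher-priority ones, and the maxSupervisors cap derived from maxConcurrency and the supervisor fraction. It is not part of the commit and uses simplified stand-in types rather than the real sdk/go/arvados and scheduler structs; equal-priority ties are left stable here, whereas the real comparator (per the surrounding comment) orders them by when they were first noticed.

package main

import (
	"fmt"
	"sort"
)

// ctr is a simplified stand-in for the fields run_queue.go reads from
// container.QueueEnt and the worker pool (illustrative only).
type ctr struct {
	UUID     string
	Priority int
	Locked   bool // Container.State == Locked
	Running  bool // present in sch.pool.Running()
}

// orderQueue applies the ordering rule from the patch: running
// containers sort first, then locked ones, then higher priority.
func orderQueue(q []ctr) {
	sort.SliceStable(q, func(i, j int) bool {
		if q[i].Running != q[j].Running {
			return q[i].Running
		}
		if q[i].Locked != q[j].Locked {
			return q[i].Locked
		}
		return q[i].Priority > q[j].Priority
	})
}

// maxSupervisors mirrors the cap computed in the patch:
// floor(maxConcurrency * fraction), but at least 1 whenever both
// inputs are positive.
func maxSupervisors(maxConcurrency int, supervisorFraction float64) int {
	n := int(float64(maxConcurrency) * supervisorFraction)
	if n < 1 && supervisorFraction > 0 && maxConcurrency > 0 {
		n = 1
	}
	return n
}

func main() {
	q := []ctr{
		{UUID: "ctr-C", Priority: 9},
		{UUID: "ctr-A", Priority: 5, Locked: true},
		{UUID: "ctr-B", Priority: 5, Running: true},
	}
	orderQueue(q)
	// Prints ctr-B, ctr-A, ctr-C: already-scheduled work is counted
	// before deciding whether there is room for new containers.
	for _, c := range q {
		fmt.Println(c.UUID, c.Priority, c.Running, c.Locked)
	}
	fmt.Println("max supervisors:", maxSupervisors(8, 0.3)) // 2
	fmt.Println("max supervisors:", maxSupervisors(1, 0.3)) // 1 (floor would be 0)
}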