X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/9121aabce004cf42f60c775eb191c93020b9be42..60542dcecbffe14a43098d9e68ee6ae05868df1e:/lib/dispatchcloud/scheduler/run_queue.go

diff --git a/lib/dispatchcloud/scheduler/run_queue.go b/lib/dispatchcloud/scheduler/run_queue.go
index 0c4634d755..3505c3e064 100644
--- a/lib/dispatchcloud/scheduler/run_queue.go
+++ b/lib/dispatchcloud/scheduler/run_queue.go
@@ -19,6 +19,11 @@ func (sch *Scheduler) runQueue() {
 	running := sch.pool.Running()
 	unalloc := sch.pool.Unallocated()
 
+	totalInstances := 0
+	for _, n := range sch.pool.CountWorkers() {
+		totalInstances += n
+	}
+
 	unsorted, _ := sch.queue.Entries()
 	sorted := make([]container.QueueEnt, 0, len(unsorted))
 	for _, ent := range unsorted {
@@ -92,7 +97,12 @@ func (sch *Scheduler) runQueue() {
 	if sch.maxInstances > 0 && sch.maxConcurrency > sch.maxInstances {
 		sch.maxConcurrency = sch.maxInstances
 	}
-	if sch.pool.AtQuota() && len(running) > 0 && (sch.maxConcurrency == 0 || sch.maxConcurrency > len(running)) {
+	if sch.instancesWithinQuota > 0 && sch.instancesWithinQuota < totalInstances {
+		// Evidently it is possible to run this many
+		// instances, so raise our estimate.
+		sch.instancesWithinQuota = totalInstances
+	}
+	if sch.pool.AtQuota() {
 		// Consider current workload to be the maximum
 		// allowed, for the sake of reporting metrics and
 		// calculating max supervisors.
@@ -103,7 +113,27 @@ func (sch *Scheduler) runQueue() {
 		// supervisors when we reach the cloud-imposed quota
 		// (which may be based on # CPUs etc) long before the
 		// configured MaxInstances.
-		sch.maxConcurrency = len(running)
+		if sch.maxConcurrency == 0 || sch.maxConcurrency > totalInstances {
+			if totalInstances == 0 {
+				sch.maxConcurrency = 1
+			} else {
+				sch.maxConcurrency = totalInstances
+			}
+		}
+		sch.instancesWithinQuota = totalInstances
+	} else if sch.instancesWithinQuota > 0 && sch.maxConcurrency > sch.instancesWithinQuota+1 {
+		// Once we've hit a quota error and started tracking
+		// instancesWithinQuota (i.e., it's not zero), we
+		// avoid exceeding that known-working level by more
+		// than 1.
+		//
+		// If we don't do this, we risk entering a pattern of
+		// repeatedly locking several containers, hitting
+		// quota again, and unlocking them again each time the
+		// driver stops reporting AtQuota, which tends to use
+		// up the max lock/unlock cycles on the next few
+		// containers in the queue, and cause them to fail.
+		sch.maxConcurrency = sch.instancesWithinQuota + 1
 	}
 	sch.mMaxContainerConcurrency.Set(float64(sch.maxConcurrency))
 
@@ -119,6 +149,7 @@ func (sch *Scheduler) runQueue() {
 	}).Debug("runQueue")
 
 	dontstart := map[arvados.InstanceType]bool{}
+	var atcapacity = map[string]bool{} // ProviderTypes reported as AtCapacity during this runQueue() invocation
 	var overquota []container.QueueEnt    // entries that are unmappable because of worker pool quota
 	var overmaxsuper []container.QueueEnt // unmappable because max supervisors (these are not included in overquota)
 	var containerAllocatedWorkerBootingCount int
@@ -159,6 +190,11 @@ tryrun:
 				overquota = sorted[i:]
 				break tryrun
 			}
+			if unalloc[it] < 1 && (atcapacity[it.ProviderType] || sch.pool.AtCapacity(it)) {
+				logger.Trace("not locking: AtCapacity and no unalloc workers")
+				atcapacity[it.ProviderType] = true
+				continue
+			}
 			if sch.pool.KillContainer(ctr.UUID, "about to lock") {
 				logger.Info("not locking: crunch-run process from previous attempt has not exited")
 				continue
@@ -181,6 +217,27 @@ tryrun:
 				logger.Trace("overquota")
 				overquota = sorted[i:]
 				break tryrun
+			} else if atcapacity[it.ProviderType] || sch.pool.AtCapacity(it) {
+				// Continue trying lower-priority
+				// containers in case they can run on
+				// different instance types that are
+				// available.
+				//
+				// The local "atcapacity" cache helps
+				// when the pool's flag resets after
+				// we look at container A but before
+				// we look at lower-priority container
+				// B. In that case we want to run
+				// container A on the next call to
+				// runQueue(), rather than run
+				// container B now.
+				//
+				// TODO: try running this container on
+				// a bigger (but not much more
+				// expensive) instance type.
+				logger.WithField("InstanceType", it.Name).Trace("at capacity")
+				atcapacity[it.ProviderType] = true
+				continue
 			} else if sch.pool.Create(it) {
 				// Success. (Note pool.Create works
 				// asynchronously and does its own
@@ -189,10 +246,7 @@ tryrun:
 				logger.Info("creating new instance")
 			} else {
 				// Failed despite not being at quota,
-				// e.g., cloud ops throttled. TODO:
-				// avoid getting starved here if
-				// instances of a specific type always
-				// fail.
+				// e.g., cloud ops throttled.
 				logger.Trace("pool declined to create new instance")
 				continue
 			}
@@ -223,7 +277,28 @@ tryrun:
 	// we're at quota (but if they have already been
 	// scheduled and they're loading docker images etc.,
 	// let them run).
-	for _, ctr := range append(overmaxsuper, overquota...) {
+	var unlock []container.QueueEnt
+	unlock = append(unlock, overmaxsuper...)
+	if totalInstances > 0 && len(overquota) > 1 {
+		// We don't unlock the next-in-line container
+		// when at quota. This avoids a situation
+		// where our "at quota" state expires, we lock
+		// the next container and try to create an
+		// instance, the cloud provider still returns
+		// a quota error, we unlock the container, and
+		// we repeat this until the container reaches
+		// its limit of lock/unlock cycles.
+		unlock = append(unlock, overquota[1:]...)
+	} else {
+		// However, if totalInstances is 0 and we're
+		// still getting quota errors, then the
+		// next-in-line container is evidently not
+		// possible to run, so we should let it
+		// exhaust its lock/unlock cycles and
+		// eventually cancel, to avoid starvation.
+		unlock = append(unlock, overquota...)
+	}
+	for _, ctr := range unlock {
 		ctr := ctr.Container
 		_, toolate := running[ctr.UUID]
 		if ctr.State == arvados.ContainerStateLocked && !toolate {
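
As a reading aid (not part of the commit): the @@ -92,7 +97,12 @@ and @@ -103,7 +113,27 @@ hunks above amount to a small set of capping rules around sch.maxConcurrency. The sketch below restates that logic in isolation; the quotaState struct and capConcurrency function are hypothetical stand-ins for the corresponding *Scheduler fields and for the worker pool's AtQuota() and CountWorkers() signals, so treat it as illustrative rather than as the Arvados implementation.

// Hypothetical, simplified sketch of the capping rules added in this diff;
// quotaState and capConcurrency do not exist in Arvados.
package main

import "fmt"

type quotaState struct {
	maxInstances         int // configured MaxInstances (0 = no limit)
	maxConcurrency       int // cap on containers the scheduler will consider
	instancesWithinQuota int // highest instance count known to fit under the cloud quota
}

func capConcurrency(s *quotaState, totalInstances int, atQuota bool) {
	if s.maxInstances > 0 && s.maxConcurrency > s.maxInstances {
		s.maxConcurrency = s.maxInstances
	}
	if s.instancesWithinQuota > 0 && s.instancesWithinQuota < totalInstances {
		// Evidently this many instances can run, so raise the estimate.
		s.instancesWithinQuota = totalInstances
	}
	switch {
	case atQuota:
		// Treat the current instance count as the effective maximum.
		if s.maxConcurrency == 0 || s.maxConcurrency > totalInstances {
			if totalInstances == 0 {
				s.maxConcurrency = 1
			} else {
				s.maxConcurrency = totalInstances
			}
		}
		s.instancesWithinQuota = totalInstances
	case s.instancesWithinQuota > 0 && s.maxConcurrency > s.instancesWithinQuota+1:
		// Stay within one instance of the last known-good level so a
		// briefly cleared AtQuota signal cannot trigger a burst of
		// lock/fail/unlock cycles on the next few queued containers.
		s.maxConcurrency = s.instancesWithinQuota + 1
	}
}

func main() {
	s := &quotaState{maxInstances: 10, maxConcurrency: 8}
	capConcurrency(s, 4, true)  // cloud reports quota exceeded with 4 instances up
	fmt.Println(s.maxConcurrency, s.instancesWithinQuota) // 4 4
	capConcurrency(s, 4, false) // quota signal clears
	fmt.Println(s.maxConcurrency) // still 4, i.e. at most instancesWithinQuota+1
}

In the real runQueue(), totalInstances comes from summing sch.pool.CountWorkers() and the atQuota flag from sch.pool.AtQuota(), as shown in the diff above.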