X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/bfb9e29c250bcfb34a6b1813ca46953503ca05e6..60542dcecbffe14a43098d9e68ee6ae05868df1e:/lib/dispatchcloud/scheduler/run_queue.go

diff --git a/lib/dispatchcloud/scheduler/run_queue.go b/lib/dispatchcloud/scheduler/run_queue.go
index 1e5ac2e046..3505c3e064 100644
--- a/lib/dispatchcloud/scheduler/run_queue.go
+++ b/lib/dispatchcloud/scheduler/run_queue.go
@@ -16,13 +16,43 @@ import (
 var quietAfter503 = time.Minute
 
 func (sch *Scheduler) runQueue() {
+	running := sch.pool.Running()
+	unalloc := sch.pool.Unallocated()
+
+	totalInstances := 0
+	for _, n := range sch.pool.CountWorkers() {
+		totalInstances += n
+	}
+
 	unsorted, _ := sch.queue.Entries()
 	sorted := make([]container.QueueEnt, 0, len(unsorted))
 	for _, ent := range unsorted {
 		sorted = append(sorted, ent)
 	}
 	sort.Slice(sorted, func(i, j int) bool {
-		if pi, pj := sorted[i].Container.Priority, sorted[j].Container.Priority; pi != pj {
+		_, irunning := running[sorted[i].Container.UUID]
+		_, jrunning := running[sorted[j].Container.UUID]
+		if irunning != jrunning {
+			// Ensure the "tryrun" loop (see below) sees
+			// already-scheduled containers first, to
+			// ensure existing supervisor containers are
+			// properly counted before we decide whether
+			// we have room for new ones.
+			return irunning
+		}
+		ilocked := sorted[i].Container.State == arvados.ContainerStateLocked
+		jlocked := sorted[j].Container.State == arvados.ContainerStateLocked
+		if ilocked != jlocked {
+			// Give precedence to containers that we have
+			// already locked, even if higher-priority
+			// containers have since arrived in the
+			// queue. This avoids undesirable queue churn
+			// effects including extra lock/unlock cycles
+			// and bringing up new instances and quickly
+			// shutting them down to make room for
+			// different instance sizes.
+			return ilocked
+		} else if pi, pj := sorted[i].Container.Priority, sorted[j].Container.Priority; pi != pj {
 			return pi > pj
 		} else {
 			// When containers have identical priority,
@@ -34,9 +64,6 @@ func (sch *Scheduler) runQueue() {
 		}
 	})
 
-	running := sch.pool.Running()
-	unalloc := sch.pool.Unallocated()
-
 	if t := sch.client.Last503(); t.After(sch.last503time) {
 		// API has sent an HTTP 503 response since last time
 		// we checked. Use current #containers - 1 as
@@ -67,8 +94,54 @@ func (sch *Scheduler) runQueue() {
 	} else {
 		sch.mLast503Time.Set(float64(sch.last503time.Unix()))
 	}
+	if sch.maxInstances > 0 && sch.maxConcurrency > sch.maxInstances {
+		sch.maxConcurrency = sch.maxInstances
+	}
+	if sch.instancesWithinQuota > 0 && sch.instancesWithinQuota < totalInstances {
+		// Evidently it is possible to run this many
+		// instances, so raise our estimate.
+		sch.instancesWithinQuota = totalInstances
+	}
+	if sch.pool.AtQuota() {
+		// Consider current workload to be the maximum
+		// allowed, for the sake of reporting metrics and
+		// calculating max supervisors.
+		//
+		// Now that sch.maxConcurrency is set, we will only
+		// raise it past len(running) by 10%. This helps
+		// avoid running an inappropriate number of
+		// supervisors when we reach the cloud-imposed quota
+		// (which may be based on # CPUs etc) long before the
+		// configured MaxInstances.
+		if sch.maxConcurrency == 0 || sch.maxConcurrency > totalInstances {
+			if totalInstances == 0 {
+				sch.maxConcurrency = 1
+			} else {
+				sch.maxConcurrency = totalInstances
+			}
+		}
+		sch.instancesWithinQuota = totalInstances
+	} else if sch.instancesWithinQuota > 0 && sch.maxConcurrency > sch.instancesWithinQuota+1 {
+		// Once we've hit a quota error and started tracking
+		// instancesWithinQuota (i.e., it's not zero), we
+		// avoid exceeding that known-working level by more
+		// than 1.
+		//
+		// If we don't do this, we risk entering a pattern of
+		// repeatedly locking several containers, hitting
+		// quota again, and unlocking them again each time the
+		// driver stops reporting AtQuota, which tends to use
+		// up the max lock/unlock cycles on the next few
+		// containers in the queue, and cause them to fail.
+		sch.maxConcurrency = sch.instancesWithinQuota + 1
+	}
 	sch.mMaxContainerConcurrency.Set(float64(sch.maxConcurrency))
 
+	maxSupervisors := int(float64(sch.maxConcurrency) * sch.supervisorFraction)
+	if maxSupervisors < 1 && sch.supervisorFraction > 0 && sch.maxConcurrency > 0 {
+		maxSupervisors = 1
+	}
+
 	sch.logger.WithFields(logrus.Fields{
 		"Containers": len(sorted),
 		"Processes":  len(running),
@@ -76,7 +149,9 @@ func (sch *Scheduler) runQueue() {
 	}).Debug("runQueue")
 
 	dontstart := map[arvados.InstanceType]bool{}
-	var overquota []container.QueueEnt // entries that are unmappable because of worker pool quota
+	var atcapacity = map[string]bool{} // ProviderTypes reported as AtCapacity during this runQueue() invocation
+	var overquota []container.QueueEnt // entries that are unmappable because of worker pool quota
+	var overmaxsuper []container.QueueEnt // unmappable because max supervisors (these are not included in overquota)
 	var containerAllocatedWorkerBootingCount int
 
 	// trying is #containers running + #containers we're trying to
@@ -84,13 +159,22 @@ func (sch *Scheduler) runQueue() {
 	// reaches the dynamic maxConcurrency limit.
 	trying := len(running)
 
+	supervisors := 0
+
 tryrun:
-	for i, ctr := range sorted {
-		ctr, it := ctr.Container, ctr.InstanceType
+	for i, ent := range sorted {
+		ctr, it := ent.Container, ent.InstanceType
 		logger := sch.logger.WithFields(logrus.Fields{
 			"ContainerUUID": ctr.UUID,
 			"InstanceType":  it.Name,
 		})
+		if ctr.SchedulingParameters.Supervisor {
+			supervisors += 1
+			if maxSupervisors > 0 && supervisors > maxSupervisors {
+				overmaxsuper = append(overmaxsuper, sorted[i])
+				continue
+			}
+		}
 		if _, running := running[ctr.UUID]; running || ctr.Priority < 1 {
 			continue
 		}
@@ -98,8 +182,7 @@ tryrun:
 		case arvados.ContainerStateQueued:
 			if sch.maxConcurrency > 0 && trying >= sch.maxConcurrency {
 				logger.Tracef("not locking: already at maxConcurrency %d", sch.maxConcurrency)
-				overquota = sorted[i:]
-				break tryrun
+				continue
 			}
 			trying++
 			if unalloc[it] < 1 && sch.pool.AtQuota() {
@@ -107,6 +190,11 @@ tryrun:
 				overquota = sorted[i:]
 				break tryrun
 			}
+			if unalloc[it] < 1 && (atcapacity[it.ProviderType] || sch.pool.AtCapacity(it)) {
+				logger.Trace("not locking: AtCapacity and no unalloc workers")
+				atcapacity[it.ProviderType] = true
+				continue
+			}
 			if sch.pool.KillContainer(ctr.UUID, "about to lock") {
 				logger.Info("not locking: crunch-run process from previous attempt has not exited")
 				continue
@@ -115,9 +203,8 @@ tryrun:
 			unalloc[it]--
 		case arvados.ContainerStateLocked:
 			if sch.maxConcurrency > 0 && trying >= sch.maxConcurrency {
-				logger.Debugf("not starting: already at maxConcurrency %d", sch.maxConcurrency)
-				overquota = sorted[i:]
-				break tryrun
+				logger.Tracef("not starting: already at maxConcurrency %d", sch.maxConcurrency)
+				continue
 			}
 			trying++
 			if unalloc[it] > 0 {
@@ -130,6 +217,27 @@ tryrun:
 				logger.Trace("overquota")
 				overquota = sorted[i:]
 				break tryrun
+			} else if atcapacity[it.ProviderType] || sch.pool.AtCapacity(it) {
+				// Continue trying lower-priority
+				// containers in case they can run on
+				// different instance types that are
+				// available.
+				//
+				// The local "atcapacity" cache helps
+				// when the pool's flag resets after
+				// we look at container A but before
+				// we look at lower-priority container
+				// B. In that case we want to run
+				// container A on the next call to
+				// runQueue(), rather than run
+				// container B now.
+				//
+				// TODO: try running this container on
+				// a bigger (but not much more
+				// expensive) instance type.
+				logger.WithField("InstanceType", it.Name).Trace("at capacity")
+				atcapacity[it.ProviderType] = true
+				continue
 			} else if sch.pool.Create(it) {
 				// Success. (Note pool.Create works
 				// asynchronously and does its own
@@ -138,10 +246,7 @@ tryrun:
 				logger.Info("creating new instance")
 			} else {
 				// Failed despite not being at quota,
-				// e.g., cloud ops throttled. TODO:
-				// avoid getting starved here if
-				// instances of a specific type always
-				// fail.
+				// e.g., cloud ops throttled.
 				logger.Trace("pool declined to create new instance")
 				continue
 			}
@@ -165,25 +270,48 @@ tryrun:
 	}
 
 	sch.mContainersAllocatedNotStarted.Set(float64(containerAllocatedWorkerBootingCount))
-	sch.mContainersNotAllocatedOverQuota.Set(float64(len(overquota)))
+	sch.mContainersNotAllocatedOverQuota.Set(float64(len(overquota) + len(overmaxsuper)))
 
-	if len(overquota) > 0 {
+	if len(overquota)+len(overmaxsuper) > 0 {
 		// Unlock any containers that are unmappable while
 		// we're at quota (but if they have already been
 		// scheduled and they're loading docker images etc.,
 		// let them run).
-		for _, ctr := range overquota {
+		var unlock []container.QueueEnt
+		unlock = append(unlock, overmaxsuper...)
+		if totalInstances > 0 && len(overquota) > 1 {
+			// We don't unlock the next-in-line container
+			// when at quota. This avoids a situation
+			// where our "at quota" state expires, we lock
+			// the next container and try to create an
+			// instance, the cloud provider still returns
+			// a quota error, we unlock the container, and
+			// we repeat this until the container reaches
+			// its limit of lock/unlock cycles.
+			unlock = append(unlock, overquota[1:]...)
+		} else {
+			// However, if totalInstances is 0 and we're
+			// still getting quota errors, then the
+			// next-in-line container is evidently not
+			// possible to run, so we should let it
+			// exhaust its lock/unlock cycles and
+			// eventually cancel, to avoid starvation.
+			unlock = append(unlock, overquota...)
+		}
+		for _, ctr := range unlock {
 			ctr := ctr.Container
 			_, toolate := running[ctr.UUID]
 			if ctr.State == arvados.ContainerStateLocked && !toolate {
 				logger := sch.logger.WithField("ContainerUUID", ctr.UUID)
-				logger.Debug("unlock because pool capacity is used by higher priority containers")
+				logger.Info("unlock because pool capacity is used by higher priority containers")
 				err := sch.queue.Unlock(ctr.UUID)
 				if err != nil {
 					logger.WithError(err).Warn("error unlocking")
 				}
 			}
 		}
+	}
+	if len(overquota) > 0 {
 		// Shut down idle workers that didn't get any
 		// containers mapped onto them before we hit quota.
 		for it, n := range unalloc {
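
A standalone sketch (not part of the diff above): a minimal Go program approximating the queue-ordering and supervisor-cap behavior this change introduces. The types, field names, and numbers below are illustrative stand-ins rather than the real arvados.Container or Scheduler fields, and the UUID comparison exists only to make the example deterministic; it is not the scheduler's actual tie-break for equal priority.

// sketch.go: illustrative only, assumes simplified stand-in types.
package main

import (
	"fmt"
	"sort"
)

type ctr struct {
	UUID       string
	Priority   int
	Running    bool // stand-in for "present in sch.pool.Running()"
	Locked     bool // stand-in for State == arvados.ContainerStateLocked
	Supervisor bool // stand-in for SchedulingParameters.Supervisor
}

func main() {
	queue := []ctr{
		{UUID: "ctr-c", Priority: 9, Supervisor: true},
		{UUID: "ctr-a", Priority: 5, Locked: true, Supervisor: true},
		{UUID: "ctr-b", Priority: 7, Running: true},
	}

	// Same precedence as the new comparator: running containers first,
	// then locked ones, then descending priority. The UUID comparison is
	// only a deterministic fallback for this example.
	sort.Slice(queue, func(i, j int) bool {
		if queue[i].Running != queue[j].Running {
			return queue[i].Running
		}
		if queue[i].Locked != queue[j].Locked {
			return queue[i].Locked
		}
		if queue[i].Priority != queue[j].Priority {
			return queue[i].Priority > queue[j].Priority
		}
		return queue[i].UUID < queue[j].UUID
	})
	for _, c := range queue {
		fmt.Println(c.UUID) // ctr-b (running), ctr-a (locked), ctr-c
	}

	// Supervisor cap, mirroring the maxSupervisors calculation above:
	// a fraction of maxConcurrency, rounded down, but at least 1 when
	// the fraction is nonzero. Values here are illustrative.
	maxConcurrency, supervisorFraction := 8, 0.1
	maxSupervisors := int(float64(maxConcurrency) * supervisorFraction)
	if maxSupervisors < 1 && supervisorFraction > 0 && maxConcurrency > 0 {
		maxSupervisors = 1
	}
	fmt.Println("maxSupervisors:", maxSupervisors) // 1

	// Walking the sorted queue, supervisors beyond the cap are set aside,
	// analogous to appending them to overmaxsuper in the diff.
	supervisors := 0
	for _, c := range queue {
		if c.Supervisor {
			supervisors++
			if maxSupervisors > 0 && supervisors > maxSupervisors {
				fmt.Println("would defer supervisor", c.UUID) // ctr-c
				continue
			}
		}
		// ... otherwise try to lock/start c, as in the tryrun loop ...
	}
}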