X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/dee20da037920692508446dc5f10d71ee3ab1524..cb68d4e34688abd308d7adffc288c82a5deb6c85:/lib/dispatchcloud/scheduler/run_queue.go

diff --git a/lib/dispatchcloud/scheduler/run_queue.go b/lib/dispatchcloud/scheduler/run_queue.go
index f729f0dc23..d270972295 100644
--- a/lib/dispatchcloud/scheduler/run_queue.go
+++ b/lib/dispatchcloud/scheduler/run_queue.go
@@ -5,6 +5,7 @@
 package scheduler
 
 import (
+	"fmt"
 	"sort"
 	"time"
 
@@ -13,14 +14,71 @@ import (
 	"github.com/sirupsen/logrus"
 )
 
+var quietAfter503 = time.Minute
+
+type QueueEnt struct {
+	container.QueueEnt
+
+	// Human-readable scheduling status as of the last scheduling
+	// iteration.
+	SchedulingStatus string `json:"scheduling_status"`
+}
+
+const (
+	schedStatusPreparingRuntimeEnvironment = "preparing runtime environment"
+	schedStatusPriorityZero                = "not scheduling: priority 0" // ", state X" appended at runtime
+	schedStatusContainerLimitReached       = "not starting: supervisor container limit has been reached"
+	schedStatusWaitingForPreviousAttempt   = "waiting for previous attempt to exit"
+	schedStatusWaitingNewInstance          = "waiting for new instance to be ready"
+	schedStatusWaitingInstanceType         = "waiting for suitable instance type to become available" // ": queue position X" appended at runtime
+	schedStatusWaitingCloudResources       = "waiting for cloud resources"
+	schedStatusWaitingClusterCapacity      = "waiting while cluster is running at capacity" // ": queue position X" appended at runtime
+)
+
+// Queue returns the sorted queue from the last scheduling iteration.
+func (sch *Scheduler) Queue() []QueueEnt {
+	ents, _ := sch.lastQueue.Load().([]QueueEnt)
+	return ents
+}
+
 func (sch *Scheduler) runQueue() {
+	running := sch.pool.Running()
+	unalloc := sch.pool.Unallocated()
+
+	totalInstances := 0
+	for _, n := range sch.pool.CountWorkers() {
+		totalInstances += n
+	}
+
 	unsorted, _ := sch.queue.Entries()
-	sorted := make([]container.QueueEnt, 0, len(unsorted))
+	sorted := make([]QueueEnt, 0, len(unsorted))
 	for _, ent := range unsorted {
-		sorted = append(sorted, ent)
+		sorted = append(sorted, QueueEnt{QueueEnt: ent})
 	}
 	sort.Slice(sorted, func(i, j int) bool {
-		if pi, pj := sorted[i].Container.Priority, sorted[j].Container.Priority; pi != pj {
+		_, irunning := running[sorted[i].Container.UUID]
+		_, jrunning := running[sorted[j].Container.UUID]
+		if irunning != jrunning {
+			// Ensure the "tryrun" loop (see below) sees
+			// already-scheduled containers first, to
+			// ensure existing supervisor containers are
+			// properly counted before we decide whether
+			// we have room for new ones.
+			return irunning
+		}
+		ilocked := sorted[i].Container.State == arvados.ContainerStateLocked
+		jlocked := sorted[j].Container.State == arvados.ContainerStateLocked
+		if ilocked != jlocked {
+			// Give precedence to containers that we have
+			// already locked, even if higher-priority
+			// containers have since arrived in the
+			// queue. This avoids undesirable queue churn
+			// effects including extra lock/unlock cycles
+			// and bringing up new instances and quickly
+			// shutting them down to make room for
+			// different instance sizes.
+			return ilocked
+		} else if pi, pj := sorted[i].Container.Priority, sorted[j].Container.Priority; pi != pj {
 			return pi > pj
 		} else {
 			// When containers have identical priority,
@@ -32,45 +90,211 @@ func (sch *Scheduler) runQueue() {
 		}
 	})
 
-	running := sch.pool.Running()
-	unalloc := sch.pool.Unallocated()
+	if t := sch.client.Last503(); t.After(sch.last503time) {
+		// API has sent an HTTP 503 response since last time
+		// we checked. Use current #containers - 1 as
+		// maxConcurrency, i.e., try to stay just below the
+		// level where we see 503s.
+		sch.last503time = t
+		if newlimit := len(running) - 1; newlimit < 1 {
+			sch.maxConcurrency = 1
+		} else {
+			sch.maxConcurrency = newlimit
+		}
+	} else if sch.maxConcurrency > 0 && time.Since(sch.last503time) > quietAfter503 {
+		// If we haven't seen any 503 errors lately, raise
+		// limit to ~10% beyond the current workload.
+		//
+		// As we use the added 10% to schedule more
+		// containers, len(running) will increase and we'll
+		// push the limit up further. Soon enough,
+		// maxConcurrency will get high enough to schedule the
+		// entire queue, hit pool quota, or get 503s again.
+		max := len(running)*11/10 + 1
+		if sch.maxConcurrency < max {
+			sch.maxConcurrency = max
+		}
+	}
+	if sch.last503time.IsZero() {
+		sch.mLast503Time.Set(0)
+	} else {
+		sch.mLast503Time.Set(float64(sch.last503time.Unix()))
+	}
+	if sch.maxInstances > 0 && sch.maxConcurrency > sch.maxInstances {
+		sch.maxConcurrency = sch.maxInstances
+	}
+	if sch.instancesWithinQuota > 0 && sch.instancesWithinQuota < totalInstances {
+		// Evidently it is possible to run this many
+		// instances, so raise our estimate.
+		sch.instancesWithinQuota = totalInstances
+	}
+	if sch.pool.AtQuota() {
+		// Consider current workload to be the maximum
+		// allowed, for the sake of reporting metrics and
+		// calculating max supervisors.
+		//
+		// Now that sch.maxConcurrency is set, we will only
+		// raise it past len(running) by 10%. This helps
+		// avoid running an inappropriate number of
+		// supervisors when we reach the cloud-imposed quota
+		// (which may be based on # CPUs etc) long before the
+		// configured MaxInstances.
+		if sch.maxConcurrency == 0 || sch.maxConcurrency > totalInstances {
+			if totalInstances == 0 {
+				sch.maxConcurrency = 1
+			} else {
+				sch.maxConcurrency = totalInstances
+			}
+		}
+		sch.instancesWithinQuota = totalInstances
+	} else if sch.instancesWithinQuota > 0 && sch.maxConcurrency > sch.instancesWithinQuota+1 {
+		// Once we've hit a quota error and started tracking
+		// instancesWithinQuota (i.e., it's not zero), we
+		// avoid exceeding that known-working level by more
+		// than 1.
+		//
+		// If we don't do this, we risk entering a pattern of
+		// repeatedly locking several containers, hitting
+		// quota again, and unlocking them again each time the
+		// driver stops reporting AtQuota, which tends to use
+		// up the max lock/unlock cycles on the next few
+		// containers in the queue, and cause them to fail.
+		sch.maxConcurrency = sch.instancesWithinQuota + 1
+	}
+	sch.mMaxContainerConcurrency.Set(float64(sch.maxConcurrency))
+
+	maxSupervisors := int(float64(sch.maxConcurrency) * sch.supervisorFraction)
+	if maxSupervisors < 1 && sch.supervisorFraction > 0 && sch.maxConcurrency > 0 {
+		maxSupervisors = 1
+	}
 
 	sch.logger.WithFields(logrus.Fields{
-		"Containers": len(sorted),
-		"Processes":  len(running),
+		"Containers":     len(sorted),
+		"Processes":      len(running),
+		"maxConcurrency": sch.maxConcurrency,
 	}).Debug("runQueue")
 
 	dontstart := map[arvados.InstanceType]bool{}
-	var overquota []container.QueueEnt // entries that are unmappable because of worker pool quota
+	var atcapacity = map[string]bool{} // ProviderTypes reported as AtCapacity during this runQueue() invocation
+	var overquota []QueueEnt           // entries that are unmappable because of worker pool quota
+	var overmaxsuper []QueueEnt        // unmappable because max supervisors (these are not included in overquota)
 	var containerAllocatedWorkerBootingCount int
 
+	// trying is #containers running + #containers we're trying to
+	// start. We stop trying to start more containers if this
+	// reaches the dynamic maxConcurrency limit.
+	trying := len(running)
+
+	qpos := 0
+	supervisors := 0
+
 tryrun:
-	for i, ctr := range sorted {
-		ctr, it := ctr.Container, ctr.InstanceType
+	for i, ent := range sorted {
+		ctr, types := ent.Container, ent.InstanceTypes
 		logger := sch.logger.WithFields(logrus.Fields{
 			"ContainerUUID": ctr.UUID,
-			"InstanceType":  it.Name,
 		})
-		if _, running := running[ctr.UUID]; running || ctr.Priority < 1 {
+		if ctr.SchedulingParameters.Supervisor {
+			supervisors += 1
+		}
+		if _, running := running[ctr.UUID]; running {
+			if ctr.State == arvados.ContainerStateQueued || ctr.State == arvados.ContainerStateLocked {
+				sorted[i].SchedulingStatus = schedStatusPreparingRuntimeEnvironment
+			}
+			continue
+		}
+		if ctr.Priority < 1 {
+			sorted[i].SchedulingStatus = schedStatusPriorityZero + ", state " + string(ctr.State)
 			continue
 		}
+		if ctr.SchedulingParameters.Supervisor && maxSupervisors > 0 && supervisors > maxSupervisors {
+			overmaxsuper = append(overmaxsuper, sorted[i])
+			sorted[i].SchedulingStatus = schedStatusContainerLimitReached
+			continue
+		}
+		// If we have unalloc instances of any of the eligible
+		// instance types, unallocOK is true and unallocType
+		// is the lowest-cost type.
+		var unallocOK bool
+		var unallocType arvados.InstanceType
+		for _, it := range types {
+			if unalloc[it] > 0 {
+				unallocOK = true
+				unallocType = it
+				break
+			}
+		}
+		// If the pool is not reporting AtCapacity for any of
+		// the eligible instance types, availableOK is true
+		// and availableType is the lowest-cost type.
+		var availableOK bool
+		var availableType arvados.InstanceType
+		for _, it := range types {
+			if atcapacity[it.ProviderType] {
+				continue
+			} else if sch.pool.AtCapacity(it) {
+				atcapacity[it.ProviderType] = true
+				continue
+			} else {
+				availableOK = true
+				availableType = it
+				break
+			}
+		}
 		switch ctr.State {
 		case arvados.ContainerStateQueued:
-			if unalloc[it] < 1 && sch.pool.AtQuota() {
-				logger.Debug("not locking: AtQuota and no unalloc workers")
+			if sch.maxConcurrency > 0 && trying >= sch.maxConcurrency {
+				logger.Tracef("not locking: already at maxConcurrency %d", sch.maxConcurrency)
+				continue
+			}
+			trying++
+			if !unallocOK && sch.pool.AtQuota() {
+				logger.Trace("not starting: AtQuota and no unalloc workers")
 				overquota = sorted[i:]
 				break tryrun
 			}
+			if !unallocOK && !availableOK {
+				logger.Trace("not locking: AtCapacity and no unalloc workers")
+				continue
+			}
 			if sch.pool.KillContainer(ctr.UUID, "about to lock") {
 				logger.Info("not locking: crunch-run process from previous attempt has not exited")
 				continue
 			}
 			go sch.lockContainer(logger, ctr.UUID)
-			unalloc[it]--
+			unalloc[unallocType]--
 		case arvados.ContainerStateLocked:
-			if unalloc[it] > 0 {
-				unalloc[it]--
-			} else if sch.pool.AtQuota() {
+			if sch.maxConcurrency > 0 && trying >= sch.maxConcurrency {
+				logger.Tracef("not starting: already at maxConcurrency %d", sch.maxConcurrency)
+				continue
+			}
+			trying++
+			if unallocOK {
+				// We have a suitable instance type,
+				// so mark it as allocated, and try to
+				// start the container.
+				unalloc[unallocType]--
+				logger = logger.WithField("InstanceType", unallocType.Name)
+				if dontstart[unallocType] {
+					// We already tried & failed to start
+					// a higher-priority container on the
+					// same instance type. Don't let this
+					// one sneak in ahead of it.
+				} else if sch.pool.KillContainer(ctr.UUID, "about to start") {
+					sorted[i].SchedulingStatus = schedStatusWaitingForPreviousAttempt
+					logger.Info("not restarting yet: crunch-run process from previous attempt has not exited")
+				} else if sch.pool.StartContainer(unallocType, ctr) {
+					sorted[i].SchedulingStatus = schedStatusPreparingRuntimeEnvironment
+					logger.Trace("StartContainer => true")
+				} else {
+					sorted[i].SchedulingStatus = schedStatusWaitingNewInstance
+					logger.Trace("StartContainer => false")
+					containerAllocatedWorkerBootingCount += 1
+					dontstart[unallocType] = true
+				}
+				continue
+			}
+			if sch.pool.AtQuota() {
 				// Don't let lower-priority containers
 				// starve this one by using keeping
 				// idle workers alive on different
@@ -78,55 +302,104 @@ tryrun:
 				logger.Trace("overquota")
 				overquota = sorted[i:]
 				break tryrun
-			} else if sch.pool.Create(it) {
-				// Success. (Note pool.Create works
-				// asynchronously and does its own
-				// logging about the eventual outcome,
-				// so we don't need to.)
-				logger.Info("creating new instance")
-			} else {
+			}
+			if !availableOK {
+				// Continue trying lower-priority
+				// containers in case they can run on
+				// different instance types that are
+				// available.
+				//
+				// The local "atcapacity" cache helps
+				// when the pool's flag resets after
+				// we look at container A but before
+				// we look at lower-priority container
+				// B. In that case we want to run
+				// container A on the next call to
+				// runQueue(), rather than run
+				// container B now.
+				qpos++
+				sorted[i].SchedulingStatus = schedStatusWaitingInstanceType + fmt.Sprintf(": queue position %d", qpos)
+				logger.Trace("all eligible types at capacity")
+				continue
+			}
+			logger = logger.WithField("InstanceType", availableType.Name)
+			if !sch.pool.Create(availableType) {
 				// Failed despite not being at quota,
-				// e.g., cloud ops throttled. TODO:
-				// avoid getting starved here if
-				// instances of a specific type always
-				// fail.
+				// e.g., cloud ops throttled.
 				logger.Trace("pool declined to create new instance")
 				continue
 			}
-
-			if dontstart[it] {
-				// We already tried & failed to start
-				// a higher-priority container on the
-				// same instance type. Don't let this
-				// one sneak in ahead of it.
-			} else if sch.pool.KillContainer(ctr.UUID, "about to start") {
-				logger.Info("not restarting yet: crunch-run process from previous attempt has not exited")
-			} else if sch.pool.StartContainer(it, ctr) {
-				// Success.
-			} else {
-				containerAllocatedWorkerBootingCount += 1
-				dontstart[it] = true
-			}
+			// Success. (Note pool.Create works
+			// asynchronously and does its own logging
+			// about the eventual outcome, so we don't
+			// need to.)
+			sorted[i].SchedulingStatus = schedStatusWaitingNewInstance
+			logger.Info("creating new instance")
+			// Don't bother trying to start the container
+			// yet -- obviously the instance will take
+			// some time to boot and become ready.
+			containerAllocatedWorkerBootingCount += 1
+			dontstart[availableType] = true
 		}
 	}
 	sch.mContainersAllocatedNotStarted.Set(float64(containerAllocatedWorkerBootingCount))
-	sch.mContainersNotAllocatedOverQuota.Set(float64(len(overquota)))
+	sch.mContainersNotAllocatedOverQuota.Set(float64(len(overquota) + len(overmaxsuper)))
 
-	if len(overquota) > 0 {
+	var qreason string
+	if sch.pool.AtQuota() {
+		qreason = schedStatusWaitingCloudResources
+	} else {
+		qreason = schedStatusWaitingClusterCapacity
+	}
+	for i, ent := range sorted {
+		if ent.SchedulingStatus == "" && (ent.Container.State == arvados.ContainerStateQueued || ent.Container.State == arvados.ContainerStateLocked) {
+			qpos++
+			sorted[i].SchedulingStatus = fmt.Sprintf("%s: queue position %d", qreason, qpos)
+		}
+	}
+	sch.lastQueue.Store(sorted)
+
+	if len(overquota)+len(overmaxsuper) > 0 {
 		// Unlock any containers that are unmappable while
-		// we're at quota.
-		for _, ctr := range overquota {
+		// we're at quota (but if they have already been
+		// scheduled and they're loading docker images etc.,
+		// let them run).
+		var unlock []QueueEnt
+		unlock = append(unlock, overmaxsuper...)
+		if totalInstances > 0 && len(overquota) > 1 {
+			// We don't unlock the next-in-line container
+			// when at quota. This avoids a situation
+			// where our "at quota" state expires, we lock
+			// the next container and try to create an
+			// instance, the cloud provider still returns
+			// a quota error, we unlock the container, and
+			// we repeat this until the container reaches
+			// its limit of lock/unlock cycles.
+			unlock = append(unlock, overquota[1:]...)
+		} else {
+			// However, if totalInstances is 0 and we're
+			// still getting quota errors, then the
+			// next-in-line container is evidently not
+			// possible to run, so we should let it
+			// exhaust its lock/unlock cycles and
+			// eventually cancel, to avoid starvation.
+			unlock = append(unlock, overquota...)
+		}
+		for _, ctr := range unlock {
 			ctr := ctr.Container
-			if ctr.State == arvados.ContainerStateLocked {
+			_, toolate := running[ctr.UUID]
+			if ctr.State == arvados.ContainerStateLocked && !toolate {
 				logger := sch.logger.WithField("ContainerUUID", ctr.UUID)
-				logger.Debug("unlock because pool capacity is used by higher priority containers")
+				logger.Info("unlock because pool capacity is used by higher priority containers")
 				err := sch.queue.Unlock(ctr.UUID)
 				if err != nil {
 					logger.WithError(err).Warn("error unlocking")
 				}
 			}
 		}
+	}
+	if len(overquota) > 0 {
 		// Shut down idle workers that didn't get any
 		// containers mapped onto them before we hit quota.
 		for it, n := range unalloc {