1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
13 "git.arvados.org/arvados.git/lib/dispatchcloud/container"
14 "git.arvados.org/arvados.git/sdk/go/arvados"
15 "github.com/sirupsen/logrus"
18 var quietAfter503 = time.Minute
20 type QueueEnt struct {
23 // Human-readable scheduling status as of the last scheduling iteration.
25 SchedulingStatus string `json:"scheduling_status"`
29 schedStatusPreparingRuntimeEnvironment = "Container is allocated to an instance and preparing to run."
30 schedStatusPriorityZero = "This container will not be scheduled to run because its priority is 0 and state is %v."
31 schedStatusSupervisorLimitReached = "Waiting in workflow queue at position %v. Cluster is at capacity and cannot start any new workflows right now."
32 schedStatusWaitingForPreviousAttempt = "Waiting for previous container attempt to exit."
33 schedStatusWaitingNewInstance = "Waiting for a %v instance to boot and be ready to accept work."
34 schedStatusWaitingInstanceType = "Waiting in queue at position %v. Cluster is at capacity for all eligible instance types (%v) and cannot start a new instance right now."
35 schedStatusWaitingCloudResources = "Waiting in queue at position %v. Cluster is at cloud account limits and cannot start any new instances right now."
36 schedStatusWaitingClusterCapacity = "Waiting in queue at position %v. Cluster is at capacity and cannot start any new instances right now."
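// Illustrative only (not upstream code): the parameterized status strings
// above are filled in with fmt.Sprintf, shown here with hypothetical values:
//
//	fmt.Sprintf(schedStatusWaitingNewInstance, "t3.large")
//	// => "Waiting for a t3.large instance to boot and be ready to accept work."
//
//	fmt.Sprintf(schedStatusWaitingClusterCapacity, 4)
//	// => "Waiting in queue at position 4. Cluster is at capacity and cannot start any new instances right now."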
39 // Queue returns the sorted queue from the last scheduling iteration.
40 func (sch *Scheduler) Queue() []QueueEnt {
41 ents, _ := sch.lastQueue.Load().([]QueueEnt)
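// Example use of Queue() (a sketch; assumes a *Scheduler value named sch and
// an import of "fmt"):
//
//	for _, ent := range sch.Queue() {
//		fmt.Println(ent.Container.UUID, ent.SchedulingStatus)
//	}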
45 func (sch *Scheduler) runQueue() {
46 running := sch.pool.Running()
47 unalloc := sch.pool.Unallocated()
50 for _, n := range sch.pool.CountWorkers() {
54 unsorted, _ := sch.queue.Entries()
55 sorted := make([]QueueEnt, 0, len(unsorted))
56 for _, ent := range unsorted {
57 sorted = append(sorted, QueueEnt{QueueEnt: ent})
59 sort.Slice(sorted, func(i, j int) bool {
60 _, irunning := running[sorted[i].Container.UUID]
61 _, jrunning := running[sorted[j].Container.UUID]
62 if irunning != jrunning {
63 // Ensure the "tryrun" loop (see below) sees
64 // already-scheduled containers first, so that
65 // existing supervisor containers are properly
66 // counted before we decide whether we have
67 // room for new ones.
70 ilocked := sorted[i].Container.State == arvados.ContainerStateLocked
71 jlocked := sorted[j].Container.State == arvados.ContainerStateLocked
72 if ilocked != jlocked {
73 // Give precedence to containers that we have
74 // already locked, even if higher-priority
75 // containers have since arrived in the
76 // queue. This avoids undesirable queue churn
77 // effects including extra lock/unlock cycles
78 // and bringing up new instances and quickly
79 // shutting them down to make room for
80 // different instance sizes.
82 } else if pi, pj := sorted[i].Container.Priority, sorted[j].Container.Priority; pi != pj {
85 // When containers have identical priority,
86 // start them in the order we first noticed
87 // them. This avoids extra lock/unlock cycles
88 // when we unlock the containers that don't
89 // fit in the available pool.
90 return sorted[i].FirstSeenAt.Before(sorted[j].FirstSeenAt)
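// Putting the comparator together (an illustrative summary, not upstream
// code), entries sort by: running first, then Locked before Queued, then
// higher Priority, then earlier FirstSeenAt. For hypothetical entries
//
//	A: running                B: Locked, Priority 1
//	C: Locked, Priority 100   D: Queued, Priority 1000
//
// the resulting order is A, C, B, D.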
94 if t := sch.client.Last503(); t.After(sch.last503time) {
95 // API has sent an HTTP 503 response since last time
96 // we checked. Use current #containers - 1 as
97 // maxConcurrency, i.e., try to stay just below the
98 // level where we see 503s.
100 if newlimit := len(running) - 1; newlimit < 1 {
101 sch.maxConcurrency = 1
103 sch.maxConcurrency = newlimit
105 } else if sch.maxConcurrency > 0 && time.Since(sch.last503time) > quietAfter503 {
106 // If we haven't seen any 503 errors lately, raise
107 // limit to ~10% beyond the current workload.
109 // As we use the added 10% to schedule more
110 // containers, len(running) will increase and we'll
111 // push the limit up further. Soon enough, either
112 // maxConcurrency will be high enough to schedule the
113 // entire queue, or we'll hit the pool quota or see 503s again.
114 max := len(running)*11/10 + 1
115 if sch.maxConcurrency < max {
116 sch.maxConcurrency = max
119 if sch.last503time.IsZero() {
120 sch.mLast503Time.Set(0)
122 sch.mLast503Time.Set(float64(sch.last503time.Unix()))
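// Worked example of the 503 handling above, with hypothetical numbers:
//
//	len(running) == 20
//	after a 503:           maxConcurrency = 20 - 1     // 19
//	after a quiet minute:  max = 20*11/10 + 1          // 23, so maxConcurrency rises to 23
//
// As more containers start, len(running) grows and the limit keeps creeping
// up until the queue is fully scheduled, quota is reached, or 503s return.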
124 if sch.maxInstances > 0 && sch.maxConcurrency > sch.maxInstances {
125 sch.maxConcurrency = sch.maxInstances
127 if sch.instancesWithinQuota > 0 && sch.instancesWithinQuota < totalInstances {
128 // Evidently it is possible to run this many
129 // instances, so raise our estimate.
130 sch.instancesWithinQuota = totalInstances
132 if sch.pool.AtQuota() {
133 // Consider current workload to be the maximum
134 // allowed, for the sake of reporting metrics and
135 // calculating max supervisors.
137 // Now that sch.maxConcurrency is set, we will only
138 // raise it past len(running) by 10%. This helps
139 // avoid running an inappropriate number of
140 // supervisors when we reach the cloud-imposed quota
141 // (which may be based on # CPUs etc) long before the
142 // configured MaxInstances.
143 if sch.maxConcurrency == 0 || sch.maxConcurrency > totalInstances {
144 if totalInstances == 0 {
145 sch.maxConcurrency = 1
147 sch.maxConcurrency = totalInstances
150 sch.instancesWithinQuota = totalInstances
151 } else if sch.instancesWithinQuota > 0 && sch.maxConcurrency > sch.instancesWithinQuota+1 {
152 // Once we've hit a quota error and started tracking
153 // instancesWithinQuota (i.e., it's not zero), we
154 // avoid exceeding that known-working level by more than 1.
157 // If we don't do this, we risk entering a pattern of
158 // repeatedly locking several containers, hitting
159 // quota again, and unlocking them again each time the
160 // driver stops reporting AtQuota, which tends to use
161 // up the max lock/unlock cycles on the next few
162 // containers in the queue, and cause them to fail.
163 sch.maxConcurrency = sch.instancesWithinQuota + 1
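// For example (hypothetical numbers): if the last quota error was hit while 8
// instances existed, instancesWithinQuota is 8 and this branch keeps
// maxConcurrency at no more than 9, even when the +10% rule above would
// allow more.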
165 sch.mMaxContainerConcurrency.Set(float64(sch.maxConcurrency))
167 maxSupervisors := int(float64(sch.maxConcurrency) * sch.supervisorFraction)
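// Illustrative arithmetic (hypothetical values): with sch.maxConcurrency == 9
// and sch.supervisorFraction == 0.3,
//
//	maxSupervisors = int(float64(9) * 0.3) // == 2
//
// The check below covers the case where a nonzero fraction would otherwise
// round down to zero supervisors.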
168 if maxSupervisors < 1 && sch.supervisorFraction > 0 && sch.maxConcurrency > 0 {
172 sch.logger.WithFields(logrus.Fields{
173 "Containers": len(sorted),
174 "Processes": len(running),
175 "maxConcurrency": sch.maxConcurrency,
178 dontstart := map[arvados.InstanceType]bool{}
179 var atcapacity = map[string]bool{} // ProviderTypes reported as AtCapacity during this runQueue() invocation
180 var overquota []QueueEnt // entries that are unmappable because of worker pool quota
181 var overmaxsuper []QueueEnt // unmappable because the max-supervisors limit is reached (not included in overquota)
182 var containerAllocatedWorkerBootingCount int
184 // trying is #containers running + #containers we're trying to
185 // start. We stop trying to start more containers if this
186 // reaches the dynamic maxConcurrency limit.
187 trying := len(running)
193 for i, ent := range sorted {
194 ctr, types := ent.Container, ent.InstanceTypes
195 logger := sch.logger.WithFields(logrus.Fields{
196 "ContainerUUID": ctr.UUID,
198 if ctr.SchedulingParameters.Supervisor {
201 if _, running := running[ctr.UUID]; running {
202 if ctr.State == arvados.ContainerStateQueued || ctr.State == arvados.ContainerStateLocked {
203 sorted[i].SchedulingStatus = schedStatusPreparingRuntimeEnvironment
207 if ctr.Priority < 1 {
208 sorted[i].SchedulingStatus = fmt.Sprintf(schedStatusPriorityZero, string(ctr.State))
211 if ctr.SchedulingParameters.Supervisor && maxSupervisors > 0 && supervisors > maxSupervisors {
212 overmaxsuper = append(overmaxsuper, sorted[i])
213 sorted[i].SchedulingStatus = fmt.Sprintf(schedStatusSupervisorLimitReached, len(overmaxsuper))
216 // If we have unalloc instances of any of the eligible
217 // instance types, unallocOK is true and unallocType
218 // is the lowest-cost type.
220 var unallocType arvados.InstanceType
221 for _, it := range types {
228 // If the pool is not reporting AtCapacity for any of
229 // the eligible instance types, availableOK is true
230 // and availableType is the lowest-cost type.
232 var availableType arvados.InstanceType
233 for _, it := range types {
234 if atcapacity[it.ProviderType] {
236 } else if sch.pool.AtCapacity(it) {
237 atcapacity[it.ProviderType] = true
246 case arvados.ContainerStateQueued:
247 if sch.maxConcurrency > 0 && trying >= sch.maxConcurrency {
248 logger.Tracef("not locking: already at maxConcurrency %d", sch.maxConcurrency)
252 if !unallocOK && sch.pool.AtQuota() {
253 logger.Trace("not starting: AtQuota and no unalloc workers")
254 overquota = sorted[i:]
257 if !unallocOK && !availableOK {
258 logger.Trace("not locking: AtCapacity and no unalloc workers")
261 if sch.pool.KillContainer(ctr.UUID, "about to lock") {
262 logger.Info("not locking: crunch-run process from previous attempt has not exited")
265 go sch.lockContainer(logger, ctr.UUID)
266 unalloc[unallocType]--
267 case arvados.ContainerStateLocked:
268 if sch.maxConcurrency > 0 && trying >= sch.maxConcurrency {
269 logger.Tracef("not starting: already at maxConcurrency %d", sch.maxConcurrency)
274 // We have a suitable instance type,
275 // so mark it as allocated, and try to
276 // start the container.
277 unalloc[unallocType]--
278 logger = logger.WithField("InstanceType", unallocType.Name)
279 if dontstart[unallocType] {
280 // We already tried & failed to start
281 // a higher-priority container on the
282 // same instance type. Don't let this
283 // one sneak in ahead of it.
284 } else if sch.pool.KillContainer(ctr.UUID, "about to start") {
285 sorted[i].SchedulingStatus = schedStatusWaitingForPreviousAttempt
286 logger.Info("not restarting yet: crunch-run process from previous attempt has not exited")
287 } else if sch.pool.StartContainer(unallocType, ctr) {
288 sorted[i].SchedulingStatus = schedStatusPreparingRuntimeEnvironment
289 logger.Trace("StartContainer => true")
291 sorted[i].SchedulingStatus = fmt.Sprintf(schedStatusWaitingNewInstance, unallocType.Name)
292 logger.Trace("StartContainer => false")
293 containerAllocatedWorkerBootingCount += 1
294 dontstart[unallocType] = true
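// Sketch of the dontstart bookkeeping (hypothetical containers ctrX, ctrY and
// instance type std; not upstream code):
//
//	sch.pool.StartContainer(std, ctrX) // => false: instance still booting
//	dontstart[std] = true
//	// ...later in the same pass, a lower-priority ctrY that maps to std
//	// finds dontstart[std] == true and is left waiting behind ctrX.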
298 if sch.pool.AtQuota() {
299 // Don't let lower-priority containers
300 // starve this one by keeping idle
301 // workers alive on different instance types.
303 logger.Trace("overquota")
304 overquota = sorted[i:]
308 // Continue trying lower-priority
309 // containers in case they can run on
310 // different instance types that are not at capacity.
313 // The local "atcapacity" cache helps
314 // when the pool's flag resets after
315 // we look at container A but before
316 // we look at lower-priority container
317 // B. In that case we want to run
318 // container A on the next call to
319 // runQueue(), rather than run container B.
322 var typenames []string
323 for _, tp := range types {
324 typenames = append(typenames, tp.Name)
326 sorted[i].SchedulingStatus = fmt.Sprintf(schedStatusWaitingInstanceType, qpos, strings.Join(typenames, ", "))
327 logger.Trace("all eligible types at capacity")
330 logger = logger.WithField("InstanceType", availableType.Name)
331 if !sch.pool.Create(availableType) {
332 // Failed despite not being at quota,
333 // e.g., cloud ops throttled.
334 logger.Trace("pool declined to create new instance")
337 // Success. (Note pool.Create works
338 // asynchronously and does its own logging
339 // about the eventual outcome, so we don't need to.)
341 sorted[i].SchedulingStatus = fmt.Sprintf(schedStatusWaitingNewInstance, availableType.Name)
342 logger.Info("creating new instance")
343 // Don't bother trying to start the container
344 // yet -- obviously the instance will take
345 // some time to boot and become ready.
346 containerAllocatedWorkerBootingCount += 1
347 dontstart[availableType] = true
351 sch.mContainersAllocatedNotStarted.Set(float64(containerAllocatedWorkerBootingCount))
352 sch.mContainersNotAllocatedOverQuota.Set(float64(len(overquota) + len(overmaxsuper)))
355 if sch.pool.AtQuota() {
356 qreason = schedStatusWaitingCloudResources
358 qreason = schedStatusWaitingClusterCapacity
360 for i, ent := range sorted {
361 if ent.SchedulingStatus == "" && (ent.Container.State == arvados.ContainerStateQueued || ent.Container.State == arvados.ContainerStateLocked) {
363 sorted[i].SchedulingStatus = fmt.Sprintf(qreason, qpos)
366 sch.lastQueue.Store(sorted)
368 if len(overquota)+len(overmaxsuper) > 0 {
369 // Unlock any containers that are unmappable while
370 // we're at quota (but if they have already been
371 // scheduled and they're loading docker images etc., let them run).
373 var unlock []QueueEnt
374 unlock = append(unlock, overmaxsuper...)
375 if totalInstances > 0 && len(overquota) > 1 {
376 // We don't unlock the next-in-line container
377 // when at quota. This avoids a situation
378 // where our "at quota" state expires, we lock
379 // the next container and try to create an
380 // instance, the cloud provider still returns
381 // a quota error, we unlock the container, and
382 // we repeat this until the container reaches
383 // its limit of lock/unlock cycles.
384 unlock = append(unlock, overquota[1:]...)
386 // However, if totalInstances is 0 and we're
387 // still getting quota errors, then the
388 // next-in-line container is evidently not
389 // possible to run, so we should let it
390 // exhaust its lock/unlock cycles and
391 // eventually cancel, to avoid starvation.
392 unlock = append(unlock, overquota...)
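// Worked example of the unlock choice above, with a hypothetical overquota
// list [A, B, C] in priority order:
//
//	totalInstances > 0:  unlock B and C; A keeps its lock so it can start
//	                     as soon as quota frees up
//	totalInstances == 0: unlock A, B, and C, letting A eventually exhaust
//	                     its lock/unlock cycles and cancel instead of
//	                     blocking the queue indefinitely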
394 for _, ctr := range unlock {
396 _, toolate := running[ctr.UUID]
397 if ctr.State == arvados.ContainerStateLocked && !toolate {
398 logger := sch.logger.WithField("ContainerUUID", ctr.UUID)
399 logger.Info("unlock because pool capacity is used by higher priority containers")
400 err := sch.queue.Unlock(ctr.UUID)
402 logger.WithError(err).Warn("error unlocking")
407 if len(overquota) > 0 {
408 // Shut down idle workers that didn't get any
409 // containers mapped onto them before we hit quota.
410 for it, n := range unalloc {
414 sch.pool.Shutdown(it)
419 // Lock the given container. Should be called in a new goroutine.
420 func (sch *Scheduler) lockContainer(logger logrus.FieldLogger, uuid string) {
421 if !sch.uuidLock(uuid, "lock") {
424 defer sch.uuidUnlock(uuid)
425 if ctr, ok := sch.queue.Get(uuid); !ok || ctr.State != arvados.ContainerStateQueued {
426 // This happens if the container has been cancelled or
427 // locked since runQueue called sch.queue.Entries(),
428 // possibly by a lockContainer() call from a previous
429 // runQueue iteration. In any case, we will respond
430 // appropriately on the next runQueue iteration, which
431 // will have already been triggered by the queue update.
433 logger.WithField("State", ctr.State).Debug("container no longer queued by the time we decided to lock it, doing nothing")
436 err := sch.queue.Lock(uuid)
438 logger.WithError(err).Warn("error locking container")
441 logger.Debug("lock succeeded")
442 ctr, ok := sch.queue.Get(uuid)
444 logger.Error("(BUG?) container disappeared from queue after Lock succeeded")
445 } else if ctr.State != arvados.ContainerStateLocked {
446 logger.Warnf("(race?) container has state=%q after Lock succeeded", ctr.State)
450 // Acquire a non-blocking lock for specified UUID, returning true if
451 // successful. The op argument is used only for debug logs.
453 // If the lock is not available, uuidLock arranges to wake up the
454 // scheduler after a short delay, so it can retry whatever operation
455 // is trying to get the lock (if that operation is still worth doing).
457 // This mechanism helps avoid spamming the controller/database with
458 // concurrent updates for any single container, even when the
459 // scheduler loop is running frequently.
460 func (sch *Scheduler) uuidLock(uuid, op string) bool {
462 defer sch.mtx.Unlock()
463 logger := sch.logger.WithFields(logrus.Fields{
464 "ContainerUUID": uuid,
467 if op, locked := sch.uuidOp[uuid]; locked {
468 logger.Debugf("uuidLock not available, Op=%s in progress", op)
469 // Make sure the scheduler loop wakes up to retry.
470 sch.wakeup.Reset(time.Second / 4)
473 logger.Debug("uuidLock acquired")
474 sch.uuidOp[uuid] = op
478 func (sch *Scheduler) uuidUnlock(uuid string) {
480 defer sch.mtx.Unlock()
481 delete(sch.uuidOp, uuid)
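// Typical usage of the uuidLock/uuidUnlock pair, as in lockContainer above
// (sketch only):
//
//	if !sch.uuidLock(uuid, "lock") {
//		return
//	}
//	defer sch.uuidUnlock(uuid)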