lib/dispatchcloud/scheduler/run_queue.go
// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package scheduler

import (
        "fmt"
        "sort"
        "strings"
        "time"

        "git.arvados.org/arvados.git/lib/dispatchcloud/container"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "github.com/sirupsen/logrus"
)

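// quietAfter503 is how long to wait after the most recent HTTP 503
// response from the API server before raising maxConcurrency again.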
var quietAfter503 = time.Minute

type QueueEnt struct {
        container.QueueEnt

        // Human-readable scheduling status as of the last scheduling
        // iteration.
        SchedulingStatus string `json:"scheduling_status"`
}

const (
        schedStatusPreparingRuntimeEnvironment = "Container is allocated to an instance and preparing to run."
        schedStatusPriorityZero                = "This container will not be scheduled to run because its priority is 0 and state is %v."
        schedStatusSupervisorLimitReached      = "Waiting in workflow queue at position %v.  Cluster is at capacity and cannot start any new workflows right now."
        schedStatusWaitingForPreviousAttempt   = "Waiting for previous container attempt to exit."
        schedStatusWaitingNewInstance          = "Waiting for a %v instance to boot and be ready to accept work."
        schedStatusWaitingInstanceType         = "Waiting in queue at position %v.  Cluster is at capacity for all eligible instance types (%v) and cannot start a new instance right now."
        schedStatusWaitingCloudResources       = "Waiting in queue at position %v.  Cluster is at cloud account limits and cannot start any new instances right now."
        schedStatusWaitingClusterCapacity      = "Waiting in queue at position %v.  Cluster is at capacity and cannot start any new instances right now."
)

// Queue returns the sorted queue from the last scheduling iteration.
func (sch *Scheduler) Queue() []QueueEnt {
        ents, _ := sch.lastQueue.Load().([]QueueEnt)
        return ents
}

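// runQueue performs one scheduling iteration: it sorts the container
// queue, adjusts maxConcurrency based on recent 503 responses and
// cloud quota/capacity signals, then locks, starts, or unlocks
// containers accordingly.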
func (sch *Scheduler) runQueue() {
        running := sch.pool.Running()
        unalloc := sch.pool.Unallocated()

        totalInstances := 0
        for _, n := range sch.pool.CountWorkers() {
                totalInstances += n
        }

        unsorted, _ := sch.queue.Entries()
        sorted := make([]QueueEnt, 0, len(unsorted))
        for _, ent := range unsorted {
                sorted = append(sorted, QueueEnt{QueueEnt: ent})
        }
        sort.Slice(sorted, func(i, j int) bool {
                _, irunning := running[sorted[i].Container.UUID]
                _, jrunning := running[sorted[j].Container.UUID]
                if irunning != jrunning {
                        // Sort already-scheduled containers first so
                        // the "tryrun" loop (below) counts existing
                        // supervisor containers before deciding
                        // whether there is room for new ones.
                        return irunning
                }
                ilocked := sorted[i].Container.State == arvados.ContainerStateLocked
                jlocked := sorted[j].Container.State == arvados.ContainerStateLocked
                if ilocked != jlocked {
                        // Give precedence to containers that we have
                        // already locked, even if higher-priority
                        // containers have since arrived in the
                        // queue. This avoids undesirable queue churn
                        // effects including extra lock/unlock cycles
                        // and bringing up new instances and quickly
                        // shutting them down to make room for
                        // different instance sizes.
                        return ilocked
                } else if pi, pj := sorted[i].Container.Priority, sorted[j].Container.Priority; pi != pj {
                        return pi > pj
                } else {
                        // When containers have identical priority,
                        // start them in the order we first noticed
                        // them. This avoids extra lock/unlock cycles
                        // when we unlock the containers that don't
                        // fit in the available pool.
                        return sorted[i].FirstSeenAt.Before(sorted[j].FirstSeenAt)
                }
        })

        if t := sch.client.Last503(); t.After(sch.last503time) {
                // API has sent an HTTP 503 response since last time
                // we checked. Use current #containers - 1 as
                // maxConcurrency, i.e., try to stay just below the
                // level where we see 503s.
                sch.last503time = t
                if newlimit := len(running) - 1; newlimit < 1 {
                        sch.maxConcurrency = 1
                } else {
                        sch.maxConcurrency = newlimit
                }
        } else if sch.maxConcurrency > 0 && time.Since(sch.last503time) > quietAfter503 {
                // If we haven't seen any 503 errors lately, raise
                // limit to ~10% beyond the current workload.
                //
                // As we use the added 10% to schedule more
                // containers, len(running) will increase and we'll
                // push the limit up further. Soon enough,
                // maxConcurrency will get high enough to schedule the
                // entire queue, hit pool quota, or get 503s again.
                max := len(running)*11/10 + 1
                if sch.maxConcurrency < max {
                        sch.maxConcurrency = max
                }
        }
        if sch.last503time.IsZero() {
                sch.mLast503Time.Set(0)
        } else {
                sch.mLast503Time.Set(float64(sch.last503time.Unix()))
        }
        if sch.maxInstances > 0 && sch.maxConcurrency > sch.maxInstances {
                sch.maxConcurrency = sch.maxInstances
        }
        if sch.instancesWithinQuota > 0 && sch.instancesWithinQuota < totalInstances {
                // Evidently it is possible to run this many
                // instances, so raise our estimate.
                sch.instancesWithinQuota = totalInstances
        }
        if sch.pool.AtQuota() {
                // Consider current workload to be the maximum
                // allowed, for the sake of reporting metrics and
                // calculating max supervisors.
                //
                // Now that sch.maxConcurrency is set, we will only
                // raise it past len(running) by 10%.  This helps
                // avoid running an inappropriate number of
                // supervisors when we reach the cloud-imposed quota
                // (which may be based on # CPUs etc) long before the
                // configured MaxInstances.
                if sch.maxConcurrency == 0 || sch.maxConcurrency > totalInstances {
                        if totalInstances == 0 {
                                sch.maxConcurrency = 1
                        } else {
                                sch.maxConcurrency = totalInstances
                        }
                }
                sch.instancesWithinQuota = totalInstances
        } else if sch.instancesWithinQuota > 0 && sch.maxConcurrency > sch.instancesWithinQuota+1 {
                // Once we've hit a quota error and started tracking
                // instancesWithinQuota (i.e., it's not zero), we
                // avoid exceeding that known-working level by more
                // than 1.
                //
                // If we don't do this, we risk entering a pattern of
                // repeatedly locking several containers, hitting
                // quota again, and unlocking them again each time the
                // driver stops reporting AtQuota, which tends to use
                // up the max lock/unlock cycles on the next few
                // containers in the queue, and cause them to fail.
                sch.maxConcurrency = sch.instancesWithinQuota + 1
        }
        sch.mMaxContainerConcurrency.Set(float64(sch.maxConcurrency))

        maxSupervisors := int(float64(sch.maxConcurrency) * sch.supervisorFraction)
        if maxSupervisors < 1 && sch.supervisorFraction > 0 && sch.maxConcurrency > 0 {
                maxSupervisors = 1
        }

        sch.logger.WithFields(logrus.Fields{
                "Containers":     len(sorted),
                "Processes":      len(running),
                "maxConcurrency": sch.maxConcurrency,
        }).Debug("runQueue")

        dontstart := map[arvados.InstanceType]bool{}
        var atcapacity = map[string]bool{} // ProviderTypes reported as AtCapacity during this runQueue() invocation
        var overquota []QueueEnt           // entries that are unmappable because of worker pool quota
        var overmaxsuper []QueueEnt        // unmappable because max supervisors (these are not included in overquota)
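        // Containers that have been assigned an instance type but are
        // still waiting for an instance to boot (reported via the
        // mContainersAllocatedNotStarted metric).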
        var containerAllocatedWorkerBootingCount int

        // trying is #containers running + #containers we're trying to
        // start. We stop trying to start more containers if this
        // reaches the dynamic maxConcurrency limit.
        trying := len(running)

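        // qpos is the queue position reported in scheduling status
        // messages; supervisors counts the supervisor containers seen
        // so far in this pass, compared against maxSupervisors.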
        qpos := 0
        supervisors := 0

tryrun:
        for i, ent := range sorted {
                ctr, types := ent.Container, ent.InstanceTypes
                logger := sch.logger.WithFields(logrus.Fields{
                        "ContainerUUID": ctr.UUID,
                })
                if ctr.SchedulingParameters.Supervisor {
                        supervisors += 1
                }
                if _, running := running[ctr.UUID]; running {
                        if ctr.State == arvados.ContainerStateQueued || ctr.State == arvados.ContainerStateLocked {
                                sorted[i].SchedulingStatus = schedStatusPreparingRuntimeEnvironment
                        }
                        continue
                }
                if ctr.Priority < 1 {
                        sorted[i].SchedulingStatus = fmt.Sprintf(schedStatusPriorityZero, string(ctr.State))
                        continue
                }
                if ctr.SchedulingParameters.Supervisor && maxSupervisors > 0 && supervisors > maxSupervisors {
                        overmaxsuper = append(overmaxsuper, sorted[i])
                        sorted[i].SchedulingStatus = fmt.Sprintf(schedStatusSupervisorLimitReached, len(overmaxsuper))
                        continue
                }
                // If we have unalloc instances of any of the eligible
                // instance types, unallocOK is true and unallocType
                // is the lowest-cost type.
                var unallocOK bool
                var unallocType arvados.InstanceType
                for _, it := range types {
                        if unalloc[it] > 0 {
                                unallocOK = true
                                unallocType = it
                                break
                        }
                }
                // If the pool is not reporting AtCapacity for any of
                // the eligible instance types, availableOK is true
                // and availableType is the lowest-cost type.
                var availableOK bool
                var availableType arvados.InstanceType
                for _, it := range types {
                        if atcapacity[it.ProviderType] {
                                continue
                        } else if sch.pool.AtCapacity(it) {
                                atcapacity[it.ProviderType] = true
                                continue
                        } else {
                                availableOK = true
                                availableType = it
                                break
                        }
                }
                switch ctr.State {
                case arvados.ContainerStateQueued:
                        if sch.maxConcurrency > 0 && trying >= sch.maxConcurrency {
                                logger.Tracef("not locking: already at maxConcurrency %d", sch.maxConcurrency)
                                continue
                        }
                        trying++
                        if !unallocOK && sch.pool.AtQuota() {
                                logger.Trace("not starting: AtQuota and no unalloc workers")
                                overquota = sorted[i:]
                                break tryrun
                        }
                        if !unallocOK && !availableOK {
                                logger.Trace("not locking: AtCapacity and no unalloc workers")
                                continue
                        }
                        if sch.pool.KillContainer(ctr.UUID, "about to lock") {
                                logger.Info("not locking: crunch-run process from previous attempt has not exited")
                                continue
                        }
                        go sch.lockContainer(logger, ctr.UUID)
                        unalloc[unallocType]--
                case arvados.ContainerStateLocked:
                        if sch.maxConcurrency > 0 && trying >= sch.maxConcurrency {
                                logger.Tracef("not starting: already at maxConcurrency %d", sch.maxConcurrency)
                                continue
                        }
                        trying++
                        if unallocOK {
                                // We have a suitable instance type,
                                // so mark it as allocated, and try to
                                // start the container.
                                unalloc[unallocType]--
                                logger = logger.WithField("InstanceType", unallocType.Name)
                                if dontstart[unallocType] {
                                        // We already tried & failed to start
                                        // a higher-priority container on the
                                        // same instance type. Don't let this
                                        // one sneak in ahead of it.
                                } else if sch.pool.KillContainer(ctr.UUID, "about to start") {
                                        sorted[i].SchedulingStatus = schedStatusWaitingForPreviousAttempt
                                        logger.Info("not restarting yet: crunch-run process from previous attempt has not exited")
                                } else if sch.pool.StartContainer(unallocType, ctr) {
                                        sorted[i].SchedulingStatus = schedStatusPreparingRuntimeEnvironment
                                        logger.Trace("StartContainer => true")
                                } else {
                                        sorted[i].SchedulingStatus = fmt.Sprintf(schedStatusWaitingNewInstance, unallocType.Name)
                                        logger.Trace("StartContainer => false")
                                        containerAllocatedWorkerBootingCount += 1
                                        dontstart[unallocType] = true
                                }
                                continue
                        }
                        if sch.pool.AtQuota() {
                                // Don't let lower-priority containers
                                // starve this one by keeping idle
                                // workers alive on different instance
                                // types.
                                logger.Trace("overquota")
                                overquota = sorted[i:]
                                break tryrun
                        }
                        if !availableOK {
                                // Continue trying lower-priority
                                // containers in case they can run on
                                // different instance types that are
                                // available.
                                //
                                // The local "atcapacity" cache helps
                                // when the pool's flag resets after
                                // we look at container A but before
                                // we look at lower-priority container
                                // B. In that case we want to run
                                // container A on the next call to
                                // runQueue(), rather than run
                                // container B now.
                                qpos++
                                var typenames []string
                                for _, tp := range types {
                                        typenames = append(typenames, tp.Name)
                                }
                                sorted[i].SchedulingStatus = fmt.Sprintf(schedStatusWaitingInstanceType, qpos, strings.Join(typenames, ", "))
                                logger.Trace("all eligible types at capacity")
                                continue
                        }
                        logger = logger.WithField("InstanceType", availableType.Name)
                        if !sch.pool.Create(availableType) {
                                // Failed despite not being at quota,
                                // e.g., cloud ops throttled.
                                logger.Trace("pool declined to create new instance")
                                continue
                        }
                        // Success. (Note pool.Create works
                        // asynchronously and does its own logging
                        // about the eventual outcome, so we don't
                        // need to.)
                        sorted[i].SchedulingStatus = fmt.Sprintf(schedStatusWaitingNewInstance, availableType.Name)
                        logger.Info("creating new instance")
                        // Don't bother trying to start the container
                        // yet -- obviously the instance will take
                        // some time to boot and become ready.
                        containerAllocatedWorkerBootingCount += 1
                        dontstart[availableType] = true
                }
        }

        sch.mContainersAllocatedNotStarted.Set(float64(containerAllocatedWorkerBootingCount))
        sch.mContainersNotAllocatedOverQuota.Set(float64(len(overquota) + len(overmaxsuper)))

        var qreason string
        if sch.pool.AtQuota() {
                qreason = schedStatusWaitingCloudResources
        } else {
                qreason = schedStatusWaitingClusterCapacity
        }
        for i, ent := range sorted {
                if ent.SchedulingStatus == "" && (ent.Container.State == arvados.ContainerStateQueued || ent.Container.State == arvados.ContainerStateLocked) {
                        qpos++
                        sorted[i].SchedulingStatus = fmt.Sprintf(qreason, qpos)
                }
        }
        sch.lastQueue.Store(sorted)

        if len(overquota)+len(overmaxsuper) > 0 {
                // Unlock any containers that are unmappable while
                // we're at quota (but if they have already been
                // scheduled and they're loading docker images etc.,
                // let them run).
                var unlock []QueueEnt
                unlock = append(unlock, overmaxsuper...)
                if totalInstances > 0 && len(overquota) > 1 {
                        // We don't unlock the next-in-line container
                        // when at quota.  This avoids a situation
                        // where our "at quota" state expires, we lock
                        // the next container and try to create an
                        // instance, the cloud provider still returns
                        // a quota error, we unlock the container, and
                        // we repeat this until the container reaches
                        // its limit of lock/unlock cycles.
                        unlock = append(unlock, overquota[1:]...)
                } else {
                        // However, if totalInstances is 0 and we're
                        // still getting quota errors, then the
                        // next-in-line container is evidently not
                        // possible to run, so we should let it
                        // exhaust its lock/unlock cycles and
                        // eventually cancel, to avoid starvation.
                        unlock = append(unlock, overquota...)
                }
                for _, ctr := range unlock {
                        ctr := ctr.Container
                        _, toolate := running[ctr.UUID]
                        if ctr.State == arvados.ContainerStateLocked && !toolate {
                                logger := sch.logger.WithField("ContainerUUID", ctr.UUID)
                                logger.Info("unlock because pool capacity is used by higher priority containers")
                                err := sch.queue.Unlock(ctr.UUID)
                                if err != nil {
                                        logger.WithError(err).Warn("error unlocking")
                                }
                        }
                }
        }
        if len(overquota) > 0 {
                // Shut down idle workers that didn't get any
                // containers mapped onto them before we hit quota.
                for it, n := range unalloc {
                        if n < 1 {
                                continue
                        }
                        sch.pool.Shutdown(it)
                }
        }
}

// Lock the given container. Should be called in a new goroutine.
func (sch *Scheduler) lockContainer(logger logrus.FieldLogger, uuid string) {
        if !sch.uuidLock(uuid, "lock") {
                return
        }
        defer sch.uuidUnlock(uuid)
        if ctr, ok := sch.queue.Get(uuid); !ok || ctr.State != arvados.ContainerStateQueued {
                // This happens if the container has been cancelled or
                // locked since runQueue called sch.queue.Entries(),
                // possibly by a lockContainer() call from a previous
                // runQueue iteration. In any case, we will respond
                // appropriately on the next runQueue iteration, which
                // will have already been triggered by the queue
                // update.
                logger.WithField("State", ctr.State).Debug("container no longer queued by the time we decided to lock it, doing nothing")
                return
        }
        err := sch.queue.Lock(uuid)
        if err != nil {
                logger.WithError(err).Warn("error locking container")
                return
        }
        logger.Debug("lock succeeded")
        ctr, ok := sch.queue.Get(uuid)
        if !ok {
                logger.Error("(BUG?) container disappeared from queue after Lock succeeded")
        } else if ctr.State != arvados.ContainerStateLocked {
                logger.Warnf("(race?) container has state=%q after Lock succeeded", ctr.State)
        }
}

// Acquire a non-blocking lock for specified UUID, returning true if
// successful.  The op argument is used only for debug logs.
//
// If the lock is not available, uuidLock arranges to wake up the
// scheduler after a short delay, so it can retry whatever operation
// is trying to get the lock (if that operation is still worth doing).
//
// This mechanism helps avoid spamming the controller/database with
// concurrent updates for any single container, even when the
// scheduler loop is running frequently.
func (sch *Scheduler) uuidLock(uuid, op string) bool {
        sch.mtx.Lock()
        defer sch.mtx.Unlock()
        logger := sch.logger.WithFields(logrus.Fields{
                "ContainerUUID": uuid,
                "Op":            op,
        })
        if op, locked := sch.uuidOp[uuid]; locked {
                logger.Debugf("uuidLock not available, Op=%s in progress", op)
                // Make sure the scheduler loop wakes up to retry.
                sch.wakeup.Reset(time.Second / 4)
                return false
        }
        logger.Debug("uuidLock acquired")
        sch.uuidOp[uuid] = op
        return true
}

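// uuidUnlock releases the per-container lock acquired by uuidLock.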
func (sch *Scheduler) uuidUnlock(uuid string) {
        sch.mtx.Lock()
        defer sch.mtx.Unlock()
        delete(sch.uuidOp, uuid)
}