1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
5 // Package scheduler uses a resizable worker pool to execute
6 // containers in priority order.
8 // Scheduler functions must not be called concurrently using the same
15 "git.curoverse.com/arvados.git/lib/cloud"
16 "git.curoverse.com/arvados.git/lib/dispatchcloud/container"
17 "git.curoverse.com/arvados.git/sdk/go/arvados"
18 "github.com/Sirupsen/logrus"
21 // Map maps queued containers onto unallocated workers in priority
22 // order, creating new workers if needed. It locks containers that can
23 // be mapped onto existing/pending workers, and starts them if
26 // Map unlocks any containers that are locked but can't be
27 // mapped. (For example, this happens when the cloud provider reaches
28 // quota/capacity and a previously mappable container's priority is
29 // surpassed by a newer container.)
31 // If it encounters errors while creating new workers, Map shuts down
32 // idle workers, in case they are consuming quota.
34 // Map should not be called without first calling FixStaleLocks.
41 func Map(logger logrus.FieldLogger, queue ContainerQueue, pool WorkerPool) {
42 unsorted, _ := queue.Entries()
43 sorted := make([]container.QueueEnt, 0, len(unsorted))
44 for _, ent := range unsorted {
45 sorted = append(sorted, ent)
47 sort.Slice(sorted, func(i, j int) bool {
48 return sorted[i].Container.Priority > sorted[j].Container.Priority
51 running := pool.Running()
52 unalloc := pool.Unallocated()
54 logger.WithFields(logrus.Fields{
55 "Containers": len(sorted),
56 "Processes": len(running),
59 dontstart := map[arvados.InstanceType]bool{}
60 var overquota []container.QueueEnt // entries that are unmappable because of worker pool quota
62 for i, ctr := range sorted {
63 ctr, it := ctr.Container, ctr.InstanceType
64 logger := logger.WithFields(logrus.Fields{
65 "ContainerUUID": ctr.UUID,
66 "InstanceType": it.Name,
68 if _, running := running[ctr.UUID]; running || ctr.Priority < 1 {
71 if ctr.State == arvados.ContainerStateQueued {
72 logger.Debugf("locking")
73 if unalloc[it] < 1 && pool.AtQuota() {
74 overquota = sorted[i:]
77 err := queue.Lock(ctr.UUID)
79 logger.WithError(err).Warnf("lock error")
84 ctr, ok = queue.Get(ctr.UUID)
86 logger.Error("(BUG?) container disappeared from queue after Lock succeeded")
89 if ctr.State != arvados.ContainerStateLocked {
90 logger.Warnf("(race?) container has state=%q after Lock succeeded", ctr.State)
93 if ctr.State != arvados.ContainerStateLocked {
97 logger.Info("creating new instance")
98 err := pool.Create(it)
100 if _, ok := err.(cloud.QuotaError); !ok {
101 logger.WithError(err).Warn("error creating worker")
103 queue.Unlock(ctr.UUID)
104 // Don't let lower-priority containers
105 // starve this one by keeping
106 // idle workers alive on different
107 // instance types. TODO: avoid
108 // getting starved here if instances
109 // of a specific type always fail.
110 overquota = sorted[i:]
116 // We already tried & failed to start a
117 // higher-priority container on the same
118 // instance type. Don't let this one sneak in
120 } else if pool.StartContainer(it, ctr) {
127 if len(overquota) > 0 {
128 // Unlock any containers that are unmappable while
130 for _, ctr := range overquota {
132 if ctr.State == arvados.ContainerStateLocked {
133 logger := logger.WithField("ContainerUUID", ctr.UUID)
134 logger.Debug("unlock because pool capacity is used by higher priority containers")
135 err := queue.Unlock(ctr.UUID)
137 logger.WithError(err).Warn("error unlocking")
141 // Shut down idle workers that didn't get any
142 // containers mapped onto them before we hit quota.
143 for it, n := range unalloc {