14360: Log level debug->warning for unexpected condition.
[arvados.git] / lib / dispatchcloud / scheduler / map.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 // Package scheduler uses a resizable worker pool to execute
6 // containers in priority order.
7 //
8 // Scheduler functions must not be called concurrently using the same
9 // queue or pool.
10 package scheduler
11
12 import (
13         "sort"
14
15         "git.curoverse.com/arvados.git/lib/cloud"
16         "git.curoverse.com/arvados.git/lib/dispatchcloud/container"
17         "git.curoverse.com/arvados.git/sdk/go/arvados"
18         "github.com/Sirupsen/logrus"
19 )
20
21 // Map maps queued containers onto unallocated workers in priority
22 // order, creating new workers if needed. It locks containers that can
23 // be mapped onto existing/pending workers, and starts them if
24 // possible.
25 //
26 // Map unlocks any containers that are locked but can't be
27 // mapped. (For example, this happens when the cloud provider reaches
28 // quota/capacity and a previously mappable container's priority is
29 // surpassed by a newer container.)
30 //
31 // If it encounters errors while creating new workers, Map shuts down
32 // idle workers, in case they are consuming quota.
33 //
34 // Map should not be called without first calling FixStaleLocks.
35 //
36 //      FixStaleLocks()
37 //      for {
38 //              Map()
39 //              Sync()
40 //      }
41 func Map(logger logrus.FieldLogger, queue ContainerQueue, pool WorkerPool) {
42         unsorted, _ := queue.Entries()
43         sorted := make([]container.QueueEnt, 0, len(unsorted))
44         for _, ent := range unsorted {
45                 sorted = append(sorted, ent)
46         }
47         sort.Slice(sorted, func(i, j int) bool {
48                 return sorted[i].Container.Priority > sorted[j].Container.Priority
49         })
50
51         running := pool.Running()
52         unalloc := pool.Unallocated()
53
54         logger.WithFields(logrus.Fields{
55                 "Containers": len(sorted),
56                 "Processes":  len(running),
57         }).Debug("mapping")
58
59         dontstart := map[arvados.InstanceType]bool{}
60         var overquota []container.QueueEnt // entries that are unmappable because of worker pool quota
61
62         for i, ctr := range sorted {
63                 ctr, it := ctr.Container, ctr.InstanceType
64                 logger := logger.WithFields(logrus.Fields{
65                         "ContainerUUID": ctr.UUID,
66                         "InstanceType":  it.Name,
67                 })
68                 if _, running := running[ctr.UUID]; running || ctr.Priority < 1 {
69                         continue
70                 }
71                 if ctr.State == arvados.ContainerStateQueued {
72                         logger.Debugf("locking")
73                         if unalloc[it] < 1 && pool.AtQuota() {
74                                 overquota = sorted[i:]
75                                 break
76                         }
77                         err := queue.Lock(ctr.UUID)
78                         if err != nil {
79                                 logger.WithError(err).Warnf("lock error")
80                                 unalloc[it]++
81                                 continue
82                         }
83                         var ok bool
84                         ctr, ok = queue.Get(ctr.UUID)
85                         if !ok {
86                                 logger.Error("(BUG?) container disappeared from queue after Lock succeeded")
87                                 continue
88                         }
89                         if ctr.State != arvados.ContainerStateLocked {
90                                 logger.Warnf("(race?) container has state=%q after Lock succeeded", ctr.State)
91                         }
92                 }
93                 if ctr.State != arvados.ContainerStateLocked {
94                         continue
95                 }
96                 if unalloc[it] < 1 {
97                         logger.Info("creating new instance")
98                         err := pool.Create(it)
99                         if err != nil {
100                                 if _, ok := err.(cloud.QuotaError); !ok {
101                                         logger.WithError(err).Warn("error creating worker")
102                                 }
103                                 queue.Unlock(ctr.UUID)
104                                 // Don't let lower-priority containers
105                                 // starve this one by using keeping
106                                 // idle workers alive on different
107                                 // instance types.  TODO: avoid
108                                 // getting starved here if instances
109                                 // of a specific type always fail.
110                                 overquota = sorted[i:]
111                                 break
112                         }
113                         unalloc[it]++
114                 }
115                 if dontstart[it] {
116                         // We already tried & failed to start a
117                         // higher-priority container on the same
118                         // instance type. Don't let this one sneak in
119                         // ahead of it.
120                 } else if pool.StartContainer(it, ctr) {
121                         unalloc[it]--
122                 } else {
123                         dontstart[it] = true
124                 }
125         }
126
127         if len(overquota) > 0 {
128                 // Unlock any containers that are unmappable while
129                 // we're at quota.
130                 for _, ctr := range overquota {
131                         ctr := ctr.Container
132                         if ctr.State == arvados.ContainerStateLocked {
133                                 logger := logger.WithField("ContainerUUID", ctr.UUID)
134                                 logger.Debug("unlock because pool capacity is used by higher priority containers")
135                                 err := queue.Unlock(ctr.UUID)
136                                 if err != nil {
137                                         logger.WithError(err).Warn("error unlocking")
138                                 }
139                         }
140                 }
141                 // Shut down idle workers that didn't get any
142                 // containers mapped onto them before we hit quota.
143                 for it, n := range unalloc {
144                         if n < 1 {
145                                 continue
146                         }
147                         pool.Shutdown(it)
148                 }
149         }
150 }