Merge branch 'main' from workbench2.git
[arvados.git] / lib / dispatchcloud / scheduler / scheduler.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 // Package scheduler uses a resizable worker pool to execute
6 // containers in priority order.
7 package scheduler
8
9 import (
10         "context"
11         "sync"
12         "time"
13
14         "git.arvados.org/arvados.git/sdk/go/arvados"
15         "git.arvados.org/arvados.git/sdk/go/ctxlog"
16         "github.com/prometheus/client_golang/prometheus"
17         "github.com/sirupsen/logrus"
18 )
19
// A Scheduler maps queued containers onto unallocated workers in
// priority order, creating new workers if needed. It locks containers
// that can be mapped onto existing/pending workers, and starts them
// if possible.
//
// A Scheduler unlocks any containers that are locked but can't be
// mapped. (For example, this happens when the cloud provider reaches
// quota/capacity and a previously mappable container's priority is
// surpassed by a newer container.)
//
// If it encounters errors while creating new workers, a Scheduler
// shuts down idle workers, in case they are consuming quota.
//
// Use New to construct a Scheduler; the zero value is not usable
// (channels, timer and maps are created there).
type Scheduler struct {
	// Dependencies and configuration, set once by New.
	logger              logrus.FieldLogger
	client              *arvados.Client
	queue               ContainerQueue
	pool                WorkerPool
	reg                 *prometheus.Registry
	staleLockTimeout    time.Duration
	queueUpdateInterval time.Duration

	// NOTE(review): uuidOp is presumably keyed by container UUID and
	// guarded by mtx — confirm against the methods that use them.
	uuidOp map[string]string // operation in progress: "lock", "cancel", ...
	mtx    sync.Mutex
	wakeup *time.Timer // when it fires, run() performs another scheduling pass

	// Lifecycle: runOnce ensures run() executes at most once (see
	// Start); Stop closes stop to ask run() to return; stopped is
	// closed when run() returns.
	runOnce sync.Once
	stop    chan struct{}
	stopped chan struct{}

	last503time          time.Time // last time API responded 503
	maxConcurrency       int       // dynamic container limit (0 = unlimited), see runQueue()
	supervisorFraction   float64   // maximum fraction of "supervisor" containers (these are containers whose main job is to launch other containers, e.g. workflow runners)
	maxInstances         int       // maximum number of instances the pool will bring up (0 = unlimited)
	instancesWithinQuota int       // max concurrency achieved since last quota error (0 = no quota error yet)

	// Prometheus gauges, created and registered by registerMetrics.
	mContainersAllocatedNotStarted   prometheus.Gauge
	mContainersNotAllocatedOverQuota prometheus.Gauge
	mLongestWaitTimeSinceQueue       prometheus.Gauge
	mLast503Time                     prometheus.Gauge
	mMaxContainerConcurrency         prometheus.Gauge
}
61
62 // New returns a new unstarted Scheduler.
63 //
64 // Any given queue and pool should not be used by more than one
65 // scheduler at a time.
66 func New(ctx context.Context, client *arvados.Client, queue ContainerQueue, pool WorkerPool, reg *prometheus.Registry, staleLockTimeout, queueUpdateInterval time.Duration, minQuota, maxInstances int, supervisorFraction float64) *Scheduler {
67         sch := &Scheduler{
68                 logger:              ctxlog.FromContext(ctx),
69                 client:              client,
70                 queue:               queue,
71                 pool:                pool,
72                 reg:                 reg,
73                 staleLockTimeout:    staleLockTimeout,
74                 queueUpdateInterval: queueUpdateInterval,
75                 wakeup:              time.NewTimer(time.Second),
76                 stop:                make(chan struct{}),
77                 stopped:             make(chan struct{}),
78                 uuidOp:              map[string]string{},
79                 supervisorFraction:  supervisorFraction,
80                 maxInstances:        maxInstances,
81         }
82         if minQuota > 0 {
83                 sch.maxConcurrency = minQuota
84         } else {
85                 sch.maxConcurrency = maxInstances
86         }
87         sch.registerMetrics(reg)
88         return sch
89 }
90
91 func (sch *Scheduler) registerMetrics(reg *prometheus.Registry) {
92         if reg == nil {
93                 reg = prometheus.NewRegistry()
94         }
95         sch.mContainersAllocatedNotStarted = prometheus.NewGauge(prometheus.GaugeOpts{
96                 Namespace: "arvados",
97                 Subsystem: "dispatchcloud",
98                 Name:      "containers_allocated_not_started",
99                 Help:      "Number of containers allocated to a worker but not started yet (worker is booting).",
100         })
101         reg.MustRegister(sch.mContainersAllocatedNotStarted)
102         sch.mContainersNotAllocatedOverQuota = prometheus.NewGauge(prometheus.GaugeOpts{
103                 Namespace: "arvados",
104                 Subsystem: "dispatchcloud",
105                 Name:      "containers_not_allocated_over_quota",
106                 Help:      "Number of containers not allocated to a worker because the system has hit a quota.",
107         })
108         reg.MustRegister(sch.mContainersNotAllocatedOverQuota)
109         sch.mLongestWaitTimeSinceQueue = prometheus.NewGauge(prometheus.GaugeOpts{
110                 Namespace: "arvados",
111                 Subsystem: "dispatchcloud",
112                 Name:      "containers_longest_wait_time_seconds",
113                 Help:      "Current longest wait time of any container since queuing, and before the start of crunch-run.",
114         })
115         reg.MustRegister(sch.mLongestWaitTimeSinceQueue)
116         sch.mLast503Time = prometheus.NewGauge(prometheus.GaugeOpts{
117                 Namespace: "arvados",
118                 Subsystem: "dispatchcloud",
119                 Name:      "last_503_time",
120                 Help:      "Time of most recent 503 error received from API.",
121         })
122         reg.MustRegister(sch.mLast503Time)
123         sch.mMaxContainerConcurrency = prometheus.NewGauge(prometheus.GaugeOpts{
124                 Namespace: "arvados",
125                 Subsystem: "dispatchcloud",
126                 Name:      "max_concurrent_containers",
127                 Help:      "Dynamically assigned limit on number of containers scheduled concurrency, set after receiving 503 errors from API.",
128         })
129         reg.MustRegister(sch.mMaxContainerConcurrency)
130         reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
131                 Namespace: "arvados",
132                 Subsystem: "dispatchcloud",
133                 Name:      "at_quota",
134                 Help:      "Flag indicating the cloud driver is reporting an at-quota condition.",
135         }, func() float64 {
136                 if sch.pool.AtQuota() {
137                         return 1
138                 } else {
139                         return 0
140                 }
141         }))
142 }
143
144 func (sch *Scheduler) updateMetrics() {
145         earliest := time.Time{}
146         entries, _ := sch.queue.Entries()
147         running := sch.pool.Running()
148         for _, ent := range entries {
149                 if ent.Container.Priority > 0 &&
150                         (ent.Container.State == arvados.ContainerStateQueued || ent.Container.State == arvados.ContainerStateLocked) {
151                         // Exclude containers that are preparing to run the payload (i.e.
152                         // ContainerStateLocked and running on a worker, most likely loading the
153                         // payload image
154                         if _, ok := running[ent.Container.UUID]; !ok {
155                                 if ent.Container.CreatedAt.Before(earliest) || earliest.IsZero() {
156                                         earliest = ent.Container.CreatedAt
157                                 }
158                         }
159                 }
160         }
161         if !earliest.IsZero() {
162                 sch.mLongestWaitTimeSinceQueue.Set(time.Since(earliest).Seconds())
163         } else {
164                 sch.mLongestWaitTimeSinceQueue.Set(0)
165         }
166 }
167
168 // Start starts the scheduler.
169 func (sch *Scheduler) Start() {
170         go sch.runOnce.Do(sch.run)
171 }
172
173 // Stop stops the scheduler. No other method should be called after
174 // Stop.
175 func (sch *Scheduler) Stop() {
176         close(sch.stop)
177         <-sch.stopped
178 }
179
180 func (sch *Scheduler) run() {
181         defer close(sch.stopped)
182
183         // Ensure the queue is fetched once before attempting anything.
184         for err := sch.queue.Update(); err != nil; err = sch.queue.Update() {
185                 sch.logger.Errorf("error updating queue: %s", err)
186                 d := sch.queueUpdateInterval / 10
187                 if d < time.Second {
188                         d = time.Second
189                 }
190                 sch.logger.Infof("waiting %s before retry", d)
191                 time.Sleep(d)
192         }
193
194         // Keep the queue up to date.
195         go func() {
196                 for {
197                         starttime := time.Now()
198                         err := sch.queue.Update()
199                         if err != nil {
200                                 sch.logger.Errorf("error updating queue: %s", err)
201                         }
202                         // If the previous update took a long time,
203                         // that probably means the server is
204                         // overloaded, so wait that long before doing
205                         // another. Otherwise, wait for the configured
206                         // poll interval.
207                         delay := time.Since(starttime)
208                         if delay < sch.queueUpdateInterval {
209                                 delay = sch.queueUpdateInterval
210                         }
211                         time.Sleep(delay)
212                 }
213         }()
214
215         t0 := time.Now()
216         sch.logger.Infof("FixStaleLocks starting.")
217         sch.fixStaleLocks()
218         sch.logger.Infof("FixStaleLocks finished (%s), starting scheduling.", time.Since(t0))
219
220         poolNotify := sch.pool.Subscribe()
221         defer sch.pool.Unsubscribe(poolNotify)
222
223         queueNotify := sch.queue.Subscribe()
224         defer sch.queue.Unsubscribe(queueNotify)
225
226         for {
227                 sch.runQueue()
228                 sch.sync()
229                 sch.updateMetrics()
230                 select {
231                 case <-sch.stop:
232                         return
233                 case <-queueNotify:
234                 case <-poolNotify:
235                 case <-sch.wakeup.C:
236                 }
237         }
238 }