20511: Don't shutdown excess instances just because MaxSupervisors.
[arvados.git] / lib / dispatchcloud / scheduler / scheduler.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 // Package scheduler uses a resizable worker pool to execute
6 // containers in priority order.
7 package scheduler
8
9 import (
10         "context"
11         "sync"
12         "time"
13
14         "git.arvados.org/arvados.git/sdk/go/arvados"
15         "git.arvados.org/arvados.git/sdk/go/ctxlog"
16         "github.com/prometheus/client_golang/prometheus"
17         "github.com/sirupsen/logrus"
18 )
19
20 // A Scheduler maps queued containers onto unallocated workers in
21 // priority order, creating new workers if needed. It locks containers
22 // that can be mapped onto existing/pending workers, and starts them
23 // if possible.
24 //
25 // A Scheduler unlocks any containers that are locked but can't be
26 // mapped. (For example, this happens when the cloud provider reaches
27 // quota/capacity and a previously mappable container's priority is
28 // surpassed by a newer container.)
29 //
30 // If it encounters errors while creating new workers, a Scheduler
31 // shuts down idle workers, in case they are consuming quota.
32 type Scheduler struct {
33         logger              logrus.FieldLogger
34         client              *arvados.Client
35         queue               ContainerQueue
36         pool                WorkerPool
37         reg                 *prometheus.Registry
38         staleLockTimeout    time.Duration
39         queueUpdateInterval time.Duration
40
41         uuidOp map[string]string // operation in progress: "lock", "cancel", ...
42         mtx    sync.Mutex
43         wakeup *time.Timer
44
45         runOnce sync.Once
46         stop    chan struct{}
47         stopped chan struct{}
48
49         last503time    time.Time // last time API responded 503
50         maxConcurrency int       // dynamic container limit (0 = unlimited), see runQueue()
51         maxSupervisors int       // maximum number of "supervisor" containers (these are containers who's main job is to launch other containers, e.g. workflow runners)
52
53         mContainersAllocatedNotStarted   prometheus.Gauge
54         mContainersNotAllocatedOverQuota prometheus.Gauge
55         mLongestWaitTimeSinceQueue       prometheus.Gauge
56         mLast503Time                     prometheus.Gauge
57         mMaxContainerConcurrency         prometheus.Gauge
58 }
59
60 // New returns a new unstarted Scheduler.
61 //
62 // Any given queue and pool should not be used by more than one
63 // scheduler at a time.
64 func New(ctx context.Context, client *arvados.Client, queue ContainerQueue, pool WorkerPool, reg *prometheus.Registry, staleLockTimeout, queueUpdateInterval time.Duration, maxSupervisors int) *Scheduler {
65         sch := &Scheduler{
66                 logger:              ctxlog.FromContext(ctx),
67                 client:              client,
68                 queue:               queue,
69                 pool:                pool,
70                 reg:                 reg,
71                 staleLockTimeout:    staleLockTimeout,
72                 queueUpdateInterval: queueUpdateInterval,
73                 wakeup:              time.NewTimer(time.Second),
74                 stop:                make(chan struct{}),
75                 stopped:             make(chan struct{}),
76                 uuidOp:              map[string]string{},
77                 maxSupervisors:      maxSupervisors,
78         }
79         sch.registerMetrics(reg)
80         return sch
81 }
82
83 func (sch *Scheduler) registerMetrics(reg *prometheus.Registry) {
84         if reg == nil {
85                 reg = prometheus.NewRegistry()
86         }
87         sch.mContainersAllocatedNotStarted = prometheus.NewGauge(prometheus.GaugeOpts{
88                 Namespace: "arvados",
89                 Subsystem: "dispatchcloud",
90                 Name:      "containers_allocated_not_started",
91                 Help:      "Number of containers allocated to a worker but not started yet (worker is booting).",
92         })
93         reg.MustRegister(sch.mContainersAllocatedNotStarted)
94         sch.mContainersNotAllocatedOverQuota = prometheus.NewGauge(prometheus.GaugeOpts{
95                 Namespace: "arvados",
96                 Subsystem: "dispatchcloud",
97                 Name:      "containers_not_allocated_over_quota",
98                 Help:      "Number of containers not allocated to a worker because the system has hit a quota.",
99         })
100         reg.MustRegister(sch.mContainersNotAllocatedOverQuota)
101         sch.mLongestWaitTimeSinceQueue = prometheus.NewGauge(prometheus.GaugeOpts{
102                 Namespace: "arvados",
103                 Subsystem: "dispatchcloud",
104                 Name:      "containers_longest_wait_time_seconds",
105                 Help:      "Current longest wait time of any container since queuing, and before the start of crunch-run.",
106         })
107         reg.MustRegister(sch.mLongestWaitTimeSinceQueue)
108         sch.mLast503Time = prometheus.NewGauge(prometheus.GaugeOpts{
109                 Namespace: "arvados",
110                 Subsystem: "dispatchcloud",
111                 Name:      "last_503_time",
112                 Help:      "Time of most recent 503 error received from API.",
113         })
114         reg.MustRegister(sch.mLast503Time)
115         sch.mMaxContainerConcurrency = prometheus.NewGauge(prometheus.GaugeOpts{
116                 Namespace: "arvados",
117                 Subsystem: "dispatchcloud",
118                 Name:      "max_concurrent_containers",
119                 Help:      "Dynamically assigned limit on number of containers scheduled concurrency, set after receiving 503 errors from API.",
120         })
121         reg.MustRegister(sch.mMaxContainerConcurrency)
122 }
123
124 func (sch *Scheduler) updateMetrics() {
125         earliest := time.Time{}
126         entries, _ := sch.queue.Entries()
127         running := sch.pool.Running()
128         for _, ent := range entries {
129                 if ent.Container.Priority > 0 &&
130                         (ent.Container.State == arvados.ContainerStateQueued || ent.Container.State == arvados.ContainerStateLocked) {
131                         // Exclude containers that are preparing to run the payload (i.e.
132                         // ContainerStateLocked and running on a worker, most likely loading the
133                         // payload image
134                         if _, ok := running[ent.Container.UUID]; !ok {
135                                 if ent.Container.CreatedAt.Before(earliest) || earliest.IsZero() {
136                                         earliest = ent.Container.CreatedAt
137                                 }
138                         }
139                 }
140         }
141         if !earliest.IsZero() {
142                 sch.mLongestWaitTimeSinceQueue.Set(time.Since(earliest).Seconds())
143         } else {
144                 sch.mLongestWaitTimeSinceQueue.Set(0)
145         }
146 }
147
148 // Start starts the scheduler.
149 func (sch *Scheduler) Start() {
150         go sch.runOnce.Do(sch.run)
151 }
152
153 // Stop stops the scheduler. No other method should be called after
154 // Stop.
155 func (sch *Scheduler) Stop() {
156         close(sch.stop)
157         <-sch.stopped
158 }
159
160 func (sch *Scheduler) run() {
161         defer close(sch.stopped)
162
163         // Ensure the queue is fetched once before attempting anything.
164         for err := sch.queue.Update(); err != nil; err = sch.queue.Update() {
165                 sch.logger.Errorf("error updating queue: %s", err)
166                 d := sch.queueUpdateInterval / 10
167                 if d < time.Second {
168                         d = time.Second
169                 }
170                 sch.logger.Infof("waiting %s before retry", d)
171                 time.Sleep(d)
172         }
173
174         // Keep the queue up to date.
175         poll := time.NewTicker(sch.queueUpdateInterval)
176         defer poll.Stop()
177         go func() {
178                 for range poll.C {
179                         err := sch.queue.Update()
180                         if err != nil {
181                                 sch.logger.Errorf("error updating queue: %s", err)
182                         }
183                 }
184         }()
185
186         t0 := time.Now()
187         sch.logger.Infof("FixStaleLocks starting.")
188         sch.fixStaleLocks()
189         sch.logger.Infof("FixStaleLocks finished (%s), starting scheduling.", time.Since(t0))
190
191         poolNotify := sch.pool.Subscribe()
192         defer sch.pool.Unsubscribe(poolNotify)
193
194         queueNotify := sch.queue.Subscribe()
195         defer sch.queue.Unsubscribe(queueNotify)
196
197         for {
198                 sch.runQueue()
199                 sch.sync()
200                 sch.updateMetrics()
201                 select {
202                 case <-sch.stop:
203                         return
204                 case <-queueNotify:
205                 case <-poolNotify:
206                 case <-sch.wakeup.C:
207                 }
208         }
209 }