lib/dispatchcloud/scheduler/scheduler.go
// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

// Package scheduler uses a resizable worker pool to execute
// containers in priority order.
package scheduler

import (
        "context"
        "sync"
        "sync/atomic"
        "time"

        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/ctxlog"
        "github.com/prometheus/client_golang/prometheus"
        "github.com/sirupsen/logrus"
)

// A Scheduler maps queued containers onto unallocated workers in
// priority order, creating new workers if needed. It locks containers
// that can be mapped onto existing/pending workers, and starts them
// if possible.
//
// A Scheduler unlocks any containers that are locked but can't be
// mapped. (For example, this happens when the cloud provider reaches
// quota/capacity and a previously mappable container's priority is
// surpassed by a newer container.)
//
// If it encounters errors while creating new workers, a Scheduler
// shuts down idle workers, in case they are consuming quota.
type Scheduler struct {
        logger              logrus.FieldLogger
        client              *arvados.Client
        queue               ContainerQueue
        pool                WorkerPool
        reg                 *prometheus.Registry
        staleLockTimeout    time.Duration
        queueUpdateInterval time.Duration

        uuidOp map[string]string // operation in progress: "lock", "cancel", ...
        mtx    sync.Mutex
        wakeup *time.Timer

        runOnce sync.Once
        stop    chan struct{}
        stopped chan struct{}

        last503time          time.Time // last time API responded 503
        maxConcurrency       int       // dynamic container limit (0 = unlimited), see runQueue()
        supervisorFraction   float64   // maximum fraction of "supervisor" containers (these are containers whose main job is to launch other containers, e.g. workflow runners)
        maxInstances         int       // maximum number of instances the pool will bring up (0 = unlimited)
        instancesWithinQuota int       // max concurrency achieved since last quota error (0 = no quota error yet)

        mContainersAllocatedNotStarted   prometheus.Gauge
        mContainersNotAllocatedOverQuota prometheus.Gauge
        mLongestWaitTimeSinceQueue       prometheus.Gauge
        mLast503Time                     prometheus.Gauge
        mMaxContainerConcurrency         prometheus.Gauge

        lastQueue atomic.Value // stores a []QueueEnt
}

// New returns a new unstarted Scheduler.
//
// Any given queue and pool should not be used by more than one
// scheduler at a time.
func New(ctx context.Context, client *arvados.Client, queue ContainerQueue, pool WorkerPool, reg *prometheus.Registry, staleLockTimeout, queueUpdateInterval time.Duration, minQuota, maxInstances int, supervisorFraction float64) *Scheduler {
        sch := &Scheduler{
                logger:              ctxlog.FromContext(ctx),
                client:              client,
                queue:               queue,
                pool:                pool,
                reg:                 reg,
                staleLockTimeout:    staleLockTimeout,
                queueUpdateInterval: queueUpdateInterval,
                wakeup:              time.NewTimer(time.Second),
                stop:                make(chan struct{}),
                stopped:             make(chan struct{}),
                uuidOp:              map[string]string{},
                supervisorFraction:  supervisorFraction,
                maxInstances:        maxInstances,
        }
        if minQuota > 0 {
                sch.maxConcurrency = minQuota
        } else {
                sch.maxConcurrency = maxInstances
        }
        sch.registerMetrics(reg)
        return sch
}
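
// Construction sketch (illustrative only, not part of the original file):
// a dispatcher might wire up a Scheduler roughly as below. The ctx, client,
// queue, and pool values, and the duration/limit arguments, are hypothetical
// placeholders; see the parameter list of New above for their meaning.
//
//	sch := scheduler.New(ctx, client, queue, pool, prometheus.NewRegistry(),
//		time.Minute,    // staleLockTimeout
//		10*time.Second, // queueUpdateInterval
//		0,              // minQuota (0 = start with maxInstances as the concurrency limit)
//		64,             // maxInstances
//		0.3)            // supervisorFraction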

func (sch *Scheduler) registerMetrics(reg *prometheus.Registry) {
        if reg == nil {
                reg = prometheus.NewRegistry()
        }
        sch.mContainersAllocatedNotStarted = prometheus.NewGauge(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "containers_allocated_not_started",
                Help:      "Number of containers allocated to a worker but not started yet (worker is booting).",
        })
        reg.MustRegister(sch.mContainersAllocatedNotStarted)
        sch.mContainersNotAllocatedOverQuota = prometheus.NewGauge(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "containers_not_allocated_over_quota",
                Help:      "Number of containers not allocated to a worker because the system has hit a quota.",
        })
        reg.MustRegister(sch.mContainersNotAllocatedOverQuota)
        sch.mLongestWaitTimeSinceQueue = prometheus.NewGauge(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "containers_longest_wait_time_seconds",
                Help:      "Current longest wait time of any container since queuing, and before the start of crunch-run.",
        })
        reg.MustRegister(sch.mLongestWaitTimeSinceQueue)
        sch.mLast503Time = prometheus.NewGauge(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "last_503_time",
                Help:      "Time of most recent 503 error received from API.",
        })
        reg.MustRegister(sch.mLast503Time)
        sch.mMaxContainerConcurrency = prometheus.NewGauge(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "max_concurrent_containers",
                Help:      "Dynamically assigned limit on the number of containers scheduled concurrently, set after receiving 503 errors from API.",
        })
        reg.MustRegister(sch.mMaxContainerConcurrency)
        reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "at_quota",
                Help:      "Flag indicating the cloud driver is reporting an at-quota condition.",
        }, func() float64 {
                if sch.pool.AtQuota() {
                        return 1
                } else {
                        return 0
                }
        }))
}
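
// The gauges registered above are only visible to monitoring once the
// registry is exported over HTTP. A minimal sketch, assuming the standard
// promhttp package from prometheus/client_golang and an arbitrary listen
// address (neither appears in this file):
//
//	handler := promhttp.HandlerFor(reg, promhttp.HandlerOpts{})
//	http.Handle("/metrics", handler)
//	go http.ListenAndServe(":9006", nil) // example address only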

func (sch *Scheduler) updateMetrics() {
        earliest := time.Time{}
        entries, _ := sch.queue.Entries()
        running := sch.pool.Running()
        for _, ent := range entries {
                if ent.Container.Priority > 0 &&
                        (ent.Container.State == arvados.ContainerStateQueued || ent.Container.State == arvados.ContainerStateLocked) {
                        // Exclude containers that are preparing to run the payload (i.e.
                        // ContainerStateLocked and running on a worker, most likely loading
                        // the payload image).
                        if _, ok := running[ent.Container.UUID]; !ok {
                                if ent.Container.CreatedAt.Before(earliest) || earliest.IsZero() {
                                        earliest = ent.Container.CreatedAt
                                }
                        }
                }
        }
        if !earliest.IsZero() {
                sch.mLongestWaitTimeSinceQueue.Set(time.Since(earliest).Seconds())
        } else {
                sch.mLongestWaitTimeSinceQueue.Set(0)
        }
}

// Start starts the scheduler.
func (sch *Scheduler) Start() {
        go sch.runOnce.Do(sch.run)
}

// Stop stops the scheduler. No other method should be called after
// Stop.
func (sch *Scheduler) Stop() {
        close(sch.stop)
        <-sch.stopped
}
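
// Lifecycle sketch (illustrative): Start launches the scheduling loop at
// most once, and Stop blocks until that loop has exited, so a caller
// typically stops the scheduler only during shutdown. The shutdown channel
// below is a hypothetical signal, not part of this package.
//
//	sch.Start()
//	<-shutdownRequested // hypothetical channel, e.g. closed on SIGTERM
//	sch.Stop()          // no Scheduler methods may be called after this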

func (sch *Scheduler) run() {
        defer close(sch.stopped)

        // Ensure the queue is fetched once before attempting anything.
        for err := sch.queue.Update(); err != nil; err = sch.queue.Update() {
                sch.logger.Errorf("error updating queue: %s", err)
                d := sch.queueUpdateInterval / 10
                if d < time.Second {
                        d = time.Second
                }
                sch.logger.Infof("waiting %s before retry", d)
                time.Sleep(d)
        }

        // Keep the queue up to date.
        go func() {
                for {
                        starttime := time.Now()
                        err := sch.queue.Update()
                        if err != nil {
                                sch.logger.Errorf("error updating queue: %s", err)
                        }
                        // If the previous update took a long time,
                        // that probably means the server is
                        // overloaded, so wait that long before doing
                        // another. Otherwise, wait for the configured
                        // poll interval.
                        delay := time.Since(starttime)
                        if delay < sch.queueUpdateInterval {
                                delay = sch.queueUpdateInterval
                        }
                        time.Sleep(delay)
                }
        }()

        t0 := time.Now()
        sch.logger.Infof("FixStaleLocks starting.")
        sch.fixStaleLocks()
        sch.logger.Infof("FixStaleLocks finished (%s), starting scheduling.", time.Since(t0))

        poolNotify := sch.pool.Subscribe()
        defer sch.pool.Unsubscribe(poolNotify)

        queueNotify := sch.queue.Subscribe()
        defer sch.queue.Unsubscribe(queueNotify)

        for {
                sch.runQueue()
                sch.sync()
                sch.updateMetrics()
                select {
                case <-sch.stop:
                        return
                case <-queueNotify:
                case <-poolNotify:
                case <-sch.wakeup.C:
                }
        }
}