Fix 2.4.2 upgrade notes formatting refs #19330
[arvados.git] / lib / dispatchcloud / scheduler / scheduler.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 // Package scheduler uses a resizable worker pool to execute
6 // containers in priority order.
7 package scheduler
8
9 import (
10         "context"
11         "sync"
12         "time"
13
14         "git.arvados.org/arvados.git/sdk/go/arvados"
15         "git.arvados.org/arvados.git/sdk/go/ctxlog"
16         "github.com/prometheus/client_golang/prometheus"
17         "github.com/sirupsen/logrus"
18 )
19
20 // A Scheduler maps queued containers onto unallocated workers in
21 // priority order, creating new workers if needed. It locks containers
22 // that can be mapped onto existing/pending workers, and starts them
23 // if possible.
24 //
25 // A Scheduler unlocks any containers that are locked but can't be
26 // mapped. (For example, this happens when the cloud provider reaches
27 // quota/capacity and a previously mappable container's priority is
28 // surpassed by a newer container.)
29 //
30 // If it encounters errors while creating new workers, a Scheduler
31 // shuts down idle workers, in case they are consuming quota.
32 type Scheduler struct {
33         logger              logrus.FieldLogger
34         queue               ContainerQueue
35         pool                WorkerPool
36         reg                 *prometheus.Registry
37         staleLockTimeout    time.Duration
38         queueUpdateInterval time.Duration
39
40         uuidOp map[string]string // operation in progress: "lock", "cancel", ...
41         mtx    sync.Mutex
42         wakeup *time.Timer
43
44         runOnce sync.Once
45         stop    chan struct{}
46         stopped chan struct{}
47
48         mContainersAllocatedNotStarted   prometheus.Gauge
49         mContainersNotAllocatedOverQuota prometheus.Gauge
50         mLongestWaitTimeSinceQueue       prometheus.Gauge
51 }
52
53 // New returns a new unstarted Scheduler.
54 //
55 // Any given queue and pool should not be used by more than one
56 // scheduler at a time.
57 func New(ctx context.Context, queue ContainerQueue, pool WorkerPool, reg *prometheus.Registry, staleLockTimeout, queueUpdateInterval time.Duration) *Scheduler {
58         sch := &Scheduler{
59                 logger:              ctxlog.FromContext(ctx),
60                 queue:               queue,
61                 pool:                pool,
62                 reg:                 reg,
63                 staleLockTimeout:    staleLockTimeout,
64                 queueUpdateInterval: queueUpdateInterval,
65                 wakeup:              time.NewTimer(time.Second),
66                 stop:                make(chan struct{}),
67                 stopped:             make(chan struct{}),
68                 uuidOp:              map[string]string{},
69         }
70         sch.registerMetrics(reg)
71         return sch
72 }
73
74 func (sch *Scheduler) registerMetrics(reg *prometheus.Registry) {
75         if reg == nil {
76                 reg = prometheus.NewRegistry()
77         }
78         sch.mContainersAllocatedNotStarted = prometheus.NewGauge(prometheus.GaugeOpts{
79                 Namespace: "arvados",
80                 Subsystem: "dispatchcloud",
81                 Name:      "containers_allocated_not_started",
82                 Help:      "Number of containers allocated to a worker but not started yet (worker is booting).",
83         })
84         reg.MustRegister(sch.mContainersAllocatedNotStarted)
85         sch.mContainersNotAllocatedOverQuota = prometheus.NewGauge(prometheus.GaugeOpts{
86                 Namespace: "arvados",
87                 Subsystem: "dispatchcloud",
88                 Name:      "containers_not_allocated_over_quota",
89                 Help:      "Number of containers not allocated to a worker because the system has hit a quota.",
90         })
91         reg.MustRegister(sch.mContainersNotAllocatedOverQuota)
92         sch.mLongestWaitTimeSinceQueue = prometheus.NewGauge(prometheus.GaugeOpts{
93                 Namespace: "arvados",
94                 Subsystem: "dispatchcloud",
95                 Name:      "containers_longest_wait_time_seconds",
96                 Help:      "Current longest wait time of any container since queuing, and before the start of crunch-run.",
97         })
98         reg.MustRegister(sch.mLongestWaitTimeSinceQueue)
99 }
100
101 func (sch *Scheduler) updateMetrics() {
102         earliest := time.Time{}
103         entries, _ := sch.queue.Entries()
104         running := sch.pool.Running()
105         for _, ent := range entries {
106                 if ent.Container.Priority > 0 &&
107                         (ent.Container.State == arvados.ContainerStateQueued || ent.Container.State == arvados.ContainerStateLocked) {
108                         // Exclude containers that are preparing to run the payload (i.e.
109                         // ContainerStateLocked and running on a worker, most likely loading the
110                         // payload image
111                         if _, ok := running[ent.Container.UUID]; !ok {
112                                 if ent.Container.CreatedAt.Before(earliest) || earliest.IsZero() {
113                                         earliest = ent.Container.CreatedAt
114                                 }
115                         }
116                 }
117         }
118         if !earliest.IsZero() {
119                 sch.mLongestWaitTimeSinceQueue.Set(time.Since(earliest).Seconds())
120         } else {
121                 sch.mLongestWaitTimeSinceQueue.Set(0)
122         }
123 }
124
125 // Start starts the scheduler.
126 func (sch *Scheduler) Start() {
127         go sch.runOnce.Do(sch.run)
128 }
129
130 // Stop stops the scheduler. No other method should be called after
131 // Stop.
132 func (sch *Scheduler) Stop() {
133         close(sch.stop)
134         <-sch.stopped
135 }
136
137 func (sch *Scheduler) run() {
138         defer close(sch.stopped)
139
140         // Ensure the queue is fetched once before attempting anything.
141         for err := sch.queue.Update(); err != nil; err = sch.queue.Update() {
142                 sch.logger.Errorf("error updating queue: %s", err)
143                 d := sch.queueUpdateInterval / 10
144                 if d < time.Second {
145                         d = time.Second
146                 }
147                 sch.logger.Infof("waiting %s before retry", d)
148                 time.Sleep(d)
149         }
150
151         // Keep the queue up to date.
152         poll := time.NewTicker(sch.queueUpdateInterval)
153         defer poll.Stop()
154         go func() {
155                 for range poll.C {
156                         err := sch.queue.Update()
157                         if err != nil {
158                                 sch.logger.Errorf("error updating queue: %s", err)
159                         }
160                 }
161         }()
162
163         t0 := time.Now()
164         sch.logger.Infof("FixStaleLocks starting.")
165         sch.fixStaleLocks()
166         sch.logger.Infof("FixStaleLocks finished (%s), starting scheduling.", time.Since(t0))
167
168         poolNotify := sch.pool.Subscribe()
169         defer sch.pool.Unsubscribe(poolNotify)
170
171         queueNotify := sch.queue.Subscribe()
172         defer sch.queue.Unsubscribe(queueNotify)
173
174         for {
175                 sch.runQueue()
176                 sch.sync()
177                 sch.updateMetrics()
178                 select {
179                 case <-sch.stop:
180                         return
181                 case <-queueNotify:
182                 case <-poolNotify:
183                 case <-sch.wakeup.C:
184                 }
185         }
186 }