Merge branch '16814-remove-python2-arvbox' into master
[arvados.git] / lib / dispatchcloud / scheduler / scheduler.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 // Package scheduler uses a resizable worker pool to execute
6 // containers in priority order.
7 package scheduler
8
9 import (
10         "context"
11         "sync"
12         "time"
13
14         "git.arvados.org/arvados.git/sdk/go/ctxlog"
15         "github.com/prometheus/client_golang/prometheus"
16         "github.com/sirupsen/logrus"
17 )
18
19 // A Scheduler maps queued containers onto unallocated workers in
20 // priority order, creating new workers if needed. It locks containers
21 // that can be mapped onto existing/pending workers, and starts them
22 // if possible.
23 //
24 // A Scheduler unlocks any containers that are locked but can't be
25 // mapped. (For example, this happens when the cloud provider reaches
26 // quota/capacity and a previously mappable container's priority is
27 // surpassed by a newer container.)
28 //
29 // If it encounters errors while creating new workers, a Scheduler
30 // shuts down idle workers, in case they are consuming quota.
31 type Scheduler struct {
32         logger              logrus.FieldLogger
33         queue               ContainerQueue
34         pool                WorkerPool
35         reg                 *prometheus.Registry
36         staleLockTimeout    time.Duration
37         queueUpdateInterval time.Duration
38
39         uuidOp map[string]string // operation in progress: "lock", "cancel", ...
40         mtx    sync.Mutex
41         wakeup *time.Timer
42
43         runOnce sync.Once
44         stop    chan struct{}
45         stopped chan struct{}
46
47         mContainersAllocatedNotStarted   prometheus.Gauge
48         mContainersNotAllocatedOverQuota prometheus.Gauge
49 }
50
51 // New returns a new unstarted Scheduler.
52 //
53 // Any given queue and pool should not be used by more than one
54 // scheduler at a time.
55 func New(ctx context.Context, queue ContainerQueue, pool WorkerPool, reg *prometheus.Registry, staleLockTimeout, queueUpdateInterval time.Duration) *Scheduler {
56         sch := &Scheduler{
57                 logger:              ctxlog.FromContext(ctx),
58                 queue:               queue,
59                 pool:                pool,
60                 reg:                 reg,
61                 staleLockTimeout:    staleLockTimeout,
62                 queueUpdateInterval: queueUpdateInterval,
63                 wakeup:              time.NewTimer(time.Second),
64                 stop:                make(chan struct{}),
65                 stopped:             make(chan struct{}),
66                 uuidOp:              map[string]string{},
67         }
68         sch.registerMetrics(reg)
69         return sch
70 }
71
72 func (sch *Scheduler) registerMetrics(reg *prometheus.Registry) {
73         if reg == nil {
74                 reg = prometheus.NewRegistry()
75         }
76         sch.mContainersAllocatedNotStarted = prometheus.NewGauge(prometheus.GaugeOpts{
77                 Namespace: "arvados",
78                 Subsystem: "dispatchcloud",
79                 Name:      "containers_allocated_not_started",
80                 Help:      "Number of containers allocated to a worker but not started yet (worker is booting).",
81         })
82         reg.MustRegister(sch.mContainersAllocatedNotStarted)
83         sch.mContainersNotAllocatedOverQuota = prometheus.NewGauge(prometheus.GaugeOpts{
84                 Namespace: "arvados",
85                 Subsystem: "dispatchcloud",
86                 Name:      "containers_not_allocated_over_quota",
87                 Help:      "Number of containers not allocated to a worker because the system has hit a quota.",
88         })
89         reg.MustRegister(sch.mContainersNotAllocatedOverQuota)
90 }
91
92 // Start starts the scheduler.
93 func (sch *Scheduler) Start() {
94         go sch.runOnce.Do(sch.run)
95 }
96
97 // Stop stops the scheduler. No other method should be called after
98 // Stop.
99 func (sch *Scheduler) Stop() {
100         close(sch.stop)
101         <-sch.stopped
102 }
103
104 func (sch *Scheduler) run() {
105         defer close(sch.stopped)
106
107         // Ensure the queue is fetched once before attempting anything.
108         for err := sch.queue.Update(); err != nil; err = sch.queue.Update() {
109                 sch.logger.Errorf("error updating queue: %s", err)
110                 d := sch.queueUpdateInterval / 10
111                 if d < time.Second {
112                         d = time.Second
113                 }
114                 sch.logger.Infof("waiting %s before retry", d)
115                 time.Sleep(d)
116         }
117
118         // Keep the queue up to date.
119         poll := time.NewTicker(sch.queueUpdateInterval)
120         defer poll.Stop()
121         go func() {
122                 for range poll.C {
123                         err := sch.queue.Update()
124                         if err != nil {
125                                 sch.logger.Errorf("error updating queue: %s", err)
126                         }
127                 }
128         }()
129
130         t0 := time.Now()
131         sch.logger.Infof("FixStaleLocks starting.")
132         sch.fixStaleLocks()
133         sch.logger.Infof("FixStaleLocks finished (%s), starting scheduling.", time.Since(t0))
134
135         poolNotify := sch.pool.Subscribe()
136         defer sch.pool.Unsubscribe(poolNotify)
137
138         queueNotify := sch.queue.Subscribe()
139         defer sch.queue.Unsubscribe(queueNotify)
140
141         for {
142                 sch.runQueue()
143                 sch.sync()
144                 select {
145                 case <-sch.stop:
146                         return
147                 case <-queueNotify:
148                 case <-poolNotify:
149                 case <-sch.wakeup.C:
150                 }
151         }
152 }