// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

// Package scheduler uses a resizable worker pool to execute
// containers in priority order.
package scheduler

import (
	"context"
	"sync"
	"time"

14 "git.arvados.org/arvados.git/sdk/go/arvados"
15 "git.arvados.org/arvados.git/sdk/go/ctxlog"
16 "github.com/prometheus/client_golang/prometheus"
17 "github.com/sirupsen/logrus"
// A Scheduler maps queued containers onto unallocated workers in
// priority order, creating new workers if needed. It locks containers
// that can be mapped onto existing/pending workers, and starts them
// if possible.
//
// A Scheduler unlocks any containers that are locked but can't be
// mapped. (For example, this happens when the cloud provider reaches
// quota/capacity and a previously mappable container's priority is
// surpassed by a newer container.)
//
// If it encounters errors while creating new workers, a Scheduler
// shuts down idle workers, in case they are consuming quota.
type Scheduler struct {
	logger              logrus.FieldLogger
	queue               ContainerQueue
	pool                WorkerPool
	reg                 *prometheus.Registry
	staleLockTimeout    time.Duration
	queueUpdateInterval time.Duration

	uuidOp  map[string]string // operation in progress: "lock", "cancel", ...
	wakeup  *time.Timer
	runOnce sync.Once
	stop    chan struct{}
	stopped chan struct{}

	mContainersAllocatedNotStarted   prometheus.Gauge
	mContainersNotAllocatedOverQuota prometheus.Gauge
	mLongestWaitTimeSinceQueue       prometheus.Gauge
}

// New returns a new unstarted Scheduler.
//
// Any given queue and pool should not be used by more than one
// scheduler at a time.
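//
// A typical caller looks roughly like the following sketch (myQueue and
// myPool stand in for concrete ContainerQueue and WorkerPool
// implementations; the two durations are arbitrary):
//
//	sch := New(ctx, myQueue, myPool, prometheus.NewRegistry(), time.Minute, 10*time.Second)
//	sch.Start()
//	defer sch.Stop()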
func New(ctx context.Context, queue ContainerQueue, pool WorkerPool, reg *prometheus.Registry, staleLockTimeout, queueUpdateInterval time.Duration) *Scheduler {
	sch := &Scheduler{
		logger:              ctxlog.FromContext(ctx),
		queue:               queue,
		pool:                pool,
		staleLockTimeout:    staleLockTimeout,
		queueUpdateInterval: queueUpdateInterval,
		wakeup:              time.NewTimer(time.Second),
		stop:                make(chan struct{}),
		stopped:             make(chan struct{}),
		uuidOp:              map[string]string{},
	}
	sch.registerMetrics(reg)
	return sch
}

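// registerMetrics creates the scheduler's Prometheus gauges and
// registers them with reg. If reg is nil, a throwaway registry is used
// so the gauges are still usable.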
func (sch *Scheduler) registerMetrics(reg *prometheus.Registry) {
	if reg == nil {
		reg = prometheus.NewRegistry()
	}
	sch.mContainersAllocatedNotStarted = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "containers_allocated_not_started",
		Help:      "Number of containers allocated to a worker but not started yet (worker is booting).",
	})
	reg.MustRegister(sch.mContainersAllocatedNotStarted)
	sch.mContainersNotAllocatedOverQuota = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "containers_not_allocated_over_quota",
		Help:      "Number of containers not allocated to a worker because the system has hit a quota.",
	})
	reg.MustRegister(sch.mContainersNotAllocatedOverQuota)
	sch.mLongestWaitTimeSinceQueue = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "containers_longest_wait_time_seconds",
		Help:      "Current longest wait time of any container since queuing, and before the start of crunch-run.",
	})
	reg.MustRegister(sch.mLongestWaitTimeSinceQueue)
}

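// updateMetrics refreshes the longest-wait gauge: it scans the queue
// for containers that are still waiting (queued or locked, with
// positive priority, and not yet running on a worker) and reports the
// age of the oldest one, in seconds.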
func (sch *Scheduler) updateMetrics() {
	earliest := time.Time{}
	entries, _ := sch.queue.Entries()
	running := sch.pool.Running()
	for _, ent := range entries {
		if ent.Container.Priority > 0 &&
			(ent.Container.State == arvados.ContainerStateQueued || ent.Container.State == arvados.ContainerStateLocked) {
			// Exclude containers that are preparing to run the payload (i.e.
			// ContainerStateLocked and running on a worker, most likely loading the
			// payload image).
			if _, ok := running[ent.Container.UUID]; !ok {
				if ent.Container.CreatedAt.Before(earliest) || earliest.IsZero() {
					earliest = ent.Container.CreatedAt
				}
			}
		}
	}
	if !earliest.IsZero() {
		sch.mLongestWaitTimeSinceQueue.Set(time.Since(earliest).Seconds())
	} else {
		sch.mLongestWaitTimeSinceQueue.Set(0)
	}
}

// Start starts the scheduler.
func (sch *Scheduler) Start() {
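	// sch.runOnce ensures the main loop is launched at most once, even
	// if Start is called repeatedly.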
	go sch.runOnce.Do(sch.run)
}

// Stop stops the scheduler. No other method should be called after
// Stop is called.
func (sch *Scheduler) Stop() {
	close(sch.stop)
	<-sch.stopped
}

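// run is the scheduler's main loop. It fetches the container queue,
// starts a background poller to keep it up to date, cleans up stale
// locks, and subscribes to pool and queue events before starting to
// schedule containers.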
func (sch *Scheduler) run() {
	defer close(sch.stopped)

	// Ensure the queue is fetched once before attempting anything.
	for err := sch.queue.Update(); err != nil; err = sch.queue.Update() {
		sch.logger.Errorf("error updating queue: %s", err)
		d := sch.queueUpdateInterval / 10
		if d < time.Second {
			d = time.Second
		}
		sch.logger.Infof("waiting %s before retry", d)
		time.Sleep(d)
	}

	// Keep the queue up to date.
	poll := time.NewTicker(sch.queueUpdateInterval)
	defer poll.Stop()
	go func() {
		for range poll.C {
			err := sch.queue.Update()
			if err != nil {
				sch.logger.Errorf("error updating queue: %s", err)
			}
		}
	}()

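	// Unlock any containers that were locked by a previous dispatcher
	// and never started (see fixStaleLocks).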
	t0 := time.Now()
	sch.logger.Infof("FixStaleLocks starting.")
	sch.fixStaleLocks()
	sch.logger.Infof("FixStaleLocks finished (%s), starting scheduling.", time.Since(t0))

	poolNotify := sch.pool.Subscribe()
	defer sch.pool.Unsubscribe(poolNotify)

	queueNotify := sch.queue.Subscribe()
	defer sch.queue.Unsubscribe(queueNotify)