// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

// Package scheduler uses a resizable worker pool to execute
// containers in priority order.
package scheduler

import (
	"context"
	"sync"
	"time"

14 "git.arvados.org/arvados.git/sdk/go/arvados"
15 "git.arvados.org/arvados.git/sdk/go/ctxlog"
16 "github.com/prometheus/client_golang/prometheus"
17 "github.com/sirupsen/logrus"
// A Scheduler maps queued containers onto unallocated workers in
// priority order, creating new workers if needed. It locks containers
// that can be mapped onto existing/pending workers, and starts them
// if possible.
//
// A Scheduler unlocks any containers that are locked but can't be
// mapped. (For example, this happens when the cloud provider reaches
// quota/capacity and a previously mappable container's priority is
// surpassed by a newer container.)
//
// If it encounters errors while creating new workers, a Scheduler
// shuts down idle workers, in case they are consuming quota.
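//
// A Scheduler coordinates two collaborators defined elsewhere in this
// package: a ContainerQueue (the prioritized list of containers, kept
// up to date with the API server) and a WorkerPool (the resizable set
// of cloud VM workers).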
type Scheduler struct {
	logger              logrus.FieldLogger
	client              *arvados.Client
	queue               ContainerQueue
	pool                WorkerPool
	reg                 *prometheus.Registry
	staleLockTimeout    time.Duration
	queueUpdateInterval time.Duration

	uuidOp map[string]string // operation in progress: "lock", "cancel", ...
	mtx    sync.Mutex
	wakeup *time.Timer

	runOnce sync.Once
	stop    chan struct{}
	stopped chan struct{}

	last503time    time.Time // last time API responded 503
	maxConcurrency int       // dynamic container limit (0 = unlimited), see runQueue()

	mContainersAllocatedNotStarted   prometheus.Gauge
	mContainersNotAllocatedOverQuota prometheus.Gauge
	mLongestWaitTimeSinceQueue       prometheus.Gauge
	mLast503Time                     prometheus.Gauge
	mMaxContainerConcurrency         prometheus.Gauge
}

// New returns a new unstarted Scheduler.
//
// Any given queue and pool should not be used by more than one
// scheduler at a time.
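//
// Typical usage is a sketch like the following (ctx, client, queue,
// pool, and reg are assumed to come from the surrounding dispatcher
// setup; the timeout and interval values are illustrative):
//
//	sch := New(ctx, client, queue, pool, reg, time.Minute, 10*time.Second)
//	sch.Start()
//	defer sch.Stop()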
func New(ctx context.Context, client *arvados.Client, queue ContainerQueue, pool WorkerPool, reg *prometheus.Registry, staleLockTimeout, queueUpdateInterval time.Duration) *Scheduler {
	sch := &Scheduler{
		logger:              ctxlog.FromContext(ctx),
		client:              client,
		queue:               queue,
		pool:                pool,
		reg:                 reg,
		staleLockTimeout:    staleLockTimeout,
		queueUpdateInterval: queueUpdateInterval,
		wakeup:              time.NewTimer(time.Second),
		stop:                make(chan struct{}),
		stopped:             make(chan struct{}),
		uuidOp:              map[string]string{},
	}
	sch.registerMetrics(reg)
	return sch
}

func (sch *Scheduler) registerMetrics(reg *prometheus.Registry) {
	if reg == nil {
		// No registry supplied: register into a private
		// throwaway registry so the MustRegister calls below
		// still work.
		reg = prometheus.NewRegistry()
	}
	sch.mContainersAllocatedNotStarted = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "containers_allocated_not_started",
		Help:      "Number of containers allocated to a worker but not started yet (worker is booting).",
	})
	reg.MustRegister(sch.mContainersAllocatedNotStarted)
	sch.mContainersNotAllocatedOverQuota = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "containers_not_allocated_over_quota",
		Help:      "Number of containers not allocated to a worker because the system has hit a quota.",
	})
	reg.MustRegister(sch.mContainersNotAllocatedOverQuota)
	sch.mLongestWaitTimeSinceQueue = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "containers_longest_wait_time_seconds",
		Help:      "Current longest wait time of any container since queuing, and before the start of crunch-run.",
	})
	reg.MustRegister(sch.mLongestWaitTimeSinceQueue)
	sch.mLast503Time = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "last_503_time",
		Help:      "Time of most recent 503 error received from API.",
	})
	reg.MustRegister(sch.mLast503Time)
	sch.mMaxContainerConcurrency = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "max_concurrent_containers",
		Help:      "Dynamically assigned limit on number of containers scheduled concurrently, set after receiving 503 errors from API.",
	})
	reg.MustRegister(sch.mMaxContainerConcurrency)
}

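// With the "arvados" namespace and "dispatchcloud" subsystem used
// above, Prometheus exports these gauges under composed names such as
// arvados_dispatchcloud_containers_allocated_not_started.
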
// updateMetrics refreshes the longest-wait gauge: the age of the
// oldest container that is still waiting, i.e., queued or locked but
// not yet running on a worker.
func (sch *Scheduler) updateMetrics() {
	earliest := time.Time{}
	entries, _ := sch.queue.Entries()
	running := sch.pool.Running()
	for _, ent := range entries {
		if ent.Container.Priority > 0 &&
			(ent.Container.State == arvados.ContainerStateQueued || ent.Container.State == arvados.ContainerStateLocked) {
			// Exclude containers that are preparing to run the payload (i.e.
			// ContainerStateLocked and running on a worker, most likely loading the
			// payload image).
			if _, ok := running[ent.Container.UUID]; !ok {
				if ent.Container.CreatedAt.Before(earliest) || earliest.IsZero() {
					earliest = ent.Container.CreatedAt
				}
			}
		}
	}
	if !earliest.IsZero() {
		sch.mLongestWaitTimeSinceQueue.Set(time.Since(earliest).Seconds())
	} else {
		sch.mLongestWaitTimeSinceQueue.Set(0)
	}
}

// Start starts the scheduler.
func (sch *Scheduler) Start() {
	go sch.runOnce.Do(sch.run)
}

// Stop stops the scheduler. No other method should be called after
// Stop.
func (sch *Scheduler) Stop() {
	// Signal run() to return, then wait for it to finish.
	close(sch.stop)
	<-sch.stopped
}

func (sch *Scheduler) run() {
	defer close(sch.stopped)

	// Ensure the queue is fetched once before attempting anything.
	for err := sch.queue.Update(); err != nil; err = sch.queue.Update() {
		sch.logger.Errorf("error updating queue: %s", err)
		d := sch.queueUpdateInterval / 10
		if d < time.Second {
			d = time.Second
		}
		sch.logger.Infof("waiting %s before retry", d)
		time.Sleep(d)
	}

	// Keep the queue up to date.
	poll := time.NewTicker(sch.queueUpdateInterval)
	defer poll.Stop()
	go func() {
		for range poll.C {
			err := sch.queue.Update()
			if err != nil {
				sch.logger.Errorf("error updating queue: %s", err)
			}
		}
	}()

	t0 := time.Now()
	sch.logger.Infof("FixStaleLocks starting.")
	sch.fixStaleLocks()
	sch.logger.Infof("FixStaleLocks finished (%s), starting scheduling.", time.Since(t0))

	poolNotify := sch.pool.Subscribe()
	defer sch.pool.Unsubscribe(poolNotify)

	queueNotify := sch.queue.Subscribe()
	defer sch.queue.Unsubscribe(queueNotify)