1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
5 // Package scheduler uses a resizable worker pool to execute
6 // containers in priority order.
15 "git.arvados.org/arvados.git/sdk/go/arvados"
16 "git.arvados.org/arvados.git/sdk/go/ctxlog"
17 "github.com/prometheus/client_golang/prometheus"
18 "github.com/sirupsen/logrus"
21 // A Scheduler maps queued containers onto unallocated workers in
22 // priority order, creating new workers if needed. It locks containers
23 // that can be mapped onto existing/pending workers, and starts them
26 // A Scheduler unlocks any containers that are locked but can't be
27 // mapped. (For example, this happens when the cloud provider reaches
28 // quota/capacity and a previously mappable container's priority is
29 // surpassed by a newer container.)
31 // If it encounters errors while creating new workers, a Scheduler
32 // shuts down idle workers, in case they are consuming quota.
//
// Use New to create a Scheduler: New fills in the logger, timer,
// channels and limits, and registers the Prometheus gauges held in
// the m* fields.
33 type Scheduler struct {
34 logger logrus.FieldLogger // obtained from the context passed to New
35 client *arvados.Client
38 reg *prometheus.Registry
39 staleLockTimeout time.Duration // copied from New's staleLockTimeout argument
40 queueUpdateInterval time.Duration // used by run(): lower bound on the delay between queue updates; one tenth of it is the basis for the initial retry delay
42 uuidOp map[string]string // operation in progress: "lock", "cancel", ...
50 last503time time.Time // last time API responded 503
51 maxConcurrency int // dynamic container limit (0 = unlimited), see runQueue()
52 supervisorFraction float64 // maximum fraction of "supervisor" containers (these are containers whose main job is to launch other containers, e.g. workflow runners)
53 maxInstances int // maximum number of instances the pool will bring up (0 = unlimited)
54 instancesWithinQuota int // max concurrency achieved since last quota error (0 = no quota error yet)
56 mContainersAllocatedNotStarted prometheus.Gauge // exported as "containers_allocated_not_started"
57 mContainersNotAllocatedOverQuota prometheus.Gauge // exported as "containers_not_allocated_over_quota"
58 mLongestWaitTimeSinceQueue prometheus.Gauge // exported as "containers_longest_wait_time_seconds"; maintained by updateMetrics()
59 mLast503Time prometheus.Gauge // exported as "last_503_time"
60 mMaxContainerConcurrency prometheus.Gauge // exported as "max_concurrent_containers"
62 lastQueue atomic.Value // stores a []QueueEnt
65 // New returns a new unstarted Scheduler.
67 // Any given queue and pool should not be used by more than one
68 // scheduler at a time.
//
// The scheduler's logger is taken from ctx. staleLockTimeout,
// queueUpdateInterval, supervisorFraction and maxInstances are
// stored as given. The initial maxConcurrency is taken from minQuota
// or maxInstances, and the scheduler's Prometheus gauges are
// registered by calling registerMetrics(reg).
69 func New(ctx context.Context, client *arvados.Client, queue ContainerQueue, pool WorkerPool, reg *prometheus.Registry, staleLockTimeout, queueUpdateInterval time.Duration, minQuota, maxInstances int, supervisorFraction float64) *Scheduler {
71 logger: ctxlog.FromContext(ctx),
76 staleLockTimeout: staleLockTimeout,
77 queueUpdateInterval: queueUpdateInterval,
78 wakeup: time.NewTimer(time.Second), // timer initially set to fire after one second
79 stop: make(chan struct{}),
80 stopped: make(chan struct{}), // closed by run() when it returns
81 uuidOp: map[string]string{}, // starts empty: no operations in progress
82 supervisorFraction: supervisorFraction,
83 maxInstances: maxInstances,
// NOTE(review): the condition selecting between the two assignments
// below is not visible here -- confirm which limit takes precedence.
86 sch.maxConcurrency = minQuota
88 sch.maxConcurrency = maxInstances
90 sch.registerMetrics(reg)
// registerMetrics creates the scheduler's Prometheus gauges, stores
// them in the corresponding sch.m* fields, and registers each one
// with reg. It also registers a gauge function whose value depends
// on sch.pool.AtQuota().
//
// Registration uses MustRegister, which (per the Prometheus client
// library) panics if a collector cannot be registered.
94 func (sch *Scheduler) registerMetrics(reg *prometheus.Registry) {
// NOTE(review): reg is replaced with a fresh registry here; the
// guarding condition is not visible -- presumably this only happens
// when the caller passed a nil registry. Confirm.
96 reg = prometheus.NewRegistry()
98 sch.mContainersAllocatedNotStarted = prometheus.NewGauge(prometheus.GaugeOpts{
100 Subsystem: "dispatchcloud",
101 Name: "containers_allocated_not_started",
102 Help: "Number of containers allocated to a worker but not started yet (worker is booting).",
104 reg.MustRegister(sch.mContainersAllocatedNotStarted)
105 sch.mContainersNotAllocatedOverQuota = prometheus.NewGauge(prometheus.GaugeOpts{
106 Namespace: "arvados",
107 Subsystem: "dispatchcloud",
108 Name: "containers_not_allocated_over_quota",
109 Help: "Number of containers not allocated to a worker because the system has hit a quota.",
111 reg.MustRegister(sch.mContainersNotAllocatedOverQuota)
// Longest-wait gauge: its value is maintained by updateMetrics().
112 sch.mLongestWaitTimeSinceQueue = prometheus.NewGauge(prometheus.GaugeOpts{
113 Namespace: "arvados",
114 Subsystem: "dispatchcloud",
115 Name: "containers_longest_wait_time_seconds",
116 Help: "Current longest wait time of any container since queuing, and before the start of crunch-run.",
118 reg.MustRegister(sch.mLongestWaitTimeSinceQueue)
119 sch.mLast503Time = prometheus.NewGauge(prometheus.GaugeOpts{
120 Namespace: "arvados",
121 Subsystem: "dispatchcloud",
122 Name: "last_503_time",
123 Help: "Time of most recent 503 error received from API.",
125 reg.MustRegister(sch.mLast503Time)
126 sch.mMaxContainerConcurrency = prometheus.NewGauge(prometheus.GaugeOpts{
127 Namespace: "arvados",
128 Subsystem: "dispatchcloud",
129 Name: "max_concurrent_containers",
130 Help: "Dynamically assigned limit on number of containers scheduled concurrency, set after receiving 503 errors from API.",
132 reg.MustRegister(sch.mMaxContainerConcurrency)
// Unlike the gauges above, this one is a GaugeFunc and is not stored
// in a Scheduler field.
133 reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
134 Namespace: "arvados",
135 Subsystem: "dispatchcloud",
137 Help: "Flag indicating the cloud driver is reporting an at-quota condition.",
139 if sch.pool.AtQuota() {
// updateMetrics refreshes the longest-wait-time gauge.
//
// It scans the queue entries for containers that have priority > 0,
// are in the Queued or Locked state, and are not present in
// sch.pool.Running(), and tracks the earliest CreatedAt among them.
// When such a container exists, the gauge is set to the number of
// seconds elapsed since that time.
147 func (sch *Scheduler) updateMetrics() {
148 earliest := time.Time{} // zero value means "no waiting container found yet"
149 entries, _ := sch.queue.Entries() // second return value is not used here
150 running := sch.pool.Running()
151 for _, ent := range entries {
152 if ent.Container.Priority > 0 &&
153 (ent.Container.State == arvados.ContainerStateQueued || ent.Container.State == arvados.ContainerStateLocked) {
154 // Exclude containers that are preparing to run the payload (i.e.
155 // ContainerStateLocked and running on a worker, most likely loading the
157 if _, ok := running[ent.Container.UUID]; !ok {
158 if ent.Container.CreatedAt.Before(earliest) || earliest.IsZero() {
159 earliest = ent.Container.CreatedAt
164 if !earliest.IsZero() {
165 sch.mLongestWaitTimeSinceQueue.Set(time.Since(earliest).Seconds())
167 sch.mLongestWaitTimeSinceQueue.Set(0) // presumably the "nothing is waiting" case -- the branch structure is not visible here; confirm
171 // Start starts the scheduler.
//
// It launches a goroutine that invokes sch.run through sch.runOnce,
// so Start itself does not wait for run to finish.
// NOTE(review): runOnce is presumably a sync.Once, which would make
// repeated Start calls execute run at most once -- confirm.
172 func (sch *Scheduler) Start() {
173 go sch.runOnce.Do(sch.run)
176 // Stop stops the scheduler. No other method should be called after
// Stop.
// NOTE(review): the body is not visible here. New creates the
// sch.stop and sch.stopped channels and run() closes sch.stopped on
// return, so Stop presumably signals via sch.stop and waits on
// sch.stopped -- confirm.
178 func (sch *Scheduler) Stop() {
// run is the scheduler's main routine; Start invokes it in its own
// goroutine. It closes sch.stopped when it returns.
//
// Steps visible here:
//   - call sch.queue.Update() repeatedly until it succeeds, logging
//     each error and a retry delay derived from queueUpdateInterval/10;
//   - keep the queue up to date, spacing updates by at least
//     queueUpdateInterval, or by the duration of the previous update
//     if that was longer;
//   - log the start and finish of FixStaleLocks;
//   - subscribe to pool and queue notifications, unsubscribing when
//     run returns.
183 func (sch *Scheduler) run() {
184 defer close(sch.stopped)
186 // Ensure the queue is fetched once before attempting anything.
187 for err := sch.queue.Update(); err != nil; err = sch.queue.Update() {
188 sch.logger.Errorf("error updating queue: %s", err)
189 d := sch.queueUpdateInterval / 10
193 sch.logger.Infof("waiting %s before retry", d)
197 // Keep the queue up to date.
200 starttime := time.Now()
201 err := sch.queue.Update()
203 sch.logger.Errorf("error updating queue: %s", err)
205 // If the previous update took a long time,
206 // that probably means the server is
207 // overloaded, so wait that long before doing
208 // another. Otherwise, wait for the configured
210 delay := time.Since(starttime) // how long the Update call above took
211 if delay < sch.queueUpdateInterval {
212 delay = sch.queueUpdateInterval // never wait less than the configured interval
219 sch.logger.Infof("FixStaleLocks starting.")
221 sch.logger.Infof("FixStaleLocks finished (%s), starting scheduling.", time.Since(t0))
223 poolNotify := sch.pool.Subscribe()
224 defer sch.pool.Unsubscribe(poolNotify)
226 queueNotify := sch.queue.Subscribe()
227 defer sch.queue.Unsubscribe(queueNotify)