1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
5 // Package scheduler uses a resizable worker pool to execute
6 // containers in priority order.
13 "github.com/Sirupsen/logrus"
16 // A Scheduler maps queued containers onto unallocated workers in
17 // priority order, creating new workers if needed. It locks containers
18 // that can be mapped onto existing/pending workers, and starts them
21 // A Scheduler unlocks any containers that are locked but can't be
22 // mapped. (For example, this happens when the cloud provider reaches
23 // quota/capacity and a previously mappable container's priority is
24 // surpassed by a newer container.)
26 // If it encounters errors while creating new workers, a Scheduler
27 // shuts down idle workers, in case they are consuming quota.
28 type Scheduler struct {
29 logger logrus.FieldLogger
32 staleLockTimeout time.Duration
33 queueUpdateInterval time.Duration
39 // New returns a new unstarted Scheduler.
41 // Any given queue and pool should not be used by more than one
42 // scheduler at a time.
43 func New(logger logrus.FieldLogger, queue ContainerQueue, pool WorkerPool, staleLockTimeout, queueUpdateInterval time.Duration) *Scheduler {
48 staleLockTimeout: staleLockTimeout,
49 queueUpdateInterval: queueUpdateInterval,
50 stop: make(chan struct{}),
54 // Start starts the scheduler by invoking run, via runOnce, in a new goroutine.
55 func (sch *Scheduler) Start() {
56 go sch.runOnce.Do(sch.run)
59 // Stop stops the scheduler. No other method should be called after
61 func (sch *Scheduler) Stop() {
65 func (sch *Scheduler) run() {
66 // Ensure the queue has been fetched successfully once before attempting anything, retrying until Update succeeds.
67 for err := sch.queue.Update(); err != nil; err = sch.queue.Update() {
68 sch.logger.Errorf("error updating queue: %s", err)
69 d := sch.queueUpdateInterval / 60
70 sch.logger.Infof("waiting %s before retry", d)
74 // Keep the queue up to date by calling Update every queueUpdateInterval.
75 poll := time.NewTicker(sch.queueUpdateInterval)
79 err := sch.queue.Update()
81 sch.logger.Errorf("error updating queue: %s", err)
87 sch.logger.Infof("FixStaleLocks starting.")
89 sch.logger.Infof("FixStaleLocks finished (%s), starting scheduling.", time.Since(t0))
91 poolNotify := sch.pool.Subscribe()
92 defer sch.pool.Unsubscribe(poolNotify)
94 queueNotify := sch.queue.Subscribe()
95 defer sch.queue.Unsubscribe(queueNotify)