1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
5 // Package scheduler uses a resizable worker pool to execute
6 // containers in priority order.
13 "github.com/Sirupsen/logrus"
16 // A Scheduler maps queued containers onto unallocated workers in
17 // priority order, creating new workers if needed. It locks containers
18 // that can be mapped onto existing/pending workers, and starts them.
21 // A Scheduler unlocks any containers that are locked but can't be
22 // mapped. (For example, this happens when the cloud provider reaches
23 // quota/capacity and a previously mappable container's priority is
24 // surpassed by a newer container.)
26 // If it encounters errors while creating new workers, a Scheduler
27 // shuts down idle workers, in case they are consuming quota.
28 type Scheduler struct {
// logger receives the scheduler's error and progress messages, e.g.
// queue update failures and the FixStaleLocks start/finish notices
// emitted by run.
29 logger logrus.FieldLogger
// staleLockTimeout is supplied by the caller of New. Its use is not
// visible in this part of the file; presumably it bounds how long a
// stale container lock is tolerated -- TODO confirm.
32 staleLockTimeout time.Duration
// queueUpdateInterval is the period of the ticker that run uses to
// keep the queue up to date. While the initial queue fetch is failing,
// run computes its retry delay as 1/60 of this interval.
33 queueUpdateInterval time.Duration
// locking is initialized to an empty map by New. NOTE(review): the
// keys are presumably container UUIDs with a lock/unlock operation in
// progress -- confirm against the code that writes to it.
35 locking map[string]bool
42 // New returns a new unstarted Scheduler.
//
// staleLockTimeout and queueUpdateInterval are stored unchanged on the
// returned Scheduler, which is also given a freshly made stop channel
// and an empty locking map.
//
44 // Any given queue and pool should not be used by more than one
45 // scheduler at a time.
46 func New(logger logrus.FieldLogger, queue ContainerQueue, pool WorkerPool, staleLockTimeout, queueUpdateInterval time.Duration) *Scheduler {
51 staleLockTimeout: staleLockTimeout,
52 queueUpdateInterval: queueUpdateInterval,
53 stop: make(chan struct{}),
54 locking: map[string]bool{},
58 // Start starts the scheduler.
//
// The run loop is launched in its own goroutine through runOnce, so
// Start returns without waiting for it. NOTE(review): runOnce is
// declared outside this view; if it is a sync.Once, calling Start more
// than once still runs the loop only once -- confirm.
59 func (sch *Scheduler) Start() {
60 go sch.runOnce.Do(sch.run)
63 // Stop stops the scheduler. No other method should be called after Stop.
65 func (sch *Scheduler) Stop() {
69 func (sch *Scheduler) run() {
70 // Ensure the queue is fetched once before attempting anything.
71 for err := sch.queue.Update(); err != nil; err = sch.queue.Update() {
72 sch.logger.Errorf("error updating queue: %s", err)
73 d := sch.queueUpdateInterval / 60
74 sch.logger.Infof("waiting %s before retry", d)
78 // Keep the queue up to date.
79 poll := time.NewTicker(sch.queueUpdateInterval)
83 err := sch.queue.Update()
85 sch.logger.Errorf("error updating queue: %s", err)
91 sch.logger.Infof("FixStaleLocks starting.")
93 sch.logger.Infof("FixStaleLocks finished (%s), starting scheduling.", time.Since(t0))
95 poolNotify := sch.pool.Subscribe()
96 defer sch.pool.Unsubscribe(poolNotify)
98 queueNotify := sch.queue.Subscribe()
99 defer sch.queue.Unsubscribe(queueNotify)