1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
5 // Package scheduler uses a resizable worker pool to execute
6 // containers in priority order.
13 "github.com/Sirupsen/logrus"
16 // A Scheduler maps queued containers onto unallocated workers in
17 // priority order, creating new workers if needed. It locks containers
18 // that can be mapped onto existing/pending workers, and starts them
21 // A Scheduler unlocks any containers that are locked but can't be
22 // mapped. (For example, this happens when the cloud provider reaches
23 // quota/capacity and a previously mappable container's priority is
24 // surpassed by a newer container.)
26 // If it encounters errors while creating new workers, a Scheduler
27 // shuts down idle workers, in case they are consuming quota.
28 type Scheduler struct {
// logger receives the error and informational messages emitted while
// the scheduler runs.
29 logger logrus.FieldLogger
// staleLockTimeout is stored exactly as passed to New.
// NOTE(review): presumably bounds the stale-lock cleanup performed at
// startup -- confirm where it is consumed.
32 staleLockTimeout time.Duration
// queueUpdateInterval is the period of the ticker that refreshes the
// queue; the retry delay for the initial queue fetch is derived from
// it as well.
33 queueUpdateInterval time.Duration
// locking is initialized to an empty map by New.
// NOTE(review): keys look like container identifiers -- confirm
// against the code that reads and writes this map.
35 locking map[string]bool
43 // New returns a new unstarted Scheduler.
45 // Any given queue and pool should not be used by more than one
46 // scheduler at a time.
//
// staleLockTimeout and queueUpdateInterval are copied unchanged into
// the returned Scheduler. The stop and stopped channels are created
// unbuffered, and the locking map starts out empty.
47 func New(logger logrus.FieldLogger, queue ContainerQueue, pool WorkerPool, staleLockTimeout, queueUpdateInterval time.Duration) *Scheduler {
52 staleLockTimeout: staleLockTimeout,
53 queueUpdateInterval: queueUpdateInterval,
// Signalling-only channels: they carry no values (struct{}).
54 stop: make(chan struct{}),
55 stopped: make(chan struct{}),
// Allocate the map up front so later writes cannot hit a nil map.
56 locking: map[string]bool{},
60 // Start starts the scheduler.
//
// The run method is launched in a separate goroutine, so Start itself
// returns without waiting for scheduling to begin.
61 func (sch *Scheduler) Start() {
// NOTE(review): runOnce is presumably a sync.Once, which would make
// repeated Start calls execute run at most once -- confirm against
// the field declaration.
62 go sch.runOnce.Do(sch.run)
65 // Stop stops the scheduler. No other method should be called after
67 func (sch *Scheduler) Stop() {
// run is the scheduler's main routine; Start launches it in its own
// goroutine. It obtains an initial copy of the queue, arranges for the
// queue to be refreshed periodically, logs the stale-lock cleanup
// phase, and subscribes to pool and queue notifications.
72 func (sch *Scheduler) run() {
// Closing stopped on the way out tells any receiver on that channel
// that run has returned.
73 defer close(sch.stopped)
75 // Ensure the queue is fetched once before attempting anything.
// The loop repeats Update until it returns a nil error.
76 for err := sch.queue.Update(); err != nil; err = sch.queue.Update() {
77 sch.logger.Errorf("error updating queue: %s", err)
// The retry delay is one sixtieth of the regular update interval.
78 d := sch.queueUpdateInterval / 60
79 sch.logger.Infof("waiting %s before retry", d)
83 // Keep the queue up to date.
// The ticker fires once per queueUpdateInterval.
84 poll := time.NewTicker(sch.queueUpdateInterval)
88 err := sch.queue.Update()
// A failed periodic update is reported through the logger.
90 sch.logger.Errorf("error updating queue: %s", err)
96 sch.logger.Infof("FixStaleLocks starting.")
// The completion message includes the time elapsed since t0.
98 sch.logger.Infof("FixStaleLocks finished (%s), starting scheduling.", time.Since(t0))
// Subscribe to worker pool notifications; the deferred Unsubscribe
// releases the subscription when run returns.
100 poolNotify := sch.pool.Subscribe()
101 defer sch.pool.Unsubscribe(poolNotify)
// Likewise for container queue notifications.
103 queueNotify := sch.queue.Subscribe()
104 defer sch.queue.Unsubscribe(queueNotify)