1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
5 // Package scheduler uses a resizable worker pool to execute
6 // containers in priority order.
14 "git.curoverse.com/arvados.git/sdk/go/ctxlog"
15 "github.com/sirupsen/logrus"
18 // A Scheduler maps queued containers onto unallocated workers in
19 // priority order, creating new workers if needed. It locks containers
20 // that can be mapped onto existing/pending workers, and starts them
23 // A Scheduler unlocks any containers that are locked but can't be
24 // mapped. (For example, this happens when the cloud provider reaches
25 // quota/capacity and a previously mappable container's priority is
26 // surpassed by a newer container.)
28 // If it encounters errors while creating new workers, a Scheduler
29 // shuts down idle workers, in case they are consuming quota.
30 type Scheduler struct {
// logger receives the scheduler's error and informational messages.
// New obtains it from the context with ctxlog.FromContext.
31 logger logrus.FieldLogger
// staleLockTimeout is stored exactly as passed to New.
// NOTE(review): presumably limits how long stale-lock cleanup may
// take -- confirm where it is read.
34 staleLockTimeout time.Duration
// queueUpdateInterval is the period of the queue-refresh ticker
// created in run. One tenth of it is computed as the delay between
// retries of the initial queue fetch.
35 queueUpdateInterval time.Duration
// locking is initialized to an empty, non-nil map by New.
// NOTE(review): presumably keyed by container UUID to mark lock
// attempts in progress -- confirm against the code that writes it.
37 locking map[string]bool
45 // New returns a new unstarted Scheduler.
47 // Any given queue and pool should not be used by more than one
48 // scheduler at a time.
//
// The logger is taken from ctx. staleLockTimeout and
// queueUpdateInterval are stored unchanged. The stop and stopped
// channels and the locking map are allocated here. Nothing runs until
// Start is called.
49 func New(ctx context.Context, queue ContainerQueue, pool WorkerPool, staleLockTimeout, queueUpdateInterval time.Duration) *Scheduler {
// Use the logger carried by ctx.
51 logger: ctxlog.FromContext(ctx),
54 staleLockTimeout: staleLockTimeout,
55 queueUpdateInterval: queueUpdateInterval,
// Unbuffered signalling channels; run closes stopped when it returns.
56 stop: make(chan struct{}),
57 stopped: make(chan struct{}),
// Non-nil empty map, so entries can be written without a nil check.
58 locking: map[string]bool{},
62 // Start starts the scheduler.
//
// Start does not block: the work happens in a new goroutine. Going
// through runOnce.Do means that calling Start repeatedly does not
// execute run more than once (assuming runOnce is a sync.Once --
// confirm against the struct definition).
63 func (sch *Scheduler) Start() {
64 go sch.runOnce.Do(sch.run)
67 // Stop stops the scheduler. No other method should be called after
69 func (sch *Scheduler) Stop() {
// run is the function Start launches through runOnce. It fetches the
// queue, keeps it refreshed, subscribes to pool and queue
// notifications, and closes sch.stopped when it returns.
74 func (sch *Scheduler) run() {
// Closing stopped on the way out signals that run has finished.
75 defer close(sch.stopped)
77 // Ensure the queue is fetched once before attempting anything.
// Update is retried until it succeeds; every failure is logged.
78 for err := sch.queue.Update(); err != nil; err = sch.queue.Update() {
79 sch.logger.Errorf("error updating queue: %s", err)
// Retry delay starts from one tenth of the regular update interval.
80 d := sch.queueUpdateInterval / 10
84 sch.logger.Infof("waiting %s before retry", d)
88 // Keep the queue up to date.
// The ticker fires once per queueUpdateInterval.
89 poll := time.NewTicker(sch.queueUpdateInterval)
// A failed periodic update is reported through the logger.
93 err := sch.queue.Update()
95 sch.logger.Errorf("error updating queue: %s", err)
101 sch.logger.Infof("FixStaleLocks starting.")
// Report how long the stale-lock pass took, measured from t0.
103 sch.logger.Infof("FixStaleLocks finished (%s), starting scheduling.", time.Since(t0))
// Subscribe to pool notifications; the deferred call unsubscribes
// when run returns.
105 poolNotify := sch.pool.Subscribe()
106 defer sch.pool.Unsubscribe(poolNotify)
// Same arrangement for queue notifications.
108 queueNotify := sch.queue.Subscribe()
109 defer sch.queue.Unsubscribe(queueNotify)