1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
5 // Package scheduler uses a resizable worker pool to execute
6 // containers in priority order.
14 "git.curoverse.com/arvados.git/sdk/go/ctxlog"
15 "github.com/sirupsen/logrus"
18 // A Scheduler maps queued containers onto unallocated workers in
19 // priority order, creating new workers if needed. It locks containers
20 // that can be mapped onto existing/pending workers, and starts them
23 // A Scheduler unlocks any containers that are locked but can't be
24 // mapped. (For example, this happens when the cloud provider reaches
25 // quota/capacity and a previously mappable container's priority is
26 // surpassed by a newer container.)
28 // If it encounters errors while creating new workers, a Scheduler
29 // shuts down idle workers, in case they are consuming quota.
30 type Scheduler struct {
31 logger logrus.FieldLogger
34 staleLockTimeout time.Duration
35 queueUpdateInterval time.Duration
37 uuidOp map[string]string // operation in progress: "lock", "cancel", ...
46 // New returns a new unstarted Scheduler.
48 // Any given queue and pool should not be used by more than one
49 // scheduler at a time.
50 func New(ctx context.Context, queue ContainerQueue, pool WorkerPool, staleLockTimeout, queueUpdateInterval time.Duration) *Scheduler {
52 logger: ctxlog.FromContext(ctx),
55 staleLockTimeout: staleLockTimeout,
56 queueUpdateInterval: queueUpdateInterval,
57 wakeup: time.NewTimer(time.Second),
58 stop: make(chan struct{}),
59 stopped: make(chan struct{}),
60 uuidOp: map[string]string{},
64 // Start starts the scheduler.
65 func (sch *Scheduler) Start() {
66 go sch.runOnce.Do(sch.run)
69 // Stop stops the scheduler. No other method should be called after
71 func (sch *Scheduler) Stop() {
76 func (sch *Scheduler) run() {
77 defer close(sch.stopped)
79 // Ensure the queue is fetched once before attempting anything.
80 for err := sch.queue.Update(); err != nil; err = sch.queue.Update() {
81 sch.logger.Errorf("error updating queue: %s", err)
82 d := sch.queueUpdateInterval / 10
86 sch.logger.Infof("waiting %s before retry", d)
90 // Keep the queue up to date.
91 poll := time.NewTicker(sch.queueUpdateInterval)
95 err := sch.queue.Update()
97 sch.logger.Errorf("error updating queue: %s", err)
103 sch.logger.Infof("FixStaleLocks starting.")
105 sch.logger.Infof("FixStaleLocks finished (%s), starting scheduling.", time.Since(t0))
107 poolNotify := sch.pool.Subscribe()
108 defer sch.pool.Unsubscribe(poolNotify)
110 queueNotify := sch.queue.Subscribe()
111 defer sch.queue.Unsubscribe(queueNotify)