1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
5 // Package scheduler uses a resizable worker pool to execute
6 // containers in priority order.
14 "git.arvados.org/arvados.git/sdk/go/ctxlog"
15 "github.com/prometheus/client_golang/prometheus"
16 "github.com/sirupsen/logrus"
19 // A Scheduler maps queued containers onto unallocated workers in
20 // priority order, creating new workers if needed. It locks containers
21 // that can be mapped onto existing/pending workers, and starts them
24 // A Scheduler unlocks any containers that are locked but can't be
25 // mapped. (For example, this happens when the cloud provider reaches
26 // quota/capacity and a previously mappable container's priority is
27 // surpassed by a newer container.)
29 // If it encounters errors while creating new workers, a Scheduler
30 // shuts down idle workers, in case they are consuming quota.
31 type Scheduler struct {
// logger receives the scheduler's error and info messages; New
// obtains it from the context via ctxlog.FromContext.
32 logger logrus.FieldLogger
// reg has the same type as the registry argument accepted by New and
// registerMetrics. NOTE(review): its assignment is not visible in
// this excerpt; confirm it holds the registry passed to New.
35 reg *prometheus.Registry
// staleLockTimeout is copied from New's argument of the same name.
// NOTE(review): presumably the age limit used by the FixStaleLocks
// step that run logs; confirm.
36 staleLockTimeout time.Duration
// queueUpdateInterval is the period of the queue-polling ticker in
// run; one tenth of it is the starting value for the delay before
// retrying the initial queue fetch.
37 queueUpdateInterval time.Duration
// uuidOp starts out empty in New. NOTE(review): keys are presumably
// container UUIDs; confirm.
39 uuidOp map[string]string // operation in progress: "lock", "cancel", ...
// Gauges created and registered by registerMetrics.
//
// mContainersAllocatedNotStarted: containers allocated to a worker
// that is still booting, so the container has not started yet.
47 mContainersAllocatedNotStarted prometheus.Gauge
// mContainersNotAllocatedOverQuota: containers not allocated to a
// worker because a quota has been hit.
48 mContainersNotAllocatedOverQuota prometheus.Gauge
51 // New returns a new unstarted Scheduler.
53 // Any given queue and pool should not be used by more than one
54 // scheduler at a time.
55 func New(ctx context.Context, queue ContainerQueue, pool WorkerPool, reg *prometheus.Registry, staleLockTimeout, queueUpdateInterval time.Duration) *Scheduler {
// The logger is obtained from ctx by ctxlog.FromContext.
57 logger: ctxlog.FromContext(ctx),
// Both durations are stored unmodified.
61 staleLockTimeout: staleLockTimeout,
62 queueUpdateInterval: queueUpdateInterval,
// The wakeup timer is created with an initial one-second duration.
63 wakeup: time.NewTimer(time.Second),
// Unbuffered signalling channels. run closes stopped when it
// returns. NOTE(review): stop is presumably closed by Stop to
// request shutdown; confirm.
64 stop: make(chan struct{}),
65 stopped: make(chan struct{}),
// No operations are in progress yet.
66 uuidOp: map[string]string{},
// Create the scheduler's gauges and register them, passing reg along.
68 sch.registerMetrics(reg)
// registerMetrics creates the scheduler's two gauges, stores them on
// sch, and registers each one with the registry via MustRegister
// (which panics if registration fails).
72 func (sch *Scheduler) registerMetrics(reg *prometheus.Registry) {
// NOTE(review): the condition guarding this assignment is outside the
// visible lines; presumably a nil reg is replaced with a private
// registry so the MustRegister calls below always have a target.
// Confirm.
74 reg = prometheus.NewRegistry()
// Gauge: containers allocated to a worker that is still booting.
76 sch.mContainersAllocatedNotStarted = prometheus.NewGauge(prometheus.GaugeOpts{
78 Subsystem: "dispatchcloud",
79 Name: "containers_allocated_not_started",
80 Help: "Number of containers allocated to a worker but not started yet (worker is booting).",
82 reg.MustRegister(sch.mContainersAllocatedNotStarted)
// Gauge: containers left unallocated because a quota has been hit.
83 sch.mContainersNotAllocatedOverQuota = prometheus.NewGauge(prometheus.GaugeOpts{
85 Subsystem: "dispatchcloud",
86 Name: "containers_not_allocated_over_quota",
87 Help: "Number of containers not allocated to a worker because the system has hit a quota.",
89 reg.MustRegister(sch.mContainersNotAllocatedOverQuota)
92 // Start starts the scheduler.
93 func (sch *Scheduler) Start() {
// run is invoked via runOnce.Do on a new goroutine, so Start itself
// does not wait for the scheduling loop. NOTE(review): runOnce is
// presumably a sync.Once (its declaration is not visible here), which
// would make repeated Start calls launch run only once. Confirm.
94 go sch.runOnce.Do(sch.run)
97 // Stop stops the scheduler. No other method should be called after
99 func (sch *Scheduler) Stop() {
104 func (sch *Scheduler) run() {
105 defer close(sch.stopped)
107 // Ensure the queue is fetched once before attempting anything.
108 for err := sch.queue.Update(); err != nil; err = sch.queue.Update() {
109 sch.logger.Errorf("error updating queue: %s", err)
110 d := sch.queueUpdateInterval / 10
114 sch.logger.Infof("waiting %s before retry", d)
118 // Keep the queue up to date.
119 poll := time.NewTicker(sch.queueUpdateInterval)
123 err := sch.queue.Update()
125 sch.logger.Errorf("error updating queue: %s", err)
131 sch.logger.Infof("FixStaleLocks starting.")
133 sch.logger.Infof("FixStaleLocks finished (%s), starting scheduling.", time.Since(t0))
135 poolNotify := sch.pool.Subscribe()
136 defer sch.pool.Unsubscribe(poolNotify)
138 queueNotify := sch.queue.Subscribe()
139 defer sch.queue.Unsubscribe(queueNotify)