// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

// Package scheduler uses a resizable worker pool to execute
// containers in priority order.
package scheduler

import (
	"context"
	"sync"
	"time"

	"git.arvados.org/arvados.git/sdk/go/arvados"
	"git.arvados.org/arvados.git/sdk/go/ctxlog"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/sirupsen/logrus"
)

// A Scheduler maps queued containers onto unallocated workers in
// priority order, creating new workers if needed. It locks containers
// that can be mapped onto existing/pending workers, and starts them
// if possible.
//
// A Scheduler unlocks any containers that are locked but can't be
// mapped. (For example, this happens when the cloud provider reaches
// quota/capacity and a previously mappable container's priority is
// surpassed by a newer container.)
//
// If it encounters errors while creating new workers, a Scheduler
// shuts down idle workers, in case they are consuming quota.
type Scheduler struct {
	logger              logrus.FieldLogger
	client              *arvados.Client
	queue               ContainerQueue
	pool                WorkerPool
	reg                 *prometheus.Registry
	staleLockTimeout    time.Duration
	queueUpdateInterval time.Duration

	uuidOp map[string]string // operation in progress: "lock", "cancel", ...
	mtx    sync.Mutex
	wakeup *time.Timer

	runOnce sync.Once
	stop    chan struct{}
	stopped chan struct{}

	last503time    time.Time // last time API responded 503
	maxConcurrency int       // dynamic container limit (0 = unlimited), see runQueue()
	maxSupervisors int       // maximum number of "supervisor" containers (these are containers whose main job is to launch other containers, e.g. workflow runners)

	mContainersAllocatedNotStarted   prometheus.Gauge
	mContainersNotAllocatedOverQuota prometheus.Gauge
	mLongestWaitTimeSinceQueue       prometheus.Gauge
	mLast503Time                     prometheus.Gauge
	mMaxContainerConcurrency         prometheus.Gauge
}

// New returns a new unstarted Scheduler.
//
// Any given queue and pool should not be used by more than one
// scheduler at a time.
func New(ctx context.Context, client *arvados.Client, queue ContainerQueue, pool WorkerPool, reg *prometheus.Registry, staleLockTimeout, queueUpdateInterval time.Duration, maxSupervisors int) *Scheduler {
	sch := &Scheduler{
		logger:              ctxlog.FromContext(ctx),
		client:              client,
		queue:               queue,
		pool:                pool,
		reg:                 reg,
		staleLockTimeout:    staleLockTimeout,
		queueUpdateInterval: queueUpdateInterval,
		wakeup:              time.NewTimer(time.Second),
		stop:                make(chan struct{}),
		stopped:             make(chan struct{}),
		uuidOp:              map[string]string{},
		maxSupervisors:      maxSupervisors,
	}
	sch.registerMetrics(reg)
	return sch
}
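
// Illustration only: a dispatcher might construct and run a Scheduler
// roughly like this (the concrete queue and pool values, and the timeout,
// interval, and supervisor-limit numbers, are assumptions made for this
// sketch, not part of this package):
//
//	sch := New(ctx, client, queue, pool, prometheus.NewRegistry(),
//		time.Minute, 10*time.Second, 4)
//	sch.Start()
//	defer sch.Stop()
//
// Start runs the scheduling loop in a background goroutine; Stop shuts it
// down, and no other method should be called afterwards.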

func (sch *Scheduler) registerMetrics(reg *prometheus.Registry) {
	if reg == nil {
		reg = prometheus.NewRegistry()
	}
	sch.mContainersAllocatedNotStarted = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "containers_allocated_not_started",
		Help:      "Number of containers allocated to a worker but not started yet (worker is booting).",
	})
	reg.MustRegister(sch.mContainersAllocatedNotStarted)
	sch.mContainersNotAllocatedOverQuota = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "containers_not_allocated_over_quota",
		Help:      "Number of containers not allocated to a worker because the system has hit a quota.",
	})
	reg.MustRegister(sch.mContainersNotAllocatedOverQuota)
	sch.mLongestWaitTimeSinceQueue = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "containers_longest_wait_time_seconds",
		Help:      "Current longest wait time of any container since queuing, and before the start of crunch-run.",
	})
	reg.MustRegister(sch.mLongestWaitTimeSinceQueue)
	sch.mLast503Time = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "last_503_time",
		Help:      "Time of most recent 503 error received from API.",
	})
	reg.MustRegister(sch.mLast503Time)
	sch.mMaxContainerConcurrency = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "max_concurrent_containers",
		Help:      "Dynamically assigned limit on number of containers scheduled concurrently, set after receiving 503 errors from API.",
	})
	reg.MustRegister(sch.mMaxContainerConcurrency)
}

func (sch *Scheduler) updateMetrics() {
	earliest := time.Time{}
	entries, _ := sch.queue.Entries()
	running := sch.pool.Running()
	for _, ent := range entries {
		if ent.Container.Priority > 0 &&
			(ent.Container.State == arvados.ContainerStateQueued || ent.Container.State == arvados.ContainerStateLocked) {
			// Exclude containers that are preparing to run the payload (i.e.
			// ContainerStateLocked and running on a worker, most likely loading the
			// payload image).
			if _, ok := running[ent.Container.UUID]; !ok {
				if ent.Container.CreatedAt.Before(earliest) || earliest.IsZero() {
					earliest = ent.Container.CreatedAt
				}
			}
		}
	}
	if !earliest.IsZero() {
		sch.mLongestWaitTimeSinceQueue.Set(time.Since(earliest).Seconds())
	} else {
		sch.mLongestWaitTimeSinceQueue.Set(0)
	}
}

// Start starts the scheduler.
func (sch *Scheduler) Start() {
	go sch.runOnce.Do(sch.run)
}

// Stop stops the scheduler. No other method should be called after
// Stop.
func (sch *Scheduler) Stop() {
	close(sch.stop)
	<-sch.stopped
}

func (sch *Scheduler) run() {
	defer close(sch.stopped)

	// Ensure the queue is fetched once before attempting anything.
	for err := sch.queue.Update(); err != nil; err = sch.queue.Update() {
		sch.logger.Errorf("error updating queue: %s", err)
		d := sch.queueUpdateInterval / 10
		if d < time.Second {
			d = time.Second
		}
		sch.logger.Infof("waiting %s before retry", d)
		time.Sleep(d)
	}

	// Keep the queue up to date.
	poll := time.NewTicker(sch.queueUpdateInterval)
	defer poll.Stop()
	go func() {
		for range poll.C {
			err := sch.queue.Update()
			if err != nil {
				sch.logger.Errorf("error updating queue: %s", err)
			}
		}
	}()

	t0 := time.Now()
	sch.logger.Infof("FixStaleLocks starting.")
	sch.fixStaleLocks()
	sch.logger.Infof("FixStaleLocks finished (%s), starting scheduling.", time.Since(t0))

	poolNotify := sch.pool.Subscribe()
	defer sch.pool.Unsubscribe(poolNotify)

	queueNotify := sch.queue.Subscribe()
	defer sch.queue.Unsubscribe(queueNotify)

	// Run scheduling passes until Stop is called, waking up whenever the
	// queue or worker pool changes, or the wakeup timer fires.
	for {
		sch.runQueue()
		sch.sync()
		sch.updateMetrics()
		select {
		case <-sch.stop:
			return
		case <-queueNotify:
		case <-poolNotify:
		case <-sch.wakeup.C:
		}
	}
}