14360: Encapsulate scheduler object.
[arvados.git] / lib / dispatchcloud / scheduler / scheduler.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 // Package scheduler uses a resizable worker pool to execute
6 // containers in priority order.
7 package scheduler
8
9 import (
10         "sync"
11         "time"
12
13         "github.com/Sirupsen/logrus"
14 )
15
16 // A Scheduler maps queued containers onto unallocated workers in
17 // priority order, creating new workers if needed. It locks containers
18 // that can be mapped onto existing/pending workers, and starts them
19 // if possible.
20 //
21 // A Scheduler unlocks any containers that are locked but can't be
22 // mapped. (For example, this happens when the cloud provider reaches
23 // quota/capacity and a previously mappable container's priority is
24 // surpassed by a newer container.)
25 //
26 // If it encounters errors while creating new workers, a Scheduler
27 // shuts down idle workers, in case they are consuming quota.
28 type Scheduler struct {
29         logger              logrus.FieldLogger
30         queue               ContainerQueue
31         pool                WorkerPool
32         staleLockTimeout    time.Duration
33         queueUpdateInterval time.Duration
34
35         runOnce sync.Once
36         stop    chan struct{}
37 }
38
39 // New returns a new unstarted Scheduler.
40 //
41 // Any given queue and pool should not be used by more than one
42 // scheduler at a time.
43 func New(logger logrus.FieldLogger, queue ContainerQueue, pool WorkerPool, staleLockTimeout, queueUpdateInterval time.Duration) *Scheduler {
44         return &Scheduler{
45                 logger:              logger,
46                 queue:               queue,
47                 pool:                pool,
48                 staleLockTimeout:    staleLockTimeout,
49                 queueUpdateInterval: queueUpdateInterval,
50                 stop:                make(chan struct{}),
51         }
52 }
53
54 // Start starts the scheduler.
55 func (sch *Scheduler) Start() {
56         go sch.runOnce.Do(sch.run)
57 }
58
59 // Stop stops the scheduler. No other method should be called after
60 // Stop.
61 func (sch *Scheduler) Stop() {
62         close(sch.stop)
63 }
64
65 func (sch *Scheduler) run() {
66         // Ensure the queue is fetched once before attempting anything.
67         for err := sch.queue.Update(); err != nil; err = sch.queue.Update() {
68                 sch.logger.Errorf("error updating queue: %s", err)
69                 d := sch.queueUpdateInterval / 60
70                 sch.logger.Infof("waiting %s before retry", d)
71                 time.Sleep(d)
72         }
73
74         // Keep the queue up to date.
75         poll := time.NewTicker(sch.queueUpdateInterval)
76         defer poll.Stop()
77         go func() {
78                 for range poll.C {
79                         err := sch.queue.Update()
80                         if err != nil {
81                                 sch.logger.Errorf("error updating queue: %s", err)
82                         }
83                 }
84         }()
85
86         t0 := time.Now()
87         sch.logger.Infof("FixStaleLocks starting.")
88         sch.fixStaleLocks()
89         sch.logger.Infof("FixStaleLocks finished (%s), starting scheduling.", time.Since(t0))
90
91         poolNotify := sch.pool.Subscribe()
92         defer sch.pool.Unsubscribe(poolNotify)
93
94         queueNotify := sch.queue.Subscribe()
95         defer sch.queue.Unsubscribe(queueNotify)
96
97         for {
98                 sch.runQueue()
99                 sch.sync()
100                 select {
101                 case <-sch.stop:
102                         return
103                 case <-queueNotify:
104                 case <-poolNotify:
105                 }
106         }
107 }