3971a5319d72135ca82d7f899a432ef8601fe677
[arvados.git] / lib / dispatchcloud / scheduler / scheduler.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 // Package scheduler uses a resizable worker pool to execute
6 // containers in priority order.
7 package scheduler
8
9 import (
10         "sync"
11         "time"
12
13         "github.com/Sirupsen/logrus"
14 )
15
16 // A Scheduler maps queued containers onto unallocated workers in
17 // priority order, creating new workers if needed. It locks containers
18 // that can be mapped onto existing/pending workers, and starts them
19 // if possible.
20 //
21 // A Scheduler unlocks any containers that are locked but can't be
22 // mapped. (For example, this happens when the cloud provider reaches
23 // quota/capacity and a previously mappable container's priority is
24 // surpassed by a newer container.)
25 //
26 // If it encounters errors while creating new workers, a Scheduler
27 // shuts down idle workers, in case they are consuming quota.
28 type Scheduler struct {
29         logger              logrus.FieldLogger
30         queue               ContainerQueue
31         pool                WorkerPool
32         staleLockTimeout    time.Duration
33         queueUpdateInterval time.Duration
34
35         locking map[string]bool
36         mtx     sync.Mutex
37
38         runOnce sync.Once
39         stop    chan struct{}
40         stopped chan struct{}
41 }
42
43 // New returns a new unstarted Scheduler.
44 //
45 // Any given queue and pool should not be used by more than one
46 // scheduler at a time.
47 func New(logger logrus.FieldLogger, queue ContainerQueue, pool WorkerPool, staleLockTimeout, queueUpdateInterval time.Duration) *Scheduler {
48         return &Scheduler{
49                 logger:              logger,
50                 queue:               queue,
51                 pool:                pool,
52                 staleLockTimeout:    staleLockTimeout,
53                 queueUpdateInterval: queueUpdateInterval,
54                 stop:                make(chan struct{}),
55                 stopped:             make(chan struct{}),
56                 locking:             map[string]bool{},
57         }
58 }
59
60 // Start starts the scheduler.
61 func (sch *Scheduler) Start() {
62         go sch.runOnce.Do(sch.run)
63 }
64
65 // Stop stops the scheduler. No other method should be called after
66 // Stop.
67 func (sch *Scheduler) Stop() {
68         close(sch.stop)
69         <-sch.stopped
70 }
71
72 func (sch *Scheduler) run() {
73         defer close(sch.stopped)
74
75         // Ensure the queue is fetched once before attempting anything.
76         for err := sch.queue.Update(); err != nil; err = sch.queue.Update() {
77                 sch.logger.Errorf("error updating queue: %s", err)
78                 d := sch.queueUpdateInterval / 60
79                 sch.logger.Infof("waiting %s before retry", d)
80                 time.Sleep(d)
81         }
82
83         // Keep the queue up to date.
84         poll := time.NewTicker(sch.queueUpdateInterval)
85         defer poll.Stop()
86         go func() {
87                 for range poll.C {
88                         err := sch.queue.Update()
89                         if err != nil {
90                                 sch.logger.Errorf("error updating queue: %s", err)
91                         }
92                 }
93         }()
94
95         t0 := time.Now()
96         sch.logger.Infof("FixStaleLocks starting.")
97         sch.fixStaleLocks()
98         sch.logger.Infof("FixStaleLocks finished (%s), starting scheduling.", time.Since(t0))
99
100         poolNotify := sch.pool.Subscribe()
101         defer sch.pool.Unsubscribe(poolNotify)
102
103         queueNotify := sch.queue.Subscribe()
104         defer sch.queue.Unsubscribe(queueNotify)
105
106         for {
107                 sch.runQueue()
108                 sch.sync()
109                 select {
110                 case <-sch.stop:
111                         return
112                 case <-queueNotify:
113                 case <-poolNotify:
114                 }
115         }
116 }