14360: Rename Workers -> CountWorkers.
[arvados.git] / lib / dispatchcloud / scheduler / scheduler.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 // Package scheduler uses a resizable worker pool to execute
6 // containers in priority order.
7 package scheduler
8
9 import (
10         "sync"
11         "time"
12
13         "github.com/Sirupsen/logrus"
14 )
15
16 // A Scheduler maps queued containers onto unallocated workers in
17 // priority order, creating new workers if needed. It locks containers
18 // that can be mapped onto existing/pending workers, and starts them
19 // if possible.
20 //
21 // A Scheduler unlocks any containers that are locked but can't be
22 // mapped. (For example, this happens when the cloud provider reaches
23 // quota/capacity and a previously mappable container's priority is
24 // surpassed by a newer container.)
25 //
26 // If it encounters errors while creating new workers, a Scheduler
27 // shuts down idle workers, in case they are consuming quota.
28 type Scheduler struct {
29         logger              logrus.FieldLogger
30         queue               ContainerQueue
31         pool                WorkerPool
32         staleLockTimeout    time.Duration
33         queueUpdateInterval time.Duration
34
35         locking map[string]bool
36         mtx     sync.Mutex
37
38         runOnce sync.Once
39         stop    chan struct{}
40 }
41
42 // New returns a new unstarted Scheduler.
43 //
44 // Any given queue and pool should not be used by more than one
45 // scheduler at a time.
46 func New(logger logrus.FieldLogger, queue ContainerQueue, pool WorkerPool, staleLockTimeout, queueUpdateInterval time.Duration) *Scheduler {
47         return &Scheduler{
48                 logger:              logger,
49                 queue:               queue,
50                 pool:                pool,
51                 staleLockTimeout:    staleLockTimeout,
52                 queueUpdateInterval: queueUpdateInterval,
53                 stop:                make(chan struct{}),
54                 locking:             map[string]bool{},
55         }
56 }
57
58 // Start starts the scheduler.
59 func (sch *Scheduler) Start() {
60         go sch.runOnce.Do(sch.run)
61 }
62
63 // Stop stops the scheduler. No other method should be called after
64 // Stop.
65 func (sch *Scheduler) Stop() {
66         close(sch.stop)
67 }
68
69 func (sch *Scheduler) run() {
70         // Ensure the queue is fetched once before attempting anything.
71         for err := sch.queue.Update(); err != nil; err = sch.queue.Update() {
72                 sch.logger.Errorf("error updating queue: %s", err)
73                 d := sch.queueUpdateInterval / 60
74                 sch.logger.Infof("waiting %s before retry", d)
75                 time.Sleep(d)
76         }
77
78         // Keep the queue up to date.
79         poll := time.NewTicker(sch.queueUpdateInterval)
80         defer poll.Stop()
81         go func() {
82                 for range poll.C {
83                         err := sch.queue.Update()
84                         if err != nil {
85                                 sch.logger.Errorf("error updating queue: %s", err)
86                         }
87                 }
88         }()
89
90         t0 := time.Now()
91         sch.logger.Infof("FixStaleLocks starting.")
92         sch.fixStaleLocks()
93         sch.logger.Infof("FixStaleLocks finished (%s), starting scheduling.", time.Since(t0))
94
95         poolNotify := sch.pool.Subscribe()
96         defer sch.pool.Unsubscribe(poolNotify)
97
98         queueNotify := sch.queue.Subscribe()
99         defer sch.queue.Unsubscribe(queueNotify)
100
101         for {
102                 sch.runQueue()
103                 sch.sync()
104                 select {
105                 case <-sch.stop:
106                         return
107                 case <-queueNotify:
108                 case <-poolNotify:
109                 }
110         }
111 }