14807: Wait at least 1 second between retries on initial queue poll.
[arvados.git] / lib / dispatchcloud / scheduler / scheduler.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 // Package scheduler uses a resizable worker pool to execute
6 // containers in priority order.
7 package scheduler
8
9 import (
10         "sync"
11         "time"
12
13         "github.com/sirupsen/logrus"
14 )
15
16 // A Scheduler maps queued containers onto unallocated workers in
17 // priority order, creating new workers if needed. It locks containers
18 // that can be mapped onto existing/pending workers, and starts them
19 // if possible.
20 //
21 // A Scheduler unlocks any containers that are locked but can't be
22 // mapped. (For example, this happens when the cloud provider reaches
23 // quota/capacity and a previously mappable container's priority is
24 // surpassed by a newer container.)
25 //
26 // If it encounters errors while creating new workers, a Scheduler
27 // shuts down idle workers, in case they are consuming quota.
28 type Scheduler struct {
29         logger              logrus.FieldLogger
30         queue               ContainerQueue
31         pool                WorkerPool
32         staleLockTimeout    time.Duration
33         queueUpdateInterval time.Duration
34
35         locking map[string]bool
36         mtx     sync.Mutex
37
38         runOnce sync.Once
39         stop    chan struct{}
40         stopped chan struct{}
41 }
42
43 // New returns a new unstarted Scheduler.
44 //
45 // Any given queue and pool should not be used by more than one
46 // scheduler at a time.
47 func New(logger logrus.FieldLogger, queue ContainerQueue, pool WorkerPool, staleLockTimeout, queueUpdateInterval time.Duration) *Scheduler {
48         return &Scheduler{
49                 logger:              logger,
50                 queue:               queue,
51                 pool:                pool,
52                 staleLockTimeout:    staleLockTimeout,
53                 queueUpdateInterval: queueUpdateInterval,
54                 stop:                make(chan struct{}),
55                 stopped:             make(chan struct{}),
56                 locking:             map[string]bool{},
57         }
58 }
59
60 // Start starts the scheduler.
61 func (sch *Scheduler) Start() {
62         go sch.runOnce.Do(sch.run)
63 }
64
65 // Stop stops the scheduler. No other method should be called after
66 // Stop.
67 func (sch *Scheduler) Stop() {
68         close(sch.stop)
69         <-sch.stopped
70 }
71
72 func (sch *Scheduler) run() {
73         defer close(sch.stopped)
74
75         // Ensure the queue is fetched once before attempting anything.
76         for err := sch.queue.Update(); err != nil; err = sch.queue.Update() {
77                 sch.logger.Errorf("error updating queue: %s", err)
78                 d := sch.queueUpdateInterval / 10
79                 if d < time.Second {
80                         d = time.Second
81                 }
82                 sch.logger.Infof("waiting %s before retry", d)
83                 time.Sleep(d)
84         }
85
86         // Keep the queue up to date.
87         poll := time.NewTicker(sch.queueUpdateInterval)
88         defer poll.Stop()
89         go func() {
90                 for range poll.C {
91                         err := sch.queue.Update()
92                         if err != nil {
93                                 sch.logger.Errorf("error updating queue: %s", err)
94                         }
95                 }
96         }()
97
98         t0 := time.Now()
99         sch.logger.Infof("FixStaleLocks starting.")
100         sch.fixStaleLocks()
101         sch.logger.Infof("FixStaleLocks finished (%s), starting scheduling.", time.Since(t0))
102
103         poolNotify := sch.pool.Subscribe()
104         defer sch.pool.Unsubscribe(poolNotify)
105
106         queueNotify := sch.queue.Subscribe()
107         defer sch.queue.Unsubscribe(queueNotify)
108
109         for {
110                 sch.runQueue()
111                 sch.sync()
112                 select {
113                 case <-sch.stop:
114                         return
115                 case <-queueNotify:
116                 case <-poolNotify:
117                 }
118         }
119 }