Merge remote-tracking branch 'origin/master' into 14965-arv-mount-py-three
[arvados.git] / lib / dispatchcloud / scheduler / scheduler.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 // Package scheduler uses a resizable worker pool to execute
6 // containers in priority order.
7 package scheduler
8
9 import (
10         "context"
11         "sync"
12         "time"
13
14         "git.curoverse.com/arvados.git/sdk/go/ctxlog"
15         "github.com/sirupsen/logrus"
16 )
17
18 // A Scheduler maps queued containers onto unallocated workers in
19 // priority order, creating new workers if needed. It locks containers
20 // that can be mapped onto existing/pending workers, and starts them
21 // if possible.
22 //
23 // A Scheduler unlocks any containers that are locked but can't be
24 // mapped. (For example, this happens when the cloud provider reaches
25 // quota/capacity and a previously mappable container's priority is
26 // surpassed by a newer container.)
27 //
28 // If it encounters errors while creating new workers, a Scheduler
29 // shuts down idle workers, in case they are consuming quota.
30 type Scheduler struct {
31         logger              logrus.FieldLogger
32         queue               ContainerQueue
33         pool                WorkerPool
34         staleLockTimeout    time.Duration
35         queueUpdateInterval time.Duration
36
37         uuidOp map[string]string // operation in progress: "lock", "cancel", ...
38         mtx    sync.Mutex
39         wakeup *time.Timer
40
41         runOnce sync.Once
42         stop    chan struct{}
43         stopped chan struct{}
44 }
45
46 // New returns a new unstarted Scheduler.
47 //
48 // Any given queue and pool should not be used by more than one
49 // scheduler at a time.
50 func New(ctx context.Context, queue ContainerQueue, pool WorkerPool, staleLockTimeout, queueUpdateInterval time.Duration) *Scheduler {
51         return &Scheduler{
52                 logger:              ctxlog.FromContext(ctx),
53                 queue:               queue,
54                 pool:                pool,
55                 staleLockTimeout:    staleLockTimeout,
56                 queueUpdateInterval: queueUpdateInterval,
57                 wakeup:              time.NewTimer(time.Second),
58                 stop:                make(chan struct{}),
59                 stopped:             make(chan struct{}),
60                 uuidOp:              map[string]string{},
61         }
62 }
63
64 // Start starts the scheduler.
65 func (sch *Scheduler) Start() {
66         go sch.runOnce.Do(sch.run)
67 }
68
69 // Stop stops the scheduler. No other method should be called after
70 // Stop.
71 func (sch *Scheduler) Stop() {
72         close(sch.stop)
73         <-sch.stopped
74 }
75
76 func (sch *Scheduler) run() {
77         defer close(sch.stopped)
78
79         // Ensure the queue is fetched once before attempting anything.
80         for err := sch.queue.Update(); err != nil; err = sch.queue.Update() {
81                 sch.logger.Errorf("error updating queue: %s", err)
82                 d := sch.queueUpdateInterval / 10
83                 if d < time.Second {
84                         d = time.Second
85                 }
86                 sch.logger.Infof("waiting %s before retry", d)
87                 time.Sleep(d)
88         }
89
90         // Keep the queue up to date.
91         poll := time.NewTicker(sch.queueUpdateInterval)
92         defer poll.Stop()
93         go func() {
94                 for range poll.C {
95                         err := sch.queue.Update()
96                         if err != nil {
97                                 sch.logger.Errorf("error updating queue: %s", err)
98                         }
99                 }
100         }()
101
102         t0 := time.Now()
103         sch.logger.Infof("FixStaleLocks starting.")
104         sch.fixStaleLocks()
105         sch.logger.Infof("FixStaleLocks finished (%s), starting scheduling.", time.Since(t0))
106
107         poolNotify := sch.pool.Subscribe()
108         defer sch.pool.Unsubscribe(poolNotify)
109
110         queueNotify := sch.queue.Subscribe()
111         defer sch.queue.Unsubscribe(queueNotify)
112
113         for {
114                 sch.runQueue()
115                 sch.sync()
116                 select {
117                 case <-sch.stop:
118                         return
119                 case <-queueNotify:
120                 case <-poolNotify:
121                 case <-sch.wakeup.C:
122                 }
123         }
124 }