14807: Merge branch 'master'
[arvados.git] / lib / dispatchcloud / scheduler / scheduler.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 // Package scheduler uses a resizable worker pool to execute
6 // containers in priority order.
7 package scheduler
8
9 import (
10         "context"
11         "sync"
12         "time"
13
14         "git.curoverse.com/arvados.git/sdk/go/ctxlog"
15         "github.com/sirupsen/logrus"
16 )
17
18 // A Scheduler maps queued containers onto unallocated workers in
19 // priority order, creating new workers if needed. It locks containers
20 // that can be mapped onto existing/pending workers, and starts them
21 // if possible.
22 //
23 // A Scheduler unlocks any containers that are locked but can't be
24 // mapped. (For example, this happens when the cloud provider reaches
25 // quota/capacity and a previously mappable container's priority is
26 // surpassed by a newer container.)
27 //
28 // If it encounters errors while creating new workers, a Scheduler
29 // shuts down idle workers, in case they are consuming quota.
30 type Scheduler struct {
31         logger              logrus.FieldLogger
32         queue               ContainerQueue
33         pool                WorkerPool
34         staleLockTimeout    time.Duration
35         queueUpdateInterval time.Duration
36
37         locking map[string]bool
38         mtx     sync.Mutex
39
40         runOnce sync.Once
41         stop    chan struct{}
42         stopped chan struct{}
43 }
44
45 // New returns a new unstarted Scheduler.
46 //
47 // Any given queue and pool should not be used by more than one
48 // scheduler at a time.
49 func New(ctx context.Context, queue ContainerQueue, pool WorkerPool, staleLockTimeout, queueUpdateInterval time.Duration) *Scheduler {
50         return &Scheduler{
51                 logger:              ctxlog.FromContext(ctx),
52                 queue:               queue,
53                 pool:                pool,
54                 staleLockTimeout:    staleLockTimeout,
55                 queueUpdateInterval: queueUpdateInterval,
56                 stop:                make(chan struct{}),
57                 stopped:             make(chan struct{}),
58                 locking:             map[string]bool{},
59         }
60 }
61
62 // Start starts the scheduler.
63 func (sch *Scheduler) Start() {
64         go sch.runOnce.Do(sch.run)
65 }
66
67 // Stop stops the scheduler. No other method should be called after
68 // Stop.
69 func (sch *Scheduler) Stop() {
70         close(sch.stop)
71         <-sch.stopped
72 }
73
74 func (sch *Scheduler) run() {
75         defer close(sch.stopped)
76
77         // Ensure the queue is fetched once before attempting anything.
78         for err := sch.queue.Update(); err != nil; err = sch.queue.Update() {
79                 sch.logger.Errorf("error updating queue: %s", err)
80                 d := sch.queueUpdateInterval / 10
81                 if d < time.Second {
82                         d = time.Second
83                 }
84                 sch.logger.Infof("waiting %s before retry", d)
85                 time.Sleep(d)
86         }
87
88         // Keep the queue up to date.
89         poll := time.NewTicker(sch.queueUpdateInterval)
90         defer poll.Stop()
91         go func() {
92                 for range poll.C {
93                         err := sch.queue.Update()
94                         if err != nil {
95                                 sch.logger.Errorf("error updating queue: %s", err)
96                         }
97                 }
98         }()
99
100         t0 := time.Now()
101         sch.logger.Infof("FixStaleLocks starting.")
102         sch.fixStaleLocks()
103         sch.logger.Infof("FixStaleLocks finished (%s), starting scheduling.", time.Since(t0))
104
105         poolNotify := sch.pool.Subscribe()
106         defer sch.pool.Unsubscribe(poolNotify)
107
108         queueNotify := sch.queue.Subscribe()
109         defer sch.queue.Unsubscribe(queueNotify)
110
111         for {
112                 sch.runQueue()
113                 sch.sync()
114                 select {
115                 case <-sch.stop:
116                         return
117                 case <-queueNotify:
118                 case <-poolNotify:
119                 }
120         }
121 }