14325: Merge branch 'master'
[arvados.git] / lib / dispatchcloud / scheduler / run_queue_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package scheduler
6
7 import (
8         "sync"
9         "time"
10
11         "git.curoverse.com/arvados.git/lib/dispatchcloud/test"
12         "git.curoverse.com/arvados.git/lib/dispatchcloud/worker"
13         "git.curoverse.com/arvados.git/sdk/go/arvados"
14         check "gopkg.in/check.v1"
15 )
16
17 var (
18         // arbitrary example container UUIDs
19         uuids = func() (r []string) {
20                 for i := 0; i < 16; i++ {
21                         r = append(r, test.ContainerUUID(i))
22                 }
23                 return
24         }()
25 )
26
// stubQuotaError is an error value that identifies itself as a quota
// error through its IsQuotaError method. The embedded error supplies
// the message.
type stubQuotaError struct {
	error
}

// IsQuotaError unconditionally reports true.
func (stubQuotaError) IsQuotaError() bool {
	return true
}
32
33 type stubPool struct {
34         notify    <-chan struct{}
35         unalloc   map[arvados.InstanceType]int // idle+booting+unknown
36         idle      map[arvados.InstanceType]int
37         running   map[string]time.Time
38         atQuota   bool
39         canCreate int
40         creates   []arvados.InstanceType
41         starts    []string
42         shutdowns int
43         sync.Mutex
44 }
45
46 func (p *stubPool) AtQuota() bool               { return p.atQuota }
47 func (p *stubPool) Subscribe() <-chan struct{}  { return p.notify }
48 func (p *stubPool) Unsubscribe(<-chan struct{}) {}
49 func (p *stubPool) Running() map[string]time.Time {
50         p.Lock()
51         defer p.Unlock()
52         r := map[string]time.Time{}
53         for k, v := range p.running {
54                 r[k] = v
55         }
56         return r
57 }
58 func (p *stubPool) Unallocated() map[arvados.InstanceType]int {
59         p.Lock()
60         defer p.Unlock()
61         r := map[arvados.InstanceType]int{}
62         for it, n := range p.unalloc {
63                 r[it] = n
64         }
65         return r
66 }
67 func (p *stubPool) Create(it arvados.InstanceType) bool {
68         p.Lock()
69         defer p.Unlock()
70         p.creates = append(p.creates, it)
71         if p.canCreate < 1 {
72                 return false
73         }
74         p.canCreate--
75         p.unalloc[it]++
76         return true
77 }
78 func (p *stubPool) KillContainer(uuid string) {
79         p.Lock()
80         defer p.Unlock()
81         delete(p.running, uuid)
82 }
83 func (p *stubPool) Shutdown(arvados.InstanceType) bool {
84         p.shutdowns++
85         return false
86 }
87 func (p *stubPool) CountWorkers() map[worker.State]int {
88         p.Lock()
89         defer p.Unlock()
90         return map[worker.State]int{
91                 worker.StateBooting: len(p.unalloc) - len(p.idle),
92                 worker.StateIdle:    len(p.idle),
93                 worker.StateRunning: len(p.running),
94         }
95 }
96 func (p *stubPool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
97         p.Lock()
98         defer p.Unlock()
99         p.starts = append(p.starts, ctr.UUID)
100         if p.idle[it] == 0 {
101                 return false
102         }
103         p.idle[it]--
104         p.unalloc[it]--
105         p.running[ctr.UUID] = time.Time{}
106         return true
107 }
108
109 func chooseType(ctr *arvados.Container) (arvados.InstanceType, error) {
110         return test.InstanceType(ctr.RuntimeConstraints.VCPUs), nil
111 }
112
113 var _ = check.Suite(&SchedulerSuite{})
114
115 type SchedulerSuite struct{}
116
117 // Assign priority=4 container to idle node. Create a new instance for
118 // the priority=3 container. Don't try to start any priority<3
119 // containers because priority=3 container didn't start
120 // immediately. Don't try to create any other nodes after the failed
121 // create.
122 func (*SchedulerSuite) TestUseIdleWorkers(c *check.C) {
123         queue := test.Queue{
124                 ChooseType: chooseType,
125                 Containers: []arvados.Container{
126                         {
127                                 UUID:     test.ContainerUUID(1),
128                                 Priority: 1,
129                                 State:    arvados.ContainerStateLocked,
130                                 RuntimeConstraints: arvados.RuntimeConstraints{
131                                         VCPUs: 1,
132                                         RAM:   1 << 30,
133                                 },
134                         },
135                         {
136                                 UUID:     test.ContainerUUID(2),
137                                 Priority: 2,
138                                 State:    arvados.ContainerStateLocked,
139                                 RuntimeConstraints: arvados.RuntimeConstraints{
140                                         VCPUs: 1,
141                                         RAM:   1 << 30,
142                                 },
143                         },
144                         {
145                                 UUID:     test.ContainerUUID(3),
146                                 Priority: 3,
147                                 State:    arvados.ContainerStateLocked,
148                                 RuntimeConstraints: arvados.RuntimeConstraints{
149                                         VCPUs: 1,
150                                         RAM:   1 << 30,
151                                 },
152                         },
153                         {
154                                 UUID:     test.ContainerUUID(4),
155                                 Priority: 4,
156                                 State:    arvados.ContainerStateLocked,
157                                 RuntimeConstraints: arvados.RuntimeConstraints{
158                                         VCPUs: 1,
159                                         RAM:   1 << 30,
160                                 },
161                         },
162                 },
163         }
164         queue.Update()
165         pool := stubPool{
166                 unalloc: map[arvados.InstanceType]int{
167                         test.InstanceType(1): 1,
168                         test.InstanceType(2): 2,
169                 },
170                 idle: map[arvados.InstanceType]int{
171                         test.InstanceType(1): 1,
172                         test.InstanceType(2): 2,
173                 },
174                 running:   map[string]time.Time{},
175                 canCreate: 0,
176         }
177         New(test.Logger(), &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
178         c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(1)})
179         c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(4)})
180         c.Check(pool.running, check.HasLen, 1)
181         for uuid := range pool.running {
182                 c.Check(uuid, check.Equals, uuids[4])
183         }
184 }
185
186 // If Create() fails, shutdown some nodes, and don't call Create()
187 // again.  Don't call Create() at all if AtQuota() is true.
188 func (*SchedulerSuite) TestShutdownAtQuota(c *check.C) {
189         for quota := 0; quota < 2; quota++ {
190                 c.Logf("quota=%d", quota)
191                 shouldCreate := []arvados.InstanceType{}
192                 for i := 0; i < quota; i++ {
193                         shouldCreate = append(shouldCreate, test.InstanceType(3))
194                 }
195                 queue := test.Queue{
196                         ChooseType: chooseType,
197                         Containers: []arvados.Container{
198                                 {
199                                         UUID:     test.ContainerUUID(2),
200                                         Priority: 2,
201                                         State:    arvados.ContainerStateLocked,
202                                         RuntimeConstraints: arvados.RuntimeConstraints{
203                                                 VCPUs: 2,
204                                                 RAM:   2 << 30,
205                                         },
206                                 },
207                                 {
208                                         UUID:     test.ContainerUUID(3),
209                                         Priority: 3,
210                                         State:    arvados.ContainerStateLocked,
211                                         RuntimeConstraints: arvados.RuntimeConstraints{
212                                                 VCPUs: 3,
213                                                 RAM:   3 << 30,
214                                         },
215                                 },
216                         },
217                 }
218                 queue.Update()
219                 pool := stubPool{
220                         atQuota: quota == 0,
221                         unalloc: map[arvados.InstanceType]int{
222                                 test.InstanceType(2): 2,
223                         },
224                         idle: map[arvados.InstanceType]int{
225                                 test.InstanceType(2): 2,
226                         },
227                         running:   map[string]time.Time{},
228                         creates:   []arvados.InstanceType{},
229                         starts:    []string{},
230                         canCreate: 0,
231                 }
232                 New(test.Logger(), &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
233                 c.Check(pool.creates, check.DeepEquals, shouldCreate)
234                 c.Check(pool.starts, check.DeepEquals, []string{})
235                 c.Check(pool.shutdowns, check.Not(check.Equals), 0)
236         }
237 }
238
239 // Start lower-priority containers while waiting for new/existing
240 // workers to come up for higher-priority containers.
241 func (*SchedulerSuite) TestStartWhileCreating(c *check.C) {
242         pool := stubPool{
243                 unalloc: map[arvados.InstanceType]int{
244                         test.InstanceType(1): 2,
245                         test.InstanceType(2): 2,
246                 },
247                 idle: map[arvados.InstanceType]int{
248                         test.InstanceType(1): 1,
249                         test.InstanceType(2): 1,
250                 },
251                 running:   map[string]time.Time{},
252                 canCreate: 4,
253         }
254         queue := test.Queue{
255                 ChooseType: chooseType,
256                 Containers: []arvados.Container{
257                         {
258                                 // create a new worker
259                                 UUID:     test.ContainerUUID(1),
260                                 Priority: 1,
261                                 State:    arvados.ContainerStateLocked,
262                                 RuntimeConstraints: arvados.RuntimeConstraints{
263                                         VCPUs: 1,
264                                         RAM:   1 << 30,
265                                 },
266                         },
267                         {
268                                 // tentatively map to unalloc worker
269                                 UUID:     test.ContainerUUID(2),
270                                 Priority: 2,
271                                 State:    arvados.ContainerStateLocked,
272                                 RuntimeConstraints: arvados.RuntimeConstraints{
273                                         VCPUs: 1,
274                                         RAM:   1 << 30,
275                                 },
276                         },
277                         {
278                                 // start now on idle worker
279                                 UUID:     test.ContainerUUID(3),
280                                 Priority: 3,
281                                 State:    arvados.ContainerStateLocked,
282                                 RuntimeConstraints: arvados.RuntimeConstraints{
283                                         VCPUs: 1,
284                                         RAM:   1 << 30,
285                                 },
286                         },
287                         {
288                                 // create a new worker
289                                 UUID:     test.ContainerUUID(4),
290                                 Priority: 4,
291                                 State:    arvados.ContainerStateLocked,
292                                 RuntimeConstraints: arvados.RuntimeConstraints{
293                                         VCPUs: 2,
294                                         RAM:   2 << 30,
295                                 },
296                         },
297                         {
298                                 // tentatively map to unalloc worker
299                                 UUID:     test.ContainerUUID(5),
300                                 Priority: 5,
301                                 State:    arvados.ContainerStateLocked,
302                                 RuntimeConstraints: arvados.RuntimeConstraints{
303                                         VCPUs: 2,
304                                         RAM:   2 << 30,
305                                 },
306                         },
307                         {
308                                 // start now on idle worker
309                                 UUID:     test.ContainerUUID(6),
310                                 Priority: 6,
311                                 State:    arvados.ContainerStateLocked,
312                                 RuntimeConstraints: arvados.RuntimeConstraints{
313                                         VCPUs: 2,
314                                         RAM:   2 << 30,
315                                 },
316                         },
317                 },
318         }
319         queue.Update()
320         New(test.Logger(), &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
321         c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(2), test.InstanceType(1)})
322         c.Check(pool.starts, check.DeepEquals, []string{uuids[6], uuids[5], uuids[3], uuids[2]})
323         running := map[string]bool{}
324         for uuid, t := range pool.running {
325                 if t.IsZero() {
326                         running[uuid] = false
327                 } else {
328                         running[uuid] = true
329                 }
330         }
331         c.Check(running, check.DeepEquals, map[string]bool{uuids[3]: false, uuids[6]: false})
332 }