Merge branch '16265-security-updates' into dependabot/bundler/apps/workbench/loofah...
[arvados.git] / lib / dispatchcloud / scheduler / run_queue_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package scheduler
6
7 import (
8         "context"
9         "sync"
10         "time"
11
12         "git.arvados.org/arvados.git/lib/dispatchcloud/test"
13         "git.arvados.org/arvados.git/lib/dispatchcloud/worker"
14         "git.arvados.org/arvados.git/sdk/go/arvados"
15         "git.arvados.org/arvados.git/sdk/go/ctxlog"
16         check "gopkg.in/check.v1"
17 )
18
19 var (
20         // arbitrary example container UUIDs
21         uuids = func() (r []string) {
22                 for i := 0; i < 16; i++ {
23                         r = append(r, test.ContainerUUID(i))
24                 }
25                 return
26         }()
27 )
28
// stubQuotaError wraps an error and marks it as a quota error by way of
// its IsQuotaError method.
type stubQuotaError struct{ error }

// IsQuotaError unconditionally reports true.
func (stubQuotaError) IsQuotaError() bool {
	return true
}
34
35 type stubPool struct {
36         notify    <-chan struct{}
37         unalloc   map[arvados.InstanceType]int // idle+booting+unknown
38         idle      map[arvados.InstanceType]int
39         running   map[string]time.Time
40         atQuota   bool
41         canCreate int
42         creates   []arvados.InstanceType
43         starts    []string
44         shutdowns int
45         sync.Mutex
46 }
47
48 func (p *stubPool) AtQuota() bool               { return p.atQuota }
49 func (p *stubPool) Subscribe() <-chan struct{}  { return p.notify }
50 func (p *stubPool) Unsubscribe(<-chan struct{}) {}
51 func (p *stubPool) Running() map[string]time.Time {
52         p.Lock()
53         defer p.Unlock()
54         r := map[string]time.Time{}
55         for k, v := range p.running {
56                 r[k] = v
57         }
58         return r
59 }
60 func (p *stubPool) Unallocated() map[arvados.InstanceType]int {
61         p.Lock()
62         defer p.Unlock()
63         r := map[arvados.InstanceType]int{}
64         for it, n := range p.unalloc {
65                 r[it] = n
66         }
67         return r
68 }
69 func (p *stubPool) Create(it arvados.InstanceType) bool {
70         p.Lock()
71         defer p.Unlock()
72         p.creates = append(p.creates, it)
73         if p.canCreate < 1 {
74                 return false
75         }
76         p.canCreate--
77         p.unalloc[it]++
78         return true
79 }
80 func (p *stubPool) ForgetContainer(uuid string) {
81 }
82 func (p *stubPool) KillContainer(uuid, reason string) bool {
83         p.Lock()
84         defer p.Unlock()
85         delete(p.running, uuid)
86         return true
87 }
88 func (p *stubPool) Shutdown(arvados.InstanceType) bool {
89         p.shutdowns++
90         return false
91 }
92 func (p *stubPool) CountWorkers() map[worker.State]int {
93         p.Lock()
94         defer p.Unlock()
95         return map[worker.State]int{
96                 worker.StateBooting: len(p.unalloc) - len(p.idle),
97                 worker.StateIdle:    len(p.idle),
98                 worker.StateRunning: len(p.running),
99         }
100 }
101 func (p *stubPool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
102         p.Lock()
103         defer p.Unlock()
104         p.starts = append(p.starts, ctr.UUID)
105         if p.idle[it] == 0 {
106                 return false
107         }
108         p.idle[it]--
109         p.unalloc[it]--
110         p.running[ctr.UUID] = time.Time{}
111         return true
112 }
113
114 func chooseType(ctr *arvados.Container) (arvados.InstanceType, error) {
115         return test.InstanceType(ctr.RuntimeConstraints.VCPUs), nil
116 }
117
118 var _ = check.Suite(&SchedulerSuite{})
119
120 type SchedulerSuite struct{}
121
122 // Assign priority=4 container to idle node. Create a new instance for
123 // the priority=3 container. Don't try to start any priority<3
124 // containers because priority=3 container didn't start
125 // immediately. Don't try to create any other nodes after the failed
126 // create.
127 func (*SchedulerSuite) TestUseIdleWorkers(c *check.C) {
128         ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
129         queue := test.Queue{
130                 ChooseType: chooseType,
131                 Containers: []arvados.Container{
132                         {
133                                 UUID:     test.ContainerUUID(1),
134                                 Priority: 1,
135                                 State:    arvados.ContainerStateLocked,
136                                 RuntimeConstraints: arvados.RuntimeConstraints{
137                                         VCPUs: 1,
138                                         RAM:   1 << 30,
139                                 },
140                         },
141                         {
142                                 UUID:     test.ContainerUUID(2),
143                                 Priority: 2,
144                                 State:    arvados.ContainerStateLocked,
145                                 RuntimeConstraints: arvados.RuntimeConstraints{
146                                         VCPUs: 1,
147                                         RAM:   1 << 30,
148                                 },
149                         },
150                         {
151                                 UUID:     test.ContainerUUID(3),
152                                 Priority: 3,
153                                 State:    arvados.ContainerStateLocked,
154                                 RuntimeConstraints: arvados.RuntimeConstraints{
155                                         VCPUs: 1,
156                                         RAM:   1 << 30,
157                                 },
158                         },
159                         {
160                                 UUID:     test.ContainerUUID(4),
161                                 Priority: 4,
162                                 State:    arvados.ContainerStateLocked,
163                                 RuntimeConstraints: arvados.RuntimeConstraints{
164                                         VCPUs: 1,
165                                         RAM:   1 << 30,
166                                 },
167                         },
168                 },
169         }
170         queue.Update()
171         pool := stubPool{
172                 unalloc: map[arvados.InstanceType]int{
173                         test.InstanceType(1): 1,
174                         test.InstanceType(2): 2,
175                 },
176                 idle: map[arvados.InstanceType]int{
177                         test.InstanceType(1): 1,
178                         test.InstanceType(2): 2,
179                 },
180                 running:   map[string]time.Time{},
181                 canCreate: 0,
182         }
183         New(ctx, &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
184         c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(1)})
185         c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(4)})
186         c.Check(pool.running, check.HasLen, 1)
187         for uuid := range pool.running {
188                 c.Check(uuid, check.Equals, uuids[4])
189         }
190 }
191
192 // If Create() fails, shutdown some nodes, and don't call Create()
193 // again.  Don't call Create() at all if AtQuota() is true.
194 func (*SchedulerSuite) TestShutdownAtQuota(c *check.C) {
195         ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
196         for quota := 0; quota < 2; quota++ {
197                 c.Logf("quota=%d", quota)
198                 shouldCreate := []arvados.InstanceType{}
199                 for i := 0; i < quota; i++ {
200                         shouldCreate = append(shouldCreate, test.InstanceType(3))
201                 }
202                 queue := test.Queue{
203                         ChooseType: chooseType,
204                         Containers: []arvados.Container{
205                                 {
206                                         UUID:     test.ContainerUUID(2),
207                                         Priority: 2,
208                                         State:    arvados.ContainerStateLocked,
209                                         RuntimeConstraints: arvados.RuntimeConstraints{
210                                                 VCPUs: 2,
211                                                 RAM:   2 << 30,
212                                         },
213                                 },
214                                 {
215                                         UUID:     test.ContainerUUID(3),
216                                         Priority: 3,
217                                         State:    arvados.ContainerStateLocked,
218                                         RuntimeConstraints: arvados.RuntimeConstraints{
219                                                 VCPUs: 3,
220                                                 RAM:   3 << 30,
221                                         },
222                                 },
223                         },
224                 }
225                 queue.Update()
226                 pool := stubPool{
227                         atQuota: quota == 0,
228                         unalloc: map[arvados.InstanceType]int{
229                                 test.InstanceType(2): 2,
230                         },
231                         idle: map[arvados.InstanceType]int{
232                                 test.InstanceType(2): 2,
233                         },
234                         running:   map[string]time.Time{},
235                         creates:   []arvados.InstanceType{},
236                         starts:    []string{},
237                         canCreate: 0,
238                 }
239                 New(ctx, &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
240                 c.Check(pool.creates, check.DeepEquals, shouldCreate)
241                 c.Check(pool.starts, check.DeepEquals, []string{})
242                 c.Check(pool.shutdowns, check.Not(check.Equals), 0)
243         }
244 }
245
246 // Start lower-priority containers while waiting for new/existing
247 // workers to come up for higher-priority containers.
248 func (*SchedulerSuite) TestStartWhileCreating(c *check.C) {
249         ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
250         pool := stubPool{
251                 unalloc: map[arvados.InstanceType]int{
252                         test.InstanceType(1): 2,
253                         test.InstanceType(2): 2,
254                 },
255                 idle: map[arvados.InstanceType]int{
256                         test.InstanceType(1): 1,
257                         test.InstanceType(2): 1,
258                 },
259                 running:   map[string]time.Time{},
260                 canCreate: 4,
261         }
262         queue := test.Queue{
263                 ChooseType: chooseType,
264                 Containers: []arvados.Container{
265                         {
266                                 // create a new worker
267                                 UUID:     test.ContainerUUID(1),
268                                 Priority: 1,
269                                 State:    arvados.ContainerStateLocked,
270                                 RuntimeConstraints: arvados.RuntimeConstraints{
271                                         VCPUs: 1,
272                                         RAM:   1 << 30,
273                                 },
274                         },
275                         {
276                                 // tentatively map to unalloc worker
277                                 UUID:     test.ContainerUUID(2),
278                                 Priority: 2,
279                                 State:    arvados.ContainerStateLocked,
280                                 RuntimeConstraints: arvados.RuntimeConstraints{
281                                         VCPUs: 1,
282                                         RAM:   1 << 30,
283                                 },
284                         },
285                         {
286                                 // start now on idle worker
287                                 UUID:     test.ContainerUUID(3),
288                                 Priority: 3,
289                                 State:    arvados.ContainerStateLocked,
290                                 RuntimeConstraints: arvados.RuntimeConstraints{
291                                         VCPUs: 1,
292                                         RAM:   1 << 30,
293                                 },
294                         },
295                         {
296                                 // create a new worker
297                                 UUID:     test.ContainerUUID(4),
298                                 Priority: 4,
299                                 State:    arvados.ContainerStateLocked,
300                                 RuntimeConstraints: arvados.RuntimeConstraints{
301                                         VCPUs: 2,
302                                         RAM:   2 << 30,
303                                 },
304                         },
305                         {
306                                 // tentatively map to unalloc worker
307                                 UUID:     test.ContainerUUID(5),
308                                 Priority: 5,
309                                 State:    arvados.ContainerStateLocked,
310                                 RuntimeConstraints: arvados.RuntimeConstraints{
311                                         VCPUs: 2,
312                                         RAM:   2 << 30,
313                                 },
314                         },
315                         {
316                                 // start now on idle worker
317                                 UUID:     test.ContainerUUID(6),
318                                 Priority: 6,
319                                 State:    arvados.ContainerStateLocked,
320                                 RuntimeConstraints: arvados.RuntimeConstraints{
321                                         VCPUs: 2,
322                                         RAM:   2 << 30,
323                                 },
324                         },
325                 },
326         }
327         queue.Update()
328         New(ctx, &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
329         c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(2), test.InstanceType(1)})
330         c.Check(pool.starts, check.DeepEquals, []string{uuids[6], uuids[5], uuids[3], uuids[2]})
331         running := map[string]bool{}
332         for uuid, t := range pool.running {
333                 if t.IsZero() {
334                         running[uuid] = false
335                 } else {
336                         running[uuid] = true
337                 }
338         }
339         c.Check(running, check.DeepEquals, map[string]bool{uuids[3]: false, uuids[6]: false})
340 }
341
342 func (*SchedulerSuite) TestKillNonexistentContainer(c *check.C) {
343         ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
344         pool := stubPool{
345                 unalloc: map[arvados.InstanceType]int{
346                         test.InstanceType(2): 0,
347                 },
348                 idle: map[arvados.InstanceType]int{
349                         test.InstanceType(2): 0,
350                 },
351                 running: map[string]time.Time{
352                         test.ContainerUUID(2): time.Time{},
353                 },
354         }
355         queue := test.Queue{
356                 ChooseType: chooseType,
357                 Containers: []arvados.Container{
358                         {
359                                 // create a new worker
360                                 UUID:     test.ContainerUUID(1),
361                                 Priority: 1,
362                                 State:    arvados.ContainerStateLocked,
363                                 RuntimeConstraints: arvados.RuntimeConstraints{
364                                         VCPUs: 1,
365                                         RAM:   1 << 30,
366                                 },
367                         },
368                 },
369         }
370         queue.Update()
371         sch := New(ctx, &queue, &pool, time.Millisecond, time.Millisecond)
372         c.Check(pool.running, check.HasLen, 1)
373         sch.sync()
374         for deadline := time.Now().Add(time.Second); len(pool.Running()) > 0 && time.Now().Before(deadline); time.Sleep(time.Millisecond) {
375         }
376         c.Check(pool.Running(), check.HasLen, 0)
377 }