Merge branch '15964-fix-docs' refs #15964
[arvados.git] / lib / dispatchcloud / scheduler / run_queue_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package scheduler
6
7 import (
8         "context"
9         "sync"
10         "time"
11
12         "git.arvados.org/arvados.git/lib/dispatchcloud/test"
13         "git.arvados.org/arvados.git/lib/dispatchcloud/worker"
14         "git.arvados.org/arvados.git/sdk/go/arvados"
15         "git.arvados.org/arvados.git/sdk/go/ctxlog"
16         check "gopkg.in/check.v1"
17 )
18
19 var (
20         // arbitrary example container UUIDs
21         uuids = func() (r []string) {
22                 for i := 0; i < 16; i++ {
23                         r = append(r, test.ContainerUUID(i))
24                 }
25                 return
26         }()
27 )
28
// stubQuotaError decorates an arbitrary error so that it also
// identifies itself as a quota error. The wrapped error's Error
// method is promoted through the embedded field.
type stubQuotaError struct{ error }

// IsQuotaError unconditionally reports true, regardless of the
// wrapped error (which may even be nil).
func (stubQuotaError) IsQuotaError() bool {
	return true
}
34
35 type stubPool struct {
36         notify    <-chan struct{}
37         unalloc   map[arvados.InstanceType]int // idle+booting+unknown
38         idle      map[arvados.InstanceType]int
39         unknown   map[arvados.InstanceType]int
40         running   map[string]time.Time
41         atQuota   bool
42         canCreate int
43         creates   []arvados.InstanceType
44         starts    []string
45         shutdowns int
46         sync.Mutex
47 }
48
49 func (p *stubPool) AtQuota() bool               { return p.atQuota }
50 func (p *stubPool) Subscribe() <-chan struct{}  { return p.notify }
51 func (p *stubPool) Unsubscribe(<-chan struct{}) {}
52 func (p *stubPool) Running() map[string]time.Time {
53         p.Lock()
54         defer p.Unlock()
55         r := map[string]time.Time{}
56         for k, v := range p.running {
57                 r[k] = v
58         }
59         return r
60 }
61 func (p *stubPool) Unallocated() map[arvados.InstanceType]int {
62         p.Lock()
63         defer p.Unlock()
64         r := map[arvados.InstanceType]int{}
65         for it, n := range p.unalloc {
66                 r[it] = n - p.unknown[it]
67         }
68         return r
69 }
70 func (p *stubPool) Create(it arvados.InstanceType) bool {
71         p.Lock()
72         defer p.Unlock()
73         p.creates = append(p.creates, it)
74         if p.canCreate < 1 {
75                 return false
76         }
77         p.canCreate--
78         p.unalloc[it]++
79         return true
80 }
81 func (p *stubPool) ForgetContainer(uuid string) {
82 }
83 func (p *stubPool) KillContainer(uuid, reason string) bool {
84         p.Lock()
85         defer p.Unlock()
86         defer delete(p.running, uuid)
87         t, ok := p.running[uuid]
88         return ok && t.IsZero()
89 }
90 func (p *stubPool) Shutdown(arvados.InstanceType) bool {
91         p.shutdowns++
92         return false
93 }
94 func (p *stubPool) CountWorkers() map[worker.State]int {
95         p.Lock()
96         defer p.Unlock()
97         return map[worker.State]int{
98                 worker.StateBooting: len(p.unalloc) - len(p.idle),
99                 worker.StateIdle:    len(p.idle),
100                 worker.StateRunning: len(p.running),
101                 worker.StateUnknown: len(p.unknown),
102         }
103 }
104 func (p *stubPool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
105         p.Lock()
106         defer p.Unlock()
107         p.starts = append(p.starts, ctr.UUID)
108         if p.idle[it] == 0 {
109                 return false
110         }
111         p.idle[it]--
112         p.unalloc[it]--
113         p.running[ctr.UUID] = time.Time{}
114         return true
115 }
116
117 func chooseType(ctr *arvados.Container) (arvados.InstanceType, error) {
118         return test.InstanceType(ctr.RuntimeConstraints.VCPUs), nil
119 }
120
121 var _ = check.Suite(&SchedulerSuite{})
122
123 type SchedulerSuite struct{}
124
125 // Assign priority=4 container to idle node. Create a new instance for
126 // the priority=3 container. Don't try to start any priority<3
127 // containers because priority=3 container didn't start
128 // immediately. Don't try to create any other nodes after the failed
129 // create.
130 func (*SchedulerSuite) TestUseIdleWorkers(c *check.C) {
131         ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
132         queue := test.Queue{
133                 ChooseType: chooseType,
134                 Containers: []arvados.Container{
135                         {
136                                 UUID:     test.ContainerUUID(1),
137                                 Priority: 1,
138                                 State:    arvados.ContainerStateLocked,
139                                 RuntimeConstraints: arvados.RuntimeConstraints{
140                                         VCPUs: 1,
141                                         RAM:   1 << 30,
142                                 },
143                         },
144                         {
145                                 UUID:     test.ContainerUUID(2),
146                                 Priority: 2,
147                                 State:    arvados.ContainerStateLocked,
148                                 RuntimeConstraints: arvados.RuntimeConstraints{
149                                         VCPUs: 1,
150                                         RAM:   1 << 30,
151                                 },
152                         },
153                         {
154                                 UUID:     test.ContainerUUID(3),
155                                 Priority: 3,
156                                 State:    arvados.ContainerStateLocked,
157                                 RuntimeConstraints: arvados.RuntimeConstraints{
158                                         VCPUs: 1,
159                                         RAM:   1 << 30,
160                                 },
161                         },
162                         {
163                                 UUID:     test.ContainerUUID(4),
164                                 Priority: 4,
165                                 State:    arvados.ContainerStateLocked,
166                                 RuntimeConstraints: arvados.RuntimeConstraints{
167                                         VCPUs: 1,
168                                         RAM:   1 << 30,
169                                 },
170                         },
171                 },
172         }
173         queue.Update()
174         pool := stubPool{
175                 unalloc: map[arvados.InstanceType]int{
176                         test.InstanceType(1): 1,
177                         test.InstanceType(2): 2,
178                 },
179                 idle: map[arvados.InstanceType]int{
180                         test.InstanceType(1): 1,
181                         test.InstanceType(2): 2,
182                 },
183                 running:   map[string]time.Time{},
184                 canCreate: 0,
185         }
186         New(ctx, &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
187         c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(1)})
188         c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(4)})
189         c.Check(pool.running, check.HasLen, 1)
190         for uuid := range pool.running {
191                 c.Check(uuid, check.Equals, uuids[4])
192         }
193 }
194
195 // If Create() fails, shutdown some nodes, and don't call Create()
196 // again.  Don't call Create() at all if AtQuota() is true.
197 func (*SchedulerSuite) TestShutdownAtQuota(c *check.C) {
198         ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
199         for quota := 0; quota < 2; quota++ {
200                 c.Logf("quota=%d", quota)
201                 shouldCreate := []arvados.InstanceType{}
202                 for i := 0; i < quota; i++ {
203                         shouldCreate = append(shouldCreate, test.InstanceType(3))
204                 }
205                 queue := test.Queue{
206                         ChooseType: chooseType,
207                         Containers: []arvados.Container{
208                                 {
209                                         UUID:     test.ContainerUUID(2),
210                                         Priority: 2,
211                                         State:    arvados.ContainerStateLocked,
212                                         RuntimeConstraints: arvados.RuntimeConstraints{
213                                                 VCPUs: 2,
214                                                 RAM:   2 << 30,
215                                         },
216                                 },
217                                 {
218                                         UUID:     test.ContainerUUID(3),
219                                         Priority: 3,
220                                         State:    arvados.ContainerStateLocked,
221                                         RuntimeConstraints: arvados.RuntimeConstraints{
222                                                 VCPUs: 3,
223                                                 RAM:   3 << 30,
224                                         },
225                                 },
226                         },
227                 }
228                 queue.Update()
229                 pool := stubPool{
230                         atQuota: quota == 0,
231                         unalloc: map[arvados.InstanceType]int{
232                                 test.InstanceType(2): 2,
233                         },
234                         idle: map[arvados.InstanceType]int{
235                                 test.InstanceType(2): 2,
236                         },
237                         running:   map[string]time.Time{},
238                         creates:   []arvados.InstanceType{},
239                         starts:    []string{},
240                         canCreate: 0,
241                 }
242                 New(ctx, &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
243                 c.Check(pool.creates, check.DeepEquals, shouldCreate)
244                 c.Check(pool.starts, check.DeepEquals, []string{})
245                 c.Check(pool.shutdowns, check.Not(check.Equals), 0)
246         }
247 }
248
249 // Start lower-priority containers while waiting for new/existing
250 // workers to come up for higher-priority containers.
251 func (*SchedulerSuite) TestStartWhileCreating(c *check.C) {
252         ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
253         pool := stubPool{
254                 unalloc: map[arvados.InstanceType]int{
255                         test.InstanceType(1): 2,
256                         test.InstanceType(2): 2,
257                 },
258                 idle: map[arvados.InstanceType]int{
259                         test.InstanceType(1): 1,
260                         test.InstanceType(2): 1,
261                 },
262                 running:   map[string]time.Time{},
263                 canCreate: 4,
264         }
265         queue := test.Queue{
266                 ChooseType: chooseType,
267                 Containers: []arvados.Container{
268                         {
269                                 // create a new worker
270                                 UUID:     test.ContainerUUID(1),
271                                 Priority: 1,
272                                 State:    arvados.ContainerStateLocked,
273                                 RuntimeConstraints: arvados.RuntimeConstraints{
274                                         VCPUs: 1,
275                                         RAM:   1 << 30,
276                                 },
277                         },
278                         {
279                                 // tentatively map to unalloc worker
280                                 UUID:     test.ContainerUUID(2),
281                                 Priority: 2,
282                                 State:    arvados.ContainerStateLocked,
283                                 RuntimeConstraints: arvados.RuntimeConstraints{
284                                         VCPUs: 1,
285                                         RAM:   1 << 30,
286                                 },
287                         },
288                         {
289                                 // start now on idle worker
290                                 UUID:     test.ContainerUUID(3),
291                                 Priority: 3,
292                                 State:    arvados.ContainerStateLocked,
293                                 RuntimeConstraints: arvados.RuntimeConstraints{
294                                         VCPUs: 1,
295                                         RAM:   1 << 30,
296                                 },
297                         },
298                         {
299                                 // create a new worker
300                                 UUID:     test.ContainerUUID(4),
301                                 Priority: 4,
302                                 State:    arvados.ContainerStateLocked,
303                                 RuntimeConstraints: arvados.RuntimeConstraints{
304                                         VCPUs: 2,
305                                         RAM:   2 << 30,
306                                 },
307                         },
308                         {
309                                 // tentatively map to unalloc worker
310                                 UUID:     test.ContainerUUID(5),
311                                 Priority: 5,
312                                 State:    arvados.ContainerStateLocked,
313                                 RuntimeConstraints: arvados.RuntimeConstraints{
314                                         VCPUs: 2,
315                                         RAM:   2 << 30,
316                                 },
317                         },
318                         {
319                                 // start now on idle worker
320                                 UUID:     test.ContainerUUID(6),
321                                 Priority: 6,
322                                 State:    arvados.ContainerStateLocked,
323                                 RuntimeConstraints: arvados.RuntimeConstraints{
324                                         VCPUs: 2,
325                                         RAM:   2 << 30,
326                                 },
327                         },
328                 },
329         }
330         queue.Update()
331         New(ctx, &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
332         c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(2), test.InstanceType(1)})
333         c.Check(pool.starts, check.DeepEquals, []string{uuids[6], uuids[5], uuids[3], uuids[2]})
334         running := map[string]bool{}
335         for uuid, t := range pool.running {
336                 if t.IsZero() {
337                         running[uuid] = false
338                 } else {
339                         running[uuid] = true
340                 }
341         }
342         c.Check(running, check.DeepEquals, map[string]bool{uuids[3]: false, uuids[6]: false})
343 }
344
345 func (*SchedulerSuite) TestKillNonexistentContainer(c *check.C) {
346         ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
347         pool := stubPool{
348                 unalloc: map[arvados.InstanceType]int{
349                         test.InstanceType(2): 0,
350                 },
351                 idle: map[arvados.InstanceType]int{
352                         test.InstanceType(2): 0,
353                 },
354                 running: map[string]time.Time{
355                         test.ContainerUUID(2): time.Time{},
356                 },
357         }
358         queue := test.Queue{
359                 ChooseType: chooseType,
360                 Containers: []arvados.Container{
361                         {
362                                 // create a new worker
363                                 UUID:     test.ContainerUUID(1),
364                                 Priority: 1,
365                                 State:    arvados.ContainerStateLocked,
366                                 RuntimeConstraints: arvados.RuntimeConstraints{
367                                         VCPUs: 1,
368                                         RAM:   1 << 30,
369                                 },
370                         },
371                 },
372         }
373         queue.Update()
374         sch := New(ctx, &queue, &pool, time.Millisecond, time.Millisecond)
375         c.Check(pool.running, check.HasLen, 1)
376         sch.sync()
377         for deadline := time.Now().Add(time.Second); len(pool.Running()) > 0 && time.Now().Before(deadline); time.Sleep(time.Millisecond) {
378         }
379         c.Check(pool.Running(), check.HasLen, 0)
380 }