20259: Add documentation for banner and tooltip features
[arvados.git] / lib / dispatchcloud / scheduler / run_queue_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package scheduler
6
7 import (
8         "context"
9         "sync"
10         "time"
11
12         "git.arvados.org/arvados.git/lib/dispatchcloud/test"
13         "git.arvados.org/arvados.git/lib/dispatchcloud/worker"
14         "git.arvados.org/arvados.git/sdk/go/arvados"
15         "git.arvados.org/arvados.git/sdk/go/ctxlog"
16
17         "github.com/prometheus/client_golang/prometheus/testutil"
18
19         check "gopkg.in/check.v1"
20 )
21
22 var (
23         // arbitrary example container UUIDs
24         uuids = func() (r []string) {
25                 for i := 0; i < 16; i++ {
26                         r = append(r, test.ContainerUUID(i))
27                 }
28                 return
29         }()
30 )
31
// stubQuotaError wraps an arbitrary error and identifies itself as a
// quota error through IsQuotaError.
type stubQuotaError struct {
	error
}

// IsQuotaError always reports true, regardless of the wrapped error.
func (stubQuotaError) IsQuotaError() bool {
	return true
}
37
38 type stubPool struct {
39         notify    <-chan struct{}
40         unalloc   map[arvados.InstanceType]int // idle+booting+unknown
41         idle      map[arvados.InstanceType]int
42         unknown   map[arvados.InstanceType]int
43         running   map[string]time.Time
44         quota     int
45         canCreate int
46         creates   []arvados.InstanceType
47         starts    []string
48         shutdowns int
49         sync.Mutex
50 }
51
52 func (p *stubPool) AtQuota() bool {
53         p.Lock()
54         defer p.Unlock()
55         n := len(p.running)
56         for _, nn := range p.unalloc {
57                 n += nn
58         }
59         for _, nn := range p.unknown {
60                 n += nn
61         }
62         return n >= p.quota
63 }
64 func (p *stubPool) Subscribe() <-chan struct{}  { return p.notify }
65 func (p *stubPool) Unsubscribe(<-chan struct{}) {}
66 func (p *stubPool) Running() map[string]time.Time {
67         p.Lock()
68         defer p.Unlock()
69         r := map[string]time.Time{}
70         for k, v := range p.running {
71                 r[k] = v
72         }
73         return r
74 }
75 func (p *stubPool) Unallocated() map[arvados.InstanceType]int {
76         p.Lock()
77         defer p.Unlock()
78         r := map[arvados.InstanceType]int{}
79         for it, n := range p.unalloc {
80                 r[it] = n - p.unknown[it]
81         }
82         return r
83 }
84 func (p *stubPool) Create(it arvados.InstanceType) bool {
85         p.Lock()
86         defer p.Unlock()
87         p.creates = append(p.creates, it)
88         if p.canCreate < 1 {
89                 return false
90         }
91         p.canCreate--
92         p.unalloc[it]++
93         return true
94 }
95 func (p *stubPool) ForgetContainer(uuid string) {
96 }
97 func (p *stubPool) KillContainer(uuid, reason string) bool {
98         p.Lock()
99         defer p.Unlock()
100         defer delete(p.running, uuid)
101         t, ok := p.running[uuid]
102         return ok && t.IsZero()
103 }
104 func (p *stubPool) Shutdown(arvados.InstanceType) bool {
105         p.shutdowns++
106         return false
107 }
108 func (p *stubPool) CountWorkers() map[worker.State]int {
109         p.Lock()
110         defer p.Unlock()
111         return map[worker.State]int{
112                 worker.StateBooting: len(p.unalloc) - len(p.idle),
113                 worker.StateIdle:    len(p.idle),
114                 worker.StateRunning: len(p.running),
115                 worker.StateUnknown: len(p.unknown),
116         }
117 }
118 func (p *stubPool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
119         p.Lock()
120         defer p.Unlock()
121         p.starts = append(p.starts, ctr.UUID)
122         if p.idle[it] == 0 {
123                 return false
124         }
125         p.idle[it]--
126         p.unalloc[it]--
127         p.running[ctr.UUID] = time.Time{}
128         return true
129 }
130
131 func chooseType(ctr *arvados.Container) (arvados.InstanceType, error) {
132         return test.InstanceType(ctr.RuntimeConstraints.VCPUs), nil
133 }
134
135 var _ = check.Suite(&SchedulerSuite{})
136
137 type SchedulerSuite struct{}
138
139 // Assign priority=4 container to idle node. Create new instances for
140 // the priority=3, 2, 1 containers.
141 func (*SchedulerSuite) TestUseIdleWorkers(c *check.C) {
142         ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
143         queue := test.Queue{
144                 ChooseType: chooseType,
145                 Containers: []arvados.Container{
146                         {
147                                 UUID:     test.ContainerUUID(1),
148                                 Priority: 1,
149                                 State:    arvados.ContainerStateLocked,
150                                 RuntimeConstraints: arvados.RuntimeConstraints{
151                                         VCPUs: 1,
152                                         RAM:   1 << 30,
153                                 },
154                         },
155                         {
156                                 UUID:     test.ContainerUUID(2),
157                                 Priority: 2,
158                                 State:    arvados.ContainerStateLocked,
159                                 RuntimeConstraints: arvados.RuntimeConstraints{
160                                         VCPUs: 1,
161                                         RAM:   1 << 30,
162                                 },
163                         },
164                         {
165                                 UUID:     test.ContainerUUID(3),
166                                 Priority: 3,
167                                 State:    arvados.ContainerStateLocked,
168                                 RuntimeConstraints: arvados.RuntimeConstraints{
169                                         VCPUs: 1,
170                                         RAM:   1 << 30,
171                                 },
172                         },
173                         {
174                                 UUID:     test.ContainerUUID(4),
175                                 Priority: 4,
176                                 State:    arvados.ContainerStateLocked,
177                                 RuntimeConstraints: arvados.RuntimeConstraints{
178                                         VCPUs: 1,
179                                         RAM:   1 << 30,
180                                 },
181                         },
182                 },
183         }
184         queue.Update()
185         pool := stubPool{
186                 quota: 1000,
187                 unalloc: map[arvados.InstanceType]int{
188                         test.InstanceType(1): 1,
189                         test.InstanceType(2): 2,
190                 },
191                 idle: map[arvados.InstanceType]int{
192                         test.InstanceType(1): 1,
193                         test.InstanceType(2): 2,
194                 },
195                 running:   map[string]time.Time{},
196                 canCreate: 0,
197         }
198         New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0).runQueue()
199         c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(1), test.InstanceType(1), test.InstanceType(1)})
200         c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(4)})
201         c.Check(pool.running, check.HasLen, 1)
202         for uuid := range pool.running {
203                 c.Check(uuid, check.Equals, uuids[4])
204         }
205 }
206
207 // If pool.AtQuota() is true, shutdown some unalloc nodes, and don't
208 // call Create().
209 func (*SchedulerSuite) TestShutdownAtQuota(c *check.C) {
210         ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
211         for quota := 1; quota <= 3; quota++ {
212                 c.Logf("quota=%d", quota)
213                 queue := test.Queue{
214                         ChooseType: chooseType,
215                         Containers: []arvados.Container{
216                                 {
217                                         UUID:     test.ContainerUUID(2),
218                                         Priority: 2,
219                                         State:    arvados.ContainerStateLocked,
220                                         RuntimeConstraints: arvados.RuntimeConstraints{
221                                                 VCPUs: 2,
222                                                 RAM:   2 << 30,
223                                         },
224                                 },
225                                 {
226                                         UUID:     test.ContainerUUID(3),
227                                         Priority: 3,
228                                         State:    arvados.ContainerStateLocked,
229                                         RuntimeConstraints: arvados.RuntimeConstraints{
230                                                 VCPUs: 3,
231                                                 RAM:   3 << 30,
232                                         },
233                                 },
234                         },
235                 }
236                 queue.Update()
237                 pool := stubPool{
238                         quota: quota,
239                         unalloc: map[arvados.InstanceType]int{
240                                 test.InstanceType(2): 2,
241                         },
242                         idle: map[arvados.InstanceType]int{
243                                 test.InstanceType(2): 2,
244                         },
245                         running:   map[string]time.Time{},
246                         creates:   []arvados.InstanceType{},
247                         starts:    []string{},
248                         canCreate: 0,
249                 }
250                 sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0)
251                 sch.sync()
252                 sch.runQueue()
253                 sch.sync()
254                 switch quota {
255                 case 1, 2:
256                         // Can't create a type3 node for ctr3, so we
257                         // shutdown an unallocated node (type2), and
258                         // unlock both containers.
259                         c.Check(pool.starts, check.HasLen, 0)
260                         c.Check(pool.shutdowns, check.Equals, 1)
261                         c.Check(pool.creates, check.HasLen, 0)
262                         c.Check(queue.StateChanges(), check.DeepEquals, []test.QueueStateChange{
263                                 {UUID: test.ContainerUUID(3), From: "Locked", To: "Queued"},
264                                 {UUID: test.ContainerUUID(2), From: "Locked", To: "Queued"},
265                         })
266                 case 3:
267                         // Creating a type3 instance works, so we
268                         // start ctr2 on a type2 instance, and leave
269                         // ctr3 locked while we wait for the new
270                         // instance to come up.
271                         c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(2)})
272                         c.Check(pool.shutdowns, check.Equals, 0)
273                         c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(3)})
274                         c.Check(queue.StateChanges(), check.HasLen, 0)
275                 default:
276                         panic("test not written for quota>3")
277                 }
278         }
279 }
280
281 // Don't flap lock/unlock when equal-priority containers compete for
282 // limited workers.
283 //
284 // (Unless we use FirstSeenAt as a secondary sort key, each runQueue()
285 // tends to choose a different one of the equal-priority containers as
286 // the "first" one that should be locked, and unlock the one it chose
287 // last time. This generates logging noise, and fails containers by
288 // reaching MaxDispatchAttempts quickly.)
289 func (*SchedulerSuite) TestEqualPriorityContainers(c *check.C) {
290         logger := ctxlog.TestLogger(c)
291         ctx := ctxlog.Context(context.Background(), logger)
292         queue := test.Queue{
293                 ChooseType: chooseType,
294                 Logger:     logger,
295         }
296         for i := 0; i < 8; i++ {
297                 queue.Containers = append(queue.Containers, arvados.Container{
298                         UUID:     test.ContainerUUID(i),
299                         Priority: 333,
300                         State:    arvados.ContainerStateQueued,
301                         RuntimeConstraints: arvados.RuntimeConstraints{
302                                 VCPUs: 3,
303                                 RAM:   3 << 30,
304                         },
305                 })
306         }
307         queue.Update()
308         pool := stubPool{
309                 quota: 2,
310                 unalloc: map[arvados.InstanceType]int{
311                         test.InstanceType(3): 2,
312                 },
313                 idle: map[arvados.InstanceType]int{
314                         test.InstanceType(3): 2,
315                 },
316                 running:   map[string]time.Time{},
317                 creates:   []arvados.InstanceType{},
318                 starts:    []string{},
319                 canCreate: 0,
320         }
321         sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0)
322         for i := 0; i < 30; i++ {
323                 sch.runQueue()
324                 sch.sync()
325                 time.Sleep(time.Millisecond)
326         }
327         c.Check(pool.shutdowns, check.Equals, 0)
328         c.Check(pool.starts, check.HasLen, 2)
329         unlocked := map[string]int{}
330         for _, chg := range queue.StateChanges() {
331                 if chg.To == arvados.ContainerStateQueued {
332                         unlocked[chg.UUID]++
333                 }
334         }
335         for uuid, count := range unlocked {
336                 c.Check(count, check.Equals, 1, check.Commentf("%s", uuid))
337         }
338 }
339
340 // Start lower-priority containers while waiting for new/existing
341 // workers to come up for higher-priority containers.
342 func (*SchedulerSuite) TestStartWhileCreating(c *check.C) {
343         ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
344         pool := stubPool{
345                 quota: 1000,
346                 unalloc: map[arvados.InstanceType]int{
347                         test.InstanceType(1): 2,
348                         test.InstanceType(2): 2,
349                 },
350                 idle: map[arvados.InstanceType]int{
351                         test.InstanceType(1): 1,
352                         test.InstanceType(2): 1,
353                 },
354                 running:   map[string]time.Time{},
355                 canCreate: 4,
356         }
357         queue := test.Queue{
358                 ChooseType: chooseType,
359                 Containers: []arvados.Container{
360                         {
361                                 // create a new worker
362                                 UUID:     test.ContainerUUID(1),
363                                 Priority: 1,
364                                 State:    arvados.ContainerStateLocked,
365                                 RuntimeConstraints: arvados.RuntimeConstraints{
366                                         VCPUs: 1,
367                                         RAM:   1 << 30,
368                                 },
369                         },
370                         {
371                                 // tentatively map to unalloc worker
372                                 UUID:     test.ContainerUUID(2),
373                                 Priority: 2,
374                                 State:    arvados.ContainerStateLocked,
375                                 RuntimeConstraints: arvados.RuntimeConstraints{
376                                         VCPUs: 1,
377                                         RAM:   1 << 30,
378                                 },
379                         },
380                         {
381                                 // start now on idle worker
382                                 UUID:     test.ContainerUUID(3),
383                                 Priority: 3,
384                                 State:    arvados.ContainerStateLocked,
385                                 RuntimeConstraints: arvados.RuntimeConstraints{
386                                         VCPUs: 1,
387                                         RAM:   1 << 30,
388                                 },
389                         },
390                         {
391                                 // create a new worker
392                                 UUID:     test.ContainerUUID(4),
393                                 Priority: 4,
394                                 State:    arvados.ContainerStateLocked,
395                                 RuntimeConstraints: arvados.RuntimeConstraints{
396                                         VCPUs: 2,
397                                         RAM:   2 << 30,
398                                 },
399                         },
400                         {
401                                 // tentatively map to unalloc worker
402                                 UUID:     test.ContainerUUID(5),
403                                 Priority: 5,
404                                 State:    arvados.ContainerStateLocked,
405                                 RuntimeConstraints: arvados.RuntimeConstraints{
406                                         VCPUs: 2,
407                                         RAM:   2 << 30,
408                                 },
409                         },
410                         {
411                                 // start now on idle worker
412                                 UUID:     test.ContainerUUID(6),
413                                 Priority: 6,
414                                 State:    arvados.ContainerStateLocked,
415                                 RuntimeConstraints: arvados.RuntimeConstraints{
416                                         VCPUs: 2,
417                                         RAM:   2 << 30,
418                                 },
419                         },
420                 },
421         }
422         queue.Update()
423         New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0).runQueue()
424         c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(2), test.InstanceType(1)})
425         c.Check(pool.starts, check.DeepEquals, []string{uuids[6], uuids[5], uuids[3], uuids[2]})
426         running := map[string]bool{}
427         for uuid, t := range pool.running {
428                 if t.IsZero() {
429                         running[uuid] = false
430                 } else {
431                         running[uuid] = true
432                 }
433         }
434         c.Check(running, check.DeepEquals, map[string]bool{uuids[3]: false, uuids[6]: false})
435 }
436
437 func (*SchedulerSuite) TestKillNonexistentContainer(c *check.C) {
438         ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
439         pool := stubPool{
440                 quota: 1000,
441                 unalloc: map[arvados.InstanceType]int{
442                         test.InstanceType(2): 0,
443                 },
444                 idle: map[arvados.InstanceType]int{
445                         test.InstanceType(2): 0,
446                 },
447                 running: map[string]time.Time{
448                         test.ContainerUUID(2): {},
449                 },
450         }
451         queue := test.Queue{
452                 ChooseType: chooseType,
453                 Containers: []arvados.Container{
454                         {
455                                 // create a new worker
456                                 UUID:     test.ContainerUUID(1),
457                                 Priority: 1,
458                                 State:    arvados.ContainerStateLocked,
459                                 RuntimeConstraints: arvados.RuntimeConstraints{
460                                         VCPUs: 1,
461                                         RAM:   1 << 30,
462                                 },
463                         },
464                 },
465         }
466         queue.Update()
467         sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0)
468         c.Check(pool.running, check.HasLen, 1)
469         sch.sync()
470         for deadline := time.Now().Add(time.Second); len(pool.Running()) > 0 && time.Now().Before(deadline); time.Sleep(time.Millisecond) {
471         }
472         c.Check(pool.Running(), check.HasLen, 0)
473 }
474
475 func (*SchedulerSuite) TestContainersMetrics(c *check.C) {
476         ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
477         queue := test.Queue{
478                 ChooseType: chooseType,
479                 Containers: []arvados.Container{
480                         {
481                                 UUID:      test.ContainerUUID(1),
482                                 Priority:  1,
483                                 State:     arvados.ContainerStateLocked,
484                                 CreatedAt: time.Now().Add(-10 * time.Second),
485                                 RuntimeConstraints: arvados.RuntimeConstraints{
486                                         VCPUs: 1,
487                                         RAM:   1 << 30,
488                                 },
489                         },
490                 },
491         }
492         queue.Update()
493
494         // Create a pool with one unallocated (idle/booting/unknown) worker,
495         // and `idle` and `unknown` not set (empty). Iow this worker is in the booting
496         // state, and the container will be allocated but not started yet.
497         pool := stubPool{
498                 unalloc: map[arvados.InstanceType]int{test.InstanceType(1): 1},
499         }
500         sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0)
501         sch.runQueue()
502         sch.updateMetrics()
503
504         c.Check(int(testutil.ToFloat64(sch.mContainersAllocatedNotStarted)), check.Equals, 1)
505         c.Check(int(testutil.ToFloat64(sch.mContainersNotAllocatedOverQuota)), check.Equals, 0)
506         c.Check(int(testutil.ToFloat64(sch.mLongestWaitTimeSinceQueue)), check.Equals, 10)
507
508         // Create a pool without workers. The queued container will not be started, and the
509         // 'over quota' metric will be 1 because no workers are available and canCreate defaults
510         // to zero.
511         pool = stubPool{}
512         sch = New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0)
513         sch.runQueue()
514         sch.updateMetrics()
515
516         c.Check(int(testutil.ToFloat64(sch.mContainersAllocatedNotStarted)), check.Equals, 0)
517         c.Check(int(testutil.ToFloat64(sch.mContainersNotAllocatedOverQuota)), check.Equals, 1)
518         c.Check(int(testutil.ToFloat64(sch.mLongestWaitTimeSinceQueue)), check.Equals, 10)
519
520         // Reset the queue, and create a pool with an idle worker. The queued
521         // container will be started immediately and mLongestWaitTimeSinceQueue
522         // should be zero.
523         queue = test.Queue{
524                 ChooseType: chooseType,
525                 Containers: []arvados.Container{
526                         {
527                                 UUID:      test.ContainerUUID(1),
528                                 Priority:  1,
529                                 State:     arvados.ContainerStateLocked,
530                                 CreatedAt: time.Now().Add(-10 * time.Second),
531                                 RuntimeConstraints: arvados.RuntimeConstraints{
532                                         VCPUs: 1,
533                                         RAM:   1 << 30,
534                                 },
535                         },
536                 },
537         }
538         queue.Update()
539
540         pool = stubPool{
541                 idle:    map[arvados.InstanceType]int{test.InstanceType(1): 1},
542                 unalloc: map[arvados.InstanceType]int{test.InstanceType(1): 1},
543                 running: map[string]time.Time{},
544         }
545         sch = New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0)
546         sch.runQueue()
547         sch.updateMetrics()
548
549         c.Check(int(testutil.ToFloat64(sch.mLongestWaitTimeSinceQueue)), check.Equals, 0)
550 }
551
552 // Assign priority=4, 3 and 1 containers to idle nodes. Ignore the supervisor at priority 2.
553 func (*SchedulerSuite) TestSkipSupervisors(c *check.C) {
554         ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
555         queue := test.Queue{
556                 ChooseType: chooseType,
557                 Containers: []arvados.Container{
558                         {
559                                 UUID:     test.ContainerUUID(1),
560                                 Priority: 1,
561                                 State:    arvados.ContainerStateLocked,
562                                 RuntimeConstraints: arvados.RuntimeConstraints{
563                                         VCPUs: 1,
564                                         RAM:   1 << 30,
565                                 },
566                         },
567                         {
568                                 UUID:     test.ContainerUUID(2),
569                                 Priority: 2,
570                                 State:    arvados.ContainerStateLocked,
571                                 RuntimeConstraints: arvados.RuntimeConstraints{
572                                         VCPUs: 1,
573                                         RAM:   1 << 30,
574                                 },
575                                 SchedulingParameters: arvados.SchedulingParameters{
576                                         Supervisor: true,
577                                 },
578                         },
579                         {
580                                 UUID:     test.ContainerUUID(3),
581                                 Priority: 3,
582                                 State:    arvados.ContainerStateLocked,
583                                 RuntimeConstraints: arvados.RuntimeConstraints{
584                                         VCPUs: 1,
585                                         RAM:   1 << 30,
586                                 },
587                                 SchedulingParameters: arvados.SchedulingParameters{
588                                         Supervisor: true,
589                                 },
590                         },
591                         {
592                                 UUID:     test.ContainerUUID(4),
593                                 Priority: 4,
594                                 State:    arvados.ContainerStateLocked,
595                                 RuntimeConstraints: arvados.RuntimeConstraints{
596                                         VCPUs: 1,
597                                         RAM:   1 << 30,
598                                 },
599                                 SchedulingParameters: arvados.SchedulingParameters{
600                                         Supervisor: true,
601                                 },
602                         },
603                 },
604         }
605         queue.Update()
606         pool := stubPool{
607                 quota: 1000,
608                 unalloc: map[arvados.InstanceType]int{
609                         test.InstanceType(1): 4,
610                         test.InstanceType(2): 4,
611                 },
612                 idle: map[arvados.InstanceType]int{
613                         test.InstanceType(1): 4,
614                         test.InstanceType(2): 4,
615                 },
616                 running:   map[string]time.Time{},
617                 canCreate: 0,
618         }
619         New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 2).runQueue()
620         c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType(nil))
621         c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(4), test.ContainerUUID(3), test.ContainerUUID(1)})
622 }