Merge branch '20522-load-dispatch-priv-key'
[arvados.git] / lib / dispatchcloud / scheduler / run_queue_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package scheduler
6
7 import (
8         "context"
9         "sync"
10         "time"
11
12         "git.arvados.org/arvados.git/lib/dispatchcloud/test"
13         "git.arvados.org/arvados.git/lib/dispatchcloud/worker"
14         "git.arvados.org/arvados.git/sdk/go/arvados"
15         "git.arvados.org/arvados.git/sdk/go/ctxlog"
16
17         "github.com/prometheus/client_golang/prometheus/testutil"
18
19         check "gopkg.in/check.v1"
20 )
21
22 var (
23         // arbitrary example container UUIDs
24         uuids = func() (r []string) {
25                 for i := 0; i < 16; i++ {
26                         r = append(r, test.ContainerUUID(i))
27                 }
28                 return
29         }()
30 )
31
// stubQuotaError wraps an error and additionally identifies itself
// as a quota error via IsQuotaError.
type stubQuotaError struct {
	error
}

// IsQuotaError always reports true.
func (stubQuotaError) IsQuotaError() bool {
	return true
}
37
38 type stubPool struct {
39         notify    <-chan struct{}
40         unalloc   map[arvados.InstanceType]int // idle+booting+unknown
41         idle      map[arvados.InstanceType]int
42         unknown   map[arvados.InstanceType]int
43         running   map[string]time.Time
44         quota     int
45         canCreate int
46         creates   []arvados.InstanceType
47         starts    []string
48         shutdowns int
49         sync.Mutex
50 }
51
52 func (p *stubPool) AtQuota() bool {
53         p.Lock()
54         defer p.Unlock()
55         n := len(p.running)
56         for _, nn := range p.unalloc {
57                 n += nn
58         }
59         for _, nn := range p.unknown {
60                 n += nn
61         }
62         return n >= p.quota
63 }
64 func (p *stubPool) Subscribe() <-chan struct{}  { return p.notify }
65 func (p *stubPool) Unsubscribe(<-chan struct{}) {}
66 func (p *stubPool) Running() map[string]time.Time {
67         p.Lock()
68         defer p.Unlock()
69         r := map[string]time.Time{}
70         for k, v := range p.running {
71                 r[k] = v
72         }
73         return r
74 }
75 func (p *stubPool) Unallocated() map[arvados.InstanceType]int {
76         p.Lock()
77         defer p.Unlock()
78         r := map[arvados.InstanceType]int{}
79         for it, n := range p.unalloc {
80                 r[it] = n - p.unknown[it]
81         }
82         return r
83 }
84 func (p *stubPool) Create(it arvados.InstanceType) bool {
85         p.Lock()
86         defer p.Unlock()
87         p.creates = append(p.creates, it)
88         if p.canCreate < 1 {
89                 return false
90         }
91         p.canCreate--
92         p.unalloc[it]++
93         return true
94 }
95 func (p *stubPool) ForgetContainer(uuid string) {
96 }
97 func (p *stubPool) KillContainer(uuid, reason string) bool {
98         p.Lock()
99         defer p.Unlock()
100         defer delete(p.running, uuid)
101         t, ok := p.running[uuid]
102         return ok && t.IsZero()
103 }
104 func (p *stubPool) Shutdown(arvados.InstanceType) bool {
105         p.shutdowns++
106         return false
107 }
108 func (p *stubPool) CountWorkers() map[worker.State]int {
109         p.Lock()
110         defer p.Unlock()
111         return map[worker.State]int{
112                 worker.StateBooting: len(p.unalloc) - len(p.idle),
113                 worker.StateIdle:    len(p.idle),
114                 worker.StateRunning: len(p.running),
115                 worker.StateUnknown: len(p.unknown),
116         }
117 }
118 func (p *stubPool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
119         p.Lock()
120         defer p.Unlock()
121         p.starts = append(p.starts, ctr.UUID)
122         if p.idle[it] == 0 {
123                 return false
124         }
125         p.idle[it]--
126         p.unalloc[it]--
127         p.running[ctr.UUID] = time.Time{}
128         return true
129 }
130
131 func chooseType(ctr *arvados.Container) (arvados.InstanceType, error) {
132         return test.InstanceType(ctr.RuntimeConstraints.VCPUs), nil
133 }
134
135 var _ = check.Suite(&SchedulerSuite{})
136
137 type SchedulerSuite struct{}
138
139 // Assign priority=4 container to idle node. Create new instances for
140 // the priority=3, 2, 1 containers.
141 func (*SchedulerSuite) TestUseIdleWorkers(c *check.C) {
142         ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
143         queue := test.Queue{
144                 ChooseType: chooseType,
145                 Containers: []arvados.Container{
146                         {
147                                 UUID:     test.ContainerUUID(1),
148                                 Priority: 1,
149                                 State:    arvados.ContainerStateLocked,
150                                 RuntimeConstraints: arvados.RuntimeConstraints{
151                                         VCPUs: 1,
152                                         RAM:   1 << 30,
153                                 },
154                         },
155                         {
156                                 UUID:     test.ContainerUUID(2),
157                                 Priority: 2,
158                                 State:    arvados.ContainerStateLocked,
159                                 RuntimeConstraints: arvados.RuntimeConstraints{
160                                         VCPUs: 1,
161                                         RAM:   1 << 30,
162                                 },
163                         },
164                         {
165                                 UUID:     test.ContainerUUID(3),
166                                 Priority: 3,
167                                 State:    arvados.ContainerStateLocked,
168                                 RuntimeConstraints: arvados.RuntimeConstraints{
169                                         VCPUs: 1,
170                                         RAM:   1 << 30,
171                                 },
172                         },
173                         {
174                                 UUID:     test.ContainerUUID(4),
175                                 Priority: 4,
176                                 State:    arvados.ContainerStateLocked,
177                                 RuntimeConstraints: arvados.RuntimeConstraints{
178                                         VCPUs: 1,
179                                         RAM:   1 << 30,
180                                 },
181                         },
182                 },
183         }
184         queue.Update()
185         pool := stubPool{
186                 quota: 1000,
187                 unalloc: map[arvados.InstanceType]int{
188                         test.InstanceType(1): 1,
189                         test.InstanceType(2): 2,
190                 },
191                 idle: map[arvados.InstanceType]int{
192                         test.InstanceType(1): 1,
193                         test.InstanceType(2): 2,
194                 },
195                 running:   map[string]time.Time{},
196                 canCreate: 0,
197         }
198         New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0).runQueue()
199         c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(1), test.InstanceType(1), test.InstanceType(1)})
200         c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(4)})
201         c.Check(pool.running, check.HasLen, 1)
202         for uuid := range pool.running {
203                 c.Check(uuid, check.Equals, uuids[4])
204         }
205 }
206
207 // If pool.AtQuota() is true, shutdown some unalloc nodes, and don't
208 // call Create().
209 func (*SchedulerSuite) TestShutdownAtQuota(c *check.C) {
210         ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
211         for quota := 1; quota <= 3; quota++ {
212                 c.Logf("quota=%d", quota)
213                 queue := test.Queue{
214                         ChooseType: chooseType,
215                         Containers: []arvados.Container{
216                                 {
217                                         UUID:     test.ContainerUUID(2),
218                                         Priority: 2,
219                                         State:    arvados.ContainerStateLocked,
220                                         RuntimeConstraints: arvados.RuntimeConstraints{
221                                                 VCPUs: 2,
222                                                 RAM:   2 << 30,
223                                         },
224                                 },
225                                 {
226                                         UUID:     test.ContainerUUID(3),
227                                         Priority: 3,
228                                         State:    arvados.ContainerStateLocked,
229                                         RuntimeConstraints: arvados.RuntimeConstraints{
230                                                 VCPUs: 3,
231                                                 RAM:   3 << 30,
232                                         },
233                                 },
234                         },
235                 }
236                 queue.Update()
237                 pool := stubPool{
238                         quota: quota,
239                         unalloc: map[arvados.InstanceType]int{
240                                 test.InstanceType(2): 2,
241                         },
242                         idle: map[arvados.InstanceType]int{
243                                 test.InstanceType(2): 2,
244                         },
245                         running:   map[string]time.Time{},
246                         creates:   []arvados.InstanceType{},
247                         starts:    []string{},
248                         canCreate: 0,
249                 }
250                 sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0)
251                 sch.sync()
252                 sch.runQueue()
253                 sch.sync()
254                 switch quota {
255                 case 1, 2:
256                         // Can't create a type3 node for ctr3, so we
257                         // shutdown an unallocated node (type2), and
258                         // unlock both containers.
259                         c.Check(pool.starts, check.HasLen, 0)
260                         c.Check(pool.shutdowns, check.Equals, 1)
261                         c.Check(pool.creates, check.HasLen, 0)
262                         c.Check(queue.StateChanges(), check.DeepEquals, []test.QueueStateChange{
263                                 {UUID: test.ContainerUUID(3), From: "Locked", To: "Queued"},
264                                 {UUID: test.ContainerUUID(2), From: "Locked", To: "Queued"},
265                         })
266                 case 3:
267                         // Creating a type3 instance works, so we
268                         // start ctr2 on a type2 instance, and leave
269                         // ctr3 locked while we wait for the new
270                         // instance to come up.
271                         c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(2)})
272                         c.Check(pool.shutdowns, check.Equals, 0)
273                         c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(3)})
274                         c.Check(queue.StateChanges(), check.HasLen, 0)
275                 default:
276                         panic("test not written for quota>3")
277                 }
278         }
279 }
280
281 // Don't unlock containers or shutdown unalloc (booting/idle) nodes
282 // just because some 503 errors caused us to reduce maxConcurrency
283 // below the current load level.
284 //
285 // We expect to raise maxConcurrency soon when we stop seeing 503s. If
286 // that doesn't happen soon, the idle timeout will take care of the
287 // excess nodes.
288 func (*SchedulerSuite) TestIdleIn503QuietPeriod(c *check.C) {
289         ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
290         queue := test.Queue{
291                 ChooseType: chooseType,
292                 Containers: []arvados.Container{
293                         // scheduled on an instance (but not Running yet)
294                         {
295                                 UUID:     test.ContainerUUID(1),
296                                 Priority: 1000,
297                                 State:    arvados.ContainerStateLocked,
298                                 RuntimeConstraints: arvados.RuntimeConstraints{
299                                         VCPUs: 2,
300                                         RAM:   2 << 30,
301                                 },
302                         },
303                         // not yet scheduled
304                         {
305                                 UUID:     test.ContainerUUID(2),
306                                 Priority: 1000,
307                                 State:    arvados.ContainerStateLocked,
308                                 RuntimeConstraints: arvados.RuntimeConstraints{
309                                         VCPUs: 2,
310                                         RAM:   2 << 30,
311                                 },
312                         },
313                         // scheduled on an instance (but not Running yet)
314                         {
315                                 UUID:     test.ContainerUUID(3),
316                                 Priority: 1000,
317                                 State:    arvados.ContainerStateLocked,
318                                 RuntimeConstraints: arvados.RuntimeConstraints{
319                                         VCPUs: 3,
320                                         RAM:   3 << 30,
321                                 },
322                         },
323                         // not yet scheduled
324                         {
325                                 UUID:     test.ContainerUUID(4),
326                                 Priority: 1000,
327                                 State:    arvados.ContainerStateLocked,
328                                 RuntimeConstraints: arvados.RuntimeConstraints{
329                                         VCPUs: 3,
330                                         RAM:   3 << 30,
331                                 },
332                         },
333                         // not yet locked
334                         {
335                                 UUID:     test.ContainerUUID(5),
336                                 Priority: 1000,
337                                 State:    arvados.ContainerStateQueued,
338                                 RuntimeConstraints: arvados.RuntimeConstraints{
339                                         VCPUs: 3,
340                                         RAM:   3 << 30,
341                                 },
342                         },
343                 },
344         }
345         queue.Update()
346         pool := stubPool{
347                 quota: 16,
348                 unalloc: map[arvados.InstanceType]int{
349                         test.InstanceType(2): 2,
350                         test.InstanceType(3): 2,
351                 },
352                 idle: map[arvados.InstanceType]int{
353                         test.InstanceType(2): 1,
354                         test.InstanceType(3): 1,
355                 },
356                 running: map[string]time.Time{
357                         test.ContainerUUID(1): {},
358                         test.ContainerUUID(3): {},
359                 },
360                 creates:   []arvados.InstanceType{},
361                 starts:    []string{},
362                 canCreate: 0,
363         }
364         sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0)
365         sch.last503time = time.Now()
366         sch.maxConcurrency = 3
367         sch.sync()
368         sch.runQueue()
369         sch.sync()
370
371         c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(2)})
372         c.Check(pool.shutdowns, check.Equals, 0)
373         c.Check(pool.creates, check.HasLen, 0)
374         c.Check(queue.StateChanges(), check.HasLen, 0)
375 }
376
377 // If we somehow have more supervisor containers in Locked state than
378 // we should (e.g., config changed since they started), and some
379 // appropriate-sized instances booting up, unlock the excess
380 // supervisor containers, but let the instances keep booting.
381 func (*SchedulerSuite) TestUnlockExcessSupervisors(c *check.C) {
382         ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
383         queue := test.Queue{
384                 ChooseType: chooseType,
385         }
386         for i := 1; i <= 6; i++ {
387                 queue.Containers = append(queue.Containers, arvados.Container{
388                         UUID:     test.ContainerUUID(i),
389                         Priority: int64(1000 - i),
390                         State:    arvados.ContainerStateLocked,
391                         RuntimeConstraints: arvados.RuntimeConstraints{
392                                 VCPUs: 2,
393                                 RAM:   2 << 30,
394                         },
395                         SchedulingParameters: arvados.SchedulingParameters{
396                                 Supervisor: true,
397                         },
398                 })
399         }
400         queue.Update()
401         pool := stubPool{
402                 quota: 16,
403                 unalloc: map[arvados.InstanceType]int{
404                         test.InstanceType(2): 2,
405                 },
406                 idle: map[arvados.InstanceType]int{
407                         test.InstanceType(2): 1,
408                 },
409                 running: map[string]time.Time{
410                         test.ContainerUUID(1): {},
411                         test.ContainerUUID(2): {},
412                         test.ContainerUUID(3): {},
413                         test.ContainerUUID(4): {},
414                 },
415                 creates:   []arvados.InstanceType{},
416                 starts:    []string{},
417                 canCreate: 0,
418         }
419         sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 4)
420         sch.sync()
421         sch.runQueue()
422         sch.sync()
423
424         c.Check(pool.starts, check.DeepEquals, []string{})
425         c.Check(pool.shutdowns, check.Equals, 0)
426         c.Check(pool.creates, check.HasLen, 0)
427         c.Check(queue.StateChanges(), check.DeepEquals, []test.QueueStateChange{
428                 {UUID: test.ContainerUUID(5), From: "Locked", To: "Queued"},
429                 {UUID: test.ContainerUUID(6), From: "Locked", To: "Queued"},
430         })
431 }
432
433 // Assuming we're not at quota, don't try to shutdown idle nodes
434 // merely because we have more queued/locked supervisor containers
435 // than MaxSupervisors -- it won't help.
436 func (*SchedulerSuite) TestExcessSupervisors(c *check.C) {
437         ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
438         queue := test.Queue{
439                 ChooseType: chooseType,
440         }
441         for i := 1; i <= 8; i++ {
442                 queue.Containers = append(queue.Containers, arvados.Container{
443                         UUID:     test.ContainerUUID(i),
444                         Priority: int64(1000 + i),
445                         State:    arvados.ContainerStateQueued,
446                         RuntimeConstraints: arvados.RuntimeConstraints{
447                                 VCPUs: 2,
448                                 RAM:   2 << 30,
449                         },
450                         SchedulingParameters: arvados.SchedulingParameters{
451                                 Supervisor: true,
452                         },
453                 })
454         }
455         for i := 2; i < 4; i++ {
456                 queue.Containers[i].State = arvados.ContainerStateLocked
457         }
458         for i := 4; i < 6; i++ {
459                 queue.Containers[i].State = arvados.ContainerStateRunning
460         }
461         queue.Update()
462         pool := stubPool{
463                 quota: 16,
464                 unalloc: map[arvados.InstanceType]int{
465                         test.InstanceType(2): 2,
466                 },
467                 idle: map[arvados.InstanceType]int{
468                         test.InstanceType(2): 1,
469                 },
470                 running: map[string]time.Time{
471                         test.ContainerUUID(5): {},
472                         test.ContainerUUID(6): {},
473                 },
474                 creates:   []arvados.InstanceType{},
475                 starts:    []string{},
476                 canCreate: 0,
477         }
478         sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 4)
479         sch.sync()
480         sch.runQueue()
481         sch.sync()
482
483         c.Check(pool.starts, check.HasLen, 2)
484         c.Check(pool.shutdowns, check.Equals, 0)
485         c.Check(pool.creates, check.HasLen, 0)
486         c.Check(queue.StateChanges(), check.HasLen, 0)
487 }
488
489 // Don't flap lock/unlock when equal-priority containers compete for
490 // limited workers.
491 //
492 // (Unless we use FirstSeenAt as a secondary sort key, each runQueue()
493 // tends to choose a different one of the equal-priority containers as
494 // the "first" one that should be locked, and unlock the one it chose
495 // last time. This generates logging noise, and fails containers by
496 // reaching MaxDispatchAttempts quickly.)
497 func (*SchedulerSuite) TestEqualPriorityContainers(c *check.C) {
498         logger := ctxlog.TestLogger(c)
499         ctx := ctxlog.Context(context.Background(), logger)
500         queue := test.Queue{
501                 ChooseType: chooseType,
502                 Logger:     logger,
503         }
504         for i := 0; i < 8; i++ {
505                 queue.Containers = append(queue.Containers, arvados.Container{
506                         UUID:     test.ContainerUUID(i),
507                         Priority: 333,
508                         State:    arvados.ContainerStateQueued,
509                         RuntimeConstraints: arvados.RuntimeConstraints{
510                                 VCPUs: 3,
511                                 RAM:   3 << 30,
512                         },
513                 })
514         }
515         queue.Update()
516         pool := stubPool{
517                 quota: 2,
518                 unalloc: map[arvados.InstanceType]int{
519                         test.InstanceType(3): 2,
520                 },
521                 idle: map[arvados.InstanceType]int{
522                         test.InstanceType(3): 2,
523                 },
524                 running:   map[string]time.Time{},
525                 creates:   []arvados.InstanceType{},
526                 starts:    []string{},
527                 canCreate: 0,
528         }
529         sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0)
530         for i := 0; i < 30; i++ {
531                 sch.runQueue()
532                 sch.sync()
533                 time.Sleep(time.Millisecond)
534         }
535         c.Check(pool.shutdowns, check.Equals, 0)
536         c.Check(pool.starts, check.HasLen, 2)
537         unlocked := map[string]int{}
538         for _, chg := range queue.StateChanges() {
539                 if chg.To == arvados.ContainerStateQueued {
540                         unlocked[chg.UUID]++
541                 }
542         }
543         for uuid, count := range unlocked {
544                 c.Check(count, check.Equals, 1, check.Commentf("%s", uuid))
545         }
546 }
547
548 // Start lower-priority containers while waiting for new/existing
549 // workers to come up for higher-priority containers.
550 func (*SchedulerSuite) TestStartWhileCreating(c *check.C) {
551         ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
552         pool := stubPool{
553                 quota: 1000,
554                 unalloc: map[arvados.InstanceType]int{
555                         test.InstanceType(1): 2,
556                         test.InstanceType(2): 2,
557                 },
558                 idle: map[arvados.InstanceType]int{
559                         test.InstanceType(1): 1,
560                         test.InstanceType(2): 1,
561                 },
562                 running:   map[string]time.Time{},
563                 canCreate: 4,
564         }
565         queue := test.Queue{
566                 ChooseType: chooseType,
567                 Containers: []arvados.Container{
568                         {
569                                 // create a new worker
570                                 UUID:     test.ContainerUUID(1),
571                                 Priority: 1,
572                                 State:    arvados.ContainerStateLocked,
573                                 RuntimeConstraints: arvados.RuntimeConstraints{
574                                         VCPUs: 1,
575                                         RAM:   1 << 30,
576                                 },
577                         },
578                         {
579                                 // tentatively map to unalloc worker
580                                 UUID:     test.ContainerUUID(2),
581                                 Priority: 2,
582                                 State:    arvados.ContainerStateLocked,
583                                 RuntimeConstraints: arvados.RuntimeConstraints{
584                                         VCPUs: 1,
585                                         RAM:   1 << 30,
586                                 },
587                         },
588                         {
589                                 // start now on idle worker
590                                 UUID:     test.ContainerUUID(3),
591                                 Priority: 3,
592                                 State:    arvados.ContainerStateLocked,
593                                 RuntimeConstraints: arvados.RuntimeConstraints{
594                                         VCPUs: 1,
595                                         RAM:   1 << 30,
596                                 },
597                         },
598                         {
599                                 // create a new worker
600                                 UUID:     test.ContainerUUID(4),
601                                 Priority: 4,
602                                 State:    arvados.ContainerStateLocked,
603                                 RuntimeConstraints: arvados.RuntimeConstraints{
604                                         VCPUs: 2,
605                                         RAM:   2 << 30,
606                                 },
607                         },
608                         {
609                                 // tentatively map to unalloc worker
610                                 UUID:     test.ContainerUUID(5),
611                                 Priority: 5,
612                                 State:    arvados.ContainerStateLocked,
613                                 RuntimeConstraints: arvados.RuntimeConstraints{
614                                         VCPUs: 2,
615                                         RAM:   2 << 30,
616                                 },
617                         },
618                         {
619                                 // start now on idle worker
620                                 UUID:     test.ContainerUUID(6),
621                                 Priority: 6,
622                                 State:    arvados.ContainerStateLocked,
623                                 RuntimeConstraints: arvados.RuntimeConstraints{
624                                         VCPUs: 2,
625                                         RAM:   2 << 30,
626                                 },
627                         },
628                 },
629         }
630         queue.Update()
631         New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0).runQueue()
632         c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(2), test.InstanceType(1)})
633         c.Check(pool.starts, check.DeepEquals, []string{uuids[6], uuids[5], uuids[3], uuids[2]})
634         running := map[string]bool{}
635         for uuid, t := range pool.running {
636                 if t.IsZero() {
637                         running[uuid] = false
638                 } else {
639                         running[uuid] = true
640                 }
641         }
642         c.Check(running, check.DeepEquals, map[string]bool{uuids[3]: false, uuids[6]: false})
643 }
644
645 func (*SchedulerSuite) TestKillNonexistentContainer(c *check.C) {
646         ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
647         pool := stubPool{
648                 quota: 1000,
649                 unalloc: map[arvados.InstanceType]int{
650                         test.InstanceType(2): 0,
651                 },
652                 idle: map[arvados.InstanceType]int{
653                         test.InstanceType(2): 0,
654                 },
655                 running: map[string]time.Time{
656                         test.ContainerUUID(2): {},
657                 },
658         }
659         queue := test.Queue{
660                 ChooseType: chooseType,
661                 Containers: []arvados.Container{
662                         {
663                                 // create a new worker
664                                 UUID:     test.ContainerUUID(1),
665                                 Priority: 1,
666                                 State:    arvados.ContainerStateLocked,
667                                 RuntimeConstraints: arvados.RuntimeConstraints{
668                                         VCPUs: 1,
669                                         RAM:   1 << 30,
670                                 },
671                         },
672                 },
673         }
674         queue.Update()
675         sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0)
676         c.Check(pool.running, check.HasLen, 1)
677         sch.sync()
678         for deadline := time.Now().Add(time.Second); len(pool.Running()) > 0 && time.Now().Before(deadline); time.Sleep(time.Millisecond) {
679         }
680         c.Check(pool.Running(), check.HasLen, 0)
681 }
682
683 func (*SchedulerSuite) TestContainersMetrics(c *check.C) {
684         ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
685         queue := test.Queue{
686                 ChooseType: chooseType,
687                 Containers: []arvados.Container{
688                         {
689                                 UUID:      test.ContainerUUID(1),
690                                 Priority:  1,
691                                 State:     arvados.ContainerStateLocked,
692                                 CreatedAt: time.Now().Add(-10 * time.Second),
693                                 RuntimeConstraints: arvados.RuntimeConstraints{
694                                         VCPUs: 1,
695                                         RAM:   1 << 30,
696                                 },
697                         },
698                 },
699         }
700         queue.Update()
701
702         // Create a pool with one unallocated (idle/booting/unknown) worker,
703         // and `idle` and `unknown` not set (empty). Iow this worker is in the booting
704         // state, and the container will be allocated but not started yet.
705         pool := stubPool{
706                 unalloc: map[arvados.InstanceType]int{test.InstanceType(1): 1},
707         }
708         sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0)
709         sch.runQueue()
710         sch.updateMetrics()
711
712         c.Check(int(testutil.ToFloat64(sch.mContainersAllocatedNotStarted)), check.Equals, 1)
713         c.Check(int(testutil.ToFloat64(sch.mContainersNotAllocatedOverQuota)), check.Equals, 0)
714         c.Check(int(testutil.ToFloat64(sch.mLongestWaitTimeSinceQueue)), check.Equals, 10)
715
716         // Create a pool without workers. The queued container will not be started, and the
717         // 'over quota' metric will be 1 because no workers are available and canCreate defaults
718         // to zero.
719         pool = stubPool{}
720         sch = New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0)
721         sch.runQueue()
722         sch.updateMetrics()
723
724         c.Check(int(testutil.ToFloat64(sch.mContainersAllocatedNotStarted)), check.Equals, 0)
725         c.Check(int(testutil.ToFloat64(sch.mContainersNotAllocatedOverQuota)), check.Equals, 1)
726         c.Check(int(testutil.ToFloat64(sch.mLongestWaitTimeSinceQueue)), check.Equals, 10)
727
728         // Reset the queue, and create a pool with an idle worker. The queued
729         // container will be started immediately and mLongestWaitTimeSinceQueue
730         // should be zero.
731         queue = test.Queue{
732                 ChooseType: chooseType,
733                 Containers: []arvados.Container{
734                         {
735                                 UUID:      test.ContainerUUID(1),
736                                 Priority:  1,
737                                 State:     arvados.ContainerStateLocked,
738                                 CreatedAt: time.Now().Add(-10 * time.Second),
739                                 RuntimeConstraints: arvados.RuntimeConstraints{
740                                         VCPUs: 1,
741                                         RAM:   1 << 30,
742                                 },
743                         },
744                 },
745         }
746         queue.Update()
747
748         pool = stubPool{
749                 idle:    map[arvados.InstanceType]int{test.InstanceType(1): 1},
750                 unalloc: map[arvados.InstanceType]int{test.InstanceType(1): 1},
751                 running: map[string]time.Time{},
752         }
753         sch = New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0)
754         sch.runQueue()
755         sch.updateMetrics()
756
757         c.Check(int(testutil.ToFloat64(sch.mLongestWaitTimeSinceQueue)), check.Equals, 0)
758 }
759
760 // Assign priority=4, 3 and 1 containers to idle nodes. Ignore the supervisor at priority 2.
761 func (*SchedulerSuite) TestSkipSupervisors(c *check.C) {
762         ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
763         queue := test.Queue{
764                 ChooseType: chooseType,
765                 Containers: []arvados.Container{
766                         {
767                                 UUID:     test.ContainerUUID(1),
768                                 Priority: 1,
769                                 State:    arvados.ContainerStateLocked,
770                                 RuntimeConstraints: arvados.RuntimeConstraints{
771                                         VCPUs: 1,
772                                         RAM:   1 << 30,
773                                 },
774                         },
775                         {
776                                 UUID:     test.ContainerUUID(2),
777                                 Priority: 2,
778                                 State:    arvados.ContainerStateLocked,
779                                 RuntimeConstraints: arvados.RuntimeConstraints{
780                                         VCPUs: 1,
781                                         RAM:   1 << 30,
782                                 },
783                                 SchedulingParameters: arvados.SchedulingParameters{
784                                         Supervisor: true,
785                                 },
786                         },
787                         {
788                                 UUID:     test.ContainerUUID(3),
789                                 Priority: 3,
790                                 State:    arvados.ContainerStateLocked,
791                                 RuntimeConstraints: arvados.RuntimeConstraints{
792                                         VCPUs: 1,
793                                         RAM:   1 << 30,
794                                 },
795                                 SchedulingParameters: arvados.SchedulingParameters{
796                                         Supervisor: true,
797                                 },
798                         },
799                         {
800                                 UUID:     test.ContainerUUID(4),
801                                 Priority: 4,
802                                 State:    arvados.ContainerStateLocked,
803                                 RuntimeConstraints: arvados.RuntimeConstraints{
804                                         VCPUs: 1,
805                                         RAM:   1 << 30,
806                                 },
807                                 SchedulingParameters: arvados.SchedulingParameters{
808                                         Supervisor: true,
809                                 },
810                         },
811                 },
812         }
813         queue.Update()
814         pool := stubPool{
815                 quota: 1000,
816                 unalloc: map[arvados.InstanceType]int{
817                         test.InstanceType(1): 4,
818                         test.InstanceType(2): 4,
819                 },
820                 idle: map[arvados.InstanceType]int{
821                         test.InstanceType(1): 4,
822                         test.InstanceType(2): 4,
823                 },
824                 running:   map[string]time.Time{},
825                 canCreate: 0,
826         }
827         New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 2).runQueue()
828         c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType(nil))
829         c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(4), test.ContainerUUID(3), test.ContainerUUID(1)})
830 }