1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
12 "git.arvados.org/arvados.git/lib/dispatchcloud/test"
13 "git.arvados.org/arvados.git/lib/dispatchcloud/worker"
14 "git.arvados.org/arvados.git/sdk/go/arvados"
15 "git.arvados.org/arvados.git/sdk/go/ctxlog"
17 "github.com/prometheus/client_golang/prometheus/testutil"
19 check "gopkg.in/check.v1"
// uuids: 16 example container UUIDs (index 0-15) built once via
// test.ContainerUUID. NOTE(review): the tail of this declaration is
// elided in this extraction; code kept byte-identical.
23 // arbitrary example container UUIDs
24 uuids = func() (r []string) {
25 for i := 0; i < 16; i++ {
26 r = append(r, test.ContainerUUID(i))
// stubPool is a test double for the scheduler's worker pool. Visible
// state: a subscription channel, per-instance-type counts (unalloc is
// idle+booting+unknown per the inline comment), running containers
// keyed by UUID, and a log of instance types passed to Create.
// NOTE(review): some fields (e.g. starts, shutdowns, a mutex?) are
// referenced by methods below but elided from this extraction.
32 type stubPool struct {
33 notify <-chan struct{}
34 unalloc map[arvados.InstanceType]int // idle+booting+unknown
35 idle map[arvados.InstanceType]int
36 unknown map[arvados.InstanceType]int
37 running map[string]time.Time
40 creates []arvados.InstanceType
// AtQuota reports whether the stub pool has reached its instance
// quota. NOTE(review): body is partially elided; it iterates the
// unalloc and unknown counts — presumably summing them against a
// quota field not visible here. Confirm against the full file.
46 func (p *stubPool) AtQuota() bool {
50 for _, nn := range p.unalloc {
53 for _, nn := range p.unknown {
// Subscribe returns the pool's fixed notification channel.
58 func (p *stubPool) Subscribe() <-chan struct{} { return p.notify }
// Unsubscribe is a no-op in the stub.
59 func (p *stubPool) Unsubscribe(<-chan struct{}) {}
// Running returns a copy of the running-containers map (UUID ->
// start time), so callers cannot mutate the stub's internal state.
// NOTE(review): locking and the return statement are elided here.
60 func (p *stubPool) Running() map[string]time.Time {
63 r := map[string]time.Time{}
64 for k, v := range p.running {
// Unallocated returns a copy of the unalloc counts with the unknown
// count subtracted per instance type (unalloc = idle+booting+unknown,
// so this yields idle+booting). Return statement elided here.
69 func (p *stubPool) Unallocated() map[arvados.InstanceType]int {
72 r := map[arvados.InstanceType]int{}
73 for it, n := range p.unalloc {
74 r[it] = n - p.unknown[it]
// Create records the requested instance type in p.creates.
// NOTE(review): the elided lines presumably also bump p.unalloc and
// decide the bool result (e.g. canCreate/quota) — confirm in full file.
78 func (p *stubPool) Create(it arvados.InstanceType) bool {
81 p.creates = append(p.creates, it)
// ForgetContainer: no-op in the stub as far as this extraction shows
// (body lines, if any, are elided).
89 func (p *stubPool) ForgetContainer(uuid string) {
// KillContainer always removes uuid from p.running (via defer), and
// reports true only when the container was tracked AND its recorded
// start time is the zero Time — i.e. "scheduled but not started".
// NOTE(review): lines between the signature and the defer (likely a
// mutex lock) are elided here.
91 func (p *stubPool) KillContainer(uuid, reason string) bool {
94 defer delete(p.running, uuid)
95 t, ok := p.running[uuid]
96 return ok && t.IsZero()
// Shutdown: body elided in this extraction; tests below check a
// p.shutdowns counter, so it presumably increments that — confirm.
98 func (p *stubPool) Shutdown(arvados.InstanceType) bool {
// CountWorkers derives worker-state counts from map lengths. Note
// these are counts of map ENTRIES (distinct instance types / UUIDs),
// not instance totals: Booting = len(unalloc) - len(idle).
102 func (p *stubPool) CountWorkers() map[worker.State]int {
105 return map[worker.State]int{
106 worker.StateBooting: len(p.unalloc) - len(p.idle),
107 worker.StateIdle: len(p.idle),
108 worker.StateRunning: len(p.running),
109 worker.StateUnknown: len(p.unknown),
// StartContainer records the start in p.starts and marks the
// container as running with a zero start time (which KillContainer
// above treats as "not yet started"). NOTE(review): elided lines
// presumably decrement idle/unalloc counts and choose the bool result.
112 func (p *stubPool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
115 p.starts = append(p.starts, ctr.UUID)
121 p.running[ctr.UUID] = time.Time{}
// chooseType maps a container to a test instance type indexed by its
// VCPUs constraint; never returns an error.
125 func chooseType(ctr *arvados.Container) (arvados.InstanceType, error) {
126 return test.InstanceType(ctr.RuntimeConstraints.VCPUs), nil
// Register SchedulerSuite with gocheck; the suite carries no state.
129 var _ = check.Suite(&SchedulerSuite{})
131 type SchedulerSuite struct{}
// NOTE(review): interior lines (queue construction, priority fields,
// closing braces) are elided in this extraction; code kept as-is.
133 // Assign priority=4 container to idle node. Create new instances for
134 // the priority=3, 2, 1 containers.
135 func (*SchedulerSuite) TestUseIdleWorkers(c *check.C) {
136 ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
// Four locked containers; per the header comment, priorities run 4..1
// (the Priority fields themselves are elided here).
138 ChooseType: chooseType,
139 Containers: []arvados.Container{
141 UUID: test.ContainerUUID(1),
143 State: arvados.ContainerStateLocked,
144 RuntimeConstraints: arvados.RuntimeConstraints{
150 UUID: test.ContainerUUID(2),
152 State: arvados.ContainerStateLocked,
153 RuntimeConstraints: arvados.RuntimeConstraints{
159 UUID: test.ContainerUUID(3),
161 State: arvados.ContainerStateLocked,
162 RuntimeConstraints: arvados.RuntimeConstraints{
168 UUID: test.ContainerUUID(4),
170 State: arvados.ContainerStateLocked,
171 RuntimeConstraints: arvados.RuntimeConstraints{
// Pool: one idle type1 and two idle type2 instances available.
181 unalloc: map[arvados.InstanceType]int{
182 test.InstanceType(1): 1,
183 test.InstanceType(2): 2,
185 idle: map[arvados.InstanceType]int{
186 test.InstanceType(1): 1,
187 test.InstanceType(2): 2,
189 running: map[string]time.Time{},
// One runQueue pass: expect three type1 creates and exactly one
// start (container 4), which is then the only running container.
192 New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0, 0, 0).runQueue()
193 c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(1), test.InstanceType(1), test.InstanceType(1)})
194 c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(4)})
195 c.Check(pool.running, check.HasLen, 1)
196 for uuid := range pool.running {
197 c.Check(uuid, check.Equals, uuids[4])
// NOTE(review): interior lines (quota wiring, sync calls, case
// boundaries of the quota switch) are elided in this extraction.
201 // If pool.AtQuota() is true, shutdown some unalloc nodes, and don't
203 func (*SchedulerSuite) TestShutdownAtQuota(c *check.C) {
204 ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
// Repeat the scenario at quota=1..3; expectations branch on quota
// below (the branch structure itself is elided here).
205 for quota := 1; quota <= 3; quota++ {
206 c.Logf("quota=%d", quota)
208 ChooseType: chooseType,
209 Containers: []arvados.Container{
211 UUID: test.ContainerUUID(2),
213 State: arvados.ContainerStateLocked,
214 RuntimeConstraints: arvados.RuntimeConstraints{
220 UUID: test.ContainerUUID(3),
222 State: arvados.ContainerStateLocked,
223 RuntimeConstraints: arvados.RuntimeConstraints{
233 unalloc: map[arvados.InstanceType]int{
234 test.InstanceType(2): 2,
236 idle: map[arvados.InstanceType]int{
237 test.InstanceType(2): 2,
239 running: map[string]time.Time{},
240 creates: []arvados.InstanceType{},
244 sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0, 0, 0)
250 // Can't create a type3 node for ctr3, so we
251 // shutdown an unallocated node (type2), and
252 // unlock both containers.
253 c.Check(pool.starts, check.HasLen, 0)
254 c.Check(pool.shutdowns, check.Equals, 1)
255 c.Check(pool.creates, check.HasLen, 0)
256 c.Check(queue.StateChanges(), check.DeepEquals, []test.QueueStateChange{
257 {UUID: test.ContainerUUID(3), From: "Locked", To: "Queued"},
258 {UUID: test.ContainerUUID(2), From: "Locked", To: "Queued"},
261 // Creating a type3 instance works, so we
262 // start ctr2 on a type2 instance, and leave
263 // ctr3 locked while we wait for the new
264 // instance to come up.
265 c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(2)})
266 c.Check(pool.shutdowns, check.Equals, 0)
267 c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(3)})
268 c.Check(queue.StateChanges(), check.HasLen, 0)
// Guard: loop bound above is 3, so this is unreachable unless the
// bound and this panic drift apart.
270 panic("test not written for quota>3")
// NOTE(review): interior lines (priorities, queue/pool assembly,
// sync/runQueue calls) are elided in this extraction.
275 // Don't unlock containers or shutdown unalloc (booting/idle) nodes
276 // just because some 503 errors caused us to reduce maxConcurrency
277 // below the current load level.
279 // We expect to raise maxConcurrency soon when we stop seeing 503s. If
280 // that doesn't happen soon, the idle timeout will take care of the
282 func (*SchedulerSuite) TestIdleIn503QuietPeriod(c *check.C) {
283 ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
285 ChooseType: chooseType,
286 Containers: []arvados.Container{
287 // scheduled on an instance (but not Running yet)
289 UUID: test.ContainerUUID(1),
291 State: arvados.ContainerStateLocked,
292 RuntimeConstraints: arvados.RuntimeConstraints{
299 UUID: test.ContainerUUID(2),
301 State: arvados.ContainerStateLocked,
302 RuntimeConstraints: arvados.RuntimeConstraints{
307 // scheduled on an instance (but not Running yet)
309 UUID: test.ContainerUUID(3),
311 State: arvados.ContainerStateLocked,
312 RuntimeConstraints: arvados.RuntimeConstraints{
319 UUID: test.ContainerUUID(4),
321 State: arvados.ContainerStateLocked,
322 RuntimeConstraints: arvados.RuntimeConstraints{
329 UUID: test.ContainerUUID(5),
331 State: arvados.ContainerStateQueued,
332 RuntimeConstraints: arvados.RuntimeConstraints{
// Pool: containers 1 and 3 already allocated (zero start time =
// scheduled-not-started, per stubPool conventions above).
342 unalloc: map[arvados.InstanceType]int{
343 test.InstanceType(2): 2,
344 test.InstanceType(3): 2,
346 idle: map[arvados.InstanceType]int{
347 test.InstanceType(2): 1,
348 test.InstanceType(3): 1,
350 running: map[string]time.Time{
351 test.ContainerUUID(1): {},
352 test.ContainerUUID(3): {},
354 creates: []arvados.InstanceType{},
// Simulate a recent 503: concurrency is capped at 3 while load is
// already at that level.
358 sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0, 0, 0)
359 sch.last503time = time.Now()
360 sch.maxConcurrency = 3
// Expect: only ctr2 starts; nothing is shut down, created, or unlocked.
365 c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(2)})
366 c.Check(pool.shutdowns, check.Equals, 0)
367 c.Check(pool.creates, check.HasLen, 0)
368 c.Check(queue.StateChanges(), check.HasLen, 0)
// NOTE(review): interior lines (VCPUs/Supervisor fields, sync calls)
// are elided in this extraction; code kept byte-identical.
371 // If we somehow have more supervisor containers in Locked state than
372 // we should (e.g., config changed since they started), and some
373 // appropriate-sized instances booting up, unlock the excess
374 // supervisor containers, but let the instances keep booting.
375 func (*SchedulerSuite) TestUnlockExcessSupervisors(c *check.C) {
376 ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
378 ChooseType: chooseType,
// Six locked containers, descending priority 999..994.
380 for i := 1; i <= 6; i++ {
381 queue.Containers = append(queue.Containers, arvados.Container{
382 UUID: test.ContainerUUID(i),
383 Priority: int64(1000 - i),
384 State: arvados.ContainerStateLocked,
385 RuntimeConstraints: arvados.RuntimeConstraints{
389 SchedulingParameters: arvados.SchedulingParameters{
397 unalloc: map[arvados.InstanceType]int{
398 test.InstanceType(2): 2,
400 idle: map[arvados.InstanceType]int{
401 test.InstanceType(2): 1,
403 running: map[string]time.Time{
404 test.ContainerUUID(1): {},
405 test.ContainerUUID(2): {},
406 test.ContainerUUID(3): {},
407 test.ContainerUUID(4): {},
409 creates: []arvados.InstanceType{},
// maxSupervisors args (8, 0.5) passed to New; the two lowest-priority
// supervisors (5, 6) are expected to be unlocked, nothing else changes.
413 sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0, 8, 0.5)
418 c.Check(pool.starts, check.DeepEquals, []string{})
419 c.Check(pool.shutdowns, check.Equals, 0)
420 c.Check(pool.creates, check.HasLen, 0)
421 c.Check(queue.StateChanges(), check.DeepEquals, []test.QueueStateChange{
422 {UUID: test.ContainerUUID(5), From: "Locked", To: "Queued"},
423 {UUID: test.ContainerUUID(6), From: "Locked", To: "Queued"},
// NOTE(review): interior lines are elided in this extraction.
427 // Assuming we're not at quota, don't try to shutdown idle nodes
428 // merely because we have more queued/locked supervisor containers
429 // than MaxSupervisors -- it won't help.
430 func (*SchedulerSuite) TestExcessSupervisors(c *check.C) {
431 ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
433 ChooseType: chooseType,
// Eight supervisors, ascending priority 1001..1008; states adjusted
// below so indices 2-3 are Locked and 4-5 are Running.
435 for i := 1; i <= 8; i++ {
436 queue.Containers = append(queue.Containers, arvados.Container{
437 UUID: test.ContainerUUID(i),
438 Priority: int64(1000 + i),
439 State: arvados.ContainerStateQueued,
440 RuntimeConstraints: arvados.RuntimeConstraints{
444 SchedulingParameters: arvados.SchedulingParameters{
449 for i := 2; i < 4; i++ {
450 queue.Containers[i].State = arvados.ContainerStateLocked
452 for i := 4; i < 6; i++ {
453 queue.Containers[i].State = arvados.ContainerStateRunning
458 unalloc: map[arvados.InstanceType]int{
459 test.InstanceType(2): 2,
461 idle: map[arvados.InstanceType]int{
462 test.InstanceType(2): 1,
464 running: map[string]time.Time{
465 test.ContainerUUID(5): {},
466 test.ContainerUUID(6): {},
468 creates: []arvados.InstanceType{},
// Expect: two starts, and crucially no shutdowns/creates/unlocks.
472 sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0, 8, 0.5)
477 c.Check(pool.starts, check.HasLen, 2)
478 c.Check(pool.shutdowns, check.Equals, 0)
479 c.Check(pool.creates, check.HasLen, 0)
480 c.Check(queue.StateChanges(), check.HasLen, 0)
// NOTE(review): interior lines (equal Priority value, unlock-count
// accumulation body) are elided in this extraction.
483 // Don't flap lock/unlock when equal-priority containers compete for
486 // (Unless we use FirstSeenAt as a secondary sort key, each runQueue()
487 // tends to choose a different one of the equal-priority containers as
488 // the "first" one that should be locked, and unlock the one it chose
489 // last time. This generates logging noise, and fails containers by
490 // reaching MaxDispatchAttempts quickly.)
491 func (*SchedulerSuite) TestEqualPriorityContainers(c *check.C) {
492 logger := ctxlog.TestLogger(c)
493 ctx := ctxlog.Context(context.Background(), logger)
495 ChooseType: chooseType,
498 for i := 0; i < 8; i++ {
499 queue.Containers = append(queue.Containers, arvados.Container{
500 UUID: test.ContainerUUID(i),
502 State: arvados.ContainerStateQueued,
503 RuntimeConstraints: arvados.RuntimeConstraints{
512 unalloc: map[arvados.InstanceType]int{
513 test.InstanceType(3): 2,
515 idle: map[arvados.InstanceType]int{
516 test.InstanceType(3): 2,
518 running: map[string]time.Time{},
519 creates: []arvados.InstanceType{},
// Run 30 scheduling passes; a stable scheduler starts exactly two
// containers and unlocks any given container at most once.
523 sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0, 0, 0)
524 for i := 0; i < 30; i++ {
527 time.Sleep(time.Millisecond)
529 c.Check(pool.shutdowns, check.Equals, 0)
530 c.Check(pool.starts, check.HasLen, 2)
531 unlocked := map[string]int{}
532 for _, chg := range queue.StateChanges() {
533 if chg.To == arvados.ContainerStateQueued {
537 for uuid, count := range unlocked {
538 c.Check(count, check.Equals, 1, check.Commentf("%s", uuid))
// NOTE(review): interior lines (priorities, closing braces, the
// t.IsZero check in the final loop) are elided in this extraction.
542 // Start lower-priority containers while waiting for new/existing
543 // workers to come up for higher-priority containers.
544 func (*SchedulerSuite) TestStartWhileCreating(c *check.C) {
545 ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
548 unalloc: map[arvados.InstanceType]int{
549 test.InstanceType(1): 2,
550 test.InstanceType(2): 2,
552 idle: map[arvados.InstanceType]int{
553 test.InstanceType(1): 1,
554 test.InstanceType(2): 1,
556 running: map[string]time.Time{},
// Six locked containers in three roles per size class, as the
// inline comments state: needs-new-worker, maps-to-unalloc,
// starts-on-idle.
560 ChooseType: chooseType,
561 Containers: []arvados.Container{
563 // create a new worker
564 UUID: test.ContainerUUID(1),
566 State: arvados.ContainerStateLocked,
567 RuntimeConstraints: arvados.RuntimeConstraints{
573 // tentatively map to unalloc worker
574 UUID: test.ContainerUUID(2),
576 State: arvados.ContainerStateLocked,
577 RuntimeConstraints: arvados.RuntimeConstraints{
583 // start now on idle worker
584 UUID: test.ContainerUUID(3),
586 State: arvados.ContainerStateLocked,
587 RuntimeConstraints: arvados.RuntimeConstraints{
593 // create a new worker
594 UUID: test.ContainerUUID(4),
596 State: arvados.ContainerStateLocked,
597 RuntimeConstraints: arvados.RuntimeConstraints{
603 // tentatively map to unalloc worker
604 UUID: test.ContainerUUID(5),
606 State: arvados.ContainerStateLocked,
607 RuntimeConstraints: arvados.RuntimeConstraints{
613 // start now on idle worker
614 UUID: test.ContainerUUID(6),
616 State: arvados.ContainerStateLocked,
617 RuntimeConstraints: arvados.RuntimeConstraints{
// Expect one create per size class and starts in priority order;
// only the start-now containers (3 and 6) end up running.
625 New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0, 0, 0).runQueue()
626 c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(2), test.InstanceType(1)})
627 c.Check(pool.starts, check.DeepEquals, []string{uuids[6], uuids[5], uuids[3], uuids[2]})
628 running := map[string]bool{}
629 for uuid, t := range pool.running {
631 running[uuid] = false
636 c.Check(running, check.DeepEquals, map[string]bool{uuids[3]: false, uuids[6]: false})
// TestKillNonexistentContainer: the pool reports container 2 running,
// but only container 1 is in the queue; the scheduler should kill the
// unknown container until pool.Running() is empty.
// NOTE(review): interior lines (sync call, loop body) are elided.
639 func (*SchedulerSuite) TestKillNonexistentContainer(c *check.C) {
640 ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
643 unalloc: map[arvados.InstanceType]int{
644 test.InstanceType(2): 0,
646 idle: map[arvados.InstanceType]int{
647 test.InstanceType(2): 0,
649 running: map[string]time.Time{
650 test.ContainerUUID(2): {},
654 ChooseType: chooseType,
655 Containers: []arvados.Container{
657 // create a new worker
658 UUID: test.ContainerUUID(1),
660 State: arvados.ContainerStateLocked,
661 RuntimeConstraints: arvados.RuntimeConstraints{
669 sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0, 0, 0)
670 c.Check(pool.running, check.HasLen, 1)
// Poll up to 1s for the scheduler to remove the stray container.
672 for deadline := time.Now().Add(time.Second); len(pool.Running()) > 0 && time.Now().Before(deadline); time.Sleep(time.Millisecond) {
674 c.Check(pool.Running(), check.HasLen, 0)
// TestContainersMetrics exercises the scheduler's prometheus gauges
// (allocated-not-started, not-allocated-over-quota, longest wait) in
// three pool configurations: booting worker, no workers, idle worker.
// NOTE(review): interior lines (runQueue/updateMetrics calls, pool
// literals' braces) are elided in this extraction.
677 func (*SchedulerSuite) TestContainersMetrics(c *check.C) {
678 ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
680 ChooseType: chooseType,
681 Containers: []arvados.Container{
683 UUID: test.ContainerUUID(1),
685 State: arvados.ContainerStateLocked,
686 CreatedAt: time.Now().Add(-10 * time.Second),
687 RuntimeConstraints: arvados.RuntimeConstraints{
696 // Create a pool with one unallocated (idle/booting/unknown) worker,
697 // and `idle` and `unknown` not set (empty). Iow this worker is in the booting
698 // state, and the container will be allocated but not started yet.
700 unalloc: map[arvados.InstanceType]int{test.InstanceType(1): 1},
702 sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0, 0, 0)
706 c.Check(int(testutil.ToFloat64(sch.mContainersAllocatedNotStarted)), check.Equals, 1)
707 c.Check(int(testutil.ToFloat64(sch.mContainersNotAllocatedOverQuota)), check.Equals, 0)
708 c.Check(int(testutil.ToFloat64(sch.mLongestWaitTimeSinceQueue)), check.Equals, 10)
710 // Create a pool without workers. The queued container will not be started, and the
711 // 'over quota' metric will be 1 because no workers are available and canCreate defaults
714 sch = New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0, 0, 0)
718 c.Check(int(testutil.ToFloat64(sch.mContainersAllocatedNotStarted)), check.Equals, 0)
719 c.Check(int(testutil.ToFloat64(sch.mContainersNotAllocatedOverQuota)), check.Equals, 1)
720 c.Check(int(testutil.ToFloat64(sch.mLongestWaitTimeSinceQueue)), check.Equals, 10)
722 // Reset the queue, and create a pool with an idle worker. The queued
723 // container will be started immediately and mLongestWaitTimeSinceQueue
726 ChooseType: chooseType,
727 Containers: []arvados.Container{
729 UUID: test.ContainerUUID(1),
731 State: arvados.ContainerStateLocked,
732 CreatedAt: time.Now().Add(-10 * time.Second),
733 RuntimeConstraints: arvados.RuntimeConstraints{
743 idle: map[arvados.InstanceType]int{test.InstanceType(1): 1},
744 unalloc: map[arvados.InstanceType]int{test.InstanceType(1): 1},
745 running: map[string]time.Time{},
747 sch = New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0, 0, 0)
751 c.Check(int(testutil.ToFloat64(sch.mLongestWaitTimeSinceQueue)), check.Equals, 0)
// NOTE(review): interior lines (Priority/Supervisor fields, sync and
// closing braces) are elided in this extraction; code kept as-is.
754 // Assign priority=4, 3 and 1 containers to idle nodes. Ignore the supervisor at priority 2.
755 func (*SchedulerSuite) TestSkipSupervisors(c *check.C) {
756 ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
758 ChooseType: chooseType,
759 Containers: []arvados.Container{
761 UUID: test.ContainerUUID(1),
763 State: arvados.ContainerStateLocked,
764 RuntimeConstraints: arvados.RuntimeConstraints{
770 UUID: test.ContainerUUID(2),
772 State: arvados.ContainerStateLocked,
773 RuntimeConstraints: arvados.RuntimeConstraints{
777 SchedulingParameters: arvados.SchedulingParameters{
782 UUID: test.ContainerUUID(3),
784 State: arvados.ContainerStateLocked,
785 RuntimeConstraints: arvados.RuntimeConstraints{
789 SchedulingParameters: arvados.SchedulingParameters{
794 UUID: test.ContainerUUID(4),
796 State: arvados.ContainerStateLocked,
797 RuntimeConstraints: arvados.RuntimeConstraints{
801 SchedulingParameters: arvados.SchedulingParameters{
810 unalloc: map[arvados.InstanceType]int{
811 test.InstanceType(1): 4,
812 test.InstanceType(2): 4,
814 idle: map[arvados.InstanceType]int{
815 test.InstanceType(1): 4,
816 test.InstanceType(2): 4,
818 running: map[string]time.Time{},
// Supervisor cap args (10, 0.2) to New; expect no creates and starts
// for containers 4, 3, 1 only (supervisor 2 is skipped).
821 New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0, 10, 0.2).runQueue()
822 c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType(nil))
823 c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(4), test.ContainerUUID(3), test.ContainerUUID(1)})