// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package scheduler

import (
	"context"
	"sync"
	"time"

	"git.arvados.org/arvados.git/lib/dispatchcloud/test"
	"git.arvados.org/arvados.git/lib/dispatchcloud/worker"
	"git.arvados.org/arvados.git/sdk/go/arvados"
	"git.arvados.org/arvados.git/sdk/go/ctxlog"

	"github.com/prometheus/client_golang/prometheus/testutil"

	check "gopkg.in/check.v1"
)
var (
	// arbitrary example container UUIDs
	uuids = func() (r []string) {
		for i := 0; i < 16; i++ {
			r = append(r, test.ContainerUUID(i))
		}
		return
	}()
)
type stubQuotaError struct {
	error
}

func (stubQuotaError) IsQuotaError() bool { return true }
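
// stubPool is an in-memory stand-in for the scheduler's worker pool.
// It implements the pool interface the scheduler calls (AtQuota,
// Create, StartContainer, Shutdown, etc.) and records those calls in
// the creates, starts, and shutdowns fields so tests can assert on
// them.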
type stubPool struct {
	notify    <-chan struct{}
	unalloc   map[arvados.InstanceType]int // idle+booting+unknown
	idle      map[arvados.InstanceType]int
	unknown   map[arvados.InstanceType]int
	running   map[string]time.Time
	quota     int
	canCreate int
	creates   []arvados.InstanceType
	starts    []string
	shutdowns int
	sync.Mutex
}
func (p *stubPool) AtQuota() bool {
	p.Lock()
	defer p.Unlock()
	n := len(p.running)
	for _, nn := range p.unalloc {
		n += nn
	}
	for _, nn := range p.unknown {
		n += nn
	}
	return n >= p.quota
}
func (p *stubPool) Subscribe() <-chan struct{}  { return p.notify }
func (p *stubPool) Unsubscribe(<-chan struct{}) {}
func (p *stubPool) Running() map[string]time.Time {
	p.Lock()
	defer p.Unlock()
	r := map[string]time.Time{}
	for k, v := range p.running {
		r[k] = v
	}
	return r
}
func (p *stubPool) Unallocated() map[arvados.InstanceType]int {
	p.Lock()
	defer p.Unlock()
	r := map[arvados.InstanceType]int{}
	for it, n := range p.unalloc {
		r[it] = n - p.unknown[it]
	}
	return r
}
func (p *stubPool) Create(it arvados.InstanceType) bool {
	p.Lock()
	defer p.Unlock()
	// Record every create request; only succeed while the canCreate
	// budget lasts.
	p.creates = append(p.creates, it)
	if p.canCreate < 1 {
		return false
	}
	p.canCreate--
	p.unalloc[it]++
	return true
}
func (p *stubPool) ForgetContainer(uuid string) {
}
func (p *stubPool) KillContainer(uuid, reason string) bool {
	p.Lock()
	defer p.Unlock()
	defer delete(p.running, uuid)
	t, ok := p.running[uuid]
	return ok && t.IsZero()
}
func (p *stubPool) Shutdown(arvados.InstanceType) bool {
	p.shutdowns++
	return false
}
func (p *stubPool) CountWorkers() map[worker.State]int {
	p.Lock()
	defer p.Unlock()
	return map[worker.State]int{
		worker.StateBooting: len(p.unalloc) - len(p.idle),
		worker.StateIdle:    len(p.idle),
		worker.StateRunning: len(p.running),
		worker.StateUnknown: len(p.unknown),
	}
}
func (p *stubPool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
	p.Lock()
	defer p.Unlock()
	// Record every start attempt, even those that fail for lack of
	// an idle worker of the requested type.
	p.starts = append(p.starts, ctr.UUID)
	if p.idle[it] == 0 {
		return false
	}
	p.idle[it]--
	p.unalloc[it]--
	p.running[ctr.UUID] = time.Time{}
	return true
}
// chooseType maps a container with RuntimeConstraints.VCPUs == n to
// test.InstanceType(n), so tests can steer each container to a
// specific instance type.
func chooseType(ctr *arvados.Container) (arvados.InstanceType, error) {
	return test.InstanceType(ctr.RuntimeConstraints.VCPUs), nil
}
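
// Register the suite with gocheck. (The usual gocheck-to-"go test"
// bridge, a Test function that calls check.TestingT, is assumed to
// live in a sibling _test.go file in this package.)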
var _ = check.Suite(&SchedulerSuite{})

type SchedulerSuite struct{}
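
// A note on the New(...) calls below: judging by the tests that use
// them, the two trailing numeric arguments are an instance-count
// limit and a supervisor fraction. Most tests pass 0, 0 (no limits);
// TestUnlockExcessSupervisors and TestExcessSupervisors pass 8, 0.5
// (at most 4 supervisors), and TestSkipSupervisors passes 10, 0.2 (at
// most 2).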
// Assign priority=4 container to idle node. Create new instances for
// the priority=3, 2, 1 containers.
func (*SchedulerSuite) TestUseIdleWorkers(c *check.C) {
	ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
	queue := test.Queue{
		ChooseType: chooseType,
		Containers: []arvados.Container{
			{
				UUID:               test.ContainerUUID(1),
				Priority:           1,
				State:              arvados.ContainerStateLocked,
				RuntimeConstraints: arvados.RuntimeConstraints{VCPUs: 1, RAM: 1 << 30},
			},
			{
				UUID:               test.ContainerUUID(2),
				Priority:           2,
				State:              arvados.ContainerStateLocked,
				RuntimeConstraints: arvados.RuntimeConstraints{VCPUs: 1, RAM: 1 << 30},
			},
			{
				UUID:               test.ContainerUUID(3),
				Priority:           3,
				State:              arvados.ContainerStateLocked,
				RuntimeConstraints: arvados.RuntimeConstraints{VCPUs: 1, RAM: 1 << 30},
			},
			{
				UUID:               test.ContainerUUID(4),
				Priority:           4,
				State:              arvados.ContainerStateLocked,
				RuntimeConstraints: arvados.RuntimeConstraints{VCPUs: 1, RAM: 1 << 30},
			},
		},
	}
	queue.Update()
	pool := stubPool{
		quota: 1000,
		unalloc: map[arvados.InstanceType]int{
			test.InstanceType(1): 1,
			test.InstanceType(2): 2,
		},
		idle: map[arvados.InstanceType]int{
			test.InstanceType(1): 1,
			test.InstanceType(2): 2,
		},
		running: map[string]time.Time{},
	}
	New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0, 0).runQueue()
	c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(1), test.InstanceType(1), test.InstanceType(1)})
	c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(4)})
	c.Check(pool.running, check.HasLen, 1)
	for uuid := range pool.running {
		c.Check(uuid, check.Equals, uuids[4])
	}
}
// If pool.AtQuota() is true, shut down some unalloc nodes, and don't
// start any containers.
func (*SchedulerSuite) TestShutdownAtQuota(c *check.C) {
	ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
	for quota := 1; quota <= 3; quota++ {
		c.Logf("quota=%d", quota)
		queue := test.Queue{
			ChooseType: chooseType,
			Containers: []arvados.Container{
				{
					UUID:               test.ContainerUUID(2),
					Priority:           2,
					State:              arvados.ContainerStateLocked,
					RuntimeConstraints: arvados.RuntimeConstraints{VCPUs: 2, RAM: 2 << 30},
				},
				{
					UUID:               test.ContainerUUID(3),
					Priority:           3,
					State:              arvados.ContainerStateLocked,
					RuntimeConstraints: arvados.RuntimeConstraints{VCPUs: 3, RAM: 3 << 30},
				},
			},
		}
		queue.Update()
		pool := stubPool{
			quota: quota,
			unalloc: map[arvados.InstanceType]int{
				test.InstanceType(2): 2,
			},
			idle: map[arvados.InstanceType]int{
				test.InstanceType(2): 2,
			},
			running:   map[string]time.Time{},
			creates:   []arvados.InstanceType{},
			starts:    []string{},
			canCreate: 0,
		}
		sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0, 0)
		sch.sync()
		sch.runQueue()
		sch.sync()
		switch quota {
		case 1, 2:
			// Can't create a type3 node for ctr3, so we
			// shut down an unallocated node (type2), and
			// unlock both containers.
			c.Check(pool.starts, check.HasLen, 0)
			c.Check(pool.shutdowns, check.Equals, 1)
			c.Check(pool.creates, check.HasLen, 0)
			c.Check(queue.StateChanges(), check.DeepEquals, []test.QueueStateChange{
				{UUID: test.ContainerUUID(3), From: "Locked", To: "Queued"},
				{UUID: test.ContainerUUID(2), From: "Locked", To: "Queued"},
			})
		case 3:
			// Creating a type3 instance works, so we
			// start ctr2 on a type2 instance, and leave
			// ctr3 locked while we wait for the new
			// instance to come up.
			c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(2)})
			c.Check(pool.shutdowns, check.Equals, 0)
			c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(3)})
			c.Check(queue.StateChanges(), check.HasLen, 0)
		default:
			panic("test not written for quota>3")
		}
	}
}
// Don't unlock containers or shut down unalloc (booting/idle) nodes
// just because some 503 errors caused us to reduce maxConcurrency
// below the current load level.
//
// We expect to raise maxConcurrency soon when we stop seeing 503s. If
// that doesn't happen soon, the idle timeout will take care of the
// excess nodes.
func (*SchedulerSuite) TestIdleIn503QuietPeriod(c *check.C) {
	ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
	queue := test.Queue{
		ChooseType: chooseType,
		Containers: []arvados.Container{
			// scheduled on an instance (but not Running yet)
			{
				UUID:               test.ContainerUUID(1),
				Priority:           1000,
				State:              arvados.ContainerStateLocked,
				RuntimeConstraints: arvados.RuntimeConstraints{VCPUs: 2, RAM: 2 << 30},
			},
			// not yet scheduled
			{
				UUID:               test.ContainerUUID(2),
				Priority:           1000,
				State:              arvados.ContainerStateLocked,
				RuntimeConstraints: arvados.RuntimeConstraints{VCPUs: 2, RAM: 2 << 30},
			},
			// scheduled on an instance (but not Running yet)
			{
				UUID:               test.ContainerUUID(3),
				Priority:           1000,
				State:              arvados.ContainerStateLocked,
				RuntimeConstraints: arvados.RuntimeConstraints{VCPUs: 3, RAM: 3 << 30},
			},
			// not yet scheduled
			{
				UUID:               test.ContainerUUID(4),
				Priority:           1000,
				State:              arvados.ContainerStateLocked,
				RuntimeConstraints: arvados.RuntimeConstraints{VCPUs: 3, RAM: 3 << 30},
			},
			// not locked
			{
				UUID:               test.ContainerUUID(5),
				Priority:           1000,
				State:              arvados.ContainerStateQueued,
				RuntimeConstraints: arvados.RuntimeConstraints{VCPUs: 3, RAM: 3 << 30},
			},
		},
	}
	queue.Update()
	pool := stubPool{
		quota: 16,
		unalloc: map[arvados.InstanceType]int{
			test.InstanceType(2): 2,
			test.InstanceType(3): 2,
		},
		idle: map[arvados.InstanceType]int{
			test.InstanceType(2): 1,
			test.InstanceType(3): 1,
		},
		running: map[string]time.Time{
			test.ContainerUUID(1): {},
			test.ContainerUUID(3): {},
		},
		creates: []arvados.InstanceType{},
		starts:  []string{},
	}
	sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0, 0)
	sch.last503time = time.Now()
	sch.maxConcurrency = 3
	sch.sync()
	sch.runQueue()
	sch.sync()
	c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(2)})
	c.Check(pool.shutdowns, check.Equals, 0)
	c.Check(pool.creates, check.HasLen, 0)
	c.Check(queue.StateChanges(), check.HasLen, 0)
}
// If we somehow have more supervisor containers in Locked state than
// we should (e.g., config changed since they started), and some
// appropriate-sized instances booting up, unlock the excess
// supervisor containers, but let the instances keep booting.
func (*SchedulerSuite) TestUnlockExcessSupervisors(c *check.C) {
	ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
	queue := test.Queue{
		ChooseType: chooseType,
	}
	for i := 1; i <= 6; i++ {
		queue.Containers = append(queue.Containers, arvados.Container{
			UUID:                 test.ContainerUUID(i),
			Priority:             int64(1000 - i),
			State:                arvados.ContainerStateLocked,
			RuntimeConstraints:   arvados.RuntimeConstraints{VCPUs: 2, RAM: 2 << 30},
			SchedulingParameters: arvados.SchedulingParameters{Supervisor: true},
		})
	}
	queue.Update()
	pool := stubPool{
		quota: 16,
		unalloc: map[arvados.InstanceType]int{
			test.InstanceType(2): 2,
		},
		idle: map[arvados.InstanceType]int{
			test.InstanceType(2): 1,
		},
		running: map[string]time.Time{
			test.ContainerUUID(1): {},
			test.ContainerUUID(2): {},
			test.ContainerUUID(3): {},
			test.ContainerUUID(4): {},
		},
		creates: []arvados.InstanceType{},
		starts:  []string{},
	}
	sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 8, 0.5)
	sch.sync()
	sch.runQueue()
	sch.sync()
	c.Check(pool.starts, check.DeepEquals, []string{})
	c.Check(pool.shutdowns, check.Equals, 0)
	c.Check(pool.creates, check.HasLen, 0)
	c.Check(queue.StateChanges(), check.DeepEquals, []test.QueueStateChange{
		{UUID: test.ContainerUUID(5), From: "Locked", To: "Queued"},
		{UUID: test.ContainerUUID(6), From: "Locked", To: "Queued"},
	})
}
// Assuming we're not at quota, don't try to shut down idle nodes
// merely because we have more queued/locked supervisor containers
// than MaxSupervisors -- it won't help.
func (*SchedulerSuite) TestExcessSupervisors(c *check.C) {
	ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
	queue := test.Queue{
		ChooseType: chooseType,
	}
	for i := 1; i <= 8; i++ {
		queue.Containers = append(queue.Containers, arvados.Container{
			UUID:                 test.ContainerUUID(i),
			Priority:             int64(1000 + i),
			State:                arvados.ContainerStateQueued,
			RuntimeConstraints:   arvados.RuntimeConstraints{VCPUs: 2, RAM: 2 << 30},
			SchedulingParameters: arvados.SchedulingParameters{Supervisor: true},
		})
	}
	for i := 2; i < 4; i++ {
		queue.Containers[i].State = arvados.ContainerStateLocked
	}
	for i := 4; i < 6; i++ {
		queue.Containers[i].State = arvados.ContainerStateRunning
	}
	queue.Update()
	pool := stubPool{
		quota: 16,
		unalloc: map[arvados.InstanceType]int{
			test.InstanceType(2): 2,
		},
		idle: map[arvados.InstanceType]int{
			test.InstanceType(2): 1,
		},
		running: map[string]time.Time{
			test.ContainerUUID(5): {},
			test.ContainerUUID(6): {},
		},
		creates: []arvados.InstanceType{},
		starts:  []string{},
	}
	sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 8, 0.5)
	sch.sync()
	sch.runQueue()
	sch.sync()
	c.Check(pool.starts, check.HasLen, 2)
	c.Check(pool.shutdowns, check.Equals, 0)
	c.Check(pool.creates, check.HasLen, 0)
	c.Check(queue.StateChanges(), check.HasLen, 0)
}
// Don't flap lock/unlock when equal-priority containers compete for
// a limited supply of workers.
//
// (Unless we use FirstSeenAt as a secondary sort key, each runQueue()
// tends to choose a different one of the equal-priority containers as
// the "first" one that should be locked, and unlock the one it chose
// last time. This generates logging noise, and fails containers by
// reaching MaxDispatchAttempts quickly.)
func (*SchedulerSuite) TestEqualPriorityContainers(c *check.C) {
	logger := ctxlog.TestLogger(c)
	ctx := ctxlog.Context(context.Background(), logger)
	queue := test.Queue{
		ChooseType: chooseType,
		Logger:     logger,
	}
	for i := 0; i < 8; i++ {
		queue.Containers = append(queue.Containers, arvados.Container{
			UUID:               test.ContainerUUID(i),
			Priority:           333,
			State:              arvados.ContainerStateQueued,
			RuntimeConstraints: arvados.RuntimeConstraints{VCPUs: 3, RAM: 3 << 30},
		})
	}
	queue.Update()
	pool := stubPool{
		quota: 2,
		unalloc: map[arvados.InstanceType]int{
			test.InstanceType(3): 2,
		},
		idle: map[arvados.InstanceType]int{
			test.InstanceType(3): 2,
		},
		running: map[string]time.Time{},
		creates: []arvados.InstanceType{},
		starts:  []string{},
	}
	sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0, 0)
	for i := 0; i < 30; i++ {
		sch.runQueue()
		sch.sync()
		time.Sleep(time.Millisecond)
	}
	c.Check(pool.shutdowns, check.Equals, 0)
	c.Check(pool.starts, check.HasLen, 2)
	unlocked := map[string]int{}
	for _, chg := range queue.StateChanges() {
		if chg.To == arvados.ContainerStateQueued {
			unlocked[chg.UUID]++
		}
	}
	for uuid, count := range unlocked {
		c.Check(count, check.Equals, 1, check.Commentf("%s", uuid))
	}
}
// Start lower-priority containers while waiting for new/existing
// workers to come up for higher-priority containers.
func (*SchedulerSuite) TestStartWhileCreating(c *check.C) {
	ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
	pool := stubPool{
		quota: 1000,
		unalloc: map[arvados.InstanceType]int{
			test.InstanceType(1): 2,
			test.InstanceType(2): 2,
		},
		idle: map[arvados.InstanceType]int{
			test.InstanceType(1): 1,
			test.InstanceType(2): 1,
		},
		running:   map[string]time.Time{},
		canCreate: 4,
	}
	queue := test.Queue{
		ChooseType: chooseType,
		Containers: []arvados.Container{
			{
				// create a new worker
				UUID:               test.ContainerUUID(1),
				Priority:           1,
				State:              arvados.ContainerStateLocked,
				RuntimeConstraints: arvados.RuntimeConstraints{VCPUs: 1, RAM: 1 << 30},
			},
			{
				// tentatively map to unalloc worker
				UUID:               test.ContainerUUID(2),
				Priority:           2,
				State:              arvados.ContainerStateLocked,
				RuntimeConstraints: arvados.RuntimeConstraints{VCPUs: 1, RAM: 1 << 30},
			},
			{
				// start now on idle worker
				UUID:               test.ContainerUUID(3),
				Priority:           3,
				State:              arvados.ContainerStateLocked,
				RuntimeConstraints: arvados.RuntimeConstraints{VCPUs: 1, RAM: 1 << 30},
			},
			{
				// create a new worker
				UUID:               test.ContainerUUID(4),
				Priority:           4,
				State:              arvados.ContainerStateLocked,
				RuntimeConstraints: arvados.RuntimeConstraints{VCPUs: 2, RAM: 2 << 30},
			},
			{
				// tentatively map to unalloc worker
				UUID:               test.ContainerUUID(5),
				Priority:           5,
				State:              arvados.ContainerStateLocked,
				RuntimeConstraints: arvados.RuntimeConstraints{VCPUs: 2, RAM: 2 << 30},
			},
			{
				// start now on idle worker
				UUID:               test.ContainerUUID(6),
				Priority:           6,
				State:              arvados.ContainerStateLocked,
				RuntimeConstraints: arvados.RuntimeConstraints{VCPUs: 2, RAM: 2 << 30},
			},
		},
	}
	queue.Update()
	New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0, 0).runQueue()
	c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(2), test.InstanceType(1)})
	c.Check(pool.starts, check.DeepEquals, []string{uuids[6], uuids[5], uuids[3], uuids[2]})
	running := map[string]bool{}
	for uuid, t := range pool.running {
		if t.IsZero() {
			running[uuid] = false
		} else {
			running[uuid] = true
		}
	}
	c.Check(running, check.DeepEquals, map[string]bool{uuids[3]: false, uuids[6]: false})
}
func (*SchedulerSuite) TestKillNonexistentContainer(c *check.C) {
	ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
	pool := stubPool{
		quota: 10,
		unalloc: map[arvados.InstanceType]int{
			test.InstanceType(2): 0,
		},
		idle: map[arvados.InstanceType]int{
			test.InstanceType(2): 0,
		},
		running: map[string]time.Time{
			test.ContainerUUID(2): {},
		},
	}
	queue := test.Queue{
		ChooseType: chooseType,
		Containers: []arvados.Container{
			{
				// create a new worker
				UUID:               test.ContainerUUID(1),
				Priority:           1,
				State:              arvados.ContainerStateLocked,
				RuntimeConstraints: arvados.RuntimeConstraints{VCPUs: 1, RAM: 1 << 30},
			},
		},
	}
	queue.Update()
	sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0, 0)
	c.Check(pool.running, check.HasLen, 1)
	sch.sync()
	for deadline := time.Now().Add(time.Second); len(pool.Running()) > 0 && time.Now().Before(deadline); time.Sleep(time.Millisecond) {
	}
	c.Check(pool.Running(), check.HasLen, 0)
}
func (*SchedulerSuite) TestContainersMetrics(c *check.C) {
	ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
	queue := test.Queue{
		ChooseType: chooseType,
		Containers: []arvados.Container{
			{
				UUID:               test.ContainerUUID(1),
				Priority:           1,
				State:              arvados.ContainerStateLocked,
				CreatedAt:          time.Now().Add(-10 * time.Second),
				RuntimeConstraints: arvados.RuntimeConstraints{VCPUs: 1, RAM: 1 << 30},
			},
		},
	}
	queue.Update()

	// Create a pool with one unallocated (idle/booting/unknown) worker,
	// with `idle` and `unknown` left empty. In other words, this worker
	// is booting, so the container will be allocated but not started yet.
	pool := stubPool{
		unalloc: map[arvados.InstanceType]int{test.InstanceType(1): 1},
	}
	sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0, 0)
	sch.runQueue()
	sch.updateMetrics()
	c.Check(int(testutil.ToFloat64(sch.mContainersAllocatedNotStarted)), check.Equals, 1)
	c.Check(int(testutil.ToFloat64(sch.mContainersNotAllocatedOverQuota)), check.Equals, 0)
	c.Check(int(testutil.ToFloat64(sch.mLongestWaitTimeSinceQueue)), check.Equals, 10)

	// Create a pool without workers. The queued container will not be
	// started, and the 'over quota' metric will be 1 because no workers
	// are available and canCreate defaults to zero.
	pool = stubPool{}
	sch = New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0, 0)
	sch.runQueue()
	sch.updateMetrics()
	c.Check(int(testutil.ToFloat64(sch.mContainersAllocatedNotStarted)), check.Equals, 0)
	c.Check(int(testutil.ToFloat64(sch.mContainersNotAllocatedOverQuota)), check.Equals, 1)
	c.Check(int(testutil.ToFloat64(sch.mLongestWaitTimeSinceQueue)), check.Equals, 10)

	// Reset the queue, and create a pool with an idle worker. The queued
	// container will be started immediately, and mLongestWaitTimeSinceQueue
	// should report zero.
	queue = test.Queue{
		ChooseType: chooseType,
		Containers: []arvados.Container{
			{
				UUID:               test.ContainerUUID(1),
				Priority:           1,
				State:              arvados.ContainerStateLocked,
				CreatedAt:          time.Now().Add(-10 * time.Second),
				RuntimeConstraints: arvados.RuntimeConstraints{VCPUs: 1, RAM: 1 << 30},
			},
		},
	}
	queue.Update()
	pool = stubPool{
		idle:    map[arvados.InstanceType]int{test.InstanceType(1): 1},
		unalloc: map[arvados.InstanceType]int{test.InstanceType(1): 1},
		running: map[string]time.Time{},
	}
	sch = New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0, 0)
	sch.runQueue()
	sch.updateMetrics()
	c.Check(int(testutil.ToFloat64(sch.mLongestWaitTimeSinceQueue)), check.Equals, 0)
}
// Assign priority=4, 3 and 1 containers to idle nodes. Ignore the
// supervisor at priority 2.
func (*SchedulerSuite) TestSkipSupervisors(c *check.C) {
	ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
	queue := test.Queue{
		ChooseType: chooseType,
		Containers: []arvados.Container{
			{
				UUID:               test.ContainerUUID(1),
				Priority:           1,
				State:              arvados.ContainerStateLocked,
				RuntimeConstraints: arvados.RuntimeConstraints{VCPUs: 1, RAM: 1 << 30},
			},
			{
				UUID:                 test.ContainerUUID(2),
				Priority:             2,
				State:                arvados.ContainerStateLocked,
				RuntimeConstraints:   arvados.RuntimeConstraints{VCPUs: 1, RAM: 1 << 30},
				SchedulingParameters: arvados.SchedulingParameters{Supervisor: true},
			},
			{
				UUID:                 test.ContainerUUID(3),
				Priority:             3,
				State:                arvados.ContainerStateLocked,
				RuntimeConstraints:   arvados.RuntimeConstraints{VCPUs: 1, RAM: 1 << 30},
				SchedulingParameters: arvados.SchedulingParameters{Supervisor: true},
			},
			{
				UUID:                 test.ContainerUUID(4),
				Priority:             4,
				State:                arvados.ContainerStateLocked,
				RuntimeConstraints:   arvados.RuntimeConstraints{VCPUs: 1, RAM: 1 << 30},
				SchedulingParameters: arvados.SchedulingParameters{Supervisor: true},
			},
		},
	}
	queue.Update()
	pool := stubPool{
		quota: 1000,
		unalloc: map[arvados.InstanceType]int{
			test.InstanceType(1): 4,
			test.InstanceType(2): 4,
		},
		idle: map[arvados.InstanceType]int{
			test.InstanceType(1): 4,
			test.InstanceType(2): 4,
		},
		running: map[string]time.Time{},
	}
	New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 10, 0.2).runQueue()
	c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType(nil))
	c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(4), test.ContainerUUID(3), test.ContainerUUID(1)})
}