1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
12 "git.arvados.org/arvados.git/lib/dispatchcloud/test"
13 "git.arvados.org/arvados.git/lib/dispatchcloud/worker"
14 "git.arvados.org/arvados.git/sdk/go/arvados"
15 "git.arvados.org/arvados.git/sdk/go/ctxlog"
17 "github.com/prometheus/client_golang/prometheus/testutil"
19 check "gopkg.in/check.v1"
23 // arbitrary example container UUIDs
24 uuids = func() (r []string) {
25 for i := 0; i < 16; i++ {
26 r = append(r, test.ContainerUUID(i))
32 type stubQuotaError struct {
36 // IsQuotaError identifies this stub error as a cloud quota error.
func (stubQuotaError) IsQuotaError() bool {
	return true
}
38 type stubPool struct {
39 notify <-chan struct{}
40 unalloc map[arvados.InstanceType]int // idle+booting+unknown
41 idle map[arvados.InstanceType]int
42 unknown map[arvados.InstanceType]int
43 running map[string]time.Time
46 creates []arvados.InstanceType
52 func (p *stubPool) AtQuota() bool {
56 for _, nn := range p.unalloc {
59 for _, nn := range p.unknown {
64 // Subscribe returns the stub's fixed notification channel.
func (p *stubPool) Subscribe() <-chan struct{} {
	return p.notify
}
65 // Unsubscribe is a no-op in this stub pool.
func (p *stubPool) Unsubscribe(<-chan struct{}) {}
66 func (p *stubPool) Running() map[string]time.Time {
69 r := map[string]time.Time{}
70 for k, v := range p.running {
75 func (p *stubPool) Unallocated() map[arvados.InstanceType]int {
78 r := map[arvados.InstanceType]int{}
79 for it, n := range p.unalloc {
80 r[it] = n - p.unknown[it]
84 func (p *stubPool) Create(it arvados.InstanceType) bool {
87 p.creates = append(p.creates, it)
95 func (p *stubPool) ForgetContainer(uuid string) {
97 func (p *stubPool) KillContainer(uuid, reason string) bool {
100 defer delete(p.running, uuid)
101 t, ok := p.running[uuid]
102 return ok && t.IsZero()
104 func (p *stubPool) Shutdown(arvados.InstanceType) bool {
108 func (p *stubPool) CountWorkers() map[worker.State]int {
111 return map[worker.State]int{
112 worker.StateBooting: len(p.unalloc) - len(p.idle),
113 worker.StateIdle: len(p.idle),
114 worker.StateRunning: len(p.running),
115 worker.StateUnknown: len(p.unknown),
118 func (p *stubPool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
121 p.starts = append(p.starts, ctr.UUID)
127 p.running[ctr.UUID] = time.Time{}
131 func chooseType(ctr *arvados.Container) (arvados.InstanceType, error) {
132 return test.InstanceType(ctr.RuntimeConstraints.VCPUs), nil
135 // Register SchedulerSuite with the gocheck test runner.
var _ = check.Suite(&SchedulerSuite{})
137 // SchedulerSuite groups the scheduler tests; it carries no state.
type SchedulerSuite struct{}
139 // Assign priority=4 container to idle node. Create new instances for
140 // the priority=3, 2, 1 containers.
141 func (*SchedulerSuite) TestUseIdleWorkers(c *check.C) {
142 ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
144 ChooseType: chooseType,
145 Containers: []arvados.Container{
147 UUID: test.ContainerUUID(1),
149 State: arvados.ContainerStateLocked,
150 RuntimeConstraints: arvados.RuntimeConstraints{
156 UUID: test.ContainerUUID(2),
158 State: arvados.ContainerStateLocked,
159 RuntimeConstraints: arvados.RuntimeConstraints{
165 UUID: test.ContainerUUID(3),
167 State: arvados.ContainerStateLocked,
168 RuntimeConstraints: arvados.RuntimeConstraints{
174 UUID: test.ContainerUUID(4),
176 State: arvados.ContainerStateLocked,
177 RuntimeConstraints: arvados.RuntimeConstraints{
187 unalloc: map[arvados.InstanceType]int{
188 test.InstanceType(1): 1,
189 test.InstanceType(2): 2,
191 idle: map[arvados.InstanceType]int{
192 test.InstanceType(1): 1,
193 test.InstanceType(2): 2,
195 running: map[string]time.Time{},
198 New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0).runQueue()
199 c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(1), test.InstanceType(1), test.InstanceType(1)})
200 c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(4)})
201 c.Check(pool.running, check.HasLen, 1)
202 for uuid := range pool.running {
203 c.Check(uuid, check.Equals, uuids[4])
207 // If pool.AtQuota() is true, shut down some unalloc nodes, and don't
209 func (*SchedulerSuite) TestShutdownAtQuota(c *check.C) {
210 ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
211 for quota := 1; quota <= 3; quota++ {
212 c.Logf("quota=%d", quota)
214 ChooseType: chooseType,
215 Containers: []arvados.Container{
217 UUID: test.ContainerUUID(2),
219 State: arvados.ContainerStateLocked,
220 RuntimeConstraints: arvados.RuntimeConstraints{
226 UUID: test.ContainerUUID(3),
228 State: arvados.ContainerStateLocked,
229 RuntimeConstraints: arvados.RuntimeConstraints{
239 unalloc: map[arvados.InstanceType]int{
240 test.InstanceType(2): 2,
242 idle: map[arvados.InstanceType]int{
243 test.InstanceType(2): 2,
245 running: map[string]time.Time{},
246 creates: []arvados.InstanceType{},
250 sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0)
256 // Can't create a type3 node for ctr3, so we
257 // shut down an unallocated node (type2), and
258 // unlock both containers.
259 c.Check(pool.starts, check.HasLen, 0)
260 c.Check(pool.shutdowns, check.Equals, 1)
261 c.Check(pool.creates, check.HasLen, 0)
262 c.Check(queue.StateChanges(), check.DeepEquals, []test.QueueStateChange{
263 {UUID: test.ContainerUUID(3), From: "Locked", To: "Queued"},
264 {UUID: test.ContainerUUID(2), From: "Locked", To: "Queued"},
267 // Creating a type3 instance works, so we
268 // start ctr2 on a type2 instance, and leave
269 // ctr3 locked while we wait for the new
270 // instance to come up.
271 c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(2)})
272 c.Check(pool.shutdowns, check.Equals, 0)
273 c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(3)})
274 c.Check(queue.StateChanges(), check.HasLen, 0)
276 panic("test not written for quota>3")
281 // Don't flap lock/unlock when equal-priority containers compete for
284 // (Unless we use FirstSeenAt as a secondary sort key, each runQueue()
285 // tends to choose a different one of the equal-priority containers as
286 // the "first" one that should be locked, and unlock the one it chose
287 // last time. This generates logging noise, and fails containers by
288 // reaching MaxDispatchAttempts quickly.)
289 func (*SchedulerSuite) TestEqualPriorityContainers(c *check.C) {
290 logger := ctxlog.TestLogger(c)
291 ctx := ctxlog.Context(context.Background(), logger)
293 ChooseType: chooseType,
296 for i := 0; i < 8; i++ {
297 queue.Containers = append(queue.Containers, arvados.Container{
298 UUID: test.ContainerUUID(i),
300 State: arvados.ContainerStateQueued,
301 RuntimeConstraints: arvados.RuntimeConstraints{
310 unalloc: map[arvados.InstanceType]int{
311 test.InstanceType(3): 2,
313 idle: map[arvados.InstanceType]int{
314 test.InstanceType(3): 2,
316 running: map[string]time.Time{},
317 creates: []arvados.InstanceType{},
321 sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0)
322 for i := 0; i < 30; i++ {
325 time.Sleep(time.Millisecond)
327 c.Check(pool.shutdowns, check.Equals, 0)
328 c.Check(pool.starts, check.HasLen, 2)
329 unlocked := map[string]int{}
330 for _, chg := range queue.StateChanges() {
331 if chg.To == arvados.ContainerStateQueued {
335 for uuid, count := range unlocked {
336 c.Check(count, check.Equals, 1, check.Commentf("%s", uuid))
340 // Start lower-priority containers while waiting for new/existing
341 // workers to come up for higher-priority containers.
342 func (*SchedulerSuite) TestStartWhileCreating(c *check.C) {
343 ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
346 unalloc: map[arvados.InstanceType]int{
347 test.InstanceType(1): 2,
348 test.InstanceType(2): 2,
350 idle: map[arvados.InstanceType]int{
351 test.InstanceType(1): 1,
352 test.InstanceType(2): 1,
354 running: map[string]time.Time{},
358 ChooseType: chooseType,
359 Containers: []arvados.Container{
361 // create a new worker
362 UUID: test.ContainerUUID(1),
364 State: arvados.ContainerStateLocked,
365 RuntimeConstraints: arvados.RuntimeConstraints{
371 // tentatively map to unalloc worker
372 UUID: test.ContainerUUID(2),
374 State: arvados.ContainerStateLocked,
375 RuntimeConstraints: arvados.RuntimeConstraints{
381 // start now on idle worker
382 UUID: test.ContainerUUID(3),
384 State: arvados.ContainerStateLocked,
385 RuntimeConstraints: arvados.RuntimeConstraints{
391 // create a new worker
392 UUID: test.ContainerUUID(4),
394 State: arvados.ContainerStateLocked,
395 RuntimeConstraints: arvados.RuntimeConstraints{
401 // tentatively map to unalloc worker
402 UUID: test.ContainerUUID(5),
404 State: arvados.ContainerStateLocked,
405 RuntimeConstraints: arvados.RuntimeConstraints{
411 // start now on idle worker
412 UUID: test.ContainerUUID(6),
414 State: arvados.ContainerStateLocked,
415 RuntimeConstraints: arvados.RuntimeConstraints{
423 New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0).runQueue()
424 c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(2), test.InstanceType(1)})
425 c.Check(pool.starts, check.DeepEquals, []string{uuids[6], uuids[5], uuids[3], uuids[2]})
426 running := map[string]bool{}
427 for uuid, t := range pool.running {
429 running[uuid] = false
434 c.Check(running, check.DeepEquals, map[string]bool{uuids[3]: false, uuids[6]: false})
437 func (*SchedulerSuite) TestKillNonexistentContainer(c *check.C) {
438 ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
441 unalloc: map[arvados.InstanceType]int{
442 test.InstanceType(2): 0,
444 idle: map[arvados.InstanceType]int{
445 test.InstanceType(2): 0,
447 running: map[string]time.Time{
448 test.ContainerUUID(2): {},
452 ChooseType: chooseType,
453 Containers: []arvados.Container{
455 // create a new worker
456 UUID: test.ContainerUUID(1),
458 State: arvados.ContainerStateLocked,
459 RuntimeConstraints: arvados.RuntimeConstraints{
467 sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0)
468 c.Check(pool.running, check.HasLen, 1)
470 for deadline := time.Now().Add(time.Second); len(pool.Running()) > 0 && time.Now().Before(deadline); time.Sleep(time.Millisecond) {
472 c.Check(pool.Running(), check.HasLen, 0)
475 func (*SchedulerSuite) TestContainersMetrics(c *check.C) {
476 ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
478 ChooseType: chooseType,
479 Containers: []arvados.Container{
481 UUID: test.ContainerUUID(1),
483 State: arvados.ContainerStateLocked,
484 CreatedAt: time.Now().Add(-10 * time.Second),
485 RuntimeConstraints: arvados.RuntimeConstraints{
494 // Create a pool with one unallocated (idle/booting/unknown) worker,
495 // and `idle` and `unknown` not set (empty). In other words, this worker is in the booting
496 // state, and the container will be allocated but not started yet.
498 unalloc: map[arvados.InstanceType]int{test.InstanceType(1): 1},
500 sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0)
504 c.Check(int(testutil.ToFloat64(sch.mContainersAllocatedNotStarted)), check.Equals, 1)
505 c.Check(int(testutil.ToFloat64(sch.mContainersNotAllocatedOverQuota)), check.Equals, 0)
506 c.Check(int(testutil.ToFloat64(sch.mLongestWaitTimeSinceQueue)), check.Equals, 10)
508 // Create a pool without workers. The queued container will not be started, and the
509 // 'over quota' metric will be 1 because no workers are available and canCreate defaults
512 sch = New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0)
516 c.Check(int(testutil.ToFloat64(sch.mContainersAllocatedNotStarted)), check.Equals, 0)
517 c.Check(int(testutil.ToFloat64(sch.mContainersNotAllocatedOverQuota)), check.Equals, 1)
518 c.Check(int(testutil.ToFloat64(sch.mLongestWaitTimeSinceQueue)), check.Equals, 10)
520 // Reset the queue, and create a pool with an idle worker. The queued
521 // container will be started immediately and mLongestWaitTimeSinceQueue
524 ChooseType: chooseType,
525 Containers: []arvados.Container{
527 UUID: test.ContainerUUID(1),
529 State: arvados.ContainerStateLocked,
530 CreatedAt: time.Now().Add(-10 * time.Second),
531 RuntimeConstraints: arvados.RuntimeConstraints{
541 idle: map[arvados.InstanceType]int{test.InstanceType(1): 1},
542 unalloc: map[arvados.InstanceType]int{test.InstanceType(1): 1},
543 running: map[string]time.Time{},
545 sch = New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0)
549 c.Check(int(testutil.ToFloat64(sch.mLongestWaitTimeSinceQueue)), check.Equals, 0)
552 // Assign priority=4, 3 and 1 containers to idle nodes. Ignore the supervisor at priority 2.
553 func (*SchedulerSuite) TestSkipSupervisors(c *check.C) {
554 ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
556 ChooseType: chooseType,
557 Containers: []arvados.Container{
559 UUID: test.ContainerUUID(1),
561 State: arvados.ContainerStateLocked,
562 RuntimeConstraints: arvados.RuntimeConstraints{
568 UUID: test.ContainerUUID(2),
570 State: arvados.ContainerStateLocked,
571 RuntimeConstraints: arvados.RuntimeConstraints{
575 SchedulingParameters: arvados.SchedulingParameters{
580 UUID: test.ContainerUUID(3),
582 State: arvados.ContainerStateLocked,
583 RuntimeConstraints: arvados.RuntimeConstraints{
587 SchedulingParameters: arvados.SchedulingParameters{
592 UUID: test.ContainerUUID(4),
594 State: arvados.ContainerStateLocked,
595 RuntimeConstraints: arvados.RuntimeConstraints{
599 SchedulingParameters: arvados.SchedulingParameters{
608 unalloc: map[arvados.InstanceType]int{
609 test.InstanceType(1): 4,
610 test.InstanceType(2): 4,
612 idle: map[arvados.InstanceType]int{
613 test.InstanceType(1): 4,
614 test.InstanceType(2): 4,
616 running: map[string]time.Time{},
619 New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 2).runQueue()
620 c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType(nil))
621 c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(4), test.ContainerUUID(3), test.ContainerUUID(1)})