1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
12 "git.arvados.org/arvados.git/lib/dispatchcloud/test"
13 "git.arvados.org/arvados.git/lib/dispatchcloud/worker"
14 "git.arvados.org/arvados.git/sdk/go/arvados"
15 "git.arvados.org/arvados.git/sdk/go/ctxlog"
17 "github.com/prometheus/client_golang/prometheus/testutil"
19 check "gopkg.in/check.v1"
23 // arbitrary example container UUIDs
// The closure below appends test.ContainerUUID(i) for i = 0..15, in order.
// NOTE(review): the return statement and the call that evaluates the closure
// are outside this excerpt; tests index the result as uuids[N].
24 uuids = func() (r []string) {
25 for i := 0; i < 16; i++ {
26 r = append(r, test.ContainerUUID(i))
// stubQuotaError is a stub error type whose IsQuotaError method reports true.
// NOTE(review): its fields are outside this excerpt.
32 type stubQuotaError struct {
// IsQuotaError unconditionally returns true for any stubQuotaError value.
36 func (stubQuotaError) IsQuotaError() bool { return true }
// stubPool is the stand-in pool that the tests in this file pass to New().
// Its methods read these fields and record calls in them so that tests can
// inspect what was requested.
// NOTE(review): some fields (e.g. starts, shutdowns, quota, which are
// referenced elsewhere in this file) are declared on lines outside this
// excerpt.
38 type stubPool struct {
// notify is the channel returned by Subscribe.
39 notify <-chan struct{}
40 unalloc map[arvados.InstanceType]int // idle+booting+unknown
// idle and unknown are keyed by instance type, like unalloc.
41 idle map[arvados.InstanceType]int
42 unknown map[arvados.InstanceType]int
// running is keyed by container UUID.
43 running map[string]time.Time
// creates accumulates, in call order, each instance type passed to Create.
46 creates []arvados.InstanceType
// AtQuota reports whether the pool has reached p.quota.
//
// Note that the comparison uses the number of map entries (distinct keys) in
// unalloc, running and unknown -- not the sum of the per-type counts stored
// as values in unalloc/unknown.
52 func (p *stubPool) AtQuota() bool {
55 return len(p.unalloc)+len(p.running)+len(p.unknown) >= p.quota
// Subscribe returns the pool's notify channel as-is; every caller receives
// the same channel.
57 func (p *stubPool) Subscribe() <-chan struct{} { return p.notify }
// Unsubscribe is a no-op in this stub; the channel argument is ignored.
58 func (p *stubPool) Unsubscribe(<-chan struct{}) {}
// Running builds a fresh map and iterates over the entries of p.running.
// NOTE(review): the loop body and return statement are outside this excerpt;
// presumably each entry is copied and the copy is returned so callers cannot
// mutate p.running -- confirm.
59 func (p *stubPool) Running() map[string]time.Time {
62 r := map[string]time.Time{}
63 for k, v := range p.running {
// Unallocated builds a fresh map with one entry per instance type present in
// p.unalloc. Each value is the unalloc count minus the unknown count for that
// type (a type missing from p.unknown contributes zero).
// NOTE(review): the return statement is outside this excerpt; presumably r is
// returned.
68 func (p *stubPool) Unallocated() map[arvados.InstanceType]int {
71 r := map[arvados.InstanceType]int{}
72 for it, n := range p.unalloc {
73 r[it] = n - p.unknown[it]
// Create records the requested instance type by appending it to p.creates,
// so tests can assert on the sequence of create requests.
// NOTE(review): the rest of the body, including how the bool result is
// decided, is outside this excerpt.
77 func (p *stubPool) Create(it arvados.InstanceType) bool {
80 p.creates = append(p.creates, it)
// ForgetContainer takes a container UUID.
// NOTE(review): the body is outside this excerpt.
88 func (p *stubPool) ForgetContainer(uuid string) {
// KillContainer removes uuid from p.running and reports whether it was
// present with a zero time value.
//
// The delete is deferred so that the lookup below observes the entry before
// it is removed. The reason argument is not used by the lines visible here.
90 func (p *stubPool) KillContainer(uuid, reason string) bool {
93 defer delete(p.running, uuid)
94 t, ok := p.running[uuid]
95 return ok && t.IsZero()
// Shutdown takes an instance type (unnamed here) and returns a bool.
// NOTE(review): the body is outside this excerpt; tests in this file assert
// on p.shutdowns, which is presumably updated here -- confirm.
97 func (p *stubPool) Shutdown(arvados.InstanceType) bool {
// CountWorkers returns a per-state tally derived from the stub's maps.
//
// All figures are map lengths (number of distinct keys), not sums of the
// per-type counts: booting is len(unalloc) minus len(idle), and the idle,
// running and unknown figures are the lengths of the corresponding maps.
101 func (p *stubPool) CountWorkers() map[worker.State]int {
104 return map[worker.State]int{
105 worker.StateBooting: len(p.unalloc) - len(p.idle),
106 worker.StateIdle: len(p.idle),
107 worker.StateRunning: len(p.running),
108 worker.StateUnknown: len(p.unknown),
// StartContainer records the container's UUID in p.starts, and stores a zero
// time.Time for that UUID in p.running.
// NOTE(review): the lines between these two statements and the return value
// are outside this excerpt, so the conditions under which the p.running entry
// is written cannot be determined from here.
111 func (p *stubPool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
114 p.starts = append(p.starts, ctr.UUID)
120 p.running[ctr.UUID] = time.Time{}
// chooseType maps a container to test.InstanceType(N), where N is the
// container's RuntimeConstraints.VCPUs. The returned error is always nil.
// It is used as the ChooseType field of the queues built by the tests in
// this file.
124 func chooseType(ctr *arvados.Container) (arvados.InstanceType, error) {
125 return test.InstanceType(ctr.RuntimeConstraints.VCPUs), nil
// Hand a SchedulerSuite instance to check.Suite at package initialization so
// gocheck knows about the suite's Test* methods.
128 var _ = check.Suite(&SchedulerSuite{})
// SchedulerSuite is the (stateless) receiver type for the scheduler tests
// in this file.
130 type SchedulerSuite struct{}
132 // Assign priority=4 container to idle node. Create new instances for
133 // the priority=3, 2, 1 containers.
134 func (*SchedulerSuite) TestUseIdleWorkers(c *check.C) {
135 ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
// Queue: containers 1..4, all in Locked state.
// NOTE(review): priorities and VCPU counts are on lines outside this excerpt.
137 ChooseType: chooseType,
138 Containers: []arvados.Container{
140 UUID: test.ContainerUUID(1),
142 State: arvados.ContainerStateLocked,
143 RuntimeConstraints: arvados.RuntimeConstraints{
149 UUID: test.ContainerUUID(2),
151 State: arvados.ContainerStateLocked,
152 RuntimeConstraints: arvados.RuntimeConstraints{
158 UUID: test.ContainerUUID(3),
160 State: arvados.ContainerStateLocked,
161 RuntimeConstraints: arvados.RuntimeConstraints{
167 UUID: test.ContainerUUID(4),
169 State: arvados.ContainerStateLocked,
170 RuntimeConstraints: arvados.RuntimeConstraints{
// Pool: every unallocated worker is also idle (unalloc and idle hold the same
// counts), and nothing is running yet.
180 unalloc: map[arvados.InstanceType]int{
181 test.InstanceType(1): 1,
182 test.InstanceType(2): 2,
184 idle: map[arvados.InstanceType]int{
185 test.InstanceType(1): 1,
186 test.InstanceType(2): 2,
188 running: map[string]time.Time{},
// Call runQueue() once, directly, on a freshly constructed scheduler.
191 New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond).runQueue()
// Expect three create requests, all for InstanceType(1), and a single start
// request for container 4.
192 c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(1), test.InstanceType(1), test.InstanceType(1)})
193 c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(4)})
// The only entry in pool.running must be container 4.
194 c.Check(pool.running, check.HasLen, 1)
195 for uuid := range pool.running {
196 c.Check(uuid, check.Equals, uuids[4])
200 // If pool.AtQuota() is true, shutdown some unalloc nodes, and don't
202 func (*SchedulerSuite) TestShutdownAtQuota(c *check.C) {
203 ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
// Run the scenario twice: quota=1 and quota=2.
204 for quota := 1; quota < 3; quota++ {
205 c.Logf("quota=%d", quota)
// Expect quota-1 create requests, all for InstanceType(3): none when quota=1,
// one when quota=2.
206 shouldCreate := []arvados.InstanceType{}
207 for i := 1; i < quota; i++ {
208 shouldCreate = append(shouldCreate, test.InstanceType(3))
// Queue: containers 2 and 3, both Locked.
211 ChooseType: chooseType,
212 Containers: []arvados.Container{
214 UUID: test.ContainerUUID(2),
216 State: arvados.ContainerStateLocked,
217 RuntimeConstraints: arvados.RuntimeConstraints{
223 UUID: test.ContainerUUID(3),
225 State: arvados.ContainerStateLocked,
226 RuntimeConstraints: arvados.RuntimeConstraints{
// Pool: two InstanceType(2) workers, both idle; nothing running.
// creates is initialized to an empty (non-nil) slice so that DeepEquals
// against the empty shouldCreate slice can succeed.
236 unalloc: map[arvados.InstanceType]int{
237 test.InstanceType(2): 2,
239 idle: map[arvados.InstanceType]int{
240 test.InstanceType(2): 2,
242 running: map[string]time.Time{},
243 creates: []arvados.InstanceType{},
// NOTE(review): the calls that drive sch are on lines outside this excerpt.
247 sch := New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond)
252 c.Check(pool.creates, check.DeepEquals, shouldCreate)
// With no creates expected, no starts are expected either; the second check
// (container 2 started) belongs to the other branch, whose "else" line is
// outside this excerpt.
253 if len(shouldCreate) == 0 {
254 c.Check(pool.starts, check.DeepEquals, []string{})
256 c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(2)})
// Expected shutdown count is 3-quota: two when quota=1, one when quota=2.
258 c.Check(pool.shutdowns, check.Equals, 3-quota)
// Both containers are expected to go from Locked back to Queued, container 3
// first.
259 c.Check(queue.StateChanges(), check.DeepEquals, []test.QueueStateChange{
260 {UUID: "zzzzz-dz642-000000000000003", From: "Locked", To: "Queued"},
261 {UUID: "zzzzz-dz642-000000000000002", From: "Locked", To: "Queued"},
266 // Don't flap lock/unlock when equal-priority containers compete for
269 // (Unless we use FirstSeenAt as a secondary sort key, each runQueue()
270 // tends to choose a different one of the equal-priority containers as
271 // the "first" one that should be locked, and unlock the one it chose
272 // last time. This generates logging noise, and fails containers by
273 // reaching MaxDispatchAttempts quickly.)
274 func (*SchedulerSuite) TestEqualPriorityContainers(c *check.C) {
275 logger := ctxlog.TestLogger(c)
276 ctx := ctxlog.Context(context.Background(), logger)
278 ChooseType: chooseType,
// Queue: eight containers (UUIDs 0..7), all in Queued state.
// NOTE(review): their priority values are on lines outside this excerpt.
281 for i := 0; i < 8; i++ {
282 queue.Containers = append(queue.Containers, arvados.Container{
283 UUID: test.ContainerUUID(i),
285 State: arvados.ContainerStateQueued,
286 RuntimeConstraints: arvados.RuntimeConstraints{
// Pool: a single InstanceType(3) worker, which is idle.
295 unalloc: map[arvados.InstanceType]int{
296 test.InstanceType(3): 1,
298 idle: map[arvados.InstanceType]int{
299 test.InstanceType(3): 1,
301 running: map[string]time.Time{},
302 creates: []arvados.InstanceType{},
306 sch := New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond)
// Iterate 30 times with a 1ms pause per iteration.
// NOTE(review): the scheduler calls made in each iteration are on lines
// outside this excerpt.
307 for i := 0; i < 30; i++ {
310 time.Sleep(time.Millisecond)
// No worker may be shut down, and exactly one container may be started.
312 c.Check(pool.shutdowns, check.Equals, 0)
313 c.Check(pool.starts, check.HasLen, 1)
// Tally state changes whose target state is Queued, then require that each
// UUID present in the tally has a count of exactly 1.
// NOTE(review): the statement that updates the tally is outside this excerpt;
// presumably it increments unlocked[chg.UUID].
314 unlocked := map[string]int{}
315 for _, chg := range queue.StateChanges() {
316 if chg.To == arvados.ContainerStateQueued {
320 for uuid, count := range unlocked {
321 c.Check(count, check.Equals, 1, check.Commentf("%s", uuid))
325 // Start lower-priority containers while waiting for new/existing
326 // workers to come up for higher-priority containers.
327 func (*SchedulerSuite) TestStartWhileCreating(c *check.C) {
328 ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
// Pool: for each of InstanceType(1) and InstanceType(2), two unallocated
// workers of which one is idle.
331 unalloc: map[arvados.InstanceType]int{
332 test.InstanceType(1): 2,
333 test.InstanceType(2): 2,
335 idle: map[arvados.InstanceType]int{
336 test.InstanceType(1): 1,
337 test.InstanceType(2): 1,
339 running: map[string]time.Time{},
// Queue: containers 1..6, all Locked. The per-container comments below state
// the outcome expected for each one.
343 ChooseType: chooseType,
344 Containers: []arvados.Container{
346 // create a new worker
347 UUID: test.ContainerUUID(1),
349 State: arvados.ContainerStateLocked,
350 RuntimeConstraints: arvados.RuntimeConstraints{
356 // tentatively map to unalloc worker
357 UUID: test.ContainerUUID(2),
359 State: arvados.ContainerStateLocked,
360 RuntimeConstraints: arvados.RuntimeConstraints{
366 // start now on idle worker
367 UUID: test.ContainerUUID(3),
369 State: arvados.ContainerStateLocked,
370 RuntimeConstraints: arvados.RuntimeConstraints{
376 // create a new worker
377 UUID: test.ContainerUUID(4),
379 State: arvados.ContainerStateLocked,
380 RuntimeConstraints: arvados.RuntimeConstraints{
386 // tentatively map to unalloc worker
387 UUID: test.ContainerUUID(5),
389 State: arvados.ContainerStateLocked,
390 RuntimeConstraints: arvados.RuntimeConstraints{
396 // start now on idle worker
397 UUID: test.ContainerUUID(6),
399 State: arvados.ContainerStateLocked,
400 RuntimeConstraints: arvados.RuntimeConstraints{
// Call runQueue() once, directly, on a freshly constructed scheduler.
408 New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond).runQueue()
// Expect two create requests (InstanceType(2), then InstanceType(1)) and
// start requests for containers 6, 5, 3, 2 in that order.
409 c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(2), test.InstanceType(1)})
410 c.Check(pool.starts, check.DeepEquals, []string{uuids[6], uuids[5], uuids[3], uuids[2]})
// Summarize pool.running into a map and require that it contains exactly
// containers 3 and 6, each mapped to false.
// NOTE(review): the condition guarding the assignment below (and any other
// branch) is outside this excerpt.
411 running := map[string]bool{}
412 for uuid, t := range pool.running {
414 running[uuid] = false
419 c.Check(running, check.DeepEquals, map[string]bool{uuids[3]: false, uuids[6]: false})
// TestKillNonexistentContainer sets up a pool that reports container 2 as
// running while the queue shown here lists only container 1, then waits up
// to one second for pool.Running() to become empty.
422 func (*SchedulerSuite) TestKillNonexistentContainer(c *check.C) {
423 ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
// Pool: InstanceType(2) is present with a count of zero in both unalloc and
// idle; container 2 is recorded as running with a zero time value.
426 unalloc: map[arvados.InstanceType]int{
427 test.InstanceType(2): 0,
429 idle: map[arvados.InstanceType]int{
430 test.InstanceType(2): 0,
432 running: map[string]time.Time{
433 test.ContainerUUID(2): {},
437 ChooseType: chooseType,
438 Containers: []arvados.Container{
440 // create a new worker
441 UUID: test.ContainerUUID(1),
443 State: arvados.ContainerStateLocked,
444 RuntimeConstraints: arvados.RuntimeConstraints{
452 sch := New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond)
// Sanity check: container 2 is still recorded as running at this point.
453 c.Check(pool.running, check.HasLen, 1)
// NOTE(review): the scheduler call that triggers the kill is on a line
// outside this excerpt.
// Poll every millisecond until pool.Running() is empty or one second has
// elapsed, then assert that it is empty.
455 for deadline := time.Now().Add(time.Second); len(pool.Running()) > 0 && time.Now().Before(deadline); time.Sleep(time.Millisecond) {
457 c.Check(pool.Running(), check.HasLen, 0)
// TestContainersMetrics checks the scheduler's mContainersAllocatedNotStarted,
// mContainersNotAllocatedOverQuota and mLongestWaitTimeSinceQueue metrics
// (read via testutil.ToFloat64) against three pool configurations: one
// booting worker, no workers, and one idle worker.
460 func (*SchedulerSuite) TestContainersMetrics(c *check.C) {
461 ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
463 ChooseType: chooseType,
464 Containers: []arvados.Container{
466 UUID: test.ContainerUUID(1),
468 State: arvados.ContainerStateLocked,
// CreatedAt is ten seconds in the past; the wait-time checks below expect 10.
469 CreatedAt: time.Now().Add(-10 * time.Second),
470 RuntimeConstraints: arvados.RuntimeConstraints{
479 // Create a pool with one unallocated (idle/booting/unknown) worker,
480 // and `idle` and `unknown` not set (empty). Iow this worker is in the booting
481 // state, and the container will be allocated but not started yet.
483 unalloc: map[arvados.InstanceType]int{test.InstanceType(1): 1},
// NOTE(review): the calls that drive sch before each group of checks are on
// lines outside this excerpt.
485 sch := New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond)
// Metric values are truncated to int before comparison.
489 c.Check(int(testutil.ToFloat64(sch.mContainersAllocatedNotStarted)), check.Equals, 1)
490 c.Check(int(testutil.ToFloat64(sch.mContainersNotAllocatedOverQuota)), check.Equals, 0)
491 c.Check(int(testutil.ToFloat64(sch.mLongestWaitTimeSinceQueue)), check.Equals, 10)
493 // Create a pool without workers. The queued container will not be started, and the
494 // 'over quota' metric will be 1 because no workers are available and canCreate defaults
// A new scheduler is constructed for this scenario, reusing the same queue.
497 sch = New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond)
501 c.Check(int(testutil.ToFloat64(sch.mContainersAllocatedNotStarted)), check.Equals, 0)
502 c.Check(int(testutil.ToFloat64(sch.mContainersNotAllocatedOverQuota)), check.Equals, 1)
503 c.Check(int(testutil.ToFloat64(sch.mLongestWaitTimeSinceQueue)), check.Equals, 10)
505 // Reset the queue, and create a pool with an idle worker. The queued
506 // container will be started immediately and mLongestWaitTimeSinceQueue
509 ChooseType: chooseType,
510 Containers: []arvados.Container{
512 UUID: test.ContainerUUID(1),
514 State: arvados.ContainerStateLocked,
515 CreatedAt: time.Now().Add(-10 * time.Second),
516 RuntimeConstraints: arvados.RuntimeConstraints{
// Pool: the single InstanceType(1) worker is counted in both idle and unalloc.
526 idle: map[arvados.InstanceType]int{test.InstanceType(1): 1},
527 unalloc: map[arvados.InstanceType]int{test.InstanceType(1): 1},
528 running: map[string]time.Time{},
530 sch = New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond)
// The wait-time metric is expected to read 0 in this scenario.
534 c.Check(int(testutil.ToFloat64(sch.mLongestWaitTimeSinceQueue)), check.Equals, 0)