1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
12 "git.arvados.org/arvados.git/lib/dispatchcloud/test"
13 "git.arvados.org/arvados.git/lib/dispatchcloud/worker"
14 "git.arvados.org/arvados.git/sdk/go/arvados"
15 "git.arvados.org/arvados.git/sdk/go/ctxlog"
17 "github.com/prometheus/client_golang/prometheus/testutil"
19 check "gopkg.in/check.v1"
23 // arbitrary example container UUIDs
24 uuids = func() (r []string) {
25 for i := 0; i < 16; i++ {
26 r = append(r, test.ContainerUUID(i))
32 type stubQuotaError struct {
// IsQuotaError unconditionally reports true, so every stubQuotaError value
// identifies itself as a quota error to whoever calls this method.
36 func (stubQuotaError) IsQuotaError() bool { return true }
38 type stubPool struct {
39 notify <-chan struct{}
40 unalloc map[arvados.InstanceType]int // idle+booting+unknown
41 idle map[arvados.InstanceType]int
42 unknown map[arvados.InstanceType]int
43 running map[string]time.Time
46 creates []arvados.InstanceType
52 func (p *stubPool) AtQuota() bool {
55 return len(p.unalloc)+len(p.running)+len(p.unknown) >= p.quota
// Subscribe hands back the pool's preset notify channel; the stub creates no
// new channel and records nothing about the subscriber here.
57 func (p *stubPool) Subscribe() <-chan struct{} { return p.notify }
// Unsubscribe is a no-op: the channel argument is ignored.
58 func (p *stubPool) Unsubscribe(<-chan struct{}) {}
59 func (p *stubPool) Running() map[string]time.Time {
62 r := map[string]time.Time{}
63 for k, v := range p.running {
68 func (p *stubPool) Unallocated() map[arvados.InstanceType]int {
71 r := map[arvados.InstanceType]int{}
72 for it, n := range p.unalloc {
73 r[it] = n - p.unknown[it]
77 func (p *stubPool) Create(it arvados.InstanceType) bool {
80 p.creates = append(p.creates, it)
88 func (p *stubPool) ForgetContainer(uuid string) {
90 func (p *stubPool) KillContainer(uuid, reason string) bool {
93 defer delete(p.running, uuid)
94 t, ok := p.running[uuid]
95 return ok && t.IsZero()
97 func (p *stubPool) Shutdown(arvados.InstanceType) bool {
101 func (p *stubPool) CountWorkers() map[worker.State]int {
104 return map[worker.State]int{
105 worker.StateBooting: len(p.unalloc) - len(p.idle),
106 worker.StateIdle: len(p.idle),
107 worker.StateRunning: len(p.running),
108 worker.StateUnknown: len(p.unknown),
111 func (p *stubPool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
114 p.starts = append(p.starts, ctr.UUID)
120 p.running[ctr.UUID] = time.Time{}
124 func chooseType(ctr *arvados.Container) (arvados.InstanceType, error) {
125 return test.InstanceType(ctr.RuntimeConstraints.VCPUs), nil
// Register SchedulerSuite with the gocheck runner (check.Suite) when the
// package's variables are initialized.
128 var _ = check.Suite(&SchedulerSuite{})
// SchedulerSuite is the receiver type for the scheduler tests in this file
// (TestUseIdleWorkers, TestShutdownAtQuota, ...). It has no fields.
130 type SchedulerSuite struct{}
132 // Assign priority=4 container to idle node. Create new instances for
133 // the priority=3, 2, 1 containers.
134 func (*SchedulerSuite) TestUseIdleWorkers(c *check.C) {
135 ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
137 ChooseType: chooseType,
138 Containers: []arvados.Container{
140 UUID: test.ContainerUUID(1),
142 State: arvados.ContainerStateLocked,
143 RuntimeConstraints: arvados.RuntimeConstraints{
149 UUID: test.ContainerUUID(2),
151 State: arvados.ContainerStateLocked,
152 RuntimeConstraints: arvados.RuntimeConstraints{
158 UUID: test.ContainerUUID(3),
160 State: arvados.ContainerStateLocked,
161 RuntimeConstraints: arvados.RuntimeConstraints{
167 UUID: test.ContainerUUID(4),
169 State: arvados.ContainerStateLocked,
170 RuntimeConstraints: arvados.RuntimeConstraints{
180 unalloc: map[arvados.InstanceType]int{
181 test.InstanceType(1): 1,
182 test.InstanceType(2): 2,
184 idle: map[arvados.InstanceType]int{
185 test.InstanceType(1): 1,
186 test.InstanceType(2): 2,
188 running: map[string]time.Time{},
191 New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond).runQueue()
192 c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(1), test.InstanceType(1), test.InstanceType(1)})
193 c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(4)})
194 c.Check(pool.running, check.HasLen, 1)
195 for uuid := range pool.running {
196 c.Check(uuid, check.Equals, uuids[4])
200 // If pool.AtQuota() is true, shut down some unalloc nodes, and don't
202 func (*SchedulerSuite) TestShutdownAtQuota(c *check.C) {
203 ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
204 for quota := 1; quota < 3; quota++ {
205 c.Logf("quota=%d", quota)
206 shouldCreate := []arvados.InstanceType{}
207 for i := 1; i < quota; i++ {
208 shouldCreate = append(shouldCreate, test.InstanceType(3))
211 ChooseType: chooseType,
212 Containers: []arvados.Container{
214 UUID: test.ContainerUUID(2),
216 State: arvados.ContainerStateLocked,
217 RuntimeConstraints: arvados.RuntimeConstraints{
223 UUID: test.ContainerUUID(3),
225 State: arvados.ContainerStateLocked,
226 RuntimeConstraints: arvados.RuntimeConstraints{
236 unalloc: map[arvados.InstanceType]int{
237 test.InstanceType(2): 2,
239 idle: map[arvados.InstanceType]int{
240 test.InstanceType(2): 2,
242 running: map[string]time.Time{},
243 creates: []arvados.InstanceType{},
247 New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond).runQueue()
248 c.Check(pool.creates, check.DeepEquals, shouldCreate)
249 if len(shouldCreate) == 0 {
250 c.Check(pool.starts, check.DeepEquals, []string{})
251 c.Check(pool.shutdowns, check.Not(check.Equals), 0)
253 c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(2)})
254 c.Check(pool.shutdowns, check.Equals, 0)
259 // Start lower-priority containers while waiting for new/existing
260 // workers to come up for higher-priority containers.
261 func (*SchedulerSuite) TestStartWhileCreating(c *check.C) {
262 ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
265 unalloc: map[arvados.InstanceType]int{
266 test.InstanceType(1): 2,
267 test.InstanceType(2): 2,
269 idle: map[arvados.InstanceType]int{
270 test.InstanceType(1): 1,
271 test.InstanceType(2): 1,
273 running: map[string]time.Time{},
277 ChooseType: chooseType,
278 Containers: []arvados.Container{
280 // create a new worker
281 UUID: test.ContainerUUID(1),
283 State: arvados.ContainerStateLocked,
284 RuntimeConstraints: arvados.RuntimeConstraints{
290 // tentatively map to unalloc worker
291 UUID: test.ContainerUUID(2),
293 State: arvados.ContainerStateLocked,
294 RuntimeConstraints: arvados.RuntimeConstraints{
300 // start now on idle worker
301 UUID: test.ContainerUUID(3),
303 State: arvados.ContainerStateLocked,
304 RuntimeConstraints: arvados.RuntimeConstraints{
310 // create a new worker
311 UUID: test.ContainerUUID(4),
313 State: arvados.ContainerStateLocked,
314 RuntimeConstraints: arvados.RuntimeConstraints{
320 // tentatively map to unalloc worker
321 UUID: test.ContainerUUID(5),
323 State: arvados.ContainerStateLocked,
324 RuntimeConstraints: arvados.RuntimeConstraints{
330 // start now on idle worker
331 UUID: test.ContainerUUID(6),
333 State: arvados.ContainerStateLocked,
334 RuntimeConstraints: arvados.RuntimeConstraints{
342 New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond).runQueue()
343 c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(2), test.InstanceType(1)})
344 c.Check(pool.starts, check.DeepEquals, []string{uuids[6], uuids[5], uuids[3], uuids[2]})
345 running := map[string]bool{}
346 for uuid, t := range pool.running {
348 running[uuid] = false
353 c.Check(running, check.DeepEquals, map[string]bool{uuids[3]: false, uuids[6]: false})
356 func (*SchedulerSuite) TestKillNonexistentContainer(c *check.C) {
357 ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
360 unalloc: map[arvados.InstanceType]int{
361 test.InstanceType(2): 0,
363 idle: map[arvados.InstanceType]int{
364 test.InstanceType(2): 0,
366 running: map[string]time.Time{
367 test.ContainerUUID(2): {},
371 ChooseType: chooseType,
372 Containers: []arvados.Container{
374 // create a new worker
375 UUID: test.ContainerUUID(1),
377 State: arvados.ContainerStateLocked,
378 RuntimeConstraints: arvados.RuntimeConstraints{
386 sch := New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond)
387 c.Check(pool.running, check.HasLen, 1)
389 for deadline := time.Now().Add(time.Second); len(pool.Running()) > 0 && time.Now().Before(deadline); time.Sleep(time.Millisecond) {
391 c.Check(pool.Running(), check.HasLen, 0)
394 func (*SchedulerSuite) TestContainersMetrics(c *check.C) {
395 ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
397 ChooseType: chooseType,
398 Containers: []arvados.Container{
400 UUID: test.ContainerUUID(1),
402 State: arvados.ContainerStateLocked,
403 CreatedAt: time.Now().Add(-10 * time.Second),
404 RuntimeConstraints: arvados.RuntimeConstraints{
413 // Create a pool with one unallocated (idle/booting/unknown) worker,
414 // and `idle` and `unknown` not set (empty). In other words, this worker is in the
415 // booting state, and the container will be allocated but not started yet.
417 unalloc: map[arvados.InstanceType]int{test.InstanceType(1): 1},
419 sch := New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond)
423 c.Check(int(testutil.ToFloat64(sch.mContainersAllocatedNotStarted)), check.Equals, 1)
424 c.Check(int(testutil.ToFloat64(sch.mContainersNotAllocatedOverQuota)), check.Equals, 0)
425 c.Check(int(testutil.ToFloat64(sch.mLongestWaitTimeSinceQueue)), check.Equals, 10)
427 // Create a pool without workers. The queued container will not be started, and the
428 // 'over quota' metric will be 1 because no workers are available and canCreate defaults
431 sch = New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond)
435 c.Check(int(testutil.ToFloat64(sch.mContainersAllocatedNotStarted)), check.Equals, 0)
436 c.Check(int(testutil.ToFloat64(sch.mContainersNotAllocatedOverQuota)), check.Equals, 1)
437 c.Check(int(testutil.ToFloat64(sch.mLongestWaitTimeSinceQueue)), check.Equals, 10)
439 // Reset the queue, and create a pool with an idle worker. The queued
440 // container will be started immediately and mLongestWaitTimeSinceQueue
443 ChooseType: chooseType,
444 Containers: []arvados.Container{
446 UUID: test.ContainerUUID(1),
448 State: arvados.ContainerStateLocked,
449 CreatedAt: time.Now().Add(-10 * time.Second),
450 RuntimeConstraints: arvados.RuntimeConstraints{
460 idle: map[arvados.InstanceType]int{test.InstanceType(1): 1},
461 unalloc: map[arvados.InstanceType]int{test.InstanceType(1): 1},
462 running: map[string]time.Time{},
464 sch = New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond)
468 c.Check(int(testutil.ToFloat64(sch.mLongestWaitTimeSinceQueue)), check.Equals, 0)