1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
12 "git.arvados.org/arvados.git/lib/dispatchcloud/test"
13 "git.arvados.org/arvados.git/lib/dispatchcloud/worker"
14 "git.arvados.org/arvados.git/sdk/go/arvados"
15 "git.arvados.org/arvados.git/sdk/go/ctxlog"
16 check "gopkg.in/check.v1"
20 // arbitrary example container UUIDs
// The loop below appends test.ContainerUUID(i) for i = 0..15 in index
// order, so element i of the resulting slice is the UUID for index i.
// NOTE(review): the enclosing declaration and the closing lines of this
// func literal are outside this excerpt -- presumably the literal is
// invoked immediately to produce the slice; confirm.
21 uuids = func() (r []string) {
22 for i := 0; i < 16; i++ {
23 r = append(r, test.ContainerUUID(i))
// stubQuotaError is a stub error type for tests; its IsQuotaError method
// always reports true.
// NOTE(review): the struct's fields are not visible in this excerpt.
29 type stubQuotaError struct {
// IsQuotaError unconditionally returns true, so every stubQuotaError
// value identifies itself as a quota error.
33 func (stubQuotaError) IsQuotaError() bool { return true }
// stubPool is a stand-in worker pool for the scheduler tests in this
// file: tests pass &pool to New() and afterwards inspect the calls and
// state it recorded.
// NOTE(review): more fields (e.g. the starts, shutdowns and quota that
// methods and tests in this file refer to) are declared on lines that
// are not visible in this excerpt.
35 type stubPool struct {
// notify is the channel returned by Subscribe().
36 notify <-chan struct{}
37 unalloc map[arvados.InstanceType]int // idle+booting+unknown
// idle and unknown hold per-instance-type counts.
38 idle map[arvados.InstanceType]int
39 unknown map[arvados.InstanceType]int
// running is keyed by container UUID.
40 running map[string]time.Time
// creates records the instance type passed to each Create() call.
43 creates []arvados.InstanceType
// AtQuota reports whether the stub pool has reached p.quota, using
// len(p.unalloc)+len(p.running)+len(p.unknown) as the current usage.
// NOTE(review): len() on the unalloc and unknown maps counts distinct
// instance types, not instances, and the stubPool field comment says
// unalloc already includes unknown workers -- confirm this approximation
// is what the tests intend.
// NOTE(review): lines between the signature and the return statement
// are not visible in this excerpt.
49 func (p *stubPool) AtQuota() bool {
52 return len(p.unalloc)+len(p.running)+len(p.unknown) >= p.quota
// Subscribe returns the pool's notify channel; every caller receives the
// same channel value.
54 func (p *stubPool) Subscribe() <-chan struct{} { return p.notify }
// Unsubscribe is a no-op in this stub: the channel argument is ignored.
55 func (p *stubPool) Unsubscribe(<-chan struct{}) {}
// Running allocates a new map and iterates over p.running.
// NOTE(review): the loop body and the return statement are not visible
// in this excerpt -- presumably each entry is copied into r and r is
// returned, so callers get a snapshot rather than the live map; confirm.
56 func (p *stubPool) Running() map[string]time.Time {
59 r := map[string]time.Time{}
60 for k, v := range p.running {
// Unallocated builds a new per-instance-type map in which each entry is
// the p.unalloc count minus the p.unknown count for that type (a type
// missing from p.unknown contributes zero).
// NOTE(review): the return statement is not visible in this excerpt --
// presumably r is returned; confirm.
65 func (p *stubPool) Unallocated() map[arvados.InstanceType]int {
68 r := map[arvados.InstanceType]int{}
69 for it, n := range p.unalloc {
// unalloc includes unknown workers (see the field comment), so
// subtract them here.
70 r[it] = n - p.unknown[it]
// Create records the requested instance type by appending it to
// p.creates, which the tests later compare against the expected list.
// NOTE(review): the rest of the body (including how the bool result is
// decided) is not visible in this excerpt.
74 func (p *stubPool) Create(it arvados.InstanceType) bool {
77 p.creates = append(p.creates, it)
// ForgetContainer takes a container UUID.
// NOTE(review): the body is not visible in this excerpt.
85 func (p *stubPool) ForgetContainer(uuid string) {
// KillContainer removes uuid from p.running and returns true only if the
// entry existed and its recorded time was the zero time.
// NOTE(review): lines between the signature and the deferred delete are
// not visible in this excerpt.
87 func (p *stubPool) KillContainer(uuid, reason string) bool {
// The delete is deferred so that the lookup below still sees the entry;
// deferred calls run after the return expression has been evaluated.
90 defer delete(p.running, uuid)
91 t, ok := p.running[uuid]
92 return ok && t.IsZero()
// Shutdown takes an instance type and returns a bool.
// NOTE(review): the body is not visible in this excerpt; tests in this
// file inspect pool.shutdowns, which is presumably updated here --
// confirm.
94 func (p *stubPool) Shutdown(arvados.InstanceType) bool {
// CountWorkers returns a worker-state -> count map derived from the
// sizes of the stub's bookkeeping maps.
// NOTE(review): len() on unalloc, idle and unknown counts distinct
// instance types rather than summing the per-type instance counts --
// confirm that is acceptable for the callers of this stub.
// NOTE(review): lines between the signature and the return statement
// are not visible in this excerpt.
98 func (p *stubPool) CountWorkers() map[worker.State]int {
101 return map[worker.State]int{
// Booting is whatever is unallocated but not idle.
102 worker.StateBooting: len(p.unalloc) - len(p.idle),
103 worker.StateIdle: len(p.idle),
104 worker.StateRunning: len(p.running),
105 worker.StateUnknown: len(p.unknown),
// StartContainer records the container's UUID in p.starts and, in the
// visible part of the body, adds it to p.running with a zero time.
// NOTE(review): the lines between these two statements and the return
// value are not visible in this excerpt -- the p.running assignment may
// be conditional; confirm.
108 func (p *stubPool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
111 p.starts = append(p.starts, ctr.UUID)
// A zero time.Time is what KillContainer checks for via IsZero().
117 p.running[ctr.UUID] = time.Time{}
// chooseType maps a container to test.InstanceType(n), where n is the
// container's RuntimeConstraints.VCPUs. It always returns a nil error.
// The tests below supply it as the ChooseType field of their queue.
121 func chooseType(ctr *arvados.Container) (arvados.InstanceType, error) {
122 return test.InstanceType(ctr.RuntimeConstraints.VCPUs), nil
// Register SchedulerSuite with gocheck so its Test* methods are run.
125 var _ = check.Suite(&SchedulerSuite{})
// SchedulerSuite carries no state; it only groups the scheduler tests
// defined as its methods.
127 type SchedulerSuite struct{}
129 // Assign priority=4 container to idle node. Create new instances for
130 // the priority=3, 2, 1 containers.
// NOTE(review): the queue/pool variable declarations and the per-container
// Priority and VCPUs values sit on lines not visible in this excerpt.
131 func (*SchedulerSuite) TestUseIdleWorkers(c *check.C) {
132 ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
// Queue: four Locked containers, UUID indexes 1 through 4.
134 ChooseType: chooseType,
135 Containers: []arvados.Container{
137 UUID: test.ContainerUUID(1),
139 State: arvados.ContainerStateLocked,
140 RuntimeConstraints: arvados.RuntimeConstraints{
146 UUID: test.ContainerUUID(2),
148 State: arvados.ContainerStateLocked,
149 RuntimeConstraints: arvados.RuntimeConstraints{
155 UUID: test.ContainerUUID(3),
157 State: arvados.ContainerStateLocked,
158 RuntimeConstraints: arvados.RuntimeConstraints{
164 UUID: test.ContainerUUID(4),
166 State: arvados.ContainerStateLocked,
167 RuntimeConstraints: arvados.RuntimeConstraints{
// Pool: unalloc and idle hold the same counts for each type, and
// nothing is running yet.
177 unalloc: map[arvados.InstanceType]int{
178 test.InstanceType(1): 1,
179 test.InstanceType(2): 2,
181 idle: map[arvados.InstanceType]int{
182 test.InstanceType(1): 1,
183 test.InstanceType(2): 2,
185 running: map[string]time.Time{},
// Build a scheduler on the stub queue and pool and call runQueue().
188 New(ctx, &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
// Three Create() calls are expected, each for test.InstanceType(1).
189 c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(1), test.InstanceType(1), test.InstanceType(1)})
// Only container 4 should have been started...
190 c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(4)})
// ...and it should be the single entry in pool.running.
191 c.Check(pool.running, check.HasLen, 1)
192 for uuid := range pool.running {
193 c.Check(uuid, check.Equals, uuids[4])
197 // If pool.AtQuota() is true, shutdown some unalloc nodes, and don't
// NOTE(review): the remainder of the comment above, the queue/pool
// variable declarations, the quota field assignment and the else line
// of the final if statement are on lines not visible in this excerpt.
199 func (*SchedulerSuite) TestShutdownAtQuota(c *check.C) {
200 ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
// Run the scenario for quota=1 and quota=2.
201 for quota := 1; quota < 3; quota++ {
202 c.Logf("quota=%d", quota)
// Expect quota-1 Create() calls for test.InstanceType(3): none when
// quota=1, one when quota=2.
203 shouldCreate := []arvados.InstanceType{}
204 for i := 1; i < quota; i++ {
205 shouldCreate = append(shouldCreate, test.InstanceType(3))
// Queue: two Locked containers, UUID indexes 2 and 3.
208 ChooseType: chooseType,
209 Containers: []arvados.Container{
211 UUID: test.ContainerUUID(2),
213 State: arvados.ContainerStateLocked,
214 RuntimeConstraints: arvados.RuntimeConstraints{
220 UUID: test.ContainerUUID(3),
222 State: arvados.ContainerStateLocked,
223 RuntimeConstraints: arvados.RuntimeConstraints{
// Pool: two unallocated, idle workers of test.InstanceType(2).
233 unalloc: map[arvados.InstanceType]int{
234 test.InstanceType(2): 2,
236 idle: map[arvados.InstanceType]int{
237 test.InstanceType(2): 2,
239 running: map[string]time.Time{},
// creates starts as an empty (non-nil) slice, presumably so DeepEquals
// matches the empty shouldCreate slice when no Create() call happens.
240 creates: []arvados.InstanceType{},
244 New(ctx, &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
245 c.Check(pool.creates, check.DeepEquals, shouldCreate)
// No creates expected (quota=1): nothing is started and at least one
// shutdown must have been recorded.
246 if len(shouldCreate) == 0 {
247 c.Check(pool.starts, check.DeepEquals, []string{})
248 c.Check(pool.shutdowns, check.Not(check.Equals), 0)
// Otherwise (presumably the else branch): container 2 is started and no
// shutdown is recorded.
250 c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(2)})
251 c.Check(pool.shutdowns, check.Equals, 0)
256 // Start lower-priority containers while waiting for new/existing
257 // workers to come up for higher-priority containers.
// NOTE(review): the queue/pool variable declarations, the per-container
// Priority and VCPUs values, and part of the final loop body are on
// lines not visible in this excerpt.
258 func (*SchedulerSuite) TestStartWhileCreating(c *check.C) {
259 ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
// Pool: two unallocated workers of each type, of which one per type is
// idle.
262 unalloc: map[arvados.InstanceType]int{
263 test.InstanceType(1): 2,
264 test.InstanceType(2): 2,
266 idle: map[arvados.InstanceType]int{
267 test.InstanceType(1): 1,
268 test.InstanceType(2): 1,
270 running: map[string]time.Time{},
// Queue: six Locked containers, UUID indexes 1 through 6; the comment on
// each entry states the outcome the test expects for it.
274 ChooseType: chooseType,
275 Containers: []arvados.Container{
277 // create a new worker
278 UUID: test.ContainerUUID(1),
280 State: arvados.ContainerStateLocked,
281 RuntimeConstraints: arvados.RuntimeConstraints{
287 // tentatively map to unalloc worker
288 UUID: test.ContainerUUID(2),
290 State: arvados.ContainerStateLocked,
291 RuntimeConstraints: arvados.RuntimeConstraints{
297 // start now on idle worker
298 UUID: test.ContainerUUID(3),
300 State: arvados.ContainerStateLocked,
301 RuntimeConstraints: arvados.RuntimeConstraints{
307 // create a new worker
308 UUID: test.ContainerUUID(4),
310 State: arvados.ContainerStateLocked,
311 RuntimeConstraints: arvados.RuntimeConstraints{
317 // tentatively map to unalloc worker
318 UUID: test.ContainerUUID(5),
320 State: arvados.ContainerStateLocked,
321 RuntimeConstraints: arvados.RuntimeConstraints{
327 // start now on idle worker
328 UUID: test.ContainerUUID(6),
330 State: arvados.ContainerStateLocked,
331 RuntimeConstraints: arvados.RuntimeConstraints{
// Build a scheduler on the stub queue and pool and call runQueue().
339 New(ctx, &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
// Two Create() calls are expected: type 2 first, then type 1.
340 c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(2), test.InstanceType(1)})
// StartContainer() should have been called for containers 6, 5, 3, 2,
// in that order.
341 c.Check(pool.starts, check.DeepEquals, []string{uuids[6], uuids[5], uuids[3], uuids[2]})
// Summarize pool.running as uuid -> bool for comparison; the condition
// guarding the assignment below is not visible in this excerpt.
342 running := map[string]bool{}
343 for uuid, t := range pool.running {
345 running[uuid] = false
// Only containers 3 and 6 should appear, both mapped to false.
350 c.Check(running, check.DeepEquals, map[string]bool{uuids[3]: false, uuids[6]: false})
// TestKillNonexistentContainer starts with a pool that reports container
// 2 as running while the queue holds only container 1, and checks that
// the pool's running set becomes empty.
// NOTE(review): the queue/pool variable declarations, the call made on
// sch between the two checks, and the wait loop's body are on lines not
// visible in this excerpt.
353 func (*SchedulerSuite) TestKillNonexistentContainer(c *check.C) {
354 ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
// Pool: no unallocated or idle workers of test.InstanceType(2).
357 unalloc: map[arvados.InstanceType]int{
358 test.InstanceType(2): 0,
360 idle: map[arvados.InstanceType]int{
361 test.InstanceType(2): 0,
// Container 2 is recorded as running with a zero time, but it is not in
// the queue below.
363 running: map[string]time.Time{
364 test.ContainerUUID(2): {},
368 ChooseType: chooseType,
369 Containers: []arvados.Container{
371 // create a new worker
372 UUID: test.ContainerUUID(1),
374 State: arvados.ContainerStateLocked,
375 RuntimeConstraints: arvados.RuntimeConstraints{
383 sch := New(ctx, &queue, &pool, time.Millisecond, time.Millisecond)
// Precondition: exactly one container is recorded as running.
384 c.Check(pool.running, check.HasLen, 1)
// Poll for up to one second, sleeping 1ms between iterations, until
// pool.Running() is empty.
386 for deadline := time.Now().Add(time.Second); len(pool.Running()) > 0 && time.Now().Before(deadline); time.Sleep(time.Millisecond) {
// The running set must be empty by now.
388 c.Check(pool.Running(), check.HasLen, 0)