1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
12 "git.arvados.org/arvados.git/lib/dispatchcloud/test"
13 "git.arvados.org/arvados.git/lib/dispatchcloud/worker"
14 "git.arvados.org/arvados.git/sdk/go/arvados"
15 "git.arvados.org/arvados.git/sdk/go/ctxlog"
16 check "gopkg.in/check.v1"
20 // arbitrary example container UUIDs
21 uuids = func() (r []string) {
22 for i := 0; i < 16; i++ {
23 r = append(r, test.ContainerUUID(i))
29 type stubQuotaError struct {
33 func (stubQuotaError) IsQuotaError() bool { return true }
35 type stubPool struct {
36 notify <-chan struct{}
37 unalloc map[arvados.InstanceType]int // idle+booting+unknown
38 idle map[arvados.InstanceType]int
39 running map[string]time.Time
42 creates []arvados.InstanceType
48 func (p *stubPool) AtQuota() bool { return p.atQuota }
49 func (p *stubPool) Subscribe() <-chan struct{} { return p.notify }
50 func (p *stubPool) Unsubscribe(<-chan struct{}) {}
51 func (p *stubPool) Running() map[string]time.Time {
54 r := map[string]time.Time{}
55 for k, v := range p.running {
60 func (p *stubPool) Unallocated() map[arvados.InstanceType]int {
63 r := map[arvados.InstanceType]int{}
64 for it, n := range p.unalloc {
69 func (p *stubPool) Create(it arvados.InstanceType) bool {
72 p.creates = append(p.creates, it)
80 func (p *stubPool) ForgetContainer(uuid string) {
82 func (p *stubPool) KillContainer(uuid, reason string) bool {
85 delete(p.running, uuid)
88 func (p *stubPool) Shutdown(arvados.InstanceType) bool {
92 func (p *stubPool) CountWorkers() map[worker.State]int {
95 return map[worker.State]int{
96 worker.StateBooting: len(p.unalloc) - len(p.idle),
97 worker.StateIdle: len(p.idle),
98 worker.StateRunning: len(p.running),
101 func (p *stubPool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
104 p.starts = append(p.starts, ctr.UUID)
110 p.running[ctr.UUID] = time.Time{}
114 func chooseType(ctr *arvados.Container) (arvados.InstanceType, error) {
115 return test.InstanceType(ctr.RuntimeConstraints.VCPUs), nil
118 var _ = check.Suite(&SchedulerSuite{})
120 type SchedulerSuite struct{}
122 // Assign priority=4 container to idle node. Create a new instance for
123 // the priority=3 container. Don't try to start any priority<3
124 // containers because priority=3 container didn't start
// immediately. Don't try to create any other nodes after the failed create.
127 func (*SchedulerSuite) TestUseIdleWorkers(c *check.C) {
128 ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
130 ChooseType: chooseType,
131 Containers: []arvados.Container{
133 UUID: test.ContainerUUID(1),
135 State: arvados.ContainerStateLocked,
136 RuntimeConstraints: arvados.RuntimeConstraints{
142 UUID: test.ContainerUUID(2),
144 State: arvados.ContainerStateLocked,
145 RuntimeConstraints: arvados.RuntimeConstraints{
151 UUID: test.ContainerUUID(3),
153 State: arvados.ContainerStateLocked,
154 RuntimeConstraints: arvados.RuntimeConstraints{
160 UUID: test.ContainerUUID(4),
162 State: arvados.ContainerStateLocked,
163 RuntimeConstraints: arvados.RuntimeConstraints{
172 unalloc: map[arvados.InstanceType]int{
173 test.InstanceType(1): 1,
174 test.InstanceType(2): 2,
176 idle: map[arvados.InstanceType]int{
177 test.InstanceType(1): 1,
178 test.InstanceType(2): 2,
180 running: map[string]time.Time{},
183 New(ctx, &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
184 c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(1)})
185 c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(4)})
186 c.Check(pool.running, check.HasLen, 1)
187 for uuid := range pool.running {
188 c.Check(uuid, check.Equals, uuids[4])
// If Create() fails, shut down some nodes, and don't call Create()
193 // again. Don't call Create() at all if AtQuota() is true.
194 func (*SchedulerSuite) TestShutdownAtQuota(c *check.C) {
195 ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
196 for quota := 0; quota < 2; quota++ {
197 c.Logf("quota=%d", quota)
198 shouldCreate := []arvados.InstanceType{}
199 for i := 0; i < quota; i++ {
200 shouldCreate = append(shouldCreate, test.InstanceType(3))
203 ChooseType: chooseType,
204 Containers: []arvados.Container{
206 UUID: test.ContainerUUID(2),
208 State: arvados.ContainerStateLocked,
209 RuntimeConstraints: arvados.RuntimeConstraints{
215 UUID: test.ContainerUUID(3),
217 State: arvados.ContainerStateLocked,
218 RuntimeConstraints: arvados.RuntimeConstraints{
228 unalloc: map[arvados.InstanceType]int{
229 test.InstanceType(2): 2,
231 idle: map[arvados.InstanceType]int{
232 test.InstanceType(2): 2,
234 running: map[string]time.Time{},
235 creates: []arvados.InstanceType{},
239 New(ctx, &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
240 c.Check(pool.creates, check.DeepEquals, shouldCreate)
241 c.Check(pool.starts, check.DeepEquals, []string{})
242 c.Check(pool.shutdowns, check.Not(check.Equals), 0)
246 // Start lower-priority containers while waiting for new/existing
247 // workers to come up for higher-priority containers.
248 func (*SchedulerSuite) TestStartWhileCreating(c *check.C) {
249 ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
251 unalloc: map[arvados.InstanceType]int{
252 test.InstanceType(1): 2,
253 test.InstanceType(2): 2,
255 idle: map[arvados.InstanceType]int{
256 test.InstanceType(1): 1,
257 test.InstanceType(2): 1,
259 running: map[string]time.Time{},
263 ChooseType: chooseType,
264 Containers: []arvados.Container{
266 // create a new worker
267 UUID: test.ContainerUUID(1),
269 State: arvados.ContainerStateLocked,
270 RuntimeConstraints: arvados.RuntimeConstraints{
276 // tentatively map to unalloc worker
277 UUID: test.ContainerUUID(2),
279 State: arvados.ContainerStateLocked,
280 RuntimeConstraints: arvados.RuntimeConstraints{
286 // start now on idle worker
287 UUID: test.ContainerUUID(3),
289 State: arvados.ContainerStateLocked,
290 RuntimeConstraints: arvados.RuntimeConstraints{
296 // create a new worker
297 UUID: test.ContainerUUID(4),
299 State: arvados.ContainerStateLocked,
300 RuntimeConstraints: arvados.RuntimeConstraints{
306 // tentatively map to unalloc worker
307 UUID: test.ContainerUUID(5),
309 State: arvados.ContainerStateLocked,
310 RuntimeConstraints: arvados.RuntimeConstraints{
316 // start now on idle worker
317 UUID: test.ContainerUUID(6),
319 State: arvados.ContainerStateLocked,
320 RuntimeConstraints: arvados.RuntimeConstraints{
328 New(ctx, &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
329 c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(2), test.InstanceType(1)})
330 c.Check(pool.starts, check.DeepEquals, []string{uuids[6], uuids[5], uuids[3], uuids[2]})
331 running := map[string]bool{}
332 for uuid, t := range pool.running {
334 running[uuid] = false
339 c.Check(running, check.DeepEquals, map[string]bool{uuids[3]: false, uuids[6]: false})
342 func (*SchedulerSuite) TestKillNonexistentContainer(c *check.C) {
343 ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
345 unalloc: map[arvados.InstanceType]int{
346 test.InstanceType(2): 0,
348 idle: map[arvados.InstanceType]int{
349 test.InstanceType(2): 0,
351 running: map[string]time.Time{
352 test.ContainerUUID(2): time.Time{},
356 ChooseType: chooseType,
357 Containers: []arvados.Container{
359 // create a new worker
360 UUID: test.ContainerUUID(1),
362 State: arvados.ContainerStateLocked,
363 RuntimeConstraints: arvados.RuntimeConstraints{
371 sch := New(ctx, &queue, &pool, time.Millisecond, time.Millisecond)
372 c.Check(pool.running, check.HasLen, 1)
374 for deadline := time.Now().Add(time.Second); len(pool.Running()) > 0 && time.Now().Before(deadline); time.Sleep(time.Millisecond) {
376 c.Check(pool.Running(), check.HasLen, 0)