X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/e8f99cfef7cfbfcf1a1485d69250f24ced3fd609..cd020c016106fbe844501c5f434c16f4def4e08d:/lib/dispatchcloud/scheduler/run_queue_test.go

diff --git a/lib/dispatchcloud/scheduler/run_queue_test.go b/lib/dispatchcloud/scheduler/run_queue_test.go
index aff0e227a8..9f26877f55 100644
--- a/lib/dispatchcloud/scheduler/run_queue_test.go
+++ b/lib/dispatchcloud/scheduler/run_queue_test.go
@@ -5,19 +5,18 @@
 package scheduler
 
 import (
-	"errors"
+	"context"
+	"sync"
 	"time"
 
 	"git.curoverse.com/arvados.git/lib/dispatchcloud/test"
 	"git.curoverse.com/arvados.git/lib/dispatchcloud/worker"
 	"git.curoverse.com/arvados.git/sdk/go/arvados"
-	"github.com/Sirupsen/logrus"
+	"git.curoverse.com/arvados.git/sdk/go/ctxlog"
 	check "gopkg.in/check.v1"
 )
 
 var (
-	logger = logrus.StandardLogger()
-
 	// arbitrary example container UUIDs
 	uuids = func() (r []string) {
 		for i := 0; i < 16; i++ {
@@ -43,36 +42,53 @@ type stubPool struct {
 	creates   []arvados.InstanceType
 	starts    []string
 	shutdowns int
+	sync.Mutex
 }
 
-func (p *stubPool) AtQuota() bool                 { return p.atQuota }
-func (p *stubPool) Subscribe() <-chan struct{}    { return p.notify }
-func (p *stubPool) Unsubscribe(<-chan struct{})   {}
-func (p *stubPool) Running() map[string]time.Time { return p.running }
+func (p *stubPool) AtQuota() bool               { return p.atQuota }
+func (p *stubPool) Subscribe() <-chan struct{}  { return p.notify }
+func (p *stubPool) Unsubscribe(<-chan struct{}) {}
+func (p *stubPool) Running() map[string]time.Time {
+	p.Lock()
+	defer p.Unlock()
+	r := map[string]time.Time{}
+	for k, v := range p.running {
+		r[k] = v
+	}
+	return r
+}
 func (p *stubPool) Unallocated() map[arvados.InstanceType]int {
+	p.Lock()
+	defer p.Unlock()
 	r := map[arvados.InstanceType]int{}
 	for it, n := range p.unalloc {
 		r[it] = n
 	}
 	return r
 }
-func (p *stubPool) Create(it arvados.InstanceType) error {
+func (p *stubPool) Create(it arvados.InstanceType) bool {
+	p.Lock()
+	defer p.Unlock()
 	p.creates = append(p.creates, it)
 	if p.canCreate < 1 {
-		return stubQuotaError{errors.New("quota")}
+		return false
 	}
 	p.canCreate--
 	p.unalloc[it]++
-	return nil
+	return true
 }
 func (p *stubPool) KillContainer(uuid string) {
-	p.running[uuid] = time.Now()
+	p.Lock()
+	defer p.Unlock()
+	delete(p.running, uuid)
 }
 func (p *stubPool) Shutdown(arvados.InstanceType) bool {
 	p.shutdowns++
 	return false
 }
 func (p *stubPool) CountWorkers() map[worker.State]int {
+	p.Lock()
+	defer p.Unlock()
 	return map[worker.State]int{
 		worker.StateBooting: len(p.unalloc) - len(p.idle),
 		worker.StateIdle:    len(p.idle),
@@ -80,6 +96,8 @@ func (p *stubPool) CountWorkers() map[worker.State]int {
 	}
 }
 func (p *stubPool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
+	p.Lock()
+	defer p.Unlock()
 	p.starts = append(p.starts, ctr.UUID)
 	if p.idle[it] == 0 {
 		return false
@@ -90,6 +108,10 @@ func (p *stubPool) StartContainer(it arvados.InstanceType, ctr arvados.Container
 	return true
 }
 
+func chooseType(ctr *arvados.Container) (arvados.InstanceType, error) {
+	return test.InstanceType(ctr.RuntimeConstraints.VCPUs), nil
+}
+
 var _ = check.Suite(&SchedulerSuite{})
 
 type SchedulerSuite struct{}
@@ -100,10 +122,9 @@ type SchedulerSuite struct{}
 // immediately. Don't try to create any other nodes after the failed
 // create.
 func (*SchedulerSuite) TestUseIdleWorkers(c *check.C) {
+	ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
 	queue := test.Queue{
-		ChooseType: func(ctr *arvados.Container) (arvados.InstanceType, error) {
-			return test.InstanceType(ctr.RuntimeConstraints.VCPUs), nil
-		},
+		ChooseType: chooseType,
 		Containers: []arvados.Container{
 			{
 				UUID:     test.ContainerUUID(1),
 				Priority: 1,
@@ -156,7 +177,7 @@ func (*SchedulerSuite) TestUseIdleWorkers(c *check.C) {
 		running:   map[string]time.Time{},
 		canCreate: 0,
 	}
-	New(logger, &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
+	New(ctx, &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
 	c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(1)})
 	c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(4)})
 	c.Check(pool.running, check.HasLen, 1)
@@ -165,27 +186,35 @@ func (*SchedulerSuite) TestUseIdleWorkers(c *check.C) {
 	}
 }
 
-// Shutdown some nodes if Create() fails -- and without even calling
-// Create(), if AtQuota() is true.
+// If Create() fails, shutdown some nodes, and don't call Create()
+// again. Don't call Create() at all if AtQuota() is true.
 func (*SchedulerSuite) TestShutdownAtQuota(c *check.C) {
+	ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
 	for quota := 0; quota < 2; quota++ {
 		c.Logf("quota=%d", quota)
 		shouldCreate := []arvados.InstanceType{}
 		for i := 0; i < quota; i++ {
-			shouldCreate = append(shouldCreate, test.InstanceType(1))
+			shouldCreate = append(shouldCreate, test.InstanceType(3))
 		}
 		queue := test.Queue{
-			ChooseType: func(ctr *arvados.Container) (arvados.InstanceType, error) {
-				return test.InstanceType(ctr.RuntimeConstraints.VCPUs), nil
-			},
+			ChooseType: chooseType,
 			Containers: []arvados.Container{
 				{
-					UUID:     test.ContainerUUID(1),
-					Priority: 1,
+					UUID:     test.ContainerUUID(2),
+					Priority: 2,
 					State:    arvados.ContainerStateLocked,
 					RuntimeConstraints: arvados.RuntimeConstraints{
-						VCPUs: 1,
-						RAM:   1 << 30,
+						VCPUs: 2,
+						RAM:   2 << 30,
+					},
+				},
+				{
+					UUID:     test.ContainerUUID(3),
+					Priority: 3,
+					State:    arvados.ContainerStateLocked,
+					RuntimeConstraints: arvados.RuntimeConstraints{
+						VCPUs: 3,
+						RAM:   3 << 30,
 					},
 				},
 			},
@@ -204,7 +233,7 @@ func (*SchedulerSuite) TestShutdownAtQuota(c *check.C) {
 			starts:    []string{},
 			canCreate: 0,
 		}
-		New(logger, &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
+		New(ctx, &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
 		c.Check(pool.creates, check.DeepEquals, shouldCreate)
 		c.Check(pool.starts, check.DeepEquals, []string{})
 		c.Check(pool.shutdowns, check.Not(check.Equals), 0)
@@ -214,10 +243,11 @@
 // Start lower-priority containers while waiting for new/existing
 // workers to come up for higher-priority containers.
 func (*SchedulerSuite) TestStartWhileCreating(c *check.C) {
+	ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
 	pool := stubPool{
 		unalloc: map[arvados.InstanceType]int{
-			test.InstanceType(1): 1,
-			test.InstanceType(2): 1,
+			test.InstanceType(1): 2,
+			test.InstanceType(2): 2,
 		},
 		idle: map[arvados.InstanceType]int{
 			test.InstanceType(1): 1,
@@ -227,9 +257,7 @@ func (*SchedulerSuite) TestStartWhileCreating(c *check.C) {
 		canCreate: 4,
 	}
 	queue := test.Queue{
-		ChooseType: func(ctr *arvados.Container) (arvados.InstanceType, error) {
-			return test.InstanceType(ctr.RuntimeConstraints.VCPUs), nil
-		},
+		ChooseType: chooseType,
 		Containers: []arvados.Container{
 			{
 				// create a new worker
@@ -294,7 +322,7 @@ func (*SchedulerSuite) TestStartWhileCreating(c *check.C) {
 		},
 	}
 	queue.Update()
-	New(logger, &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
+	New(ctx, &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
 	c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(2), test.InstanceType(1)})
 	c.Check(pool.starts, check.DeepEquals, []string{uuids[6], uuids[5], uuids[3], uuids[2]})
 	running := map[string]bool{}
@@ -307,3 +335,40 @@ func (*SchedulerSuite) TestStartWhileCreating(c *check.C) {
 	}
 	c.Check(running, check.DeepEquals, map[string]bool{uuids[3]: false, uuids[6]: false})
 }
+
+func (*SchedulerSuite) TestKillNonexistentContainer(c *check.C) {
+	ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
+	pool := stubPool{
+		unalloc: map[arvados.InstanceType]int{
+			test.InstanceType(2): 0,
+		},
+		idle: map[arvados.InstanceType]int{
+			test.InstanceType(2): 0,
+		},
+		running: map[string]time.Time{
+			test.ContainerUUID(2): time.Time{},
+		},
+	}
+	queue := test.Queue{
+		ChooseType: chooseType,
+		Containers: []arvados.Container{
+			{
+				// can't be scheduled: no idle workers, and canCreate is 0
+				UUID:     test.ContainerUUID(1),
+				Priority: 1,
+				State:    arvados.ContainerStateLocked,
+				RuntimeConstraints: arvados.RuntimeConstraints{
+					VCPUs: 1,
+					RAM:   1 << 30,
+				},
+			},
+		},
+	}
+	queue.Update()
+	sch := New(ctx, &queue, &pool, time.Millisecond, time.Millisecond)
+	c.Check(pool.running, check.HasLen, 1)
+	sch.sync()
+	for deadline := time.Now().Add(time.Second); len(pool.Running()) > 0 && time.Now().Before(deadline); time.Sleep(time.Millisecond) {
+	}
+	c.Check(pool.Running(), check.HasLen, 0)
+}
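
Note on the stubPool changes: the scheduler under test now touches the pool
from its own goroutine, so the stub embeds sync.Mutex, takes the lock in every
method, and has the read accessors (Running(), Unallocated()) return a fresh
copy of the underlying map instead of the map itself; KillContainer() now
deletes the entry, so a test can observe the kill by polling Running(). Below
is a minimal, self-contained sketch of the same pattern; the names (stub,
Kill, "uuid-1") are illustrative only, not from the Arvados tree:

	package main

	import (
		"fmt"
		"sync"
		"time"
	)

	// stub is safe for concurrent use: every method holds the mutex.
	type stub struct {
		sync.Mutex
		running map[string]time.Time
	}

	// Running returns a snapshot, so callers can iterate over the
	// result while other goroutines keep mutating the stub.
	func (s *stub) Running() map[string]time.Time {
		s.Lock()
		defer s.Unlock()
		r := map[string]time.Time{}
		for k, v := range s.running {
			r[k] = v
		}
		return r
	}

	// Kill removes a container entry, making the kill observable
	// to pollers of Running().
	func (s *stub) Kill(uuid string) {
		s.Lock()
		defer s.Unlock()
		delete(s.running, uuid)
	}

	func main() {
		s := &stub{running: map[string]time.Time{"uuid-1": time.Now()}}
		go s.Kill("uuid-1")
		// Poll with a deadline, as TestKillNonexistentContainer does,
		// instead of sleeping for a fixed interval.
		for deadline := time.Now().Add(time.Second); len(s.Running()) > 0 && time.Now().Before(deadline); time.Sleep(time.Millisecond) {
		}
		fmt.Println(len(s.Running())) // 0 once Kill has run
	}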
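
Note on the logger change: New() takes a context now instead of a logrus
logger, and ctxlog.TestLogger(c) builds a logger that reports through the
gocheck *check.C, so scheduler output lands in the right test's log. The
scheduler side of this is not shown in the diff above; the sketch below (a
hypothetical test method, given the imports already in this file) assumes the
usual ctxlog pairing, where ctxlog.FromContext() retrieves whatever logger
ctxlog.Context() attached:

	func (*SchedulerSuite) TestExampleLogging(c *check.C) {
		// Attach a per-test logger to a context; any component that
		// receives ctx can recover it with ctxlog.FromContext(ctx).
		ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
		logger := ctxlog.FromContext(ctx)
		logger.Info("reported via c.Log")
	}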