X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/53843c61d600a0911e9f4f512b0d5c34b18f9d33..8685bdc41012f1623cc02b573e27439fdf314799:/lib/dispatchcloud/worker/pool_test.go diff --git a/lib/dispatchcloud/worker/pool_test.go b/lib/dispatchcloud/worker/pool_test.go index fc33a7ab23..0f5c5ee196 100644 --- a/lib/dispatchcloud/worker/pool_test.go +++ b/lib/dispatchcloud/worker/pool_test.go @@ -9,10 +9,10 @@ import ( "strings" "time" - "git.curoverse.com/arvados.git/lib/cloud" - "git.curoverse.com/arvados.git/lib/dispatchcloud/test" - "git.curoverse.com/arvados.git/sdk/go/arvados" - "git.curoverse.com/arvados.git/sdk/go/ctxlog" + "git.arvados.org/arvados.git/lib/cloud" + "git.arvados.org/arvados.git/lib/dispatchcloud/test" + "git.arvados.org/arvados.git/sdk/go/arvados" + "git.arvados.org/arvados.git/sdk/go/ctxlog" "github.com/prometheus/client_golang/prometheus" check "gopkg.in/check.v1" ) @@ -65,24 +65,29 @@ func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) { logger := ctxlog.TestLogger(c) driver := &test.StubDriver{} - is, err := driver.InstanceSet(nil, "", logger) + instanceSetID := cloud.InstanceSetID("test-instance-set-id") + is, err := driver.InstanceSet(nil, instanceSetID, nil, logger) c.Assert(err, check.IsNil) newExecutor := func(cloud.Instance) Executor { - return stubExecutor{ - "crunch-run --list": stubResp{}, - "true": stubResp{}, + return &stubExecutor{ + response: map[string]stubResp{ + "crunch-run-custom --list": {}, + "true": {}, + }, } } cluster := &arvados.Cluster{ - Dispatch: arvados.Dispatch{ - MaxProbesPerSecond: 1000, - ProbeInterval: arvados.Duration(time.Millisecond * 10), - }, - CloudVMs: arvados.CloudVMs{ - BootProbeCommand: "true", - SyncInterval: arvados.Duration(time.Millisecond * 10), + Containers: arvados.ContainersConfig{ + CloudVMs: arvados.CloudVMsConfig{ + BootProbeCommand: "true", + MaxProbesPerSecond: 1000, + ProbeInterval: arvados.Duration(time.Millisecond * 10), + SyncInterval: arvados.Duration(time.Millisecond * 10), + TagKeyPrefix: "testprefix:", + }, + CrunchRunCommand: "crunch-run-custom", }, InstanceTypes: arvados.InstanceTypeMap{ type1.Name: type1, @@ -91,7 +96,7 @@ func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) { }, } - pool := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), is, newExecutor, nil, cluster) + pool := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), instanceSetID, is, newExecutor, nil, cluster) notify := pool.Subscribe() defer pool.Unsubscribe(notify) pool.Create(type1) @@ -106,13 +111,14 @@ func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) { } } // Wait for the tags to save to the cloud provider + tagKey := cluster.Containers.CloudVMs.TagKeyPrefix + tagKeyIdleBehavior deadline := time.Now().Add(time.Second) for !func() bool { pool.mtx.RLock() defer pool.mtx.RUnlock() for _, wkr := range pool.workers { if wkr.instType == type2 { - return wkr.instance.Tags()[tagKeyIdleBehavior] == string(IdleBehaviorHold) + return wkr.instance.Tags()[tagKey] == string(IdleBehaviorHold) } } return false @@ -126,7 +132,7 @@ func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) { c.Log("------- starting new pool, waiting to recover state") - pool2 := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), is, newExecutor, nil, cluster) + pool2 := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), instanceSetID, is, newExecutor, nil, cluster) notify2 := pool2.Subscribe() defer pool2.Unsubscribe(notify2) waitForIdle(pool2, notify2) @@ -141,10 +147,103 @@ func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) { pool2.Stop() } +func (suite *PoolSuite) TestDrain(c *check.C) { + logger := ctxlog.TestLogger(c) + driver := test.StubDriver{} + instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, logger) + c.Assert(err, check.IsNil) + + ac := arvados.NewClientFromEnv() + + type1 := test.InstanceType(1) + pool := &Pool{ + arvClient: ac, + logger: logger, + newExecutor: func(cloud.Instance) Executor { return &stubExecutor{} }, + instanceSet: &throttledInstanceSet{InstanceSet: instanceSet}, + instanceTypes: arvados.InstanceTypeMap{ + type1.Name: type1, + }, + } + notify := pool.Subscribe() + defer pool.Unsubscribe(notify) + + pool.Create(type1) + + // Wait for the instance to either return from its Create + // call, or show up in a poll. + suite.wait(c, pool, notify, func() bool { + pool.mtx.RLock() + defer pool.mtx.RUnlock() + return len(pool.workers) == 1 + }) + + tests := []struct { + state State + idleBehavior IdleBehavior + result bool + }{ + {StateIdle, IdleBehaviorHold, false}, + {StateIdle, IdleBehaviorDrain, false}, + {StateIdle, IdleBehaviorRun, true}, + } + + for _, test := range tests { + for _, wkr := range pool.workers { + wkr.state = test.state + wkr.idleBehavior = test.idleBehavior + } + + // Try to start a container + started := pool.StartContainer(type1, arvados.Container{UUID: "testcontainer"}) + c.Check(started, check.Equals, test.result) + } +} + +func (suite *PoolSuite) TestNodeCreateThrottle(c *check.C) { + logger := ctxlog.TestLogger(c) + driver := test.StubDriver{HoldCloudOps: true} + instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, logger) + c.Assert(err, check.IsNil) + + type1 := test.InstanceType(1) + pool := &Pool{ + logger: logger, + instanceSet: &throttledInstanceSet{InstanceSet: instanceSet}, + maxConcurrentInstanceCreateOps: 1, + instanceTypes: arvados.InstanceTypeMap{ + type1.Name: type1, + }, + } + + c.Check(pool.Unallocated()[type1], check.Equals, 0) + res := pool.Create(type1) + c.Check(pool.Unallocated()[type1], check.Equals, 1) + c.Check(res, check.Equals, true) + + res = pool.Create(type1) + c.Check(pool.Unallocated()[type1], check.Equals, 1) + c.Check(res, check.Equals, false) + + pool.instanceSet.throttleCreate.err = nil + pool.maxConcurrentInstanceCreateOps = 2 + + res = pool.Create(type1) + c.Check(pool.Unallocated()[type1], check.Equals, 2) + c.Check(res, check.Equals, true) + + pool.instanceSet.throttleCreate.err = nil + pool.maxConcurrentInstanceCreateOps = 0 + + res = pool.Create(type1) + c.Check(pool.Unallocated()[type1], check.Equals, 3) + c.Check(res, check.Equals, true) +} + func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) { logger := ctxlog.TestLogger(c) driver := test.StubDriver{HoldCloudOps: true} - instanceSet, err := driver.InstanceSet(nil, "", logger) + instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, logger) c.Assert(err, check.IsNil) type1 := arvados.InstanceType{Name: "a1s", ProviderType: "a1.small", VCPUs: 1, RAM: 1 * GiB, Price: .01} @@ -152,7 +251,7 @@ func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) { type3 := arvados.InstanceType{Name: "a2l", ProviderType: "a2.large", VCPUs: 4, RAM: 4 * GiB, Price: .04} pool := &Pool{ logger: logger, - newExecutor: func(cloud.Instance) Executor { return stubExecutor{} }, + newExecutor: func(cloud.Instance) Executor { return &stubExecutor{} }, instanceSet: &throttledInstanceSet{InstanceSet: instanceSet}, instanceTypes: arvados.InstanceTypeMap{ type1.Name: type1,