X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/d186ff022ee427ded90307d795537c6429431f09..588358fc0aa03a5036a9aa21f15b604e9e5c98c7:/lib/dispatchcloud/worker/pool_test.go

diff --git a/lib/dispatchcloud/worker/pool_test.go b/lib/dispatchcloud/worker/pool_test.go
index 76e4f71a7b..7b5634605f 100644
--- a/lib/dispatchcloud/worker/pool_test.go
+++ b/lib/dispatchcloud/worker/pool_test.go
@@ -10,10 +10,12 @@ import (
 	"time"
 
 	"git.arvados.org/arvados.git/lib/cloud"
+	"git.arvados.org/arvados.git/lib/config"
 	"git.arvados.org/arvados.git/lib/dispatchcloud/test"
 	"git.arvados.org/arvados.git/sdk/go/arvados"
 	"git.arvados.org/arvados.git/sdk/go/ctxlog"
 	"github.com/prometheus/client_golang/prometheus"
+	"github.com/sirupsen/logrus"
 	check "gopkg.in/check.v1"
 )
 
@@ -31,7 +33,18 @@ func (*lessChecker) Check(params []interface{}, names []string) (result bool, er
 
 var less = &lessChecker{&check.CheckerInfo{Name: "less", Params: []string{"obtained", "expected"}}}
 
-type PoolSuite struct{}
+type PoolSuite struct {
+	logger      logrus.FieldLogger
+	testCluster *arvados.Cluster
+}
+
+func (suite *PoolSuite) SetUpTest(c *check.C) {
+	suite.logger = ctxlog.TestLogger(c)
+	cfg, err := config.NewLoader(nil, suite.logger).Load()
+	c.Assert(err, check.IsNil)
+	suite.testCluster, err = cfg.GetCluster("")
+	c.Assert(err, check.IsNil)
+}
 
 func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
 	type1 := test.InstanceType(1)
@@ -63,39 +76,35 @@ func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
 		}
 	}
 
-	logger := ctxlog.TestLogger(c)
 	driver := &test.StubDriver{}
 	instanceSetID := cloud.InstanceSetID("test-instance-set-id")
-	is, err := driver.InstanceSet(nil, instanceSetID, nil, logger)
+	is, err := driver.InstanceSet(nil, instanceSetID, nil, suite.logger)
 	c.Assert(err, check.IsNil)
 
 	newExecutor := func(cloud.Instance) Executor {
 		return &stubExecutor{
 			response: map[string]stubResp{
-				"crunch-run --list": {},
-				"true":              {},
+				"crunch-run-custom --list": {},
+				"true":                     {},
 			},
 		}
 	}
 
-	cluster := &arvados.Cluster{
-		Containers: arvados.ContainersConfig{
-			CloudVMs: arvados.CloudVMsConfig{
-				BootProbeCommand:   "true",
-				MaxProbesPerSecond: 1000,
-				ProbeInterval:      arvados.Duration(time.Millisecond * 10),
-				SyncInterval:       arvados.Duration(time.Millisecond * 10),
-				TagKeyPrefix:       "testprefix:",
-			},
-		},
-		InstanceTypes: arvados.InstanceTypeMap{
-			type1.Name: type1,
-			type2.Name: type2,
-			type3.Name: type3,
-		},
+	suite.testCluster.Containers.CloudVMs = arvados.CloudVMsConfig{
+		BootProbeCommand:   "true",
+		MaxProbesPerSecond: 1000,
+		ProbeInterval:      arvados.Duration(time.Millisecond * 10),
+		SyncInterval:       arvados.Duration(time.Millisecond * 10),
+		TagKeyPrefix:       "testprefix:",
+	}
+	suite.testCluster.Containers.CrunchRunCommand = "crunch-run-custom"
+	suite.testCluster.InstanceTypes = arvados.InstanceTypeMap{
+		type1.Name: type1,
+		type2.Name: type2,
+		type3.Name: type3,
 	}
 
-	pool := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), instanceSetID, is, newExecutor, nil, cluster)
+	pool := NewPool(suite.logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), instanceSetID, is, newExecutor, nil, suite.testCluster)
 	notify := pool.Subscribe()
 	defer pool.Unsubscribe(notify)
 	pool.Create(type1)
@@ -110,7 +119,7 @@ func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
 		}
 	}
 	// Wait for the tags to save to the cloud provider
-	tagKey := cluster.Containers.CloudVMs.TagKeyPrefix + tagKeyIdleBehavior
+	tagKey := suite.testCluster.Containers.CloudVMs.TagKeyPrefix + tagKeyIdleBehavior
 	deadline := time.Now().Add(time.Second)
 	for !func() bool {
 		pool.mtx.RLock()
@@ -131,7 +140,7 @@ func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
 
 	c.Log("------- starting new pool, waiting to recover state")
 
-	pool2 := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), instanceSetID, is, newExecutor, nil, cluster)
+	pool2 := NewPool(suite.logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), instanceSetID, is, newExecutor, nil, suite.testCluster)
 	notify2 := pool2.Subscribe()
 	defer pool2.Unsubscribe(notify2)
 	waitForIdle(pool2, notify2)
@@ -147,73 +156,27 @@ func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
 }
 
 func (suite *PoolSuite) TestDrain(c *check.C) {
-	type1 := arvados.InstanceType{Name: "a1s", ProviderType: "a1.small", VCPUs: 1, RAM: 1 * GiB, Price: .01}
-
-	waitForIdle := func(pool *Pool, notify <-chan struct{}) {
-		timeout := time.NewTimer(time.Second)
-		for {
-			instances := pool.Instances()
-			sort.Slice(instances, func(i, j int) bool {
-				return strings.Compare(instances[i].ArvadosInstanceType, instances[j].ArvadosInstanceType) < 0
-			})
-			if len(instances) == 1 &&
-				instances[0].ArvadosInstanceType == type1.Name &&
-				instances[0].WorkerState == StateIdle.String() {
-				return
-			}
-			select {
-			case <-timeout.C:
-				c.Logf("pool.Instances() == %#v", instances)
-				c.Error("timed out")
-				return
-			case <-notify:
-			}
-		}
-	}
-
-	logger := ctxlog.TestLogger(c)
-	driver := test.StubDriver{HoldCloudOps: true}
-	instanceSetID := cloud.InstanceSetID("test-instance-set-id")
-	is, err := driver.InstanceSet(nil, instanceSetID, nil, logger)
+	driver := test.StubDriver{}
+	instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, suite.logger)
 	c.Assert(err, check.IsNil)
 
-	newExecutor := func(cloud.Instance) Executor {
-		return &stubExecutor{
-			response: map[string]stubResp{
-				"crunch-run --list":                  {},
-				"true":                               {},
-				"crunch-run --detach --stdin-env ''": {},
-			},
-		}
-	}
+	ac := arvados.NewClientFromEnv()
 
-	cluster := &arvados.Cluster{
-		Containers: arvados.ContainersConfig{
-			CloudVMs: arvados.CloudVMsConfig{
-				BootProbeCommand:   "true",
-				MaxProbesPerSecond: 1000,
-				ProbeInterval:      arvados.Duration(time.Millisecond * 10),
-				SyncInterval:       arvados.Duration(time.Millisecond * 10),
-				//TimeoutIdle:        arvados.Duration(time.Second),
-				TagKeyPrefix:       "testprefix:",
-			},
-		},
-		InstanceTypes: arvados.InstanceTypeMap{
+	type1 := test.InstanceType(1)
+	pool := &Pool{
+		arvClient:   ac,
+		logger:      suite.logger,
+		newExecutor: func(cloud.Instance) Executor { return &stubExecutor{} },
+		cluster:     suite.testCluster,
+		instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
+		instanceTypes: arvados.InstanceTypeMap{
 			type1.Name: type1,
 		},
 	}
-
-	pool := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), instanceSetID, is, newExecutor, nil, cluster)
-
 	notify := pool.Subscribe()
 	defer pool.Unsubscribe(notify)
 
-	c.Check(pool.Unallocated()[type1], check.Equals, 0)
 	pool.Create(type1)
-	c.Check(pool.Unallocated()[type1], check.Equals, 1)
-
-	// Unblock the pending Create call.
-	go driver.ReleaseCloudOps(1)
 
 	// Wait for the instance to either return from its Create
 	// call, or show up in a poll.
@@ -223,69 +186,80 @@ func (suite *PoolSuite) TestDrain(c *check.C) {
 		return len(pool.workers) == 1
 	})
 
-	waitForIdle(pool, notify)
+	tests := []struct {
+		state        State
+		idleBehavior IdleBehavior
+		result       bool
+	}{
+		{StateIdle, IdleBehaviorHold, false},
+		{StateIdle, IdleBehaviorDrain, false},
+		{StateIdle, IdleBehaviorRun, true},
+	}
 
-	// Start a container on the worker
-	for _, wkr := range pool.workers {
-		if wkr.instType == type1 {
-			wkr.startContainer(arvados.Container{})
+	for _, test := range tests {
+		for _, wkr := range pool.workers {
+			wkr.state = test.state
+			wkr.idleBehavior = test.idleBehavior
 		}
-	}
 
-	ivs := suite.instancesByType(pool, type1)
-	c.Assert(ivs, check.HasLen, 1)
-	type1instanceID := ivs[0].Instance
+		// Try to start a container
+		started := pool.StartContainer(type1, arvados.Container{UUID: "testcontainer"})
+		c.Check(started, check.Equals, test.result)
+	}
+}
 
-	// Place our node in drain state
-	err = pool.SetIdleBehavior(type1instanceID, IdleBehaviorDrain)
-	c.Check(err, check.IsNil)
+func (suite *PoolSuite) TestNodeCreateThrottle(c *check.C) {
+	driver := test.StubDriver{HoldCloudOps: true}
+	instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, suite.logger)
+	c.Assert(err, check.IsNil)
 
-	waitForIdle(pool, notify)
+	type1 := test.InstanceType(1)
+	pool := &Pool{
+		logger:                         suite.logger,
+		instanceSet:                    &throttledInstanceSet{InstanceSet: instanceSet},
+		cluster:                        suite.testCluster,
+		maxConcurrentInstanceCreateOps: 1,
+		instanceTypes: arvados.InstanceTypeMap{
+			type1.Name: type1,
+		},
+	}
 
-	ivs = suite.instancesByType(pool, type1)
-	c.Assert(ivs, check.HasLen, 1)
+	c.Check(pool.Unallocated()[type1], check.Equals, 0)
+	res := pool.Create(type1)
+	c.Check(pool.Unallocated()[type1], check.Equals, 1)
+	c.Check(res, check.Equals, true)
 
-	// Try to start another container, this should fail because our lone worker has
-	// IdleBehavior set to Drain
-	started := pool.StartContainer(type1, arvados.Container{})
-	c.Check(started, check.Equals, false)
+	res = pool.Create(type1)
+	c.Check(pool.Unallocated()[type1], check.Equals, 1)
+	c.Check(res, check.Equals, false)
 
-	// There should be no unallocated workers
-	suite.wait(c, pool, notify, func() bool {
-		return pool.Unallocated()[type1] == 0
-	})
+	pool.instanceSet.throttleCreate.err = nil
+	pool.maxConcurrentInstanceCreateOps = 2
 
-	// And our worker should eventually go into state ShutDown
-	suite.wait(c, pool, notify, func() bool {
-		ivs := suite.instancesByType(pool, type1)
-		return len(ivs) == 1 && ivs[0].WorkerState == StateShutdown.String()
-	})
+	res = pool.Create(type1)
+	c.Check(pool.Unallocated()[type1], check.Equals, 2)
+	c.Check(res, check.Equals, true)
 
-	// Unblock all pending Destroy calls. Pool calls Destroy again
-	// if a node still appears in the provider list after a
-	// previous attempt, so there might be more than 1 Destroy
-	// calls to unblock.
-	go driver.ReleaseCloudOps(1111)
+	pool.instanceSet.throttleCreate.err = nil
+	pool.maxConcurrentInstanceCreateOps = 0
 
-	// Sync until all instances disappear from the provider list.
-	suite.wait(c, pool, notify, func() bool {
-		pool.getInstancesAndSync()
-		return len(pool.Instances()) == 0
-	})
+	res = pool.Create(type1)
+	c.Check(pool.Unallocated()[type1], check.Equals, 3)
+	c.Check(res, check.Equals, true)
 }
 
 func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
-	logger := ctxlog.TestLogger(c)
 	driver := test.StubDriver{HoldCloudOps: true}
-	instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, logger)
+	instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, suite.logger)
 	c.Assert(err, check.IsNil)
 
 	type1 := arvados.InstanceType{Name: "a1s", ProviderType: "a1.small", VCPUs: 1, RAM: 1 * GiB, Price: .01}
 	type2 := arvados.InstanceType{Name: "a2m", ProviderType: "a2.medium", VCPUs: 2, RAM: 2 * GiB, Price: .02}
 	type3 := arvados.InstanceType{Name: "a2l", ProviderType: "a2.large", VCPUs: 4, RAM: 4 * GiB, Price: .04}
 
 	pool := &Pool{
-		logger:      logger,
+		logger:      suite.logger,
 		newExecutor: func(cloud.Instance) Executor { return &stubExecutor{} },
+		cluster:     suite.testCluster,
 		instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
 		instanceTypes: arvados.InstanceTypeMap{
 			type1.Name: type1,