Merge branch '21535-multi-wf-delete'
[arvados.git] / lib / dispatchcloud / worker / pool_test.go
index d437668aadb1d31fdba4f32434f98014bf9c682d..8d2ba09ebe849f5962f7fcf1570de27f5fe3012e 100644 (file)
@@ -10,10 +10,12 @@ import (
        "time"
 
        "git.arvados.org/arvados.git/lib/cloud"
+       "git.arvados.org/arvados.git/lib/config"
        "git.arvados.org/arvados.git/lib/dispatchcloud/test"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/ctxlog"
        "github.com/prometheus/client_golang/prometheus"
+       "github.com/sirupsen/logrus"
        check "gopkg.in/check.v1"
 )
 
@@ -31,7 +33,18 @@ func (*lessChecker) Check(params []interface{}, names []string) (result bool, er
 
 var less = &lessChecker{&check.CheckerInfo{Name: "less", Params: []string{"obtained", "expected"}}}
 
-type PoolSuite struct{}
+type PoolSuite struct { // Test suite state; both fields are assigned in SetUpTest.
+       logger      logrus.FieldLogger // logger handed to the config loader, stub drivers, and pools in these tests
+       testCluster *arvados.Cluster   // cluster config loaded in SetUpTest; tests adjust it and pass it to pools
+}
+
+func (suite *PoolSuite) SetUpTest(c *check.C) { // gocheck fixture hook: gives each test a new logger and a newly loaded cluster config.
+       suite.logger = ctxlog.TestLogger(c)
+       cfg, err := config.NewLoader(nil, suite.logger).Load() // NOTE(review): nil first argument presumably means "no config on stdin" — confirm against lib/config.
+       c.Assert(err, check.IsNil)
+       suite.testCluster, err = cfg.GetCluster("") // NOTE(review): empty cluster ID presumably selects the default cluster — confirm.
+       c.Assert(err, check.IsNil)
+}
 
 func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
        type1 := test.InstanceType(1)
@@ -63,39 +76,35 @@ func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
                }
        }
 
-       logger := ctxlog.TestLogger(c)
        driver := &test.StubDriver{}
        instanceSetID := cloud.InstanceSetID("test-instance-set-id")
-       is, err := driver.InstanceSet(nil, instanceSetID, nil, logger)
+       is, err := driver.InstanceSet(nil, instanceSetID, nil, suite.logger, nil)
        c.Assert(err, check.IsNil)
 
        newExecutor := func(cloud.Instance) Executor {
                return &stubExecutor{
                        response: map[string]stubResp{
-                               "crunch-run --list": {},
-                               "true":              {},
+                               "crunch-run-custom --list": {},
+                               "true":                     {},
                        },
                }
        }
 
-       cluster := &arvados.Cluster{
-               Containers: arvados.ContainersConfig{
-                       CloudVMs: arvados.CloudVMsConfig{
-                               BootProbeCommand:   "true",
-                               MaxProbesPerSecond: 1000,
-                               ProbeInterval:      arvados.Duration(time.Millisecond * 10),
-                               SyncInterval:       arvados.Duration(time.Millisecond * 10),
-                               TagKeyPrefix:       "testprefix:",
-                       },
-               },
-               InstanceTypes: arvados.InstanceTypeMap{
-                       type1.Name: type1,
-                       type2.Name: type2,
-                       type3.Name: type3,
-               },
+       suite.testCluster.Containers.CloudVMs = arvados.CloudVMsConfig{
+               BootProbeCommand:   "true",
+               MaxProbesPerSecond: 1000,
+               ProbeInterval:      arvados.Duration(time.Millisecond * 10),
+               SyncInterval:       arvados.Duration(time.Millisecond * 10),
+               TagKeyPrefix:       "testprefix:",
+       }
+       suite.testCluster.Containers.CrunchRunCommand = "crunch-run-custom"
+       suite.testCluster.InstanceTypes = arvados.InstanceTypeMap{
+               type1.Name: type1,
+               type2.Name: type2,
+               type3.Name: type3,
        }
 
-       pool := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), instanceSetID, is, newExecutor, nil, cluster)
+       pool := NewPool(suite.logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), instanceSetID, is, newExecutor, nil, suite.testCluster)
        notify := pool.Subscribe()
        defer pool.Unsubscribe(notify)
        pool.Create(type1)
@@ -110,7 +119,7 @@ func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
                }
        }
        // Wait for the tags to save to the cloud provider
-       tagKey := cluster.Containers.CloudVMs.TagKeyPrefix + tagKeyIdleBehavior
+       tagKey := suite.testCluster.Containers.CloudVMs.TagKeyPrefix + tagKeyIdleBehavior
        deadline := time.Now().Add(time.Second)
        for !func() bool {
                pool.mtx.RLock()
@@ -131,7 +140,7 @@ func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
 
        c.Log("------- starting new pool, waiting to recover state")
 
-       pool2 := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), instanceSetID, is, newExecutor, nil, cluster)
+       pool2 := NewPool(suite.logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), instanceSetID, is, newExecutor, nil, suite.testCluster)
        notify2 := pool2.Subscribe()
        defer pool2.Unsubscribe(notify2)
        waitForIdle(pool2, notify2)
@@ -147,18 +156,18 @@ func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
 }
 
 func (suite *PoolSuite) TestDrain(c *check.C) {
-       logger := ctxlog.TestLogger(c)
-       driver := test.StubDriver{HoldCloudOps: true}
-       instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, logger)
+       driver := test.StubDriver{}
+       instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, suite.logger, nil)
        c.Assert(err, check.IsNil)
 
        ac := arvados.NewClientFromEnv()
 
-       type1 := arvados.InstanceType{Name: "a1s", ProviderType: "a1.small", VCPUs: 1, RAM: 1 * GiB, Price: .01}
+       type1 := test.InstanceType(1)
        pool := &Pool{
                arvClient:   ac,
-               logger:      logger,
+               logger:      suite.logger,
                newExecutor: func(cloud.Instance) Executor { return &stubExecutor{} },
+               cluster:     suite.testCluster,
                instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
                instanceTypes: arvados.InstanceTypeMap{
                        type1.Name: type1,
@@ -167,12 +176,7 @@ func (suite *PoolSuite) TestDrain(c *check.C) {
        notify := pool.Subscribe()
        defer pool.Unsubscribe(notify)
 
-       c.Check(pool.Unallocated()[type1], check.Equals, 0)
        pool.Create(type1)
-       c.Check(pool.Unallocated()[type1], check.Equals, 1)
-
-       // Unblock the pending Create call.
-       go driver.ReleaseCloudOps(1)
 
        // Wait for the instance to either return from its Create
        // call, or show up in a poll.
@@ -194,36 +198,75 @@ func (suite *PoolSuite) TestDrain(c *check.C) {
 
        for _, test := range tests {
                for _, wkr := range pool.workers {
-                       if wkr.instType == type1 {
-                               wkr.state = test.state
-                               wkr.idleBehavior = test.idleBehavior
-                       }
+                       wkr.state = test.state
+                       wkr.idleBehavior = test.idleBehavior
                }
 
-               // Try to start another container
+               // Try to start a container
                started := pool.StartContainer(type1, arvados.Container{UUID: "testcontainer"})
                c.Check(started, check.Equals, test.result)
        }
 }
 
+func (suite *PoolSuite) TestNodeCreateThrottle(c *check.C) { // Checks how Pool.Create responds to different maxConcurrentInstanceCreateOps values.
+       driver := test.StubDriver{HoldCloudOps: true} // NOTE(review): presumably keeps stub create calls pending so they count as in progress — confirm.
+       instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, suite.logger, nil)
+       c.Assert(err, check.IsNil)
+
+       type1 := test.InstanceType(1)
+       pool := &Pool{
+               logger:                         suite.logger,
+               instanceSet:                    &throttledInstanceSet{InstanceSet: instanceSet},
+               cluster:                        suite.testCluster,
+               maxConcurrentInstanceCreateOps: 1, // start with a limit of one
+               instanceTypes: arvados.InstanceTypeMap{
+                       type1.Name: type1,
+               },
+       }
+
+       c.Check(pool.Unallocated()[type1], check.Equals, 0)
+       res := pool.Create(type1) // first create is expected to be accepted
+       c.Check(pool.Unallocated()[type1], check.Equals, 1)
+       c.Check(res, check.Equals, true)
+
+       res = pool.Create(type1) // second create is expected to be refused while the limit is 1
+       c.Check(pool.Unallocated()[type1], check.Equals, 1)
+       c.Check(res, check.Equals, false)
+
+       pool.instanceSet.throttleCreate.err = nil // NOTE(review): presumably clears throttle state left by the refused create — confirm.
+       pool.maxConcurrentInstanceCreateOps = 2
+
+       res = pool.Create(type1) // with the limit raised to 2, a second instance is expected to be accepted
+       c.Check(pool.Unallocated()[type1], check.Equals, 2)
+       c.Check(res, check.Equals, true)
+
+       pool.instanceSet.throttleCreate.err = nil
+       pool.maxConcurrentInstanceCreateOps = 0 // the test expects a create to succeed here; presumably 0 means "no limit" — confirm.
+
+       res = pool.Create(type1)
+       c.Check(pool.Unallocated()[type1], check.Equals, 3)
+       c.Check(res, check.Equals, true)
+}
+
 func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
-       logger := ctxlog.TestLogger(c)
        driver := test.StubDriver{HoldCloudOps: true}
-       instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, logger)
+       instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, suite.logger, nil)
        c.Assert(err, check.IsNil)
 
        type1 := arvados.InstanceType{Name: "a1s", ProviderType: "a1.small", VCPUs: 1, RAM: 1 * GiB, Price: .01}
        type2 := arvados.InstanceType{Name: "a2m", ProviderType: "a2.medium", VCPUs: 2, RAM: 2 * GiB, Price: .02}
        type3 := arvados.InstanceType{Name: "a2l", ProviderType: "a2.large", VCPUs: 4, RAM: 4 * GiB, Price: .04}
        pool := &Pool{
-               logger:      logger,
+               logger:      suite.logger,
                newExecutor: func(cloud.Instance) Executor { return &stubExecutor{} },
+               cluster:     suite.testCluster,
                instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
                instanceTypes: arvados.InstanceTypeMap{
                        type1.Name: type1,
                        type2.Name: type2,
                        type3.Name: type3,
                },
+               instanceInitCommand: "echo 'instance init command goes here'",
        }
        notify := pool.Subscribe()
        defer pool.Unsubscribe(notify)
@@ -252,6 +295,9 @@ func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
                return len(pool.workers) == 4
        })
 
+       vms := instanceSet.(*test.StubInstanceSet).StubVMs()
+       c.Check(string(vms[0].InitCommand), check.Matches, `umask 0177 && echo -n "[0-9a-f]+" >/var/run/arvados-instance-secret\necho 'instance init command goes here'`)
+
        // Place type3 node on admin-hold
        ivs := suite.instancesByType(pool, type3)
        c.Assert(ivs, check.HasLen, 1)