16347: Run a dedicated keepstore process for each container.
diff --git a/lib/dispatchcloud/worker/pool_test.go b/lib/dispatchcloud/worker/pool_test.go
index 76e4f71a7bd58bf4a06685611b39df12fbf0a947..7b5634605fee5c20b987c06078eb78b0dc6841b6 100644
--- a/lib/dispatchcloud/worker/pool_test.go
+++ b/lib/dispatchcloud/worker/pool_test.go
@@ -10,10 +10,12 @@ import (
        "time"
 
        "git.arvados.org/arvados.git/lib/cloud"
+       "git.arvados.org/arvados.git/lib/config"
        "git.arvados.org/arvados.git/lib/dispatchcloud/test"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/ctxlog"
        "github.com/prometheus/client_golang/prometheus"
+       "github.com/sirupsen/logrus"
        check "gopkg.in/check.v1"
 )
 
@@ -31,7 +33,18 @@ func (*lessChecker) Check(params []interface{}, names []string) (result bool, er
 
 var less = &lessChecker{&check.CheckerInfo{Name: "less", Params: []string{"obtained", "expected"}}}
 
-type PoolSuite struct{}
+type PoolSuite struct {
+       logger      logrus.FieldLogger
+       testCluster *arvados.Cluster
+}
+
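+// SetUpTest loads the default cluster config so each test starts from a
+// realistic baseline and overrides only the settings it cares about.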
+func (suite *PoolSuite) SetUpTest(c *check.C) {
+       suite.logger = ctxlog.TestLogger(c)
+       cfg, err := config.NewLoader(nil, suite.logger).Load()
+       c.Assert(err, check.IsNil)
+       suite.testCluster, err = cfg.GetCluster("")
+       c.Assert(err, check.IsNil)
+}
 
 func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
        type1 := test.InstanceType(1)
@@ -63,39 +76,35 @@ func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
                }
        }
 
-       logger := ctxlog.TestLogger(c)
        driver := &test.StubDriver{}
        instanceSetID := cloud.InstanceSetID("test-instance-set-id")
-       is, err := driver.InstanceSet(nil, instanceSetID, nil, logger)
+       is, err := driver.InstanceSet(nil, instanceSetID, nil, suite.logger)
        c.Assert(err, check.IsNil)
 
        newExecutor := func(cloud.Instance) Executor {
                return &stubExecutor{
                        response: map[string]stubResp{
-                               "crunch-run --list": {},
-                               "true":              {},
+                               "crunch-run-custom --list": {},
+                               "true":                     {},
                        },
                }
        }
 
-       cluster := &arvados.Cluster{
-               Containers: arvados.ContainersConfig{
-                       CloudVMs: arvados.CloudVMsConfig{
-                               BootProbeCommand:   "true",
-                               MaxProbesPerSecond: 1000,
-                               ProbeInterval:      arvados.Duration(time.Millisecond * 10),
-                               SyncInterval:       arvados.Duration(time.Millisecond * 10),
-                               TagKeyPrefix:       "testprefix:",
-                       },
-               },
-               InstanceTypes: arvados.InstanceTypeMap{
-                       type1.Name: type1,
-                       type2.Name: type2,
-                       type3.Name: type3,
-               },
+       suite.testCluster.Containers.CloudVMs = arvados.CloudVMsConfig{
+               BootProbeCommand:   "true",
+               MaxProbesPerSecond: 1000,
+               ProbeInterval:      arvados.Duration(time.Millisecond * 10),
+               SyncInterval:       arvados.Duration(time.Millisecond * 10),
+               TagKeyPrefix:       "testprefix:",
+       }
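+       // A non-default command here, paired with the "crunch-run-custom --list"
+       // stub response above, verifies that worker probes use the configured
+       // CrunchRunCommand instead of the built-in default.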
+       suite.testCluster.Containers.CrunchRunCommand = "crunch-run-custom"
+       suite.testCluster.InstanceTypes = arvados.InstanceTypeMap{
+               type1.Name: type1,
+               type2.Name: type2,
+               type3.Name: type3,
        }
 
-       pool := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), instanceSetID, is, newExecutor, nil, cluster)
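+       // NewPool picks up all of its tuning knobs (probe intervals, tag
+       // prefix, crunch-run command) from the cluster config assembled above.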
+       pool := NewPool(suite.logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), instanceSetID, is, newExecutor, nil, suite.testCluster)
        notify := pool.Subscribe()
        defer pool.Unsubscribe(notify)
        pool.Create(type1)
@@ -110,7 +119,7 @@ func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
                }
        }
        // Wait for the tags to be saved to the cloud provider
-       tagKey := cluster.Containers.CloudVMs.TagKeyPrefix + tagKeyIdleBehavior
+       tagKey := suite.testCluster.Containers.CloudVMs.TagKeyPrefix + tagKeyIdleBehavior
        deadline := time.Now().Add(time.Second)
        for !func() bool {
                pool.mtx.RLock()
@@ -131,7 +140,7 @@ func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
 
        c.Log("------- starting new pool, waiting to recover state")
 
-       pool2 := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), instanceSetID, is, newExecutor, nil, cluster)
+       pool2 := NewPool(suite.logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), instanceSetID, is, newExecutor, nil, suite.testCluster)
        notify2 := pool2.Subscribe()
        defer pool2.Unsubscribe(notify2)
        waitForIdle(pool2, notify2)
@@ -147,73 +156,27 @@ func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
 }
 
 func (suite *PoolSuite) TestDrain(c *check.C) {
-       type1 := arvados.InstanceType{Name: "a1s", ProviderType: "a1.small", VCPUs: 1, RAM: 1 * GiB, Price: .01}
-
-       waitForIdle := func(pool *Pool, notify <-chan struct{}) {
-               timeout := time.NewTimer(time.Second)
-               for {
-                       instances := pool.Instances()
-                       sort.Slice(instances, func(i, j int) bool {
-                               return strings.Compare(instances[i].ArvadosInstanceType, instances[j].ArvadosInstanceType) < 0
-                       })
-                       if len(instances) == 1 &&
-                               instances[0].ArvadosInstanceType == type1.Name &&
-                               instances[0].WorkerState == StateIdle.String() {
-                               return
-                       }
-                       select {
-                       case <-timeout.C:
-                               c.Logf("pool.Instances() == %#v", instances)
-                               c.Error("timed out")
-                               return
-                       case <-notify:
-                       }
-               }
-       }
-
-       logger := ctxlog.TestLogger(c)
-       driver := test.StubDriver{HoldCloudOps: true}
-       instanceSetID := cloud.InstanceSetID("test-instance-set-id")
-       is, err := driver.InstanceSet(nil, instanceSetID, nil, logger)
+       driver := test.StubDriver{}
+       instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, suite.logger)
        c.Assert(err, check.IsNil)
 
-       newExecutor := func(cloud.Instance) Executor {
-               return &stubExecutor{
-                       response: map[string]stubResp{
-                               "crunch-run --list":                  {},
-                               "true":                               {},
-                               "crunch-run --detach --stdin-env ''": {},
-                       },
-               }
-       }
+       ac := arvados.NewClientFromEnv()
 
-       cluster := &arvados.Cluster{
-               Containers: arvados.ContainersConfig{
-                       CloudVMs: arvados.CloudVMsConfig{
-                               BootProbeCommand:   "true",
-                               MaxProbesPerSecond: 1000,
-                               ProbeInterval:      arvados.Duration(time.Millisecond * 10),
-                               SyncInterval:       arvados.Duration(time.Millisecond * 10),
-                               //TimeoutIdle:        arvados.Duration(time.Second),
-                               TagKeyPrefix: "testprefix:",
-                       },
-               },
-               InstanceTypes: arvados.InstanceTypeMap{
+       type1 := test.InstanceType(1)
+       pool := &Pool{
+               arvClient:   ac,
+               logger:      suite.logger,
+               newExecutor: func(cloud.Instance) Executor { return &stubExecutor{} },
+               cluster:     suite.testCluster,
+               instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
+               instanceTypes: arvados.InstanceTypeMap{
                        type1.Name: type1,
                },
        }
-
-       pool := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), instanceSetID, is, newExecutor, nil, cluster)
-
        notify := pool.Subscribe()
        defer pool.Unsubscribe(notify)
 
-       c.Check(pool.Unallocated()[type1], check.Equals, 0)
        pool.Create(type1)
-       c.Check(pool.Unallocated()[type1], check.Equals, 1)
-
-       // Unblock the pending Create call.
-       go driver.ReleaseCloudOps(1)
 
        // Wait for the instance to either return from its Create
        // call, or show up in a poll.
@@ -223,69 +186,80 @@ func (suite *PoolSuite) TestDrain(c *check.C) {
                return len(pool.workers) == 1
        })
 
-       waitForIdle(pool, notify)
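+       // Each case forces the lone worker into a state/idle-behavior pair;
+       // only IdleBehaviorRun should let StartContainer claim the worker.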
+       tests := []struct {
+               state        State
+               idleBehavior IdleBehavior
+               result       bool
+       }{
+               {StateIdle, IdleBehaviorHold, false},
+               {StateIdle, IdleBehaviorDrain, false},
+               {StateIdle, IdleBehaviorRun, true},
+       }
 
-       // Start a container on the worker
-       for _, wkr := range pool.workers {
-               if wkr.instType == type1 {
-                       wkr.startContainer(arvados.Container{})
+       for _, test := range tests {
+               for _, wkr := range pool.workers {
+                       wkr.state = test.state
+                       wkr.idleBehavior = test.idleBehavior
                }
-       }
 
-       ivs := suite.instancesByType(pool, type1)
-       c.Assert(ivs, check.HasLen, 1)
-       type1instanceID := ivs[0].Instance
+               // Try to start a container
+               started := pool.StartContainer(type1, arvados.Container{UUID: "testcontainer"})
+               c.Check(started, check.Equals, test.result)
+       }
+}
 
-       // Place our node in drain state
-       err = pool.SetIdleBehavior(type1instanceID, IdleBehaviorDrain)
-       c.Check(err, check.IsNil)
+func (suite *PoolSuite) TestNodeCreateThrottle(c *check.C) {
+       driver := test.StubDriver{HoldCloudOps: true}
+       instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, suite.logger)
+       c.Assert(err, check.IsNil)
 
-       waitForIdle(pool, notify)
+       type1 := test.InstanceType(1)
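+       // Build the Pool by hand (rather than via NewPool) so the test can
+       // adjust maxConcurrentInstanceCreateOps between Create calls.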
+       pool := &Pool{
+               logger:                         suite.logger,
+               instanceSet:                    &throttledInstanceSet{InstanceSet: instanceSet},
+               cluster:                        suite.testCluster,
+               maxConcurrentInstanceCreateOps: 1,
+               instanceTypes: arvados.InstanceTypeMap{
+                       type1.Name: type1,
+               },
+       }
 
-       ivs = suite.instancesByType(pool, type1)
-       c.Assert(ivs, check.HasLen, 1)
+       c.Check(pool.Unallocated()[type1], check.Equals, 0)
+       res := pool.Create(type1)
+       c.Check(pool.Unallocated()[type1], check.Equals, 1)
+       c.Check(res, check.Equals, true)
 
-       // Try to start another container, this should fail because our lone worker has
-       // IdleBehavior set to Drain
-       started := pool.StartContainer(type1, arvados.Container{})
-       c.Check(started, check.Equals, false)
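+       // A second Create exceeds maxConcurrentInstanceCreateOps (1), so it
+       // must fail and leave the unallocated count unchanged.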
+       res = pool.Create(type1)
+       c.Check(pool.Unallocated()[type1], check.Equals, 1)
+       c.Check(res, check.Equals, false)
 
-       // There should be no unallocated workers
-       suite.wait(c, pool, notify, func() bool {
-               return pool.Unallocated()[type1] == 0
-       })
+       pool.instanceSet.throttleCreate.err = nil
+       pool.maxConcurrentInstanceCreateOps = 2
 
-       // And our worker should eventually go into state ShutDown
-       suite.wait(c, pool, notify, func() bool {
-               ivs := suite.instancesByType(pool, type1)
-               return len(ivs) == 1 && ivs[0].WorkerState == StateShutdown.String()
-       })
+       res = pool.Create(type1)
+       c.Check(pool.Unallocated()[type1], check.Equals, 2)
+       c.Check(res, check.Equals, true)
 
-       // Unblock all pending Destroy calls. Pool calls Destroy again
-       // if a node still appears in the provider list after a
-       // previous attempt, so there might be more than one Destroy
-       // call to unblock.
-       go driver.ReleaseCloudOps(1111)
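+       // Clear the throttle error and lift the limit: a value of zero means
+       // no cap on concurrent create ops, so a third Create succeeds.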
+       pool.instanceSet.throttleCreate.err = nil
+       pool.maxConcurrentInstanceCreateOps = 0
 
-       // Sync until all instances disappear from the provider list.
-       suite.wait(c, pool, notify, func() bool {
-               pool.getInstancesAndSync()
-               return len(pool.Instances()) == 0
-       })
+       res = pool.Create(type1)
+       c.Check(pool.Unallocated()[type1], check.Equals, 3)
+       c.Check(res, check.Equals, true)
 }
 
 func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
-       logger := ctxlog.TestLogger(c)
        driver := test.StubDriver{HoldCloudOps: true}
-       instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, logger)
+       instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, suite.logger)
        c.Assert(err, check.IsNil)
 
        type1 := arvados.InstanceType{Name: "a1s", ProviderType: "a1.small", VCPUs: 1, RAM: 1 * GiB, Price: .01}
        type2 := arvados.InstanceType{Name: "a2m", ProviderType: "a2.medium", VCPUs: 2, RAM: 2 * GiB, Price: .02}
        type3 := arvados.InstanceType{Name: "a2l", ProviderType: "a2.large", VCPUs: 4, RAM: 4 * GiB, Price: .04}
        pool := &Pool{
-               logger:      logger,
+               logger:      suite.logger,
                newExecutor: func(cloud.Instance) Executor { return &stubExecutor{} },
+               cluster:     suite.testCluster,
                instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
                instanceTypes: arvados.InstanceTypeMap{
                        type1.Name: type1,