16739: implement review feedback.
[arvados.git] / lib / dispatchcloud / worker / pool_test.go
index 8ab4c987544e06e8535a9de2ca12df7052ff8edb..7b116c3eefb09cdba27043598786f711a9c7cf53 100644 (file)
@@ -9,10 +9,10 @@ import (
        "strings"
        "time"
 
-       "git.curoverse.com/arvados.git/lib/cloud"
-       "git.curoverse.com/arvados.git/lib/dispatchcloud/test"
-       "git.curoverse.com/arvados.git/sdk/go/arvados"
-       "git.curoverse.com/arvados.git/sdk/go/ctxlog"
+       "git.arvados.org/arvados.git/lib/cloud"
+       "git.arvados.org/arvados.git/lib/dispatchcloud/test"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/ctxlog"
        "github.com/prometheus/client_golang/prometheus"
        check "gopkg.in/check.v1"
 )
@@ -66,13 +66,15 @@ func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
        logger := ctxlog.TestLogger(c)
        driver := &test.StubDriver{}
        instanceSetID := cloud.InstanceSetID("test-instance-set-id")
-       is, err := driver.InstanceSet(nil, instanceSetID, logger)
+       is, err := driver.InstanceSet(nil, instanceSetID, nil, logger)
        c.Assert(err, check.IsNil)
 
        newExecutor := func(cloud.Instance) Executor {
-               return stubExecutor{
-                       "crunch-run --list": stubResp{},
-                       "true":              stubResp{},
+               return &stubExecutor{
+                       response: map[string]stubResp{
+                               "crunch-run --list": {},
+                               "true":              {},
+                       },
                }
        }
 
@@ -83,6 +85,7 @@ func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
                                MaxProbesPerSecond: 1000,
                                ProbeInterval:      arvados.Duration(time.Millisecond * 10),
                                SyncInterval:       arvados.Duration(time.Millisecond * 10),
+                               TagKeyPrefix:       "testprefix:",
                        },
                },
                InstanceTypes: arvados.InstanceTypeMap{
@@ -107,13 +110,14 @@ func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
                }
        }
        // Wait for the tags to save to the cloud provider
+       tagKey := cluster.Containers.CloudVMs.TagKeyPrefix + tagKeyIdleBehavior
        deadline := time.Now().Add(time.Second)
        for !func() bool {
                pool.mtx.RLock()
                defer pool.mtx.RUnlock()
                for _, wkr := range pool.workers {
                        if wkr.instType == type2 {
-                               return wkr.instance.Tags()[tagKeyIdleBehavior] == string(IdleBehaviorHold)
+                               return wkr.instance.Tags()[tagKey] == string(IdleBehaviorHold)
                        }
                }
                return false
@@ -142,10 +146,103 @@ func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
        pool2.Stop()
 }
 
+func (suite *PoolSuite) TestDrain(c *check.C) {
+       logger := ctxlog.TestLogger(c)
+       driver := test.StubDriver{}
+       instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, logger)
+       c.Assert(err, check.IsNil)
+
+       ac := arvados.NewClientFromEnv()
+
+       type1 := test.InstanceType(1)
+       pool := &Pool{
+               arvClient:   ac,
+               logger:      logger,
+               newExecutor: func(cloud.Instance) Executor { return &stubExecutor{} },
+               instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
+               instanceTypes: arvados.InstanceTypeMap{
+                       type1.Name: type1,
+               },
+       }
+       notify := pool.Subscribe()
+       defer pool.Unsubscribe(notify)
+
+       pool.Create(type1)
+
+       // Wait for the instance to either return from its Create
+       // call, or show up in a poll.
+       suite.wait(c, pool, notify, func() bool {
+               pool.mtx.RLock()
+               defer pool.mtx.RUnlock()
+               return len(pool.workers) == 1
+       })
+
+       tests := []struct {
+               state        State
+               idleBehavior IdleBehavior
+               result       bool
+       }{
+               {StateIdle, IdleBehaviorHold, false},
+               {StateIdle, IdleBehaviorDrain, false},
+               {StateIdle, IdleBehaviorRun, true},
+       }
+
+       for _, test := range tests {
+               for _, wkr := range pool.workers {
+                       wkr.state = test.state
+                       wkr.idleBehavior = test.idleBehavior
+               }
+
+               // Try to start a container
+               started := pool.StartContainer(type1, arvados.Container{UUID: "testcontainer"})
+               c.Check(started, check.Equals, test.result)
+       }
+}
+
+func (suite *PoolSuite) TestNodeCreateThrottle(c *check.C) {
+       logger := ctxlog.TestLogger(c)
+       driver := test.StubDriver{HoldCloudOps: true}
+       instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, logger)
+       c.Assert(err, check.IsNil)
+
+       type1 := test.InstanceType(1)
+       pool := &Pool{
+               logger:                         logger,
+               instanceSet:                    &throttledInstanceSet{InstanceSet: instanceSet},
+               maxConcurrentInstanceCreateOps: 1,
+               instanceTypes: arvados.InstanceTypeMap{
+                       type1.Name: type1,
+               },
+       }
+
+       c.Check(pool.Unallocated()[type1], check.Equals, 0)
+       res := pool.Create(type1)
+       c.Check(pool.Unallocated()[type1], check.Equals, 1)
+       c.Check(res, check.Equals, true)
+
+       res = pool.Create(type1)
+       c.Check(pool.Unallocated()[type1], check.Equals, 1)
+       c.Check(res, check.Equals, false)
+
+       pool.instanceSet.throttleCreate.ResetError()
+       pool.maxConcurrentInstanceCreateOps = 2
+
+       res = pool.Create(type1)
+       c.Check(pool.Unallocated()[type1], check.Equals, 2)
+       c.Check(res, check.Equals, true)
+
+       pool.instanceSet.throttleCreate.ResetError()
+       pool.maxConcurrentInstanceCreateOps = 0
+
+       res = pool.Create(type1)
+       c.Check(pool.Unallocated()[type1], check.Equals, 3)
+       c.Check(res, check.Equals, true)
+}
+
 func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
        logger := ctxlog.TestLogger(c)
        driver := test.StubDriver{HoldCloudOps: true}
-       instanceSet, err := driver.InstanceSet(nil, "", logger)
+       instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, logger)
        c.Assert(err, check.IsNil)
 
        type1 := arvados.InstanceType{Name: "a1s", ProviderType: "a1.small", VCPUs: 1, RAM: 1 * GiB, Price: .01}
@@ -153,7 +250,7 @@ func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
        type3 := arvados.InstanceType{Name: "a2l", ProviderType: "a2.large", VCPUs: 4, RAM: 4 * GiB, Price: .04}
        pool := &Pool{
                logger:      logger,
-               newExecutor: func(cloud.Instance) Executor { return stubExecutor{} },
+               newExecutor: func(cloud.Instance) Executor { return &stubExecutor{} },
                instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
                instanceTypes: arvados.InstanceTypeMap{
                        type1.Name: type1,