15928: Fix deadlock.
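
Update the worker pool tests for the current driver and NewPool
signatures (explicit instance-set ID and tags arguments), use a
per-test ctxlog logger instead of the global logrus logger, replace
test.LameInstanceSet with test.StubDriver{HoldCloudOps: true}, and
wait for the idle-behavior tag to reach the stub cloud provider
before calling pool.Stop(), so the restarted pool can recover the
held instance.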
diff --git a/lib/dispatchcloud/worker/pool_test.go b/lib/dispatchcloud/worker/pool_test.go
index 60e21b716b90dc5a45f5b4e187841c12a2b3536d..4b87ce503157a6d873e5d7d13eed018fc0ad35af 100644
--- a/lib/dispatchcloud/worker/pool_test.go
+++ b/lib/dispatchcloud/worker/pool_test.go
@@ -12,8 +12,8 @@ import (
        "git.curoverse.com/arvados.git/lib/cloud"
        "git.curoverse.com/arvados.git/lib/dispatchcloud/test"
        "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.curoverse.com/arvados.git/sdk/go/ctxlog"
        "github.com/prometheus/client_golang/prometheus"
-       "github.com/sirupsen/logrus"
        check "gopkg.in/check.v1"
 )
 
@@ -33,10 +33,6 @@ var less = &lessChecker{&check.CheckerInfo{Name: "less", Params: []string{"obtai
 
 type PoolSuite struct{}
 
-func (suite *PoolSuite) SetUpSuite(c *check.C) {
-       logrus.StandardLogger().SetLevel(logrus.DebugLevel)
-}
-
 func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
        type1 := test.InstanceType(1)
        type2 := test.InstanceType(2)
@@ -67,9 +63,10 @@ func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
                }
        }
 
-       logger := logrus.StandardLogger()
+       logger := ctxlog.TestLogger(c)
        driver := &test.StubDriver{}
-       is, err := driver.InstanceSet(nil, "", logger)
+       instanceSetID := cloud.InstanceSetID("test-instance-set-id")
+       is, err := driver.InstanceSet(nil, instanceSetID, nil, logger)
        c.Assert(err, check.IsNil)
 
        newExecutor := func(cloud.Instance) Executor {
@@ -80,13 +77,14 @@ func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
        }
 
        cluster := &arvados.Cluster{
-               Dispatch: arvados.Dispatch{
-                       MaxProbesPerSecond: 1000,
-                       ProbeInterval:      arvados.Duration(time.Millisecond * 10),
-               },
-               CloudVMs: arvados.CloudVMs{
-                       BootProbeCommand: "true",
-                       SyncInterval:     arvados.Duration(time.Millisecond * 10),
+               Containers: arvados.ContainersConfig{
+                       CloudVMs: arvados.CloudVMsConfig{
+                               BootProbeCommand:   "true",
+                               MaxProbesPerSecond: 1000,
+                               ProbeInterval:      arvados.Duration(time.Millisecond * 10),
+                               SyncInterval:       arvados.Duration(time.Millisecond * 10),
+                               TagKeyPrefix:       "testprefix:",
+                       },
                },
                InstanceTypes: arvados.InstanceTypeMap{
                        type1.Name: type1,
@@ -95,7 +93,7 @@ func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
                },
        }
 
-       pool := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), is, newExecutor, cluster)
+       pool := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), instanceSetID, is, newExecutor, nil, cluster)
        notify := pool.Subscribe()
        defer pool.Unsubscribe(notify)
        pool.Create(type1)
@@ -109,11 +107,29 @@ func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
                        pool.SetIdleBehavior(heldInstanceID, IdleBehaviorHold)
                }
        }
+       // Wait for the idle-behavior tag to be saved to the stub cloud provider
+       tagKey := cluster.Containers.CloudVMs.TagKeyPrefix + tagKeyIdleBehavior
+       deadline := time.Now().Add(time.Second)
+       for !func() bool {
+               pool.mtx.RLock()
+               defer pool.mtx.RUnlock()
+               for _, wkr := range pool.workers {
+                       if wkr.instType == type2 {
+                               return wkr.instance.Tags()[tagKey] == string(IdleBehaviorHold)
+                       }
+               }
+               return false
+       }() {
+               if time.Now().After(deadline) {
+                       c.Fatal("timeout")
+               }
+               time.Sleep(time.Millisecond * 10)
+       }
        pool.Stop()
 
        c.Log("------- starting new pool, waiting to recover state")
 
-       pool2 := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), is, newExecutor, cluster)
+       pool2 := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), instanceSetID, is, newExecutor, nil, cluster)
        notify2 := pool2.Subscribe()
        defer pool2.Unsubscribe(notify2)
        waitForIdle(pool2, notify2)
@@ -129,14 +145,18 @@ func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
 }
 
 func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
-       lameInstanceSet := &test.LameInstanceSet{Hold: make(chan bool)}
+       logger := ctxlog.TestLogger(c)
+       driver := test.StubDriver{HoldCloudOps: true}
+       instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, logger)
+       c.Assert(err, check.IsNil)
+
        type1 := arvados.InstanceType{Name: "a1s", ProviderType: "a1.small", VCPUs: 1, RAM: 1 * GiB, Price: .01}
        type2 := arvados.InstanceType{Name: "a2m", ProviderType: "a2.medium", VCPUs: 2, RAM: 2 * GiB, Price: .02}
        type3 := arvados.InstanceType{Name: "a2l", ProviderType: "a2.large", VCPUs: 4, RAM: 4 * GiB, Price: .04}
        pool := &Pool{
-               logger:      logrus.StandardLogger(),
+               logger:      logger,
                newExecutor: func(cloud.Instance) Executor { return stubExecutor{} },
-               instanceSet: &throttledInstanceSet{InstanceSet: lameInstanceSet},
+               instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
                instanceTypes: arvados.InstanceTypeMap{
                        type1.Name: type1,
                        type2.Name: type2,
@@ -160,7 +180,7 @@ func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
        c.Check(pool.Unallocated()[type3], check.Equals, 1)
 
        // Unblock the pending Create calls.
-       go lameInstanceSet.Release(4)
+       go driver.ReleaseCloudOps(4)
 
        // Wait for each instance to either return from its Create
        // call, or show up in a poll.
@@ -174,7 +194,7 @@ func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
        ivs := suite.instancesByType(pool, type3)
        c.Assert(ivs, check.HasLen, 1)
        type3instanceID := ivs[0].Instance
-       err := pool.SetIdleBehavior(type3instanceID, IdleBehaviorHold)
+       err = pool.SetIdleBehavior(type3instanceID, IdleBehaviorHold)
        c.Check(err, check.IsNil)
 
        // Check admin-hold behavior: refuse to shutdown, and don't
@@ -240,7 +260,7 @@ func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
        // if a node still appears in the provider list after a
        // previous attempt, so there might be more than 4 Destroy
        // calls to unblock.
-       go lameInstanceSet.Release(4444)
+       go driver.ReleaseCloudOps(4444)
 
        // Sync until all instances disappear from the provider list.
        suite.wait(c, pool, notify, func() bool {
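
Note on the wait loop added in TestResumeAfterRestart: it polls shared
pool state under pool.mtx until the hold tag appears on the type2
instance's tags, or fails the test once the one-second deadline passes.
Taking mtx.RLock inside the closure, rather than around the whole loop,
releases the lock during each 10ms sleep so the pool's own goroutines
can make progress between checks. Extracted as a standalone helper, the
same poll-until-deadline shape looks roughly like this (waitUntil is a
hypothetical name for illustration, not part of the arvados codebase):

    package main

    import (
            "fmt"
            "time"
    )

    // waitUntil polls cond at the given interval until it reports true,
    // returning false if timeout elapses first. The test inlines this
    // logic, holding pool.mtx.RLock during each check and calling
    // c.Fatal("timeout") instead of returning false.
    func waitUntil(timeout, interval time.Duration, cond func() bool) bool {
            deadline := time.Now().Add(timeout)
            for !cond() {
                    if time.Now().After(deadline) {
                            return false
                    }
                    time.Sleep(interval)
            }
            return true
    }

    func main() {
            start := time.Now()
            ok := waitUntil(time.Second, 10*time.Millisecond, func() bool {
                    return time.Since(start) > 50*time.Millisecond
            })
            fmt.Println("condition met:", ok)
    }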
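
Note on the TestCreateUnallocShutdown changes: as used in this patch,
test.StubDriver{HoldCloudOps: true} queues each cloud operation instead
of executing it, and driver.ReleaseCloudOps(n) unblocks n queued
operations, replacing the old test.LameInstanceSet Hold/Release channel
mechanism. This is also why teardown releases 4444 rather than 4: as
the surrounding comment explains, shutdown retries Destroy for any
instance that still appears in the provider list, so each instance may
consume more than one release before it disappears.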