14325: Clean up unsafe concurrency in tests.
[arvados.git] / lib / dispatchcloud / worker / pool_test.go
index f8667e06cdb3917fcbdee4d563833367eb599545..aa81958434dd73792f399a61f5f1b0be6e2cfc1b 100644 (file)
@@ -5,13 +5,12 @@
 package worker
 
 import (
-       "io"
        "time"
 
        "git.curoverse.com/arvados.git/lib/cloud"
        "git.curoverse.com/arvados.git/lib/dispatchcloud/test"
        "git.curoverse.com/arvados.git/sdk/go/arvados"
-       "github.com/Sirupsen/logrus"
+       "github.com/sirupsen/logrus"
        check "gopkg.in/check.v1"
 )
 
@@ -47,13 +46,15 @@ func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
        lameInstanceSet := &test.LameInstanceSet{Hold: make(chan bool)}
        type1 := arvados.InstanceType{Name: "a1s", ProviderType: "a1.small", VCPUs: 1, RAM: 1 * GiB, Price: .01}
        type2 := arvados.InstanceType{Name: "a2m", ProviderType: "a2.medium", VCPUs: 2, RAM: 2 * GiB, Price: .02}
+       type3 := arvados.InstanceType{Name: "a2l", ProviderType: "a2.large", VCPUs: 4, RAM: 4 * GiB, Price: .04}
        pool := &Pool{
                logger:      logrus.StandardLogger(),
-               newExecutor: func(cloud.Instance) Executor { return &stubExecutor{} },
-               instanceSet: lameInstanceSet,
+               newExecutor: func(cloud.Instance) Executor { return stubExecutor{} },
+               instanceSet: &throttledInstanceSet{InstanceSet: lameInstanceSet},
                instanceTypes: arvados.InstanceTypeMap{
                        type1.Name: type1,
                        type2.Name: type2,
+                       type3.Name: type3,
                },
        }
        notify := pool.Subscribe()
@@ -63,30 +64,42 @@ func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
 
        c.Check(pool.Unallocated()[type1], check.Equals, 0)
        c.Check(pool.Unallocated()[type2], check.Equals, 0)
+       c.Check(pool.Unallocated()[type3], check.Equals, 0)
        pool.Create(type2)
        pool.Create(type1)
        pool.Create(type2)
+       pool.Create(type3)
        c.Check(pool.Unallocated()[type1], check.Equals, 1)
        c.Check(pool.Unallocated()[type2], check.Equals, 2)
-       // Unblock the pending Create calls and (before calling Sync!)
-       // wait for the pool to process the returned instances.
-       go lameInstanceSet.Release(3)
+       c.Check(pool.Unallocated()[type3], check.Equals, 1)
+
+       // Unblock the pending Create calls.
+       go lameInstanceSet.Release(4)
+
+       // Wait for each instance to either return from its Create
+       // call, or show up in a poll.
        suite.wait(c, pool, notify, func() bool {
-               list, err := lameInstanceSet.Instances(nil)
-               return err == nil && len(list) == 3
+               pool.mtx.RLock()
+               defer pool.mtx.RUnlock()
+               return len(pool.workers) == 4
        })
 
-       c.Check(pool.Unallocated()[type1], check.Equals, 1)
-       c.Check(pool.Unallocated()[type2], check.Equals, 2)
-       pool.getInstancesAndSync()
-       // Returned counts can be temporarily higher than 1 and 2 if
-       // poll ran before Create() returned.
-       c.Check(pool.Unallocated()[type1], check.Not(less), 1)
-       c.Check(pool.Unallocated()[type2], check.Not(less), 2)
+       // Place type3 node on admin-hold
+       ivs := suite.instancesByType(pool, type3)
+       c.Assert(ivs, check.HasLen, 1)
+       type3instanceID := ivs[0].Instance
+       err := pool.SetIdleBehavior(type3instanceID, IdleBehaviorHold)
+       c.Check(err, check.IsNil)
+
+       // Check admin-hold behavior: refuse to shutdown, and don't
+       // report as Unallocated ("available now or soon").
+       c.Check(pool.Shutdown(type3), check.Equals, false)
        suite.wait(c, pool, notify, func() bool {
-               return pool.Unallocated()[type1] == 1 && pool.Unallocated()[type2] == 2
+               return pool.Unallocated()[type3] == 0
        })
+       c.Check(suite.instancesByType(pool, type3), check.HasLen, 1)
 
+       // Shutdown both type2 nodes
        c.Check(pool.Shutdown(type2), check.Equals, true)
        suite.wait(c, pool, notify, func() bool {
                return pool.Unallocated()[type1] == 1 && pool.Unallocated()[type2] == 1
@@ -106,16 +119,58 @@ func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
                }
                break
        }
+
+       // Shutdown type1 node
        c.Check(pool.Shutdown(type1), check.Equals, true)
        suite.wait(c, pool, notify, func() bool {
-               return pool.Unallocated()[type1] == 0 && pool.Unallocated()[type2] == 0
+               return pool.Unallocated()[type1] == 0 && pool.Unallocated()[type2] == 0 && pool.Unallocated()[type3] == 0
        })
        select {
        case <-notify2:
        case <-time.After(time.Second):
                c.Error("notify did not receive")
        }
-       go lameInstanceSet.Release(3) // unblock Destroy calls
+
+       // Put type3 node back in service.
+       err = pool.SetIdleBehavior(type3instanceID, IdleBehaviorRun)
+       c.Check(err, check.IsNil)
+       suite.wait(c, pool, notify, func() bool {
+               return pool.Unallocated()[type3] == 1
+       })
+
+       // Check admin-drain behavior: shut down right away, and don't
+       // report as Unallocated.
+       err = pool.SetIdleBehavior(type3instanceID, IdleBehaviorDrain)
+       c.Check(err, check.IsNil)
+       suite.wait(c, pool, notify, func() bool {
+               return pool.Unallocated()[type3] == 0
+       })
+       suite.wait(c, pool, notify, func() bool {
+               ivs := suite.instancesByType(pool, type3)
+               return len(ivs) == 1 && ivs[0].WorkerState == StateShutdown.String()
+       })
+
+       // Unblock all pending Destroy calls. Pool calls Destroy again
+       // if a node still appears in the provider list after a
+       // previous attempt, so there might be more than 4 Destroy
+       // calls to unblock.
+       go lameInstanceSet.Release(4444)
+
+       // Sync until all instances disappear from the provider list.
+       suite.wait(c, pool, notify, func() bool {
+               pool.getInstancesAndSync()
+               return len(pool.Instances()) == 0
+       })
+}
+
+// instancesByType returns the entries of pool.Instances() whose
+// ArvadosInstanceType equals it.Name, preserving the order in which
+// pool.Instances() reported them. The result is nil (length 0) when
+// no instance matches.
+func (suite *PoolSuite) instancesByType(pool *Pool, it arvados.InstanceType) []InstanceView {
+       var ivs []InstanceView
+       for _, iv := range pool.Instances() {
+               // Match on the Arvados type name, not the provider type.
+               if iv.ArvadosInstanceType == it.Name {
+                       ivs = append(ivs, iv)
+               }
+       }
+       return ivs
 }
 
 func (suite *PoolSuite) wait(c *check.C, pool *Pool, notify <-chan struct{}, ready func() bool) {
@@ -130,13 +185,3 @@ func (suite *PoolSuite) wait(c *check.C, pool *Pool, notify <-chan struct{}, rea
        }
        c.Check(ready(), check.Equals, true)
 }
-
-type stubExecutor struct{}
-
-func (*stubExecutor) SetTarget(cloud.ExecutorTarget) {}
-
-func (*stubExecutor) Execute(cmd string, stdin io.Reader) ([]byte, []byte, error) {
-       return nil, nil, nil
-}
-
-func (*stubExecutor) Close() {}