X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/93671366e7633cbf0ca3cab68395e211e3afc31c..c88ffa1a163c929ffa963af3eb1bcdbca1f6b6f2:/lib/dispatchcloud/worker/pool_test.go

diff --git a/lib/dispatchcloud/worker/pool_test.go b/lib/dispatchcloud/worker/pool_test.go
index f8667e06cd..aa81958434 100644
--- a/lib/dispatchcloud/worker/pool_test.go
+++ b/lib/dispatchcloud/worker/pool_test.go
@@ -5,13 +5,12 @@
 package worker
 
 import (
-	"io"
 	"time"
 
 	"git.curoverse.com/arvados.git/lib/cloud"
 	"git.curoverse.com/arvados.git/lib/dispatchcloud/test"
 	"git.curoverse.com/arvados.git/sdk/go/arvados"
-	"github.com/Sirupsen/logrus"
+	"github.com/sirupsen/logrus"
 	check "gopkg.in/check.v1"
 )
 
@@ -47,13 +46,15 @@ func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
 	lameInstanceSet := &test.LameInstanceSet{Hold: make(chan bool)}
 	type1 := arvados.InstanceType{Name: "a1s", ProviderType: "a1.small", VCPUs: 1, RAM: 1 * GiB, Price: .01}
 	type2 := arvados.InstanceType{Name: "a2m", ProviderType: "a2.medium", VCPUs: 2, RAM: 2 * GiB, Price: .02}
+	type3 := arvados.InstanceType{Name: "a2l", ProviderType: "a2.large", VCPUs: 4, RAM: 4 * GiB, Price: .04}
 	pool := &Pool{
 		logger:      logrus.StandardLogger(),
-		newExecutor: func(cloud.Instance) Executor { return &stubExecutor{} },
-		instanceSet: lameInstanceSet,
+		newExecutor: func(cloud.Instance) Executor { return stubExecutor{} },
+		instanceSet: &throttledInstanceSet{InstanceSet: lameInstanceSet},
 		instanceTypes: arvados.InstanceTypeMap{
 			type1.Name: type1,
 			type2.Name: type2,
+			type3.Name: type3,
 		},
 	}
 	notify := pool.Subscribe()
@@ -63,30 +64,42 @@ func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
 
 	c.Check(pool.Unallocated()[type1], check.Equals, 0)
 	c.Check(pool.Unallocated()[type2], check.Equals, 0)
+	c.Check(pool.Unallocated()[type3], check.Equals, 0)
 	pool.Create(type2)
 	pool.Create(type1)
 	pool.Create(type2)
+	pool.Create(type3)
 	c.Check(pool.Unallocated()[type1], check.Equals, 1)
 	c.Check(pool.Unallocated()[type2], check.Equals, 2)
-	// Unblock the pending Create calls and (before calling Sync!)
-	// wait for the pool to process the returned instances.
-	go lameInstanceSet.Release(3)
+	c.Check(pool.Unallocated()[type3], check.Equals, 1)
+
+	// Unblock the pending Create calls.
+	go lameInstanceSet.Release(4)
+
+	// Wait for each instance to either return from its Create
+	// call, or show up in a poll.
 	suite.wait(c, pool, notify, func() bool {
-		list, err := lameInstanceSet.Instances(nil)
-		return err == nil && len(list) == 3
+		pool.mtx.RLock()
+		defer pool.mtx.RUnlock()
+		return len(pool.workers) == 4
 	})
 
-	c.Check(pool.Unallocated()[type1], check.Equals, 1)
-	c.Check(pool.Unallocated()[type2], check.Equals, 2)
-	pool.getInstancesAndSync()
-	// Returned counts can be temporarily higher than 1 and 2 if
-	// poll ran before Create() returned.
-	c.Check(pool.Unallocated()[type1], check.Not(less), 1)
-	c.Check(pool.Unallocated()[type2], check.Not(less), 2)
+	// Place type3 node on admin-hold
+	ivs := suite.instancesByType(pool, type3)
+	c.Assert(ivs, check.HasLen, 1)
+	type3instanceID := ivs[0].Instance
+	err := pool.SetIdleBehavior(type3instanceID, IdleBehaviorHold)
+	c.Check(err, check.IsNil)
+
+	// Check admin-hold behavior: refuse to shutdown, and don't
+	// report as Unallocated ("available now or soon").
+	c.Check(pool.Shutdown(type3), check.Equals, false)
 	suite.wait(c, pool, notify, func() bool {
-		return pool.Unallocated()[type1] == 1 && pool.Unallocated()[type2] == 2
+		return pool.Unallocated()[type3] == 0
 	})
+	c.Check(suite.instancesByType(pool, type3), check.HasLen, 1)
 
+	// Shutdown both type2 nodes
 	c.Check(pool.Shutdown(type2), check.Equals, true)
 	suite.wait(c, pool, notify, func() bool {
 		return pool.Unallocated()[type1] == 1 && pool.Unallocated()[type2] == 1
@@ -106,16 +119,58 @@ func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
 		}
 		break
 	}
+
+	// Shutdown type1 node
 	c.Check(pool.Shutdown(type1), check.Equals, true)
 	suite.wait(c, pool, notify, func() bool {
-		return pool.Unallocated()[type1] == 0 && pool.Unallocated()[type2] == 0
+		return pool.Unallocated()[type1] == 0 && pool.Unallocated()[type2] == 0 && pool.Unallocated()[type3] == 0
 	})
 	select {
 	case <-notify2:
 	case <-time.After(time.Second):
 		c.Error("notify did not receive")
 	}
-	go lameInstanceSet.Release(3) // unblock Destroy calls
+
+	// Put type3 node back in service.
+	err = pool.SetIdleBehavior(type3instanceID, IdleBehaviorRun)
+	c.Check(err, check.IsNil)
+	suite.wait(c, pool, notify, func() bool {
+		return pool.Unallocated()[type3] == 1
+	})
+
+	// Check admin-drain behavior: shut down right away, and don't
+	// report as Unallocated.
+	err = pool.SetIdleBehavior(type3instanceID, IdleBehaviorDrain)
+	c.Check(err, check.IsNil)
+	suite.wait(c, pool, notify, func() bool {
+		return pool.Unallocated()[type3] == 0
+	})
+	suite.wait(c, pool, notify, func() bool {
+		ivs := suite.instancesByType(pool, type3)
+		return len(ivs) == 1 && ivs[0].WorkerState == StateShutdown.String()
+	})
+
+	// Unblock all pending Destroy calls. Pool calls Destroy again
+	// if a node still appears in the provider list after a
+	// previous attempt, so there might be more than 4 Destroy
+	// calls to unblock.
+	go lameInstanceSet.Release(4444)
+
+	// Sync until all instances disappear from the provider list.
+	suite.wait(c, pool, notify, func() bool {
+		pool.getInstancesAndSync()
+		return len(pool.Instances()) == 0
+	})
+}
+
+func (suite *PoolSuite) instancesByType(pool *Pool, it arvados.InstanceType) []InstanceView {
+	var ivs []InstanceView
+	for _, iv := range pool.Instances() {
+		if iv.ArvadosInstanceType == it.Name {
+			ivs = append(ivs, iv)
+		}
+	}
+	return ivs
 }
 
 func (suite *PoolSuite) wait(c *check.C, pool *Pool, notify <-chan struct{}, ready func() bool) {
@@ -130,13 +185,3 @@ func (suite *PoolSuite) wait(c *check.C, pool *Pool, notify <-chan struct{}, rea
 	}
 	c.Check(ready(), check.Equals, true)
 }
-
-type stubExecutor struct{}
-
-func (*stubExecutor) SetTarget(cloud.ExecutorTarget) {}
-
-func (*stubExecutor) Execute(cmd string, stdin io.Reader) ([]byte, []byte, error) {
-	return nil, nil, nil
-}
-
-func (*stubExecutor) Close() {}
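Note on the stubExecutor change above: the diff removes the pointer-based stubExecutor from pool_test.go and has newExecutor return stubExecutor{} by value, so the stub presumably lives in another test file in this package at the new commit. The sketch below is not the actual Arvados code; it only illustrates, using the method set of the stub removed by this diff, how a value type with value receivers can satisfy the pool's Executor interface (the real stub and the Execute signature at this commit may differ).

package worker

import (
	"io"

	"git.curoverse.com/arvados.git/lib/cloud"
)

// stubExecutor is a hypothetical no-op executor, shown only as an
// illustration; its method set mirrors the pointer-based stub removed
// by this diff, but with value receivers so that stubExecutor{} (a
// value, not a pointer) satisfies the interface.
type stubExecutor struct{}

func (stubExecutor) SetTarget(cloud.ExecutorTarget) {}

func (stubExecutor) Execute(cmd string, stdin io.Reader) ([]byte, []byte, error) {
	return nil, nil, nil
}

func (stubExecutor) Close() {}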