From d186ff022ee427ded90307d795537c6429431f09 Mon Sep 17 00:00:00 2001
From: Ward Vandewege
Date: Thu, 6 Aug 2020 17:16:53 -0400
Subject: [PATCH] 16631: add test

Arvados-DCO-1.1-Signed-off-by: Ward Vandewege
---
 lib/dispatchcloud/worker/pool_test.go | 132 +++++++++++++++++++++++++-
 1 file changed, 130 insertions(+), 2 deletions(-)

diff --git a/lib/dispatchcloud/worker/pool_test.go b/lib/dispatchcloud/worker/pool_test.go
index 1948c1e874..76e4f71a7b 100644
--- a/lib/dispatchcloud/worker/pool_test.go
+++ b/lib/dispatchcloud/worker/pool_test.go
@@ -72,8 +72,8 @@ func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
 	newExecutor := func(cloud.Instance) Executor {
 		return &stubExecutor{
 			response: map[string]stubResp{
-				"crunch-run --list": stubResp{},
-				"true":              stubResp{},
+				"crunch-run --list": {},
+				"true":              {},
 			},
 		}
 	}
@@ -146,6 +146,134 @@ func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
 	pool2.Stop()
 }
 
+func (suite *PoolSuite) TestDrain(c *check.C) {
+	type1 := arvados.InstanceType{Name: "a1s", ProviderType: "a1.small", VCPUs: 1, RAM: 1 * GiB, Price: .01}
+
+	waitForIdle := func(pool *Pool, notify <-chan struct{}) {
+		timeout := time.NewTimer(time.Second)
+		for {
+			instances := pool.Instances()
+			sort.Slice(instances, func(i, j int) bool {
+				return strings.Compare(instances[i].ArvadosInstanceType, instances[j].ArvadosInstanceType) < 0
+			})
+			if len(instances) == 1 &&
+				instances[0].ArvadosInstanceType == type1.Name &&
+				instances[0].WorkerState == StateIdle.String() {
+				return
+			}
+			select {
+			case <-timeout.C:
+				c.Logf("pool.Instances() == %#v", instances)
+				c.Error("timed out")
+				return
+			case <-notify:
+			}
+		}
+	}
+
+	logger := ctxlog.TestLogger(c)
+	driver := test.StubDriver{HoldCloudOps: true}
+	instanceSetID := cloud.InstanceSetID("test-instance-set-id")
+	is, err := driver.InstanceSet(nil, instanceSetID, nil, logger)
+	c.Assert(err, check.IsNil)
+
+	newExecutor := func(cloud.Instance) Executor {
+		return &stubExecutor{
+			response: map[string]stubResp{
+				"crunch-run --list":                  {},
+				"true":                               {},
+				"crunch-run --detach --stdin-env ''": {},
+			},
+		}
+	}
+
+	cluster := &arvados.Cluster{
+		Containers: arvados.ContainersConfig{
+			CloudVMs: arvados.CloudVMsConfig{
+				BootProbeCommand:   "true",
+				MaxProbesPerSecond: 1000,
+				ProbeInterval:      arvados.Duration(time.Millisecond * 10),
+				SyncInterval:       arvados.Duration(time.Millisecond * 10),
+				//TimeoutIdle:      arvados.Duration(time.Second),
+				TagKeyPrefix:       "testprefix:",
+			},
+		},
+		InstanceTypes: arvados.InstanceTypeMap{
+			type1.Name: type1,
+		},
+	}
+
+	pool := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), instanceSetID, is, newExecutor, nil, cluster)
+
+	notify := pool.Subscribe()
+	defer pool.Unsubscribe(notify)
+
+	c.Check(pool.Unallocated()[type1], check.Equals, 0)
+	pool.Create(type1)
+	c.Check(pool.Unallocated()[type1], check.Equals, 1)
+
+	// Unblock the pending Create call.
+	go driver.ReleaseCloudOps(1)
+
+	// Wait for the instance to either return from its Create
+	// call, or show up in a poll.
+	suite.wait(c, pool, notify, func() bool {
+		pool.mtx.RLock()
+		defer pool.mtx.RUnlock()
+		return len(pool.workers) == 1
+	})
+
+	waitForIdle(pool, notify)
+
+	// Start a container on the worker.
+	for _, wkr := range pool.workers {
+		if wkr.instType == type1 {
+			wkr.startContainer(arvados.Container{})
+		}
+	}
+
+	ivs := suite.instancesByType(pool, type1)
+	c.Assert(ivs, check.HasLen, 1)
+	type1instanceID := ivs[0].Instance
+
+	// Place our node in drain state.
+	err = pool.SetIdleBehavior(type1instanceID, IdleBehaviorDrain)
+	c.Check(err, check.IsNil)
+
+	waitForIdle(pool, notify)
+
+	ivs = suite.instancesByType(pool, type1)
+	c.Assert(ivs, check.HasLen, 1)
+
+	// Try to start another container; this should fail because our
+	// lone worker has IdleBehavior set to Drain.
+	started := pool.StartContainer(type1, arvados.Container{})
+	c.Check(started, check.Equals, false)
+
+	// There should be no unallocated workers.
+	suite.wait(c, pool, notify, func() bool {
+		return pool.Unallocated()[type1] == 0
+	})
+
+	// And our worker should eventually go into state Shutdown.
+	suite.wait(c, pool, notify, func() bool {
+		ivs := suite.instancesByType(pool, type1)
+		return len(ivs) == 1 && ivs[0].WorkerState == StateShutdown.String()
+	})
+
+	// Unblock all pending Destroy calls. Pool calls Destroy again
+	// if a node still appears in the provider list after a
+	// previous attempt, so there might be more than one Destroy
+	// call to unblock.
+	go driver.ReleaseCloudOps(1111)
+
+	// Sync until all instances disappear from the provider list.
+	suite.wait(c, pool, notify, func() bool {
+		pool.getInstancesAndSync()
+		return len(pool.Instances()) == 0
+	})
+}
+
 func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
 	logger := ctxlog.TestLogger(c)
 	driver := test.StubDriver{HoldCloudOps: true}
-- 
2.30.2
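
Note on the patch above: TestDrain leans on a suite.wait helper that is defined elsewhere in pool_test.go and so does not appear in this diff. A minimal sketch of that kind of notify-driven wait (assuming pool_test.go's existing imports; the real helper may differ in detail) re-checks a predicate on every pool notification and gives up after a fixed deadline:

	func (suite *PoolSuite) wait(c *check.C, pool *Pool, notify <-chan struct{}, ready func() bool) {
		// Re-evaluate ready() each time the pool broadcasts a state
		// change; stop waiting once a one-second deadline passes.
		timeout := time.NewTimer(time.Second).C
		for !ready() {
			select {
			case <-notify:
				continue
			case <-timeout:
			}
			break
		}
		c.Check(ready(), check.Equals, true)
	}

Calling ready() one last time after the loop keeps the check from failing spuriously when the final notification and the timeout fire at nearly the same moment.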