16631: add TestDrain, a test for dispatchcloud worker pool drain behavior
author     Ward Vandewege <ward@curii.com>
           Thu, 6 Aug 2020 21:16:53 +0000 (17:16 -0400)
committer  Ward Vandewege <ward@curii.com>
           Thu, 6 Aug 2020 21:16:53 +0000 (17:16 -0400)
Arvados-DCO-1.1-Signed-off-by: Ward Vandewege <ward@curii.com>

lib/dispatchcloud/worker/pool_test.go

index 1948c1e874859f2d8355115b3671f2c5ef0ae32d..76e4f71a7bd58bf4a06685611b39df12fbf0a947 100644
@@ -72,8 +72,8 @@ func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
        newExecutor := func(cloud.Instance) Executor {
                return &stubExecutor{
                        response: map[string]stubResp{
-                               "crunch-run --list": stubResp{},
-                               "true":              stubResp{},
+                               "crunch-run --list": {},
+                               "true":              {},
                        },
                }
        }
@@ -146,6 +146,134 @@ func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
        pool2.Stop()
 }
 
+func (suite *PoolSuite) TestDrain(c *check.C) {
+       type1 := arvados.InstanceType{Name: "a1s", ProviderType: "a1.small", VCPUs: 1, RAM: 1 * GiB, Price: .01}
+
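+       // waitForIdle waits until the pool reports exactly one idle
+       // type1 instance, or flags a test error after one second.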
+       waitForIdle := func(pool *Pool, notify <-chan struct{}) {
+               timeout := time.NewTimer(time.Second)
+               for {
+                       instances := pool.Instances()
+                       sort.Slice(instances, func(i, j int) bool {
+                               return strings.Compare(instances[i].ArvadosInstanceType, instances[j].ArvadosInstanceType) < 0
+                       })
+                       if len(instances) == 1 &&
+                               instances[0].ArvadosInstanceType == type1.Name &&
+                               instances[0].WorkerState == StateIdle.String() {
+                               return
+                       }
+                       select {
+                       case <-timeout.C:
+                               c.Logf("pool.Instances() == %#v", instances)
+                               c.Error("timed out")
+                               return
+                       case <-notify:
+                       }
+               }
+       }
+
+       logger := ctxlog.TestLogger(c)
+       driver := test.StubDriver{HoldCloudOps: true}
+       instanceSetID := cloud.InstanceSetID("test-instance-set-id")
+       is, err := driver.InstanceSet(nil, instanceSetID, nil, logger)
+       c.Assert(err, check.IsNil)
+
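+       // Stub executor: accept the boot probe ("true"), the
+       // crunch-run process listing, and the detached container
+       // start command, each returning an empty (successful) response.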
+       newExecutor := func(cloud.Instance) Executor {
+               return &stubExecutor{
+                       response: map[string]stubResp{
+                               "crunch-run --list":                  {},
+                               "true":                               {},
+                               "crunch-run --detach --stdin-env ''": {},
+                       },
+               }
+       }
+
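+       // Cluster config with fast probe and sync intervals so the
+       // pool state converges quickly in the test.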
+       cluster := &arvados.Cluster{
+               Containers: arvados.ContainersConfig{
+                       CloudVMs: arvados.CloudVMsConfig{
+                               BootProbeCommand:   "true",
+                               MaxProbesPerSecond: 1000,
+                               ProbeInterval:      arvados.Duration(time.Millisecond * 10),
+                               SyncInterval:       arvados.Duration(time.Millisecond * 10),
+                               //TimeoutIdle:        arvados.Duration(time.Second),
+                               TagKeyPrefix: "testprefix:",
+                       },
+               },
+               InstanceTypes: arvados.InstanceTypeMap{
+                       type1.Name: type1,
+               },
+       }
+
+       pool := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), instanceSetID, is, newExecutor, nil, cluster)
+
+       notify := pool.Subscribe()
+       defer pool.Unsubscribe(notify)
+
+       c.Check(pool.Unallocated()[type1], check.Equals, 0)
+       pool.Create(type1)
+       c.Check(pool.Unallocated()[type1], check.Equals, 1)
+
+       // Unblock the pending Create call.
+       go driver.ReleaseCloudOps(1)
+
+       // Wait for the instance to either return from its Create
+       // call, or show up in a poll.
+       suite.wait(c, pool, notify, func() bool {
+               pool.mtx.RLock()
+               defer pool.mtx.RUnlock()
+               return len(pool.workers) == 1
+       })
+
+       waitForIdle(pool, notify)
+
+       // Start a container on the worker
+       for _, wkr := range pool.workers {
+               if wkr.instType == type1 {
+                       wkr.startContainer(arvados.Container{})
+               }
+       }
+
+       ivs := suite.instancesByType(pool, type1)
+       c.Assert(ivs, check.HasLen, 1)
+       type1instanceID := ivs[0].Instance
+
+       // Place our node in drain state
+       err = pool.SetIdleBehavior(type1instanceID, IdleBehaviorDrain)
+       c.Check(err, check.IsNil)
+
+       waitForIdle(pool, notify)
+
+       ivs = suite.instancesByType(pool, type1)
+       c.Assert(ivs, check.HasLen, 1)
+
+       // Try to start another container; this should fail because our lone
+       // worker has IdleBehavior set to Drain
+       started := pool.StartContainer(type1, arvados.Container{})
+       c.Check(started, check.Equals, false)
+
+       // There should be no unallocated workers
+       suite.wait(c, pool, notify, func() bool {
+               return pool.Unallocated()[type1] == 0
+       })
+
+       // And our worker should eventually go into state Shutdown
+       suite.wait(c, pool, notify, func() bool {
+               ivs := suite.instancesByType(pool, type1)
+               return len(ivs) == 1 && ivs[0].WorkerState == StateShutdown.String()
+       })
+
+       // Unblock all pending Destroy calls. The pool calls Destroy again
+       // if a node still appears in the provider list after a
+       // previous attempt, so there might be more than one Destroy
+       // call to unblock.
+       go driver.ReleaseCloudOps(1111)
+
+       // Sync until all instances disappear from the provider list.
+       suite.wait(c, pool, notify, func() bool {
+               pool.getInstancesAndSync()
+               return len(pool.Instances()) == 0
+       })
+}
+
 func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
        logger := ctxlog.TestLogger(c)
        driver := test.StubDriver{HoldCloudOps: true}