14325: Test resuming worker pool state after restart.
author     Tom Clegg <tclegg@veritasgenetics.com>
           Fri, 25 Jan 2019 20:24:19 +0000 (15:24 -0500)
committer  Tom Clegg <tclegg@veritasgenetics.com>
           Fri, 25 Jan 2019 20:24:19 +0000 (15:24 -0500)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg@veritasgenetics.com>

lib/dispatchcloud/worker/pool_test.go
lib/dispatchcloud/worker/worker.go

diff --git a/lib/dispatchcloud/worker/pool_test.go b/lib/dispatchcloud/worker/pool_test.go
index 6a6cdc423827573a5810746474bebc2b0791310b..60e21b716b90dc5a45f5b4e187841c12a2b3536d 100644
--- a/lib/dispatchcloud/worker/pool_test.go
+++ b/lib/dispatchcloud/worker/pool_test.go
@@ -5,11 +5,14 @@
 package worker
 
 import (
+       "sort"
+       "strings"
        "time"
 
        "git.curoverse.com/arvados.git/lib/cloud"
        "git.curoverse.com/arvados.git/lib/dispatchcloud/test"
        "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "github.com/prometheus/client_golang/prometheus"
        "github.com/sirupsen/logrus"
        check "gopkg.in/check.v1"
 )
@@ -34,6 +37,97 @@ func (suite *PoolSuite) SetUpSuite(c *check.C) {
        logrus.StandardLogger().SetLevel(logrus.DebugLevel)
 }
 
+func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
+       type1 := test.InstanceType(1)
+       type2 := test.InstanceType(2)
+       type3 := test.InstanceType(3)
+       waitForIdle := func(pool *Pool, notify <-chan struct{}) {
+               timeout := time.NewTimer(time.Second)
+               for {
+                       instances := pool.Instances()
+                       sort.Slice(instances, func(i, j int) bool {
+                               return strings.Compare(instances[i].ArvadosInstanceType, instances[j].ArvadosInstanceType) < 0
+                       })
+                       if len(instances) == 3 &&
+                               instances[0].ArvadosInstanceType == type1.Name &&
+                               instances[0].WorkerState == StateIdle.String() &&
+                               instances[1].ArvadosInstanceType == type1.Name &&
+                               instances[1].WorkerState == StateIdle.String() &&
+                               instances[2].ArvadosInstanceType == type2.Name &&
+                               instances[2].WorkerState == StateIdle.String() {
+                               return
+                       }
+                       select {
+                       case <-timeout.C:
+                               c.Logf("pool.Instances() == %#v", instances)
+                               c.Error("timed out")
+                               return
+                       case <-notify:
+                       }
+               }
+       }
+
+       logger := logrus.StandardLogger()
+       driver := &test.StubDriver{}
+       is, err := driver.InstanceSet(nil, "", logger)
+       c.Assert(err, check.IsNil)
+
+       newExecutor := func(cloud.Instance) Executor {
+               return stubExecutor{
+                       "crunch-run --list": stubResp{},
+                       "true":              stubResp{},
+               }
+       }
+
+       cluster := &arvados.Cluster{
+               Dispatch: arvados.Dispatch{
+                       MaxProbesPerSecond: 1000,
+                       ProbeInterval:      arvados.Duration(time.Millisecond * 10),
+               },
+               CloudVMs: arvados.CloudVMs{
+                       BootProbeCommand: "true",
+                       SyncInterval:     arvados.Duration(time.Millisecond * 10),
+               },
+               InstanceTypes: arvados.InstanceTypeMap{
+                       type1.Name: type1,
+                       type2.Name: type2,
+                       type3.Name: type3,
+               },
+       }
+
+       pool := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), is, newExecutor, cluster)
+       notify := pool.Subscribe()
+       defer pool.Unsubscribe(notify)
+       pool.Create(type1)
+       pool.Create(type1)
+       pool.Create(type2)
+       waitForIdle(pool, notify)
+       var heldInstanceID cloud.InstanceID
+       for _, inst := range pool.Instances() {
+               if inst.ArvadosInstanceType == type2.Name {
+                       heldInstanceID = cloud.InstanceID(inst.Instance)
+                       pool.SetIdleBehavior(heldInstanceID, IdleBehaviorHold)
+               }
+       }
+       pool.Stop()
+
+       c.Log("------- starting new pool, waiting to recover state")
+
+       pool2 := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), is, newExecutor, cluster)
+       notify2 := pool2.Subscribe()
+       defer pool2.Unsubscribe(notify2)
+       waitForIdle(pool2, notify2)
+       for _, inst := range pool2.Instances() {
+               if inst.ArvadosInstanceType == type2.Name {
+                       c.Check(inst.Instance, check.Equals, heldInstanceID)
+                       c.Check(inst.IdleBehavior, check.Equals, IdleBehaviorHold)
+               } else {
+                       c.Check(inst.IdleBehavior, check.Equals, IdleBehaviorRun)
+               }
+       }
+       pool2.Stop()
+}
+
 func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
        lameInstanceSet := &test.LameInstanceSet{Hold: make(chan bool)}
        type1 := arvados.InstanceType{Name: "a1s", ProviderType: "a1.small", VCPUs: 1, RAM: 1 * GiB, Price: .01}
diff --git a/lib/dispatchcloud/worker/worker.go b/lib/dispatchcloud/worker/worker.go
index a75d2bbb88611225c709a98f151dded697b5974c..baa56addeaab2cac4bbb0acb2a804d6828f7496a 100644
--- a/lib/dispatchcloud/worker/worker.go
+++ b/lib/dispatchcloud/worker/worker.go
@@ -57,8 +57,8 @@ type IdleBehavior string
 
 const (
        IdleBehaviorRun   IdleBehavior = "run"   // run containers, or shutdown on idle timeout
-       IdleBehaviorHold               = "hold"  // don't shutdown or run more containers
-       IdleBehaviorDrain              = "drain" // shutdown immediately when idle
+       IdleBehaviorHold  IdleBehavior = "hold"  // don't shutdown or run more containers
+       IdleBehaviorDrain IdleBehavior = "drain" // shutdown immediately when idle
 )
 
 var validIdleBehavior = map[IdleBehavior]bool{
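
Note on the worker.go hunk: in a Go const block, a constant declared with an explicit value but no explicit type is an untyped constant, so before this change only IdleBehaviorRun actually had type IdleBehavior, while IdleBehaviorHold and IdleBehaviorDrain defaulted to plain strings. That distinction likely matters for the new test, where c.Check(inst.IdleBehavior, check.Equals, IdleBehaviorHold) compares dynamic types at runtime. The standalone sketch below (not part of the Arvados source) reproduces the pre-fix behavior:

    package main

    import "fmt"

    type IdleBehavior string

    const (
            // Only the first constant gets the explicit type; the other two
            // take the default type of their expressions, i.e. plain string.
            IdleBehaviorRun   IdleBehavior = "run"
            IdleBehaviorHold               = "hold"
            IdleBehaviorDrain              = "drain"
    )

    func main() {
            fmt.Printf("%T %T %T\n", IdleBehaviorRun, IdleBehaviorHold, IdleBehaviorDrain)
            // Prints: main.IdleBehavior string string
    }

With the explicit IdleBehavior type on all three constants, the same program prints main.IdleBehavior three times, and equality checks against values of type IdleBehavior behave as expected. Assuming a checked-out arvados tree, the new test alone can be run with the filter flag registered by gopkg.in/check.v1, e.g. go test ./lib/dispatchcloud/worker -check.f ResumeAfterRestart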