package worker
import (
+ "sort"
+ "strings"
"time"
"git.curoverse.com/arvados.git/lib/cloud"
"git.curoverse.com/arvados.git/lib/dispatchcloud/test"
"git.curoverse.com/arvados.git/sdk/go/arvados"
+ "github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
check "gopkg.in/check.v1"
)
logrus.StandardLogger().SetLevel(logrus.DebugLevel)
}
+// TestResumeAfterRestart creates three instances through one Pool,
+// puts the type2 instance on hold, stops that Pool, and then starts a
+// second Pool on the same instance set. It checks that the second
+// Pool reports the same three idle instances, that the type2 instance
+// has the same ID and is still held, and that the type1 instances
+// have IdleBehaviorRun.
+func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
+	type1 := test.InstanceType(1)
+	type2 := test.InstanceType(2)
+	type3 := test.InstanceType(3)
+
+	// waitForIdle re-checks pool.Instances() each time notify fires,
+	// and returns once the pool reports exactly two idle type1
+	// instances and one idle type2 instance. If that does not happen
+	// within one second (total, across all iterations), it logs the
+	// last instance list, records a test error, and returns.
+	waitForIdle := func(pool *Pool, notify <-chan struct{}) {
+		timeout := time.NewTimer(time.Second)
+		// Release the timer when returning before it fires, rather
+		// than leaving it pending after a successful wait.
+		defer timeout.Stop()
+		for {
+			instances := pool.Instances()
+			// Sort by instance type name so the positional
+			// checks below do not depend on the order in
+			// which Instances() returns its entries.
+			sort.Slice(instances, func(i, j int) bool {
+				return strings.Compare(instances[i].ArvadosInstanceType, instances[j].ArvadosInstanceType) < 0
+			})
+			if len(instances) == 3 &&
+				instances[0].ArvadosInstanceType == type1.Name &&
+				instances[0].WorkerState == StateIdle.String() &&
+				instances[1].ArvadosInstanceType == type1.Name &&
+				instances[1].WorkerState == StateIdle.String() &&
+				instances[2].ArvadosInstanceType == type2.Name &&
+				instances[2].WorkerState == StateIdle.String() {
+				return
+			}
+			select {
+			case <-timeout.C:
+				c.Logf("pool.Instances() == %#v", instances)
+				c.Error("timed out")
+				return
+			case <-notify:
+			}
+		}
+	}
+
+	logger := logrus.StandardLogger()
+	driver := &test.StubDriver{}
+	is, err := driver.InstanceSet(nil, "", logger)
+	c.Assert(err, check.IsNil)
+
+	// Every instance gets an executor with empty stub responses for
+	// the boot probe command ("true") and "crunch-run --list".
+	newExecutor := func(cloud.Instance) Executor {
+		return stubExecutor{
+			"crunch-run --list": stubResp{},
+			"true":              stubResp{},
+		}
+	}
+
+	// Short probe/sync intervals keep the test within waitForIdle's
+	// one-second budget.
+	cluster := &arvados.Cluster{
+		Dispatch: arvados.Dispatch{
+			MaxProbesPerSecond: 1000,
+			ProbeInterval:      arvados.Duration(time.Millisecond * 10),
+		},
+		CloudVMs: arvados.CloudVMs{
+			BootProbeCommand: "true",
+			SyncInterval:     arvados.Duration(time.Millisecond * 10),
+		},
+		InstanceTypes: arvados.InstanceTypeMap{
+			type1.Name: type1,
+			type2.Name: type2,
+			type3.Name: type3,
+		},
+	}
+
+	pool := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), is, newExecutor, cluster)
+	notify := pool.Subscribe()
+	defer pool.Unsubscribe(notify)
+	pool.Create(type1)
+	pool.Create(type1)
+	pool.Create(type2)
+	waitForIdle(pool, notify)
+
+	// Put the type2 instance on hold and remember its ID so it can
+	// be compared with what the second pool reports.
+	var heldInstanceID cloud.InstanceID
+	for _, inst := range pool.Instances() {
+		if inst.ArvadosInstanceType == type2.Name {
+			heldInstanceID = cloud.InstanceID(inst.Instance)
+			pool.SetIdleBehavior(heldInstanceID, IdleBehaviorHold)
+		}
+	}
+	// The first pool is stopped before the second one is created.
+	pool.Stop()
+
+	c.Log("------- starting new pool, waiting to recover state")
+
+	// The second pool is given the same instance set ("is"), so it
+	// sees the instances created above.
+	pool2 := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), is, newExecutor, cluster)
+	notify2 := pool2.Subscribe()
+	defer pool2.Unsubscribe(notify2)
+	waitForIdle(pool2, notify2)
+	for _, inst := range pool2.Instances() {
+		if inst.ArvadosInstanceType == type2.Name {
+			c.Check(inst.Instance, check.Equals, heldInstanceID)
+			c.Check(inst.IdleBehavior, check.Equals, IdleBehaviorHold)
+		} else {
+			c.Check(inst.IdleBehavior, check.Equals, IdleBehaviorRun)
+		}
+	}
+	pool2.Stop()
+}
+
+
func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
lameInstanceSet := &test.LameInstanceSet{Hold: make(chan bool)}
type1 := arvados.InstanceType{Name: "a1s", ProviderType: "a1.small", VCPUs: 1, RAM: 1 * GiB, Price: .01}