"time"
"git.arvados.org/arvados.git/lib/cloud"
+ "git.arvados.org/arvados.git/lib/config"
"git.arvados.org/arvados.git/lib/dispatchcloud/test"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/ctxlog"
"github.com/prometheus/client_golang/prometheus"
+ "github.com/sirupsen/logrus"
check "gopkg.in/check.v1"
)
var less = &lessChecker{&check.CheckerInfo{Name: "less", Params: []string{"obtained", "expected"}}}
-type PoolSuite struct{}
+// PoolSuite holds fixtures shared by the worker pool tests. SetUpTest
+// repopulates both fields before each test.
+type PoolSuite struct {
+ // logger is the per-test logger obtained from ctxlog.TestLogger.
+ logger logrus.FieldLogger
+ // testCluster is the cluster config loaded in SetUpTest. Individual
+ // tests overwrite parts of it (e.g. Containers.CloudVMs,
+ // InstanceTypes) before building a Pool.
+ testCluster *arvados.Cluster
+}
+
+// SetUpTest is the gocheck per-test fixture. It installs a test logger
+// and a cluster config obtained from config.NewLoader(...).Load(). Tests
+// mutate suite.testCluster freely because it is reassigned here before
+// every test.
+func (suite *PoolSuite) SetUpTest(c *check.C) {
+ suite.logger = ctxlog.TestLogger(c)
+ // NOTE(review): the nil reader presumably means "no config file
+ // input", so Load() yields built-in defaults — confirm in lib/config.
+ cfg, err := config.NewLoader(nil, suite.logger).Load()
+ c.Assert(err, check.IsNil)
+ // NOTE(review): "" presumably selects the default/only cluster in
+ // the loaded config — TODO confirm against Config.GetCluster.
+ suite.testCluster, err = cfg.GetCluster("")
+ c.Assert(err, check.IsNil)
+}
func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
type1 := test.InstanceType(1)
}
}
- logger := ctxlog.TestLogger(c)
driver := &test.StubDriver{}
instanceSetID := cloud.InstanceSetID("test-instance-set-id")
- is, err := driver.InstanceSet(nil, instanceSetID, nil, logger)
+ is, err := driver.InstanceSet(nil, instanceSetID, nil, suite.logger)
c.Assert(err, check.IsNil)
newExecutor := func(cloud.Instance) Executor {
return &stubExecutor{
response: map[string]stubResp{
- "crunch-run --list": {},
- "true": {},
+ "crunch-run-custom --list": {},
+ "true": {},
},
}
}
- cluster := &arvados.Cluster{
- Containers: arvados.ContainersConfig{
- CloudVMs: arvados.CloudVMsConfig{
- BootProbeCommand: "true",
- MaxProbesPerSecond: 1000,
- ProbeInterval: arvados.Duration(time.Millisecond * 10),
- SyncInterval: arvados.Duration(time.Millisecond * 10),
- TagKeyPrefix: "testprefix:",
- },
- },
- InstanceTypes: arvados.InstanceTypeMap{
- type1.Name: type1,
- type2.Name: type2,
- type3.Name: type3,
- },
+ suite.testCluster.Containers.CloudVMs = arvados.CloudVMsConfig{
+ BootProbeCommand: "true",
+ MaxProbesPerSecond: 1000,
+ ProbeInterval: arvados.Duration(time.Millisecond * 10),
+ SyncInterval: arvados.Duration(time.Millisecond * 10),
+ TagKeyPrefix: "testprefix:",
+ }
+ suite.testCluster.Containers.CrunchRunCommand = "crunch-run-custom"
+ suite.testCluster.InstanceTypes = arvados.InstanceTypeMap{
+ type1.Name: type1,
+ type2.Name: type2,
+ type3.Name: type3,
}
- pool := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), instanceSetID, is, newExecutor, nil, cluster)
+ pool := NewPool(suite.logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), instanceSetID, is, newExecutor, nil, suite.testCluster)
notify := pool.Subscribe()
defer pool.Unsubscribe(notify)
pool.Create(type1)
}
}
// Wait for the tags to save to the cloud provider
- tagKey := cluster.Containers.CloudVMs.TagKeyPrefix + tagKeyIdleBehavior
+ tagKey := suite.testCluster.Containers.CloudVMs.TagKeyPrefix + tagKeyIdleBehavior
deadline := time.Now().Add(time.Second)
for !func() bool {
pool.mtx.RLock()
c.Log("------- starting new pool, waiting to recover state")
- pool2 := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), instanceSetID, is, newExecutor, nil, cluster)
+ pool2 := NewPool(suite.logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), instanceSetID, is, newExecutor, nil, suite.testCluster)
notify2 := pool2.Subscribe()
defer pool2.Unsubscribe(notify2)
waitForIdle(pool2, notify2)
}
func (suite *PoolSuite) TestDrain(c *check.C) {
- type1 := arvados.InstanceType{Name: "a1s", ProviderType: "a1.small", VCPUs: 1, RAM: 1 * GiB, Price: .01}
-
- waitForIdle := func(pool *Pool, notify <-chan struct{}) {
- timeout := time.NewTimer(time.Second)
- for {
- instances := pool.Instances()
- sort.Slice(instances, func(i, j int) bool {
- return strings.Compare(instances[i].ArvadosInstanceType, instances[j].ArvadosInstanceType) < 0
- })
- if len(instances) == 1 &&
- instances[0].ArvadosInstanceType == type1.Name &&
- instances[0].WorkerState == StateIdle.String() {
- return
- }
- select {
- case <-timeout.C:
- c.Logf("pool.Instances() == %#v", instances)
- c.Error("timed out")
- return
- case <-notify:
- }
- }
- }
-
- logger := ctxlog.TestLogger(c)
- driver := test.StubDriver{HoldCloudOps: true}
- instanceSetID := cloud.InstanceSetID("test-instance-set-id")
- is, err := driver.InstanceSet(nil, instanceSetID, nil, logger)
+ driver := test.StubDriver{}
+ instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, suite.logger)
c.Assert(err, check.IsNil)
- newExecutor := func(cloud.Instance) Executor {
- return &stubExecutor{
- response: map[string]stubResp{
- "crunch-run --list": {},
- "true": {},
- "crunch-run --detach --stdin-env ''": {},
- },
- }
- }
+ ac := arvados.NewClientFromEnv()
- cluster := &arvados.Cluster{
- Containers: arvados.ContainersConfig{
- CloudVMs: arvados.CloudVMsConfig{
- BootProbeCommand: "true",
- MaxProbesPerSecond: 1000,
- ProbeInterval: arvados.Duration(time.Millisecond * 10),
- SyncInterval: arvados.Duration(time.Millisecond * 10),
- //TimeoutIdle: arvados.Duration(time.Second),
- TagKeyPrefix: "testprefix:",
- },
- },
- InstanceTypes: arvados.InstanceTypeMap{
+ type1 := test.InstanceType(1)
+ pool := &Pool{
+ arvClient: ac,
+ logger: suite.logger,
+ newExecutor: func(cloud.Instance) Executor { return &stubExecutor{} },
+ cluster: suite.testCluster,
+ instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
+ instanceTypes: arvados.InstanceTypeMap{
type1.Name: type1,
},
}
-
- pool := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), instanceSetID, is, newExecutor, nil, cluster)
-
notify := pool.Subscribe()
defer pool.Unsubscribe(notify)
- c.Check(pool.Unallocated()[type1], check.Equals, 0)
pool.Create(type1)
- c.Check(pool.Unallocated()[type1], check.Equals, 1)
-
- // Unblock the pending Create call.
- go driver.ReleaseCloudOps(1)
// Wait for the instance to either return from its Create
// call, or show up in a poll.
return len(pool.workers) == 1
})
- waitForIdle(pool, notify)
+ tests := []struct {
+ state State
+ idleBehavior IdleBehavior
+ result bool
+ }{
+ {StateIdle, IdleBehaviorHold, false},
+ {StateIdle, IdleBehaviorDrain, false},
+ {StateIdle, IdleBehaviorRun, true},
+ }
- // Start a container on the worker
- for _, wkr := range pool.workers {
- if wkr.instType == type1 {
- wkr.startContainer(arvados.Container{})
+ for _, test := range tests {
+ for _, wkr := range pool.workers {
+ wkr.state = test.state
+ wkr.idleBehavior = test.idleBehavior
}
- }
- ivs := suite.instancesByType(pool, type1)
- c.Assert(ivs, check.HasLen, 1)
- type1instanceID := ivs[0].Instance
+ // Try to start a container
+ started := pool.StartContainer(type1, arvados.Container{UUID: "testcontainer"})
+ c.Check(started, check.Equals, test.result)
+ }
+}
- // Place our node in drain state
- err = pool.SetIdleBehavior(type1instanceID, IdleBehaviorDrain)
- c.Check(err, check.IsNil)
+func (suite *PoolSuite) TestNodeCreateThrottle(c *check.C) {
+ driver := test.StubDriver{HoldCloudOps: true}
+ instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, suite.logger)
+ c.Assert(err, check.IsNil)
- waitForIdle(pool, notify)
+ type1 := test.InstanceType(1)
+ pool := &Pool{
+ logger: suite.logger,
+ instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
+ cluster: suite.testCluster,
+ maxConcurrentInstanceCreateOps: 1,
+ instanceTypes: arvados.InstanceTypeMap{
+ type1.Name: type1,
+ },
+ }
- ivs = suite.instancesByType(pool, type1)
- c.Assert(ivs, check.HasLen, 1)
+ c.Check(pool.Unallocated()[type1], check.Equals, 0)
+ res := pool.Create(type1)
+ c.Check(pool.Unallocated()[type1], check.Equals, 1)
+ c.Check(res, check.Equals, true)
- // Try to start another container, this should fail because our lone worker has
- // IdleBehavior set to Drain
- started := pool.StartContainer(type1, arvados.Container{})
- c.Check(started, check.Equals, false)
+ res = pool.Create(type1)
+ c.Check(pool.Unallocated()[type1], check.Equals, 1)
+ c.Check(res, check.Equals, false)
- // There should be no unallocated workers
- suite.wait(c, pool, notify, func() bool {
- return pool.Unallocated()[type1] == 0
- })
+ pool.instanceSet.throttleCreate.err = nil
+ pool.maxConcurrentInstanceCreateOps = 2
- // And our worker should eventually go into state ShutDown
- suite.wait(c, pool, notify, func() bool {
- ivs := suite.instancesByType(pool, type1)
- return len(ivs) == 1 && ivs[0].WorkerState == StateShutdown.String()
- })
+ res = pool.Create(type1)
+ c.Check(pool.Unallocated()[type1], check.Equals, 2)
+ c.Check(res, check.Equals, true)
- // Unblock all pending Destroy calls. Pool calls Destroy again
- // if a node still appears in the provider list after a
- // previous attempt, so there might be more than 1 Destroy
- // calls to unblock.
- go driver.ReleaseCloudOps(1111)
+ pool.instanceSet.throttleCreate.err = nil
+ pool.maxConcurrentInstanceCreateOps = 0
- // Sync until all instances disappear from the provider list.
- suite.wait(c, pool, notify, func() bool {
- pool.getInstancesAndSync()
- return len(pool.Instances()) == 0
- })
+ res = pool.Create(type1)
+ c.Check(pool.Unallocated()[type1], check.Equals, 3)
+ c.Check(res, check.Equals, true)
}
func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
- logger := ctxlog.TestLogger(c)
driver := test.StubDriver{HoldCloudOps: true}
- instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, logger)
+ instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, suite.logger)
c.Assert(err, check.IsNil)
type1 := arvados.InstanceType{Name: "a1s", ProviderType: "a1.small", VCPUs: 1, RAM: 1 * GiB, Price: .01}
type2 := arvados.InstanceType{Name: "a2m", ProviderType: "a2.medium", VCPUs: 2, RAM: 2 * GiB, Price: .02}
type3 := arvados.InstanceType{Name: "a2l", ProviderType: "a2.large", VCPUs: 4, RAM: 4 * GiB, Price: .04}
pool := &Pool{
- logger: logger,
+ logger: suite.logger,
newExecutor: func(cloud.Instance) Executor { return &stubExecutor{} },
+ cluster: suite.testCluster,
instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
instanceTypes: arvados.InstanceTypeMap{
type1.Name: type1,