1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
12 "git.arvados.org/arvados.git/lib/cloud"
13 "git.arvados.org/arvados.git/lib/config"
14 "git.arvados.org/arvados.git/lib/dispatchcloud/test"
15 "git.arvados.org/arvados.git/sdk/go/arvados"
16 "git.arvados.org/arvados.git/sdk/go/ctxlog"
17 "github.com/prometheus/client_golang/prometheus"
18 "github.com/sirupsen/logrus"
19 check "gopkg.in/check.v1"
22 const GiB arvados.ByteSize = 1 << 30
24 var _ = check.Suite(&PoolSuite{})
26 type lessChecker struct {
30 func (*lessChecker) Check(params []interface{}, names []string) (result bool, error string) {
31 return params[0].(int) < params[1].(int), ""
34 var less = &lessChecker{&check.CheckerInfo{Name: "less", Params: []string{"obtained", "expected"}}}
36 type PoolSuite struct {
37 logger logrus.FieldLogger
38 testCluster *arvados.Cluster
41 func (suite *PoolSuite) SetUpTest(c *check.C) {
42 suite.logger = ctxlog.TestLogger(c)
43 cfg, err := config.NewLoader(nil, suite.logger).Load()
44 c.Assert(err, check.IsNil)
45 suite.testCluster, err = cfg.GetCluster("")
46 c.Assert(err, check.IsNil)
// TestResumeAfterRestart checks that a second Pool attached to the same
// instance set recovers worker state from the cloud provider. The
// instance the first pool placed on admin-hold is reported by the second
// pool with the same instance ID and IdleBehaviorHold. The remaining
// instances report IdleBehaviorRun.
49 func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
50 type1 := test.InstanceType(1)
51 type2 := test.InstanceType(2)
52 type3 := test.InstanceType(3)
// waitForIdle re-reads pool.Instances() until the pool reports exactly
// three instances: two idle type1 and one idle type2. It uses a
// one-second timer. On giving up it logs the last instance list it saw
// (presumably the timeout path; the wait loop itself is not shown).
53 waitForIdle := func(pool *Pool, notify <-chan struct{}) {
54 timeout := time.NewTimer(time.Second)
56 instances := pool.Instances()
// Sort by type name so the positional checks below do not depend on
// the order Instances() returns workers in.
57 sort.Slice(instances, func(i, j int) bool {
58 return strings.Compare(instances[i].ArvadosInstanceType, instances[j].ArvadosInstanceType) < 0
60 if len(instances) == 3 &&
61 instances[0].ArvadosInstanceType == type1.Name &&
62 instances[0].WorkerState == StateIdle.String() &&
63 instances[1].ArvadosInstanceType == type1.Name &&
64 instances[1].WorkerState == StateIdle.String() &&
65 instances[2].ArvadosInstanceType == type2.Name &&
66 instances[2].WorkerState == StateIdle.String() {
71 c.Logf("pool.Instances() == %#v", instances)
79 driver := &test.StubDriver{}
80 instanceSetID := cloud.InstanceSetID("test-instance-set-id")
81 is, err := driver.InstanceSet(nil, instanceSetID, nil, suite.logger, nil)
82 c.Assert(err, check.IsNil)
// Stub executor: the "crunch-run-custom --list" command (matching
// CrunchRunCommand below) gets an empty stub response.
85 newExecutor := func(cloud.Instance) Executor {
87 response: map[string]stubResp{
88 "crunch-run-custom --list": {},
// Short probe/sync intervals and a high probe rate keep the test fast.
// TagKeyPrefix is used below to locate the saved idle-behavior tag.
94 suite.testCluster.Containers.CloudVMs = arvados.CloudVMsConfig{
95 BootProbeCommand: "true",
96 MaxProbesPerSecond: 1000,
97 ProbeInterval: arvados.Duration(time.Millisecond * 10),
98 SyncInterval: arvados.Duration(time.Millisecond * 10),
99 TagKeyPrefix: "testprefix:",
101 suite.testCluster.Containers.CrunchRunCommand = "crunch-run-custom"
102 suite.testCluster.InstanceTypes = arvados.InstanceTypeMap{
108 pool := NewPool(suite.logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), instanceSetID, is, newExecutor, nil, suite.testCluster)
109 notify := pool.Subscribe()
110 defer pool.Unsubscribe(notify)
114 waitForIdle(pool, notify)
// Put the (single) type2 instance on admin-hold and remember its ID so
// the second pool's view of it can be compared later.
115 var heldInstanceID cloud.InstanceID
116 for _, inst := range pool.Instances() {
117 if inst.ArvadosInstanceType == type2.Name {
118 heldInstanceID = cloud.InstanceID(inst.Instance)
// NOTE(review): SetIdleBehavior returns an error that is ignored here;
// elsewhere in this file it is checked with c.Check(err, check.IsNil).
119 pool.SetIdleBehavior(heldInstanceID, IdleBehaviorHold)
122 // Wait for the tags to save to the cloud provider
// The hold is considered saved once the type2 worker's instance tag
// (TagKeyPrefix + tagKeyIdleBehavior) reads IdleBehaviorHold. The
// check reads pool.workers under pool.mtx and gives up after one
// second.
123 tagKey := suite.testCluster.Containers.CloudVMs.TagKeyPrefix + tagKeyIdleBehavior
124 deadline := time.Now().Add(time.Second)
127 defer pool.mtx.RUnlock()
128 for _, wkr := range pool.workers {
129 if wkr.instType == type2 {
130 return wkr.instance.Tags()[tagKey] == string(IdleBehaviorHold)
135 if time.Now().After(deadline) {
138 time.Sleep(time.Millisecond * 10)
142 c.Log("------- starting new pool, waiting to recover state")
// pool2 is built on the same instance set (is) and instanceSetID, so
// it sees the same stub VMs and their saved tags.
144 pool2 := NewPool(suite.logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), instanceSetID, is, newExecutor, nil, suite.testCluster)
145 notify2 := pool2.Subscribe()
146 defer pool2.Unsubscribe(notify2)
147 waitForIdle(pool2, notify2)
148 for _, inst := range pool2.Instances() {
149 if inst.ArvadosInstanceType == type2.Name {
150 c.Check(inst.Instance, check.Equals, heldInstanceID)
151 c.Check(inst.IdleBehavior, check.Equals, IdleBehaviorHold)
153 c.Check(inst.IdleBehavior, check.Equals, IdleBehaviorRun)
// TestDrain checks that Pool.StartContainer only starts a container on
// an idle worker whose idle behavior is IdleBehaviorRun. Idle workers
// set to IdleBehaviorHold or IdleBehaviorDrain must refuse.
159 func (suite *PoolSuite) TestDrain(c *check.C) {
160 driver := test.StubDriver{}
161 instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, suite.logger, nil)
162 c.Assert(err, check.IsNil)
163 defer instanceSet.Stop()
165 ac := arvados.NewClientFromEnv()
167 type1 := test.InstanceType(1)
170 logger: suite.logger,
171 newExecutor: func(cloud.Instance) Executor { return &stubExecutor{} },
172 cluster: suite.testCluster,
173 instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
174 instanceTypes: arvados.InstanceTypeMap{
178 notify := pool.Subscribe()
179 defer pool.Unsubscribe(notify)
183 // Wait for the instance to either return from its Create
184 // call, or show up in a poll.
185 suite.wait(c, pool, notify, func() bool {
187 defer pool.mtx.RUnlock()
188 return len(pool.workers) == 1
// Each case is (worker state, idle behavior, expected StartContainer
// result).
193 idleBehavior IdleBehavior
196 {StateIdle, IdleBehaviorHold, false},
197 {StateIdle, IdleBehaviorDrain, false},
198 {StateIdle, IdleBehaviorRun, true},
// NOTE(review): the loop variable "test" shadows the imported test
// package inside this loop; a name such as "tc" would avoid confusion.
// NOTE(review): worker fields are written below without holding
// pool.mtx, although the wait callback above reads pool.workers under
// pool.mtx.RLock. Consider locking while mutating.
201 for _, test := range tests {
202 for _, wkr := range pool.workers {
203 wkr.state = test.state
204 wkr.idleBehavior = test.idleBehavior
207 // Try to start a container
208 started := pool.StartContainer(type1, arvados.Container{UUID: "testcontainer"})
209 c.Check(started, check.Equals, test.result)
// TestNodeCreateThrottle checks that Pool.Create honors
// maxConcurrentInstanceCreateOps. The stub driver holds cloud
// operations (HoldCloudOps), so every accepted Create stays pending and
// counts toward the limit (and toward Unallocated). The expected
// results are:
//   - With a limit of 1, a second Create is refused.
//   - Raising the limit to 2 admits one more.
//   - A limit of 0 admits a third, so 0 does not cap concurrent creates.
213 func (suite *PoolSuite) TestNodeCreateThrottle(c *check.C) {
214 driver := test.StubDriver{HoldCloudOps: true}
215 instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, suite.logger, nil)
216 c.Assert(err, check.IsNil)
217 defer instanceSet.Stop()
219 type1 := test.InstanceType(1)
221 logger: suite.logger,
222 instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
223 cluster: suite.testCluster,
224 maxConcurrentInstanceCreateOps: 1,
225 instanceTypes: arvados.InstanceTypeMap{
230 c.Check(pool.Unallocated()[type1], check.Equals, 0)
231 res := pool.Create(type1)
232 c.Check(pool.Unallocated()[type1], check.Equals, 1)
233 c.Check(res, check.Equals, true)
// At the limit of 1: refused, and the unallocated count is unchanged.
235 res = pool.Create(type1)
236 c.Check(pool.Unallocated()[type1], check.Equals, 1)
237 c.Check(res, check.Equals, false)
// Clear the create throttle's recorded error before raising the limit.
// NOTE(review): presumably this ensures only
// maxConcurrentInstanceCreateOps governs the next Create. Confirm
// against throttledInstanceSet.
239 pool.instanceSet.throttleCreate.err = nil
240 pool.maxConcurrentInstanceCreateOps = 2
242 res = pool.Create(type1)
243 c.Check(pool.Unallocated()[type1], check.Equals, 2)
244 c.Check(res, check.Equals, true)
246 pool.instanceSet.throttleCreate.err = nil
247 pool.maxConcurrentInstanceCreateOps = 0
249 res = pool.Create(type1)
250 c.Check(pool.Unallocated()[type1], check.Equals, 3)
251 c.Check(res, check.Equals, true)
// NOTE(review): nothing here calls driver.ReleaseCloudOps, so the held
// Create calls presumably remain pending when the test returns.
// TestCreateUnallocShutdown exercises Create, Unallocated, Shutdown and
// SetIdleBehavior together:
//   - Pending creates (held by HoldCloudOps) count as unallocated.
//   - An instance on admin-hold is neither shut down nor reported as
//     unallocated.
//   - Shutdown removes one idle instance of the requested type per call.
//   - IdleBehaviorDrain shuts an instance down right away.
// Finally the test releases the held Destroy calls and syncs until the
// provider's instance list is empty.
254 func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
255 driver := test.StubDriver{HoldCloudOps: true}
256 instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, suite.logger, nil)
257 c.Assert(err, check.IsNil)
258 defer instanceSet.Stop()
260 type1 := arvados.InstanceType{Name: "a1s", ProviderType: "a1.small", VCPUs: 1, RAM: 1 * GiB, Price: .01}
261 type2 := arvados.InstanceType{Name: "a2m", ProviderType: "a2.medium", VCPUs: 2, RAM: 2 * GiB, Price: .02}
262 type3 := arvados.InstanceType{Name: "a2l", ProviderType: "a2.large", VCPUs: 4, RAM: 4 * GiB, Price: .04}
264 logger: suite.logger,
265 newExecutor: func(cloud.Instance) Executor { return &stubExecutor{} },
266 cluster: suite.testCluster,
267 instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
268 instanceTypes: arvados.InstanceTypeMap{
273 instanceInitCommand: "echo 'instance init command goes here'",
275 notify := pool.Subscribe()
276 defer pool.Unsubscribe(notify)
// notify2 is a second subscription.
// NOTE(review): presumably it is what the later "notify did not
// receive" select waits on. Confirm.
277 notify2 := pool.Subscribe()
278 defer pool.Unsubscribe(notify2)
280 c.Check(pool.Unallocated()[type1], check.Equals, 0)
281 c.Check(pool.Unallocated()[type2], check.Equals, 0)
282 c.Check(pool.Unallocated()[type3], check.Equals, 0)
// Held (pending) creates already count as unallocated: one type1, two
// type2, one type3.
287 c.Check(pool.Unallocated()[type1], check.Equals, 1)
288 c.Check(pool.Unallocated()[type2], check.Equals, 2)
289 c.Check(pool.Unallocated()[type3], check.Equals, 1)
291 // Unblock the pending Create calls.
292 go driver.ReleaseCloudOps(4)
294 // Wait for each instance to either return from its Create
295 // call, or show up in a poll.
296 suite.wait(c, pool, notify, func() bool {
298 defer pool.mtx.RUnlock()
299 return len(pool.workers) == 4
// The VM's init command must first write a hex instance secret (created
// with umask 0177, i.e. mode 0600) and then run the pool's configured
// instanceInitCommand.
302 vms := instanceSet.(*test.StubInstanceSet).StubVMs()
303 c.Check(string(vms[0].InitCommand), check.Matches, `umask 0177 && echo -n "[0-9a-f]+" >/var/run/arvados-instance-secret\necho 'instance init command goes here'`)
305 // Place type3 node on admin-hold
306 ivs := suite.instancesByType(pool, type3)
307 c.Assert(ivs, check.HasLen, 1)
308 type3instanceID := ivs[0].Instance
309 err = pool.SetIdleBehavior(type3instanceID, IdleBehaviorHold)
310 c.Check(err, check.IsNil)
312 // Check admin-hold behavior: refuse to shutdown, and don't
313 // report as Unallocated ("available now or soon").
314 c.Check(pool.Shutdown(type3), check.Equals, false)
315 suite.wait(c, pool, notify, func() bool {
316 return pool.Unallocated()[type3] == 0
318 c.Check(suite.instancesByType(pool, type3), check.HasLen, 1)
320 // Shutdown both type2 nodes
321 c.Check(pool.Shutdown(type2), check.Equals, true)
322 suite.wait(c, pool, notify, func() bool {
323 return pool.Unallocated()[type1] == 1 && pool.Unallocated()[type2] == 1
325 c.Check(pool.Shutdown(type2), check.Equals, true)
326 suite.wait(c, pool, notify, func() bool {
327 return pool.Unallocated()[type1] == 1 && pool.Unallocated()[type2] == 0
// No idle type2 instance remains, so a third Shutdown is refused.
329 c.Check(pool.Shutdown(type2), check.Equals, false)
331 // Consume any waiting notifications to ensure the
332 // next one we get is from Shutdown.
341 // Shutdown type1 node
342 c.Check(pool.Shutdown(type1), check.Equals, true)
343 suite.wait(c, pool, notify, func() bool {
344 return pool.Unallocated()[type1] == 0 && pool.Unallocated()[type2] == 0 && pool.Unallocated()[type3] == 0
// Fail if no notification arrives within one second.
348 case <-time.After(time.Second):
349 c.Error("notify did not receive")
352 // Put type3 node back in service.
353 err = pool.SetIdleBehavior(type3instanceID, IdleBehaviorRun)
354 c.Check(err, check.IsNil)
355 suite.wait(c, pool, notify, func() bool {
356 return pool.Unallocated()[type3] == 1
359 // Check admin-drain behavior: shut down right away, and don't
360 // report as Unallocated.
361 err = pool.SetIdleBehavior(type3instanceID, IdleBehaviorDrain)
362 c.Check(err, check.IsNil)
363 suite.wait(c, pool, notify, func() bool {
364 return pool.Unallocated()[type3] == 0
366 suite.wait(c, pool, notify, func() bool {
367 ivs := suite.instancesByType(pool, type3)
368 return len(ivs) == 1 && ivs[0].WorkerState == StateShutdown.String()
371 // Unblock all pending Destroy calls. Pool calls Destroy again
372 // if a node still appears in the provider list after a
373 // previous attempt, so there might be more than 4 Destroy
// calls, hence the deliberately large release count.
375 go driver.ReleaseCloudOps(4444)
377 // Sync until all instances disappear from the provider list.
378 suite.wait(c, pool, notify, func() bool {
379 pool.getInstancesAndSync()
380 return len(pool.Instances()) == 0
// TestInstanceQuotaGroup checks what happens when a create fails with a
// capacity error flagged InstanceQuotaGroupSpecific. Every instance type
// in the same family (the first character of ProviderType in the stub
// driver) is then reported as at capacity, and the pool refuses to create
// any of them. Types in other families can still be created.
384 func (suite *PoolSuite) TestInstanceQuotaGroup(c *check.C) {
385 driver := test.StubDriver{}
386 instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, suite.logger, nil)
387 c.Assert(err, check.IsNil)
388 defer instanceSet.Stop()
390 // Note the stub driver uses the first character of
391 // ProviderType as the instance family, so we have two
392 // instance families here, "a" and "b".
393 typeA1 := test.InstanceType(1)
394 typeA1.ProviderType = "a1"
395 typeA2 := test.InstanceType(2)
396 typeA2.ProviderType = "a2"
397 typeB3 := test.InstanceType(3)
398 typeB3.ProviderType = "b3"
399 typeB4 := test.InstanceType(4)
400 typeB4.ProviderType = "b4"
403 logger: suite.logger,
404 newExecutor: func(cloud.Instance) Executor { return &stubExecutor{} },
405 cluster: suite.testCluster,
406 instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
407 instanceTypes: arvados.InstanceTypeMap{
415 // Arrange for the driver to fail when the pool calls
416 // instanceSet.Create().
417 driver.SetupVM = func(*test.StubVM) error { return test.CapacityError{InstanceQuotaGroupSpecific: true} }
418 // pool.Create() returns true when it starts a goroutine to
419 // call instanceSet.Create() in the background.
420 c.Check(pool.Create(typeA1), check.Equals, true)
421 // Wait for the pool to start reporting that the provider is
422 // at capacity for instance type A1.
423 for deadline := time.Now().Add(time.Second); !pool.AtCapacity(typeA1); time.Sleep(time.Millisecond) {
424 if time.Now().After(deadline) {
425 c.Fatal("timed out waiting for pool to report quota")
429 // The pool should now report AtCapacity for the affected
430 // instance family (A1, A2) and refuse to call
431 // instanceSet.Create() for those types -- but other types
432 // (B3, B4) are still usable.
433 driver.SetupVM = func(*test.StubVM) error { return nil }
// From here on the stub accepts every create, so any refusal below comes
// from the pool's own capacity tracking.
434 c.Check(pool.AtCapacity(typeA1), check.Equals, true)
435 c.Check(pool.AtCapacity(typeA2), check.Equals, true)
436 c.Check(pool.AtCapacity(typeB3), check.Equals, false)
437 c.Check(pool.AtCapacity(typeB4), check.Equals, false)
438 c.Check(pool.Create(typeA2), check.Equals, false)
439 c.Check(pool.Create(typeB3), check.Equals, true)
440 c.Check(pool.Create(typeB4), check.Equals, true)
// Successful creates in family "b" must not lift the family "a"
// restriction.
441 c.Check(pool.Create(typeA2), check.Equals, false)
442 c.Check(pool.Create(typeA1), check.Equals, false)
445 func (suite *PoolSuite) instancesByType(pool *Pool, it arvados.InstanceType) []InstanceView {
446 var ivs []InstanceView
447 for _, iv := range pool.Instances() {
448 if iv.ArvadosInstanceType == it.Name {
449 ivs = append(ivs, iv)
455 func (suite *PoolSuite) wait(c *check.C, pool *Pool, notify <-chan struct{}, ready func() bool) {
456 timeout := time.NewTimer(time.Second).C
465 c.Check(ready(), check.Equals, true)