1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
12 "git.arvados.org/arvados.git/lib/cloud"
13 "git.arvados.org/arvados.git/lib/config"
14 "git.arvados.org/arvados.git/lib/dispatchcloud/test"
15 "git.arvados.org/arvados.git/sdk/go/arvados"
16 "git.arvados.org/arvados.git/sdk/go/ctxlog"
17 "github.com/prometheus/client_golang/prometheus"
18 "github.com/sirupsen/logrus"
19 check "gopkg.in/check.v1"
// GiB is the value 1 << 30 typed as arvados.ByteSize. The instance type
// literals in TestCreateUnallocShutdown multiply it to express RAM sizes.
22 const GiB arvados.ByteSize = 1 << 30
// Hand a PoolSuite instance to check.Suite so the gocheck runner picks up
// its Test* methods. The result is discarded; only the call matters.
24 var _ = check.Suite(&PoolSuite{})
// lessChecker is a gocheck checker whose Check method passes when the
// obtained int is strictly less than the expected int.
// NOTE(review): the struct's fields are not visible in this view; the
// literal used to build the `less` variable suggests it embeds
// *check.CheckerInfo -- confirm.
26 type lessChecker struct {
// Check reports whether params[0] is strictly less than params[1]. Both
// values are type-asserted to int without the comma-ok form, so passing
// any other type panics. The returned error string is always empty, and
// names is not used.
30 func (*lessChecker) Check(params []interface{}, names []string) (result bool, error string) {
31 return params[0].(int) < params[1].(int), ""
// less is a ready-made lessChecker. Its CheckerInfo names the checker
// "less" and labels its two parameters "obtained" and "expected".
34 var less = &lessChecker{&check.CheckerInfo{Name: "less", Params: []string{"obtained", "expected"}}}
// PoolSuite is the gocheck suite for the Pool tests in this file.
// Both fields are (re)assigned by SetUpTest.
36 type PoolSuite struct {
// logger is the test logger passed to drivers and pools.
37 logger logrus.FieldLogger
// testCluster is the cluster config returned by cfg.GetCluster(""); some
// tests overwrite parts of it before building a pool.
38 testCluster *arvados.Cluster
// SetUpTest initializes the suite's logger and cluster config. It creates
// a test logger bound to c, loads a config through config.NewLoader with a
// nil first argument, and stores the cluster returned for the empty
// cluster ID. Either step failing aborts via c.Assert.
41 func (suite *PoolSuite) SetUpTest(c *check.C) {
42 suite.logger = ctxlog.TestLogger(c)
43 cfg, err := config.NewLoader(nil, suite.logger).Load()
44 c.Assert(err, check.IsNil)
// An empty cluster ID is passed here.
// NOTE(review): presumably that selects the only/default cluster -- confirm.
45 suite.testCluster, err = cfg.GetCluster("")
46 c.Assert(err, check.IsNil)
// TestResumeAfterRestart builds a pool on a stub instance set, waits until
// it holds three idle instances, puts the type2 instance on
// IdleBehaviorHold, and waits for that setting to appear in the instance's
// tags. It then builds a second pool on the same instance set and instance
// set ID and checks that the type2 instance is the same one and is still
// held.
// NOTE(review): several lines (loop/select scaffolding, the instance
// creation calls, closing braces) are not visible in this view.
49 func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
50 type1 := test.InstanceType(1)
51 type2 := test.InstanceType(2)
52 type3 := test.InstanceType(3)
// waitForIdle treats the pool as ready once it reports exactly three
// instances which, sorted by instance type name, are two of type1 followed
// by one of type2, all in StateIdle. It uses a one-second timer.
// NOTE(review): the loop and the timeout/notify handling are not visible
// here; only the Logf of the last observed instances is.
53 waitForIdle := func(pool *Pool, notify <-chan struct{}) {
54 timeout := time.NewTimer(time.Second)
56 instances := pool.Instances()
// Sort by instance type name so the positional checks below are stable.
57 sort.Slice(instances, func(i, j int) bool {
58 return strings.Compare(instances[i].ArvadosInstanceType, instances[j].ArvadosInstanceType) < 0
60 if len(instances) == 3 &&
61 instances[0].ArvadosInstanceType == type1.Name &&
62 instances[0].WorkerState == StateIdle.String() &&
63 instances[1].ArvadosInstanceType == type1.Name &&
64 instances[1].WorkerState == StateIdle.String() &&
65 instances[2].ArvadosInstanceType == type2.Name &&
66 instances[2].WorkerState == StateIdle.String() {
71 c.Logf("pool.Instances() == %#v", instances)
79 driver := &test.StubDriver{}
80 instanceSetID := cloud.InstanceSetID("test-instance-set-id")
81 is, err := driver.InstanceSet(nil, instanceSetID, nil, suite.logger)
82 c.Assert(err, check.IsNil)
// Executor factory handed to both pools. The stub's response map has an
// entry for "crunch-run-custom --list", matching the CrunchRunCommand
// configured below.
84 newExecutor := func(cloud.Instance) Executor {
86 response: map[string]stubResp{
87 "crunch-run-custom --list": {},
// Replace the cluster's CloudVMs config: 10ms probe and sync intervals,
// and a tag key prefix that is reused below to build tagKey.
93 suite.testCluster.Containers.CloudVMs = arvados.CloudVMsConfig{
94 BootProbeCommand: "true",
95 MaxProbesPerSecond: 1000,
96 ProbeInterval: arvados.Duration(time.Millisecond * 10),
97 SyncInterval: arvados.Duration(time.Millisecond * 10),
98 TagKeyPrefix: "testprefix:",
100 suite.testCluster.Containers.CrunchRunCommand = "crunch-run-custom"
101 suite.testCluster.InstanceTypes = arvados.InstanceTypeMap{
107 pool := NewPool(suite.logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), instanceSetID, is, newExecutor, nil, suite.testCluster)
108 notify := pool.Subscribe()
109 defer pool.Unsubscribe(notify)
113 waitForIdle(pool, notify)
// Remember the type2 instance's ID and put that instance on hold.
114 var heldInstanceID cloud.InstanceID
115 for _, inst := range pool.Instances() {
116 if inst.ArvadosInstanceType == type2.Name {
117 heldInstanceID = cloud.InstanceID(inst.Instance)
118 pool.SetIdleBehavior(heldInstanceID, IdleBehaviorHold)
121 // Wait for the tags to save to the cloud provider
122 tagKey := suite.testCluster.Containers.CloudVMs.TagKeyPrefix + tagKeyIdleBehavior
123 deadline := time.Now().Add(time.Second)
// The visible part of this wait reads the type2 worker's instance tags
// and compares the tagKey entry to IdleBehaviorHold, sleeping 10ms at a
// time with a one-second deadline.
// NOTE(review): the matching pool.mtx.RLock() call is not visible here.
126 defer pool.mtx.RUnlock()
127 for _, wkr := range pool.workers {
128 if wkr.instType == type2 {
129 return wkr.instance.Tags()[tagKey] == string(IdleBehaviorHold)
134 if time.Now().After(deadline) {
137 time.Sleep(time.Millisecond * 10)
141 c.Log("------- starting new pool, waiting to recover state")
// pool2 is built with the same instanceSetID, instance set and cluster
// config as pool.
143 pool2 := NewPool(suite.logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), instanceSetID, is, newExecutor, nil, suite.testCluster)
144 notify2 := pool2.Subscribe()
145 defer pool2.Unsubscribe(notify2)
146 waitForIdle(pool2, notify2)
// The type2 instance must be the one held earlier, and still held.
// NOTE(review): the IdleBehaviorRun check is presumably the else branch
// for the other instances -- the branch line itself is not visible.
147 for _, inst := range pool2.Instances() {
148 if inst.ArvadosInstanceType == type2.Name {
149 c.Check(inst.Instance, check.Equals, heldInstanceID)
150 c.Check(inst.IdleBehavior, check.Equals, IdleBehaviorHold)
152 c.Check(inst.IdleBehavior, check.Equals, IdleBehaviorRun)
// TestDrain checks how a worker's idle behavior affects StartContainer.
// Once the pool has exactly one worker, each table entry forces every
// worker's state and idleBehavior and then tries to start a container.
// With the worker in StateIdle, only IdleBehaviorRun is expected to start
// one; Hold and Drain are expected to refuse.
// NOTE(review): the pool construction, the Create call and the table's
// type declaration are only partly visible in this view.
158 func (suite *PoolSuite) TestDrain(c *check.C) {
159 driver := test.StubDriver{}
160 instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, suite.logger)
161 c.Assert(err, check.IsNil)
163 ac := arvados.NewClientFromEnv()
165 type1 := test.InstanceType(1)
168 logger: suite.logger,
169 newExecutor: func(cloud.Instance) Executor { return &stubExecutor{} },
170 cluster: suite.testCluster,
171 instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
172 instanceTypes: arvados.InstanceTypeMap{
176 notify := pool.Subscribe()
177 defer pool.Unsubscribe(notify)
181 // Wait for the instance to either return from its Create
182 // call, or show up in a poll.
183 suite.wait(c, pool, notify, func() bool {
185 defer pool.mtx.RUnlock()
186 return len(pool.workers) == 1
191 idleBehavior IdleBehavior
// Table columns: worker state, idle behavior, expected StartContainer
// result (matching the test.state / test.idleBehavior / test.result uses
// below).
194 {StateIdle, IdleBehaviorHold, false},
195 {StateIdle, IdleBehaviorDrain, false},
196 {StateIdle, IdleBehaviorRun, true},
// The loop variable `test` shadows the imported `test` package for the
// rest of the loop body.
199 for _, test := range tests {
200 for _, wkr := range pool.workers {
201 wkr.state = test.state
202 wkr.idleBehavior = test.idleBehavior
205 // Try to start a container
206 started := pool.StartContainer(type1, arvados.Container{UUID: "testcontainer"})
207 c.Check(started, check.Equals, test.result)
// TestNodeCreateThrottle exercises maxConcurrentInstanceCreateOps. The
// stub driver is built with HoldCloudOps set, and no release call is
// visible, so the Create operations presumably stay pending. With a limit
// of 1, the first Create succeeds and the second is refused. After raising
// the limit to 2, a further Create succeeds. After setting it to 0, yet
// another succeeds.
// NOTE(review): 0 presumably means "no limit" -- confirm against Pool.
211 func (suite *PoolSuite) TestNodeCreateThrottle(c *check.C) {
212 driver := test.StubDriver{HoldCloudOps: true}
213 instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, suite.logger)
214 c.Assert(err, check.IsNil)
216 type1 := test.InstanceType(1)
218 logger: suite.logger,
219 instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
220 cluster: suite.testCluster,
221 maxConcurrentInstanceCreateOps: 1,
222 instanceTypes: arvados.InstanceTypeMap{
// Limit 1: the first Create is accepted and counted as unallocated.
227 c.Check(pool.Unallocated()[type1], check.Equals, 0)
228 res := pool.Create(type1)
229 c.Check(pool.Unallocated()[type1], check.Equals, 1)
230 c.Check(res, check.Equals, true)
// Still limit 1: a second Create is refused and the count stays at 1.
232 res = pool.Create(type1)
233 c.Check(pool.Unallocated()[type1], check.Equals, 1)
234 c.Check(res, check.Equals, false)
// Clear the throttle's stored error and raise the limit to 2.
236 pool.instanceSet.throttleCreate.err = nil
237 pool.maxConcurrentInstanceCreateOps = 2
239 res = pool.Create(type1)
240 c.Check(pool.Unallocated()[type1], check.Equals, 2)
241 c.Check(res, check.Equals, true)
// Clear the throttle error again and set the limit to 0; Create is still
// expected to succeed.
243 pool.instanceSet.throttleCreate.err = nil
244 pool.maxConcurrentInstanceCreateOps = 0
246 res = pool.Create(type1)
247 c.Check(pool.Unallocated()[type1], check.Equals, 3)
248 c.Check(res, check.Equals, true)
// TestCreateUnallocShutdown walks a pool through creating instances of
// three types, then checks Unallocated() and Shutdown() as instances are
// held, shut down, returned to service and drained. It ends by releasing
// the held cloud operations and syncing until the pool reports no
// instances.
// NOTE(review): the pool literal's opening line, the Create calls and the
// notification-draining select are not visible in this view.
251 func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
// HoldCloudOps is set, and the test later calls ReleaseCloudOps
// explicitly to unblock pending operations.
252 driver := test.StubDriver{HoldCloudOps: true}
253 instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, suite.logger)
254 c.Assert(err, check.IsNil)
256 type1 := arvados.InstanceType{Name: "a1s", ProviderType: "a1.small", VCPUs: 1, RAM: 1 * GiB, Price: .01}
257 type2 := arvados.InstanceType{Name: "a2m", ProviderType: "a2.medium", VCPUs: 2, RAM: 2 * GiB, Price: .02}
258 type3 := arvados.InstanceType{Name: "a2l", ProviderType: "a2.large", VCPUs: 4, RAM: 4 * GiB, Price: .04}
260 logger: suite.logger,
261 newExecutor: func(cloud.Instance) Executor { return &stubExecutor{} },
262 cluster: suite.testCluster,
263 instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
264 instanceTypes: arvados.InstanceTypeMap{
// Two independent subscriptions: notify drives suite.wait below.
// NOTE(review): notify2 is presumably the channel checked by the
// "notify did not receive" timeout further down -- its receive line is
// not visible.
270 notify := pool.Subscribe()
271 defer pool.Unsubscribe(notify)
272 notify2 := pool.Subscribe()
273 defer pool.Unsubscribe(notify2)
// Nothing is unallocated to begin with.
275 c.Check(pool.Unallocated()[type1], check.Equals, 0)
276 c.Check(pool.Unallocated()[type2], check.Equals, 0)
277 c.Check(pool.Unallocated()[type3], check.Equals, 0)
// Expected counts after the (not visible) Create calls: one type1, two
// type2, one type3.
282 c.Check(pool.Unallocated()[type1], check.Equals, 1)
283 c.Check(pool.Unallocated()[type2], check.Equals, 2)
284 c.Check(pool.Unallocated()[type3], check.Equals, 1)
286 // Unblock the pending Create calls.
287 go driver.ReleaseCloudOps(4)
289 // Wait for each instance to either return from its Create
290 // call, or show up in a poll.
291 suite.wait(c, pool, notify, func() bool {
293 defer pool.mtx.RUnlock()
294 return len(pool.workers) == 4
297 // Place type3 node on admin-hold
298 ivs := suite.instancesByType(pool, type3)
299 c.Assert(ivs, check.HasLen, 1)
300 type3instanceID := ivs[0].Instance
301 err = pool.SetIdleBehavior(type3instanceID, IdleBehaviorHold)
302 c.Check(err, check.IsNil)
304 // Check admin-hold behavior: refuse to shutdown, and don't
305 // report as Unallocated ("available now or soon").
306 c.Check(pool.Shutdown(type3), check.Equals, false)
307 suite.wait(c, pool, notify, func() bool {
308 return pool.Unallocated()[type3] == 0
// The held instance must still be listed even though it is no longer
// counted as unallocated.
310 c.Check(suite.instancesByType(pool, type3), check.HasLen, 1)
312 // Shutdown both type2 nodes
313 c.Check(pool.Shutdown(type2), check.Equals, true)
314 suite.wait(c, pool, notify, func() bool {
315 return pool.Unallocated()[type1] == 1 && pool.Unallocated()[type2] == 1
317 c.Check(pool.Shutdown(type2), check.Equals, true)
318 suite.wait(c, pool, notify, func() bool {
319 return pool.Unallocated()[type1] == 1 && pool.Unallocated()[type2] == 0
// With no type2 instance left to shut down, Shutdown reports false.
321 c.Check(pool.Shutdown(type2), check.Equals, false)
323 // Consume any waiting notifications to ensure the
324 // next one we get is from Shutdown.
333 // Shutdown type1 node
334 c.Check(pool.Shutdown(type1), check.Equals, true)
335 suite.wait(c, pool, notify, func() bool {
336 return pool.Unallocated()[type1] == 0 && pool.Unallocated()[type2] == 0 && pool.Unallocated()[type3] == 0
// If the notification does not arrive within one second, record a test
// error.
340 case <-time.After(time.Second):
341 c.Error("notify did not receive")
344 // Put type3 node back in service.
345 err = pool.SetIdleBehavior(type3instanceID, IdleBehaviorRun)
346 c.Check(err, check.IsNil)
347 suite.wait(c, pool, notify, func() bool {
348 return pool.Unallocated()[type3] == 1
351 // Check admin-drain behavior: shut down right away, and don't
352 // report as Unallocated.
353 err = pool.SetIdleBehavior(type3instanceID, IdleBehaviorDrain)
354 c.Check(err, check.IsNil)
355 suite.wait(c, pool, notify, func() bool {
356 return pool.Unallocated()[type3] == 0
// The drained instance should still be listed, now in StateShutdown.
358 suite.wait(c, pool, notify, func() bool {
359 ivs := suite.instancesByType(pool, type3)
360 return len(ivs) == 1 && ivs[0].WorkerState == StateShutdown.String()
363 // Unblock all pending Destroy calls. Pool calls Destroy again
364 // if a node still appears in the provider list after a
365 // previous attempt, so there might be more than 4 Destroy
367 go driver.ReleaseCloudOps(4444)
369 // Sync until all instances disappear from the provider list.
370 suite.wait(c, pool, notify, func() bool {
// Force a sync on every evaluation of the condition.
371 pool.getInstancesAndSync()
372 return len(pool.Instances()) == 0
// instancesByType collects the InstanceViews reported by pool.Instances()
// whose ArvadosInstanceType equals it.Name. The slice starts nil, so it
// stays nil when nothing matches.
// NOTE(review): the return statement is not visible in this view;
// presumably it returns ivs.
376 func (suite *PoolSuite) instancesByType(pool *Pool, it arvados.InstanceType) []InstanceView {
377 var ivs []InstanceView
378 for _, iv := range pool.Instances() {
379 if iv.ArvadosInstanceType == it.Name {
380 ivs = append(ivs, iv)
// wait sets up a one-second timer channel and finishes by checking, via
// c.Check, that ready() returns true. A condition that is still false at
// that point is recorded as a test failure.
// NOTE(review): the lines between the timer setup and the final check are
// not visible here; presumably they re-evaluate ready() on each notify
// until it succeeds or the timer fires -- confirm.
386 func (suite *PoolSuite) wait(c *check.C, pool *Pool, notify <-chan struct{}, ready func() bool) {
387 timeout := time.NewTimer(time.Second).C
396 c.Check(ready(), check.Equals, true)