1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
12 "git.arvados.org/arvados.git/lib/cloud"
13 "git.arvados.org/arvados.git/lib/config"
14 "git.arvados.org/arvados.git/lib/dispatchcloud/test"
15 "git.arvados.org/arvados.git/sdk/go/arvados"
16 "git.arvados.org/arvados.git/sdk/go/ctxlog"
17 "github.com/prometheus/client_golang/prometheus"
18 "github.com/sirupsen/logrus"
19 check "gopkg.in/check.v1"
// GiB is one gibibyte (1 << 30) as an arvados.ByteSize; it is used below to
// express instance RAM sizes.
22 const GiB arvados.ByteSize = 1 << 30
// Register PoolSuite with the gocheck runner.
24 var _ = check.Suite(&PoolSuite{})
// lessChecker is a gocheck checker type whose Check method compares two
// ints with "<". The `less` value below constructs it from a
// *check.CheckerInfo.
26 type lessChecker struct {
// Check reports whether params[0] is strictly less than params[1].
// Both values are type-asserted to int without the two-value form, so a
// non-int argument panics. names is not used, and the returned error string
// is always empty.
30 func (*lessChecker) Check(params []interface{}, names []string) (result bool, error string) {
31 return params[0].(int) < params[1].(int), ""
// less is a lessChecker named "less" whose parameters are labelled
// "obtained" and "expected".
34 var less = &lessChecker{&check.CheckerInfo{Name: "less", Params: []string{"obtained", "expected"}}}
// PoolSuite is the gocheck suite for the Pool tests in this file.
36 type PoolSuite struct {
// logger is assigned a fresh test logger in SetUpTest.
37 logger logrus.FieldLogger
// testCluster is the cluster config loaded in SetUpTest; individual tests
// modify it before constructing a Pool.
38 testCluster *arvados.Cluster
// SetUpTest gives the suite a test logger and a cluster configuration.
// The config is loaded with config.NewLoader(nil, logger).Load() and the
// cluster is obtained with GetCluster(""); an error from either step fails
// the c.Assert that follows it.
41 func (suite *PoolSuite) SetUpTest(c *check.C) {
42 suite.logger = ctxlog.TestLogger(c)
43 cfg, err := config.NewLoader(nil, suite.logger).Load()
44 c.Assert(err, check.IsNil)
45 suite.testCluster, err = cfg.GetCluster("")
46 c.Assert(err, check.IsNil)
// TestResumeAfterRestart checks that a second Pool built on the same
// instance set and instance set ID picks up state left by the first one:
// the type2 instance that was put on hold keeps its instance ID and
// IdleBehaviorHold, while the remaining instances report IdleBehaviorRun.
49 func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
50 type1 := test.InstanceType(1)
51 type2 := test.InstanceType(2)
52 type3 := test.InstanceType(3)
// waitForIdle sorts pool.Instances() by ArvadosInstanceType and looks for
// exactly three instances, two of type1 followed by one of type2, all
// reporting StateIdle. It uses a one-second timer and logs the instance
// list with c.Logf.
53 waitForIdle := func(pool *Pool, notify <-chan struct{}) {
54 timeout := time.NewTimer(time.Second)
56 instances := pool.Instances()
57 sort.Slice(instances, func(i, j int) bool {
58 return strings.Compare(instances[i].ArvadosInstanceType, instances[j].ArvadosInstanceType) < 0
60 if len(instances) == 3 &&
61 instances[0].ArvadosInstanceType == type1.Name &&
62 instances[0].WorkerState == StateIdle.String() &&
63 instances[1].ArvadosInstanceType == type1.Name &&
64 instances[1].WorkerState == StateIdle.String() &&
65 instances[2].ArvadosInstanceType == type2.Name &&
66 instances[2].WorkerState == StateIdle.String() {
71 c.Logf("pool.Instances() == %#v", instances)
// One stub instance set is created here and passed to both pools below.
79 driver := &test.StubDriver{}
80 instanceSetID := cloud.InstanceSetID("test-instance-set-id")
81 is, err := driver.InstanceSet(nil, instanceSetID, nil, suite.logger)
82 c.Assert(err, check.IsNil)
// Executor factory: the response map has an entry for
// "crunch-run-custom --list", which matches the CrunchRunCommand set below.
84 newExecutor := func(cloud.Instance) Executor {
86 response: map[string]stubResp{
87 "crunch-run-custom --list": {},
// Use 10ms probe and sync intervals, and a tag key prefix so the tag name
// built further down can be checked on the instances.
93 suite.testCluster.Containers.CloudVMs = arvados.CloudVMsConfig{
94 BootProbeCommand: "true",
95 MaxProbesPerSecond: 1000,
96 ProbeInterval: arvados.Duration(time.Millisecond * 10),
97 SyncInterval: arvados.Duration(time.Millisecond * 10),
98 TagKeyPrefix: "testprefix:",
100 suite.testCluster.Containers.CrunchRunCommand = "crunch-run-custom"
101 suite.testCluster.InstanceTypes = arvados.InstanceTypeMap{
// First pool.
107 pool := NewPool(suite.logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), instanceSetID, is, newExecutor, nil, suite.testCluster)
108 notify := pool.Subscribe()
109 defer pool.Unsubscribe(notify)
113 waitForIdle(pool, notify)
// Find the type2 instance, remember its ID, and put it on hold.
// NOTE(review): the value returned by SetIdleBehavior is discarded here,
// whereas TestCreateUnallocShutdown checks it with c.Check(err, check.IsNil).
114 var heldInstanceID cloud.InstanceID
115 for _, inst := range pool.Instances() {
116 if inst.ArvadosInstanceType == type2.Name {
117 heldInstanceID = cloud.InstanceID(inst.Instance)
118 pool.SetIdleBehavior(heldInstanceID, IdleBehaviorHold)
121 // Wait for the tags to save to the cloud provider
// The tag name is the configured prefix plus tagKeyIdleBehavior. The check
// reads the type2 worker's instance tags with pool.mtx read-locked, polls
// at 10ms intervals, and uses a one-second deadline.
122 tagKey := suite.testCluster.Containers.CloudVMs.TagKeyPrefix + tagKeyIdleBehavior
123 deadline := time.Now().Add(time.Second)
126 defer pool.mtx.RUnlock()
127 for _, wkr := range pool.workers {
128 if wkr.instType == type2 {
129 return wkr.instance.Tags()[tagKey] == string(IdleBehaviorHold)
134 if time.Now().After(deadline) {
137 time.Sleep(time.Millisecond * 10)
141 c.Log("------- starting new pool, waiting to recover state")
// Second pool: same instance set ID, instance set, executor factory and
// cluster config, but a fresh Prometheus registry.
143 pool2 := NewPool(suite.logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), instanceSetID, is, newExecutor, nil, suite.testCluster)
144 notify2 := pool2.Subscribe()
145 defer pool2.Unsubscribe(notify2)
146 waitForIdle(pool2, notify2)
// The type2 instance must be the one held earlier and still be on hold;
// the IdleBehaviorRun check presumably covers the other instances.
147 for _, inst := range pool2.Instances() {
148 if inst.ArvadosInstanceType == type2.Name {
149 c.Check(inst.Instance, check.Equals, heldInstanceID)
150 c.Check(inst.IdleBehavior, check.Equals, IdleBehaviorHold)
152 c.Check(inst.IdleBehavior, check.Equals, IdleBehaviorRun)
// TestDrain checks which idle-behavior settings allow StartContainer to
// succeed on an idle worker: per the table below, only IdleBehaviorRun
// does, while IdleBehaviorHold and IdleBehaviorDrain are refused.
158 func (suite *PoolSuite) TestDrain(c *check.C) {
159 driver := test.StubDriver{}
160 instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, suite.logger)
161 c.Assert(err, check.IsNil)
163 ac := arvados.NewClientFromEnv()
165 type1 := test.InstanceType(1)
// Pool fields set for this test: a stub executor factory and the stub
// instance set wrapped in a throttledInstanceSet.
168 logger: suite.logger,
169 newExecutor: func(cloud.Instance) Executor { return &stubExecutor{} },
170 cluster: suite.testCluster,
171 instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
172 instanceTypes: arvados.InstanceTypeMap{
176 notify := pool.Subscribe()
177 defer pool.Unsubscribe(notify)
181 // Wait for the instance to either return from its Create
182 // call, or show up in a poll.
183 suite.wait(c, pool, notify, func() bool {
185 defer pool.mtx.RUnlock()
186 return len(pool.workers) == 1
191 idleBehavior IdleBehavior
// Each row is {worker state, idle behavior, expected StartContainer result}.
194 {StateIdle, IdleBehaviorHold, false},
195 {StateIdle, IdleBehaviorDrain, false},
196 {StateIdle, IdleBehaviorRun, true},
// Force every worker into the row's state and idle behavior before trying
// to start a container on it.
199 for _, test := range tests {
200 for _, wkr := range pool.workers {
201 wkr.state = test.state
202 wkr.idleBehavior = test.idleBehavior
205 // Try to start a container
206 started := pool.StartContainer(type1, arvados.Container{UUID: "testcontainer"})
207 c.Check(started, check.Equals, test.result)
// TestNodeCreateThrottle checks how maxConcurrentInstanceCreateOps gates
// Pool.Create while the stub driver holds cloud operations
// (HoldCloudOps: true). It asserts both the Create result and the
// Unallocated count for type1 after each attempt.
211 func (suite *PoolSuite) TestNodeCreateThrottle(c *check.C) {
212 driver := test.StubDriver{HoldCloudOps: true}
213 instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, suite.logger)
214 c.Assert(err, check.IsNil)
216 type1 := test.InstanceType(1)
218 logger: suite.logger,
219 instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
220 cluster: suite.testCluster,
221 maxConcurrentInstanceCreateOps: 1,
222 instanceTypes: arvados.InstanceTypeMap{
// Limit 1: the first Create is accepted and counted as unallocated.
227 c.Check(pool.Unallocated()[type1], check.Equals, 0)
228 res := pool.Create(type1)
229 c.Check(pool.Unallocated()[type1], check.Equals, 1)
230 c.Check(res, check.Equals, true)
// Limit 1 with one create still pending: the second Create is refused and
// the count stays at 1.
232 res = pool.Create(type1)
233 c.Check(pool.Unallocated()[type1], check.Equals, 1)
234 c.Check(res, check.Equals, false)
// Clear the throttle's recorded error and raise the limit to 2: Create is
// accepted again.
236 pool.instanceSet.throttleCreate.err = nil
237 pool.maxConcurrentInstanceCreateOps = 2
239 res = pool.Create(type1)
240 c.Check(pool.Unallocated()[type1], check.Equals, 2)
241 c.Check(res, check.Equals, true)
// Limit 0: Create is accepted even with two creates pending.
// NOTE(review): presumably 0 means "no limit" -- confirm against Pool.
243 pool.instanceSet.throttleCreate.err = nil
244 pool.maxConcurrentInstanceCreateOps = 0
246 res = pool.Create(type1)
247 c.Check(pool.Unallocated()[type1], check.Equals, 3)
248 c.Check(res, check.Equals, true)
// TestCreateUnallocShutdown walks a pool through creating instances of
// three types, then exercises Unallocated, Shutdown and SetIdleBehavior
// (hold, run, drain) against them. It finishes by releasing the held cloud
// operations and syncing until the pool reports no instances.
251 func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
// HoldCloudOps makes the stub driver block cloud operations until
// ReleaseCloudOps is called further down.
252 driver := test.StubDriver{HoldCloudOps: true}
253 instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, suite.logger)
254 c.Assert(err, check.IsNil)
256 type1 := arvados.InstanceType{Name: "a1s", ProviderType: "a1.small", VCPUs: 1, RAM: 1 * GiB, Price: .01}
257 type2 := arvados.InstanceType{Name: "a2m", ProviderType: "a2.medium", VCPUs: 2, RAM: 2 * GiB, Price: .02}
258 type3 := arvados.InstanceType{Name: "a2l", ProviderType: "a2.large", VCPUs: 4, RAM: 4 * GiB, Price: .04}
260 logger: suite.logger,
261 newExecutor: func(cloud.Instance) Executor { return &stubExecutor{} },
262 cluster: suite.testCluster,
263 instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
264 instanceTypes: arvados.InstanceTypeMap{
// This init command is expected to appear in the stub VM's InitCommand,
// which is checked below.
269 instanceInitCommand: "echo 'instance init command goes here'",
// Two independent subscriptions are taken on the same pool.
271 notify := pool.Subscribe()
272 defer pool.Unsubscribe(notify)
273 notify2 := pool.Subscribe()
274 defer pool.Unsubscribe(notify2)
// Nothing is unallocated before any instance has been requested.
276 c.Check(pool.Unallocated()[type1], check.Equals, 0)
277 c.Check(pool.Unallocated()[type2], check.Equals, 0)
278 c.Check(pool.Unallocated()[type3], check.Equals, 0)
// At this point one type1, two type2 and one type3 instance are expected
// to count as unallocated.
283 c.Check(pool.Unallocated()[type1], check.Equals, 1)
284 c.Check(pool.Unallocated()[type2], check.Equals, 2)
285 c.Check(pool.Unallocated()[type3], check.Equals, 1)
287 // Unblock the pending Create calls.
288 go driver.ReleaseCloudOps(4)
290 // Wait for each instance to either return from its Create
291 // call, or show up in a poll.
292 suite.wait(c, pool, notify, func() bool {
294 defer pool.mtx.RUnlock()
295 return len(pool.workers) == 4
// The first stub VM's init command must write the instance secret under
// umask 0177 and then run the configured instanceInitCommand.
298 vms := instanceSet.(*test.StubInstanceSet).StubVMs()
299 c.Check(string(vms[0].InitCommand), check.Matches, `umask 0177 && echo -n "[0-9a-f]+" >/var/run/arvados-instance-secret\necho 'instance init command goes here'`)
301 // Place type3 node on admin-hold
302 ivs := suite.instancesByType(pool, type3)
303 c.Assert(ivs, check.HasLen, 1)
304 type3instanceID := ivs[0].Instance
305 err = pool.SetIdleBehavior(type3instanceID, IdleBehaviorHold)
306 c.Check(err, check.IsNil)
308 // Check admin-hold behavior: refuse to shutdown, and don't
309 // report as Unallocated ("available now or soon").
310 c.Check(pool.Shutdown(type3), check.Equals, false)
311 suite.wait(c, pool, notify, func() bool {
312 return pool.Unallocated()[type3] == 0
314 c.Check(suite.instancesByType(pool, type3), check.HasLen, 1)
316 // Shutdown both type2 nodes
// Each successful Shutdown lowers the type2 unallocated count by one. A
// third attempt, with none left, returns false.
317 c.Check(pool.Shutdown(type2), check.Equals, true)
318 suite.wait(c, pool, notify, func() bool {
319 return pool.Unallocated()[type1] == 1 && pool.Unallocated()[type2] == 1
321 c.Check(pool.Shutdown(type2), check.Equals, true)
322 suite.wait(c, pool, notify, func() bool {
323 return pool.Unallocated()[type1] == 1 && pool.Unallocated()[type2] == 0
325 c.Check(pool.Shutdown(type2), check.Equals, false)
327 // Consume any waiting notifications to ensure the
328 // next one we get is from Shutdown.
337 // Shutdown type1 node
338 c.Check(pool.Shutdown(type1), check.Equals, true)
339 suite.wait(c, pool, notify, func() bool {
340 return pool.Unallocated()[type1] == 0 && pool.Unallocated()[type2] == 0 && pool.Unallocated()[type3] == 0
// Report an error if no notification arrives within one second.
344 case <-time.After(time.Second):
345 c.Error("notify did not receive")
348 // Put type3 node back in service.
349 err = pool.SetIdleBehavior(type3instanceID, IdleBehaviorRun)
350 c.Check(err, check.IsNil)
351 suite.wait(c, pool, notify, func() bool {
352 return pool.Unallocated()[type3] == 1
355 // Check admin-drain behavior: shut down right away, and don't
356 // report as Unallocated.
357 err = pool.SetIdleBehavior(type3instanceID, IdleBehaviorDrain)
358 c.Check(err, check.IsNil)
359 suite.wait(c, pool, notify, func() bool {
360 return pool.Unallocated()[type3] == 0
// The drained instance should still be listed, now in StateShutdown.
362 suite.wait(c, pool, notify, func() bool {
363 ivs := suite.instancesByType(pool, type3)
364 return len(ivs) == 1 && ivs[0].WorkerState == StateShutdown.String()
367 // Unblock all pending Destroy calls. Pool calls Destroy again
368 // if a node still appears in the provider list after a
369 // previous attempt, so there might be more than 4 Destroy
// That is why a count far above the number of instances (4444) is released
// here.
371 go driver.ReleaseCloudOps(4444)
373 // Sync until all instances disappear from the provider list.
374 suite.wait(c, pool, notify, func() bool {
375 pool.getInstancesAndSync()
376 return len(pool.Instances()) == 0
// instancesByType collects the entries of pool.Instances() whose
// ArvadosInstanceType equals it.Name. The slice starts out nil, so it stays
// nil when nothing matches.
380 func (suite *PoolSuite) instancesByType(pool *Pool, it arvados.InstanceType) []InstanceView {
381 var ivs []InstanceView
382 for _, iv := range pool.Instances() {
383 if iv.ArvadosInstanceType == it.Name {
384 ivs = append(ivs, iv)
// wait is the helper the tests use to block until ready() holds. It sets up
// a one-second timeout channel and ends with c.Check(ready(), ...), which
// records a test failure (without aborting the test) if ready() is false at
// that point.
// NOTE(review): notify is presumably what wakes the helper to re-evaluate
// ready() -- confirm against the full function body.
390 func (suite *PoolSuite) wait(c *check.C, pool *Pool, notify <-chan struct{}, ready func() bool) {
391 timeout := time.NewTimer(time.Second).C
400 c.Check(ready(), check.Equals, true)