1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
12 "git.arvados.org/arvados.git/lib/cloud"
13 "git.arvados.org/arvados.git/lib/dispatchcloud/test"
14 "git.arvados.org/arvados.git/sdk/go/arvados"
15 "git.arvados.org/arvados.git/sdk/go/ctxlog"
16 "github.com/prometheus/client_golang/prometheus"
17 check "gopkg.in/check.v1"
20 const GiB arvados.ByteSize = 1 << 30 // one gibibyte; used to express InstanceType RAM sizes in these tests
22 var _ = check.Suite(&PoolSuite{}) // register PoolSuite with gocheck so its Test* methods run
24 type lessChecker struct {
28 func (*lessChecker) Check(params []interface{}, names []string) (result bool, error string) {
29 return params[0].(int) < params[1].(int), ""
32 var less = &lessChecker{&check.CheckerInfo{Name: "less", Params: []string{"obtained", "expected"}}} // gocheck Checker asserting obtained < expected (both values must be int)
34 type PoolSuite struct{} // gocheck suite for worker Pool tests; carries no per-suite state
36 func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
37 type1 := test.InstanceType(1)
38 type2 := test.InstanceType(2)
39 type3 := test.InstanceType(3)
40 waitForIdle := func(pool *Pool, notify <-chan struct{}) {
41 timeout := time.NewTimer(time.Second)
43 instances := pool.Instances()
44 sort.Slice(instances, func(i, j int) bool {
45 return strings.Compare(instances[i].ArvadosInstanceType, instances[j].ArvadosInstanceType) < 0
47 if len(instances) == 3 &&
48 instances[0].ArvadosInstanceType == type1.Name &&
49 instances[0].WorkerState == StateIdle.String() &&
50 instances[1].ArvadosInstanceType == type1.Name &&
51 instances[1].WorkerState == StateIdle.String() &&
52 instances[2].ArvadosInstanceType == type2.Name &&
53 instances[2].WorkerState == StateIdle.String() {
58 c.Logf("pool.Instances() == %#v", instances)
66 logger := ctxlog.TestLogger(c)
67 driver := &test.StubDriver{}
68 instanceSetID := cloud.InstanceSetID("test-instance-set-id")
69 is, err := driver.InstanceSet(nil, instanceSetID, nil, logger)
70 c.Assert(err, check.IsNil)
72 newExecutor := func(cloud.Instance) Executor {
74 response: map[string]stubResp{
75 "crunch-run-custom --list": {},
81 cluster := &arvados.Cluster{
82 Containers: arvados.ContainersConfig{
83 CloudVMs: arvados.CloudVMsConfig{
84 BootProbeCommand: "true",
85 MaxProbesPerSecond: 1000,
86 ProbeInterval: arvados.Duration(time.Millisecond * 10),
87 SyncInterval: arvados.Duration(time.Millisecond * 10),
88 TagKeyPrefix: "testprefix:",
90 CrunchRunCommand: "crunch-run-custom",
92 InstanceTypes: arvados.InstanceTypeMap{
99 pool := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), instanceSetID, is, newExecutor, nil, cluster)
100 notify := pool.Subscribe()
101 defer pool.Unsubscribe(notify)
105 waitForIdle(pool, notify)
106 var heldInstanceID cloud.InstanceID
107 for _, inst := range pool.Instances() {
108 if inst.ArvadosInstanceType == type2.Name {
109 heldInstanceID = cloud.InstanceID(inst.Instance)
110 pool.SetIdleBehavior(heldInstanceID, IdleBehaviorHold)
113 // Wait for the tags to save to the cloud provider
114 tagKey := cluster.Containers.CloudVMs.TagKeyPrefix + tagKeyIdleBehavior
115 deadline := time.Now().Add(time.Second)
118 defer pool.mtx.RUnlock()
119 for _, wkr := range pool.workers {
120 if wkr.instType == type2 {
121 return wkr.instance.Tags()[tagKey] == string(IdleBehaviorHold)
126 if time.Now().After(deadline) {
129 time.Sleep(time.Millisecond * 10)
133 c.Log("------- starting new pool, waiting to recover state")
135 pool2 := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), instanceSetID, is, newExecutor, nil, cluster)
136 notify2 := pool2.Subscribe()
137 defer pool2.Unsubscribe(notify2)
138 waitForIdle(pool2, notify2)
139 for _, inst := range pool2.Instances() {
140 if inst.ArvadosInstanceType == type2.Name {
141 c.Check(inst.Instance, check.Equals, heldInstanceID)
142 c.Check(inst.IdleBehavior, check.Equals, IdleBehaviorHold)
144 c.Check(inst.IdleBehavior, check.Equals, IdleBehaviorRun)
150 func (suite *PoolSuite) TestDrain(c *check.C) {
151 logger := ctxlog.TestLogger(c)
152 driver := test.StubDriver{}
153 instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, logger)
154 c.Assert(err, check.IsNil)
156 ac := arvados.NewClientFromEnv()
158 type1 := test.InstanceType(1)
162 newExecutor: func(cloud.Instance) Executor { return &stubExecutor{} },
163 instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
164 instanceTypes: arvados.InstanceTypeMap{
168 notify := pool.Subscribe()
169 defer pool.Unsubscribe(notify)
173 // Wait for the instance to either return from its Create
174 // call, or show up in a poll.
175 suite.wait(c, pool, notify, func() bool {
177 defer pool.mtx.RUnlock()
178 return len(pool.workers) == 1
183 idleBehavior IdleBehavior
186 {StateIdle, IdleBehaviorHold, false},
187 {StateIdle, IdleBehaviorDrain, false},
188 {StateIdle, IdleBehaviorRun, true},
191 for _, test := range tests {
192 for _, wkr := range pool.workers {
193 wkr.state = test.state
194 wkr.idleBehavior = test.idleBehavior
197 // Try to start a container
198 started := pool.StartContainer(type1, arvados.Container{UUID: "testcontainer"})
199 c.Check(started, check.Equals, test.result)
203 func (suite *PoolSuite) TestNodeCreateThrottle(c *check.C) {
204 logger := ctxlog.TestLogger(c)
205 driver := test.StubDriver{HoldCloudOps: true}
206 instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, logger)
207 c.Assert(err, check.IsNil)
209 type1 := test.InstanceType(1)
212 instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
213 maxConcurrentInstanceCreateOps: 1,
214 instanceTypes: arvados.InstanceTypeMap{
219 c.Check(pool.Unallocated()[type1], check.Equals, 0)
220 res := pool.Create(type1)
221 c.Check(pool.Unallocated()[type1], check.Equals, 1)
222 c.Check(res, check.Equals, true)
224 res = pool.Create(type1)
225 c.Check(pool.Unallocated()[type1], check.Equals, 1)
226 c.Check(res, check.Equals, false)
228 pool.instanceSet.throttleCreate.err = nil
229 pool.maxConcurrentInstanceCreateOps = 2
231 res = pool.Create(type1)
232 c.Check(pool.Unallocated()[type1], check.Equals, 2)
233 c.Check(res, check.Equals, true)
235 pool.instanceSet.throttleCreate.err = nil
236 pool.maxConcurrentInstanceCreateOps = 0
238 res = pool.Create(type1)
239 c.Check(pool.Unallocated()[type1], check.Equals, 3)
240 c.Check(res, check.Equals, true)
243 func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
244 logger := ctxlog.TestLogger(c)
245 driver := test.StubDriver{HoldCloudOps: true}
246 instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, logger)
247 c.Assert(err, check.IsNil)
249 type1 := arvados.InstanceType{Name: "a1s", ProviderType: "a1.small", VCPUs: 1, RAM: 1 * GiB, Price: .01}
250 type2 := arvados.InstanceType{Name: "a2m", ProviderType: "a2.medium", VCPUs: 2, RAM: 2 * GiB, Price: .02}
251 type3 := arvados.InstanceType{Name: "a2l", ProviderType: "a2.large", VCPUs: 4, RAM: 4 * GiB, Price: .04}
254 newExecutor: func(cloud.Instance) Executor { return &stubExecutor{} },
255 instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
256 instanceTypes: arvados.InstanceTypeMap{
262 notify := pool.Subscribe()
263 defer pool.Unsubscribe(notify)
264 notify2 := pool.Subscribe()
265 defer pool.Unsubscribe(notify2)
267 c.Check(pool.Unallocated()[type1], check.Equals, 0)
268 c.Check(pool.Unallocated()[type2], check.Equals, 0)
269 c.Check(pool.Unallocated()[type3], check.Equals, 0)
274 c.Check(pool.Unallocated()[type1], check.Equals, 1)
275 c.Check(pool.Unallocated()[type2], check.Equals, 2)
276 c.Check(pool.Unallocated()[type3], check.Equals, 1)
278 // Unblock the pending Create calls.
279 go driver.ReleaseCloudOps(4)
281 // Wait for each instance to either return from its Create
282 // call, or show up in a poll.
283 suite.wait(c, pool, notify, func() bool {
285 defer pool.mtx.RUnlock()
286 return len(pool.workers) == 4
289 // Place type3 node on admin-hold
290 ivs := suite.instancesByType(pool, type3)
291 c.Assert(ivs, check.HasLen, 1)
292 type3instanceID := ivs[0].Instance
293 err = pool.SetIdleBehavior(type3instanceID, IdleBehaviorHold)
294 c.Check(err, check.IsNil)
296 // Check admin-hold behavior: refuse to shutdown, and don't
297 // report as Unallocated ("available now or soon").
298 c.Check(pool.Shutdown(type3), check.Equals, false)
299 suite.wait(c, pool, notify, func() bool {
300 return pool.Unallocated()[type3] == 0
302 c.Check(suite.instancesByType(pool, type3), check.HasLen, 1)
304 // Shutdown both type2 nodes
305 c.Check(pool.Shutdown(type2), check.Equals, true)
306 suite.wait(c, pool, notify, func() bool {
307 return pool.Unallocated()[type1] == 1 && pool.Unallocated()[type2] == 1
309 c.Check(pool.Shutdown(type2), check.Equals, true)
310 suite.wait(c, pool, notify, func() bool {
311 return pool.Unallocated()[type1] == 1 && pool.Unallocated()[type2] == 0
313 c.Check(pool.Shutdown(type2), check.Equals, false)
315 // Consume any waiting notifications to ensure the
316 // next one we get is from Shutdown.
325 // Shutdown type1 node
326 c.Check(pool.Shutdown(type1), check.Equals, true)
327 suite.wait(c, pool, notify, func() bool {
328 return pool.Unallocated()[type1] == 0 && pool.Unallocated()[type2] == 0 && pool.Unallocated()[type3] == 0
332 case <-time.After(time.Second):
333 c.Error("notify did not receive")
336 // Put type3 node back in service.
337 err = pool.SetIdleBehavior(type3instanceID, IdleBehaviorRun)
338 c.Check(err, check.IsNil)
339 suite.wait(c, pool, notify, func() bool {
340 return pool.Unallocated()[type3] == 1
343 // Check admin-drain behavior: shut down right away, and don't
344 // report as Unallocated.
345 err = pool.SetIdleBehavior(type3instanceID, IdleBehaviorDrain)
346 c.Check(err, check.IsNil)
347 suite.wait(c, pool, notify, func() bool {
348 return pool.Unallocated()[type3] == 0
350 suite.wait(c, pool, notify, func() bool {
351 ivs := suite.instancesByType(pool, type3)
352 return len(ivs) == 1 && ivs[0].WorkerState == StateShutdown.String()
355 // Unblock all pending Destroy calls. Pool calls Destroy again
356 // if a node still appears in the provider list after a
357 // previous attempt, so there might be more than 4 Destroy
359 go driver.ReleaseCloudOps(4444)
361 // Sync until all instances disappear from the provider list.
362 suite.wait(c, pool, notify, func() bool {
363 pool.getInstancesAndSync()
364 return len(pool.Instances()) == 0
368 func (suite *PoolSuite) instancesByType(pool *Pool, it arvados.InstanceType) []InstanceView {
369 var ivs []InstanceView
370 for _, iv := range pool.Instances() {
371 if iv.ArvadosInstanceType == it.Name {
372 ivs = append(ivs, iv)
378 func (suite *PoolSuite) wait(c *check.C, pool *Pool, notify <-chan struct{}, ready func() bool) {
379 timeout := time.NewTimer(time.Second).C
388 c.Check(ready(), check.Equals, true)