1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
12 "git.curoverse.com/arvados.git/lib/cloud"
13 "git.curoverse.com/arvados.git/lib/dispatchcloud/test"
14 "git.curoverse.com/arvados.git/sdk/go/arvados"
15 "github.com/prometheus/client_golang/prometheus"
16 check "gopkg.in/check.v1"
19 const GiB arvados.ByteSize = 1 << 30
21 var _ = check.Suite(&PoolSuite{})
23 type lessChecker struct {
27 func (*lessChecker) Check(params []interface{}, names []string) (result bool, error string) {
28 return params[0].(int) < params[1].(int), ""
31 var less = &lessChecker{&check.CheckerInfo{Name: "less", Params: []string{"obtained", "expected"}}}
33 type PoolSuite struct{}
// TestResumeAfterRestart creates a Pool over a StubDriver InstanceSet,
// waits for it to report two idle type1 instances and one idle type2
// instance, and puts the type2 instance on IdleBehaviorHold.
//
// It then starts a second Pool over the same InstanceSet and checks that
// the recovered type2 instance keeps both its ID and its hold setting.
// The other instances are expected to report IdleBehaviorRun.
35 func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
36 type1 := test.InstanceType(1)
37 type2 := test.InstanceType(2)
38 type3 := test.InstanceType(3)
// waitForIdle waits, bounded by a one-second timer, until pool reports
// exactly three instances: two idle type1 and one idle type2.
// NOTE(review): the loop/select that consumes notify and the timer is on
// lines not visible in this view.
39 waitForIdle := func(pool *Pool, notify <-chan struct{}) {
40 timeout := time.NewTimer(time.Second)
42 instances := pool.Instances()
// Sort by Arvados instance type name so the positional checks below do
// not depend on the order in which pool.Instances() lists instances.
43 sort.Slice(instances, func(i, j int) bool {
44 return strings.Compare(instances[i].ArvadosInstanceType, instances[j].ArvadosInstanceType) < 0
46 if len(instances) == 3 &&
47 instances[0].ArvadosInstanceType == type1.Name &&
48 instances[0].WorkerState == StateIdle.String() &&
49 instances[1].ArvadosInstanceType == type1.Name &&
50 instances[1].WorkerState == StateIdle.String() &&
51 instances[2].ArvadosInstanceType == type2.Name &&
52 instances[2].WorkerState == StateIdle.String() {
// Dump the instance list for debugging (presumably when the timer fires
// before the pool reaches the expected state).
57 c.Logf("pool.Instances() == %#v", instances)
65 logger := test.Logger()
66 driver := &test.StubDriver{}
67 is, err := driver.InstanceSet(nil, "", logger)
68 c.Assert(err, check.IsNil)
// Stub executor shared by both pools; it answers "crunch-run --list" with
// an empty stubResp (other entries, if any, are not visible here).
70 newExecutor := func(cloud.Instance) Executor {
72 "crunch-run --list": stubResp{},
// Very short probe/sync intervals and a high probe rate, presumably so
// both pools converge well within waitForIdle's one-second limit.
77 cluster := &arvados.Cluster{
78 Dispatch: arvados.Dispatch{
79 MaxProbesPerSecond: 1000,
80 ProbeInterval: arvados.Duration(time.Millisecond * 10),
82 CloudVMs: arvados.CloudVMs{
83 BootProbeCommand: "true",
84 SyncInterval: arvados.Duration(time.Millisecond * 10),
86 InstanceTypes: arvados.InstanceTypeMap{
93 pool := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), is, newExecutor, cluster)
94 notify := pool.Subscribe()
95 defer pool.Unsubscribe(notify)
// NOTE(review): the calls that create the three instances are on lines
// not visible in this view.
99 waitForIdle(pool, notify)
// Remember the type2 instance's ID and put it on admin hold, so the second
// pool has a non-default idle behavior to recover.
100 var heldInstanceID cloud.InstanceID
101 for _, inst := range pool.Instances() {
102 if inst.ArvadosInstanceType == type2.Name {
103 heldInstanceID = cloud.InstanceID(inst.Instance)
// NOTE(review): the error returned by SetIdleBehavior is ignored here;
// consider checking it with c.Check(err, check.IsNil).
104 pool.SetIdleBehavior(heldInstanceID, IdleBehaviorHold)
109 c.Log("------- starting new pool, waiting to recover state")
// A second Pool over the same InstanceSet `is` must rediscover the
// existing instances rather than create new ones.
111 pool2 := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), is, newExecutor, cluster)
112 notify2 := pool2.Subscribe()
113 defer pool2.Unsubscribe(notify2)
114 waitForIdle(pool2, notify2)
115 for _, inst := range pool2.Instances() {
116 if inst.ArvadosInstanceType == type2.Name {
117 c.Check(inst.Instance, check.Equals, heldInstanceID)
118 c.Check(inst.IdleBehavior, check.Equals, IdleBehaviorHold)
// Every instance other than the held type2 one should be in normal service.
120 c.Check(inst.IdleBehavior, check.Equals, IdleBehaviorRun)
// TestCreateUnallocShutdown checks the Pool's Unallocated bookkeeping as
// instances are created, put on admin hold, shut down, returned to
// service, drained, and finally destroyed.
//
// The StubDriver is built with HoldCloudOps, so cloud operations stay
// pending until the test calls driver.ReleaseCloudOps.
126 func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
127 logger := test.Logger()
128 driver := test.StubDriver{HoldCloudOps: true}
129 instanceSet, err := driver.InstanceSet(nil, "", logger)
130 c.Assert(err, check.IsNil)
132 type1 := arvados.InstanceType{Name: "a1s", ProviderType: "a1.small", VCPUs: 1, RAM: 1 * GiB, Price: .01}
133 type2 := arvados.InstanceType{Name: "a2m", ProviderType: "a2.medium", VCPUs: 2, RAM: 2 * GiB, Price: .02}
134 type3 := arvados.InstanceType{Name: "a2l", ProviderType: "a2.large", VCPUs: 4, RAM: 4 * GiB, Price: .04}
// Fields of the Pool under test (the literal's opening line is not visible
// here). The executor is a no-op stub, and the instance set is wrapped in
// a throttledInstanceSet.
137 newExecutor: func(cloud.Instance) Executor { return stubExecutor{} },
138 instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
139 instanceTypes: arvados.InstanceTypeMap{
// Two independent subscribers. notify drives suite.wait; notify2 is
// presumably the channel drained and then checked near the type1 shutdown
// below.
145 notify := pool.Subscribe()
146 defer pool.Unsubscribe(notify)
147 notify2 := pool.Subscribe()
148 defer pool.Unsubscribe(notify2)
// Nothing exists yet, so nothing is unallocated.
150 c.Check(pool.Unallocated()[type1], check.Equals, 0)
151 c.Check(pool.Unallocated()[type2], check.Equals, 0)
152 c.Check(pool.Unallocated()[type3], check.Equals, 0)
// After requesting one type1, two type2 and one type3 instance (the Create
// calls are on lines not visible here), the still-pending creates already
// count as Unallocated, even though HoldCloudOps keeps them blocked.
157 c.Check(pool.Unallocated()[type1], check.Equals, 1)
158 c.Check(pool.Unallocated()[type2], check.Equals, 2)
159 c.Check(pool.Unallocated()[type3], check.Equals, 1)
161 // Unblock the pending Create calls.
// Run in a goroutine, presumably because ReleaseCloudOps blocks until the
// pool has consumed that many held operations.
162 go driver.ReleaseCloudOps(4)
164 // Wait for each instance to either return from its Create
165 // call, or show up in a poll.
166 suite.wait(c, pool, notify, func() bool {
// pool.workers is read under pool.mtx (read-locked on a line not visible
// here).
168 defer pool.mtx.RUnlock()
169 return len(pool.workers) == 4
172 // Place type3 node on admin-hold
173 ivs := suite.instancesByType(pool, type3)
174 c.Assert(ivs, check.HasLen, 1)
175 type3instanceID := ivs[0].Instance
176 err = pool.SetIdleBehavior(type3instanceID, IdleBehaviorHold)
177 c.Check(err, check.IsNil)
179 // Check admin-hold behavior: refuse to shutdown, and don't
180 // report as Unallocated ("available now or soon").
181 c.Check(pool.Shutdown(type3), check.Equals, false)
182 suite.wait(c, pool, notify, func() bool {
183 return pool.Unallocated()[type3] == 0
185 c.Check(suite.instancesByType(pool, type3), check.HasLen, 1)
187 // Shutdown both type2 nodes
// Each successful Shutdown removes one type2 instance from Unallocated.
// A third attempt returns false because no type2 instance is left to shut
// down. type1 stays at 1 throughout.
188 c.Check(pool.Shutdown(type2), check.Equals, true)
189 suite.wait(c, pool, notify, func() bool {
190 return pool.Unallocated()[type1] == 1 && pool.Unallocated()[type2] == 1
192 c.Check(pool.Shutdown(type2), check.Equals, true)
193 suite.wait(c, pool, notify, func() bool {
194 return pool.Unallocated()[type1] == 1 && pool.Unallocated()[type2] == 0
196 c.Check(pool.Shutdown(type2), check.Equals, false)
198 // Consume any waiting notifications to ensure the
199 // next one we get is from Shutdown.
208 // Shutdown type1 node
209 c.Check(pool.Shutdown(type1), check.Equals, true)
210 suite.wait(c, pool, notify, func() bool {
211 return pool.Unallocated()[type1] == 0 && pool.Unallocated()[type2] == 0 && pool.Unallocated()[type3] == 0
// Fail (without aborting the test) if no notification arrives within one
// second of the type1 Shutdown.
215 case <-time.After(time.Second):
216 c.Error("notify did not receive")
219 // Put type3 node back in service.
220 err = pool.SetIdleBehavior(type3instanceID, IdleBehaviorRun)
221 c.Check(err, check.IsNil)
222 suite.wait(c, pool, notify, func() bool {
223 return pool.Unallocated()[type3] == 1
226 // Check admin-drain behavior: shut down right away, and don't
227 // report as Unallocated.
228 err = pool.SetIdleBehavior(type3instanceID, IdleBehaviorDrain)
229 c.Check(err, check.IsNil)
230 suite.wait(c, pool, notify, func() bool {
231 return pool.Unallocated()[type3] == 0
// The drained instance should move to StateShutdown on its own, with no
// explicit Shutdown call.
233 suite.wait(c, pool, notify, func() bool {
234 ivs := suite.instancesByType(pool, type3)
235 return len(ivs) == 1 && ivs[0].WorkerState == StateShutdown.String()
238 // Unblock all pending Destroy calls. Pool calls Destroy again
239 // if a node still appears in the provider list after a
240 // previous attempt, so there might be more than 4 Destroy
// calls to unblock; 4444 is simply a generous upper bound.
242 go driver.ReleaseCloudOps(4444)
244 // Sync until all instances disappear from the provider list.
// Force a sync with the (stub) provider on every check rather than waiting
// for the pool's own sync schedule.
245 suite.wait(c, pool, notify, func() bool {
246 pool.getInstancesAndSync()
247 return len(pool.Instances()) == 0
251 func (suite *PoolSuite) instancesByType(pool *Pool, it arvados.InstanceType) []InstanceView {
252 var ivs []InstanceView
253 for _, iv := range pool.Instances() {
254 if iv.ArvadosInstanceType == it.Name {
255 ivs = append(ivs, iv)
// wait gives ready() up to one second to become true, then asserts that
// it is true.
// NOTE(review): the loop that re-evaluates ready(), presumably each time
// notify delivers until the timer fires, is on lines not visible in this
// view. The pool argument is not referenced on the visible lines.
261 func (suite *PoolSuite) wait(c *check.C, pool *Pool, notify <-chan struct{}, ready func() bool) {
// One-shot deadline covering the whole wait, not each individual
// notification.
262 timeout := time.NewTimer(time.Second).C
// Final assertion. c.Check records a failure but lets the test continue.
271 c.Check(ready(), check.Equals, true)