Add 'sdk/java-v2/' from commit '55f103e336ca9fb8bf1720d2ef4ee8dd4e221118'
[arvados.git] / lib / dispatchcloud / worker / pool_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package worker
6
7 import (
8         "sort"
9         "strings"
10         "time"
11
12         "git.curoverse.com/arvados.git/lib/cloud"
13         "git.curoverse.com/arvados.git/lib/dispatchcloud/test"
14         "git.curoverse.com/arvados.git/sdk/go/arvados"
15         "git.curoverse.com/arvados.git/sdk/go/ctxlog"
16         "github.com/prometheus/client_golang/prometheus"
17         check "gopkg.in/check.v1"
18 )
19
20 const GiB arvados.ByteSize = 1 << 30
21
22 var _ = check.Suite(&PoolSuite{})
23
24 type lessChecker struct {
25         *check.CheckerInfo
26 }
27
28 func (*lessChecker) Check(params []interface{}, names []string) (result bool, error string) {
29         return params[0].(int) < params[1].(int), ""
30 }
31
32 var less = &lessChecker{&check.CheckerInfo{Name: "less", Params: []string{"obtained", "expected"}}}
33
// PoolSuite is the gocheck suite holding the Pool tests in this
// file. It carries no state of its own.
type PoolSuite struct{}
35
36 func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
37         type1 := test.InstanceType(1)
38         type2 := test.InstanceType(2)
39         type3 := test.InstanceType(3)
40         waitForIdle := func(pool *Pool, notify <-chan struct{}) {
41                 timeout := time.NewTimer(time.Second)
42                 for {
43                         instances := pool.Instances()
44                         sort.Slice(instances, func(i, j int) bool {
45                                 return strings.Compare(instances[i].ArvadosInstanceType, instances[j].ArvadosInstanceType) < 0
46                         })
47                         if len(instances) == 3 &&
48                                 instances[0].ArvadosInstanceType == type1.Name &&
49                                 instances[0].WorkerState == StateIdle.String() &&
50                                 instances[1].ArvadosInstanceType == type1.Name &&
51                                 instances[1].WorkerState == StateIdle.String() &&
52                                 instances[2].ArvadosInstanceType == type2.Name &&
53                                 instances[2].WorkerState == StateIdle.String() {
54                                 return
55                         }
56                         select {
57                         case <-timeout.C:
58                                 c.Logf("pool.Instances() == %#v", instances)
59                                 c.Error("timed out")
60                                 return
61                         case <-notify:
62                         }
63                 }
64         }
65
66         logger := ctxlog.TestLogger(c)
67         driver := &test.StubDriver{}
68         is, err := driver.InstanceSet(nil, "", logger)
69         c.Assert(err, check.IsNil)
70
71         newExecutor := func(cloud.Instance) Executor {
72                 return stubExecutor{
73                         "crunch-run --list": stubResp{},
74                         "true":              stubResp{},
75                 }
76         }
77
78         cluster := &arvados.Cluster{
79                 Dispatch: arvados.Dispatch{
80                         MaxProbesPerSecond: 1000,
81                         ProbeInterval:      arvados.Duration(time.Millisecond * 10),
82                 },
83                 CloudVMs: arvados.CloudVMs{
84                         BootProbeCommand: "true",
85                         SyncInterval:     arvados.Duration(time.Millisecond * 10),
86                 },
87                 InstanceTypes: arvados.InstanceTypeMap{
88                         type1.Name: type1,
89                         type2.Name: type2,
90                         type3.Name: type3,
91                 },
92         }
93
94         pool := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), is, newExecutor, nil, cluster)
95         notify := pool.Subscribe()
96         defer pool.Unsubscribe(notify)
97         pool.Create(type1)
98         pool.Create(type1)
99         pool.Create(type2)
100         waitForIdle(pool, notify)
101         var heldInstanceID cloud.InstanceID
102         for _, inst := range pool.Instances() {
103                 if inst.ArvadosInstanceType == type2.Name {
104                         heldInstanceID = cloud.InstanceID(inst.Instance)
105                         pool.SetIdleBehavior(heldInstanceID, IdleBehaviorHold)
106                 }
107         }
108         // Wait for the tags to save to the cloud provider
109         deadline := time.Now().Add(time.Second)
110         for !func() bool {
111                 pool.mtx.RLock()
112                 defer pool.mtx.RUnlock()
113                 for _, wkr := range pool.workers {
114                         if wkr.instType == type2 {
115                                 return wkr.instance.Tags()[tagKeyIdleBehavior] == string(IdleBehaviorHold)
116                         }
117                 }
118                 return false
119         }() {
120                 if time.Now().After(deadline) {
121                         c.Fatal("timeout")
122                 }
123                 time.Sleep(time.Millisecond * 10)
124         }
125         pool.Stop()
126
127         c.Log("------- starting new pool, waiting to recover state")
128
129         pool2 := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), is, newExecutor, nil, cluster)
130         notify2 := pool2.Subscribe()
131         defer pool2.Unsubscribe(notify2)
132         waitForIdle(pool2, notify2)
133         for _, inst := range pool2.Instances() {
134                 if inst.ArvadosInstanceType == type2.Name {
135                         c.Check(inst.Instance, check.Equals, heldInstanceID)
136                         c.Check(inst.IdleBehavior, check.Equals, IdleBehaviorHold)
137                 } else {
138                         c.Check(inst.IdleBehavior, check.Equals, IdleBehaviorRun)
139                 }
140         }
141         pool2.Stop()
142 }
143
144 func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
145         logger := ctxlog.TestLogger(c)
146         driver := test.StubDriver{HoldCloudOps: true}
147         instanceSet, err := driver.InstanceSet(nil, "", logger)
148         c.Assert(err, check.IsNil)
149
150         type1 := arvados.InstanceType{Name: "a1s", ProviderType: "a1.small", VCPUs: 1, RAM: 1 * GiB, Price: .01}
151         type2 := arvados.InstanceType{Name: "a2m", ProviderType: "a2.medium", VCPUs: 2, RAM: 2 * GiB, Price: .02}
152         type3 := arvados.InstanceType{Name: "a2l", ProviderType: "a2.large", VCPUs: 4, RAM: 4 * GiB, Price: .04}
153         pool := &Pool{
154                 logger:      logger,
155                 newExecutor: func(cloud.Instance) Executor { return stubExecutor{} },
156                 instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
157                 instanceTypes: arvados.InstanceTypeMap{
158                         type1.Name: type1,
159                         type2.Name: type2,
160                         type3.Name: type3,
161                 },
162         }
163         notify := pool.Subscribe()
164         defer pool.Unsubscribe(notify)
165         notify2 := pool.Subscribe()
166         defer pool.Unsubscribe(notify2)
167
168         c.Check(pool.Unallocated()[type1], check.Equals, 0)
169         c.Check(pool.Unallocated()[type2], check.Equals, 0)
170         c.Check(pool.Unallocated()[type3], check.Equals, 0)
171         pool.Create(type2)
172         pool.Create(type1)
173         pool.Create(type2)
174         pool.Create(type3)
175         c.Check(pool.Unallocated()[type1], check.Equals, 1)
176         c.Check(pool.Unallocated()[type2], check.Equals, 2)
177         c.Check(pool.Unallocated()[type3], check.Equals, 1)
178
179         // Unblock the pending Create calls.
180         go driver.ReleaseCloudOps(4)
181
182         // Wait for each instance to either return from its Create
183         // call, or show up in a poll.
184         suite.wait(c, pool, notify, func() bool {
185                 pool.mtx.RLock()
186                 defer pool.mtx.RUnlock()
187                 return len(pool.workers) == 4
188         })
189
190         // Place type3 node on admin-hold
191         ivs := suite.instancesByType(pool, type3)
192         c.Assert(ivs, check.HasLen, 1)
193         type3instanceID := ivs[0].Instance
194         err = pool.SetIdleBehavior(type3instanceID, IdleBehaviorHold)
195         c.Check(err, check.IsNil)
196
197         // Check admin-hold behavior: refuse to shutdown, and don't
198         // report as Unallocated ("available now or soon").
199         c.Check(pool.Shutdown(type3), check.Equals, false)
200         suite.wait(c, pool, notify, func() bool {
201                 return pool.Unallocated()[type3] == 0
202         })
203         c.Check(suite.instancesByType(pool, type3), check.HasLen, 1)
204
205         // Shutdown both type2 nodes
206         c.Check(pool.Shutdown(type2), check.Equals, true)
207         suite.wait(c, pool, notify, func() bool {
208                 return pool.Unallocated()[type1] == 1 && pool.Unallocated()[type2] == 1
209         })
210         c.Check(pool.Shutdown(type2), check.Equals, true)
211         suite.wait(c, pool, notify, func() bool {
212                 return pool.Unallocated()[type1] == 1 && pool.Unallocated()[type2] == 0
213         })
214         c.Check(pool.Shutdown(type2), check.Equals, false)
215         for {
216                 // Consume any waiting notifications to ensure the
217                 // next one we get is from Shutdown.
218                 select {
219                 case <-notify:
220                         continue
221                 default:
222                 }
223                 break
224         }
225
226         // Shutdown type1 node
227         c.Check(pool.Shutdown(type1), check.Equals, true)
228         suite.wait(c, pool, notify, func() bool {
229                 return pool.Unallocated()[type1] == 0 && pool.Unallocated()[type2] == 0 && pool.Unallocated()[type3] == 0
230         })
231         select {
232         case <-notify2:
233         case <-time.After(time.Second):
234                 c.Error("notify did not receive")
235         }
236
237         // Put type3 node back in service.
238         err = pool.SetIdleBehavior(type3instanceID, IdleBehaviorRun)
239         c.Check(err, check.IsNil)
240         suite.wait(c, pool, notify, func() bool {
241                 return pool.Unallocated()[type3] == 1
242         })
243
244         // Check admin-drain behavior: shut down right away, and don't
245         // report as Unallocated.
246         err = pool.SetIdleBehavior(type3instanceID, IdleBehaviorDrain)
247         c.Check(err, check.IsNil)
248         suite.wait(c, pool, notify, func() bool {
249                 return pool.Unallocated()[type3] == 0
250         })
251         suite.wait(c, pool, notify, func() bool {
252                 ivs := suite.instancesByType(pool, type3)
253                 return len(ivs) == 1 && ivs[0].WorkerState == StateShutdown.String()
254         })
255
256         // Unblock all pending Destroy calls. Pool calls Destroy again
257         // if a node still appears in the provider list after a
258         // previous attempt, so there might be more than 4 Destroy
259         // calls to unblock.
260         go driver.ReleaseCloudOps(4444)
261
262         // Sync until all instances disappear from the provider list.
263         suite.wait(c, pool, notify, func() bool {
264                 pool.getInstancesAndSync()
265                 return len(pool.Instances()) == 0
266         })
267 }
268
269 func (suite *PoolSuite) instancesByType(pool *Pool, it arvados.InstanceType) []InstanceView {
270         var ivs []InstanceView
271         for _, iv := range pool.Instances() {
272                 if iv.ArvadosInstanceType == it.Name {
273                         ivs = append(ivs, iv)
274                 }
275         }
276         return ivs
277 }
278
279 func (suite *PoolSuite) wait(c *check.C, pool *Pool, notify <-chan struct{}, ready func() bool) {
280         timeout := time.NewTimer(time.Second).C
281         for !ready() {
282                 select {
283                 case <-notify:
284                         continue
285                 case <-timeout:
286                 }
287                 break
288         }
289         c.Check(ready(), check.Equals, true)
290 }