0f5c5ee196d2866269f2bb999292e8a9672c3e47
[arvados.git] / lib / dispatchcloud / worker / pool_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package worker
6
7 import (
8         "sort"
9         "strings"
10         "time"
11
12         "git.arvados.org/arvados.git/lib/cloud"
13         "git.arvados.org/arvados.git/lib/dispatchcloud/test"
14         "git.arvados.org/arvados.git/sdk/go/arvados"
15         "git.arvados.org/arvados.git/sdk/go/ctxlog"
16         "github.com/prometheus/client_golang/prometheus"
17         check "gopkg.in/check.v1"
18 )
19
20 const GiB arvados.ByteSize = 1 << 30
21
22 var _ = check.Suite(&PoolSuite{})
23
24 type lessChecker struct {
25         *check.CheckerInfo
26 }
27
28 func (*lessChecker) Check(params []interface{}, names []string) (result bool, error string) {
29         return params[0].(int) < params[1].(int), ""
30 }
31
32 var less = &lessChecker{&check.CheckerInfo{Name: "less", Params: []string{"obtained", "expected"}}}
33
// PoolSuite groups the worker pool tests. It carries no state; the
// tests and helpers are attached to it as methods.
type PoolSuite struct{}
35
36 func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
37         type1 := test.InstanceType(1)
38         type2 := test.InstanceType(2)
39         type3 := test.InstanceType(3)
40         waitForIdle := func(pool *Pool, notify <-chan struct{}) {
41                 timeout := time.NewTimer(time.Second)
42                 for {
43                         instances := pool.Instances()
44                         sort.Slice(instances, func(i, j int) bool {
45                                 return strings.Compare(instances[i].ArvadosInstanceType, instances[j].ArvadosInstanceType) < 0
46                         })
47                         if len(instances) == 3 &&
48                                 instances[0].ArvadosInstanceType == type1.Name &&
49                                 instances[0].WorkerState == StateIdle.String() &&
50                                 instances[1].ArvadosInstanceType == type1.Name &&
51                                 instances[1].WorkerState == StateIdle.String() &&
52                                 instances[2].ArvadosInstanceType == type2.Name &&
53                                 instances[2].WorkerState == StateIdle.String() {
54                                 return
55                         }
56                         select {
57                         case <-timeout.C:
58                                 c.Logf("pool.Instances() == %#v", instances)
59                                 c.Error("timed out")
60                                 return
61                         case <-notify:
62                         }
63                 }
64         }
65
66         logger := ctxlog.TestLogger(c)
67         driver := &test.StubDriver{}
68         instanceSetID := cloud.InstanceSetID("test-instance-set-id")
69         is, err := driver.InstanceSet(nil, instanceSetID, nil, logger)
70         c.Assert(err, check.IsNil)
71
72         newExecutor := func(cloud.Instance) Executor {
73                 return &stubExecutor{
74                         response: map[string]stubResp{
75                                 "crunch-run-custom --list": {},
76                                 "true":                     {},
77                         },
78                 }
79         }
80
81         cluster := &arvados.Cluster{
82                 Containers: arvados.ContainersConfig{
83                         CloudVMs: arvados.CloudVMsConfig{
84                                 BootProbeCommand:   "true",
85                                 MaxProbesPerSecond: 1000,
86                                 ProbeInterval:      arvados.Duration(time.Millisecond * 10),
87                                 SyncInterval:       arvados.Duration(time.Millisecond * 10),
88                                 TagKeyPrefix:       "testprefix:",
89                         },
90                         CrunchRunCommand: "crunch-run-custom",
91                 },
92                 InstanceTypes: arvados.InstanceTypeMap{
93                         type1.Name: type1,
94                         type2.Name: type2,
95                         type3.Name: type3,
96                 },
97         }
98
99         pool := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), instanceSetID, is, newExecutor, nil, cluster)
100         notify := pool.Subscribe()
101         defer pool.Unsubscribe(notify)
102         pool.Create(type1)
103         pool.Create(type1)
104         pool.Create(type2)
105         waitForIdle(pool, notify)
106         var heldInstanceID cloud.InstanceID
107         for _, inst := range pool.Instances() {
108                 if inst.ArvadosInstanceType == type2.Name {
109                         heldInstanceID = cloud.InstanceID(inst.Instance)
110                         pool.SetIdleBehavior(heldInstanceID, IdleBehaviorHold)
111                 }
112         }
113         // Wait for the tags to save to the cloud provider
114         tagKey := cluster.Containers.CloudVMs.TagKeyPrefix + tagKeyIdleBehavior
115         deadline := time.Now().Add(time.Second)
116         for !func() bool {
117                 pool.mtx.RLock()
118                 defer pool.mtx.RUnlock()
119                 for _, wkr := range pool.workers {
120                         if wkr.instType == type2 {
121                                 return wkr.instance.Tags()[tagKey] == string(IdleBehaviorHold)
122                         }
123                 }
124                 return false
125         }() {
126                 if time.Now().After(deadline) {
127                         c.Fatal("timeout")
128                 }
129                 time.Sleep(time.Millisecond * 10)
130         }
131         pool.Stop()
132
133         c.Log("------- starting new pool, waiting to recover state")
134
135         pool2 := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), instanceSetID, is, newExecutor, nil, cluster)
136         notify2 := pool2.Subscribe()
137         defer pool2.Unsubscribe(notify2)
138         waitForIdle(pool2, notify2)
139         for _, inst := range pool2.Instances() {
140                 if inst.ArvadosInstanceType == type2.Name {
141                         c.Check(inst.Instance, check.Equals, heldInstanceID)
142                         c.Check(inst.IdleBehavior, check.Equals, IdleBehaviorHold)
143                 } else {
144                         c.Check(inst.IdleBehavior, check.Equals, IdleBehaviorRun)
145                 }
146         }
147         pool2.Stop()
148 }
149
150 func (suite *PoolSuite) TestDrain(c *check.C) {
151         logger := ctxlog.TestLogger(c)
152         driver := test.StubDriver{}
153         instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, logger)
154         c.Assert(err, check.IsNil)
155
156         ac := arvados.NewClientFromEnv()
157
158         type1 := test.InstanceType(1)
159         pool := &Pool{
160                 arvClient:   ac,
161                 logger:      logger,
162                 newExecutor: func(cloud.Instance) Executor { return &stubExecutor{} },
163                 instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
164                 instanceTypes: arvados.InstanceTypeMap{
165                         type1.Name: type1,
166                 },
167         }
168         notify := pool.Subscribe()
169         defer pool.Unsubscribe(notify)
170
171         pool.Create(type1)
172
173         // Wait for the instance to either return from its Create
174         // call, or show up in a poll.
175         suite.wait(c, pool, notify, func() bool {
176                 pool.mtx.RLock()
177                 defer pool.mtx.RUnlock()
178                 return len(pool.workers) == 1
179         })
180
181         tests := []struct {
182                 state        State
183                 idleBehavior IdleBehavior
184                 result       bool
185         }{
186                 {StateIdle, IdleBehaviorHold, false},
187                 {StateIdle, IdleBehaviorDrain, false},
188                 {StateIdle, IdleBehaviorRun, true},
189         }
190
191         for _, test := range tests {
192                 for _, wkr := range pool.workers {
193                         wkr.state = test.state
194                         wkr.idleBehavior = test.idleBehavior
195                 }
196
197                 // Try to start a container
198                 started := pool.StartContainer(type1, arvados.Container{UUID: "testcontainer"})
199                 c.Check(started, check.Equals, test.result)
200         }
201 }
202
203 func (suite *PoolSuite) TestNodeCreateThrottle(c *check.C) {
204         logger := ctxlog.TestLogger(c)
205         driver := test.StubDriver{HoldCloudOps: true}
206         instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, logger)
207         c.Assert(err, check.IsNil)
208
209         type1 := test.InstanceType(1)
210         pool := &Pool{
211                 logger:                         logger,
212                 instanceSet:                    &throttledInstanceSet{InstanceSet: instanceSet},
213                 maxConcurrentInstanceCreateOps: 1,
214                 instanceTypes: arvados.InstanceTypeMap{
215                         type1.Name: type1,
216                 },
217         }
218
219         c.Check(pool.Unallocated()[type1], check.Equals, 0)
220         res := pool.Create(type1)
221         c.Check(pool.Unallocated()[type1], check.Equals, 1)
222         c.Check(res, check.Equals, true)
223
224         res = pool.Create(type1)
225         c.Check(pool.Unallocated()[type1], check.Equals, 1)
226         c.Check(res, check.Equals, false)
227
228         pool.instanceSet.throttleCreate.err = nil
229         pool.maxConcurrentInstanceCreateOps = 2
230
231         res = pool.Create(type1)
232         c.Check(pool.Unallocated()[type1], check.Equals, 2)
233         c.Check(res, check.Equals, true)
234
235         pool.instanceSet.throttleCreate.err = nil
236         pool.maxConcurrentInstanceCreateOps = 0
237
238         res = pool.Create(type1)
239         c.Check(pool.Unallocated()[type1], check.Equals, 3)
240         c.Check(res, check.Equals, true)
241 }
242
243 func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
244         logger := ctxlog.TestLogger(c)
245         driver := test.StubDriver{HoldCloudOps: true}
246         instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, logger)
247         c.Assert(err, check.IsNil)
248
249         type1 := arvados.InstanceType{Name: "a1s", ProviderType: "a1.small", VCPUs: 1, RAM: 1 * GiB, Price: .01}
250         type2 := arvados.InstanceType{Name: "a2m", ProviderType: "a2.medium", VCPUs: 2, RAM: 2 * GiB, Price: .02}
251         type3 := arvados.InstanceType{Name: "a2l", ProviderType: "a2.large", VCPUs: 4, RAM: 4 * GiB, Price: .04}
252         pool := &Pool{
253                 logger:      logger,
254                 newExecutor: func(cloud.Instance) Executor { return &stubExecutor{} },
255                 instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
256                 instanceTypes: arvados.InstanceTypeMap{
257                         type1.Name: type1,
258                         type2.Name: type2,
259                         type3.Name: type3,
260                 },
261         }
262         notify := pool.Subscribe()
263         defer pool.Unsubscribe(notify)
264         notify2 := pool.Subscribe()
265         defer pool.Unsubscribe(notify2)
266
267         c.Check(pool.Unallocated()[type1], check.Equals, 0)
268         c.Check(pool.Unallocated()[type2], check.Equals, 0)
269         c.Check(pool.Unallocated()[type3], check.Equals, 0)
270         pool.Create(type2)
271         pool.Create(type1)
272         pool.Create(type2)
273         pool.Create(type3)
274         c.Check(pool.Unallocated()[type1], check.Equals, 1)
275         c.Check(pool.Unallocated()[type2], check.Equals, 2)
276         c.Check(pool.Unallocated()[type3], check.Equals, 1)
277
278         // Unblock the pending Create calls.
279         go driver.ReleaseCloudOps(4)
280
281         // Wait for each instance to either return from its Create
282         // call, or show up in a poll.
283         suite.wait(c, pool, notify, func() bool {
284                 pool.mtx.RLock()
285                 defer pool.mtx.RUnlock()
286                 return len(pool.workers) == 4
287         })
288
289         // Place type3 node on admin-hold
290         ivs := suite.instancesByType(pool, type3)
291         c.Assert(ivs, check.HasLen, 1)
292         type3instanceID := ivs[0].Instance
293         err = pool.SetIdleBehavior(type3instanceID, IdleBehaviorHold)
294         c.Check(err, check.IsNil)
295
296         // Check admin-hold behavior: refuse to shutdown, and don't
297         // report as Unallocated ("available now or soon").
298         c.Check(pool.Shutdown(type3), check.Equals, false)
299         suite.wait(c, pool, notify, func() bool {
300                 return pool.Unallocated()[type3] == 0
301         })
302         c.Check(suite.instancesByType(pool, type3), check.HasLen, 1)
303
304         // Shutdown both type2 nodes
305         c.Check(pool.Shutdown(type2), check.Equals, true)
306         suite.wait(c, pool, notify, func() bool {
307                 return pool.Unallocated()[type1] == 1 && pool.Unallocated()[type2] == 1
308         })
309         c.Check(pool.Shutdown(type2), check.Equals, true)
310         suite.wait(c, pool, notify, func() bool {
311                 return pool.Unallocated()[type1] == 1 && pool.Unallocated()[type2] == 0
312         })
313         c.Check(pool.Shutdown(type2), check.Equals, false)
314         for {
315                 // Consume any waiting notifications to ensure the
316                 // next one we get is from Shutdown.
317                 select {
318                 case <-notify:
319                         continue
320                 default:
321                 }
322                 break
323         }
324
325         // Shutdown type1 node
326         c.Check(pool.Shutdown(type1), check.Equals, true)
327         suite.wait(c, pool, notify, func() bool {
328                 return pool.Unallocated()[type1] == 0 && pool.Unallocated()[type2] == 0 && pool.Unallocated()[type3] == 0
329         })
330         select {
331         case <-notify2:
332         case <-time.After(time.Second):
333                 c.Error("notify did not receive")
334         }
335
336         // Put type3 node back in service.
337         err = pool.SetIdleBehavior(type3instanceID, IdleBehaviorRun)
338         c.Check(err, check.IsNil)
339         suite.wait(c, pool, notify, func() bool {
340                 return pool.Unallocated()[type3] == 1
341         })
342
343         // Check admin-drain behavior: shut down right away, and don't
344         // report as Unallocated.
345         err = pool.SetIdleBehavior(type3instanceID, IdleBehaviorDrain)
346         c.Check(err, check.IsNil)
347         suite.wait(c, pool, notify, func() bool {
348                 return pool.Unallocated()[type3] == 0
349         })
350         suite.wait(c, pool, notify, func() bool {
351                 ivs := suite.instancesByType(pool, type3)
352                 return len(ivs) == 1 && ivs[0].WorkerState == StateShutdown.String()
353         })
354
355         // Unblock all pending Destroy calls. Pool calls Destroy again
356         // if a node still appears in the provider list after a
357         // previous attempt, so there might be more than 4 Destroy
358         // calls to unblock.
359         go driver.ReleaseCloudOps(4444)
360
361         // Sync until all instances disappear from the provider list.
362         suite.wait(c, pool, notify, func() bool {
363                 pool.getInstancesAndSync()
364                 return len(pool.Instances()) == 0
365         })
366 }
367
368 func (suite *PoolSuite) instancesByType(pool *Pool, it arvados.InstanceType) []InstanceView {
369         var ivs []InstanceView
370         for _, iv := range pool.Instances() {
371                 if iv.ArvadosInstanceType == it.Name {
372                         ivs = append(ivs, iv)
373                 }
374         }
375         return ivs
376 }
377
378 func (suite *PoolSuite) wait(c *check.C, pool *Pool, notify <-chan struct{}, ready func() bool) {
379         timeout := time.NewTimer(time.Second).C
380         for !ready() {
381                 select {
382                 case <-notify:
383                         continue
384                 case <-timeout:
385                 }
386                 break
387         }
388         c.Check(ready(), check.Equals, true)
389 }