Merge branch '20235-probe-after-upgrade'
[arvados.git] / lib / dispatchcloud / worker / pool_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package worker
6
7 import (
8         "sort"
9         "strings"
10         "time"
11
12         "git.arvados.org/arvados.git/lib/cloud"
13         "git.arvados.org/arvados.git/lib/config"
14         "git.arvados.org/arvados.git/lib/dispatchcloud/test"
15         "git.arvados.org/arvados.git/sdk/go/arvados"
16         "git.arvados.org/arvados.git/sdk/go/ctxlog"
17         "github.com/prometheus/client_golang/prometheus"
18         "github.com/sirupsen/logrus"
19         check "gopkg.in/check.v1"
20 )
21
22 const GiB arvados.ByteSize = 1 << 30
23
24 var _ = check.Suite(&PoolSuite{})
25
26 type lessChecker struct {
27         *check.CheckerInfo
28 }
29
30 func (*lessChecker) Check(params []interface{}, names []string) (result bool, error string) {
31         return params[0].(int) < params[1].(int), ""
32 }
33
34 var less = &lessChecker{&check.CheckerInfo{Name: "less", Params: []string{"obtained", "expected"}}}
35
36 type PoolSuite struct {
37         logger      logrus.FieldLogger
38         testCluster *arvados.Cluster
39 }
40
41 func (suite *PoolSuite) SetUpTest(c *check.C) {
42         suite.logger = ctxlog.TestLogger(c)
43         cfg, err := config.NewLoader(nil, suite.logger).Load()
44         c.Assert(err, check.IsNil)
45         suite.testCluster, err = cfg.GetCluster("")
46         c.Assert(err, check.IsNil)
47 }
48
49 func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
50         type1 := test.InstanceType(1)
51         type2 := test.InstanceType(2)
52         type3 := test.InstanceType(3)
53         waitForIdle := func(pool *Pool, notify <-chan struct{}) {
54                 timeout := time.NewTimer(time.Second)
55                 for {
56                         instances := pool.Instances()
57                         sort.Slice(instances, func(i, j int) bool {
58                                 return strings.Compare(instances[i].ArvadosInstanceType, instances[j].ArvadosInstanceType) < 0
59                         })
60                         if len(instances) == 3 &&
61                                 instances[0].ArvadosInstanceType == type1.Name &&
62                                 instances[0].WorkerState == StateIdle.String() &&
63                                 instances[1].ArvadosInstanceType == type1.Name &&
64                                 instances[1].WorkerState == StateIdle.String() &&
65                                 instances[2].ArvadosInstanceType == type2.Name &&
66                                 instances[2].WorkerState == StateIdle.String() {
67                                 return
68                         }
69                         select {
70                         case <-timeout.C:
71                                 c.Logf("pool.Instances() == %#v", instances)
72                                 c.Error("timed out")
73                                 return
74                         case <-notify:
75                         }
76                 }
77         }
78
79         driver := &test.StubDriver{}
80         instanceSetID := cloud.InstanceSetID("test-instance-set-id")
81         is, err := driver.InstanceSet(nil, instanceSetID, nil, suite.logger)
82         c.Assert(err, check.IsNil)
83
84         newExecutor := func(cloud.Instance) Executor {
85                 return &stubExecutor{
86                         response: map[string]stubResp{
87                                 "crunch-run-custom --list": {},
88                                 "true":                     {},
89                         },
90                 }
91         }
92
93         suite.testCluster.Containers.CloudVMs = arvados.CloudVMsConfig{
94                 BootProbeCommand:   "true",
95                 MaxProbesPerSecond: 1000,
96                 ProbeInterval:      arvados.Duration(time.Millisecond * 10),
97                 SyncInterval:       arvados.Duration(time.Millisecond * 10),
98                 TagKeyPrefix:       "testprefix:",
99         }
100         suite.testCluster.Containers.CrunchRunCommand = "crunch-run-custom"
101         suite.testCluster.InstanceTypes = arvados.InstanceTypeMap{
102                 type1.Name: type1,
103                 type2.Name: type2,
104                 type3.Name: type3,
105         }
106
107         pool := NewPool(suite.logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), instanceSetID, is, newExecutor, nil, suite.testCluster)
108         notify := pool.Subscribe()
109         defer pool.Unsubscribe(notify)
110         pool.Create(type1)
111         pool.Create(type1)
112         pool.Create(type2)
113         waitForIdle(pool, notify)
114         var heldInstanceID cloud.InstanceID
115         for _, inst := range pool.Instances() {
116                 if inst.ArvadosInstanceType == type2.Name {
117                         heldInstanceID = cloud.InstanceID(inst.Instance)
118                         pool.SetIdleBehavior(heldInstanceID, IdleBehaviorHold)
119                 }
120         }
121         // Wait for the tags to save to the cloud provider
122         tagKey := suite.testCluster.Containers.CloudVMs.TagKeyPrefix + tagKeyIdleBehavior
123         deadline := time.Now().Add(time.Second)
124         for !func() bool {
125                 pool.mtx.RLock()
126                 defer pool.mtx.RUnlock()
127                 for _, wkr := range pool.workers {
128                         if wkr.instType == type2 {
129                                 return wkr.instance.Tags()[tagKey] == string(IdleBehaviorHold)
130                         }
131                 }
132                 return false
133         }() {
134                 if time.Now().After(deadline) {
135                         c.Fatal("timeout")
136                 }
137                 time.Sleep(time.Millisecond * 10)
138         }
139         pool.Stop()
140
141         c.Log("------- starting new pool, waiting to recover state")
142
143         pool2 := NewPool(suite.logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), instanceSetID, is, newExecutor, nil, suite.testCluster)
144         notify2 := pool2.Subscribe()
145         defer pool2.Unsubscribe(notify2)
146         waitForIdle(pool2, notify2)
147         for _, inst := range pool2.Instances() {
148                 if inst.ArvadosInstanceType == type2.Name {
149                         c.Check(inst.Instance, check.Equals, heldInstanceID)
150                         c.Check(inst.IdleBehavior, check.Equals, IdleBehaviorHold)
151                 } else {
152                         c.Check(inst.IdleBehavior, check.Equals, IdleBehaviorRun)
153                 }
154         }
155         pool2.Stop()
156 }
157
158 func (suite *PoolSuite) TestDrain(c *check.C) {
159         driver := test.StubDriver{}
160         instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, suite.logger)
161         c.Assert(err, check.IsNil)
162
163         ac := arvados.NewClientFromEnv()
164
165         type1 := test.InstanceType(1)
166         pool := &Pool{
167                 arvClient:   ac,
168                 logger:      suite.logger,
169                 newExecutor: func(cloud.Instance) Executor { return &stubExecutor{} },
170                 cluster:     suite.testCluster,
171                 instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
172                 instanceTypes: arvados.InstanceTypeMap{
173                         type1.Name: type1,
174                 },
175         }
176         notify := pool.Subscribe()
177         defer pool.Unsubscribe(notify)
178
179         pool.Create(type1)
180
181         // Wait for the instance to either return from its Create
182         // call, or show up in a poll.
183         suite.wait(c, pool, notify, func() bool {
184                 pool.mtx.RLock()
185                 defer pool.mtx.RUnlock()
186                 return len(pool.workers) == 1
187         })
188
189         tests := []struct {
190                 state        State
191                 idleBehavior IdleBehavior
192                 result       bool
193         }{
194                 {StateIdle, IdleBehaviorHold, false},
195                 {StateIdle, IdleBehaviorDrain, false},
196                 {StateIdle, IdleBehaviorRun, true},
197         }
198
199         for _, test := range tests {
200                 for _, wkr := range pool.workers {
201                         wkr.state = test.state
202                         wkr.idleBehavior = test.idleBehavior
203                 }
204
205                 // Try to start a container
206                 started := pool.StartContainer(type1, arvados.Container{UUID: "testcontainer"})
207                 c.Check(started, check.Equals, test.result)
208         }
209 }
210
211 func (suite *PoolSuite) TestNodeCreateThrottle(c *check.C) {
212         driver := test.StubDriver{HoldCloudOps: true}
213         instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, suite.logger)
214         c.Assert(err, check.IsNil)
215
216         type1 := test.InstanceType(1)
217         pool := &Pool{
218                 logger:                         suite.logger,
219                 instanceSet:                    &throttledInstanceSet{InstanceSet: instanceSet},
220                 cluster:                        suite.testCluster,
221                 maxConcurrentInstanceCreateOps: 1,
222                 instanceTypes: arvados.InstanceTypeMap{
223                         type1.Name: type1,
224                 },
225         }
226
227         c.Check(pool.Unallocated()[type1], check.Equals, 0)
228         res := pool.Create(type1)
229         c.Check(pool.Unallocated()[type1], check.Equals, 1)
230         c.Check(res, check.Equals, true)
231
232         res = pool.Create(type1)
233         c.Check(pool.Unallocated()[type1], check.Equals, 1)
234         c.Check(res, check.Equals, false)
235
236         pool.instanceSet.throttleCreate.err = nil
237         pool.maxConcurrentInstanceCreateOps = 2
238
239         res = pool.Create(type1)
240         c.Check(pool.Unallocated()[type1], check.Equals, 2)
241         c.Check(res, check.Equals, true)
242
243         pool.instanceSet.throttleCreate.err = nil
244         pool.maxConcurrentInstanceCreateOps = 0
245
246         res = pool.Create(type1)
247         c.Check(pool.Unallocated()[type1], check.Equals, 3)
248         c.Check(res, check.Equals, true)
249 }
250
251 func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
252         driver := test.StubDriver{HoldCloudOps: true}
253         instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, suite.logger)
254         c.Assert(err, check.IsNil)
255
256         type1 := arvados.InstanceType{Name: "a1s", ProviderType: "a1.small", VCPUs: 1, RAM: 1 * GiB, Price: .01}
257         type2 := arvados.InstanceType{Name: "a2m", ProviderType: "a2.medium", VCPUs: 2, RAM: 2 * GiB, Price: .02}
258         type3 := arvados.InstanceType{Name: "a2l", ProviderType: "a2.large", VCPUs: 4, RAM: 4 * GiB, Price: .04}
259         pool := &Pool{
260                 logger:      suite.logger,
261                 newExecutor: func(cloud.Instance) Executor { return &stubExecutor{} },
262                 cluster:     suite.testCluster,
263                 instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
264                 instanceTypes: arvados.InstanceTypeMap{
265                         type1.Name: type1,
266                         type2.Name: type2,
267                         type3.Name: type3,
268                 },
269         }
270         notify := pool.Subscribe()
271         defer pool.Unsubscribe(notify)
272         notify2 := pool.Subscribe()
273         defer pool.Unsubscribe(notify2)
274
275         c.Check(pool.Unallocated()[type1], check.Equals, 0)
276         c.Check(pool.Unallocated()[type2], check.Equals, 0)
277         c.Check(pool.Unallocated()[type3], check.Equals, 0)
278         pool.Create(type2)
279         pool.Create(type1)
280         pool.Create(type2)
281         pool.Create(type3)
282         c.Check(pool.Unallocated()[type1], check.Equals, 1)
283         c.Check(pool.Unallocated()[type2], check.Equals, 2)
284         c.Check(pool.Unallocated()[type3], check.Equals, 1)
285
286         // Unblock the pending Create calls.
287         go driver.ReleaseCloudOps(4)
288
289         // Wait for each instance to either return from its Create
290         // call, or show up in a poll.
291         suite.wait(c, pool, notify, func() bool {
292                 pool.mtx.RLock()
293                 defer pool.mtx.RUnlock()
294                 return len(pool.workers) == 4
295         })
296
297         // Place type3 node on admin-hold
298         ivs := suite.instancesByType(pool, type3)
299         c.Assert(ivs, check.HasLen, 1)
300         type3instanceID := ivs[0].Instance
301         err = pool.SetIdleBehavior(type3instanceID, IdleBehaviorHold)
302         c.Check(err, check.IsNil)
303
304         // Check admin-hold behavior: refuse to shutdown, and don't
305         // report as Unallocated ("available now or soon").
306         c.Check(pool.Shutdown(type3), check.Equals, false)
307         suite.wait(c, pool, notify, func() bool {
308                 return pool.Unallocated()[type3] == 0
309         })
310         c.Check(suite.instancesByType(pool, type3), check.HasLen, 1)
311
312         // Shutdown both type2 nodes
313         c.Check(pool.Shutdown(type2), check.Equals, true)
314         suite.wait(c, pool, notify, func() bool {
315                 return pool.Unallocated()[type1] == 1 && pool.Unallocated()[type2] == 1
316         })
317         c.Check(pool.Shutdown(type2), check.Equals, true)
318         suite.wait(c, pool, notify, func() bool {
319                 return pool.Unallocated()[type1] == 1 && pool.Unallocated()[type2] == 0
320         })
321         c.Check(pool.Shutdown(type2), check.Equals, false)
322         for {
323                 // Consume any waiting notifications to ensure the
324                 // next one we get is from Shutdown.
325                 select {
326                 case <-notify:
327                         continue
328                 default:
329                 }
330                 break
331         }
332
333         // Shutdown type1 node
334         c.Check(pool.Shutdown(type1), check.Equals, true)
335         suite.wait(c, pool, notify, func() bool {
336                 return pool.Unallocated()[type1] == 0 && pool.Unallocated()[type2] == 0 && pool.Unallocated()[type3] == 0
337         })
338         select {
339         case <-notify2:
340         case <-time.After(time.Second):
341                 c.Error("notify did not receive")
342         }
343
344         // Put type3 node back in service.
345         err = pool.SetIdleBehavior(type3instanceID, IdleBehaviorRun)
346         c.Check(err, check.IsNil)
347         suite.wait(c, pool, notify, func() bool {
348                 return pool.Unallocated()[type3] == 1
349         })
350
351         // Check admin-drain behavior: shut down right away, and don't
352         // report as Unallocated.
353         err = pool.SetIdleBehavior(type3instanceID, IdleBehaviorDrain)
354         c.Check(err, check.IsNil)
355         suite.wait(c, pool, notify, func() bool {
356                 return pool.Unallocated()[type3] == 0
357         })
358         suite.wait(c, pool, notify, func() bool {
359                 ivs := suite.instancesByType(pool, type3)
360                 return len(ivs) == 1 && ivs[0].WorkerState == StateShutdown.String()
361         })
362
363         // Unblock all pending Destroy calls. Pool calls Destroy again
364         // if a node still appears in the provider list after a
365         // previous attempt, so there might be more than 4 Destroy
366         // calls to unblock.
367         go driver.ReleaseCloudOps(4444)
368
369         // Sync until all instances disappear from the provider list.
370         suite.wait(c, pool, notify, func() bool {
371                 pool.getInstancesAndSync()
372                 return len(pool.Instances()) == 0
373         })
374 }
375
376 func (suite *PoolSuite) instancesByType(pool *Pool, it arvados.InstanceType) []InstanceView {
377         var ivs []InstanceView
378         for _, iv := range pool.Instances() {
379                 if iv.ArvadosInstanceType == it.Name {
380                         ivs = append(ivs, iv)
381                 }
382         }
383         return ivs
384 }
385
386 func (suite *PoolSuite) wait(c *check.C, pool *Pool, notify <-chan struct{}, ready func() bool) {
387         timeout := time.NewTimer(time.Second).C
388         for !ready() {
389                 select {
390                 case <-notify:
391                         continue
392                 case <-timeout:
393                 }
394                 break
395         }
396         c.Check(ready(), check.Equals, true)
397 }