22207: fixed subpanel toolbar spacing
[arvados.git] / lib / dispatchcloud / worker / pool_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package worker
6
7 import (
8         "sort"
9         "strings"
10         "time"
11
12         "git.arvados.org/arvados.git/lib/cloud"
13         "git.arvados.org/arvados.git/lib/config"
14         "git.arvados.org/arvados.git/lib/dispatchcloud/test"
15         "git.arvados.org/arvados.git/sdk/go/arvados"
16         "git.arvados.org/arvados.git/sdk/go/ctxlog"
17         "github.com/prometheus/client_golang/prometheus"
18         "github.com/sirupsen/logrus"
19         check "gopkg.in/check.v1"
20 )
21
// GiB is 1<<30 bytes (one gibibyte), typed as arvados.ByteSize so it
// can be used directly when building instance type definitions.
const GiB arvados.ByteSize = 1 << 30

// Register PoolSuite with the gocheck runner.
var _ = check.Suite(&PoolSuite{})
25
// lessChecker is a gocheck checker that compares two int values with
// the "<" operator. It embeds *check.CheckerInfo, which carries the
// checker's name and parameter names.
type lessChecker struct {
	*check.CheckerInfo
}
29
30 func (*lessChecker) Check(params []interface{}, names []string) (result bool, error string) {
31         return params[0].(int) < params[1].(int), ""
32 }
33
// less is a ready-to-use lessChecker named "less" whose parameters
// are called "obtained" and "expected".
var less = &lessChecker{&check.CheckerInfo{Name: "less", Params: []string{"obtained", "expected"}}}
35
// PoolSuite is the gocheck suite holding the worker pool tests.
type PoolSuite struct {
	// logger is the logger handed to pools and instance sets
	// created by the tests.
	logger logrus.FieldLogger
	// testCluster is the cluster configuration used (and in some
	// tests modified) when building pools.
	testCluster *arvados.Cluster
}
40
41 func (suite *PoolSuite) SetUpTest(c *check.C) {
42         suite.logger = ctxlog.TestLogger(c)
43         cfg, err := config.NewLoader(nil, suite.logger).Load()
44         c.Assert(err, check.IsNil)
45         suite.testCluster, err = cfg.GetCluster("")
46         c.Assert(err, check.IsNil)
47 }
48
49 func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
50         type1 := test.InstanceType(1)
51         type2 := test.InstanceType(2)
52         type3 := test.InstanceType(3)
53         waitForIdle := func(pool *Pool, notify <-chan struct{}) {
54                 timeout := time.NewTimer(time.Second)
55                 for {
56                         instances := pool.Instances()
57                         sort.Slice(instances, func(i, j int) bool {
58                                 return strings.Compare(instances[i].ArvadosInstanceType, instances[j].ArvadosInstanceType) < 0
59                         })
60                         if len(instances) == 3 &&
61                                 instances[0].ArvadosInstanceType == type1.Name &&
62                                 instances[0].WorkerState == StateIdle.String() &&
63                                 instances[1].ArvadosInstanceType == type1.Name &&
64                                 instances[1].WorkerState == StateIdle.String() &&
65                                 instances[2].ArvadosInstanceType == type2.Name &&
66                                 instances[2].WorkerState == StateIdle.String() {
67                                 return
68                         }
69                         select {
70                         case <-timeout.C:
71                                 c.Logf("pool.Instances() == %#v", instances)
72                                 c.Error("timed out")
73                                 return
74                         case <-notify:
75                         }
76                 }
77         }
78
79         driver := &test.StubDriver{}
80         instanceSetID := cloud.InstanceSetID("test-instance-set-id")
81         is, err := driver.InstanceSet(nil, instanceSetID, nil, suite.logger, nil)
82         c.Assert(err, check.IsNil)
83         defer is.Stop()
84
85         newExecutor := func(cloud.Instance) Executor {
86                 return &stubExecutor{
87                         response: map[string]stubResp{
88                                 "crunch-run-custom --list": {},
89                                 "true":                     {},
90                         },
91                 }
92         }
93
94         suite.testCluster.Containers.CloudVMs = arvados.CloudVMsConfig{
95                 BootProbeCommand:   "true",
96                 MaxProbesPerSecond: 1000,
97                 ProbeInterval:      arvados.Duration(time.Millisecond * 10),
98                 SyncInterval:       arvados.Duration(time.Millisecond * 10),
99                 TagKeyPrefix:       "testprefix:",
100         }
101         suite.testCluster.Containers.CrunchRunCommand = "crunch-run-custom"
102         suite.testCluster.InstanceTypes = arvados.InstanceTypeMap{
103                 type1.Name: type1,
104                 type2.Name: type2,
105                 type3.Name: type3,
106         }
107
108         pool := NewPool(suite.logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), instanceSetID, is, newExecutor, nil, suite.testCluster)
109         notify := pool.Subscribe()
110         defer pool.Unsubscribe(notify)
111         pool.Create(type1)
112         pool.Create(type1)
113         pool.Create(type2)
114         waitForIdle(pool, notify)
115         var heldInstanceID cloud.InstanceID
116         for _, inst := range pool.Instances() {
117                 if inst.ArvadosInstanceType == type2.Name {
118                         heldInstanceID = cloud.InstanceID(inst.Instance)
119                         pool.SetIdleBehavior(heldInstanceID, IdleBehaviorHold)
120                 }
121         }
122         // Wait for the tags to save to the cloud provider
123         tagKey := suite.testCluster.Containers.CloudVMs.TagKeyPrefix + tagKeyIdleBehavior
124         deadline := time.Now().Add(time.Second)
125         for !func() bool {
126                 pool.mtx.RLock()
127                 defer pool.mtx.RUnlock()
128                 for _, wkr := range pool.workers {
129                         if wkr.instType == type2 {
130                                 return wkr.instance.Tags()[tagKey] == string(IdleBehaviorHold)
131                         }
132                 }
133                 return false
134         }() {
135                 if time.Now().After(deadline) {
136                         c.Fatal("timeout")
137                 }
138                 time.Sleep(time.Millisecond * 10)
139         }
140         pool.Stop()
141
142         c.Log("------- starting new pool, waiting to recover state")
143
144         pool2 := NewPool(suite.logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), instanceSetID, is, newExecutor, nil, suite.testCluster)
145         notify2 := pool2.Subscribe()
146         defer pool2.Unsubscribe(notify2)
147         waitForIdle(pool2, notify2)
148         for _, inst := range pool2.Instances() {
149                 if inst.ArvadosInstanceType == type2.Name {
150                         c.Check(inst.Instance, check.Equals, heldInstanceID)
151                         c.Check(inst.IdleBehavior, check.Equals, IdleBehaviorHold)
152                 } else {
153                         c.Check(inst.IdleBehavior, check.Equals, IdleBehaviorRun)
154                 }
155         }
156         pool2.Stop()
157 }
158
159 func (suite *PoolSuite) TestDrain(c *check.C) {
160         driver := test.StubDriver{}
161         instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, suite.logger, nil)
162         c.Assert(err, check.IsNil)
163         defer instanceSet.Stop()
164
165         ac := arvados.NewClientFromEnv()
166
167         type1 := test.InstanceType(1)
168         pool := &Pool{
169                 arvClient:   ac,
170                 logger:      suite.logger,
171                 newExecutor: func(cloud.Instance) Executor { return &stubExecutor{} },
172                 cluster:     suite.testCluster,
173                 instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
174                 instanceTypes: arvados.InstanceTypeMap{
175                         type1.Name: type1,
176                 },
177         }
178         notify := pool.Subscribe()
179         defer pool.Unsubscribe(notify)
180
181         pool.Create(type1)
182
183         // Wait for the instance to either return from its Create
184         // call, or show up in a poll.
185         suite.wait(c, pool, notify, func() bool {
186                 pool.mtx.RLock()
187                 defer pool.mtx.RUnlock()
188                 return len(pool.workers) == 1
189         })
190
191         tests := []struct {
192                 state        State
193                 idleBehavior IdleBehavior
194                 result       bool
195         }{
196                 {StateIdle, IdleBehaviorHold, false},
197                 {StateIdle, IdleBehaviorDrain, false},
198                 {StateIdle, IdleBehaviorRun, true},
199         }
200
201         for _, test := range tests {
202                 for _, wkr := range pool.workers {
203                         wkr.state = test.state
204                         wkr.idleBehavior = test.idleBehavior
205                 }
206
207                 // Try to start a container
208                 started := pool.StartContainer(type1, arvados.Container{UUID: "testcontainer"})
209                 c.Check(started, check.Equals, test.result)
210         }
211 }
212
213 func (suite *PoolSuite) TestNodeCreateThrottle(c *check.C) {
214         driver := test.StubDriver{HoldCloudOps: true}
215         instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, suite.logger, nil)
216         c.Assert(err, check.IsNil)
217         defer instanceSet.Stop()
218
219         type1 := test.InstanceType(1)
220         pool := &Pool{
221                 logger:                         suite.logger,
222                 instanceSet:                    &throttledInstanceSet{InstanceSet: instanceSet},
223                 cluster:                        suite.testCluster,
224                 maxConcurrentInstanceCreateOps: 1,
225                 instanceTypes: arvados.InstanceTypeMap{
226                         type1.Name: type1,
227                 },
228         }
229
230         c.Check(pool.Unallocated()[type1], check.Equals, 0)
231         res := pool.Create(type1)
232         c.Check(pool.Unallocated()[type1], check.Equals, 1)
233         c.Check(res, check.Equals, true)
234
235         res = pool.Create(type1)
236         c.Check(pool.Unallocated()[type1], check.Equals, 1)
237         c.Check(res, check.Equals, false)
238
239         pool.instanceSet.throttleCreate.err = nil
240         pool.maxConcurrentInstanceCreateOps = 2
241
242         res = pool.Create(type1)
243         c.Check(pool.Unallocated()[type1], check.Equals, 2)
244         c.Check(res, check.Equals, true)
245
246         pool.instanceSet.throttleCreate.err = nil
247         pool.maxConcurrentInstanceCreateOps = 0
248
249         res = pool.Create(type1)
250         c.Check(pool.Unallocated()[type1], check.Equals, 3)
251         c.Check(res, check.Equals, true)
252 }
253
// TestCreateUnallocShutdown walks a pool through creating four
// instances of three types, then exercises Unallocated, Shutdown, and
// the hold/run/drain idle behaviors, and finally waits for every
// instance to disappear from the provider list.
//
// The stub driver is created with HoldCloudOps set, so cloud
// operations stay pending until the test releases them with
// ReleaseCloudOps.
func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
	driver := test.StubDriver{HoldCloudOps: true}
	instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, suite.logger, nil)
	c.Assert(err, check.IsNil)
	defer instanceSet.Stop()

	type1 := arvados.InstanceType{Name: "a1s", ProviderType: "a1.small", VCPUs: 1, RAM: 1 * GiB, Price: .01}
	type2 := arvados.InstanceType{Name: "a2m", ProviderType: "a2.medium", VCPUs: 2, RAM: 2 * GiB, Price: .02}
	type3 := arvados.InstanceType{Name: "a2l", ProviderType: "a2.large", VCPUs: 4, RAM: 4 * GiB, Price: .04}
	pool := &Pool{
		logger:      suite.logger,
		newExecutor: func(cloud.Instance) Executor { return &stubExecutor{} },
		cluster:     suite.testCluster,
		instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
		instanceTypes: arvados.InstanceTypeMap{
			type1.Name: type1,
			type2.Name: type2,
			type3.Name: type3,
		},
		instanceInitCommand: "echo 'instance init command goes here'",
	}
	// Two independent subscriptions: notify drives the wait()
	// calls below, while notify2 is left untouched until the
	// check that a second subscriber also gets notified.
	notify := pool.Subscribe()
	defer pool.Unsubscribe(notify)
	notify2 := pool.Subscribe()
	defer pool.Unsubscribe(notify2)

	// Instances count as unallocated as soon as Create has been
	// called, even though the cloud operations are still held.
	c.Check(pool.Unallocated()[type1], check.Equals, 0)
	c.Check(pool.Unallocated()[type2], check.Equals, 0)
	c.Check(pool.Unallocated()[type3], check.Equals, 0)
	pool.Create(type2)
	pool.Create(type1)
	pool.Create(type2)
	pool.Create(type3)
	c.Check(pool.Unallocated()[type1], check.Equals, 1)
	c.Check(pool.Unallocated()[type2], check.Equals, 2)
	c.Check(pool.Unallocated()[type3], check.Equals, 1)

	// Unblock the pending Create calls.
	go driver.ReleaseCloudOps(4)

	// Wait for each instance to either return from its Create
	// call, or show up in a poll.
	suite.wait(c, pool, notify, func() bool {
		pool.mtx.RLock()
		defer pool.mtx.RUnlock()
		return len(pool.workers) == 4
	})

	// The init command passed to the driver should write the
	// instance secret first, followed by the configured
	// instanceInitCommand.
	vms := instanceSet.(*test.StubInstanceSet).StubVMs()
	c.Check(string(vms[0].InitCommand), check.Matches, `umask 0177 && echo -n "[0-9a-f]+" >/var/run/arvados-instance-secret\necho 'instance init command goes here'`)

	// Place type3 node on admin-hold
	ivs := suite.instancesByType(pool, type3)
	c.Assert(ivs, check.HasLen, 1)
	type3instanceID := ivs[0].Instance
	err = pool.SetIdleBehavior(type3instanceID, IdleBehaviorHold)
	c.Check(err, check.IsNil)

	// Check admin-hold behavior: refuse to shutdown, and don't
	// report as Unallocated ("available now or soon").
	c.Check(pool.Shutdown(type3), check.Equals, false)
	suite.wait(c, pool, notify, func() bool {
		return pool.Unallocated()[type3] == 0
	})
	c.Check(suite.instancesByType(pool, type3), check.HasLen, 1)

	// Shutdown both type2 nodes
	c.Check(pool.Shutdown(type2), check.Equals, true)
	suite.wait(c, pool, notify, func() bool {
		return pool.Unallocated()[type1] == 1 && pool.Unallocated()[type2] == 1
	})
	c.Check(pool.Shutdown(type2), check.Equals, true)
	suite.wait(c, pool, notify, func() bool {
		return pool.Unallocated()[type1] == 1 && pool.Unallocated()[type2] == 0
	})
	// With no type2 nodes left to shut down, Shutdown reports false.
	c.Check(pool.Shutdown(type2), check.Equals, false)
	for {
		// Consume any waiting notifications to ensure the
		// next one we get is from Shutdown.
		select {
		case <-notify:
			continue
		default:
		}
		break
	}

	// Shutdown type1 node
	c.Check(pool.Shutdown(type1), check.Equals, true)
	suite.wait(c, pool, notify, func() bool {
		return pool.Unallocated()[type1] == 0 && pool.Unallocated()[type2] == 0 && pool.Unallocated()[type3] == 0
	})
	// The second subscriber must have been notified as well.
	select {
	case <-notify2:
	case <-time.After(time.Second):
		c.Error("notify did not receive")
	}

	// Put type3 node back in service.
	err = pool.SetIdleBehavior(type3instanceID, IdleBehaviorRun)
	c.Check(err, check.IsNil)
	suite.wait(c, pool, notify, func() bool {
		return pool.Unallocated()[type3] == 1
	})

	// Check admin-drain behavior: shut down right away, and don't
	// report as Unallocated.
	err = pool.SetIdleBehavior(type3instanceID, IdleBehaviorDrain)
	c.Check(err, check.IsNil)
	suite.wait(c, pool, notify, func() bool {
		return pool.Unallocated()[type3] == 0
	})
	suite.wait(c, pool, notify, func() bool {
		ivs := suite.instancesByType(pool, type3)
		return len(ivs) == 1 && ivs[0].WorkerState == StateShutdown.String()
	})

	// Unblock all pending Destroy calls. Pool calls Destroy again
	// if a node still appears in the provider list after a
	// previous attempt, so there might be more than 4 Destroy
	// calls to unblock.
	go driver.ReleaseCloudOps(4444)

	// Sync until all instances disappear from the provider list.
	suite.wait(c, pool, notify, func() bool {
		pool.getInstancesAndSync()
		return len(pool.Instances()) == 0
	})
}
383
384 func (suite *PoolSuite) TestInstanceQuotaGroup(c *check.C) {
385         driver := test.StubDriver{}
386         instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, suite.logger, nil)
387         c.Assert(err, check.IsNil)
388         defer instanceSet.Stop()
389
390         // Note the stub driver uses the first character of
391         // ProviderType as the instance family, so we have two
392         // instance families here, "a" and "b".
393         typeA1 := test.InstanceType(1)
394         typeA1.ProviderType = "a1"
395         typeA2 := test.InstanceType(2)
396         typeA2.ProviderType = "a2"
397         typeB3 := test.InstanceType(3)
398         typeB3.ProviderType = "b3"
399         typeB4 := test.InstanceType(4)
400         typeB4.ProviderType = "b4"
401
402         pool := &Pool{
403                 logger:      suite.logger,
404                 newExecutor: func(cloud.Instance) Executor { return &stubExecutor{} },
405                 cluster:     suite.testCluster,
406                 instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
407                 instanceTypes: arvados.InstanceTypeMap{
408                         typeA1.Name: typeA1,
409                         typeA2.Name: typeA2,
410                         typeB3.Name: typeB3,
411                         typeB4.Name: typeB4,
412                 },
413         }
414
415         // Arrange for the driver to fail when the pool calls
416         // instanceSet.Create().
417         driver.SetupVM = func(*test.StubVM) error { return test.CapacityError{InstanceQuotaGroupSpecific: true} }
418         // pool.Create() returns true when it starts a goroutine to
419         // call instanceSet.Create() in the background.
420         c.Check(pool.Create(typeA1), check.Equals, true)
421         // Wait for the pool to start reporting that the provider is
422         // at capacity for instance type A1.
423         for deadline := time.Now().Add(time.Second); !pool.AtCapacity(typeA1); time.Sleep(time.Millisecond) {
424                 if time.Now().After(deadline) {
425                         c.Fatal("timed out waiting for pool to report quota")
426                 }
427         }
428
429         // The pool should now report AtCapacity for the affected
430         // instance family (A1, A2) and refuse to call
431         // instanceSet.Create() for those types -- but other types
432         // (B3, B4) are still usable.
433         driver.SetupVM = func(*test.StubVM) error { return nil }
434         c.Check(pool.AtCapacity(typeA1), check.Equals, true)
435         c.Check(pool.AtCapacity(typeA2), check.Equals, true)
436         c.Check(pool.AtCapacity(typeB3), check.Equals, false)
437         c.Check(pool.AtCapacity(typeB4), check.Equals, false)
438         c.Check(pool.Create(typeA2), check.Equals, false)
439         c.Check(pool.Create(typeB3), check.Equals, true)
440         c.Check(pool.Create(typeB4), check.Equals, true)
441         c.Check(pool.Create(typeA2), check.Equals, false)
442         c.Check(pool.Create(typeA1), check.Equals, false)
443 }
444
445 func (suite *PoolSuite) instancesByType(pool *Pool, it arvados.InstanceType) []InstanceView {
446         var ivs []InstanceView
447         for _, iv := range pool.Instances() {
448                 if iv.ArvadosInstanceType == it.Name {
449                         ivs = append(ivs, iv)
450                 }
451         }
452         return ivs
453 }
454
455 func (suite *PoolSuite) wait(c *check.C, pool *Pool, notify <-chan struct{}, ready func() bool) {
456         timeout := time.NewTimer(time.Second).C
457         for !ready() {
458                 select {
459                 case <-notify:
460                         continue
461                 case <-timeout:
462                 }
463                 break
464         }
465         c.Check(ready(), check.Equals, true)
466 }