14360: Shutdown pool between tests to eliminate leaking logs.
[arvados.git] / lib / dispatchcloud / worker / pool_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package worker
6
7 import (
8         "io"
9         "time"
10
11         "git.curoverse.com/arvados.git/lib/cloud"
12         "git.curoverse.com/arvados.git/lib/dispatchcloud/test"
13         "git.curoverse.com/arvados.git/sdk/go/arvados"
14         "github.com/Sirupsen/logrus"
15         check "gopkg.in/check.v1"
16 )
17
18 const GiB arvados.ByteSize = 1 << 30
19
20 var _ = check.Suite(&PoolSuite{})
21
22 type lessChecker struct {
23         *check.CheckerInfo
24 }
25
26 func (*lessChecker) Check(params []interface{}, names []string) (result bool, error string) {
27         return params[0].(int) < params[1].(int), ""
28 }
29
30 var less = &lessChecker{&check.CheckerInfo{Name: "less", Params: []string{"obtained", "expected"}}}
31
32 type PoolSuite struct{}
33
34 func (suite *PoolSuite) SetUpSuite(c *check.C) {
35         logrus.StandardLogger().SetLevel(logrus.DebugLevel)
36 }
37
38 func (suite *PoolSuite) TestStartContainer(c *check.C) {
39         // TODO: use an instanceSet stub with an SSH server
40 }
41
42 func (suite *PoolSuite) TestVerifyHostKey(c *check.C) {
43         // TODO: use an instanceSet stub with an SSH server
44 }
45
46 func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
47         lameInstanceSet := &test.LameInstanceSet{Hold: make(chan bool)}
48         type1 := arvados.InstanceType{Name: "a1s", ProviderType: "a1.small", VCPUs: 1, RAM: 1 * GiB, Price: .01}
49         type2 := arvados.InstanceType{Name: "a2m", ProviderType: "a2.medium", VCPUs: 2, RAM: 2 * GiB, Price: .02}
50         pool := &Pool{
51                 logger:      logrus.StandardLogger(),
52                 newExecutor: func(cloud.Instance) Executor { return &stubExecutor{} },
53                 instanceSet: lameInstanceSet,
54                 instanceTypes: arvados.InstanceTypeMap{
55                         type1.Name: type1,
56                         type2.Name: type2,
57                 },
58         }
59         notify := pool.Subscribe()
60         defer pool.Unsubscribe(notify)
61         notify2 := pool.Subscribe()
62         defer pool.Unsubscribe(notify2)
63
64         c.Check(pool.Unallocated()[type1], check.Equals, 0)
65         c.Check(pool.Unallocated()[type2], check.Equals, 0)
66         pool.Create(type2)
67         pool.Create(type1)
68         pool.Create(type2)
69         c.Check(pool.Unallocated()[type1], check.Equals, 1)
70         c.Check(pool.Unallocated()[type2], check.Equals, 2)
71
72         // Unblock the pending Create calls.
73         go lameInstanceSet.Release(3)
74
75         // Wait for each instance to either return from its Create
76         // call, or show up in a poll.
77         suite.wait(c, pool, notify, func() bool {
78                 pool.mtx.RLock()
79                 defer pool.mtx.RUnlock()
80                 return len(pool.workers) == 3
81         })
82
83         c.Check(pool.Shutdown(type2), check.Equals, true)
84         suite.wait(c, pool, notify, func() bool {
85                 return pool.Unallocated()[type1] == 1 && pool.Unallocated()[type2] == 1
86         })
87         c.Check(pool.Shutdown(type2), check.Equals, true)
88         suite.wait(c, pool, notify, func() bool {
89                 return pool.Unallocated()[type1] == 1 && pool.Unallocated()[type2] == 0
90         })
91         c.Check(pool.Shutdown(type2), check.Equals, false)
92         for {
93                 // Consume any waiting notifications to ensure the
94                 // next one we get is from Shutdown.
95                 select {
96                 case <-notify:
97                         continue
98                 default:
99                 }
100                 break
101         }
102         c.Check(pool.Shutdown(type1), check.Equals, true)
103         suite.wait(c, pool, notify, func() bool {
104                 return pool.Unallocated()[type1] == 0 && pool.Unallocated()[type2] == 0
105         })
106         select {
107         case <-notify2:
108         case <-time.After(time.Second):
109                 c.Error("notify did not receive")
110         }
111         go lameInstanceSet.Release(3) // unblock Destroy calls
112 }
113
114 func (suite *PoolSuite) wait(c *check.C, pool *Pool, notify <-chan struct{}, ready func() bool) {
115         timeout := time.NewTimer(time.Second).C
116         for !ready() {
117                 select {
118                 case <-notify:
119                         continue
120                 case <-timeout:
121                 }
122                 break
123         }
124         c.Check(ready(), check.Equals, true)
125 }
126
127 type stubExecutor struct{}
128
129 func (*stubExecutor) SetTarget(cloud.ExecutorTarget) {}
130
131 func (*stubExecutor) Execute(cmd string, stdin io.Reader) ([]byte, []byte, error) {
132         return nil, nil, nil
133 }
134
135 func (*stubExecutor) Close() {}