14325: Use snake_case keys in management API responses.
[arvados.git] / lib / dispatchcloud / worker / pool_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package worker
6
7 import (
8         "io"
9         "time"
10
11         "git.curoverse.com/arvados.git/lib/cloud"
12         "git.curoverse.com/arvados.git/lib/dispatchcloud/test"
13         "git.curoverse.com/arvados.git/sdk/go/arvados"
14         "github.com/sirupsen/logrus"
15         check "gopkg.in/check.v1"
16 )
17
18 const GiB arvados.ByteSize = 1 << 30
19
20 var _ = check.Suite(&PoolSuite{})
21
22 type lessChecker struct {
23         *check.CheckerInfo
24 }
25
26 func (*lessChecker) Check(params []interface{}, names []string) (result bool, error string) {
27         return params[0].(int) < params[1].(int), ""
28 }
29
30 var less = &lessChecker{&check.CheckerInfo{Name: "less", Params: []string{"obtained", "expected"}}}
31
32 type PoolSuite struct{}
33
34 func (suite *PoolSuite) SetUpSuite(c *check.C) {
35         logrus.StandardLogger().SetLevel(logrus.DebugLevel)
36 }
37
38 func (suite *PoolSuite) TestStartContainer(c *check.C) {
39         // TODO: use an instanceSet stub with an SSH server
40 }
41
42 func (suite *PoolSuite) TestVerifyHostKey(c *check.C) {
43         // TODO: use an instanceSet stub with an SSH server
44 }
45
46 func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
47         lameInstanceSet := &test.LameInstanceSet{Hold: make(chan bool)}
48         type1 := arvados.InstanceType{Name: "a1s", ProviderType: "a1.small", VCPUs: 1, RAM: 1 * GiB, Price: .01}
49         type2 := arvados.InstanceType{Name: "a2m", ProviderType: "a2.medium", VCPUs: 2, RAM: 2 * GiB, Price: .02}
50         type3 := arvados.InstanceType{Name: "a2l", ProviderType: "a2.large", VCPUs: 4, RAM: 4 * GiB, Price: .04}
51         pool := &Pool{
52                 logger:      logrus.StandardLogger(),
53                 newExecutor: func(cloud.Instance) Executor { return &stubExecutor{} },
54                 instanceSet: lameInstanceSet,
55                 instanceTypes: arvados.InstanceTypeMap{
56                         type1.Name: type1,
57                         type2.Name: type2,
58                         type3.Name: type3,
59                 },
60         }
61         notify := pool.Subscribe()
62         defer pool.Unsubscribe(notify)
63         notify2 := pool.Subscribe()
64         defer pool.Unsubscribe(notify2)
65
66         c.Check(pool.Unallocated()[type1], check.Equals, 0)
67         c.Check(pool.Unallocated()[type2], check.Equals, 0)
68         c.Check(pool.Unallocated()[type3], check.Equals, 0)
69         pool.Create(type2)
70         pool.Create(type1)
71         pool.Create(type2)
72         pool.Create(type3)
73         c.Check(pool.Unallocated()[type1], check.Equals, 1)
74         c.Check(pool.Unallocated()[type2], check.Equals, 2)
75         c.Check(pool.Unallocated()[type3], check.Equals, 1)
76
77         // Unblock the pending Create calls.
78         go lameInstanceSet.Release(4)
79
80         // Wait for each instance to either return from its Create
81         // call, or show up in a poll.
82         suite.wait(c, pool, notify, func() bool {
83                 pool.mtx.RLock()
84                 defer pool.mtx.RUnlock()
85                 return len(pool.workers) == 4
86         })
87
88         // Place type3 node on admin-hold
89         ivs := suite.instancesByType(pool, type3)
90         c.Assert(ivs, check.HasLen, 1)
91         type3instanceID := ivs[0].Instance
92         err := pool.SetIdleBehavior(type3instanceID, IdleBehaviorHold)
93         c.Check(err, check.IsNil)
94
95         // Check admin-hold behavior: refuse to shutdown, and don't
96         // report as Unallocated ("available now or soon").
97         c.Check(pool.Shutdown(type3), check.Equals, false)
98         suite.wait(c, pool, notify, func() bool {
99                 return pool.Unallocated()[type3] == 0
100         })
101         c.Check(suite.instancesByType(pool, type3), check.HasLen, 1)
102
103         // Shutdown both type2 nodes
104         c.Check(pool.Shutdown(type2), check.Equals, true)
105         suite.wait(c, pool, notify, func() bool {
106                 return pool.Unallocated()[type1] == 1 && pool.Unallocated()[type2] == 1
107         })
108         c.Check(pool.Shutdown(type2), check.Equals, true)
109         suite.wait(c, pool, notify, func() bool {
110                 return pool.Unallocated()[type1] == 1 && pool.Unallocated()[type2] == 0
111         })
112         c.Check(pool.Shutdown(type2), check.Equals, false)
113         for {
114                 // Consume any waiting notifications to ensure the
115                 // next one we get is from Shutdown.
116                 select {
117                 case <-notify:
118                         continue
119                 default:
120                 }
121                 break
122         }
123
124         // Shutdown type1 node
125         c.Check(pool.Shutdown(type1), check.Equals, true)
126         suite.wait(c, pool, notify, func() bool {
127                 return pool.Unallocated()[type1] == 0 && pool.Unallocated()[type2] == 0 && pool.Unallocated()[type3] == 0
128         })
129         select {
130         case <-notify2:
131         case <-time.After(time.Second):
132                 c.Error("notify did not receive")
133         }
134
135         // Put type3 node back in service.
136         err = pool.SetIdleBehavior(type3instanceID, IdleBehaviorRun)
137         c.Check(err, check.IsNil)
138         suite.wait(c, pool, notify, func() bool {
139                 return pool.Unallocated()[type3] == 1
140         })
141
142         // Check admin-drain behavior: shut down right away, and don't
143         // report as Unallocated.
144         err = pool.SetIdleBehavior(type3instanceID, IdleBehaviorDrain)
145         c.Check(err, check.IsNil)
146         suite.wait(c, pool, notify, func() bool {
147                 return pool.Unallocated()[type3] == 0
148         })
149         suite.wait(c, pool, notify, func() bool {
150                 ivs := suite.instancesByType(pool, type3)
151                 return len(ivs) == 1 && ivs[0].WorkerState == StateShutdown.String()
152         })
153
154         // Unblock all pending Destroy calls. Pool calls Destroy again
155         // if a node still appears in the provider list after a
156         // previous attempt, so there might be more than 4 Destroy
157         // calls to unblock.
158         go lameInstanceSet.Release(4444)
159
160         // Sync until all instances disappear from the provider list.
161         suite.wait(c, pool, notify, func() bool {
162                 pool.getInstancesAndSync()
163                 return len(pool.Instances()) == 0
164         })
165 }
166
167 func (suite *PoolSuite) instancesByType(pool *Pool, it arvados.InstanceType) []InstanceView {
168         var ivs []InstanceView
169         for _, iv := range pool.Instances() {
170                 if iv.ArvadosInstanceType == it.Name {
171                         ivs = append(ivs, iv)
172                 }
173         }
174         return ivs
175 }
176
177 func (suite *PoolSuite) wait(c *check.C, pool *Pool, notify <-chan struct{}, ready func() bool) {
178         timeout := time.NewTimer(time.Second).C
179         for !ready() {
180                 select {
181                 case <-notify:
182                         continue
183                 case <-timeout:
184                 }
185                 break
186         }
187         c.Check(ready(), check.Equals, true)
188 }
189
190 type stubExecutor struct{}
191
192 func (*stubExecutor) SetTarget(cloud.ExecutorTarget) {}
193
194 func (*stubExecutor) Execute(env map[string]string, cmd string, stdin io.Reader) ([]byte, []byte, error) {
195         return nil, nil, nil
196 }
197
198 func (*stubExecutor) Close() {}