14325: Clean up test suite logging.
[arvados.git] / lib / dispatchcloud / worker / pool_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package worker
6
7 import (
8         "sort"
9         "strings"
10         "time"
11
12         "git.curoverse.com/arvados.git/lib/cloud"
13         "git.curoverse.com/arvados.git/lib/dispatchcloud/test"
14         "git.curoverse.com/arvados.git/sdk/go/arvados"
15         "github.com/prometheus/client_golang/prometheus"
16         check "gopkg.in/check.v1"
17 )
18
19 const GiB arvados.ByteSize = 1 << 30
20
21 var _ = check.Suite(&PoolSuite{})
22
23 type lessChecker struct {
24         *check.CheckerInfo
25 }
26
27 func (*lessChecker) Check(params []interface{}, names []string) (result bool, error string) {
28         return params[0].(int) < params[1].(int), ""
29 }
30
31 var less = &lessChecker{&check.CheckerInfo{Name: "less", Params: []string{"obtained", "expected"}}}
32
// PoolSuite is the gocheck suite whose methods exercise Pool.
type PoolSuite struct{}
34
35 func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
36         type1 := test.InstanceType(1)
37         type2 := test.InstanceType(2)
38         type3 := test.InstanceType(3)
39         waitForIdle := func(pool *Pool, notify <-chan struct{}) {
40                 timeout := time.NewTimer(time.Second)
41                 for {
42                         instances := pool.Instances()
43                         sort.Slice(instances, func(i, j int) bool {
44                                 return strings.Compare(instances[i].ArvadosInstanceType, instances[j].ArvadosInstanceType) < 0
45                         })
46                         if len(instances) == 3 &&
47                                 instances[0].ArvadosInstanceType == type1.Name &&
48                                 instances[0].WorkerState == StateIdle.String() &&
49                                 instances[1].ArvadosInstanceType == type1.Name &&
50                                 instances[1].WorkerState == StateIdle.String() &&
51                                 instances[2].ArvadosInstanceType == type2.Name &&
52                                 instances[2].WorkerState == StateIdle.String() {
53                                 return
54                         }
55                         select {
56                         case <-timeout.C:
57                                 c.Logf("pool.Instances() == %#v", instances)
58                                 c.Error("timed out")
59                                 return
60                         case <-notify:
61                         }
62                 }
63         }
64
65         logger := test.Logger()
66         driver := &test.StubDriver{}
67         is, err := driver.InstanceSet(nil, "", logger)
68         c.Assert(err, check.IsNil)
69
70         newExecutor := func(cloud.Instance) Executor {
71                 return stubExecutor{
72                         "crunch-run --list": stubResp{},
73                         "true":              stubResp{},
74                 }
75         }
76
77         cluster := &arvados.Cluster{
78                 Dispatch: arvados.Dispatch{
79                         MaxProbesPerSecond: 1000,
80                         ProbeInterval:      arvados.Duration(time.Millisecond * 10),
81                 },
82                 CloudVMs: arvados.CloudVMs{
83                         BootProbeCommand: "true",
84                         SyncInterval:     arvados.Duration(time.Millisecond * 10),
85                 },
86                 InstanceTypes: arvados.InstanceTypeMap{
87                         type1.Name: type1,
88                         type2.Name: type2,
89                         type3.Name: type3,
90                 },
91         }
92
93         pool := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), is, newExecutor, cluster)
94         notify := pool.Subscribe()
95         defer pool.Unsubscribe(notify)
96         pool.Create(type1)
97         pool.Create(type1)
98         pool.Create(type2)
99         waitForIdle(pool, notify)
100         var heldInstanceID cloud.InstanceID
101         for _, inst := range pool.Instances() {
102                 if inst.ArvadosInstanceType == type2.Name {
103                         heldInstanceID = cloud.InstanceID(inst.Instance)
104                         pool.SetIdleBehavior(heldInstanceID, IdleBehaviorHold)
105                 }
106         }
107         pool.Stop()
108
109         c.Log("------- starting new pool, waiting to recover state")
110
111         pool2 := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), is, newExecutor, cluster)
112         notify2 := pool2.Subscribe()
113         defer pool2.Unsubscribe(notify2)
114         waitForIdle(pool2, notify2)
115         for _, inst := range pool2.Instances() {
116                 if inst.ArvadosInstanceType == type2.Name {
117                         c.Check(inst.Instance, check.Equals, heldInstanceID)
118                         c.Check(inst.IdleBehavior, check.Equals, IdleBehaviorHold)
119                 } else {
120                         c.Check(inst.IdleBehavior, check.Equals, IdleBehaviorRun)
121                 }
122         }
123         pool2.Stop()
124 }
125
126 func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
127         logger := test.Logger()
128         driver := test.StubDriver{HoldCloudOps: true}
129         instanceSet, err := driver.InstanceSet(nil, "", logger)
130         c.Assert(err, check.IsNil)
131
132         type1 := arvados.InstanceType{Name: "a1s", ProviderType: "a1.small", VCPUs: 1, RAM: 1 * GiB, Price: .01}
133         type2 := arvados.InstanceType{Name: "a2m", ProviderType: "a2.medium", VCPUs: 2, RAM: 2 * GiB, Price: .02}
134         type3 := arvados.InstanceType{Name: "a2l", ProviderType: "a2.large", VCPUs: 4, RAM: 4 * GiB, Price: .04}
135         pool := &Pool{
136                 logger:      logger,
137                 newExecutor: func(cloud.Instance) Executor { return stubExecutor{} },
138                 instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
139                 instanceTypes: arvados.InstanceTypeMap{
140                         type1.Name: type1,
141                         type2.Name: type2,
142                         type3.Name: type3,
143                 },
144         }
145         notify := pool.Subscribe()
146         defer pool.Unsubscribe(notify)
147         notify2 := pool.Subscribe()
148         defer pool.Unsubscribe(notify2)
149
150         c.Check(pool.Unallocated()[type1], check.Equals, 0)
151         c.Check(pool.Unallocated()[type2], check.Equals, 0)
152         c.Check(pool.Unallocated()[type3], check.Equals, 0)
153         pool.Create(type2)
154         pool.Create(type1)
155         pool.Create(type2)
156         pool.Create(type3)
157         c.Check(pool.Unallocated()[type1], check.Equals, 1)
158         c.Check(pool.Unallocated()[type2], check.Equals, 2)
159         c.Check(pool.Unallocated()[type3], check.Equals, 1)
160
161         // Unblock the pending Create calls.
162         go driver.ReleaseCloudOps(4)
163
164         // Wait for each instance to either return from its Create
165         // call, or show up in a poll.
166         suite.wait(c, pool, notify, func() bool {
167                 pool.mtx.RLock()
168                 defer pool.mtx.RUnlock()
169                 return len(pool.workers) == 4
170         })
171
172         // Place type3 node on admin-hold
173         ivs := suite.instancesByType(pool, type3)
174         c.Assert(ivs, check.HasLen, 1)
175         type3instanceID := ivs[0].Instance
176         err = pool.SetIdleBehavior(type3instanceID, IdleBehaviorHold)
177         c.Check(err, check.IsNil)
178
179         // Check admin-hold behavior: refuse to shutdown, and don't
180         // report as Unallocated ("available now or soon").
181         c.Check(pool.Shutdown(type3), check.Equals, false)
182         suite.wait(c, pool, notify, func() bool {
183                 return pool.Unallocated()[type3] == 0
184         })
185         c.Check(suite.instancesByType(pool, type3), check.HasLen, 1)
186
187         // Shutdown both type2 nodes
188         c.Check(pool.Shutdown(type2), check.Equals, true)
189         suite.wait(c, pool, notify, func() bool {
190                 return pool.Unallocated()[type1] == 1 && pool.Unallocated()[type2] == 1
191         })
192         c.Check(pool.Shutdown(type2), check.Equals, true)
193         suite.wait(c, pool, notify, func() bool {
194                 return pool.Unallocated()[type1] == 1 && pool.Unallocated()[type2] == 0
195         })
196         c.Check(pool.Shutdown(type2), check.Equals, false)
197         for {
198                 // Consume any waiting notifications to ensure the
199                 // next one we get is from Shutdown.
200                 select {
201                 case <-notify:
202                         continue
203                 default:
204                 }
205                 break
206         }
207
208         // Shutdown type1 node
209         c.Check(pool.Shutdown(type1), check.Equals, true)
210         suite.wait(c, pool, notify, func() bool {
211                 return pool.Unallocated()[type1] == 0 && pool.Unallocated()[type2] == 0 && pool.Unallocated()[type3] == 0
212         })
213         select {
214         case <-notify2:
215         case <-time.After(time.Second):
216                 c.Error("notify did not receive")
217         }
218
219         // Put type3 node back in service.
220         err = pool.SetIdleBehavior(type3instanceID, IdleBehaviorRun)
221         c.Check(err, check.IsNil)
222         suite.wait(c, pool, notify, func() bool {
223                 return pool.Unallocated()[type3] == 1
224         })
225
226         // Check admin-drain behavior: shut down right away, and don't
227         // report as Unallocated.
228         err = pool.SetIdleBehavior(type3instanceID, IdleBehaviorDrain)
229         c.Check(err, check.IsNil)
230         suite.wait(c, pool, notify, func() bool {
231                 return pool.Unallocated()[type3] == 0
232         })
233         suite.wait(c, pool, notify, func() bool {
234                 ivs := suite.instancesByType(pool, type3)
235                 return len(ivs) == 1 && ivs[0].WorkerState == StateShutdown.String()
236         })
237
238         // Unblock all pending Destroy calls. Pool calls Destroy again
239         // if a node still appears in the provider list after a
240         // previous attempt, so there might be more than 4 Destroy
241         // calls to unblock.
242         go driver.ReleaseCloudOps(4444)
243
244         // Sync until all instances disappear from the provider list.
245         suite.wait(c, pool, notify, func() bool {
246                 pool.getInstancesAndSync()
247                 return len(pool.Instances()) == 0
248         })
249 }
250
251 func (suite *PoolSuite) instancesByType(pool *Pool, it arvados.InstanceType) []InstanceView {
252         var ivs []InstanceView
253         for _, iv := range pool.Instances() {
254                 if iv.ArvadosInstanceType == it.Name {
255                         ivs = append(ivs, iv)
256                 }
257         }
258         return ivs
259 }
260
261 func (suite *PoolSuite) wait(c *check.C, pool *Pool, notify <-chan struct{}, ready func() bool) {
262         timeout := time.NewTimer(time.Second).C
263         for !ready() {
264                 select {
265                 case <-notify:
266                         continue
267                 case <-timeout:
268                 }
269                 break
270         }
271         c.Check(ready(), check.Equals, true)
272 }