14645: Adds FUSE ops time to crunchstat output
[arvados.git] / lib / dispatchcloud / worker / pool_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package worker
6
7 import (
8         "sort"
9         "strings"
10         "time"
11
12         "git.curoverse.com/arvados.git/lib/cloud"
13         "git.curoverse.com/arvados.git/lib/dispatchcloud/test"
14         "git.curoverse.com/arvados.git/sdk/go/arvados"
15         "git.curoverse.com/arvados.git/sdk/go/ctxlog"
16         "github.com/prometheus/client_golang/prometheus"
17         check "gopkg.in/check.v1"
18 )
19
20 const GiB arvados.ByteSize = 1 << 30
21
22 var _ = check.Suite(&PoolSuite{})
23
24 type lessChecker struct {
25         *check.CheckerInfo
26 }
27
28 func (*lessChecker) Check(params []interface{}, names []string) (result bool, error string) {
29         return params[0].(int) < params[1].(int), ""
30 }
31
32 var less = &lessChecker{&check.CheckerInfo{Name: "less", Params: []string{"obtained", "expected"}}}
33
// PoolSuite groups the gocheck tests for Pool. It carries no state; the
// tests in this file each build their own driver, instance set and Pool.
type PoolSuite struct{}
35
36 func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
37         type1 := test.InstanceType(1)
38         type2 := test.InstanceType(2)
39         type3 := test.InstanceType(3)
40         waitForIdle := func(pool *Pool, notify <-chan struct{}) {
41                 timeout := time.NewTimer(time.Second)
42                 for {
43                         instances := pool.Instances()
44                         sort.Slice(instances, func(i, j int) bool {
45                                 return strings.Compare(instances[i].ArvadosInstanceType, instances[j].ArvadosInstanceType) < 0
46                         })
47                         if len(instances) == 3 &&
48                                 instances[0].ArvadosInstanceType == type1.Name &&
49                                 instances[0].WorkerState == StateIdle.String() &&
50                                 instances[1].ArvadosInstanceType == type1.Name &&
51                                 instances[1].WorkerState == StateIdle.String() &&
52                                 instances[2].ArvadosInstanceType == type2.Name &&
53                                 instances[2].WorkerState == StateIdle.String() {
54                                 return
55                         }
56                         select {
57                         case <-timeout.C:
58                                 c.Logf("pool.Instances() == %#v", instances)
59                                 c.Error("timed out")
60                                 return
61                         case <-notify:
62                         }
63                 }
64         }
65
66         logger := ctxlog.TestLogger(c)
67         driver := &test.StubDriver{}
68         is, err := driver.InstanceSet(nil, "", logger)
69         c.Assert(err, check.IsNil)
70
71         newExecutor := func(cloud.Instance) Executor {
72                 return stubExecutor{
73                         "crunch-run --list": stubResp{},
74                         "true":              stubResp{},
75                 }
76         }
77
78         cluster := &arvados.Cluster{
79                 Dispatch: arvados.Dispatch{
80                         MaxProbesPerSecond: 1000,
81                         ProbeInterval:      arvados.Duration(time.Millisecond * 10),
82                 },
83                 CloudVMs: arvados.CloudVMs{
84                         BootProbeCommand: "true",
85                         SyncInterval:     arvados.Duration(time.Millisecond * 10),
86                 },
87                 InstanceTypes: arvados.InstanceTypeMap{
88                         type1.Name: type1,
89                         type2.Name: type2,
90                         type3.Name: type3,
91                 },
92         }
93
94         pool := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), is, newExecutor, nil, cluster)
95         notify := pool.Subscribe()
96         defer pool.Unsubscribe(notify)
97         pool.Create(type1)
98         pool.Create(type1)
99         pool.Create(type2)
100         waitForIdle(pool, notify)
101         var heldInstanceID cloud.InstanceID
102         for _, inst := range pool.Instances() {
103                 if inst.ArvadosInstanceType == type2.Name {
104                         heldInstanceID = cloud.InstanceID(inst.Instance)
105                         pool.SetIdleBehavior(heldInstanceID, IdleBehaviorHold)
106                 }
107         }
108         pool.Stop()
109
110         c.Log("------- starting new pool, waiting to recover state")
111
112         pool2 := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), is, newExecutor, nil, cluster)
113         notify2 := pool2.Subscribe()
114         defer pool2.Unsubscribe(notify2)
115         waitForIdle(pool2, notify2)
116         for _, inst := range pool2.Instances() {
117                 if inst.ArvadosInstanceType == type2.Name {
118                         c.Check(inst.Instance, check.Equals, heldInstanceID)
119                         c.Check(inst.IdleBehavior, check.Equals, IdleBehaviorHold)
120                 } else {
121                         c.Check(inst.IdleBehavior, check.Equals, IdleBehaviorRun)
122                 }
123         }
124         pool2.Stop()
125 }
126
// TestCreateUnallocShutdown exercises Create, Unallocated, Shutdown and
// SetIdleBehavior on a Pool whose stub driver holds cloud operations until
// they are explicitly released. It checks that:
//   - requested instances count as Unallocated immediately after Create;
//   - an instance on admin-hold refuses Shutdown and is not counted as
//     Unallocated;
//   - each successful Shutdown(type) removes one unallocated instance of
//     that type, and Shutdown returns false when none remain;
//   - an instance on admin-drain is shut down and not counted as
//     Unallocated;
//   - all instances eventually disappear once pending Destroy calls are
//     released.
func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
	logger := ctxlog.TestLogger(c)
	// HoldCloudOps: cloud calls block until driver.ReleaseCloudOps
	// unblocks them (see the Create and Destroy releases below), so the
	// test can observe the pool while operations are still pending.
	driver := test.StubDriver{HoldCloudOps: true}
	instanceSet, err := driver.InstanceSet(nil, "", logger)
	c.Assert(err, check.IsNil)

	type1 := arvados.InstanceType{Name: "a1s", ProviderType: "a1.small", VCPUs: 1, RAM: 1 * GiB, Price: .01}
	type2 := arvados.InstanceType{Name: "a2m", ProviderType: "a2.medium", VCPUs: 2, RAM: 2 * GiB, Price: .02}
	type3 := arvados.InstanceType{Name: "a2l", ProviderType: "a2.large", VCPUs: 4, RAM: 4 * GiB, Price: .04}
	// The Pool is built as a struct literal rather than via NewPool.
	// NOTE(review): presumably this avoids starting background sync
	// loops, so the only explicit sync is getInstancesAndSync at the end
	// of the test — confirm against NewPool.
	pool := &Pool{
		logger:      logger,
		newExecutor: func(cloud.Instance) Executor { return stubExecutor{} },
		instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
		instanceTypes: arvados.InstanceTypeMap{
			type1.Name: type1,
			type2.Name: type2,
			type3.Name: type3,
		},
	}
	notify := pool.Subscribe()
	defer pool.Unsubscribe(notify)
	// Second subscriber, used after the type1 Shutdown to confirm that
	// notifications reach more than one subscriber.
	notify2 := pool.Subscribe()
	defer pool.Unsubscribe(notify2)

	c.Check(pool.Unallocated()[type1], check.Equals, 0)
	c.Check(pool.Unallocated()[type2], check.Equals, 0)
	c.Check(pool.Unallocated()[type3], check.Equals, 0)
	pool.Create(type2)
	pool.Create(type1)
	pool.Create(type2)
	pool.Create(type3)
	// Counted as Unallocated even though the (held) Create calls have
	// not completed yet.
	c.Check(pool.Unallocated()[type1], check.Equals, 1)
	c.Check(pool.Unallocated()[type2], check.Equals, 2)
	c.Check(pool.Unallocated()[type3], check.Equals, 1)

	// Unblock the pending Create calls.
	go driver.ReleaseCloudOps(4)

	// Wait for each instance to either return from its Create
	// call, or show up in a poll.
	suite.wait(c, pool, notify, func() bool {
		pool.mtx.RLock()
		defer pool.mtx.RUnlock()
		return len(pool.workers) == 4
	})

	// Place type3 node on admin-hold
	ivs := suite.instancesByType(pool, type3)
	c.Assert(ivs, check.HasLen, 1)
	type3instanceID := ivs[0].Instance
	err = pool.SetIdleBehavior(type3instanceID, IdleBehaviorHold)
	c.Check(err, check.IsNil)

	// Check admin-hold behavior: refuse to shutdown, and don't
	// report as Unallocated ("available now or soon").
	c.Check(pool.Shutdown(type3), check.Equals, false)
	suite.wait(c, pool, notify, func() bool {
		return pool.Unallocated()[type3] == 0
	})
	c.Check(suite.instancesByType(pool, type3), check.HasLen, 1)

	// Shutdown both type2 nodes
	c.Check(pool.Shutdown(type2), check.Equals, true)
	suite.wait(c, pool, notify, func() bool {
		return pool.Unallocated()[type1] == 1 && pool.Unallocated()[type2] == 1
	})
	c.Check(pool.Shutdown(type2), check.Equals, true)
	suite.wait(c, pool, notify, func() bool {
		return pool.Unallocated()[type1] == 1 && pool.Unallocated()[type2] == 0
	})
	// No unallocated type2 instances remain, so nothing to shut down.
	c.Check(pool.Shutdown(type2), check.Equals, false)
	for {
		// Consume any waiting notifications to ensure the
		// next one we get is from Shutdown.
		select {
		case <-notify:
			continue
		default:
		}
		break
	}

	// Shutdown type1 node
	c.Check(pool.Shutdown(type1), check.Equals, true)
	suite.wait(c, pool, notify, func() bool {
		return pool.Unallocated()[type1] == 0 && pool.Unallocated()[type2] == 0 && pool.Unallocated()[type3] == 0
	})
	// The second subscriber must also have been notified.
	select {
	case <-notify2:
	case <-time.After(time.Second):
		c.Error("notify did not receive")
	}

	// Put type3 node back in service.
	err = pool.SetIdleBehavior(type3instanceID, IdleBehaviorRun)
	c.Check(err, check.IsNil)
	suite.wait(c, pool, notify, func() bool {
		return pool.Unallocated()[type3] == 1
	})

	// Check admin-drain behavior: shut down right away, and don't
	// report as Unallocated.
	err = pool.SetIdleBehavior(type3instanceID, IdleBehaviorDrain)
	c.Check(err, check.IsNil)
	suite.wait(c, pool, notify, func() bool {
		return pool.Unallocated()[type3] == 0
	})
	suite.wait(c, pool, notify, func() bool {
		ivs := suite.instancesByType(pool, type3)
		return len(ivs) == 1 && ivs[0].WorkerState == StateShutdown.String()
	})

	// Unblock all pending Destroy calls. Pool calls Destroy again
	// if a node still appears in the provider list after a
	// previous attempt, so there might be more than 4 Destroy
	// calls to unblock.
	go driver.ReleaseCloudOps(4444)

	// Sync until all instances disappear from the provider list.
	suite.wait(c, pool, notify, func() bool {
		pool.getInstancesAndSync()
		return len(pool.Instances()) == 0
	})
}
251
252 func (suite *PoolSuite) instancesByType(pool *Pool, it arvados.InstanceType) []InstanceView {
253         var ivs []InstanceView
254         for _, iv := range pool.Instances() {
255                 if iv.ArvadosInstanceType == it.Name {
256                         ivs = append(ivs, iv)
257                 }
258         }
259         return ivs
260 }
261
262 func (suite *PoolSuite) wait(c *check.C, pool *Pool, notify <-chan struct{}, ready func() bool) {
263         timeout := time.NewTimer(time.Second).C
264         for !ready() {
265                 select {
266                 case <-notify:
267                         continue
268                 case <-timeout:
269                 }
270                 break
271         }
272         c.Check(ready(), check.Equals, true)
273 }