14360: Merge branch 'master'
[arvados.git] / lib / dispatchcloud / worker / pool_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package worker
6
7 import (
8         "io"
9         "time"
10
11         "git.curoverse.com/arvados.git/lib/cloud"
12         "git.curoverse.com/arvados.git/lib/dispatchcloud/test"
13         "git.curoverse.com/arvados.git/sdk/go/arvados"
14         "github.com/Sirupsen/logrus"
15         check "gopkg.in/check.v1"
16 )
17
18 const GiB arvados.ByteSize = 1 << 30
19
20 var _ = check.Suite(&PoolSuite{})
21
22 type lessChecker struct {
23         *check.CheckerInfo
24 }
25
26 func (*lessChecker) Check(params []interface{}, names []string) (result bool, error string) {
27         return params[0].(int) < params[1].(int), ""
28 }
29
30 var less = &lessChecker{&check.CheckerInfo{Name: "less", Params: []string{"obtained", "expected"}}}
31
32 type PoolSuite struct{}
33
34 func (suite *PoolSuite) SetUpSuite(c *check.C) {
35         logrus.StandardLogger().SetLevel(logrus.DebugLevel)
36 }
37
38 func (suite *PoolSuite) TestStartContainer(c *check.C) {
39         // TODO: use an instanceSet stub with an SSH server
40 }
41
42 func (suite *PoolSuite) TestVerifyHostKey(c *check.C) {
43         // TODO: use an instanceSet stub with an SSH server
44 }
45
46 func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
47         lameInstanceSet := &test.LameInstanceSet{Hold: make(chan bool)}
48         type1 := arvados.InstanceType{Name: "a1s", ProviderType: "a1.small", VCPUs: 1, RAM: 1 * GiB, Price: .01}
49         type2 := arvados.InstanceType{Name: "a2m", ProviderType: "a2.medium", VCPUs: 2, RAM: 2 * GiB, Price: .02}
50         pool := &Pool{
51                 logger:      logrus.StandardLogger(),
52                 newExecutor: func(cloud.Instance) Executor { return &stubExecutor{} },
53                 instanceSet: lameInstanceSet,
54                 instanceTypes: arvados.InstanceTypeMap{
55                         type1.Name: type1,
56                         type2.Name: type2,
57                 },
58         }
59         notify := pool.Subscribe()
60         defer pool.Unsubscribe(notify)
61         notify2 := pool.Subscribe()
62         defer pool.Unsubscribe(notify2)
63
64         c.Check(pool.Unallocated()[type1], check.Equals, 0)
65         c.Check(pool.Unallocated()[type2], check.Equals, 0)
66         pool.Create(type2)
67         pool.Create(type1)
68         pool.Create(type2)
69         c.Check(pool.Unallocated()[type1], check.Equals, 1)
70         c.Check(pool.Unallocated()[type2], check.Equals, 2)
71         // Unblock the pending Create calls and (before calling Sync!)
72         // wait for the pool to process the returned instances.
73         go lameInstanceSet.Release(3)
74         suite.wait(c, pool, notify, func() bool {
75                 list, err := lameInstanceSet.Instances(nil)
76                 return err == nil && len(list) == 3
77         })
78
79         c.Check(pool.Unallocated()[type1], check.Equals, 1)
80         c.Check(pool.Unallocated()[type2], check.Equals, 2)
81         pool.getInstancesAndSync()
82         // Returned counts can be temporarily higher than 1 and 2 if
83         // poll ran before Create() returned.
84         c.Check(pool.Unallocated()[type1], check.Not(less), 1)
85         c.Check(pool.Unallocated()[type2], check.Not(less), 2)
86         suite.wait(c, pool, notify, func() bool {
87                 return pool.Unallocated()[type1] == 1 && pool.Unallocated()[type2] == 2
88         })
89
90         c.Check(pool.Shutdown(type2), check.Equals, true)
91         suite.wait(c, pool, notify, func() bool {
92                 return pool.Unallocated()[type1] == 1 && pool.Unallocated()[type2] == 1
93         })
94         c.Check(pool.Shutdown(type2), check.Equals, true)
95         suite.wait(c, pool, notify, func() bool {
96                 return pool.Unallocated()[type1] == 1 && pool.Unallocated()[type2] == 0
97         })
98         c.Check(pool.Shutdown(type2), check.Equals, false)
99         for {
100                 // Consume any waiting notifications to ensure the
101                 // next one we get is from Shutdown.
102                 select {
103                 case <-notify:
104                         continue
105                 default:
106                 }
107                 break
108         }
109         c.Check(pool.Shutdown(type1), check.Equals, true)
110         suite.wait(c, pool, notify, func() bool {
111                 return pool.Unallocated()[type1] == 0 && pool.Unallocated()[type2] == 0
112         })
113         select {
114         case <-notify2:
115         case <-time.After(time.Second):
116                 c.Error("notify did not receive")
117         }
118         go lameInstanceSet.Release(3) // unblock Destroy calls
119 }
120
121 func (suite *PoolSuite) wait(c *check.C, pool *Pool, notify <-chan struct{}, ready func() bool) {
122         timeout := time.NewTimer(time.Second).C
123         for !ready() {
124                 select {
125                 case <-notify:
126                         continue
127                 case <-timeout:
128                 }
129                 break
130         }
131         c.Check(ready(), check.Equals, true)
132 }
133
134 type stubExecutor struct{}
135
136 func (*stubExecutor) SetTarget(cloud.ExecutorTarget) {}
137
138 func (*stubExecutor) Execute(cmd string, stdin io.Reader) ([]byte, []byte, error) {
139         return nil, nil, nil
140 }
141
142 func (*stubExecutor) Close() {}