1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
11 "git.curoverse.com/arvados.git/lib/cloud"
12 "git.curoverse.com/arvados.git/lib/dispatchcloud/test"
13 "git.curoverse.com/arvados.git/sdk/go/arvados"
14 "github.com/sirupsen/logrus"
15 check "gopkg.in/check.v1"
// GiB is 1 << 30 (2^30) typed as arvados.ByteSize. The instance type
// fixtures in this file multiply it to express their RAM sizes.
18 const GiB arvados.ByteSize = 1 << 30
// Hand a PoolSuite value to check.Suite so the gocheck runner picks up
// the suite's test methods.
20 var _ = check.Suite(&PoolSuite{})
// lessChecker is a custom gocheck checker type; its Check method
// compares two int parameters with "<".
// NOTE(review): the struct's fields are not visible in this excerpt.
22 type lessChecker struct {
// Check reports whether params[0] is strictly less than params[1].
// Both values are type-asserted to int without the comma-ok form, so a
// non-int parameter (or fewer than two parameters) panics. The names
// argument is unused and the returned error string is always empty.
26 func (*lessChecker) Check(params []interface{}, names []string) (result bool, error string) {
27 return params[0].(int) < params[1].(int), ""
// less is a package-level lessChecker instance whose CheckerInfo names
// it "less" and labels its two parameters "obtained" and "expected".
30 var less = &lessChecker{&check.CheckerInfo{Name: "less", Params: []string{"obtained", "expected"}}}
// PoolSuite is the gocheck suite holding the Pool tests in this file.
// It is an empty struct and carries no state between tests.
32 type PoolSuite struct{}
// SetUpSuite is the suite-level gocheck fixture hook. It sets the
// standard logrus logger to debug level, which also affects any other
// code using logrus.StandardLogger() in the same process.
34 func (suite *PoolSuite) SetUpSuite(c *check.C) {
35 logrus.StandardLogger().SetLevel(logrus.DebugLevel)
// TestStartContainer is a placeholder: the only visible content is the
// TODO below, so it currently asserts nothing.
38 func (suite *PoolSuite) TestStartContainer(c *check.C) {
39 // TODO: use an instanceSet stub with an SSH server
// TestVerifyHostKey is a placeholder: the only visible content is the
// TODO below, so it currently asserts nothing.
42 func (suite *PoolSuite) TestVerifyHostKey(c *check.C) {
43 // TODO: use an instanceSet stub with an SSH server
// TestCreateUnallocShutdown exercises a Pool backed by a
// test.LameInstanceSet with three instance types. As far as the visible
// lines show, it checks:
//   - the Unallocated() counts before and after instances exist;
//   - admin-hold on the type3 instance (Shutdown refused, not counted
//     as unallocated, instance still listed);
//   - Shutdown() results for both type2 instances and the type1 instance;
//   - returning the type3 instance to service, then admin-drain;
//   - that all instances disappear once blocked provider calls are
//     released.
//
// NOTE(review): several statements of this function are not visible in
// this excerpt (for example the ones that create the instances and the
// select headers), so this summary covers only what can be read here.
46 func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
// Hold is an unbuffered bool channel; presumably the stub blocks its
// provider calls on it until Release is called — confirm against
// test.LameInstanceSet.
47 lameInstanceSet := &test.LameInstanceSet{Hold: make(chan bool)}
// Three fixture types with doubling RAM and price; they are looked up by
// value in Unallocated() and by Name in instancesByType.
48 type1 := arvados.InstanceType{Name: "a1s", ProviderType: "a1.small", VCPUs: 1, RAM: 1 * GiB, Price: .01}
49 type2 := arvados.InstanceType{Name: "a2m", ProviderType: "a2.medium", VCPUs: 2, RAM: 2 * GiB, Price: .02}
50 type3 := arvados.InstanceType{Name: "a2l", ProviderType: "a2.large", VCPUs: 4, RAM: 4 * GiB, Price: .04}
52 logger: logrus.StandardLogger(),
53 newExecutor: func(cloud.Instance) Executor { return &stubExecutor{} },
54 instanceSet: lameInstanceSet,
55 instanceTypes: arvados.InstanceTypeMap{
// Two independent subscriptions: notify is the channel handed to
// suite.wait throughout this test.
61 notify := pool.Subscribe()
62 defer pool.Unsubscribe(notify)
63 notify2 := pool.Subscribe()
64 defer pool.Unsubscribe(notify2)
// Baseline: every type is expected to report zero unallocated instances.
66 c.Check(pool.Unallocated()[type1], check.Equals, 0)
67 c.Check(pool.Unallocated()[type2], check.Equals, 0)
68 c.Check(pool.Unallocated()[type3], check.Equals, 0)
// Expected counts once instances have been requested: one type1, two
// type2, one type3 (the requesting statements are not visible here).
73 c.Check(pool.Unallocated()[type1], check.Equals, 1)
74 c.Check(pool.Unallocated()[type2], check.Equals, 2)
75 c.Check(pool.Unallocated()[type3], check.Equals, 1)
77 // Unblock the pending Create calls.
78 go lameInstanceSet.Release(4)
80 // Wait for each instance to either return from its Create
81 // call, or show up in a poll.
82 suite.wait(c, pool, notify, func() bool {
// pool.workers is read directly here; the matching lock acquisition for
// this deferred RUnlock is not visible in this excerpt.
84 defer pool.mtx.RUnlock()
85 return len(pool.workers) == 4
88 // Place type3 node on admin-hold
89 ivs := suite.instancesByType(pool, type3)
// Assert (not Check): ivs[0] is indexed on the next line.
90 c.Assert(ivs, check.HasLen, 1)
91 type3instanceID := ivs[0].Instance
92 err := pool.SetIdleBehavior(type3instanceID, IdleBehaviorHold)
93 c.Check(err, check.IsNil)
95 // Check admin-hold behavior: refuse to shutdown, and don't
96 // report as Unallocated ("available now or soon").
97 c.Check(pool.Shutdown(type3), check.Equals, false)
98 suite.wait(c, pool, notify, func() bool {
99 return pool.Unallocated()[type3] == 0
// The held instance must still be listed even though it is not counted
// as unallocated.
101 c.Check(suite.instancesByType(pool, type3), check.HasLen, 1)
103 // Shutdown both type2 nodes
104 c.Check(pool.Shutdown(type2), check.Equals, true)
105 suite.wait(c, pool, notify, func() bool {
106 return pool.Unallocated()[type1] == 1 && pool.Unallocated()[type2] == 1
108 c.Check(pool.Shutdown(type2), check.Equals, true)
109 suite.wait(c, pool, notify, func() bool {
110 return pool.Unallocated()[type1] == 1 && pool.Unallocated()[type2] == 0
// With no type2 instance left to shut down, Shutdown is expected to
// report false.
112 c.Check(pool.Shutdown(type2), check.Equals, false)
114 // Consume any waiting notifications to ensure the
115 // next one we get is from Shutdown.
124 // Shutdown type1 node
125 c.Check(pool.Shutdown(type1), check.Equals, true)
126 suite.wait(c, pool, notify, func() bool {
127 return pool.Unallocated()[type1] == 0 && pool.Unallocated()[type2] == 0 && pool.Unallocated()[type3] == 0
// Timeout branch: record a failure if one second passes first (the
// enclosing select and its other case are not visible in this excerpt).
131 case <-time.After(time.Second):
132 c.Error("notify did not receive")
135 // Put type3 node back in service.
136 err = pool.SetIdleBehavior(type3instanceID, IdleBehaviorRun)
137 c.Check(err, check.IsNil)
138 suite.wait(c, pool, notify, func() bool {
139 return pool.Unallocated()[type3] == 1
142 // Check admin-drain behavior: shut down right away, and don't
143 // report as Unallocated.
144 err = pool.SetIdleBehavior(type3instanceID, IdleBehaviorDrain)
145 c.Check(err, check.IsNil)
146 suite.wait(c, pool, notify, func() bool {
147 return pool.Unallocated()[type3] == 0
// The drained instance should still be listed, now in the shutdown state.
149 suite.wait(c, pool, notify, func() bool {
150 ivs := suite.instancesByType(pool, type3)
151 return len(ivs) == 1 && ivs[0].WorkerState == StateShutdown.String()
154 // Unblock all pending Destroy calls. Pool calls Destroy again
155 // if a node still appears in the provider list after a
156 // previous attempt, so there might be more than 4 Destroy
// calls to unblock, hence the deliberately large Release count.
158 go lameInstanceSet.Release(4444)
160 // Sync until all instances disappear from the provider list.
161 suite.wait(c, pool, notify, func() bool {
// Called inside the ready func, so a sync happens on every evaluation.
162 pool.getInstancesAndSync()
163 return len(pool.Instances()) == 0
// instancesByType walks pool.Instances() and collects into ivs every
// InstanceView whose ArvadosInstanceType equals it.Name. Matching is by
// type name only; the other fields of it are not consulted.
// NOTE(review): the return statement is not visible in this excerpt;
// presumably ivs is returned — confirm.
167 func (suite *PoolSuite) instancesByType(pool *Pool, it arvados.InstanceType) []InstanceView {
168 var ivs []InstanceView
169 for _, iv := range pool.Instances() {
170 if iv.ArvadosInstanceType == it.Name {
171 ivs = append(ivs, iv)
// wait is a test helper built around the ready predicate. The visible
// lines create a one-second timer channel and finish by asserting, via
// c.Check, that ready() returns true.
// NOTE(review): the statements between those two lines are not visible
// in this excerpt; presumably they re-evaluate ready each time notify
// fires until the timer expires — confirm.
177 func (suite *PoolSuite) wait(c *check.C, pool *Pool, notify <-chan struct{}, ready func() bool) {
178 timeout := time.NewTimer(time.Second).C
// Final check: records a test failure if ready() is still false here.
187 c.Check(ready(), check.Equals, true)
// stubExecutor is a stateless stand-in returned as an Executor by the
// newExecutor func of the Pool built in TestCreateUnallocShutdown.
190 type stubExecutor struct{}
// SetTarget is a no-op: the stub ignores the target it is given.
192 func (*stubExecutor) SetTarget(cloud.ExecutorTarget) {}
// Execute is the stub's command-execution method; it returns two byte
// slices and an error.
// NOTE(review): the body is not visible in this excerpt, so the actual
// return values cannot be confirmed from here.
194 func (*stubExecutor) Execute(env map[string]string, cmd string, stdin io.Reader) ([]byte, []byte, error) {
// Close is a no-op: the stub holds nothing that needs releasing.
198 func (*stubExecutor) Close() {}