1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
11 "git.curoverse.com/arvados.git/lib/cloud"
12 "git.curoverse.com/arvados.git/lib/dispatchcloud/test"
13 "git.curoverse.com/arvados.git/sdk/go/arvados"
14 "github.com/Sirupsen/logrus"
15 check "gopkg.in/check.v1"
18 const GiB arvados.ByteSize = 1 << 30
20 var _ = check.Suite(&PoolSuite{})
22 type lessChecker struct {
26 func (*lessChecker) Check(params []interface{}, names []string) (result bool, error string) {
27 return params[0].(int) < params[1].(int), ""
30 var less = &lessChecker{&check.CheckerInfo{Name: "less", Params: []string{"obtained", "expected"}}}
32 type PoolSuite struct{}
34 func (suite *PoolSuite) SetUpSuite(c *check.C) {
35 logrus.StandardLogger().SetLevel(logrus.DebugLevel)
38 func (suite *PoolSuite) TestStartContainer(c *check.C) {
39 // TODO: use an instanceSet stub with an SSH server
42 func (suite *PoolSuite) TestVerifyHostKey(c *check.C) {
43 // TODO: use an instanceSet stub with an SSH server
46 func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
47 lameInstanceSet := &test.LameInstanceSet{Hold: make(chan bool)}
48 type1 := arvados.InstanceType{Name: "a1s", ProviderType: "a1.small", VCPUs: 1, RAM: 1 * GiB, Price: .01}
49 type2 := arvados.InstanceType{Name: "a2m", ProviderType: "a2.medium", VCPUs: 2, RAM: 2 * GiB, Price: .02}
51 logger: logrus.StandardLogger(),
52 newExecutor: func(cloud.Instance) Executor { return &stubExecutor{} },
53 instanceSet: lameInstanceSet,
54 instanceTypes: arvados.InstanceTypeMap{
59 notify := pool.Subscribe()
60 defer pool.Unsubscribe(notify)
61 notify2 := pool.Subscribe()
62 defer pool.Unsubscribe(notify2)
64 c.Check(pool.Unallocated()[type1], check.Equals, 0)
65 c.Check(pool.Unallocated()[type2], check.Equals, 0)
69 c.Check(pool.Unallocated()[type1], check.Equals, 1)
70 c.Check(pool.Unallocated()[type2], check.Equals, 2)
72 // Unblock the pending Create calls.
73 go lameInstanceSet.Release(3)
75 // Wait for each instance to either return from its Create
76 // call, or show up in a poll.
77 suite.wait(c, pool, notify, func() bool {
79 defer pool.mtx.RUnlock()
80 return len(pool.workers) == 3
83 c.Check(pool.Shutdown(type2), check.Equals, true)
84 suite.wait(c, pool, notify, func() bool {
85 return pool.Unallocated()[type1] == 1 && pool.Unallocated()[type2] == 1
87 c.Check(pool.Shutdown(type2), check.Equals, true)
88 suite.wait(c, pool, notify, func() bool {
89 return pool.Unallocated()[type1] == 1 && pool.Unallocated()[type2] == 0
91 c.Check(pool.Shutdown(type2), check.Equals, false)
93 // Consume any waiting notifications to ensure the
94 // next one we get is from Shutdown.
102 c.Check(pool.Shutdown(type1), check.Equals, true)
103 suite.wait(c, pool, notify, func() bool {
104 return pool.Unallocated()[type1] == 0 && pool.Unallocated()[type2] == 0
108 case <-time.After(time.Second):
109 c.Error("notify did not receive")
111 go lameInstanceSet.Release(3) // unblock Destroy calls
114 func (suite *PoolSuite) wait(c *check.C, pool *Pool, notify <-chan struct{}, ready func() bool) {
115 timeout := time.NewTimer(time.Second).C
124 c.Check(ready(), check.Equals, true)
127 type stubExecutor struct{}
129 func (*stubExecutor) SetTarget(cloud.ExecutorTarget) {}
131 func (*stubExecutor) Execute(cmd string, stdin io.Reader) ([]byte, []byte, error) {
135 func (*stubExecutor) Close() {}