14360: Encapsulate scheduler object.
[arvados.git] / lib / dispatchcloud / worker / pool_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package worker
6
7 import (
8         "io"
9         "time"
10
11         "git.curoverse.com/arvados.git/lib/cloud"
12         "git.curoverse.com/arvados.git/lib/dispatchcloud/test"
13         "git.curoverse.com/arvados.git/sdk/go/arvados"
14         "github.com/Sirupsen/logrus"
15         check "gopkg.in/check.v1"
16 )
17
18 const GiB arvados.ByteSize = 1 << 30
19
20 var _ = check.Suite(&PoolSuite{})
21
22 type PoolSuite struct{}
23
24 func (suite *PoolSuite) SetUpSuite(c *check.C) {
25         logrus.StandardLogger().SetLevel(logrus.DebugLevel)
26 }
27
28 func (suite *PoolSuite) TestStartContainer(c *check.C) {
29         // TODO: use an instanceSet stub with an SSH server
30 }
31
32 func (suite *PoolSuite) TestVerifyHostKey(c *check.C) {
33         // TODO: use an instanceSet stub with an SSH server
34 }
35
36 func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
37         lameInstanceSet := &test.LameInstanceSet{Hold: make(chan bool)}
38         type1 := arvados.InstanceType{Name: "a1s", ProviderType: "a1.small", VCPUs: 1, RAM: 1 * GiB, Price: .01}
39         type2 := arvados.InstanceType{Name: "a2m", ProviderType: "a2.medium", VCPUs: 2, RAM: 2 * GiB, Price: .02}
40         pool := &Pool{
41                 logger:      logrus.StandardLogger(),
42                 newExecutor: func(cloud.Instance) Executor { return &stubExecutor{} },
43                 instanceSet: lameInstanceSet,
44                 instanceTypes: arvados.InstanceTypeMap{
45                         type1.Name: type1,
46                         type2.Name: type2,
47                 },
48         }
49         notify := pool.Subscribe()
50         defer pool.Unsubscribe(notify)
51         notify2 := pool.Subscribe()
52         defer pool.Unsubscribe(notify2)
53
54         c.Check(pool.Unallocated()[type1], check.Equals, 0)
55         c.Check(pool.Unallocated()[type2], check.Equals, 0)
56         pool.Create(type2)
57         pool.Create(type1)
58         pool.Create(type2)
59         c.Check(pool.Unallocated()[type1], check.Equals, 1)
60         c.Check(pool.Unallocated()[type2], check.Equals, 2)
61         // Unblock the pending Create calls and (before calling Sync!)
62         // wait for the pool to process the returned instances.
63         go lameInstanceSet.Release(3)
64         suite.wait(c, pool, notify, func() bool {
65                 list, err := lameInstanceSet.Instances(nil)
66                 return err == nil && len(list) == 3
67         })
68
69         c.Check(pool.Unallocated()[type1], check.Equals, 1)
70         c.Check(pool.Unallocated()[type2], check.Equals, 2)
71         pool.getInstancesAndSync()
72         c.Check(pool.Unallocated()[type1], check.Equals, 1)
73         c.Check(pool.Unallocated()[type2], check.Equals, 2)
74
75         c.Check(pool.Shutdown(type2), check.Equals, true)
76         suite.wait(c, pool, notify, func() bool {
77                 return pool.Unallocated()[type1] == 1 && pool.Unallocated()[type2] == 1
78         })
79         c.Check(pool.Shutdown(type2), check.Equals, true)
80         suite.wait(c, pool, notify, func() bool {
81                 return pool.Unallocated()[type1] == 1 && pool.Unallocated()[type2] == 0
82         })
83         c.Check(pool.Shutdown(type2), check.Equals, false)
84         for {
85                 // Consume any waiting notifications to ensure the
86                 // next one we get is from Shutdown.
87                 select {
88                 case <-notify:
89                         continue
90                 default:
91                 }
92                 break
93         }
94         c.Check(pool.Shutdown(type1), check.Equals, true)
95         suite.wait(c, pool, notify, func() bool {
96                 return pool.Unallocated()[type1] == 0 && pool.Unallocated()[type2] == 0
97         })
98         select {
99         case <-notify2:
100         case <-time.After(time.Second):
101                 c.Error("notify did not receive")
102         }
103         go lameInstanceSet.Release(3) // unblock Destroy calls
104 }
105
106 func (suite *PoolSuite) wait(c *check.C, pool *Pool, notify <-chan struct{}, ready func() bool) {
107         timeout := time.NewTimer(time.Second).C
108         for !ready() {
109                 select {
110                 case <-notify:
111                         continue
112                 case <-timeout:
113                 }
114                 break
115         }
116         c.Check(ready(), check.Equals, true)
117 }
118
119 type stubExecutor struct{}
120
121 func (*stubExecutor) SetTarget(cloud.ExecutorTarget) {}
122
123 func (*stubExecutor) Execute(cmd string, stdin io.Reader) ([]byte, []byte, error) {
124         return nil, nil, nil
125 }
126
127 func (*stubExecutor) Close() {}