21700: Install Bundler system-wide in Rails postinst
[arvados.git] / lib / dispatchcloud / container / queue_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package container
6
7 import (
8         "errors"
9         "os"
10         "sync"
11         "testing"
12         "time"
13
14         "git.arvados.org/arvados.git/sdk/go/arvados"
15         "git.arvados.org/arvados.git/sdk/go/arvadostest"
16         "github.com/sirupsen/logrus"
17         check "gopkg.in/check.v1"
18 )
19
// Test is the "go test" entry point for this package. It hands the
// *testing.T to gocheck, which then runs the suites registered via
// check.Suite.
func Test(t *testing.T) {
	check.TestingT(t)
}
24
// Register IntegrationSuite with gocheck at package initialization.
var _ = check.Suite(&IntegrationSuite{})
26
27 func logger() logrus.FieldLogger {
28         logger := logrus.StandardLogger()
29         if os.Getenv("ARVADOS_DEBUG") != "" {
30                 logger.SetLevel(logrus.DebugLevel)
31         }
32         return logger
33 }
34
// IntegrationSuite is the gocheck suite holding the queue tests in
// this file. It has no fields; its methods create an API client from
// the environment on each use.
type IntegrationSuite struct{}
36
37 func (suite *IntegrationSuite) TearDownTest(c *check.C) {
38         err := arvados.NewClientFromEnv().RequestAndDecode(nil, "POST", "database/reset", nil, nil)
39         c.Check(err, check.IsNil)
40 }
41
42 func (suite *IntegrationSuite) TestGetLockUnlockCancel(c *check.C) {
43         typeChooser := func(ctr *arvados.Container) ([]arvados.InstanceType, error) {
44                 c.Check(ctr.Mounts["/tmp"].Capacity, check.Equals, int64(24000000000))
45                 return []arvados.InstanceType{{Name: "testType"}}, nil
46         }
47
48         client := arvados.NewClientFromEnv()
49         cq := NewQueue(logger(), nil, typeChooser, client)
50
51         err := cq.Update()
52         c.Check(err, check.IsNil)
53
54         ents, threshold := cq.Entries()
55         c.Check(len(ents), check.Not(check.Equals), 0)
56         c.Check(time.Since(threshold) < time.Minute, check.Equals, true)
57         c.Check(time.Since(threshold) > 0, check.Equals, true)
58
59         _, ok := ents[arvadostest.QueuedContainerUUID]
60         c.Check(ok, check.Equals, true)
61
62         var wg sync.WaitGroup
63         for uuid, ent := range ents {
64                 c.Check(ent.Container.UUID, check.Equals, uuid)
65                 c.Check(ent.InstanceTypes, check.HasLen, 1)
66                 c.Check(ent.InstanceTypes[0].Name, check.Equals, "testType")
67                 c.Check(ent.Container.State, check.Equals, arvados.ContainerStateQueued)
68                 c.Check(ent.Container.Priority > 0, check.Equals, true)
69                 // Mounts should be deleted to avoid wasting memory
70                 c.Check(ent.Container.Mounts, check.IsNil)
71
72                 ctr, ok := cq.Get(uuid)
73                 c.Check(ok, check.Equals, true)
74                 c.Check(ctr.UUID, check.Equals, uuid)
75
76                 wg.Add(1)
77                 go func(uuid string) {
78                         defer wg.Done()
79                         err := cq.Unlock(uuid)
80                         c.Check(err, check.NotNil)
81                         c.Check(err, check.ErrorMatches, ".*cannot unlock when Queued.*")
82
83                         err = cq.Lock(uuid)
84                         c.Check(err, check.IsNil)
85                         ctr, ok := cq.Get(uuid)
86                         c.Check(ok, check.Equals, true)
87                         c.Check(ctr.State, check.Equals, arvados.ContainerStateLocked)
88                         err = cq.Lock(uuid)
89                         c.Check(err, check.NotNil)
90
91                         err = cq.Unlock(uuid)
92                         c.Check(err, check.IsNil)
93                         ctr, ok = cq.Get(uuid)
94                         c.Check(ok, check.Equals, true)
95                         c.Check(ctr.State, check.Equals, arvados.ContainerStateQueued)
96                         err = cq.Unlock(uuid)
97                         c.Check(err, check.NotNil)
98
99                         err = cq.Cancel(uuid)
100                         c.Check(err, check.IsNil)
101                         ctr, ok = cq.Get(uuid)
102                         c.Check(ok, check.Equals, true)
103                         c.Check(ctr.State, check.Equals, arvados.ContainerStateCancelled)
104                         err = cq.Lock(uuid)
105                         c.Check(err, check.NotNil)
106                 }(uuid)
107         }
108         wg.Wait()
109 }
110
111 func (suite *IntegrationSuite) TestCancelIfNoInstanceType(c *check.C) {
112         errorTypeChooser := func(ctr *arvados.Container) ([]arvados.InstanceType, error) {
113                 // Make sure the relevant container fields are
114                 // actually populated.
115                 c.Check(ctr.ContainerImage, check.Equals, "test")
116                 c.Check(ctr.RuntimeConstraints.VCPUs, check.Equals, 4)
117                 c.Check(ctr.RuntimeConstraints.RAM, check.Equals, int64(12000000000))
118                 c.Check(ctr.Mounts["/tmp"].Capacity, check.Equals, int64(24000000000))
119                 c.Check(ctr.Mounts["/var/spool/cwl"].Capacity, check.Equals, int64(24000000000))
120                 return nil, errors.New("no suitable instance type")
121         }
122
123         client := arvados.NewClientFromEnv()
124         cq := NewQueue(logger(), nil, errorTypeChooser, client)
125
126         ch := cq.Subscribe()
127         go func() {
128                 defer cq.Unsubscribe(ch)
129                 for range ch {
130                         // Container should never be added to
131                         // queue. Note that polling the queue this way
132                         // doesn't guarantee a bug (container being
133                         // incorrectly added to the queue) will cause
134                         // a test failure.
135                         _, ok := cq.Get(arvadostest.QueuedContainerUUID)
136                         if !c.Check(ok, check.Equals, false) {
137                                 // Don't spam the log with more failures
138                                 break
139                         }
140                 }
141         }()
142
143         var ctr arvados.Container
144         err := client.RequestAndDecode(&ctr, "GET", "arvados/v1/containers/"+arvadostest.QueuedContainerUUID, nil, nil)
145         c.Check(err, check.IsNil)
146         c.Check(ctr.State, check.Equals, arvados.ContainerStateQueued)
147
148         go cq.Update()
149
150         // Wait for the cancel operation to take effect. Container
151         // will have state=Cancelled or just disappear from the queue.
152         suite.waitfor(c, time.Second, func() bool {
153                 err := client.RequestAndDecode(&ctr, "GET", "arvados/v1/containers/"+arvadostest.QueuedContainerUUID, nil, nil)
154                 return err == nil && ctr.State == arvados.ContainerStateCancelled
155         })
156         c.Check(ctr.RuntimeStatus["error"], check.Equals, `no suitable instance type`)
157 }
158
159 func (suite *IntegrationSuite) waitfor(c *check.C, timeout time.Duration, fn func() bool) {
160         defer func() {
161                 c.Check(fn(), check.Equals, true)
162         }()
163         deadline := time.Now().Add(timeout)
164         for !fn() && time.Now().Before(deadline) {
165                 time.Sleep(timeout / 1000)
166         }
167 }