]> git.arvados.org - arvados.git/blob - services/crunch-dispatch-local/crunch-dispatch-local_test.go
21926: Fix tests and migrate BsubCUDAArguments to BsubGPUArguments
[arvados.git] / services / crunch-dispatch-local / crunch-dispatch-local_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "context"
10         "io"
11         "net/http"
12         "net/http/httptest"
13         "os"
14         "os/exec"
15         "regexp"
16         "testing"
17         "time"
18
19         "git.arvados.org/arvados.git/sdk/go/arvados"
20         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
21         "git.arvados.org/arvados.git/sdk/go/arvadostest"
22         "git.arvados.org/arvados.git/sdk/go/ctxlog"
23         "git.arvados.org/arvados.git/sdk/go/dispatch"
24         "github.com/sirupsen/logrus"
25         . "gopkg.in/check.v1"
26 )
27
28 // Gocheck boilerplate
29 func Test(t *testing.T) {
30         TestingT(t)
31 }
32
33 var _ = Suite(&TestSuite{})
34 var _ = Suite(&MockArvadosServerSuite{})
35
36 type TestSuite struct{}
37 type MockArvadosServerSuite struct{}
38
39 var initialArgs []string
40
41 func (s *TestSuite) SetUpSuite(c *C) {
42         initialArgs = os.Args
43         runningCmds = make(map[string]*exec.Cmd)
44 }
45
46 func (s *TestSuite) SetUpTest(c *C) {
47         arvadostest.ResetDB(c)
48         arvadostest.ResetEnv()
49         args := []string{"crunch-dispatch-local"}
50         os.Args = args
51 }
52
53 func (s *TestSuite) TearDownTest(c *C) {
54         arvadostest.ResetEnv()
55         arvadostest.ResetDB(c)
56         os.Args = initialArgs
57 }
58
59 func (s *MockArvadosServerSuite) TearDownTest(c *C) {
60         arvadostest.ResetEnv()
61 }
62
63 func (s *TestSuite) TestIntegration(c *C) {
64         arv, err := arvadosclient.MakeArvadosClient()
65         c.Assert(err, IsNil)
66
67         echo := "echo"
68         crunchRunCommand = echo
69
70         ctx, cancel := context.WithCancel(ctxlog.Context(context.Background(), ctxlog.TestLogger(c)))
71         dispatcher := dispatch.Dispatcher{
72                 Arv:        arv,
73                 PollPeriod: time.Second,
74         }
75
76         startCmd := func(container arvados.Container, cmd *exec.Cmd) error {
77                 dispatcher.UpdateState(container.UUID, "Running")
78                 dispatcher.UpdateState(container.UUID, "Complete")
79                 return cmd.Start()
80         }
81
82         cl := arvados.Cluster{Containers: arvados.ContainersConfig{RuntimeEngine: "docker"}}
83
84         dispatcher.RunContainer = func(d *dispatch.Dispatcher, c arvados.Container, s <-chan arvados.Container) error {
85                 defer cancel()
86                 lr := LocalRun{startCmd, make(chan ResourceRequest), make(chan ResourceAlloc), ctx, &cl}
87                 go lr.throttle(logrus.StandardLogger())
88                 return lr.run(d, c, s)
89         }
90
91         err = dispatcher.Run(ctx)
92         c.Assert(err, Equals, context.Canceled)
93
94         // Wait for all running crunch jobs to complete / terminate
95         waitGroup.Wait()
96
97         // There should be no queued containers now
98         params := arvadosclient.Dict{
99                 "filters": [][]string{{"state", "=", "Queued"}},
100         }
101         var containers arvados.ContainerList
102         err = arv.List("containers", params, &containers)
103         c.Check(err, IsNil)
104         c.Assert(len(containers.Items), Equals, 0)
105
106         // Previously "Queued" container should now be in "Complete" state
107         var container arvados.Container
108         err = arv.Get("containers", "zzzzz-dz642-queuedcontainer", nil, &container)
109         c.Check(err, IsNil)
110         c.Check(string(container.State), Equals, "Complete")
111 }
112
113 func (s *MockArvadosServerSuite) Test_APIErrorGettingContainers(c *C) {
114         apiStubResponses := make(map[string]arvadostest.StubResponse)
115         apiStubResponses["/arvados/v1/containers"] = arvadostest.StubResponse{500, string(`{}`)}
116
117         testWithServerStub(c, apiStubResponses, "echo", "error getting count of containers")
118 }
119
120 func (s *MockArvadosServerSuite) Test_APIErrorUpdatingContainerState(c *C) {
121         apiStubResponses := make(map[string]arvadostest.StubResponse)
122         apiStubResponses["/arvados/v1/containers"] =
123                 arvadostest.StubResponse{200, string(`{"items_available":1, "items":[{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx1","State":"Queued","Priority":1}]}`)}
124         apiStubResponses["/arvados/v1/containers/zzzzz-dz642-xxxxxxxxxxxxxx1"] =
125                 arvadostest.StubResponse{500, string(`{}`)}
126
127         testWithServerStub(c, apiStubResponses, "echo", "error locking container zzzzz-dz642-xxxxxxxxxxxxxx1")
128 }
129
130 func (s *MockArvadosServerSuite) Test_ContainerStillInRunningAfterRun(c *C) {
131         apiStubResponses := make(map[string]arvadostest.StubResponse)
132         apiStubResponses["/arvados/v1/containers"] =
133                 arvadostest.StubResponse{200, string(`{"items_available":1, "items":[{
134 "uuid":"zzzzz-dz642-xxxxxxxxxxxxxx2",
135 "state":"Queued",
136 "priority":1,
137 "runtime_constraints": {
138   "vcpus": 1,
139   "ram": 1000000
140 }}]}`)}
141         apiStubResponses["/arvados/v1/containers/zzzzz-dz642-xxxxxxxxxxxxxx2/lock"] =
142                 arvadostest.StubResponse{200, string(`{
143 "uuid":"zzzzz-dz642-xxxxxxxxxxxxxx2",
144 "state":"Locked",
145 "priority":1,
146 "locked_by_uuid": "zzzzz-gj3su-000000000000000",
147 "runtime_constraints": {
148   "vcpus": 1,
149   "ram": 1000000
150 }}`)}
151         apiStubResponses["/arvados/v1/containers/zzzzz-dz642-xxxxxxxxxxxxxx2"] =
152                 arvadostest.StubResponse{200, string(`{
153 "uuid":"zzzzz-dz642-xxxxxxxxxxxxxx2",
154 "state":"Running",
155 "priority":1,
156 "locked_by_uuid": "zzzzz-gj3su-000000000000000",
157 "runtime_constraints": {
158   "vcpus": 1,
159   "ram": 1000000
160 }}`)}
161
162         testWithServerStub(c, apiStubResponses, "echo",
163                 `after \\"echo\\" process termination, container state for zzzzz-dz642-xxxxxxxxxxxxxx2 is \\"Running\\"; updating it to \\"Cancelled\\"`)
164 }
165
166 func (s *MockArvadosServerSuite) Test_ErrorRunningContainer(c *C) {
167         apiStubResponses := make(map[string]arvadostest.StubResponse)
168         apiStubResponses["/arvados/v1/containers"] =
169                 arvadostest.StubResponse{200, string(`{"items_available":1, "items":[{
170 "uuid":"zzzzz-dz642-xxxxxxxxxxxxxx3",
171 "state":"Queued",
172 "priority":1,
173 "runtime_constraints": {
174   "vcpus": 1,
175   "ram": 1000000
176 }}]}`)}
177
178         apiStubResponses["/arvados/v1/containers/zzzzz-dz642-xxxxxxxxxxxxxx3/lock"] =
179                 arvadostest.StubResponse{200, string(`{
180 "uuid":"zzzzz-dz642-xxxxxxxxxxxxxx3",
181 "state":"Locked",
182 "priority":1,
183 "runtime_constraints": {
184   "vcpus": 1,
185   "ram": 1000000
186 }
187 }`)}
188
189         testWithServerStub(c, apiStubResponses, "nosuchcommand", `error starting \\"nosuchcommand\\" for zzzzz-dz642-xxxxxxxxxxxxxx3`)
190 }
191
192 func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubResponse, crunchCmd string, expected string) {
193         apiStubResponses["/arvados/v1/api_client_authorizations/current"] =
194                 arvadostest.StubResponse{200, string(`{"uuid": "zzzzz-gj3su-000000000000000", "api_token": "xyz"}`)}
195
196         apiStub := arvadostest.ServerStub{apiStubResponses}
197
198         api := httptest.NewServer(&apiStub)
199         defer api.Close()
200
201         arv := &arvadosclient.ArvadosClient{
202                 Scheme:    "http",
203                 ApiServer: api.URL[7:],
204                 ApiToken:  "abc123",
205                 Client:    &http.Client{Transport: &http.Transport{}},
206                 Retries:   0,
207         }
208
209         buf := bytes.NewBuffer(nil)
210         logger := ctxlog.TestLogger(c)
211         logger.SetOutput(io.MultiWriter(buf, logger.Out))
212
213         crunchRunCommand = crunchCmd
214
215         ctx, cancel := context.WithCancel(ctxlog.Context(context.Background(), logger))
216         defer cancel()
217         dispatcher := dispatch.Dispatcher{
218                 Logger:     logger,
219                 Arv:        arv,
220                 PollPeriod: time.Second,
221         }
222
223         startCmd := func(container arvados.Container, cmd *exec.Cmd) error {
224                 dispatcher.UpdateState(container.UUID, "Running")
225                 dispatcher.UpdateState(container.UUID, "Complete")
226                 return cmd.Start()
227         }
228
229         cl := arvados.Cluster{Containers: arvados.ContainersConfig{RuntimeEngine: "docker"}}
230         runningCmds = make(map[string]*exec.Cmd)
231
232         dispatcher.RunContainer = func(d *dispatch.Dispatcher, c arvados.Container, s <-chan arvados.Container) error {
233                 defer cancel()
234                 lr := LocalRun{startCmd, make(chan ResourceRequest), make(chan ResourceAlloc), ctx, &cl}
235                 go lr.throttle(logrus.StandardLogger())
236                 return lr.run(d, c, s)
237         }
238
239         re := regexp.MustCompile(`(?ms).*` + expected + `.*`)
240         go func() {
241                 for i := 0; i < 80 && !re.MatchString(buf.String()); i++ {
242                         time.Sleep(100 * time.Millisecond)
243                 }
244                 cancel()
245         }()
246
247         err := dispatcher.Run(ctx)
248         c.Assert(err, Equals, context.Canceled)
249         c.Check(buf.String(), Matches, `(?ms).*`+expected+`.*`)
250
251         c.Logf("test finished, waiting for running crunch jobs to complete / terminate")
252         waitGroup.Wait()
253 }