Merge branch '12024-zero-content-length'
[arvados.git] / services / crunch-dispatch-slurm / crunch-dispatch-slurm_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
import (
	"bytes"
	"context"
	"fmt"
	"io"
	"io/ioutil"
	"log"
	"net/http"
	"net/http/httptest"
	"os"
	"os/exec"
	"strings"
	"sync"
	"testing"
	"time"

	"git.curoverse.com/arvados.git/sdk/go/arvados"
	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
	"git.curoverse.com/arvados.git/sdk/go/arvadostest"
	"git.curoverse.com/arvados.git/sdk/go/dispatch"
	. "gopkg.in/check.v1"
)
28
29 // Gocheck boilerplate
30 func Test(t *testing.T) {
31         TestingT(t)
32 }
33
34 var _ = Suite(&TestSuite{})
35 var _ = Suite(&MockArvadosServerSuite{})
36
37 type TestSuite struct{}
38 type MockArvadosServerSuite struct{}
39
40 var initialArgs []string
41
42 func (s *TestSuite) SetUpSuite(c *C) {
43         initialArgs = os.Args
44 }
45
46 func (s *TestSuite) TearDownSuite(c *C) {
47 }
48
49 func (s *TestSuite) SetUpTest(c *C) {
50         args := []string{"crunch-dispatch-slurm"}
51         os.Args = args
52
53         arvadostest.StartAPI()
54         os.Setenv("ARVADOS_API_TOKEN", arvadostest.Dispatch1Token)
55 }
56
57 func (s *TestSuite) TearDownTest(c *C) {
58         os.Args = initialArgs
59         arvadostest.ResetEnv()
60         arvadostest.StopAPI()
61 }
62
63 func (s *MockArvadosServerSuite) TearDownTest(c *C) {
64         arvadostest.ResetEnv()
65 }
66
67 func (s *TestSuite) TestIntegrationNormal(c *C) {
68         done := false
69         container := s.integrationTest(c,
70                 func() *exec.Cmd {
71                         if done {
72                                 return exec.Command("true")
73                         } else {
74                                 return exec.Command("echo", "zzzzz-dz642-queuedcontainer")
75                         }
76                 },
77                 nil,
78                 nil,
79                 []string(nil),
80                 func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
81                         dispatcher.UpdateState(container.UUID, dispatch.Running)
82                         time.Sleep(3 * time.Second)
83                         dispatcher.UpdateState(container.UUID, dispatch.Complete)
84                         done = true
85                 })
86         c.Check(container.State, Equals, arvados.ContainerStateComplete)
87 }
88
89 func (s *TestSuite) TestIntegrationCancel(c *C) {
90         var cmd *exec.Cmd
91         var scancelCmdLine []string
92         attempt := 0
93
94         container := s.integrationTest(c,
95                 func() *exec.Cmd {
96                         if cmd != nil && cmd.ProcessState != nil {
97                                 return exec.Command("true")
98                         } else {
99                                 return exec.Command("echo", "zzzzz-dz642-queuedcontainer")
100                         }
101                 },
102                 func(container arvados.Container) *exec.Cmd {
103                         if attempt++; attempt == 1 {
104                                 return exec.Command("false")
105                         } else {
106                                 scancelCmdLine = scancelFunc(container).Args
107                                 cmd = exec.Command("echo")
108                                 return cmd
109                         }
110                 },
111                 nil,
112                 []string(nil),
113                 func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
114                         dispatcher.UpdateState(container.UUID, dispatch.Running)
115                         time.Sleep(1 * time.Second)
116                         dispatcher.Arv.Update("containers", container.UUID,
117                                 arvadosclient.Dict{
118                                         "container": arvadosclient.Dict{"priority": 0}},
119                                 nil)
120                 })
121         c.Check(container.State, Equals, arvados.ContainerStateCancelled)
122         c.Check(scancelCmdLine, DeepEquals, []string{"scancel", "--name=zzzzz-dz642-queuedcontainer"})
123 }
124
125 func (s *TestSuite) TestIntegrationMissingFromSqueue(c *C) {
126         container := s.integrationTest(c,
127                 func() *exec.Cmd { return exec.Command("echo") },
128                 nil,
129                 nil,
130                 []string{"sbatch",
131                         fmt.Sprintf("--job-name=%s", "zzzzz-dz642-queuedcontainer"),
132                         fmt.Sprintf("--mem=%d", 11445),
133                         fmt.Sprintf("--cpus-per-task=%d", 4),
134                         fmt.Sprintf("--tmp=%d", 45777)},
135                 func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
136                         dispatcher.UpdateState(container.UUID, dispatch.Running)
137                         time.Sleep(3 * time.Second)
138                         dispatcher.UpdateState(container.UUID, dispatch.Complete)
139                 })
140         c.Check(container.State, Equals, arvados.ContainerStateCancelled)
141 }
142
143 func (s *TestSuite) TestSbatchFail(c *C) {
144         container := s.integrationTest(c,
145                 func() *exec.Cmd { return exec.Command("echo") },
146                 nil,
147                 func(container arvados.Container) *exec.Cmd {
148                         return exec.Command("false")
149                 },
150                 []string(nil),
151                 func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
152                         dispatcher.UpdateState(container.UUID, dispatch.Running)
153                         dispatcher.UpdateState(container.UUID, dispatch.Complete)
154                 })
155         c.Check(container.State, Equals, arvados.ContainerStateComplete)
156
157         arv, err := arvadosclient.MakeArvadosClient()
158         c.Assert(err, IsNil)
159
160         var ll arvados.LogList
161         err = arv.List("logs", arvadosclient.Dict{"filters": [][]string{
162                 []string{"object_uuid", "=", container.UUID},
163                 []string{"event_type", "=", "dispatch"},
164         }}, &ll)
165         c.Assert(len(ll.Items), Equals, 1)
166 }
167
168 func (s *TestSuite) integrationTest(c *C,
169         newSqueueCmd func() *exec.Cmd,
170         newScancelCmd func(arvados.Container) *exec.Cmd,
171         newSbatchCmd func(arvados.Container) *exec.Cmd,
172         sbatchCmdComps []string,
173         runContainer func(*dispatch.Dispatcher, arvados.Container)) arvados.Container {
174         arvadostest.ResetEnv()
175
176         arv, err := arvadosclient.MakeArvadosClient()
177         c.Assert(err, IsNil)
178
179         var sbatchCmdLine []string
180
181         // Override sbatchCmd
182         defer func(orig func(arvados.Container) *exec.Cmd) {
183                 sbatchCmd = orig
184         }(sbatchCmd)
185
186         if newSbatchCmd != nil {
187                 sbatchCmd = newSbatchCmd
188         } else {
189                 sbatchCmd = func(container arvados.Container) *exec.Cmd {
190                         sbatchCmdLine = sbatchFunc(container).Args
191                         return exec.Command("sh")
192                 }
193         }
194
195         // Override squeueCmd
196         defer func(orig func() *exec.Cmd) {
197                 squeueCmd = orig
198         }(squeueCmd)
199         squeueCmd = newSqueueCmd
200
201         // Override scancel
202         defer func(orig func(arvados.Container) *exec.Cmd) {
203                 scancelCmd = orig
204         }(scancelCmd)
205         scancelCmd = newScancelCmd
206
207         // There should be one queued container
208         params := arvadosclient.Dict{
209                 "filters": [][]string{{"state", "=", "Queued"}},
210         }
211         var containers arvados.ContainerList
212         err = arv.List("containers", params, &containers)
213         c.Check(err, IsNil)
214         c.Check(len(containers.Items), Equals, 1)
215
216         theConfig.CrunchRunCommand = []string{"echo"}
217
218         ctx, cancel := context.WithCancel(context.Background())
219         doneRun := make(chan struct{})
220
221         dispatcher := dispatch.Dispatcher{
222                 Arv:        arv,
223                 PollPeriod: time.Duration(1) * time.Second,
224                 RunContainer: func(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
225                         go func() {
226                                 runContainer(disp, ctr)
227                                 doneRun <- struct{}{}
228                         }()
229                         run(disp, ctr, status)
230                         cancel()
231                 },
232         }
233
234         sqCheck = &SqueueChecker{Period: 500 * time.Millisecond}
235
236         err = dispatcher.Run(ctx)
237         <-doneRun
238         c.Assert(err, Equals, context.Canceled)
239
240         sqCheck.Stop()
241
242         c.Check(sbatchCmdLine, DeepEquals, sbatchCmdComps)
243
244         // There should be no queued containers now
245         err = arv.List("containers", params, &containers)
246         c.Check(err, IsNil)
247         c.Check(len(containers.Items), Equals, 0)
248
249         // Previously "Queued" container should now be in "Complete" state
250         var container arvados.Container
251         err = arv.Get("containers", "zzzzz-dz642-queuedcontainer", nil, &container)
252         c.Check(err, IsNil)
253         return container
254 }
255
256 func (s *MockArvadosServerSuite) TestAPIErrorGettingContainers(c *C) {
257         apiStubResponses := make(map[string]arvadostest.StubResponse)
258         apiStubResponses["/arvados/v1/api_client_authorizations/current"] = arvadostest.StubResponse{200, `{"uuid":"` + arvadostest.Dispatch1AuthUUID + `"}`}
259         apiStubResponses["/arvados/v1/containers"] = arvadostest.StubResponse{500, string(`{}`)}
260
261         testWithServerStub(c, apiStubResponses, "echo", "Error getting list of containers")
262 }
263
264 func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubResponse, crunchCmd string, expected string) {
265         apiStub := arvadostest.ServerStub{apiStubResponses}
266
267         api := httptest.NewServer(&apiStub)
268         defer api.Close()
269
270         arv := &arvadosclient.ArvadosClient{
271                 Scheme:    "http",
272                 ApiServer: api.URL[7:],
273                 ApiToken:  "abc123",
274                 Client:    &http.Client{Transport: &http.Transport{}},
275                 Retries:   0,
276         }
277
278         buf := bytes.NewBuffer(nil)
279         log.SetOutput(io.MultiWriter(buf, os.Stderr))
280         defer log.SetOutput(os.Stderr)
281
282         theConfig.CrunchRunCommand = []string{crunchCmd}
283
284         ctx, cancel := context.WithCancel(context.Background())
285         dispatcher := dispatch.Dispatcher{
286                 Arv:        arv,
287                 PollPeriod: time.Duration(1) * time.Second,
288                 RunContainer: func(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
289                         go func() {
290                                 time.Sleep(1 * time.Second)
291                                 disp.UpdateState(ctr.UUID, dispatch.Running)
292                                 disp.UpdateState(ctr.UUID, dispatch.Complete)
293                         }()
294                         run(disp, ctr, status)
295                         cancel()
296                 },
297         }
298
299         go func() {
300                 for i := 0; i < 80 && !strings.Contains(buf.String(), expected); i++ {
301                         time.Sleep(100 * time.Millisecond)
302                 }
303                 cancel()
304         }()
305
306         err := dispatcher.Run(ctx)
307         c.Assert(err, Equals, context.Canceled)
308
309         c.Check(buf.String(), Matches, `(?ms).*`+expected+`.*`)
310 }
311
312 func (s *MockArvadosServerSuite) TestNoSuchConfigFile(c *C) {
313         var config Config
314         err := readConfig(&config, "/nosuchdir89j7879/8hjwr7ojgyy7")
315         c.Assert(err, NotNil)
316 }
317
318 func (s *MockArvadosServerSuite) TestBadSbatchArgsConfig(c *C) {
319         var config Config
320
321         tmpfile, err := ioutil.TempFile(os.TempDir(), "config")
322         c.Check(err, IsNil)
323         defer os.Remove(tmpfile.Name())
324
325         _, err = tmpfile.Write([]byte(`{"SbatchArguments": "oops this is not a string array"}`))
326         c.Check(err, IsNil)
327
328         err = readConfig(&config, tmpfile.Name())
329         c.Assert(err, NotNil)
330 }
331
332 func (s *MockArvadosServerSuite) TestNoSuchArgInConfigIgnored(c *C) {
333         var config Config
334
335         tmpfile, err := ioutil.TempFile(os.TempDir(), "config")
336         c.Check(err, IsNil)
337         defer os.Remove(tmpfile.Name())
338
339         _, err = tmpfile.Write([]byte(`{"NoSuchArg": "Nobody loves me, not one tiny hunk."}`))
340         c.Check(err, IsNil)
341
342         err = readConfig(&config, tmpfile.Name())
343         c.Assert(err, IsNil)
344         c.Check(0, Equals, len(config.SbatchArguments))
345 }
346
347 func (s *MockArvadosServerSuite) TestReadConfig(c *C) {
348         var config Config
349
350         tmpfile, err := ioutil.TempFile(os.TempDir(), "config")
351         c.Check(err, IsNil)
352         defer os.Remove(tmpfile.Name())
353
354         args := []string{"--arg1=v1", "--arg2", "--arg3=v3"}
355         argsS := `{"SbatchArguments": ["--arg1=v1",  "--arg2", "--arg3=v3"]}`
356         _, err = tmpfile.Write([]byte(argsS))
357         c.Check(err, IsNil)
358
359         err = readConfig(&config, tmpfile.Name())
360         c.Assert(err, IsNil)
361         c.Check(3, Equals, len(config.SbatchArguments))
362         c.Check(args, DeepEquals, config.SbatchArguments)
363 }
364
365 func (s *MockArvadosServerSuite) TestSbatchFuncWithNoConfigArgs(c *C) {
366         testSbatchFuncWithArgs(c, nil)
367 }
368
369 func (s *MockArvadosServerSuite) TestSbatchFuncWithEmptyConfigArgs(c *C) {
370         testSbatchFuncWithArgs(c, []string{})
371 }
372
373 func (s *MockArvadosServerSuite) TestSbatchFuncWithConfigArgs(c *C) {
374         testSbatchFuncWithArgs(c, []string{"--arg1=v1", "--arg2"})
375 }
376
377 func testSbatchFuncWithArgs(c *C, args []string) {
378         theConfig.SbatchArguments = append(theConfig.SbatchArguments, args...)
379
380         container := arvados.Container{UUID: "123", RuntimeConstraints: arvados.RuntimeConstraints{RAM: 250000000, VCPUs: 2}}
381         sbatchCmd := sbatchFunc(container)
382
383         var expected []string
384         expected = append(expected, "sbatch")
385         expected = append(expected, theConfig.SbatchArguments...)
386         expected = append(expected, "--job-name=123", "--mem=239", "--cpus-per-task=2", "--tmp=0")
387
388         c.Check(sbatchCmd.Args, DeepEquals, expected)
389 }
390
391 func (s *MockArvadosServerSuite) TestSbatchPartition(c *C) {
392         theConfig.SbatchArguments = nil
393         container := arvados.Container{UUID: "123", RuntimeConstraints: arvados.RuntimeConstraints{RAM: 250000000, VCPUs: 1}, SchedulingParameters: arvados.SchedulingParameters{Partitions: []string{"blurb", "b2"}}}
394         sbatchCmd := sbatchFunc(container)
395
396         var expected []string
397         expected = append(expected, "sbatch")
398         expected = append(expected, "--job-name=123", "--mem=239", "--cpus-per-task=1", "--tmp=0", "--partition=blurb,b2")
399
400         c.Check(sbatchCmd.Args, DeepEquals, expected)
401 }