Merge branch '12573-crunch2-slurm-priority' closes #12573
[arvados.git] / services / crunch-dispatch-slurm / crunch-dispatch-slurm_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
import (
	"bytes"
	"context"
	"fmt"
	"io"
	"io/ioutil"
	"log"
	"net/http"
	"net/http/httptest"
	"os"
	"os/exec"
	"strings"
	"sync"
	"testing"
	"time"

	"git.curoverse.com/arvados.git/sdk/go/arvados"
	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
	"git.curoverse.com/arvados.git/sdk/go/arvadostest"
	"git.curoverse.com/arvados.git/sdk/go/dispatch"
	. "gopkg.in/check.v1"
)
28
29 // Gocheck boilerplate
30 func Test(t *testing.T) {
31         TestingT(t)
32 }
33
34 var _ = Suite(&TestSuite{})
35 var _ = Suite(&MockArvadosServerSuite{})
36
37 type TestSuite struct{}
38 type MockArvadosServerSuite struct{}
39
40 var initialArgs []string
41
42 func (s *TestSuite) SetUpSuite(c *C) {
43         initialArgs = os.Args
44 }
45
46 func (s *TestSuite) TearDownSuite(c *C) {
47 }
48
49 func (s *TestSuite) SetUpTest(c *C) {
50         args := []string{"crunch-dispatch-slurm"}
51         os.Args = args
52
53         arvadostest.StartAPI()
54         os.Setenv("ARVADOS_API_TOKEN", arvadostest.Dispatch1Token)
55 }
56
57 func (s *TestSuite) TearDownTest(c *C) {
58         os.Args = initialArgs
59         arvadostest.ResetEnv()
60         arvadostest.StopAPI()
61 }
62
63 func (s *MockArvadosServerSuite) TearDownTest(c *C) {
64         arvadostest.ResetEnv()
65 }
66
67 func (s *TestSuite) integrationTest(c *C,
68         newSqueueCmd func() *exec.Cmd,
69         newScancelCmd func(arvados.Container) *exec.Cmd,
70         newSbatchCmd func(arvados.Container) *exec.Cmd,
71         newScontrolCmd func(arvados.Container) *exec.Cmd,
72         sbatchCmdComps []string,
73         runContainer func(*dispatch.Dispatcher, arvados.Container)) arvados.Container {
74         arvadostest.ResetEnv()
75
76         arv, err := arvadosclient.MakeArvadosClient()
77         c.Assert(err, IsNil)
78
79         var sbatchCmdLine []string
80
81         // Override sbatchCmd
82         defer func(orig func(arvados.Container) *exec.Cmd) {
83                 sbatchCmd = orig
84         }(sbatchCmd)
85
86         if newSbatchCmd != nil {
87                 sbatchCmd = newSbatchCmd
88         } else {
89                 sbatchCmd = func(container arvados.Container) *exec.Cmd {
90                         sbatchCmdLine = sbatchFunc(container).Args
91                         return exec.Command("sh")
92                 }
93         }
94
95         // Override squeueCmd
96         defer func(orig func() *exec.Cmd) {
97                 squeueCmd = orig
98         }(squeueCmd)
99         squeueCmd = newSqueueCmd
100
101         // Override scancel
102         defer func(orig func(arvados.Container) *exec.Cmd) {
103                 scancelCmd = orig
104         }(scancelCmd)
105         scancelCmd = newScancelCmd
106
107         // Override scontrol
108         defer func(orig func(arvados.Container) *exec.Cmd) {
109                 scontrolCmd = orig
110         }(scontrolCmd)
111         scontrolCmd = newScontrolCmd
112
113         // There should be one queued container
114         params := arvadosclient.Dict{
115                 "filters": [][]string{{"state", "=", "Queued"}},
116         }
117         var containers arvados.ContainerList
118         err = arv.List("containers", params, &containers)
119         c.Check(err, IsNil)
120         c.Check(len(containers.Items), Equals, 1)
121
122         theConfig.CrunchRunCommand = []string{"echo"}
123
124         ctx, cancel := context.WithCancel(context.Background())
125         doneRun := make(chan struct{})
126
127         dispatcher := dispatch.Dispatcher{
128                 Arv:        arv,
129                 PollPeriod: time.Duration(1) * time.Second,
130                 RunContainer: func(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
131                         go func() {
132                                 runContainer(disp, ctr)
133                                 doneRun <- struct{}{}
134                         }()
135                         run(disp, ctr, status)
136                         cancel()
137                 },
138         }
139
140         sqCheck = &SqueueChecker{Period: 500 * time.Millisecond}
141
142         err = dispatcher.Run(ctx)
143         <-doneRun
144         c.Assert(err, Equals, context.Canceled)
145
146         sqCheck.Stop()
147
148         c.Check(sbatchCmdLine, DeepEquals, sbatchCmdComps)
149
150         // There should be no queued containers now
151         err = arv.List("containers", params, &containers)
152         c.Check(err, IsNil)
153         c.Check(len(containers.Items), Equals, 0)
154
155         // Previously "Queued" container should now be in "Complete" state
156         var container arvados.Container
157         err = arv.Get("containers", "zzzzz-dz642-queuedcontainer", nil, &container)
158         c.Check(err, IsNil)
159         return container
160 }
161
162 func (s *TestSuite) TestIntegrationNormal(c *C) {
163         done := false
164         container := s.integrationTest(c,
165                 func() *exec.Cmd {
166                         if done {
167                                 return exec.Command("true")
168                         } else {
169                                 return exec.Command("echo", "zzzzz-dz642-queuedcontainer 9990 100")
170                         }
171                 },
172                 nil,
173                 nil,
174                 nil,
175                 []string(nil),
176                 func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
177                         dispatcher.UpdateState(container.UUID, dispatch.Running)
178                         time.Sleep(3 * time.Second)
179                         dispatcher.UpdateState(container.UUID, dispatch.Complete)
180                         done = true
181                 })
182         c.Check(container.State, Equals, arvados.ContainerStateComplete)
183 }
184
185 func (s *TestSuite) TestIntegrationCancel(c *C) {
186         var cmd *exec.Cmd
187         var scancelCmdLine []string
188         attempt := 0
189
190         container := s.integrationTest(c,
191                 func() *exec.Cmd {
192                         if cmd != nil && cmd.ProcessState != nil {
193                                 return exec.Command("true")
194                         } else {
195                                 return exec.Command("echo", "zzzzz-dz642-queuedcontainer 9990 100")
196                         }
197                 },
198                 func(container arvados.Container) *exec.Cmd {
199                         if attempt++; attempt == 1 {
200                                 return exec.Command("false")
201                         } else {
202                                 scancelCmdLine = scancelFunc(container).Args
203                                 cmd = exec.Command("echo")
204                                 return cmd
205                         }
206                 },
207                 nil,
208                 nil,
209                 []string(nil),
210                 func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
211                         dispatcher.UpdateState(container.UUID, dispatch.Running)
212                         time.Sleep(1 * time.Second)
213                         dispatcher.Arv.Update("containers", container.UUID,
214                                 arvadosclient.Dict{
215                                         "container": arvadosclient.Dict{"priority": 0}},
216                                 nil)
217                 })
218         c.Check(container.State, Equals, arvados.ContainerStateCancelled)
219         c.Check(scancelCmdLine, DeepEquals, []string{"scancel", "--name=zzzzz-dz642-queuedcontainer"})
220 }
221
222 func (s *TestSuite) TestIntegrationMissingFromSqueue(c *C) {
223         container := s.integrationTest(c,
224                 func() *exec.Cmd { return exec.Command("echo") },
225                 nil,
226                 nil,
227                 nil,
228                 []string{"sbatch",
229                         fmt.Sprintf("--job-name=%s", "zzzzz-dz642-queuedcontainer"),
230                         fmt.Sprintf("--mem=%d", 11445),
231                         fmt.Sprintf("--cpus-per-task=%d", 4),
232                         fmt.Sprintf("--tmp=%d", 45777),
233                         fmt.Sprintf("--nice=%d", 9990)},
234                 func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
235                         dispatcher.UpdateState(container.UUID, dispatch.Running)
236                         time.Sleep(3 * time.Second)
237                         dispatcher.UpdateState(container.UUID, dispatch.Complete)
238                 })
239         c.Check(container.State, Equals, arvados.ContainerStateCancelled)
240 }
241
242 func (s *TestSuite) TestSbatchFail(c *C) {
243         container := s.integrationTest(c,
244                 func() *exec.Cmd { return exec.Command("echo") },
245                 nil,
246                 func(container arvados.Container) *exec.Cmd {
247                         return exec.Command("false")
248                 },
249                 nil,
250                 []string(nil),
251                 func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
252                         dispatcher.UpdateState(container.UUID, dispatch.Running)
253                         dispatcher.UpdateState(container.UUID, dispatch.Complete)
254                 })
255         c.Check(container.State, Equals, arvados.ContainerStateComplete)
256
257         arv, err := arvadosclient.MakeArvadosClient()
258         c.Assert(err, IsNil)
259
260         var ll arvados.LogList
261         err = arv.List("logs", arvadosclient.Dict{"filters": [][]string{
262                 {"object_uuid", "=", container.UUID},
263                 {"event_type", "=", "dispatch"},
264         }}, &ll)
265         c.Assert(len(ll.Items), Equals, 1)
266 }
267
268 func (s *MockArvadosServerSuite) TestAPIErrorGettingContainers(c *C) {
269         apiStubResponses := make(map[string]arvadostest.StubResponse)
270         apiStubResponses["/arvados/v1/api_client_authorizations/current"] = arvadostest.StubResponse{200, `{"uuid":"` + arvadostest.Dispatch1AuthUUID + `"}`}
271         apiStubResponses["/arvados/v1/containers"] = arvadostest.StubResponse{500, string(`{}`)}
272
273         testWithServerStub(c, apiStubResponses, "echo", "Error getting list of containers")
274 }
275
276 func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubResponse, crunchCmd string, expected string) {
277         apiStub := arvadostest.ServerStub{apiStubResponses}
278
279         api := httptest.NewServer(&apiStub)
280         defer api.Close()
281
282         arv := &arvadosclient.ArvadosClient{
283                 Scheme:    "http",
284                 ApiServer: api.URL[7:],
285                 ApiToken:  "abc123",
286                 Client:    &http.Client{Transport: &http.Transport{}},
287                 Retries:   0,
288         }
289
290         buf := bytes.NewBuffer(nil)
291         log.SetOutput(io.MultiWriter(buf, os.Stderr))
292         defer log.SetOutput(os.Stderr)
293
294         theConfig.CrunchRunCommand = []string{crunchCmd}
295
296         ctx, cancel := context.WithCancel(context.Background())
297         dispatcher := dispatch.Dispatcher{
298                 Arv:        arv,
299                 PollPeriod: time.Duration(1) * time.Second,
300                 RunContainer: func(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
301                         go func() {
302                                 time.Sleep(1 * time.Second)
303                                 disp.UpdateState(ctr.UUID, dispatch.Running)
304                                 disp.UpdateState(ctr.UUID, dispatch.Complete)
305                         }()
306                         run(disp, ctr, status)
307                         cancel()
308                 },
309         }
310
311         go func() {
312                 for i := 0; i < 80 && !strings.Contains(buf.String(), expected); i++ {
313                         time.Sleep(100 * time.Millisecond)
314                 }
315                 cancel()
316         }()
317
318         err := dispatcher.Run(ctx)
319         c.Assert(err, Equals, context.Canceled)
320
321         c.Check(buf.String(), Matches, `(?ms).*`+expected+`.*`)
322 }
323
324 func (s *MockArvadosServerSuite) TestNoSuchConfigFile(c *C) {
325         var config Config
326         err := readConfig(&config, "/nosuchdir89j7879/8hjwr7ojgyy7")
327         c.Assert(err, NotNil)
328 }
329
330 func (s *MockArvadosServerSuite) TestBadSbatchArgsConfig(c *C) {
331         var config Config
332
333         tmpfile, err := ioutil.TempFile(os.TempDir(), "config")
334         c.Check(err, IsNil)
335         defer os.Remove(tmpfile.Name())
336
337         _, err = tmpfile.Write([]byte(`{"SbatchArguments": "oops this is not a string array"}`))
338         c.Check(err, IsNil)
339
340         err = readConfig(&config, tmpfile.Name())
341         c.Assert(err, NotNil)
342 }
343
344 func (s *MockArvadosServerSuite) TestNoSuchArgInConfigIgnored(c *C) {
345         var config Config
346
347         tmpfile, err := ioutil.TempFile(os.TempDir(), "config")
348         c.Check(err, IsNil)
349         defer os.Remove(tmpfile.Name())
350
351         _, err = tmpfile.Write([]byte(`{"NoSuchArg": "Nobody loves me, not one tiny hunk."}`))
352         c.Check(err, IsNil)
353
354         err = readConfig(&config, tmpfile.Name())
355         c.Assert(err, IsNil)
356         c.Check(0, Equals, len(config.SbatchArguments))
357 }
358
359 func (s *MockArvadosServerSuite) TestReadConfig(c *C) {
360         var config Config
361
362         tmpfile, err := ioutil.TempFile(os.TempDir(), "config")
363         c.Check(err, IsNil)
364         defer os.Remove(tmpfile.Name())
365
366         args := []string{"--arg1=v1", "--arg2", "--arg3=v3"}
367         argsS := `{"SbatchArguments": ["--arg1=v1",  "--arg2", "--arg3=v3"]}`
368         _, err = tmpfile.Write([]byte(argsS))
369         c.Check(err, IsNil)
370
371         err = readConfig(&config, tmpfile.Name())
372         c.Assert(err, IsNil)
373         c.Check(3, Equals, len(config.SbatchArguments))
374         c.Check(args, DeepEquals, config.SbatchArguments)
375 }
376
377 func (s *MockArvadosServerSuite) TestSbatchFuncWithNoConfigArgs(c *C) {
378         testSbatchFuncWithArgs(c, nil)
379 }
380
381 func (s *MockArvadosServerSuite) TestSbatchFuncWithEmptyConfigArgs(c *C) {
382         testSbatchFuncWithArgs(c, []string{})
383 }
384
385 func (s *MockArvadosServerSuite) TestSbatchFuncWithConfigArgs(c *C) {
386         testSbatchFuncWithArgs(c, []string{"--arg1=v1", "--arg2"})
387 }
388
389 func testSbatchFuncWithArgs(c *C, args []string) {
390         theConfig.SbatchArguments = append(theConfig.SbatchArguments, args...)
391
392         container := arvados.Container{
393                 UUID:               "123",
394                 RuntimeConstraints: arvados.RuntimeConstraints{RAM: 250000000, VCPUs: 2},
395                 Priority:           1}
396         sbatchCmd := sbatchFunc(container)
397
398         var expected []string
399         expected = append(expected, "sbatch")
400         expected = append(expected, theConfig.SbatchArguments...)
401         expected = append(expected, "--job-name=123", "--mem=239", "--cpus-per-task=2", "--tmp=0", "--nice=9990")
402
403         c.Check(sbatchCmd.Args, DeepEquals, expected)
404 }
405
406 func (s *MockArvadosServerSuite) TestSbatchPartition(c *C) {
407         theConfig.SbatchArguments = nil
408         container := arvados.Container{
409                 UUID:                 "123",
410                 RuntimeConstraints:   arvados.RuntimeConstraints{RAM: 250000000, VCPUs: 1},
411                 SchedulingParameters: arvados.SchedulingParameters{Partitions: []string{"blurb", "b2"}},
412                 Priority:             1}
413         sbatchCmd := sbatchFunc(container)
414
415         var expected []string
416         expected = append(expected, "sbatch")
417         expected = append(expected, "--job-name=123", "--mem=239", "--cpus-per-task=1", "--tmp=0", "--nice=9990", "--partition=blurb,b2")
418
419         c.Check(sbatchCmd.Args, DeepEquals, expected)
420 }
421
422 func (s *TestSuite) TestIntegrationChangePriority(c *C) {
423         var scontrolCmdLine []string
424         step := 0
425
426         container := s.integrationTest(c,
427                 func() *exec.Cmd {
428                         if step == 0 {
429                                 return exec.Command("echo", "zzzzz-dz642-queuedcontainer 9990 100")
430                         } else if step == 1 {
431                                 return exec.Command("echo", "zzzzz-dz642-queuedcontainer 4000 100")
432                         } else {
433                                 return exec.Command("echo")
434                         }
435                 },
436                 func(arvados.Container) *exec.Cmd { return exec.Command("true") },
437                 nil,
438                 func(container arvados.Container) *exec.Cmd {
439                         scontrolCmdLine = scontrolFunc(container).Args
440                         step = 1
441                         return exec.Command("true")
442                 },
443                 []string(nil),
444                 func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
445                         dispatcher.UpdateState(container.UUID, dispatch.Running)
446                         time.Sleep(1 * time.Second)
447                         dispatcher.Arv.Update("containers", container.UUID,
448                                 arvadosclient.Dict{
449                                         "container": arvadosclient.Dict{"priority": 600}},
450                                 nil)
451                         time.Sleep(1 * time.Second)
452                         step = 2
453                         dispatcher.UpdateState(container.UUID, dispatch.Complete)
454                 })
455         c.Check(container.State, Equals, arvados.ContainerStateComplete)
456         c.Check(scontrolCmdLine, DeepEquals, []string{"scontrol", "update", "JobName=zzzzz-dz642-queuedcontainer", "Nice=4000"})
457 }