11369: Fix tests. Tweak test fixture to include a mounts entry with capacity.
[arvados.git] / services / crunch-dispatch-slurm / crunch-dispatch-slurm_test.go
1 package main
2
3 import (
4         "bytes"
5         "context"
6         "fmt"
7         "io"
8         "io/ioutil"
9         "log"
10         "net/http"
11         "net/http/httptest"
12         "os"
13         "os/exec"
14         "strings"
15         "testing"
16         "time"
17
18         "git.curoverse.com/arvados.git/sdk/go/arvados"
19         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
20         "git.curoverse.com/arvados.git/sdk/go/arvadostest"
21         "git.curoverse.com/arvados.git/sdk/go/dispatch"
22         . "gopkg.in/check.v1"
23 )
24
25 // Gocheck boilerplate
26 func Test(t *testing.T) {
27         TestingT(t)
28 }
29
30 var _ = Suite(&TestSuite{})
31 var _ = Suite(&MockArvadosServerSuite{})
32
33 type TestSuite struct{}
34 type MockArvadosServerSuite struct{}
35
36 var initialArgs []string
37
38 func (s *TestSuite) SetUpSuite(c *C) {
39         initialArgs = os.Args
40 }
41
42 func (s *TestSuite) TearDownSuite(c *C) {
43 }
44
45 func (s *TestSuite) SetUpTest(c *C) {
46         args := []string{"crunch-dispatch-slurm"}
47         os.Args = args
48
49         arvadostest.StartAPI()
50         os.Setenv("ARVADOS_API_TOKEN", arvadostest.Dispatch1Token)
51 }
52
53 func (s *TestSuite) TearDownTest(c *C) {
54         os.Args = initialArgs
55         arvadostest.StopAPI()
56 }
57
58 func (s *MockArvadosServerSuite) TearDownTest(c *C) {
59         arvadostest.ResetEnv()
60 }
61
62 func (s *TestSuite) TestIntegrationNormal(c *C) {
63         done := false
64         container := s.integrationTest(c,
65                 func() *exec.Cmd {
66                         if done {
67                                 return exec.Command("true")
68                         } else {
69                                 return exec.Command("echo", "zzzzz-dz642-queuedcontainer")
70                         }
71                 },
72                 []string(nil),
73                 func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
74                         dispatcher.UpdateState(container.UUID, dispatch.Running)
75                         time.Sleep(3 * time.Second)
76                         dispatcher.UpdateState(container.UUID, dispatch.Complete)
77                         done = true
78                 })
79         c.Check(container.State, Equals, arvados.ContainerStateComplete)
80 }
81
82 func (s *TestSuite) TestIntegrationCancel(c *C) {
83         var cmd *exec.Cmd
84         var scancelCmdLine []string
85         defer func(orig func(arvados.Container) *exec.Cmd) {
86                 scancelCmd = orig
87         }(scancelCmd)
88         attempt := 0
89         scancelCmd = func(container arvados.Container) *exec.Cmd {
90                 if attempt++; attempt == 1 {
91                         return exec.Command("false")
92                 } else {
93                         scancelCmdLine = scancelFunc(container).Args
94                         cmd = exec.Command("echo")
95                         return cmd
96                 }
97         }
98
99         container := s.integrationTest(c,
100                 func() *exec.Cmd {
101                         if cmd != nil && cmd.ProcessState != nil {
102                                 return exec.Command("true")
103                         } else {
104                                 return exec.Command("echo", "zzzzz-dz642-queuedcontainer")
105                         }
106                 },
107                 []string(nil),
108                 func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
109                         dispatcher.UpdateState(container.UUID, dispatch.Running)
110                         time.Sleep(1 * time.Second)
111                         dispatcher.Arv.Update("containers", container.UUID,
112                                 arvadosclient.Dict{
113                                         "container": arvadosclient.Dict{"priority": 0}},
114                                 nil)
115                 })
116         c.Check(container.State, Equals, arvados.ContainerStateCancelled)
117         c.Check(scancelCmdLine, DeepEquals, []string{"scancel", "--name=zzzzz-dz642-queuedcontainer"})
118 }
119
120 func (s *TestSuite) TestIntegrationMissingFromSqueue(c *C) {
121         container := s.integrationTest(c, func() *exec.Cmd { return exec.Command("echo") }, []string{"sbatch",
122                 fmt.Sprintf("--job-name=%s", "zzzzz-dz642-queuedcontainer"),
123                 fmt.Sprintf("--mem=%d", 11445),
124                 fmt.Sprintf("--cpus-per-task=%d", 4),
125                 fmt.Sprintf("--tmp=%d", 45777)},
126                 func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
127                         dispatcher.UpdateState(container.UUID, dispatch.Running)
128                         time.Sleep(3 * time.Second)
129                         dispatcher.UpdateState(container.UUID, dispatch.Complete)
130                 })
131         c.Check(container.State, Equals, arvados.ContainerStateCancelled)
132 }
133
134 func (s *TestSuite) integrationTest(c *C,
135         newSqueueCmd func() *exec.Cmd,
136         sbatchCmdComps []string,
137         runContainer func(*dispatch.Dispatcher, arvados.Container)) arvados.Container {
138         arvadostest.ResetEnv()
139
140         arv, err := arvadosclient.MakeArvadosClient()
141         c.Assert(err, IsNil)
142
143         var sbatchCmdLine []string
144
145         // Override sbatchCmd
146         defer func(orig func(arvados.Container) *exec.Cmd) {
147                 sbatchCmd = orig
148         }(sbatchCmd)
149         sbatchCmd = func(container arvados.Container) *exec.Cmd {
150                 sbatchCmdLine = sbatchFunc(container).Args
151                 return exec.Command("sh")
152         }
153
154         // Override squeueCmd
155         defer func(orig func() *exec.Cmd) {
156                 squeueCmd = orig
157         }(squeueCmd)
158         squeueCmd = newSqueueCmd
159
160         // There should be one queued container
161         params := arvadosclient.Dict{
162                 "filters": [][]string{{"state", "=", "Queued"}},
163         }
164         var containers arvados.ContainerList
165         err = arv.List("containers", params, &containers)
166         c.Check(err, IsNil)
167         c.Check(len(containers.Items), Equals, 1)
168
169         theConfig.CrunchRunCommand = []string{"echo"}
170
171         ctx, cancel := context.WithCancel(context.Background())
172         dispatcher := dispatch.Dispatcher{
173                 Arv:        arv,
174                 PollPeriod: time.Duration(1) * time.Second,
175                 RunContainer: func(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
176                         go runContainer(disp, ctr)
177                         run(disp, ctr, status)
178                         cancel()
179                 },
180         }
181
182         sqCheck = &SqueueChecker{Period: 500 * time.Millisecond}
183
184         err = dispatcher.Run(ctx)
185         c.Assert(err, Equals, context.Canceled)
186
187         sqCheck.Stop()
188
189         c.Check(sbatchCmdLine, DeepEquals, sbatchCmdComps)
190
191         // There should be no queued containers now
192         err = arv.List("containers", params, &containers)
193         c.Check(err, IsNil)
194         c.Check(len(containers.Items), Equals, 0)
195
196         // Previously "Queued" container should now be in "Complete" state
197         var container arvados.Container
198         err = arv.Get("containers", "zzzzz-dz642-queuedcontainer", nil, &container)
199         c.Check(err, IsNil)
200         return container
201 }
202
203 func (s *MockArvadosServerSuite) TestAPIErrorGettingContainers(c *C) {
204         apiStubResponses := make(map[string]arvadostest.StubResponse)
205         apiStubResponses["/arvados/v1/api_client_authorizations/current"] = arvadostest.StubResponse{200, `{"uuid":"` + arvadostest.Dispatch1AuthUUID + `"}`}
206         apiStubResponses["/arvados/v1/containers"] = arvadostest.StubResponse{500, string(`{}`)}
207
208         testWithServerStub(c, apiStubResponses, "echo", "Error getting list of containers")
209 }
210
211 func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubResponse, crunchCmd string, expected string) {
212         apiStub := arvadostest.ServerStub{apiStubResponses}
213
214         api := httptest.NewServer(&apiStub)
215         defer api.Close()
216
217         arv := &arvadosclient.ArvadosClient{
218                 Scheme:    "http",
219                 ApiServer: api.URL[7:],
220                 ApiToken:  "abc123",
221                 Client:    &http.Client{Transport: &http.Transport{}},
222                 Retries:   0,
223         }
224
225         buf := bytes.NewBuffer(nil)
226         log.SetOutput(io.MultiWriter(buf, os.Stderr))
227         defer log.SetOutput(os.Stderr)
228
229         theConfig.CrunchRunCommand = []string{crunchCmd}
230
231         ctx, cancel := context.WithCancel(context.Background())
232         dispatcher := dispatch.Dispatcher{
233                 Arv:        arv,
234                 PollPeriod: time.Duration(1) * time.Second,
235                 RunContainer: func(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
236                         go func() {
237                                 time.Sleep(1 * time.Second)
238                                 disp.UpdateState(ctr.UUID, dispatch.Running)
239                                 disp.UpdateState(ctr.UUID, dispatch.Complete)
240                         }()
241                         run(disp, ctr, status)
242                         cancel()
243                 },
244         }
245
246         go func() {
247                 for i := 0; i < 80 && !strings.Contains(buf.String(), expected); i++ {
248                         time.Sleep(100 * time.Millisecond)
249                 }
250                 cancel()
251         }()
252
253         err := dispatcher.Run(ctx)
254         c.Assert(err, Equals, context.Canceled)
255
256         c.Check(buf.String(), Matches, `(?ms).*`+expected+`.*`)
257 }
258
259 func (s *MockArvadosServerSuite) TestNoSuchConfigFile(c *C) {
260         var config Config
261         err := readConfig(&config, "/nosuchdir89j7879/8hjwr7ojgyy7")
262         c.Assert(err, NotNil)
263 }
264
265 func (s *MockArvadosServerSuite) TestBadSbatchArgsConfig(c *C) {
266         var config Config
267
268         tmpfile, err := ioutil.TempFile(os.TempDir(), "config")
269         c.Check(err, IsNil)
270         defer os.Remove(tmpfile.Name())
271
272         _, err = tmpfile.Write([]byte(`{"SbatchArguments": "oops this is not a string array"}`))
273         c.Check(err, IsNil)
274
275         err = readConfig(&config, tmpfile.Name())
276         c.Assert(err, NotNil)
277 }
278
279 func (s *MockArvadosServerSuite) TestNoSuchArgInConfigIgnored(c *C) {
280         var config Config
281
282         tmpfile, err := ioutil.TempFile(os.TempDir(), "config")
283         c.Check(err, IsNil)
284         defer os.Remove(tmpfile.Name())
285
286         _, err = tmpfile.Write([]byte(`{"NoSuchArg": "Nobody loves me, not one tiny hunk."}`))
287         c.Check(err, IsNil)
288
289         err = readConfig(&config, tmpfile.Name())
290         c.Assert(err, IsNil)
291         c.Check(0, Equals, len(config.SbatchArguments))
292 }
293
294 func (s *MockArvadosServerSuite) TestReadConfig(c *C) {
295         var config Config
296
297         tmpfile, err := ioutil.TempFile(os.TempDir(), "config")
298         c.Check(err, IsNil)
299         defer os.Remove(tmpfile.Name())
300
301         args := []string{"--arg1=v1", "--arg2", "--arg3=v3"}
302         argsS := `{"SbatchArguments": ["--arg1=v1",  "--arg2", "--arg3=v3"]}`
303         _, err = tmpfile.Write([]byte(argsS))
304         c.Check(err, IsNil)
305
306         err = readConfig(&config, tmpfile.Name())
307         c.Assert(err, IsNil)
308         c.Check(3, Equals, len(config.SbatchArguments))
309         c.Check(args, DeepEquals, config.SbatchArguments)
310 }
311
312 func (s *MockArvadosServerSuite) TestSbatchFuncWithNoConfigArgs(c *C) {
313         testSbatchFuncWithArgs(c, nil)
314 }
315
316 func (s *MockArvadosServerSuite) TestSbatchFuncWithEmptyConfigArgs(c *C) {
317         testSbatchFuncWithArgs(c, []string{})
318 }
319
320 func (s *MockArvadosServerSuite) TestSbatchFuncWithConfigArgs(c *C) {
321         testSbatchFuncWithArgs(c, []string{"--arg1=v1", "--arg2"})
322 }
323
324 func testSbatchFuncWithArgs(c *C, args []string) {
325         theConfig.SbatchArguments = append(theConfig.SbatchArguments, args...)
326
327         container := arvados.Container{UUID: "123", RuntimeConstraints: arvados.RuntimeConstraints{RAM: 250000000, VCPUs: 2}}
328         sbatchCmd := sbatchFunc(container)
329
330         var expected []string
331         expected = append(expected, "sbatch")
332         expected = append(expected, theConfig.SbatchArguments...)
333         expected = append(expected, "--job-name=123", "--mem=239", "--cpus-per-task=2", "--tmp=0")
334
335         c.Check(sbatchCmd.Args, DeepEquals, expected)
336 }
337
338 func (s *MockArvadosServerSuite) TestSbatchPartition(c *C) {
339         theConfig.SbatchArguments = nil
340         container := arvados.Container{UUID: "123", RuntimeConstraints: arvados.RuntimeConstraints{RAM: 250000000, VCPUs: 1}, SchedulingParameters: arvados.SchedulingParameters{Partitions: []string{"blurb", "b2"}}}
341         sbatchCmd := sbatchFunc(container)
342
343         var expected []string
344         expected = append(expected, "sbatch")
345         expected = append(expected, "--job-name=123", "--mem=239", "--cpus-per-task=1", "--tmp=0", "--partition=blurb,b2")
346
347         c.Check(sbatchCmd.Args, DeepEquals, expected)
348 }