Merge branch '10629-fuse-listing-perf' closes #10629
[arvados.git] / services / crunch-dispatch-slurm / crunch-dispatch-slurm_test.go
1 package main
2
3 import (
4         "bytes"
5         "context"
6         "fmt"
7         "io"
8         "io/ioutil"
9         "log"
10         "net/http"
11         "net/http/httptest"
12         "os"
13         "os/exec"
14         "strings"
15         "testing"
16         "time"
17
18         "git.curoverse.com/arvados.git/sdk/go/arvados"
19         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
20         "git.curoverse.com/arvados.git/sdk/go/arvadostest"
21         "git.curoverse.com/arvados.git/sdk/go/dispatch"
22         . "gopkg.in/check.v1"
23 )
24
25 // Gocheck boilerplate
26 func Test(t *testing.T) {
27         TestingT(t)
28 }
29
30 var _ = Suite(&TestSuite{})
31 var _ = Suite(&MockArvadosServerSuite{})
32
33 type TestSuite struct{}
34 type MockArvadosServerSuite struct{}
35
36 var initialArgs []string
37
38 func (s *TestSuite) SetUpSuite(c *C) {
39         initialArgs = os.Args
40 }
41
42 func (s *TestSuite) TearDownSuite(c *C) {
43 }
44
45 func (s *TestSuite) SetUpTest(c *C) {
46         args := []string{"crunch-dispatch-slurm"}
47         os.Args = args
48
49         arvadostest.StartAPI()
50         os.Setenv("ARVADOS_API_TOKEN", arvadostest.Dispatch1Token)
51 }
52
53 func (s *TestSuite) TearDownTest(c *C) {
54         os.Args = initialArgs
55         arvadostest.StopAPI()
56 }
57
58 func (s *MockArvadosServerSuite) TearDownTest(c *C) {
59         arvadostest.ResetEnv()
60 }
61
62 func (s *TestSuite) TestIntegrationNormal(c *C) {
63         done := false
64         container := s.integrationTest(c,
65                 func() *exec.Cmd {
66                         if done {
67                                 return exec.Command("true")
68                         } else {
69                                 return exec.Command("echo", "zzzzz-dz642-queuedcontainer")
70                         }
71                 },
72                 []string(nil),
73                 func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
74                         dispatcher.UpdateState(container.UUID, dispatch.Running)
75                         time.Sleep(3 * time.Second)
76                         dispatcher.UpdateState(container.UUID, dispatch.Complete)
77                         done = true
78                 })
79         c.Check(container.State, Equals, arvados.ContainerStateComplete)
80 }
81
82 func (s *TestSuite) TestIntegrationCancel(c *C) {
83         var cmd *exec.Cmd
84         var scancelCmdLine []string
85         defer func(orig func(arvados.Container) *exec.Cmd) {
86                 scancelCmd = orig
87         }(scancelCmd)
88         attempt := 0
89         scancelCmd = func(container arvados.Container) *exec.Cmd {
90                 if attempt++; attempt == 1 {
91                         return exec.Command("false")
92                 } else {
93                         scancelCmdLine = scancelFunc(container).Args
94                         cmd = exec.Command("echo")
95                         return cmd
96                 }
97         }
98
99         container := s.integrationTest(c,
100                 func() *exec.Cmd {
101                         if cmd != nil && cmd.ProcessState != nil {
102                                 return exec.Command("true")
103                         } else {
104                                 return exec.Command("echo", "zzzzz-dz642-queuedcontainer")
105                         }
106                 },
107                 []string(nil),
108                 func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
109                         dispatcher.UpdateState(container.UUID, dispatch.Running)
110                         time.Sleep(1 * time.Second)
111                         dispatcher.Arv.Update("containers", container.UUID,
112                                 arvadosclient.Dict{
113                                         "container": arvadosclient.Dict{"priority": 0}},
114                                 nil)
115                 })
116         c.Check(container.State, Equals, arvados.ContainerStateCancelled)
117         c.Check(scancelCmdLine, DeepEquals, []string{"scancel", "--name=zzzzz-dz642-queuedcontainer"})
118 }
119
120 func (s *TestSuite) TestIntegrationMissingFromSqueue(c *C) {
121         container := s.integrationTest(c, func() *exec.Cmd { return exec.Command("echo") }, []string{"sbatch", "--share",
122                 fmt.Sprintf("--job-name=%s", "zzzzz-dz642-queuedcontainer"),
123                 fmt.Sprintf("--mem-per-cpu=%d", 2862),
124                 fmt.Sprintf("--cpus-per-task=%d", 4)},
125                 func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
126                         dispatcher.UpdateState(container.UUID, dispatch.Running)
127                         time.Sleep(3 * time.Second)
128                         dispatcher.UpdateState(container.UUID, dispatch.Complete)
129                 })
130         c.Check(container.State, Equals, arvados.ContainerStateCancelled)
131 }
132
133 func (s *TestSuite) integrationTest(c *C,
134         newSqueueCmd func() *exec.Cmd,
135         sbatchCmdComps []string,
136         runContainer func(*dispatch.Dispatcher, arvados.Container)) arvados.Container {
137         arvadostest.ResetEnv()
138
139         arv, err := arvadosclient.MakeArvadosClient()
140         c.Assert(err, IsNil)
141
142         var sbatchCmdLine []string
143
144         // Override sbatchCmd
145         defer func(orig func(arvados.Container) *exec.Cmd) {
146                 sbatchCmd = orig
147         }(sbatchCmd)
148         sbatchCmd = func(container arvados.Container) *exec.Cmd {
149                 sbatchCmdLine = sbatchFunc(container).Args
150                 return exec.Command("sh")
151         }
152
153         // Override squeueCmd
154         defer func(orig func() *exec.Cmd) {
155                 squeueCmd = orig
156         }(squeueCmd)
157         squeueCmd = newSqueueCmd
158
159         // There should be one queued container
160         params := arvadosclient.Dict{
161                 "filters": [][]string{{"state", "=", "Queued"}},
162         }
163         var containers arvados.ContainerList
164         err = arv.List("containers", params, &containers)
165         c.Check(err, IsNil)
166         c.Check(len(containers.Items), Equals, 1)
167
168         theConfig.CrunchRunCommand = []string{"echo"}
169
170         ctx, cancel := context.WithCancel(context.Background())
171         dispatcher := dispatch.Dispatcher{
172                 Arv:        arv,
173                 PollPeriod: time.Duration(1) * time.Second,
174                 RunContainer: func(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
175                         go runContainer(disp, ctr)
176                         run(disp, ctr, status)
177                         cancel()
178                 },
179         }
180
181         sqCheck = &SqueueChecker{Period: 500 * time.Millisecond}
182
183         err = dispatcher.Run(ctx)
184         c.Assert(err, Equals, context.Canceled)
185
186         sqCheck.Stop()
187
188         c.Check(sbatchCmdLine, DeepEquals, sbatchCmdComps)
189
190         // There should be no queued containers now
191         err = arv.List("containers", params, &containers)
192         c.Check(err, IsNil)
193         c.Check(len(containers.Items), Equals, 0)
194
195         // Previously "Queued" container should now be in "Complete" state
196         var container arvados.Container
197         err = arv.Get("containers", "zzzzz-dz642-queuedcontainer", nil, &container)
198         c.Check(err, IsNil)
199         return container
200 }
201
202 func (s *MockArvadosServerSuite) TestAPIErrorGettingContainers(c *C) {
203         apiStubResponses := make(map[string]arvadostest.StubResponse)
204         apiStubResponses["/arvados/v1/api_client_authorizations/current"] = arvadostest.StubResponse{200, `{"uuid":"` + arvadostest.Dispatch1AuthUUID + `"}`}
205         apiStubResponses["/arvados/v1/containers"] = arvadostest.StubResponse{500, string(`{}`)}
206
207         testWithServerStub(c, apiStubResponses, "echo", "Error getting list of containers")
208 }
209
210 func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubResponse, crunchCmd string, expected string) {
211         apiStub := arvadostest.ServerStub{apiStubResponses}
212
213         api := httptest.NewServer(&apiStub)
214         defer api.Close()
215
216         arv := &arvadosclient.ArvadosClient{
217                 Scheme:    "http",
218                 ApiServer: api.URL[7:],
219                 ApiToken:  "abc123",
220                 Client:    &http.Client{Transport: &http.Transport{}},
221                 Retries:   0,
222         }
223
224         buf := bytes.NewBuffer(nil)
225         log.SetOutput(io.MultiWriter(buf, os.Stderr))
226         defer log.SetOutput(os.Stderr)
227
228         theConfig.CrunchRunCommand = []string{crunchCmd}
229
230         ctx, cancel := context.WithCancel(context.Background())
231         dispatcher := dispatch.Dispatcher{
232                 Arv:        arv,
233                 PollPeriod: time.Duration(1) * time.Second,
234                 RunContainer: func(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
235                         go func() {
236                                 time.Sleep(1 * time.Second)
237                                 disp.UpdateState(ctr.UUID, dispatch.Running)
238                                 disp.UpdateState(ctr.UUID, dispatch.Complete)
239                         }()
240                         run(disp, ctr, status)
241                         cancel()
242                 },
243         }
244
245         go func() {
246                 for i := 0; i < 80 && !strings.Contains(buf.String(), expected); i++ {
247                         time.Sleep(100 * time.Millisecond)
248                 }
249                 cancel()
250         }()
251
252         err := dispatcher.Run(ctx)
253         c.Assert(err, Equals, context.Canceled)
254
255         c.Check(buf.String(), Matches, `(?ms).*`+expected+`.*`)
256 }
257
258 func (s *MockArvadosServerSuite) TestNoSuchConfigFile(c *C) {
259         var config Config
260         err := readConfig(&config, "/nosuchdir89j7879/8hjwr7ojgyy7")
261         c.Assert(err, NotNil)
262 }
263
264 func (s *MockArvadosServerSuite) TestBadSbatchArgsConfig(c *C) {
265         var config Config
266
267         tmpfile, err := ioutil.TempFile(os.TempDir(), "config")
268         c.Check(err, IsNil)
269         defer os.Remove(tmpfile.Name())
270
271         _, err = tmpfile.Write([]byte(`{"SbatchArguments": "oops this is not a string array"}`))
272         c.Check(err, IsNil)
273
274         err = readConfig(&config, tmpfile.Name())
275         c.Assert(err, NotNil)
276 }
277
278 func (s *MockArvadosServerSuite) TestNoSuchArgInConfigIgnored(c *C) {
279         var config Config
280
281         tmpfile, err := ioutil.TempFile(os.TempDir(), "config")
282         c.Check(err, IsNil)
283         defer os.Remove(tmpfile.Name())
284
285         _, err = tmpfile.Write([]byte(`{"NoSuchArg": "Nobody loves me, not one tiny hunk."}`))
286         c.Check(err, IsNil)
287
288         err = readConfig(&config, tmpfile.Name())
289         c.Assert(err, IsNil)
290         c.Check(0, Equals, len(config.SbatchArguments))
291 }
292
293 func (s *MockArvadosServerSuite) TestReadConfig(c *C) {
294         var config Config
295
296         tmpfile, err := ioutil.TempFile(os.TempDir(), "config")
297         c.Check(err, IsNil)
298         defer os.Remove(tmpfile.Name())
299
300         args := []string{"--arg1=v1", "--arg2", "--arg3=v3"}
301         argsS := `{"SbatchArguments": ["--arg1=v1",  "--arg2", "--arg3=v3"]}`
302         _, err = tmpfile.Write([]byte(argsS))
303         c.Check(err, IsNil)
304
305         err = readConfig(&config, tmpfile.Name())
306         c.Assert(err, IsNil)
307         c.Check(3, Equals, len(config.SbatchArguments))
308         c.Check(args, DeepEquals, config.SbatchArguments)
309 }
310
311 func (s *MockArvadosServerSuite) TestSbatchFuncWithNoConfigArgs(c *C) {
312         testSbatchFuncWithArgs(c, nil)
313 }
314
315 func (s *MockArvadosServerSuite) TestSbatchFuncWithEmptyConfigArgs(c *C) {
316         testSbatchFuncWithArgs(c, []string{})
317 }
318
319 func (s *MockArvadosServerSuite) TestSbatchFuncWithConfigArgs(c *C) {
320         testSbatchFuncWithArgs(c, []string{"--arg1=v1", "--arg2"})
321 }
322
323 func testSbatchFuncWithArgs(c *C, args []string) {
324         theConfig.SbatchArguments = append(theConfig.SbatchArguments, args...)
325
326         container := arvados.Container{UUID: "123", RuntimeConstraints: arvados.RuntimeConstraints{RAM: 250000000, VCPUs: 2}}
327         sbatchCmd := sbatchFunc(container)
328
329         var expected []string
330         expected = append(expected, "sbatch", "--share")
331         expected = append(expected, theConfig.SbatchArguments...)
332         expected = append(expected, "--job-name=123", "--mem-per-cpu=120", "--cpus-per-task=2")
333
334         c.Check(sbatchCmd.Args, DeepEquals, expected)
335 }
336
337 func (s *MockArvadosServerSuite) TestSbatchPartition(c *C) {
338         theConfig.SbatchArguments = nil
339         container := arvados.Container{UUID: "123", RuntimeConstraints: arvados.RuntimeConstraints{RAM: 250000000, VCPUs: 1}, SchedulingParameters: arvados.SchedulingParameters{Partitions: []string{"blurb", "b2"}}}
340         sbatchCmd := sbatchFunc(container)
341
342         var expected []string
343         expected = append(expected, "sbatch", "--share")
344         expected = append(expected, "--job-name=123", "--mem-per-cpu=239", "--cpus-per-task=1", "--partition=blurb,b2")
345
346         c.Check(sbatchCmd.Args, DeepEquals, expected)
347 }