Merge branch 'master' into 9998-unsigned_manifest
[arvados.git] / services / crunch-dispatch-slurm / crunch-dispatch-slurm_test.go
1 package main
2
import (
	"bytes"
	"fmt"
	"io"
	"io/ioutil"
	"log"
	"net/http"
	"net/http/httptest"
	"os"
	"os/exec"
	"strings"
	"sync"
	"testing"
	"time"

	"git.curoverse.com/arvados.git/sdk/go/arvados"
	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
	"git.curoverse.com/arvados.git/sdk/go/arvadostest"
	"git.curoverse.com/arvados.git/sdk/go/dispatch"
	. "gopkg.in/check.v1"
)
23
24 // Gocheck boilerplate
25 func Test(t *testing.T) {
26         TestingT(t)
27 }
28
29 var _ = Suite(&TestSuite{})
30 var _ = Suite(&MockArvadosServerSuite{})
31
32 type TestSuite struct{}
33 type MockArvadosServerSuite struct{}
34
35 var initialArgs []string
36
37 func (s *TestSuite) SetUpSuite(c *C) {
38         initialArgs = os.Args
39 }
40
41 func (s *TestSuite) TearDownSuite(c *C) {
42 }
43
44 func (s *TestSuite) SetUpTest(c *C) {
45         args := []string{"crunch-dispatch-slurm"}
46         os.Args = args
47
48         arvadostest.StartAPI()
49         os.Setenv("ARVADOS_API_TOKEN", arvadostest.Dispatch1Token)
50 }
51
52 func (s *TestSuite) TearDownTest(c *C) {
53         os.Args = initialArgs
54         arvadostest.StopAPI()
55 }
56
57 func (s *MockArvadosServerSuite) TearDownTest(c *C) {
58         arvadostest.ResetEnv()
59 }
60
61 func (s *TestSuite) TestIntegrationNormal(c *C) {
62         container := s.integrationTest(c, func() *exec.Cmd { return exec.Command("echo", "zzzzz-dz642-queuedcontainer") },
63                 []string(nil),
64                 func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
65                         dispatcher.UpdateState(container.UUID, dispatch.Running)
66                         time.Sleep(3 * time.Second)
67                         dispatcher.UpdateState(container.UUID, dispatch.Complete)
68                 })
69         c.Check(container.State, Equals, arvados.ContainerStateComplete)
70 }
71
72 func (s *TestSuite) TestIntegrationCancel(c *C) {
73
74         // Override sbatchCmd
75         var scancelCmdLine []string
76         defer func(orig func(arvados.Container) *exec.Cmd) {
77                 scancelCmd = orig
78         }(scancelCmd)
79         scancelCmd = func(container arvados.Container) *exec.Cmd {
80                 scancelCmdLine = scancelFunc(container).Args
81                 return exec.Command("echo")
82         }
83
84         container := s.integrationTest(c,
85                 func() *exec.Cmd { return exec.Command("echo", "zzzzz-dz642-queuedcontainer") },
86                 []string(nil),
87                 func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
88                         dispatcher.UpdateState(container.UUID, dispatch.Running)
89                         time.Sleep(1 * time.Second)
90                         dispatcher.Arv.Update("containers", container.UUID,
91                                 arvadosclient.Dict{
92                                         "container": arvadosclient.Dict{"priority": 0}},
93                                 nil)
94                 })
95         c.Check(container.State, Equals, arvados.ContainerStateCancelled)
96         c.Check(scancelCmdLine, DeepEquals, []string{"scancel", "--name=zzzzz-dz642-queuedcontainer"})
97 }
98
99 func (s *TestSuite) TestIntegrationMissingFromSqueue(c *C) {
100         container := s.integrationTest(c, func() *exec.Cmd { return exec.Command("echo") }, []string{"sbatch", "--share",
101                 fmt.Sprintf("--job-name=%s", "zzzzz-dz642-queuedcontainer"),
102                 fmt.Sprintf("--mem-per-cpu=%d", 2862),
103                 fmt.Sprintf("--cpus-per-task=%d", 4)},
104                 func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
105                         dispatcher.UpdateState(container.UUID, dispatch.Running)
106                         time.Sleep(3 * time.Second)
107                         dispatcher.UpdateState(container.UUID, dispatch.Complete)
108                 })
109         c.Check(container.State, Equals, arvados.ContainerStateCancelled)
110 }
111
112 func (s *TestSuite) integrationTest(c *C,
113         newSqueueCmd func() *exec.Cmd,
114         sbatchCmdComps []string,
115         runContainer func(*dispatch.Dispatcher, arvados.Container)) arvados.Container {
116         arvadostest.ResetEnv()
117
118         arv, err := arvadosclient.MakeArvadosClient()
119         c.Assert(err, IsNil)
120
121         var sbatchCmdLine []string
122
123         // Override sbatchCmd
124         defer func(orig func(arvados.Container) *exec.Cmd) {
125                 sbatchCmd = orig
126         }(sbatchCmd)
127         sbatchCmd = func(container arvados.Container) *exec.Cmd {
128                 sbatchCmdLine = sbatchFunc(container).Args
129                 return exec.Command("sh")
130         }
131
132         // Override squeueCmd
133         defer func(orig func() *exec.Cmd) {
134                 squeueCmd = orig
135         }(squeueCmd)
136         squeueCmd = newSqueueCmd
137
138         // There should be one queued container
139         params := arvadosclient.Dict{
140                 "filters": [][]string{{"state", "=", "Queued"}},
141         }
142         var containers arvados.ContainerList
143         err = arv.List("containers", params, &containers)
144         c.Check(err, IsNil)
145         c.Check(len(containers.Items), Equals, 1)
146
147         theConfig.CrunchRunCommand = []string{"echo"}
148
149         doneProcessing := make(chan struct{})
150         dispatcher := dispatch.Dispatcher{
151                 Arv:          arv,
152                 PollInterval: time.Duration(1) * time.Second,
153                 RunContainer: func(dispatcher *dispatch.Dispatcher,
154                         container arvados.Container,
155                         status chan arvados.Container) {
156                         go runContainer(dispatcher, container)
157                         run(dispatcher, container, status)
158                         doneProcessing <- struct{}{}
159                 },
160                 DoneProcessing: doneProcessing}
161
162         squeueUpdater.StartMonitor(time.Duration(500) * time.Millisecond)
163
164         err = dispatcher.RunDispatcher()
165         c.Assert(err, IsNil)
166
167         squeueUpdater.Done()
168
169         c.Check(sbatchCmdLine, DeepEquals, sbatchCmdComps)
170
171         // There should be no queued containers now
172         err = arv.List("containers", params, &containers)
173         c.Check(err, IsNil)
174         c.Check(len(containers.Items), Equals, 0)
175
176         // Previously "Queued" container should now be in "Complete" state
177         var container arvados.Container
178         err = arv.Get("containers", "zzzzz-dz642-queuedcontainer", nil, &container)
179         c.Check(err, IsNil)
180         return container
181 }
182
183 func (s *MockArvadosServerSuite) TestAPIErrorGettingContainers(c *C) {
184         apiStubResponses := make(map[string]arvadostest.StubResponse)
185         apiStubResponses["/arvados/v1/api_client_authorizations/current"] = arvadostest.StubResponse{200, `{"uuid":"` + arvadostest.Dispatch1AuthUUID + `"}`}
186         apiStubResponses["/arvados/v1/containers"] = arvadostest.StubResponse{500, string(`{}`)}
187
188         testWithServerStub(c, apiStubResponses, "echo", "Error getting list of containers")
189 }
190
191 func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubResponse, crunchCmd string, expected string) {
192         apiStub := arvadostest.ServerStub{apiStubResponses}
193
194         api := httptest.NewServer(&apiStub)
195         defer api.Close()
196
197         arv := &arvadosclient.ArvadosClient{
198                 Scheme:    "http",
199                 ApiServer: api.URL[7:],
200                 ApiToken:  "abc123",
201                 Client:    &http.Client{Transport: &http.Transport{}},
202                 Retries:   0,
203         }
204
205         buf := bytes.NewBuffer(nil)
206         log.SetOutput(io.MultiWriter(buf, os.Stderr))
207         defer log.SetOutput(os.Stderr)
208
209         theConfig.CrunchRunCommand = []string{crunchCmd}
210
211         doneProcessing := make(chan struct{})
212         dispatcher := dispatch.Dispatcher{
213                 Arv:          arv,
214                 PollInterval: time.Duration(1) * time.Second,
215                 RunContainer: func(dispatcher *dispatch.Dispatcher,
216                         container arvados.Container,
217                         status chan arvados.Container) {
218                         go func() {
219                                 time.Sleep(1 * time.Second)
220                                 dispatcher.UpdateState(container.UUID, dispatch.Running)
221                                 dispatcher.UpdateState(container.UUID, dispatch.Complete)
222                         }()
223                         run(dispatcher, container, status)
224                         doneProcessing <- struct{}{}
225                 },
226                 DoneProcessing: doneProcessing}
227
228         go func() {
229                 for i := 0; i < 80 && !strings.Contains(buf.String(), expected); i++ {
230                         time.Sleep(100 * time.Millisecond)
231                 }
232                 dispatcher.DoneProcessing <- struct{}{}
233         }()
234
235         err := dispatcher.RunDispatcher()
236         c.Assert(err, IsNil)
237
238         c.Check(buf.String(), Matches, `(?ms).*`+expected+`.*`)
239 }
240
241 func (s *MockArvadosServerSuite) TestNoSuchConfigFile(c *C) {
242         var config Config
243         err := readConfig(&config, "/nosuchdir89j7879/8hjwr7ojgyy7")
244         c.Assert(err, NotNil)
245 }
246
247 func (s *MockArvadosServerSuite) TestBadSbatchArgsConfig(c *C) {
248         var config Config
249
250         tmpfile, err := ioutil.TempFile(os.TempDir(), "config")
251         c.Check(err, IsNil)
252         defer os.Remove(tmpfile.Name())
253
254         _, err = tmpfile.Write([]byte(`{"SbatchArguments": "oops this is not a string array"}`))
255         c.Check(err, IsNil)
256
257         err = readConfig(&config, tmpfile.Name())
258         c.Assert(err, NotNil)
259 }
260
261 func (s *MockArvadosServerSuite) TestNoSuchArgInConfigIgnored(c *C) {
262         var config Config
263
264         tmpfile, err := ioutil.TempFile(os.TempDir(), "config")
265         c.Check(err, IsNil)
266         defer os.Remove(tmpfile.Name())
267
268         _, err = tmpfile.Write([]byte(`{"NoSuchArg": "Nobody loves me, not one tiny hunk."}`))
269         c.Check(err, IsNil)
270
271         err = readConfig(&config, tmpfile.Name())
272         c.Assert(err, IsNil)
273         c.Check(0, Equals, len(config.SbatchArguments))
274 }
275
276 func (s *MockArvadosServerSuite) TestReadConfig(c *C) {
277         var config Config
278
279         tmpfile, err := ioutil.TempFile(os.TempDir(), "config")
280         c.Check(err, IsNil)
281         defer os.Remove(tmpfile.Name())
282
283         args := []string{"--arg1=v1", "--arg2", "--arg3=v3"}
284         argsS := `{"SbatchArguments": ["--arg1=v1",  "--arg2", "--arg3=v3"]}`
285         _, err = tmpfile.Write([]byte(argsS))
286         c.Check(err, IsNil)
287
288         err = readConfig(&config, tmpfile.Name())
289         c.Assert(err, IsNil)
290         c.Check(3, Equals, len(config.SbatchArguments))
291         c.Check(args, DeepEquals, config.SbatchArguments)
292 }
293
294 func (s *MockArvadosServerSuite) TestSbatchFuncWithNoConfigArgs(c *C) {
295         testSbatchFuncWithArgs(c, nil)
296 }
297
298 func (s *MockArvadosServerSuite) TestSbatchFuncWithEmptyConfigArgs(c *C) {
299         testSbatchFuncWithArgs(c, []string{})
300 }
301
302 func (s *MockArvadosServerSuite) TestSbatchFuncWithConfigArgs(c *C) {
303         testSbatchFuncWithArgs(c, []string{"--arg1=v1", "--arg2"})
304 }
305
306 func testSbatchFuncWithArgs(c *C, args []string) {
307         theConfig.SbatchArguments = append(theConfig.SbatchArguments, args...)
308
309         container := arvados.Container{UUID: "123", RuntimeConstraints: arvados.RuntimeConstraints{RAM: 250000000, VCPUs: 2}}
310         sbatchCmd := sbatchFunc(container)
311
312         var expected []string
313         expected = append(expected, "sbatch", "--share")
314         expected = append(expected, theConfig.SbatchArguments...)
315         expected = append(expected, "--job-name=123", "--mem-per-cpu=120", "--cpus-per-task=2")
316
317         c.Check(sbatchCmd.Args, DeepEquals, expected)
318 }
319
320 func (s *MockArvadosServerSuite) TestSbatchPartition(c *C) {
321         theConfig.SbatchArguments = nil
322         container := arvados.Container{UUID: "123", RuntimeConstraints: arvados.RuntimeConstraints{RAM: 250000000, VCPUs: 1}, SchedulingParameters: arvados.SchedulingParameters{Partitions: []string{"blurb", "b2"}}}
323         sbatchCmd := sbatchFunc(container)
324
325         var expected []string
326         expected = append(expected, "sbatch", "--share")
327         expected = append(expected, "--job-name=123", "--mem-per-cpu=239", "--cpus-per-task=1", "--partition=blurb,b2")
328
329         c.Check(sbatchCmd.Args, DeepEquals, expected)
330 }