Merge branch '18995-code-cleanup-1'
[arvados.git] / services / crunch-dispatch-slurm / crunch-dispatch-slurm_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "context"
10         "errors"
11         "fmt"
12         "io"
13         "io/ioutil"
14         "net/http"
15         "net/http/httptest"
16         "os"
17         "os/exec"
18         "strings"
19         "testing"
20         "time"
21
22         "git.arvados.org/arvados.git/lib/dispatchcloud"
23         "git.arvados.org/arvados.git/sdk/go/arvados"
24         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
25         "git.arvados.org/arvados.git/sdk/go/arvadostest"
26         "git.arvados.org/arvados.git/sdk/go/dispatch"
27         "github.com/sirupsen/logrus"
28         . "gopkg.in/check.v1"
29 )
30
31 // Gocheck boilerplate
32 func Test(t *testing.T) {
33         TestingT(t)
34 }
35
36 var _ = Suite(&IntegrationSuite{})
37 var _ = Suite(&StubbedSuite{})
38
39 type IntegrationSuite struct {
40         disp  Dispatcher
41         slurm slurmFake
42 }
43
44 func (s *IntegrationSuite) SetUpTest(c *C) {
45         arvadostest.ResetEnv()
46         arvadostest.ResetDB(c)
47         os.Setenv("ARVADOS_API_TOKEN", arvadostest.Dispatch1Token)
48         s.disp = Dispatcher{}
49         s.disp.cluster = &arvados.Cluster{}
50         s.disp.setup()
51         s.slurm = slurmFake{}
52 }
53
54 func (s *IntegrationSuite) TearDownTest(c *C) {
55         arvadostest.ResetEnv()
56         arvadostest.ResetDB(c)
57 }
58
59 type slurmFake struct {
60         didBatch      [][]string
61         didCancel     []string
62         didRelease    []string
63         didRenice     [][]string
64         queue         string
65         rejectNice10K bool
66         // If non-nil, run this func during the 2nd+ call to Cancel()
67         onCancel func()
68         // Error returned by Batch()
69         errBatch error
70 }
71
72 func (sf *slurmFake) Batch(script io.Reader, args []string) error {
73         sf.didBatch = append(sf.didBatch, args)
74         return sf.errBatch
75 }
76
77 func (sf *slurmFake) QueueCommand(args []string) *exec.Cmd {
78         return exec.Command("echo", sf.queue)
79 }
80
81 func (sf *slurmFake) Release(name string) error {
82         sf.didRelease = append(sf.didRelease, name)
83         return nil
84 }
85
86 func (sf *slurmFake) Renice(name string, nice int64) error {
87         sf.didRenice = append(sf.didRenice, []string{name, fmt.Sprintf("%d", nice)})
88         if sf.rejectNice10K && nice > 10000 {
89                 return errors.New("scontrol: error: Invalid nice value, must be between -10000 and 10000")
90         }
91         return nil
92 }
93
94 func (sf *slurmFake) Cancel(name string) error {
95         sf.didCancel = append(sf.didCancel, name)
96         if len(sf.didCancel) == 1 {
97                 // simulate error on first attempt
98                 return errors.New("something terrible happened")
99         }
100         if sf.onCancel != nil {
101                 sf.onCancel()
102         }
103         return nil
104 }
105
106 func (s *IntegrationSuite) integrationTest(c *C,
107         expectBatch [][]string,
108         runContainer func(*dispatch.Dispatcher, arvados.Container)) (arvados.Container, error) {
109         arvadostest.ResetEnv()
110
111         arv, err := arvadosclient.MakeArvadosClient()
112         c.Assert(err, IsNil)
113
114         // There should be one queued container
115         params := arvadosclient.Dict{
116                 "filters": [][]string{{"state", "=", "Queued"}},
117         }
118         var containers arvados.ContainerList
119         err = arv.List("containers", params, &containers)
120         c.Check(err, IsNil)
121         c.Assert(len(containers.Items), Equals, 1)
122
123         s.disp.cluster.Containers.CrunchRunCommand = "echo"
124
125         ctx, cancel := context.WithCancel(context.Background())
126         doneRun := make(chan struct{})
127         doneDispatch := make(chan error)
128
129         s.disp.Dispatcher = &dispatch.Dispatcher{
130                 Arv:        arv,
131                 PollPeriod: time.Second,
132                 RunContainer: func(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) error {
133                         go func() {
134                                 runContainer(disp, ctr)
135                                 s.slurm.queue = ""
136                                 doneRun <- struct{}{}
137                         }()
138                         err := s.disp.runContainer(disp, ctr, status)
139                         cancel()
140                         doneDispatch <- err
141                         return nil
142                 },
143         }
144
145         s.disp.slurm = &s.slurm
146         s.disp.sqCheck = &SqueueChecker{
147                 Logger: logrus.StandardLogger(),
148                 Period: 500 * time.Millisecond,
149                 Slurm:  s.disp.slurm,
150         }
151
152         err = s.disp.Dispatcher.Run(ctx)
153         <-doneRun
154         c.Assert(err, Equals, context.Canceled)
155         errDispatch := <-doneDispatch
156
157         s.disp.sqCheck.Stop()
158
159         c.Check(s.slurm.didBatch, DeepEquals, expectBatch)
160
161         // There should be no queued containers now
162         err = arv.List("containers", params, &containers)
163         c.Check(err, IsNil)
164         c.Check(len(containers.Items), Equals, 0)
165
166         // Previously "Queued" container should now be in "Complete" state
167         var container arvados.Container
168         err = arv.Get("containers", "zzzzz-dz642-queuedcontainer", nil, &container)
169         c.Check(err, IsNil)
170         return container, errDispatch
171 }
172
173 func (s *IntegrationSuite) TestNormal(c *C) {
174         s.slurm = slurmFake{queue: "zzzzz-dz642-queuedcontainer 10000 100 PENDING Resources\n"}
175         container, _ := s.integrationTest(c,
176                 nil,
177                 func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
178                         dispatcher.UpdateState(container.UUID, dispatch.Running)
179                         time.Sleep(3 * time.Second)
180                         dispatcher.UpdateState(container.UUID, dispatch.Complete)
181                 })
182         c.Check(container.State, Equals, arvados.ContainerStateComplete)
183 }
184
185 func (s *IntegrationSuite) TestCancel(c *C) {
186         s.slurm = slurmFake{queue: "zzzzz-dz642-queuedcontainer 10000 100 PENDING Resources\n"}
187         readyToCancel := make(chan bool)
188         s.slurm.onCancel = func() { <-readyToCancel }
189         container, _ := s.integrationTest(c,
190                 nil,
191                 func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
192                         dispatcher.UpdateState(container.UUID, dispatch.Running)
193                         time.Sleep(time.Second)
194                         dispatcher.Arv.Update("containers", container.UUID,
195                                 arvadosclient.Dict{
196                                         "container": arvadosclient.Dict{"priority": 0}},
197                                 nil)
198                         readyToCancel <- true
199                         close(readyToCancel)
200                 })
201         c.Check(container.State, Equals, arvados.ContainerStateCancelled)
202         c.Check(len(s.slurm.didCancel) > 1, Equals, true)
203         c.Check(s.slurm.didCancel[:2], DeepEquals, []string{"zzzzz-dz642-queuedcontainer", "zzzzz-dz642-queuedcontainer"})
204 }
205
206 func (s *IntegrationSuite) TestMissingFromSqueue(c *C) {
207         container, _ := s.integrationTest(c,
208                 [][]string{{
209                         fmt.Sprintf("--job-name=%s", "zzzzz-dz642-queuedcontainer"),
210                         fmt.Sprintf("--nice=%d", 10000),
211                         "--no-requeue",
212                         fmt.Sprintf("--mem=%d", 11445),
213                         fmt.Sprintf("--cpus-per-task=%d", 4),
214                         fmt.Sprintf("--tmp=%d", 45777),
215                 }},
216                 func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
217                         dispatcher.UpdateState(container.UUID, dispatch.Running)
218                         time.Sleep(3 * time.Second)
219                         dispatcher.UpdateState(container.UUID, dispatch.Complete)
220                 })
221         c.Check(container.State, Equals, arvados.ContainerStateCancelled)
222 }
223
224 func (s *IntegrationSuite) TestSbatchFail(c *C) {
225         s.slurm = slurmFake{errBatch: errors.New("something terrible happened")}
226         container, err := s.integrationTest(c,
227                 [][]string{{"--job-name=zzzzz-dz642-queuedcontainer", "--nice=10000", "--no-requeue", "--mem=11445", "--cpus-per-task=4", "--tmp=45777"}},
228                 func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
229                         dispatcher.UpdateState(container.UUID, dispatch.Running)
230                         dispatcher.UpdateState(container.UUID, dispatch.Complete)
231                 })
232         c.Check(container.State, Equals, arvados.ContainerStateComplete)
233         c.Check(err, ErrorMatches, `something terrible happened`)
234 }
235
236 type StubbedSuite struct {
237         disp Dispatcher
238 }
239
240 func (s *StubbedSuite) SetUpTest(c *C) {
241         s.disp = Dispatcher{}
242         s.disp.cluster = &arvados.Cluster{}
243         s.disp.setup()
244 }
245
246 func (s *StubbedSuite) TestAPIErrorGettingContainers(c *C) {
247         apiStubResponses := make(map[string]arvadostest.StubResponse)
248         apiStubResponses["/arvados/v1/api_client_authorizations/current"] = arvadostest.StubResponse{200, `{"uuid":"` + arvadostest.Dispatch1AuthUUID + `"}`}
249         apiStubResponses["/arvados/v1/containers"] = arvadostest.StubResponse{500, string(`{}`)}
250
251         s.testWithServerStub(c, apiStubResponses, "echo", "error getting count of containers")
252 }
253
254 func (s *StubbedSuite) testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubResponse, crunchCmd string, expected string) {
255         apiStub := arvadostest.ServerStub{apiStubResponses}
256
257         api := httptest.NewServer(&apiStub)
258         defer api.Close()
259
260         arv := &arvadosclient.ArvadosClient{
261                 Scheme:    "http",
262                 ApiServer: api.URL[7:],
263                 ApiToken:  "abc123",
264                 Client:    &http.Client{Transport: &http.Transport{}},
265                 Retries:   0,
266         }
267
268         buf := bytes.NewBuffer(nil)
269         logrus.SetOutput(io.MultiWriter(buf, os.Stderr))
270         defer logrus.SetOutput(os.Stderr)
271
272         s.disp.cluster.Containers.CrunchRunCommand = "crunchCmd"
273
274         ctx, cancel := context.WithCancel(context.Background())
275         dispatcher := dispatch.Dispatcher{
276                 Arv:        arv,
277                 PollPeriod: time.Second,
278                 RunContainer: func(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) error {
279                         go func() {
280                                 time.Sleep(time.Second)
281                                 disp.UpdateState(ctr.UUID, dispatch.Running)
282                                 disp.UpdateState(ctr.UUID, dispatch.Complete)
283                         }()
284                         s.disp.runContainer(disp, ctr, status)
285                         cancel()
286                         return nil
287                 },
288         }
289
290         go func() {
291                 for i := 0; i < 80 && !strings.Contains(buf.String(), expected); i++ {
292                         time.Sleep(100 * time.Millisecond)
293                 }
294                 cancel()
295         }()
296
297         err := dispatcher.Run(ctx)
298         c.Assert(err, Equals, context.Canceled)
299
300         c.Check(buf.String(), Matches, `(?ms).*`+expected+`.*`)
301 }
302
303 func (s *StubbedSuite) TestSbatchArgs(c *C) {
304         container := arvados.Container{
305                 UUID:               "123",
306                 RuntimeConstraints: arvados.RuntimeConstraints{RAM: 250000000, VCPUs: 2},
307                 Priority:           1,
308         }
309
310         for _, defaults := range [][]string{
311                 nil,
312                 {},
313                 {"--arg1=v1", "--arg2"},
314         } {
315                 c.Logf("%#v", defaults)
316                 s.disp.cluster.Containers.SLURM.SbatchArgumentsList = defaults
317
318                 args, err := s.disp.sbatchArgs(container)
319                 c.Check(args, DeepEquals, append(defaults, "--job-name=123", "--nice=10000", "--no-requeue", "--mem=239", "--cpus-per-task=2", "--tmp=0"))
320                 c.Check(err, IsNil)
321         }
322 }
323
324 func (s *StubbedSuite) TestSbatchInstanceTypeConstraint(c *C) {
325         container := arvados.Container{
326                 UUID:               "123",
327                 RuntimeConstraints: arvados.RuntimeConstraints{RAM: 250000000, VCPUs: 2},
328                 Priority:           1,
329         }
330
331         for _, trial := range []struct {
332                 types      map[string]arvados.InstanceType
333                 sbatchArgs []string
334                 err        error
335         }{
336                 // Choose node type => use --constraint arg
337                 {
338                         types: map[string]arvados.InstanceType{
339                                 "a1.tiny":   {Name: "a1.tiny", Price: 0.02, RAM: 128000000, VCPUs: 1},
340                                 "a1.small":  {Name: "a1.small", Price: 0.04, RAM: 256000000, VCPUs: 2},
341                                 "a1.medium": {Name: "a1.medium", Price: 0.08, RAM: 512000000, VCPUs: 4},
342                                 "a1.large":  {Name: "a1.large", Price: 0.16, RAM: 1024000000, VCPUs: 8},
343                         },
344                         sbatchArgs: []string{"--constraint=instancetype=a1.medium"},
345                 },
346                 // No node types configured => no slurm constraint
347                 {
348                         types:      nil,
349                         sbatchArgs: []string{"--mem=239", "--cpus-per-task=2", "--tmp=0"},
350                 },
351                 // No node type is big enough => error
352                 {
353                         types: map[string]arvados.InstanceType{
354                                 "a1.tiny": {Name: "a1.tiny", Price: 0.02, RAM: 128000000, VCPUs: 1},
355                         },
356                         err: dispatchcloud.ConstraintsNotSatisfiableError{},
357                 },
358         } {
359                 c.Logf("%#v", trial)
360                 s.disp.cluster = &arvados.Cluster{InstanceTypes: trial.types}
361
362                 args, err := s.disp.sbatchArgs(container)
363                 c.Check(err == nil, Equals, trial.err == nil)
364                 if trial.err == nil {
365                         c.Check(args, DeepEquals, append([]string{"--job-name=123", "--nice=10000", "--no-requeue"}, trial.sbatchArgs...))
366                 } else {
367                         c.Check(len(err.(dispatchcloud.ConstraintsNotSatisfiableError).AvailableTypes), Equals, len(trial.types))
368                 }
369         }
370 }
371
372 func (s *StubbedSuite) TestSbatchPartition(c *C) {
373         container := arvados.Container{
374                 UUID:                 "123",
375                 RuntimeConstraints:   arvados.RuntimeConstraints{RAM: 250000000, VCPUs: 1},
376                 SchedulingParameters: arvados.SchedulingParameters{Partitions: []string{"blurb", "b2"}},
377                 Priority:             1,
378         }
379
380         args, err := s.disp.sbatchArgs(container)
381         c.Check(args, DeepEquals, []string{
382                 "--job-name=123", "--nice=10000", "--no-requeue",
383                 "--mem=239", "--cpus-per-task=1", "--tmp=0",
384                 "--partition=blurb,b2",
385         })
386         c.Check(err, IsNil)
387 }
388
389 func (s *StubbedSuite) TestLoadLegacyConfig(c *C) {
390         content := []byte(`
391 Client:
392   APIHost: example.com
393   AuthToken: abcdefg
394   KeepServiceURIs:
395     - https://example.com/keep1
396     - https://example.com/keep2
397 SbatchArguments: ["--foo", "bar"]
398 PollPeriod: 12s
399 PrioritySpread: 42
400 CrunchRunCommand: ["x-crunch-run", "--cgroup-parent-subsystem=memory"]
401 ReserveExtraRAM: 12345
402 MinRetryPeriod: 13s
403 BatchSize: 99
404 `)
405         tmpfile, err := ioutil.TempFile("", "example")
406         if err != nil {
407                 c.Error(err)
408         }
409
410         defer os.Remove(tmpfile.Name()) // clean up
411
412         if _, err := tmpfile.Write(content); err != nil {
413                 c.Error(err)
414         }
415         if err := tmpfile.Close(); err != nil {
416                 c.Error(err)
417
418         }
419         os.Setenv("ARVADOS_KEEP_SERVICES", "")
420         err = s.disp.configure("crunch-dispatch-slurm", []string{"-config", tmpfile.Name()})
421         c.Check(err, IsNil)
422
423         c.Check(s.disp.cluster.Services.Controller.ExternalURL, Equals, arvados.URL{Scheme: "https", Host: "example.com", Path: "/"})
424         c.Check(s.disp.cluster.SystemRootToken, Equals, "abcdefg")
425         c.Check(s.disp.cluster.Containers.SLURM.SbatchArgumentsList, DeepEquals, []string{"--foo", "bar"})
426         c.Check(s.disp.cluster.Containers.CloudVMs.PollInterval, Equals, arvados.Duration(12*time.Second))
427         c.Check(s.disp.cluster.Containers.SLURM.PrioritySpread, Equals, int64(42))
428         c.Check(s.disp.cluster.Containers.CrunchRunCommand, Equals, "x-crunch-run")
429         c.Check(s.disp.cluster.Containers.CrunchRunArgumentsList, DeepEquals, []string{"--cgroup-parent-subsystem=memory"})
430         c.Check(s.disp.cluster.Containers.ReserveExtraRAM, Equals, arvados.ByteSize(12345))
431         c.Check(s.disp.cluster.Containers.MinRetryPeriod, Equals, arvados.Duration(13*time.Second))
432         c.Check(s.disp.cluster.API.MaxItemsPerResponse, Equals, 99)
433         c.Check(s.disp.cluster.Containers.SLURM.SbatchEnvironmentVariables, DeepEquals, map[string]string{
434                 "ARVADOS_KEEP_SERVICES": "https://example.com/keep1 https://example.com/keep2",
435         })
436         c.Check(os.Getenv("ARVADOS_KEEP_SERVICES"), Equals, "https://example.com/keep1 https://example.com/keep2")
437 }