20318: Implement BlockRead via ReadAt so it supports streaming.
[arvados.git] / services / crunch-dispatch-slurm / crunch-dispatch-slurm_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package dispatchslurm
6
7 import (
8         "bytes"
9         "context"
10         "errors"
11         "flag"
12         "fmt"
13         "io"
14         "io/ioutil"
15         "net/http"
16         "net/http/httptest"
17         "os"
18         "os/exec"
19         "strings"
20         "testing"
21         "time"
22
23         "git.arvados.org/arvados.git/lib/cmd"
24         "git.arvados.org/arvados.git/lib/config"
25         "git.arvados.org/arvados.git/lib/dispatchcloud"
26         "git.arvados.org/arvados.git/sdk/go/arvados"
27         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
28         "git.arvados.org/arvados.git/sdk/go/arvadostest"
29         "git.arvados.org/arvados.git/sdk/go/ctxlog"
30         "git.arvados.org/arvados.git/sdk/go/dispatch"
31         "github.com/sirupsen/logrus"
32         . "gopkg.in/check.v1"
33 )
34
35 // Gocheck boilerplate
36 func Test(t *testing.T) {
37         TestingT(t)
38 }
39
40 var _ = Suite(&IntegrationSuite{})
41 var _ = Suite(&StubbedSuite{})
42
43 type IntegrationSuite struct {
44         disp  Dispatcher
45         slurm slurmFake
46 }
47
48 func (s *IntegrationSuite) SetUpTest(c *C) {
49         arvadostest.ResetEnv()
50         arvadostest.ResetDB(c)
51         os.Setenv("ARVADOS_API_TOKEN", arvadostest.Dispatch1Token)
52         s.disp = Dispatcher{}
53         s.disp.cluster = &arvados.Cluster{}
54         s.disp.setup()
55         s.slurm = slurmFake{}
56 }
57
58 func (s *IntegrationSuite) TearDownTest(c *C) {
59         arvadostest.ResetEnv()
60         arvadostest.ResetDB(c)
61 }
62
63 type slurmFake struct {
64         didBatch      [][]string
65         didCancel     []string
66         didRelease    []string
67         didRenice     [][]string
68         queue         string
69         rejectNice10K bool
70         // If non-nil, run this func during the 2nd+ call to Cancel()
71         onCancel func()
72         // Error returned by Batch()
73         errBatch error
74 }
75
76 func (sf *slurmFake) Batch(script io.Reader, args []string) error {
77         sf.didBatch = append(sf.didBatch, args)
78         return sf.errBatch
79 }
80
81 func (sf *slurmFake) QueueCommand(args []string) *exec.Cmd {
82         return exec.Command("echo", sf.queue)
83 }
84
85 func (sf *slurmFake) Release(name string) error {
86         sf.didRelease = append(sf.didRelease, name)
87         return nil
88 }
89
90 func (sf *slurmFake) Renice(name string, nice int64) error {
91         sf.didRenice = append(sf.didRenice, []string{name, fmt.Sprintf("%d", nice)})
92         if sf.rejectNice10K && nice > 10000 {
93                 return errors.New("scontrol: error: Invalid nice value, must be between -10000 and 10000")
94         }
95         return nil
96 }
97
98 func (sf *slurmFake) Cancel(name string) error {
99         sf.didCancel = append(sf.didCancel, name)
100         if len(sf.didCancel) == 1 {
101                 // simulate error on first attempt
102                 return errors.New("something terrible happened")
103         }
104         if sf.onCancel != nil {
105                 sf.onCancel()
106         }
107         return nil
108 }
109
110 func (s *IntegrationSuite) integrationTest(c *C,
111         expectBatch [][]string,
112         runContainer func(*dispatch.Dispatcher, arvados.Container)) (arvados.Container, error) {
113         arvadostest.ResetEnv()
114
115         arv, err := arvadosclient.MakeArvadosClient()
116         c.Assert(err, IsNil)
117
118         // There should be one queued container
119         params := arvadosclient.Dict{
120                 "filters": [][]string{{"state", "=", "Queued"}},
121         }
122         var containers arvados.ContainerList
123         err = arv.List("containers", params, &containers)
124         c.Check(err, IsNil)
125         c.Assert(len(containers.Items), Equals, 1)
126
127         s.disp.cluster.Containers.CrunchRunCommand = "echo"
128
129         ctx, cancel := context.WithCancel(context.Background())
130         doneRun := make(chan struct{})
131         doneDispatch := make(chan error)
132
133         s.disp.Dispatcher = &dispatch.Dispatcher{
134                 Arv:        arv,
135                 PollPeriod: time.Second,
136                 RunContainer: func(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) error {
137                         go func() {
138                                 runContainer(disp, ctr)
139                                 s.slurm.queue = ""
140                                 doneRun <- struct{}{}
141                         }()
142                         err := s.disp.runContainer(disp, ctr, status)
143                         cancel()
144                         doneDispatch <- err
145                         return nil
146                 },
147         }
148
149         s.disp.slurm = &s.slurm
150         s.disp.sqCheck = &SqueueChecker{
151                 Logger: logrus.StandardLogger(),
152                 Period: 500 * time.Millisecond,
153                 Slurm:  s.disp.slurm,
154         }
155
156         err = s.disp.Dispatcher.Run(ctx)
157         <-doneRun
158         c.Assert(err, Equals, context.Canceled)
159         errDispatch := <-doneDispatch
160
161         s.disp.sqCheck.Stop()
162
163         c.Check(s.slurm.didBatch, DeepEquals, expectBatch)
164
165         // There should be no queued containers now
166         err = arv.List("containers", params, &containers)
167         c.Check(err, IsNil)
168         c.Check(len(containers.Items), Equals, 0)
169
170         // Previously "Queued" container should now be in "Complete" state
171         var container arvados.Container
172         err = arv.Get("containers", "zzzzz-dz642-queuedcontainer", nil, &container)
173         c.Check(err, IsNil)
174         return container, errDispatch
175 }
176
177 func (s *IntegrationSuite) TestNormal(c *C) {
178         s.slurm = slurmFake{queue: "zzzzz-dz642-queuedcontainer 10000 100 PENDING Resources\n"}
179         container, _ := s.integrationTest(c,
180                 nil,
181                 func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
182                         dispatcher.UpdateState(container.UUID, dispatch.Running)
183                         time.Sleep(3 * time.Second)
184                         dispatcher.UpdateState(container.UUID, dispatch.Complete)
185                 })
186         c.Check(container.State, Equals, arvados.ContainerStateComplete)
187 }
188
189 func (s *IntegrationSuite) TestCancel(c *C) {
190         s.slurm = slurmFake{queue: "zzzzz-dz642-queuedcontainer 10000 100 PENDING Resources\n"}
191         readyToCancel := make(chan bool)
192         s.slurm.onCancel = func() { <-readyToCancel }
193         container, _ := s.integrationTest(c,
194                 nil,
195                 func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
196                         dispatcher.UpdateState(container.UUID, dispatch.Running)
197                         time.Sleep(time.Second)
198                         dispatcher.Arv.Update("containers", container.UUID,
199                                 arvadosclient.Dict{
200                                         "container": arvadosclient.Dict{"priority": 0}},
201                                 nil)
202                         readyToCancel <- true
203                         close(readyToCancel)
204                 })
205         c.Check(container.State, Equals, arvados.ContainerStateCancelled)
206         c.Check(len(s.slurm.didCancel) > 1, Equals, true)
207         c.Check(s.slurm.didCancel[:2], DeepEquals, []string{"zzzzz-dz642-queuedcontainer", "zzzzz-dz642-queuedcontainer"})
208 }
209
210 func (s *IntegrationSuite) TestMissingFromSqueue(c *C) {
211         container, _ := s.integrationTest(c,
212                 [][]string{{
213                         fmt.Sprintf("--job-name=%s", "zzzzz-dz642-queuedcontainer"),
214                         fmt.Sprintf("--nice=%d", 10000),
215                         "--no-requeue",
216                         fmt.Sprintf("--mem=%d", 11445),
217                         fmt.Sprintf("--cpus-per-task=%d", 4),
218                         fmt.Sprintf("--tmp=%d", 45777),
219                 }},
220                 func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
221                         dispatcher.UpdateState(container.UUID, dispatch.Running)
222                         time.Sleep(3 * time.Second)
223                         dispatcher.UpdateState(container.UUID, dispatch.Complete)
224                 })
225         c.Check(container.State, Equals, arvados.ContainerStateCancelled)
226 }
227
228 func (s *IntegrationSuite) TestSbatchFail(c *C) {
229         s.slurm = slurmFake{errBatch: errors.New("something terrible happened")}
230         container, err := s.integrationTest(c,
231                 [][]string{{"--job-name=zzzzz-dz642-queuedcontainer", "--nice=10000", "--no-requeue", "--mem=11445", "--cpus-per-task=4", "--tmp=45777"}},
232                 func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
233                         dispatcher.UpdateState(container.UUID, dispatch.Running)
234                         dispatcher.UpdateState(container.UUID, dispatch.Complete)
235                 })
236         c.Check(container.State, Equals, arvados.ContainerStateComplete)
237         c.Check(err, ErrorMatches, `something terrible happened`)
238 }
239
240 type StubbedSuite struct {
241         disp Dispatcher
242 }
243
244 func (s *StubbedSuite) SetUpTest(c *C) {
245         s.disp = Dispatcher{}
246         s.disp.cluster = &arvados.Cluster{}
247         s.disp.setup()
248 }
249
250 func (s *StubbedSuite) TestAPIErrorGettingContainers(c *C) {
251         apiStubResponses := make(map[string]arvadostest.StubResponse)
252         apiStubResponses["/arvados/v1/api_client_authorizations/current"] = arvadostest.StubResponse{200, `{"uuid":"` + arvadostest.Dispatch1AuthUUID + `"}`}
253         apiStubResponses["/arvados/v1/containers"] = arvadostest.StubResponse{500, string(`{}`)}
254
255         s.testWithServerStub(c, apiStubResponses, "echo", "error getting count of containers")
256 }
257
258 func (s *StubbedSuite) testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubResponse, crunchCmd string, expected string) {
259         apiStub := arvadostest.ServerStub{apiStubResponses}
260
261         api := httptest.NewServer(&apiStub)
262         defer api.Close()
263
264         arv := &arvadosclient.ArvadosClient{
265                 Scheme:    "http",
266                 ApiServer: api.URL[7:],
267                 ApiToken:  "abc123",
268                 Client:    &http.Client{Transport: &http.Transport{}},
269                 Retries:   0,
270         }
271
272         buf := bytes.NewBuffer(nil)
273         logrus.SetOutput(io.MultiWriter(buf, os.Stderr))
274         defer logrus.SetOutput(os.Stderr)
275
276         s.disp.cluster.Containers.CrunchRunCommand = "crunchCmd"
277
278         ctx, cancel := context.WithCancel(context.Background())
279         dispatcher := dispatch.Dispatcher{
280                 Arv:        arv,
281                 PollPeriod: time.Second,
282                 RunContainer: func(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) error {
283                         go func() {
284                                 time.Sleep(time.Second)
285                                 disp.UpdateState(ctr.UUID, dispatch.Running)
286                                 disp.UpdateState(ctr.UUID, dispatch.Complete)
287                         }()
288                         s.disp.runContainer(disp, ctr, status)
289                         cancel()
290                         return nil
291                 },
292         }
293
294         go func() {
295                 for i := 0; i < 80 && !strings.Contains(buf.String(), expected); i++ {
296                         time.Sleep(100 * time.Millisecond)
297                 }
298                 cancel()
299         }()
300
301         err := dispatcher.Run(ctx)
302         c.Assert(err, Equals, context.Canceled)
303
304         c.Check(buf.String(), Matches, `(?ms).*`+expected+`.*`)
305 }
306
307 func (s *StubbedSuite) TestSbatchArgs(c *C) {
308         container := arvados.Container{
309                 UUID:               "123",
310                 RuntimeConstraints: arvados.RuntimeConstraints{RAM: 250000000, VCPUs: 2},
311                 Priority:           1,
312         }
313
314         for _, defaults := range [][]string{
315                 nil,
316                 {},
317                 {"--arg1=v1", "--arg2"},
318         } {
319                 c.Logf("%#v", defaults)
320                 s.disp.cluster.Containers.SLURM.SbatchArgumentsList = defaults
321
322                 args, err := s.disp.sbatchArgs(container)
323                 c.Check(args, DeepEquals, append(defaults, "--job-name=123", "--nice=10000", "--no-requeue", "--mem=239", "--cpus-per-task=2", "--tmp=0"))
324                 c.Check(err, IsNil)
325         }
326 }
327
328 func (s *StubbedSuite) TestSbatchInstanceTypeConstraint(c *C) {
329         container := arvados.Container{
330                 UUID:               "123",
331                 RuntimeConstraints: arvados.RuntimeConstraints{RAM: 250000000, VCPUs: 2},
332                 Priority:           1,
333         }
334
335         for _, trial := range []struct {
336                 types      map[string]arvados.InstanceType
337                 sbatchArgs []string
338                 err        error
339         }{
340                 // Choose node type => use --constraint arg
341                 {
342                         types: map[string]arvados.InstanceType{
343                                 "a1.tiny":   {Name: "a1.tiny", Price: 0.02, RAM: 128000000, VCPUs: 1},
344                                 "a1.small":  {Name: "a1.small", Price: 0.04, RAM: 256000000, VCPUs: 2},
345                                 "a1.medium": {Name: "a1.medium", Price: 0.08, RAM: 512000000, VCPUs: 4},
346                                 "a1.large":  {Name: "a1.large", Price: 0.16, RAM: 1024000000, VCPUs: 8},
347                         },
348                         sbatchArgs: []string{"--constraint=instancetype=a1.medium"},
349                 },
350                 // No node types configured => no slurm constraint
351                 {
352                         types:      nil,
353                         sbatchArgs: []string{"--mem=239", "--cpus-per-task=2", "--tmp=0"},
354                 },
355                 // No node type is big enough => error
356                 {
357                         types: map[string]arvados.InstanceType{
358                                 "a1.tiny": {Name: "a1.tiny", Price: 0.02, RAM: 128000000, VCPUs: 1},
359                         },
360                         err: dispatchcloud.ConstraintsNotSatisfiableError{},
361                 },
362         } {
363                 c.Logf("%#v", trial)
364                 s.disp.cluster = &arvados.Cluster{InstanceTypes: trial.types}
365
366                 args, err := s.disp.sbatchArgs(container)
367                 c.Check(err == nil, Equals, trial.err == nil)
368                 if trial.err == nil {
369                         c.Check(args, DeepEquals, append([]string{"--job-name=123", "--nice=10000", "--no-requeue"}, trial.sbatchArgs...))
370                 } else {
371                         c.Check(len(err.(dispatchcloud.ConstraintsNotSatisfiableError).AvailableTypes), Equals, len(trial.types))
372                 }
373         }
374 }
375
376 func (s *StubbedSuite) TestSbatchPartition(c *C) {
377         container := arvados.Container{
378                 UUID:                 "123",
379                 RuntimeConstraints:   arvados.RuntimeConstraints{RAM: 250000000, VCPUs: 1},
380                 SchedulingParameters: arvados.SchedulingParameters{Partitions: []string{"blurb", "b2"}},
381                 Priority:             1,
382         }
383
384         args, err := s.disp.sbatchArgs(container)
385         c.Check(args, DeepEquals, []string{
386                 "--job-name=123", "--nice=10000", "--no-requeue",
387                 "--mem=239", "--cpus-per-task=1", "--tmp=0",
388                 "--partition=blurb,b2",
389         })
390         c.Check(err, IsNil)
391 }
392
393 func (s *StubbedSuite) TestLoadLegacyConfig(c *C) {
394         log := ctxlog.TestLogger(c)
395         content := []byte(`
396 Client:
397   APIHost: example.com
398   AuthToken: abcdefg
399   KeepServiceURIs:
400     - https://example.com/keep1
401     - https://example.com/keep2
402 SbatchArguments: ["--foo", "bar"]
403 PollPeriod: 12s
404 PrioritySpread: 42
405 CrunchRunCommand: ["x-crunch-run", "--cgroup-parent-subsystem=memory"]
406 ReserveExtraRAM: 12345
407 MinRetryPeriod: 13s
408 BatchSize: 99
409 `)
410         tmpfile := c.MkDir() + "/config.yml"
411         err := ioutil.WriteFile(tmpfile, content, 0777)
412         c.Assert(err, IsNil)
413
414         os.Setenv("ARVADOS_KEEP_SERVICES", "")
415
416         flags := flag.NewFlagSet("", flag.ContinueOnError)
417         flags.SetOutput(os.Stderr)
418         loader := config.NewLoader(&bytes.Buffer{}, log)
419         loader.SetupFlags(flags)
420         args := loader.MungeLegacyConfigArgs(log, []string{"-config", tmpfile}, "-legacy-"+string(arvados.ServiceNameDispatchSLURM)+"-config")
421         ok, _ := cmd.ParseFlags(flags, "crunch-dispatch-slurm", args, "", os.Stderr)
422         c.Check(ok, Equals, true)
423         cfg, err := loader.Load()
424         c.Assert(err, IsNil)
425         cluster, err := cfg.GetCluster("")
426         c.Assert(err, IsNil)
427
428         c.Check(cluster.Services.Controller.ExternalURL, Equals, arvados.URL{Scheme: "https", Host: "example.com", Path: "/"})
429         c.Check(cluster.SystemRootToken, Equals, "abcdefg")
430         c.Check(cluster.Containers.SLURM.SbatchArgumentsList, DeepEquals, []string{"--foo", "bar"})
431         c.Check(cluster.Containers.CloudVMs.PollInterval, Equals, arvados.Duration(12*time.Second))
432         c.Check(cluster.Containers.SLURM.PrioritySpread, Equals, int64(42))
433         c.Check(cluster.Containers.CrunchRunCommand, Equals, "x-crunch-run")
434         c.Check(cluster.Containers.CrunchRunArgumentsList, DeepEquals, []string{"--cgroup-parent-subsystem=memory"})
435         c.Check(cluster.Containers.ReserveExtraRAM, Equals, arvados.ByteSize(12345))
436         c.Check(cluster.Containers.MinRetryPeriod, Equals, arvados.Duration(13*time.Second))
437         c.Check(cluster.API.MaxItemsPerResponse, Equals, 99)
438         c.Check(cluster.Containers.SLURM.SbatchEnvironmentVariables, DeepEquals, map[string]string{
439                 "ARVADOS_KEEP_SERVICES": "https://example.com/keep1 https://example.com/keep2",
440         })
441
442         // Ensure configure() copies SbatchEnvironmentVariables into
443         // the current process's environment (that's how they end up
444         // getting passed to sbatch).
445         s.disp.cluster = cluster
446         s.disp.configure()
447         c.Check(os.Getenv("ARVADOS_KEEP_SERVICES"), Equals, "https://example.com/keep1 https://example.com/keep2")
448 }