15370: Merge branch 'main' into 15370-loopback-dispatchcloud
[arvados.git] / lib / crunchrun / integration_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package crunchrun
6
7 import (
8         "bytes"
9         "encoding/json"
10         "fmt"
11         "io"
12         "io/ioutil"
13         "os"
14         "os/exec"
15         "strings"
16
17         "git.arvados.org/arvados.git/lib/config"
18         "git.arvados.org/arvados.git/sdk/go/arvados"
19         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
20         "git.arvados.org/arvados.git/sdk/go/arvadostest"
21         "git.arvados.org/arvados.git/sdk/go/ctxlog"
22         "git.arvados.org/arvados.git/sdk/go/keepclient"
23         . "gopkg.in/check.v1"
24 )
25
26 var _ = Suite(&integrationSuite{})
27
28 type integrationSuite struct {
29         engine string
30         image  arvados.Collection
31         input  arvados.Collection
32         stdin  bytes.Buffer
33         stdout bytes.Buffer
34         stderr bytes.Buffer
35         args   []string
36         cr     arvados.ContainerRequest
37         client *arvados.Client
38         ac     *arvadosclient.ArvadosClient
39         kc     *keepclient.KeepClient
40
41         logCollection    arvados.Collection
42         outputCollection arvados.Collection
43         logFiles         map[string]string // filename => contents
44 }
45
46 func (s *integrationSuite) SetUpSuite(c *C) {
47         _, err := exec.LookPath("docker")
48         if err != nil {
49                 c.Skip("looks like docker is not installed")
50         }
51
52         arvadostest.StartKeep(2, true)
53
54         out, err := exec.Command("docker", "load", "--input", arvadostest.BusyboxDockerImage(c)).CombinedOutput()
55         c.Log(string(out))
56         c.Assert(err, IsNil)
57         out, err = exec.Command("arv-keepdocker", "--no-resume", "busybox:uclibc").Output()
58         imageUUID := strings.TrimSpace(string(out))
59         c.Logf("image uuid %s", imageUUID)
60         if !c.Check(err, IsNil) {
61                 if err, ok := err.(*exec.ExitError); ok {
62                         c.Logf("%s", err.Stderr)
63                 }
64                 c.Fail()
65         }
66         err = arvados.NewClientFromEnv().RequestAndDecode(&s.image, "GET", "arvados/v1/collections/"+imageUUID, nil, nil)
67         c.Assert(err, IsNil)
68         c.Logf("image pdh %s", s.image.PortableDataHash)
69
70         s.client = arvados.NewClientFromEnv()
71         s.ac, err = arvadosclient.New(s.client)
72         c.Assert(err, IsNil)
73         s.kc = keepclient.New(s.ac)
74         fs, err := s.input.FileSystem(s.client, s.kc)
75         c.Assert(err, IsNil)
76         f, err := fs.OpenFile("inputfile", os.O_CREATE|os.O_WRONLY, 0755)
77         c.Assert(err, IsNil)
78         _, err = f.Write([]byte("inputdata"))
79         c.Assert(err, IsNil)
80         err = f.Close()
81         c.Assert(err, IsNil)
82         s.input.ManifestText, err = fs.MarshalManifest(".")
83         c.Assert(err, IsNil)
84         err = s.client.RequestAndDecode(&s.input, "POST", "arvados/v1/collections", nil, map[string]interface{}{
85                 "ensure_unique_name": true,
86                 "collection": map[string]interface{}{
87                         "manifest_text": s.input.ManifestText,
88                 },
89         })
90         c.Assert(err, IsNil)
91         c.Logf("input pdh %s", s.input.PortableDataHash)
92 }
93
94 func (s *integrationSuite) TearDownSuite(c *C) {
95         os.Unsetenv("ARVADOS_KEEP_SERVICES")
96         if s.client == nil {
97                 // didn't set up
98                 return
99         }
100         err := s.client.RequestAndDecode(nil, "POST", "database/reset", nil, nil)
101         c.Check(err, IsNil)
102 }
103
104 func (s *integrationSuite) SetUpTest(c *C) {
105         os.Unsetenv("ARVADOS_KEEP_SERVICES")
106         s.engine = "docker"
107         s.args = nil
108         s.stdin = bytes.Buffer{}
109         s.stdout = bytes.Buffer{}
110         s.stderr = bytes.Buffer{}
111         s.logCollection = arvados.Collection{}
112         s.outputCollection = arvados.Collection{}
113         s.logFiles = map[string]string{}
114         s.cr = arvados.ContainerRequest{
115                 Priority:       1,
116                 State:          "Committed",
117                 OutputPath:     "/mnt/out",
118                 ContainerImage: s.image.PortableDataHash,
119                 Mounts: map[string]arvados.Mount{
120                         "/mnt/json": {
121                                 Kind: "json",
122                                 Content: []interface{}{
123                                         "foo",
124                                         map[string]string{"foo": "bar"},
125                                         nil,
126                                 },
127                         },
128                         "/mnt/in": {
129                                 Kind:             "collection",
130                                 PortableDataHash: s.input.PortableDataHash,
131                         },
132                         "/mnt/out": {
133                                 Kind:     "tmp",
134                                 Capacity: 1000,
135                         },
136                 },
137                 RuntimeConstraints: arvados.RuntimeConstraints{
138                         RAM:   128000000,
139                         VCPUs: 1,
140                         API:   true,
141                 },
142         }
143 }
144
145 func (s *integrationSuite) setup(c *C) {
146         err := s.client.RequestAndDecode(&s.cr, "POST", "arvados/v1/container_requests", nil, map[string]interface{}{"container_request": map[string]interface{}{
147                 "priority":            s.cr.Priority,
148                 "state":               s.cr.State,
149                 "command":             s.cr.Command,
150                 "output_path":         s.cr.OutputPath,
151                 "container_image":     s.cr.ContainerImage,
152                 "mounts":              s.cr.Mounts,
153                 "runtime_constraints": s.cr.RuntimeConstraints,
154                 "use_existing":        false,
155         }})
156         c.Assert(err, IsNil)
157         c.Assert(s.cr.ContainerUUID, Not(Equals), "")
158         err = s.client.RequestAndDecode(nil, "POST", "arvados/v1/containers/"+s.cr.ContainerUUID+"/lock", nil, nil)
159         c.Assert(err, IsNil)
160 }
161
162 func (s *integrationSuite) TestRunTrivialContainerWithDocker(c *C) {
163         s.engine = "docker"
164         s.testRunTrivialContainer(c)
165         c.Check(s.logFiles["crunch-run.txt"], Matches, `(?ms).*Using container runtime: docker Engine \d+\.\d+.*`)
166 }
167
168 func (s *integrationSuite) TestRunTrivialContainerWithSingularity(c *C) {
169         s.engine = "singularity"
170         s.testRunTrivialContainer(c)
171         c.Check(s.logFiles["crunch-run.txt"], Matches, `(?ms).*Using container runtime: singularity.* version 3\.\d+.*`)
172 }
173
174 func (s *integrationSuite) TestRunTrivialContainerWithLocalKeepstore(c *C) {
175         for _, trial := range []struct {
176                 logConfig           string
177                 matchGetReq         Checker
178                 matchPutReq         Checker
179                 matchStartupMessage Checker
180         }{
181                 {"none", Not(Matches), Not(Matches), Not(Matches)},
182                 {"all", Matches, Matches, Matches},
183                 {"errors", Not(Matches), Not(Matches), Matches},
184         } {
185                 c.Logf("=== testing with Containers.LocalKeepLogsToContainerLog: %q", trial.logConfig)
186                 s.SetUpTest(c)
187
188                 cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load()
189                 c.Assert(err, IsNil)
190                 cluster, err := cfg.GetCluster("")
191                 c.Assert(err, IsNil)
192                 for uuid, volume := range cluster.Volumes {
193                         volume.AccessViaHosts = nil
194                         volume.Replication = 2
195                         cluster.Volumes[uuid] = volume
196                 }
197                 cluster.Containers.LocalKeepLogsToContainerLog = trial.logConfig
198
199                 s.stdin.Reset()
200                 err = json.NewEncoder(&s.stdin).Encode(ConfigData{
201                         Env:         nil,
202                         KeepBuffers: 1,
203                         Cluster:     cluster,
204                 })
205                 c.Assert(err, IsNil)
206
207                 s.engine = "docker"
208                 s.testRunTrivialContainer(c)
209
210                 log, logExists := s.logFiles["keepstore.txt"]
211                 if trial.logConfig == "none" {
212                         c.Check(logExists, Equals, false)
213                 } else {
214                         c.Check(log, trial.matchGetReq, `(?ms).*"reqMethod":"GET".*`)
215                         c.Check(log, trial.matchPutReq, `(?ms).*"reqMethod":"PUT".*,"reqPath":"0e3bcff26d51c895a60ea0d4585e134d".*`)
216                 }
217         }
218
219         // Check that (1) config is loaded from $ARVADOS_CONFIG when
220         // not provided on stdin and (2) if a local keepstore is not
221         // started, crunch-run.txt explains why not.
222         s.SetUpTest(c)
223         s.stdin.Reset()
224         s.testRunTrivialContainer(c)
225         c.Check(s.logFiles["crunch-run.txt"], Matches, `(?ms).*not starting a local keepstore process because a volume \(zzzzz-nyw5e-000000000000000\) uses AccessViaHosts\n.*`)
226
227         // Check that config read errors are logged
228         s.SetUpTest(c)
229         s.args = []string{"-config", c.MkDir() + "/config-error.yaml"}
230         s.stdin.Reset()
231         s.testRunTrivialContainer(c)
232         c.Check(s.logFiles["crunch-run.txt"], Matches, `(?ms).*could not load config file \Q`+s.args[1]+`\E:.* no such file or directory\n.*`)
233
234         s.SetUpTest(c)
235         s.args = []string{"-config", c.MkDir() + "/config-unreadable.yaml"}
236         s.stdin.Reset()
237         err := ioutil.WriteFile(s.args[1], []byte{}, 0)
238         c.Check(err, IsNil)
239         s.testRunTrivialContainer(c)
240         c.Check(s.logFiles["crunch-run.txt"], Matches, `(?ms).*could not load config file \Q`+s.args[1]+`\E:.* permission denied\n.*`)
241
242         s.SetUpTest(c)
243         s.stdin.Reset()
244         s.testRunTrivialContainer(c)
245         c.Check(s.logFiles["crunch-run.txt"], Matches, `(?ms).*loaded config file \Q`+os.Getenv("ARVADOS_CONFIG")+`\E\n.*`)
246 }
247
248 func (s *integrationSuite) testRunTrivialContainer(c *C) {
249         if err := exec.Command("which", s.engine).Run(); err != nil {
250                 c.Skip(fmt.Sprintf("%s: %s", s.engine, err))
251         }
252         s.cr.Command = []string{"sh", "-c", "cat /mnt/in/inputfile >/mnt/out/inputfile && cat /mnt/json >/mnt/out/json && ! touch /mnt/in/shouldbereadonly && mkdir /mnt/out/emptydir"}
253         s.setup(c)
254
255         args := []string{
256                 "-runtime-engine=" + s.engine,
257                 "-enable-memory-limit=false",
258         }
259         if s.stdin.Len() > 0 {
260                 args = append(args, "-stdin-config=true")
261         }
262         args = append(args, s.args...)
263         args = append(args, s.cr.ContainerUUID)
264         code := command{}.RunCommand("crunch-run", args, &s.stdin, io.MultiWriter(&s.stdout, os.Stderr), io.MultiWriter(&s.stderr, os.Stderr))
265         c.Logf("\n===== stdout =====\n%s", s.stdout.String())
266         c.Logf("\n===== stderr =====\n%s", s.stderr.String())
267         c.Check(code, Equals, 0)
268         err := s.client.RequestAndDecode(&s.cr, "GET", "arvados/v1/container_requests/"+s.cr.UUID, nil, nil)
269         c.Assert(err, IsNil)
270         c.Logf("Finished container request: %#v", s.cr)
271
272         var log arvados.Collection
273         err = s.client.RequestAndDecode(&log, "GET", "arvados/v1/collections/"+s.cr.LogUUID, nil, nil)
274         c.Assert(err, IsNil)
275         fs, err := log.FileSystem(s.client, s.kc)
276         c.Assert(err, IsNil)
277         if d, err := fs.Open("/"); c.Check(err, IsNil) {
278                 fis, err := d.Readdir(-1)
279                 c.Assert(err, IsNil)
280                 for _, fi := range fis {
281                         if fi.IsDir() {
282                                 continue
283                         }
284                         f, err := fs.Open(fi.Name())
285                         c.Assert(err, IsNil)
286                         buf, err := ioutil.ReadAll(f)
287                         c.Assert(err, IsNil)
288                         c.Logf("\n===== %s =====\n%s", fi.Name(), buf)
289                         s.logFiles[fi.Name()] = string(buf)
290                 }
291         }
292         s.logCollection = log
293
294         var output arvados.Collection
295         err = s.client.RequestAndDecode(&output, "GET", "arvados/v1/collections/"+s.cr.OutputUUID, nil, nil)
296         c.Assert(err, IsNil)
297         fs, err = output.FileSystem(s.client, s.kc)
298         c.Assert(err, IsNil)
299         if f, err := fs.Open("inputfile"); c.Check(err, IsNil) {
300                 defer f.Close()
301                 buf, err := ioutil.ReadAll(f)
302                 c.Check(err, IsNil)
303                 c.Check(string(buf), Equals, "inputdata")
304         }
305         if f, err := fs.Open("json"); c.Check(err, IsNil) {
306                 defer f.Close()
307                 buf, err := ioutil.ReadAll(f)
308                 c.Check(err, IsNil)
309                 c.Check(string(buf), Equals, `["foo",{"foo":"bar"},null]`)
310         }
311         if fi, err := fs.Stat("emptydir"); c.Check(err, IsNil) {
312                 c.Check(fi.IsDir(), Equals, true)
313         }
314         if d, err := fs.Open("emptydir"); c.Check(err, IsNil) {
315                 defer d.Close()
316                 fis, err := d.Readdir(-1)
317                 c.Assert(err, IsNil)
318                 // crunch-run still saves a ".keep" file to preserve
319                 // empty dirs even though that shouldn't be
320                 // necessary. Ideally we would do:
321                 // c.Check(fis, HasLen, 0)
322                 for _, fi := range fis {
323                         c.Check(fi.Name(), Equals, ".keep")
324                 }
325         }
326         s.outputCollection = output
327 }