Merge branch '20937-arv-copy-http' refs #20937
[arvados.git] / lib / crunchrun / integration_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package crunchrun
6
7 import (
8         "bytes"
9         "encoding/json"
10         "fmt"
11         "io"
12         "io/ioutil"
13         "os"
14         "os/exec"
15         "strings"
16
17         "git.arvados.org/arvados.git/lib/config"
18         "git.arvados.org/arvados.git/sdk/go/arvados"
19         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
20         "git.arvados.org/arvados.git/sdk/go/arvadostest"
21         "git.arvados.org/arvados.git/sdk/go/ctxlog"
22         "git.arvados.org/arvados.git/sdk/go/keepclient"
23         "git.arvados.org/arvados.git/services/keepstore"
24         . "gopkg.in/check.v1"
25 )
26
27 var _ = Suite(&integrationSuite{})
28
29 type integrationSuite struct {
30         engine string
31         image  arvados.Collection
32         input  arvados.Collection
33         stdin  bytes.Buffer
34         stdout bytes.Buffer
35         stderr bytes.Buffer
36         args   []string
37         cr     arvados.ContainerRequest
38         client *arvados.Client
39         ac     *arvadosclient.ArvadosClient
40         kc     *keepclient.KeepClient
41
42         logCollection    arvados.Collection
43         outputCollection arvados.Collection
44         logFiles         map[string]string // filename => contents
45 }
46
47 func (s *integrationSuite) SetUpSuite(c *C) {
48         _, err := exec.LookPath("docker")
49         if err != nil {
50                 c.Skip("looks like docker is not installed")
51         }
52
53         arvadostest.StartKeep(2, true)
54
55         out, err := exec.Command("docker", "load", "--input", arvadostest.BusyboxDockerImage(c)).CombinedOutput()
56         c.Log(string(out))
57         c.Assert(err, IsNil)
58         out, err = exec.Command("arv-keepdocker", "--no-resume", "busybox:uclibc").Output()
59         imageUUID := strings.TrimSpace(string(out))
60         c.Logf("image uuid %s", imageUUID)
61         if !c.Check(err, IsNil) {
62                 if err, ok := err.(*exec.ExitError); ok {
63                         c.Logf("%s", err.Stderr)
64                 }
65                 c.Fail()
66         }
67         err = arvados.NewClientFromEnv().RequestAndDecode(&s.image, "GET", "arvados/v1/collections/"+imageUUID, nil, nil)
68         c.Assert(err, IsNil)
69         c.Logf("image pdh %s", s.image.PortableDataHash)
70
71         s.client = arvados.NewClientFromEnv()
72         s.ac, err = arvadosclient.New(s.client)
73         c.Assert(err, IsNil)
74         s.kc = keepclient.New(s.ac)
75         fs, err := s.input.FileSystem(s.client, s.kc)
76         c.Assert(err, IsNil)
77         f, err := fs.OpenFile("inputfile", os.O_CREATE|os.O_WRONLY, 0755)
78         c.Assert(err, IsNil)
79         _, err = f.Write([]byte("inputdata"))
80         c.Assert(err, IsNil)
81         err = f.Close()
82         c.Assert(err, IsNil)
83         s.input.ManifestText, err = fs.MarshalManifest(".")
84         c.Assert(err, IsNil)
85         err = s.client.RequestAndDecode(&s.input, "POST", "arvados/v1/collections", nil, map[string]interface{}{
86                 "ensure_unique_name": true,
87                 "collection": map[string]interface{}{
88                         "manifest_text": s.input.ManifestText,
89                 },
90         })
91         c.Assert(err, IsNil)
92         c.Logf("input pdh %s", s.input.PortableDataHash)
93 }
94
95 func (s *integrationSuite) TearDownSuite(c *C) {
96         os.Unsetenv("ARVADOS_KEEP_SERVICES")
97         if s.client == nil {
98                 // didn't set up
99                 return
100         }
101         err := s.client.RequestAndDecode(nil, "POST", "database/reset", nil, nil)
102         c.Check(err, IsNil)
103 }
104
105 func (s *integrationSuite) SetUpTest(c *C) {
106         os.Unsetenv("ARVADOS_KEEP_SERVICES")
107         s.engine = "docker"
108         s.args = nil
109         s.stdin = bytes.Buffer{}
110         s.stdout = bytes.Buffer{}
111         s.stderr = bytes.Buffer{}
112         s.logCollection = arvados.Collection{}
113         s.outputCollection = arvados.Collection{}
114         s.logFiles = map[string]string{}
115         s.cr = arvados.ContainerRequest{
116                 Priority:       1,
117                 State:          "Committed",
118                 OutputPath:     "/mnt/out",
119                 ContainerImage: s.image.PortableDataHash,
120                 Mounts: map[string]arvados.Mount{
121                         "/mnt/json": {
122                                 Kind: "json",
123                                 Content: []interface{}{
124                                         "foo",
125                                         map[string]string{"foo": "bar"},
126                                         nil,
127                                 },
128                         },
129                         "/mnt/in": {
130                                 Kind:             "collection",
131                                 PortableDataHash: s.input.PortableDataHash,
132                         },
133                         "/mnt/out": {
134                                 Kind:     "tmp",
135                                 Capacity: 1000,
136                         },
137                 },
138                 RuntimeConstraints: arvados.RuntimeConstraints{
139                         RAM:   128000000,
140                         VCPUs: 1,
141                         API:   true,
142                 },
143         }
144 }
145
146 func (s *integrationSuite) setup(c *C) {
147         err := s.client.RequestAndDecode(&s.cr, "POST", "arvados/v1/container_requests", nil, map[string]interface{}{"container_request": map[string]interface{}{
148                 "priority":            s.cr.Priority,
149                 "state":               s.cr.State,
150                 "command":             s.cr.Command,
151                 "output_path":         s.cr.OutputPath,
152                 "container_image":     s.cr.ContainerImage,
153                 "mounts":              s.cr.Mounts,
154                 "runtime_constraints": s.cr.RuntimeConstraints,
155                 "use_existing":        false,
156         }})
157         c.Assert(err, IsNil)
158         c.Assert(s.cr.ContainerUUID, Not(Equals), "")
159         err = s.client.RequestAndDecode(nil, "POST", "arvados/v1/containers/"+s.cr.ContainerUUID+"/lock", nil, nil)
160         c.Assert(err, IsNil)
161 }
162
163 func (s *integrationSuite) TestRunTrivialContainerWithDocker(c *C) {
164         s.engine = "docker"
165         s.testRunTrivialContainer(c)
166         c.Check(s.logFiles["crunch-run.txt"], Matches, `(?ms).*Using container runtime: docker Engine \d+\.\d+.*`)
167 }
168
169 func (s *integrationSuite) TestRunTrivialContainerWithSingularity(c *C) {
170         s.engine = "singularity"
171         s.testRunTrivialContainer(c)
172         c.Check(s.logFiles["crunch-run.txt"], Matches, `(?ms).*Using container runtime: singularity.* version 3\.\d+.*`)
173 }
174
175 func (s *integrationSuite) TestRunTrivialContainerWithLocalKeepstore(c *C) {
176         for _, trial := range []struct {
177                 logConfig           string
178                 matchGetReq         Checker
179                 matchPutReq         Checker
180                 matchStartupMessage Checker
181         }{
182                 {"none", Not(Matches), Not(Matches), Not(Matches)},
183                 {"all", Matches, Matches, Matches},
184                 {"errors", Not(Matches), Not(Matches), Matches},
185         } {
186                 c.Logf("=== testing with Containers.LocalKeepLogsToContainerLog: %q", trial.logConfig)
187                 s.SetUpTest(c)
188
189                 cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load()
190                 c.Assert(err, IsNil)
191                 cluster, err := cfg.GetCluster("")
192                 c.Assert(err, IsNil)
193                 for uuid, volume := range cluster.Volumes {
194                         volume.AccessViaHosts = nil
195                         volume.Replication = 2
196                         cluster.Volumes[uuid] = volume
197
198                         var v keepstore.UnixVolume
199                         err = json.Unmarshal(volume.DriverParameters, &v)
200                         c.Assert(err, IsNil)
201                         err = os.Mkdir(v.Root, 0777)
202                         if !os.IsExist(err) {
203                                 c.Assert(err, IsNil)
204                         }
205                 }
206                 cluster.Containers.LocalKeepLogsToContainerLog = trial.logConfig
207
208                 s.stdin.Reset()
209                 err = json.NewEncoder(&s.stdin).Encode(ConfigData{
210                         Env:         nil,
211                         KeepBuffers: 1,
212                         Cluster:     cluster,
213                 })
214                 c.Assert(err, IsNil)
215
216                 s.engine = "docker"
217                 s.testRunTrivialContainer(c)
218
219                 log, logExists := s.logFiles["keepstore.txt"]
220                 if trial.logConfig == "none" {
221                         c.Check(logExists, Equals, false)
222                 } else {
223                         c.Check(log, trial.matchGetReq, `(?ms).*"reqMethod":"GET".*`)
224                         c.Check(log, trial.matchPutReq, `(?ms).*"reqMethod":"PUT".*,"reqPath":"0e3bcff26d51c895a60ea0d4585e134d".*`)
225                 }
226
227                 c.Check(s.logFiles["crunch-run.txt"], Matches, `(?ms).*using local keepstore process .* at http://[\d\.]{7,}:\d+.*`)
228                 c.Check(s.logFiles["crunch-run.txt"], Not(Matches), `(?ms).* at http://127\..*`)
229                 c.Check(s.logFiles["crunch-run.txt"], Not(Matches), `(?ms).* at http://169\.254\..*`)
230                 c.Check(s.logFiles["stderr.txt"], Matches, `(?ms).*ARVADOS_KEEP_SERVICES=http://[\d\.]{7,}:\d+\n.*`)
231         }
232 }
233
234 func (s *integrationSuite) TestRunTrivialContainerWithNoLocalKeepstore(c *C) {
235         // Check that (1) config is loaded from $ARVADOS_CONFIG when
236         // not provided on stdin and (2) if a local keepstore is not
237         // started, crunch-run.txt explains why not.
238         s.SetUpTest(c)
239         s.stdin.Reset()
240         s.testRunTrivialContainer(c)
241         c.Check(s.logFiles["crunch-run.txt"], Matches, `(?ms).*not starting a local keepstore process because KeepBuffers=0 in config\n.*`)
242
243         s.SetUpTest(c)
244         s.args = []string{"-config", c.MkDir() + "/config.yaml"}
245         s.stdin.Reset()
246         buf, err := ioutil.ReadFile(os.Getenv("ARVADOS_CONFIG"))
247         c.Assert(err, IsNil)
248         err = ioutil.WriteFile(s.args[1], bytes.Replace(buf, []byte("LocalKeepBlobBuffersPerVCPU: 0"), []byte("LocalKeepBlobBuffersPerVCPU: 1"), -1), 0666)
249         c.Assert(err, IsNil)
250         s.testRunTrivialContainer(c)
251         c.Check(s.logFiles["crunch-run.txt"], Matches, `(?ms).*not starting a local keepstore process because a volume \(zzzzz-nyw5e-00000000000000\d\) uses AccessViaHosts\n.*`)
252
253         // Check that config read errors are logged
254         s.SetUpTest(c)
255         s.args = []string{"-config", c.MkDir() + "/config-error.yaml"}
256         s.stdin.Reset()
257         s.testRunTrivialContainer(c)
258         c.Check(s.logFiles["crunch-run.txt"], Matches, `(?ms).*could not load config file \Q`+s.args[1]+`\E:.* no such file or directory\n.*`)
259
260         s.SetUpTest(c)
261         s.args = []string{"-config", c.MkDir() + "/config-unreadable.yaml"}
262         s.stdin.Reset()
263         err = ioutil.WriteFile(s.args[1], []byte{}, 0)
264         c.Check(err, IsNil)
265         s.testRunTrivialContainer(c)
266         c.Check(s.logFiles["crunch-run.txt"], Matches, `(?ms).*could not load config file \Q`+s.args[1]+`\E:.* permission denied\n.*`)
267
268         s.SetUpTest(c)
269         s.stdin.Reset()
270         s.testRunTrivialContainer(c)
271         c.Check(s.logFiles["crunch-run.txt"], Matches, `(?ms).*loaded config file \Q`+os.Getenv("ARVADOS_CONFIG")+`\E\n.*`)
272 }
273
274 func (s *integrationSuite) testRunTrivialContainer(c *C) {
275         if err := exec.Command("which", s.engine).Run(); err != nil {
276                 c.Skip(fmt.Sprintf("%s: %s", s.engine, err))
277         }
278         s.cr.Command = []string{"sh", "-c", "env >&2 && cat /mnt/in/inputfile >/mnt/out/inputfile && cat /mnt/json >/mnt/out/json && ! touch /mnt/in/shouldbereadonly && mkdir /mnt/out/emptydir"}
279         s.setup(c)
280
281         args := []string{
282                 "-runtime-engine=" + s.engine,
283                 "-enable-memory-limit=false",
284         }
285         if s.stdin.Len() > 0 {
286                 args = append(args, "-stdin-config=true")
287         }
288         args = append(args, s.args...)
289         args = append(args, s.cr.ContainerUUID)
290         code := command{}.RunCommand("crunch-run", args, &s.stdin, io.MultiWriter(&s.stdout, os.Stderr), io.MultiWriter(&s.stderr, os.Stderr))
291         c.Logf("\n===== stdout =====\n%s", s.stdout.String())
292         c.Logf("\n===== stderr =====\n%s", s.stderr.String())
293         c.Check(code, Equals, 0)
294         err := s.client.RequestAndDecode(&s.cr, "GET", "arvados/v1/container_requests/"+s.cr.UUID, nil, nil)
295         c.Assert(err, IsNil)
296         c.Logf("Finished container request: %#v", s.cr)
297
298         var log arvados.Collection
299         err = s.client.RequestAndDecode(&log, "GET", "arvados/v1/collections/"+s.cr.LogUUID, nil, nil)
300         c.Assert(err, IsNil)
301         fs, err := log.FileSystem(s.client, s.kc)
302         c.Assert(err, IsNil)
303         if d, err := fs.Open("/"); c.Check(err, IsNil) {
304                 fis, err := d.Readdir(-1)
305                 c.Assert(err, IsNil)
306                 for _, fi := range fis {
307                         if fi.IsDir() {
308                                 continue
309                         }
310                         f, err := fs.Open(fi.Name())
311                         c.Assert(err, IsNil)
312                         buf, err := ioutil.ReadAll(f)
313                         c.Assert(err, IsNil)
314                         c.Logf("\n===== %s =====\n%s", fi.Name(), buf)
315                         s.logFiles[fi.Name()] = string(buf)
316                 }
317         }
318         s.logCollection = log
319
320         var output arvados.Collection
321         err = s.client.RequestAndDecode(&output, "GET", "arvados/v1/collections/"+s.cr.OutputUUID, nil, nil)
322         c.Assert(err, IsNil)
323         fs, err = output.FileSystem(s.client, s.kc)
324         c.Assert(err, IsNil)
325         if f, err := fs.Open("inputfile"); c.Check(err, IsNil) {
326                 defer f.Close()
327                 buf, err := ioutil.ReadAll(f)
328                 c.Check(err, IsNil)
329                 c.Check(string(buf), Equals, "inputdata")
330         }
331         if f, err := fs.Open("json"); c.Check(err, IsNil) {
332                 defer f.Close()
333                 buf, err := ioutil.ReadAll(f)
334                 c.Check(err, IsNil)
335                 c.Check(string(buf), Equals, `["foo",{"foo":"bar"},null]`)
336         }
337         if fi, err := fs.Stat("emptydir"); c.Check(err, IsNil) {
338                 c.Check(fi.IsDir(), Equals, true)
339         }
340         if d, err := fs.Open("emptydir"); c.Check(err, IsNil) {
341                 defer d.Close()
342                 fis, err := d.Readdir(-1)
343                 c.Assert(err, IsNil)
344                 // crunch-run still saves a ".keep" file to preserve
345                 // empty dirs even though that shouldn't be
346                 // necessary. Ideally we would do:
347                 // c.Check(fis, HasLen, 0)
348                 for _, fi := range fis {
349                         c.Check(fi.Name(), Equals, ".keep")
350                 }
351         }
352         s.outputCollection = output
353 }