18992: Load cluster config file if present in HPC environment.
[arvados.git] / lib / crunchrun / integration_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package crunchrun
6
7 import (
8         "bytes"
9         "encoding/json"
10         "fmt"
11         "io"
12         "io/ioutil"
13         "os"
14         "os/exec"
15         "strings"
16
17         "git.arvados.org/arvados.git/lib/config"
18         "git.arvados.org/arvados.git/sdk/go/arvados"
19         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
20         "git.arvados.org/arvados.git/sdk/go/arvadostest"
21         "git.arvados.org/arvados.git/sdk/go/ctxlog"
22         "git.arvados.org/arvados.git/sdk/go/keepclient"
23         . "gopkg.in/check.v1"
24 )
25
26 var _ = Suite(&integrationSuite{})
27
28 type integrationSuite struct {
29         engine string
30         image  arvados.Collection
31         input  arvados.Collection
32         stdin  bytes.Buffer
33         stdout bytes.Buffer
34         stderr bytes.Buffer
35         cr     arvados.ContainerRequest
36         client *arvados.Client
37         ac     *arvadosclient.ArvadosClient
38         kc     *keepclient.KeepClient
39
40         logCollection    arvados.Collection
41         outputCollection arvados.Collection
42         logFiles         map[string]string // filename => contents
43 }
44
45 func (s *integrationSuite) SetUpSuite(c *C) {
46         _, err := exec.LookPath("docker")
47         if err != nil {
48                 c.Skip("looks like docker is not installed")
49         }
50
51         arvadostest.StartKeep(2, true)
52
53         out, err := exec.Command("docker", "load", "--input", busyboxDockerImage(c)).CombinedOutput()
54         c.Log(string(out))
55         c.Assert(err, IsNil)
56         out, err = exec.Command("arv-keepdocker", "--no-resume", "busybox:uclibc").Output()
57         imageUUID := strings.TrimSpace(string(out))
58         c.Logf("image uuid %s", imageUUID)
59         if !c.Check(err, IsNil) {
60                 if err, ok := err.(*exec.ExitError); ok {
61                         c.Logf("%s", err.Stderr)
62                 }
63                 c.Fail()
64         }
65         err = arvados.NewClientFromEnv().RequestAndDecode(&s.image, "GET", "arvados/v1/collections/"+imageUUID, nil, nil)
66         c.Assert(err, IsNil)
67         c.Logf("image pdh %s", s.image.PortableDataHash)
68
69         s.client = arvados.NewClientFromEnv()
70         s.ac, err = arvadosclient.New(s.client)
71         c.Assert(err, IsNil)
72         s.kc = keepclient.New(s.ac)
73         fs, err := s.input.FileSystem(s.client, s.kc)
74         c.Assert(err, IsNil)
75         f, err := fs.OpenFile("inputfile", os.O_CREATE|os.O_WRONLY, 0755)
76         c.Assert(err, IsNil)
77         _, err = f.Write([]byte("inputdata"))
78         c.Assert(err, IsNil)
79         err = f.Close()
80         c.Assert(err, IsNil)
81         s.input.ManifestText, err = fs.MarshalManifest(".")
82         c.Assert(err, IsNil)
83         err = s.client.RequestAndDecode(&s.input, "POST", "arvados/v1/collections", nil, map[string]interface{}{
84                 "ensure_unique_name": true,
85                 "collection": map[string]interface{}{
86                         "manifest_text": s.input.ManifestText,
87                 },
88         })
89         c.Assert(err, IsNil)
90         c.Logf("input pdh %s", s.input.PortableDataHash)
91 }
92
93 func (s *integrationSuite) TearDownSuite(c *C) {
94         os.Unsetenv("ARVADOS_KEEP_SERVICES")
95         if s.client == nil {
96                 // didn't set up
97                 return
98         }
99         err := s.client.RequestAndDecode(nil, "POST", "database/reset", nil, nil)
100         c.Check(err, IsNil)
101 }
102
103 func (s *integrationSuite) SetUpTest(c *C) {
104         os.Unsetenv("ARVADOS_KEEP_SERVICES")
105         s.engine = "docker"
106         s.stdin = bytes.Buffer{}
107         s.stdout = bytes.Buffer{}
108         s.stderr = bytes.Buffer{}
109         s.logCollection = arvados.Collection{}
110         s.outputCollection = arvados.Collection{}
111         s.logFiles = map[string]string{}
112         s.cr = arvados.ContainerRequest{
113                 Priority:       1,
114                 State:          "Committed",
115                 OutputPath:     "/mnt/out",
116                 ContainerImage: s.image.PortableDataHash,
117                 Mounts: map[string]arvados.Mount{
118                         "/mnt/json": {
119                                 Kind: "json",
120                                 Content: []interface{}{
121                                         "foo",
122                                         map[string]string{"foo": "bar"},
123                                         nil,
124                                 },
125                         },
126                         "/mnt/in": {
127                                 Kind:             "collection",
128                                 PortableDataHash: s.input.PortableDataHash,
129                         },
130                         "/mnt/out": {
131                                 Kind:     "tmp",
132                                 Capacity: 1000,
133                         },
134                 },
135                 RuntimeConstraints: arvados.RuntimeConstraints{
136                         RAM:   128000000,
137                         VCPUs: 1,
138                         API:   true,
139                 },
140         }
141 }
142
143 func (s *integrationSuite) setup(c *C) {
144         err := s.client.RequestAndDecode(&s.cr, "POST", "arvados/v1/container_requests", nil, map[string]interface{}{"container_request": map[string]interface{}{
145                 "priority":            s.cr.Priority,
146                 "state":               s.cr.State,
147                 "command":             s.cr.Command,
148                 "output_path":         s.cr.OutputPath,
149                 "container_image":     s.cr.ContainerImage,
150                 "mounts":              s.cr.Mounts,
151                 "runtime_constraints": s.cr.RuntimeConstraints,
152                 "use_existing":        false,
153         }})
154         c.Assert(err, IsNil)
155         c.Assert(s.cr.ContainerUUID, Not(Equals), "")
156         err = s.client.RequestAndDecode(nil, "POST", "arvados/v1/containers/"+s.cr.ContainerUUID+"/lock", nil, nil)
157         c.Assert(err, IsNil)
158 }
159
160 func (s *integrationSuite) TestRunTrivialContainerWithDocker(c *C) {
161         s.engine = "docker"
162         s.testRunTrivialContainer(c)
163 }
164
165 func (s *integrationSuite) TestRunTrivialContainerWithSingularity(c *C) {
166         s.engine = "singularity"
167         s.testRunTrivialContainer(c)
168 }
169
170 func (s *integrationSuite) TestRunTrivialContainerWithLocalKeepstore(c *C) {
171         for _, trial := range []struct {
172                 logConfig           string
173                 matchGetReq         Checker
174                 matchPutReq         Checker
175                 matchStartupMessage Checker
176         }{
177                 {"none", Not(Matches), Not(Matches), Not(Matches)},
178                 {"all", Matches, Matches, Matches},
179                 {"errors", Not(Matches), Not(Matches), Matches},
180         } {
181                 c.Logf("=== testing with Containers.LocalKeepLogsToContainerLog: %q", trial.logConfig)
182                 s.SetUpTest(c)
183
184                 cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load()
185                 c.Assert(err, IsNil)
186                 cluster, err := cfg.GetCluster("")
187                 c.Assert(err, IsNil)
188                 for uuid, volume := range cluster.Volumes {
189                         volume.AccessViaHosts = nil
190                         volume.Replication = 2
191                         cluster.Volumes[uuid] = volume
192                 }
193                 cluster.Containers.LocalKeepLogsToContainerLog = trial.logConfig
194
195                 s.stdin.Reset()
196                 err = json.NewEncoder(&s.stdin).Encode(ConfigData{
197                         Env:         nil,
198                         KeepBuffers: 1,
199                         Cluster:     cluster,
200                 })
201                 c.Assert(err, IsNil)
202
203                 s.engine = "docker"
204                 s.testRunTrivialContainer(c)
205
206                 log, logExists := s.logFiles["keepstore.txt"]
207                 if trial.logConfig == "none" {
208                         c.Check(logExists, Equals, false)
209                 } else {
210                         c.Check(log, trial.matchGetReq, `(?ms).*"reqMethod":"GET".*`)
211                         c.Check(log, trial.matchPutReq, `(?ms).*"reqMethod":"PUT".*,"reqPath":"0e3bcff26d51c895a60ea0d4585e134d".*`)
212                 }
213         }
214
215         // Check that (1) config is loaded from $ARVADOS_CONFIG when
216         // not provided on stdin and (2) if a local keepstore is not
217         // started, crunch-run.txt explains why not.
218         s.SetUpTest(c)
219         s.stdin.Reset()
220         s.testRunTrivialContainer(c)
221         c.Check(s.logFiles["crunch-run.txt"], Matches, `(?ms).*not starting a local keepstore process because a volume \(zzzzz-nyw5e-000000000000000\) uses AccessViaHosts\n.*`)
222 }
223
224 func (s *integrationSuite) testRunTrivialContainer(c *C) {
225         if err := exec.Command("which", s.engine).Run(); err != nil {
226                 c.Skip(fmt.Sprintf("%s: %s", s.engine, err))
227         }
228         s.cr.Command = []string{"sh", "-c", "cat /mnt/in/inputfile >/mnt/out/inputfile && cat /mnt/json >/mnt/out/json && ! touch /mnt/in/shouldbereadonly && mkdir /mnt/out/emptydir"}
229         s.setup(c)
230
231         args := []string{
232                 "-runtime-engine=" + s.engine,
233                 "-enable-memory-limit=false",
234                 s.cr.ContainerUUID,
235         }
236         if s.stdin.Len() > 0 {
237                 args = append([]string{"-stdin-config=true"}, args...)
238         }
239         code := command{}.RunCommand("crunch-run", args, &s.stdin, io.MultiWriter(&s.stdout, os.Stderr), io.MultiWriter(&s.stderr, os.Stderr))
240         c.Logf("\n===== stdout =====\n%s", s.stdout.String())
241         c.Logf("\n===== stderr =====\n%s", s.stderr.String())
242         c.Check(code, Equals, 0)
243         err := s.client.RequestAndDecode(&s.cr, "GET", "arvados/v1/container_requests/"+s.cr.UUID, nil, nil)
244         c.Assert(err, IsNil)
245         c.Logf("Finished container request: %#v", s.cr)
246
247         var log arvados.Collection
248         err = s.client.RequestAndDecode(&log, "GET", "arvados/v1/collections/"+s.cr.LogUUID, nil, nil)
249         c.Assert(err, IsNil)
250         fs, err := log.FileSystem(s.client, s.kc)
251         c.Assert(err, IsNil)
252         if d, err := fs.Open("/"); c.Check(err, IsNil) {
253                 fis, err := d.Readdir(-1)
254                 c.Assert(err, IsNil)
255                 for _, fi := range fis {
256                         if fi.IsDir() {
257                                 continue
258                         }
259                         f, err := fs.Open(fi.Name())
260                         c.Assert(err, IsNil)
261                         buf, err := ioutil.ReadAll(f)
262                         c.Assert(err, IsNil)
263                         c.Logf("\n===== %s =====\n%s", fi.Name(), buf)
264                         s.logFiles[fi.Name()] = string(buf)
265                 }
266         }
267         s.logCollection = log
268
269         var output arvados.Collection
270         err = s.client.RequestAndDecode(&output, "GET", "arvados/v1/collections/"+s.cr.OutputUUID, nil, nil)
271         c.Assert(err, IsNil)
272         fs, err = output.FileSystem(s.client, s.kc)
273         c.Assert(err, IsNil)
274         if f, err := fs.Open("inputfile"); c.Check(err, IsNil) {
275                 defer f.Close()
276                 buf, err := ioutil.ReadAll(f)
277                 c.Check(err, IsNil)
278                 c.Check(string(buf), Equals, "inputdata")
279         }
280         if f, err := fs.Open("json"); c.Check(err, IsNil) {
281                 defer f.Close()
282                 buf, err := ioutil.ReadAll(f)
283                 c.Check(err, IsNil)
284                 c.Check(string(buf), Equals, `["foo",{"foo":"bar"},null]`)
285         }
286         if fi, err := fs.Stat("emptydir"); c.Check(err, IsNil) {
287                 c.Check(fi.IsDir(), Equals, true)
288         }
289         if d, err := fs.Open("emptydir"); c.Check(err, IsNil) {
290                 defer d.Close()
291                 fis, err := d.Readdir(-1)
292                 c.Assert(err, IsNil)
293                 // crunch-run still saves a ".keep" file to preserve
294                 // empty dirs even though that shouldn't be
295                 // necessary. Ideally we would do:
296                 // c.Check(fis, HasLen, 0)
297                 for _, fi := range fis {
298                         c.Check(fi.Name(), Equals, ".keep")
299                 }
300         }
301         s.outputCollection = output
302 }