16347: Add test case. Flush keepstore logs in CommitLogs.
[arvados.git] / lib / crunchrun / integration_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package crunchrun
6
7 import (
8         "bytes"
9         "encoding/json"
10         "fmt"
11         "io"
12         "io/ioutil"
13         "os"
14         "os/exec"
15         "strings"
16
17         "git.arvados.org/arvados.git/lib/config"
18         "git.arvados.org/arvados.git/sdk/go/arvados"
19         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
20         "git.arvados.org/arvados.git/sdk/go/arvadostest"
21         "git.arvados.org/arvados.git/sdk/go/ctxlog"
22         "git.arvados.org/arvados.git/sdk/go/keepclient"
23         . "gopkg.in/check.v1"
24 )
25
26 var _ = Suite(&integrationSuite{})
27
28 type integrationSuite struct {
29         engine string
30         image  arvados.Collection
31         input  arvados.Collection
32         stdin  bytes.Buffer
33         stdout bytes.Buffer
34         stderr bytes.Buffer
35         cr     arvados.ContainerRequest
36         client *arvados.Client
37         ac     *arvadosclient.ArvadosClient
38         kc     *keepclient.KeepClient
39
40         logCollection    arvados.Collection
41         outputCollection arvados.Collection
42 }
43
44 func (s *integrationSuite) SetUpSuite(c *C) {
45         _, err := exec.LookPath("docker")
46         if err != nil {
47                 c.Skip("looks like docker is not installed")
48         }
49
50         arvadostest.StartKeep(2, true)
51
52         out, err := exec.Command("docker", "load", "--input", busyboxDockerImage(c)).CombinedOutput()
53         c.Log(string(out))
54         c.Assert(err, IsNil)
55         out, err = exec.Command("arv-keepdocker", "--no-resume", "busybox:uclibc").Output()
56         imageUUID := strings.TrimSpace(string(out))
57         c.Logf("image uuid %s", imageUUID)
58         if !c.Check(err, IsNil) {
59                 if err, ok := err.(*exec.ExitError); ok {
60                         c.Logf("%s", err.Stderr)
61                 }
62                 c.Fail()
63         }
64         err = arvados.NewClientFromEnv().RequestAndDecode(&s.image, "GET", "arvados/v1/collections/"+imageUUID, nil, nil)
65         c.Assert(err, IsNil)
66         c.Logf("image pdh %s", s.image.PortableDataHash)
67
68         s.client = arvados.NewClientFromEnv()
69         s.ac, err = arvadosclient.New(s.client)
70         c.Assert(err, IsNil)
71         s.kc = keepclient.New(s.ac)
72         fs, err := s.input.FileSystem(s.client, s.kc)
73         c.Assert(err, IsNil)
74         f, err := fs.OpenFile("inputfile", os.O_CREATE|os.O_WRONLY, 0755)
75         c.Assert(err, IsNil)
76         _, err = f.Write([]byte("inputdata"))
77         c.Assert(err, IsNil)
78         err = f.Close()
79         c.Assert(err, IsNil)
80         s.input.ManifestText, err = fs.MarshalManifest(".")
81         c.Assert(err, IsNil)
82         err = s.client.RequestAndDecode(&s.input, "POST", "arvados/v1/collections", nil, map[string]interface{}{
83                 "ensure_unique_name": true,
84                 "collection": map[string]interface{}{
85                         "manifest_text": s.input.ManifestText,
86                 },
87         })
88         c.Assert(err, IsNil)
89         c.Logf("input pdh %s", s.input.PortableDataHash)
90
91         s.logCollection = arvados.Collection{}
92         s.outputCollection = arvados.Collection{}
93 }
94
95 func (s *integrationSuite) TearDownSuite(c *C) {
96         if s.client == nil {
97                 // didn't set up
98                 return
99         }
100         err := s.client.RequestAndDecode(nil, "POST", "database/reset", nil, nil)
101         c.Check(err, IsNil)
102 }
103
104 func (s *integrationSuite) SetUpTest(c *C) {
105         s.engine = "docker"
106         s.stdin = bytes.Buffer{}
107         s.stdout = bytes.Buffer{}
108         s.stderr = bytes.Buffer{}
109         s.cr = arvados.ContainerRequest{
110                 Priority:       1,
111                 State:          "Committed",
112                 OutputPath:     "/mnt/out",
113                 ContainerImage: s.image.PortableDataHash,
114                 Mounts: map[string]arvados.Mount{
115                         "/mnt/json": {
116                                 Kind: "json",
117                                 Content: []interface{}{
118                                         "foo",
119                                         map[string]string{"foo": "bar"},
120                                         nil,
121                                 },
122                         },
123                         "/mnt/in": {
124                                 Kind:             "collection",
125                                 PortableDataHash: s.input.PortableDataHash,
126                         },
127                         "/mnt/out": {
128                                 Kind:     "tmp",
129                                 Capacity: 1000,
130                         },
131                 },
132                 RuntimeConstraints: arvados.RuntimeConstraints{
133                         RAM:   128000000,
134                         VCPUs: 1,
135                         API:   true,
136                 },
137         }
138 }
139
140 func (s *integrationSuite) setup(c *C) {
141         err := s.client.RequestAndDecode(&s.cr, "POST", "arvados/v1/container_requests", nil, map[string]interface{}{"container_request": map[string]interface{}{
142                 "priority":            s.cr.Priority,
143                 "state":               s.cr.State,
144                 "command":             s.cr.Command,
145                 "output_path":         s.cr.OutputPath,
146                 "container_image":     s.cr.ContainerImage,
147                 "mounts":              s.cr.Mounts,
148                 "runtime_constraints": s.cr.RuntimeConstraints,
149                 "use_existing":        false,
150         }})
151         c.Assert(err, IsNil)
152         c.Assert(s.cr.ContainerUUID, Not(Equals), "")
153         err = s.client.RequestAndDecode(nil, "POST", "arvados/v1/containers/"+s.cr.ContainerUUID+"/lock", nil, nil)
154         c.Assert(err, IsNil)
155 }
156
157 func (s *integrationSuite) TestRunTrivialContainerWithDocker(c *C) {
158         s.engine = "docker"
159         s.testRunTrivialContainer(c)
160 }
161
162 func (s *integrationSuite) TestRunTrivialContainerWithSingularity(c *C) {
163         s.engine = "singularity"
164         s.testRunTrivialContainer(c)
165 }
166
167 func (s *integrationSuite) TestRunTrivialContainerWithLocalKeepstore(c *C) {
168         cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load()
169         c.Assert(err, IsNil)
170         cluster, err := cfg.GetCluster("")
171         c.Assert(err, IsNil)
172         for uuid, volume := range cluster.Volumes {
173                 volume.AccessViaHosts = nil
174                 volume.Replication = 2
175                 cluster.Volumes[uuid] = volume
176         }
177
178         s.stdin.Reset()
179         err = json.NewEncoder(&s.stdin).Encode(ConfigData{
180                 Env:         nil,
181                 KeepBuffers: 1,
182                 Cluster:     cluster,
183         })
184         c.Assert(err, IsNil)
185
186         s.engine = "docker"
187         s.testRunTrivialContainer(c)
188
189         fs, err := s.logCollection.FileSystem(s.client, s.kc)
190         c.Assert(err, IsNil)
191         f, err := fs.Open("keepstore.txt")
192         c.Assert(err, IsNil)
193         buf, err := ioutil.ReadAll(f)
194         c.Assert(err, IsNil)
195         c.Check(string(buf), Matches, `(?ms).*"reqMethod":"GET".*`)
196         c.Check(string(buf), Matches, `(?ms).*"reqMethod":"PUT".*,"reqPath":"0e3bcff26d51c895a60ea0d4585e134d".*`)
197 }
198
199 func (s *integrationSuite) testRunTrivialContainer(c *C) {
200         if err := exec.Command("which", s.engine).Run(); err != nil {
201                 c.Skip(fmt.Sprintf("%s: %s", s.engine, err))
202         }
203         s.cr.Command = []string{"sh", "-c", "cat /mnt/in/inputfile >/mnt/out/inputfile && cat /mnt/json >/mnt/out/json && ! touch /mnt/in/shouldbereadonly && mkdir /mnt/out/emptydir"}
204         s.setup(c)
205
206         args := []string{
207                 "-runtime-engine=" + s.engine,
208                 "-enable-memory-limit=false",
209                 s.cr.ContainerUUID,
210         }
211         if s.stdin.Len() > 0 {
212                 args = append([]string{"-stdin-config=true"}, args...)
213         }
214         code := command{}.RunCommand("crunch-run", args, &s.stdin, io.MultiWriter(&s.stdout, os.Stderr), io.MultiWriter(&s.stderr, os.Stderr))
215         c.Logf("\n===== stdout =====\n%s", s.stdout.String())
216         c.Logf("\n===== stderr =====\n%s", s.stderr.String())
217         c.Check(code, Equals, 0)
218         err := s.client.RequestAndDecode(&s.cr, "GET", "arvados/v1/container_requests/"+s.cr.UUID, nil, nil)
219         c.Assert(err, IsNil)
220         c.Logf("Finished container request: %#v", s.cr)
221
222         var log arvados.Collection
223         err = s.client.RequestAndDecode(&log, "GET", "arvados/v1/collections/"+s.cr.LogUUID, nil, nil)
224         c.Assert(err, IsNil)
225         fs, err := log.FileSystem(s.client, s.kc)
226         c.Assert(err, IsNil)
227         if d, err := fs.Open("/"); c.Check(err, IsNil) {
228                 fis, err := d.Readdir(-1)
229                 c.Assert(err, IsNil)
230                 for _, fi := range fis {
231                         if fi.IsDir() {
232                                 continue
233                         }
234                         f, err := fs.Open(fi.Name())
235                         c.Assert(err, IsNil)
236                         buf, err := ioutil.ReadAll(f)
237                         c.Assert(err, IsNil)
238                         c.Logf("\n===== %s =====\n%s", fi.Name(), buf)
239                 }
240         }
241         s.logCollection = log
242
243         var output arvados.Collection
244         err = s.client.RequestAndDecode(&output, "GET", "arvados/v1/collections/"+s.cr.OutputUUID, nil, nil)
245         c.Assert(err, IsNil)
246         fs, err = output.FileSystem(s.client, s.kc)
247         c.Assert(err, IsNil)
248         if f, err := fs.Open("inputfile"); c.Check(err, IsNil) {
249                 defer f.Close()
250                 buf, err := ioutil.ReadAll(f)
251                 c.Check(err, IsNil)
252                 c.Check(string(buf), Equals, "inputdata")
253         }
254         if f, err := fs.Open("json"); c.Check(err, IsNil) {
255                 defer f.Close()
256                 buf, err := ioutil.ReadAll(f)
257                 c.Check(err, IsNil)
258                 c.Check(string(buf), Equals, `["foo",{"foo":"bar"},null]`)
259         }
260         if fi, err := fs.Stat("emptydir"); c.Check(err, IsNil) {
261                 c.Check(fi.IsDir(), Equals, true)
262         }
263         if d, err := fs.Open("emptydir"); c.Check(err, IsNil) {
264                 defer d.Close()
265                 fis, err := d.Readdir(-1)
266                 c.Assert(err, IsNil)
267                 // crunch-run still saves a ".keep" file to preserve
268                 // empty dirs even though that shouldn't be
269                 // necessary. Ideally we would do:
270                 // c.Check(fis, HasLen, 0)
271                 for _, fi := range fis {
272                         c.Check(fi.Name(), Equals, ".keep")
273                 }
274         }
275         s.outputCollection = output
276 }