Add 'sdk/java-v2/' from commit '55f103e336ca9fb8bf1720d2ef4ee8dd4e221118'
[arvados.git] / services / crunch-run / copier.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "encoding/json"
9         "errors"
10         "fmt"
11         "io"
12         "os"
13         "path/filepath"
14         "sort"
15         "strings"
16
17         "git.curoverse.com/arvados.git/sdk/go/arvados"
18         "git.curoverse.com/arvados.git/sdk/go/manifest"
19 )
20
21 type printfer interface {
22         Printf(string, ...interface{})
23 }
24
25 var errTooManySymlinks = errors.New("too many symlinks, or symlink cycle")
26
27 const limitFollowSymlinks = 10
28
29 type filetodo struct {
30         src  string
31         dst  string
32         size int64
33 }
34
35 // copier copies data from a finished container's output path to a new
36 // Arvados collection.
37 //
38 // Regular files (and symlinks to regular files) in hostOutputDir are
39 // copied from the local filesystem.
40 //
41 // Symlinks to mounted collections, and any collections mounted under
42 // ctrOutputDir, are copied by transforming the relevant parts of the
43 // existing manifests, without moving any data around.
44 //
45 // Symlinks to other parts of the container's filesystem result in
46 // errors.
47 //
48 // Use:
49 //
50 //      manifest, err := (&copier{...}).Copy()
51 type copier struct {
52         client        *arvados.Client
53         arvClient     IArvadosClient
54         keepClient    IKeepClient
55         hostOutputDir string
56         ctrOutputDir  string
57         binds         []string
58         mounts        map[string]arvados.Mount
59         secretMounts  map[string]arvados.Mount
60         logger        printfer
61
62         dirs     []string
63         files    []filetodo
64         manifest string
65
66         manifestCache map[string]*manifest.Manifest
67 }
68
69 // Copy copies data as needed, and returns a new manifest.
70 func (cp *copier) Copy() (string, error) {
71         err := cp.walkMount("", cp.ctrOutputDir, limitFollowSymlinks, true)
72         if err != nil {
73                 return "", fmt.Errorf("error scanning files to copy to output: %v", err)
74         }
75         fs, err := (&arvados.Collection{ManifestText: cp.manifest}).FileSystem(cp.client, cp.keepClient)
76         if err != nil {
77                 return "", fmt.Errorf("error creating Collection.FileSystem: %v", err)
78         }
79         for _, d := range cp.dirs {
80                 err = fs.Mkdir(d, 0777)
81                 if err != nil && err != os.ErrExist {
82                         return "", fmt.Errorf("error making directory %q in output collection: %v", d, err)
83                 }
84         }
85         for _, f := range cp.files {
86                 err = cp.copyFile(fs, f)
87                 if err != nil {
88                         return "", fmt.Errorf("error copying file %q into output collection: %v", f, err)
89                 }
90         }
91         return fs.MarshalManifest(".")
92 }
93
94 func (cp *copier) copyFile(fs arvados.CollectionFileSystem, f filetodo) error {
95         cp.logger.Printf("copying %q (%d bytes)", f.dst, f.size)
96         dst, err := fs.OpenFile(f.dst, os.O_CREATE|os.O_WRONLY, 0666)
97         if err != nil {
98                 return err
99         }
100         src, err := os.Open(f.src)
101         if err != nil {
102                 dst.Close()
103                 return err
104         }
105         defer src.Close()
106         _, err = io.Copy(dst, src)
107         if err != nil {
108                 dst.Close()
109                 return err
110         }
111         return dst.Close()
112 }
113
114 // Append to cp.manifest, cp.files, and cp.dirs so as to copy src (an
115 // absolute path in the container's filesystem) to dest (an absolute
116 // path in the output collection, or "" for output root).
117 //
118 // src must be (or be a descendant of) a readonly "collection" mount,
119 // a writable collection mounted at ctrOutputPath, or a "tmp" mount.
120 //
121 // If walkMountsBelow is true, include contents of any collection
122 // mounted below src as well.
123 func (cp *copier) walkMount(dest, src string, maxSymlinks int, walkMountsBelow bool) error {
124         // srcRoot, srcMount indicate the innermost mount that
125         // contains src.
126         var srcRoot string
127         var srcMount arvados.Mount
128         for root, mnt := range cp.mounts {
129                 if len(root) > len(srcRoot) && strings.HasPrefix(src+"/", root+"/") {
130                         srcRoot, srcMount = root, mnt
131                 }
132         }
133         for root := range cp.secretMounts {
134                 if len(root) > len(srcRoot) && strings.HasPrefix(src+"/", root+"/") {
135                         // Silently omit secrets, and symlinks to
136                         // secrets.
137                         return nil
138                 }
139         }
140         if srcRoot == "" {
141                 return fmt.Errorf("cannot output file %q: not in any mount", src)
142         }
143
144         // srcRelPath is the path to the file/dir we are trying to
145         // copy, relative to its mount point -- ".", "./foo.txt", ...
146         srcRelPath := filepath.Join(".", srcMount.Path, src[len(srcRoot):])
147
148         switch {
149         case srcMount.ExcludeFromOutput:
150         case srcMount.Kind == "tmp":
151                 // Handle by walking the host filesystem.
152                 return cp.walkHostFS(dest, src, maxSymlinks, walkMountsBelow)
153         case srcMount.Kind != "collection":
154                 return fmt.Errorf("%q: unsupported mount %q in output (kind is %q)", src, srcRoot, srcMount.Kind)
155         case !srcMount.Writable:
156                 mft, err := cp.getManifest(srcMount.PortableDataHash)
157                 if err != nil {
158                         return err
159                 }
160                 cp.manifest += mft.Extract(srcRelPath, dest).Text
161         default:
162                 hostRoot, err := cp.hostRoot(srcRoot)
163                 if err != nil {
164                         return err
165                 }
166                 f, err := os.Open(filepath.Join(hostRoot, ".arvados#collection"))
167                 if err != nil {
168                         return err
169                 }
170                 defer f.Close()
171                 var coll arvados.Collection
172                 err = json.NewDecoder(f).Decode(&coll)
173                 if err != nil {
174                         return err
175                 }
176                 mft := manifest.Manifest{Text: coll.ManifestText}
177                 cp.manifest += mft.Extract(srcRelPath, dest).Text
178         }
179         if walkMountsBelow {
180                 return cp.walkMountsBelow(dest, src)
181         } else {
182                 return nil
183         }
184 }
185
186 func (cp *copier) walkMountsBelow(dest, src string) error {
187         for mnt, mntinfo := range cp.mounts {
188                 if !strings.HasPrefix(mnt, src+"/") {
189                         continue
190                 }
191                 if cp.copyRegularFiles(mntinfo) {
192                         // These got copied into the nearest parent
193                         // mount as regular files during setup, so
194                         // they get copied as regular files when we
195                         // process the parent. Output will reflect any
196                         // changes and deletions done by the
197                         // container.
198                         continue
199                 }
200                 // Example: we are processing dest=/foo src=/mnt1/dir1
201                 // (perhaps we followed a symlink /outdir/foo ->
202                 // /mnt1/dir1). Caller has already processed the
203                 // collection mounted at /mnt1, but now we find that
204                 // /mnt1/dir1/mnt2 is also a mount, so we need to copy
205                 // src=/mnt1/dir1/mnt2 to dest=/foo/mnt2.
206                 //
207                 // We handle all descendants of /mnt1/dir1 in this
208                 // loop instead of using recursion:
209                 // /mnt1/dir1/mnt2/mnt3 is a child of both /mnt1 and
210                 // /mnt1/dir1/mnt2, but we only want to walk it
211                 // once. (This simplification is safe because mounted
212                 // collections cannot contain symlinks.)
213                 err := cp.walkMount(dest+mnt[len(src):], mnt, 0, false)
214                 if err != nil {
215                         return err
216                 }
217         }
218         return nil
219 }
220
221 // Add entries to cp.dirs and cp.files so as to copy src (an absolute
222 // path in the container's filesystem which corresponds to a real file
223 // or directory in cp.hostOutputDir) to dest (an absolute path in the
224 // output collection, or "" for output root).
225 //
226 // Always follow symlinks.
227 //
228 // If includeMounts is true, include mounts at and below src.
229 // Otherwise, skip them.
230 func (cp *copier) walkHostFS(dest, src string, maxSymlinks int, includeMounts bool) error {
231         if includeMounts {
232                 err := cp.walkMountsBelow(dest, src)
233                 if err != nil {
234                         return err
235                 }
236         }
237
238         hostsrc := cp.hostOutputDir + src[len(cp.ctrOutputDir):]
239
240         // If src is a symlink, walk its target.
241         fi, err := os.Lstat(hostsrc)
242         if err != nil {
243                 return fmt.Errorf("lstat %q: %s", src, err)
244         }
245         if fi.Mode()&os.ModeSymlink != 0 {
246                 if maxSymlinks < 0 {
247                         return errTooManySymlinks
248                 }
249                 target, err := os.Readlink(hostsrc)
250                 if err != nil {
251                         return fmt.Errorf("readlink %q: %s", src, err)
252                 }
253                 if !strings.HasPrefix(target, "/") {
254                         target = filepath.Join(filepath.Dir(src), target)
255                 }
256                 return cp.walkMount(dest, target, maxSymlinks-1, true)
257         }
258
259         // If src is a regular directory, append it to cp.dirs and
260         // walk each of its children. (If there are no children,
261         // create an empty file "dest/.keep".)
262         if fi.Mode().IsDir() {
263                 if dest != "" {
264                         cp.dirs = append(cp.dirs, dest)
265                 }
266                 dir, err := os.Open(hostsrc)
267                 if err != nil {
268                         return fmt.Errorf("open %q: %s", src, err)
269                 }
270                 names, err := dir.Readdirnames(-1)
271                 dir.Close()
272                 if err != nil {
273                         return fmt.Errorf("readdirnames %q: %s", src, err)
274                 }
275                 if len(names) == 0 {
276                         if dest != "" {
277                                 cp.files = append(cp.files, filetodo{
278                                         src: os.DevNull,
279                                         dst: dest + "/.keep",
280                                 })
281                         }
282                         return nil
283                 }
284                 sort.Strings(names)
285                 for _, name := range names {
286                         dest, src := dest+"/"+name, src+"/"+name
287                         if _, isSecret := cp.secretMounts[src]; isSecret {
288                                 continue
289                         }
290                         if mntinfo, isMount := cp.mounts[src]; isMount && !cp.copyRegularFiles(mntinfo) {
291                                 // If a regular file/dir somehow
292                                 // exists at a path that's also a
293                                 // mount target, ignore the file --
294                                 // the mount has already been included
295                                 // with walkMountsBelow().
296                                 //
297                                 // (...except mount types that are
298                                 // handled as regular files.)
299                                 continue
300                         }
301                         err = cp.walkHostFS(dest, src, maxSymlinks, false)
302                         if err != nil {
303                                 return err
304                         }
305                 }
306                 return nil
307         }
308
309         // If src is a regular file, append it to cp.files.
310         if fi.Mode().IsRegular() {
311                 cp.files = append(cp.files, filetodo{
312                         src:  hostsrc,
313                         dst:  dest,
314                         size: fi.Size(),
315                 })
316                 return nil
317         }
318
319         return fmt.Errorf("Unsupported file type (mode %o) in output dir: %q", fi.Mode(), src)
320 }
321
322 // Return the host path that was mounted at the given path in the
323 // container.
324 func (cp *copier) hostRoot(ctrRoot string) (string, error) {
325         if ctrRoot == cp.ctrOutputDir {
326                 return cp.hostOutputDir, nil
327         }
328         for _, bind := range cp.binds {
329                 tokens := strings.Split(bind, ":")
330                 if len(tokens) >= 2 && tokens[1] == ctrRoot {
331                         return tokens[0], nil
332                 }
333         }
334         return "", fmt.Errorf("not bind-mounted: %q", ctrRoot)
335 }
336
337 func (cp *copier) copyRegularFiles(m arvados.Mount) bool {
338         return m.Kind == "text" || m.Kind == "json" || (m.Kind == "collection" && m.Writable)
339 }
340
341 func (cp *copier) getManifest(pdh string) (*manifest.Manifest, error) {
342         if mft, ok := cp.manifestCache[pdh]; ok {
343                 return mft, nil
344         }
345         var coll arvados.Collection
346         err := cp.arvClient.Get("collections", pdh, nil, &coll)
347         if err != nil {
348                 return nil, fmt.Errorf("error retrieving collection record for %q: %s", pdh, err)
349         }
350         mft := &manifest.Manifest{Text: coll.ManifestText}
351         if cp.manifestCache == nil {
352                 cp.manifestCache = map[string]*manifest.Manifest{pdh: mft}
353         } else {
354                 cp.manifestCache[pdh] = mft
355         }
356         return mft, nil
357 }