13100: Handle writable collections mounted below output dir.
[arvados.git] / services / crunch-run / copier.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "encoding/json"
9         "errors"
10         "fmt"
11         "io"
12         "os"
13         "path/filepath"
14         "sort"
15         "strings"
16
17         "git.curoverse.com/arvados.git/sdk/go/arvados"
18         "git.curoverse.com/arvados.git/sdk/go/manifest"
19 )
20
21 type printfer interface {
22         Printf(string, ...interface{})
23 }
24
25 var errTooManySymlinks = errors.New("too many symlinks, or symlink cycle")
26
27 const limitFollowSymlinks = 10
28
29 type filetodo struct {
30         src  string
31         dst  string
32         size int64
33 }
34
35 // copier copies data from a finished container's output path to a new
36 // Arvados collection.
37 //
38 // Regular files (and symlinks to regular files) in hostOutputDir are
39 // copied from the local filesystem.
40 //
41 // Symlinks to mounted collections, and any collections mounted under
42 // ctrOutputDir, are copied by transforming the relevant parts of the
43 // existing manifests, without moving any data around.
44 //
45 // Symlinks to other parts of the container's filesystem result in
46 // errors.
47 //
48 // Use:
49 //
50 //      manifest, err := (&copier{...}).Copy()
51 type copier struct {
52         client        *arvados.Client
53         arvClient     IArvadosClient
54         keepClient    IKeepClient
55         hostOutputDir string
56         ctrOutputDir  string
57         mounts        map[string]arvados.Mount
58         secretMounts  map[string]arvados.Mount
59         logger        printfer
60
61         dirs     []string
62         files    []filetodo
63         manifest string
64
65         manifestCache map[string]*manifest.Manifest
66 }
67
68 // Copy copies data as needed, and returns a new manifest.
69 func (cp *copier) Copy() (string, error) {
70         err := cp.walkMount("", cp.ctrOutputDir, limitFollowSymlinks, true)
71         if err != nil {
72                 return "", err
73         }
74         fs, err := (&arvados.Collection{ManifestText: cp.manifest}).FileSystem(cp.client, cp.keepClient)
75         if err != nil {
76                 return "", err
77         }
78         for _, d := range cp.dirs {
79                 err = fs.Mkdir(d, 0777)
80                 if err != nil {
81                         return "", err
82                 }
83         }
84         for _, f := range cp.files {
85                 err = cp.copyFile(fs, f)
86                 if err != nil {
87                         return "", err
88                 }
89         }
90         return fs.MarshalManifest(".")
91 }
92
93 func (cp *copier) copyFile(fs arvados.CollectionFileSystem, f filetodo) error {
94         cp.logger.Printf("copying %q (%d bytes)", f.dst, f.size)
95         dst, err := fs.OpenFile(f.dst, os.O_CREATE|os.O_WRONLY, 0666)
96         if err != nil {
97                 return err
98         }
99         src, err := os.Open(f.src)
100         if err != nil {
101                 dst.Close()
102                 return err
103         }
104         defer src.Close()
105         _, err = io.Copy(dst, src)
106         if err != nil {
107                 dst.Close()
108                 return err
109         }
110         return dst.Close()
111 }
112
113 // Append to cp.manifest, cp.files, and cp.dirs so as to copy src (an
114 // absolute path in the container's filesystem) to dest (an absolute
115 // path in the output collection, or "" for output root).
116 //
117 // src must be (or be a descendant of) a readonly "collection" mount,
118 // a writable collection mounted at ctrOutputPath, or a "tmp" mount.
119 //
120 // If walkMountsBelow is true, include contents of any collection
121 // mounted below src as well.
122 func (cp *copier) walkMount(dest, src string, maxSymlinks int, walkMountsBelow bool) error {
123         // srcRoot, srcMount indicate the innermost mount that
124         // contains src.
125         var srcRoot string
126         var srcMount arvados.Mount
127         for root, mnt := range cp.mounts {
128                 if len(root) > len(srcRoot) && strings.HasPrefix(src+"/", root+"/") {
129                         srcRoot, srcMount = root, mnt
130                 }
131         }
132         for root := range cp.secretMounts {
133                 if len(root) > len(srcRoot) && strings.HasPrefix(src+"/", root+"/") {
134                         // Silently omit secrets, and symlinks to
135                         // secrets.
136                         return nil
137                 }
138         }
139         if srcRoot == "" {
140                 return fmt.Errorf("cannot output file %q: not in any mount", src)
141         }
142
143         // srcRelPath is the path to the file/dir we are trying to
144         // copy, relative to its mount point -- ".", "./foo.txt", ...
145         srcRelPath := filepath.Join(".", srcMount.Path, src[len(srcRoot):])
146
147         switch {
148         case srcMount.ExcludeFromOutput:
149         case srcMount.Kind == "tmp":
150                 // Handle by walking the host filesystem.
151                 return cp.walkHostFS(dest, src, maxSymlinks, walkMountsBelow)
152         case srcMount.Kind != "collection":
153                 return fmt.Errorf("%q: unsupported mount %q in output (kind is %q)", src, srcRoot, srcMount.Kind)
154         case !srcMount.Writable:
155                 mft, err := cp.getManifest(srcMount.PortableDataHash)
156                 if err != nil {
157                         return err
158                 }
159                 cp.manifest += mft.Extract(srcRelPath, dest).Text
160         case srcRoot == cp.ctrOutputDir:
161                 f, err := os.Open(filepath.Join(cp.hostOutputDir, ".arvados#collection"))
162                 if err != nil {
163                         return err
164                 }
165                 defer f.Close()
166                 var coll arvados.Collection
167                 err = json.NewDecoder(f).Decode(&coll)
168                 if err != nil {
169                         return err
170                 }
171                 mft := manifest.Manifest{Text: coll.ManifestText}
172                 cp.manifest += mft.Extract(srcRelPath, dest).Text
173         default:
174                 return fmt.Errorf("cannot output %q as %q: writable collection mounted at %q", src, dest, srcRoot)
175         }
176         if walkMountsBelow {
177                 return cp.walkMountsBelow(dest, src)
178         } else {
179                 return nil
180         }
181 }
182
183 func (cp *copier) walkMountsBelow(dest, src string) error {
184         for mnt, mntinfo := range cp.mounts {
185                 if !strings.HasPrefix(mnt, src+"/") {
186                         continue
187                 }
188                 if cp.copyRegularFiles(mntinfo) {
189                         // These got copied into the nearest parent
190                         // mount as regular files during setup, so
191                         // they get copied as regular files when we
192                         // process the parent. Output will reflect any
193                         // changes and deletions done by the
194                         // container.
195                         continue
196                 }
197                 // Example: we are processing dest=/foo src=/mnt1/dir1
198                 // (perhaps we followed a symlink /outdir/foo ->
199                 // /mnt1/dir1). Caller has already processed the
200                 // collection mounted at /mnt1, but now we find that
201                 // /mnt1/dir1/mnt2 is also a mount, so we need to copy
202                 // src=/mnt1/dir1/mnt2 to dest=/foo/mnt2.
203                 //
204                 // We handle all descendants of /mnt1/dir1 in this
205                 // loop instead of using recursion:
206                 // /mnt1/dir1/mnt2/mnt3 is a child of both /mnt1 and
207                 // /mnt1/dir1/mnt2, but we only want to walk it
208                 // once. (This simplification is safe because mounted
209                 // collections cannot contain symlinks.)
210                 err := cp.walkMount(dest+mnt[len(src):], mnt, 0, false)
211                 if err != nil {
212                         return err
213                 }
214         }
215         return nil
216 }
217
218 // Add entries to cp.dirs and cp.files so as to copy src (an absolute
219 // path in the container's filesystem which corresponds to a real file
220 // or directory in cp.hostOutputDir) to dest (an absolute path in the
221 // output collection, or "" for output root).
222 //
223 // Always follow symlinks.
224 //
225 // If includeMounts is true, include mounts at and below src.
226 // Otherwise, skip them.
227 func (cp *copier) walkHostFS(dest, src string, maxSymlinks int, includeMounts bool) error {
228         if includeMounts {
229                 err := cp.walkMountsBelow(dest, src)
230                 if err != nil {
231                         return err
232                 }
233         }
234
235         hostsrc := cp.hostOutputDir + src[len(cp.ctrOutputDir):]
236
237         // If src is a symlink, walk its target.
238         fi, err := os.Lstat(hostsrc)
239         if err != nil {
240                 return fmt.Errorf("lstat %q: %s", src, err)
241         }
242         if fi.Mode()&os.ModeSymlink != 0 {
243                 if maxSymlinks < 0 {
244                         return errTooManySymlinks
245                 }
246                 target, err := os.Readlink(hostsrc)
247                 if err != nil {
248                         return fmt.Errorf("readlink %q: %s", src, err)
249                 }
250                 if !strings.HasPrefix(target, "/") {
251                         target = filepath.Join(filepath.Dir(src), target)
252                 }
253                 return cp.walkMount(dest, target, maxSymlinks-1, true)
254         }
255
256         // If src is a regular directory, append it to cp.dirs and
257         // walk each of its children. (If there are no children,
258         // create an empty file "dest/.keep".)
259         if fi.Mode().IsDir() {
260                 if dest != "" {
261                         cp.dirs = append(cp.dirs, dest)
262                 }
263                 dir, err := os.Open(hostsrc)
264                 if err != nil {
265                         return fmt.Errorf("open %q: %s", src, err)
266                 }
267                 names, err := dir.Readdirnames(-1)
268                 dir.Close()
269                 if err != nil {
270                         return fmt.Errorf("readdirnames %q: %s", src, err)
271                 }
272                 if len(names) == 0 {
273                         if dest != "" {
274                                 cp.files = append(cp.files, filetodo{
275                                         src: os.DevNull,
276                                         dst: dest + "/.keep",
277                                 })
278                         }
279                         return nil
280                 }
281                 sort.Strings(names)
282                 for _, name := range names {
283                         dest, src := dest+"/"+name, src+"/"+name
284                         if _, isSecret := cp.secretMounts[src]; isSecret {
285                                 continue
286                         }
287                         if mntinfo, isMount := cp.mounts[src]; isMount && !cp.copyRegularFiles(mntinfo) {
288                                 // If a regular file/dir somehow
289                                 // exists at a path that's also a
290                                 // mount target, ignore the file --
291                                 // the mount has already been included
292                                 // with walkMountsBelow().
293                                 //
294                                 // (...except mount types that are
295                                 // handled as regular files.)
296                                 continue
297                         }
298                         err = cp.walkHostFS(dest, src, maxSymlinks, false)
299                         if err != nil {
300                                 return err
301                         }
302                 }
303                 return nil
304         }
305
306         // If src is a regular file, append it to cp.files.
307         if fi.Mode().IsRegular() {
308                 cp.files = append(cp.files, filetodo{
309                         src:  hostsrc,
310                         dst:  dest,
311                         size: fi.Size(),
312                 })
313                 return nil
314         }
315
316         return fmt.Errorf("Unsupported file type (mode %o) in output dir: %q", fi.Mode(), src)
317 }
318
319 func (cp *copier) copyRegularFiles(m arvados.Mount) bool {
320         return m.Kind == "text" || m.Kind == "json" || (m.Kind == "collection" && m.Writable)
321 }
322
323 func (cp *copier) getManifest(pdh string) (*manifest.Manifest, error) {
324         if mft, ok := cp.manifestCache[pdh]; ok {
325                 return mft, nil
326         }
327         var coll arvados.Collection
328         err := cp.arvClient.Get("collections", pdh, nil, &coll)
329         if err != nil {
330                 return nil, fmt.Errorf("error retrieving collection record for %q: %s", pdh, err)
331         }
332         mft := &manifest.Manifest{Text: coll.ManifestText}
333         if cp.manifestCache == nil {
334                 cp.manifestCache = map[string]*manifest.Manifest{pdh: mft}
335         } else {
336                 cp.manifestCache[pdh] = mft
337         }
338         return mft, nil
339 }