507029b36edfcf49b7037807e365cbf79be54e2f
[arvados.git] / lib / crunchrun / copier.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package crunchrun
6
7 import (
8         "encoding/json"
9         "errors"
10         "fmt"
11         "io"
12         "io/fs"
13         "os"
14         "path/filepath"
15         "sort"
16         "strings"
17
18         "git.arvados.org/arvados.git/sdk/go/arvados"
19         "git.arvados.org/arvados.git/sdk/go/keepclient"
20         "git.arvados.org/arvados.git/sdk/go/manifest"
21         "github.com/bmatcuk/doublestar/v4"
22 )
23
24 type printfer interface {
25         Printf(string, ...interface{})
26 }
27
28 var errTooManySymlinks = errors.New("too many symlinks, or symlink cycle")
29
30 const limitFollowSymlinks = 10
31
32 type filetodo struct {
33         src  string
34         dst  string
35         size int64
36 }
37
38 // copier copies data from a finished container's output path to a new
39 // Arvados collection.
40 //
41 // Regular files (and symlinks to regular files) in hostOutputDir are
42 // copied from the local filesystem.
43 //
44 // Symlinks to mounted collections, and any collections mounted under
45 // ctrOutputDir, are copied by transforming the relevant parts of the
46 // existing manifests, without moving any data around.
47 //
48 // Symlinks to other parts of the container's filesystem result in
49 // errors.
50 //
51 // Use:
52 //
53 //      manifest, err := (&copier{...}).Copy()
54 type copier struct {
55         client        *arvados.Client
56         keepClient    IKeepClient
57         hostOutputDir string
58         ctrOutputDir  string
59         globs         []string
60         bindmounts    map[string]bindmount
61         mounts        map[string]arvados.Mount
62         secretMounts  map[string]arvados.Mount
63         logger        printfer
64
65         dirs     []string
66         files    []filetodo
67         manifest string
68
69         manifestCache map[string]*manifest.Manifest
70 }
71
72 // Copy copies data as needed, and returns a new manifest.
73 func (cp *copier) Copy() (string, error) {
74         err := cp.walkMount("", cp.ctrOutputDir, limitFollowSymlinks, true)
75         if err != nil {
76                 return "", fmt.Errorf("error scanning files to copy to output: %v", err)
77         }
78         collfs, err := (&arvados.Collection{ManifestText: cp.manifest}).FileSystem(cp.client, cp.keepClient)
79         if err != nil {
80                 return "", fmt.Errorf("error creating Collection.FileSystem: %v", err)
81         }
82         err = cp.applyGlobsToCollectionFS(collfs)
83         if err != nil {
84                 return "", fmt.Errorf("error while removing non-matching files from output collection: %w", err)
85         }
86         cp.applyGlobsToFilesAndDirs()
87         for _, d := range cp.dirs {
88                 err = collfs.Mkdir(d, 0777)
89                 if err != nil && err != os.ErrExist {
90                         return "", fmt.Errorf("error making directory %q in output collection: %v", d, err)
91                 }
92         }
93         var unflushed int64
94         var lastparentdir string
95         for _, f := range cp.files {
96                 // If a dir has just had its last file added, do a
97                 // full Flush. Otherwise, do a partial Flush (write
98                 // full-size blocks, but leave the last short block
99                 // open so f's data can be packed with it).
100                 dir, _ := filepath.Split(f.dst)
101                 if dir != lastparentdir || unflushed > keepclient.BLOCKSIZE {
102                         if err := collfs.Flush("/"+lastparentdir, dir != lastparentdir); err != nil {
103                                 return "", fmt.Errorf("error flushing output collection file data: %v", err)
104                         }
105                         unflushed = 0
106                 }
107                 lastparentdir = dir
108
109                 n, err := cp.copyFile(collfs, f)
110                 if err != nil {
111                         return "", fmt.Errorf("error copying file %q into output collection: %v", f, err)
112                 }
113                 unflushed += n
114         }
115         return collfs.MarshalManifest(".")
116 }
117
118 func (cp *copier) matchGlobs(path string, isDir bool) bool {
119         // An entry in the top level of the output directory looks
120         // like "/foo", but globs look like "foo", so we strip the
121         // leading "/" before matching.
122         path = strings.TrimLeft(path, "/")
123         for _, glob := range cp.globs {
124                 if !isDir && strings.HasSuffix(glob, "/**") {
125                         // doublestar.Match("f*/**", "ff") and
126                         // doublestar.Match("f*/**", "ff/gg") both
127                         // return true, but (to be compatible with
128                         // bash shopt) "ff" should match only if it is
129                         // a directory.
130                         //
131                         // To avoid errant matches, we add the file's
132                         // basename to the end of the pattern:
133                         //
134                         // Match("f*/**/ff", "ff") => false
135                         // Match("f*/**/gg", "ff/gg") => true
136                         //
137                         // Of course, we need to escape basename in
138                         // case it contains *, ?, \, etc.
139                         _, name := filepath.Split(path)
140                         escapedName := strings.TrimSuffix(strings.Replace(name, "", "\\", -1), "\\")
141                         if match, _ := doublestar.Match(glob+"/"+escapedName, path); match {
142                                 return true
143                         }
144                 } else if match, _ := doublestar.Match(glob, path); match {
145                         return true
146                 } else if isDir {
147                         // Workaround doublestar bug (v4.6.1).
148                         // "foo*/**" should match "foo", but does not,
149                         // because isZeroLengthPattern does not accept
150                         // "*/**" as a zero length pattern.
151                         if trunc := strings.TrimSuffix(glob, "*/**"); trunc != glob {
152                                 if match, _ := doublestar.Match(trunc, path); match {
153                                         return true
154                                 }
155                         }
156                 }
157         }
158         return false
159 }
160
161 // Delete entries from cp.files that do not match cp.globs.
162 //
163 // Delete entries from cp.dirs that do not match cp.globs.
164 //
165 // Ensure parent/ancestor directories of remaining cp.files and
166 // cp.dirs entries are still present in cp.dirs, even if they do not
167 // match cp.globs themselves.
168 func (cp *copier) applyGlobsToFilesAndDirs() {
169         if len(cp.globs) == 0 {
170                 return
171         }
172         keepdirs := make(map[string]bool)
173         for _, path := range cp.dirs {
174                 if cp.matchGlobs(path, true) {
175                         keepdirs[path] = true
176                 }
177         }
178         for path := range keepdirs {
179                 for i, c := range path {
180                         if i > 0 && c == '/' {
181                                 keepdirs[path[:i]] = true
182                         }
183                 }
184         }
185         var keepfiles []filetodo
186         for _, file := range cp.files {
187                 if cp.matchGlobs(file.dst, false) {
188                         keepfiles = append(keepfiles, file)
189                 }
190         }
191         for _, file := range keepfiles {
192                 for i, c := range file.dst {
193                         if i > 0 && c == '/' {
194                                 keepdirs[file.dst[:i]] = true
195                         }
196                 }
197         }
198         cp.dirs = nil
199         for path := range keepdirs {
200                 cp.dirs = append(cp.dirs, path)
201         }
202         sort.Strings(cp.dirs)
203         cp.files = keepfiles
204 }
205
206 // Delete files in collfs that do not match cp.globs.  Also delete
207 // directories that are empty (after deleting non-matching files) and
208 // do not match cp.globs themselves.
209 func (cp *copier) applyGlobsToCollectionFS(collfs arvados.CollectionFileSystem) error {
210         if len(cp.globs) == 0 {
211                 return nil
212         }
213         include := make(map[string]bool)
214         err := fs.WalkDir(arvados.FS(collfs), "", func(path string, ent fs.DirEntry, err error) error {
215                 if cp.matchGlobs(path, ent.IsDir()) {
216                         for i, c := range path {
217                                 if i > 0 && c == '/' {
218                                         include[path[:i]] = true
219                                 }
220                         }
221                         include[path] = true
222                 }
223                 return nil
224         })
225         if err != nil {
226                 return err
227         }
228         err = fs.WalkDir(arvados.FS(collfs), "", func(path string, ent fs.DirEntry, err error) error {
229                 if err != nil || path == "" {
230                         return err
231                 }
232                 if !include[path] {
233                         err := collfs.RemoveAll(path)
234                         if err != nil {
235                                 return err
236                         }
237                         if ent.IsDir() {
238                                 return fs.SkipDir
239                         }
240                 }
241                 return nil
242         })
243         return err
244 }
245
246 // Return true if it's possible for any descendant of the given path
247 // to match anything in cp.globs.  Used by walkMount to avoid loading
248 // collections that are mounted underneath ctrOutputPath but excluded
249 // by globs.
250 func (cp *copier) subtreeCouldMatch(path string) bool {
251         if len(cp.globs) == 0 {
252                 return true
253         }
254         pathdepth := 1 + strings.Count(path, "/")
255         for _, glob := range cp.globs {
256                 globdepth := 0
257                 lastsep := 0
258                 for i, c := range glob {
259                         if c != '/' || !doublestar.ValidatePattern(glob[:i]) {
260                                 // Escaped "/", or "/" in a character
261                                 // class, is not a path separator.
262                                 continue
263                         }
264                         if glob[lastsep:i] == "**" {
265                                 return true
266                         }
267                         lastsep = i + 1
268                         if globdepth++; globdepth == pathdepth {
269                                 if match, _ := doublestar.Match(glob[:i]+"/*", path+"/z"); match {
270                                         return true
271                                 }
272                                 break
273                         }
274                 }
275                 if globdepth < pathdepth && glob[lastsep:] == "**" {
276                         return true
277                 }
278         }
279         return false
280 }
281
282 func (cp *copier) copyFile(fs arvados.CollectionFileSystem, f filetodo) (int64, error) {
283         cp.logger.Printf("copying %q (%d bytes)", strings.TrimLeft(f.dst, "/"), f.size)
284         dst, err := fs.OpenFile(f.dst, os.O_CREATE|os.O_WRONLY, 0666)
285         if err != nil {
286                 return 0, err
287         }
288         src, err := os.Open(f.src)
289         if err != nil {
290                 dst.Close()
291                 return 0, err
292         }
293         defer src.Close()
294         n, err := io.Copy(dst, src)
295         if err != nil {
296                 dst.Close()
297                 return n, err
298         }
299         return n, dst.Close()
300 }
301
302 // Append to cp.manifest, cp.files, and cp.dirs so as to copy src (an
303 // absolute path in the container's filesystem) to dest (an absolute
304 // path in the output collection, or "" for output root).
305 //
306 // src must be (or be a descendant of) a readonly "collection" mount,
307 // a writable collection mounted at ctrOutputPath, or a "tmp" mount.
308 //
309 // If walkMountsBelow is true, include contents of any collection
310 // mounted below src as well.
311 func (cp *copier) walkMount(dest, src string, maxSymlinks int, walkMountsBelow bool) error {
312         // srcRoot, srcMount indicate the innermost mount that
313         // contains src.
314         var srcRoot string
315         var srcMount arvados.Mount
316         for root, mnt := range cp.mounts {
317                 if len(root) > len(srcRoot) && strings.HasPrefix(src+"/", root+"/") {
318                         srcRoot, srcMount = root, mnt
319                 }
320         }
321         for root := range cp.secretMounts {
322                 if len(root) > len(srcRoot) && strings.HasPrefix(src+"/", root+"/") {
323                         // Silently omit secrets, and symlinks to
324                         // secrets.
325                         return nil
326                 }
327         }
328         if srcRoot == "" {
329                 return fmt.Errorf("cannot output file %q: not in any mount", src)
330         }
331
332         // srcRelPath is the path to the file/dir we are trying to
333         // copy, relative to its mount point -- ".", "./foo.txt", ...
334         srcRelPath := filepath.Join(".", srcMount.Path, src[len(srcRoot):])
335
336         // outputRelPath is the destination path relative to the
337         // output directory. Used for logging and glob matching.
338         var outputRelPath = ""
339         if strings.HasPrefix(src, cp.ctrOutputDir) {
340                 outputRelPath = strings.TrimPrefix(src[len(cp.ctrOutputDir):], "/")
341         }
342         if outputRelPath == "" {
343                 // blank means copy a whole directory, so replace it
344                 // with a wildcard to make it a little clearer what's
345                 // going on since outputRelPath is only used for logging
346                 outputRelPath = "*"
347         }
348
349         switch {
350         case srcMount.ExcludeFromOutput:
351         case outputRelPath != "*" && !cp.subtreeCouldMatch(outputRelPath):
352                 cp.logger.Printf("not copying %q because contents cannot match output globs", outputRelPath)
353                 return nil
354         case srcMount.Kind == "tmp":
355                 // Handle by walking the host filesystem.
356                 return cp.walkHostFS(dest, src, maxSymlinks, walkMountsBelow)
357         case srcMount.Kind != "collection":
358                 return fmt.Errorf("%q: unsupported mount %q in output (kind is %q)", src, srcRoot, srcMount.Kind)
359         case !srcMount.Writable:
360                 cp.logger.Printf("copying %q from %v/%v", outputRelPath, srcMount.PortableDataHash, strings.TrimPrefix(srcRelPath, "./"))
361                 mft, err := cp.getManifest(srcMount.PortableDataHash)
362                 if err != nil {
363                         return err
364                 }
365                 cp.manifest += mft.Extract(srcRelPath, dest).Text
366         default:
367                 cp.logger.Printf("copying %q", outputRelPath)
368                 hostRoot, err := cp.hostRoot(srcRoot)
369                 if err != nil {
370                         return err
371                 }
372                 f, err := os.Open(filepath.Join(hostRoot, ".arvados#collection"))
373                 if err != nil {
374                         return err
375                 }
376                 defer f.Close()
377                 var coll arvados.Collection
378                 err = json.NewDecoder(f).Decode(&coll)
379                 if err != nil {
380                         return err
381                 }
382                 mft := manifest.Manifest{Text: coll.ManifestText}
383                 cp.manifest += mft.Extract(srcRelPath, dest).Text
384         }
385         if walkMountsBelow {
386                 return cp.walkMountsBelow(dest, src)
387         }
388         return nil
389 }
390
391 func (cp *copier) walkMountsBelow(dest, src string) error {
392         for mnt, mntinfo := range cp.mounts {
393                 if !strings.HasPrefix(mnt, src+"/") {
394                         continue
395                 }
396                 if cp.copyRegularFiles(mntinfo) {
397                         // These got copied into the nearest parent
398                         // mount as regular files during setup, so
399                         // they get copied as regular files when we
400                         // process the parent. Output will reflect any
401                         // changes and deletions done by the
402                         // container.
403                         continue
404                 }
405                 // Example: we are processing dest=/foo src=/mnt1/dir1
406                 // (perhaps we followed a symlink /outdir/foo ->
407                 // /mnt1/dir1). Caller has already processed the
408                 // collection mounted at /mnt1, but now we find that
409                 // /mnt1/dir1/mnt2 is also a mount, so we need to copy
410                 // src=/mnt1/dir1/mnt2 to dest=/foo/mnt2.
411                 //
412                 // We handle all descendants of /mnt1/dir1 in this
413                 // loop instead of using recursion:
414                 // /mnt1/dir1/mnt2/mnt3 is a child of both /mnt1 and
415                 // /mnt1/dir1/mnt2, but we only want to walk it
416                 // once. (This simplification is safe because mounted
417                 // collections cannot contain symlinks.)
418                 err := cp.walkMount(dest+mnt[len(src):], mnt, 0, false)
419                 if err != nil {
420                         return err
421                 }
422         }
423         return nil
424 }
425
426 // Add entries to cp.dirs and cp.files so as to copy src (an absolute
427 // path in the container's filesystem which corresponds to a real file
428 // or directory in cp.hostOutputDir) to dest (an absolute path in the
429 // output collection, or "" for output root).
430 //
431 // Always follow symlinks.
432 //
433 // If includeMounts is true, include mounts at and below src.
434 // Otherwise, skip them.
435 func (cp *copier) walkHostFS(dest, src string, maxSymlinks int, includeMounts bool) error {
436         if includeMounts {
437                 err := cp.walkMountsBelow(dest, src)
438                 if err != nil {
439                         return err
440                 }
441         }
442
443         hostsrc := cp.hostOutputDir + src[len(cp.ctrOutputDir):]
444
445         // If src is a symlink, walk its target.
446         fi, err := os.Lstat(hostsrc)
447         if err != nil {
448                 return fmt.Errorf("lstat %q: %s", src, err)
449         }
450         if fi.Mode()&os.ModeSymlink != 0 {
451                 if maxSymlinks < 0 {
452                         return errTooManySymlinks
453                 }
454                 target, err := os.Readlink(hostsrc)
455                 if err != nil {
456                         return fmt.Errorf("readlink %q: %s", src, err)
457                 }
458                 if !strings.HasPrefix(target, "/") {
459                         target = filepath.Join(filepath.Dir(src), target)
460                 }
461                 return cp.walkMount(dest, target, maxSymlinks-1, true)
462         }
463
464         // If src is a regular directory, append it to cp.dirs and
465         // walk each of its children. (If there are no children,
466         // create an empty file "dest/.keep".)
467         if fi.Mode().IsDir() {
468                 if dest != "" {
469                         cp.dirs = append(cp.dirs, dest)
470                 }
471                 dir, err := os.Open(hostsrc)
472                 if err != nil {
473                         return fmt.Errorf("open %q: %s", src, err)
474                 }
475                 names, err := dir.Readdirnames(-1)
476                 dir.Close()
477                 if err != nil {
478                         return fmt.Errorf("readdirnames %q: %s", src, err)
479                 }
480                 if len(names) == 0 {
481                         if dest != "" {
482                                 cp.files = append(cp.files, filetodo{
483                                         src: os.DevNull,
484                                         dst: dest + "/.keep",
485                                 })
486                         }
487                         return nil
488                 }
489                 sort.Strings(names)
490                 for _, name := range names {
491                         dest, src := dest+"/"+name, src+"/"+name
492                         if _, isSecret := cp.secretMounts[src]; isSecret {
493                                 continue
494                         }
495                         if mntinfo, isMount := cp.mounts[src]; isMount && !cp.copyRegularFiles(mntinfo) {
496                                 // If a regular file/dir somehow
497                                 // exists at a path that's also a
498                                 // mount target, ignore the file --
499                                 // the mount has already been included
500                                 // with walkMountsBelow().
501                                 //
502                                 // (...except mount types that are
503                                 // handled as regular files.)
504                                 continue
505                         } else if isMount && !cp.subtreeCouldMatch(src[len(cp.ctrOutputDir)+1:]) {
506                                 continue
507                         }
508                         err = cp.walkHostFS(dest, src, maxSymlinks, false)
509                         if err != nil {
510                                 return err
511                         }
512                 }
513                 return nil
514         }
515
516         // If src is a regular file, append it to cp.files.
517         if fi.Mode().IsRegular() {
518                 cp.files = append(cp.files, filetodo{
519                         src:  hostsrc,
520                         dst:  dest,
521                         size: fi.Size(),
522                 })
523                 return nil
524         }
525         cp.logger.Printf("Skipping unsupported file type (mode %o) in output dir: %q", fi.Mode(), src)
526         return nil
527 }
528
529 // Return the host path that was mounted at the given path in the
530 // container.
531 func (cp *copier) hostRoot(ctrRoot string) (string, error) {
532         if ctrRoot == cp.ctrOutputDir {
533                 return cp.hostOutputDir, nil
534         }
535         if mnt, ok := cp.bindmounts[ctrRoot]; ok {
536                 return mnt.HostPath, nil
537         }
538         return "", fmt.Errorf("not bind-mounted: %q", ctrRoot)
539 }
540
541 func (cp *copier) copyRegularFiles(m arvados.Mount) bool {
542         return m.Kind == "text" || m.Kind == "json" || (m.Kind == "collection" && m.Writable)
543 }
544
545 func (cp *copier) getManifest(pdh string) (*manifest.Manifest, error) {
546         if mft, ok := cp.manifestCache[pdh]; ok {
547                 return mft, nil
548         }
549         var coll arvados.Collection
550         err := cp.client.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+pdh, nil, nil)
551         if err != nil {
552                 return nil, fmt.Errorf("error retrieving collection record for %q: %s", pdh, err)
553         }
554         mft := &manifest.Manifest{Text: coll.ManifestText}
555         if cp.manifestCache == nil {
556                 cp.manifestCache = map[string]*manifest.Manifest{pdh: mft}
557         } else {
558                 cp.manifestCache[pdh] = mft
559         }
560         return mft, nil
561 }