1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
18 "git.arvados.org/arvados.git/sdk/go/arvados"
19 "git.arvados.org/arvados.git/sdk/go/keepclient"
20 "github.com/bmatcuk/doublestar/v4"
// printfer is the only logging capability the copier needs: a
// fmt-style Printf method (for example, *log.Logger provides one).
type printfer interface {
	Printf(format string, args ...interface{})
}
// limitFollowSymlinks is the symlink budget Copy starts with when it
// walks the output directory; each symlink followed uses up one.
const limitFollowSymlinks = 10

// errTooManySymlinks is returned when that budget is exhausted, which
// is also how symlink cycles are detected.
var errTooManySymlinks error = errors.New("too many symlinks, or symlink cycle")
// filetodo is one pending copy of a file from the host filesystem into
// the output collection.
type filetodo struct {
	src  string // host path to read (may be os.DevNull for ".keep" placeholders)
	dst  string // destination path in the output collection
	size int64  // size in bytes, reported in the "copying" log message
}
37 // copier copies data from a finished container's output path to a new
38 // Arvados collection.
40 // Regular files (and symlinks to regular files) in hostOutputDir are
41 // copied from the local filesystem.
43 // Symlinks to mounted collections, and any collections mounted under
44 // ctrOutputDir, are copied by reference, without moving any data
47 // Symlinks to other parts of the container's filesystem result in
52 // manifest, err := (&copier{...}).Copy()
54 client *arvados.Client
55 keepClient IKeepClient
59 bindmounts map[string]bindmount
60 mounts map[string]arvados.Mount
61 secretMounts map[string]arvados.Mount
66 staged arvados.CollectionFileSystem
68 manifestCache map[string]string
70 // tmpfs is the filesystem representation of the source
71 // collection that was most recently handled in
72 // copyFromCollection. This improves performance slightly in
73 // the special case where many mounts reference the same
75 tmpfs arvados.CollectionFileSystem
76 tmpfsManifestText string
79 // Copy copies data as needed, and returns a new manifest.
81 // Copy should not be called more than once.
82 func (cp *copier) Copy() (string, error) {
84 cp.staged, err = (&arvados.Collection{}).FileSystem(cp.client, cp.keepClient)
86 return "", fmt.Errorf("error creating Collection.FileSystem: %v", err)
88 err = cp.walkMount("", cp.ctrOutputDir, limitFollowSymlinks, true)
90 return "", fmt.Errorf("error scanning files to copy to output: %v", err)
93 // Remove files/dirs that don't match globs (the files/dirs
94 // that were added during cp.walkMount() by copying subtree
95 // manifests into cp.staged).
96 err = cp.applyGlobsToStaged()
98 return "", fmt.Errorf("error while removing non-matching files from output collection: %w", err)
100 // Remove files/dirs that don't match globs (the files/dirs
101 // that are stored on the local filesystem and would need to
102 // be copied in copyFile() below).
103 cp.applyGlobsToFilesAndDirs()
104 for _, d := range cp.dirs {
105 err = cp.staged.Mkdir(d, 0777)
106 if err != nil && err != os.ErrExist {
107 return "", fmt.Errorf("error making directory %q in output collection: %v", d, err)
112 var lastparentdir string
113 for _, f := range cp.files {
114 // If a dir has just had its last file added, do a
115 // full Flush. Otherwise, do a partial Flush (write
116 // full-size blocks, but leave the last short block
117 // open so f's data can be packed with it).
118 dir, _ := filepath.Split(f.dst)
119 if dir != lastparentdir || unflushed > keepclient.BLOCKSIZE {
120 if err := cp.staged.Flush("/"+lastparentdir, dir != lastparentdir); err != nil {
121 return "", fmt.Errorf("error flushing output collection file data: %v", err)
127 n, err := cp.copyFile(cp.staged, f)
129 return "", fmt.Errorf("error copying file %q into output collection: %v", f, err)
133 return cp.staged.MarshalManifest(".")
136 func (cp *copier) matchGlobs(path string, isDir bool) bool {
137 // An entry in the top level of the output directory looks
138 // like "/foo", but globs look like "foo", so we strip the
139 // leading "/" before matching.
140 path = strings.TrimLeft(path, "/")
141 for _, glob := range cp.globs {
142 if !isDir && strings.HasSuffix(glob, "/**") {
143 // doublestar.Match("f*/**", "ff") and
144 // doublestar.Match("f*/**", "ff/gg") both
145 // return true, but (to be compatible with
146 // bash shopt) "ff" should match only if it is
149 // To avoid errant matches, we add the file's
150 // basename to the end of the pattern:
152 // Match("f*/**/ff", "ff") => false
153 // Match("f*/**/gg", "ff/gg") => true
155 // Of course, we need to escape basename in
156 // case it contains *, ?, \, etc.
157 _, name := filepath.Split(path)
158 escapedName := strings.TrimSuffix(strings.Replace(name, "", "\\", -1), "\\")
159 if match, _ := doublestar.Match(glob+"/"+escapedName, path); match {
162 } else if match, _ := doublestar.Match(glob, path); match {
165 // Workaround doublestar bug (v4.6.1).
166 // "foo*/**" should match "foo", but does not,
167 // because isZeroLengthPattern does not accept
168 // "*/**" as a zero length pattern.
169 if trunc := strings.TrimSuffix(glob, "*/**"); trunc != glob {
170 if match, _ := doublestar.Match(trunc, path); match {
179 // Delete entries from cp.files that do not match cp.globs.
181 // Delete entries from cp.dirs that do not match cp.globs.
183 // Ensure parent/ancestor directories of remaining cp.files and
184 // cp.dirs entries are still present in cp.dirs, even if they do not
185 // match cp.globs themselves.
186 func (cp *copier) applyGlobsToFilesAndDirs() {
187 if len(cp.globs) == 0 {
190 keepdirs := make(map[string]bool)
191 for _, path := range cp.dirs {
192 if cp.matchGlobs(path, true) {
193 keepdirs[path] = true
196 for path := range keepdirs {
197 for i, c := range path {
198 if i > 0 && c == '/' {
199 keepdirs[path[:i]] = true
203 var keepfiles []filetodo
204 for _, file := range cp.files {
205 if cp.matchGlobs(file.dst, false) {
206 keepfiles = append(keepfiles, file)
209 for _, file := range keepfiles {
210 for i, c := range file.dst {
211 if i > 0 && c == '/' {
212 keepdirs[file.dst[:i]] = true
217 for path := range keepdirs {
218 cp.dirs = append(cp.dirs, path)
220 sort.Strings(cp.dirs)
224 // Delete files in cp.staged that do not match cp.globs. Also delete
225 // directories that are empty (after deleting non-matching files) and
226 // do not match cp.globs themselves.
227 func (cp *copier) applyGlobsToStaged() error {
228 if len(cp.globs) == 0 {
231 include := make(map[string]bool)
232 err := fs.WalkDir(arvados.FS(cp.staged), "", func(path string, ent fs.DirEntry, err error) error {
233 if cp.matchGlobs(path, ent.IsDir()) {
234 for i, c := range path {
235 if i > 0 && c == '/' {
236 include[path[:i]] = true
246 err = fs.WalkDir(arvados.FS(cp.staged), "", func(path string, ent fs.DirEntry, err error) error {
247 if err != nil || path == "" {
251 err := cp.staged.RemoveAll(path)
264 // Return true if it's possible for any descendant of the given path
265 // to match anything in cp.globs. Used by walkMount to avoid loading
266 // collections that are mounted underneath ctrOutputPath but excluded
268 func (cp *copier) subtreeCouldMatch(path string) bool {
269 if len(cp.globs) == 0 {
272 pathdepth := 1 + strings.Count(path, "/")
273 for _, glob := range cp.globs {
276 for i, c := range glob {
277 if c != '/' || !doublestar.ValidatePattern(glob[:i]) {
278 // Escaped "/", or "/" in a character
279 // class, is not a path separator.
282 if glob[lastsep:i] == "**" {
286 if globdepth++; globdepth == pathdepth {
287 if match, _ := doublestar.Match(glob[:i]+"/*", path+"/z"); match {
293 if globdepth < pathdepth && glob[lastsep:] == "**" {
300 func (cp *copier) copyFile(fs arvados.CollectionFileSystem, f filetodo) (int64, error) {
301 cp.logger.Printf("copying %q (%d bytes)", strings.TrimLeft(f.dst, "/"), f.size)
302 dst, err := fs.OpenFile(f.dst, os.O_CREATE|os.O_WRONLY, 0666)
306 src, err := os.Open(f.src)
312 n, err := io.Copy(dst, src)
317 return n, dst.Close()
320 // Add to cp.staged, cp.files, and cp.dirs so as to copy src (an
321 // absolute path in the container's filesystem) to dest (an absolute
322 // path in the output collection, or "" for output root).
324 // src must be (or be a descendant of) a readonly "collection" mount,
325 // a writable collection mounted at ctrOutputPath, or a "tmp" mount.
327 // If walkMountsBelow is true, include contents of any collection
328 // mounted below src as well.
329 func (cp *copier) walkMount(dest, src string, maxSymlinks int, walkMountsBelow bool) error {
330 // srcRoot, srcMount indicate the innermost mount that
333 var srcMount arvados.Mount
334 for root, mnt := range cp.mounts {
335 if len(root) > len(srcRoot) && strings.HasPrefix(src+"/", root+"/") {
336 srcRoot, srcMount = root, mnt
339 for root := range cp.secretMounts {
340 if len(root) > len(srcRoot) && strings.HasPrefix(src+"/", root+"/") {
341 // Silently omit secrets, and symlinks to
347 return fmt.Errorf("cannot output file %q: not in any mount", src)
350 // srcRelPath is the path to the file/dir we are trying to
351 // copy, relative to its mount point -- ".", "./foo.txt", ...
352 srcRelPath := filepath.Join(".", srcMount.Path, src[len(srcRoot):])
354 // outputRelPath is the destination path relative to the
355 // output directory. Used for logging and glob matching.
356 var outputRelPath = ""
357 if strings.HasPrefix(src, cp.ctrOutputDir) {
358 outputRelPath = strings.TrimPrefix(src[len(cp.ctrOutputDir):], "/")
360 if outputRelPath == "" {
361 // blank means copy a whole directory, so replace it
362 // with a wildcard to make it a little clearer what's
363 // going on since outputRelPath is only used for logging
368 case srcMount.ExcludeFromOutput:
369 case outputRelPath != "*" && !cp.subtreeCouldMatch(outputRelPath):
370 cp.logger.Printf("not copying %q because contents cannot match output globs", outputRelPath)
372 case srcMount.Kind == "tmp":
373 // Handle by walking the host filesystem.
374 return cp.walkHostFS(dest, src, maxSymlinks, walkMountsBelow)
375 case srcMount.Kind != "collection":
376 return fmt.Errorf("%q: unsupported mount %q in output (kind is %q)", src, srcRoot, srcMount.Kind)
377 case !srcMount.Writable:
378 cp.logger.Printf("copying %q from %v/%v", outputRelPath, srcMount.PortableDataHash, strings.TrimPrefix(srcRelPath, "./"))
379 mft, err := cp.getManifest(srcMount.PortableDataHash)
383 err = cp.copyFromCollection(dest, &arvados.Collection{ManifestText: mft}, srcRelPath)
388 cp.logger.Printf("copying %q", outputRelPath)
389 hostRoot, err := cp.hostRoot(srcRoot)
393 f, err := os.Open(filepath.Join(hostRoot, ".arvados#collection"))
398 var coll arvados.Collection
399 err = json.NewDecoder(f).Decode(&coll)
403 err = cp.copyFromCollection(dest, &coll, srcRelPath)
409 cp.tmpfsManifestText = ""
411 return cp.walkMountsBelow(dest, src)
416 func (cp *copier) copyFromCollection(dest string, coll *arvados.Collection, srcRelPath string) error {
417 if coll.ManifestText == "" || coll.ManifestText != cp.tmpfsManifestText {
418 tmpfs, err := coll.FileSystem(cp.client, cp.keepClient)
423 cp.tmpfsManifestText = coll.ManifestText
425 snap, err := arvados.Snapshot(cp.tmpfs, srcRelPath)
429 // Create ancestors of dest, if necessary.
430 for i, c := range dest {
431 if i > 0 && c == '/' {
432 err = cp.staged.Mkdir(dest[:i], 0777)
433 if err != nil && !os.IsExist(err) {
438 return arvados.Splice(cp.staged, dest, snap)
441 func (cp *copier) walkMountsBelow(dest, src string) error {
442 for mnt, mntinfo := range cp.mounts {
443 if !strings.HasPrefix(mnt, src+"/") {
446 if cp.copyRegularFiles(mntinfo) {
447 // These got copied into the nearest parent
448 // mount as regular files during setup, so
449 // they get copied as regular files when we
450 // process the parent. Output will reflect any
451 // changes and deletions done by the
455 // Example: we are processing dest=/foo src=/mnt1/dir1
456 // (perhaps we followed a symlink /outdir/foo ->
457 // /mnt1/dir1). Caller has already processed the
458 // collection mounted at /mnt1, but now we find that
459 // /mnt1/dir1/mnt2 is also a mount, so we need to copy
460 // src=/mnt1/dir1/mnt2 to dest=/foo/mnt2.
462 // We handle all descendants of /mnt1/dir1 in this
463 // loop instead of using recursion:
464 // /mnt1/dir1/mnt2/mnt3 is a child of both /mnt1 and
465 // /mnt1/dir1/mnt2, but we only want to walk it
466 // once. (This simplification is safe because mounted
467 // collections cannot contain symlinks.)
468 err := cp.walkMount(dest+mnt[len(src):], mnt, 0, false)
476 // Add entries to cp.dirs and cp.files so as to copy src (an absolute
477 // path in the container's filesystem which corresponds to a real file
478 // or directory in cp.hostOutputDir) to dest (an absolute path in the
479 // output collection, or "" for output root).
481 // Always follow symlinks.
483 // If includeMounts is true, include mounts at and below src.
484 // Otherwise, skip them.
485 func (cp *copier) walkHostFS(dest, src string, maxSymlinks int, includeMounts bool) error {
487 err := cp.walkMountsBelow(dest, src)
493 hostsrc := cp.hostOutputDir + src[len(cp.ctrOutputDir):]
495 // If src is a symlink, walk its target.
496 fi, err := os.Lstat(hostsrc)
498 return fmt.Errorf("lstat %q: %s", src, err)
500 if fi.Mode()&os.ModeSymlink != 0 {
502 return errTooManySymlinks
504 target, err := os.Readlink(hostsrc)
506 return fmt.Errorf("readlink %q: %s", src, err)
508 if !strings.HasPrefix(target, "/") {
509 target = filepath.Join(filepath.Dir(src), target)
511 return cp.walkMount(dest, target, maxSymlinks-1, true)
514 // If src is a regular directory, append it to cp.dirs and
515 // walk each of its children. (If there are no children,
516 // create an empty file "dest/.keep".)
517 if fi.Mode().IsDir() {
519 cp.dirs = append(cp.dirs, dest)
521 dir, err := os.Open(hostsrc)
523 return fmt.Errorf("open %q: %s", src, err)
525 names, err := dir.Readdirnames(-1)
528 return fmt.Errorf("readdirnames %q: %s", src, err)
532 cp.files = append(cp.files, filetodo{
534 dst: dest + "/.keep",
540 for _, name := range names {
541 dest, src := dest+"/"+name, src+"/"+name
542 if _, isSecret := cp.secretMounts[src]; isSecret {
545 if mntinfo, isMount := cp.mounts[src]; isMount && !cp.copyRegularFiles(mntinfo) {
546 // If a regular file/dir somehow
547 // exists at a path that's also a
548 // mount target, ignore the file --
549 // the mount has already been included
550 // with walkMountsBelow().
552 // (...except mount types that are
553 // handled as regular files.)
555 } else if isMount && !cp.subtreeCouldMatch(src[len(cp.ctrOutputDir)+1:]) {
558 err = cp.walkHostFS(dest, src, maxSymlinks, false)
566 // If src is a regular file, append it to cp.files.
567 if fi.Mode().IsRegular() {
568 cp.files = append(cp.files, filetodo{
575 cp.logger.Printf("Skipping unsupported file type (mode %o) in output dir: %q", fi.Mode(), src)
579 // Return the host path that was mounted at the given path in the
581 func (cp *copier) hostRoot(ctrRoot string) (string, error) {
582 if ctrRoot == cp.ctrOutputDir {
583 return cp.hostOutputDir, nil
585 if mnt, ok := cp.bindmounts[ctrRoot]; ok {
586 return mnt.HostPath, nil
588 return "", fmt.Errorf("not bind-mounted: %q", ctrRoot)
591 func (cp *copier) copyRegularFiles(m arvados.Mount) bool {
592 return m.Kind == "text" || m.Kind == "json" || (m.Kind == "collection" && m.Writable)
595 func (cp *copier) getManifest(pdh string) (string, error) {
596 if mft, ok := cp.manifestCache[pdh]; ok {
599 var coll arvados.Collection
600 err := cp.client.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+pdh, nil, nil)
602 return "", fmt.Errorf("error retrieving collection record for %q: %s", pdh, err)
604 if cp.manifestCache == nil {
605 cp.manifestCache = make(map[string]string)
607 cp.manifestCache[pdh] = coll.ManifestText
608 return coll.ManifestText, nil