// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"sort"
	"strings"

	"git.curoverse.com/arvados.git/sdk/go/arvados"
	"git.curoverse.com/arvados.git/sdk/go/manifest"
)

// printfer is the logging interface a copier needs: anything with a
// Printf method (a *log.Logger satisfies it).
type printfer interface {
	Printf(string, ...interface{})
}

// errTooManySymlinks is returned by walkHostFS when it encounters a
// symlink after the symlink budget has been used up.
var errTooManySymlinks = errors.New("too many symlinks, or symlink cycle")

// limitFollowSymlinks is the symlink budget Copy passes to
// walkMount. Each symlink followed by walkHostFS decrements it by
// one.
const limitFollowSymlinks = 10

// filetodo describes one file that has to be copied from the host
// filesystem into the output collection.
type filetodo struct {
	src  string // path of the file to read on the host filesystem
	dst  string // path of the file to create in the output collection
	size int64  // size of src in bytes, as reported by Lstat (logged by copyFile)
}

// copier copies data from a finished container's output path to a new
// Arvados collection.
//
// Regular files (and symlinks to regular files) in hostOutputDir are
// copied from the local filesystem.
//
// Symlinks to mounted collections, and any collections mounted under
// ctrOutputDir, are copied by transforming the relevant parts of the
// existing manifests, without moving any data around.
//
// Symlinks to other parts of the container's filesystem result in
// errors.
//
// To use a copier: fill in the client, directory, mount and logger
// fields, then call Copy(). Copy() first calls walkMount() to
// inspect the output directory, grab the necessary parts of
// already-stored collections, and prepare a list of files that need
// to be copied from the local filesystem; it then copies the file
// data and returns a complete output manifest.
type copier struct {
	// client and keepClient are handed to the collection
	// filesystem that Copy builds from the accumulated manifest.
	client *arvados.Client

	// arvClient is used by getManifest to retrieve collection
	// records by portable data hash.
	arvClient IArvadosClient

	keepClient IKeepClient

	// hostOutputDir is the host-side directory that holds the
	// content the container sees at ctrOutputDir.
	hostOutputDir string

	// ctrOutputDir is the output directory path as seen inside
	// the container.
	ctrOutputDir string

	// mounts maps container paths to the mounts located there.
	mounts map[string]arvados.Mount

	// secretMounts maps container paths to secret mounts. Anything
	// at or below one of these paths is silently left out of the
	// output.
	secretMounts map[string]arvados.Mount

	logger printfer

	// dirs lists output collection directories to create, in the
	// order walkHostFS discovered them.
	dirs []string

	// files lists the files to copy from the host filesystem.
	files []filetodo

	// manifest accumulates manifest text extracted from existing
	// collections.
	manifest string

	// manifestCache holds manifests already retrieved by
	// getManifest, keyed by portable data hash. It is allocated
	// on first use.
	manifestCache map[string]*manifest.Manifest
}

// Copy copies data as needed, and returns a new manifest.
func (cp *copier) Copy() (string, error) { err := cp.walkMount("", cp.ctrOutputDir, limitFollowSymlinks, true) if err != nil { return "", err } fs, err := (&arvados.Collection{ManifestText: cp.manifest}).FileSystem(cp.client, cp.keepClient) if err != nil { return "", err } for _, d := range cp.dirs { err = fs.Mkdir(d, 0777) if err != nil { return "", err } } for _, f := range cp.files { err = cp.copyFile(fs, f) if err != nil { return "", err } } return fs.MarshalManifest(".") } func (cp *copier) copyFile(fs arvados.CollectionFileSystem, f filetodo) error { cp.logger.Printf("copying %q (%d bytes)", f.dst, f.size) dst, err := fs.OpenFile(f.dst, os.O_CREATE|os.O_WRONLY, 0666) if err != nil { return err } src, err := os.Open(f.src) if err != nil { dst.Close() return err } defer src.Close() _, err = io.Copy(dst, src) if err != nil { dst.Close() return err } return dst.Close() } // Append to cp.manifest, cp.files, and cp.dirs so as to copy src (an // absolute path in the container's filesystem) to dest (an absolute // path in the output collection, or "" for output root). // // src must be (or be a descendant of) a readonly "collection" mount, // a writable collection mounted at ctrOutputPath, or a "tmp" mount. // // If walkMountsBelow is true, include contents of any collection // mounted below src as well. func (cp *copier) walkMount(dest, src string, maxSymlinks int, walkMountsBelow bool) error { // srcRoot, srcMount indicate the innermost mount that // contains src. var srcRoot string var srcMount arvados.Mount for root, mnt := range cp.mounts { if len(root) > len(srcRoot) && strings.HasPrefix(src+"/", root+"/") { srcRoot, srcMount = root, mnt } } for root := range cp.secretMounts { if len(root) > len(srcRoot) && strings.HasPrefix(src+"/", root+"/") { // Silently omit secrets, and symlinks to // secrets. 
return nil } } if srcRoot == "" { return fmt.Errorf("cannot output file %q: not in any mount", src) } // srcRelPath is the path to the file/dir we are trying to // copy, relative to its mount point -- ".", "./foo.txt", ... srcRelPath := filepath.Join(".", srcMount.Path, src[len(srcRoot):]) switch { case srcMount.ExcludeFromOutput: case srcMount.Kind == "tmp": // Handle by walking the host filesystem. return cp.walkHostFS(dest, src, maxSymlinks, walkMountsBelow) case srcMount.Kind != "collection": return fmt.Errorf("%q: unsupported mount %q in output (kind is %q)", src, srcRoot, srcMount.Kind) case !srcMount.Writable: mft, err := cp.getManifest(srcMount.PortableDataHash) if err != nil { return err } cp.manifest += mft.Extract(srcRelPath, dest).Text case srcRoot == cp.ctrOutputDir: f, err := os.Open(filepath.Join(cp.hostOutputDir, ".arvados#collection")) if err != nil { return err } defer f.Close() var coll arvados.Collection err = json.NewDecoder(f).Decode(&coll) if err != nil { return err } mft := manifest.Manifest{Text: coll.ManifestText} cp.manifest += mft.Extract(srcRelPath, dest).Text default: return fmt.Errorf("cannot output %q as %q: writable collection mounted at %q", src, dest, srcRoot) } if walkMountsBelow { return cp.walkMountsBelow(dest, src) } else { return nil } } func (cp *copier) walkMountsBelow(dest, src string) error { for mnt, mntinfo := range cp.mounts { if !strings.HasPrefix(mnt, src+"/") { continue } if mntinfo.Kind == "text" || mntinfo.Kind == "json" { // These got copied into the nearest parent // mount as regular files during setup, so // they get copied as regular files when we // process the parent. Output will reflect any // changes and deletions done by the // container. continue } // Example: we are processing dest=/foo src=/mnt1/dir1 // (perhaps we followed a symlink /outdir/foo -> // /mnt1/dir1). 
Caller has already processed the // collection mounted at /mnt1, but now we find that // /mnt1/dir1/mnt2 is also a mount, so we need to copy // src=/mnt1/dir1/mnt2 to dest=/foo/mnt2. // // We handle all descendants of /mnt1/dir1 in this // loop instead of using recursion: // /mnt1/dir1/mnt2/mnt3 is a child of both /mnt1 and // /mnt1/dir1/mnt2, but we only want to walk it // once. (This simplification is safe because mounted // collections cannot contain symlinks.) err := cp.walkMount(dest+mnt[len(src):], mnt, 0, false) if err != nil { return err } } return nil } // Add entries to cp.dirs and cp.files so as to copy src (an absolute // path in the container's filesystem which corresponds to a real file // or directory in cp.hostOutputDir) to dest (an absolute path in the // output collection, or "" for output root). // // Always follow symlinks. // // If includeMounts is true, include mounts at and below src. // Otherwise, skip them. func (cp *copier) walkHostFS(dest, src string, maxSymlinks int, includeMounts bool) error { if includeMounts { err := cp.walkMountsBelow(dest, src) if err != nil { return err } } hostsrc := cp.hostOutputDir + src[len(cp.ctrOutputDir):] // If src is a symlink, walk its target. fi, err := os.Lstat(hostsrc) if err != nil { return fmt.Errorf("lstat %q: %s", src, err) } if fi.Mode()&os.ModeSymlink != 0 { if maxSymlinks < 0 { return errTooManySymlinks } target, err := os.Readlink(hostsrc) if err != nil { return fmt.Errorf("readlink %q: %s", src, err) } if !strings.HasPrefix(target, "/") { target = filepath.Join(filepath.Dir(src), target) } return cp.walkMount(dest, target, maxSymlinks-1, true) } // If src is a regular directory, append it to cp.dirs and // walk each of its children. (If there are no children, // create an empty file "dest/.keep".) 
if fi.Mode().IsDir() { if dest != "" { cp.dirs = append(cp.dirs, dest) } dir, err := os.Open(hostsrc) if err != nil { return fmt.Errorf("open %q: %s", src, err) } names, err := dir.Readdirnames(-1) dir.Close() if err != nil { return fmt.Errorf("readdirnames %q: %s", src, err) } if len(names) == 0 { if dest != "" { cp.files = append(cp.files, filetodo{ src: os.DevNull, dst: dest + "/.keep", }) } return nil } sort.Strings(names) for _, name := range names { dest, src := dest+"/"+name, src+"/"+name if _, isSecret := cp.secretMounts[src]; isSecret { continue } if mntinfo, isMount := cp.mounts[src]; isMount && mntinfo.Kind != "text" && mntinfo.Kind != "json" { // If a regular file/dir somehow // exists at a path that's also a // mount target, ignore the file -- // the mount has already been included // with walkMountsBelow(). // // (...except json/text mounts, which // are handled as regular files.) continue } err = cp.walkHostFS(dest, src, maxSymlinks, false) if err != nil { return err } } return nil } // If src is a regular file, append it to cp.files. if fi.Mode().IsRegular() { cp.files = append(cp.files, filetodo{ src: hostsrc, dst: dest, size: fi.Size(), }) return nil } return fmt.Errorf("Unsupported file type (mode %o) in output dir: %q", fi.Mode(), src) } func (cp *copier) getManifest(pdh string) (*manifest.Manifest, error) { if mft, ok := cp.manifestCache[pdh]; ok { return mft, nil } var coll arvados.Collection err := cp.arvClient.Get("collections", pdh, nil, &coll) if err != nil { return nil, fmt.Errorf("error retrieving collection record for %q: %s", pdh, err) } mft := &manifest.Manifest{Text: coll.ManifestText} if cp.manifestCache == nil { cp.manifestCache = map[string]*manifest.Manifest{pdh: mft} } else { cp.manifestCache[pdh] = mft } return mft, nil }