X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/0b0b4c7b23e96a6efb3cfd88b0ba7224158e9544..e672f484160faae900fb7f7e281d06952fd35d28:/lib/crunchrun/copier.go diff --git a/lib/crunchrun/copier.go b/lib/crunchrun/copier.go index 72c714dfa4..b411948733 100644 --- a/lib/crunchrun/copier.go +++ b/lib/crunchrun/copier.go @@ -9,6 +9,7 @@ import ( "errors" "fmt" "io" + "io/fs" "os" "path/filepath" "sort" @@ -17,6 +18,7 @@ import ( "git.arvados.org/arvados.git/sdk/go/arvados" "git.arvados.org/arvados.git/sdk/go/keepclient" "git.arvados.org/arvados.git/sdk/go/manifest" + "github.com/bmatcuk/doublestar/v4" ) type printfer interface { @@ -51,10 +53,10 @@ type filetodo struct { // manifest, err := (&copier{...}).Copy() type copier struct { client *arvados.Client - arvClient IArvadosClient keepClient IKeepClient hostOutputDir string ctrOutputDir string + globs []string bindmounts map[string]bindmount mounts map[string]arvados.Mount secretMounts map[string]arvados.Mount @@ -73,16 +75,29 @@ func (cp *copier) Copy() (string, error) { if err != nil { return "", fmt.Errorf("error scanning files to copy to output: %v", err) } - fs, err := (&arvados.Collection{ManifestText: cp.manifest}).FileSystem(cp.client, cp.keepClient) + collfs, err := (&arvados.Collection{ManifestText: cp.manifest}).FileSystem(cp.client, cp.keepClient) if err != nil { return "", fmt.Errorf("error creating Collection.FileSystem: %v", err) } + + // Remove files/dirs that don't match globs (the ones that + // were added during cp.walkMount() by copying subtree + // manifests into cp.manifest). + err = cp.applyGlobsToCollectionFS(collfs) + if err != nil { + return "", fmt.Errorf("error while removing non-matching files from output collection: %w", err) + } + // Remove files/dirs that don't match globs (the ones that are + // stored on the local filesystem and would need to be copied + // in copyFile() below). 
+	cp.applyGlobsToFilesAndDirs()
 	for _, d := range cp.dirs {
-		err = fs.Mkdir(d, 0777)
+		err = collfs.Mkdir(d, 0777)
 		if err != nil && err != os.ErrExist {
 			return "", fmt.Errorf("error making directory %q in output collection: %v", d, err)
 		}
 	}
+
 	var unflushed int64
 	var lastparentdir string
 	for _, f := range cp.files {
@@ -92,24 +107,188 @@ func (cp *copier) Copy() (string, error) {
 		// open so f's data can be packed with it).
 		dir, _ := filepath.Split(f.dst)
 		if dir != lastparentdir || unflushed > keepclient.BLOCKSIZE {
-			if err := fs.Flush("/"+lastparentdir, dir != lastparentdir); err != nil {
+			if err := collfs.Flush("/"+lastparentdir, dir != lastparentdir); err != nil {
 				return "", fmt.Errorf("error flushing output collection file data: %v", err)
 			}
 			unflushed = 0
 		}
 		lastparentdir = dir
 
-		n, err := cp.copyFile(fs, f)
+		n, err := cp.copyFile(collfs, f)
 		if err != nil {
 			return "", fmt.Errorf("error copying file %q into output collection: %v", f, err)
 		}
 		unflushed += n
 	}
-	return fs.MarshalManifest(".")
+	return collfs.MarshalManifest(".")
+}
+
+// matchGlobs reports whether path (a directory if isDir) matches any
+// of cp.globs. Output entries look like "/foo" but globs look like
+// "foo", so leading "/" is stripped. Match errors are ignored.
+func (cp *copier) matchGlobs(path string, isDir bool) bool {
+	path = strings.TrimLeft(path, "/")
+	for _, glob := range cp.globs {
+		if !isDir && strings.HasSuffix(glob, "/**") {
+			// doublestar.Match("f*/**", "ff") and
+			// doublestar.Match("f*/**", "ff/gg") both return
+			// true, but (to be compatible with bash shopt)
+			// "ff" should match only if it is a directory.
+			//
+			// To avoid errant matches, we add the file's
+			// basename to the end of the pattern:
+			//
+			//   Match("f*/**/ff", "ff") => false
+			//   Match("f*/**/gg", "ff/gg") => true
+			// Escape basename ("\" before every character)
+			// in case it contains *, ?, \, etc.
+			_, name := filepath.Split(path)
+			escapedName := strings.TrimSuffix(strings.Replace(name, "", "\\", -1), "\\")
+			if match, _ := doublestar.Match(glob+"/"+escapedName, path); match {
+				return true
+			}
+		} else if match, _ := doublestar.Match(glob, path); match {
+			return true
+		} else if isDir {
+			// Workaround doublestar bug (v4.6.1).
+			// "foo*/**" should match "foo", but does not,
+			// because isZeroLengthPattern does not accept
+			// "*/**" as a zero length pattern.
+			if trunc := strings.TrimSuffix(glob, "*/**"); trunc != glob {
+				if match, _ := doublestar.Match(trunc, path); match {
+					return true
+				}
+			}
+		}
+	}
+	return false
+}
+
+// applyGlobsToFilesAndDirs deletes entries from cp.files and cp.dirs
+// that do not match cp.globs, while keeping every parent/ancestor
+// directory of a surviving entry in cp.dirs even if the ancestor
+// does not match cp.globs itself. It rebuilds cp.dirs in sorted
+// order, and does nothing when cp.globs is empty.
+func (cp *copier) applyGlobsToFilesAndDirs() {
+	if len(cp.globs) == 0 {
+		return
+	}
+	keepdirs := make(map[string]bool)
+	for _, path := range cp.dirs {
+		if cp.matchGlobs(path, true) {
+			keepdirs[path] = true
+		}
+	}
+	// Keep all ancestors of each matching dir. Inserting into the
+	// map while ranging over it is fine: new keys need no revisit.
+	for path := range keepdirs {
+		for i, c := range path {
+			if i > 0 && c == '/' {
+				keepdirs[path[:i]] = true
+			}
+		}
+	}
+	var keepfiles []filetodo
+	for _, file := range cp.files {
+		if cp.matchGlobs(file.dst, false) {
+			keepfiles = append(keepfiles, file)
+		}
+	}
+	for _, file := range keepfiles {
+		for i, c := range file.dst {
+			if i > 0 && c == '/' {
+				keepdirs[file.dst[:i]] = true
+			}
+		}
+	}
+	cp.dirs = nil
+	for path := range keepdirs {
+		cp.dirs = append(cp.dirs, path)
+	}
+	sort.Strings(cp.dirs)
+	cp.files = keepfiles
+}
+
+// applyGlobsToCollectionFS walks collfs twice: first to record every
+// path matching cp.globs (plus its ancestors), then to RemoveAll
+// everything not recorded. It is a no-op when cp.globs is empty.
+func (cp *copier) applyGlobsToCollectionFS(collfs arvados.CollectionFileSystem) error {
+	if len(cp.globs) == 0 {
+		return nil
+	}
+	include := make(map[string]bool)
+	err := fs.WalkDir(arvados.FS(collfs), "", func(path string, ent fs.DirEntry, err error) error {
+		// Per io/fs, ent can be nil when err is non-nil, so
+		// check err before calling ent.IsDir().
+		if err == nil && cp.matchGlobs(path, ent.IsDir()) {
+			for i, c := range path {
+				if i > 0 && c == '/' {
+					include[path[:i]] = true
+				}
+			}
+			include[path] = true
+		}
+		return err
+	})
+	if err != nil {
+		return err
+	}
+	err = fs.WalkDir(arvados.FS(collfs), "", func(path string, ent fs.DirEntry, err error) error {
+		if err != nil || path == "" {
+			return err
+		}
+		if !include[path] {
+			err := collfs.RemoveAll(path)
+			if err != nil {
+				return err
+			}
+			if ent.IsDir() {
+				return fs.SkipDir
+			}
+		}
+		return nil
+	})
+	return err
+}
+
+// subtreeCouldMatch returns true if it's possible for any descendant
+// of the given path to match anything in cp.globs. Used by walkMount
+// and walkHostFS to avoid loading collections that are mounted
+// underneath the output directory but excluded by globs.
+func (cp *copier) subtreeCouldMatch(path string) bool {
+	if len(cp.globs) == 0 {
+		return true
+	}
+	pathdepth := 1 + strings.Count(path, "/")
+	for _, glob := range cp.globs {
+		globdepth := 0
+		lastsep := 0
+		for i, c := range glob {
+			if c != '/' || !doublestar.ValidatePattern(glob[:i]) {
+				// Escaped "/", or "/" in a character
+				// class, is not a path separator.
+				continue
+			}
+			if glob[lastsep:i] == "**" {
+				return true
+			}
+			lastsep = i + 1
+			if globdepth++; globdepth == pathdepth {
+				if match, _ := doublestar.Match(glob[:i]+"/*", path+"/z"); match {
+					return true
+				}
+				break
+			}
+		}
+		if globdepth < pathdepth && glob[lastsep:] == "**" {
+			return true
+		}
+	}
+	return false
 }
 
 func (cp *copier) copyFile(fs arvados.CollectionFileSystem, f filetodo) (int64, error) {
-	cp.logger.Printf("copying %q (%d bytes)", f.dst, f.size)
+	cp.logger.Printf("copying %q (%d bytes)", strings.TrimLeft(f.dst, "/"), f.size)
 	dst, err := fs.OpenFile(f.dst, os.O_CREATE|os.O_WRONLY, 0666)
 	if err != nil {
 		return 0, err
@@ -162,20 +341,38 @@ func (cp *copier) walkMount(dest, src string, maxSymlinks int, walkMountsBelow b
 	// copy, relative to its mount point -- ".", "./foo.txt", ...
 	srcRelPath := filepath.Join(".", srcMount.Path, src[len(srcRoot):])
 
+	// outputRelPath is the destination path relative to the
+	// output directory. Used for logging and glob matching.
+	var outputRelPath = ""
+	if strings.HasPrefix(src, cp.ctrOutputDir) {
+		outputRelPath = strings.TrimPrefix(src[len(cp.ctrOutputDir):], "/")
+	}
+	if outputRelPath == "" {
+		// blank means copy a whole directory, so replace it
+		// with a wildcard to make log messages a little
+		// clearer ("*" is also exempt from the glob check below)
+		outputRelPath = "*"
+	}
+
 	switch {
 	case srcMount.ExcludeFromOutput:
+	case outputRelPath != "*" && !cp.subtreeCouldMatch(outputRelPath):
+		cp.logger.Printf("not copying %q because contents cannot match output globs", outputRelPath)
+		return nil
 	case srcMount.Kind == "tmp":
 		// Handle by walking the host filesystem.
return cp.walkHostFS(dest, src, maxSymlinks, walkMountsBelow) case srcMount.Kind != "collection": return fmt.Errorf("%q: unsupported mount %q in output (kind is %q)", src, srcRoot, srcMount.Kind) case !srcMount.Writable: + cp.logger.Printf("copying %q from %v/%v", outputRelPath, srcMount.PortableDataHash, strings.TrimPrefix(srcRelPath, "./")) mft, err := cp.getManifest(srcMount.PortableDataHash) if err != nil { return err } cp.manifest += mft.Extract(srcRelPath, dest).Text default: + cp.logger.Printf("copying %q", outputRelPath) hostRoot, err := cp.hostRoot(srcRoot) if err != nil { return err @@ -313,6 +510,8 @@ func (cp *copier) walkHostFS(dest, src string, maxSymlinks int, includeMounts bo // (...except mount types that are // handled as regular files.) continue + } else if isMount && !cp.subtreeCouldMatch(src[len(cp.ctrOutputDir)+1:]) { + continue } err = cp.walkHostFS(dest, src, maxSymlinks, false) if err != nil { @@ -356,7 +555,7 @@ func (cp *copier) getManifest(pdh string) (*manifest.Manifest, error) { return mft, nil } var coll arvados.Collection - err := cp.arvClient.Get("collections", pdh, nil, &coll) + err := cp.client.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+pdh, nil, nil) if err != nil { return nil, fmt.Errorf("error retrieving collection record for %q: %s", pdh, err) }