15652: Add collectionfs Flush() method to control memory use.
[arvados.git] / services / crunch-run / copier.go
index 4c45f6acb9bed4162dc9eaad28f7f6af82715b81..555a2654d8c1513d79cd99b0b493acc42ffc9319 100644 (file)
@@ -70,22 +70,33 @@ type copier struct {
 func (cp *copier) Copy() (string, error) {
        err := cp.walkMount("", cp.ctrOutputDir, limitFollowSymlinks, true)
        if err != nil {
-               return "", err
+               return "", fmt.Errorf("error scanning files to copy to output: %v", err)
        }
        fs, err := (&arvados.Collection{ManifestText: cp.manifest}).FileSystem(cp.client, cp.keepClient)
        if err != nil {
-               return "", err
+               return "", fmt.Errorf("error creating Collection.FileSystem: %v", err)
        }
        for _, d := range cp.dirs {
                err = fs.Mkdir(d, 0777)
-               if err != nil {
-                       return "", err
+               if err != nil && err != os.ErrExist {
+                       return "", fmt.Errorf("error making directory %q in output collection: %v", d, err)
                }
        }
+       var lastparentdir string
        for _, f := range cp.files {
+               // If a dir has just had its last file added, do a
+               // full Flush. Otherwise, do a partial Flush (write
+               // full-size blocks, but leave the last short block
+               // open so f's data can be packed with it).
+               dir, _ := filepath.Split(f.dst)
+               if err := fs.Flush(dir != lastparentdir); err != nil {
+                       return "", fmt.Errorf("error flushing output collection file data: %v", err)
+               }
+               lastparentdir = dir
+
                err = cp.copyFile(fs, f)
                if err != nil {
-                       return "", err
+                       return "", fmt.Errorf("error copying file %q into output collection: %v", f, err)
                }
        }
        return fs.MarshalManifest(".")