Show initial fuse mount info when starting tests.
[arvados.git] / services / crunch-run / copier.go
index 4a6253c459de53808203bd97499da561407b07a5..f6a64a6217f1f9c80c8e90a3756f5238fd796f06 100644 (file)
@@ -15,6 +15,7 @@ import (
        "strings"
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.curoverse.com/arvados.git/sdk/go/keepclient"
        "git.curoverse.com/arvados.git/sdk/go/manifest"
 )
 
@@ -70,45 +71,61 @@ type copier struct {
 func (cp *copier) Copy() (string, error) {
        err := cp.walkMount("", cp.ctrOutputDir, limitFollowSymlinks, true)
        if err != nil {
-               return "", fmt.Errorf("in walkMount: %v", err)
+               return "", fmt.Errorf("error scanning files to copy to output: %v", err)
        }
        fs, err := (&arvados.Collection{ManifestText: cp.manifest}).FileSystem(cp.client, cp.keepClient)
        if err != nil {
-               return "", fmt.Errorf("creating Collection.FileSystem: %v", err)
+               return "", fmt.Errorf("error creating Collection.FileSystem: %v", err)
        }
        for _, d := range cp.dirs {
                err = fs.Mkdir(d, 0777)
                if err != nil && err != os.ErrExist {
-                       return "", fmt.Errorf("Could not Mkdir %v: %v", d, err)
+                       return "", fmt.Errorf("error making directory %q in output collection: %v", d, err)
                }
        }
+       var unflushed int64
+       var lastparentdir string
        for _, f := range cp.files {
-               err = cp.copyFile(fs, f)
+               // If a dir has just had its last file added, do a
+               // full Flush. Otherwise, do a partial Flush (write
+               // full-size blocks, but leave the last short block
+               // open so f's data can be packed with it).
+               dir, _ := filepath.Split(f.dst)
+               if dir != lastparentdir || unflushed > keepclient.BLOCKSIZE {
+                       if err := fs.Flush("/"+lastparentdir, dir != lastparentdir); err != nil {
+                               return "", fmt.Errorf("error flushing output collection file data: %v", err)
+                       }
+                       unflushed = 0
+               }
+               lastparentdir = dir
+
+               n, err := cp.copyFile(fs, f)
                if err != nil {
-                       return "", fmt.Errorf("Could not copyFile %v: %v", f, err)
+                       return "", fmt.Errorf("error copying file %q into output collection: %v", f, err)
                }
+               unflushed += n
        }
        return fs.MarshalManifest(".")
 }
 
-func (cp *copier) copyFile(fs arvados.CollectionFileSystem, f filetodo) error {
+func (cp *copier) copyFile(fs arvados.CollectionFileSystem, f filetodo) (int64, error) {
        cp.logger.Printf("copying %q (%d bytes)", f.dst, f.size)
        dst, err := fs.OpenFile(f.dst, os.O_CREATE|os.O_WRONLY, 0666)
        if err != nil {
-               return err
+               return 0, err
        }
        src, err := os.Open(f.src)
        if err != nil {
                dst.Close()
-               return err
+               return 0, err
        }
        defer src.Close()
-       _, err = io.Copy(dst, src)
+       n, err := io.Copy(dst, src)
        if err != nil {
                dst.Close()
-               return err
+               return n, err
        }
-       return dst.Close()
+       return n, dst.Close()
 }
 
 // Append to cp.manifest, cp.files, and cp.dirs so as to copy src (an