X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/b66b029bef5b4b0f54d204318a8928b7a6977219..916ec66d0caeeb37983043810bea22e0bc41751f:/services/crunch-run/copier.go diff --git a/services/crunch-run/copier.go b/services/crunch-run/copier.go index 4a6253c459..f6a64a6217 100644 --- a/services/crunch-run/copier.go +++ b/services/crunch-run/copier.go @@ -15,6 +15,7 @@ import ( "strings" "git.curoverse.com/arvados.git/sdk/go/arvados" + "git.curoverse.com/arvados.git/sdk/go/keepclient" "git.curoverse.com/arvados.git/sdk/go/manifest" ) @@ -70,45 +71,61 @@ type copier struct { func (cp *copier) Copy() (string, error) { err := cp.walkMount("", cp.ctrOutputDir, limitFollowSymlinks, true) if err != nil { - return "", fmt.Errorf("in walkMount: %v", err) + return "", fmt.Errorf("error scanning files to copy to output: %v", err) } fs, err := (&arvados.Collection{ManifestText: cp.manifest}).FileSystem(cp.client, cp.keepClient) if err != nil { - return "", fmt.Errorf("creating Collection.FileSystem: %v", err) + return "", fmt.Errorf("error creating Collection.FileSystem: %v", err) } for _, d := range cp.dirs { err = fs.Mkdir(d, 0777) if err != nil && err != os.ErrExist { - return "", fmt.Errorf("Could not Mkdir %v: %v", d, err) + return "", fmt.Errorf("error making directory %q in output collection: %v", d, err) } } + var unflushed int64 + var lastparentdir string for _, f := range cp.files { - err = cp.copyFile(fs, f) + // If a dir has just had its last file added, do a + // full Flush. Otherwise, do a partial Flush (write + // full-size blocks, but leave the last short block + // open so f's data can be packed with it). + dir, _ := filepath.Split(f.dst) + if dir != lastparentdir || unflushed > keepclient.BLOCKSIZE { + if err := fs.Flush("/"+lastparentdir, dir != lastparentdir); err != nil { + return "", fmt.Errorf("error flushing output collection file data: %v", err) + } + unflushed = 0 + } + lastparentdir = dir + + n, err := cp.copyFile(fs, f) if err != nil { - return "", fmt.Errorf("Could not copyFile %v: %v", f, err) + return "", fmt.Errorf("error copying file %q into output collection: %v", f, err) } + unflushed += n } return fs.MarshalManifest(".") } -func (cp *copier) copyFile(fs arvados.CollectionFileSystem, f filetodo) error { +func (cp *copier) copyFile(fs arvados.CollectionFileSystem, f filetodo) (int64, error) { cp.logger.Printf("copying %q (%d bytes)", f.dst, f.size) dst, err := fs.OpenFile(f.dst, os.O_CREATE|os.O_WRONLY, 0666) if err != nil { - return err + return 0, err } src, err := os.Open(f.src) if err != nil { dst.Close() - return err + return 0, err } defer src.Close() - _, err = io.Copy(dst, src) + n, err := io.Copy(dst, src) if err != nil { dst.Close() - return err + return n, err } - return dst.Close() + return n, dst.Close() } // Append to cp.manifest, cp.files, and cp.dirs so as to copy src (an