"strings"
"git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/keepclient"
"git.curoverse.com/arvados.git/sdk/go/manifest"
)
return "", fmt.Errorf("error making directory %q in output collection: %v", d, err)
}
}
+ var unflushed int64
+ var lastparentdir string
for _, f := range cp.files {
- err = cp.copyFile(fs, f)
+ // If a dir has just had its last file added, do a
+ // full Flush. Otherwise, do a partial Flush (write
+ // full-size blocks, but leave the last short block
+ // open so f's data can be packed with it).
+ dir, _ := filepath.Split(f.dst)
+ if dir != lastparentdir || unflushed > keepclient.BLOCKSIZE {
+ if err := fs.Flush("/"+lastparentdir, dir != lastparentdir); err != nil {
+ return "", fmt.Errorf("error flushing output collection file data: %v", err)
+ }
+ unflushed = 0
+ }
+ lastparentdir = dir
+
+ n, err := cp.copyFile(fs, f)
if err != nil {
return "", fmt.Errorf("error copying file %q into output collection: %v", f, err)
}
+ unflushed += n
}
return fs.MarshalManifest(".")
}
// copyFile copies the contents of the file named f.src (opened with
// os.Open) into f.dst on the collection filesystem fs, logging the
// destination path and f.size first.
//
// NOTE(review): this span is a patch fragment -- lines marked "-" are
// the previous version and lines marked "+" are the new version. The
// new version returns the byte count reported by io.Copy alongside
// the error; the previous version returned only the error.
//
// Returns (0, err) if either file cannot be opened, (n, err) if the
// copy itself fails after n bytes, and otherwise n together with the
// result of closing dst.
-func (cp *copier) copyFile(fs arvados.CollectionFileSystem, f filetodo) error {
+func (cp *copier) copyFile(fs arvados.CollectionFileSystem, f filetodo) (int64, error) {
cp.logger.Printf("copying %q (%d bytes)", f.dst, f.size)
// Open the destination with os.O_CREATE|os.O_WRONLY and mode 0666.
dst, err := fs.OpenFile(f.dst, os.O_CREATE|os.O_WRONLY, 0666)
if err != nil {
- return err
+ return 0, err
}
src, err := os.Open(f.src)
if err != nil {
// dst is already open here, so close it before bailing out; the
// Close error is discarded in favor of the Open error.
dst.Close()
- return err
+ return 0, err
}
defer src.Close()
- _, err = io.Copy(dst, src)
+ n, err := io.Copy(dst, src)
if err != nil {
// Report the copy error (with the bytes copied so far); the
// result of closing dst is discarded on this path.
dst.Close()
- return err
+ return n, err
}
// On success the Close result is the returned error, so a failure
// while closing dst is reported to the caller.
- return dst.Close()
+ return n, dst.Close()
}
// Append to cp.manifest, cp.files, and cp.dirs so as to copy src (an