15652: Only flush the current dir, and only once per 64 MiB.
authorTom Clegg <tclegg@veritasgenetics.com>
Thu, 17 Oct 2019 14:01:47 +0000 (10:01 -0400)
committerTom Clegg <tclegg@veritasgenetics.com>
Thu, 17 Oct 2019 14:01:47 +0000 (10:01 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg@veritasgenetics.com>

sdk/go/arvados/fs_base.go
sdk/go/arvados/fs_collection.go
sdk/go/arvados/fs_collection_test.go
services/crunch-run/copier.go

index f5916f9571d1743f215f97b866a0f871b7c7806d..359e6b67e646596c053c62a042134fdd3d362986 100644 (file)
@@ -95,8 +95,10 @@ type FileSystem interface {
        // for all writes to finish before returning. If shortBlocks
        // is true, flush everything; otherwise, if there's less than
        // a full block of buffered data at the end of a stream, leave
-       // it buffered in memory in case more data can be appended.
-       Flush(shortBlocks bool) error
+       // it buffered in memory in case more data can be appended. If
+       // path is "", flush all dirs/streams; otherwise, flush only
+       // the specified dir/stream.
+       Flush(path string, shortBlocks bool) error
 }
 
 type inode interface {
@@ -570,7 +572,7 @@ func (fs *fileSystem) Sync() error {
        return ErrInvalidOperation
 }
 
-func (fs *fileSystem) Flush(bool) error {
+func (fs *fileSystem) Flush(string, bool) error {
        log.Printf("TODO: flush fileSystem")
        return ErrInvalidOperation
 }
index 543d89385b01f1df43b4576107a6e733cf33a0a0..0722da6b0491873e0a80dea9094e7b558efb6821 100644 (file)
@@ -146,11 +146,31 @@ func (fs *collectionFileSystem) Sync() error {
        return nil
 }
 
-func (fs *collectionFileSystem) Flush(shortBlocks bool) error {
-       fs.fileSystem.root.Lock()
-       defer fs.fileSystem.root.Unlock()
-       dn := fs.fileSystem.root.(*dirnode)
-       return dn.flush(context.TODO(), newThrottle(concurrentWriters), dn.sortedNames(), flushOpts{sync: false, shortBlocks: shortBlocks})
+func (fs *collectionFileSystem) Flush(path string, shortBlocks bool) error {
+       node, err := rlookup(fs.fileSystem.root, path)
+       if err != nil {
+               return err
+       }
+       dn, ok := node.(*dirnode)
+       if !ok {
+               return ErrNotADirectory
+       }
+       dn.Lock()
+       defer dn.Unlock()
+       names := dn.sortedNames()
+       if path != "" {
+               // Caller only wants to flush the specified dir,
+               // non-recursively.  Drop subdirs from the list of
+               // names.
+               var filenames []string
+               for _, name := range names {
+                       if _, ok := dn.inodes[name].(*filenode); ok {
+                               filenames = append(filenames, name)
+                       }
+               }
+               names = filenames
+       }
+       return dn.flush(context.TODO(), newThrottle(concurrentWriters), names, flushOpts{sync: false, shortBlocks: shortBlocks})
 }
 
 func (fs *collectionFileSystem) memorySize() int64 {
index fe3ad7a1e9a44041258e1b099733e75323f4270a..edc35ab9bb6b22c6c22d106f4fee9b28533da234 100644 (file)
@@ -1139,7 +1139,7 @@ func (s *CollectionFSSuite) TestFlushAll(c *check.C) {
                }
 
                if i%8 == 0 {
-                       fs.Flush(true)
+                       fs.Flush("", true)
                }
 
                size := fs.memorySize()
@@ -1164,9 +1164,9 @@ func (s *CollectionFSSuite) TestFlushFullBlocksOnly(c *check.C) {
                atomic.AddInt64(&flushed, int64(len(p)))
        }
 
-       nDirs := 8
+       nDirs := int64(8)
        megabyte := make([]byte, 1<<20)
-       for i := 0; i < nDirs; i++ {
+       for i := int64(0); i < nDirs; i++ {
                dir := fmt.Sprintf("dir%d", i)
                fs.Mkdir(dir, 0755)
                for j := 0; j < 67; j++ {
@@ -1180,14 +1180,35 @@ func (s *CollectionFSSuite) TestFlushFullBlocksOnly(c *check.C) {
        c.Check(fs.memorySize(), check.Equals, int64(nDirs*67<<20))
        c.Check(flushed, check.Equals, int64(0))
 
-       fs.Flush(false)
-       expectSize := int64(nDirs * 3 << 20)
-
-       // Wait for flush to finish
-       for deadline := time.Now().Add(5 * time.Second); fs.memorySize() > expectSize && time.Now().Before(deadline); time.Sleep(10 * time.Millisecond) {
+       waitForFlush := func(expectUnflushed, expectFlushed int64) {
+               for deadline := time.Now().Add(5 * time.Second); fs.memorySize() > expectUnflushed && time.Now().Before(deadline); time.Sleep(10 * time.Millisecond) {
+               }
+               c.Check(fs.memorySize(), check.Equals, expectUnflushed)
+               c.Check(flushed, check.Equals, expectFlushed)
        }
-       c.Check(fs.memorySize(), check.Equals, expectSize)
-       c.Check(flushed, check.Equals, int64(nDirs*64<<20))
+
+       // Nothing flushed yet
+       waitForFlush((nDirs*67)<<20, 0)
+
+       // Flushing a non-empty dir "/" is non-recursive and there are
+       // no top-level files, so this has no effect
+       fs.Flush("/", false)
+       waitForFlush((nDirs*67)<<20, 0)
+
+       // Flush the full block in dir0
+       fs.Flush("dir0", false)
+       waitForFlush((nDirs*67-64)<<20, 64<<20)
+
+       err = fs.Flush("dir-does-not-exist", false)
+       c.Check(err, check.NotNil)
+
+       // Flush full blocks in all dirs
+       fs.Flush("", false)
+       waitForFlush(nDirs*3<<20, nDirs*64<<20)
+
+       // Flush non-full blocks, too
+       fs.Flush("", true)
+       waitForFlush(0, nDirs*67<<20)
 }
 
 func (s *CollectionFSSuite) TestBrokenManifests(c *check.C) {
index 555a2654d8c1513d79cd99b0b493acc42ffc9319..317db1a7f6184f64086accf42ab784ff15176fa6 100644 (file)
@@ -82,6 +82,7 @@ func (cp *copier) Copy() (string, error) {
                        return "", fmt.Errorf("error making directory %q in output collection: %v", d, err)
                }
        }
+       var unflushed int64
        var lastparentdir string
        for _, f := range cp.files {
                // If a dir has just had its last file added, do a
@@ -89,37 +90,41 @@ func (cp *copier) Copy() (string, error) {
                // full-size blocks, but leave the last short block
                // open so f's data can be packed with it).
                dir, _ := filepath.Split(f.dst)
-               if err := fs.Flush(dir != lastparentdir); err != nil {
-                       return "", fmt.Errorf("error flushing output collection file data: %v", err)
+               if dir != lastparentdir || unflushed > 1<<26 {
+                       if err := fs.Flush("/"+lastparentdir, dir != lastparentdir); err != nil {
+                               return "", fmt.Errorf("error flushing output collection file data: %v", err)
+                       }
+                       unflushed = 0
                }
                lastparentdir = dir
 
-               err = cp.copyFile(fs, f)
+               n, err := cp.copyFile(fs, f)
                if err != nil {
                        return "", fmt.Errorf("error copying file %q into output collection: %v", f, err)
                }
+               unflushed += n
        }
        return fs.MarshalManifest(".")
 }
 
-func (cp *copier) copyFile(fs arvados.CollectionFileSystem, f filetodo) error {
+func (cp *copier) copyFile(fs arvados.CollectionFileSystem, f filetodo) (int64, error) {
        cp.logger.Printf("copying %q (%d bytes)", f.dst, f.size)
        dst, err := fs.OpenFile(f.dst, os.O_CREATE|os.O_WRONLY, 0666)
        if err != nil {
-               return err
+               return 0, err
        }
        src, err := os.Open(f.src)
        if err != nil {
                dst.Close()
-               return err
+               return 0, err
        }
        defer src.Close()
-       _, err = io.Copy(dst, src)
+       n, err := io.Copy(dst, src)
        if err != nil {
                dst.Close()
-               return err
+               return n, err
        }
-       return dst.Close()
+       return n, dst.Close()
 }
 
 // Append to cp.manifest, cp.files, and cp.dirs so as to copy src (an