15652: Add collectionfs Flush() method to control memory use.
author Tom Clegg <tclegg@veritasgenetics.com>
Tue, 15 Oct 2019 15:51:25 +0000 (11:51 -0400)
committer Tom Clegg <tclegg@veritasgenetics.com>
Tue, 15 Oct 2019 15:51:25 +0000 (11:51 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg@veritasgenetics.com>

sdk/go/arvados/fs_base.go
sdk/go/arvados/fs_collection.go
sdk/go/arvados/fs_collection_test.go
services/crunch-run/copier.go

diff --git a/sdk/go/arvados/fs_base.go b/sdk/go/arvados/fs_base.go
index 3058a7609c6665dfd22c2ee6bb205685c97afe7d..f5916f9571d1743f215f97b866a0f871b7c7806d 100644
@@ -86,7 +86,17 @@ type FileSystem interface {
        Remove(name string) error
        RemoveAll(name string) error
        Rename(oldname, newname string) error
+
+       // Write buffered data from memory to storage, returning when
+       // all updates have been saved to persistent storage.
        Sync() error
+
+       // Write buffered data from memory to storage, but don't wait
+       // for all writes to finish before returning. If shortBlocks
+       // is true, flush everything; otherwise, if there's less than
+       // a full block of buffered data at the end of a stream, leave
+       // it buffered in memory in case more data can be appended.
+       Flush(shortBlocks bool) error
 }
 
 type inode interface {
@@ -560,6 +570,11 @@ func (fs *fileSystem) Sync() error {
        return ErrInvalidOperation
 }
 
+func (fs *fileSystem) Flush(bool) error {
+       log.Printf("TODO: flush fileSystem")
+       return ErrInvalidOperation
+}
+
 // rlookup (recursive lookup) returns the inode for the file/directory
 // with the given name (which may contain "/" separators). If no such
 // file/directory exists, the returned node is nil.
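
A hypothetical usage sketch (writeFiles and its arguments are invented
for illustration; Flush() and Sync() are the interface methods added
above): a caller can bound memory while writing many files by flushing
full blocks as it goes, then persisting everything at the end.

    // In-package sketch; assumes the existing FileSystem OpenFile API.
    func writeFiles(fs FileSystem, data map[string][]byte) error {
            for name, buf := range data {
                    f, err := fs.OpenFile(name, os.O_WRONLY|os.O_CREATE, 0644)
                    if err != nil {
                            return err
                    }
                    _, werr := f.Write(buf)
                    f.Close()
                    if werr != nil {
                            return werr
                    }
                    // Commit full blocks now; trailing short blocks stay
                    // buffered so later data can be packed with them.
                    if err := fs.Flush(false); err != nil {
                            return err
                    }
            }
            // Write everything out, including short blocks.
            return fs.Sync()
    }
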
diff --git a/sdk/go/arvados/fs_collection.go b/sdk/go/arvados/fs_collection.go
index 972b3979fcfa4dd7fdb3cde62a90eacd37b27c56..3a45dda29467047b5eacdd9a544449aa6bd3ef68 100644
@@ -21,7 +21,7 @@ import (
 
 var (
        maxBlockSize      = 1 << 26
-       concurrentWriters = 4 // max goroutines writing to Keep during sync()
+       concurrentWriters = 4 // max goroutines writing to Keep during flush()
        writeAheadBlocks  = 1 // max background jobs flushing to Keep before blocking writes
 )
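
The throttle used with concurrentWriters is defined elsewhere in this
package; a minimal counting-semaphore equivalent of the
Acquire/Release pattern seen in this diff would be:

    // Sketch only -- the real throttle type is not shown in this diff.
    type semaphore struct{ c chan struct{} }

    func newSemaphore(n int) *semaphore {
            return &semaphore{c: make(chan struct{}, n)}
    }

    func (s *semaphore) Acquire() { s.c <- struct{}{} } // blocks while n holders exist
    func (s *semaphore) Release() { <-s.c }
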
 
@@ -38,6 +38,9 @@ type CollectionFileSystem interface {
 
        // Total data bytes in all files.
        Size() int64
+
+       // Memory consumed by buffered file data.
+       memorySize() int64
 }
 
 type collectionFileSystem struct {
@@ -143,6 +146,19 @@ func (fs *collectionFileSystem) Sync() error {
        return nil
 }
 
+func (fs *collectionFileSystem) Flush(shortBlocks bool) error {
+       fs.fileSystem.root.Lock()
+       defer fs.fileSystem.root.Unlock()
+       dn := fs.fileSystem.root.(*dirnode)
+       return dn.flush(context.TODO(), newThrottle(concurrentWriters), dn.sortedNames(), flushOpts{sync: false, shortBlocks: shortBlocks})
+}
+
+func (fs *collectionFileSystem) memorySize() int64 {
+       fs.fileSystem.root.Lock()
+       defer fs.fileSystem.root.Unlock()
+       return fs.fileSystem.root.(*dirnode).memorySize()
+}
+
 func (fs *collectionFileSystem) MarshalManifest(prefix string) (string, error) {
        fs.fileSystem.root.Lock()
        defer fs.fileSystem.root.Unlock()
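
memorySize() is unexported and, in this commit, exercised only by the
tests; a hypothetical in-package caller could pair it with Flush() to
cap buffered data:

    // Invented helper, for illustration only.
    func flushIfOverLimit(fs *collectionFileSystem, limit int64) error {
            if fs.memorySize() <= limit {
                    return nil
            }
            // Write out full blocks; short tail blocks stay buffered.
            return fs.Flush(false)
    }
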
@@ -501,7 +517,7 @@ func (fn *filenode) Write(p []byte, startPtr filenodePtr) (n int, ptr filenodePt
 // Write some data out to disk to reduce memory use. Caller must have
 // write lock.
 func (fn *filenode) pruneMemSegments() {
-       // TODO: share code with (*dirnode)sync()
+       // TODO: share code with (*dirnode)flush()
        // TODO: pack/flush small blocks too, when fragmented
        if fn.throttle == nil {
                // TODO: share a throttle with filesystem
@@ -529,7 +545,7 @@ func (fn *filenode) pruneMemSegments() {
                        fn.throttle.Release()
                        fn.Lock()
                        defer fn.Unlock()
-                       if curbuf := seg.buf[:1]; &curbuf[0] != &buf[0] {
+                       if seg.flushing != done {
                                // A new seg.buf has been allocated.
                                return
                        }
@@ -556,8 +572,8 @@ func (fn *filenode) pruneMemSegments() {
        }
 }
 
-// Block until all pending pruneMemSegments work is finished. Caller
-// must NOT have lock.
+// Block until all pending pruneMemSegments/flush work is
+// finished. Caller must NOT have lock.
 func (fn *filenode) waitPrune() {
        var pending []<-chan struct{}
        fn.Lock()
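
The seg.flushing check above relies on a chan struct{} field added to
memSegment elsewhere in this commit: the channel doubles as a
completion signal and as an identity token proving that a finished
flush still owns the buffer. Distilled into a standalone sketch (names
simplified; the real code installs a storedSegment rather than nil):

    import "sync"

    type segment struct {
            mu       sync.Mutex
            buf      []byte
            flushing chan struct{} // non-nil while a flush of buf is in flight
    }

    // startFlush is called with seg.mu held.
    func startFlush(seg *segment, store func([]byte) error) <-chan struct{} {
            done := make(chan struct{})
            seg.flushing = done
            buf := seg.buf
            go func() {
                    defer close(done)
                    err := store(buf) // slow write, performed without the lock
                    seg.mu.Lock()
                    defer seg.mu.Unlock()
                    if seg.flushing != done {
                            return // buf was rewritten meanwhile; drop our result
                    }
                    seg.flushing = nil
                    if err == nil {
                            seg.buf = nil // real code: replace with a storedSegment
                    }
            }()
            return done
    }
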
@@ -613,51 +629,132 @@ type fnSegmentRef struct {
 // storedSegments that reference the relevant portions of the new
 // block.
 //
+// If sync is false, commitBlock returns right away, after starting a
+// goroutine that does the writes, reacquires the filenodes' locks,
+// and swaps out the *memSegments. Some filenodes' segments might get
+// modified/rearranged in the meantime, in which case commitBlock
+// won't replace them.
+//
 // Caller must have write lock.
-func (dn *dirnode) commitBlock(ctx context.Context, throttle *throttle, refs []fnSegmentRef) error {
-       if len(refs) == 0 {
-               return nil
-       }
-       throttle.Acquire()
-       defer throttle.Release()
+func (dn *dirnode) commitBlock(ctx context.Context, refs []fnSegmentRef, sync bool) error {
        if err := ctx.Err(); err != nil {
                return err
        }
+       done := make(chan struct{})
        block := make([]byte, 0, maxBlockSize)
+       segs := make([]*memSegment, 0, len(refs))
        for _, ref := range refs {
-               block = append(block, ref.fn.segments[ref.idx].(*memSegment).buf...)
-       }
-       locator, _, err := dn.fs.PutB(block)
-       if err != nil {
-               return err
+               seg := ref.fn.segments[ref.idx].(*memSegment)
+               if seg.flushing != nil && !sync {
+                       // Let the other flushing goroutine finish. If
+                       // it fails, we'll try again next time.
+                       return nil
+               } else {
+                       // In sync mode, we proceed regardless of
+                       // whether another flush is in progress: It
+                       // can't finish before we do, because we hold
+                       // fn's lock until we finish our own writes.
+               }
+               seg.flushing = done
+               block = append(block, seg.buf...)
+               segs = append(segs, seg)
        }
-       off := 0
-       for _, ref := range refs {
-               data := ref.fn.segments[ref.idx].(*memSegment).buf
-               ref.fn.segments[ref.idx] = storedSegment{
-                       kc:      dn.fs,
-                       locator: locator,
-                       size:    len(block),
-                       offset:  off,
-                       length:  len(data),
+       errs := make(chan error, 1)
+       go func() {
+               defer close(done)
+               defer close(errs)
+               locked := map[*filenode]bool{}
+               locator, _, err := dn.fs.PutB(block)
+               {
+                       if !sync {
+                               for _, name := range dn.sortedNames() {
+                                       if fn, ok := dn.inodes[name].(*filenode); ok {
+                                               fn.Lock()
+                                               defer fn.Unlock()
+                                               locked[fn] = true
+                                       }
+                               }
+                       }
+                       defer func() {
+                               for _, seg := range segs {
+                                       if seg.flushing == done {
+                                               seg.flushing = nil
+                                       }
+                               }
+                       }()
+               }
+               if err != nil {
+                       errs <- err
+                       return
                }
-               off += len(data)
-               ref.fn.memsize -= int64(len(data))
+               off := 0
+               for idx, ref := range refs {
+                       if !sync {
+                               // In async mode, fn's lock was
+                               // released while we were waiting for
+                               // PutB(); lots of things might have
+                               // changed.
+                               if len(ref.fn.segments) <= ref.idx {
+                                       // file segments have been
+                                       // rearranged or changed in
+                                       // some way
+                                       continue
+                               } else if seg, ok := ref.fn.segments[ref.idx].(*memSegment); !ok || seg != segs[idx] {
+                                       // segment has been replaced
+                                       continue
+                               } else if seg.flushing != done {
+                                       // seg.buf has been replaced
+                                       continue
+                               } else if !locked[ref.fn] {
+                                       // file was renamed, moved, or
+                                       // deleted since we called
+                                       // PutB
+                                       continue
+                               }
+                       }
+                       data := ref.fn.segments[ref.idx].(*memSegment).buf
+                       ref.fn.segments[ref.idx] = storedSegment{
+                               kc:      dn.fs,
+                               locator: locator,
+                               size:    len(block),
+                               offset:  off,
+                               length:  len(data),
+                       }
+                       off += len(data)
+                       ref.fn.memsize -= int64(len(data))
+               }
+       }()
+       if sync {
+               return <-errs
+       } else {
+               return nil
        }
-       return nil
 }
 
-// sync flushes in-memory data and remote block references (for the
+type flushOpts struct {
+       sync        bool
+       shortBlocks bool
+}
+
+// flush in-memory data and remote-cluster block references (for the
 // children with the given names, which must be children of dn) to
-// local persistent storage. Caller must have write lock on dn and the
-// named children.
-func (dn *dirnode) sync(ctx context.Context, throttle *throttle, names []string) error {
+// local-cluster persistent storage.
+//
+// Caller must have write lock on dn and the named children.
+//
+// If any children are dirs, they will be flushed recursively.
+func (dn *dirnode) flush(ctx context.Context, throttle *throttle, names []string, opts flushOpts) error {
        cg := newContextGroup(ctx)
        defer cg.Cancel()
 
        goCommit := func(refs []fnSegmentRef) {
+               if len(refs) == 0 {
+                       return
+               }
                cg.Go(func() error {
-                       return dn.commitBlock(cg.Context(), throttle, refs)
+                       throttle.Acquire()
+                       defer throttle.Release()
+                       return dn.commitBlock(cg.Context(), refs, opts.sync)
                })
        }
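
goCommit now holds a throttle slot only around the commitBlock call,
and commitBlock itself always runs the upload in a goroutine; sync
mode merely waits for the result. The control flow reduces to this
shape (a sketch, not the actual code):

    func commit(upload func() error, sync bool) error {
            errs := make(chan error, 1)
            go func() {
                    defer close(errs)
                    if err := upload(); err != nil {
                            errs <- err
                    }
            }()
            if sync {
                    return <-errs // block until the upload finishes
            }
            return nil // async: on failure, a later flush retries
    }
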
 
@@ -665,45 +762,85 @@ func (dn *dirnode) sync(ctx context.Context, throttle *throttle, names []string)
        var pendingLen int = 0
        localLocator := map[string]string{}
        for _, name := range names {
-               fn, ok := dn.inodes[name].(*filenode)
-               if !ok {
-                       continue
-               }
-               for idx, seg := range fn.segments {
-                       switch seg := seg.(type) {
-                       case storedSegment:
-                               loc, ok := localLocator[seg.locator]
-                               if !ok {
-                                       var err error
-                                       loc, err = dn.fs.LocalLocator(seg.locator)
-                                       if err != nil {
-                                               return err
+               switch node := dn.inodes[name].(type) {
+               case *dirnode:
+                       childNames := node.sortedNames()
+                       for _, childName := range childNames {
+                               child := node.inodes[childName]
+                               child.Lock()
+                               defer child.Unlock()
+                       }
+                       cg.Go(func() error { return node.flush(cg.Context(), throttle, childNames, opts) })
+               case *filenode:
+                       for idx, seg := range node.segments {
+                               switch seg := seg.(type) {
+                               case storedSegment:
+                                       loc, ok := localLocator[seg.locator]
+                                       if !ok {
+                                               var err error
+                                               loc, err = dn.fs.LocalLocator(seg.locator)
+                                               if err != nil {
+                                                       return err
+                                               }
+                                               localLocator[seg.locator] = loc
                                        }
-                                       localLocator[seg.locator] = loc
-                               }
-                               seg.locator = loc
-                               fn.segments[idx] = seg
-                       case *memSegment:
-                               if seg.Len() > maxBlockSize/2 {
-                                       goCommit([]fnSegmentRef{{fn, idx}})
-                                       continue
-                               }
-                               if pendingLen+seg.Len() > maxBlockSize {
-                                       goCommit(pending)
-                                       pending = nil
-                                       pendingLen = 0
+                                       seg.locator = loc
+                                       node.segments[idx] = seg
+                               case *memSegment:
+                                       if seg.Len() > maxBlockSize/2 {
+                                               goCommit([]fnSegmentRef{{node, idx}})
+                                               continue
+                                       }
+                                       if pendingLen+seg.Len() > maxBlockSize {
+                                               goCommit(pending)
+                                               pending = nil
+                                               pendingLen = 0
+                                       }
+                                       pending = append(pending, fnSegmentRef{node, idx})
+                                       pendingLen += seg.Len()
+                               default:
+                                       panic(fmt.Sprintf("can't sync segment type %T", seg))
                                }
-                               pending = append(pending, fnSegmentRef{fn, idx})
-                               pendingLen += seg.Len()
-                       default:
-                               panic(fmt.Sprintf("can't sync segment type %T", seg))
                        }
                }
        }
-       goCommit(pending)
+       if opts.shortBlocks {
+               goCommit(pending)
+       }
        return cg.Wait()
 }
 
+// caller must have write lock.
+func (dn *dirnode) memorySize() (size int64) {
+       for _, name := range dn.sortedNames() {
+               node := dn.inodes[name]
+               node.Lock()
+               defer node.Unlock()
+               switch node := node.(type) {
+               case *dirnode:
+                       size += node.memorySize()
+               case *filenode:
+                       for _, seg := range node.segments {
+                               switch seg := seg.(type) {
+                               case *memSegment:
+                                       size += int64(seg.Len())
+                               }
+                       }
+               }
+       }
+       return
+}
+
+// caller must have write lock.
+func (dn *dirnode) sortedNames() []string {
+       names := make([]string, 0, len(dn.inodes))
+       for name := range dn.inodes {
+               names = append(names, name)
+       }
+       sort.Strings(names)
+       return names
+}
+
 // caller must have write lock.
 func (dn *dirnode) marshalManifest(ctx context.Context, prefix string, throttle *throttle) (string, error) {
        cg := newContextGroup(ctx)
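
Restating flush's packing policy on plain segment sizes (a
self-contained sketch, not part of the commit): a segment larger than
half a block gets its own block immediately; smaller segments are
packed greedily until adding one would overflow; the trailing short
block is committed only when shortBlocks is true.

    func packBlocks(sizes []int, maxBlockSize int, shortBlocks bool) (blocks [][]int) {
            var pending []int
            pendingLen := 0
            for _, n := range sizes {
                    if n > maxBlockSize/2 {
                            // big segment: committed as its own block
                            blocks = append(blocks, []int{n})
                            continue
                    }
                    if pendingLen+n > maxBlockSize {
                            // a full block's worth of small segments
                            blocks = append(blocks, pending)
                            pending, pendingLen = nil, 0
                    }
                    pending = append(pending, n)
                    pendingLen += n
            }
            if shortBlocks && len(pending) > 0 {
                    blocks = append(blocks, pending) // trailing short block
            }
            return
    }

With 67 segments of 1MiB each and maxBlockSize = 64MiB, this yields
one full 64MiB block plus a 3MiB tail per directory -- exactly what
TestFlushFullBlocksOnly asserts below.
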
@@ -720,11 +857,7 @@ func (dn *dirnode) marshalManifest(ctx context.Context, prefix string, throttle
                return manifestEscape(prefix) + " d41d8cd98f00b204e9800998ecf8427e+0 0:0:\\056\n", nil
        }
 
-       names := make([]string, 0, len(dn.inodes))
-       for name := range dn.inodes {
-               names = append(names, name)
-       }
-       sort.Strings(names)
+       names := dn.sortedNames()
 
        // Wait for children to finish any pending write operations
        // before locking them.
@@ -772,7 +905,7 @@ func (dn *dirnode) marshalManifest(ctx context.Context, prefix string, throttle
 
                var fileparts []filepart
                var blocks []string
-               if err := dn.sync(cg.Context(), throttle, names); err != nil {
+               if err := dn.flush(cg.Context(), throttle, filenames, flushOpts{sync: true, shortBlocks: true}); err != nil {
                        return err
                }
                for _, name := range filenames {
@@ -805,7 +938,7 @@ func (dn *dirnode) marshalManifest(ctx context.Context, prefix string, throttle
                                default:
                                        // This can't happen: we
                                        // haven't unlocked since
-                                       // calling sync().
+                                       // calling flush(sync=true).
                                        panic(fmt.Sprintf("can't marshal segment type %T", seg))
                                }
                        }
diff --git a/sdk/go/arvados/fs_collection_test.go b/sdk/go/arvados/fs_collection_test.go
index 7fd03b120a7f34240393f884f88992b885499e1f..fe3ad7a1e9a44041258e1b099733e75323f4270a 100644
@@ -535,7 +535,7 @@ func (s *CollectionFSSuite) TestConcurrentWriters(c *check.C) {
        }
 
        maxBlockSize = 8
-       defer func() { maxBlockSize = 2 << 26 }()
+       defer func() { maxBlockSize = 1 << 26 }()
 
        var wg sync.WaitGroup
        for n := 0; n < 128; n++ {
@@ -1039,7 +1039,7 @@ func (s *CollectionFSSuite) TestOpenFileFlags(c *check.C) {
        c.Check(err, check.ErrorMatches, `invalid flag.*`)
 }
 
-func (s *CollectionFSSuite) TestFlushFullBlocks(c *check.C) {
+func (s *CollectionFSSuite) TestFlushFullBlocksWritingLongFile(c *check.C) {
        defer func(wab, mbs int) {
                writeAheadBlocks = wab
                maxBlockSize = mbs
@@ -1105,6 +1105,91 @@ func (s *CollectionFSSuite) TestFlushFullBlocks(c *check.C) {
        c.Check(currentMemExtents(), check.HasLen, 0)
 }
 
+// Ensure blocks get flushed to disk if a lot of data is written to
+// small files/directories without calling sync().
+//
+// Write ~512KiB into two files in each of 256 top-level dirs (the
+// shared buffer is drained by the first write, so ~128MiB total),
+// calling Flush() every 8 dirs. Ensure memory usage never exceeds
+// 16MiB (1<<24).
+func (s *CollectionFSSuite) TestFlushAll(c *check.C) {
+       fs, err := (&Collection{}).FileSystem(s.client, s.kc)
+       c.Assert(err, check.IsNil)
+
+       s.kc.onPut = func([]byte) {
+               // discard flushed data -- otherwise the stub will use
+               // unlimited memory
+               time.Sleep(time.Millisecond)
+               s.kc.Lock()
+               defer s.kc.Unlock()
+               s.kc.blocks = map[string][]byte{}
+       }
+       for i := 0; i < 256; i++ {
+               buf := bytes.NewBuffer(make([]byte, 524288))
+               fmt.Fprintf(buf, "test file in dir%d", i)
+
+               dir := fmt.Sprintf("dir%d", i)
+               fs.Mkdir(dir, 0755)
+               for j := 0; j < 2; j++ {
+                       f, err := fs.OpenFile(fmt.Sprintf("%s/file%d", dir, j), os.O_WRONLY|os.O_CREATE, 0)
+                       c.Assert(err, check.IsNil)
+                       defer f.Close()
+                       _, err = io.Copy(f, buf)
+                       c.Assert(err, check.IsNil)
+               }
+
+               if i%8 == 0 {
+                       fs.Flush(true)
+               }
+
+               size := fs.memorySize()
+               if !c.Check(size <= 1<<24, check.Equals, true) {
+                       c.Logf("at dir%d fs.memorySize()=%d", i, size)
+                       return
+               }
+       }
+}
+
+// Ensure short blocks at the end of a stream don't get flushed by
+// Flush(false).
+//
+// Write 67x 1MiB files to each of 8 dirs, and check that 8 full 64MiB
+// blocks have been flushed while 8x 3MiB is still buffered in memory.
+func (s *CollectionFSSuite) TestFlushFullBlocksOnly(c *check.C) {
+       fs, err := (&Collection{}).FileSystem(s.client, s.kc)
+       c.Assert(err, check.IsNil)
+
+       var flushed int64
+       s.kc.onPut = func(p []byte) {
+               atomic.AddInt64(&flushed, int64(len(p)))
+       }
+
+       nDirs := 8
+       megabyte := make([]byte, 1<<20)
+       for i := 0; i < nDirs; i++ {
+               dir := fmt.Sprintf("dir%d", i)
+               fs.Mkdir(dir, 0755)
+               for j := 0; j < 67; j++ {
+                       f, err := fs.OpenFile(fmt.Sprintf("%s/file%d", dir, j), os.O_WRONLY|os.O_CREATE, 0)
+                       c.Assert(err, check.IsNil)
+                       defer f.Close()
+                       _, err = f.Write(megabyte)
+                       c.Assert(err, check.IsNil)
+               }
+       }
+       c.Check(fs.memorySize(), check.Equals, int64(nDirs*67<<20))
+       c.Check(flushed, check.Equals, int64(0))
+
+       fs.Flush(false)
+       expectSize := int64(nDirs * 3 << 20)
+
+       // Wait for flush to finish
+       for deadline := time.Now().Add(5 * time.Second); fs.memorySize() > expectSize && time.Now().Before(deadline); time.Sleep(10 * time.Millisecond) {
+       }
+       c.Check(fs.memorySize(), check.Equals, expectSize)
+       c.Check(flushed, check.Equals, int64(nDirs*64<<20))
+}
+
 func (s *CollectionFSSuite) TestBrokenManifests(c *check.C) {
        for _, txt := range []string{
                "\n",
diff --git a/services/crunch-run/copier.go b/services/crunch-run/copier.go
index 3f529f6313b9ee55483efaa21367572bd54207a4..555a2654d8c1513d79cd99b0b493acc42ffc9319 100644
@@ -82,7 +82,18 @@ func (cp *copier) Copy() (string, error) {
                        return "", fmt.Errorf("error making directory %q in output collection: %v", d, err)
                }
        }
+       var lastparentdir string
        for _, f := range cp.files {
+               // If a dir has just had its last file added, do a
+               // full Flush. Otherwise, do a partial Flush (write
+               // full-size blocks, but leave the last short block
+               // open so f's data can be packed with it).
+               dir, _ := filepath.Split(f.dst)
+               if err := fs.Flush(dir != lastparentdir); err != nil {
+                       return "", fmt.Errorf("error flushing output collection file data: %v", err)
+               }
+               lastparentdir = dir
+
                err = cp.copyFile(fs, f)
                if err != nil {
                        return "", fmt.Errorf("error copying file %q into output collection: %v", f, err)