15652: Limit concurrent writes per filesystem, not per flush.
authorTom Clegg <tclegg@veritasgenetics.com>
Tue, 22 Oct 2019 15:04:49 +0000 (11:04 -0400)
committerTom Clegg <tclegg@veritasgenetics.com>
Tue, 22 Oct 2019 15:04:49 +0000 (11:04 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg@veritasgenetics.com>

sdk/go/arvados/fs_base.go
sdk/go/arvados/fs_collection.go
sdk/go/arvados/fs_collection_test.go
sdk/go/arvados/fs_site.go

index 359e6b67e646596c053c62a042134fdd3d362986..d06aba3695adc37f3d74057de2568778bcd9f9c9 100644 (file)
@@ -58,6 +58,9 @@ type FileSystem interface {
        // while locking multiple inodes.
        locker() sync.Locker
 
+       // throttle for limiting concurrent background writers
+       throttle() *throttle
+
        // create a new node with nil parent.
        newNode(name string, perm os.FileMode, modTime time.Time) (node inode, err error)
 
@@ -300,12 +303,17 @@ type fileSystem struct {
        root inode
        fsBackend
        mutex sync.Mutex
+       thr   *throttle
 }
 
 func (fs *fileSystem) rootnode() inode {
        return fs.root
 }
 
+func (fs *fileSystem) throttle() *throttle {
+       return fs.thr
+}
+
 func (fs *fileSystem) locker() sync.Locker {
        return &fs.mutex
 }
index 0722da6b0491873e0a80dea9094e7b558efb6821..386408a109ed01c062dc56297ce02acc38d1e59a 100644 (file)
@@ -21,8 +21,7 @@ import (
 
 var (
        maxBlockSize      = 1 << 26
-       concurrentWriters = 4 // max goroutines writing to Keep during flush()
-       writeAheadBlocks  = 1 // max background jobs flushing to Keep before blocking writes
+       concurrentWriters = 4 // max goroutines writing to Keep in background and during flush()
 )
 
 // A CollectionFileSystem is a FileSystem that can be serialized as a
@@ -60,6 +59,7 @@ func (c *Collection) FileSystem(client apiClient, kc keepClient) (CollectionFile
                uuid: c.UUID,
                fileSystem: fileSystem{
                        fsBackend: keepBackend{apiClient: client, keepClient: kc},
+                       thr:       newThrottle(concurrentWriters),
                },
        }
        root := &dirnode{
@@ -170,7 +170,7 @@ func (fs *collectionFileSystem) Flush(path string, shortBlocks bool) error {
                }
                names = filenames
        }
-       return dn.flush(context.TODO(), newThrottle(concurrentWriters), names, flushOpts{sync: false, shortBlocks: shortBlocks})
+       return dn.flush(context.TODO(), names, flushOpts{sync: false, shortBlocks: shortBlocks})
 }
 
 func (fs *collectionFileSystem) memorySize() int64 {
@@ -182,7 +182,7 @@ func (fs *collectionFileSystem) memorySize() int64 {
 func (fs *collectionFileSystem) MarshalManifest(prefix string) (string, error) {
        fs.fileSystem.root.Lock()
        defer fs.fileSystem.root.Unlock()
-       return fs.fileSystem.root.(*dirnode).marshalManifest(context.TODO(), prefix, newThrottle(concurrentWriters))
+       return fs.fileSystem.root.(*dirnode).marshalManifest(context.TODO(), prefix)
 }
 
 func (fs *collectionFileSystem) Size() int64 {
@@ -274,7 +274,6 @@ type filenode struct {
        memsize  int64 // bytes in memSegments
        sync.RWMutex
        nullnode
-       throttle *throttle
 }
 
 // caller must have lock
@@ -539,10 +538,6 @@ func (fn *filenode) Write(p []byte, startPtr filenodePtr) (n int, ptr filenodePt
 func (fn *filenode) pruneMemSegments() {
        // TODO: share code with (*dirnode)flush()
        // TODO: pack/flush small blocks too, when fragmented
-       if fn.throttle == nil {
-               // TODO: share a throttle with filesystem
-               fn.throttle = newThrottle(writeAheadBlocks)
-       }
        for idx, seg := range fn.segments {
                seg, ok := seg.(*memSegment)
                if !ok || seg.Len() < maxBlockSize || seg.flushing != nil {
@@ -558,11 +553,11 @@ func (fn *filenode) pruneMemSegments() {
                // progress, block here until one finishes, rather
                // than pile up an unlimited number of buffered writes
                // and network flush operations.
-               fn.throttle.Acquire()
+               fn.fs.throttle().Acquire()
                go func() {
                        defer close(done)
                        locator, _, err := fn.FS().PutB(buf)
-                       fn.throttle.Release()
+                       fn.fs.throttle().Release()
                        fn.Lock()
                        defer fn.Unlock()
                        if seg.flushing != done {
@@ -763,7 +758,7 @@ type flushOpts struct {
 // Caller must have write lock on dn and the named children.
 //
 // If any children are dirs, they will be flushed recursively.
-func (dn *dirnode) flush(ctx context.Context, throttle *throttle, names []string, opts flushOpts) error {
+func (dn *dirnode) flush(ctx context.Context, names []string, opts flushOpts) error {
        cg := newContextGroup(ctx)
        defer cg.Cancel()
 
@@ -772,8 +767,8 @@ func (dn *dirnode) flush(ctx context.Context, throttle *throttle, names []string
                        return
                }
                cg.Go(func() error {
-                       throttle.Acquire()
-                       defer throttle.Release()
+                       dn.fs.throttle().Acquire()
+                       defer dn.fs.throttle().Release()
                        return dn.commitBlock(cg.Context(), refs, opts.sync)
                })
        }
@@ -790,7 +785,7 @@ func (dn *dirnode) flush(ctx context.Context, throttle *throttle, names []string
                                grandchild.Lock()
                                defer grandchild.Unlock()
                        }
-                       cg.Go(func() error { return node.flush(cg.Context(), throttle, grandchildNames, opts) })
+                       cg.Go(func() error { return node.flush(cg.Context(), grandchildNames, opts) })
                case *filenode:
                        for idx, seg := range node.segments {
                                switch seg := seg.(type) {
@@ -862,7 +857,7 @@ func (dn *dirnode) sortedNames() []string {
 }
 
 // caller must have write lock.
-func (dn *dirnode) marshalManifest(ctx context.Context, prefix string, throttle *throttle) (string, error) {
+func (dn *dirnode) marshalManifest(ctx context.Context, prefix string) (string, error) {
        cg := newContextGroup(ctx)
        defer cg.Cancel()
 
@@ -909,7 +904,7 @@ func (dn *dirnode) marshalManifest(ctx context.Context, prefix string, throttle
        for i, name := range dirnames {
                i, name := i, name
                cg.Go(func() error {
-                       txt, err := dn.inodes[name].(*dirnode).marshalManifest(cg.Context(), prefix+"/"+name, throttle)
+                       txt, err := dn.inodes[name].(*dirnode).marshalManifest(cg.Context(), prefix+"/"+name)
                        subdirs[i] = txt
                        return err
                })
@@ -925,7 +920,7 @@ func (dn *dirnode) marshalManifest(ctx context.Context, prefix string, throttle
 
                var fileparts []filepart
                var blocks []string
-               if err := dn.flush(cg.Context(), throttle, filenames, flushOpts{sync: true, shortBlocks: true}); err != nil {
+               if err := dn.flush(cg.Context(), filenames, flushOpts{sync: true, shortBlocks: true}); err != nil {
                        return err
                }
                for _, name := range filenames {
index edc35ab9bb6b22c6c22d106f4fee9b28533da234..5f8d67510f06c94174b05a5659051b6aea16d6f1 100644 (file)
@@ -1040,11 +1040,11 @@ func (s *CollectionFSSuite) TestOpenFileFlags(c *check.C) {
 }
 
 func (s *CollectionFSSuite) TestFlushFullBlocksWritingLongFile(c *check.C) {
-       defer func(wab, mbs int) {
-               writeAheadBlocks = wab
+       defer func(cw, mbs int) {
+               concurrentWriters = cw
                maxBlockSize = mbs
-       }(writeAheadBlocks, maxBlockSize)
-       writeAheadBlocks = 2
+       }(concurrentWriters, maxBlockSize)
+       concurrentWriters = 2
        maxBlockSize = 1024
 
        proceed := make(chan struct{})
@@ -1069,7 +1069,7 @@ func (s *CollectionFSSuite) TestFlushFullBlocksWritingLongFile(c *check.C) {
                default:
                        time.Sleep(time.Millisecond)
                }
-               c.Check(atomic.AddInt32(&concurrent, -1) < int32(writeAheadBlocks), check.Equals, true)
+               c.Check(atomic.AddInt32(&concurrent, -1) < int32(concurrentWriters), check.Equals, true)
        }
 
        fs, err := (&Collection{}).FileSystem(s.client, s.kc)
index 82114e2ea9ed54ac89cc94dd387d73711efae561..4264be4fa600be7abb05351f506feb00dccac6cc 100644 (file)
@@ -21,6 +21,7 @@ type CustomFileSystem interface {
 type customFileSystem struct {
        fileSystem
        root *vdirnode
+       thr  *throttle
 
        staleThreshold time.Time
        staleLock      sync.Mutex
@@ -33,6 +34,7 @@ func (c *Client) CustomFileSystem(kc keepClient) CustomFileSystem {
                fileSystem: fileSystem{
                        fsBackend: keepBackend{apiClient: c, keepClient: kc},
                        root:      root,
+                       thr:       newThrottle(concurrentWriters),
                },
        }
        root.inode = &treenode{