14538: Use concurrent writers to sync a directory.
author Tom Clegg <tclegg@veritasgenetics.com>
Mon, 26 Nov 2018 20:32:51 +0000 (15:32 -0500)
committer Tom Clegg <tclegg@veritasgenetics.com>
Wed, 28 Nov 2018 20:43:01 +0000 (15:43 -0500)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg@veritasgenetics.com>

sdk/go/arvados/fs_collection.go
sdk/go/arvados/throttle.go [new file with mode: 0644]

index b996542abd52cf7be04549962fdb31dfb7a366a0..afef1e391e30f90423c8e6c3dbc82246b82553b8 100644 (file)
@@ -20,6 +20,8 @@ import (
 
 var maxBlockSize = 1 << 26 // 64 MiB; sync allocates each outgoing block with this capacity and flushes pending short segments before their total would exceed it
 
+var concurrentWriters = 4 // capacity of the throttle MarshalManifest creates; bounds how many flush goroutines in sync run their PutB block write at once
+
 // A CollectionFileSystem is a FileSystem that can be serialized as a
 // manifest and stored as a collection.
 type CollectionFileSystem interface {
@@ -136,7 +138,7 @@ func (fs *collectionFileSystem) Sync() error {
 func (fs *collectionFileSystem) MarshalManifest(prefix string) (string, error) { // MarshalManifest holds the root node's lock while delegating to the root dirnode's marshalManifest.
        fs.fileSystem.root.Lock()
        defer fs.fileSystem.root.Unlock()
-       return fs.fileSystem.root.(*dirnode).marshalManifest(prefix)
+       return fs.fileSystem.root.(*dirnode).marshalManifest(prefix, newThrottle(concurrentWriters)) // one new throttle per call; marshalManifest hands the same throttle to sync and to its recursive subdirectory calls
 }
 
 func (fs *collectionFileSystem) Size() int64 {
@@ -550,7 +552,7 @@ func (dn *dirnode) Child(name string, replace func(inode) (inode, error)) (inode
 // children with the given names, which must be children of dn) to
 // local persistent storage. Caller must have write lock on dn and the
 // named children.
-func (dn *dirnode) sync(names []string) error {
+func (dn *dirnode) sync(names []string, throttle *throttle) error {
        type shortBlock struct {
                fn  *filenode
                idx int
@@ -558,17 +560,25 @@ func (dn *dirnode) sync(names []string) error {
        var pending []shortBlock
        var pendingLen int
 
-       flush := func(sbs []shortBlock) error {
+       var wg sync.WaitGroup
+       errors := make(chan error, 1)
+       flush := func(sbs []shortBlock) {
+               defer wg.Done()
                if len(sbs) == 0 {
-                       return nil
+                       return
                }
+               throttle.Acquire()
+               defer throttle.Release()
                block := make([]byte, 0, maxBlockSize)
                for _, sb := range sbs {
                        block = append(block, sb.fn.segments[sb.idx].(*memSegment).buf...)
                }
                locator, _, err := dn.fs.PutB(block)
                if err != nil {
-                       return err
+                       select {
+                       case errors <- err:
+                       default:
+                       }
                }
                off := 0
                for _, sb := range sbs {
@@ -583,7 +593,6 @@ func (dn *dirnode) sync(names []string) error {
                        off += len(data)
                        sb.fn.memsize -= int64(len(data))
                }
-               return nil
        }
 
        localLocator := map[string]string{}
@@ -608,15 +617,13 @@ func (dn *dirnode) sync(names []string) error {
                                fn.segments[idx] = seg
                        case *memSegment:
                                if seg.Len() > maxBlockSize/2 {
-                                       if err := flush([]shortBlock{{fn, idx}}); err != nil {
-                                               return err
-                                       }
+                                       wg.Add(1)
+                                       go flush([]shortBlock{{fn, idx}})
                                        continue
                                }
                                if pendingLen+seg.Len() > maxBlockSize {
-                                       if err := flush(pending); err != nil {
-                                               return err
-                                       }
+                                       wg.Add(1)
+                                       go flush(pending)
                                        pending = nil
                                        pendingLen = 0
                                }
@@ -627,11 +634,15 @@ func (dn *dirnode) sync(names []string) error {
                        }
                }
        }
-       return flush(pending)
+       wg.Add(1)
+       flush(pending)
+       wg.Wait()
+       close(errors)
+       return <-errors
 }
 
 // caller must have write lock.
-func (dn *dirnode) marshalManifest(prefix string) (string, error) {
+func (dn *dirnode) marshalManifest(prefix string, throttle *throttle) (string, error) {
        var streamLen int64
        type filepart struct {
                name   string
@@ -663,13 +674,13 @@ func (dn *dirnode) marshalManifest(prefix string) (string, error) {
                node.Lock()
                defer node.Unlock()
        }
-       if err := dn.sync(names); err != nil {
+       if err := dn.sync(names, throttle); err != nil {
                return "", err
        }
        for _, name := range names {
                switch node := dn.inodes[name].(type) {
                case *dirnode:
-                       subdir, err := node.marshalManifest(prefix + "/" + name)
+                       subdir, err := node.marshalManifest(prefix+"/"+name, throttle)
                        if err != nil {
                                return "", err
                        }
diff --git a/sdk/go/arvados/throttle.go b/sdk/go/arvados/throttle.go
new file mode 100644 (file)
index 0000000..464b73b
--- /dev/null
@@ -0,0 +1,17 @@
+package arvados
+
+type throttle struct { // throttle wraps a buffered channel: Acquire sends a value into it and Release receives one back out.
+       c chan struct{} // each buffered value stands for one Acquire not yet matched by a Release
+}
+
+func newThrottle(n int) *throttle { // newThrottle returns a throttle whose channel buffers n values, so n Acquire calls can complete before any Release; with n == 0 the channel is unbuffered and Acquire blocks until another goroutine is in Release.
+       return &throttle{c: make(chan struct{}, n)}
+}
+
+func (t *throttle) Acquire() { // Acquire sends one value into t.c, blocking while the channel buffer is full.
+       t.c <- struct{}{}
+}
+
+func (t *throttle) Release() { // Release receives one value from t.c, freeing a buffer slot for a later Acquire; it blocks if the channel is empty.
+       <-t.c
+}