14538: Use concurrent writers to sync multiple streams.
authorTom Clegg <tclegg@veritasgenetics.com>
Mon, 26 Nov 2018 21:31:26 +0000 (16:31 -0500)
committerTom Clegg <tclegg@veritasgenetics.com>
Wed, 28 Nov 2018 20:43:04 +0000 (15:43 -0500)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg@veritasgenetics.com>

sdk/go/arvados/fs_collection.go

index afef1e391e30f90423c8e6c3dbc82246b82553b8..0a7f408f8f2bbff4e720f5cb5c4651b04fb1af36 100644 (file)
@@ -5,6 +5,7 @@
 package arvados
 
 import (
+       "context"
        "encoding/json"
        "fmt"
        "io"
@@ -138,7 +139,7 @@ func (fs *collectionFileSystem) Sync() error {
 func (fs *collectionFileSystem) MarshalManifest(prefix string) (string, error) {
        fs.fileSystem.root.Lock()
        defer fs.fileSystem.root.Unlock()
-       return fs.fileSystem.root.(*dirnode).marshalManifest(prefix, newThrottle(concurrentWriters))
+       return fs.fileSystem.root.(*dirnode).marshalManifest(context.TODO(), prefix, newThrottle(concurrentWriters))
 }
 
 func (fs *collectionFileSystem) Size() int64 {
@@ -552,7 +553,10 @@ func (dn *dirnode) Child(name string, replace func(inode) (inode, error)) (inode
 // children with the given names, which must be children of dn) to
 // local persistent storage. Caller must have write lock on dn and the
 // named children.
-func (dn *dirnode) sync(names []string, throttle *throttle) error {
+func (dn *dirnode) sync(ctx context.Context, names []string, throttle *throttle) error {
+       ctx, cancel := context.WithCancel(ctx)
+       defer cancel()
+
        type shortBlock struct {
                fn  *filenode
                idx int
@@ -560,8 +564,10 @@ func (dn *dirnode) sync(names []string, throttle *throttle) error {
        var pending []shortBlock
        var pendingLen int
 
-       var wg sync.WaitGroup
        errors := make(chan error, 1)
+       var wg sync.WaitGroup
+       defer wg.Wait() // we have locks: unsafe to return until all goroutines finish
+
        flush := func(sbs []shortBlock) {
                defer wg.Done()
                if len(sbs) == 0 {
@@ -569,6 +575,9 @@ func (dn *dirnode) sync(names []string, throttle *throttle) error {
                }
                throttle.Acquire()
                defer throttle.Release()
+               if ctx.Err() != nil {
+                       return
+               }
                block := make([]byte, 0, maxBlockSize)
                for _, sb := range sbs {
                        block = append(block, sb.fn.segments[sb.idx].(*memSegment).buf...)
@@ -579,6 +588,7 @@ func (dn *dirnode) sync(names []string, throttle *throttle) error {
                        case errors <- err:
                        default:
                        }
+                       cancel()
                }
                off := 0
                for _, sb := range sbs {
@@ -636,22 +646,17 @@ func (dn *dirnode) sync(names []string, throttle *throttle) error {
        }
        wg.Add(1)
        flush(pending)
-       wg.Wait()
-       close(errors)
+       go func() {
+               wg.Wait()
+               close(errors)
+       }()
        return <-errors
 }
 
 // caller must have write lock.
-func (dn *dirnode) marshalManifest(prefix string, throttle *throttle) (string, error) {
-       var streamLen int64
-       type filepart struct {
-               name   string
-               offset int64
-               length int64
-       }
-       var fileparts []filepart
-       var subdirs string
-       var blocks []string
+func (dn *dirnode) marshalManifest(ctx context.Context, prefix string, throttle *throttle) (string, error) {
+       ctx, cancel := context.WithCancel(ctx)
+       defer cancel()
 
        if len(dn.inodes) == 0 {
                if prefix == "." {
@@ -669,26 +674,62 @@ func (dn *dirnode) marshalManifest(prefix string, throttle *throttle) (string, e
                names = append(names, name)
        }
        sort.Strings(names)
+       var dirnames []string
+       var filenames []string
        for _, name := range names {
                node := dn.inodes[name]
                node.Lock()
                defer node.Unlock()
-       }
-       if err := dn.sync(names, throttle); err != nil {
-               return "", err
-       }
-       for _, name := range names {
-               switch node := dn.inodes[name].(type) {
+               switch node := node.(type) {
                case *dirnode:
-                       subdir, err := node.marshalManifest(prefix+"/"+name, throttle)
+                       dirnames = append(dirnames, name)
+               case *filenode:
+                       filenames = append(filenames, name)
+               default:
+                       panic(fmt.Sprintf("can't marshal inode type %T", node))
+               }
+       }
+
+       var wg sync.WaitGroup
+       errors := make(chan error, len(dirnames)+1)
+       subdirs := make([]string, len(dirnames))
+       rootdir := ""
+       for i, name := range dirnames {
+               wg.Add(1)
+               go func(i int, name string) {
+                       defer wg.Done()
+                       var err error
+                       subdirs[i], err = dn.inodes[name].(*dirnode).marshalManifest(ctx, prefix+"/"+name, throttle)
                        if err != nil {
-                               return "", err
+                               errors <- err
+                               cancel()
                        }
-                       subdirs = subdirs + subdir
-               case *filenode:
+               }(i, name)
+       }
+
+       wg.Add(1)
+       go func() {
+               defer wg.Done()
+
+               var streamLen int64
+               type filepart struct {
+                       name   string
+                       offset int64
+                       length int64
+               }
+
+               var fileparts []filepart
+               var blocks []string
+               if err := dn.sync(ctx, names, throttle); err != nil {
+                       errors <- err
+                       cancel()
+                       return
+               }
+               for _, name := range filenames {
+                       node := dn.inodes[name].(*filenode)
                        if len(node.segments) == 0 {
                                fileparts = append(fileparts, filepart{name: name})
-                               break
+                               continue
                        }
                        for _, seg := range node.segments {
                                switch seg := seg.(type) {
@@ -718,20 +759,26 @@ func (dn *dirnode) marshalManifest(prefix string, throttle *throttle) (string, e
                                        panic(fmt.Sprintf("can't marshal segment type %T", seg))
                                }
                        }
-               default:
-                       panic(fmt.Sprintf("can't marshal inode type %T", node))
                }
+               var filetokens []string
+               for _, s := range fileparts {
+                       filetokens = append(filetokens, fmt.Sprintf("%d:%d:%s", s.offset, s.length, manifestEscape(s.name)))
+               }
+               if len(filetokens) == 0 {
+                       return
+               } else if len(blocks) == 0 {
+                       blocks = []string{"d41d8cd98f00b204e9800998ecf8427e+0"}
+               }
+               rootdir = manifestEscape(prefix) + " " + strings.Join(blocks, " ") + " " + strings.Join(filetokens, " ") + "\n"
+       }()
+
+       wg.Wait()
+       select {
+       case err := <-errors:
+               return "", err
+       default:
        }
-       var filetokens []string
-       for _, s := range fileparts {
-               filetokens = append(filetokens, fmt.Sprintf("%d:%d:%s", s.offset, s.length, manifestEscape(s.name)))
-       }
-       if len(filetokens) == 0 {
-               return subdirs, nil
-       } else if len(blocks) == 0 {
-               blocks = []string{"d41d8cd98f00b204e9800998ecf8427e+0"}
-       }
-       return manifestEscape(prefix) + " " + strings.Join(blocks, " ") + " " + strings.Join(filetokens, " ") + "\n" + subdirs, nil
+       return rootdir + strings.Join(subdirs, ""), nil
 }
 
 func (dn *dirnode) loadManifest(txt string) error {