Merge branch '11419-text-mode'
author Tom Clegg <tclegg@veritasgenetics.com>
Mon, 10 Dec 2018 21:18:35 +0000 (16:18 -0500)
committer Tom Clegg <tclegg@veritasgenetics.com>
Mon, 10 Dec 2018 21:18:35 +0000 (16:18 -0500)
closes #11419

Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg@veritasgenetics.com>

sdk/cwl/setup.py
sdk/go/arvados/contextgroup.go [new file with mode: 0644]
sdk/go/arvados/fs_collection.go
sdk/go/arvados/fs_collection_test.go
sdk/go/arvados/throttle.go [new file with mode: 0644]

index 9d25a562ab32d09dcdfba627fc2089260879cce1..f731db9555e4f1c0e73b0f300fdf3349ddb89f42 100644 (file)
@@ -33,8 +33,8 @@ setup(name='arvados-cwl-runner',
       # Note that arvados/build/run-build-packages.sh looks at this
       # file to determine what version of cwltool and schema-salad to build.
       install_requires=[
-          'cwltool==1.0.20181116032456',
-          'schema-salad==2.7.20181116024232',
+          'cwltool==1.0.20181201184214',
+          'schema-salad==3.0.20181129082112',
           'typing >= 3.6.4',
           'ruamel.yaml >=0.15.54, <=0.15.77',
           'arvados-python-client>=1.2.1.20181130020805',
diff --git a/sdk/go/arvados/contextgroup.go b/sdk/go/arvados/contextgroup.go
new file mode 100644 (file)
index 0000000..fa0de24
--- /dev/null
@@ -0,0 +1,95 @@
+package arvados
+
+import (
+       "context"
+       "sync"
+)
+
+// A contextGroup is a context-aware variation on sync.WaitGroup. It
+// provides a child context for the added funcs to use, so they can
+// exit early if another added func returns an error. Its Wait()
+// method returns the first error returned by any added func.
+//
+// Example:
+//
+//     err := errors.New("oops")
+//     cg := newContextGroup(ctx)
+//     defer cg.Cancel()
+//     cg.Go(func() error {
+//             someFuncWithContext(cg.Context())
+//             return nil
+//     })
+//     cg.Go(func() error {
+//             return err // this cancels cg.Context()
+//     })
+//     return cg.Wait() // returns err after both goroutines have ended
+type contextGroup struct {
+       ctx    context.Context
+       cancel context.CancelFunc
+       wg     sync.WaitGroup
+       err    error
+       mtx    sync.Mutex
+}
+
+// newContextGroup returns a new contextGroup. The caller must
+// eventually call the Cancel() method of the returned contextGroup.
+func newContextGroup(ctx context.Context) *contextGroup {
+       ctx, cancel := context.WithCancel(ctx)
+       return &contextGroup{
+               ctx:    ctx,
+               cancel: cancel,
+       }
+}
+
+// Cancel cancels the context group.
+func (cg *contextGroup) Cancel() {
+       cg.cancel()
+}
+
+// Context returns a context.Context which will be canceled when
+// Cancel is called, the parent is canceled, or a func has failed.
+func (cg *contextGroup) Context() context.Context {
+       return cg.ctx
+}
+
+// Go calls f in a new goroutine, unless a func has already failed,
+// in which case f is never called. The first error returned by any
+// f cancels the contextGroup; later errors are discarded.
+// If f notices cg.Context() is done, it should abandon further work
+// and return.
+func (cg *contextGroup) Go(f func() error) {
+       cg.mtx.Lock()
+       defer cg.mtx.Unlock()
+       if cg.err != nil {
+               return
+       }
+       cg.wg.Add(1)
+       go func() {
+               defer cg.wg.Done()
+               err := f()
+               cg.mtx.Lock()
+               defer cg.mtx.Unlock()
+               if err != nil && cg.err == nil {
+                       cg.err = err
+                       cg.cancel()
+               }
+       }()
+}
+
+// Wait waits for all added funcs to return, and returns the first
+// non-nil error.
+//
+// If no func returns an error but the group's context has already
+// been canceled (via its parent or Cancel), Wait returns its Err().
+//
+// Wait returns nil if all funcs return nil and the context has not
+// been canceled.
+func (cg *contextGroup) Wait() error {
+       cg.wg.Wait()
+       cg.mtx.Lock()
+       defer cg.mtx.Unlock()
+       if cg.err != nil {
+               return cg.err
+       }
+       return cg.ctx.Err()
+}
index b996542abd52cf7be04549962fdb31dfb7a366a0..6644f4cfb8e93ef7d601e667cee21a9dbce5d39b 100644 (file)
@@ -5,6 +5,7 @@
 package arvados
 
 import (
+       "context"
        "encoding/json"
        "fmt"
        "io"
@@ -18,7 +19,11 @@ import (
        "time"
 )
 
-var maxBlockSize = 1 << 26
+var (
+       maxBlockSize      = 1 << 26
+       concurrentWriters = 4 // max goroutines writing to Keep during sync()
+       writeAheadBlocks  = 1 // max background jobs flushing to Keep before blocking writes
+)
 
 // A CollectionFileSystem is a FileSystem that can be serialized as a
 // manifest and stored as a collection.
@@ -136,7 +141,7 @@ func (fs *collectionFileSystem) Sync() error {
 func (fs *collectionFileSystem) MarshalManifest(prefix string) (string, error) {
        fs.fileSystem.root.Lock()
        defer fs.fileSystem.root.Unlock()
-       return fs.fileSystem.root.(*dirnode).marshalManifest(prefix)
+       return fs.fileSystem.root.(*dirnode).marshalManifest(context.TODO(), prefix, newThrottle(concurrentWriters))
 }
 
 func (fs *collectionFileSystem) Size() int64 {
@@ -228,6 +233,7 @@ type filenode struct {
        memsize  int64 // bytes in memSegments
        sync.RWMutex
        nullnode
+       throttle *throttle
 }
 
 // caller must have lock
@@ -490,30 +496,75 @@ func (fn *filenode) Write(p []byte, startPtr filenodePtr) (n int, ptr filenodePt
 // Write some data out to disk to reduce memory use. Caller must have
 // write lock.
 func (fn *filenode) pruneMemSegments() {
-       // TODO: async (don't hold Lock() while waiting for Keep)
        // TODO: share code with (*dirnode)sync()
        // TODO: pack/flush small blocks too, when fragmented
+       if fn.throttle == nil {
+               // TODO: share a throttle with filesystem
+               fn.throttle = newThrottle(writeAheadBlocks)
+       }
        for idx, seg := range fn.segments {
                seg, ok := seg.(*memSegment)
-               if !ok || seg.Len() < maxBlockSize {
-                       continue
-               }
-               locator, _, err := fn.FS().PutB(seg.buf)
-               if err != nil {
-                       // TODO: stall (or return errors from)
-                       // subsequent writes until flushing
-                       // starts to succeed
+               if !ok || seg.Len() < maxBlockSize || seg.flushing != nil {
                        continue
                }
-               fn.memsize -= int64(seg.Len())
-               fn.segments[idx] = storedSegment{
-                       kc:      fn.FS(),
-                       locator: locator,
-                       size:    seg.Len(),
-                       offset:  0,
-                       length:  seg.Len(),
+               // Setting seg.flushing guarantees seg.buf will not be
+               // modified in place: WriteAt and Truncate will
+               // allocate a new buf instead, if necessary.
+               idx, buf := idx, seg.buf
+               done := make(chan struct{})
+               seg.flushing = done
+               // If lots of background writes are already in
+               // progress, block here until one finishes, rather
+               // than pile up an unlimited number of buffered writes
+               // and network flush operations.
+               fn.throttle.Acquire()
+               go func() {
+                       defer close(done)
+                       locator, _, err := fn.FS().PutB(buf)
+                       fn.throttle.Release()
+                       fn.Lock()
+                       defer fn.Unlock()
+                       if curbuf := seg.buf[:1]; &curbuf[0] != &buf[0] {
+                               // A new seg.buf has been allocated.
+                               return
+                       }
+                       seg.flushing = nil
+                       if err != nil {
+                               // TODO: stall (or return errors from)
+                               // subsequent writes until flushing
+                               // starts to succeed.
+                               return
+                       }
+                       if len(fn.segments) <= idx || fn.segments[idx] != seg || len(seg.buf) != len(buf) {
+                               // Segment has been dropped/moved/resized.
+                               return
+                       }
+                       fn.memsize -= int64(len(buf))
+                       fn.segments[idx] = storedSegment{
+                               kc:      fn.FS(),
+                               locator: locator,
+                               size:    len(buf),
+                               offset:  0,
+                               length:  len(buf),
+                       }
+               }()
+       }
+}
+
+// Block until all pending pruneMemSegments work is finished. Caller
+// must NOT have lock.
+func (fn *filenode) waitPrune() {
+       var pending []<-chan struct{}
+       fn.Lock()
+       for _, seg := range fn.segments {
+               if seg, ok := seg.(*memSegment); ok && seg.flushing != nil {
+                       pending = append(pending, seg.flushing)
                }
        }
+       fn.Unlock()
+       for _, p := range pending {
+               <-p
+       }
 }
 
 type dirnode struct {
@@ -546,46 +597,67 @@ func (dn *dirnode) Child(name string, replace func(inode) (inode, error)) (inode
        return dn.treenode.Child(name, replace)
 }
 
+type fnSegmentRef struct {
+       fn  *filenode
+       idx int
+}
+
+// commitBlock concatenates the data from the given filenode segments
+// (which must be *memSegments), writes the data out to Keep as a
+// single block, and replaces the filenodes' *memSegments with
+// storedSegments that reference the relevant portions of the new
+// block.
+//
+// Caller must have write lock.
+func (dn *dirnode) commitBlock(ctx context.Context, throttle *throttle, refs []fnSegmentRef) error {
+       if len(refs) == 0 {
+               return nil
+       }
+       throttle.Acquire()
+       defer throttle.Release()
+       if err := ctx.Err(); err != nil {
+               return err
+       }
+       block := make([]byte, 0, maxBlockSize)
+       for _, ref := range refs {
+               block = append(block, ref.fn.segments[ref.idx].(*memSegment).buf...)
+       }
+       locator, _, err := dn.fs.PutB(block)
+       if err != nil {
+               return err
+       }
+       off := 0
+       for _, ref := range refs {
+               data := ref.fn.segments[ref.idx].(*memSegment).buf
+               ref.fn.segments[ref.idx] = storedSegment{
+                       kc:      dn.fs,
+                       locator: locator,
+                       size:    len(block),
+                       offset:  off,
+                       length:  len(data),
+               }
+               off += len(data)
+               ref.fn.memsize -= int64(len(data))
+       }
+       return nil
+}
+
 // sync flushes in-memory data and remote block references (for the
 // children with the given names, which must be children of dn) to
 // local persistent storage. Caller must have write lock on dn and the
 // named children.
-func (dn *dirnode) sync(names []string) error {
-       type shortBlock struct {
-               fn  *filenode
-               idx int
-       }
-       var pending []shortBlock
-       var pendingLen int
+func (dn *dirnode) sync(ctx context.Context, throttle *throttle, names []string) error {
+       cg := newContextGroup(ctx)
+       defer cg.Cancel()
 
-       flush := func(sbs []shortBlock) error {
-               if len(sbs) == 0 {
-                       return nil
-               }
-               block := make([]byte, 0, maxBlockSize)
-               for _, sb := range sbs {
-                       block = append(block, sb.fn.segments[sb.idx].(*memSegment).buf...)
-               }
-               locator, _, err := dn.fs.PutB(block)
-               if err != nil {
-                       return err
-               }
-               off := 0
-               for _, sb := range sbs {
-                       data := sb.fn.segments[sb.idx].(*memSegment).buf
-                       sb.fn.segments[sb.idx] = storedSegment{
-                               kc:      dn.fs,
-                               locator: locator,
-                               size:    len(block),
-                               offset:  off,
-                               length:  len(data),
-                       }
-                       off += len(data)
-                       sb.fn.memsize -= int64(len(data))
-               }
-               return nil
+       goCommit := func(refs []fnSegmentRef) {
+               cg.Go(func() error {
+                       return dn.commitBlock(cg.Context(), throttle, refs)
+               })
        }
 
+       var pending []fnSegmentRef
+       var pendingLen int = 0
        localLocator := map[string]string{}
        for _, name := range names {
                fn, ok := dn.inodes[name].(*filenode)
@@ -608,39 +680,29 @@ func (dn *dirnode) sync(names []string) error {
                                fn.segments[idx] = seg
                        case *memSegment:
                                if seg.Len() > maxBlockSize/2 {
-                                       if err := flush([]shortBlock{{fn, idx}}); err != nil {
-                                               return err
-                                       }
+                                       goCommit([]fnSegmentRef{{fn, idx}})
                                        continue
                                }
                                if pendingLen+seg.Len() > maxBlockSize {
-                                       if err := flush(pending); err != nil {
-                                               return err
-                                       }
+                                       goCommit(pending)
                                        pending = nil
                                        pendingLen = 0
                                }
-                               pending = append(pending, shortBlock{fn, idx})
+                               pending = append(pending, fnSegmentRef{fn, idx})
                                pendingLen += seg.Len()
                        default:
                                panic(fmt.Sprintf("can't sync segment type %T", seg))
                        }
                }
        }
-       return flush(pending)
+       goCommit(pending)
+       return cg.Wait()
 }
 
 // caller must have write lock.
-func (dn *dirnode) marshalManifest(prefix string) (string, error) {
-       var streamLen int64
-       type filepart struct {
-               name   string
-               offset int64
-               length int64
-       }
-       var fileparts []filepart
-       var subdirs string
-       var blocks []string
+func (dn *dirnode) marshalManifest(ctx context.Context, prefix string, throttle *throttle) (string, error) {
+       cg := newContextGroup(ctx)
+       defer cg.Cancel()
 
        if len(dn.inodes) == 0 {
                if prefix == "." {
@@ -658,26 +720,61 @@ func (dn *dirnode) marshalManifest(prefix string) (string, error) {
                names = append(names, name)
        }
        sort.Strings(names)
+
+       // Wait for children to finish any pending write operations
+       // before locking them.
        for _, name := range names {
                node := dn.inodes[name]
-               node.Lock()
-               defer node.Unlock()
-       }
-       if err := dn.sync(names); err != nil {
-               return "", err
+               if fn, ok := node.(*filenode); ok {
+                       fn.waitPrune()
+               }
        }
+
+       var dirnames []string
+       var filenames []string
        for _, name := range names {
-               switch node := dn.inodes[name].(type) {
+               node := dn.inodes[name]
+               node.Lock()
+               defer node.Unlock()
+               switch node := node.(type) {
                case *dirnode:
-                       subdir, err := node.marshalManifest(prefix + "/" + name)
-                       if err != nil {
-                               return "", err
-                       }
-                       subdirs = subdirs + subdir
+                       dirnames = append(dirnames, name)
                case *filenode:
+                       filenames = append(filenames, name)
+               default:
+                       panic(fmt.Sprintf("can't marshal inode type %T", node))
+               }
+       }
+
+       subdirs := make([]string, len(dirnames))
+       rootdir := ""
+       for i, name := range dirnames {
+               i, name := i, name
+               cg.Go(func() error {
+                       txt, err := dn.inodes[name].(*dirnode).marshalManifest(cg.Context(), prefix+"/"+name, throttle)
+                       subdirs[i] = txt
+                       return err
+               })
+       }
+
+       cg.Go(func() error {
+               var streamLen int64
+               type filepart struct {
+                       name   string
+                       offset int64
+                       length int64
+               }
+
+               var fileparts []filepart
+               var blocks []string
+               if err := dn.sync(cg.Context(), throttle, names); err != nil {
+                       return err
+               }
+               for _, name := range filenames {
+                       node := dn.inodes[name].(*filenode)
                        if len(node.segments) == 0 {
                                fileparts = append(fileparts, filepart{name: name})
-                               break
+                               continue
                        }
                        for _, seg := range node.segments {
                                switch seg := seg.(type) {
@@ -707,20 +804,21 @@ func (dn *dirnode) marshalManifest(prefix string) (string, error) {
                                        panic(fmt.Sprintf("can't marshal segment type %T", seg))
                                }
                        }
-               default:
-                       panic(fmt.Sprintf("can't marshal inode type %T", node))
                }
-       }
-       var filetokens []string
-       for _, s := range fileparts {
-               filetokens = append(filetokens, fmt.Sprintf("%d:%d:%s", s.offset, s.length, manifestEscape(s.name)))
-       }
-       if len(filetokens) == 0 {
-               return subdirs, nil
-       } else if len(blocks) == 0 {
-               blocks = []string{"d41d8cd98f00b204e9800998ecf8427e+0"}
-       }
-       return manifestEscape(prefix) + " " + strings.Join(blocks, " ") + " " + strings.Join(filetokens, " ") + "\n" + subdirs, nil
+               var filetokens []string
+               for _, s := range fileparts {
+                       filetokens = append(filetokens, fmt.Sprintf("%d:%d:%s", s.offset, s.length, manifestEscape(s.name)))
+               }
+               if len(filetokens) == 0 {
+                       return nil
+               } else if len(blocks) == 0 {
+                       blocks = []string{"d41d8cd98f00b204e9800998ecf8427e+0"}
+               }
+               rootdir = manifestEscape(prefix) + " " + strings.Join(blocks, " ") + " " + strings.Join(filetokens, " ") + "\n"
+               return nil
+       })
+       err := cg.Wait()
+       return rootdir + strings.Join(subdirs, ""), err
 }
 
 func (dn *dirnode) loadManifest(txt string) error {
@@ -936,6 +1034,11 @@ type segment interface {
 
 type memSegment struct {
        buf []byte
+       // If flushing is not nil, then a) buf is being shared by a
+       // pruneMemSegments goroutine, and must be copied on write;
+       // and b) the flushing channel will close when the goroutine
+       // finishes, whether it succeeds or not.
+       flushing <-chan struct{}
 }
 
 func (me *memSegment) Len() int {
@@ -952,28 +1055,31 @@ func (me *memSegment) Slice(off, length int) segment {
 }
 
 func (me *memSegment) Truncate(n int) {
-       if n > cap(me.buf) {
+       if n > cap(me.buf) || (me.flushing != nil && n > len(me.buf)) {
                newsize := 1024
                for newsize < n {
                        newsize = newsize << 2
                }
                newbuf := make([]byte, n, newsize)
                copy(newbuf, me.buf)
-               me.buf = newbuf
+               me.buf, me.flushing = newbuf, nil
        } else {
-               // Zero unused part when shrinking, in case we grow
-               // and start using it again later.
-               for i := n; i < len(me.buf); i++ {
+               // reclaim existing capacity, and zero reclaimed part
+               oldlen := len(me.buf)
+               me.buf = me.buf[:n]
+               for i := oldlen; i < n; i++ {
                        me.buf[i] = 0
                }
        }
-       me.buf = me.buf[:n]
 }
 
 func (me *memSegment) WriteAt(p []byte, off int) {
        if off+len(p) > len(me.buf) {
                panic("overflowed segment")
        }
+       if me.flushing != nil {
+               me.buf, me.flushing = append([]byte(nil), me.buf...), nil
+       }
        copy(me.buf[off:], p)
 }
 
index a6d4ab1e5b71baccabafdbdf810db0ee264420a5..2ae2bd8924e23b583a267091cc6b9985e52d3422 100644 (file)
@@ -19,6 +19,7 @@ import (
        "runtime"
        "strings"
        "sync"
+       "sync/atomic"
        "testing"
        "time"
 
@@ -31,6 +32,7 @@ var _ = check.Suite(&CollectionFSSuite{})
 type keepClientStub struct {
        blocks      map[string][]byte
        refreshable map[string]bool
+       onPut       func(bufcopy []byte) // called from PutB, before acquiring lock
        sync.RWMutex
 }
 
@@ -50,6 +52,9 @@ func (kcs *keepClientStub) PutB(p []byte) (string, int, error) {
        locator := fmt.Sprintf("%x+%d+A12345@abcde", md5.Sum(p), len(p))
        buf := make([]byte, len(p))
        copy(buf, p)
+       if kcs.onPut != nil {
+               kcs.onPut(buf)
+       }
        kcs.Lock()
        defer kcs.Unlock()
        kcs.blocks[locator[:32]] = buf
@@ -583,7 +588,7 @@ func (s *CollectionFSSuite) TestRandomWrites(c *check.C) {
        const ngoroutines = 256
 
        var wg sync.WaitGroup
-       for n := 0; n < nfiles; n++ {
+       for n := 0; n < ngoroutines; n++ {
                wg.Add(1)
                go func(n int) {
                        defer wg.Done()
@@ -592,7 +597,7 @@ func (s *CollectionFSSuite) TestRandomWrites(c *check.C) {
                        f, err := s.fs.OpenFile(fmt.Sprintf("random-%d", n), os.O_RDWR|os.O_CREATE|os.O_EXCL, 0)
                        c.Assert(err, check.IsNil)
                        defer f.Close()
-                       for i := 0; i < ngoroutines; i++ {
+                       for i := 0; i < nfiles; i++ {
                                trunc := rand.Intn(65)
                                woff := rand.Intn(trunc + 1)
                                wbytes = wbytes[:rand.Intn(64-woff+1)]
@@ -618,11 +623,18 @@ func (s *CollectionFSSuite) TestRandomWrites(c *check.C) {
                                c.Check(string(buf), check.Equals, string(expect))
                                c.Check(err, check.IsNil)
                        }
-                       s.checkMemSize(c, f)
                }(n)
        }
        wg.Wait()
 
+       for n := 0; n < ngoroutines; n++ {
+               f, err := s.fs.OpenFile(fmt.Sprintf("random-%d", n), os.O_RDONLY, 0)
+               c.Assert(err, check.IsNil)
+               f.(*filehandle).inode.(*filenode).waitPrune()
+               s.checkMemSize(c, f)
+               defer f.Close()
+       }
+
        root, err := s.fs.Open("/")
        c.Assert(err, check.IsNil)
        defer root.Close()
@@ -1029,8 +1041,37 @@ func (s *CollectionFSSuite) TestOpenFileFlags(c *check.C) {
 }
 
 func (s *CollectionFSSuite) TestFlushFullBlocks(c *check.C) {
+       defer func(wab, mbs int) {
+               writeAheadBlocks = wab
+               maxBlockSize = mbs
+       }(writeAheadBlocks, maxBlockSize)
+       writeAheadBlocks = 2
        maxBlockSize = 1024
-       defer func() { maxBlockSize = 2 << 26 }()
+
+       proceed := make(chan struct{})
+       var started, concurrent int32
+       blk2done := false
+       s.kc.onPut = func([]byte) {
+               atomic.AddInt32(&concurrent, 1)
+               switch atomic.AddInt32(&started, 1) {
+               case 1:
+                       // Wait until block 2 starts and finishes, and block 3 starts
+                       select {
+                       case <-proceed:
+                               c.Check(blk2done, check.Equals, true)
+                       case <-time.After(time.Second):
+                               c.Error("timed out")
+                       }
+               case 2:
+                       time.Sleep(time.Millisecond)
+                       blk2done = true
+               case 3:
+                       close(proceed)
+               default:
+                       time.Sleep(time.Millisecond)
+               }
+               c.Check(atomic.AddInt32(&concurrent, -1) < int32(writeAheadBlocks), check.Equals, true)
+       }
 
        fs, err := (&Collection{}).FileSystem(s.client, s.kc)
        c.Assert(err, check.IsNil)
@@ -1056,6 +1097,7 @@ func (s *CollectionFSSuite) TestFlushFullBlocks(c *check.C) {
                }
                return
        }
+       f.(*filehandle).inode.(*filenode).waitPrune()
        c.Check(currentMemExtents(), check.HasLen, 1)
 
        m, err := fs.MarshalManifest(".")
diff --git a/sdk/go/arvados/throttle.go b/sdk/go/arvados/throttle.go
new file mode 100644 (file)
index 0000000..464b73b
--- /dev/null
@@ -0,0 +1,17 @@
+package arvados
+
+type throttle struct {
+       c chan struct{}
+}
+
+func newThrottle(n int) *throttle {
+       return &throttle{c: make(chan struct{}, n)}
+}
+
+func (t *throttle) Acquire() {
+       t.c <- struct{}{}
+}
+
+func (t *throttle) Release() {
+       <-t.c
+}