14538: Use concurrent writers to flush chunks while writing.
authorTom Clegg <tclegg@veritasgenetics.com>
Wed, 28 Nov 2018 21:41:42 +0000 (16:41 -0500)
committerTom Clegg <tclegg@veritasgenetics.com>
Wed, 28 Nov 2018 21:41:42 +0000 (16:41 -0500)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg@veritasgenetics.com>

sdk/go/arvados/fs_collection.go
sdk/go/arvados/fs_collection_test.go

index 58482142fcd1b21185aee96cfb624fc58db1c60a..cd3dcf053942a4df8c858c550828608081ab5cbf 100644 (file)
@@ -231,6 +231,7 @@ type filenode struct {
        memsize  int64 // bytes in memSegments
        sync.RWMutex
        nullnode
+       throttle *throttle
 }
 
 // caller must have lock
@@ -493,30 +494,75 @@ func (fn *filenode) Write(p []byte, startPtr filenodePtr) (n int, ptr filenodePt
 // Write some data out to disk to reduce memory use. Caller must have
 // write lock.
 func (fn *filenode) pruneMemSegments() {
-       // TODO: async (don't hold Lock() while waiting for Keep)
        // TODO: share code with (*dirnode)sync()
        // TODO: pack/flush small blocks too, when fragmented
+       if fn.throttle == nil {
+               // TODO: share a throttle with filesystem
+               fn.throttle = newThrottle(concurrentWriters)
+       }
        for idx, seg := range fn.segments {
                seg, ok := seg.(*memSegment)
-               if !ok || seg.Len() < maxBlockSize {
-                       continue
-               }
-               locator, _, err := fn.FS().PutB(seg.buf)
-               if err != nil {
-                       // TODO: stall (or return errors from)
-                       // subsequent writes until flushing
-                       // starts to succeed
+               if !ok || seg.Len() < maxBlockSize || seg.Len() == 0 || seg.flushing != nil {
                        continue
                }
-               fn.memsize -= int64(seg.Len())
-               fn.segments[idx] = storedSegment{
-                       kc:      fn.FS(),
-                       locator: locator,
-                       size:    seg.Len(),
-                       offset:  0,
-                       length:  seg.Len(),
+               // Setting seg.flushing guarantees seg.buf will not be
+               // modified in place: WriteAt and Truncate will
+               // allocate a new buf instead, if necessary.
+               idx, buf := idx, seg.buf
+               done := make(chan struct{})
+               seg.flushing = done
+               // If lots of background writes are already in
+               // progress, block here until one finishes, rather
+               // than pile up an unlimited number of buffered writes
+               // and network flush operations.
+               fn.throttle.Acquire()
+               go func() {
+                       defer close(done)
+                       locator, _, err := fn.FS().PutB(buf)
+                       fn.throttle.Release()
+                       fn.Lock()
+                       defer fn.Unlock()
+                       if curbuf := seg.buf[:1]; &curbuf[0] != &buf[0] {
+                               // A new seg.buf has been allocated.
+                               return
+                       }
+                       seg.flushing = nil
+                       if err != nil {
+                               // TODO: stall (or return errors from)
+                               // subsequent writes until flushing
+                               // starts to succeed.
+                               return
+                       }
+                       if len(fn.segments) <= idx || fn.segments[idx] != seg || len(seg.buf) != len(buf) {
+                               // Segment has been dropped/moved/resized.
+                               return
+                       }
+                       fn.memsize -= int64(len(buf))
+                       fn.segments[idx] = storedSegment{
+                               kc:      fn.FS(),
+                               locator: locator,
+                               size:    len(buf),
+                               offset:  0,
+                               length:  len(buf),
+                       }
+               }()
+       }
+}
+
+// Block until all pending pruneMemSegments work is finished. Caller
+// must NOT have lock.
+func (fn *filenode) waitPrune() {
+       var pending []<-chan struct{}
+       fn.Lock()
+       for _, seg := range fn.segments {
+               if seg, ok := seg.(*memSegment); ok && seg.flushing != nil {
+                       pending = append(pending, seg.flushing)
                }
        }
+       fn.Unlock()
+       for _, p := range pending {
+               <-p
+       }
 }
 
 type dirnode struct {
@@ -665,6 +711,16 @@ func (dn *dirnode) marshalManifest(ctx context.Context, prefix string, throttle
                names = append(names, name)
        }
        sort.Strings(names)
+
+       // Wait for children to finish any pending write operations
+       // before locking them.
+       for _, name := range names {
+               node := dn.inodes[name]
+               if fn, ok := node.(*filenode); ok {
+                       fn.waitPrune()
+               }
+       }
+
        var dirnames []string
        var filenames []string
        for _, name := range names {
@@ -969,6 +1025,11 @@ type segment interface {
 
 type memSegment struct {
        buf []byte
+       // If flushing is not nil, then a) buf is being shared by a
+       // pruneMemSegments goroutine, and must be copied on write;
+       // and b) the flushing channel will close when the goroutine
+       // finishes, whether it succeeds or not.
+       flushing <-chan struct{}
 }
 
 func (me *memSegment) Len() int {
@@ -985,28 +1046,31 @@ func (me *memSegment) Slice(off, length int) segment {
 }
 
 func (me *memSegment) Truncate(n int) {
-       if n > cap(me.buf) {
+       if n > cap(me.buf) || (me.flushing != nil && n > len(me.buf)) {
                newsize := 1024
                for newsize < n {
                        newsize = newsize << 2
                }
                newbuf := make([]byte, n, newsize)
                copy(newbuf, me.buf)
-               me.buf = newbuf
+               me.buf, me.flushing = newbuf, nil
        } else {
-               // Zero unused part when shrinking, in case we grow
-               // and start using it again later.
-               for i := n; i < len(me.buf); i++ {
+               // reclaim existing capacity, and zero reclaimed part
+               oldlen := len(me.buf)
+               me.buf = me.buf[:n]
+               for i := oldlen; i < n; i++ {
                        me.buf[i] = 0
                }
        }
-       me.buf = me.buf[:n]
 }
 
 func (me *memSegment) WriteAt(p []byte, off int) {
        if off+len(p) > len(me.buf) {
                panic("overflowed segment")
        }
+       if me.flushing != nil {
+               me.buf, me.flushing = append([]byte(nil), me.buf...), nil
+       }
        copy(me.buf[off:], p)
 }
 
index a6d4ab1e5b71baccabafdbdf810db0ee264420a5..b872cc214c9debfb4db9ac8db691770956638c1e 100644 (file)
@@ -583,7 +583,7 @@ func (s *CollectionFSSuite) TestRandomWrites(c *check.C) {
        const ngoroutines = 256
 
        var wg sync.WaitGroup
-       for n := 0; n < nfiles; n++ {
+       for n := 0; n < ngoroutines; n++ {
                wg.Add(1)
                go func(n int) {
                        defer wg.Done()
@@ -592,7 +592,7 @@ func (s *CollectionFSSuite) TestRandomWrites(c *check.C) {
                        f, err := s.fs.OpenFile(fmt.Sprintf("random-%d", n), os.O_RDWR|os.O_CREATE|os.O_EXCL, 0)
                        c.Assert(err, check.IsNil)
                        defer f.Close()
-                       for i := 0; i < ngoroutines; i++ {
+                       for i := 0; i < nfiles; i++ {
                                trunc := rand.Intn(65)
                                woff := rand.Intn(trunc + 1)
                                wbytes = wbytes[:rand.Intn(64-woff+1)]
@@ -1046,6 +1046,7 @@ func (s *CollectionFSSuite) TestFlushFullBlocks(c *check.C) {
                c.Assert(n, check.Equals, len(data))
                c.Assert(err, check.IsNil)
        }
+       f.(*filehandle).inode.(*filenode).waitPrune()
 
        currentMemExtents := func() (memExtents []int) {
                for idx, e := range f.(*filehandle).inode.(*filenode).segments {