15910: Fix races in collectionfs flush/sync.
author Tom Clegg <tclegg@veritasgenetics.com>
Wed, 4 Dec 2019 16:28:38 +0000 (11:28 -0500)
committer Tom Clegg <tclegg@veritasgenetics.com>
Wed, 4 Dec 2019 16:28:38 +0000 (11:28 -0500)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg@veritasgenetics.com>

sdk/go/arvados/fs_collection.go
sdk/go/arvados/fs_collection_test.go

index 40c8908024a88f87f8979d7dff653cbcb42a2239..3d0928b84ea77ef3817ac9c3d2cfe3679bd970e0 100644 (file)
@@ -16,6 +16,7 @@ import (
        "strconv"
        "strings"
        "sync"
+       "sync/atomic"
        "time"
 )
 
@@ -691,6 +692,7 @@ func (dn *dirnode) commitBlock(ctx context.Context, refs []fnSegmentRef, bufsize
                }
                segs = append(segs, seg)
        }
+       blocksize := len(block)
        dn.fs.throttle().Acquire()
        errs := make(chan error, 1)
        go func() {
@@ -701,6 +703,8 @@ func (dn *dirnode) commitBlock(ctx context.Context, refs []fnSegmentRef, bufsize
                dn.fs.throttle().Release()
                {
                        if !sync {
+                               dn.Lock()
+                               defer dn.Unlock()
                                for _, name := range dn.sortedNames() {
                                        if fn, ok := dn.inodes[name].(*filenode); ok {
                                                fn.Lock()
@@ -749,11 +753,16 @@ func (dn *dirnode) commitBlock(ctx context.Context, refs []fnSegmentRef, bufsize
                        ref.fn.segments[ref.idx] = storedSegment{
                                kc:      dn.fs,
                                locator: locator,
-                               size:    len(block),
+                               size:    blocksize,
                                offset:  offsets[idx],
                                length:  len(data),
                        }
-                       ref.fn.memsize -= int64(len(data))
+                       // atomic is needed here despite caller having
+                       // lock: caller might be running concurrent
+                       // commitBlock() goroutines using the same
+                       // lock, writing different segments from the
+                       // same file.
+                       atomic.AddInt64(&ref.fn.memsize, -int64(len(data)))
                }
        }()
        if sync {
index 352b226bf1f63bfbf1ac94b211de57d3ec25f581..49fdc397daee610c4a98d1ab8753c716f22a8cba 100644 (file)
@@ -1280,6 +1280,30 @@ func (s *CollectionFSSuite) TestMaxUnflushed(c *check.C) {
        fs.Flush("", true)
 }
 
+func (s *CollectionFSSuite) TestFlushShort(c *check.C) { // TestFlushShort writes 100 files into each of two dirs (8-byte and 1000000-byte contents) and calls fs.Flush(dir, false) after every file, exercising the non-sync flush path; presumably meant to be run with -race (commit fixes flush/sync races)
+       s.kc.onPut = func([]byte) { // after each put, drop every block the stub keep client has stored — presumably to bound test memory; confirm onPut fires once per block write
+               s.kc.Lock()
+               s.kc.blocks = map[string][]byte{}
+               s.kc.Unlock()
+       }
+       fs, err := (&Collection{}).FileSystem(s.client, s.kc)
+       c.Assert(err, check.IsNil)
+       for _, blocksize := range []int{8, 1000000} { // one tiny and one large per-file payload size
+               dir := fmt.Sprintf("dir%d", blocksize)
+               err = fs.Mkdir(dir, 0755)
+               c.Assert(err, check.IsNil)
+               data := make([]byte, blocksize)
+               for i := 0; i < 100; i++ {
+                       f, err := fs.OpenFile(fmt.Sprintf("%s/file%d", dir, i), os.O_WRONLY|os.O_CREATE, 0)
+                       c.Assert(err, check.IsNil)
+                       _, err = f.Write(data)
+                       c.Assert(err, check.IsNil)
+                       f.Close() // NOTE(review): Close error is not checked
+                       fs.Flush(dir, false) // sync=false: presumably returns without waiting for block writes (cf. the `if !sync` / `if sync` branches in commitBlock), so writes from earlier iterations may still be in flight
+               }
+       }
+} // NOTE(review): unlike TestMaxUnflushed, no final fs.Flush("", true) — in-flight non-sync writes may outlive the test; confirm this is intended
+
 func (s *CollectionFSSuite) TestBrokenManifests(c *check.C) {
        for _, txt := range []string{
                "\n",