15922: Change EnableBetaController14287 -> ForceLegacyAPI14.
[arvados.git] / sdk / go / arvados / fs_collection.go
index 40c8908024a88f87f8979d7dff653cbcb42a2239..d0e97f2ad184b44e7800a577247704e8af717581 100644 (file)
@@ -16,6 +16,7 @@ import (
        "strconv"
        "strings"
        "sync"
+       "sync/atomic"
        "time"
 )
 
@@ -691,24 +692,15 @@ func (dn *dirnode) commitBlock(ctx context.Context, refs []fnSegmentRef, bufsize
                }
                segs = append(segs, seg)
        }
+       blocksize := len(block)
        dn.fs.throttle().Acquire()
        errs := make(chan error, 1)
        go func() {
                defer close(done)
                defer close(errs)
-               locked := map[*filenode]bool{}
                locator, _, err := dn.fs.PutB(block)
                dn.fs.throttle().Release()
                {
-                       if !sync {
-                               for _, name := range dn.sortedNames() {
-                                       if fn, ok := dn.inodes[name].(*filenode); ok {
-                                               fn.Lock()
-                                               defer fn.Unlock()
-                                               locked[fn] = true
-                                       }
-                               }
-                       }
                        defer func() {
                                for _, seg := range segs {
                                        if seg.flushing == done {
@@ -723,6 +715,7 @@ func (dn *dirnode) commitBlock(ctx context.Context, refs []fnSegmentRef, bufsize
                }
                for idx, ref := range refs {
                        if !sync {
+                               ref.fn.Lock()
                                // In async mode, fn's lock was
                                // released while we were waiting for
                                // PutB(); lots of things might have
@@ -731,17 +724,15 @@ func (dn *dirnode) commitBlock(ctx context.Context, refs []fnSegmentRef, bufsize
                                        // file segments have
                                        // rearranged or changed in
                                        // some way
+                                       ref.fn.Unlock()
                                        continue
                                } else if seg, ok := ref.fn.segments[ref.idx].(*memSegment); !ok || seg != segs[idx] {
                                        // segment has been replaced
+                                       ref.fn.Unlock()
                                        continue
                                } else if seg.flushing != done {
                                        // seg.buf has been replaced
-                                       continue
-                               } else if !locked[ref.fn] {
-                                       // file was renamed, moved, or
-                                       // deleted since we called
-                                       // PutB
+                                       ref.fn.Unlock()
                                        continue
                                }
                        }
@@ -749,11 +740,19 @@ func (dn *dirnode) commitBlock(ctx context.Context, refs []fnSegmentRef, bufsize
                        ref.fn.segments[ref.idx] = storedSegment{
                                kc:      dn.fs,
                                locator: locator,
-                               size:    len(block),
+                               size:    blocksize,
                                offset:  offsets[idx],
                                length:  len(data),
                        }
-                       ref.fn.memsize -= int64(len(data))
+                       // atomic is needed here despite caller having
+                       // lock: caller might be running concurrent
+                       // commitBlock() goroutines using the same
+                       // lock, writing different segments from the
+                       // same file.
+                       atomic.AddInt64(&ref.fn.memsize, -int64(len(data)))
+                       if !sync {
+                               ref.fn.Unlock()
+                       }
                }
        }()
        if sync {