X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/5f9e11ae95dc2b768ccc7e5f165be06b6baabdb2..3b4bb3d393adc3bd3ddfb4442a65087275a5c5c3:/sdk/go/arvados/fs_collection.go diff --git a/sdk/go/arvados/fs_collection.go b/sdk/go/arvados/fs_collection.go index 3d0928b84e..37bd494914 100644 --- a/sdk/go/arvados/fs_collection.go +++ b/sdk/go/arvados/fs_collection.go @@ -568,7 +568,6 @@ func (fn *filenode) pruneMemSegments() { // A new seg.buf has been allocated. return } - seg.flushing = nil if err != nil { // TODO: stall (or return errors from) // subsequent writes until flushing @@ -671,9 +670,10 @@ func (dn *dirnode) commitBlock(ctx context.Context, refs []fnSegmentRef, bufsize offsets := make([]int, 0, len(refs)) // location of segment's data within block for _, ref := range refs { seg := ref.fn.segments[ref.idx].(*memSegment) - if seg.flushing != nil && !sync { + if !sync && seg.flushingUnfinished() { // Let the other flushing goroutine finish. If // it fails, we'll try again next time. + close(done) return nil } else { // In sync mode, we proceed regardless of @@ -698,35 +698,15 @@ func (dn *dirnode) commitBlock(ctx context.Context, refs []fnSegmentRef, bufsize go func() { defer close(done) defer close(errs) - locked := map[*filenode]bool{} locator, _, err := dn.fs.PutB(block) dn.fs.throttle().Release() - { - if !sync { - dn.Lock() - defer dn.Unlock() - for _, name := range dn.sortedNames() { - if fn, ok := dn.inodes[name].(*filenode); ok { - fn.Lock() - defer fn.Unlock() - locked[fn] = true - } - } - } - defer func() { - for _, seg := range segs { - if seg.flushing == done { - seg.flushing = nil - } - } - }() - } if err != nil { errs <- err return } for idx, ref := range refs { if !sync { + ref.fn.Lock() // In async mode, fn's lock was // released while we were waiting for // PutB(); lots of things might have @@ -735,17 +715,15 @@ func (dn *dirnode) commitBlock(ctx context.Context, refs []fnSegmentRef, bufsize // file segments have // rearranged or changed in // some way + ref.fn.Unlock() continue } else if seg, ok := ref.fn.segments[ref.idx].(*memSegment); !ok || seg != segs[idx] { // segment has been replaced + ref.fn.Unlock() continue } else if seg.flushing != done { // seg.buf has been replaced - continue - } else if !locked[ref.fn] { - // file was renamed, moved, or - // deleted since we called - // PutB + ref.fn.Unlock() continue } } @@ -763,6 +741,9 @@ func (dn *dirnode) commitBlock(ctx context.Context, refs []fnSegmentRef, bufsize // lock, writing different segments from the // same file. atomic.AddInt64(&ref.fn.memsize, -int64(len(data))) + if !sync { + ref.fn.Unlock() + } } }() if sync { @@ -1208,13 +1189,26 @@ type segment interface { type memSegment struct { buf []byte - // If flushing is not nil, then a) buf is being shared by a - // pruneMemSegments goroutine, and must be copied on write; - // and b) the flushing channel will close when the goroutine - // finishes, whether it succeeds or not. + // If flushing is not nil and not ready/closed, then a) buf is + // being shared by a pruneMemSegments goroutine, and must be + // copied on write; and b) the flushing channel will close + // when the goroutine finishes, whether it succeeds or not. flushing <-chan struct{} } +func (me *memSegment) flushingUnfinished() bool { + if me.flushing == nil { + return false + } + select { + case <-me.flushing: + me.flushing = nil + return false + default: + return true + } +} + func (me *memSegment) Len() int { return len(me.buf) }