X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/f3c89480b1f09c30708150f0733b2d430f4221da..9a71dd94cb72a5fd1ed74ca71b4961de4108db02:/sdk/go/arvados/fs_collection.go diff --git a/sdk/go/arvados/fs_collection.go b/sdk/go/arvados/fs_collection.go index d0e97f2ad1..37bd494914 100644 --- a/sdk/go/arvados/fs_collection.go +++ b/sdk/go/arvados/fs_collection.go @@ -568,7 +568,6 @@ func (fn *filenode) pruneMemSegments() { // A new seg.buf has been allocated. return } - seg.flushing = nil if err != nil { // TODO: stall (or return errors from) // subsequent writes until flushing @@ -671,9 +670,10 @@ func (dn *dirnode) commitBlock(ctx context.Context, refs []fnSegmentRef, bufsize offsets := make([]int, 0, len(refs)) // location of segment's data within block for _, ref := range refs { seg := ref.fn.segments[ref.idx].(*memSegment) - if seg.flushing != nil && !sync { + if !sync && seg.flushingUnfinished() { // Let the other flushing goroutine finish. If // it fails, we'll try again next time. + close(done) return nil } else { // In sync mode, we proceed regardless of @@ -700,15 +700,6 @@ func (dn *dirnode) commitBlock(ctx context.Context, refs []fnSegmentRef, bufsize defer close(errs) locator, _, err := dn.fs.PutB(block) dn.fs.throttle().Release() - { - defer func() { - for _, seg := range segs { - if seg.flushing == done { - seg.flushing = nil - } - } - }() - } if err != nil { errs <- err return @@ -1198,13 +1189,26 @@ type segment interface { type memSegment struct { buf []byte - // If flushing is not nil, then a) buf is being shared by a - // pruneMemSegments goroutine, and must be copied on write; - // and b) the flushing channel will close when the goroutine - // finishes, whether it succeeds or not. + // If flushing is not nil and not ready/closed, then a) buf is + // being shared by a pruneMemSegments goroutine, and must be + // copied on write; and b) the flushing channel will close + // when the goroutine finishes, whether it succeeds or not. flushing <-chan struct{} } +func (me *memSegment) flushingUnfinished() bool { + if me.flushing == nil { + return false + } + select { + case <-me.flushing: + me.flushing = nil + return false + default: + return true + } +} + func (me *memSegment) Len() int { return len(me.buf) }