// A new seg.buf has been allocated.
return
}
- seg.flushing = nil
if err != nil {
// TODO: stall (or return errors from)
// subsequent writes until flushing
offsets := make([]int, 0, len(refs)) // location of segment's data within block
for _, ref := range refs {
seg := ref.fn.segments[ref.idx].(*memSegment)
- if seg.flushing != nil && !sync {
+ if !sync && seg.flushingUnfinished() {
// Let the other flushing goroutine finish. If
// it fails, we'll try again next time.
+ close(done)
return nil
} else {
// In sync mode, we proceed regardless of
go func() {
defer close(done)
defer close(errs)
- locked := map[*filenode]bool{}
locator, _, err := dn.fs.PutB(block)
dn.fs.throttle().Release()
- {
- if !sync {
- dn.Lock()
- defer dn.Unlock()
- for _, name := range dn.sortedNames() {
- if fn, ok := dn.inodes[name].(*filenode); ok {
- fn.Lock()
- defer fn.Unlock()
- locked[fn] = true
- }
- }
- }
- defer func() {
- for _, seg := range segs {
- if seg.flushing == done {
- seg.flushing = nil
- }
- }
- }()
- }
if err != nil {
errs <- err
return
}
for idx, ref := range refs {
if !sync {
+ ref.fn.Lock()
// In async mode, fn's lock was
// released while we were waiting for
// PutB(); lots of things might have
// file segments have
// rearranged or changed in
// some way
+ ref.fn.Unlock()
continue
} else if seg, ok := ref.fn.segments[ref.idx].(*memSegment); !ok || seg != segs[idx] {
// segment has been replaced
+ ref.fn.Unlock()
continue
} else if seg.flushing != done {
// seg.buf has been replaced
- continue
- } else if !locked[ref.fn] {
- // file was renamed, moved, or
- // deleted since we called
- // PutB
+ ref.fn.Unlock()
continue
}
}
// lock, writing different segments from the
// same file.
atomic.AddInt64(&ref.fn.memsize, -int64(len(data)))
+ if !sync {
+ ref.fn.Unlock()
+ }
}
}()
if sync {
type memSegment struct {
buf []byte // segment content held in memory; Len reports len(buf)
- // If flushing is not nil, then a) buf is being shared by a
- // pruneMemSegments goroutine, and must be copied on write;
- // and b) the flushing channel will close when the goroutine
- // finishes, whether it succeeds or not.
+ // If flushing is not nil and not ready/closed, then a) buf is
+ // being shared by a pruneMemSegments goroutine, and must be
+ // copied on write; and b) the flushing channel will close
+ // when the goroutine finishes, whether it succeeds or not.
flushing <-chan struct{}
}
+func (me *memSegment) flushingUnfinished() bool { // reports whether a flush recorded in me.flushing has not finished yet
+ if me.flushing == nil {
+ return false // no flush is recorded for this segment
+ }
+ select { // non-blocking poll of the flushing channel
+ case <-me.flushing:
+ me.flushing = nil // receive succeeded (channel closed or ready): forget it so later calls take the nil path. NOTE(review): this writes a field from a predicate; presumably the caller must hold the owning file's lock -- confirm
+ return false
+ default:
+ return true // receive would block, so the flush has not signalled completion
+ }
+}
+
// Len reports the number of bytes currently held in the segment's
// in-memory buffer.
func (me *memSegment) Len() int {
	size := len(me.buf)
	return size
}