"strconv"
"strings"
"sync"
+ "sync/atomic"
"time"
)
// FileSystem returns a CollectionFileSystem for the collection.
func (c *Collection) FileSystem(client apiClient, kc keepClient) (CollectionFileSystem, error) {
- var modTime time.Time
- if c.ModifiedAt == nil {
+ modTime := c.ModifiedAt
+ if modTime.IsZero() {
modTime = time.Now()
- } else {
- modTime = *c.ModifiedAt
}
fs := &collectionFileSystem{
uuid: c.UUID,
// A new seg.buf has been allocated.
return
}
- seg.flushing = nil
if err != nil {
// TODO: stall (or return errors from)
// subsequent writes until flushing
offsets := make([]int, 0, len(refs)) // location of segment's data within block
for _, ref := range refs {
seg := ref.fn.segments[ref.idx].(*memSegment)
- if seg.flushing != nil && !sync {
+ if !sync && seg.flushingUnfinished() {
// Let the other flushing goroutine finish. If
// it fails, we'll try again next time.
+ close(done)
return nil
} else {
// In sync mode, we proceed regardless of
}
segs = append(segs, seg)
}
+ blocksize := len(block)
dn.fs.throttle().Acquire()
errs := make(chan error, 1)
go func() {
defer close(done)
defer close(errs)
- locked := map[*filenode]bool{}
locator, _, err := dn.fs.PutB(block)
dn.fs.throttle().Release()
- {
- if !sync {
- for _, name := range dn.sortedNames() {
- if fn, ok := dn.inodes[name].(*filenode); ok {
- fn.Lock()
- defer fn.Unlock()
- locked[fn] = true
- }
- }
- }
- defer func() {
- for _, seg := range segs {
- if seg.flushing == done {
- seg.flushing = nil
- }
- }
- }()
- }
if err != nil {
errs <- err
return
}
for idx, ref := range refs {
if !sync {
+ ref.fn.Lock()
// In async mode, fn's lock was
// released while we were waiting for
// PutB(); lots of things might have
// file segments have
// rearranged or changed in
// some way
+ ref.fn.Unlock()
continue
} else if seg, ok := ref.fn.segments[ref.idx].(*memSegment); !ok || seg != segs[idx] {
// segment has been replaced
+ ref.fn.Unlock()
continue
} else if seg.flushing != done {
// seg.buf has been replaced
- continue
- } else if !locked[ref.fn] {
- // file was renamed, moved, or
- // deleted since we called
- // PutB
+ ref.fn.Unlock()
continue
}
}
ref.fn.segments[ref.idx] = storedSegment{
kc: dn.fs,
locator: locator,
- size: len(block),
+ size: blocksize,
offset: offsets[idx],
length: len(data),
}
- ref.fn.memsize -= int64(len(data))
+ // atomic is needed here despite caller having
+ // lock: caller might be running concurrent
+ // commitBlock() goroutines using the same
+ // lock, writing different segments from the
+ // same file.
+ atomic.AddInt64(&ref.fn.memsize, -int64(len(data)))
+ if !sync {
+ ref.fn.Unlock()
+ }
}
}()
if sync {
type memSegment struct { // a file segment whose data is held in memory in buf
buf []byte // the segment's bytes; Len() reports len(buf)
- // If flushing is not nil, then a) buf is being shared by a
- // pruneMemSegments goroutine, and must be copied on write;
- // and b) the flushing channel will close when the goroutine
- // finishes, whether it succeeds or not.
+ // If flushing is non-nil and not yet closed (see flushingUnfinished),
+ // then a) buf is being shared by a pruneMemSegments goroutine, and
+ // must be copied on write; and b) the flushing channel will close
+ // when that goroutine finishes, whether it succeeds or not.
flushing <-chan struct{}
}
+func (me *memSegment) flushingUnfinished() bool { // reports whether a flush was started on this segment and its channel is still open
+ if me.flushing == nil { // no flush recorded for this segment
+ return false
+ }
+ select { // non-blocking check of the flushing channel
+ case <-me.flushing: // receive succeeded without blocking: the flush has finished
+ me.flushing = nil // side effect: forget the finished flush, so later calls return at the nil check
+ return false
+ default: // receive would block: the flush is still in progress
+ return true
+ }
+}
+
// Len returns the number of bytes currently held in the segment's
// in-memory buffer.
func (me *memSegment) Len() int {
	size := len(me.buf)
	return size
}