"strconv"
"strings"
"sync"
+ "sync/atomic"
"time"
)
// FileSystem returns a CollectionFileSystem for the collection.
func (c *Collection) FileSystem(client apiClient, kc keepClient) (CollectionFileSystem, error) {
- var modTime time.Time
- if c.ModifiedAt == nil {
+ modTime := c.ModifiedAt
+ if modTime.IsZero() {
modTime = time.Now()
- } else {
- modTime = *c.ModifiedAt
}
fs := &collectionFileSystem{
uuid: c.UUID,
}
segs = append(segs, seg)
}
+ blocksize := len(block)
dn.fs.throttle().Acquire()
errs := make(chan error, 1)
go func() {
defer close(done)
defer close(errs)
- locked := map[*filenode]bool{}
locator, _, err := dn.fs.PutB(block)
dn.fs.throttle().Release()
{
- if !sync {
- for _, name := range dn.sortedNames() {
- if fn, ok := dn.inodes[name].(*filenode); ok {
- fn.Lock()
- defer fn.Unlock()
- locked[fn] = true
- }
- }
- }
defer func() {
for _, seg := range segs {
if seg.flushing == done {
}
for idx, ref := range refs {
if !sync {
+ ref.fn.Lock()
// In async mode, fn's lock was
// released while we were waiting for
// PutB(); lots of things might have
// file segments have
// rearranged or changed in
// some way
+ ref.fn.Unlock()
continue
} else if seg, ok := ref.fn.segments[ref.idx].(*memSegment); !ok || seg != segs[idx] {
// segment has been replaced
+ ref.fn.Unlock()
continue
} else if seg.flushing != done {
// seg.buf has been replaced
- continue
- } else if !locked[ref.fn] {
- // file was renamed, moved, or
- // deleted since we called
- // PutB
+ ref.fn.Unlock()
continue
}
}
ref.fn.segments[ref.idx] = storedSegment{
kc: dn.fs,
locator: locator,
- size: len(block),
+ size: blocksize,
offset: offsets[idx],
length: len(data),
}
- ref.fn.memsize -= int64(len(data))
+ // atomic is needed here despite caller having
+ // lock: caller might be running concurrent
+ // commitBlock() goroutines using the same
+ // lock, writing different segments from the
+ // same file.
+ atomic.AddInt64(&ref.fn.memsize, -int64(len(data)))
+ if !sync {
+ ref.fn.Unlock()
+ }
}
}()
if sync {