"strconv"
"strings"
"sync"
+ "sync/atomic"
"time"
)
// Total data bytes in all files.
Size() int64
-
- // Memory consumed by buffered file data.
- memorySize() int64
}
type collectionFileSystem struct {
// FileSystem returns a CollectionFileSystem for the collection.
func (c *Collection) FileSystem(client apiClient, kc keepClient) (CollectionFileSystem, error) {
- var modTime time.Time
- if c.ModifiedAt == nil {
+ modTime := c.ModifiedAt
+ if modTime.IsZero() {
modTime = time.Now()
- } else {
- modTime = *c.ModifiedAt
}
fs := &collectionFileSystem{
uuid: c.UUID,
inodes: make(map[string]inode),
},
}, nil
- } else {
- return &filenode{
- fs: fs,
- fileinfo: fileinfo{
- name: name,
- mode: perm & ^os.ModeDir,
- modTime: modTime,
- },
- }, nil
}
+ return &filenode{
+ fs: fs,
+ fileinfo: fileinfo{
+ name: name,
+ mode: perm & ^os.ModeDir,
+ modTime: modTime,
+ },
+ }, nil
+}
+
+func (fs *collectionFileSystem) Child(name string, replace func(inode) (inode, error)) (inode, error) {
+ return fs.rootnode().Child(name, replace)
+}
+
+func (fs *collectionFileSystem) FS() FileSystem {
+ return fs
+}
+
+func (fs *collectionFileSystem) FileInfo() os.FileInfo {
+ return fs.rootnode().FileInfo()
+}
+
+func (fs *collectionFileSystem) IsDir() bool {
+ return true
+}
+
+func (fs *collectionFileSystem) Lock() {
+ fs.rootnode().Lock()
+}
+
+func (fs *collectionFileSystem) Unlock() {
+ fs.rootnode().Unlock()
+}
+
+func (fs *collectionFileSystem) RLock() {
+ fs.rootnode().RLock()
+}
+
+func (fs *collectionFileSystem) RUnlock() {
+ fs.rootnode().RUnlock()
+}
+
+func (fs *collectionFileSystem) Parent() inode {
+ return fs.rootnode().Parent()
+}
+
+func (fs *collectionFileSystem) Read(_ []byte, ptr filenodePtr) (int, filenodePtr, error) {
+ return 0, ptr, ErrInvalidOperation
+}
+
+func (fs *collectionFileSystem) Write(_ []byte, ptr filenodePtr) (int, filenodePtr, error) {
+ return 0, ptr, ErrInvalidOperation
+}
+
+func (fs *collectionFileSystem) Readdir() ([]os.FileInfo, error) {
+ return fs.rootnode().Readdir()
+}
+
+func (fs *collectionFileSystem) SetParent(parent inode, name string) {
+ fs.rootnode().SetParent(parent, name)
+}
+
+func (fs *collectionFileSystem) Truncate(int64) error {
+ return ErrInvalidOperation
}
func (fs *collectionFileSystem) Sync() error {
return dn.flush(context.TODO(), names, flushOpts{sync: false, shortBlocks: shortBlocks})
}
-func (fs *collectionFileSystem) memorySize() int64 {
+func (fs *collectionFileSystem) MemorySize() int64 {
fs.fileSystem.root.Lock()
defer fs.fileSystem.root.Unlock()
- return fs.fileSystem.root.(*dirnode).memorySize()
+ return fs.fileSystem.root.(*dirnode).MemorySize()
}
func (fs *collectionFileSystem) MarshalManifest(prefix string) (string, error) {
seg.Truncate(len(cando))
fn.memsize += int64(len(cando))
fn.segments[cur] = seg
- cur++
- prev++
}
}
// A new seg.buf has been allocated.
return
}
- seg.flushing = nil
if err != nil {
// TODO: stall (or return errors from)
// subsequent writes until flushing
// storedSegments that reference the relevant portions of the new
// block.
//
+// bufsize is the total data size in refs. It is used to preallocate
+// the correct amount of memory when len(refs)>1.
+//
// If sync is false, commitBlock returns right away, after starting a
// goroutine to do the writes, reacquire the filenodes' locks, and
// swap out the *memSegments. Some filenodes' segments might get
// won't replace them.
//
// Caller must have write lock.
-func (dn *dirnode) commitBlock(ctx context.Context, refs []fnSegmentRef, sync bool) error {
+func (dn *dirnode) commitBlock(ctx context.Context, refs []fnSegmentRef, bufsize int, sync bool) error {
+ if len(refs) == 0 {
+ return nil
+ }
if err := ctx.Err(); err != nil {
return err
}
done := make(chan struct{})
- block := make([]byte, 0, maxBlockSize)
+ var block []byte
segs := make([]*memSegment, 0, len(refs))
offsets := make([]int, 0, len(refs)) // location of segment's data within block
for _, ref := range refs {
seg := ref.fn.segments[ref.idx].(*memSegment)
- if seg.flushing != nil && !sync {
+ if !sync && seg.flushingUnfinished() {
// Let the other flushing goroutine finish. If
// it fails, we'll try again next time.
+ close(done)
return nil
- } else {
- // In sync mode, we proceed regardless of
- // whether another flush is in progress: It
- // can't finish before we do, because we hold
- // fn's lock until we finish our own writes.
}
+ // In sync mode, we proceed regardless of
+ // whether another flush is in progress: It
+ // can't finish before we do, because we hold
+ // fn's lock until we finish our own writes.
seg.flushing = done
offsets = append(offsets, len(block))
- block = append(block, seg.buf...)
+ if len(refs) == 1 {
+ block = seg.buf
+ } else if block == nil {
+ block = append(make([]byte, 0, bufsize), seg.buf...)
+ } else {
+ block = append(block, seg.buf...)
+ }
segs = append(segs, seg)
}
+ blocksize := len(block)
dn.fs.throttle().Acquire()
errs := make(chan error, 1)
go func() {
defer close(done)
defer close(errs)
- locked := map[*filenode]bool{}
locator, _, err := dn.fs.PutB(block)
dn.fs.throttle().Release()
- {
- if !sync {
- for _, name := range dn.sortedNames() {
- if fn, ok := dn.inodes[name].(*filenode); ok {
- fn.Lock()
- defer fn.Unlock()
- locked[fn] = true
- }
- }
- }
- defer func() {
- for _, seg := range segs {
- if seg.flushing == done {
- seg.flushing = nil
- }
- }
- }()
- }
if err != nil {
errs <- err
return
}
for idx, ref := range refs {
if !sync {
+ ref.fn.Lock()
// In async mode, fn's lock was
// released while we were waiting for
// PutB(); lots of things might have
// file segments have
// rearranged or changed in
// some way
+ ref.fn.Unlock()
continue
} else if seg, ok := ref.fn.segments[ref.idx].(*memSegment); !ok || seg != segs[idx] {
// segment has been replaced
+ ref.fn.Unlock()
continue
} else if seg.flushing != done {
// seg.buf has been replaced
- continue
- } else if !locked[ref.fn] {
- // file was renamed, moved, or
- // deleted since we called
- // PutB
+ ref.fn.Unlock()
continue
}
}
ref.fn.segments[ref.idx] = storedSegment{
kc: dn.fs,
locator: locator,
- size: len(block),
+ size: blocksize,
offset: offsets[idx],
length: len(data),
}
- ref.fn.memsize -= int64(len(data))
+ // atomic is needed here despite caller having
+ // lock: caller might be running concurrent
+ // commitBlock() goroutines using the same
+ // lock, writing different segments from the
+ // same file.
+ atomic.AddInt64(&ref.fn.memsize, -int64(len(data)))
+ if !sync {
+ ref.fn.Unlock()
+ }
}
}()
if sync {
return <-errs
- } else {
- return nil
}
+ return nil
}
type flushOpts struct {
cg := newContextGroup(ctx)
defer cg.Cancel()
- goCommit := func(refs []fnSegmentRef) {
- if len(refs) == 0 {
- return
- }
+ goCommit := func(refs []fnSegmentRef, bufsize int) {
cg.Go(func() error {
- return dn.commitBlock(cg.Context(), refs, opts.sync)
+ return dn.commitBlock(cg.Context(), refs, bufsize, opts.sync)
})
}
node.segments[idx] = seg
case *memSegment:
if seg.Len() > maxBlockSize/2 {
- goCommit([]fnSegmentRef{{node, idx}})
+ goCommit([]fnSegmentRef{{node, idx}}, seg.Len())
continue
}
if pendingLen+seg.Len() > maxBlockSize {
- goCommit(pending)
+ goCommit(pending, pendingLen)
pending = nil
pendingLen = 0
}
}
}
if opts.shortBlocks {
- goCommit(pending)
+ goCommit(pending, pendingLen)
}
return cg.Wait()
}
// caller must have write lock.
-func (dn *dirnode) memorySize() (size int64) {
+func (dn *dirnode) MemorySize() (size int64) {
for _, name := range dn.sortedNames() {
node := dn.inodes[name]
node.Lock()
defer node.Unlock()
switch node := node.(type) {
case *dirnode:
- size += node.memorySize()
+ size += node.MemorySize()
case *filenode:
for _, seg := range node.segments {
switch seg := seg.(type) {
// situation might be rare anyway)
segIdx, pos = 0, 0
}
- for next := int64(0); segIdx < len(segments); segIdx++ {
+ for ; segIdx < len(segments); segIdx++ {
seg := segments[segIdx]
- next = pos + int64(seg.Len())
+ next := pos + int64(seg.Len())
if next <= offset || seg.Len() == 0 {
pos = next
continue
type memSegment struct {
buf []byte
- // If flushing is not nil, then a) buf is being shared by a
- // pruneMemSegments goroutine, and must be copied on write;
- // and b) the flushing channel will close when the goroutine
- // finishes, whether it succeeds or not.
+ // If flushing is not nil and not ready/closed, then a) buf is
+ // being shared by a pruneMemSegments goroutine, and must be
+ // copied on write; and b) the flushing channel will close
+ // when the goroutine finishes, whether it succeeds or not.
flushing <-chan struct{}
}
+func (me *memSegment) flushingUnfinished() bool {
+ if me.flushing == nil {
+ return false
+ }
+ select {
+ case <-me.flushing:
+ me.flushing = nil
+ return false
+ default:
+ return true
+ }
+}
+
func (me *memSegment) Len() int {
return len(me.buf)
}