var (
maxBlockSize = 1 << 26 // 64 MiB: upper bound on a single assembled Keep block
- concurrentWriters = 4 // max goroutines writing to Keep during flush()
- writeAheadBlocks = 1 // max background jobs flushing to Keep before blocking writes
+ concurrentWriters = 4 // max goroutines writing to Keep in background and during flush()
)
// A CollectionFileSystem is a FileSystem that can be serialized as a
uuid: c.UUID,
fileSystem: fileSystem{
fsBackend: keepBackend{apiClient: client, keepClient: kc},
+ thr: newThrottle(concurrentWriters),
},
}
root := &dirnode{
return nil
}
-func (fs *collectionFileSystem) Flush(shortBlocks bool) error {
- fs.fileSystem.root.Lock()
- defer fs.fileSystem.root.Unlock()
- dn := fs.fileSystem.root.(*dirnode)
- return dn.flush(context.TODO(), newThrottle(concurrentWriters), dn.sortedNames(), flushOpts{sync: false, shortBlocks: shortBlocks})
+// Flush commits in-memory file data under the directory named by path
+// to Keep. With path=="" the whole tree is flushed (dn.flush recurses
+// into subdirs); with a non-empty path only the files directly inside
+// that directory are flushed, not its subdirectories.
+// NOTE(review): assumes rlookup("") resolves to the root node — confirm.
+func (fs *collectionFileSystem) Flush(path string, shortBlocks bool) error {
+ node, err := rlookup(fs.fileSystem.root, path)
+ if err != nil {
+ return err
+ }
+ dn, ok := node.(*dirnode)
+ if !ok {
+ return ErrNotADirectory
+ }
+ dn.Lock()
+ defer dn.Unlock()
+ names := dn.sortedNames()
+ if path != "" {
+ // Caller only wants to flush the specified dir,
+ // non-recursively. Drop subdirs from the list of
+ // names.
+ var filenames []string
+ for _, name := range names {
+ if _, ok := dn.inodes[name].(*filenode); ok {
+ filenames = append(filenames, name)
+ }
+ }
+ names = filenames
+ }
+ for _, name := range names { // lock every child; defers release them when Flush returns, as dn.flush requires the named children locked
+ child := dn.inodes[name]
+ child.Lock()
+ defer child.Unlock()
+ }
+ return dn.flush(context.TODO(), names, flushOpts{sync: false, shortBlocks: shortBlocks})
}
func (fs *collectionFileSystem) memorySize() int64 {
func (fs *collectionFileSystem) MarshalManifest(prefix string) (string, error) {
fs.fileSystem.root.Lock()
defer fs.fileSystem.root.Unlock()
- return fs.fileSystem.root.(*dirnode).marshalManifest(context.TODO(), prefix, newThrottle(concurrentWriters))
+ return fs.fileSystem.root.(*dirnode).marshalManifest(context.TODO(), prefix) // write concurrency is now limited inside commitBlock via fs.throttle(), not a per-call throttle
}
func (fs *collectionFileSystem) Size() int64 {
memsize int64 // bytes in memSegments
sync.RWMutex
nullnode
- throttle *throttle
}
// caller must have lock
func (fn *filenode) pruneMemSegments() {
// TODO: share code with (*dirnode)flush()
// TODO: pack/flush small blocks too, when fragmented
- if fn.throttle == nil {
- // TODO: share a throttle with filesystem
- fn.throttle = newThrottle(writeAheadBlocks)
- }
for idx, seg := range fn.segments {
seg, ok := seg.(*memSegment)
if !ok || seg.Len() < maxBlockSize || seg.flushing != nil {
// progress, block here until one finishes, rather
// than pile up an unlimited number of buffered writes
// and network flush operations.
+ fn.fs.throttle().Acquire() // shared filesystem-wide throttle replaces the old per-filenode writeAheadBlocks throttle
go func() {
defer close(done)
locator, _, err := fn.FS().PutB(buf)
+ fn.fs.throttle().Release() // free the slot as soon as the Keep write finishes, before re-locking fn
fn.Lock()
defer fn.Unlock()
if seg.flushing != done {
// storedSegments that reference the relevant portions of the new
// block.
//
+// bufsize is the total data size in refs. It is used to preallocate
+// the correct amount of memory when len(refs)>1.
+//
// If sync is false, commitBlock returns right away, after starting a
// goroutine to do the writes, reacquire the filenodes' locks, and
// swap out the *memSegments. Some filenodes' segments might get
// won't replace them.
//
// Caller must have write lock.
-func (dn *dirnode) commitBlock(ctx context.Context, refs []fnSegmentRef, sync bool) error {
+func (dn *dirnode) commitBlock(ctx context.Context, refs []fnSegmentRef, bufsize int, sync bool) error {
+ if len(refs) == 0 { // nothing to commit; callers may now pass an empty batch (goCommit no longer filters)
+ return nil
+ }
if err := ctx.Err(); err != nil {
return err
}
done := make(chan struct{})
- block := make([]byte, 0, maxBlockSize)
+ var block []byte
segs := make([]*memSegment, 0, len(refs))
offsets := make([]int, 0, len(refs)) // location of segment's data within block
for _, ref := range refs {
}
seg.flushing = done
offsets = append(offsets, len(block))
- block = append(block, seg.buf...)
+ if len(refs) == 1 {
+ block = seg.buf // single segment: use its buffer directly, no copy or allocation
+ } else if block == nil {
+ block = append(make([]byte, 0, bufsize), seg.buf...) // first of several: allocate once at the known final size instead of maxBlockSize
+ } else {
+ block = append(block, seg.buf...)
+ }
segs = append(segs, seg)
}
+ dn.fs.throttle().Acquire() // limit concurrent Keep writes filesystem-wide (concurrentWriters)
errs := make(chan error, 1)
go func() {
defer close(done)
defer close(errs)
locked := map[*filenode]bool{}
locator, _, err := dn.fs.PutB(block)
+ dn.fs.throttle().Release() // release as soon as the Keep write completes
{
if !sync {
for _, name := range dn.sortedNames() {
// Caller must have write lock on dn and the named children.
//
// If any children are dirs, they will be flushed recursively.
-func (dn *dirnode) flush(ctx context.Context, throttle *throttle, names []string, opts flushOpts) error {
+func (dn *dirnode) flush(ctx context.Context, names []string, opts flushOpts) error {
cg := newContextGroup(ctx)
defer cg.Cancel()
- goCommit := func(refs []fnSegmentRef) {
- if len(refs) == 0 {
- return
- }
+ goCommit := func(refs []fnSegmentRef, bufsize int) { // empty-refs check and throttling both moved into commitBlock
cg.Go(func() error {
- throttle.Acquire()
- defer throttle.Release()
- return dn.commitBlock(cg.Context(), refs, opts.sync)
+ return dn.commitBlock(cg.Context(), refs, bufsize, opts.sync)
})
}
for _, name := range names {
switch node := dn.inodes[name].(type) {
case *dirnode:
- names := node.sortedNames()
- for _, name := range names {
- child := node.inodes[name]
- child.Lock()
- defer child.Unlock()
+ grandchildNames := node.sortedNames() // renamed to avoid shadowing the outer names/name
+ for _, grandchildName := range grandchildNames {
+ grandchild := node.inodes[grandchildName]
+ grandchild.Lock()
+ defer grandchild.Unlock()
}
- cg.Go(func() error { return node.flush(cg.Context(), throttle, node.sortedNames(), opts) })
+ cg.Go(func() error { return node.flush(cg.Context(), grandchildNames, opts) }) // reuse the sorted list instead of calling sortedNames() twice
case *filenode:
for idx, seg := range node.segments {
switch seg := seg.(type) {
node.segments[idx] = seg
case *memSegment:
if seg.Len() > maxBlockSize/2 {
- goCommit([]fnSegmentRef{{node, idx}})
+ goCommit([]fnSegmentRef{{node, idx}}, seg.Len()) // big segment: commit alone, sized exactly
continue
}
if pendingLen+seg.Len() > maxBlockSize {
- goCommit(pending)
+ goCommit(pending, pendingLen)
pending = nil
pendingLen = 0
}
}
}
if opts.shortBlocks {
- goCommit(pending)
+ goCommit(pending, pendingLen)
}
return cg.Wait()
}
}
// caller must have write lock.
-func (dn *dirnode) marshalManifest(ctx context.Context, prefix string, throttle *throttle) (string, error) {
+func (dn *dirnode) marshalManifest(ctx context.Context, prefix string) (string, error) { // throttle param dropped; the concurrency limit now lives on the filesystem (see commitBlock)
cg := newContextGroup(ctx)
defer cg.Cancel()
for i, name := range dirnames {
i, name := i, name // capture loop variables for the closure below
cg.Go(func() error {
- txt, err := dn.inodes[name].(*dirnode).marshalManifest(cg.Context(), prefix+"/"+name, throttle)
+ txt, err := dn.inodes[name].(*dirnode).marshalManifest(cg.Context(), prefix+"/"+name)
subdirs[i] = txt
return err
})
var fileparts []filepart
var blocks []string
- if err := dn.flush(cg.Context(), throttle, filenames, flushOpts{sync: true, shortBlocks: true}); err != nil {
+ if err := dn.flush(cg.Context(), filenames, flushOpts{sync: true, shortBlocks: true}); err != nil { // sync+shortBlocks: commit all remaining in-memory data before rendering the manifest
return err
}
for _, name := range filenames {