"time"
)
-var maxBlockSize = 1 << 26
-
-var concurrentWriters = 4
+var (
+ maxBlockSize = 1 << 26
+ concurrentWriters = 4 // max goroutines writing to Keep during sync()
+ writeAheadBlocks = 1 // max background jobs flushing to Keep before blocking writes
+)
// A CollectionFileSystem is a FileSystem that can be serialized as a
// manifest and stored as a collection.
// TODO: pack/flush small blocks too, when fragmented
if fn.throttle == nil {
// TODO: share a throttle with filesystem
- fn.throttle = newThrottle(concurrentWriters)
+ fn.throttle = newThrottle(writeAheadBlocks)
}
for idx, seg := range fn.segments {
seg, ok := seg.(*memSegment)
return dn.treenode.Child(name, replace)
}
+type fnSegmentRef struct {
+ fn *filenode
+ idx int
+}
+
+// commitBlock concatenates the data from the given filenode segments
+// (which must be *memSegments), writes the data out to Keep as a
+// single block, and replaces the filenodes' *memSegments with
+// storedSegments that reference the relevant portions of the new
+// block.
+//
+// Caller must have write lock.
+func (dn *dirnode) commitBlock(ctx context.Context, throttle *throttle, refs []fnSegmentRef) error {
+ if len(refs) == 0 {
+ return nil
+ }
+ throttle.Acquire()
+ defer throttle.Release()
+ if err := ctx.Err(); err != nil {
+ return err
+ }
+ block := make([]byte, 0, maxBlockSize)
+ for _, ref := range refs {
+ block = append(block, ref.fn.segments[ref.idx].(*memSegment).buf...)
+ }
+ locator, _, err := dn.fs.PutB(block)
+ if err != nil {
+ return err
+ }
+ off := 0
+ for _, ref := range refs {
+ data := ref.fn.segments[ref.idx].(*memSegment).buf
+ ref.fn.segments[ref.idx] = storedSegment{
+ kc: dn.fs,
+ locator: locator,
+ size: len(block),
+ offset: off,
+ length: len(data),
+ }
+ off += len(data)
+ ref.fn.memsize -= int64(len(data))
+ }
+ return nil
+}
+
// sync flushes in-memory data and remote block references (for the
// children with the given names, which must be children of dn) to
// local persistent storage. Caller must have write lock on dn and the
// named children.
-func (dn *dirnode) sync(ctx context.Context, names []string, throttle *throttle) error {
+func (dn *dirnode) sync(ctx context.Context, throttle *throttle, names []string) error {
cg := newContextGroup(ctx)
defer cg.Cancel()
- type shortBlock struct {
- fn *filenode
- idx int
- }
-
- flush := func(sbs []shortBlock) error {
- if len(sbs) == 0 {
- return nil
- }
- throttle.Acquire()
- defer throttle.Release()
- if err := cg.Context().Err(); err != nil {
- return err
- }
- block := make([]byte, 0, maxBlockSize)
- for _, sb := range sbs {
- block = append(block, sb.fn.segments[sb.idx].(*memSegment).buf...)
- }
- locator, _, err := dn.fs.PutB(block)
- if err != nil {
- return err
- }
- off := 0
- for _, sb := range sbs {
- data := sb.fn.segments[sb.idx].(*memSegment).buf
- sb.fn.segments[sb.idx] = storedSegment{
- kc: dn.fs,
- locator: locator,
- size: len(block),
- offset: off,
- length: len(data),
- }
- off += len(data)
- sb.fn.memsize -= int64(len(data))
- }
- return nil
- }
-
- goFlush := func(sbs []shortBlock) {
+ goCommit := func(refs []fnSegmentRef) {
cg.Go(func() error {
- return flush(sbs)
+ return dn.commitBlock(cg.Context(), throttle, refs)
})
}
- var pending []shortBlock
- var pendingLen int
+ var pending []fnSegmentRef
+ var pendingLen int = 0
localLocator := map[string]string{}
for _, name := range names {
fn, ok := dn.inodes[name].(*filenode)
fn.segments[idx] = seg
case *memSegment:
if seg.Len() > maxBlockSize/2 {
- goFlush([]shortBlock{{fn, idx}})
+ goCommit([]fnSegmentRef{{fn, idx}})
continue
}
if pendingLen+seg.Len() > maxBlockSize {
- goFlush(pending)
+ goCommit(pending)
pending = nil
pendingLen = 0
}
- pending = append(pending, shortBlock{fn, idx})
+ pending = append(pending, fnSegmentRef{fn, idx})
pendingLen += seg.Len()
default:
panic(fmt.Sprintf("can't sync segment type %T", seg))
}
}
}
- goFlush(pending)
+ goCommit(pending)
return cg.Wait()
}
var fileparts []filepart
var blocks []string
- if err := dn.sync(cg.Context(), names, throttle); err != nil {
+ if err := dn.sync(cg.Context(), throttle, names); err != nil {
return err
}
for _, name := range filenames {