var maxBlockSize = 1 << 26
+var concurrentWriters = 4
+
// A CollectionFileSystem is a FileSystem that can be serialized as a
// manifest and stored as a collection.
type CollectionFileSystem interface {
func (fs *collectionFileSystem) MarshalManifest(prefix string) (string, error) {
fs.fileSystem.root.Lock()
defer fs.fileSystem.root.Unlock()
- return fs.fileSystem.root.(*dirnode).marshalManifest(prefix)
+ return fs.fileSystem.root.(*dirnode).marshalManifest(prefix, newThrottle(concurrentWriters))
}
func (fs *collectionFileSystem) Size() int64 {
// children with the given names, which must be children of dn) to
// local persistent storage. Caller must have write lock on dn and the
// named children.
-func (dn *dirnode) sync(names []string) error {
+func (dn *dirnode) sync(names []string, throttle *throttle) error {
type shortBlock struct {
fn *filenode
idx int
var pending []shortBlock
var pendingLen int
- flush := func(sbs []shortBlock) error {
+ var wg sync.WaitGroup
+ errors := make(chan error, 1)
+ flush := func(sbs []shortBlock) {
+ defer wg.Done()
if len(sbs) == 0 {
- return nil
+ return
}
+ throttle.Acquire()
+ defer throttle.Release()
block := make([]byte, 0, maxBlockSize)
for _, sb := range sbs {
block = append(block, sb.fn.segments[sb.idx].(*memSegment).buf...)
}
locator, _, err := dn.fs.PutB(block)
if err != nil {
- return err
+ select {
+ case errors <- err:
+ default:
+ }
}
off := 0
for _, sb := range sbs {
off += len(data)
sb.fn.memsize -= int64(len(data))
}
- return nil
}
localLocator := map[string]string{}
fn.segments[idx] = seg
case *memSegment:
if seg.Len() > maxBlockSize/2 {
- if err := flush([]shortBlock{{fn, idx}}); err != nil {
- return err
- }
+ wg.Add(1)
+ go flush([]shortBlock{{fn, idx}})
continue
}
if pendingLen+seg.Len() > maxBlockSize {
- if err := flush(pending); err != nil {
- return err
- }
+ wg.Add(1)
+ go flush(pending)
pending = nil
pendingLen = 0
}
}
}
}
- return flush(pending)
+ wg.Add(1)
+ flush(pending)
+ wg.Wait()
+ close(errors)
+ return <-errors
}
// caller must have write lock.
-func (dn *dirnode) marshalManifest(prefix string) (string, error) {
+func (dn *dirnode) marshalManifest(prefix string, throttle *throttle) (string, error) {
var streamLen int64
type filepart struct {
name string
node.Lock()
defer node.Unlock()
}
- if err := dn.sync(names); err != nil {
+ if err := dn.sync(names, throttle); err != nil {
return "", err
}
for _, name := range names {
switch node := dn.inodes[name].(type) {
case *dirnode:
- subdir, err := node.marshalManifest(prefix + "/" + name)
+ subdir, err := node.marshalManifest(prefix+"/"+name, throttle)
if err != nil {
return "", err
}