X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/15917b2b3902994ca4f6b004577b1130ba0fdaf0..8b43f32b2c11d45f951bf4ff1bffab03d391ff41:/sdk/go/arvados/fs_collection.go diff --git a/sdk/go/arvados/fs_collection.go b/sdk/go/arvados/fs_collection.go index 543d89385b..b3e6aa96e4 100644 --- a/sdk/go/arvados/fs_collection.go +++ b/sdk/go/arvados/fs_collection.go @@ -21,8 +21,7 @@ import ( var ( maxBlockSize = 1 << 26 - concurrentWriters = 4 // max goroutines writing to Keep during flush() - writeAheadBlocks = 1 // max background jobs flushing to Keep before blocking writes + concurrentWriters = 4 // max goroutines writing to Keep in background and during flush() ) // A CollectionFileSystem is a FileSystem that can be serialized as a @@ -60,6 +59,7 @@ func (c *Collection) FileSystem(client apiClient, kc keepClient) (CollectionFile uuid: c.UUID, fileSystem: fileSystem{ fsBackend: keepBackend{apiClient: client, keepClient: kc}, + thr: newThrottle(concurrentWriters), }, } root := &dirnode{ @@ -146,11 +146,36 @@ func (fs *collectionFileSystem) Sync() error { return nil } -func (fs *collectionFileSystem) Flush(shortBlocks bool) error { - fs.fileSystem.root.Lock() - defer fs.fileSystem.root.Unlock() - dn := fs.fileSystem.root.(*dirnode) - return dn.flush(context.TODO(), newThrottle(concurrentWriters), dn.sortedNames(), flushOpts{sync: false, shortBlocks: shortBlocks}) +func (fs *collectionFileSystem) Flush(path string, shortBlocks bool) error { + node, err := rlookup(fs.fileSystem.root, path) + if err != nil { + return err + } + dn, ok := node.(*dirnode) + if !ok { + return ErrNotADirectory + } + dn.Lock() + defer dn.Unlock() + names := dn.sortedNames() + if path != "" { + // Caller only wants to flush the specified dir, + // non-recursively. Drop subdirs from the list of + // names. + var filenames []string + for _, name := range names { + if _, ok := dn.inodes[name].(*filenode); ok { + filenames = append(filenames, name) + } + } + names = filenames + } + for _, name := range names { + child := dn.inodes[name] + child.Lock() + defer child.Unlock() + } + return dn.flush(context.TODO(), names, flushOpts{sync: false, shortBlocks: shortBlocks}) } func (fs *collectionFileSystem) memorySize() int64 { @@ -162,7 +187,7 @@ func (fs *collectionFileSystem) memorySize() int64 { func (fs *collectionFileSystem) MarshalManifest(prefix string) (string, error) { fs.fileSystem.root.Lock() defer fs.fileSystem.root.Unlock() - return fs.fileSystem.root.(*dirnode).marshalManifest(context.TODO(), prefix, newThrottle(concurrentWriters)) + return fs.fileSystem.root.(*dirnode).marshalManifest(context.TODO(), prefix) } func (fs *collectionFileSystem) Size() int64 { @@ -254,7 +279,6 @@ type filenode struct { memsize int64 // bytes in memSegments sync.RWMutex nullnode - throttle *throttle } // caller must have lock @@ -519,10 +543,6 @@ func (fn *filenode) Write(p []byte, startPtr filenodePtr) (n int, ptr filenodePt func (fn *filenode) pruneMemSegments() { // TODO: share code with (*dirnode)flush() // TODO: pack/flush small blocks too, when fragmented - if fn.throttle == nil { - // TODO: share a throttle with filesystem - fn.throttle = newThrottle(writeAheadBlocks) - } for idx, seg := range fn.segments { seg, ok := seg.(*memSegment) if !ok || seg.Len() < maxBlockSize || seg.flushing != nil { @@ -538,11 +558,11 @@ func (fn *filenode) pruneMemSegments() { // progress, block here until one finishes, rather // than pile up an unlimited number of buffered writes // and network flush operations. - fn.throttle.Acquire() + fn.fs.throttle().Acquire() go func() { defer close(done) locator, _, err := fn.FS().PutB(buf) - fn.throttle.Release() + fn.fs.throttle().Release() fn.Lock() defer fn.Unlock() if seg.flushing != done { @@ -629,6 +649,9 @@ type fnSegmentRef struct { // storedSegments that reference the relevant portions of the new // block. // +// bufsize is the total data size in refs. It is used to preallocate +// the correct amount of memory when len(refs)>1. +// // If sync is false, commitBlock returns right away, after starting a // goroutine to do the writes, reacquire the filenodes' locks, and // swap out the *memSegments. Some filenodes' segments might get @@ -636,12 +659,15 @@ type fnSegmentRef struct { // won't replace them. // // Caller must have write lock. -func (dn *dirnode) commitBlock(ctx context.Context, refs []fnSegmentRef, sync bool) error { +func (dn *dirnode) commitBlock(ctx context.Context, refs []fnSegmentRef, bufsize int, sync bool) error { + if len(refs) == 0 { + return nil + } if err := ctx.Err(); err != nil { return err } done := make(chan struct{}) - block := make([]byte, 0, maxBlockSize) + var block []byte segs := make([]*memSegment, 0, len(refs)) offsets := make([]int, 0, len(refs)) // location of segment's data within block for _, ref := range refs { @@ -658,15 +684,23 @@ func (dn *dirnode) commitBlock(ctx context.Context, refs []fnSegmentRef, sync bo } seg.flushing = done offsets = append(offsets, len(block)) - block = append(block, seg.buf...) + if len(refs) == 1 { + block = seg.buf + } else if block == nil { + block = append(make([]byte, 0, bufsize), seg.buf...) + } else { + block = append(block, seg.buf...) + } segs = append(segs, seg) } + dn.fs.throttle().Acquire() errs := make(chan error, 1) go func() { defer close(done) defer close(errs) locked := map[*filenode]bool{} locator, _, err := dn.fs.PutB(block) + dn.fs.throttle().Release() { if !sync { for _, name := range dn.sortedNames() { @@ -743,18 +777,13 @@ type flushOpts struct { // Caller must have write lock on dn and the named children. // // If any children are dirs, they will be flushed recursively. -func (dn *dirnode) flush(ctx context.Context, throttle *throttle, names []string, opts flushOpts) error { +func (dn *dirnode) flush(ctx context.Context, names []string, opts flushOpts) error { cg := newContextGroup(ctx) defer cg.Cancel() - goCommit := func(refs []fnSegmentRef) { - if len(refs) == 0 { - return - } + goCommit := func(refs []fnSegmentRef, bufsize int) { cg.Go(func() error { - throttle.Acquire() - defer throttle.Release() - return dn.commitBlock(cg.Context(), refs, opts.sync) + return dn.commitBlock(cg.Context(), refs, bufsize, opts.sync) }) } @@ -770,7 +799,7 @@ func (dn *dirnode) flush(ctx context.Context, throttle *throttle, names []string grandchild.Lock() defer grandchild.Unlock() } - cg.Go(func() error { return node.flush(cg.Context(), throttle, grandchildNames, opts) }) + cg.Go(func() error { return node.flush(cg.Context(), grandchildNames, opts) }) case *filenode: for idx, seg := range node.segments { switch seg := seg.(type) { @@ -788,11 +817,11 @@ func (dn *dirnode) flush(ctx context.Context, throttle *throttle, names []string node.segments[idx] = seg case *memSegment: if seg.Len() > maxBlockSize/2 { - goCommit([]fnSegmentRef{{node, idx}}) + goCommit([]fnSegmentRef{{node, idx}}, seg.Len()) continue } if pendingLen+seg.Len() > maxBlockSize { - goCommit(pending) + goCommit(pending, pendingLen) pending = nil pendingLen = 0 } @@ -805,7 +834,7 @@ func (dn *dirnode) flush(ctx context.Context, throttle *throttle, names []string } } if opts.shortBlocks { - goCommit(pending) + goCommit(pending, pendingLen) } return cg.Wait() } @@ -842,7 +871,7 @@ func (dn *dirnode) sortedNames() []string { } // caller must have write lock. -func (dn *dirnode) marshalManifest(ctx context.Context, prefix string, throttle *throttle) (string, error) { +func (dn *dirnode) marshalManifest(ctx context.Context, prefix string) (string, error) { cg := newContextGroup(ctx) defer cg.Cancel() @@ -889,7 +918,7 @@ func (dn *dirnode) marshalManifest(ctx context.Context, prefix string, throttle for i, name := range dirnames { i, name := i, name cg.Go(func() error { - txt, err := dn.inodes[name].(*dirnode).marshalManifest(cg.Context(), prefix+"/"+name, throttle) + txt, err := dn.inodes[name].(*dirnode).marshalManifest(cg.Context(), prefix+"/"+name) subdirs[i] = txt return err }) @@ -905,7 +934,7 @@ func (dn *dirnode) marshalManifest(ctx context.Context, prefix string, throttle var fileparts []filepart var blocks []string - if err := dn.flush(cg.Context(), throttle, filenames, flushOpts{sync: true, shortBlocks: true}); err != nil { + if err := dn.flush(cg.Context(), filenames, flushOpts{sync: true, shortBlocks: true}); err != nil { return err } for _, name := range filenames {