X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/0daa251cdddbef3db6a69b388170fdb2901964c6..8b43f32b2c11d45f951bf4ff1bffab03d391ff41:/sdk/go/arvados/fs_collection.go diff --git a/sdk/go/arvados/fs_collection.go b/sdk/go/arvados/fs_collection.go index 00eee7405d..b3e6aa96e4 100644 --- a/sdk/go/arvados/fs_collection.go +++ b/sdk/go/arvados/fs_collection.go @@ -170,6 +170,11 @@ func (fs *collectionFileSystem) Flush(path string, shortBlocks bool) error { } names = filenames } + for _, name := range names { + child := dn.inodes[name] + child.Lock() + defer child.Unlock() + } return dn.flush(context.TODO(), names, flushOpts{sync: false, shortBlocks: shortBlocks}) } @@ -644,6 +649,9 @@ type fnSegmentRef struct { // storedSegments that reference the relevant portions of the new // block. // +// bufsize is the total data size in refs. It is used to preallocate +// the correct amount of memory when len(refs)>1. +// // If sync is false, commitBlock returns right away, after starting a // goroutine to do the writes, reacquire the filenodes' locks, and // swap out the *memSegments. Some filenodes' segments might get @@ -651,12 +659,15 @@ type fnSegmentRef struct { // won't replace them. // // Caller must have write lock. -func (dn *dirnode) commitBlock(ctx context.Context, refs []fnSegmentRef, sync bool) error { +func (dn *dirnode) commitBlock(ctx context.Context, refs []fnSegmentRef, bufsize int, sync bool) error { + if len(refs) == 0 { + return nil + } if err := ctx.Err(); err != nil { return err } done := make(chan struct{}) - block := make([]byte, 0, maxBlockSize) + var block []byte segs := make([]*memSegment, 0, len(refs)) offsets := make([]int, 0, len(refs)) // location of segment's data within block for _, ref := range refs { @@ -673,7 +684,13 @@ func (dn *dirnode) commitBlock(ctx context.Context, refs []fnSegmentRef, sync bo } seg.flushing = done offsets = append(offsets, len(block)) - block = append(block, seg.buf...) + if len(refs) == 1 { + block = seg.buf + } else if block == nil { + block = append(make([]byte, 0, bufsize), seg.buf...) + } else { + block = append(block, seg.buf...) + } segs = append(segs, seg) } dn.fs.throttle().Acquire() @@ -764,12 +781,9 @@ func (dn *dirnode) flush(ctx context.Context, names []string, opts flushOpts) er cg := newContextGroup(ctx) defer cg.Cancel() - goCommit := func(refs []fnSegmentRef) { - if len(refs) == 0 { - return - } + goCommit := func(refs []fnSegmentRef, bufsize int) { cg.Go(func() error { - return dn.commitBlock(cg.Context(), refs, opts.sync) + return dn.commitBlock(cg.Context(), refs, bufsize, opts.sync) }) } @@ -803,11 +817,11 @@ func (dn *dirnode) flush(ctx context.Context, names []string, opts flushOpts) er node.segments[idx] = seg case *memSegment: if seg.Len() > maxBlockSize/2 { - goCommit([]fnSegmentRef{{node, idx}}) + goCommit([]fnSegmentRef{{node, idx}}, seg.Len()) continue } if pendingLen+seg.Len() > maxBlockSize { - goCommit(pending) + goCommit(pending, pendingLen) pending = nil pendingLen = 0 } @@ -820,7 +834,7 @@ func (dn *dirnode) flush(ctx context.Context, names []string, opts flushOpts) er } } if opts.shortBlocks { - goCommit(pending) + goCommit(pending, pendingLen) } return cg.Wait() }