X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/a6cba47c47dcd6a7f0bce99de9bbed6a87ef3102..ae562784e8d8d8bd501c0bd373739d0a2da8fc9f:/sdk/go/arvados/fs_collection.go diff --git a/sdk/go/arvados/fs_collection.go b/sdk/go/arvados/fs_collection.go index 00f7ee73e3..b3e6aa96e4 100644 --- a/sdk/go/arvados/fs_collection.go +++ b/sdk/go/arvados/fs_collection.go @@ -649,6 +649,9 @@ type fnSegmentRef struct { // storedSegments that reference the relevant portions of the new // block. // +// bufsize is the total data size in refs. It is used to preallocate +// the correct amount of memory when len(refs)>1. +// // If sync is false, commitBlock returns right away, after starting a // goroutine to do the writes, reacquire the filenodes' locks, and // swap out the *memSegments. Some filenodes' segments might get @@ -656,12 +659,15 @@ type fnSegmentRef struct { // won't replace them. // // Caller must have write lock. -func (dn *dirnode) commitBlock(ctx context.Context, refs []fnSegmentRef, sync bool) error { +func (dn *dirnode) commitBlock(ctx context.Context, refs []fnSegmentRef, bufsize int, sync bool) error { + if len(refs) == 0 { + return nil + } if err := ctx.Err(); err != nil { return err } done := make(chan struct{}) - block := make([]byte, 0, maxBlockSize) + var block []byte segs := make([]*memSegment, 0, len(refs)) offsets := make([]int, 0, len(refs)) // location of segment's data within block for _, ref := range refs { @@ -678,7 +684,13 @@ func (dn *dirnode) commitBlock(ctx context.Context, refs []fnSegmentRef, sync bo } seg.flushing = done offsets = append(offsets, len(block)) - block = append(block, seg.buf...) + if len(refs) == 1 { + block = seg.buf + } else if block == nil { + block = append(make([]byte, 0, bufsize), seg.buf...) + } else { + block = append(block, seg.buf...) + } segs = append(segs, seg) } dn.fs.throttle().Acquire() @@ -769,12 +781,9 @@ func (dn *dirnode) flush(ctx context.Context, names []string, opts flushOpts) er cg := newContextGroup(ctx) defer cg.Cancel() - goCommit := func(refs []fnSegmentRef) { - if len(refs) == 0 { - return - } + goCommit := func(refs []fnSegmentRef, bufsize int) { cg.Go(func() error { - return dn.commitBlock(cg.Context(), refs, opts.sync) + return dn.commitBlock(cg.Context(), refs, bufsize, opts.sync) }) } @@ -808,11 +817,11 @@ func (dn *dirnode) flush(ctx context.Context, names []string, opts flushOpts) er node.segments[idx] = seg case *memSegment: if seg.Len() > maxBlockSize/2 { - goCommit([]fnSegmentRef{{node, idx}}) + goCommit([]fnSegmentRef{{node, idx}}, seg.Len()) continue } if pendingLen+seg.Len() > maxBlockSize { - goCommit(pending) + goCommit(pending, pendingLen) pending = nil pendingLen = 0 } @@ -825,7 +834,7 @@ func (dn *dirnode) flush(ctx context.Context, names []string, opts flushOpts) er } } if opts.shortBlocks { - goCommit(pending) + goCommit(pending, pendingLen) } return cg.Wait() }