X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/97e783760423f6f0200b057629aa9cd593a5c9c7..3b4bb3d393adc3bd3ddfb4442a65087275a5c5c3:/sdk/go/arvados/fs_collection.go diff --git a/sdk/go/arvados/fs_collection.go b/sdk/go/arvados/fs_collection.go index 386408a109..37bd494914 100644 --- a/sdk/go/arvados/fs_collection.go +++ b/sdk/go/arvados/fs_collection.go @@ -16,6 +16,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" ) @@ -49,11 +50,9 @@ type collectionFileSystem struct { // FileSystem returns a CollectionFileSystem for the collection. func (c *Collection) FileSystem(client apiClient, kc keepClient) (CollectionFileSystem, error) { - var modTime time.Time - if c.ModifiedAt == nil { + modTime := c.ModifiedAt + if modTime.IsZero() { modTime = time.Now() - } else { - modTime = *c.ModifiedAt } fs := &collectionFileSystem{ uuid: c.UUID, @@ -170,6 +169,11 @@ func (fs *collectionFileSystem) Flush(path string, shortBlocks bool) error { } names = filenames } + for _, name := range names { + child := dn.inodes[name] + child.Lock() + defer child.Unlock() + } return dn.flush(context.TODO(), names, flushOpts{sync: false, shortBlocks: shortBlocks}) } @@ -564,7 +568,6 @@ func (fn *filenode) pruneMemSegments() { // A new seg.buf has been allocated. return } - seg.flushing = nil if err != nil { // TODO: stall (or return errors from) // subsequent writes until flushing @@ -644,6 +647,9 @@ type fnSegmentRef struct { // storedSegments that reference the relevant portions of the new // block. // +// bufsize is the total data size in refs. It is used to preallocate +// the correct amount of memory when len(refs)>1. +// // If sync is false, commitBlock returns right away, after starting a // goroutine to do the writes, reacquire the filenodes' locks, and // swap out the *memSegments. Some filenodes' segments might get @@ -651,19 +657,23 @@ type fnSegmentRef struct { // won't replace them. // // Caller must have write lock. -func (dn *dirnode) commitBlock(ctx context.Context, refs []fnSegmentRef, sync bool) error { +func (dn *dirnode) commitBlock(ctx context.Context, refs []fnSegmentRef, bufsize int, sync bool) error { + if len(refs) == 0 { + return nil + } if err := ctx.Err(); err != nil { return err } done := make(chan struct{}) - block := make([]byte, 0, maxBlockSize) + var block []byte segs := make([]*memSegment, 0, len(refs)) offsets := make([]int, 0, len(refs)) // location of segment's data within block for _, ref := range refs { seg := ref.fn.segments[ref.idx].(*memSegment) - if seg.flushing != nil && !sync { + if !sync && seg.flushingUnfinished() { // Let the other flushing goroutine finish. If // it fails, we'll try again next time. + close(done) return nil } else { // In sync mode, we proceed regardless of @@ -673,39 +683,30 @@ func (dn *dirnode) commitBlock(ctx context.Context, refs []fnSegmentRef, sync bo } seg.flushing = done offsets = append(offsets, len(block)) - block = append(block, seg.buf...) + if len(refs) == 1 { + block = seg.buf + } else if block == nil { + block = append(make([]byte, 0, bufsize), seg.buf...) + } else { + block = append(block, seg.buf...) + } segs = append(segs, seg) } + blocksize := len(block) + dn.fs.throttle().Acquire() errs := make(chan error, 1) go func() { defer close(done) defer close(errs) - locked := map[*filenode]bool{} locator, _, err := dn.fs.PutB(block) - { - if !sync { - for _, name := range dn.sortedNames() { - if fn, ok := dn.inodes[name].(*filenode); ok { - fn.Lock() - defer fn.Unlock() - locked[fn] = true - } - } - } - defer func() { - for _, seg := range segs { - if seg.flushing == done { - seg.flushing = nil - } - } - }() - } + dn.fs.throttle().Release() if err != nil { errs <- err return } for idx, ref := range refs { if !sync { + ref.fn.Lock() // In async mode, fn's lock was // released while we were waiting for // PutB(); lots of things might have @@ -714,17 +715,15 @@ func (dn *dirnode) commitBlock(ctx context.Context, refs []fnSegmentRef, sync bo // file segments have // rearranged or changed in // some way + ref.fn.Unlock() continue } else if seg, ok := ref.fn.segments[ref.idx].(*memSegment); !ok || seg != segs[idx] { // segment has been replaced + ref.fn.Unlock() continue } else if seg.flushing != done { // seg.buf has been replaced - continue - } else if !locked[ref.fn] { - // file was renamed, moved, or - // deleted since we called - // PutB + ref.fn.Unlock() continue } } @@ -732,11 +731,19 @@ func (dn *dirnode) commitBlock(ctx context.Context, refs []fnSegmentRef, sync bo ref.fn.segments[ref.idx] = storedSegment{ kc: dn.fs, locator: locator, - size: len(block), + size: blocksize, offset: offsets[idx], length: len(data), } - ref.fn.memsize -= int64(len(data)) + // atomic is needed here despite caller having + // lock: caller might be running concurrent + // commitBlock() goroutines using the same + // lock, writing different segments from the + // same file. + atomic.AddInt64(&ref.fn.memsize, -int64(len(data))) + if !sync { + ref.fn.Unlock() + } } }() if sync { @@ -762,14 +769,9 @@ func (dn *dirnode) flush(ctx context.Context, names []string, opts flushOpts) er cg := newContextGroup(ctx) defer cg.Cancel() - goCommit := func(refs []fnSegmentRef) { - if len(refs) == 0 { - return - } + goCommit := func(refs []fnSegmentRef, bufsize int) { cg.Go(func() error { - dn.fs.throttle().Acquire() - defer dn.fs.throttle().Release() - return dn.commitBlock(cg.Context(), refs, opts.sync) + return dn.commitBlock(cg.Context(), refs, bufsize, opts.sync) }) } @@ -803,11 +805,11 @@ func (dn *dirnode) flush(ctx context.Context, names []string, opts flushOpts) er node.segments[idx] = seg case *memSegment: if seg.Len() > maxBlockSize/2 { - goCommit([]fnSegmentRef{{node, idx}}) + goCommit([]fnSegmentRef{{node, idx}}, seg.Len()) continue } if pendingLen+seg.Len() > maxBlockSize { - goCommit(pending) + goCommit(pending, pendingLen) pending = nil pendingLen = 0 } @@ -820,7 +822,7 @@ func (dn *dirnode) flush(ctx context.Context, names []string, opts flushOpts) er } } if opts.shortBlocks { - goCommit(pending) + goCommit(pending, pendingLen) } return cg.Wait() } @@ -1187,13 +1189,26 @@ type segment interface { type memSegment struct { buf []byte - // If flushing is not nil, then a) buf is being shared by a - // pruneMemSegments goroutine, and must be copied on write; - // and b) the flushing channel will close when the goroutine - // finishes, whether it succeeds or not. + // If flushing is not nil and not ready/closed, then a) buf is + // being shared by a pruneMemSegments goroutine, and must be + // copied on write; and b) the flushing channel will close + // when the goroutine finishes, whether it succeeds or not. flushing <-chan struct{} } +func (me *memSegment) flushingUnfinished() bool { + if me.flushing == nil { + return false + } + select { + case <-me.flushing: + me.flushing = nil + return false + default: + return true + } +} + func (me *memSegment) Len() int { return len(me.buf) }