X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/aa4b9b1d650f2174eb19bbf2ba9787f3ace59b04..63205afef86bf2f50b829a6f0c863ee122fe785b:/sdk/go/arvados/fs_collection.go diff --git a/sdk/go/arvados/fs_collection.go b/sdk/go/arvados/fs_collection.go index b3e6aa96e4..d0e97f2ad1 100644 --- a/sdk/go/arvados/fs_collection.go +++ b/sdk/go/arvados/fs_collection.go @@ -16,6 +16,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" ) @@ -49,11 +50,9 @@ type collectionFileSystem struct { // FileSystem returns a CollectionFileSystem for the collection. func (c *Collection) FileSystem(client apiClient, kc keepClient) (CollectionFileSystem, error) { - var modTime time.Time - if c.ModifiedAt == nil { + modTime := c.ModifiedAt + if modTime.IsZero() { modTime = time.Now() - } else { - modTime = *c.ModifiedAt } fs := &collectionFileSystem{ uuid: c.UUID, @@ -693,24 +692,15 @@ func (dn *dirnode) commitBlock(ctx context.Context, refs []fnSegmentRef, bufsize } segs = append(segs, seg) } + blocksize := len(block) dn.fs.throttle().Acquire() errs := make(chan error, 1) go func() { defer close(done) defer close(errs) - locked := map[*filenode]bool{} locator, _, err := dn.fs.PutB(block) dn.fs.throttle().Release() { - if !sync { - for _, name := range dn.sortedNames() { - if fn, ok := dn.inodes[name].(*filenode); ok { - fn.Lock() - defer fn.Unlock() - locked[fn] = true - } - } - } defer func() { for _, seg := range segs { if seg.flushing == done { @@ -725,6 +715,7 @@ func (dn *dirnode) commitBlock(ctx context.Context, refs []fnSegmentRef, bufsize } for idx, ref := range refs { if !sync { + ref.fn.Lock() // In async mode, fn's lock was // released while we were waiting for // PutB(); lots of things might have @@ -733,17 +724,15 @@ func (dn *dirnode) commitBlock(ctx context.Context, refs []fnSegmentRef, bufsize // file segments have // rearranged or changed in // some way + ref.fn.Unlock() continue } else if seg, ok := ref.fn.segments[ref.idx].(*memSegment); !ok || seg != segs[idx] { // segment has been replaced + ref.fn.Unlock() continue } else if seg.flushing != done { // seg.buf has been replaced - continue - } else if !locked[ref.fn] { - // file was renamed, moved, or - // deleted since we called - // PutB + ref.fn.Unlock() continue } } @@ -751,11 +740,19 @@ func (dn *dirnode) commitBlock(ctx context.Context, refs []fnSegmentRef, bufsize ref.fn.segments[ref.idx] = storedSegment{ kc: dn.fs, locator: locator, - size: len(block), + size: blocksize, offset: offsets[idx], length: len(data), } - ref.fn.memsize -= int64(len(data)) + // atomic is needed here despite caller having + // lock: caller might be running concurrent + // commitBlock() goroutines using the same + // lock, writing different segments from the + // same file. + atomic.AddInt64(&ref.fn.memsize, -int64(len(data))) + if !sync { + ref.fn.Unlock() + } } }() if sync {