X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/9c2fb29f2c493fad7bfe15d3aa93ce3a24b0aa72..72d7d41944006d1f48f570784dafe56b9812b0c8:/sdk/go/arvados/fs_collection.go

diff --git a/sdk/go/arvados/fs_collection.go b/sdk/go/arvados/fs_collection.go
index 3d0928b84e..4d9db421fc 100644
--- a/sdk/go/arvados/fs_collection.go
+++ b/sdk/go/arvados/fs_collection.go
@@ -38,14 +38,13 @@ type CollectionFileSystem interface {
 
 	// Total data bytes in all files.
 	Size() int64
-
-	// Memory consumed by buffered file data.
-	memorySize() int64
 }
 
 type collectionFileSystem struct {
 	fileSystem
-	uuid string
+	uuid           string
+	replicas       int
+	storageClasses []string
 }
 
 // FileSystem returns a CollectionFileSystem for the collection.
@@ -55,12 +54,16 @@ func (c *Collection) FileSystem(client apiClient, kc keepClient) (CollectionFile
 		modTime = time.Now()
 	}
 	fs := &collectionFileSystem{
-		uuid: c.UUID,
+		uuid:           c.UUID,
+		storageClasses: c.StorageClassesDesired,
 		fileSystem: fileSystem{
 			fsBackend: keepBackend{apiClient: client, keepClient: kc},
 			thr:       newThrottle(concurrentWriters),
 		},
 	}
+	if r := c.ReplicationDesired; r != nil {
+		fs.replicas = *r
+	}
 	root := &dirnode{
 		fs: fs,
 		treenode: treenode{
@@ -109,16 +112,71 @@ func (fs *collectionFileSystem) newNode(name string, perm os.FileMode, modTime t
 				inodes: make(map[string]inode),
 			},
 		}, nil
-	} else {
-		return &filenode{
-			fs: fs,
-			fileinfo: fileinfo{
-				name:    name,
-				mode:    perm & ^os.ModeDir,
-				modTime: modTime,
-			},
-		}, nil
 	}
+	return &filenode{
+		fs: fs,
+		fileinfo: fileinfo{
+			name:    name,
+			mode:    perm & ^os.ModeDir,
+			modTime: modTime,
+		},
+	}, nil
+}
+
+func (fs *collectionFileSystem) Child(name string, replace func(inode) (inode, error)) (inode, error) {
+	return fs.rootnode().Child(name, replace)
+}
+
+func (fs *collectionFileSystem) FS() FileSystem {
+	return fs
+}
+
+func (fs *collectionFileSystem) FileInfo() os.FileInfo {
+	return fs.rootnode().FileInfo()
+}
+
+func (fs *collectionFileSystem) IsDir() bool {
+	return true
+}
+
+func (fs *collectionFileSystem) Lock() {
+	fs.rootnode().Lock()
+}
+
+func (fs *collectionFileSystem) Unlock() {
+	fs.rootnode().Unlock()
+}
+
+func (fs *collectionFileSystem) RLock() {
+	fs.rootnode().RLock()
+}
+
+func (fs *collectionFileSystem) RUnlock() {
+	fs.rootnode().RUnlock()
+}
+
+func (fs *collectionFileSystem) Parent() inode {
+	return fs.rootnode().Parent()
+}
+
+func (fs *collectionFileSystem) Read(_ []byte, ptr filenodePtr) (int, filenodePtr, error) {
+	return 0, ptr, ErrInvalidOperation
+}
+
+func (fs *collectionFileSystem) Write(_ []byte, ptr filenodePtr) (int, filenodePtr, error) {
+	return 0, ptr, ErrInvalidOperation
+}
+
+func (fs *collectionFileSystem) Readdir() ([]os.FileInfo, error) {
+	return fs.rootnode().Readdir()
+}
+
+func (fs *collectionFileSystem) SetParent(parent inode, name string) {
+	fs.rootnode().SetParent(parent, name)
+}
+
+func (fs *collectionFileSystem) Truncate(int64) error {
+	return ErrInvalidOperation
 }
 
 func (fs *collectionFileSystem) Sync() error {
@@ -177,10 +235,10 @@ func (fs *collectionFileSystem) Flush(path string, shortBlocks bool) error {
 	return dn.flush(context.TODO(), names, flushOpts{sync: false, shortBlocks: shortBlocks})
 }
 
-func (fs *collectionFileSystem) memorySize() int64 {
+func (fs *collectionFileSystem) MemorySize() int64 {
 	fs.fileSystem.root.Lock()
 	defer fs.fileSystem.root.Unlock()
-	return fs.fileSystem.root.(*dirnode).memorySize()
+	return fs.fileSystem.root.(*dirnode).MemorySize()
 }
 
 func (fs *collectionFileSystem) MarshalManifest(prefix string) (string, error) {
@@ -269,7 +327,7 @@ func (fn *filenode) seek(startPtr filenodePtr) (ptr filenodePtr) {
 // filenode implements inode.
 type filenode struct {
 	parent   inode
-	fs       FileSystem
+	fs       *collectionFileSystem
 	fileinfo fileinfo
 	segments []segment
 	// number of times `segments` has changed in a
@@ -512,8 +570,6 @@ func (fn *filenode) Write(p []byte, startPtr filenodePtr) (n int, ptr filenodePt
 			seg.Truncate(len(cando))
 			fn.memsize += int64(len(cando))
 			fn.segments[cur] = seg
-			cur++
-			prev++
 		}
 	}
 
@@ -560,7 +616,11 @@ func (fn *filenode) pruneMemSegments() {
 		fn.fs.throttle().Acquire()
 		go func() {
 			defer close(done)
-			locator, _, err := fn.FS().PutB(buf)
+			resp, err := fn.FS().BlockWrite(context.Background(), BlockWriteOptions{
+				Data:           buf,
+				Replicas:       fn.fs.replicas,
+				StorageClasses: fn.fs.storageClasses,
+			})
 			fn.fs.throttle().Release()
 			fn.Lock()
 			defer fn.Unlock()
@@ -568,7 +628,6 @@ func (fn *filenode) pruneMemSegments() {
 				// A new seg.buf has been allocated.
 				return
 			}
-			seg.flushing = nil
 			if err != nil {
 				// TODO: stall (or return errors from)
 				// subsequent writes until flushing
@@ -582,7 +641,7 @@ func (fn *filenode) pruneMemSegments() {
 			fn.memsize -= int64(len(buf))
 			fn.segments[idx] = storedSegment{
 				kc:      fn.FS(),
-				locator: locator,
+				locator: resp.Locator,
 				size:    len(buf),
 				offset:  0,
 				length:  len(buf),
@@ -625,6 +684,7 @@ func (dn *dirnode) Child(name string, replace func(inode) (inode, error)) (inode
 			if err != nil {
 				return nil, err
 			}
+			coll.UUID = dn.fs.uuid
 			data, err := json.Marshal(&coll)
 			if err == nil {
 				data = append(data, '\n')
@@ -671,16 +731,16 @@ func (dn *dirnode) commitBlock(ctx context.Context, refs []fnSegmentRef, bufsize
 	offsets := make([]int, 0, len(refs)) // location of segment's data within block
 	for _, ref := range refs {
 		seg := ref.fn.segments[ref.idx].(*memSegment)
-		if seg.flushing != nil && !sync {
+		if !sync && seg.flushingUnfinished() {
 			// Let the other flushing goroutine finish. If
 			// it fails, we'll try again next time.
+			close(done)
 			return nil
-		} else {
-			// In sync mode, we proceed regardless of
-			// whether another flush is in progress: It
-			// can't finish before we do, because we hold
-			// fn's lock until we finish our own writes.
 		}
+		// In sync mode, we proceed regardless of
+		// whether another flush is in progress: It
+		// can't finish before we do, because we hold
+		// fn's lock until we finish our own writes.
 		seg.flushing = done
 		offsets = append(offsets, len(block))
 		if len(refs) == 1 {
@@ -698,35 +758,19 @@ func (dn *dirnode) commitBlock(ctx context.Context, refs []fnSegmentRef, bufsize
 	go func() {
 		defer close(done)
 		defer close(errs)
-		locked := map[*filenode]bool{}
-		locator, _, err := dn.fs.PutB(block)
+		resp, err := dn.fs.BlockWrite(context.Background(), BlockWriteOptions{
+			Data:           block,
+			Replicas:       dn.fs.replicas,
+			StorageClasses: dn.fs.storageClasses,
+		})
 		dn.fs.throttle().Release()
-		{
-			if !sync {
-				dn.Lock()
-				defer dn.Unlock()
-				for _, name := range dn.sortedNames() {
-					if fn, ok := dn.inodes[name].(*filenode); ok {
-						fn.Lock()
-						defer fn.Unlock()
-						locked[fn] = true
-					}
-				}
-			}
-			defer func() {
-				for _, seg := range segs {
-					if seg.flushing == done {
-						seg.flushing = nil
-					}
-				}
-			}()
-		}
 		if err != nil {
 			errs <- err
 			return
 		}
 		for idx, ref := range refs {
 			if !sync {
+				ref.fn.Lock()
 				// In async mode, fn's lock was
 				// released while we were waiting for
 				// PutB(); lots of things might have
@@ -735,24 +779,22 @@ func (dn *dirnode) commitBlock(ctx context.Context, refs []fnSegmentRef, bufsize
 					// file segments have
 					// rearranged or changed in
 					// some way
+					ref.fn.Unlock()
 					continue
 				} else if seg, ok := ref.fn.segments[ref.idx].(*memSegment); !ok || seg != segs[idx] {
 					// segment has been replaced
+					ref.fn.Unlock()
 					continue
 				} else if seg.flushing != done {
 					// seg.buf has been replaced
-					continue
-				} else if !locked[ref.fn] {
-					// file was renamed, moved, or
-					// deleted since we called
-					// PutB
+					ref.fn.Unlock()
 					continue
 				}
 			}
 			data := ref.fn.segments[ref.idx].(*memSegment).buf
 			ref.fn.segments[ref.idx] = storedSegment{
 				kc:      dn.fs,
-				locator: locator,
+				locator: resp.Locator,
 				size:    blocksize,
 				offset:  offsets[idx],
 				length:  len(data),
@@ -763,13 +805,15 @@ func (dn *dirnode) commitBlock(ctx context.Context, refs []fnSegmentRef, bufsize
 			// lock, writing different segments from the
 			// same file.
 			atomic.AddInt64(&ref.fn.memsize, -int64(len(data)))
+			if !sync {
+				ref.fn.Unlock()
+			}
 		}
 	}()
 	if sync {
 		return <-errs
-	} else {
-		return nil
 	}
+	return nil
 }
 
 type flushOpts struct {
@@ -847,14 +891,14 @@ func (dn *dirnode) flush(ctx context.Context, names []string, opts flushOpts) er
 }
 
 // caller must have write lock.
-func (dn *dirnode) memorySize() (size int64) {
+func (dn *dirnode) MemorySize() (size int64) {
 	for _, name := range dn.sortedNames() {
 		node := dn.inodes[name]
 		node.Lock()
 		defer node.Unlock()
 		switch node := node.(type) {
 		case *dirnode:
-			size += node.memorySize()
+			size += node.MemorySize()
 		case *filenode:
 			for _, seg := range node.segments {
 				switch seg := seg.(type) {
@@ -1072,9 +1116,9 @@ func (dn *dirnode) loadManifest(txt string) error {
 			// situation might be rare anyway)
 			segIdx, pos = 0, 0
 		}
-		for next := int64(0); segIdx < len(segments); segIdx++ {
+		for ; segIdx < len(segments); segIdx++ {
 			seg := segments[segIdx]
-			next = pos + int64(seg.Len())
+			next := pos + int64(seg.Len())
 			if next <= offset || seg.Len() == 0 {
 				pos = next
 				continue
@@ -1138,9 +1182,12 @@ func (dn *dirnode) createFileAndParents(path string) (fn *filenode, err error) {
 			node = node.Parent()
 			continue
 		}
+		modtime := node.Parent().FileInfo().ModTime()
+		node.Lock()
+		locked := node
 		node, err = node.Child(name, func(child inode) (inode, error) {
 			if child == nil {
-				child, err := node.FS().newNode(name, 0755|os.ModeDir, node.Parent().FileInfo().ModTime())
+				child, err := node.FS().newNode(name, 0755|os.ModeDir, modtime)
 				if err != nil {
 					return nil, err
 				}
@@ -1152,6 +1199,7 @@ func (dn *dirnode) createFileAndParents(path string) (fn *filenode, err error) {
 				return child, nil
 			}
 		})
+		locked.Unlock()
 		if err != nil {
 			return
 		}
@@ -1162,10 +1210,13 @@ func (dn *dirnode) createFileAndParents(path string) (fn *filenode, err error) {
 		err = fmt.Errorf("invalid file part %q in path %q", basename, path)
 		return
 	}
+	modtime := node.FileInfo().ModTime()
+	node.Lock()
+	defer node.Unlock()
 	_, err = node.Child(basename, func(child inode) (inode, error) {
 		switch child := child.(type) {
 		case nil:
-			child, err = node.FS().newNode(basename, 0755, node.FileInfo().ModTime())
+			child, err = node.FS().newNode(basename, 0755, modtime)
 			if err != nil {
 				return nil, err
 			}
@@ -1208,13 +1259,26 @@ type segment interface {
 
 type memSegment struct {
 	buf []byte
-	// If flushing is not nil, then a) buf is being shared by a
-	// pruneMemSegments goroutine, and must be copied on write;
-	// and b) the flushing channel will close when the goroutine
-	// finishes, whether it succeeds or not.
+	// If flushing is not nil and not ready/closed, then a) buf is
+	// being shared by a pruneMemSegments goroutine, and must be
+	// copied on write; and b) the flushing channel will close
+	// when the goroutine finishes, whether it succeeds or not.
 	flushing <-chan struct{}
 }
 
+func (me *memSegment) flushingUnfinished() bool {
+	if me.flushing == nil {
+		return false
+	}
+	select {
+	case <-me.flushing:
+		me.flushing = nil
+		return false
+	default:
+		return true
+	}
+}
+
 func (me *memSegment) Len() int {
 	return len(me.buf)
 }
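Note: the following is an illustrative sketch, not part of the patch above. It shows how the new fields are meant to flow: a Collection's ReplicationDesired and StorageClassesDesired are copied into the collectionFileSystem by (*Collection)FileSystem, and blocks flushed by pruneMemSegments or commitBlock are then written via BlockWrite with those values. It assumes the usual Arvados Go SDK helpers (arvados.NewClientFromEnv, arvadosclient.MakeArvadosClient, keepclient.MakeKeepClient) yield clients that satisfy the unexported apiClient/keepClient interfaces, as they do elsewhere in the SDK; error handling is abbreviated.

package main

import (
	"fmt"
	"log"
	"os"

	"git.arvados.org/arvados.git/sdk/go/arvados"
	"git.arvados.org/arvados.git/sdk/go/arvadosclient"
	"git.arvados.org/arvados.git/sdk/go/keepclient"
)

func main() {
	// API and Keep clients from ARVADOS_API_HOST / ARVADOS_API_TOKEN.
	ac := arvados.NewClientFromEnv()
	arv, err := arvadosclient.MakeArvadosClient()
	if err != nil {
		log.Fatal(err)
	}
	kc, err := keepclient.MakeKeepClient(arv)
	if err != nil {
		log.Fatal(err)
	}

	two := 2
	coll := arvados.Collection{
		// With this change, the desired replication and storage
		// classes are carried into the collection filesystem and
		// applied to every block it writes to Keep.
		ReplicationDesired:    &two,
		StorageClassesDesired: []string{"archive"},
	}
	fs, err := coll.FileSystem(ac, kc)
	if err != nil {
		log.Fatal(err)
	}

	// Write a small file into the in-memory collection filesystem.
	f, err := fs.OpenFile("hello.txt", os.O_CREATE|os.O_WRONLY, 0644)
	if err != nil {
		log.Fatal(err)
	}
	if _, err := f.Write([]byte("hello\n")); err != nil {
		log.Fatal(err)
	}
	if err := f.Close(); err != nil {
		log.Fatal(err)
	}

	// MarshalManifest flushes buffered data; after this patch the
	// resulting blocks are stored with the replica count and storage
	// classes requested above.
	mtxt, err := fs.MarshalManifest(".")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Print(mtxt)
}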