X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/f6b78bb3d0769199c811da40367afd4d9bded914..72d7d41944006d1f48f570784dafe56b9812b0c8:/sdk/go/arvados/fs_collection.go diff --git a/sdk/go/arvados/fs_collection.go b/sdk/go/arvados/fs_collection.go index 0233826a72..4d9db421fc 100644 --- a/sdk/go/arvados/fs_collection.go +++ b/sdk/go/arvados/fs_collection.go @@ -42,7 +42,9 @@ type CollectionFileSystem interface { type collectionFileSystem struct { fileSystem - uuid string + uuid string + replicas int + storageClasses []string } // FileSystem returns a CollectionFileSystem for the collection. @@ -52,12 +54,16 @@ func (c *Collection) FileSystem(client apiClient, kc keepClient) (CollectionFile modTime = time.Now() } fs := &collectionFileSystem{ - uuid: c.UUID, + uuid: c.UUID, + storageClasses: c.StorageClassesDesired, fileSystem: fileSystem{ fsBackend: keepBackend{apiClient: client, keepClient: kc}, thr: newThrottle(concurrentWriters), }, } + if r := c.ReplicationDesired; r != nil { + fs.replicas = *r + } root := &dirnode{ fs: fs, treenode: treenode{ @@ -321,7 +327,7 @@ func (fn *filenode) seek(startPtr filenodePtr) (ptr filenodePtr) { // filenode implements inode. type filenode struct { parent inode - fs FileSystem + fs *collectionFileSystem fileinfo fileinfo segments []segment // number of times `segments` has changed in a @@ -610,7 +616,11 @@ func (fn *filenode) pruneMemSegments() { fn.fs.throttle().Acquire() go func() { defer close(done) - locator, _, err := fn.FS().PutB(buf) + resp, err := fn.FS().BlockWrite(context.Background(), BlockWriteOptions{ + Data: buf, + Replicas: fn.fs.replicas, + StorageClasses: fn.fs.storageClasses, + }) fn.fs.throttle().Release() fn.Lock() defer fn.Unlock() @@ -631,7 +641,7 @@ func (fn *filenode) pruneMemSegments() { fn.memsize -= int64(len(buf)) fn.segments[idx] = storedSegment{ kc: fn.FS(), - locator: locator, + locator: resp.Locator, size: len(buf), offset: 0, length: len(buf), @@ -674,6 +684,7 @@ func (dn *dirnode) Child(name string, replace func(inode) (inode, error)) (inode if err != nil { return nil, err } + coll.UUID = dn.fs.uuid data, err := json.Marshal(&coll) if err == nil { data = append(data, '\n') @@ -747,7 +758,11 @@ func (dn *dirnode) commitBlock(ctx context.Context, refs []fnSegmentRef, bufsize go func() { defer close(done) defer close(errs) - locator, _, err := dn.fs.PutB(block) + resp, err := dn.fs.BlockWrite(context.Background(), BlockWriteOptions{ + Data: block, + Replicas: dn.fs.replicas, + StorageClasses: dn.fs.storageClasses, + }) dn.fs.throttle().Release() if err != nil { errs <- err @@ -779,7 +794,7 @@ func (dn *dirnode) commitBlock(ctx context.Context, refs []fnSegmentRef, bufsize data := ref.fn.segments[ref.idx].(*memSegment).buf ref.fn.segments[ref.idx] = storedSegment{ kc: dn.fs, - locator: locator, + locator: resp.Locator, size: blocksize, offset: offsets[idx], length: len(data), @@ -1167,9 +1182,12 @@ func (dn *dirnode) createFileAndParents(path string) (fn *filenode, err error) { node = node.Parent() continue } + modtime := node.Parent().FileInfo().ModTime() + node.Lock() + locked := node node, err = node.Child(name, func(child inode) (inode, error) { if child == nil { - child, err := node.FS().newNode(name, 0755|os.ModeDir, node.Parent().FileInfo().ModTime()) + child, err := node.FS().newNode(name, 0755|os.ModeDir, modtime) if err != nil { return nil, err } @@ -1181,6 +1199,7 @@ func (dn *dirnode) createFileAndParents(path string) (fn *filenode, err error) { return child, nil } }) + locked.Unlock() if err != nil { return } @@ -1191,10 +1210,13 @@ func (dn *dirnode) createFileAndParents(path string) (fn *filenode, err error) { err = fmt.Errorf("invalid file part %q in path %q", basename, path) return } + modtime := node.FileInfo().ModTime() + node.Lock() + defer node.Unlock() _, err = node.Child(basename, func(child inode) (inode, error) { switch child := child.(type) { case nil: - child, err = node.FS().newNode(basename, 0755, node.FileInfo().ModTime()) + child, err = node.FS().newNode(basename, 0755, modtime) if err != nil { return nil, err }