17755: Merge branch 'main' into 17755-add-singularity-to-compute-image
[arvados.git] / sdk / go / arvados / fs_collection.go
index 0233826a7281e9aa95f5dbb9f74e93ddb1bfd473..4d9db421fc3838b268fdeaeea1b81b9ca1192843 100644 (file)
@@ -42,7 +42,9 @@ type CollectionFileSystem interface {
 
 type collectionFileSystem struct {
        fileSystem
-       uuid string
+       uuid           string
+       replicas       int
+       storageClasses []string
 }
 
 // FileSystem returns a CollectionFileSystem for the collection.
@@ -52,12 +54,16 @@ func (c *Collection) FileSystem(client apiClient, kc keepClient) (CollectionFile
                modTime = time.Now()
        }
        fs := &collectionFileSystem{
-               uuid: c.UUID,
+               uuid:           c.UUID,
+               storageClasses: c.StorageClassesDesired,
                fileSystem: fileSystem{
                        fsBackend: keepBackend{apiClient: client, keepClient: kc},
                        thr:       newThrottle(concurrentWriters),
                },
        }
+       if r := c.ReplicationDesired; r != nil {
+               fs.replicas = *r
+       }
        root := &dirnode{
                fs: fs,
                treenode: treenode{
@@ -321,7 +327,7 @@ func (fn *filenode) seek(startPtr filenodePtr) (ptr filenodePtr) {
 // filenode implements inode.
 type filenode struct {
        parent   inode
-       fs       FileSystem
+       fs       *collectionFileSystem
        fileinfo fileinfo
        segments []segment
        // number of times `segments` has changed in a
@@ -610,7 +616,11 @@ func (fn *filenode) pruneMemSegments() {
                fn.fs.throttle().Acquire()
                go func() {
                        defer close(done)
-                       locator, _, err := fn.FS().PutB(buf)
+                       resp, err := fn.FS().BlockWrite(context.Background(), BlockWriteOptions{
+                               Data:           buf,
+                               Replicas:       fn.fs.replicas,
+                               StorageClasses: fn.fs.storageClasses,
+                       })
                        fn.fs.throttle().Release()
                        fn.Lock()
                        defer fn.Unlock()
@@ -631,7 +641,7 @@ func (fn *filenode) pruneMemSegments() {
                        fn.memsize -= int64(len(buf))
                        fn.segments[idx] = storedSegment{
                                kc:      fn.FS(),
-                               locator: locator,
+                               locator: resp.Locator,
                                size:    len(buf),
                                offset:  0,
                                length:  len(buf),
@@ -674,6 +684,7 @@ func (dn *dirnode) Child(name string, replace func(inode) (inode, error)) (inode
                        if err != nil {
                                return nil, err
                        }
+                       coll.UUID = dn.fs.uuid
                        data, err := json.Marshal(&coll)
                        if err == nil {
                                data = append(data, '\n')
@@ -747,7 +758,11 @@ func (dn *dirnode) commitBlock(ctx context.Context, refs []fnSegmentRef, bufsize
        go func() {
                defer close(done)
                defer close(errs)
-               locator, _, err := dn.fs.PutB(block)
+               resp, err := dn.fs.BlockWrite(context.Background(), BlockWriteOptions{
+                       Data:           block,
+                       Replicas:       dn.fs.replicas,
+                       StorageClasses: dn.fs.storageClasses,
+               })
                dn.fs.throttle().Release()
                if err != nil {
                        errs <- err
@@ -779,7 +794,7 @@ func (dn *dirnode) commitBlock(ctx context.Context, refs []fnSegmentRef, bufsize
                        data := ref.fn.segments[ref.idx].(*memSegment).buf
                        ref.fn.segments[ref.idx] = storedSegment{
                                kc:      dn.fs,
-                               locator: locator,
+                               locator: resp.Locator,
                                size:    blocksize,
                                offset:  offsets[idx],
                                length:  len(data),
@@ -1167,9 +1182,12 @@ func (dn *dirnode) createFileAndParents(path string) (fn *filenode, err error) {
                        node = node.Parent()
                        continue
                }
+               modtime := node.Parent().FileInfo().ModTime()
+               node.Lock()
+               locked := node
                node, err = node.Child(name, func(child inode) (inode, error) {
                        if child == nil {
-                               child, err := node.FS().newNode(name, 0755|os.ModeDir, node.Parent().FileInfo().ModTime())
+                               child, err := node.FS().newNode(name, 0755|os.ModeDir, modtime)
                                if err != nil {
                                        return nil, err
                                }
@@ -1181,6 +1199,7 @@ func (dn *dirnode) createFileAndParents(path string) (fn *filenode, err error) {
                                return child, nil
                        }
                })
+               locked.Unlock()
                if err != nil {
                        return
                }
@@ -1191,10 +1210,13 @@ func (dn *dirnode) createFileAndParents(path string) (fn *filenode, err error) {
                err = fmt.Errorf("invalid file part %q in path %q", basename, path)
                return
        }
+       modtime := node.FileInfo().ModTime()
+       node.Lock()
+       defer node.Unlock()
        _, err = node.Child(basename, func(child inode) (inode, error) {
                switch child := child.(type) {
                case nil:
-                       child, err = node.FS().newNode(basename, 0755, node.FileInfo().ModTime())
+                       child, err = node.FS().newNode(basename, 0755, modtime)
                        if err != nil {
                                return nil, err
                        }