projects
/
arvados.git
/ blobdiff
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
17755: Merge branch 'main' into 17755-add-singularity-to-compute-image
[arvados.git]
/
sdk
/
go
/
arvados
/
fs_collection.go
diff --git a/sdk/go/arvados/fs_collection.go b/sdk/go/arvados/fs_collection.go
index 0233826a7281e9aa95f5dbb9f74e93ddb1bfd473..4d9db421fc3838b268fdeaeea1b81b9ca1192843 100644
(file)
--- a/sdk/go/arvados/fs_collection.go
+++ b/sdk/go/arvados/fs_collection.go
@@ -42,7 +42,9 @@ type CollectionFileSystem interface {
type collectionFileSystem struct {
fileSystem
type collectionFileSystem struct {
fileSystem
- uuid string
+ uuid string
+ replicas int
+ storageClasses []string
}
// FileSystem returns a CollectionFileSystem for the collection.
}
// FileSystem returns a CollectionFileSystem for the collection.
@@ -52,12 +54,16 @@ func (c *Collection) FileSystem(client apiClient, kc keepClient) (CollectionFile
modTime = time.Now()
}
fs := &collectionFileSystem{
modTime = time.Now()
}
fs := &collectionFileSystem{
- uuid: c.UUID,
+ uuid: c.UUID,
+ storageClasses: c.StorageClassesDesired,
fileSystem: fileSystem{
fsBackend: keepBackend{apiClient: client, keepClient: kc},
thr: newThrottle(concurrentWriters),
},
}
fileSystem: fileSystem{
fsBackend: keepBackend{apiClient: client, keepClient: kc},
thr: newThrottle(concurrentWriters),
},
}
+ if r := c.ReplicationDesired; r != nil {
+ fs.replicas = *r
+ }
root := &dirnode{
fs: fs,
treenode: treenode{
root := &dirnode{
fs: fs,
treenode: treenode{
@@ -321,7 +327,7 @@ func (fn *filenode) seek(startPtr filenodePtr) (ptr filenodePtr) {
// filenode implements inode.
type filenode struct {
parent inode
// filenode implements inode.
type filenode struct {
parent inode
- fs FileSystem
+ fs *collectionFileSystem
fileinfo fileinfo
segments []segment
// number of times `segments` has changed in a
fileinfo fileinfo
segments []segment
// number of times `segments` has changed in a
@@ -610,7 +616,11 @@ func (fn *filenode) pruneMemSegments() {
fn.fs.throttle().Acquire()
go func() {
defer close(done)
fn.fs.throttle().Acquire()
go func() {
defer close(done)
- locator, _, err := fn.FS().PutB(buf)
+ resp, err := fn.FS().BlockWrite(context.Background(), BlockWriteOptions{
+ Data: buf,
+ Replicas: fn.fs.replicas,
+ StorageClasses: fn.fs.storageClasses,
+ })
fn.fs.throttle().Release()
fn.Lock()
defer fn.Unlock()
fn.fs.throttle().Release()
fn.Lock()
defer fn.Unlock()
@@ -631,7 +641,7 @@ func (fn *filenode) pruneMemSegments() {
fn.memsize -= int64(len(buf))
fn.segments[idx] = storedSegment{
kc: fn.FS(),
fn.memsize -= int64(len(buf))
fn.segments[idx] = storedSegment{
kc: fn.FS(),
- locator: locator,
+ locator: resp.Locator,
size: len(buf),
offset: 0,
length: len(buf),
size: len(buf),
offset: 0,
length: len(buf),
@@ -674,6 +684,7 @@ func (dn *dirnode) Child(name string, replace func(inode) (inode, error)) (inode
if err != nil {
return nil, err
}
if err != nil {
return nil, err
}
+ coll.UUID = dn.fs.uuid
data, err := json.Marshal(&coll)
if err == nil {
data = append(data, '\n')
data, err := json.Marshal(&coll)
if err == nil {
data = append(data, '\n')
@@ -747,7 +758,11 @@ func (dn *dirnode) commitBlock(ctx context.Context, refs []fnSegmentRef, bufsize
go func() {
defer close(done)
defer close(errs)
go func() {
defer close(done)
defer close(errs)
- locator, _, err := dn.fs.PutB(block)
+ resp, err := dn.fs.BlockWrite(context.Background(), BlockWriteOptions{
+ Data: block,
+ Replicas: dn.fs.replicas,
+ StorageClasses: dn.fs.storageClasses,
+ })
dn.fs.throttle().Release()
if err != nil {
errs <- err
dn.fs.throttle().Release()
if err != nil {
errs <- err
@@ -779,7 +794,7 @@ func (dn *dirnode) commitBlock(ctx context.Context, refs []fnSegmentRef, bufsize
data := ref.fn.segments[ref.idx].(*memSegment).buf
ref.fn.segments[ref.idx] = storedSegment{
kc: dn.fs,
data := ref.fn.segments[ref.idx].(*memSegment).buf
ref.fn.segments[ref.idx] = storedSegment{
kc: dn.fs,
- locator: locator,
+ locator: resp.Locator,
size: blocksize,
offset: offsets[idx],
length: len(data),
size: blocksize,
offset: offsets[idx],
length: len(data),
@@ -1167,9 +1182,12 @@ func (dn *dirnode) createFileAndParents(path string) (fn *filenode, err error) {
node = node.Parent()
continue
}
node = node.Parent()
continue
}
+ modtime := node.Parent().FileInfo().ModTime()
+ node.Lock()
+ locked := node
node, err = node.Child(name, func(child inode) (inode, error) {
if child == nil {
node, err = node.Child(name, func(child inode) (inode, error) {
if child == nil {
- child, err := node.FS().newNode(name, 0755|os.ModeDir, node.Parent().FileInfo().ModTime())
+ child, err := node.FS().newNode(name, 0755|os.ModeDir, modtime)
if err != nil {
return nil, err
}
if err != nil {
return nil, err
}
@@ -1181,6 +1199,7 @@ func (dn *dirnode) createFileAndParents(path string) (fn *filenode, err error) {
return child, nil
}
})
return child, nil
}
})
+ locked.Unlock()
if err != nil {
return
}
if err != nil {
return
}
@@ -1191,10 +1210,13 @@ func (dn *dirnode) createFileAndParents(path string) (fn *filenode, err error) {
err = fmt.Errorf("invalid file part %q in path %q", basename, path)
return
}
err = fmt.Errorf("invalid file part %q in path %q", basename, path)
return
}
+ modtime := node.FileInfo().ModTime()
+ node.Lock()
+ defer node.Unlock()
_, err = node.Child(basename, func(child inode) (inode, error) {
switch child := child.(type) {
case nil:
_, err = node.Child(basename, func(child inode) (inode, error) {
switch child := child.(type) {
case nil:
- child, err = node.FS().newNode(basename, 0755, node.FileInfo().ModTime())
+ child, err = node.FS().newNode(basename, 0755, modtime)
if err != nil {
return nil, err
}
if err != nil {
return nil, err
}