// Total data bytes in all files.
Size() int64
-
- // Memory consumed by buffered file data.
- memorySize() int64
}
type collectionFileSystem struct {
fileSystem
- uuid string
+ uuid string
+ // replicas is the desired replication level for newly written
+ // blocks; 0 means use the cluster/server default.
+ replicas int
+ // storageClasses lists the storage classes requested for newly
+ // written blocks (passed through to BlockWrite).
+ storageClasses []string
}
// FileSystem returns a CollectionFileSystem for the collection.
modTime = time.Now()
}
fs := &collectionFileSystem{
- uuid: c.UUID,
+ uuid: c.UUID,
+ storageClasses: c.StorageClassesDesired,
fileSystem: fileSystem{
fsBackend: keepBackend{apiClient: client, keepClient: kc},
thr: newThrottle(concurrentWriters),
},
}
+ if r := c.ReplicationDesired; r != nil {
+ fs.replicas = *r
+ }
root := &dirnode{
fs: fs,
treenode: treenode{
return dn.flush(context.TODO(), names, flushOpts{sync: false, shortBlocks: shortBlocks})
}
-func (fs *collectionFileSystem) memorySize() int64 {
+// MemorySize returns the total memory consumed by buffered file data,
+// computed by walking the tree from the root dirnode while holding the
+// root lock. Exported rename of the former memorySize.
+func (fs *collectionFileSystem) MemorySize() int64 {
fs.fileSystem.root.Lock()
defer fs.fileSystem.root.Unlock()
- return fs.fileSystem.root.(*dirnode).memorySize()
+ return fs.fileSystem.root.(*dirnode).MemorySize()
}
func (fs *collectionFileSystem) MarshalManifest(prefix string) (string, error) {
// filenode implements inode.
type filenode struct {
parent inode
- fs FileSystem
+ fs *collectionFileSystem
fileinfo fileinfo
segments []segment
// number of times `segments` has changed in a
fn.fs.throttle().Acquire()
go func() {
defer close(done)
- locator, _, err := fn.FS().PutB(buf)
+ resp, err := fn.FS().BlockWrite(context.Background(), BlockWriteOptions{
+ Data: buf,
+ Replicas: fn.fs.replicas,
+ StorageClasses: fn.fs.storageClasses,
+ })
fn.fs.throttle().Release()
fn.Lock()
defer fn.Unlock()
fn.memsize -= int64(len(buf))
fn.segments[idx] = storedSegment{
kc: fn.FS(),
- locator: locator,
+ locator: resp.Locator,
size: len(buf),
offset: 0,
length: len(buf),
if err != nil {
return nil, err
}
+ coll.UUID = dn.fs.uuid
data, err := json.Marshal(&coll)
if err == nil {
data = append(data, '\n')
go func() {
defer close(done)
defer close(errs)
- locator, _, err := dn.fs.PutB(block)
+ resp, err := dn.fs.BlockWrite(context.Background(), BlockWriteOptions{
+ Data: block,
+ Replicas: dn.fs.replicas,
+ StorageClasses: dn.fs.storageClasses,
+ })
dn.fs.throttle().Release()
if err != nil {
errs <- err
data := ref.fn.segments[ref.idx].(*memSegment).buf
ref.fn.segments[ref.idx] = storedSegment{
kc: dn.fs,
- locator: locator,
+ locator: resp.Locator,
size: blocksize,
offset: offsets[idx],
length: len(data),
}
// caller must have write lock.
-func (dn *dirnode) memorySize() (size int64) {
+func (dn *dirnode) MemorySize() (size int64) {
for _, name := range dn.sortedNames() {
node := dn.inodes[name]
node.Lock()
defer node.Unlock()
switch node := node.(type) {
case *dirnode:
- size += node.memorySize()
+ size += node.MemorySize()
case *filenode:
for _, seg := range node.segments {
switch seg := seg.(type) {
node = node.Parent()
continue
}
+ modtime := node.Parent().FileInfo().ModTime()
+ node.Lock()
+ locked := node
node, err = node.Child(name, func(child inode) (inode, error) {
if child == nil {
- child, err := node.FS().newNode(name, 0755|os.ModeDir, node.Parent().FileInfo().ModTime())
+ child, err := node.FS().newNode(name, 0755|os.ModeDir, modtime)
if err != nil {
return nil, err
}
return child, nil
}
})
+ locked.Unlock()
if err != nil {
return
}
err = fmt.Errorf("invalid file part %q in path %q", basename, path)
return
}
+ modtime := node.FileInfo().ModTime()
+ node.Lock()
+ defer node.Unlock()
_, err = node.Child(basename, func(child inode) (inode, error) {
switch child := child.(type) {
case nil:
- child, err = node.FS().newNode(basename, 0755, node.FileInfo().ModTime())
+ child, err = node.FS().newNode(basename, 0755, modtime)
if err != nil {
return nil, err
}