package arvados
import (
+ "bytes"
"context"
"encoding/json"
"fmt"
// Total data bytes in all files.
Size() int64
-
- // Memory consumed by buffered file data.
- memorySize() int64
}
// collectionFileSystem is the file system built for a single collection.
// In addition to the embedded fileSystem it records the collection's
// UUID and the replication / storage-class settings that are sent along
// with every block write.
type collectionFileSystem struct {
fileSystem
- uuid string
+ uuid string // collection UUID; copied into the collection record before it is JSON-encoded
+ replicas int // taken from the collection's ReplicationDesired; stays 0 when that pointer is nil
+ storageClasses []string // taken from the collection's StorageClassesDesired
}
// FileSystem returns a CollectionFileSystem for the collection.
modTime = time.Now()
}
fs := &collectionFileSystem{
- uuid: c.UUID,
+ uuid: c.UUID,
+ storageClasses: c.StorageClassesDesired,
fileSystem: fileSystem{
fsBackend: keepBackend{apiClient: client, keepClient: kc},
thr: newThrottle(concurrentWriters),
},
}
+ if r := c.ReplicationDesired; r != nil {
+ fs.replicas = *r
+ }
root := &dirnode{
fs: fs,
treenode: treenode{
inodes: make(map[string]inode),
},
}, nil
- } else {
- return &filenode{
- fs: fs,
- fileinfo: fileinfo{
- name: name,
- mode: perm & ^os.ModeDir,
- modTime: modTime,
- },
- }, nil
}
+ return &filenode{
+ fs: fs,
+ fileinfo: fileinfo{
+ name: name,
+ mode: perm & ^os.ModeDir,
+ modTime: modTime,
+ },
+ }, nil
+}
+
// Child forwards name and replace unchanged to the Child method of the
// node returned by fs.rootnode(), and returns its result.
+func (fs *collectionFileSystem) Child(name string, replace func(inode) (inode, error)) (inode, error) {
+ return fs.rootnode().Child(name, replace)
+}
+
// FS returns the receiver itself as a FileSystem.
+func (fs *collectionFileSystem) FS() FileSystem {
+ return fs
+}
+
// FileInfo returns the FileInfo reported by the node returned by
// fs.rootnode().
+func (fs *collectionFileSystem) FileInfo() os.FileInfo {
+ return fs.rootnode().FileInfo()
+}
+
// IsDir always reports true.
+func (fs *collectionFileSystem) IsDir() bool {
+ return true
+}
+
// Lock calls Lock on the node returned by fs.rootnode().
+func (fs *collectionFileSystem) Lock() {
+ fs.rootnode().Lock()
+}
+
// Unlock calls Unlock on the node returned by fs.rootnode().
+func (fs *collectionFileSystem) Unlock() {
+ fs.rootnode().Unlock()
+}
+
// RLock calls RLock on the node returned by fs.rootnode().
+func (fs *collectionFileSystem) RLock() {
+ fs.rootnode().RLock()
+}
+
// RUnlock calls RUnlock on the node returned by fs.rootnode().
+func (fs *collectionFileSystem) RUnlock() {
+ fs.rootnode().RUnlock()
+}
+
// Parent returns whatever the node returned by fs.rootnode() reports as
// its parent.
+func (fs *collectionFileSystem) Parent() inode {
+ return fs.rootnode().Parent()
+}
+
// Read is not supported on the file system node: the buffer argument is
// ignored and the call always returns 0 bytes, the caller's ptr
// unchanged, and ErrInvalidOperation.
+func (fs *collectionFileSystem) Read(_ []byte, ptr filenodePtr) (int, filenodePtr, error) {
+ return 0, ptr, ErrInvalidOperation
+}
+
// Write is not supported on the file system node: the data argument is
// ignored and the call always returns 0 bytes, the caller's ptr
// unchanged, and ErrInvalidOperation.
+func (fs *collectionFileSystem) Write(_ []byte, ptr filenodePtr) (int, filenodePtr, error) {
+ return 0, ptr, ErrInvalidOperation
+}
+
// Readdir returns the result of Readdir on the node returned by
// fs.rootnode().
+func (fs *collectionFileSystem) Readdir() ([]os.FileInfo, error) {
+ return fs.rootnode().Readdir()
+}
+
// SetParent forwards parent and name unchanged to SetParent on the node
// returned by fs.rootnode().
+func (fs *collectionFileSystem) SetParent(parent inode, name string) {
+ fs.rootnode().SetParent(parent, name)
+}
+
// Truncate is not supported on the file system node: the size argument
// is ignored and ErrInvalidOperation is always returned.
+func (fs *collectionFileSystem) Truncate(int64) error {
+ return ErrInvalidOperation
}
func (fs *collectionFileSystem) Sync() error {
return dn.flush(context.TODO(), names, flushOpts{sync: false, shortBlocks: shortBlocks})
}
// MemorySize (formerly the unexported memorySize) returns the value
// computed by the root directory node's method of the same name. The
// root node is locked for the duration of the call, and is assumed to
// be a *dirnode: the type assertion below panics otherwise.
-func (fs *collectionFileSystem) memorySize() int64 {
+func (fs *collectionFileSystem) MemorySize() int64 {
fs.fileSystem.root.Lock()
defer fs.fileSystem.root.Unlock()
- return fs.fileSystem.root.(*dirnode).memorySize()
+ return fs.fileSystem.root.(*dirnode).MemorySize()
}
func (fs *collectionFileSystem) MarshalManifest(prefix string) (string, error) {
// filenode implements inode.
type filenode struct {
parent inode
- fs FileSystem
+ fs *collectionFileSystem
fileinfo fileinfo
segments []segment
// number of times `segments` has changed in a
seg.Truncate(len(cando))
fn.memsize += int64(len(cando))
fn.segments[cur] = seg
- cur++
- prev++
}
}
fn.fs.throttle().Acquire()
go func() {
defer close(done)
- locator, _, err := fn.FS().PutB(buf)
+ resp, err := fn.FS().BlockWrite(context.Background(), BlockWriteOptions{
+ Data: buf,
+ Replicas: fn.fs.replicas,
+ StorageClasses: fn.fs.storageClasses,
+ })
fn.fs.throttle().Release()
fn.Lock()
defer fn.Unlock()
fn.memsize -= int64(len(buf))
fn.segments[idx] = storedSegment{
kc: fn.FS(),
- locator: locator,
+ locator: resp.Locator,
size: len(buf),
offset: 0,
length: len(buf),
if err != nil {
return nil, err
}
+ coll.UUID = dn.fs.uuid
data, err := json.Marshal(&coll)
if err == nil {
data = append(data, '\n')
// it fails, we'll try again next time.
close(done)
return nil
- } else {
- // In sync mode, we proceed regardless of
- // whether another flush is in progress: It
- // can't finish before we do, because we hold
- // fn's lock until we finish our own writes.
}
+ // In sync mode, we proceed regardless of
+ // whether another flush is in progress: It
+ // can't finish before we do, because we hold
+ // fn's lock until we finish our own writes.
seg.flushing = done
offsets = append(offsets, len(block))
if len(refs) == 1 {
go func() {
defer close(done)
defer close(errs)
- locator, _, err := dn.fs.PutB(block)
+ resp, err := dn.fs.BlockWrite(context.Background(), BlockWriteOptions{
+ Data: block,
+ Replicas: dn.fs.replicas,
+ StorageClasses: dn.fs.storageClasses,
+ })
dn.fs.throttle().Release()
if err != nil {
errs <- err
data := ref.fn.segments[ref.idx].(*memSegment).buf
ref.fn.segments[ref.idx] = storedSegment{
kc: dn.fs,
- locator: locator,
+ locator: resp.Locator,
size: blocksize,
offset: offsets[idx],
length: len(data),
}()
if sync {
return <-errs
- } else {
- return nil
}
+ return nil
}
type flushOpts struct {
}
// caller must have write lock.
-func (dn *dirnode) memorySize() (size int64) {
+func (dn *dirnode) MemorySize() (size int64) {
for _, name := range dn.sortedNames() {
node := dn.inodes[name]
node.Lock()
defer node.Unlock()
switch node := node.(type) {
case *dirnode:
- size += node.memorySize()
+ size += node.MemorySize()
case *filenode:
for _, seg := range node.segments {
switch seg := seg.(type) {
}
func (dn *dirnode) loadManifest(txt string) error {
- var dirname string
- streams := strings.Split(txt, "\n")
- if streams[len(streams)-1] != "" {
+ streams := bytes.Split([]byte(txt), []byte{'\n'})
+ if len(streams[len(streams)-1]) != 0 {
return fmt.Errorf("line %d: no trailing newline", len(streams))
}
streams = streams[:len(streams)-1]
segments := []storedSegment{}
+ // To reduce allocs, we reuse a single "pathparts" slice
+ // (pre-split on "/" separators) for the duration of this
+ // func.
+ var pathparts []string
+ // To reduce allocs, we reuse a single "toks" slice of 3 byte
+ // slices.
+ var toks = make([][]byte, 3)
+ // Similar to bytes.SplitN(token, []byte{c}, 3), but splits
+ // into the toks slice rather than allocating a new one, and
+ // returns the number of toks (1, 2, or 3).
+ splitToToks := func(src []byte, c rune) int {
// toks is shared between calls and only its first N entries
// (N = the return value) are overwritten here, so entries at
// index >= N may still hold data from an earlier call.
+ c1 := bytes.IndexRune(src, c)
+ if c1 < 0 {
// No separator at all: the whole input is the only token.
+ toks[0] = src
+ return 1
+ }
+ toks[0], src = src[:c1], src[c1+1:] // keep scanning after the first separator
+ c2 := bytes.IndexRune(src, c)
+ if c2 < 0 {
// Exactly one separator: the remainder is the second token.
+ toks[1] = src
+ return 2
+ }
+ toks[1], toks[2] = src[:c2], src[c2+1:] // toks[2] keeps any further separators unsplit
+ return 3
+ }
for i, stream := range streams {
lineno := i + 1
var anyFileTokens bool
var pos int64
var segIdx int
segments = segments[:0]
- for i, token := range strings.Split(stream, " ") {
+ pathparts = nil
+ streamparts := 0
+ for i, token := range bytes.Split(stream, []byte{' '}) {
if i == 0 {
- dirname = manifestUnescape(token)
+ pathparts = strings.Split(manifestUnescape(string(token)), "/")
+ streamparts = len(pathparts)
continue
}
- if !strings.Contains(token, ":") {
+ if !bytes.ContainsRune(token, ':') {
if anyFileTokens {
return fmt.Errorf("line %d: bad file segment %q", lineno, token)
}
- toks := strings.SplitN(token, "+", 3)
- if len(toks) < 2 {
+ if splitToToks(token, '+') < 2 {
return fmt.Errorf("line %d: bad locator %q", lineno, token)
}
- length, err := strconv.ParseInt(toks[1], 10, 32)
+ length, err := strconv.ParseInt(string(toks[1]), 10, 32)
if err != nil || length < 0 {
return fmt.Errorf("line %d: bad locator %q", lineno, token)
}
segments = append(segments, storedSegment{
- locator: token,
+ locator: string(token),
size: int(length),
offset: 0,
length: int(length),
} else if len(segments) == 0 {
return fmt.Errorf("line %d: bad locator %q", lineno, token)
}
-
- toks := strings.SplitN(token, ":", 3)
- if len(toks) != 3 {
+ if splitToToks(token, ':') != 3 {
return fmt.Errorf("line %d: bad file segment %q", lineno, token)
}
anyFileTokens = true
- offset, err := strconv.ParseInt(toks[0], 10, 64)
+ offset, err := strconv.ParseInt(string(toks[0]), 10, 64)
if err != nil || offset < 0 {
return fmt.Errorf("line %d: bad file segment %q", lineno, token)
}
- length, err := strconv.ParseInt(toks[1], 10, 64)
+ length, err := strconv.ParseInt(string(toks[1]), 10, 64)
if err != nil || length < 0 {
return fmt.Errorf("line %d: bad file segment %q", lineno, token)
}
- name := dirname + "/" + manifestUnescape(toks[2])
- fnode, err := dn.createFileAndParents(name)
+ if !bytes.ContainsAny(toks[2], `\/`) {
+ // optimization for a common case
+ pathparts = append(pathparts[:streamparts], string(toks[2]))
+ } else {
+ pathparts = append(pathparts[:streamparts], strings.Split(manifestUnescape(string(toks[2])), "/")...)
+ }
+ fnode, err := dn.createFileAndParents(pathparts)
if fnode == nil && err == nil && length == 0 {
// Special case: an empty file used as
// a marker to preserve an otherwise
continue
}
if err != nil || (fnode == nil && length != 0) {
- return fmt.Errorf("line %d: cannot use path %q with length %d: %s", lineno, name, length, err)
+ return fmt.Errorf("line %d: cannot use name %q with length %d: %s", lineno, toks[2], length, err)
}
// Map the stream offset/range coordinates to
// block/offset/range coordinates and add
// situation might be rare anyway)
segIdx, pos = 0, 0
}
- for next := int64(0); segIdx < len(segments); segIdx++ {
+ for ; segIdx < len(segments); segIdx++ {
seg := segments[segIdx]
- next = pos + int64(seg.Len())
+ next := pos + int64(seg.Len())
if next <= offset || seg.Len() == 0 {
pos = next
continue
return fmt.Errorf("line %d: no file segments", lineno)
} else if len(segments) == 0 {
return fmt.Errorf("line %d: no locators", lineno)
- } else if dirname == "" {
+ } else if streamparts == 0 {
return fmt.Errorf("line %d: no stream name", lineno)
}
}
//
// If path is a "parent directory exists" marker (the last path
// component is "."), the returned values are both nil.
-func (dn *dirnode) createFileAndParents(path string) (fn *filenode, err error) {
+//
+// Newly added nodes have modtime==0. Caller is responsible for fixing
+// them with backdateTree.
+func (dn *dirnode) createFileAndParents(names []string) (fn *filenode, err error) {
var node inode = dn
- names := strings.Split(path, "/")
basename := names[len(names)-1]
for _, name := range names[:len(names)-1] {
switch name {
node = node.Parent()
continue
}
+ node.Lock()
+ unlock := node.Unlock
node, err = node.Child(name, func(child inode) (inode, error) {
if child == nil {
- child, err := node.FS().newNode(name, 0755|os.ModeDir, node.Parent().FileInfo().ModTime())
+ // note modtime will be fixed later in backdateTree()
+ child, err := node.FS().newNode(name, 0755|os.ModeDir, time.Time{})
if err != nil {
return nil, err
}
return child, nil
}
})
+ unlock()
if err != nil {
return
}
if basename == "." {
return
} else if !permittedName(basename) {
- err = fmt.Errorf("invalid file part %q in path %q", basename, path)
+ err = fmt.Errorf("invalid file part %q in path %q", basename, names)
return
}
+ node.Lock()
+ defer node.Unlock()
_, err = node.Child(basename, func(child inode) (inode, error) {
switch child := child.(type) {
case nil:
- child, err = node.FS().newNode(basename, 0755, node.FileInfo().ModTime())
+ child, err = node.FS().newNode(basename, 0755, time.Time{})
if err != nil {
return nil, err
}