"encoding/json"
"fmt"
"io"
- "log"
"os"
"path"
"regexp"
// Prefix (normally ".") is a top level directory, effectively
// prepended to all paths in the returned manifest.
MarshalManifest(prefix string) (string, error)
+
+ // Total data bytes in all files.
+ Size() int64
}
type collectionFileSystem struct {
}
func (fs *collectionFileSystem) Sync() error {
- log.Printf("cfs.Sync()")
if fs.uuid == "" {
return nil
}
txt, err := fs.MarshalManifest(".")
if err != nil {
- log.Printf("WARNING: (collectionFileSystem)Sync() failed: %s", err)
- return err
+ return fmt.Errorf("sync failed: %s", err)
}
coll := &Collection{
UUID: fs.uuid,
}
err = fs.RequestAndDecode(nil, "PUT", "arvados/v1/collections/"+fs.uuid, fs.UpdateBody(coll), map[string]interface{}{"select": []string{"uuid"}})
if err != nil {
- log.Printf("WARNING: (collectionFileSystem)Sync() failed: %s", err)
+ return fmt.Errorf("sync failed: update %s: %s", fs.uuid, err)
}
- return err
+ return nil
}
func (fs *collectionFileSystem) MarshalManifest(prefix string) (string, error) {
return fs.fileSystem.root.(*dirnode).marshalManifest(prefix)
}
+func (fs *collectionFileSystem) Size() int64 {
+ return fs.fileSystem.root.(*dirnode).TreeSize()
+}
+
// filenodePtr is an offset into a file that is (usually) efficient to
// seek to. Specifically, if filenode.repacked==filenodePtr.repacked
// then
return dn.fs
}
-func (dn *dirnode) Child(name string, replace func(inode) inode) inode {
+func (dn *dirnode) Child(name string, replace func(inode) (inode, error)) (inode, error) {
if dn == dn.fs.rootnode() && name == ".arvados#collection" {
gn := &getternode{Getter: func() ([]byte, error) {
var coll Collection
return data, err
}}
gn.SetParent(dn, name)
- return gn
+ return gn, nil
}
return dn.treenode.Child(name, replace)
}
-// sync flushes in-memory data (for all files in the tree rooted at
-// dn) to persistent storage. Caller must hold dn.Lock().
-func (dn *dirnode) sync() error {
+// sync flushes in-memory data and remote block references (for the
+// children with the given names, which must be children of dn) to
+// local persistent storage. Caller must have write lock on dn and the
+// named children.
+func (dn *dirnode) sync(names []string) error {
type shortBlock struct {
fn *filenode
idx int
return nil
}
- names := make([]string, 0, len(dn.inodes))
- for name := range dn.inodes {
- names = append(names, name)
- }
- sort.Strings(names)
-
+ localLocator := map[string]string{}
for _, name := range names {
fn, ok := dn.inodes[name].(*filenode)
if !ok {
continue
}
- fn.Lock()
- defer fn.Unlock()
for idx, seg := range fn.segments {
- seg, ok := seg.(*memSegment)
- if !ok {
- continue
- }
- if seg.Len() > maxBlockSize/2 {
- if err := flush([]shortBlock{{fn, idx}}); err != nil {
- return err
+ switch seg := seg.(type) {
+ case storedSegment:
+ loc, ok := localLocator[seg.locator]
+ if !ok {
+ var err error
+ loc, err = dn.fs.LocalLocator(seg.locator)
+ if err != nil {
+ return err
+ }
+ localLocator[seg.locator] = loc
}
- continue
- }
- if pendingLen+seg.Len() > maxBlockSize {
- if err := flush(pending); err != nil {
- return err
+ seg.locator = loc
+ fn.segments[idx] = seg
+ case *memSegment:
+ if seg.Len() > maxBlockSize/2 {
+ if err := flush([]shortBlock{{fn, idx}}); err != nil {
+ return err
+ }
+ continue
+ }
+ if pendingLen+seg.Len() > maxBlockSize {
+ if err := flush(pending); err != nil {
+ return err
+ }
+ pending = nil
+ pendingLen = 0
}
- pending = nil
- pendingLen = 0
+ pending = append(pending, shortBlock{fn, idx})
+ pendingLen += seg.Len()
+ default:
+ panic(fmt.Sprintf("can't sync segment type %T", seg))
}
- pending = append(pending, shortBlock{fn, idx})
- pendingLen += seg.Len()
}
}
return flush(pending)
}
-// caller must have read lock.
+// caller must have write lock.
func (dn *dirnode) marshalManifest(prefix string) (string, error) {
var streamLen int64
type filepart struct {
var subdirs string
var blocks []string
- if err := dn.sync(); err != nil {
- return "", err
+ if len(dn.inodes) == 0 {
+ if prefix == "." {
+ return "", nil
+ }
+ // Express the existence of an empty directory by
+ // adding an empty file named `\056`, which (unlike
+ // the more obvious spelling `.`) is accepted by the
+ // API's manifest validator.
+ return manifestEscape(prefix) + " d41d8cd98f00b204e9800998ecf8427e+0 0:0:\\056\n", nil
}
names := make([]string, 0, len(dn.inodes))
- for name, node := range dn.inodes {
+ for name := range dn.inodes {
names = append(names, name)
+ }
+ sort.Strings(names)
+ for _, name := range names {
+ node := dn.inodes[name]
node.Lock()
defer node.Unlock()
}
- sort.Strings(names)
-
+ if err := dn.sync(names); err != nil {
+ return "", err
+ }
for _, name := range names {
switch node := dn.inodes[name].(type) {
case *dirnode:
}
name := dirname + "/" + manifestUnescape(toks[2])
fnode, err := dn.createFileAndParents(name)
- if err != nil {
- return fmt.Errorf("line %d: cannot use path %q: %s", lineno, name, err)
+ if fnode == nil && err == nil && length == 0 {
+ // Special case: an empty file used as
+ // a marker to preserve an otherwise
+ // empty directory in a manifest.
+ continue
+ }
+ if err != nil || (fnode == nil && length != 0) {
+ return fmt.Errorf("line %d: cannot use path %q with length %d: %s", lineno, name, length, err)
}
// Map the stream offset/range coordinates to
// block/offset/range coordinates and add
return nil
}
-// only safe to call from loadManifest -- no locking
+// only safe to call from loadManifest -- no locking.
+//
+// If path is a "parent directory exists" marker (the last path
+// component is "."), the returned values are both nil.
func (dn *dirnode) createFileAndParents(path string) (fn *filenode, err error) {
var node inode = dn
names := strings.Split(path, "/")
basename := names[len(names)-1]
- if basename == "" || basename == "." || basename == ".." {
- err = fmt.Errorf("invalid filename")
- return
- }
for _, name := range names[:len(names)-1] {
switch name {
case "", ".":
node = node.Parent()
continue
}
- node.Child(name, func(child inode) inode {
+ node, err = node.Child(name, func(child inode) (inode, error) {
if child == nil {
- child, err = node.FS().newNode(name, 0755|os.ModeDir, node.Parent().FileInfo().ModTime())
+ child, err := node.FS().newNode(name, 0755|os.ModeDir, node.Parent().FileInfo().ModTime())
+ if err != nil {
+ return nil, err
+ }
child.SetParent(node, name)
- node = child
+ return child, nil
} else if !child.IsDir() {
- err = ErrFileExists
+ return child, ErrFileExists
} else {
- node = child
+ return child, nil
}
- return child
})
if err != nil {
return
}
}
- node.Child(basename, func(child inode) inode {
+ if basename == "." {
+ return
+ } else if !permittedName(basename) {
+ err = fmt.Errorf("invalid file part %q in path %q", basename, path)
+ return
+ }
+ _, err = node.Child(basename, func(child inode) (inode, error) {
switch child := child.(type) {
case nil:
child, err = node.FS().newNode(basename, 0755, node.FileInfo().ModTime())
+ if err != nil {
+ return nil, err
+ }
child.SetParent(node, basename)
fn = child.(*filenode)
- return child
+ return child, nil
case *filenode:
fn = child
- return child
+ return child, nil
case *dirnode:
- err = ErrIsDirectory
- return child
+ return child, ErrIsDirectory
default:
- err = ErrInvalidArgument
- return child
+ return child, ErrInvalidArgument
}
})
return
}
+func (dn *dirnode) TreeSize() (bytes int64) {
+ dn.RLock()
+ defer dn.RUnlock()
+ for _, i := range dn.inodes {
+ switch i := i.(type) {
+ case *filenode:
+ bytes += i.Size()
+ case *dirnode:
+ bytes += i.TreeSize()
+ }
+ }
+ return
+}
+
type segment interface {
io.ReaderAt
Len() int