X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/d1ae12cad34862d063a1235bfe53459eef7ae589..d8d6bca4b5db4851a29473f08dc600816c977a21:/sdk/go/arvados/fs_base.go diff --git a/sdk/go/arvados/fs_base.go b/sdk/go/arvados/fs_base.go index 4d22c1e82c..2478641df5 100644 --- a/sdk/go/arvados/fs_base.go +++ b/sdk/go/arvados/fs_base.go @@ -27,9 +27,14 @@ var ( ErrWriteOnlyMode = errors.New("file is O_WRONLY") ErrSyncNotSupported = errors.New("O_SYNC flag is not supported") ErrIsDirectory = errors.New("cannot rename file to overwrite existing directory") + ErrNotADirectory = errors.New("not a directory") ErrPermission = os.ErrPermission ) +type syncer interface { + Sync() error +} + // A File is an *os.File-like interface for reading and writing files // in a FileSystem. type File interface { @@ -57,6 +62,9 @@ type FileSystem interface { // while locking multiple inodes. locker() sync.Locker + // throttle for limiting concurrent background writers + throttle() *throttle + // create a new node with nil parent. newNode(name string, perm os.FileMode, modTime time.Time) (node inode, err error) @@ -85,7 +93,22 @@ type FileSystem interface { Remove(name string) error RemoveAll(name string) error Rename(oldname, newname string) error + + // Write buffered data from memory to storage, returning when + // all updates have been saved to persistent storage. Sync() error + + // Write buffered data from memory to storage, but don't wait + // for all writes to finish before returning. If shortBlocks + // is true, flush everything; otherwise, if there's less than + // a full block of buffered data at the end of a stream, leave + // it buffered in memory in case more data can be appended. If + // path is "", flush all dirs/streams; otherwise, flush only + // the specified dir/stream. + Flush(path string, shortBlocks bool) error + + // Estimate current memory usage. + MemorySize() int64 } type inode interface { @@ -102,6 +125,9 @@ type inode interface { // Child() performs lookups and updates of named child nodes. // + // (The term "child" here is used strictly. This means name is + // not "." or "..", and name does not contain "/".) + // // If replace is non-nil, Child calls replace(x) where x is // the current child inode with the given name. If possible, // the child inode is replaced with the one returned by @@ -128,11 +154,12 @@ type inode interface { // a child was added or changed, the new child is returned. // // Caller must have lock (or rlock if replace is nil). - Child(name string, replace func(inode) inode) inode + Child(name string, replace func(inode) (inode, error)) (inode, error) sync.Locker RLock() RUnlock() + MemorySize() int64 } type fileinfo struct { @@ -202,8 +229,15 @@ func (*nullnode) Readdir() ([]os.FileInfo, error) { return nil, ErrInvalidOperation } -func (*nullnode) Child(name string, replace func(inode) inode) inode { - return nil +func (*nullnode) Child(name string, replace func(inode) (inode, error)) (inode, error) { + return nil, ErrNotADirectory +} + +func (*nullnode) MemorySize() int64 { + // Types that embed nullnode should report their own size, but + // if they don't, we at least report a non-zero size to ensure + // a large tree doesn't get reported as 0 bytes. + return 64 } type treenode struct { @@ -236,18 +270,25 @@ func (n *treenode) IsDir() bool { return true } -func (n *treenode) Child(name string, replace func(inode) inode) (child inode) { - // TODO: special treatment for "", ".", ".." +func (n *treenode) Child(name string, replace func(inode) (inode, error)) (child inode, err error) { child = n.inodes[name] - if replace != nil { - newchild := replace(child) - if newchild == nil { - delete(n.inodes, name) - } else if newchild != child { - n.inodes[name] = newchild - n.fileinfo.modTime = time.Now() - child = newchild - } + if name == "" || name == "." || name == ".." { + err = ErrInvalidArgument + return + } + if replace == nil { + return + } + newchild, err := replace(child) + if err != nil { + return + } + if newchild == nil { + delete(n.inodes, name) + } else if newchild != child { + n.inodes[name] = newchild + n.fileinfo.modTime = time.Now() + child = newchild } return } @@ -273,16 +314,46 @@ func (n *treenode) Readdir() (fi []os.FileInfo, err error) { return } +func (n *treenode) Sync() error { + n.RLock() + defer n.RUnlock() + for _, inode := range n.inodes { + syncer, ok := inode.(syncer) + if !ok { + return ErrInvalidOperation + } + err := syncer.Sync() + if err != nil { + return err + } + } + return nil +} + +func (n *treenode) MemorySize() (size int64) { + n.RLock() + defer n.RUnlock() + for _, inode := range n.inodes { + size += inode.MemorySize() + } + return +} + type fileSystem struct { root inode fsBackend mutex sync.Mutex + thr *throttle } func (fs *fileSystem) rootnode() inode { return fs.root } +func (fs *fileSystem) throttle() *throttle { + return fs.thr +} + func (fs *fileSystem) locker() sync.Locker { return &fs.mutex } @@ -297,9 +368,9 @@ func (fs *fileSystem) openFile(name string, flag int, perm os.FileMode) (*fileha return nil, ErrSyncNotSupported } dirname, name := path.Split(name) - parent := rlookup(fs.root, dirname) - if parent == nil { - return nil, os.ErrNotExist + parent, err := rlookup(fs.root, dirname) + if err != nil { + return nil, err } var readable, writable bool switch flag & (os.O_RDWR | os.O_RDONLY | os.O_WRONLY) { @@ -331,22 +402,26 @@ func (fs *fileSystem) openFile(name string, flag int, perm os.FileMode) (*fileha parent.RLock() defer parent.RUnlock() } - n := parent.Child(name, nil) - if n == nil { + n, err := parent.Child(name, nil) + if err != nil { + return nil, err + } else if n == nil { if !createMode { return nil, os.ErrNotExist } - var err error - n = parent.Child(name, func(inode) inode { - n, err = parent.FS().newNode(name, perm|0755, time.Now()) - n.SetParent(parent, name) - return n + n, err = parent.Child(name, func(inode) (repl inode, err error) { + repl, err = parent.FS().newNode(name, perm|0755, time.Now()) + if err != nil { + return + } + repl.SetParent(parent, name) + return }) if err != nil { return nil, err } else if n == nil { - // parent rejected new child - return nil, ErrInvalidOperation + // Parent rejected new child, but returned no error + return nil, ErrInvalidArgument } } else if flag&os.O_EXCL != 0 { return nil, ErrFileExists @@ -375,38 +450,37 @@ func (fs *fileSystem) Create(name string) (File, error) { return fs.OpenFile(name, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0) } -func (fs *fileSystem) Mkdir(name string, perm os.FileMode) (err error) { +func (fs *fileSystem) Mkdir(name string, perm os.FileMode) error { dirname, name := path.Split(name) - n := rlookup(fs.root, dirname) - if n == nil { - return os.ErrNotExist + n, err := rlookup(fs.root, dirname) + if err != nil { + return err } n.Lock() defer n.Unlock() - if n.Child(name, nil) != nil { + if child, err := n.Child(name, nil); err != nil { + return err + } else if child != nil { return os.ErrExist } - child := n.Child(name, func(inode) (child inode) { - child, err = n.FS().newNode(name, perm|os.ModeDir, time.Now()) - child.SetParent(n, name) + + _, err = n.Child(name, func(inode) (repl inode, err error) { + repl, err = n.FS().newNode(name, perm|os.ModeDir, time.Now()) + if err != nil { + return + } + repl.SetParent(n, name) return }) - if err != nil { - return err - } else if child == nil { - return ErrInvalidArgument - } - return nil + return err } -func (fs *fileSystem) Stat(name string) (fi os.FileInfo, err error) { - node := rlookup(fs.root, name) - if node == nil { - err = os.ErrNotExist - } else { - fi = node.FileInfo() +func (fs *fileSystem) Stat(name string) (os.FileInfo, error) { + node, err := rlookup(fs.root, name) + if err != nil { + return nil, err } - return + return node.FileInfo(), nil } func (fs *fileSystem) Rename(oldname, newname string) error { @@ -438,7 +512,7 @@ func (fs *fileSystem) Rename(oldname, newname string) error { // call nca.FS().Rename() instead of proceeding. Until then // it's awkward for filesystems to implement their own Rename // methods effectively: the only one that runs is the one on - // the root filesystem exposed to the caller (webdav, fuse, + // the root FileSystem exposed to the caller (webdav, fuse, // etc). // When acquiring locks on multiple inodes, avoid deadlock by @@ -475,43 +549,31 @@ func (fs *fileSystem) Rename(oldname, newname string) error { } } - // Return ErrInvalidOperation if olddirf.inode doesn't even - // bother calling our "remove oldname entry" replacer func. - err = ErrInvalidArgument - olddirf.inode.Child(oldname, func(oldinode inode) inode { - err = nil + _, err = olddirf.inode.Child(oldname, func(oldinode inode) (inode, error) { if oldinode == nil { - err = os.ErrNotExist - return nil + return oldinode, os.ErrNotExist } if locked[oldinode] { // oldinode cannot become a descendant of itself. - err = ErrInvalidArgument - return oldinode + return oldinode, ErrInvalidArgument } if oldinode.FS() != cfs && newdirf.inode != olddirf.inode { // moving a mount point to a different parent // is not (yet) supported. - err = ErrInvalidArgument - return oldinode + return oldinode, ErrInvalidArgument } - accepted := newdirf.inode.Child(newname, func(existing inode) inode { + accepted, err := newdirf.inode.Child(newname, func(existing inode) (inode, error) { if existing != nil && existing.IsDir() { - err = ErrIsDirectory - return existing + return existing, ErrIsDirectory } - return oldinode + return oldinode, nil }) - if accepted != oldinode { - if err == nil { - // newdirf didn't accept oldinode. - err = ErrInvalidArgument - } + if err != nil { // Leave oldinode in olddir. - return oldinode + return oldinode, err } accepted.SetParent(newdirf.inode, newname) - return nil + return nil, nil }) return err } @@ -530,45 +592,51 @@ func (fs *fileSystem) RemoveAll(name string) error { return err } -func (fs *fileSystem) remove(name string, recursive bool) (err error) { +func (fs *fileSystem) remove(name string, recursive bool) error { dirname, name := path.Split(name) if name == "" || name == "." || name == ".." { return ErrInvalidArgument } - dir := rlookup(fs.root, dirname) - if dir == nil { - return os.ErrNotExist + dir, err := rlookup(fs.root, dirname) + if err != nil { + return err } dir.Lock() defer dir.Unlock() - dir.Child(name, func(node inode) inode { + _, err = dir.Child(name, func(node inode) (inode, error) { if node == nil { - err = os.ErrNotExist - return nil + return nil, os.ErrNotExist } if !recursive && node.IsDir() && node.Size() > 0 { - err = ErrDirectoryNotEmpty - return node + return node, ErrDirectoryNotEmpty } - return nil + return nil, nil }) return err } func (fs *fileSystem) Sync() error { - log.Printf("TODO: sync fileSystem") + if syncer, ok := fs.root.(syncer); ok { + return syncer.Sync() + } return ErrInvalidOperation } +func (fs *fileSystem) Flush(string, bool) error { + log.Printf("TODO: flush fileSystem") + return ErrInvalidOperation +} + +func (fs *fileSystem) MemorySize() int64 { + return fs.root.MemorySize() +} + // rlookup (recursive lookup) returns the inode for the file/directory // with the given name (which may contain "/" separators). If no such // file/directory exists, the returned node is nil. -func rlookup(start inode, path string) (node inode) { +func rlookup(start inode, path string) (node inode, err error) { node = start for _, name := range strings.Split(path, "/") { - if node == nil { - break - } if node.IsDir() { if name == "." || name == "" { continue @@ -578,11 +646,21 @@ func rlookup(start inode, path string) (node inode) { continue } } - node = func() inode { + node, err = func() (inode, error) { node.RLock() defer node.RUnlock() return node.Child(name, nil) }() + if node == nil || err != nil { + break + } + } + if node == nil && err == nil { + err = os.ErrNotExist } return } + +func permittedName(name string) bool { + return name != "" && name != "." && name != ".." && !strings.Contains(name, "/") +}