16306: Merge branch 'master'
[arvados.git] / sdk / go / arvados / fs_base.go
index 4d22c1e82c6cc807d34c9e4c98bb8a7b68f731c2..aa75fee7c4d0f5bf47826d12300b51cdcf08a424 100644 (file)
@@ -27,9 +27,14 @@ var (
        ErrWriteOnlyMode     = errors.New("file is O_WRONLY")
        ErrSyncNotSupported  = errors.New("O_SYNC flag is not supported")
        ErrIsDirectory       = errors.New("cannot rename file to overwrite existing directory")
+       ErrNotADirectory     = errors.New("not a directory")
        ErrPermission        = os.ErrPermission
 )
 
+type syncer interface {
+       Sync() error
+}
+
 // A File is an *os.File-like interface for reading and writing files
 // in a FileSystem.
 type File interface {
@@ -57,6 +62,9 @@ type FileSystem interface {
        // while locking multiple inodes.
        locker() sync.Locker
 
+       // throttle for limiting concurrent background writers
+       throttle() *throttle
+
        // create a new node with nil parent.
        newNode(name string, perm os.FileMode, modTime time.Time) (node inode, err error)
 
@@ -85,7 +93,19 @@ type FileSystem interface {
        Remove(name string) error
        RemoveAll(name string) error
        Rename(oldname, newname string) error
+
+       // Write buffered data from memory to storage, returning when
+       // all updates have been saved to persistent storage.
        Sync() error
+
+       // Write buffered data from memory to storage, but don't wait
+       // for all writes to finish before returning. If shortBlocks
+       // is true, flush everything; otherwise, if there's less than
+       // a full block of buffered data at the end of a stream, leave
+       // it buffered in memory in case more data can be appended. If
+       // path is "", flush all dirs/streams; otherwise, flush only
+       // the specified dir/stream.
+       Flush(path string, shortBlocks bool) error
 }
 
 type inode interface {
@@ -102,6 +122,9 @@ type inode interface {
 
        // Child() performs lookups and updates of named child nodes.
        //
+       // (The term "child" here is used strictly. This means name is
+       // not "." or "..", and name does not contain "/".)
+       //
        // If replace is non-nil, Child calls replace(x) where x is
        // the current child inode with the given name. If possible,
        // the child inode is replaced with the one returned by
@@ -128,7 +151,7 @@ type inode interface {
        // a child was added or changed, the new child is returned.
        //
        // Caller must have lock (or rlock if replace is nil).
-       Child(name string, replace func(inode) inode) inode
+       Child(name string, replace func(inode) (inode, error)) (inode, error)
 
        sync.Locker
        RLock()
@@ -202,8 +225,8 @@ func (*nullnode) Readdir() ([]os.FileInfo, error) {
        return nil, ErrInvalidOperation
 }
 
-func (*nullnode) Child(name string, replace func(inode) inode) inode {
-       return nil
+func (*nullnode) Child(name string, replace func(inode) (inode, error)) (inode, error) {
+       return nil, ErrNotADirectory
 }
 
 type treenode struct {
@@ -236,18 +259,25 @@ func (n *treenode) IsDir() bool {
        return true
 }
 
-func (n *treenode) Child(name string, replace func(inode) inode) (child inode) {
-       // TODO: special treatment for "", ".", ".."
+func (n *treenode) Child(name string, replace func(inode) (inode, error)) (child inode, err error) {
        child = n.inodes[name]
-       if replace != nil {
-               newchild := replace(child)
-               if newchild == nil {
-                       delete(n.inodes, name)
-               } else if newchild != child {
-                       n.inodes[name] = newchild
-                       n.fileinfo.modTime = time.Now()
-                       child = newchild
-               }
+       if name == "" || name == "." || name == ".." {
+               err = ErrInvalidArgument
+               return
+       }
+       if replace == nil {
+               return
+       }
+       newchild, err := replace(child)
+       if err != nil {
+               return
+       }
+       if newchild == nil {
+               delete(n.inodes, name)
+       } else if newchild != child {
+               n.inodes[name] = newchild
+               n.fileinfo.modTime = time.Now()
+               child = newchild
        }
        return
 }
@@ -273,16 +303,37 @@ func (n *treenode) Readdir() (fi []os.FileInfo, err error) {
        return
 }
 
+func (n *treenode) Sync() error {
+       n.RLock()
+       defer n.RUnlock()
+       for _, inode := range n.inodes {
+               syncer, ok := inode.(syncer)
+               if !ok {
+                       return ErrInvalidOperation
+               }
+               err := syncer.Sync()
+               if err != nil {
+                       return err
+               }
+       }
+       return nil
+}
+
 type fileSystem struct {
        root inode
        fsBackend
        mutex sync.Mutex
+       thr   *throttle
 }
 
 func (fs *fileSystem) rootnode() inode {
        return fs.root
 }
 
+func (fs *fileSystem) throttle() *throttle {
+       return fs.thr
+}
+
 func (fs *fileSystem) locker() sync.Locker {
        return &fs.mutex
 }
@@ -297,9 +348,9 @@ func (fs *fileSystem) openFile(name string, flag int, perm os.FileMode) (*fileha
                return nil, ErrSyncNotSupported
        }
        dirname, name := path.Split(name)
-       parent := rlookup(fs.root, dirname)
-       if parent == nil {
-               return nil, os.ErrNotExist
+       parent, err := rlookup(fs.root, dirname)
+       if err != nil {
+               return nil, err
        }
        var readable, writable bool
        switch flag & (os.O_RDWR | os.O_RDONLY | os.O_WRONLY) {
@@ -331,22 +382,26 @@ func (fs *fileSystem) openFile(name string, flag int, perm os.FileMode) (*fileha
                parent.RLock()
                defer parent.RUnlock()
        }
-       n := parent.Child(name, nil)
-       if n == nil {
+       n, err := parent.Child(name, nil)
+       if err != nil {
+               return nil, err
+       } else if n == nil {
                if !createMode {
                        return nil, os.ErrNotExist
                }
-               var err error
-               n = parent.Child(name, func(inode) inode {
-                       n, err = parent.FS().newNode(name, perm|0755, time.Now())
-                       n.SetParent(parent, name)
-                       return n
+               n, err = parent.Child(name, func(inode) (repl inode, err error) {
+                       repl, err = parent.FS().newNode(name, perm|0755, time.Now())
+                       if err != nil {
+                               return
+                       }
+                       repl.SetParent(parent, name)
+                       return
                })
                if err != nil {
                        return nil, err
                } else if n == nil {
-                       // parent rejected new child
-                       return nil, ErrInvalidOperation
+                       // Parent rejected new child, but returned no error
+                       return nil, ErrInvalidArgument
                }
        } else if flag&os.O_EXCL != 0 {
                return nil, ErrFileExists
@@ -375,38 +430,37 @@ func (fs *fileSystem) Create(name string) (File, error) {
        return fs.OpenFile(name, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0)
 }
 
-func (fs *fileSystem) Mkdir(name string, perm os.FileMode) (err error) {
+func (fs *fileSystem) Mkdir(name string, perm os.FileMode) error {
        dirname, name := path.Split(name)
-       n := rlookup(fs.root, dirname)
-       if n == nil {
-               return os.ErrNotExist
+       n, err := rlookup(fs.root, dirname)
+       if err != nil {
+               return err
        }
        n.Lock()
        defer n.Unlock()
-       if n.Child(name, nil) != nil {
+       if child, err := n.Child(name, nil); err != nil {
+               return err
+       } else if child != nil {
                return os.ErrExist
        }
-       child := n.Child(name, func(inode) (child inode) {
-               child, err = n.FS().newNode(name, perm|os.ModeDir, time.Now())
-               child.SetParent(n, name)
+
+       _, err = n.Child(name, func(inode) (repl inode, err error) {
+               repl, err = n.FS().newNode(name, perm|os.ModeDir, time.Now())
+               if err != nil {
+                       return
+               }
+               repl.SetParent(n, name)
                return
        })
-       if err != nil {
-               return err
-       } else if child == nil {
-               return ErrInvalidArgument
-       }
-       return nil
+       return err
 }
 
-func (fs *fileSystem) Stat(name string) (fi os.FileInfo, err error) {
-       node := rlookup(fs.root, name)
-       if node == nil {
-               err = os.ErrNotExist
-       } else {
-               fi = node.FileInfo()
+func (fs *fileSystem) Stat(name string) (os.FileInfo, error) {
+       node, err := rlookup(fs.root, name)
+       if err != nil {
+               return nil, err
        }
-       return
+       return node.FileInfo(), nil
 }
 
 func (fs *fileSystem) Rename(oldname, newname string) error {
@@ -438,7 +492,7 @@ func (fs *fileSystem) Rename(oldname, newname string) error {
        // call nca.FS().Rename() instead of proceeding. Until then
        // it's awkward for filesystems to implement their own Rename
        // methods effectively: the only one that runs is the one on
-       // the root filesystem exposed to the caller (webdav, fuse,
+       // the root FileSystem exposed to the caller (webdav, fuse,
        // etc).
 
        // When acquiring locks on multiple inodes, avoid deadlock by
@@ -475,43 +529,31 @@ func (fs *fileSystem) Rename(oldname, newname string) error {
                }
        }
 
-       // Return ErrInvalidOperation if olddirf.inode doesn't even
-       // bother calling our "remove oldname entry" replacer func.
-       err = ErrInvalidArgument
-       olddirf.inode.Child(oldname, func(oldinode inode) inode {
-               err = nil
+       _, err = olddirf.inode.Child(oldname, func(oldinode inode) (inode, error) {
                if oldinode == nil {
-                       err = os.ErrNotExist
-                       return nil
+                       return oldinode, os.ErrNotExist
                }
                if locked[oldinode] {
                        // oldinode cannot become a descendant of itself.
-                       err = ErrInvalidArgument
-                       return oldinode
+                       return oldinode, ErrInvalidArgument
                }
                if oldinode.FS() != cfs && newdirf.inode != olddirf.inode {
                        // moving a mount point to a different parent
                        // is not (yet) supported.
-                       err = ErrInvalidArgument
-                       return oldinode
+                       return oldinode, ErrInvalidArgument
                }
-               accepted := newdirf.inode.Child(newname, func(existing inode) inode {
+               accepted, err := newdirf.inode.Child(newname, func(existing inode) (inode, error) {
                        if existing != nil && existing.IsDir() {
-                               err = ErrIsDirectory
-                               return existing
+                               return existing, ErrIsDirectory
                        }
-                       return oldinode
+                       return oldinode, nil
                })
-               if accepted != oldinode {
-                       if err == nil {
-                               // newdirf didn't accept oldinode.
-                               err = ErrInvalidArgument
-                       }
+               if err != nil {
                        // Leave oldinode in olddir.
-                       return oldinode
+                       return oldinode, err
                }
                accepted.SetParent(newdirf.inode, newname)
-               return nil
+               return nil, nil
        })
        return err
 }
@@ -530,45 +572,47 @@ func (fs *fileSystem) RemoveAll(name string) error {
        return err
 }
 
-func (fs *fileSystem) remove(name string, recursive bool) (err error) {
+func (fs *fileSystem) remove(name string, recursive bool) error {
        dirname, name := path.Split(name)
        if name == "" || name == "." || name == ".." {
                return ErrInvalidArgument
        }
-       dir := rlookup(fs.root, dirname)
-       if dir == nil {
-               return os.ErrNotExist
+       dir, err := rlookup(fs.root, dirname)
+       if err != nil {
+               return err
        }
        dir.Lock()
        defer dir.Unlock()
-       dir.Child(name, func(node inode) inode {
+       _, err = dir.Child(name, func(node inode) (inode, error) {
                if node == nil {
-                       err = os.ErrNotExist
-                       return nil
+                       return nil, os.ErrNotExist
                }
                if !recursive && node.IsDir() && node.Size() > 0 {
-                       err = ErrDirectoryNotEmpty
-                       return node
+                       return node, ErrDirectoryNotEmpty
                }
-               return nil
+               return nil, nil
        })
        return err
 }
 
 func (fs *fileSystem) Sync() error {
-       log.Printf("TODO: sync fileSystem")
+       if syncer, ok := fs.root.(syncer); ok {
+               return syncer.Sync()
+       }
+       return ErrInvalidOperation
+}
+
+func (fs *fileSystem) Flush(string, bool) error {
+       log.Printf("TODO: flush fileSystem")
        return ErrInvalidOperation
 }
 
 // rlookup (recursive lookup) returns the inode for the file/directory
 // with the given name (which may contain "/" separators). If no such
 // file/directory exists, the returned node is nil.
-func rlookup(start inode, path string) (node inode) {
+func rlookup(start inode, path string) (node inode, err error) {
        node = start
        for _, name := range strings.Split(path, "/") {
-               if node == nil {
-                       break
-               }
                if node.IsDir() {
                        if name == "." || name == "" {
                                continue
@@ -578,11 +622,21 @@ func rlookup(start inode, path string) (node inode) {
                                continue
                        }
                }
-               node = func() inode {
+               node, err = func() (inode, error) {
                        node.RLock()
                        defer node.RUnlock()
                        return node.Child(name, nil)
                }()
+               if node == nil || err != nil {
+                       break
+               }
+       }
+       if node == nil && err == nil {
+               err = os.ErrNotExist
        }
        return
 }
+
+func permittedName(name string) bool {
+       return name != "" && name != "." && name != ".." && !strings.Contains(name, "/")
+}