Merge branch '21535-multi-wf-delete'
[arvados.git] / sdk / go / arvados / fs_base.go
index 274d20702287ed464d4ea8f3796e528c3f61b30b..430a0d4c9be4f69e4f86adc3cb93d2549c8fc930 100644 (file)
@@ -13,6 +13,7 @@ import (
        "net/http"
        "os"
        "path"
+       "path/filepath"
        "strings"
        "sync"
        "time"
@@ -387,17 +388,28 @@ func (n *treenode) Size() int64 {
 }
 
 func (n *treenode) FileInfo() os.FileInfo {
-       n.Lock()
-       defer n.Unlock()
-       n.fileinfo.size = int64(len(n.inodes))
-       return n.fileinfo
+       n.RLock()
+       defer n.RUnlock()
+       fi := n.fileinfo
+       fi.size = int64(len(n.inodes))
+       return fi
 }
 
 func (n *treenode) Readdir() (fi []os.FileInfo, err error) {
+       // We need RLock to safely read n.inodes, but we must release
+       // it before calling FileInfo() on the child nodes. Otherwise,
+       // we risk deadlock when filter groups A and B match each
+       // other, concurrent Readdir() calls try to RLock them in
+       // opposite orders, and one cannot be RLocked a second time
+       // because a third caller is waiting for a write lock.
        n.RLock()
-       defer n.RUnlock()
-       fi = make([]os.FileInfo, 0, len(n.inodes))
+       inodes := make([]inode, 0, len(n.inodes))
        for _, inode := range n.inodes {
+               inodes = append(inodes, inode)
+       }
+       n.RUnlock()
+       fi = make([]os.FileInfo, 0, len(inodes))
+       for _, inode := range inodes {
                fi = append(fi, inode.FileInfo())
        }
        return
@@ -468,7 +480,8 @@ func (fs *fileSystem) openFile(name string, flag int, perm os.FileMode) (*fileha
                return nil, ErrSyncNotSupported
        }
        dirname, name := path.Split(name)
-       parent, err := rlookup(fs.root, dirname)
+       ancestors := map[inode]bool{}
+       parent, err := rlookup(fs.root, dirname, ancestors)
        if err != nil {
                return nil, err
        }
@@ -533,6 +546,24 @@ func (fs *fileSystem) openFile(name string, flag int, perm os.FileMode) (*fileha
                        return nil, err
                }
        }
+       // If n and one of its parents/ancestors are [hardlinks to]
+       // the same node (e.g., a filter group that matches itself),
+       // open an "empty directory" node instead, so the inner
+       // hardlink appears empty. This is needed to ensure
+       // Open("a/b/c/x/x").Readdir() appears empty, matching the
+       // behavior of rlookup("a/b/c/x/x/z") => ErrNotExist.
+       if hl, ok := n.(*hardlink); (ok && ancestors[hl.inode]) || ancestors[n] {
+               n = &treenode{
+                       fs:     n.FS(),
+                       parent: parent,
+                       inodes: nil,
+                       fileinfo: fileinfo{
+                               name:    name,
+                               modTime: time.Now(),
+                               mode:    0555 | os.ModeDir,
+                       },
+               }
+       }
        return &filehandle{
                inode:    n,
                append:   flag&os.O_APPEND != 0,
@@ -551,7 +582,7 @@ func (fs *fileSystem) Create(name string) (File, error) {
 
 func (fs *fileSystem) Mkdir(name string, perm os.FileMode) error {
        dirname, name := path.Split(name)
-       n, err := rlookup(fs.root, dirname)
+       n, err := rlookup(fs.root, dirname, nil)
        if err != nil {
                return err
        }
@@ -575,7 +606,7 @@ func (fs *fileSystem) Mkdir(name string, perm os.FileMode) error {
 }
 
 func (fs *fileSystem) Stat(name string) (os.FileInfo, error) {
-       node, err := rlookup(fs.root, name)
+       node, err := rlookup(fs.root, name, nil)
        if err != nil {
                return nil, err
        }
@@ -704,7 +735,7 @@ func (fs *fileSystem) remove(name string, recursive bool) error {
        if name == "" || name == "." || name == ".." {
                return ErrInvalidArgument
        }
-       dir, err := rlookup(fs.root, dirname)
+       dir, err := rlookup(fs.root, dirname, nil)
        if err != nil {
                return err
        }
@@ -741,9 +772,31 @@ func (fs *fileSystem) MemorySize() int64 {
 // rlookup (recursive lookup) returns the inode for the file/directory
 // with the given name (which may contain "/" separators). If no such
 // file/directory exists, the returned node is nil.
-func rlookup(start inode, path string) (node inode, err error) {
+//
+// The visited map should be either nil or empty. If non-nil, all
+// nodes and hardlink targets visited by the given path will be added
+// to it.
+//
+// If a cycle is detected, the second occurrence of the offending node
+// will be replaced by an empty directory. For example, if "x" is a
+// filter group that matches itself, then rlookup("a/b/c/x") will
+// return the filter group, and rlookup("a/b/c/x/x") will return an
+// empty directory.
+func rlookup(start inode, path string, visited map[inode]bool) (node inode, err error) {
+       if visited == nil {
+               visited = map[inode]bool{}
+       }
        node = start
+       // Clean up ./ and ../ and double-slashes, but (unlike
+       // filepath.Clean) retain a trailing slash, because looking up
+       // ".../regularfile/" should fail.
+       trailingSlash := strings.HasSuffix(path, "/")
+       path = filepath.Clean(path)
+       if trailingSlash && path != "/" {
+               path += "/"
+       }
        for _, name := range strings.Split(path, "/") {
+               visited[node] = true
                if node.IsDir() {
                        if name == "." || name == "" {
                                continue
@@ -761,6 +814,24 @@ func rlookup(start inode, path string) (node inode, err error) {
                if node == nil || err != nil {
                        break
                }
+               checknode := node
+               if hardlinked, ok := checknode.(*hardlink); ok {
+                       checknode = hardlinked.inode
+               }
+               if visited[checknode] {
+                       node = &treenode{
+                               fs:     node.FS(),
+                               parent: node.Parent(),
+                               inodes: nil,
+                               fileinfo: fileinfo{
+                                       name:    name,
+                                       modTime: time.Now(),
+                                       mode:    0555 | os.ModeDir,
+                               },
+                       }
+               } else {
+                       visited[checknode] = true
+               }
        }
        if node == nil && err == nil {
                err = os.ErrNotExist