21214: Permit filter groups to match filter groups but avoid cycles.
authorTom Clegg <tom@curii.com>
Tue, 12 Dec 2023 16:27:27 +0000 (11:27 -0500)
committerTom Clegg <tom@curii.com>
Tue, 12 Dec 2023 16:27:27 +0000 (11:27 -0500)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

lib/controller/localdb/group.go
sdk/go/arvados/fs_base.go
sdk/go/arvados/fs_collection.go
sdk/go/arvados/fs_lookup.go
sdk/go/arvados/fs_project.go
sdk/go/arvados/fs_project_test.go
sdk/go/arvados/fs_site.go
services/keep-web/handler_test.go

index a21a062db805d77563de347e19208215848beddf..418fd6b8b7ec5bcc955c62c0d1207b66284d587b 100644 (file)
@@ -97,12 +97,6 @@ func (conn *Conn) GroupContents(ctx context.Context, options arvados.GroupConten
                                filter.Operand = tmp[2]
                                options.Filters = append(options.Filters, filter)
                        }
-                       // Prevent infinite-depth trees caused by a
-                       // filter group whose filters match [other
-                       // filter groups that match] itself. Such
-                       // cycles are supported by sitefs, but they
-                       // cause havoc in webdav. See #21214.
-                       options.Filters = append(options.Filters, arvados.Filter{"groups.group_class", "=", "project"})
                } else {
                        return resp, fmt.Errorf("filter unparsable: not an array\n")
                }
index 274d20702287ed464d4ea8f3796e528c3f61b30b..1841d79f997f697557524a2787e244becd13d25c 100644 (file)
@@ -13,6 +13,7 @@ import (
        "net/http"
        "os"
        "path"
+       "path/filepath"
        "strings"
        "sync"
        "time"
@@ -387,17 +388,28 @@ func (n *treenode) Size() int64 {
 }
 
 func (n *treenode) FileInfo() os.FileInfo {
-       n.Lock()
-       defer n.Unlock()
-       n.fileinfo.size = int64(len(n.inodes))
-       return n.fileinfo
+       n.RLock()
+       defer n.RUnlock()
+       fi := n.fileinfo
+       fi.size = int64(len(n.inodes))
+       return fi
 }
 
 func (n *treenode) Readdir() (fi []os.FileInfo, err error) {
+       // We need RLock to safely read n.inodes, but we must release
+       // it before calling FileInfo() on the child nodes. Otherwise,
+       // we risk deadlock when filter groups A and B match each
+       // other, concurrent Readdir() calls try to RLock them in
+       // opposite orders, and one cannot be RLocked a second time
+       // because a third caller is waiting for a write lock.
        n.RLock()
-       defer n.RUnlock()
-       fi = make([]os.FileInfo, 0, len(n.inodes))
+       inodes := make([]inode, 0, len(n.inodes))
        for _, inode := range n.inodes {
+               inodes = append(inodes, inode)
+       }
+       n.RUnlock()
+       fi = make([]os.FileInfo, 0, len(inodes))
+       for _, inode := range inodes {
                fi = append(fi, inode.FileInfo())
        }
        return
@@ -468,7 +480,8 @@ func (fs *fileSystem) openFile(name string, flag int, perm os.FileMode) (*fileha
                return nil, ErrSyncNotSupported
        }
        dirname, name := path.Split(name)
-       parent, err := rlookup(fs.root, dirname)
+       ancestors := map[inode]bool{}
+       parent, err := rlookup(fs.root, dirname, ancestors)
        if err != nil {
                return nil, err
        }
@@ -533,6 +546,24 @@ func (fs *fileSystem) openFile(name string, flag int, perm os.FileMode) (*fileha
                        return nil, err
                }
        }
+       // If n and one of its parents/ancestors are [hardlinks to]
+       // the same node (e.g., a filter group that matches itself),
+       // open an "empty directory" node instead, so the inner
+       // hardlink appears empty. This is needed to ensure
+       // Open("a/b/c/x/x").Readdir() appears empty, matching the
+       // behavior of rlookup("a/b/c/x/x/z") => ErrNotExist.
+       if hl, ok := n.(*hardlink); (ok && ancestors[hl.inode]) || ancestors[n] {
+               n = &treenode{
+                       fs:     n.FS(),
+                       parent: parent,
+                       inodes: nil,
+                       fileinfo: fileinfo{
+                               name:    name,
+                               modTime: time.Now(),
+                               mode:    0555 | os.ModeDir,
+                       },
+               }
+       }
        return &filehandle{
                inode:    n,
                append:   flag&os.O_APPEND != 0,
@@ -551,7 +582,7 @@ func (fs *fileSystem) Create(name string) (File, error) {
 
 func (fs *fileSystem) Mkdir(name string, perm os.FileMode) error {
        dirname, name := path.Split(name)
-       n, err := rlookup(fs.root, dirname)
+       n, err := rlookup(fs.root, dirname, nil)
        if err != nil {
                return err
        }
@@ -575,7 +606,7 @@ func (fs *fileSystem) Mkdir(name string, perm os.FileMode) error {
 }
 
 func (fs *fileSystem) Stat(name string) (os.FileInfo, error) {
-       node, err := rlookup(fs.root, name)
+       node, err := rlookup(fs.root, name, nil)
        if err != nil {
                return nil, err
        }
@@ -704,7 +735,7 @@ func (fs *fileSystem) remove(name string, recursive bool) error {
        if name == "" || name == "." || name == ".." {
                return ErrInvalidArgument
        }
-       dir, err := rlookup(fs.root, dirname)
+       dir, err := rlookup(fs.root, dirname, nil)
        if err != nil {
                return err
        }
@@ -741,9 +772,23 @@ func (fs *fileSystem) MemorySize() int64 {
 // rlookup (recursive lookup) returns the inode for the file/directory
 // with the given name (which may contain "/" separators). If no such
 // file/directory exists, the returned node is nil.
-func rlookup(start inode, path string) (node inode, err error) {
+//
+// The visited map should be either nil or empty. If non-nil, all
+// nodes and hardlink targets visited by the given path will be added
+// to it.
+//
+// If a cycle is detected, the second occurrence of the offending node
+// will be replaced by an empty directory. For example, if "x" is a
+// filter group that matches itself, then rlookup("a/b/c/x") will
+// return the filter group, and rlookup("a/b/c/x/x") will return an
+// empty directory.
+func rlookup(start inode, path string, visited map[inode]bool) (node inode, err error) {
+       if visited == nil {
+               visited = map[inode]bool{}
+       }
        node = start
-       for _, name := range strings.Split(path, "/") {
+       for _, name := range strings.Split(filepath.Clean(path), "/") {
+               visited[node] = true
                if node.IsDir() {
                        if name == "." || name == "" {
                                continue
@@ -761,6 +806,24 @@ func rlookup(start inode, path string) (node inode, err error) {
                if node == nil || err != nil {
                        break
                }
+               checknode := node
+               if hardlinked, ok := checknode.(*hardlink); ok {
+                       checknode = hardlinked.inode
+               }
+               if visited[checknode] {
+                       node = &treenode{
+                               fs:     node.FS(),
+                               parent: node.Parent(),
+                               inodes: nil,
+                               fileinfo: fileinfo{
+                                       name:    name,
+                                       modTime: time.Now(),
+                                       mode:    0555 | os.ModeDir,
+                               },
+                       }
+               } else {
+                       visited[checknode] = true
+               }
        }
        if node == nil && err == nil {
                err = os.ErrNotExist
index 84ff69d6bd0ad16b053fd9ab6409fc04ee5c3024..052cc1aa37581aca2351200c5444304ce912571c 100644 (file)
@@ -457,7 +457,7 @@ func (fs *collectionFileSystem) Sync() error {
 }
 
 func (fs *collectionFileSystem) Flush(path string, shortBlocks bool) error {
-       node, err := rlookup(fs.fileSystem.root, path)
+       node, err := rlookup(fs.fileSystem.root, path, nil)
        if err != nil {
                return err
        }
index 2bb09995e16e476b3889abbdc1c61b6fc1abbd15..7f2244931877aff551e5f1c0687d81d4cc8c49ed 100644 (file)
@@ -48,7 +48,19 @@ func (ln *lookupnode) Readdir() ([]os.FileInfo, error) {
                        return nil, err
                }
                for _, child := range all {
-                       _, err = ln.treenode.Child(child.FileInfo().Name(), func(inode) (inode, error) {
+                       var name string
+                       if hl, ok := child.(*hardlink); ok && hl.inode == ln {
+                               // If child is a hardlink to its
+                               // parent, FileInfo()->RLock() will
+                               // deadlock, because we already have
+                               // the write lock. In this situation
+                               // we can safely access the hardlink's
+                               // name directly.
+                               name = hl.name
+                       } else {
+                               name = child.FileInfo().Name()
+                       }
+                       _, err = ln.treenode.Child(name, func(inode) (inode, error) {
                                return child, nil
                        })
                        if err != nil {
index ed9bb0c08d6c5fa3c2004b574a76bbc26280868c..df1d06e753b35191f344499fe17c9953dc849daa 100644 (file)
@@ -35,6 +35,7 @@ func (fs *customFileSystem) projectsLoadOne(parent inode, uuid, name string) (in
                contents = CollectionList{}
                err = fs.RequestAndDecode(&contents, "GET", "arvados/v1/groups/"+uuid+"/contents", nil, ResourceListParams{
                        Count: "none",
+                       Order: "uuid",
                        Filters: []Filter{
                                {"name", "=", strings.Replace(name, subst, "/", -1)},
                                {"uuid", "is_a", []string{"arvados#collection", "arvados#group"}},
index 5a675100f73bdc1925ef17c4ba30cbd0c877288e..5c2eb33d121b553de5876b11650117a7968fa296 100644 (file)
@@ -54,6 +54,31 @@ func (s *SiteFSSuite) TestFilterGroup(c *check.C) {
                }
        }
 
+       checkDirContains := func(parent, child string, exists bool) {
+               f, err := s.fs.Open(parent)
+               if !c.Check(err, check.IsNil) {
+                       return
+               }
+               ents, err := f.Readdir(-1)
+               if !c.Check(err, check.IsNil) {
+                       return
+               }
+               for _, ent := range ents {
+                       if !exists {
+                               c.Check(ent.Name(), check.Not(check.Equals), child)
+                               if child == "" {
+                                       // no children are expected
+                                       c.Errorf("child %q found in parent %q", child, parent)
+                               }
+                       } else if ent.Name() == child {
+                               return
+                       }
+               }
+               if exists {
+                       c.Errorf("child %q not found in parent %q", child, parent)
+               }
+       }
+
        checkOpen("/users/active/This filter group/baz_file", true)
        checkOpen("/users/active/This filter group/A Subproject", true)
        checkOpen("/users/active/This filter group/A Project", false)
@@ -76,6 +101,13 @@ func (s *SiteFSSuite) TestFilterGroup(c *check.C) {
        checkOpen("/fg2/A Subproject", true)
        checkOpen("/fg2/A Project", true)
 
+       // If a filter group matches itself or one of its ancestors,
+       // the matched item appears as an empty directory.
+       checkDirContains("/users/active/A filter group without filters", "A filter group without filters", true)
+       checkOpen("/users/active/A filter group without filters/A filter group without filters", true)
+       checkOpen("/users/active/A filter group without filters/A filter group without filters/baz_file", false)
+       checkDirContains("/users/active/A filter group without filters/A filter group without filters", "", false)
+
        // An 'is_a' 'arvados#collection' filter means only collections should be returned.
        checkOpen("/users/active/A filter group with an is_a collection filter/baz_file", true)
        checkOpen("/users/active/A filter group with an is_a collection filter/baz_file/baz", true)
index a4a18837e00e7074521ce3e562fb30c21b84c1eb..1e7d51b9ad078adc88644e86a9401eff51c77782 100644 (file)
@@ -386,3 +386,7 @@ func (hl *hardlink) FileInfo() os.FileInfo {
        }
        return fi
 }
+
+func (hl *hardlink) MemorySize() int64 {
+       return 64 + int64(len(hl.name))
+}
index 42bfd80fa5b8151d5b97c701f760a73161fa47e7..5a12e26e9dcdf75ffd1e3e0b7a90ec7176fbe36e 100644 (file)
@@ -1277,7 +1277,7 @@ func (s *IntegrationSuite) testDirectoryListing(c *check.C) {
                        cutDirs: 3,
                },
        } {
-               comment := check.Commentf("HTML: %q => %q", trial.uri, trial.expect)
+               comment := check.Commentf("HTML: %q redir %q => %q", trial.uri, trial.redirect, trial.expect)
                resp := httptest.NewRecorder()
                u := mustParseURL("//" + trial.uri)
                req := &http.Request{
@@ -1346,6 +1346,12 @@ func (s *IntegrationSuite) testDirectoryListing(c *check.C) {
                }
                resp = httptest.NewRecorder()
                s.handler.ServeHTTP(resp, req)
+               // This check avoids logging a big XML document in the
+               // event webdav throws a 500 error after sending
+               // headers for a 207.
+               if !c.Check(strings.HasSuffix(resp.Body.String(), "Internal Server Error"), check.Equals, false) {
+                       continue
+               }
                if trial.expect == nil {
                        c.Check(resp.Code, check.Equals, http.StatusUnauthorized, comment)
                } else {