14345: Use "." placeholder to persist empty directories.
authorTom Clegg <tclegg@veritasgenetics.com>
Mon, 5 Nov 2018 21:15:15 +0000 (16:15 -0500)
committerTom Clegg <tclegg@veritasgenetics.com>
Wed, 7 Nov 2018 19:10:28 +0000 (14:10 -0500)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg@veritasgenetics.com>

sdk/go/arvados/fs_collection.go
sdk/go/arvados/fs_collection_test.go

index e5c0b32acef7fc5d01b5753c5930a90bd6998c52..b996542abd52cf7be04549962fdb31dfb7a366a0 100644 (file)
@@ -642,6 +642,17 @@ func (dn *dirnode) marshalManifest(prefix string) (string, error) {
        var subdirs string
        var blocks []string
 
+       if len(dn.inodes) == 0 {
+               if prefix == "." {
+                       return "", nil
+               }
+               // Express the existence of an empty directory by
+               // adding an empty file named `\056`, which (unlike
+               // the more obvious spelling `.`) is accepted by the
+               // API's manifest validator.
+               return manifestEscape(prefix) + " d41d8cd98f00b204e9800998ecf8427e+0 0:0:\\056\n", nil
+       }
+
        names := make([]string, 0, len(dn.inodes))
        for name := range dn.inodes {
                names = append(names, name)
@@ -770,8 +781,14 @@ func (dn *dirnode) loadManifest(txt string) error {
                        }
                        name := dirname + "/" + manifestUnescape(toks[2])
                        fnode, err := dn.createFileAndParents(name)
-                       if err != nil {
-                               return fmt.Errorf("line %d: cannot use path %q: %s", lineno, name, err)
+                       if fnode == nil && err == nil && length == 0 {
+                               // Special case: an empty file used as
+                               // a marker to preserve an otherwise
+                               // empty directory in a manifest.
+                               continue
+                       }
+                       if err != nil || (fnode == nil && length != 0) {
+                               return fmt.Errorf("line %d: cannot use path %q with length %d: %s", lineno, name, length, err)
                        }
                        // Map the stream offset/range coordinates to
                        // block/offset/range coordinates and add
@@ -829,15 +846,14 @@ func (dn *dirnode) loadManifest(txt string) error {
        return nil
 }
 
-// only safe to call from loadManifest -- no locking
+// only safe to call from loadManifest -- no locking.
+//
+// If path is a "parent directory exists" marker (the last path
+// component is "."), the returned values are both nil.
 func (dn *dirnode) createFileAndParents(path string) (fn *filenode, err error) {
        var node inode = dn
        names := strings.Split(path, "/")
        basename := names[len(names)-1]
-       if !permittedName(basename) {
-               err = fmt.Errorf("invalid file part %q in path %q", basename, path)
-               return
-       }
        for _, name := range names[:len(names)-1] {
                switch name {
                case "", ".":
@@ -868,6 +884,12 @@ func (dn *dirnode) createFileAndParents(path string) (fn *filenode, err error) {
                        return
                }
        }
+       if basename == "." {
+               return
+       } else if !permittedName(basename) {
+               err = fmt.Errorf("invalid file part %q in path %q", basename, path)
+               return
+       }
        _, err = node.Child(basename, func(child inode) (inode, error) {
                switch child := child.(type) {
                case nil:
index 5faa5794f4a94df38e1432e38844bb925c8071c4..fee7feaef4907666db47c3c48abbeeb4db2f2072 100644 (file)
@@ -842,11 +842,11 @@ func (s *CollectionFSSuite) TestPersist(c *check.C) {
        }
 }
 
-func (s *CollectionFSSuite) TestPersistEmptyFiles(c *check.C) {
+func (s *CollectionFSSuite) TestPersistEmptyFilesAndDirs(c *check.C) {
        var err error
        s.fs, err = (&Collection{}).FileSystem(s.client, s.kc)
        c.Assert(err, check.IsNil)
-       for _, name := range []string{"dir", "dir/zerodir", "zero", "zero/zero"} {
+       for _, name := range []string{"dir", "dir/zerodir", "empty", "not empty", "not empty/empty", "zero", "zero/zero"} {
                err = s.fs.Mkdir(name, 0755)
                c.Assert(err, check.IsNil)
        }
@@ -892,6 +892,23 @@ func (s *CollectionFSSuite) TestPersistEmptyFiles(c *check.C) {
                c.Check(err, check.IsNil)
                c.Check(buf, check.DeepEquals, data)
        }
+
+       expectDir := map[string]int{
+               "empty":           0,
+               "not empty":       1,
+               "not empty/empty": 0,
+       }
+       for name, expectLen := range expectDir {
+               _, err := persisted.Open(name + "/bogus")
+               c.Check(err, check.NotNil)
+
+               d, err := persisted.Open(name)
+               defer d.Close()
+               c.Check(err, check.IsNil)
+               fi, err := d.Readdir(-1)
+               c.Check(err, check.IsNil)
+               c.Check(fi, check.HasLen, expectLen)
+       }
 }
 
 func (s *CollectionFSSuite) TestOpenFileFlags(c *check.C) {
@@ -1043,6 +1060,12 @@ func (s *CollectionFSSuite) TestBrokenManifests(c *check.C) {
                ". d41d8cd98f00b204e9800998ecf8427e+0 foo:0:foo\n",
                ". d41d8cd98f00b204e9800998ecf8427e+0 0:foo:foo\n",
                ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:foo 1:1:bar\n",
+               ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:\\056\n",
+               ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:\\056\\057\\056\n",
+               ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:.\n",
+               ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:..\n",
+               ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:..\n",
+               ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo/..\n",
                ". d41d8cd98f00b204e9800998ecf8427e+1 0:0:foo\n./foo d41d8cd98f00b204e9800998ecf8427e+1 0:0:bar\n",
                "./foo d41d8cd98f00b204e9800998ecf8427e+1 0:0:bar\n. d41d8cd98f00b204e9800998ecf8427e+1 0:0:foo\n",
        } {
@@ -1058,7 +1081,9 @@ func (s *CollectionFSSuite) TestEdgeCaseManifests(c *check.C) {
        for _, txt := range []string{
                "",
                ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo\n",
-               ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo 0:0:foo 0:0:bar\n",
+               ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:...\n",
+               ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:. 0:0:. 0:0:\\056 0:0:\\056\n",
+               ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo/. 0:0:. 0:0:foo\\057bar\\057\\056\n",
                ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo 0:0:foo 0:0:bar\n",
                ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo/bar\n./foo d41d8cd98f00b204e9800998ecf8427e+0 0:0:bar\n",
        } {