12483: Add Rename(old,new) to CollectionFileSystem.
authorTom Clegg <tclegg@veritasgenetics.com>
Fri, 17 Nov 2017 16:06:24 +0000 (11:06 -0500)
committerTom Clegg <tclegg@veritasgenetics.com>
Sat, 18 Nov 2017 07:28:40 +0000 (02:28 -0500)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg@veritasgenetics.com>

sdk/go/arvados/collection_fs.go
sdk/go/arvados/collection_fs_test.go

index 6628a7596e94508d6f0b57fd03a993b54e161532..aa549e1159f18ebbd7de4fb3e48c48d7e2eb3525 100644 (file)
@@ -24,9 +24,11 @@ var (
        ErrNegativeOffset    = errors.New("cannot seek to negative offset")
        ErrFileExists        = errors.New("file exists")
        ErrInvalidOperation  = errors.New("invalid operation")
+       ErrInvalidArgument   = errors.New("invalid argument")
        ErrDirectoryNotEmpty = errors.New("directory not empty")
        ErrWriteOnlyMode     = errors.New("file is O_WRONLY")
        ErrSyncNotSupported  = errors.New("O_SYNC flag is not supported")
+       ErrIsDirectory       = errors.New("cannot rename file to overwrite existing directory")
        ErrPermission        = os.ErrPermission
 
        maxBlockSize = 1 << 26
@@ -114,6 +116,7 @@ type CollectionFileSystem interface {
 
        Mkdir(name string, perm os.FileMode) error
        Remove(name string) error
+       Rename(oldname, newname string) error
        MarshalManifest(prefix string) (string, error)
 }
 
@@ -947,6 +950,79 @@ func (dn *dirnode) Remove(name string) error {
        return nil
 }
 
+func (dn *dirnode) Rename(oldname, newname string) error {
+       olddir, oldname := path.Split(oldname)
+       if oldname == "" || oldname == "." || oldname == ".." {
+               return ErrInvalidArgument
+       }
+       olddirf, err := dn.OpenFile(olddir+".", os.O_RDONLY, 0)
+       if err != nil {
+               return fmt.Errorf("%q: %s", olddir, err)
+       }
+       defer olddirf.Close()
+       newdir, newname := path.Split(newname)
+       if newname == "." || newname == ".." {
+               return ErrInvalidArgument
+       } else if newname == "" {
+               // Rename("a/b", "c/") means Rename("a/b", "c/b")
+               newname = oldname
+       }
+       newdirf, err := dn.OpenFile(newdir+".", os.O_RDONLY, 0)
+       if err != nil {
+               return fmt.Errorf("%q: %s", newdir, err)
+       }
+       defer newdirf.Close()
+
+       // When acquiring locks on multiple nodes, all common
+       // ancestors must be locked first in order to avoid
+       // deadlock. This is assured by locking the path from root to
+       // newdir, then locking the path from root to olddir, skipping
+       // any already-locked nodes.
+       needLock := []sync.Locker{}
+       for _, f := range []*file{olddirf, newdirf} {
+               node := f.inode
+               needLock = append(needLock, node)
+               for node.Parent() != node {
+                       node = node.Parent()
+                       needLock = append(needLock, node)
+               }
+       }
+       locked := map[sync.Locker]bool{}
+       for i := len(needLock) - 1; i >= 0; i-- {
+               if n := needLock[i]; !locked[n] {
+                       n.Lock()
+                       defer n.Unlock()
+                       locked[n] = true
+               }
+       }
+
+       olddn := olddirf.inode.(*dirnode)
+       newdn := newdirf.inode.(*dirnode)
+       oldinode, ok := olddn.inodes[oldname]
+       if !ok {
+               return os.ErrNotExist
+       }
+       if existing, ok := newdn.inodes[newname]; ok {
+               // overwriting an existing file or dir
+               if dn, ok := existing.(*dirnode); ok {
+                       if !oldinode.Stat().IsDir() {
+                               return ErrIsDirectory
+                       }
+                       dn.RLock()
+                       defer dn.RUnlock()
+                       if len(dn.inodes) > 0 {
+                               return ErrDirectoryNotEmpty
+                       }
+               }
+       } else {
+               newdn.fileinfo.size++
+       }
+       newdn.inodes[newname] = oldinode
+       delete(olddn.inodes, oldname)
+       olddn.fileinfo.size--
+       return nil
+}
+
 func (dn *dirnode) Parent() inode {
        dn.RLock()
        defer dn.RUnlock()
index fc3af34b7ac4ef78980eb828a8ea747a8d9c61cb..28f5a3499c4a544c6f6b7f8083cca1277f8d3920 100644 (file)
@@ -507,6 +507,89 @@ func (s *CollectionFSSuite) TestRandomWrites(c *check.C) {
        // TODO: check manifest content
 }
 
+func (s *CollectionFSSuite) TestRename(c *check.C) {
+       fs, err := (&Collection{}).FileSystem(s.client, s.kc)
+       c.Assert(err, check.IsNil)
+       const (
+               outer = 16
+               inner = 16
+       )
+       for i := 0; i < outer; i++ {
+               err = fs.Mkdir(fmt.Sprintf("dir%d", i), 0755)
+               c.Assert(err, check.IsNil)
+               for j := 0; j < inner; j++ {
+                       err = fs.Mkdir(fmt.Sprintf("dir%d/dir%d", i, j), 0755)
+                       c.Assert(err, check.IsNil)
+                       for _, fnm := range []string{
+                               fmt.Sprintf("dir%d/file%d", i, j),
+                               fmt.Sprintf("dir%d/dir%d/file%d", i, j, j),
+                       } {
+                               f, err := fs.OpenFile(fnm, os.O_CREATE|os.O_WRONLY, 0755)
+                               c.Assert(err, check.IsNil)
+                               _, err = f.Write([]byte("beep"))
+                               c.Assert(err, check.IsNil)
+                               f.Close()
+                       }
+               }
+       }
+       var wg sync.WaitGroup
+       for i := 0; i < outer; i++ {
+               for j := 0; j < inner; j++ {
+                       wg.Add(1)
+                       go func(i, j int) {
+                               defer wg.Done()
+                               oldname := fmt.Sprintf("dir%d/dir%d/file%d", i, j, j)
+                               newname := fmt.Sprintf("dir%d/newfile%d", i, inner-j-1)
+                               _, err := fs.Open(newname)
+                               c.Check(err, check.Equals, os.ErrNotExist)
+                               err = fs.Rename(oldname, newname)
+                               c.Check(err, check.IsNil)
+                               f, err := fs.Open(newname)
+                               c.Check(err, check.IsNil)
+                               f.Close()
+                       }(i, j)
+
+                       wg.Add(1)
+                       go func(i, j int) {
+                               defer wg.Done()
+                               // oldname does not exist
+                               err := fs.Rename(
+                                       fmt.Sprintf("dir%d/dir%d/missing", i, j),
+                                       fmt.Sprintf("dir%d/irelevant", outer-i-1))
+                               c.Check(err, check.ErrorMatches, `.*does not exist`)
+
+                               // newname parent dir does not exist
+                               err = fs.Rename(
+                                       fmt.Sprintf("dir%d/dir%d", i, j),
+                                       fmt.Sprintf("dir%d/missing/irrelevant", outer-i-1))
+                               c.Check(err, check.ErrorMatches, `.*does not exist`)
+
+                               // oldname parent dir is a file
+                               err = fs.Rename(
+                                       fmt.Sprintf("dir%d/file%d/patherror", i, j),
+                                       fmt.Sprintf("dir%d/irrelevant", i))
+                               c.Check(err, check.ErrorMatches, `.*does not exist`)
+
+                               // newname parent dir is a file
+                               err = fs.Rename(
+                                       fmt.Sprintf("dir%d/dir%d/file%d", i, j, j),
+                                       fmt.Sprintf("dir%d/file%d/patherror", i, inner-j-1))
+                               c.Check(err, check.ErrorMatches, `.*does not exist`)
+                       }(i, j)
+               }
+       }
+       wg.Wait()
+
+       f, err := fs.OpenFile("dir1/newfile3", 0, 0)
+       c.Assert(err, check.IsNil)
+       c.Check(f.Size(), check.Equals, int64(4))
+       buf, err := ioutil.ReadAll(f)
+       c.Check(buf, check.DeepEquals, []byte("beep"))
+       c.Check(err, check.IsNil)
+       _, err = fs.Open("dir1/dir1/file1")
+       c.Check(err, check.Equals, os.ErrNotExist)
+}
+
 func (s *CollectionFSSuite) TestPersist(c *check.C) {
        maxBlockSize = 1024
        defer func() { maxBlockSize = 2 << 26 }()
@@ -626,7 +709,7 @@ func (s *CollectionFSSuite) TestPersistEmptyFiles(c *check.C) {
        }
 }
 
-func (s *CollectionFSSuite) TestFileModes(c *check.C) {
+func (s *CollectionFSSuite) TestOpenFileFlags(c *check.C) {
        fs, err := (&Collection{}).FileSystem(s.client, s.kc)
        c.Assert(err, check.IsNil)