Merge branch '14259-pysdk-remote-block-copy'
[arvados.git] / sdk / go / arvados / fs_collection.go
index 96977cbc661417f46d557a31ffd10c37d110fc0e..b996542abd52cf7be04549962fdb31dfb7a366a0 100644 (file)
@@ -8,7 +8,6 @@ import (
        "encoding/json"
        "fmt"
        "io"
-       "log"
        "os"
        "path"
        "regexp"
@@ -31,6 +30,9 @@ type CollectionFileSystem interface {
        // Prefix (normally ".") is a top level directory, effectively
        // prepended to all paths in the returned manifest.
        MarshalManifest(prefix string) (string, error)
+
+       // Total data bytes in all files.
+       Size() int64
 }
 
 type collectionFileSystem struct {
@@ -63,14 +65,27 @@ func (c *Collection) FileSystem(client apiClient, kc keepClient) (CollectionFile
                        inodes: make(map[string]inode),
                },
        }
-       root.SetParent(root)
+       root.SetParent(root, ".")
        if err := root.loadManifest(c.ManifestText); err != nil {
                return nil, err
        }
+       backdateTree(root, modTime)
        fs.root = root
        return fs, nil
 }
 
+func backdateTree(n inode, modTime time.Time) {
+       switch n := n.(type) {
+       case *filenode:
+               n.fileinfo.modTime = modTime
+       case *dirnode:
+               n.fileinfo.modTime = modTime
+               for _, n := range n.inodes {
+                       backdateTree(n, modTime)
+               }
+       }
+}
+
 func (fs *collectionFileSystem) newNode(name string, perm os.FileMode, modTime time.Time) (node inode, err error) {
        if name == "" || name == "." || name == ".." {
                return nil, ErrInvalidArgument
@@ -100,14 +115,12 @@ func (fs *collectionFileSystem) newNode(name string, perm os.FileMode, modTime t
 }
 
 func (fs *collectionFileSystem) Sync() error {
-       log.Printf("cfs.Sync()")
        if fs.uuid == "" {
                return nil
        }
        txt, err := fs.MarshalManifest(".")
        if err != nil {
-               log.Printf("WARNING: (collectionFileSystem)Sync() failed: %s", err)
-               return err
+               return fmt.Errorf("sync failed: %s", err)
        }
        coll := &Collection{
                UUID:         fs.uuid,
@@ -115,9 +128,9 @@ func (fs *collectionFileSystem) Sync() error {
        }
        err = fs.RequestAndDecode(nil, "PUT", "arvados/v1/collections/"+fs.uuid, fs.UpdateBody(coll), map[string]interface{}{"select": []string{"uuid"}})
        if err != nil {
-               log.Printf("WARNING: (collectionFileSystem)Sync() failed: %s", err)
+               return fmt.Errorf("sync failed: update %s: %s", fs.uuid, err)
        }
-       return err
+       return nil
 }
 
 func (fs *collectionFileSystem) MarshalManifest(prefix string) (string, error) {
@@ -126,6 +139,10 @@ func (fs *collectionFileSystem) MarshalManifest(prefix string) (string, error) {
        return fs.fileSystem.root.(*dirnode).marshalManifest(prefix)
 }
 
+func (fs *collectionFileSystem) Size() int64 {
+       return fs.fileSystem.root.(*dirnode).TreeSize()
+}
+
 // filenodePtr is an offset into a file that is (usually) efficient to
 // seek to. Specifically, if filenode.repacked==filenodePtr.repacked
 // then
@@ -219,10 +236,11 @@ func (fn *filenode) appendSegment(e segment) {
        fn.fileinfo.size += int64(e.Len())
 }
 
-func (fn *filenode) SetParent(p inode) {
-       fn.RLock()
-       defer fn.RUnlock()
+func (fn *filenode) SetParent(p inode, name string) {
+       fn.Lock()
+       defer fn.Unlock()
        fn.parent = p
+       fn.fileinfo.name = name
 }
 
 func (fn *filenode) Parent() inode {
@@ -507,7 +525,7 @@ func (dn *dirnode) FS() FileSystem {
        return dn.fs
 }
 
-func (dn *dirnode) Child(name string, replace func(inode) inode) inode {
+func (dn *dirnode) Child(name string, replace func(inode) (inode, error)) (inode, error) {
        if dn == dn.fs.rootnode() && name == ".arvados#collection" {
                gn := &getternode{Getter: func() ([]byte, error) {
                        var coll Collection
@@ -518,19 +536,21 @@ func (dn *dirnode) Child(name string, replace func(inode) inode) inode {
                        }
                        data, err := json.Marshal(&coll)
                        if err == nil {
-                               data = append(data, 10)
+                               data = append(data, '\n')
                        }
                        return data, err
                }}
-               gn.SetParent(dn)
-               return gn
+               gn.SetParent(dn, name)
+               return gn, nil
        }
        return dn.treenode.Child(name, replace)
 }
 
-// sync flushes in-memory data (for all files in the tree rooted at
-// dn) to persistent storage. Caller must hold dn.Lock().
-func (dn *dirnode) sync() error {
+// sync flushes in-memory data and remote block references (for the
+// children with the given names, which must be children of dn) to
+// local persistent storage. Caller must have write lock on dn and the
+// named children.
+func (dn *dirnode) sync(names []string) error {
        type shortBlock struct {
                fn  *filenode
                idx int
@@ -566,45 +586,51 @@ func (dn *dirnode) sync() error {
                return nil
        }
 
-       names := make([]string, 0, len(dn.inodes))
-       for name := range dn.inodes {
-               names = append(names, name)
-       }
-       sort.Strings(names)
-
+       localLocator := map[string]string{}
        for _, name := range names {
                fn, ok := dn.inodes[name].(*filenode)
                if !ok {
                        continue
                }
-               fn.Lock()
-               defer fn.Unlock()
                for idx, seg := range fn.segments {
-                       seg, ok := seg.(*memSegment)
-                       if !ok {
-                               continue
-                       }
-                       if seg.Len() > maxBlockSize/2 {
-                               if err := flush([]shortBlock{{fn, idx}}); err != nil {
-                                       return err
+                       switch seg := seg.(type) {
+                       case storedSegment:
+                               loc, ok := localLocator[seg.locator]
+                               if !ok {
+                                       var err error
+                                       loc, err = dn.fs.LocalLocator(seg.locator)
+                                       if err != nil {
+                                               return err
+                                       }
+                                       localLocator[seg.locator] = loc
                                }
-                               continue
-                       }
-                       if pendingLen+seg.Len() > maxBlockSize {
-                               if err := flush(pending); err != nil {
-                                       return err
+                               seg.locator = loc
+                               fn.segments[idx] = seg
+                       case *memSegment:
+                               if seg.Len() > maxBlockSize/2 {
+                                       if err := flush([]shortBlock{{fn, idx}}); err != nil {
+                                               return err
+                                       }
+                                       continue
                                }
-                               pending = nil
-                               pendingLen = 0
+                               if pendingLen+seg.Len() > maxBlockSize {
+                                       if err := flush(pending); err != nil {
+                                               return err
+                                       }
+                                       pending = nil
+                                       pendingLen = 0
+                               }
+                               pending = append(pending, shortBlock{fn, idx})
+                               pendingLen += seg.Len()
+                       default:
+                               panic(fmt.Sprintf("can't sync segment type %T", seg))
                        }
-                       pending = append(pending, shortBlock{fn, idx})
-                       pendingLen += seg.Len()
                }
        }
        return flush(pending)
 }
 
-// caller must have read lock.
+// caller must have write lock.
 func (dn *dirnode) marshalManifest(prefix string) (string, error) {
        var streamLen int64
        type filepart struct {
@@ -616,18 +642,30 @@ func (dn *dirnode) marshalManifest(prefix string) (string, error) {
        var subdirs string
        var blocks []string
 
-       if err := dn.sync(); err != nil {
-               return "", err
+       if len(dn.inodes) == 0 {
+               if prefix == "." {
+                       return "", nil
+               }
+               // Express the existence of an empty directory by
+               // adding an empty file named `\056`, which (unlike
+               // the more obvious spelling `.`) is accepted by the
+               // API's manifest validator.
+               return manifestEscape(prefix) + " d41d8cd98f00b204e9800998ecf8427e+0 0:0:\\056\n", nil
        }
 
        names := make([]string, 0, len(dn.inodes))
-       for name, node := range dn.inodes {
+       for name := range dn.inodes {
                names = append(names, name)
+       }
+       sort.Strings(names)
+       for _, name := range names {
+               node := dn.inodes[name]
                node.Lock()
                defer node.Unlock()
        }
-       sort.Strings(names)
-
+       if err := dn.sync(names); err != nil {
+               return "", err
+       }
        for _, name := range names {
                switch node := dn.inodes[name].(type) {
                case *dirnode:
@@ -743,8 +781,14 @@ func (dn *dirnode) loadManifest(txt string) error {
                        }
                        name := dirname + "/" + manifestUnescape(toks[2])
                        fnode, err := dn.createFileAndParents(name)
-                       if err != nil {
-                               return fmt.Errorf("line %d: cannot use path %q: %s", lineno, name, err)
+                       if fnode == nil && err == nil && length == 0 {
+                               // Special case: an empty file used as
+                               // a marker to preserve an otherwise
+                               // empty directory in a manifest.
+                               continue
+                       }
+                       if err != nil || (fnode == nil && length != 0) {
+                               return fmt.Errorf("line %d: cannot use path %q with length %d: %s", lineno, name, length, err)
                        }
                        // Map the stream offset/range coordinates to
                        // block/offset/range coordinates and add
@@ -802,15 +846,14 @@ func (dn *dirnode) loadManifest(txt string) error {
        return nil
 }
 
-// only safe to call from loadManifest -- no locking
+// only safe to call from loadManifest -- no locking.
+//
+// If path is a "parent directory exists" marker (the last path
+// component is "."), the returned values are both nil.
 func (dn *dirnode) createFileAndParents(path string) (fn *filenode, err error) {
        var node inode = dn
        names := strings.Split(path, "/")
        basename := names[len(names)-1]
-       if basename == "" || basename == "." || basename == ".." {
-               err = fmt.Errorf("invalid filename")
-               return
-       }
        for _, name := range names[:len(names)-1] {
                switch name {
                case "", ".":
@@ -823,41 +866,66 @@ func (dn *dirnode) createFileAndParents(path string) (fn *filenode, err error) {
                        node = node.Parent()
                        continue
                }
-               node.Child(name, func(child inode) inode {
+               node, err = node.Child(name, func(child inode) (inode, error) {
                        if child == nil {
-                               node, err = node.FS().newNode(name, 0755|os.ModeDir, node.Parent().FileInfo().ModTime())
-                               child = node
+                               child, err := node.FS().newNode(name, 0755|os.ModeDir, node.Parent().FileInfo().ModTime())
+                               if err != nil {
+                                       return nil, err
+                               }
+                               child.SetParent(node, name)
+                               return child, nil
                        } else if !child.IsDir() {
-                               err = ErrFileExists
+                               return child, ErrFileExists
                        } else {
-                               node = child
+                               return child, nil
                        }
-                       return child
                })
                if err != nil {
                        return
                }
        }
-       node.Child(basename, func(child inode) inode {
+       if basename == "." {
+               return
+       } else if !permittedName(basename) {
+               err = fmt.Errorf("invalid file part %q in path %q", basename, path)
+               return
+       }
+       _, err = node.Child(basename, func(child inode) (inode, error) {
                switch child := child.(type) {
                case nil:
                        child, err = node.FS().newNode(basename, 0755, node.FileInfo().ModTime())
+                       if err != nil {
+                               return nil, err
+                       }
+                       child.SetParent(node, basename)
                        fn = child.(*filenode)
-                       return child
+                       return child, nil
                case *filenode:
                        fn = child
-                       return child
+                       return child, nil
                case *dirnode:
-                       err = ErrIsDirectory
-                       return child
+                       return child, ErrIsDirectory
                default:
-                       err = ErrInvalidArgument
-                       return child
+                       return child, ErrInvalidArgument
                }
        })
        return
 }
 
+func (dn *dirnode) TreeSize() (bytes int64) {
+       dn.RLock()
+       defer dn.RUnlock()
+       for _, i := range dn.inodes {
+               switch i := i.(type) {
+               case *filenode:
+                       bytes += i.Size()
+               case *dirnode:
+                       bytes += i.TreeSize()
+               }
+       }
+       return
+}
+
 type segment interface {
        io.ReaderAt
        Len() int