14259: Add method to keep client to request remote block copy via HEAD request.
[arvados.git] / sdk / go / arvados / fs_collection.go
index e4511895ffc100e7683a096bef7a11a11e13c3b9..f6afadba5b47488dfff41a53ebca31a97e1dc940 100644 (file)
@@ -8,6 +8,7 @@ import (
        "encoding/json"
        "fmt"
        "io"
+       "log"
        "os"
        "path"
        "regexp"
@@ -20,11 +21,6 @@ import (
 
 var maxBlockSize = 1 << 26
 
-type keepClient interface {
-       ReadAt(locator string, p []byte, off int) (int, error)
-       PutB(p []byte) (string, int, error)
-}
-
 // A CollectionFileSystem is a FileSystem that can be serialized as a
 // manifest and stored as a collection.
 type CollectionFileSystem interface {
@@ -35,64 +31,119 @@ type CollectionFileSystem interface {
        // Prefix (normally ".") is a top level directory, effectively
        // prepended to all paths in the returned manifest.
        MarshalManifest(prefix string) (string, error)
+
+       // Total data bytes in all files.
+       Size() int64
+}
+
+type collectionFileSystem struct {
+       fileSystem
+       uuid string
 }
 
 // FileSystem returns a CollectionFileSystem for the collection.
-func (c *Collection) FileSystem(client *Client, kc keepClient) (CollectionFileSystem, error) {
+func (c *Collection) FileSystem(client apiClient, kc keepClient) (CollectionFileSystem, error) {
        var modTime time.Time
        if c.ModifiedAt == nil {
                modTime = time.Now()
        } else {
                modTime = *c.ModifiedAt
        }
-       dn := &dirnode{
-               client: client,
-               kc:     kc,
+       fs := &collectionFileSystem{
+               uuid: c.UUID,
+               fileSystem: fileSystem{
+                       fsBackend: keepBackend{apiClient: client, keepClient: kc},
+               },
+       }
+       root := &dirnode{
+               fs: fs,
                treenode: treenode{
                        fileinfo: fileinfo{
                                name:    ".",
                                mode:    os.ModeDir | 0755,
                                modTime: modTime,
                        },
-                       parent: nil,
                        inodes: make(map[string]inode),
                },
        }
-       dn.parent = dn
-       fs := &collectionFileSystem{fileSystem: fileSystem{inode: dn}}
-       if err := dn.loadManifest(c.ManifestText); err != nil {
+       root.SetParent(root, ".")
+       if err := root.loadManifest(c.ManifestText); err != nil {
                return nil, err
        }
+       backdateTree(root, modTime)
+       fs.root = root
        return fs, nil
 }
 
-type collectionFileSystem struct {
-       fileSystem
+func backdateTree(n inode, modTime time.Time) {
+       switch n := n.(type) {
+       case *filenode:
+               n.fileinfo.modTime = modTime
+       case *dirnode:
+               n.fileinfo.modTime = modTime
+               for _, n := range n.inodes {
+                       backdateTree(n, modTime)
+               }
+       }
 }
 
-func (fs collectionFileSystem) Child(name string, replace func(inode) inode) inode {
-       if name == ".arvados#collection" {
-               return &getternode{Getter: func() ([]byte, error) {
-                       var coll Collection
-                       var err error
-                       coll.ManifestText, err = fs.MarshalManifest(".")
-                       if err != nil {
-                               return nil, err
-                       }
-                       data, err := json.Marshal(&coll)
-                       if err == nil {
-                               data = append(data, 10)
-                       }
-                       return data, err
-               }}
+func (fs *collectionFileSystem) newNode(name string, perm os.FileMode, modTime time.Time) (node inode, err error) {
+       if name == "" || name == "." || name == ".." {
+               return nil, ErrInvalidArgument
        }
-       return fs.fileSystem.Child(name, replace)
+       if perm.IsDir() {
+               return &dirnode{
+                       fs: fs,
+                       treenode: treenode{
+                               fileinfo: fileinfo{
+                                       name:    name,
+                                       mode:    perm | os.ModeDir,
+                                       modTime: modTime,
+                               },
+                               inodes: make(map[string]inode),
+                       },
+               }, nil
+       } else {
+               return &filenode{
+                       fs: fs,
+                       fileinfo: fileinfo{
+                               name:    name,
+                               mode:    perm & ^os.ModeDir,
+                               modTime: modTime,
+                       },
+               }, nil
+       }
+}
+
+func (fs *collectionFileSystem) Sync() error {
+       log.Printf("cfs.Sync()")
+       if fs.uuid == "" {
+               return nil
+       }
+       txt, err := fs.MarshalManifest(".")
+       if err != nil {
+               log.Printf("WARNING: (collectionFileSystem)Sync() failed: %s", err)
+               return err
+       }
+       coll := &Collection{
+               UUID:         fs.uuid,
+               ManifestText: txt,
+       }
+       err = fs.RequestAndDecode(nil, "PUT", "arvados/v1/collections/"+fs.uuid, fs.UpdateBody(coll), map[string]interface{}{"select": []string{"uuid"}})
+       if err != nil {
+               log.Printf("WARNING: (collectionFileSystem)Sync() failed: %s", err)
+       }
+       return err
+}
+
+func (fs *collectionFileSystem) MarshalManifest(prefix string) (string, error) {
+       fs.fileSystem.root.Lock()
+       defer fs.fileSystem.root.Unlock()
+       return fs.fileSystem.root.(*dirnode).marshalManifest(prefix)
 }
 
-func (fs collectionFileSystem) MarshalManifest(prefix string) (string, error) {
-       fs.fileSystem.inode.Lock()
-       defer fs.fileSystem.inode.Unlock()
-       return fs.fileSystem.inode.(*dirnode).marshalManifest(prefix)
+func (fs *collectionFileSystem) Size() int64 {
+       return fs.fileSystem.root.(*dirnode).TreeSize()
 }
 
 // filenodePtr is an offset into a file that is (usually) efficient to
@@ -170,8 +221,9 @@ func (fn *filenode) seek(startPtr filenodePtr) (ptr filenodePtr) {
 
 // filenode implements inode.
 type filenode struct {
+       parent   inode
+       fs       FileSystem
        fileinfo fileinfo
-       parent   *dirnode
        segments []segment
        // number of times `segments` has changed in a
        // way that might invalidate a filenodePtr
@@ -187,12 +239,23 @@ func (fn *filenode) appendSegment(e segment) {
        fn.fileinfo.size += int64(e.Len())
 }
 
+func (fn *filenode) SetParent(p inode, name string) {
+       fn.Lock()
+       defer fn.Unlock()
+       fn.parent = p
+       fn.fileinfo.name = name
+}
+
 func (fn *filenode) Parent() inode {
        fn.RLock()
        defer fn.RUnlock()
        return fn.parent
 }
 
+func (fn *filenode) FS() FileSystem {
+       return fn.fs
+}
+
 // Read reads file data from a single segment, starting at startPtr,
 // into p. startPtr is assumed not to be up-to-date. Caller must have
 // RLock or Lock.
@@ -438,7 +501,7 @@ func (fn *filenode) pruneMemSegments() {
                if !ok || seg.Len() < maxBlockSize {
                        continue
                }
-               locator, _, err := fn.parent.kc.PutB(seg.buf)
+               locator, _, err := fn.FS().PutB(seg.buf)
                if err != nil {
                        // TODO: stall (or return errors from)
                        // subsequent writes until flushing
@@ -447,7 +510,7 @@ func (fn *filenode) pruneMemSegments() {
                }
                fn.memsize -= int64(seg.Len())
                fn.segments[idx] = storedSegment{
-                       kc:      fn.parent.kc,
+                       kc:      fn.FS(),
                        locator: locator,
                        size:    seg.Len(),
                        offset:  0,
@@ -457,14 +520,39 @@ func (fn *filenode) pruneMemSegments() {
 }
 
 type dirnode struct {
+       fs *collectionFileSystem
        treenode
-       client *Client
-       kc     keepClient
 }
 
-// sync flushes in-memory data (for all files in the tree rooted at
-// dn) to persistent storage. Caller must hold dn.Lock().
-func (dn *dirnode) sync() error {
+func (dn *dirnode) FS() FileSystem {
+       return dn.fs
+}
+
+func (dn *dirnode) Child(name string, replace func(inode) (inode, error)) (inode, error) {
+       if dn == dn.fs.rootnode() && name == ".arvados#collection" {
+               gn := &getternode{Getter: func() ([]byte, error) {
+                       var coll Collection
+                       var err error
+                       coll.ManifestText, err = dn.fs.MarshalManifest(".")
+                       if err != nil {
+                               return nil, err
+                       }
+                       data, err := json.Marshal(&coll)
+                       if err == nil {
+                               data = append(data, '\n')
+                       }
+                       return data, err
+               }}
+               gn.SetParent(dn, name)
+               return gn, nil
+       }
+       return dn.treenode.Child(name, replace)
+}
+
+// sync flushes in-memory data (for the children with the given names,
+// which must be children of dn) to persistent storage. Caller must
+// have write lock on dn and the named children.
+func (dn *dirnode) sync(names []string) error {
        type shortBlock struct {
                fn  *filenode
                idx int
@@ -480,7 +568,7 @@ func (dn *dirnode) sync() error {
                for _, sb := range sbs {
                        block = append(block, sb.fn.segments[sb.idx].(*memSegment).buf...)
                }
-               locator, _, err := dn.kc.PutB(block)
+               locator, _, err := dn.fs.PutB(block)
                if err != nil {
                        return err
                }
@@ -488,7 +576,7 @@ func (dn *dirnode) sync() error {
                for _, sb := range sbs {
                        data := sb.fn.segments[sb.idx].(*memSegment).buf
                        sb.fn.segments[sb.idx] = storedSegment{
-                               kc:      dn.kc,
+                               kc:      dn.fs,
                                locator: locator,
                                size:    len(block),
                                offset:  off,
@@ -500,19 +588,11 @@ func (dn *dirnode) sync() error {
                return nil
        }
 
-       names := make([]string, 0, len(dn.inodes))
-       for name := range dn.inodes {
-               names = append(names, name)
-       }
-       sort.Strings(names)
-
        for _, name := range names {
                fn, ok := dn.inodes[name].(*filenode)
                if !ok {
                        continue
                }
-               fn.Lock()
-               defer fn.Unlock()
                for idx, seg := range fn.segments {
                        seg, ok := seg.(*memSegment)
                        if !ok {
@@ -550,18 +630,19 @@ func (dn *dirnode) marshalManifest(prefix string) (string, error) {
        var subdirs string
        var blocks []string
 
-       if err := dn.sync(); err != nil {
-               return "", err
-       }
-
        names := make([]string, 0, len(dn.inodes))
-       for name, node := range dn.inodes {
+       for name := range dn.inodes {
                names = append(names, name)
+       }
+       sort.Strings(names)
+       for _, name := range names {
+               node := dn.inodes[name]
                node.Lock()
                defer node.Unlock()
        }
-       sort.Strings(names)
-
+       if err := dn.sync(names); err != nil {
+               return "", err
+       }
        for _, name := range names {
                switch node := dn.inodes[name].(type) {
                case *dirnode:
@@ -709,7 +790,7 @@ func (dn *dirnode) loadManifest(txt string) error {
                                        blkLen = int(offset + length - pos - int64(blkOff))
                                }
                                fnode.appendSegment(storedSegment{
-                                       kc:      dn.kc,
+                                       kc:      dn.fs,
                                        locator: seg.locator,
                                        size:    seg.size,
                                        offset:  blkOff,
@@ -738,11 +819,11 @@ func (dn *dirnode) loadManifest(txt string) error {
 
 // only safe to call from loadManifest -- no locking
 func (dn *dirnode) createFileAndParents(path string) (fn *filenode, err error) {
-       node := dn
+       var node inode = dn
        names := strings.Split(path, "/")
        basename := names[len(names)-1]
-       if basename == "" || basename == "." || basename == ".." {
-               err = fmt.Errorf("invalid filename")
+       if !permittedName(basename) {
+               err = fmt.Errorf("invalid file part %q in path %q", basename, path)
                return
        }
        for _, name := range names[:len(names)-1] {
@@ -754,108 +835,63 @@ func (dn *dirnode) createFileAndParents(path string) (fn *filenode, err error) {
                                // can't be sure parent will be a *dirnode
                                return nil, ErrInvalidArgument
                        }
-                       node = node.Parent().(*dirnode)
+                       node = node.Parent()
                        continue
                }
-               node.Child(name, func(child inode) inode {
-                       switch child.(type) {
-                       case nil:
-                               node, err = dn.newDirnode(node, name, 0755|os.ModeDir, node.Parent().FileInfo().ModTime())
-                               child = node
-                       case *dirnode:
-                               node = child.(*dirnode)
-                       case *filenode:
-                               err = ErrFileExists
-                       default:
-                               err = ErrInvalidOperation
+               node, err = node.Child(name, func(child inode) (inode, error) {
+                       if child == nil {
+                               child, err := node.FS().newNode(name, 0755|os.ModeDir, node.Parent().FileInfo().ModTime())
+                               if err != nil {
+                                       return nil, err
+                               }
+                               child.SetParent(node, name)
+                               return child, nil
+                       } else if !child.IsDir() {
+                               return child, ErrFileExists
+                       } else {
+                               return child, nil
                        }
-                       return child
                })
                if err != nil {
                        return
                }
        }
-       node.Child(basename, func(child inode) inode {
+       _, err = node.Child(basename, func(child inode) (inode, error) {
                switch child := child.(type) {
                case nil:
-                       fn, err = dn.newFilenode(node, basename, 0755, node.FileInfo().ModTime())
-                       return fn
+                       child, err = node.FS().newNode(basename, 0755, node.FileInfo().ModTime())
+                       if err != nil {
+                               return nil, err
+                       }
+                       child.SetParent(node, basename)
+                       fn = child.(*filenode)
+                       return child, nil
                case *filenode:
                        fn = child
-                       return child
+                       return child, nil
                case *dirnode:
-                       err = ErrIsDirectory
-                       return child
+                       return child, ErrIsDirectory
                default:
-                       err = ErrInvalidOperation
-                       return child
+                       return child, ErrInvalidArgument
                }
        })
        return
 }
 
-// rlookup (recursive lookup) returns the inode for the file/directory
-// with the given name (which may contain "/" separators). If no such
-// file/directory exists, the returned node is nil.
-func rlookup(start inode, path string) (node inode) {
-       node = start
-       for _, name := range strings.Split(path, "/") {
-               if node == nil {
-                       break
-               }
-               if node.IsDir() {
-                       if name == "." || name == "" {
-                               continue
-                       }
-                       if name == ".." {
-                               node = node.Parent()
-                               continue
-                       }
+func (dn *dirnode) TreeSize() (bytes int64) {
+       dn.RLock()
+       defer dn.RUnlock()
+       for _, i := range dn.inodes {
+               switch i := i.(type) {
+               case *filenode:
+                       bytes += i.Size()
+               case *dirnode:
+                       bytes += i.TreeSize()
                }
-               node = func() inode {
-                       node.RLock()
-                       defer node.RUnlock()
-                       return node.Child(name, nil)
-               }()
        }
        return
 }
 
-// Caller must have lock, and must have already ensured
-// Children(name,nil) is nil.
-func (dn *dirnode) newDirnode(parent *dirnode, name string, perm os.FileMode, modTime time.Time) (node *dirnode, err error) {
-       if name == "" || name == "." || name == ".." {
-               return nil, ErrInvalidArgument
-       }
-       return &dirnode{
-               client: dn.client,
-               kc:     dn.kc,
-               treenode: treenode{
-                       parent: parent,
-                       fileinfo: fileinfo{
-                               name:    name,
-                               mode:    perm | os.ModeDir,
-                               modTime: modTime,
-                       },
-                       inodes: make(map[string]inode),
-               },
-       }, nil
-}
-
-func (dn *dirnode) newFilenode(parent *dirnode, name string, perm os.FileMode, modTime time.Time) (node *filenode, err error) {
-       if name == "" || name == "." || name == ".." {
-               return nil, ErrInvalidArgument
-       }
-       return &filenode{
-               parent: parent,
-               fileinfo: fileinfo{
-                       name:    name,
-                       mode:    perm & ^os.ModeDir,
-                       modTime: modTime,
-               },
-       }, nil
-}
-
 type segment interface {
        io.ReaderAt
        Len() int
@@ -920,7 +956,7 @@ func (me *memSegment) ReadAt(p []byte, off int64) (n int, err error) {
 }
 
 type storedSegment struct {
-       kc      keepClient
+       kc      fsBackend
        locator string
        size    int // size of stored block (also encoded in locator)
        offset  int // position of segment within the stored block