--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package arvados
+
+import "io"
+
+// fsBackend is the combined set of Keep (block storage) and API
+// client methods that a FileSystem needs.
+type fsBackend interface {
+ keepClient
+ apiClient
+}
+
+// Ideally *Client would do everything; meanwhile keepBackend
+// implements fsBackend by merging the two kinds of arvados client.
+type keepBackend struct {
+ keepClient
+ apiClient
+}
+
+// keepClient is the subset of Keep client functionality used by the
+// filesystem code.
+type keepClient interface {
+ // ReadAt reads block data identified by locator into p,
+ // starting at offset off within the block (presumably with
+ // io.ReaderAt-like semantics -- confirm against implementations).
+ ReadAt(locator string, p []byte, off int) (int, error)
+ // PutB stores p as a block. The first return value is used by
+ // callers as the stored block's locator.
+ PutB(p []byte) (string, int, error)
+}
+
+// apiClient is the subset of API client functionality used by the
+// filesystem code.
+type apiClient interface {
+ // RequestAndDecode sends an API request (e.g. "PUT" to
+ // "arvados/v1/collections/{uuid}") with the given body and
+ // params, and decodes the response into dst (callers may pass
+ // a nil dst).
+ RequestAndDecode(dst interface{}, method, path string, body io.Reader, params interface{}) error
+ // UpdateBody returns a request body suitable for updating rsc.
+ UpdateBody(rsc resource) io.Reader
+}
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package arvados
+
+import (
+ "errors"
+ "fmt"
+ "io"
+ "log"
+ "net/http"
+ "os"
+ "path"
+ "strings"
+ "sync"
+ "time"
+)
+
+// Errors returned by FileSystem and File operations.
+var (
+	// ErrReadOnlyFile indicates an attempt to write to a file that
+	// was not opened for writing.
+	ErrReadOnlyFile = errors.New("read-only file")
+	// ErrNegativeOffset indicates a seek to a position before the
+	// start of a file.
+	ErrNegativeOffset = errors.New("cannot seek to negative offset")
+	// ErrFileExists is returned by OpenFile with os.O_EXCL when the
+	// named file already exists.
+	ErrFileExists = errors.New("file exists")
+	// ErrInvalidOperation indicates an operation that the target
+	// node or filesystem does not support.
+	ErrInvalidOperation = errors.New("invalid operation")
+	// ErrInvalidArgument indicates an unusable argument, such as a
+	// path whose last element is "", "." or "..".
+	ErrInvalidArgument = errors.New("invalid argument")
+	// ErrDirectoryNotEmpty is returned by a non-recursive remove
+	// of a directory that still has entries.
+	ErrDirectoryNotEmpty = errors.New("directory not empty")
+	// ErrWriteOnlyMode indicates an attempt to read from a file
+	// opened os.O_WRONLY.
+	ErrWriteOnlyMode = errors.New("file is O_WRONLY")
+	// ErrSyncNotSupported is returned by OpenFile when os.O_SYNC
+	// is requested.
+	ErrSyncNotSupported = errors.New("O_SYNC flag is not supported")
+	// ErrIsDirectory is returned by Rename when the destination is
+	// an existing directory.
+	ErrIsDirectory = errors.New("cannot rename file to overwrite existing directory")
+	// ErrPermission is os.ErrPermission, so callers can test for it
+	// with os.IsPermission.
+	ErrPermission = os.ErrPermission
+)
+
+// A File is an *os.File-like interface for reading and writing files
+// in a FileSystem.
+type File interface {
+ io.Reader
+ io.Writer
+ io.Closer
+ io.Seeker
+ // Size returns the current size of the file.
+ Size() int64
+ // Readdir, Stat, Truncate, and Sync are analogous to the
+ // *os.File methods of the same names.
+ Readdir(int) ([]os.FileInfo, error)
+ Stat() (os.FileInfo, error)
+ Truncate(int64) error
+ Sync() error
+}
+
+// A FileSystem is an http.FileSystem plus Stat() and support for
+// opening writable files. All methods are safe to call from multiple
+// goroutines.
+type FileSystem interface {
+ http.FileSystem
+ // Keep and API client methods used by the filesystem
+ // (see fsBackend).
+ fsBackend
+
+ // rootnode returns the root directory inode.
+ rootnode() inode
+
+ // filesystem-wide lock: used by Rename() to prevent deadlock
+ // while locking multiple inodes.
+ locker() sync.Locker
+
+ // create a new node with nil parent. Callers set the parent
+ // with SetParent before adding the node to a directory.
+ newNode(name string, perm os.FileMode, modTime time.Time) (node inode, err error)
+
+ // analogous to os.Stat()
+ Stat(name string) (os.FileInfo, error)
+
+ // analogous to os.Create(): create/truncate a file and open it O_RDWR.
+ Create(name string) (File, error)
+
+ // Like os.OpenFile(): create or open a file or directory.
+ //
+ // If flag&os.O_EXCL==0, it opens an existing file or
+ // directory if one exists. If flag&os.O_CREATE!=0, it creates
+ // a new empty file or directory if one does not already
+ // exist.
+ //
+ // When creating a new item, perm&os.ModeDir determines
+ // whether it is a file or a directory.
+ //
+ // A file can be opened multiple times and used concurrently
+ // from multiple goroutines. However, each File object should
+ // be used by only one goroutine at a time.
+ OpenFile(name string, flag int, perm os.FileMode) (File, error)
+
+ // Mkdir, Remove, RemoveAll, and Rename are analogous to the
+ // os package functions of the same names.
+ Mkdir(name string, perm os.FileMode) error
+ Remove(name string) error
+ RemoveAll(name string) error
+ Rename(oldname, newname string) error
+ // Sync saves pending changes if the implementation supports
+ // it; the base fileSystem returns ErrInvalidOperation.
+ Sync() error
+}
+
+// inode is a node (file or directory) in a FileSystem tree. Node
+// types such as treenode and filenode embed nullnode to supply
+// defaults for operations they don't implement.
+type inode interface {
+ // SetParent records the node's parent and its name within
+ // that parent.
+ SetParent(parent inode, name string)
+ // Parent returns the node's parent. A root node is its own
+ // parent; Rename relies on this to stop walking upward.
+ Parent() inode
+ // FS returns the FileSystem the node belongs to. This can
+ // differ from its parent's FileSystem (e.g. at a mount point).
+ FS() FileSystem
+ // Read and Write transfer data starting at the given position
+ // and return the byte count and the position after the
+ // transfer. Nodes without file data return ErrInvalidOperation.
+ Read([]byte, filenodePtr) (int, filenodePtr, error)
+ Write([]byte, filenodePtr) (int, filenodePtr, error)
+ // Truncate is analogous to (*os.File).Truncate.
+ Truncate(int64) error
+ // IsDir reports whether the node is a directory.
+ IsDir() bool
+ // Readdir returns FileInfo for each of the node's children.
+ Readdir() []os.FileInfo
+ // Size returns the node's size (for a treenode, the number of
+ // children).
+ Size() int64
+ // FileInfo returns the node's current metadata.
+ FileInfo() os.FileInfo
+
+ // Child() performs lookups and updates of named child nodes.
+ //
+ // If replace is non-nil, Child calls replace(x) where x is
+ // the current child inode with the given name. If possible,
+ // the child inode is replaced with the one returned by
+ // replace().
+ //
+ // If replace(x) returns an inode (besides x or nil) that is
+ // subsequently returned by Child(), then Child()'s caller
+ // must ensure the new child's name and parent are set/updated
+ // to Child()'s name argument and its receiver respectively.
+ // This is not necessarily done before replace(x) returns, but
+ // it must be done before Child()'s caller releases the
+ // parent's lock.
+ //
+ // Nil represents "no child". replace(nil) signifies that no
+ // child with this name exists yet. If replace() returns nil,
+ // the existing child should be deleted if possible.
+ //
+ // An implementation of Child() is permitted to ignore
+ // replace() or its return value. For example, a regular file
+ // inode does not have children, so Child() always returns
+ // nil.
+ //
+ // Child() returns the child, if any, with the given name: if
+ // a child was added or changed, the new child is returned.
+ //
+ // Caller must have lock (or rlock if replace is nil).
+ Child(name string, replace func(inode) inode) inode
+
+ // sync.Locker, RLock, and RUnlock expose the node's
+ // read/write lock; see Child for which one callers must hold.
+ sync.Locker
+ RLock()
+ RUnlock()
+}
+
+// fileinfo is a plain-value implementation of os.FileInfo.
+type fileinfo struct {
+	name    string
+	mode    os.FileMode
+	size    int64
+	modTime time.Time
+}
+
+// Name implements os.FileInfo.
+func (fi fileinfo) Name() string { return fi.name }
+
+// ModTime implements os.FileInfo.
+func (fi fileinfo) ModTime() time.Time { return fi.modTime }
+
+// Mode implements os.FileInfo.
+func (fi fileinfo) Mode() os.FileMode { return fi.mode }
+
+// IsDir implements os.FileInfo by testing the os.ModeDir bit.
+func (fi fileinfo) IsDir() bool { return fi.Mode().IsDir() }
+
+// Size implements os.FileInfo.
+func (fi fileinfo) Size() int64 { return fi.size }
+
+// Sys implements os.FileInfo. There is no underlying data source,
+// so it always returns nil.
+func (fi fileinfo) Sys() interface{} { return nil }
+
+// nullnode supplies default inode methods for node types that don't
+// support an operation: data and mkdir operations fail with
+// ErrInvalidOperation, and the node reports no children.
+type nullnode struct{}
+
+// Mkdir always fails with ErrInvalidOperation.
+func (*nullnode) Mkdir(string, os.FileMode) error { return ErrInvalidOperation }
+
+// Read always fails with ErrInvalidOperation.
+func (*nullnode) Read([]byte, filenodePtr) (n int, ptr filenodePtr, err error) {
+	err = ErrInvalidOperation
+	return
+}
+
+// Write always fails with ErrInvalidOperation.
+func (*nullnode) Write([]byte, filenodePtr) (n int, ptr filenodePtr, err error) {
+	err = ErrInvalidOperation
+	return
+}
+
+// Truncate always fails with ErrInvalidOperation.
+func (*nullnode) Truncate(int64) error { return ErrInvalidOperation }
+
+// FileInfo returns a zero-valued fileinfo.
+func (*nullnode) FileInfo() os.FileInfo {
+	var zero fileinfo
+	return zero
+}
+
+// IsDir reports false: a nullnode is not a directory.
+func (*nullnode) IsDir() bool { return false }
+
+// Readdir returns no entries.
+func (*nullnode) Readdir() []os.FileInfo { return nil }
+
+// Child ignores replace and always reports that no child exists.
+func (*nullnode) Child(name string, replace func(inode) inode) inode { return nil }
+
+// treenode is a generic in-memory directory inode: a map of named
+// children. It embeds nullnode for inode methods it does not define
+// itself (e.g. Read and Write).
+type treenode struct {
+ // filesystem this node belongs to (returned by FS)
+ fs FileSystem
+ // set by SetParent; a root node is its own parent
+ parent inode
+ // children, keyed by name
+ inodes map[string]inode
+ // name, mode, and modTime; size is filled in by FileInfo
+ fileinfo fileinfo
+ // protects parent, inodes, and fileinfo
+ sync.RWMutex
+ nullnode
+}
+
+// FS implements inode.
+func (n *treenode) FS() FileSystem { return n.fs }
+
+// SetParent implements inode: under n's write lock, it records p as
+// n's parent and name as n's base name.
+func (n *treenode) SetParent(p inode, name string) {
+	n.Lock()
+	defer n.Unlock()
+	n.parent, n.fileinfo.name = p, name
+}
+
+// Parent implements inode.
+func (n *treenode) Parent() inode {
+	n.RLock()
+	p := n.parent
+	n.RUnlock()
+	return p
+}
+
+// IsDir implements inode: a treenode is always a directory.
+func (n *treenode) IsDir() bool { return true }
+
+// Child implements inode: it looks up, adds, replaces, or deletes the
+// child called name, following the replace contract documented on
+// inode.Child. The directory's modTime is updated whenever its set of
+// children changes, including when a child is deleted.
+//
+// After a deletion, Child returns nil ("no child") rather than the
+// removed node.
+//
+// Caller must hold n's lock (or its read lock if replace is nil).
+func (n *treenode) Child(name string, replace func(inode) inode) (child inode) {
+	// TODO: special treatment for "", ".", ".."
+	child = n.inodes[name]
+	if replace == nil {
+		return child
+	}
+	newchild := replace(child)
+	switch {
+	case newchild == nil:
+		if child != nil {
+			// Removing an existing entry changes the directory.
+			delete(n.inodes, name)
+			n.fileinfo.modTime = time.Now()
+		}
+		child = nil
+	case newchild != child:
+		n.inodes[name] = newchild
+		n.fileinfo.modTime = time.Now()
+		child = newchild
+	}
+	return child
+}
+
+// Size implements inode: for a directory it is the number of entries,
+// as reported by FileInfo.
+func (n *treenode) Size() int64 {
+	return n.FileInfo().Size()
+}
+
+// FileInfo implements inode. The reported size is the current number
+// of children.
+//
+// The size is filled in on a copy, so the shared fileinfo is not
+// modified. A read lock is therefore sufficient, and concurrent
+// readers are not serialized behind an exclusive lock.
+func (n *treenode) FileInfo() os.FileInfo {
+	n.RLock()
+	defer n.RUnlock()
+	fi := n.fileinfo
+	fi.size = int64(len(n.inodes))
+	return fi
+}
+
+// Readdir implements inode: it returns FileInfo for every child, in
+// unspecified (map iteration) order, while holding n's read lock.
+func (n *treenode) Readdir() (fi []os.FileInfo) {
+	n.RLock()
+	defer n.RUnlock()
+	fi = make([]os.FileInfo, len(n.inodes))
+	i := 0
+	for _, child := range n.inodes {
+		fi[i] = child.FileInfo()
+		i++
+	}
+	return fi
+}
+
+// fileSystem is a shared base for FileSystem implementations: a tree
+// of inodes rooted at root, backed by an fsBackend. Wrapping types
+// (e.g. collectionFileSystem) embed it and provide newNode.
+type fileSystem struct {
+	root inode
+	fsBackend
+	// mutex is the filesystem-wide lock returned by locker().
+	mutex sync.Mutex
+}
+
+// rootnode implements FileSystem.
+func (fs *fileSystem) rootnode() inode { return fs.root }
+
+// locker implements FileSystem. It returns the filesystem-wide lock
+// that Rename acquires before locking multiple inodes.
+func (fs *fileSystem) locker() sync.Locker { return &fs.mutex }
+
+// OpenFile is analogous to os.OpenFile().
+func (fs *fileSystem) OpenFile(name string, flag int, perm os.FileMode) (File, error) {
+	f, err := fs.openFile(name, flag, perm)
+	if err != nil {
+		// Return an untyped nil: returning a nil *filehandle
+		// directly would yield a non-nil File interface value.
+		return nil, err
+	}
+	return f, nil
+}
+
+// openFile implements OpenFile, returning the concrete *filehandle so
+// internal callers (e.g. Rename) can reach the underlying inode.
+//
+// Errors:
+//   - ErrSyncNotSupported for os.O_SYNC.
+//   - os.ErrNotExist if the parent directory (or, without
+//     os.O_CREATE, the target) does not exist.
+//   - ErrFileExists for os.O_EXCL on an existing target.
+//   - The error from newNode if a new node cannot be created
+//     (e.g. an invalid name).
+//   - ErrInvalidOperation if the parent rejects the new child.
+func (fs *fileSystem) openFile(name string, flag int, perm os.FileMode) (*filehandle, error) {
+	if flag&os.O_SYNC != 0 {
+		return nil, ErrSyncNotSupported
+	}
+	dirname, name := path.Split(name)
+	parent := rlookup(fs.root, dirname)
+	if parent == nil {
+		return nil, os.ErrNotExist
+	}
+	var readable, writable bool
+	switch flag & (os.O_RDWR | os.O_RDONLY | os.O_WRONLY) {
+	case os.O_RDWR:
+		readable = true
+		writable = true
+	case os.O_RDONLY:
+		readable = true
+	case os.O_WRONLY:
+		writable = true
+	default:
+		return nil, fmt.Errorf("invalid flags 0x%x", flag)
+	}
+	if !writable && parent.IsDir() {
+		// A directory can be opened via "foo/", "foo/.", or
+		// "foo/..".
+		switch name {
+		case ".", "":
+			return &filehandle{inode: parent}, nil
+		case "..":
+			return &filehandle{inode: parent.Parent()}, nil
+		}
+	}
+	createMode := flag&os.O_CREATE != 0
+	if createMode {
+		parent.Lock()
+		defer parent.Unlock()
+	} else {
+		parent.RLock()
+		defer parent.RUnlock()
+	}
+	n := parent.Child(name, nil)
+	if n == nil {
+		if !createMode {
+			return nil, os.ErrNotExist
+		}
+		var err error
+		n = parent.Child(name, func(inode) inode {
+			child, nerr := parent.FS().newNode(name, perm|0755, time.Now())
+			if nerr != nil {
+				// Don't call SetParent on a nil node
+				// (e.g. name "" from "dir/"); ask the
+				// parent not to add anything.
+				err = nerr
+				return nil
+			}
+			child.SetParent(parent, name)
+			return child
+		})
+		if err != nil {
+			return nil, err
+		} else if n == nil {
+			// parent rejected new child
+			return nil, ErrInvalidOperation
+		}
+	} else if flag&os.O_EXCL != 0 {
+		return nil, ErrFileExists
+	} else if flag&os.O_TRUNC != 0 {
+		if !writable {
+			return nil, fmt.Errorf("invalid flag O_TRUNC in read-only mode")
+		} else if n.IsDir() {
+			return nil, fmt.Errorf("invalid flag O_TRUNC when opening directory")
+		} else if err := n.Truncate(0); err != nil {
+			return nil, err
+		}
+	}
+	return &filehandle{
+		inode:    n,
+		append:   flag&os.O_APPEND != 0,
+		readable: readable,
+		writable: writable,
+	}, nil
+}
+
+// Open implements http.FileSystem by opening name read-only.
+func (fs *fileSystem) Open(name string) (http.File, error) {
+	const readOnly = os.O_RDONLY
+	return fs.OpenFile(name, readOnly, 0)
+}
+
+// Create is analogous to os.Create(): it opens name read-write,
+// creating it if needed and truncating it if it exists.
+func (fs *fileSystem) Create(name string) (File, error) {
+	flags := os.O_RDWR | os.O_CREATE | os.O_TRUNC
+	return fs.OpenFile(name, flags, 0)
+}
+
+// Mkdir creates a directory called name, analogous to os.Mkdir().
+//
+// Errors:
+//   - os.ErrNotExist if the parent directory does not exist.
+//   - os.ErrExist if an entry with that name already exists.
+//   - The error from newNode if the node cannot be created (e.g. the
+//     last path element is "" as in "foo/").
+//   - ErrInvalidArgument if the parent does not accept a new child.
+func (fs *fileSystem) Mkdir(name string, perm os.FileMode) (err error) {
+	dirname, name := path.Split(name)
+	n := rlookup(fs.root, dirname)
+	if n == nil {
+		return os.ErrNotExist
+	}
+	n.Lock()
+	defer n.Unlock()
+	if n.Child(name, nil) != nil {
+		return os.ErrExist
+	}
+	child := n.Child(name, func(inode) inode {
+		newchild, nerr := n.FS().newNode(name, perm|os.ModeDir, time.Now())
+		if nerr != nil {
+			// Don't call SetParent on a nil node; ask the
+			// parent not to add anything.
+			err = nerr
+			return nil
+		}
+		newchild.SetParent(n, name)
+		return newchild
+	})
+	switch {
+	case err != nil:
+		return err
+	case child == nil:
+		return ErrInvalidArgument
+	}
+	return nil
+}
+
+// Stat returns FileInfo for the file or directory at name, analogous
+// to os.Stat(). It returns os.ErrNotExist if no such entry exists.
+func (fs *fileSystem) Stat(name string) (os.FileInfo, error) {
+	node := rlookup(fs.root, name)
+	if node == nil {
+		return nil, os.ErrNotExist
+	}
+	return node.FileInfo(), nil
+}
+
+// Rename moves oldname to newname, analogous to os.Rename().
+//
+// If newname ends with "/", the entry is moved into that directory
+// under its current base name. Renaming an entry onto itself is a
+// no-op. An existing non-directory at newname is replaced; an
+// existing directory is not (ErrIsDirectory).
+//
+// Other errors:
+//   - ErrInvalidArgument for "", "." or ".." names, moves between
+//     different filesystems, moving a directory into itself or one of
+//     its descendants, moving a mount point to a different parent, or
+//     when a directory declines the change.
+//   - os.ErrNotExist if oldname does not exist.
+//   - Errors from opening either parent directory.
+func (fs *fileSystem) Rename(oldname, newname string) error {
+	olddir, oldname := path.Split(oldname)
+	if oldname == "" || oldname == "." || oldname == ".." {
+		return ErrInvalidArgument
+	}
+	olddirf, err := fs.openFile(olddir+".", os.O_RDONLY, 0)
+	if err != nil {
+		return fmt.Errorf("%q: %s", olddir, err)
+	}
+	defer olddirf.Close()
+
+	newdir, newname := path.Split(newname)
+	if newname == "." || newname == ".." {
+		return ErrInvalidArgument
+	} else if newname == "" {
+		// Rename("a/b", "c/") means Rename("a/b", "c/b")
+		newname = oldname
+	}
+	newdirf, err := fs.openFile(newdir+".", os.O_RDONLY, 0)
+	if err != nil {
+		return fmt.Errorf("%q: %s", newdir, err)
+	}
+	defer newdirf.Close()
+
+	// TODO: If the nearest common ancestor ("nca") of olddirf and
+	// newdirf is on a different filesystem than fs, we should
+	// call nca.FS().Rename() instead of proceeding. Until then
+	// it's awkward for filesystems to implement their own Rename
+	// methods effectively: the only one that runs is the one on
+	// the root filesystem exposed to the caller (webdav, fuse,
+	// etc).
+
+	// When acquiring locks on multiple inodes, avoid deadlock by
+	// locking the entire containing filesystem first.
+	cfs := olddirf.inode.FS()
+	cfs.locker().Lock()
+	defer cfs.locker().Unlock()
+
+	if cfs != newdirf.inode.FS() {
+		// Moving inodes across filesystems is not (yet)
+		// supported. Locking inodes from different
+		// filesystems could deadlock, so we must error out
+		// now.
+		return ErrInvalidArgument
+	}
+
+	// To ensure we can test reliably whether we're about to move
+	// a directory into itself, lock all potential common
+	// ancestors of olddir and newdir.
+	needLock := []sync.Locker{}
+	for _, node := range []inode{olddirf.inode, newdirf.inode} {
+		needLock = append(needLock, node)
+		for node.Parent() != node && node.Parent().FS() == node.FS() {
+			node = node.Parent()
+			needLock = append(needLock, node)
+		}
+	}
+	locked := map[sync.Locker]bool{}
+	for i := len(needLock) - 1; i >= 0; i-- {
+		if n := needLock[i]; !locked[n] {
+			n.Lock()
+			// Deliberately deferred in the loop: all
+			// locks are held until Rename returns.
+			defer n.Unlock()
+			locked[n] = true
+		}
+	}
+
+	// Return ErrInvalidArgument if olddirf.inode doesn't even
+	// bother calling our "remove oldname entry" replacer func.
+	err = ErrInvalidArgument
+	olddirf.inode.Child(oldname, func(oldinode inode) inode {
+		err = nil
+		if oldinode == nil {
+			err = os.ErrNotExist
+			return nil
+		}
+		if newdirf.inode == olddirf.inode && newname == oldname {
+			// Renaming an entry onto itself is a no-op.
+			// Without this check, the nested Child call
+			// below would re-accept oldinode under the
+			// same name, and returning nil from this
+			// replacer would then delete it.
+			return oldinode
+		}
+		if locked[oldinode] {
+			// oldinode cannot become a descendant of itself.
+			err = ErrInvalidArgument
+			return oldinode
+		}
+		if oldinode.FS() != cfs && newdirf.inode != olddirf.inode {
+			// moving a mount point to a different parent
+			// is not (yet) supported.
+			err = ErrInvalidArgument
+			return oldinode
+		}
+		accepted := newdirf.inode.Child(newname, func(existing inode) inode {
+			if existing != nil && existing.IsDir() {
+				err = ErrIsDirectory
+				return existing
+			}
+			return oldinode
+		})
+		if accepted != oldinode {
+			if err == nil {
+				// newdirf didn't accept oldinode.
+				err = ErrInvalidArgument
+			}
+			// Leave oldinode in olddir.
+			return oldinode
+		}
+		accepted.SetParent(newdirf.inode, newname)
+		return nil
+	})
+	return err
+}
+
+// Remove deletes the file or empty directory at name, analogous to
+// os.Remove(). Trailing slashes on name are ignored.
+func (fs *fileSystem) Remove(name string) error {
+	trimmed := strings.TrimRight(name, "/")
+	return fs.remove(trimmed, false)
+}
+
+// RemoveAll deletes name and, if it is a directory, everything in it,
+// analogous to os.RemoveAll(). A nonexistent path is not an error.
+func (fs *fileSystem) RemoveAll(name string) error {
+	// "If the path does not exist, RemoveAll returns nil." (see
+	// "os" pkg)
+	if err := fs.remove(strings.TrimRight(name, "/"), true); !os.IsNotExist(err) {
+		return err
+	}
+	return nil
+}
+
+// remove deletes the entry at name from its parent directory.
+// The last path element must not be "", "." or ".."
+// (ErrInvalidArgument). It returns os.ErrNotExist if the parent or
+// the entry is missing. Unless recursive is true, a non-empty
+// directory is left in place and ErrDirectoryNotEmpty is returned.
+func (fs *fileSystem) remove(name string, recursive bool) (err error) {
+	dirname, base := path.Split(name)
+	switch base {
+	case "", ".", "..":
+		return ErrInvalidArgument
+	}
+	dir := rlookup(fs.root, dirname)
+	if dir == nil {
+		return os.ErrNotExist
+	}
+	dir.Lock()
+	defer dir.Unlock()
+	dir.Child(base, func(node inode) inode {
+		switch {
+		case node == nil:
+			err = os.ErrNotExist
+			return nil
+		case !recursive && node.IsDir() && node.Size() > 0:
+			err = ErrDirectoryNotEmpty
+			return node
+		default:
+			return nil
+		}
+	})
+	return err
+}
+
+// Sync is not implemented for the base fileSystem. It logs a
+// reminder and always returns ErrInvalidOperation.
+func (fs *fileSystem) Sync() error {
+	const msg = "TODO: sync fileSystem"
+	log.Printf(msg)
+	return ErrInvalidOperation
+}
+
+// rlookup (recursive lookup) resolves a "/"-separated path relative
+// to start and returns the inode it names, or nil if any component is
+// missing.
+//
+// While the current node is a directory, "" and "." components are
+// skipped and ".." moves to its parent. Each child lookup holds the
+// current node's read lock only for the duration of that lookup.
+func rlookup(start inode, path string) (node inode) {
+	node = start
+	for _, name := range strings.Split(path, "/") {
+		if node == nil {
+			return nil
+		}
+		if node.IsDir() {
+			switch name {
+			case "", ".":
+				continue
+			case "..":
+				node = node.Parent()
+				continue
+			}
+		}
+		node = func(dir inode) inode {
+			dir.RLock()
+			defer dir.RUnlock()
+			return dir.Child(name, nil)
+		}(node)
+	}
+	return node
+}
package arvados
import (
- "errors"
+ "encoding/json"
"fmt"
"io"
- "net/http"
+ "log"
"os"
"path"
"regexp"
"time"
)
-var (
- ErrReadOnlyFile = errors.New("read-only file")
- ErrNegativeOffset = errors.New("cannot seek to negative offset")
- ErrFileExists = errors.New("file exists")
- ErrInvalidOperation = errors.New("invalid operation")
- ErrInvalidArgument = errors.New("invalid argument")
- ErrDirectoryNotEmpty = errors.New("directory not empty")
- ErrWriteOnlyMode = errors.New("file is O_WRONLY")
- ErrSyncNotSupported = errors.New("O_SYNC flag is not supported")
- ErrIsDirectory = errors.New("cannot rename file to overwrite existing directory")
- ErrPermission = os.ErrPermission
+var maxBlockSize = 1 << 26
- maxBlockSize = 1 << 26
-)
-
-// A File is an *os.File-like interface for reading and writing files
-// in a CollectionFileSystem.
-type File interface {
- io.Reader
- io.Writer
- io.Closer
- io.Seeker
- Size() int64
- Readdir(int) ([]os.FileInfo, error)
- Stat() (os.FileInfo, error)
- Truncate(int64) error
-}
-
-type keepClient interface {
- ReadAt(locator string, p []byte, off int) (int, error)
- PutB(p []byte) (string, int, error)
-}
-
-type fileinfo struct {
- name string
- mode os.FileMode
- size int64
- modTime time.Time
-}
-
-// Name implements os.FileInfo.
-func (fi fileinfo) Name() string {
- return fi.name
-}
-
-// ModTime implements os.FileInfo.
-func (fi fileinfo) ModTime() time.Time {
- return fi.modTime
-}
-
-// Mode implements os.FileInfo.
-func (fi fileinfo) Mode() os.FileMode {
- return fi.mode
-}
-
-// IsDir implements os.FileInfo.
-func (fi fileinfo) IsDir() bool {
- return fi.mode&os.ModeDir != 0
-}
-
-// Size implements os.FileInfo.
-func (fi fileinfo) Size() int64 {
- return fi.size
-}
-
-// Sys implements os.FileInfo.
-func (fi fileinfo) Sys() interface{} {
- return nil
-}
-
-// A CollectionFileSystem is an http.Filesystem plus Stat() and
-// support for opening writable files. All methods are safe to call
-// from multiple goroutines.
+// A CollectionFileSystem is a FileSystem that can be serialized as a
+// manifest and stored as a collection.
type CollectionFileSystem interface {
- http.FileSystem
-
- // analogous to os.Stat()
- Stat(name string) (os.FileInfo, error)
-
- // analogous to os.Create(): create/truncate a file and open it O_RDWR.
- Create(name string) (File, error)
-
- // Like os.OpenFile(): create or open a file or directory.
- //
- // If flag&os.O_EXCL==0, it opens an existing file or
- // directory if one exists. If flag&os.O_CREATE!=0, it creates
- // a new empty file or directory if one does not already
- // exist.
- //
- // When creating a new item, perm&os.ModeDir determines
- // whether it is a file or a directory.
- //
- // A file can be opened multiple times and used concurrently
- // from multiple goroutines. However, each File object should
- // be used by only one goroutine at a time.
- OpenFile(name string, flag int, perm os.FileMode) (File, error)
-
- Mkdir(name string, perm os.FileMode) error
- Remove(name string) error
- RemoveAll(name string) error
- Rename(oldname, newname string) error
+ FileSystem
// Flush all file data to Keep and return a snapshot of the
// filesystem suitable for saving as (Collection)ManifestText.
MarshalManifest(prefix string) (string, error)
}
-type fileSystem struct {
- dirnode
-}
-
-func (fs *fileSystem) OpenFile(name string, flag int, perm os.FileMode) (File, error) {
- return fs.dirnode.OpenFile(name, flag, perm)
+type collectionFileSystem struct {
+ fileSystem
+ uuid string
}
-func (fs *fileSystem) Open(name string) (http.File, error) {
- return fs.dirnode.OpenFile(name, os.O_RDONLY, 0)
+// FileSystem returns a CollectionFileSystem for the collection.
+func (c *Collection) FileSystem(client apiClient, kc keepClient) (CollectionFileSystem, error) {
+ var modTime time.Time
+ if c.ModifiedAt == nil {
+ modTime = time.Now()
+ } else {
+ modTime = *c.ModifiedAt
+ }
+ fs := &collectionFileSystem{
+ uuid: c.UUID,
+ fileSystem: fileSystem{
+ fsBackend: keepBackend{apiClient: client, keepClient: kc},
+ },
+ }
+ root := &dirnode{
+ fs: fs,
+ treenode: treenode{
+ fileinfo: fileinfo{
+ name: ".",
+ mode: os.ModeDir | 0755,
+ modTime: modTime,
+ },
+ inodes: make(map[string]inode),
+ },
+ }
+ root.SetParent(root, ".")
+ if err := root.loadManifest(c.ManifestText); err != nil {
+ return nil, err
+ }
+ backdateTree(root, modTime)
+ fs.root = root
+ return fs, nil
}
-func (fs *fileSystem) Create(name string) (File, error) {
- return fs.dirnode.OpenFile(name, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0)
+func backdateTree(n inode, modTime time.Time) {
+ switch n := n.(type) {
+ case *filenode:
+ n.fileinfo.modTime = modTime
+ case *dirnode:
+ n.fileinfo.modTime = modTime
+ for _, n := range n.inodes {
+ backdateTree(n, modTime)
+ }
+ }
}
-func (fs *fileSystem) Stat(name string) (fi os.FileInfo, err error) {
- node := fs.dirnode.lookupPath(name)
- if node == nil {
- err = os.ErrNotExist
+func (fs *collectionFileSystem) newNode(name string, perm os.FileMode, modTime time.Time) (node inode, err error) {
+ if name == "" || name == "." || name == ".." {
+ return nil, ErrInvalidArgument
+ }
+ if perm.IsDir() {
+ return &dirnode{
+ fs: fs,
+ treenode: treenode{
+ fileinfo: fileinfo{
+ name: name,
+ mode: perm | os.ModeDir,
+ modTime: modTime,
+ },
+ inodes: make(map[string]inode),
+ },
+ }, nil
} else {
- fi = node.Stat()
+ return &filenode{
+ fs: fs,
+ fileinfo: fileinfo{
+ name: name,
+ mode: perm & ^os.ModeDir,
+ modTime: modTime,
+ },
+ }, nil
}
- return
}
-type inode interface {
- Parent() inode
- Read([]byte, filenodePtr) (int, filenodePtr, error)
- Write([]byte, filenodePtr) (int, filenodePtr, error)
- Truncate(int64) error
- Readdir() []os.FileInfo
- Size() int64
- Stat() os.FileInfo
- sync.Locker
- RLock()
- RUnlock()
+func (fs *collectionFileSystem) Sync() error {
+ log.Printf("cfs.Sync()")
+ if fs.uuid == "" {
+ return nil
+ }
+ txt, err := fs.MarshalManifest(".")
+ if err != nil {
+ log.Printf("WARNING: (collectionFileSystem)Sync() failed: %s", err)
+ return err
+ }
+ coll := &Collection{
+ UUID: fs.uuid,
+ ManifestText: txt,
+ }
+ err = fs.RequestAndDecode(nil, "PUT", "arvados/v1/collections/"+fs.uuid, fs.UpdateBody(coll), map[string]interface{}{"select": []string{"uuid"}})
+ if err != nil {
+ log.Printf("WARNING: (collectionFileSystem)Sync() failed: %s", err)
+ }
+ return err
}
-// filenode implements inode.
-type filenode struct {
- fileinfo fileinfo
- parent *dirnode
- segments []segment
- // number of times `segments` has changed in a
- // way that might invalidate a filenodePtr
- repacked int64
- memsize int64 // bytes in memSegments
- sync.RWMutex
+func (fs *collectionFileSystem) MarshalManifest(prefix string) (string, error) {
+ fs.fileSystem.root.Lock()
+ defer fs.fileSystem.root.Unlock()
+ return fs.fileSystem.root.(*dirnode).marshalManifest(prefix)
}
// filenodePtr is an offset into a file that is (usually) efficient to
return
}
+// filenode implements inode.
+//
+// It holds a regular file's data as a list of segments and embeds
+// nullnode for inode methods it doesn't define (e.g. Child, which
+// reports no children).
+type filenode struct {
+ // set by SetParent
+ parent inode
+ // filesystem this node belongs to (returned by FS); also used
+ // to store flushed data via PutB
+ fs FileSystem
+ // name, mode, modTime, and size (size grows in appendSegment)
+ fileinfo fileinfo
+ // file content, in order
+ segments []segment
+ // number of times `segments` has changed in a
+ // way that might invalidate a filenodePtr
+ repacked int64
+ memsize int64 // bytes in memSegments
+ // protects the fields above
+ sync.RWMutex
+ nullnode
+}
+
// caller must have lock
func (fn *filenode) appendSegment(e segment) {
fn.segments = append(fn.segments, e)
fn.fileinfo.size += int64(e.Len())
}
+// SetParent implements inode: under fn's write lock, it records p as
+// fn's parent and name as fn's base name.
+func (fn *filenode) SetParent(p inode, name string) {
+	fn.Lock()
+	defer fn.Unlock()
+	fn.parent, fn.fileinfo.name = p, name
+}
+
func (fn *filenode) Parent() inode {
fn.RLock()
defer fn.RUnlock()
return fn.parent
}
-func (fn *filenode) Readdir() []os.FileInfo {
- return nil
+func (fn *filenode) FS() FileSystem {
+ return fn.fs
}
// Read reads file data from a single segment, starting at startPtr,
return fn.fileinfo.Size()
}
-func (fn *filenode) Stat() os.FileInfo {
+func (fn *filenode) FileInfo() os.FileInfo {
fn.RLock()
defer fn.RUnlock()
return fn.fileinfo
if !ok || seg.Len() < maxBlockSize {
continue
}
- locator, _, err := fn.parent.kc.PutB(seg.buf)
+ locator, _, err := fn.FS().PutB(seg.buf)
if err != nil {
// TODO: stall (or return errors from)
// subsequent writes until flushing
}
fn.memsize -= int64(seg.Len())
fn.segments[idx] = storedSegment{
- kc: fn.parent.kc,
+ kc: fn.FS(),
locator: locator,
size: seg.Len(),
offset: 0,
}
}
-// FileSystem returns a CollectionFileSystem for the collection.
-func (c *Collection) FileSystem(client *Client, kc keepClient) (CollectionFileSystem, error) {
- var modTime time.Time
- if c.ModifiedAt == nil {
- modTime = time.Now()
- } else {
- modTime = *c.ModifiedAt
- }
- fs := &fileSystem{dirnode: dirnode{
- client: client,
- kc: kc,
- fileinfo: fileinfo{
- name: ".",
- mode: os.ModeDir | 0755,
- modTime: modTime,
- },
- parent: nil,
- inodes: make(map[string]inode),
- }}
- fs.dirnode.parent = &fs.dirnode
- if err := fs.dirnode.loadManifest(c.ManifestText); err != nil {
- return nil, err
- }
- return fs, nil
-}
-
-type filehandle struct {
- inode
- ptr filenodePtr
- append bool
- readable bool
- writable bool
- unreaddirs []os.FileInfo
-}
-
-func (f *filehandle) Read(p []byte) (n int, err error) {
- if !f.readable {
- return 0, ErrWriteOnlyMode
- }
- f.inode.RLock()
- defer f.inode.RUnlock()
- n, f.ptr, err = f.inode.Read(p, f.ptr)
- return
-}
-
-func (f *filehandle) Seek(off int64, whence int) (pos int64, err error) {
- size := f.inode.Size()
- ptr := f.ptr
- switch whence {
- case io.SeekStart:
- ptr.off = off
- case io.SeekCurrent:
- ptr.off += off
- case io.SeekEnd:
- ptr.off = size + off
- }
- if ptr.off < 0 {
- return f.ptr.off, ErrNegativeOffset
- }
- if ptr.off != f.ptr.off {
- f.ptr = ptr
- // force filenode to recompute f.ptr fields on next
- // use
- f.ptr.repacked = -1
- }
- return f.ptr.off, nil
-}
-
-func (f *filehandle) Truncate(size int64) error {
- return f.inode.Truncate(size)
+type dirnode struct {
+ fs *collectionFileSystem
+ treenode
}
-func (f *filehandle) Write(p []byte) (n int, err error) {
- if !f.writable {
- return 0, ErrReadOnlyFile
- }
- f.inode.Lock()
- defer f.inode.Unlock()
- if fn, ok := f.inode.(*filenode); ok && f.append {
- f.ptr = filenodePtr{
- off: fn.fileinfo.size,
- segmentIdx: len(fn.segments),
- segmentOff: 0,
- repacked: fn.repacked,
- }
- }
- n, f.ptr, err = f.inode.Write(p, f.ptr)
- return
+func (dn *dirnode) FS() FileSystem {
+ return dn.fs
}
-func (f *filehandle) Readdir(count int) ([]os.FileInfo, error) {
- if !f.inode.Stat().IsDir() {
- return nil, ErrInvalidOperation
- }
- if count <= 0 {
- return f.inode.Readdir(), nil
- }
- if f.unreaddirs == nil {
- f.unreaddirs = f.inode.Readdir()
- }
- if len(f.unreaddirs) == 0 {
- return nil, io.EOF
- }
- if count > len(f.unreaddirs) {
- count = len(f.unreaddirs)
+func (dn *dirnode) Child(name string, replace func(inode) inode) inode {
+ if dn == dn.fs.rootnode() && name == ".arvados#collection" {
+ gn := &getternode{Getter: func() ([]byte, error) {
+ var coll Collection
+ var err error
+ coll.ManifestText, err = dn.fs.MarshalManifest(".")
+ if err != nil {
+ return nil, err
+ }
+ data, err := json.Marshal(&coll)
+ if err == nil {
+ data = append(data, '\n')
+ }
+ return data, err
+ }}
+ gn.SetParent(dn, name)
+ return gn
}
- ret := f.unreaddirs[:count]
- f.unreaddirs = f.unreaddirs[count:]
- return ret, nil
-}
-
-func (f *filehandle) Stat() (os.FileInfo, error) {
- return f.inode.Stat(), nil
-}
-
-func (f *filehandle) Close() error {
- return nil
-}
-
-type dirnode struct {
- fileinfo fileinfo
- parent *dirnode
- client *Client
- kc keepClient
- inodes map[string]inode
- sync.RWMutex
+ return dn.treenode.Child(name, replace)
}
// sync flushes in-memory data (for all files in the tree rooted at
for _, sb := range sbs {
block = append(block, sb.fn.segments[sb.idx].(*memSegment).buf...)
}
- locator, _, err := dn.kc.PutB(block)
+ locator, _, err := dn.fs.PutB(block)
if err != nil {
return err
}
for _, sb := range sbs {
data := sb.fn.segments[sb.idx].(*memSegment).buf
sb.fn.segments[sb.idx] = storedSegment{
- kc: dn.kc,
+ kc: dn.fs,
locator: locator,
size: len(block),
offset: off,
return flush(pending)
}
-func (dn *dirnode) MarshalManifest(prefix string) (string, error) {
- dn.Lock()
- defer dn.Unlock()
- return dn.marshalManifest(prefix)
-}
-
// caller must have read lock.
func (dn *dirnode) marshalManifest(prefix string) (string, error) {
var streamLen int64
blkLen = int(offset + length - pos - int64(blkOff))
}
fnode.appendSegment(storedSegment{
- kc: dn.kc,
+ kc: dn.fs,
locator: seg.locator,
size: seg.size,
offset: blkOff,
// only safe to call from loadManifest -- no locking
func (dn *dirnode) createFileAndParents(path string) (fn *filenode, err error) {
+ var node inode = dn
names := strings.Split(path, "/")
basename := names[len(names)-1]
if basename == "" || basename == "." || basename == ".." {
for _, name := range names[:len(names)-1] {
switch name {
case "", ".":
+ continue
case "..":
- dn = dn.parent
- default:
- switch node := dn.inodes[name].(type) {
- case nil:
- dn = dn.newDirnode(name, 0755, dn.fileinfo.modTime)
- case *dirnode:
- dn = node
- case *filenode:
- err = ErrFileExists
- return
+ if node == dn {
+ // can't be sure parent will be a *dirnode
+ return nil, ErrInvalidArgument
}
- }
- }
- switch node := dn.inodes[basename].(type) {
- case nil:
- fn = dn.newFilenode(basename, 0755, dn.fileinfo.modTime)
- case *filenode:
- fn = node
- case *dirnode:
- err = ErrIsDirectory
- }
- return
-}
-
-func (dn *dirnode) mkdir(name string) (*filehandle, error) {
- return dn.OpenFile(name, os.O_CREATE|os.O_EXCL, os.ModeDir|0755)
-}
-
-func (dn *dirnode) Mkdir(name string, perm os.FileMode) error {
- f, err := dn.mkdir(name)
- if err == nil {
- err = f.Close()
- }
- return err
-}
-
-func (dn *dirnode) Remove(name string) error {
- return dn.remove(strings.TrimRight(name, "/"), false)
-}
-
-func (dn *dirnode) RemoveAll(name string) error {
- err := dn.remove(strings.TrimRight(name, "/"), true)
- if os.IsNotExist(err) {
- // "If the path does not exist, RemoveAll returns
- // nil." (see "os" pkg)
- err = nil
- }
- return err
-}
-
-func (dn *dirnode) remove(name string, recursive bool) error {
- dirname, name := path.Split(name)
- if name == "" || name == "." || name == ".." {
- return ErrInvalidArgument
- }
- dn, ok := dn.lookupPath(dirname).(*dirnode)
- if !ok {
- return os.ErrNotExist
- }
- dn.Lock()
- defer dn.Unlock()
- switch node := dn.inodes[name].(type) {
- case nil:
- return os.ErrNotExist
- case *dirnode:
- node.RLock()
- defer node.RUnlock()
- if !recursive && len(node.inodes) > 0 {
- return ErrDirectoryNotEmpty
- }
- }
- delete(dn.inodes, name)
- return nil
-}
-
-func (dn *dirnode) Rename(oldname, newname string) error {
- olddir, oldname := path.Split(oldname)
- if oldname == "" || oldname == "." || oldname == ".." {
- return ErrInvalidArgument
- }
- olddirf, err := dn.OpenFile(olddir+".", os.O_RDONLY, 0)
- if err != nil {
- return fmt.Errorf("%q: %s", olddir, err)
- }
- defer olddirf.Close()
- newdir, newname := path.Split(newname)
- if newname == "." || newname == ".." {
- return ErrInvalidArgument
- } else if newname == "" {
- // Rename("a/b", "c/") means Rename("a/b", "c/b")
- newname = oldname
- }
- newdirf, err := dn.OpenFile(newdir+".", os.O_RDONLY, 0)
- if err != nil {
- return fmt.Errorf("%q: %s", newdir, err)
- }
- defer newdirf.Close()
-
- // When acquiring locks on multiple nodes, all common
- // ancestors must be locked first in order to avoid
- // deadlock. This is assured by locking the path from root to
- // newdir, then locking the path from root to olddir, skipping
- // any already-locked nodes.
- needLock := []sync.Locker{}
- for _, f := range []*filehandle{olddirf, newdirf} {
- node := f.inode
- needLock = append(needLock, node)
- for node.Parent() != node {
node = node.Parent()
- needLock = append(needLock, node)
- }
- }
- locked := map[sync.Locker]bool{}
- for i := len(needLock) - 1; i >= 0; i-- {
- if n := needLock[i]; !locked[n] {
- n.Lock()
- defer n.Unlock()
- locked[n] = true
+ continue
}
- }
-
- olddn := olddirf.inode.(*dirnode)
- newdn := newdirf.inode.(*dirnode)
- oldinode, ok := olddn.inodes[oldname]
- if !ok {
- return os.ErrNotExist
- }
- if locked[oldinode] {
- // oldinode cannot become a descendant of itself.
- return ErrInvalidArgument
- }
- if existing, ok := newdn.inodes[newname]; ok {
- // overwriting an existing file or dir
- if dn, ok := existing.(*dirnode); ok {
- if !oldinode.Stat().IsDir() {
- return ErrIsDirectory
- }
- dn.RLock()
- defer dn.RUnlock()
- if len(dn.inodes) > 0 {
- return ErrDirectoryNotEmpty
+ node.Child(name, func(child inode) inode {
+ if child == nil {
+ child, err = node.FS().newNode(name, 0755|os.ModeDir, node.Parent().FileInfo().ModTime())
+ child.SetParent(node, name)
+ node = child
+ } else if !child.IsDir() {
+ err = ErrFileExists
+ } else {
+ node = child
}
+ return child
+ })
+ if err != nil {
+ return
}
- } else {
- if newdn.inodes == nil {
- newdn.inodes = make(map[string]inode)
- }
- newdn.fileinfo.size++
}
- newdn.inodes[newname] = oldinode
- switch n := oldinode.(type) {
- case *dirnode:
- n.parent = newdn
- case *filenode:
- n.parent = newdn
- default:
- panic(fmt.Sprintf("bad inode type %T", n))
- }
- delete(olddn.inodes, oldname)
- olddn.fileinfo.size--
- return nil
-}
-
-func (dn *dirnode) Parent() inode {
- dn.RLock()
- defer dn.RUnlock()
- return dn.parent
-}
-
-func (dn *dirnode) Readdir() (fi []os.FileInfo) {
- dn.RLock()
- defer dn.RUnlock()
- fi = make([]os.FileInfo, 0, len(dn.inodes))
- for _, inode := range dn.inodes {
- fi = append(fi, inode.Stat())
- }
- return
-}
-
-func (dn *dirnode) Read(p []byte, ptr filenodePtr) (int, filenodePtr, error) {
- return 0, ptr, ErrInvalidOperation
-}
-
-func (dn *dirnode) Write(p []byte, ptr filenodePtr) (int, filenodePtr, error) {
- return 0, ptr, ErrInvalidOperation
-}
-
-func (dn *dirnode) Size() int64 {
- dn.RLock()
- defer dn.RUnlock()
- return dn.fileinfo.Size()
-}
-
-func (dn *dirnode) Stat() os.FileInfo {
- dn.RLock()
- defer dn.RUnlock()
- return dn.fileinfo
-}
-
-func (dn *dirnode) Truncate(int64) error {
- return ErrInvalidOperation
-}
-
-// lookupPath returns the inode for the file/directory with the given
-// name (which may contain "/" separators), along with its parent
-// node. If no such file/directory exists, the returned node is nil.
-func (dn *dirnode) lookupPath(path string) (node inode) {
- node = dn
- for _, name := range strings.Split(path, "/") {
- dn, ok := node.(*dirnode)
- if !ok {
- return nil
- }
- if name == "." || name == "" {
- continue
- }
- if name == ".." {
- node = node.Parent()
- continue
+ node.Child(basename, func(child inode) inode {
+ switch child := child.(type) {
+ case nil:
+ child, err = node.FS().newNode(basename, 0755, node.FileInfo().ModTime())
+ child.SetParent(node, basename)
+ fn = child.(*filenode)
+ return child
+ case *filenode:
+ fn = child
+ return child
+ case *dirnode:
+ err = ErrIsDirectory
+ return child
+ default:
+ err = ErrInvalidArgument
+ return child
}
- dn.RLock()
- node = dn.inodes[name]
- dn.RUnlock()
- }
+ })
return
}
-func (dn *dirnode) newDirnode(name string, perm os.FileMode, modTime time.Time) *dirnode {
- child := &dirnode{
- parent: dn,
- client: dn.client,
- kc: dn.kc,
- fileinfo: fileinfo{
- name: name,
- mode: os.ModeDir | perm,
- modTime: modTime,
- },
- }
- if dn.inodes == nil {
- dn.inodes = make(map[string]inode)
- }
- dn.inodes[name] = child
- dn.fileinfo.size++
- return child
-}
-
-func (dn *dirnode) newFilenode(name string, perm os.FileMode, modTime time.Time) *filenode {
- child := &filenode{
- parent: dn,
- fileinfo: fileinfo{
- name: name,
- mode: perm,
- modTime: modTime,
- },
- }
- if dn.inodes == nil {
- dn.inodes = make(map[string]inode)
- }
- dn.inodes[name] = child
- dn.fileinfo.size++
- return child
-}
-
-// OpenFile is analogous to os.OpenFile().
-func (dn *dirnode) OpenFile(name string, flag int, perm os.FileMode) (*filehandle, error) {
- if flag&os.O_SYNC != 0 {
- return nil, ErrSyncNotSupported
- }
- dirname, name := path.Split(name)
- dn, ok := dn.lookupPath(dirname).(*dirnode)
- if !ok {
- return nil, os.ErrNotExist
- }
- var readable, writable bool
- switch flag & (os.O_RDWR | os.O_RDONLY | os.O_WRONLY) {
- case os.O_RDWR:
- readable = true
- writable = true
- case os.O_RDONLY:
- readable = true
- case os.O_WRONLY:
- writable = true
- default:
- return nil, fmt.Errorf("invalid flags 0x%x", flag)
- }
- if !writable {
- // A directory can be opened via "foo/", "foo/.", or
- // "foo/..".
- switch name {
- case ".", "":
- return &filehandle{inode: dn}, nil
- case "..":
- return &filehandle{inode: dn.Parent()}, nil
- }
- }
- createMode := flag&os.O_CREATE != 0
- if createMode {
- dn.Lock()
- defer dn.Unlock()
- } else {
- dn.RLock()
- defer dn.RUnlock()
- }
- n, ok := dn.inodes[name]
- if !ok {
- if !createMode {
- return nil, os.ErrNotExist
- }
- if perm.IsDir() {
- n = dn.newDirnode(name, 0755, time.Now())
- } else {
- n = dn.newFilenode(name, 0755, time.Now())
- }
- } else if flag&os.O_EXCL != 0 {
- return nil, ErrFileExists
- } else if flag&os.O_TRUNC != 0 {
- if !writable {
- return nil, fmt.Errorf("invalid flag O_TRUNC in read-only mode")
- } else if fn, ok := n.(*filenode); !ok {
- return nil, fmt.Errorf("invalid flag O_TRUNC when opening directory")
- } else {
- fn.Truncate(0)
- }
- }
- return &filehandle{
- inode: n,
- append: flag&os.O_APPEND != 0,
- readable: readable,
- writable: writable,
- }, nil
-}
-
type segment interface {
io.ReaderAt
Len() int
}
type storedSegment struct {
- kc keepClient
+ kc fsBackend
locator string
size int // size of stored block (also encoded in locator)
offset int // position of segment within the stored block
}
func (s *CollectionFSSuite) TestConcurrentWriters(c *check.C) {
+ if testing.Short() {
+ c.Skip("slow")
+ }
+
maxBlockSize = 8
defer func() { maxBlockSize = 2 << 26 }()
// expect ~2 seconds to load a manifest with 256K files
func (s *CollectionFSUnitSuite) TestLargeManifest(c *check.C) {
+ if testing.Short() {
+ c.Skip("slow")
+ }
+
const (
dirCount = 512
fileCount = 512
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package arvados
+
+import (
+ "log"
+ "os"
+ "sync"
+ "time"
+)
+
+func deferredCollectionFS(fs FileSystem, parent inode, coll Collection) inode {
+ var modTime time.Time
+ if coll.ModifiedAt != nil {
+ modTime = *coll.ModifiedAt
+ } else {
+ modTime = time.Now()
+ }
+ placeholder := &treenode{
+ fs: fs,
+ parent: parent,
+ inodes: nil,
+ fileinfo: fileinfo{
+ name: coll.Name,
+ modTime: modTime,
+ mode: 0755 | os.ModeDir,
+ },
+ }
+ return &deferrednode{wrapped: placeholder, create: func() inode {
+ err := fs.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+coll.UUID, nil, nil)
+ if err != nil {
+ log.Printf("BUG: unhandled error: %s", err)
+ return placeholder
+ }
+ cfs, err := coll.FileSystem(fs, fs)
+ if err != nil {
+ log.Printf("BUG: unhandled error: %s", err)
+ return placeholder
+ }
+ root := cfs.rootnode()
+ root.SetParent(parent, coll.Name)
+ return root
+ }}
+}
+
+// deferrednode is an inode that is built lazily.
+//
+// Until something needs the real node, metadata queries (IsDir, Size,
+// FileInfo, FS, Parent) are answered by a cheap placeholder. The first
+// call that needs the real node (Read, Write, Truncate, Child,
+// Readdir, SetParent, or any of the lock methods) invokes create()
+// exactly once. From then on every method proxies to whatever create()
+// returned.
+//
+// This lets a parent directory be listed using only placeholders,
+// without waiting for each child's create() to finish.
+type deferrednode struct {
+	// wrapped is the placeholder before create() runs, and create()'s
+	// result afterwards.
+	wrapped inode
+	// create builds the real inode; it is called at most once.
+	create func() inode
+	// mtx guards wrapped and created.
+	mtx sync.Mutex
+	// created records whether create() has already been called.
+	created bool
+}
+
+// realinode returns the real inode, calling create() first if it has
+// not been called yet. The mutex is held during create(), so concurrent
+// callers wait for that single call to finish.
+func (dn *deferrednode) realinode() inode {
+	dn.mtx.Lock()
+	defer dn.mtx.Unlock()
+	if dn.created {
+		return dn.wrapped
+	}
+	dn.wrapped = dn.create()
+	dn.created = true
+	return dn.wrapped
+}
+
+// currentinode returns whatever inode is wrapped right now (the
+// placeholder if create() has not been called) without triggering
+// create(). It does block while a create() call is in progress.
+func (dn *deferrednode) currentinode() inode {
+	dn.mtx.Lock()
+	cur := dn.wrapped
+	dn.mtx.Unlock()
+	return cur
+}
+
+// Operations that need the real inode: each of these triggers create()
+// on first use.
+
+func (dn *deferrednode) Read(p []byte, pos filenodePtr) (int, filenodePtr, error) {
+	return dn.realinode().Read(p, pos)
+}
+
+func (dn *deferrednode) Write(p []byte, pos filenodePtr) (int, filenodePtr, error) {
+	return dn.realinode().Write(p, pos)
+}
+
+func (dn *deferrednode) Child(name string, replace func(inode) inode) inode {
+	return dn.realinode().Child(name, replace)
+}
+
+func (dn *deferrednode) Truncate(size int64) error {
+	return dn.realinode().Truncate(size)
+}
+
+func (dn *deferrednode) SetParent(p inode, name string) {
+	dn.realinode().SetParent(p, name)
+}
+
+func (dn *deferrednode) Readdir() []os.FileInfo {
+	return dn.realinode().Readdir()
+}
+
+func (dn *deferrednode) Lock()    { dn.realinode().Lock() }
+func (dn *deferrednode) Unlock()  { dn.realinode().Unlock() }
+func (dn *deferrednode) RLock()   { dn.realinode().RLock() }
+func (dn *deferrednode) RUnlock() { dn.realinode().RUnlock() }
+
+// Metadata queries: answered by whichever inode is currently wrapped,
+// so they never trigger create().
+
+func (dn *deferrednode) IsDir() bool           { return dn.currentinode().IsDir() }
+func (dn *deferrednode) Size() int64           { return dn.currentinode().Size() }
+func (dn *deferrednode) FileInfo() os.FileInfo { return dn.currentinode().FileInfo() }
+func (dn *deferrednode) FS() FileSystem        { return dn.currentinode().FS() }
+func (dn *deferrednode) Parent() inode         { return dn.currentinode().Parent() }
--- /dev/null
+package arvados
+
+import (
+ "io"
+ "os"
+)
+
+// filehandle is an open file or directory. It pairs an inode with the
+// per-open state: current position, access mode, append mode, and
+// directory entries still pending from a paginated Readdir.
+type filehandle struct {
+	inode
+	// ptr is the current read/write position.
+	ptr filenodePtr
+	// append makes Write move to the end of a regular file first.
+	append bool
+	// readable and writable reflect the access mode used when opening.
+	readable bool
+	writable bool
+	// unreaddirs holds entries not yet returned by Readdir(n), n > 0.
+	unreaddirs []os.FileInfo
+}
+
+// Read reads up to len(p) bytes at the current position and advances
+// the position, holding the inode's read lock. It fails with
+// ErrWriteOnlyMode if the handle was not opened for reading.
+func (f *filehandle) Read(p []byte) (n int, err error) {
+	if f.readable {
+		f.inode.RLock()
+		defer f.inode.RUnlock()
+		n, f.ptr, err = f.inode.Read(p, f.ptr)
+		return n, err
+	}
+	return 0, ErrWriteOnlyMode
+}
+
+// Seek sets the position for the next Read or Write, interpreting off
+// according to whence (io.SeekStart, io.SeekCurrent or io.SeekEnd), and
+// returns the new position.
+//
+// An unrecognized whence fails with ErrInvalidArgument, and a resulting
+// negative position fails with ErrNegativeOffset. In both cases the
+// position is left unchanged and the current position is returned.
+func (f *filehandle) Seek(off int64, whence int) (pos int64, err error) {
+	ptr := f.ptr
+	switch whence {
+	case io.SeekStart:
+		ptr.off = off
+	case io.SeekCurrent:
+		ptr.off += off
+	case io.SeekEnd:
+		// Only consult the size when it is actually needed.
+		ptr.off = f.inode.Size() + off
+	default:
+		// Previously an unknown whence was silently treated as a
+		// successful no-op; report it like bytes.Reader/os.File do.
+		return f.ptr.off, ErrInvalidArgument
+	}
+	if ptr.off < 0 {
+		return f.ptr.off, ErrNegativeOffset
+	}
+	if ptr.off != f.ptr.off {
+		f.ptr = ptr
+		// force filenode to recompute f.ptr fields on next
+		// use
+		f.ptr.repacked = -1
+	}
+	return f.ptr.off, nil
+}
+
+// Truncate changes the size of the underlying inode. The handle's
+// access mode is not consulted; any restriction is up to the inode's
+// own Truncate.
+func (f *filehandle) Truncate(size int64) error {
+	node := f.inode
+	return node.Truncate(size)
+}
+
+// Write writes p at the current position and advances the position,
+// holding the inode's write lock. In append mode, a regular file's
+// position is first moved to its current end. It fails with
+// ErrReadOnlyFile if the handle was not opened for writing.
+func (f *filehandle) Write(p []byte) (n int, err error) {
+	if !f.writable {
+		return 0, ErrReadOnlyFile
+	}
+	f.inode.Lock()
+	defer f.inode.Unlock()
+	if f.append {
+		if fn, isFile := f.inode.(*filenode); isFile {
+			// Jump to EOF as seen while holding the write lock.
+			f.ptr = filenodePtr{
+				off:        fn.fileinfo.size,
+				segmentIdx: len(fn.segments),
+				segmentOff: 0,
+				repacked:   fn.repacked,
+			}
+		}
+	}
+	n, f.ptr, err = f.inode.Write(p, f.ptr)
+	return n, err
+}
+
+// Readdir returns directory entries, similar to os.File.Readdir.
+//
+// With count <= 0 it returns every entry in one call. With count > 0 it
+// returns at most count entries per call from a listing snapshotted on
+// the first such call, continuing where the previous call stopped, and
+// io.EOF once none remain. On a non-directory it fails with
+// ErrInvalidOperation.
+func (f *filehandle) Readdir(count int) ([]os.FileInfo, error) {
+	switch {
+	case !f.inode.IsDir():
+		return nil, ErrInvalidOperation
+	case count <= 0:
+		return f.inode.Readdir(), nil
+	}
+	if f.unreaddirs == nil {
+		f.unreaddirs = f.inode.Readdir()
+	}
+	remaining := len(f.unreaddirs)
+	if remaining == 0 {
+		return nil, io.EOF
+	}
+	if count > remaining {
+		count = remaining
+	}
+	batch := f.unreaddirs[:count]
+	f.unreaddirs = f.unreaddirs[count:]
+	return batch, nil
+}
+
+// Stat returns the underlying inode's FileInfo; it never fails.
+func (f *filehandle) Stat() (os.FileInfo, error) {
+	fi := f.inode.FileInfo()
+	return fi, nil
+}
+
+// Close does nothing and always returns nil. In particular it does not
+// flush written data; call Sync for that.
+func (f *filehandle) Close() error {
+	return nil
+}
+
+// Sync calls Sync on the entire FileSystem containing this file, not
+// just on this file.
+func (f *filehandle) Sync() error {
+	return f.inode.FS().Sync()
+}
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package arvados
+
+import (
+ "bytes"
+ "os"
+ "time"
+)
+
+// A getternode is a read-only file whose content is produced on demand
+// by calling Getter. The first successful result is cached and used
+// for all later reads and size queries.
+type getternode struct {
+	// Getter returns the file content.
+	Getter func() ([]byte, error)
+
+	treenode
+
+	// data is the cached Getter result; nil until a call succeeds.
+	data *bytes.Reader
+}
+
+// IsDir always reports false: a getternode is a plain file.
+func (*getternode) IsDir() bool { return false }
+
+// Child always returns nil because a getternode has no children; the
+// replace callback is never invoked.
+func (*getternode) Child(string, func(inode) inode) inode { return nil }
+
+// get fills the cache by calling Getter, unless it is already filled.
+// A failed call caches nothing, so the next get tries again.
+//
+// NOTE(review): FileInfo calls this under the node's write lock, but
+// Read reaches it from filehandle.Read under only the read lock, so two
+// concurrent readers could both fill gn.data.
+func (gn *getternode) get() error {
+	if gn.data == nil {
+		buf, err := gn.Getter()
+		if err != nil {
+			return err
+		}
+		gn.data = bytes.NewReader(buf)
+	}
+	return nil
+}
+
+// Size returns the content length reported by FileInfo (0 if Getter
+// fails).
+func (gn *getternode) Size() int64 {
+	fi := gn.FileInfo()
+	return fi.Size()
+}
+
+// FileInfo describes the node as a read-only (0444) file. Its size is
+// the length of the Getter result (0 if Getter fails), and its
+// modification time is the current time.
+//
+// The name comes from the embedded treenode's fileinfo, where the
+// node's name is recorded (presumably by SetParent; TODO confirm).
+// Previously the name was left empty, so Stat reported "".
+func (gn *getternode) FileInfo() os.FileInfo {
+	gn.Lock()
+	defer gn.Unlock()
+	var size int64
+	if gn.get() == nil {
+		size = gn.data.Size()
+	}
+	return fileinfo{
+		name:    gn.fileinfo.name,
+		modTime: time.Now(),
+		mode:    0444,
+		size:    size,
+	}
+}
+
+// Read copies content starting at ptr.off into p. The returned pointer
+// carries only the new offset. A Getter error is returned as-is. At the
+// end of the content, io.EOF is returned, possibly together with n > 0.
+func (gn *getternode) Read(p []byte, ptr filenodePtr) (int, filenodePtr, error) {
+	err := gn.get()
+	if err != nil {
+		return 0, ptr, err
+	}
+	n, err := gn.data.ReadAt(p, ptr.off)
+	next := filenodePtr{off: ptr.off + int64(n)}
+	return n, next, err
+}
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package arvados
+
+import (
+ "log"
+ "os"
+ "sync"
+)
+
+// projectnode exposes an Arvados project as a filesystem directory.
+// Its entries (the project's named collections and subprojects) are
+// fetched from the API the first time they are needed.
+type projectnode struct {
+	inode
+	// uuid is the owner UUID whose items are listed; empty means the
+	// current user, resolved during setup.
+	uuid string
+	// setupOnce makes setup run only once; err records its outcome.
+	setupOnce sync.Once
+	err       error
+}
+
+// setup populates the directory from the API. It lists every collection
+// and then every group with group_class "project" whose owner_uuid is
+// pn.uuid. If pn.uuid is empty, the current user's UUID is fetched
+// first and used instead.
+//
+// Collections become lazily-loaded deferredCollectionFS nodes, and
+// projects become nested projectnodes. Items named "", "." or ".." are
+// skipped for both kinds: they cannot be addressed as a path
+// component, and "." / ".." would shadow path navigation.
+//
+// Results are fetched in pages ordered by uuid, each page starting
+// after the last uuid seen. Any API error is stored in pn.err and ends
+// setup, keeping whatever entries were already added.
+func (pn *projectnode) setup() {
+	fs := pn.FS().(*siteFileSystem)
+	if pn.uuid == "" {
+		var resp User
+		pn.err = fs.RequestAndDecode(&resp, "GET", "arvados/v1/users/current", nil, nil)
+		if pn.err != nil {
+			return
+		}
+		pn.uuid = resp.UUID
+	}
+	filters := []Filter{{"owner_uuid", "=", pn.uuid}}
+	params := ResourceListParams{
+		Filters: filters,
+		Order:   "uuid",
+	}
+	for {
+		var resp CollectionList
+		pn.err = fs.RequestAndDecode(&resp, "GET", "arvados/v1/collections", nil, params)
+		if pn.err != nil {
+			// TODO: retry on next access, instead of returning the same error forever
+			return
+		}
+		if len(resp.Items) == 0 {
+			break
+		}
+		for _, i := range resp.Items {
+			coll := i
+			if coll.Name == "" || coll.Name == "." || coll.Name == ".." {
+				// Same rule as for projects below: such names
+				// can't be used as path components.
+				continue
+			}
+			pn.inode.Child(coll.Name, func(inode) inode {
+				return deferredCollectionFS(fs, pn, coll)
+			})
+		}
+		// Next page: everything after the last uuid in this one.
+		params.Filters = append(filters, Filter{"uuid", ">", resp.Items[len(resp.Items)-1].UUID})
+	}
+
+	filters = append(filters, Filter{"group_class", "=", "project"})
+	params.Filters = filters
+	for {
+		var resp GroupList
+		pn.err = fs.RequestAndDecode(&resp, "GET", "arvados/v1/groups", nil, params)
+		if pn.err != nil {
+			// TODO: retry on next access, instead of returning the same error forever
+			return
+		}
+		if len(resp.Items) == 0 {
+			break
+		}
+		for _, group := range resp.Items {
+			if group.Name == "" || group.Name == "." || group.Name == ".." {
+				continue
+			}
+			pn.inode.Child(group.Name, func(inode) inode {
+				return fs.newProjectNode(pn, group.Name, group.UUID)
+			})
+		}
+		params.Filters = append(filters, Filter{"uuid", ">", resp.Items[len(resp.Items)-1].UUID})
+	}
+}
+
+// Readdir lists the project's collections and subprojects, running
+// setup first if needed. If setup failed, the listing holds only the
+// entries added before the failure.
+func (pn *projectnode) Readdir() []os.FileInfo {
+	pn.setupOnce.Do(pn.setup)
+	entries := pn.inode.Readdir()
+	return entries
+}
+
+// Child looks up (replace == nil) or replaces the named entry, running
+// setup first if needed.
+//
+// If setup failed, Child logs the error and returns nil, so every name
+// appears not to exist. Modifications are not implemented yet: replace
+// is still called with the existing entry, but its result is discarded
+// and the existing entry (from a fresh lookup) is kept.
+func (pn *projectnode) Child(name string, replace func(inode) inode) inode {
+	pn.setupOnce.Do(pn.setup)
+	if pn.err != nil {
+		log.Printf("BUG: not propagating error setting up %T %v: %s", pn, pn, pn.err)
+		// TODO: propagate error, instead of just being empty
+		return nil
+	}
+	if replace == nil {
+		// plain lookup
+		return pn.inode.Child(name, nil)
+	}
+	return pn.inode.Child(name, func(existing inode) inode {
+		switch repl := replace(existing); {
+		case repl == nil:
+			// delete (TODO)
+		case repl.FileInfo().IsDir():
+			// mkdir (TODO: repl.SetParent(pn, name), etc.)
+		default:
+			// create file (TODO: repl.SetParent(pn, name), etc.)
+		}
+		// None of the cases is implemented: keep whatever is there.
+		return pn.Child(name, nil)
+	})
+}
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package arvados
+
+import (
+ "bytes"
+ "encoding/json"
+ "io"
+ "os"
+
+ check "gopkg.in/check.v1"
+)
+
+// spiedRequest is one API call recorded by spyingClient.
+type spiedRequest struct {
+	method string
+	path   string
+	// params is the call's params after a JSON round trip.
+	params map[string]interface{}
+}
+
+// spyingClient wraps a *Client and appends every RequestAndDecode call
+// to calls before passing it through.
+type spyingClient struct {
+	*Client
+	calls []spiedRequest
+}
+
+// RequestAndDecode records the call's method, path and params, then
+// forwards it to the wrapped *Client. The params are converted to a
+// generic map by a JSON round trip so tests can inspect them without
+// knowing the params type.
+//
+// JSON errors are deliberately ignored: the copy is only for
+// inspection, and params that don't round-trip are simply recorded as
+// a nil (or partial) map.
+func (sc *spyingClient) RequestAndDecode(dst interface{}, method, path string, body io.Reader, params interface{}) error {
+	var paramsCopy map[string]interface{}
+	var buf bytes.Buffer
+	json.NewEncoder(&buf).Encode(params)
+	json.NewDecoder(&buf).Decode(&paramsCopy)
+	sc.calls = append(sc.calls, spiedRequest{
+		method: method,
+		path:   path,
+		params: paramsCopy,
+	})
+	return sc.Client.RequestAndDecode(dst, method, path, body, params)
+}
+
+// TestHomeProject checks /home, the current user's home project. It
+// must list named entries including "Unrestricted public data". ".."
+// from a project must lead back to "home". A subproject must open as a
+// directory. Nonexistent names must fail with an os.IsNotExist error.
+func (s *SiteFSSuite) TestHomeProject(c *check.C) {
+	f, err := s.fs.Open("/home")
+	c.Assert(err, check.IsNil)
+	fis, err := f.Readdir(-1)
+	c.Assert(err, check.IsNil)
+	c.Check(len(fis), check.Not(check.Equals), 0)
+
+	ok := false
+	for _, fi := range fis {
+		c.Check(fi.Name(), check.Not(check.Equals), "")
+		if fi.Name() == "Unrestricted public data" {
+			ok = true
+		}
+	}
+	c.Check(ok, check.Equals, true)
+
+	f, err = s.fs.Open("/home/Unrestricted public data/..")
+	c.Assert(err, check.IsNil)
+	fi, err := f.Stat()
+	c.Check(err, check.IsNil)
+	c.Check(fi.IsDir(), check.Equals, true)
+	c.Check(fi.Name(), check.Equals, "home")
+
+	// Assert (not Check): f is used right after, and would be nil on
+	// failure.
+	f, err = s.fs.Open("/home/Unrestricted public data/Subproject in anonymous accessible project")
+	c.Assert(err, check.IsNil)
+	fi, err = f.Stat()
+	c.Check(err, check.IsNil)
+	c.Check(fi.IsDir(), check.Equals, true)
+
+	for _, nx := range []string{
+		"/home/A Project",
+		"/home/A Project/does not exist",
+		"/home/Unrestricted public data/does not exist",
+	} {
+		c.Log(nx)
+		_, err = s.fs.Open(nx)
+		c.Check(err, check.NotNil)
+		c.Check(os.IsNotExist(err), check.Equals, true)
+	}
+}
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package arvados
+
+import (
+ "os"
+ "time"
+)
+
+// siteFileSystem is the FileSystem built by (*Client).SiteFileSystem.
+// It reuses the generic fileSystem implementation and adds the
+// site-specific node constructors defined on it.
+type siteFileSystem struct {
+	fileSystem
+}
+
+// SiteFileSystem returns a FileSystem that maps collections and other
+// Arvados objects onto a filesystem layout. The root directory has two
+// entries:
+//   - by_id: looking up by_id/<id> mounts the collection fetched from
+//     arvados/v1/collections/<id>;
+//   - home: the current user's home project.
+//
+// This is experimental: the filesystem layout is not stable, and
+// there are significant known bugs and shortcomings. For example,
+// writes are not persisted until Sync() is called.
+func (c *Client) SiteFileSystem(kc keepClient) FileSystem {
+	root := &vdirnode{}
+	fs := &siteFileSystem{
+		fileSystem: fileSystem{
+			fsBackend: keepBackend{apiClient: c, keepClient: kc},
+			root:      root,
+		},
+	}
+	root.inode = &treenode{
+		fs:       fs,
+		parent:   root,
+		inodes:   make(map[string]inode),
+		fileinfo: fileinfo{name: "/", mode: os.ModeDir | 0755, modTime: time.Now()},
+	}
+
+	// by_id: a virtual directory that mounts collections on lookup.
+	newByID := func(inode) inode {
+		return &vdirnode{
+			inode: &treenode{
+				fs:       fs,
+				parent:   fs.root,
+				inodes:   make(map[string]inode),
+				fileinfo: fileinfo{name: "by_id", modTime: time.Now(), mode: 0755 | os.ModeDir},
+			},
+			create: fs.mountCollection,
+		}
+	}
+	// home: the current user's home project (empty uuid).
+	newHome := func(inode) inode {
+		return fs.newProjectNode(fs.root, "home", "")
+	}
+	root.inode.Child("by_id", newByID)
+	root.inode.Child("home", newHome)
+	return fs
+}
+
+// newNode always fails with ErrInvalidOperation: the site filesystem
+// does not support creating arbitrary files or directories.
+func (fs *siteFileSystem) newNode(name string, perm os.FileMode, modTime time.Time) (node inode, err error) {
+	err = ErrInvalidOperation
+	return nil, err
+}
+
+// mountCollection fetches the collection identified by id and returns
+// the root of a collection filesystem for it, attached to parent under
+// the name id. If the collection cannot be fetched or loaded it returns
+// nil, which vdirnode treats as "does not exist". The underlying error
+// is not reported.
+func (fs *siteFileSystem) mountCollection(parent inode, id string) inode {
+	var coll Collection
+	if err := fs.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+id, nil, nil); err != nil {
+		return nil
+	}
+	cfs, err := coll.FileSystem(fs, fs)
+	if err != nil {
+		return nil
+	}
+	root := cfs.rootnode()
+	root.SetParent(parent, id)
+	return root
+}
+
+// newProjectNode returns a directory called name, whose parent is
+// parent, listing the project with the given uuid (or the current
+// user's home project if uuid is empty). Nothing is fetched until the
+// directory is first used.
+func (fs *siteFileSystem) newProjectNode(parent inode, name, uuid string) inode {
+	dir := &treenode{
+		fs:     fs,
+		parent: parent,
+		inodes: make(map[string]inode),
+		fileinfo: fileinfo{
+			name:    name,
+			modTime: time.Now(),
+			mode:    0755 | os.ModeDir,
+		},
+	}
+	return &projectnode{uuid: uuid, inode: dir}
+}
+
+// vdirnode is a virtual directory. It wraps an inode, ignores requests
+// to add, replace or remove children, and instead calls create() when
+// a name that does not exist yet is looked up.
+//
+// create() returns either a new node, which will be added to the
+// wrapped treenode, or nil, meaning the name does not exist (ENOENT).
+type vdirnode struct {
+	inode
+	create func(parent inode, name string) inode
+}
+
+// Child returns the existing child called name, or asks create() for
+// one; the caller's replace func is ignored. When a child is created,
+// it is attached to vn and the wrapped directory's modification time
+// is bumped. That step requires the wrapped inode to be a *treenode.
+func (vn *vdirnode) Child(name string, _ func(inode) inode) inode {
+	return vn.inode.Child(name, func(existing inode) inode {
+		switch {
+		case existing != nil:
+			return existing
+		case vn.create == nil:
+			return nil
+		}
+		child := vn.create(vn, name)
+		if child == nil {
+			return nil
+		}
+		child.SetParent(vn, name)
+		vn.inode.(*treenode).fileinfo.modTime = time.Now()
+		return child
+	})
+}
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package arvados
+
+import (
+ "net/http"
+ "os"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvadostest"
+ check "gopkg.in/check.v1"
+)
+
+var _ = check.Suite(&SiteFSSuite{})
+
+// SiteFSSuite tests the FileSystem returned by (*Client).SiteFileSystem,
+// using an API client configured from the environment and a stub Keep
+// client.
+type SiteFSSuite struct {
+	client *Client
+	fs     FileSystem
+	kc     keepClient
+}
+
+// SetUpTest gives each test a fresh site filesystem, backed by the
+// environment's API server and a Keep stub holding a single block
+// ("foobar").
+func (s *SiteFSSuite) SetUpTest(c *check.C) {
+	s.client = NewClientFromEnv()
+	blocks := map[string][]byte{
+		"3858f62230ac3c915f300c664312c63f": []byte("foobar"),
+	}
+	s.kc = &keepClientStub{blocks: blocks}
+	s.fs = s.client.SiteFileSystem(s.kc)
+}
+
+func (s *SiteFSSuite) TestHttpFileSystemInterface(c *check.C) {
+ _, ok := s.fs.(http.FileSystem)
+ c.Check(ok, check.Equals, true)
+}
+
+// TestByIDEmpty checks that /by_id lists no entries before anything has
+// been looked up in it.
+func (s *SiteFSSuite) TestByIDEmpty(c *check.C) {
+	f, err := s.fs.Open("/by_id")
+	c.Assert(err, check.IsNil)
+	fis, err := f.Readdir(-1)
+	c.Check(err, check.IsNil)
+	c.Check(len(fis), check.Equals, 0)
+}
+
+// TestByUUID checks the /by_id directory:
+//   - collections are mounted on lookup;
+//   - a nonexistent collection reports os.ErrNotExist;
+//   - Mkdir over an existing mount reports os.ErrExist;
+//   - creating files directly in by_id is rejected;
+//   - renames out of a collection, or of by_id itself, are rejected;
+//   - renames within a collection work.
+func (s *SiteFSSuite) TestByUUID(c *check.C) {
+	f, err := s.fs.Open("/by_id")
+	c.Assert(err, check.IsNil)
+	fis, err := f.Readdir(-1)
+	c.Check(err, check.IsNil)
+	c.Check(len(fis), check.Equals, 0)
+
+	err = s.fs.Mkdir("/by_id/"+arvadostest.FooCollection, 0755)
+	c.Check(err, check.Equals, os.ErrExist)
+
+	_, err = s.fs.Open("/by_id/" + arvadostest.NonexistentCollection)
+	c.Assert(err, check.Equals, os.ErrNotExist)
+
+	f, err = s.fs.Open("/by_id/" + arvadostest.FooCollection)
+	c.Assert(err, check.IsNil)
+	fis, err = f.Readdir(-1)
+	c.Check(err, check.IsNil)
+	var names []string
+	for _, fi := range fis {
+		names = append(names, fi.Name())
+	}
+	c.Check(names, check.DeepEquals, []string{"foo"})
+
+	_, err = s.fs.OpenFile("/by_id/"+arvadostest.NonexistentCollection, os.O_RDWR|os.O_CREATE, 0755)
+	c.Check(err, check.Equals, ErrInvalidOperation)
+	err = s.fs.Rename("/by_id/"+arvadostest.FooCollection, "/by_id/beep")
+	c.Check(err, check.Equals, ErrInvalidArgument)
+	err = s.fs.Rename("/by_id/"+arvadostest.FooCollection+"/foo", "/by_id/beep")
+	c.Check(err, check.Equals, ErrInvalidArgument)
+	_, err = s.fs.Stat("/by_id/beep")
+	c.Check(err, check.Equals, os.ErrNotExist)
+	err = s.fs.Rename("/by_id/"+arvadostest.FooCollection+"/foo", "/by_id/"+arvadostest.FooCollection+"/bar")
+	c.Check(err, check.IsNil)
+
+	err = s.fs.Rename("/by_id", "/beep")
+	c.Check(err, check.Equals, ErrInvalidArgument)
+}