X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/134815098b060c232d1eee381d1eeb8e9d6162ff..607033c33f2001c194fe8c68d0dc17e4bde849da:/sdk/go/arvados/fs_base.go diff --git a/sdk/go/arvados/fs_base.go b/sdk/go/arvados/fs_base.go index 3058a7609c..2ad4d1f859 100644 --- a/sdk/go/arvados/fs_base.go +++ b/sdk/go/arvados/fs_base.go @@ -8,6 +8,7 @@ import ( "errors" "fmt" "io" + "io/fs" "log" "net/http" "os" @@ -29,8 +30,42 @@ var ( ErrIsDirectory = errors.New("cannot rename file to overwrite existing directory") ErrNotADirectory = errors.New("not a directory") ErrPermission = os.ErrPermission + DebugLocksPanicMode = false ) +type syncer interface { + Sync() error +} + +func debugPanicIfNotLocked(l sync.Locker, writing bool) { + if !DebugLocksPanicMode { + return + } + race := false + if rl, ok := l.(interface { + RLock() + RUnlock() + }); ok && writing { + go func() { + // Fail if we can grab the read lock during an + // operation that purportedly has write lock. + rl.RLock() + race = true + rl.RUnlock() + }() + } else { + go func() { + l.Lock() + race = true + l.Unlock() + }() + } + time.Sleep(100) + if race { + panic("bug: caller-must-have-lock func called, but nobody has lock") + } +} + // A File is an *os.File-like interface for reading and writing files // in a FileSystem. type File interface { @@ -43,6 +78,24 @@ type File interface { Stat() (os.FileInfo, error) Truncate(int64) error Sync() error + // Create a snapshot of a file or directory tree, which can + // then be spliced onto a different path or a different + // collection. + Snapshot() (*Subtree, error) + // Replace this file or directory with the given snapshot. + // The target must be inside a collection: Splice returns an + // error if the File is a virtual file or directory like + // by_id, a project directory, .arvados#collection, + // etc. Splice can replace directories with regular files and + // vice versa, except it cannot replace the root directory of + // a collection with a regular file. + Splice(snapshot *Subtree) error +} + +// A Subtree is a detached part of a filesystem tree that can be +// spliced into a filesystem via (File)Splice(). +type Subtree struct { + inode inode } // A FileSystem is an http.Filesystem plus Stat() and support for @@ -58,6 +111,9 @@ type FileSystem interface { // while locking multiple inodes. locker() sync.Locker + // throttle for limiting concurrent background writers + throttle() *throttle + // create a new node with nil parent. newNode(name string, perm os.FileMode, modTime time.Time) (node inode, err error) @@ -86,7 +142,34 @@ type FileSystem interface { Remove(name string) error RemoveAll(name string) error Rename(oldname, newname string) error + + // Write buffered data from memory to storage, returning when + // all updates have been saved to persistent storage. Sync() error + + // Write buffered data from memory to storage, but don't wait + // for all writes to finish before returning. If shortBlocks + // is true, flush everything; otherwise, if there's less than + // a full block of buffered data at the end of a stream, leave + // it buffered in memory in case more data can be appended. If + // path is "", flush all dirs/streams; otherwise, flush only + // the specified dir/stream. + Flush(path string, shortBlocks bool) error + + // Estimate current memory usage. + MemorySize() int64 +} + +type fsFS struct { + FileSystem +} + +// FS returns an fs.FS interface to the given FileSystem, to enable +// the use of fs.WalkDir, etc. +func FS(fs FileSystem) fs.FS { return fsFS{fs} } +func (fs fsFS) Open(path string) (fs.File, error) { + f, err := fs.FileSystem.Open(path) + return f, err } type inode interface { @@ -100,6 +183,12 @@ type inode interface { Readdir() ([]os.FileInfo, error) Size() int64 FileInfo() os.FileInfo + // Create a snapshot of this node and its descendants. + Snapshot() (inode, error) + // Replace this node with a copy of the provided snapshot. + // Caller may provide the same snapshot to multiple Splice + // calls, but must not modify the snapshot concurrently. + Splice(inode) error // Child() performs lookups and updates of named child nodes. // @@ -137,6 +226,7 @@ type inode interface { sync.Locker RLock() RUnlock() + MemorySize() int64 } type fileinfo struct { @@ -144,6 +234,14 @@ type fileinfo struct { mode os.FileMode size int64 modTime time.Time + // If not nil, sys() returns the source data structure, which + // can be a *Collection, *Group, or nil. Currently populated + // only for project dirs and top-level collection dirs. Does + // not stay up to date with upstream changes. + // + // Intended to support keep-web's properties-as-s3-metadata + // feature (https://dev.arvados.org/issues/19088). + sys func() interface{} } // Name implements os.FileInfo. @@ -171,9 +269,12 @@ func (fi fileinfo) Size() int64 { return fi.size } -// Sys implements os.FileInfo. +// Sys implements os.FileInfo. See comment in fileinfo struct. func (fi fileinfo) Sys() interface{} { - return nil + if fi.sys == nil { + return nil + } + return fi.sys() } type nullnode struct{} @@ -210,6 +311,21 @@ func (*nullnode) Child(name string, replace func(inode) (inode, error)) (inode, return nil, ErrNotADirectory } +func (*nullnode) MemorySize() int64 { + // Types that embed nullnode should report their own size, but + // if they don't, we at least report a non-zero size to ensure + // a large tree doesn't get reported as 0 bytes. + return 64 +} + +func (*nullnode) Snapshot() (inode, error) { + return nil, ErrInvalidOperation +} + +func (*nullnode) Splice(inode) error { + return ErrInvalidOperation +} + type treenode struct { fs FileSystem parent inode @@ -241,6 +357,7 @@ func (n *treenode) IsDir() bool { } func (n *treenode) Child(name string, replace func(inode) (inode, error)) (child inode, err error) { + debugPanicIfNotLocked(n, false) child = n.inodes[name] if name == "" || name == "." || name == ".." { err = ErrInvalidArgument @@ -254,8 +371,10 @@ func (n *treenode) Child(name string, replace func(inode) (inode, error)) (child return } if newchild == nil { + debugPanicIfNotLocked(n, true) delete(n.inodes, name) } else if newchild != child { + debugPanicIfNotLocked(n, true) n.inodes[name] = newchild n.fileinfo.modTime = time.Now() child = newchild @@ -284,16 +403,47 @@ func (n *treenode) Readdir() (fi []os.FileInfo, err error) { return } +func (n *treenode) Sync() error { + n.RLock() + defer n.RUnlock() + for _, inode := range n.inodes { + syncer, ok := inode.(syncer) + if !ok { + return ErrInvalidOperation + } + err := syncer.Sync() + if err != nil { + return err + } + } + return nil +} + +func (n *treenode) MemorySize() (size int64) { + n.RLock() + defer n.RUnlock() + debugPanicIfNotLocked(n, false) + for _, inode := range n.inodes { + size += inode.MemorySize() + } + return 64 + size +} + type fileSystem struct { root inode fsBackend mutex sync.Mutex + thr *throttle } func (fs *fileSystem) rootnode() inode { return fs.root } +func (fs *fileSystem) throttle() *throttle { + return fs.thr +} + func (fs *fileSystem) locker() sync.Locker { return &fs.mutex } @@ -324,24 +474,23 @@ func (fs *fileSystem) openFile(name string, flag int, perm os.FileMode) (*fileha default: return nil, fmt.Errorf("invalid flags 0x%x", flag) } - if !writable && parent.IsDir() { + if parent.IsDir() { // A directory can be opened via "foo/", "foo/.", or // "foo/..". switch name { case ".", "": - return &filehandle{inode: parent}, nil + return &filehandle{inode: parent, readable: readable, writable: writable}, nil case "..": - return &filehandle{inode: parent.Parent()}, nil + return &filehandle{inode: parent.Parent(), readable: readable, writable: writable}, nil } } createMode := flag&os.O_CREATE != 0 - if createMode { - parent.Lock() - defer parent.Unlock() - } else { - parent.RLock() - defer parent.RUnlock() - } + // We always need to take Lock() here, not just RLock(). Even + // if we know we won't be creating a file, parent might be a + // lookupnode, which sometimes populates its inodes map during + // a Child() call. + parent.Lock() + defer parent.Unlock() n, err := parent.Child(name, nil) if err != nil { return nil, err @@ -466,7 +615,7 @@ func (fs *fileSystem) Rename(oldname, newname string) error { // supported. Locking inodes from different // filesystems could deadlock, so we must error out // now. - return ErrInvalidArgument + return ErrInvalidOperation } // To ensure we can test reliably whether we're about to move @@ -556,10 +705,21 @@ func (fs *fileSystem) remove(name string, recursive bool) error { } func (fs *fileSystem) Sync() error { - log.Printf("TODO: sync fileSystem") + if syncer, ok := fs.root.(syncer); ok { + return syncer.Sync() + } return ErrInvalidOperation } +func (fs *fileSystem) Flush(string, bool) error { + log.Printf("TODO: flush fileSystem") + return ErrInvalidOperation +} + +func (fs *fileSystem) MemorySize() int64 { + return fs.root.MemorySize() +} + // rlookup (recursive lookup) returns the inode for the file/directory // with the given name (which may contain "/" separators). If no such // file/directory exists, the returned node is nil. @@ -576,8 +736,8 @@ func rlookup(start inode, path string) (node inode, err error) { } } node, err = func() (inode, error) { - node.RLock() - defer node.RUnlock() + node.Lock() + defer node.Unlock() return node.Child(name, nil) }() if node == nil || err != nil { @@ -593,3 +753,32 @@ func rlookup(start inode, path string) (node inode, err error) { func permittedName(name string) bool { return name != "" && name != "." && name != ".." && !strings.Contains(name, "/") } + +// Snapshot returns a Subtree that's a copy of the given path. It +// returns an error if the path is not inside a collection. +func Snapshot(fs FileSystem, path string) (*Subtree, error) { + f, err := fs.OpenFile(path, os.O_RDONLY, 0) + if err != nil { + return nil, err + } + defer f.Close() + return f.Snapshot() +} + +// Splice inserts newsubtree at the indicated target path. +// +// Splice returns an error if target is not inside a collection. +// +// Splice returns an error if target is the root of a collection and +// newsubtree is a snapshot of a file. +func Splice(fs FileSystem, target string, newsubtree *Subtree) error { + f, err := fs.OpenFile(target, os.O_WRONLY, 0) + if os.IsNotExist(err) { + f, err = fs.OpenFile(target, os.O_CREATE|os.O_WRONLY, 0700) + } + if err != nil { + return fmt.Errorf("open %s: %w", target, err) + } + defer f.Close() + return f.Splice(newsubtree) +}