X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/4dda5d7b0ea74103c07617cc2ab9e5c97682c85d..d8d6bca4b5db4851a29473f08dc600816c977a21:/sdk/go/arvados/fs_base.go diff --git a/sdk/go/arvados/fs_base.go b/sdk/go/arvados/fs_base.go index 3058a7609c..2478641df5 100644 --- a/sdk/go/arvados/fs_base.go +++ b/sdk/go/arvados/fs_base.go @@ -31,6 +31,10 @@ var ( ErrPermission = os.ErrPermission ) +type syncer interface { + Sync() error +} + // A File is an *os.File-like interface for reading and writing files // in a FileSystem. type File interface { @@ -58,6 +62,9 @@ type FileSystem interface { // while locking multiple inodes. locker() sync.Locker + // throttle for limiting concurrent background writers + throttle() *throttle + // create a new node with nil parent. newNode(name string, perm os.FileMode, modTime time.Time) (node inode, err error) @@ -86,7 +93,22 @@ type FileSystem interface { Remove(name string) error RemoveAll(name string) error Rename(oldname, newname string) error + + // Write buffered data from memory to storage, returning when + // all updates have been saved to persistent storage. Sync() error + + // Write buffered data from memory to storage, but don't wait + // for all writes to finish before returning. If shortBlocks + // is true, flush everything; otherwise, if there's less than + // a full block of buffered data at the end of a stream, leave + // it buffered in memory in case more data can be appended. If + // path is "", flush all dirs/streams; otherwise, flush only + // the specified dir/stream. + Flush(path string, shortBlocks bool) error + + // Estimate current memory usage. + MemorySize() int64 } type inode interface { @@ -137,6 +159,7 @@ type inode interface { sync.Locker RLock() RUnlock() + MemorySize() int64 } type fileinfo struct { @@ -210,6 +233,13 @@ func (*nullnode) Child(name string, replace func(inode) (inode, error)) (inode, return nil, ErrNotADirectory } +func (*nullnode) MemorySize() int64 { + // Types that embed nullnode should report their own size, but + // if they don't, we at least report a non-zero size to ensure + // a large tree doesn't get reported as 0 bytes. + return 64 +} + type treenode struct { fs FileSystem parent inode @@ -284,16 +314,46 @@ func (n *treenode) Readdir() (fi []os.FileInfo, err error) { return } +func (n *treenode) Sync() error { + n.RLock() + defer n.RUnlock() + for _, inode := range n.inodes { + syncer, ok := inode.(syncer) + if !ok { + return ErrInvalidOperation + } + err := syncer.Sync() + if err != nil { + return err + } + } + return nil +} + +func (n *treenode) MemorySize() (size int64) { + n.RLock() + defer n.RUnlock() + for _, inode := range n.inodes { + size += inode.MemorySize() + } + return +} + type fileSystem struct { root inode fsBackend mutex sync.Mutex + thr *throttle } func (fs *fileSystem) rootnode() inode { return fs.root } +func (fs *fileSystem) throttle() *throttle { + return fs.thr +} + func (fs *fileSystem) locker() sync.Locker { return &fs.mutex } @@ -556,10 +616,21 @@ func (fs *fileSystem) remove(name string, recursive bool) error { } func (fs *fileSystem) Sync() error { - log.Printf("TODO: sync fileSystem") + if syncer, ok := fs.root.(syncer); ok { + return syncer.Sync() + } + return ErrInvalidOperation +} + +func (fs *fileSystem) Flush(string, bool) error { + log.Printf("TODO: flush fileSystem") return ErrInvalidOperation } +func (fs *fileSystem) MemorySize() int64 { + return fs.root.MemorySize() +} + // rlookup (recursive lookup) returns the inode for the file/directory // with the given name (which may contain "/" separators). If no such // file/directory exists, the returned node is nil.