X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/b714ab7401074991afe2fdc239c89107b3af6ca1..0b714d15c15340f3f52c331528d5cbb4cc422f07:/sdk/go/arvados/fs_base.go diff --git a/sdk/go/arvados/fs_base.go b/sdk/go/arvados/fs_base.go index f5916f9571..2478641df5 100644 --- a/sdk/go/arvados/fs_base.go +++ b/sdk/go/arvados/fs_base.go @@ -31,6 +31,10 @@ var ( ErrPermission = os.ErrPermission ) +type syncer interface { + Sync() error +} + // A File is an *os.File-like interface for reading and writing files // in a FileSystem. type File interface { @@ -58,6 +62,9 @@ type FileSystem interface { // while locking multiple inodes. locker() sync.Locker + // throttle for limiting concurrent background writers + throttle() *throttle + // create a new node with nil parent. newNode(name string, perm os.FileMode, modTime time.Time) (node inode, err error) @@ -95,8 +102,13 @@ type FileSystem interface { // for all writes to finish before returning. If shortBlocks // is true, flush everything; otherwise, if there's less than // a full block of buffered data at the end of a stream, leave - // it buffered in memory in case more data can be appended. - Flush(shortBlocks bool) error + // it buffered in memory in case more data can be appended. If + // path is "", flush all dirs/streams; otherwise, flush only + // the specified dir/stream. + Flush(path string, shortBlocks bool) error + + // Estimate current memory usage. + MemorySize() int64 } type inode interface { @@ -147,6 +159,7 @@ type inode interface { sync.Locker RLock() RUnlock() + MemorySize() int64 } type fileinfo struct { @@ -220,6 +233,13 @@ func (*nullnode) Child(name string, replace func(inode) (inode, error)) (inode, return nil, ErrNotADirectory } +func (*nullnode) MemorySize() int64 { + // Types that embed nullnode should report their own size, but + // if they don't, we at least report a non-zero size to ensure + // a large tree doesn't get reported as 0 bytes. + return 64 +} + type treenode struct { fs FileSystem parent inode @@ -294,16 +314,46 @@ func (n *treenode) Readdir() (fi []os.FileInfo, err error) { return } +func (n *treenode) Sync() error { + n.RLock() + defer n.RUnlock() + for _, inode := range n.inodes { + syncer, ok := inode.(syncer) + if !ok { + return ErrInvalidOperation + } + err := syncer.Sync() + if err != nil { + return err + } + } + return nil +} + +func (n *treenode) MemorySize() (size int64) { + n.RLock() + defer n.RUnlock() + for _, inode := range n.inodes { + size += inode.MemorySize() + } + return +} + type fileSystem struct { root inode fsBackend mutex sync.Mutex + thr *throttle } func (fs *fileSystem) rootnode() inode { return fs.root } +func (fs *fileSystem) throttle() *throttle { + return fs.thr +} + func (fs *fileSystem) locker() sync.Locker { return &fs.mutex } @@ -566,15 +616,21 @@ func (fs *fileSystem) remove(name string, recursive bool) error { } func (fs *fileSystem) Sync() error { - log.Printf("TODO: sync fileSystem") + if syncer, ok := fs.root.(syncer); ok { + return syncer.Sync() + } return ErrInvalidOperation } -func (fs *fileSystem) Flush(bool) error { +func (fs *fileSystem) Flush(string, bool) error { log.Printf("TODO: flush fileSystem") return ErrInvalidOperation } +func (fs *fileSystem) MemorySize() int64 { + return fs.root.MemorySize() +} + // rlookup (recursive lookup) returns the inode for the file/directory // with the given name (which may contain "/" separators). If no such // file/directory exists, the returned node is nil.