X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/67bd03fc4c2b3fbb25613c76a9470b53ebaa832a..af83c3a047bebc48db9490f3523208c4c3f87b6f:/sdk/go/arvados/fs_collection.go diff --git a/sdk/go/arvados/fs_collection.go b/sdk/go/arvados/fs_collection.go index e4511895ff..96977cbc66 100644 --- a/sdk/go/arvados/fs_collection.go +++ b/sdk/go/arvados/fs_collection.go @@ -8,6 +8,7 @@ import ( "encoding/json" "fmt" "io" + "log" "os" "path" "regexp" @@ -20,11 +21,6 @@ import ( var maxBlockSize = 1 << 26 -type keepClient interface { - ReadAt(locator string, p []byte, off int) (int, error) - PutB(p []byte) (string, int, error) -} - // A CollectionFileSystem is a FileSystem that can be serialized as a // manifest and stored as a collection. type CollectionFileSystem interface { @@ -37,62 +33,97 @@ type CollectionFileSystem interface { MarshalManifest(prefix string) (string, error) } +type collectionFileSystem struct { + fileSystem + uuid string +} + // FileSystem returns a CollectionFileSystem for the collection. -func (c *Collection) FileSystem(client *Client, kc keepClient) (CollectionFileSystem, error) { +func (c *Collection) FileSystem(client apiClient, kc keepClient) (CollectionFileSystem, error) { var modTime time.Time if c.ModifiedAt == nil { modTime = time.Now() } else { modTime = *c.ModifiedAt } - dn := &dirnode{ - client: client, - kc: kc, + fs := &collectionFileSystem{ + uuid: c.UUID, + fileSystem: fileSystem{ + fsBackend: keepBackend{apiClient: client, keepClient: kc}, + }, + } + root := &dirnode{ + fs: fs, treenode: treenode{ fileinfo: fileinfo{ name: ".", mode: os.ModeDir | 0755, modTime: modTime, }, - parent: nil, inodes: make(map[string]inode), }, } - dn.parent = dn - fs := &collectionFileSystem{fileSystem: fileSystem{inode: dn}} - if err := dn.loadManifest(c.ManifestText); err != nil { + root.SetParent(root) + if err := root.loadManifest(c.ManifestText); err != nil { return nil, err } + fs.root = root return fs, nil } -type collectionFileSystem struct { - fileSystem +func (fs *collectionFileSystem) newNode(name string, perm os.FileMode, modTime time.Time) (node inode, err error) { + if name == "" || name == "." || name == ".." { + return nil, ErrInvalidArgument + } + if perm.IsDir() { + return &dirnode{ + fs: fs, + treenode: treenode{ + fileinfo: fileinfo{ + name: name, + mode: perm | os.ModeDir, + modTime: modTime, + }, + inodes: make(map[string]inode), + }, + }, nil + } else { + return &filenode{ + fs: fs, + fileinfo: fileinfo{ + name: name, + mode: perm & ^os.ModeDir, + modTime: modTime, + }, + }, nil + } } -func (fs collectionFileSystem) Child(name string, replace func(inode) inode) inode { - if name == ".arvados#collection" { - return &getternode{Getter: func() ([]byte, error) { - var coll Collection - var err error - coll.ManifestText, err = fs.MarshalManifest(".") - if err != nil { - return nil, err - } - data, err := json.Marshal(&coll) - if err == nil { - data = append(data, 10) - } - return data, err - }} +func (fs *collectionFileSystem) Sync() error { + log.Printf("cfs.Sync()") + if fs.uuid == "" { + return nil + } + txt, err := fs.MarshalManifest(".") + if err != nil { + log.Printf("WARNING: (collectionFileSystem)Sync() failed: %s", err) + return err + } + coll := &Collection{ + UUID: fs.uuid, + ManifestText: txt, } - return fs.fileSystem.Child(name, replace) + err = fs.RequestAndDecode(nil, "PUT", "arvados/v1/collections/"+fs.uuid, fs.UpdateBody(coll), map[string]interface{}{"select": []string{"uuid"}}) + if err != nil { + log.Printf("WARNING: (collectionFileSystem)Sync() failed: %s", err) + } + return err } -func (fs collectionFileSystem) MarshalManifest(prefix string) (string, error) { - fs.fileSystem.inode.Lock() - defer fs.fileSystem.inode.Unlock() - return fs.fileSystem.inode.(*dirnode).marshalManifest(prefix) +func (fs *collectionFileSystem) MarshalManifest(prefix string) (string, error) { + fs.fileSystem.root.Lock() + defer fs.fileSystem.root.Unlock() + return fs.fileSystem.root.(*dirnode).marshalManifest(prefix) } // filenodePtr is an offset into a file that is (usually) efficient to @@ -170,8 +201,9 @@ func (fn *filenode) seek(startPtr filenodePtr) (ptr filenodePtr) { // filenode implements inode. type filenode struct { + parent inode + fs FileSystem fileinfo fileinfo - parent *dirnode segments []segment // number of times `segments` has changed in a // way that might invalidate a filenodePtr @@ -187,12 +219,22 @@ func (fn *filenode) appendSegment(e segment) { fn.fileinfo.size += int64(e.Len()) } +func (fn *filenode) SetParent(p inode) { + fn.RLock() + defer fn.RUnlock() + fn.parent = p +} + func (fn *filenode) Parent() inode { fn.RLock() defer fn.RUnlock() return fn.parent } +func (fn *filenode) FS() FileSystem { + return fn.fs +} + // Read reads file data from a single segment, starting at startPtr, // into p. startPtr is assumed not to be up-to-date. Caller must have // RLock or Lock. @@ -438,7 +480,7 @@ func (fn *filenode) pruneMemSegments() { if !ok || seg.Len() < maxBlockSize { continue } - locator, _, err := fn.parent.kc.PutB(seg.buf) + locator, _, err := fn.FS().PutB(seg.buf) if err != nil { // TODO: stall (or return errors from) // subsequent writes until flushing @@ -447,7 +489,7 @@ func (fn *filenode) pruneMemSegments() { } fn.memsize -= int64(seg.Len()) fn.segments[idx] = storedSegment{ - kc: fn.parent.kc, + kc: fn.FS(), locator: locator, size: seg.Len(), offset: 0, @@ -457,9 +499,33 @@ func (fn *filenode) pruneMemSegments() { } type dirnode struct { + fs *collectionFileSystem treenode - client *Client - kc keepClient +} + +func (dn *dirnode) FS() FileSystem { + return dn.fs +} + +func (dn *dirnode) Child(name string, replace func(inode) inode) inode { + if dn == dn.fs.rootnode() && name == ".arvados#collection" { + gn := &getternode{Getter: func() ([]byte, error) { + var coll Collection + var err error + coll.ManifestText, err = dn.fs.MarshalManifest(".") + if err != nil { + return nil, err + } + data, err := json.Marshal(&coll) + if err == nil { + data = append(data, 10) + } + return data, err + }} + gn.SetParent(dn) + return gn + } + return dn.treenode.Child(name, replace) } // sync flushes in-memory data (for all files in the tree rooted at @@ -480,7 +546,7 @@ func (dn *dirnode) sync() error { for _, sb := range sbs { block = append(block, sb.fn.segments[sb.idx].(*memSegment).buf...) } - locator, _, err := dn.kc.PutB(block) + locator, _, err := dn.fs.PutB(block) if err != nil { return err } @@ -488,7 +554,7 @@ func (dn *dirnode) sync() error { for _, sb := range sbs { data := sb.fn.segments[sb.idx].(*memSegment).buf sb.fn.segments[sb.idx] = storedSegment{ - kc: dn.kc, + kc: dn.fs, locator: locator, size: len(block), offset: off, @@ -709,7 +775,7 @@ func (dn *dirnode) loadManifest(txt string) error { blkLen = int(offset + length - pos - int64(blkOff)) } fnode.appendSegment(storedSegment{ - kc: dn.kc, + kc: dn.fs, locator: seg.locator, size: seg.size, offset: blkOff, @@ -738,7 +804,7 @@ func (dn *dirnode) loadManifest(txt string) error { // only safe to call from loadManifest -- no locking func (dn *dirnode) createFileAndParents(path string) (fn *filenode, err error) { - node := dn + var node inode = dn names := strings.Split(path, "/") basename := names[len(names)-1] if basename == "" || basename == "." || basename == ".." { @@ -754,20 +820,17 @@ func (dn *dirnode) createFileAndParents(path string) (fn *filenode, err error) { // can't be sure parent will be a *dirnode return nil, ErrInvalidArgument } - node = node.Parent().(*dirnode) + node = node.Parent() continue } node.Child(name, func(child inode) inode { - switch child.(type) { - case nil: - node, err = dn.newDirnode(node, name, 0755|os.ModeDir, node.Parent().FileInfo().ModTime()) + if child == nil { + node, err = node.FS().newNode(name, 0755|os.ModeDir, node.Parent().FileInfo().ModTime()) child = node - case *dirnode: - node = child.(*dirnode) - case *filenode: + } else if !child.IsDir() { err = ErrFileExists - default: - err = ErrInvalidOperation + } else { + node = child } return child }) @@ -778,8 +841,9 @@ func (dn *dirnode) createFileAndParents(path string) (fn *filenode, err error) { node.Child(basename, func(child inode) inode { switch child := child.(type) { case nil: - fn, err = dn.newFilenode(node, basename, 0755, node.FileInfo().ModTime()) - return fn + child, err = node.FS().newNode(basename, 0755, node.FileInfo().ModTime()) + fn = child.(*filenode) + return child case *filenode: fn = child return child @@ -787,75 +851,13 @@ func (dn *dirnode) createFileAndParents(path string) (fn *filenode, err error) { err = ErrIsDirectory return child default: - err = ErrInvalidOperation + err = ErrInvalidArgument return child } }) return } -// rlookup (recursive lookup) returns the inode for the file/directory -// with the given name (which may contain "/" separators). If no such -// file/directory exists, the returned node is nil. -func rlookup(start inode, path string) (node inode) { - node = start - for _, name := range strings.Split(path, "/") { - if node == nil { - break - } - if node.IsDir() { - if name == "." || name == "" { - continue - } - if name == ".." { - node = node.Parent() - continue - } - } - node = func() inode { - node.RLock() - defer node.RUnlock() - return node.Child(name, nil) - }() - } - return -} - -// Caller must have lock, and must have already ensured -// Children(name,nil) is nil. -func (dn *dirnode) newDirnode(parent *dirnode, name string, perm os.FileMode, modTime time.Time) (node *dirnode, err error) { - if name == "" || name == "." || name == ".." { - return nil, ErrInvalidArgument - } - return &dirnode{ - client: dn.client, - kc: dn.kc, - treenode: treenode{ - parent: parent, - fileinfo: fileinfo{ - name: name, - mode: perm | os.ModeDir, - modTime: modTime, - }, - inodes: make(map[string]inode), - }, - }, nil -} - -func (dn *dirnode) newFilenode(parent *dirnode, name string, perm os.FileMode, modTime time.Time) (node *filenode, err error) { - if name == "" || name == "." || name == ".." { - return nil, ErrInvalidArgument - } - return &filenode{ - parent: parent, - fileinfo: fileinfo{ - name: name, - mode: perm & ^os.ModeDir, - modTime: modTime, - }, - }, nil -} - type segment interface { io.ReaderAt Len() int @@ -920,7 +922,7 @@ func (me *memSegment) ReadAt(p []byte, off int64) (n int, err error) { } type storedSegment struct { - kc keepClient + kc fsBackend locator string size int // size of stored block (also encoded in locator) offset int // position of segment within the stored block