name: ".",
mode: os.ModeDir | 0755,
modTime: modTime,
+ sys: func() interface{} { return c },
},
inodes: make(map[string]inode),
},
return ErrInvalidOperation
}
-// Check for and incorporate upstream changes -- unless that has
-// already been done recently, in which case this func is a no-op.
-func (fs *collectionFileSystem) checkChangesOnServer() error {
+// Check for and incorporate upstream changes. If force==false, this
+// is a no-op except once every ttl/100 or so.
+//
+// Return value is true if new content was loaded from upstream and
+// any unsaved local changes have been discarded.
+func (fs *collectionFileSystem) checkChangesOnServer(force bool) (bool, error) {
if fs.uuid == "" && fs.savedPDH.Load() == "" {
- return nil
+ return false, nil
}
- // First try UUID if any, then last known PDH. Stop if all
- // signatures are new enough.
- checkingAll := false
- for _, id := range []string{fs.uuid, fs.savedPDH.Load().(string)} {
- if id == "" {
- continue
- }
-
- fs.lockCheckChanges.Lock()
- if !checkingAll && fs.holdCheckChanges.After(time.Now()) {
- fs.lockCheckChanges.Unlock()
- return nil
- }
- remain, ttl := fs.signatureTimeLeft()
- if remain > 0.01 && !checkingAll {
- fs.holdCheckChanges = time.Now().Add(ttl / 100)
- }
+ fs.lockCheckChanges.Lock()
+ if !force && fs.holdCheckChanges.After(time.Now()) {
fs.lockCheckChanges.Unlock()
+ return false, nil
+ }
+ remain, ttl := fs.signatureTimeLeft()
+ if remain > 0.01 {
+ fs.holdCheckChanges = time.Now().Add(ttl / 100)
+ }
+ fs.lockCheckChanges.Unlock()
- if remain >= 0.5 {
- break
+ if !force && remain >= 0.5 {
+ // plenty of time left on current signatures
+ return false, nil
+ }
+
+ getparams := map[string]interface{}{"select": []string{"portable_data_hash", "manifest_text"}}
+ if fs.uuid != "" {
+ var coll Collection
+ err := fs.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+fs.uuid, nil, getparams)
+ if err != nil {
+ return false, err
}
- checkingAll = true
+ if coll.PortableDataHash != fs.savedPDH.Load().(string) {
+ // collection has changed upstream since we
+ // last loaded or saved. Refresh local data,
+ // losing any unsaved local changes.
+ newfs, err := coll.FileSystem(fs.fileSystem.fsBackend, fs.fileSystem.fsBackend)
+ if err != nil {
+ return false, err
+ }
+ snap, err := Snapshot(newfs, "/")
+ if err != nil {
+ return false, err
+ }
+ err = Splice(fs, "/", snap)
+ if err != nil {
+ return false, err
+ }
+ fs.savedPDH.Store(coll.PortableDataHash)
+ return true, nil
+ }
+ fs.updateSignatures(coll.ManifestText)
+ return false, nil
+ }
+ if pdh := fs.savedPDH.Load().(string); pdh != "" {
var coll Collection
- err := fs.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+id, nil, map[string]interface{}{"select": []string{"portable_data_hash", "manifest_text"}})
+ err := fs.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+pdh, nil, getparams)
if err != nil {
- continue
+ return false, err
}
fs.updateSignatures(coll.ManifestText)
}
- return nil
+ return false, nil
}
// Refresh signature on a single locator, if necessary. Assume caller
if err != nil || exp.Sub(time.Now()) > time.Minute {
// Synchronous update is not needed. Start an
// asynchronous update if needed.
- go fs.checkChangesOnServer()
+ go fs.checkChangesOnServer(false)
return locator
}
var manifests string
}
func (fs *collectionFileSystem) Sync() error {
- err := fs.checkChangesOnServer()
+ refreshed, err := fs.checkChangesOnServer(true)
if err != nil {
return err
}
- if fs.uuid == "" {
+ if refreshed || fs.uuid == "" {
return nil
}
txt, err := fs.MarshalManifest(".")
"select": selectFields,
})
if err != nil {
- return fmt.Errorf("sync failed: update %s: %s", fs.uuid, err)
+ return fmt.Errorf("sync failed: update %s: %w", fs.uuid, err)
}
fs.updateSignatures(coll.ManifestText)
fs.savedPDH.Store(coll.PortableDataHash)
}
func (fs *collectionFileSystem) MemorySize() int64 {
- fs.fileSystem.root.Lock()
- defer fs.fileSystem.root.Unlock()
return fs.fileSystem.root.(*dirnode).MemorySize()
}
//
// After seeking:
//
-// ptr.segmentIdx == len(filenode.segments) // i.e., at EOF
-// ||
-// filenode.segments[ptr.segmentIdx].Len() > ptr.segmentOff
+// ptr.segmentIdx == len(filenode.segments) // i.e., at EOF
+// ||
+// filenode.segments[ptr.segmentIdx].Len() > ptr.segmentOff
func (fn *filenode) seek(startPtr filenodePtr) (ptr filenodePtr) {
ptr = startPtr
if ptr.off < 0 {
return fn.fs
}
+func (fn *filenode) MemorySize() (size int64) {
+ fn.RLock()
+ defer fn.RUnlock()
+ size = 64
+ for _, seg := range fn.segments {
+ size += seg.memorySize()
+ }
+ return
+}
+
// Read reads file data from a single segment, starting at startPtr,
// into p. startPtr is assumed not to be up-to-date. Caller must have
// RLock or Lock.
return cg.Wait()
}
-// caller must have write lock.
func (dn *dirnode) MemorySize() (size int64) {
- for _, name := range dn.sortedNames() {
- node := dn.inodes[name]
- node.Lock()
- defer node.Unlock()
- switch node := node.(type) {
- case *dirnode:
- size += node.MemorySize()
- case *filenode:
- for _, seg := range node.segments {
- switch seg := seg.(type) {
- case *memSegment:
- size += int64(seg.Len())
- }
- }
- }
+ dn.RLock()
+ todo := make([]inode, 0, len(dn.inodes))
+ for _, node := range dn.inodes {
+ todo = append(todo, node)
+ }
+ dn.RUnlock()
+ size = 64
+ for _, node := range todo {
+ size += node.MemorySize()
}
return
}
}
func (dn *dirnode) Splice(repl inode) error {
- repldn, ok := repl.(*dirnode)
- if !ok {
- return fmt.Errorf("cannot use Splice to replace a directory with a file: %w", ErrInvalidArgument)
- }
- repldn, err := repldn.snapshot()
+ repl, err := repl.Snapshot()
if err != nil {
- return err
+ return fmt.Errorf("cannot copy snapshot: %w", err)
+ }
+ switch repl := repl.(type) {
+ default:
+ return fmt.Errorf("cannot splice snapshot containing %T: %w", repl, ErrInvalidArgument)
+ case *dirnode:
+ dn.Lock()
+ defer dn.Unlock()
+ dn.inodes = repl.inodes
+ dn.setTreeFS(dn.fs)
+ case *filenode:
+ dn.parent.Lock()
+ defer dn.parent.Unlock()
+ removing, err := dn.parent.Child(dn.fileinfo.name, nil)
+ if err != nil {
+ return fmt.Errorf("cannot use Splice to replace a top-level directory with a file: %w", ErrInvalidOperation)
+ } else if removing != dn {
+ // If ../thisdirname is not this dirnode, it
+ // must be an inode that wraps a dirnode, like
+ // a collectionFileSystem or deferrednode.
+ if deferred, ok := removing.(*deferrednode); ok {
+ // More useful to report the type of
+ // the wrapped node rather than just
+ // *deferrednode. (We know the real
+ // inode is already loaded because dn
+ // is inside it.)
+ removing = deferred.realinode()
+ }
+ return fmt.Errorf("cannot use Splice to attach a file at top level of %T: %w", removing, ErrInvalidOperation)
+ }
+ dn.Lock()
+ defer dn.Unlock()
+ _, err = dn.parent.Child(dn.fileinfo.name, func(inode) (inode, error) { return repl, nil })
+ if err != nil {
+ return fmt.Errorf("error replacing filenode: dn.parent.Child(): %w", err)
+ }
+ repl.fs = dn.fs
}
- dn.Lock()
- defer dn.Unlock()
- dn.inodes = repldn.inodes
- dn.setTreeFS(dn.fs)
return nil
}
// Return a new segment with a subsection of the data from this
// one. length<0 means length=Len()-off.
Slice(off int, length int) segment
+ memorySize() int64
}
type memSegment struct {
return
}
+func (me *memSegment) memorySize() int64 {
+ return 64 + int64(len(me.buf))
+}
+
type storedSegment struct {
kc fsBackend
locator string
return se.kc.ReadAt(se.locator, p, int(off)+se.offset)
}
+func (se storedSegment) memorySize() int64 {
+ return 64 + int64(len(se.locator))
+}
+
func canonicalName(name string) string {
name = path.Clean("/" + name)
if name == "/" || name == "./" {