Merge branch '19362-all-webdav-via-sitefs'
[arvados.git] / sdk / go / arvados / fs_collection.go
index f09c60a57192a7e654c03a63407045ac302c6d65..a26c876b932304ab6fdfefbafe36145665cbac90 100644 (file)
@@ -290,44 +290,70 @@ func (fs *collectionFileSystem) Truncate(int64) error {
        return ErrInvalidOperation
 }
 
-// Check for and incorporate upstream changes -- unless that has
-// already been done recently, in which case this func is a no-op.
-func (fs *collectionFileSystem) checkChangesOnServer() error {
+// Check for and incorporate upstream changes. If force==false, this
+// is a no-op except once every ttl/100 or so.
+//
+// Return value is true if new content was loaded from upstream and
+// any unsaved local changes have been discarded.
+func (fs *collectionFileSystem) checkChangesOnServer(force bool) (bool, error) {
        if fs.uuid == "" && fs.savedPDH.Load() == "" {
-               return nil
+               return false, nil
        }
 
-       // First try UUID if any, then last known PDH. Stop if all
-       // signatures are new enough.
-       checkingAll := false
-       for _, id := range []string{fs.uuid, fs.savedPDH.Load().(string)} {
-               if id == "" {
-                       continue
-               }
-
-               fs.lockCheckChanges.Lock()
-               if !checkingAll && fs.holdCheckChanges.After(time.Now()) {
-                       fs.lockCheckChanges.Unlock()
-                       return nil
-               }
-               remain, ttl := fs.signatureTimeLeft()
-               if remain > 0.01 && !checkingAll {
-                       fs.holdCheckChanges = time.Now().Add(ttl / 100)
-               }
+       fs.lockCheckChanges.Lock()
+       if !force && fs.holdCheckChanges.After(time.Now()) {
                fs.lockCheckChanges.Unlock()
+               return false, nil
+       }
+       remain, ttl := fs.signatureTimeLeft()
+       if remain > 0.01 {
+               fs.holdCheckChanges = time.Now().Add(ttl / 100)
+       }
+       fs.lockCheckChanges.Unlock()
 
-               if remain >= 0.5 {
-                       break
+       if !force && remain >= 0.5 {
+               // plenty of time left on current signatures
+               return false, nil
+       }
+
+       getparams := map[string]interface{}{"select": []string{"portable_data_hash", "manifest_text"}}
+       if fs.uuid != "" {
+               var coll Collection
+               err := fs.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+fs.uuid, nil, getparams)
+               if err != nil {
+                       return false, err
                }
-               checkingAll = true
+               if coll.PortableDataHash != fs.savedPDH.Load().(string) {
+                       // collection has changed upstream since we
+                       // last loaded or saved. Refresh local data,
+                       // losing any unsaved local changes.
+                       newfs, err := coll.FileSystem(fs.fileSystem.fsBackend, fs.fileSystem.fsBackend)
+                       if err != nil {
+                               return false, err
+                       }
+                       snap, err := Snapshot(newfs, "/")
+                       if err != nil {
+                               return false, err
+                       }
+                       err = Splice(fs, "/", snap)
+                       if err != nil {
+                               return false, err
+                       }
+                       fs.savedPDH.Store(coll.PortableDataHash)
+                       return true, nil
+               }
+               fs.updateSignatures(coll.ManifestText)
+               return false, nil
+       }
+       if pdh := fs.savedPDH.Load().(string); pdh != "" {
                var coll Collection
-               err := fs.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+id, nil, map[string]interface{}{"select": []string{"portable_data_hash", "manifest_text"}})
+               err := fs.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+pdh, nil, getparams)
                if err != nil {
-                       continue
+                       return false, err
                }
                fs.updateSignatures(coll.ManifestText)
        }
-       return nil
+       return false, nil
 }
 
 // Refresh signature on a single locator, if necessary. Assume caller
@@ -339,7 +365,7 @@ func (fs *collectionFileSystem) refreshSignature(locator string) string {
        if err != nil || exp.Sub(time.Now()) > time.Minute {
                // Synchronous update is not needed. Start an
                // asynchronous update if needed.
-               go fs.checkChangesOnServer()
+               go fs.checkChangesOnServer(false)
                return locator
        }
        var manifests string
@@ -368,11 +394,11 @@ func (fs *collectionFileSystem) refreshSignature(locator string) string {
 }
 
 func (fs *collectionFileSystem) Sync() error {
-       err := fs.checkChangesOnServer()
+       refreshed, err := fs.checkChangesOnServer(true)
        if err != nil {
                return err
        }
-       if fs.uuid == "" {
+       if refreshed || fs.uuid == "" {
                return nil
        }
        txt, err := fs.MarshalManifest(".")
@@ -403,7 +429,7 @@ func (fs *collectionFileSystem) Sync() error {
                "select": selectFields,
        })
        if err != nil {
-               return fmt.Errorf("sync failed: update %s: %s", fs.uuid, err)
+               return fmt.Errorf("sync failed: update %s: %w", fs.uuid, err)
        }
        fs.updateSignatures(coll.ManifestText)
        fs.savedPDH.Store(coll.PortableDataHash)
@@ -579,10 +605,7 @@ func (fn *filenode) MemorySize() (size int64) {
        defer fn.RUnlock()
        size = 64
        for _, seg := range fn.segments {
-               size += 64
-               if seg, ok := seg.(*memSegment); ok {
-                       size += int64(seg.Len())
-               }
+               size += seg.memorySize()
        }
        return
 }
@@ -1629,6 +1652,7 @@ type segment interface {
        // Return a new segment with a subsection of the data from this
        // one. length<0 means length=Len()-off.
        Slice(off int, length int) segment
+       memorySize() int64
 }
 
 type memSegment struct {
@@ -1707,6 +1731,10 @@ func (me *memSegment) ReadAt(p []byte, off int64) (n int, err error) {
        return
 }
 
+func (me *memSegment) memorySize() int64 {
+       return 64 + int64(len(me.buf))
+}
+
 type storedSegment struct {
        kc      fsBackend
        locator string
@@ -1744,6 +1772,10 @@ func (se storedSegment) ReadAt(p []byte, off int64) (n int, err error) {
        return se.kc.ReadAt(se.locator, p, int(off)+se.offset)
 }
 
+func (se storedSegment) memorySize() int64 {
+       return 64 + int64(len(se.locator))
+}
+
 func canonicalName(name string) string {
        name = path.Clean("/" + name)
        if name == "/" || name == "./" {